Skip to content

Commit 45ccaae

Browse files
Refactored messaging flows.
1 parent db8f40d commit 45ccaae

File tree

8 files changed

+143
-93
lines changed

8 files changed

+143
-93
lines changed

onixlabs-corda-core-workflow/src/main/kotlin/io/onixlabs/corda/core/workflow/Extensions.FlowLogic.Transaction.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,7 @@ fun FlowLogic<*>.collectSignatures(transaction: SignedTransaction, sessions: Ite
155155

156156
sessions.forEach { it.send(it in signingSessions) }
157157

158-
return if (signingSessions.isEmpty()) transaction
159-
else subFlow(
158+
return if (signingSessions.isEmpty()) transaction else subFlow(
160159
CollectSignaturesFlow(transaction, signingSessions, CollectTransactionSignaturesStep.childProgressTracker())
161160
)
162161
}

onixlabs-corda-core-workflow/src/main/kotlin/io/onixlabs/corda/core/workflow/Message.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,4 @@ import java.util.*
2727
* @property id The identity of the message.
2828
*/
2929
@CordaSerializable
30-
open class Message<T>(val data: T, val id: UUID = UUID.randomUUID()) where T : Any
30+
open class Message<T : Any>(val data: T, val id: UUID = UUID.randomUUID())

onixlabs-corda-core-workflow/src/main/kotlin/io/onixlabs/corda/core/workflow/ReceiveMessageAcknowledgementFlow.kt

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,23 @@ import net.corda.core.utilities.unwrap
3131
* @param session The flow session with the counter-party who is sending the message acknowledgement.
3232
* @property progressTracker The progress tracker that will be used to track the progress of communication in this flow.
3333
*/
34-
class ReceiveMessageAcknowledgementFlow<T>(
34+
class ReceiveMessageAcknowledgementFlow<T : MessageAcknowledgement>(
3535
private val session: FlowSession,
3636
override val progressTracker: ProgressTracker = tracker()
3737
) : FlowLogic<Pair<Party, T>>() {
3838

3939
companion object {
4040
@JvmStatic
41-
fun tracker() = ProgressTracker(RECEIVING)
41+
fun tracker() = ProgressTracker(ReceivingMessageAcknowledgementStep)
4242

43-
private object RECEIVING : Step("Receiving message acknowledgement from counter-party.")
43+
private object ReceivingMessageAcknowledgementStep :
44+
Step("Receiving message acknowledgement from counter-party.")
4445
}
4546

4647
@Suspendable
4748
@Suppress("UNCHECKED_CAST")
4849
override fun call(): Pair<Party, T> {
49-
currentStep(RECEIVING, additionalLogInfo = session.counterparty.toString())
50+
currentStep(ReceivingMessageAcknowledgementStep, additionalLogInfo = session.counterparty.toString())
5051
return session.counterparty to session.receive<MessageAcknowledgement>().unwrap { it as T }
5152
}
5253

@@ -56,20 +57,25 @@ class ReceiveMessageAcknowledgementFlow<T>(
5657
* @param session The flow session with the counter-party who is sending the message acknowledgement.
5758
*/
5859
@InitiatedBy(SendMessageAcknowledgementFlow.Initiator::class)
59-
class Receiver<T>(private val session: FlowSession) : FlowLogic<Pair<Party, T>>() where T : MessageAcknowledgement {
60+
class Receiver<T : MessageAcknowledgement>(private val session: FlowSession) : FlowLogic<Pair<Party, T>>() {
6061

6162
private companion object {
62-
object RECEIVING : Step("Receiving message acknowledgement.") {
63+
object ReceivingMessageAcknowledgementStep : Step("Receiving message acknowledgement.") {
6364
override fun childProgressTracker() = tracker()
6465
}
6566
}
6667

67-
override val progressTracker = ProgressTracker(RECEIVING)
68+
override val progressTracker = ProgressTracker(ReceivingMessageAcknowledgementStep)
6869

6970
@Suspendable
7071
override fun call(): Pair<Party, T> {
71-
currentStep(RECEIVING)
72-
return subFlow(ReceiveMessageAcknowledgementFlow(session, RECEIVING.childProgressTracker()))
72+
currentStep(ReceivingMessageAcknowledgementStep)
73+
return subFlow(
74+
ReceiveMessageAcknowledgementFlow(
75+
session,
76+
ReceivingMessageAcknowledgementStep.childProgressTracker()
77+
)
78+
)
7379
}
7480
}
7581
}

onixlabs-corda-core-workflow/src/main/kotlin/io/onixlabs/corda/core/workflow/ReceiveMessageFlow.kt

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,32 +33,40 @@ import net.corda.core.utilities.unwrap
3333
* @param requestAcknowledgement Specifies whether the counter-parties are required to acknowledge the message.
3434
* @property progressTracker The progress tracker that will be used to track the progress of communication in this flow.
3535
*/
36-
class ReceiveMessageFlow<T>(
36+
class ReceiveMessageFlow<T : Message<*>>(
3737
private val session: FlowSession,
3838
private val requestAcknowledgement: Boolean = false,
3939
override val progressTracker: ProgressTracker = tracker()
40-
) : FlowLogic<Pair<Party, T>>() where T : Message<*> {
40+
) : FlowLogic<Pair<Party, T>>() {
4141

4242
companion object {
4343
@JvmStatic
44-
fun tracker() = ProgressTracker(RECEIVING, SENDING)
44+
fun tracker() = ProgressTracker(ReceivingMessageStep, SendingMessageAcknowledgementStep)
4545

46-
private object RECEIVING : Step("Receiving message from counter-party.")
47-
private object SENDING : Step("Sending message acknowledgement to counter-party.")
46+
private object ReceivingMessageStep : Step("Receiving message from counter-party.")
47+
private object SendingMessageAcknowledgementStep : Step("Sending message acknowledgement to counter-party.")
4848
}
4949

5050
@Suspendable
51-
@Suppress("UNCHECKED_CAST")
5251
override fun call(): Pair<Party, T> {
53-
currentStep(RECEIVING, additionalLogInfo = session.counterparty.toString())
54-
val message = session.receive<Message<*>>().unwrap { it as T }
52+
val message = receiveMessageFromCounterparty()
53+
sendMessageAcknowledgementToCounterparty(message)
54+
return session.counterparty to message
55+
}
56+
57+
@Suspendable
58+
@Suppress("UNCHECKED_CAST")
59+
private fun receiveMessageFromCounterparty(): T {
60+
currentStep(ReceivingMessageStep, additionalLogInfo = session.counterparty.toString())
61+
return session.receive<Message<*>>().unwrap { it as T }
62+
}
5563

64+
@Suspendable
65+
private fun sendMessageAcknowledgementToCounterparty(message: T) {
5666
if (requestAcknowledgement) {
57-
currentStep(SENDING, additionalLogInfo = session.counterparty.toString())
67+
currentStep(SendingMessageAcknowledgementStep, additionalLogInfo = session.counterparty.toString())
5868
session.send(MessageAcknowledgement(message.id))
5969
}
60-
61-
return session.counterparty to message
6270
}
6371

6472
/**
@@ -68,20 +76,20 @@ class ReceiveMessageFlow<T>(
6876
* @param session The flow session with the counter-party who is sending the message.
6977
*/
7078
@InitiatedBy(SendMessageFlow.Initiator::class)
71-
class Receiver<T>(private val session: FlowSession) : FlowLogic<Pair<Party, T>>() where T : Message<*> {
79+
class Receiver<T : Message<*>>(private val session: FlowSession) : FlowLogic<Pair<Party, T>>() {
7280

7381
private companion object {
74-
object RECEIVING : Step("Receiving message.") {
82+
object ReceivingMessageStep : Step("Receiving message.") {
7583
override fun childProgressTracker() = tracker()
7684
}
7785
}
7886

79-
override val progressTracker = ProgressTracker(RECEIVING)
87+
override val progressTracker = ProgressTracker(ReceivingMessageStep)
8088

8189
@Suspendable
8290
override fun call(): Pair<Party, T> {
83-
currentStep(RECEIVING)
84-
return subFlow(ReceiveMessageFlow(session, false, RECEIVING.childProgressTracker()))
91+
currentStep(ReceivingMessageStep)
92+
return subFlow(ReceiveMessageFlow(session, false, ReceivingMessageStep.childProgressTracker()))
8593
}
8694
}
8795
}

onixlabs-corda-core-workflow/src/main/kotlin/io/onixlabs/corda/core/workflow/ReceiveTransactionNoteFlow.kt

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,43 +31,52 @@ import net.corda.core.utilities.unwrap
3131
* Represents a sub-flow that receives a transaction note from the specified counter-party.
3232
*
3333
* @param session The flow session with the counter-party who is sending the transaction note.
34-
* @param persist Determines whether the transaction note should be persisted in the local node's vault.
34+
* @param addTransactionNoteToVault Determines whether the transaction note should be added in the local node's vault.
3535
* @param expectedTransactionId An optional, expected transaction ID which can be checked upon receiving the note.
3636
* @property progressTracker The progress tracker that will be used to track the progress of communication in this flow.
3737
*/
3838
class ReceiveTransactionNoteFlow(
3939
private val session: FlowSession,
40-
private val persist: Boolean = true,
40+
private val addTransactionNoteToVault: Boolean = true,
4141
private val expectedTransactionId: SecureHash? = null,
4242
override val progressTracker: ProgressTracker = tracker()
4343
) : FlowLogic<Pair<Party, TransactionNote>>() {
4444

4545
companion object {
4646
@JvmStatic
47-
fun tracker() = ProgressTracker(RECEIVING, ADDING)
47+
fun tracker() = ProgressTracker(ReceivingTransactionNoteStep, AddingTransactionNoteStep)
4848

49-
private object ADDING : Step("Adding note to transaction.")
50-
private object RECEIVING : Step("Receiving transaction note from counter-party.")
49+
private object AddingTransactionNoteStep : Step("Adding note to transaction.")
50+
private object ReceivingTransactionNoteStep : Step("Receiving transaction note from counter-party.")
5151
}
5252

5353
@Suspendable
5454
override fun call(): Pair<Party, TransactionNote> {
55-
currentStep(RECEIVING, additionalLogInfo = session.counterparty.toString())
56-
val transactionNote = session.receive<TransactionNote>().unwrap { it }
55+
val transactionNote = receiveTransactionNoteFromCounterparty()
56+
checkExpectedTransactionNote(transactionNote)
57+
addTransactionNoteToVault(transactionNote)
58+
return session.counterparty to transactionNote
59+
}
60+
61+
@Suspendable
62+
private fun receiveTransactionNoteFromCounterparty(): TransactionNote {
63+
currentStep(ReceivingTransactionNoteStep, additionalLogInfo = session.counterparty.toString())
64+
return session.receive<TransactionNote>().unwrap { it }
65+
}
5766

67+
@Suspendable
68+
private fun checkExpectedTransactionNote(transactionNote: TransactionNote) {
5869
if (expectedTransactionId != null && expectedTransactionId != transactionNote.transactionId) {
59-
with("Received an unexpected transaction ID from counter-party: ${session.counterparty}.") {
60-
logger.error(this)
61-
throw FlowException(this)
62-
}
70+
throw FlowException("Received an unexpected transaction ID from counter-party: ${session.counterparty}.")
6371
}
72+
}
6473

65-
if (persist) {
66-
currentStep(ADDING, additionalLogInfo = transactionNote.transactionId.toString())
74+
@Suspendable
75+
private fun addTransactionNoteToVault(transactionNote: TransactionNote) {
76+
if (addTransactionNoteToVault) {
77+
currentStep(AddingTransactionNoteStep, additionalLogInfo = transactionNote.transactionId.toString())
6778
serviceHub.vaultService.addNoteToTransaction(transactionNote.transactionId, transactionNote.text)
6879
}
69-
70-
return session.counterparty to transactionNote
7180
}
7281

7382
/**
@@ -79,17 +88,22 @@ class ReceiveTransactionNoteFlow(
7988
class Receiver(private val session: FlowSession) : FlowLogic<Pair<Party, TransactionNote>>() {
8089

8190
private companion object {
82-
object RECEIVING : Step("Receiving transaction note.") {
91+
object ReceivingTransactionNoteStep : Step("Receiving transaction note.") {
8392
override fun childProgressTracker() = tracker()
8493
}
8594
}
8695

87-
override val progressTracker = ProgressTracker(RECEIVING)
96+
override val progressTracker = ProgressTracker(ReceivingTransactionNoteStep)
8897

8998
@Suspendable
9099
override fun call(): Pair<Party, TransactionNote> {
91-
currentStep(RECEIVING)
92-
return subFlow(ReceiveTransactionNoteFlow(session, progressTracker = RECEIVING.childProgressTracker()))
100+
currentStep(ReceivingTransactionNoteStep)
101+
return subFlow(
102+
ReceiveTransactionNoteFlow(
103+
session,
104+
progressTracker = ReceivingTransactionNoteStep.childProgressTracker()
105+
)
106+
)
93107
}
94108
}
95109
}

onixlabs-corda-core-workflow/src/main/kotlin/io/onixlabs/corda/core/workflow/SendMessageAcknowledgementFlow.kt

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,24 @@ import net.corda.core.utilities.ProgressTracker.Step
2929
* @param session The session with the counter-party who will receive the message acknowledgement.
3030
* @property progressTracker The progress tracker that will be used to track the progress of communication in this flow.
3131
*/
32-
class SendMessageAcknowledgementFlow<T>(
32+
class SendMessageAcknowledgementFlow<T : MessageAcknowledgement>(
3333
private val acknowledgement: T,
3434
private val session: FlowSession,
3535
override val progressTracker: ProgressTracker = tracker()
36-
) : FlowLogic<Unit>() where T : MessageAcknowledgement {
36+
) : FlowLogic<Unit>() {
3737

3838
companion object {
3939
@JvmStatic
40-
fun tracker() = ProgressTracker(SENDING)
40+
fun tracker() = ProgressTracker(SendingMessageAcknowledgementStep)
4141

4242
private const val FLOW_VERSION_1 = 1
4343

44-
private object SENDING : Step("Sending message acknowledgement to counter-party.")
44+
private object SendingMessageAcknowledgementStep : Step("Sending message acknowledgement to counter-party.")
4545
}
4646

4747
@Suspendable
4848
override fun call() {
49-
currentStep(SENDING, additionalLogInfo = session.counterparty.toString())
49+
currentStep(SendingMessageAcknowledgementStep, additionalLogInfo = session.counterparty.toString())
5050
session.send(acknowledgement)
5151
}
5252

@@ -59,24 +59,30 @@ class SendMessageAcknowledgementFlow<T>(
5959
@StartableByRPC
6060
@StartableByService
6161
@InitiatingFlow(version = FLOW_VERSION_1)
62-
class Initiator<T>(
62+
class Initiator<T : MessageAcknowledgement>(
6363
private val acknowledgement: T,
6464
private val counterparty: Party
65-
) : FlowLogic<Unit>() where T : MessageAcknowledgement {
65+
) : FlowLogic<Unit>() {
6666

6767
private companion object {
68-
private object SENDING : Step("Sending message acknowledgement.") {
69-
override fun childProgressTracker() = tracker()
68+
private object SendingMessageAcknowledgementStep : Step("Sending message acknowledgement.") {
69+
override fun childProgressTracker(): ProgressTracker = tracker()
7070
}
7171
}
7272

73-
override val progressTracker = ProgressTracker(SENDING)
73+
override val progressTracker = ProgressTracker(SendingMessageAcknowledgementStep)
7474

7575
@Suspendable
7676
override fun call() {
77-
currentStep(SENDING)
77+
currentStep(SendingMessageAcknowledgementStep)
7878
val session = initiateFlow(counterparty)
79-
subFlow(SendMessageAcknowledgementFlow(acknowledgement, session, SENDING.childProgressTracker()))
79+
subFlow(
80+
SendMessageAcknowledgementFlow(
81+
acknowledgement,
82+
session,
83+
SendingMessageAcknowledgementStep.childProgressTracker()
84+
)
85+
)
8086
}
8187
}
8288
}

0 commit comments

Comments
 (0)