|
32 | 32 | import io.cloudevents.core.builder.CloudEventBuilder; |
33 | 33 | import io.serverlessworkflow.api.types.Workflow; |
34 | 34 | import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; |
| 35 | +import io.serverlessworkflow.fluent.func.dsl.ListenStep; |
35 | 36 | import io.serverlessworkflow.impl.TaskContextData; |
36 | 37 | import io.serverlessworkflow.impl.WorkflowApplication; |
37 | 38 | import io.serverlessworkflow.impl.WorkflowContextData; |
38 | 39 | import io.serverlessworkflow.impl.WorkflowDefinition; |
39 | 40 | import io.serverlessworkflow.impl.WorkflowInstance; |
40 | 41 | import io.serverlessworkflow.impl.WorkflowModel; |
| 42 | +import io.serverlessworkflow.impl.WorkflowModelCollection; |
41 | 43 | import io.serverlessworkflow.impl.WorkflowStatus; |
42 | 44 | import io.serverlessworkflow.impl.events.EventPublisher; |
43 | 45 | import java.net.URI; |
@@ -107,6 +109,36 @@ void testListenToOneArray() { |
107 | 109 | .build()); |
108 | 110 | } |
109 | 111 |
|
| 112 | + @Test |
| 113 | + void testPrimitiveArray() { |
| 114 | + try (WorkflowApplication app = WorkflowApplication.builder().build()) { |
| 115 | + Workflow workflow = |
| 116 | + FuncWorkflowBuilder.workflow("doubleArray") |
| 117 | + .tasks(function(FuncEventFilterTest::doubleArray)) |
| 118 | + .build(); |
| 119 | + WorkflowModelCollection col = app.modelFactory().createCollection(); |
| 120 | + col.add(app.modelFactory().from(1)); |
| 121 | + col.add(app.modelFactory().from(2)); |
| 122 | + col.add(app.modelFactory().from(3)); |
| 123 | + assertThat( |
| 124 | + app.workflowDefinition(workflow) |
| 125 | + .instance(col) |
| 126 | + .start() |
| 127 | + .join() |
| 128 | + .as(int[].class) |
| 129 | + .orElseThrow()) |
| 130 | + .isEqualTo(new int[] {2, 4, 6}); |
| 131 | + } |
| 132 | + } |
| 133 | + |
| 134 | + private static int[] doubleArray(int[] input) { |
| 135 | + int[] output = new int[input.length]; |
| 136 | + for (int i = 0; i < input.length; i++) { |
| 137 | + output[i] = input[i] << 1; |
| 138 | + } |
| 139 | + return output; |
| 140 | + } |
| 141 | + |
110 | 142 | private Workflow reviewEmitter() { |
111 | 143 | return FuncWorkflowBuilder.workflow("emitReview") |
112 | 144 | .tasks(emitJson("draftReady", "org.acme.test.review", Review.class)) |
@@ -141,42 +173,63 @@ void sendEmail(NewsletterDraft draft) { |
141 | 173 | } |
142 | 174 |
|
143 | 175 | @Test |
144 | | - void testJacksonAutomagicalConversion() throws Exception { |
145 | | - try (WorkflowApplication app = WorkflowApplication.builder().build()) { |
| 176 | + void testAutomaticConversion() throws Exception { |
| 177 | + testConversionWorkflow( |
| 178 | + listen( |
| 179 | + "waitHumanReview", |
| 180 | + to().one( |
| 181 | + consumed("org.acme.newsletter.review.done") |
| 182 | + .extensionByInstanceId("instanceid")))); |
| 183 | + } |
146 | 184 |
|
147 | | - Workflow workflow = |
148 | | - FuncWorkflowBuilder.workflow("intelligent-newsletter") |
149 | | - .tasks( |
150 | | - function("draftAgent", this::writeDraft).exportAsTaskOutput(), |
151 | | - emitJson("draftReady", "org.acme.email.review.required", NewsletterDraft.class), |
152 | | - listen( |
153 | | - "waitHumanReview", |
154 | | - to().one( |
155 | | - consumed("org.acme.newsletter.review.done") |
156 | | - .extensionByInstanceId("instanceid"))) |
157 | | - .outputAs((Collection<?> events) -> events.iterator().next()), |
158 | | - // The engine sees the incoming JsonNode, sees this task expects |
159 | | - // HumanReview.class, |
160 | | - // and natively deserializes it for you before executing the lambda! |
161 | | - switchWhenOrElse( |
162 | | - h -> HumanReview.NEEDS_REVISION.equals(h.status()), |
163 | | - "humanEditorAgent", |
164 | | - "sendNewsletter", |
165 | | - HumanReview.class), |
166 | | - function("humanEditorAgent", this::editDraft) |
167 | | - .exportAsTaskOutput() |
168 | | - .then("draftReady"), |
169 | | - consume("sendNewsletter", this::sendEmail) |
170 | | - // Because we are in Jackson, the payload at this evaluation stage can be a |
171 | | - // Map. |
172 | | - // We simply check for the "status" field to know if it's the review payload. |
173 | | - .inputFrom( |
174 | | - (Map<String, Object> payload, |
175 | | - WorkflowContextData wfc, |
176 | | - TaskContextData tfc) -> |
177 | | - payload.containsKey("status") ? wfc.context() : payload)) |
178 | | - .build(); |
| 185 | + @Test |
| 186 | + void testCollectionConversion() throws Exception { |
| 187 | + testConversionWorkflow( |
| 188 | + listen( |
| 189 | + to().one( |
| 190 | + consumed("org.acme.newsletter.review.done") |
| 191 | + .extensionByInstanceId("instanceid"))) |
| 192 | + .outputAs((Collection<?> col) -> col.iterator().next())); |
| 193 | + } |
179 | 194 |
|
| 195 | + @Test |
| 196 | + void testNodeConversion() throws Exception { |
| 197 | + testConversionWorkflow( |
| 198 | + listen( |
| 199 | + "waitHumanReview", |
| 200 | + to().one( |
| 201 | + consumed("org.acme.newsletter.review.done") |
| 202 | + .extensionByInstanceId("instanceid"))) |
| 203 | + .outputAs((ArrayNode col) -> col.get(0))); |
| 204 | + } |
| 205 | + |
| 206 | + private void testConversionWorkflow(ListenStep listen) throws Exception { |
| 207 | + Workflow workflow = |
| 208 | + FuncWorkflowBuilder.workflow("intelligent-newsletter") |
| 209 | + .tasks( |
| 210 | + function("draftAgent", this::writeDraft).exportAsTaskOutput(), |
| 211 | + emitJson("draftReady", "org.acme.email.review.required", NewsletterDraft.class), |
| 212 | + listen, |
| 213 | + switchWhenOrElse( |
| 214 | + h -> HumanReview.NEEDS_REVISION.equals(h.status()), |
| 215 | + "humanEditorAgent", |
| 216 | + "sendNewsletter", |
| 217 | + HumanReview.class), |
| 218 | + function("humanEditorAgent", this::editDraft) |
| 219 | + .exportAsTaskOutput() |
| 220 | + .then("draftReady"), |
| 221 | + consume("sendNewsletter", this::sendEmail) |
| 222 | + // Because we are in Jackson, the payload at this evaluation stage can be a |
| 223 | + // Map. |
| 224 | + // We simply check for the "status" field to know if it's the review payload. |
| 225 | + .inputFrom( |
| 226 | + (Map<String, Object> payload, |
| 227 | + WorkflowContextData wfc, |
| 228 | + TaskContextData tfc) -> |
| 229 | + payload.containsKey("status") ? wfc.context() : payload)) |
| 230 | + .build(); |
| 231 | + |
| 232 | + try (WorkflowApplication app = WorkflowApplication.builder().build()) { |
180 | 233 | WorkflowDefinition definition = app.workflowDefinition(workflow); |
181 | 234 | WorkflowInstance instance = definition.instance(new NewsletterRequest("Tech Stocks")); |
182 | 235 | CompletableFuture<WorkflowModel> future = instance.start(); |
|
0 commit comments