diff --git a/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/WorkflowCompilerSpec.scala b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/WorkflowCompilerSpec.scala
new file mode 100644
index 00000000000..1f04382f725
--- /dev/null
+++ b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/WorkflowCompilerSpec.scala
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.compiler
+
+import org.apache.texera.amber.compiler.model.{LogicalLink, LogicalPlanPojo}
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
+import org.apache.texera.amber.core.virtualidentity.WorkflowIdentity
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.COMPILATION_ERROR
+import org.apache.texera.amber.operator.filter.{
+  ComparisonType,
+  FilterPredicate,
+  SpecializedFilterOpDesc
+}
+import org.apache.texera.amber.operator.limit.LimitOpDesc
+import org.apache.texera.amber.operator.projection.{AttributeUnit, ProjectionOpDesc}
+import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
+import org.scalatest.flatspec.AnyFlatSpec
+
+/**
+ * Direct unit coverage for the editing-time [[WorkflowCompiler]].
+ *
+ * Owns *compiler-behavior* tests — schema propagation through multi-op
+ * chains, lenient-mode error accumulation, terminal-storage selection.
+ * `WorkflowCompilationResourceSpec` owns *resource-layer* tests — HTTP
+ * status, response type discriminator, JSON envelope. Drawing the line
+ * here keeps each spec focused on a single failure axis.
+ *
+ * Bypassing the resource layer also sidesteps a separate NPE in response
+ * serialization (apache/texera#5021); these compiler-level tests stay
+ * green whether or not that bug has been fixed.
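+ *
+ * Every test drives the compiler directly; the call shape used throughout
+ * (a reading aid restating the code below, not a separate API) is:
+ * {{{
+ * val result = new WorkflowCompiler(newContext())
+ *   .compile(LogicalPlanPojo(operators, links, List.empty, List.empty))
+ * // then inspect result.physicalPlan, result.operatorIdToError,
+ * // and result.operatorIdToOutputSchemas
+ * }}}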
+ */ +class WorkflowCompilerSpec extends AnyFlatSpec { + + private def newContext(): WorkflowContext = + new WorkflowContext(workflowId = WorkflowIdentity(0)) + + private def csvOp(fileName: String): CSVScanSourceOpDesc = { + val op = new CSVScanSourceOpDesc() + op.fileName = Some(fileName) + op.customDelimiter = Some(",") + op.hasHeader = true + op + } + + private def csvOpNoFile(): CSVScanSourceOpDesc = { + val op = new CSVScanSourceOpDesc() + op.customDelimiter = Some(",") + op.hasHeader = true + op + } + + private def projectOp(columns: List[String]): ProjectionOpDesc = { + val op = new ProjectionOpDesc() + op.attributes = columns.map(name => new AttributeUnit(name, "")) + op.isDrop = false + op + } + + private def filterOp(predicates: FilterPredicate*): SpecializedFilterOpDesc = { + val op = new SpecializedFilterOpDesc + op.predicates = predicates.toList + op + } + + private def limitOp(limit: Int): LimitOpDesc = { + val op = new LimitOpDesc + op.limit = limit + op + } + + private val realCsvPath = + "workflow-compiling-service/src/test/resources/country_sales_small.csv" + + // -------------------- happy path -------------------- + + "WorkflowCompiler" should "produce a populated physicalPlan and no errors for a well-formed plan" in { + val csv = csvOp(realCsvPath) + val proj = projectOp(List("Region", "Total Profit")) + val result = new WorkflowCompiler(newContext()).compile( + LogicalPlanPojo( + operators = List(csv, proj), + links = List( + LogicalLink( + csv.operatorIdentifier, + PortIdentity(0), + proj.operatorIdentifier, + PortIdentity(0) + ) + ), + opsToViewResult = List.empty, + opsToReuseResult = List.empty + ) + ) + + assert(result.physicalPlan.isDefined, "happy path should yield a physical plan") + assert(result.operatorIdToError.isEmpty, s"unexpected errors: ${result.operatorIdToError}") + // Schema for both operators' output ports should be populated and non-null — + // this is the property whose violation triggers the resource-level NPE. + val projSchemas = result.operatorIdToOutputSchemas(proj.operatorIdentifier) + assert(projSchemas.values.forall(s => s.isDefined && s.get != null)) + } + + it should "propagate schemas through a csv -> projection -> limit -> filter -> filter -> limit chain" in { + // Real-world editing-shape: source then filter/limit/project ops. Asserts + // the compiler threads schema through every link so the frontend sees the + // projected columns at every downstream port. Previously this lived in + // WorkflowCompilationResourceSpec as an HTTP test, but the property being + // pinned is compiler-level (schema propagation) — the REST envelope adds + // no signal. 
+ val csv = csvOp(realCsvPath) + val proj = projectOp(List("Region", "Total Profit")) + val limit1 = limitOp(10) + val filter1 = + filterOp(new FilterPredicate("Total Profit", ComparisonType.GREATER_THAN, "10000")) + val filter2 = filterOp(new FilterPredicate("Region", ComparisonType.NOT_EQUAL_TO, "JPN")) + val limit2 = limitOp(5) + + val result = new WorkflowCompiler(newContext()).compile( + LogicalPlanPojo( + operators = List(csv, proj, limit1, filter1, filter2, limit2), + links = List( + LogicalLink( + csv.operatorIdentifier, + PortIdentity(0), + proj.operatorIdentifier, + PortIdentity(0) + ), + LogicalLink( + proj.operatorIdentifier, + PortIdentity(0), + limit1.operatorIdentifier, + PortIdentity(0) + ), + LogicalLink( + limit1.operatorIdentifier, + PortIdentity(0), + filter1.operatorIdentifier, + PortIdentity(0) + ), + LogicalLink( + filter1.operatorIdentifier, + PortIdentity(0), + filter2.operatorIdentifier, + PortIdentity(0) + ), + LogicalLink( + filter2.operatorIdentifier, + PortIdentity(0), + limit2.operatorIdentifier, + PortIdentity(0) + ) + ), + opsToViewResult = List.empty, + opsToReuseResult = List.empty + ) + ) + + assert(result.physicalPlan.isDefined) + assert(result.operatorIdToError.isEmpty, s"unexpected errors: ${result.operatorIdToError}") + // Projection narrowed [Region, Country, ..., Total Profit] down to two + // columns; every downstream op should see exactly those two attributes. + val filter2Schemas = result.operatorIdToOutputSchemas(filter2.operatorIdentifier) + val outputAttrs = filter2Schemas(PortIdentity(0)).get.attributes + assert( + outputAttrs == List( + new Attribute("Region", AttributeType.STRING), + new Attribute("Total Profit", AttributeType.DOUBLE) + ), + s"projected schema should reach filter2 unchanged, got $outputAttrs" + ) + } + + // -------------------- lenient-mode error accumulation -------------------- + + // The frontend relies on `compile` *never throwing*: a user mid-edit + // routinely produces an inconsistent plan and the editing UI must render + // structured per-operator errors. These tests pin the contract. + + "WorkflowCompiler" should "accumulate, not throw, when a scan source has no fileName" in { + val orphan = csvOpNoFile() + + val result = new WorkflowCompiler(newContext()).compile( + LogicalPlanPojo( + operators = List(orphan), + links = List.empty, + opsToViewResult = List.empty, + opsToReuseResult = List.empty + ) + ) + + assert(result.physicalPlan.isEmpty, "any error must clear the physical plan") + val err = result.operatorIdToError(orphan.operatorIdentifier) + assert(err.`type` == COMPILATION_ERROR) + assert(err.operatorId == orphan.operatorIdentifier.id) + assert(err.message.contains("no input file name"), s"unexpected message: ${err.message}") + assert(err.details.nonEmpty, "stack-trace details should be populated for UI display") + } + + it should "accumulate when a scan source's fileName points to a non-existent path" in { + val broken = csvOp("/does/not/exist/missing.csv") + + val result = new WorkflowCompiler(newContext()).compile( + LogicalPlanPojo( + operators = List(broken), + links = List.empty, + opsToViewResult = List.empty, + opsToReuseResult = List.empty + ) + ) + + assert(result.physicalPlan.isEmpty) + assert(result.operatorIdToError.contains(broken.operatorIdentifier)) + // FileResolver surfaces FileNotFoundException for a missing local path; + // pin the "does not exist" substring from that contract rather than a + // different file-type error wording. 
+ assert( + result.operatorIdToError(broken.operatorIdentifier).message.contains("does not exist"), + s"unexpected message: ${result.operatorIdToError(broken.operatorIdentifier).message}" + ) + } + + it should "accumulate a per-operator error when projection references a non-existent attribute" in { + val csv = csvOp(realCsvPath) + val badProjection = projectOp(List("DoesNotExist")) + + val result = new WorkflowCompiler(newContext()).compile( + LogicalPlanPojo( + operators = List(csv, badProjection), + links = List( + LogicalLink( + csv.operatorIdentifier, + PortIdentity(0), + badProjection.operatorIdentifier, + PortIdentity(0) + ) + ), + opsToViewResult = List.empty, + opsToReuseResult = List.empty + ) + ) + + assert(result.physicalPlan.isEmpty) + assert( + result.operatorIdToError.contains(badProjection.operatorIdentifier), + s"projection should be in errors, got ${result.operatorIdToError.keySet}" + ) + // The upstream csv ran fine, so its output schema should still be present + // — partial progress is the whole point of lenient mode. + assert( + result.operatorIdToOutputSchemas.contains(csv.operatorIdentifier), + "upstream csv's schemas should be retained even when downstream fails" + ) + } + + it should "not throw when given an empty plan" in { + val result = new WorkflowCompiler(newContext()).compile( + LogicalPlanPojo( + operators = List.empty, + links = List.empty, + opsToViewResult = List.empty, + opsToReuseResult = List.empty + ) + ) + assert(result.operatorIdToError.isEmpty) + assert(result.operatorIdToOutputSchemas.isEmpty) + assert(result.physicalPlan.isDefined, "an empty plan compiles to an empty physical plan") + assert(result.physicalPlan.get.operators.isEmpty) + assert(result.physicalPlan.get.links.isEmpty) + } + + // -------------------- multi-error accumulation -------------------- + + // Re-anchor the subject after the sub-section. + "WorkflowCompiler" should "accumulate errors for multiple unrelated failing ops in one compile" in { + val orphan1 = csvOpNoFile() + val orphan2 = csvOpNoFile() + + val result = new WorkflowCompiler(newContext()).compile( + LogicalPlanPojo( + operators = List(orphan1, orphan2), + links = List.empty, + opsToViewResult = List.empty, + opsToReuseResult = List.empty + ) + ) + + assert(result.physicalPlan.isEmpty) + // Both ops must appear in the error map — the frontend renders per-op + // diagnostics in parallel, so swallowing all-but-one would silently break + // multi-error workflows. 
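+    // Roughly the shape being pinned (error payloads shortened; both orphans
+    // fail the same way, with the "no input file name" message asserted in
+    // the single-orphan test above):
+    //   result.operatorIdToError == Map(
+    //     orphan1.operatorIdentifier -> <COMPILATION_ERROR: no input file name ...>,
+    //     orphan2.operatorIdentifier -> <COMPILATION_ERROR: no input file name ...>
+    //   )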
+ assert( + result.operatorIdToError.contains(orphan1.operatorIdentifier) && + result.operatorIdToError.contains(orphan2.operatorIdentifier), + s"expected both csvs in errors, got ${result.operatorIdToError.keySet}" + ) + } +} diff --git a/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala b/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala index 17e231270c0..c1e7588ad0e 100644 --- a/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala +++ b/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala @@ -24,23 +24,25 @@ import io.dropwizard.testing.junit5.ResourceExtension import jakarta.ws.rs.client.Entity import jakarta.ws.rs.core.{MediaType, Response} import org.apache.texera.amber.compiler.model.{LogicalLink, LogicalPlanPojo} -import org.apache.texera.amber.core.tuple.{Attribute, AttributeType} import org.apache.texera.amber.core.workflow.PortIdentity -import org.apache.texera.amber.operator.filter.{ - ComparisonType, - FilterOpDesc, - FilterPredicate, - SpecializedFilterOpDesc -} -import org.apache.texera.amber.operator.limit.LimitOpDesc import org.apache.texera.amber.operator.projection.{AttributeUnit, ProjectionOpDesc} import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc import org.apache.texera.amber.util.JSONUtils.objectMapper -import org.apache.texera.amber.util.serde.PortIdentityKeySerializer import org.assertj.core.api.Assertions.assertThat import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpec +/** + * Resource-layer tests for `/compile`. Owns only what the REST envelope + * adds on top of compilation itself: HTTP status, the + * `@JsonTypeInfo` discriminator that lets the frontend route success vs + * failure, and the JSON shape the resource expects on the wire. + * + * All compiler-behavior assertions (schema propagation, lenient-mode + * error accumulation, multi-op chains) live in + * `org.apache.texera.amber.compiler.WorkflowCompilerSpec` so a regression + * lands on the right spec. 
+ */ class WorkflowCompilationResourceSpec extends AnyFlatSpec with BeforeAndAfterAll { private val resources: ResourceExtension = ResourceExtension @@ -48,173 +50,104 @@ class WorkflowCompilationResourceSpec extends AnyFlatSpec with BeforeAndAfterAll .addResource(new WorkflowCompilationResource()) .setMapper(objectMapper) .build() - override protected def beforeAll(): Unit = { - resources.before() - } - - override protected def afterAll(): Unit = { - resources.after() - } - // utility function to create a csv scan op - private def getCsvScanOpDesc( - fileName: String, - header: Boolean - ): CSVScanSourceOpDesc = { - val csvOp = new CSVScanSourceOpDesc() - csvOp.fileName = Some(fileName) - csvOp.customDelimiter = Some(",") - csvOp.hasHeader = header - csvOp - } + override protected def beforeAll(): Unit = resources.before() + override protected def afterAll(): Unit = resources.after() - // utility function to create a projection op - private def getProjectionOpDesc( - attributeNames: List[String], - isDrop: Boolean = false - ): ProjectionOpDesc = { - val projectionOpDesc = new ProjectionOpDesc() - projectionOpDesc.attributes = attributeNames.map(name => new AttributeUnit(name, "")) - projectionOpDesc.isDrop = isDrop - projectionOpDesc - } + private val realCsvPath = + "workflow-compiling-service/src/test/resources/country_sales_small.csv" - // utility function to create a limit op - private def getLimitOpDesc( - limit: Int - ): LimitOpDesc = { - val limitOpDesc = new LimitOpDesc - limitOpDesc.limit = limit - limitOpDesc + private def csvOp(fileName: String): CSVScanSourceOpDesc = { + val op = new CSVScanSourceOpDesc() + op.fileName = Some(fileName) + op.customDelimiter = Some(",") + op.hasHeader = true + op } - // utility function to create a filter op - private def getFilterOpDesc( - filterPredicates: List[FilterPredicate] - ): FilterOpDesc = { - val filterOpDesc = new SpecializedFilterOpDesc - filterOpDesc.predicates = filterPredicates - filterOpDesc + private def projectOp(columns: List[String]): ProjectionOpDesc = { + val op = new ProjectionOpDesc() + op.attributes = columns.map(name => new AttributeUnit(name, "")) + op.isDrop = false + op } - // utility function to transform a logical plan pojo to json that can be deserialized correctly by the compile endpoint - private def transformLogicalPlanPojoToJsonString(logicalPlanPojo: LogicalPlanPojo): String = { - val jsonNode = objectMapper.valueToTree[ObjectNode](logicalPlanPojo) - - // iterate over the "links" array and replace nested "id" fields + // The frontend serializes LogicalLink with `fromOpId` / `toOpId` as flat + // strings, but the Scala case class stores them as nested `OperatorIdentity` + // records. This helper mirrors the wire shape so the test exercises the + // resource's actual JSON contract instead of a Scala-only round trip. 
+ private def encodePojoAsFrontendJson(pojo: LogicalPlanPojo): String = { + val jsonNode = objectMapper.valueToTree[ObjectNode](pojo) val linksArray = jsonNode.withArray("links") linksArray.forEach { linkNode => - // replace "fromOpId" with its "id" field value val fromOpIdNode = linkNode.get("fromOpId") linkNode.asInstanceOf[ObjectNode].put("fromOpId", fromOpIdNode.get("id").asText()) - - // replace "toOpId" with its "id" field value if it exists val toOpIdNode = linkNode.get("toOpId") linkNode.asInstanceOf[ObjectNode].put("toOpId", toOpIdNode.get("id").asText()) } - - // convert the modified JSON node back to a string objectMapper.writeValueAsString(jsonNode) } - // utility function for asserting the successful compilation - private def assertSuccessfulCompilation(response: Response): WorkflowCompilationSuccess = { - val responseBody = response.readEntity(classOf[String]) - val compilationResponse = - objectMapper.readValue(responseBody, classOf[WorkflowCompilationResponse]) - - assertThat(compilationResponse.asInstanceOf[WorkflowCompilationSuccess]) - compilationResponse.asInstanceOf[WorkflowCompilationSuccess] + private def postCompile(pojo: LogicalPlanPojo): Response = + resources + .target("/compile") + .request(MediaType.APPLICATION_JSON) + .post(Entity.json(encodePojoAsFrontendJson(pojo))) + + "POST /compile" should "return HTTP 200 for a well-formed plan" in { + val csv = csvOp(realCsvPath) + val proj = projectOp(List("Region", "Total Profit")) + val response = postCompile( + LogicalPlanPojo( + operators = List(csv, proj), + links = List( + LogicalLink( + csv.operatorIdentifier, + PortIdentity(0), + proj.operatorIdentifier, + PortIdentity(0) + ) + ), + opsToViewResult = List.empty, + opsToReuseResult = List.empty + ) + ) + assertThat(response.getStatus).isEqualTo(200) } - it should "compile workflow successfully with multiple filter and limit operations" in { - // construct the LogicalPlan: CSVScan --> Projection --> Limit --> Filter (TotalProfit > 10000) --> Filter (Region != "JPN") --> Limit - val localCsvFilePath = - "workflow-compiling-service/src/test/resources/country_sales_small.csv" - val csvSourceOp = getCsvScanOpDesc(localCsvFilePath, header = true) - val projectionOpDesc = getProjectionOpDesc(List("Region", "Total Profit")) - val limitOpDesc1 = getLimitOpDesc(10) - - // Create the filter predicate for TotalProfit > 10000 - val filterPredicate1 = new FilterPredicate("Total Profit", ComparisonType.GREATER_THAN, "10000") - val filterOpDesc1 = getFilterOpDesc(List(filterPredicate1)) - - // Create the filter predicate for Region != "JPN" - val filterPredicate2 = new FilterPredicate("Region", ComparisonType.NOT_EQUAL_TO, "JPN") - val filterOpDesc2 = getFilterOpDesc(List(filterPredicate2)) - - // Add a second limit operation - val limitOpDesc2 = getLimitOpDesc(5) - - val logicalPlanPojo = LogicalPlanPojo( - operators = List( - csvSourceOp, - projectionOpDesc, - limitOpDesc1, - filterOpDesc1, - filterOpDesc2, - limitOpDesc2 - ), - links = List( - LogicalLink( - csvSourceOp.operatorIdentifier, - PortIdentity(0), - projectionOpDesc.operatorIdentifier, - PortIdentity(0) + it should "tag the response body with type=success so the frontend can route on the discriminator" in { + // The @JsonTypeInfo on WorkflowCompilationResponse writes a `type` field. + // Both polymorphic deserialization (round-tripping through the trait) and + // a raw-JSON `type == "success"` check need to hold so the Angular client + // can branch without depending on Scala class names. 
+ val csv = csvOp(realCsvPath) + val proj = projectOp(List("Region", "Total Profit")) + val response = postCompile( + LogicalPlanPojo( + operators = List(csv, proj), + links = List( + LogicalLink( + csv.operatorIdentifier, + PortIdentity(0), + proj.operatorIdentifier, + PortIdentity(0) + ) ), - LogicalLink( - projectionOpDesc.operatorIdentifier, - PortIdentity(0), - limitOpDesc1.operatorIdentifier, - PortIdentity(0) - ), - LogicalLink( - limitOpDesc1.operatorIdentifier, - PortIdentity(0), - filterOpDesc1.operatorIdentifier, - PortIdentity(0) - ), - LogicalLink( - filterOpDesc1.operatorIdentifier, - PortIdentity(0), - filterOpDesc2.operatorIdentifier, - PortIdentity(0) - ), - LogicalLink( - filterOpDesc2.operatorIdentifier, - PortIdentity(0), - limitOpDesc2.operatorIdentifier, - PortIdentity(0) - ) - ), - opsToViewResult = List(), - opsToReuseResult = List() + opsToViewResult = List.empty, + opsToReuseResult = List.empty + ) ) - // transform the LogicalPlanPojo to a modified JSON string - val modifiedLogicalPlanJsonString = transformLogicalPlanPojoToJsonString(logicalPlanPojo) - - // send the request to compile endpoint - val response = resources - .target("/compile") - .request(MediaType.APPLICATION_JSON) - .post(Entity.json(modifiedLogicalPlanJsonString)) - - assertThat(response.getStatus).isEqualTo(200) - - // verify the schema is correctly propagated for the final limit operator - val compilationResult = assertSuccessfulCompilation(response) - val finalLimitInputSchema = - compilationResult.operatorOutputSchemas(filterOpDesc2.operatorIdentifier.id) + val body = response.readEntity(classOf[String]) + val node = objectMapper.readTree(body) assert( - finalLimitInputSchema( - PortIdentityKeySerializer.portIdToString(PortIdentity(id = 0, internal = false)) - ).get.equals( - List( - new Attribute("Region", AttributeType.STRING), - new Attribute("Total Profit", AttributeType.DOUBLE) - ) - ) + node.has("type") && node.get("type").asText() == "success", + s"expected type:success discriminator, got $body" ) + val parsed = objectMapper.readValue(body, classOf[WorkflowCompilationResponse]) + assert(parsed.isInstanceOf[WorkflowCompilationSuccess]) + val success = parsed.asInstanceOf[WorkflowCompilationSuccess] + assert(success.physicalPlan != null) + assert(success.operatorOutputSchemas.nonEmpty) } }