From 340f70ad78e7f109200a4d172391ef51215f04dc Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 23 Jun 2026 20:29:19 -0700 Subject: [PATCH 1/2] refactor(execution-service): consolidate init-error reporting into WorkflowExecutionService Per #5921 (follow-up to #5781): make error reporting a single site owned by WorkflowExecutionService instead of the two-phase split in WorkflowService.initExecutionService. - WorkflowExecutionService registers its fatalErrors -> WorkflowErrorEvent diff handler as the FIRST construction action. Construction does no external work and cannot throw (workflowSettings assignment, WebsocketInput creation, handler registration); all throwing work is in executeWorkflow(), which runs after the execution is published, so its failures surface through this same handler. - WorkflowService.initExecutionService drops the pre-publish errorSubject fallback (reportFatalErrorsToSubscribers + the executionPublished gating); the catch is simply errorHandler(e). Removes the now-unused errorSubject field and its connect() subscription. - Remove WorkflowServiceSpec (it only tested the removed reportFatalErrorsToSubscribers); the behavior is exercised by the integration/e2e suites. Resolves #5921. 
--- .../service/WorkflowExecutionService.scala | 12 ++- .../texera/web/service/WorkflowService.scala | 45 ++-------- .../web/service/WorkflowServiceSpec.scala | 85 ------------------- 3 files changed, 18 insertions(+), 124 deletions(-) delete mode 100644 amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala index 741687e02c9..a3bee231280 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala @@ -66,9 +66,12 @@ class WorkflowExecutionService( ) extends SubscriptionManager with LazyLogging { - workflowContext.workflowSettings = request.workflowSettings - val wsInput = new WebsocketInput(errorHandler) - + // Wire error/state reporting first, before any other construction work, so a + // fatalErrors update (recorded by errorHandler) always has an emitter. + // Construction itself does no external work and cannot throw; the throwing + // work lives in executeWorkflow(), which runs after this execution is + // published, so its failures reach the UI through this same handler -- no + // separate pre-publish reporting path is needed. 
addSubscription( executionStateStore.metadataStore.registerDiffHandler((oldState, newState) => { val outputEvents = new mutable.ArrayBuffer[TexeraWebSocketEvent]() @@ -85,6 +88,9 @@ class WorkflowExecutionService( }) ) + workflowContext.workflowSettings = request.workflowSettings + val wsInput = new WebsocketInput(errorHandler) + private def createStateEvent(state: ExecutionMetadataStore): WorkflowStateEvent = { if (state.isRecovering && state.state != COMPLETED) { WorkflowStateEvent("Recovering") diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index 90934287ebb..f659dc7af2f 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -50,7 +50,7 @@ import org.apache.texera.amber.error.ErrorUtils.{ } import org.apache.texera.dao.jooq.generated.tables.pojos.User import org.apache.texera.service.util.LargeBinaryManager -import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, WorkflowErrorEvent} +import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import org.apache.texera.web.service.WorkflowService.mkWorkflowStateId @@ -101,7 +101,6 @@ class WorkflowService( with LazyLogging { // state across execution: - private val errorSubject = BehaviorSubject.create[TexeraWebSocketEvent]().toSerialized val stateStore = new WorkflowStateStore() var executionService: BehaviorSubject[WorkflowExecutionService] = BehaviorSubject.create() @@ -150,8 +149,7 @@ class WorkflowService( evtPub.subscribe { evts: Iterable[TexeraWebSocketEvent] => evts.foreach(onNext) } ) .toSeq - val errorSubscription = errorSubject.subscribe { evt: TexeraWebSocketEvent => onNext(evt) } - new 
CompositeDisposable(subscriptions :+ errorSubscription: _*) + new CompositeDisposable(subscriptions: _*) } def connectToExecution(onNext: TexeraWebSocketEvent => Unit): Disposable = { @@ -277,14 +275,12 @@ class WorkflowService( } } } - // Once the execution is published via `executionService.onNext`, the normal - // state-store path surfaces fatal errors to the UI: `errorHandler` writes - // them into `executionStateStore.metadataStore`, whose diff handler (set up - // in the WorkflowExecutionService constructor) emits a WorkflowErrorEvent - // that `connectToExecution` forwards. Before that point, neither the emitter - // nor a subscriber exists yet, so a failure in the constructor itself would - // be recorded but never reach the frontend -- see the fallback in `catch`. - var executionPublished = false + // WorkflowExecutionService construction does no external work and cannot + // throw; it registers its error/state diff handler up front. Once published + // via `executionService.onNext`, any failure in `executeWorkflow()` is + // recorded by `errorHandler` into the metadata store, whose handler emits a + // WorkflowErrorEvent that `connectToExecution` forwards -- a single + // reporting site, no separate pre-publish fallback needed. try { val execution = new WorkflowExecutionService( controllerConf, @@ -298,36 +294,13 @@ class WorkflowService( ) lifeCycleManager.registerCleanUpOnStateChange(executionStateStore) executionService.onNext(execution) - executionPublished = true execution.executeWorkflow() } catch { - case e: Throwable => - errorHandler(e) - // If the execution was never published, no `connectToExecution` - // subscriber is bound to `executionStateStore`, so the state-store path - // above cannot deliver the error. Push it directly in that pre-publish - // window only; once published, the state-store path already surfaces it - // (pushing here too would double-emit). 
- if (!executionPublished) { - reportFatalErrorsToSubscribers(executionStateStore) - } + case e: Throwable => errorHandler(e) } } - /** - * Push the fatal errors currently recorded in `stateStore` to connected - * websocket subscribers (via `errorSubject`). - * - * Fallback used only when execution initialization fails before the execution - * is published (e.g. the WorkflowExecutionService constructor throws): in that - * window the per-execution state store has no diff-handler emitter and no - * websocket subscriber, so the error -- already recorded by `errorHandler` -- - * would otherwise be logged but never reach the frontend. - */ - private[service] def reportFatalErrorsToSubscribers(stateStore: ExecutionStateStore): Unit = - errorSubject.onNext(WorkflowErrorEvent(stateStore.metadataStore.getState.fatalErrors)) - def convertToJson(frontendVersion: String): String = { val environmentVersionMap = Map( "engine_version" -> Json.toJson(frontendVersion) diff --git a/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala deleted file mode 100644 index 7c1d879c93d..00000000000 --- a/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.texera.web.service - -import com.google.protobuf.timestamp.Timestamp -import org.apache.texera.amber.core.virtualidentity.WorkflowIdentity -import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE -import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError -import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, WorkflowErrorEvent} -import org.apache.texera.web.storage.ExecutionStateStore -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -import java.time.Instant -import scala.collection.mutable.ArrayBuffer - -/** - * Unit tests for `WorkflowService.reportFatalErrorsToSubscribers`, the seam - * that surfaces init-time fatal errors to the websocket. When execution - * initialization fails, the error is recorded in the metadata store; this push - * is what makes it visible to connected clients instead of only logged. - */ -class WorkflowServiceSpec extends AnyFlatSpec with Matchers { - - private def fatalError(message: String): WorkflowFatalError = - WorkflowFatalError(EXECUTION_FAILURE, Timestamp(Instant.now), message, "", "", "") - - /** A WorkflowService with a subscriber collecting every event it pushes. 
*/ - private def serviceWithCollector(): (WorkflowService, ArrayBuffer[TexeraWebSocketEvent]) = { - val service = new WorkflowService(WorkflowIdentity(1), computingUnitId = 1, cleanUpTimeout = 30) - val events = ArrayBuffer.empty[TexeraWebSocketEvent] - service.connect(evt => events += evt) - (service, events) - } - - private def errorEventsIn(events: ArrayBuffer[TexeraWebSocketEvent]): Seq[WorkflowErrorEvent] = - events.collect { case e: WorkflowErrorEvent => e }.toSeq - - "WorkflowService" should - "push a WorkflowErrorEvent carrying the store's fatal error to connected subscribers" in { - val (service, events) = serviceWithCollector() - val store = new ExecutionStateStore() - val err = fatalError("boom during init") - store.metadataStore.updateState(_.addFatalErrors(err)) - - service.reportFatalErrorsToSubscribers(store) - - val errorEvents = errorEventsIn(events) - errorEvents should have size 1 - // Forwards exactly the store's fatal errors -- no more, no less. - errorEvents.head.fatalErrors should contain theSameElementsAs Seq(err) - } - - it should "carry every fatal error currently recorded in the store" in { - val (service, events) = serviceWithCollector() - val store = new ExecutionStateStore() - val first = fatalError("first") - val second = fatalError("second") - store.metadataStore.updateState(_.addFatalErrors(first).addFatalErrors(second)) - - service.reportFatalErrorsToSubscribers(store) - - val errorEvents = errorEventsIn(events) - errorEvents should have size 1 - // Exactly the two recorded errors -- no extras. - errorEvents.head.fatalErrors should contain theSameElementsAs Seq(first, second) - } -} From 351fed2f5f0c47a45668582ba010f7351cb16bbf Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 24 Jun 2026 00:31:44 -0700 Subject: [PATCH 2/2] test(execution-service): cover the consolidated error-reporting path Address review (Copilot + @Yicong-Huang): deleting WorkflowServiceSpec left the single-reporting-site invariant untested. 
Add WorkflowExecutionServiceSpec: - a recorded fatal error surfaces as a WorkflowErrorEvent through the metadata-store diff handler registered at construction (the regression guard for init-error surfacing); - a state change emits a WorkflowStateEvent (handler's other branch). The construction-unused controllerConfig/resultService are passed as null on purpose, so a future change that does external work during construction (which would reopen the pre-publish gap) fails this test. --- .../WorkflowExecutionServiceSpec.scala | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 amber/src/test/scala/org/apache/texera/web/service/WorkflowExecutionServiceSpec.scala diff --git a/amber/src/test/scala/org/apache/texera/web/service/WorkflowExecutionServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/WorkflowExecutionServiceSpec.scala new file mode 100644 index 00000000000..199d6dffed6 --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/service/WorkflowExecutionServiceSpec.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.web.service + +import com.google.protobuf.timestamp.Timestamp +import org.apache.texera.amber.core.workflow.{WorkflowContext, WorkflowSettings} +import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE +import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.RUNNING +import org.apache.texera.web.model.websocket.event.{ + TexeraWebSocketEvent, + WorkflowErrorEvent, + WorkflowStateEvent +} +import org.apache.texera.web.model.websocket.request.{LogicalPlanPojo, WorkflowExecuteRequest} +import org.apache.texera.web.storage.ExecutionStateStore +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.net.URI +import java.time.Instant +import scala.collection.mutable + +/** + * Regression guard for the consolidated init-error reporting path (#5921): + * `WorkflowExecutionService` registers its metadata-store diff handler at + * construction, so a fatalErrors update -- e.g. the one `errorHandler` records + * when `executeWorkflow` fails -- surfaces as a `WorkflowErrorEvent` through the + * normal websocket-event observable, with no separate pre-publish fallback. + * + * The unused `controllerConfig` / `resultService` are passed as `null` on + * purpose: construction must do no external work (all throwing work is in + * `executeWorkflow`), so a future change that dereferences them during + * construction would fail here. 
+ */ +class WorkflowExecutionServiceSpec extends AnyFlatSpec with Matchers { + + private def buildService(store: ExecutionStateStore): WorkflowExecutionService = { + val request = WorkflowExecuteRequest( + executionName = "test", + engineVersion = "test", + logicalPlan = LogicalPlanPojo(List.empty, List.empty, List.empty, List.empty), + replayFromExecution = None, + workflowSettings = WorkflowSettings(), + emailNotificationEnabled = false, + computingUnitId = 0 + ) + new WorkflowExecutionService( + null, + new WorkflowContext(), + null, + request, + store, + (_: Throwable) => (), + None, + new URI("vfs:///test") + ) + } + + /** Subscribe to the metadata store's websocket-event stream and collect events. */ + private def collectEvents( + store: ExecutionStateStore + ): mutable.ArrayBuffer[TexeraWebSocketEvent] = { + val events = mutable.ArrayBuffer.empty[TexeraWebSocketEvent] + store.metadataStore.getWebsocketEventObservable.subscribe { + (evts: Iterable[TexeraWebSocketEvent]) => events ++= evts + } + events + } + + "WorkflowExecutionService" should + "surface a recorded fatal error as a WorkflowErrorEvent via the metadata-store handler" in { + val store = new ExecutionStateStore() + buildService(store) // registers the diff handler at construction + val events = collectEvents(store) + + val err = + WorkflowFatalError(EXECUTION_FAILURE, Timestamp(Instant.now), "boom during init", "", "", "") + store.metadataStore.updateState(_.addFatalErrors(err)) + + val errorEvents = events.collect { case e: WorkflowErrorEvent => e } + errorEvents should have size 1 + errorEvents.head.fatalErrors should contain(err) + } + + it should "emit a WorkflowStateEvent when the execution state changes" in { + val store = new ExecutionStateStore() + buildService(store) + val events = collectEvents(store) + + store.metadataStore.updateState(_.withState(RUNNING)) + + events.collect { case e: WorkflowStateEvent => e } should not be empty + } +}