Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ class WorkflowExecutionService(
) extends SubscriptionManager
with LazyLogging {

workflowContext.workflowSettings = request.workflowSettings
val wsInput = new WebsocketInput(errorHandler)

// Wire error/state reporting first, before any other construction work, so a
// fatalErrors update (recorded by errorHandler) always has an emitter.
// Construction itself does no external work and cannot throw; the throwing
// work lives in executeWorkflow(), which runs after this execution is
// published, so its failures reach the UI through this same handler -- no
// separate pre-publish reporting path is needed.
addSubscription(
executionStateStore.metadataStore.registerDiffHandler((oldState, newState) => {
val outputEvents = new mutable.ArrayBuffer[TexeraWebSocketEvent]()
Expand All @@ -85,6 +88,9 @@ class WorkflowExecutionService(
})
)

workflowContext.workflowSettings = request.workflowSettings
val wsInput = new WebsocketInput(errorHandler)

private def createStateEvent(state: ExecutionMetadataStore): WorkflowStateEvent = {
if (state.isRecovering && state.state != COMPLETED) {
WorkflowStateEvent("Recovering")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import org.apache.texera.amber.error.ErrorUtils.{
}
import org.apache.texera.dao.jooq.generated.tables.pojos.User
import org.apache.texera.service.util.LargeBinaryManager
import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, WorkflowErrorEvent}
import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import org.apache.texera.web.service.WorkflowService.mkWorkflowStateId
Expand Down Expand Up @@ -101,7 +101,6 @@ class WorkflowService(
with LazyLogging {

// state across execution:
private val errorSubject = BehaviorSubject.create[TexeraWebSocketEvent]().toSerialized
val stateStore = new WorkflowStateStore()
var executionService: BehaviorSubject[WorkflowExecutionService] = BehaviorSubject.create()

Expand Down Expand Up @@ -150,8 +149,7 @@ class WorkflowService(
evtPub.subscribe { evts: Iterable[TexeraWebSocketEvent] => evts.foreach(onNext) }
)
.toSeq
val errorSubscription = errorSubject.subscribe { evt: TexeraWebSocketEvent => onNext(evt) }
new CompositeDisposable(subscriptions :+ errorSubscription: _*)
new CompositeDisposable(subscriptions: _*)
}

def connectToExecution(onNext: TexeraWebSocketEvent => Unit): Disposable = {
Expand Down Expand Up @@ -277,14 +275,12 @@ class WorkflowService(
}
}
}
// Once the execution is published via `executionService.onNext`, the normal
// state-store path surfaces fatal errors to the UI: `errorHandler` writes
// them into `executionStateStore.metadataStore`, whose diff handler (set up
// in the WorkflowExecutionService constructor) emits a WorkflowErrorEvent
// that `connectToExecution` forwards. Before that point, neither the emitter
// nor a subscriber exists yet, so a failure in the constructor itself would
// be recorded but never reach the frontend -- see the fallback in `catch`.
var executionPublished = false
// WorkflowExecutionService construction does no external work and cannot
// throw; it registers its error/state diff handler up front. Once published
// via `executionService.onNext`, any failure in `executeWorkflow()` is
// recorded by `errorHandler` into the metadata store, whose handler emits a
// WorkflowErrorEvent that `connectToExecution` forwards -- a single
// reporting site, no separate pre-publish fallback needed.
Comment thread
aglinxinyuan marked this conversation as resolved.
try {
val execution = new WorkflowExecutionService(
controllerConf,
Expand All @@ -298,36 +294,13 @@ class WorkflowService(
)
lifeCycleManager.registerCleanUpOnStateChange(executionStateStore)
executionService.onNext(execution)
executionPublished = true
execution.executeWorkflow()
} catch {
case e: Throwable =>
errorHandler(e)
// If the execution was never published, no `connectToExecution`
// subscriber is bound to `executionStateStore`, so the state-store path
// above cannot deliver the error. Push it directly in that pre-publish
// window only; once published, the state-store path already surfaces it
// (pushing here too would double-emit).
if (!executionPublished) {
reportFatalErrorsToSubscribers(executionStateStore)
}
case e: Throwable => errorHandler(e)
}

}

/**
* Push the fatal errors currently recorded in `stateStore` to connected
* websocket subscribers (via `errorSubject`).
*
* Fallback used only when execution initialization fails before the execution
* is published (e.g. the WorkflowExecutionService constructor throws): in that
* window the per-execution state store has no diff-handler emitter and no
* websocket subscriber, so the error -- already recorded by `errorHandler` --
* would otherwise be logged but never reach the frontend.
*/
private[service] def reportFatalErrorsToSubscribers(stateStore: ExecutionStateStore): Unit =
errorSubject.onNext(WorkflowErrorEvent(stateStore.metadataStore.getState.fatalErrors))

def convertToJson(frontendVersion: String): String = {
val environmentVersionMap = Map(
"engine_version" -> Json.toJson(frontendVersion)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.web.service

import com.google.protobuf.timestamp.Timestamp
import org.apache.texera.amber.core.workflow.{WorkflowContext, WorkflowSettings}
import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE
import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.RUNNING
import org.apache.texera.web.model.websocket.event.{
TexeraWebSocketEvent,
WorkflowErrorEvent,
WorkflowStateEvent
}
import org.apache.texera.web.model.websocket.request.{LogicalPlanPojo, WorkflowExecuteRequest}
import org.apache.texera.web.storage.ExecutionStateStore
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.net.URI
import java.time.Instant
import scala.collection.mutable

/**
* Regression guard for the consolidated init-error reporting path (#5921):
* `WorkflowExecutionService` registers its metadata-store diff handler at
* construction, so a fatalErrors update -- e.g. the one `errorHandler` records
* when `executeWorkflow` fails -- surfaces as a `WorkflowErrorEvent` through the
* normal websocket-event observable, with no separate pre-publish fallback.
*
* The unused `controllerConfig` / `resultService` are passed as `null` on
* purpose: construction must stay side-effect-free (all throwing work is in
* `executeWorkflow`), so a future change that dereferences them during
* construction would fail here.
*/
class WorkflowExecutionServiceSpec extends AnyFlatSpec with Matchers {

private def buildService(store: ExecutionStateStore): WorkflowExecutionService = {
val request = WorkflowExecuteRequest(
executionName = "test",
engineVersion = "test",
logicalPlan = LogicalPlanPojo(List.empty, List.empty, List.empty, List.empty),
replayFromExecution = None,
workflowSettings = WorkflowSettings(),
emailNotificationEnabled = false,
computingUnitId = 0
)
new WorkflowExecutionService(
null,
new WorkflowContext(),
null,
request,
store,
(_: Throwable) => (),
None,
new URI("vfs:///test")
)
}

/** Subscribe to the metadata store's websocket-event stream and collect events. */
private def collectEvents(
store: ExecutionStateStore
): mutable.ArrayBuffer[TexeraWebSocketEvent] = {
val events = mutable.ArrayBuffer.empty[TexeraWebSocketEvent]
store.metadataStore.getWebsocketEventObservable.subscribe {
(evts: Iterable[TexeraWebSocketEvent]) => events ++= evts
}
events
}

"WorkflowExecutionService" should
"surface a recorded fatal error as a WorkflowErrorEvent via the metadata-store handler" in {
val store = new ExecutionStateStore()
buildService(store) // registers the diff handler at construction
val events = collectEvents(store)

val err =
WorkflowFatalError(EXECUTION_FAILURE, Timestamp(Instant.now), "boom during init", "", "", "")
store.metadataStore.updateState(_.addFatalErrors(err))

val errorEvents = events.collect { case e: WorkflowErrorEvent => e }
errorEvents should have size 1
errorEvents.head.fatalErrors should contain(err)
}

it should "emit a WorkflowStateEvent when the execution state changes" in {
val store = new ExecutionStateStore()
buildService(store)
val events = collectEvents(store)

store.metadataStore.updateState(_.withState(RUNNING))

events.collect { case e: WorkflowStateEvent => e } should not be empty
}
}

This file was deleted.

Loading