Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/indexer/src/tasks/IndexBlockTask.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
BlockQueue,
task,
Task,
TaskSerializer,
TaskWorkerModule,
Expand All @@ -13,6 +14,7 @@ import {
} from "./IndexBlockTaskParameters";

@injectable()
@task()
export class IndexBlockTask
extends TaskWorkerModule
implements Task<IndexBlockTaskParameters, string | void>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import {
ProvableMethodExecutionContext,
} from "@proto-kit/common";

import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule";
import {
task,
TaskWorkerModule,
} from "../../../worker/worker/TaskWorkerModule";
import { Task, TaskSerializer } from "../../../worker/flow/Task";
import {
PairProofTaskSerializer,
Expand All @@ -21,6 +24,7 @@ import {

@injectable()
@scoped(Lifecycle.ContainerScoped)
@task()
export class BlockReductionTask
extends TaskWorkerModule
implements Task<PairTuple<BlockProof>, BlockProof>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import {

import { Task, TaskSerializer } from "../../../worker/flow/Task";
import { ProofTaskSerializer } from "../../../helpers/utils";
import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule";
import {
task,
TaskWorkerModule,
} from "../../../worker/worker/TaskWorkerModule";
import { PairingDerivedInput } from "../flow/ReductionTaskFlow";
import type { TaskStateRecord } from "../tracing/BlockTracingService";

Expand Down Expand Up @@ -57,6 +60,7 @@ export type NewBlockProvingParameters = PairingDerivedInput<

@injectable()
@scoped(Lifecycle.ContainerScoped)
@task()
export class NewBlockTask
extends TaskWorkerModule
implements Task<NewBlockProvingParameters, BlockProof>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import { CompileRegistry } from "@proto-kit/common";

import { Task, TaskSerializer } from "../../../worker/flow/Task";
import { ProofTaskSerializer } from "../../../helpers/utils";
import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule";
import {
task,
TaskWorkerModule,
} from "../../../worker/worker/TaskWorkerModule";
import { PreFilledStateService } from "../../../state/prefilled/PreFilledStateService";
import { PendingTransaction } from "../../../mempool/PendingTransaction";
import { TaskStateRecord } from "../tracing/BlockTracingService";
Expand All @@ -31,6 +34,7 @@ export interface RuntimeProofParameters {

@injectable()
@scoped(Lifecycle.ContainerScoped)
@task()
export class RuntimeProvingTask
extends TaskWorkerModule
implements Task<RuntimeProofParameters, RuntimeProof>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import {
ProvableMethodExecutionContext,
} from "@proto-kit/common";

import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule";
import {
task,
TaskWorkerModule,
} from "../../../worker/worker/TaskWorkerModule";
import { Task, TaskSerializer } from "../../../worker/flow/Task";
import {
PairProofTaskSerializer,
Expand All @@ -21,6 +24,7 @@ import {

@injectable()
@scoped(Lifecycle.ContainerScoped)
@task()
export class StateTransitionReductionTask
extends TaskWorkerModule
implements Task<PairTuple<StateTransitionProof>, StateTransitionProof>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ import {
ProvableMethodExecutionContext,
CompileRegistry,
LinkedMerkleTreeWitness,
dependencyFactory,
} from "@proto-kit/common";

import { Task, TaskSerializer } from "../../../worker/flow/Task";
import { ProofTaskSerializer } from "../../../helpers/utils";
import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule";
import {
task,
TaskWorkerModule,
} from "../../../worker/worker/TaskWorkerModule";

import { StateTransitionParametersSerializer } from "./serializers/StateTransitionParametersSerializer";

Expand All @@ -31,6 +35,8 @@ export interface StateTransitionProofParameters {

@injectable()
@scoped(Lifecycle.ContainerScoped)
@task()
@dependencyFactory()
export class StateTransitionTask
extends TaskWorkerModule
implements Task<StateTransitionProofParameters, StateTransitionProof>
Expand Down Expand Up @@ -61,6 +67,14 @@ export class StateTransitionTask
);
}

public static dependencies() {
return {
// STProverCompileTask: {
// useClass: STProverCompileTask,
// },
};
}

public async compute(
input: StateTransitionProofParameters
): Promise<StateTransitionProof> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import {
import { ProofTaskSerializer } from "../../../helpers/utils";
import { TaskSerializer, Task } from "../../../worker/flow/Task";
import { PreFilledStateService } from "../../../state/prefilled/PreFilledStateService";
import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule";
import {
task,
TaskWorkerModule,
} from "../../../worker/worker/TaskWorkerModule";
import type { TaskStateRecord } from "../tracing/BlockTracingService";

import { TransactionProvingTaskParameterSerializer } from "./serializers/TransactionProvingTaskParameterSerializer";
Expand Down Expand Up @@ -49,6 +52,7 @@ export async function executeWithPrefilledStateService<Return>(

@injectable()
@scoped(Lifecycle.ContainerScoped)
@task()
export class TransactionProvingTask
extends TaskWorkerModule
implements Task<TransactionProvingTaskParameters, TransactionProof>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import {
ProvableMethodExecutionContext,
} from "@proto-kit/common";

import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule";
import {
task,
TaskWorkerModule,
} from "../../../worker/worker/TaskWorkerModule";
import { Task, TaskSerializer } from "../../../worker/flow/Task";
import {
PairProofTaskSerializer,
Expand All @@ -21,6 +24,7 @@ import {

@injectable()
@scoped(Lifecycle.ContainerScoped)
@task()
export class TransactionReductionTask
extends TaskWorkerModule
implements Task<PairTuple<TransactionProof>, TransactionProof>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
Protocol,
} from "@proto-kit/protocol";

import { task } from "../../../../worker/worker/TaskWorkerModule";

import { CircuitCompileTask } from "./CircuitCompileTask";

@injectable()
Expand Down Expand Up @@ -33,20 +35,23 @@ export class ProtocolCompileTask extends CircuitCompileTask {
}

@injectable()
@task()
export class BlockProverCompileTask extends ProtocolCompileTask {
public getTargetProtocolModule() {
return "BlockProver";
}
}

@injectable()
@task()
export class STProverCompileTask extends ProtocolCompileTask {
public getTargetProtocolModule() {
return "StateTransitionProver";
}
}

@injectable()
@task()
export class TransactionProverCompileTask extends ProtocolCompileTask {
public getTargetProtocolModule() {
return "TransactionProver";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import {
Protocol,
} from "@proto-kit/protocol";

import { task } from "../../../../worker/worker/TaskWorkerModule";

import { CircuitCompileTask } from "./CircuitCompileTask";

@injectable()
@scoped(Lifecycle.ContainerScoped)
@task()
export class RuntimeCompileTask extends CircuitCompileTask {
public name = "compile-runtime";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import {
} from "@proto-kit/protocol";

import { BatchProducerModule } from "../../BatchProducerModule";
import { task } from "../../../../worker/worker/TaskWorkerModule";

import { CircuitCompileTask } from "./CircuitCompileTask";

@injectable()
@scoped(Lifecycle.ContainerScoped)
@task()
export class SettlementCompileTask extends CircuitCompileTask {
public name = "compile-settlement";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import {
DynamicProofTaskSerializer,
} from "../../helpers/utils";
import { Task, TaskSerializer } from "../../worker/flow/Task";
import { TaskWorkerModule } from "../../worker/worker/TaskWorkerModule";
import { task, TaskWorkerModule } from "../../worker/worker/TaskWorkerModule";

import { ContractRegistry } from "./ContractRegistry";

Expand Down Expand Up @@ -74,6 +74,7 @@ export class SomeProofSubclass extends Proof<Field, Void> {
*/
@injectable()
@scoped(Lifecycle.ContainerScoped)
@task()
export class SettlementProvingTask
extends TaskWorkerModule
implements Task<TransactionTaskArgs, TransactionTaskResult>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from "../../protocol/production/tasks/serializers/ArtifactionRecordSerializer";
import { SignedSettlementPermissions } from "../../settlement/permissions/SignedSettlementPermissions";
import { ProvenSettlementPermissions } from "../../settlement/permissions/ProvenSettlementPermissions";
import { task } from "../worker/TaskWorkerModule";

import { CloseWorkerError } from "./CloseWorkerError";

Expand All @@ -39,6 +40,7 @@ export type WorkerStartupPayload = {
};

@injectable()
@task()
export class WorkerRegistrationTask
extends AbstractStartupTask<WorkerStartupPayload, boolean>
implements Task<WorkerStartupPayload, boolean>
Expand Down
6 changes: 2 additions & 4 deletions packages/sequencer/src/worker/worker/FlowTaskWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@ const errors = {
};

// Had to use any here, because otherwise you couldn't assign any tasks to it
export class FlowTaskWorker<
Tasks extends Task<any, any>[],
> implements Closeable {
export class FlowTaskWorker implements Closeable {
private readonly queue: TaskQueue;

private workers: Record<string, Closeable> = {};

public constructor(
mq: TaskQueue,
private readonly tasks: Tasks
private readonly tasks: Task<any, any>[]
) {
this.queue = mq;
}
Expand Down
18 changes: 7 additions & 11 deletions packages/sequencer/src/worker/worker/LocalTaskWorkerModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import {
ModulesRecord,
NoConfig,
Presets,
ResolvableModules,
StringKeyOf,
TypedClass,
} from "@proto-kit/common";
import { ReturnType } from "@proto-kit/protocol";
Expand Down Expand Up @@ -70,9 +68,7 @@ export class LocalTaskWorkerModule<Tasks extends TaskWorkerModulesRecord>

public containerEvents = new EventEmitter<LocalTaskWorkerModuleEvents>();

private worker?: FlowTaskWorker<
InstanceType<ResolvableModules<Tasks>[StringKeyOf<Tasks>]>[]
> = undefined;
private worker?: FlowTaskWorker = undefined;

public static from<Tasks extends TaskWorkerModulesRecord>(
modules: Tasks
Expand Down Expand Up @@ -104,14 +100,14 @@ export class LocalTaskWorkerModule<Tasks extends TaskWorkerModulesRecord>
return this.container.resolve<TaskQueue>("TaskQueue");
}

public tasks() {
return this.container.resolveAll<Task<unknown, unknown>>("Task");
}

public async start(): Promise<void> {
const tasks = this.moduleNames.map((moduleName) => {
this.assertIsValidModuleName(moduleName);
const tasks = this.tasks();

const task = this.resolve(moduleName);
log.debug(`Resolved task ${task.name}`);
return task;
});
log.debug(`Resolved tasks ${tasks.map((t) => t.name)}`);

const worker = new FlowTaskWorker(this.taskQueue(), [...tasks]);
this.worker = worker;
Expand Down
8 changes: 7 additions & 1 deletion packages/sequencer/src/worker/worker/TaskWorkerModule.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { ConfigurableModule, NoConfig } from "@proto-kit/common";
import { ConfigurableModule, implement, NoConfig } from "@proto-kit/common";

import { Task } from "../flow/Task";

export abstract class TaskWorkerModule<
Config = NoConfig,
> extends ConfigurableModule<Config> {}

export function task<Input, Output>() {
return implement<Task<Input, Output>>("Task");
}