Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions packages/durabletask-js/src/worker/task-hub-grpc-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -967,9 +967,12 @@ export class TaskHubGrpcWorker {

// Add V2 operationInfos if provided (used by DTS backend)
if (operationInfos && operationInfos.length > 0) {
// Take only as many operationInfos as there are results
// Take only as many operationInfos as there are results.
// Use resultsCount directly (not `resultsCount || operationInfos.length`)
// because 0 is a valid count when a framework-level error produces zero
// individual results; the falsy-OR would incorrectly include all infos.
const resultsCount = batchResult.getResultsList().length;
const infosToInclude = operationInfos.slice(0, resultsCount || operationInfos.length);
const infosToInclude = operationInfos.slice(0, resultsCount);
batchResult.setOperationinfosList(infosToInclude);
}

Expand Down
133 changes: 133 additions & 0 deletions packages/durabletask-js/test/worker-entity.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,137 @@ describe("TaskHubGrpcWorker", () => {
expect(pendingWorkItems.size).toBe(0);
});
});

describe("V2 Entity operationInfos handling", () => {
it("should include operationInfos matching result count on successful V2 execution", async () => {
// Arrange
const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
const factory: EntityFactory = () => new CounterEntity();
worker.addNamedEntity("counter", factory);

const mockStub = createMockStub();
const req = createEntityRequestV2("counter", "key1");

// Act
(worker as any)._executeEntityV2(req, COMPLETION_TOKEN, mockStub.stub);
const pendingWorkItems: Set<Promise<void>> = (worker as any)._pendingWorkItems;
await Promise.all(pendingWorkItems);

// Assert - one result per operation, so one operationInfo should be included
const result = mockStub.capturedResult!;
expect(result.getResultsList().length).toBe(1);
expect(result.getOperationinfosList().length).toBe(1);
expect(result.getOperationinfosList()[0].getRequestid()).toBe("req-1");
});

it("should include zero operationInfos when framework error produces zero results", async () => {
// Arrange - register an entity whose factory throws a framework-level error
const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
const throwingFactory: EntityFactory = () => {
throw new Error("factory explosion");
};
worker.addNamedEntity("broken", throwingFactory);

const mockStub = createMockStub();
const req = createEntityRequestV2("broken", "key1");

// Act
(worker as any)._executeEntityV2(req, COMPLETION_TOKEN, mockStub.stub);
const pendingWorkItems: Set<Promise<void>> = (worker as any)._pendingWorkItems;
await Promise.all(pendingWorkItems);

// Assert - framework error: zero results AND zero operationInfos
const result = mockStub.capturedResult!;
expect(result.getResultsList().length).toBe(0);
expect(result.getOperationinfosList().length).toBe(0);
expect(result.getFailuredetails()).toBeDefined();
expect(result.getFailuredetails()!.getErrormessage()).toBe("factory explosion");
});

it("should include operationInfos for all results when entity is not found via V2", async () => {
// Arrange - no entity registered for the name in the request
const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });

const mockStub = createMockStub();
const req = createEntityRequestV2("nonexistent", "key1");

// Act
(worker as any)._executeEntityV2(req, COMPLETION_TOKEN, mockStub.stub);
const pendingWorkItems: Set<Promise<void>> = (worker as any)._pendingWorkItems;
await Promise.all(pendingWorkItems);

// Assert - not-found path creates one error result per operation, so operationInfos should match
const result = mockStub.capturedResult!;
expect(result.getResultsList().length).toBe(1);
expect(result.getOperationinfosList().length).toBe(1);
});

it("should include operationInfos for multiple operations on successful V2 execution", async () => {
// Arrange
const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
const factory: EntityFactory = () => new CounterEntity();
worker.addNamedEntity("counter", factory);

const mockStub = createMockStub();

// Create a V2 request with multiple operations
const req = new pb.EntityRequest();
req.setInstanceid("@counter@key1");

const event1 = new pb.HistoryEvent();
const signaled1 = new pb.EntityOperationSignaledEvent();
signaled1.setOperation("increment");
signaled1.setRequestid("req-1");
event1.setEntityoperationsignaled(signaled1);

const event2 = new pb.HistoryEvent();
const signaled2 = new pb.EntityOperationSignaledEvent();
signaled2.setOperation("increment");
signaled2.setRequestid("req-2");
event2.setEntityoperationsignaled(signaled2);

req.setOperationrequestsList([event1, event2]);

// Act
(worker as any)._executeEntityV2(req, COMPLETION_TOKEN, mockStub.stub);
const pendingWorkItems: Set<Promise<void>> = (worker as any)._pendingWorkItems;
await Promise.all(pendingWorkItems);

// Assert - two results, two operationInfos
const result = mockStub.capturedResult!;
expect(result.getResultsList().length).toBe(2);
expect(result.getOperationinfosList().length).toBe(2);
expect(result.getOperationinfosList()[0].getRequestid()).toBe("req-1");
expect(result.getOperationinfosList()[1].getRequestid()).toBe("req-2");
});

it("should include zero operationInfos when entity run() throws on V2 execution", async () => {
// Arrange - entity whose run() throws
Comment on lines +437 to +438
const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });

class ThrowingEntity implements ITaskEntity {
run(): never {
throw new Error("run explosion");
}
}
const factory: EntityFactory = () => new ThrowingEntity();
worker.addNamedEntity("thrower", factory);

const mockStub = createMockStub();
const req = createEntityRequestV2("thrower", "key1");

// Act
(worker as any)._executeEntityV2(req, COMPLETION_TOKEN, mockStub.stub);
const pendingWorkItems: Set<Promise<void>> = (worker as any)._pendingWorkItems;
await Promise.all(pendingWorkItems);

// Assert - the entity shim catches per-operation errors and creates per-operation
// failure results, so this should NOT be a framework-level error.
// The entity executor handles operation-level errors internally,
// so we expect one result (failure) and one operationInfo.
const result = mockStub.capturedResult!;
expect(result.getResultsList().length).toBe(1);
expect(result.getOperationinfosList().length).toBe(1);
});
});
});
Loading