Skip to content

Commit 40f3edc

Browse files
authored
Partitioned queues (#186)
- Users can enable partitioning on a queue - On such queues, a partition key must be provided at enqueue (not enforceable on the client) - Partition keys cannot be provided on non-partitioned queues
1 parent 6d0f083 commit 40f3edc

File tree

12 files changed

+639
-66
lines changed

12 files changed

+639
-66
lines changed

dbos/admin_server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
124124
"QueueName": ws.QueueName,
125125
"Timeout": ws.Timeout,
126126
"DeduplicationID": ws.DeduplicationID,
127+
"Priority": ws.Priority,
128+
"QueuePartitionKey": ws.QueuePartitionKey,
127129
"Input": ws.Input,
128130
}
129131

dbos/admin_server_test.go

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,12 @@ func TestAdminServer(t *testing.T) {
560560
// Create a workflow queue with limited concurrency to keep workflows enqueued
561561
queue := NewWorkflowQueue(ctx, "test-queue", WithGlobalConcurrency(1))
562562

563+
// Create a partitioned queue for partition key test
564+
partitionedQueue := NewWorkflowQueue(ctx, "partitioned-test-queue", WithPartitionQueue(), WithGlobalConcurrency(1))
565+
566+
// Create a priority-enabled queue for priority and deduplication tests
567+
priorityQueue := NewWorkflowQueue(ctx, "priority-test-queue", WithPriorityEnabled(), WithGlobalConcurrency(1))
568+
563569
// Define a blocking workflow that will hold up the queue
564570
startEvent := NewEvent()
565571
blockingChan := make(chan struct{})
@@ -610,6 +616,22 @@ func TestAdminServer(t *testing.T) {
610616
enqueuedHandles = append(enqueuedHandles, handle)
611617
}
612618

619+
// Create workflow with partition key
620+
partitionHandle, err := RunWorkflow(ctx, blockingWorkflow, "partition-test", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1"))
621+
require.NoError(t, err, "Failed to create workflow with partition key")
622+
enqueuedHandles = append(enqueuedHandles, partitionHandle)
623+
624+
// Create workflow with deduplication ID
625+
dedupID := "test-dedup-id"
626+
dedupHandle, err := RunWorkflow(ctx, blockingWorkflow, "dedup-test", WithQueue(priorityQueue.Name), WithDeduplicationID(dedupID))
627+
require.NoError(t, err, "Failed to create workflow with deduplication ID")
628+
enqueuedHandles = append(enqueuedHandles, dedupHandle)
629+
630+
// Create workflow with priority
631+
priorityHandle, err := RunWorkflow(ctx, blockingWorkflow, "priority-test", WithQueue(priorityQueue.Name), WithPriority(5))
632+
require.NoError(t, err, "Failed to create workflow with priority")
633+
enqueuedHandles = append(enqueuedHandles, priorityHandle)
634+
613635
// Create non-queued workflows that should NOT appear in queues-only results
614636
var regularHandles []WorkflowHandle[string]
615637
for i := range 2 {
@@ -639,10 +661,11 @@ func TestAdminServer(t *testing.T) {
639661
err = json.NewDecoder(respQueuesOnly.Body).Decode(&queuesOnlyWorkflows)
640662
require.NoError(t, err, "Failed to decode queues_only workflows response")
641663

642-
// Should have exactly 3 enqueued workflows and 1 pending workflow
643-
assert.Equal(t, 4, len(queuesOnlyWorkflows), "Expected exactly 4 workflows")
664+
// Should have exactly 7 workflows (3 original + 1 pending + 1 partition + 1 dedup + 1 priority)
665+
assert.Equal(t, 7, len(queuesOnlyWorkflows), "Expected exactly 7 workflows")
644666

645667
// Verify all returned workflows are from the queue and have ENQUEUED/PENDING status
668+
// Also verify QueuePartitionKey, DeduplicationID, and Priority fields are present
646669
for _, wf := range queuesOnlyWorkflows {
647670
status, ok := wf["Status"].(string)
648671
require.True(t, ok, "Status should be a string")
@@ -651,7 +674,37 @@ func TestAdminServer(t *testing.T) {
651674

652675
queueName, ok := wf["QueueName"].(string)
653676
require.True(t, ok, "QueueName should be a string")
654-
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
677+
assert.NotEmpty(t, queueName, "QueueName should not be empty")
678+
679+
wfID, ok := wf["WorkflowUUID"].(string)
680+
require.True(t, ok, "WorkflowUUID should be a string")
681+
682+
// Verify QueuePartitionKey field is present (may be empty string for non-partitioned workflows)
683+
_, hasPartitionKey := wf["QueuePartitionKey"]
684+
assert.True(t, hasPartitionKey, "QueuePartitionKey field should be present for workflow %s", wfID)
685+
686+
// Verify DeduplicationID field is present (may be empty string for workflows without dedup ID)
687+
_, hasDedupID := wf["DeduplicationID"]
688+
assert.True(t, hasDedupID, "DeduplicationID field should be present for workflow %s", wfID)
689+
690+
// Verify Priority field is present (may be 0 for workflows without priority)
691+
_, hasPriority := wf["Priority"]
692+
assert.True(t, hasPriority, "Priority field should be present for workflow %s", wfID)
693+
694+
// Verify specific values for our test workflows
695+
if wfID == partitionHandle.GetWorkflowID() {
696+
partitionKey, ok := wf["QueuePartitionKey"].(string)
697+
require.True(t, ok, "QueuePartitionKey should be a string")
698+
assert.Equal(t, "partition-1", partitionKey, "Expected partition key to be 'partition-1'")
699+
} else if wfID == dedupHandle.GetWorkflowID() {
700+
dedupIDResp, ok := wf["DeduplicationID"].(string)
701+
require.True(t, ok, "DeduplicationID should be a string")
702+
assert.Equal(t, dedupID, dedupIDResp, "Expected deduplication ID to match")
703+
} else if wfID == priorityHandle.GetWorkflowID() {
704+
priority, ok := wf["Priority"].(float64) // JSON numbers decode as float64
705+
require.True(t, ok, "Priority should be a number")
706+
assert.Equal(t, float64(5), priority, "Expected priority to be 5")
707+
}
655708
}
656709

657710
// Verify that the enqueued workflow IDs match

dbos/client.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,15 @@ func WithEnqueueTimeout(timeout time.Duration) EnqueueOption {
106106
}
107107
}
108108

109+
// WithEnqueueQueuePartitionKey sets the queue partition key for partitioned queues.
110+
// When a queue is partitioned, workflows with the same partition key are processed
111+
// with separate concurrency limits per partition.
112+
func WithEnqueueQueuePartitionKey(partitionKey string) EnqueueOption {
113+
return func(opts *enqueueOptions) {
114+
opts.queuePartitionKey = partitionKey
115+
}
116+
}
117+
109118
type enqueueOptions struct {
110119
workflowName string
111120
workflowID string
@@ -114,6 +123,7 @@ type enqueueOptions struct {
114123
priority uint
115124
workflowTimeout time.Duration
116125
workflowInput any
126+
queuePartitionKey string
117127
}
118128

119129
// EnqueueWorkflow enqueues a workflow to a named queue for deferred execution.
@@ -134,6 +144,19 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
134144
opt(params)
135145
}
136146

147+
if len(queueName) == 0 {
148+
return nil, fmt.Errorf("queue name is required")
149+
}
150+
151+
if len(workflowName) == 0 {
152+
return nil, fmt.Errorf("workflow name is required")
153+
}
154+
155+
// Validate partition key and deduplication ID are not both provided (they are incompatible)
156+
if len(params.queuePartitionKey) > 0 && len(params.deduplicationID) > 0 {
157+
return nil, fmt.Errorf("partition key and deduplication ID cannot be used together")
158+
}
159+
137160
workflowID := params.workflowID
138161
if workflowID == "" {
139162
workflowID = uuid.New().String()
@@ -160,6 +183,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
160183
QueueName: queueName,
161184
DeduplicationID: params.deduplicationID,
162185
Priority: int(params.priority),
186+
QueuePartitionKey: params.queuePartitionKey,
163187
}
164188

165189
uncancellableCtx := WithoutCancel(dbosCtx)
@@ -205,6 +229,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
205229
// - WithEnqueueDeduplicationID: Deduplication identifier for idempotent enqueuing
206230
// - WithEnqueuePriority: Execution priority
207231
// - WithEnqueueTimeout: Maximum execution time for the workflow
232+
// - WithEnqueueQueuePartitionKey: Queue partition key for partitioned queues
208233
//
209234
// Returns a typed workflow handle that can be used to check status and retrieve results.
210235
// The handle uses polling to check workflow completion since the execution is asynchronous.

dbos/client_test.go

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/stretchr/testify/require"
1313
)
1414

15-
func TestEnqueue(t *testing.T) {
15+
func TestClientEnqueue(t *testing.T) {
1616
// Setup server context - this will process tasks
1717
serverCtx := setupDBOS(t, true, true)
1818

@@ -23,6 +23,10 @@ func TestEnqueue(t *testing.T) {
2323
// Must be created before Launch()
2424
priorityQueue := NewWorkflowQueue(serverCtx, "priority-test-queue", WithGlobalConcurrency(1), WithPriorityEnabled())
2525

26+
// Create a partitioned queue for partition key test
27+
// Must be created before Launch()
28+
partitionedQueue := NewWorkflowQueue(serverCtx, "client-partitioned-queue", WithPartitionQueue())
29+
2630
// Track execution order for priority test
2731
var executionOrder []string
2832
var mu sync.Mutex
@@ -59,6 +63,12 @@ func TestEnqueue(t *testing.T) {
5963
}
6064
RegisterWorkflow(serverCtx, priorityWorkflow, WithWorkflowName("PriorityWorkflow"))
6165

66+
// Simple workflow for partitioned queue test
67+
partitionedWorkflow := func(ctx DBOSContext, input string) (string, error) {
68+
return "partitioned: " + input, nil
69+
}
70+
RegisterWorkflow(serverCtx, partitionedWorkflow, WithWorkflowName("PartitionedWorkflow"))
71+
6272
// Launch the server context to start processing tasks
6373
err := Launch(serverCtx)
6474
require.NoError(t, err)
@@ -257,6 +267,77 @@ func TestEnqueue(t *testing.T) {
257267
assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after deduplication test")
258268
})
259269

270+
t.Run("EnqueueToPartitionedQueue", func(t *testing.T) {
271+
// Enqueue a workflow to a partitioned queue with a partition key
272+
handle, err := Enqueue[string, string](client, partitionedQueue.Name, "PartitionedWorkflow", "test-input",
273+
WithEnqueueQueuePartitionKey("partition-1"),
274+
WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion()))
275+
require.NoError(t, err, "failed to enqueue workflow to partitioned queue")
276+
277+
// Verify we got a polling handle
278+
_, ok := handle.(*workflowPollingHandle[string])
279+
require.True(t, ok, "expected handle to be of type workflowPollingHandle, got %T", handle)
280+
281+
// Get the result
282+
result, err := handle.GetResult()
283+
require.NoError(t, err, "failed to get result from partitioned queue workflow")
284+
285+
expectedResult := "partitioned: test-input"
286+
assert.Equal(t, expectedResult, result, "expected result to match")
287+
288+
// Verify the workflow status
289+
status, err := handle.GetStatus()
290+
require.NoError(t, err, "failed to get workflow status")
291+
292+
assert.Equal(t, WorkflowStatusSuccess, status.Status, "expected workflow status to be SUCCESS")
293+
assert.Equal(t, "PartitionedWorkflow", status.Name, "expected workflow name to match")
294+
assert.Equal(t, partitionedQueue.Name, status.QueueName, "expected queue name to match")
295+
296+
assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after partitioned queue test")
297+
})
298+
299+
t.Run("EnqueueWithPartitionKeyWithoutQueue", func(t *testing.T) {
300+
// Attempt to enqueue with a partition key but no queue name
301+
_, err := Enqueue[string, string](client, "", "PartitionedWorkflow", "test-input",
302+
WithEnqueueQueuePartitionKey("partition-1"))
303+
require.Error(t, err, "expected error when enqueueing with partition key but no queue name")
304+
305+
// Verify the error message contains the expected text
306+
assert.Contains(t, err.Error(), "queue name is required", "expected error message to contain 'queue name is required'")
307+
})
308+
309+
t.Run("EnqueueWithPartitionKeyAndDeduplicationID", func(t *testing.T) {
310+
// Attempt to enqueue with both partition key and deduplication ID
311+
// This should return an error
312+
_, err := Enqueue[string, string](client, partitionedQueue.Name, "PartitionedWorkflow", "test-input",
313+
WithEnqueueQueuePartitionKey("partition-1"),
314+
WithEnqueueDeduplicationID("dedup-id"))
315+
require.Error(t, err, "expected error when enqueueing with both partition key and deduplication ID")
316+
317+
// Verify the error message contains the expected text
318+
assert.Contains(t, err.Error(), "partition key and deduplication ID cannot be used together", "expected error message to contain validation message")
319+
})
320+
321+
t.Run("EnqueueWithEmptyQueueName", func(t *testing.T) {
322+
// Attempt to enqueue with empty queue name
323+
// This should return an error
324+
_, err := Enqueue[wfInput, string](client, "", "ServerWorkflow", wfInput{Input: "test-input"})
325+
require.Error(t, err, "expected error when enqueueing with empty queue name")
326+
327+
// Verify the error message contains the expected text
328+
assert.Contains(t, err.Error(), "queue name is required", "expected error message to contain 'queue name is required'")
329+
})
330+
331+
t.Run("EnqueueWithEmptyWorkflowName", func(t *testing.T) {
332+
// Attempt to enqueue with empty workflow name
333+
// This should return an error
334+
_, err := Enqueue[wfInput, string](client, queue.Name, "", wfInput{Input: "test-input"})
335+
require.Error(t, err, "expected error when enqueueing with empty workflow name")
336+
337+
// Verify the error message contains the expected text
338+
assert.Contains(t, err.Error(), "workflow name is required", "expected error message to contain 'workflow name is required'")
339+
})
340+
260341
// Verify all queue entries are cleaned up
261342
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after client tests")
262343
}

dbos/conductor_protocol.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ type listWorkflowsConductorResponseBody struct {
8787
CreatedAt *string `json:"CreatedAt,omitempty"`
8888
UpdatedAt *string `json:"UpdatedAt,omitempty"`
8989
QueueName *string `json:"QueueName,omitempty"`
90+
QueuePartitionKey *string `json:"QueuePartitionKey,omitempty"`
91+
DeduplicationID *string `json:"DeduplicationID,omitempty"`
92+
Priority *int `json:"Priority,omitempty"`
9093
ApplicationVersion *string `json:"ApplicationVersion,omitempty"`
9194
ExecutorID *string `json:"ExecutorID,omitempty"`
9295
}
@@ -167,6 +170,21 @@ func formatListWorkflowsResponseBody(wf WorkflowStatus) listWorkflowsConductorRe
167170
output.QueueName = &wf.QueueName
168171
}
169172

173+
// Copy queue partition key
174+
if wf.QueuePartitionKey != "" {
175+
output.QueuePartitionKey = &wf.QueuePartitionKey
176+
}
177+
178+
// Copy deduplication ID
179+
if wf.DeduplicationID != "" {
180+
output.DeduplicationID = &wf.DeduplicationID
181+
}
182+
183+
// Copy priority
184+
if wf.Priority != 0 {
185+
output.Priority = &wf.Priority
186+
}
187+
170188
// Copy application version
171189
if wf.ApplicationVersion != "" {
172190
output.ApplicationVersion = &wf.ApplicationVersion

dbos/dbos.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext {
205205
applicationVersion: dbosCtx.applicationVersion,
206206
executorID: dbosCtx.executorID,
207207
applicationID: dbosCtx.applicationID,
208+
queueRunner: dbosCtx.queueRunner,
208209
}
209210
childCtx.launched.Store(launched)
210211
return childCtx
@@ -233,6 +234,7 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
233234
applicationVersion: dbosCtx.applicationVersion,
234235
executorID: dbosCtx.executorID,
235236
applicationID: dbosCtx.applicationID,
237+
queueRunner: dbosCtx.queueRunner,
236238
}
237239
childCtx.launched.Store(launched)
238240
return childCtx
@@ -260,6 +262,7 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C
260262
applicationVersion: dbosCtx.applicationVersion,
261263
executorID: dbosCtx.executorID,
262264
applicationID: dbosCtx.applicationID,
265+
queueRunner: dbosCtx.queueRunner,
263266
}
264267
childCtx.launched.Store(launched)
265268
return childCtx, cancelFunc

dbos/dbos_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func TestConfig(t *testing.T) {
260260

261261
err = sysDB.pool.QueryRow(dbCtx, "SELECT version FROM dbos.dbos_migrations").Scan(&version)
262262
require.NoError(t, err)
263-
assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)")
263+
assert.Equal(t, int64(2), version, "migration version should be 1 (after initial migration)")
264264

265265
// Test manual shutdown and recreate
266266
Shutdown(ctx, 1*time.Minute)
@@ -468,7 +468,7 @@ func TestCustomSystemDBSchema(t *testing.T) {
468468

469469
err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT version FROM %s.dbos_migrations", customSchema)).Scan(&version)
470470
require.NoError(t, err)
471-
assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)")
471+
assert.Equal(t, int64(2), version, "migration version should be 2 (after initial migration and queue partition key migration)")
472472
})
473473

474474
// Test workflows for exercising Send/Recv and SetEvent/GetEvent
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- Migration 2: Add queue_partition_key column to workflow_status table
2+
-- This enables partitioned queues where workflows can be distributed across
3+
-- dynamically created queue partitions with separate concurrency limits per partition.
4+
5+
ALTER TABLE %s.workflow_status
6+
ADD COLUMN queue_partition_key TEXT;
7+

0 commit comments

Comments
 (0)