Skip to content

Commit 6d0f083

Browse files
authored
Distinguish nil values in the database (#184)
Use a special marker `__DBOS_NIL` to represent nil values in the database instead of storing as `NULL`.
1 parent 4e8fb10 commit 6d0f083

File tree

4 files changed

+65
-22
lines changed

4 files changed

+65
-22
lines changed

dbos/dbos.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ func (c *dbosContext) Value(key any) any {
185185
return c.ctx.Value(key)
186186
}
187187

188-
189188
// WithValue returns a copy of the DBOS context with the given key-value pair.
190189
// This is similar to context.WithValue but maintains DBOS context capabilities.
191190
// No-op if the provided context is not a concrete dbos.dbosContext.
@@ -300,7 +299,7 @@ func (c *dbosContext) ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegiste
300299

301300
// Get all registered workflows and apply filters
302301
var filteredWorkflows []WorkflowRegistryEntry
303-
c.workflowRegistry.Range(func(key, value interface{}) bool {
302+
c.workflowRegistry.Range(func(key, value any) bool {
304303
workflow := value.(WorkflowRegistryEntry)
305304

306305
// Filter by scheduled only

dbos/serialization.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import (
77
"reflect"
88
)
99

10+
const (
11+
// nilMarker is a special marker string used to represent nil values in the database.
12+
nilMarker = "__DBOS_NIL"
13+
)
14+
1015
type serializer[T any] interface {
1116
Encode(data T) (*string, error)
1217
Decode(data *string) (T, error)
@@ -21,8 +26,9 @@ func newJSONSerializer[T any]() serializer[T] {
2126
func (j *jsonSerializer[T]) Encode(data T) (*string, error) {
2227
// Check if the value is nil (for pointer types, slice, map, etc.)
2328
if isNilValue(data) {
24-
// For nil values, return nil pointer
25-
return nil, nil
29+
// For nil values, return the special marker so it can be stored in the database
30+
marker := string(nilMarker)
31+
return &marker, nil
2632
}
2733

2834
// Check if the value is a zero value (but not nil)
@@ -36,7 +42,7 @@ func (j *jsonSerializer[T]) Encode(data T) (*string, error) {
3642

3743
func (j *jsonSerializer[T]) Decode(data *string) (T, error) {
3844
// If data is a nil pointer, return nil (for pointer types) or zero value (for non-pointer types)
39-
if data == nil {
45+
if data == nil || *data == nilMarker {
4046
return getNilOrZeroValue[T](), nil
4147
}
4248

dbos/serialization_test.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/jackc/pgx/v5"
1112
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/require"
1314
)
@@ -141,8 +142,8 @@ func testAllSerializationPaths[T any](
141142
wf := wfs[0]
142143
if isNilExpected {
143144
// Should be an empty string
144-
assert.Nil(t, wf.Input, "Workflow input should be nil")
145-
assert.Nil(t, wf.Output, "Workflow output should be nil")
145+
require.Nil(t, wf.Input, "Workflow input should be nil")
146+
require.Nil(t, wf.Output, "Workflow output should be nil")
146147
} else {
147148
require.NotNil(t, wf.Input)
148149
require.NotNil(t, wf.Output)
@@ -175,6 +176,40 @@ func testAllSerializationPaths[T any](
175176
}
176177
}
177178
})
179+
180+
// If nil is expected, verify the nil marker is stored in the database
181+
if isNilExpected {
182+
t.Run("DatabaseNilMarker", func(t *testing.T) {
183+
// Get the database pool to query directly
184+
dbosCtx, ok := executor.(*dbosContext)
185+
require.True(t, ok, "expected dbosContext")
186+
sysDB, ok := dbosCtx.systemDB.(*sysDB)
187+
require.True(t, ok, "expected sysDB")
188+
189+
// Query the database directly to check for the marker
190+
ctx := context.Background()
191+
query := fmt.Sprintf(`SELECT inputs, output FROM %s.workflow_status WHERE workflow_uuid = $1`, pgx.Identifier{sysDB.schema}.Sanitize())
192+
193+
var inputString, outputString *string
194+
err := sysDB.pool.QueryRow(ctx, query, workflowID).Scan(&inputString, &outputString)
195+
require.NoError(t, err, "failed to query workflow status")
196+
197+
// Both input and output should be the nil marker
198+
require.NotNil(t, inputString, "input should not be NULL in database")
199+
assert.Equal(t, nilMarker, *inputString, "input should be the nil marker")
200+
201+
require.NotNil(t, outputString, "output should not be NULL in database")
202+
assert.Equal(t, nilMarker, *outputString, "output should be the nil marker")
203+
204+
// Also check the step output in operation_outputs
205+
stepQuery := fmt.Sprintf(`SELECT output FROM %s.operation_outputs WHERE workflow_uuid = $1 ORDER BY function_id LIMIT 1`, pgx.Identifier{sysDB.schema}.Sanitize())
206+
var stepOutputString *string
207+
err = sysDB.pool.QueryRow(ctx, stepQuery, workflowID).Scan(&stepOutputString)
208+
require.NoError(t, err, "failed to query step output")
209+
require.NotNil(t, stepOutputString, "step output should not be NULL in database")
210+
assert.Equal(t, nilMarker, *stepOutputString, "step output should be the nil marker")
211+
})
212+
}
178213
}
179214

180215
// Helper function to test Send/Recv communication

dbos/workflow.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2077,28 +2077,30 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption)
20772077
if !ok {
20782078
return nil, fmt.Errorf("workflow input must be encoded string, got %T", workflows[i].Input)
20792079
}
2080-
if encodedInput == nil {
2081-
continue
2082-
}
2083-
decodedBytes, err := base64.StdEncoding.DecodeString(*encodedInput)
2084-
if err != nil {
2085-
return nil, fmt.Errorf("failed to decode base64 workflow input for %s: %w", workflows[i].ID, err)
2080+
if encodedInput == nil || *encodedInput == nilMarker {
2081+
workflows[i].Input = nil
2082+
} else {
2083+
decodedBytes, err := base64.StdEncoding.DecodeString(*encodedInput)
2084+
if err != nil {
2085+
return nil, fmt.Errorf("failed to decode base64 workflow input for %s: %w", workflows[i].ID, err)
2086+
}
2087+
workflows[i].Input = string(decodedBytes)
20862088
}
2087-
workflows[i].Input = string(decodedBytes)
20882089
}
20892090
if params.loadOutput && workflows[i].Output != nil {
20902091
encodedOutput, ok := workflows[i].Output.(*string)
20912092
if !ok {
20922093
return nil, fmt.Errorf("workflow output must be encoded *string, got %T", workflows[i].Output)
20932094
}
2094-
if encodedOutput == nil {
2095-
continue
2096-
}
2097-
decodedBytes, err := base64.StdEncoding.DecodeString(*encodedOutput)
2098-
if err != nil {
2099-
return nil, fmt.Errorf("failed to decode base64 workflow output for %s: %w", workflows[i].ID, err)
2095+
if encodedOutput == nil || *encodedOutput == nilMarker {
2096+
workflows[i].Output = nil
2097+
} else {
2098+
decodedBytes, err := base64.StdEncoding.DecodeString(*encodedOutput)
2099+
if err != nil {
2100+
return nil, fmt.Errorf("failed to decode base64 workflow output for %s: %w", workflows[i].ID, err)
2101+
}
2102+
workflows[i].Output = string(decodedBytes)
21002103
}
2101-
workflows[i].Output = string(decodedBytes)
21022104
}
21032105
}
21042106
}
@@ -2205,7 +2207,8 @@ func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]Step
22052207
if loadOutput {
22062208
for i := range steps {
22072209
encodedOutput := steps[i].Output
2208-
if encodedOutput == nil {
2210+
if encodedOutput == nil || *encodedOutput == nilMarker {
2211+
stepInfos[i].Output = nil
22092212
continue
22102213
}
22112214
decodedBytes, err := base64.StdEncoding.DecodeString(*encodedOutput)

0 commit comments

Comments
 (0)