Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions langgraph/checkpoint/redis/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,7 @@ async def _abatch_load_pending_sends(
return_fields=[
"checkpoint_id",
"type",
"blob",
"$.blob",
"task_path",
"task_id",
"idx",
Expand All @@ -1745,20 +1745,27 @@ async def _abatch_load_pending_sends(
# Sort and format results for each parent checkpoint
for parent_checkpoint_id in parent_checkpoint_ids:
batch_key = (thread_id, checkpoint_ns, parent_checkpoint_id)
docs = writes_by_checkpoint.get(parent_checkpoint_id, [])

# Sort for deterministic order
sorted_docs = sorted(
docs,
key=lambda d: (
getattr(d, "task_path", ""),
getattr(d, "task_id", ""),
getattr(d, "idx", 0),
writes = writes_by_checkpoint.get(parent_checkpoint_id, [])

# Sort results by task_path, task_id, idx
sorted_writes = sorted(
writes,
key=lambda x: (
getattr(x, "task_path", ""),
getattr(x, "task_id", ""),
getattr(x, "idx", 0),
),
)

# Convert to expected format
results_map[batch_key] = [(d.type, d.blob) for d in sorted_docs]
# Extract type and blob pairs
# Handle both direct attribute access and JSON path access
results_map[batch_key] = [
(
getattr(doc, "type", ""),
getattr(doc, "$.blob", getattr(doc, "blob", b"")),
)
for doc in sorted_writes
]

return results_map

Expand Down