diff --git a/langgraph/checkpoint/redis/aio.py b/langgraph/checkpoint/redis/aio.py index f8e87d3..9ad1d73 100644 --- a/langgraph/checkpoint/redis/aio.py +++ b/langgraph/checkpoint/redis/aio.py @@ -1724,7 +1724,7 @@ async def _abatch_load_pending_sends( return_fields=[ "checkpoint_id", "type", - "blob", + "$.blob", "task_path", "task_id", "idx", @@ -1745,20 +1745,27 @@ async def _abatch_load_pending_sends( # Sort and format results for each parent checkpoint for parent_checkpoint_id in parent_checkpoint_ids: batch_key = (thread_id, checkpoint_ns, parent_checkpoint_id) - docs = writes_by_checkpoint.get(parent_checkpoint_id, []) - - # Sort for deterministic order - sorted_docs = sorted( - docs, - key=lambda d: ( - getattr(d, "task_path", ""), - getattr(d, "task_id", ""), - getattr(d, "idx", 0), + writes = writes_by_checkpoint.get(parent_checkpoint_id, []) + + # Sort results by task_path, task_id, idx + sorted_writes = sorted( + writes, + key=lambda x: ( + getattr(x, "task_path", ""), + getattr(x, "task_id", ""), + getattr(x, "idx", 0), ), ) - # Convert to expected format - results_map[batch_key] = [(d.type, d.blob) for d in sorted_docs] + # Extract type and blob pairs + # Handle both direct attribute access and JSON path access + results_map[batch_key] = [ + ( + getattr(doc, "type", ""), + getattr(doc, "$.blob", getattr(doc, "blob", b"")), + ) + for doc in sorted_writes + ] return results_map