@@ -312,12 +312,23 @@ func (c *ClusterClient) executeParallel(ctx context.Context, cmd Cmder, nodes []
312312 close (results )
313313 }()
314314
315- // Collect results
315+ // Collect results and check for errors
316316 cmds := make ([]Cmder , 0 , len (nodes ))
317+ var firstErr error
318+
317319 for result := range results {
320+ if result .err != nil && firstErr == nil {
321+ firstErr = result .err
322+ }
318323 cmds = append (cmds , result .cmd )
319324 }
320325
326+ // If there was an error and no policy specified, fail fast
327+ if firstErr != nil && (policy == nil || policy .Response == routing .RespDefaultKeyless ) {
328+ cmd .SetErr (firstErr )
329+ return firstErr
330+ }
331+
321332 return c .aggregateResponses (cmd , cmds , policy )
322333}
323334
@@ -342,11 +353,11 @@ func (c *ClusterClient) aggregateMultiSlotResults(ctx context.Context, cmd Cmder
342353 return firstErr
343354 }
344355
345- return c .aggregateKeyedResponses (ctx , cmd , keyedResults , keyOrder , policy )
356+ return c .aggregateKeyedResponses (cmd , keyedResults , keyOrder , policy )
346357}
347358
348359// aggregateKeyedResponses aggregates responses while preserving key order
349- func (c * ClusterClient ) aggregateKeyedResponses (ctx context. Context , cmd Cmder , keyedResults map [string ]Cmder , keyOrder []string , policy * routing.CommandPolicy ) error {
360+ func (c * ClusterClient ) aggregateKeyedResponses (cmd Cmder , keyedResults map [string ]Cmder , keyOrder []string , policy * routing.CommandPolicy ) error {
350361 if len (keyedResults ) == 0 {
351362 return fmt .Errorf ("redis: no results to aggregate" )
352363 }
0 commit comments