Skip to content

Commit 43463c4

Browse files
committed
Fix MGET aggregation to map individual values to keys across shards
1 parent aea0f18 commit 43463c4

File tree

5 files changed

+159
-94
lines changed

5 files changed

+159
-94
lines changed

command.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4474,7 +4474,7 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
44744474
rawTips[k] = v
44754475
}
44764476
}
4477-
cmdInfo.CommandPolicy = parseCommandPolicies(rawTips)
4477+
cmdInfo.CommandPolicy = parseCommandPolicies(rawTips, cmdInfo.FirstKeyPos)
44784478

44794479
if err := rd.DiscardNext(); err != nil {
44804480
return err
@@ -4573,9 +4573,12 @@ func (c *cmdsInfoCache) Refresh() {
45734573
const requestPolicy = "request_policy"
45744574
const responsePolicy = "response_policy"
45754575

4576-
func parseCommandPolicies(commandInfoTips map[string]string) *routing.CommandPolicy {
4576+
func parseCommandPolicies(commandInfoTips map[string]string, firstKeyPos int8) *routing.CommandPolicy {
45774577
req := routing.ReqDefault
4578-
resp := routing.RespAllSucceeded
4578+
resp := routing.RespDefaultKeyless
4579+
if firstKeyPos > 0 {
4580+
resp = routing.RespDefaultHashSlot
4581+
}
45794582

45804583
tips := make(map[string]string, len(commandInfoTips))
45814584
for k, v := range commandInfoTips {

internal/routing/aggregator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ type AggregatorResErr struct {
4444
func NewResponseAggregator(policy ResponsePolicy, cmdName string) ResponseAggregator {
4545
switch policy {
4646
case RespDefaultKeyless:
47-
return &DefaultKeylessAggregator{}
47+
return &DefaultKeylessAggregator{results: make([]interface{}, 0)}
4848
case RespDefaultHashSlot:
49-
return &DefaultKeyedAggregator{}
49+
return &DefaultKeyedAggregator{results: make(map[string]interface{})}
5050
case RespAllSucceeded:
5151
return &AllSucceededAggregator{}
5252
case RespOneSucceeded:

osscluster.go

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ type ClusterOptions struct {
121121

122122
TLSConfig *tls.Config
123123

124+
// DisableRoutingPolicies disables the request/response policy routing system.
125+
// When disabled, all commands use the legacy routing behavior.
126+
// Experimental. Will be removed when shard picker is fully implemented.
127+
DisableRoutingPolicies bool
128+
124129
// DisableIndentity - Disable set-lib on connect.
125130
//
126131
// default: false
@@ -939,6 +944,29 @@ func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
939944
return nodes[randomNodes[0]], nil
940945
}
941946

947+
func (c *clusterState) slotShardPickerSlaveNode(slot int, shardPicker routing.ShardPicker) (*clusterNode, error) {
948+
nodes := c.slotNodes(slot)
949+
if len(nodes) == 0 {
950+
return c.nodes.Random()
951+
}
952+
953+
// nodes[0] is master, nodes[1:] are slaves
954+
// First, try all slave nodes for this slot using ShardPicker order
955+
slaves := nodes[1:]
956+
if len(slaves) > 0 {
957+
for i := 0; i < len(slaves); i++ {
958+
idx := shardPicker.Next(len(slaves))
959+
slave := slaves[idx]
960+
if !slave.Failing() && !slave.Loading() {
961+
return slave, nil
962+
}
963+
}
964+
}
965+
966+
// All slaves are failing or loading - return master
967+
return nodes[0], nil
968+
}
969+
942970
func (c *clusterState) slotNodes(slot int) []*clusterNode {
943971
i := sort.Search(len(c.slots), func(i int) bool {
944972
return c.slots[i].end >= slot
@@ -1096,7 +1124,11 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
10961124

10971125
if node == nil {
10981126
var err error
1099-
node, err = c.cmdNode(ctx, cmd.Name(), slot)
1127+
if !c.opt.DisableRoutingPolicies && c.opt.ShardPicker != nil {
1128+
node, err = c.cmdNodeWithShardPicker(ctx, cmd.Name(), slot, c.opt.ShardPicker)
1129+
} else {
1130+
node, err = c.cmdNode(ctx, cmd.Name(), slot)
1131+
}
11001132
if err != nil {
11011133
return err
11021134
}
@@ -1109,8 +1141,11 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
11091141
_ = pipe.Process(ctx, cmd)
11101142
_, lastErr = pipe.Exec(ctx)
11111143
} else {
1112-
// Execute the command on the selected node
1113-
lastErr = c.routeAndRun(ctx, cmd, node)
1144+
if !c.opt.DisableRoutingPolicies {
1145+
lastErr = c.routeAndRun(ctx, cmd, node)
1146+
} else {
1147+
lastErr = node.Client.Process(ctx, cmd)
1148+
}
11141149
}
11151150

11161151
// If there is no error - we are done.
@@ -2098,13 +2133,33 @@ func (c *ClusterClient) cmdNode(
20982133
return nil, err
20992134
}
21002135

2136+
if c.opt.ReadOnly {
2137+
cmdInfo := c.cmdInfo(ctx, cmdName)
2138+
if cmdInfo != nil && cmdInfo.ReadOnly {
2139+
return c.slotReadOnlyNode(state, slot)
2140+
}
2141+
}
2142+
return state.slotMasterNode(slot)
2143+
}
2144+
2145+
func (c *ClusterClient) cmdNodeWithShardPicker(
2146+
ctx context.Context,
2147+
cmdName string,
2148+
slot int,
2149+
shardPicker routing.ShardPicker,
2150+
) (*clusterNode, error) {
2151+
state, err := c.state.Get(ctx)
2152+
if err != nil {
2153+
return nil, err
2154+
}
2155+
21012156
// For keyless commands (slot == -1), use ShardPicker to select a shard
21022157
// This respects the user's configured ShardPicker policy
21032158
if slot == -1 {
21042159
if len(state.Masters) == 0 {
21052160
return nil, errClusterNoNodes
21062161
}
2107-
idx := c.opt.ShardPicker.Next(len(state.Masters))
2162+
idx := shardPicker.Next(len(state.Masters))
21082163
return state.Masters[idx], nil
21092164
}
21102165

@@ -2124,6 +2179,11 @@ func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*cluste
21242179
if c.opt.RouteRandomly {
21252180
return state.slotRandomNode(slot)
21262181
}
2182+
2183+
if c.opt.ShardPicker != nil {
2184+
return state.slotShardPickerSlaveNode(slot, c.opt.ShardPicker)
2185+
}
2186+
21272187
return state.slotSlaveNode(slot)
21282188
}
21292189

osscluster_router.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (c *ClusterClient) executeMultiSlot(ctx context.Context, cmd Cmder, slotMap
138138
go func(slot int, keys []string) {
139139
defer wg.Done()
140140

141-
node, err := c.cmdNode(ctx, cmd.Name(), slot)
141+
node, err := c.cmdNodeWithShardPicker(ctx, cmd.Name(), slot, c.opt.ShardPicker)
142142
if err != nil {
143143
results <- slotResult{nil, keys, err}
144144
return
@@ -280,7 +280,7 @@ func (c *ClusterClient) executeCursorCommand(ctx context.Context, cmd Cmder) err
280280

281281
// Route based on cursor ID to maintain stickiness
282282
slot := hashtag.Slot(cursorID)
283-
node, err := c.cmdNode(ctx, cmd.Name(), slot)
283+
node, err := c.cmdNodeWithShardPicker(ctx, cmd.Name(), slot, c.opt.ShardPicker)
284284
if err != nil {
285285
return err
286286
}
@@ -351,12 +351,29 @@ func (c *ClusterClient) aggregateMultiSlotResults(ctx context.Context, cmd Cmder
351351
firstErr = result.err
352352
}
353353
if result.cmd != nil && result.err == nil {
354-
// For other commands, map each key to the entire result
355354
value, err := ExtractCommandValue(result.cmd)
356-
for _, key := range result.keys {
357-
keyedResults[key] = routing.AggregatorResErr{Result: value, Err: err}
355+
356+
// Check if the result is a slice (e.g., from MGET)
357+
if sliceValue, ok := value.([]interface{}); ok {
358+
// Map each element to its corresponding key
359+
for i, key := range result.keys {
360+
if i < len(sliceValue) {
361+
keyedResults[key] = routing.AggregatorResErr{Result: sliceValue[i], Err: err}
362+
} else {
363+
keyedResults[key] = routing.AggregatorResErr{Result: nil, Err: err}
364+
}
365+
}
366+
} else {
367+
// For non-slice results, map the entire result to each key
368+
for _, key := range result.keys {
369+
keyedResults[key] = routing.AggregatorResErr{Result: value, Err: err}
370+
}
358371
}
359372
}
373+
374+
if result.err != nil {
375+
firstErr = result.err
376+
}
360377
}
361378

362379
return c.aggregateKeyedValues(cmd, keyedResults, keyOrder, policy)

0 commit comments

Comments
 (0)