Commit 7eb3818

remove thread debugging code && reject commands with policy that cannot be used in pipeline
1 parent 43fcc67 commit 7eb3818

3 files changed: +95 -173 lines

internal/routing/policy.go

Lines changed: 4 additions & 0 deletions
@@ -129,3 +129,7 @@ type CommandPolicy struct {
 	// e.g nondeterministic_output, nondeterministic_output_order.
 	Tips map[string]string
 }
+
+func (p *CommandPolicy) CanBeUsedInPipeline() bool {
+	return p.Request != ReqAllNodes && p.Request != ReqAllShards && p.Request != ReqMultiShard
+}
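For reference, a minimal sketch of how the new predicate classifies request policies: the three fan-out policies are rejected, while anything else (including ReqSpecial, which the removed classifier in osscluster.go also routed individually) remains pipelineable. The table test below is illustrative only and not part of this commit; it assumes it sits in the routing package next to policy.go and uses only the Req* constants that appear elsewhere in this diff.

package routing

import "testing"

// Illustrative sketch, not part of this commit: exercises CanBeUsedInPipeline
// against the request policies referenced in this diff.
func TestCanBeUsedInPipeline(t *testing.T) {
	cases := []struct {
		policy *CommandPolicy
		want   bool
	}{
		{&CommandPolicy{Request: ReqAllNodes}, false},   // fans out to every node
		{&CommandPolicy{Request: ReqAllShards}, false},  // fans out to every shard
		{&CommandPolicy{Request: ReqMultiShard}, false}, // keys span several shards
		{&CommandPolicy{Request: ReqSpecial}, true},     // not rejected by the predicate
	}
	for _, tc := range cases {
		if got := tc.policy.CanBeUsedInPipeline(); got != tc.want {
			t.Errorf("CanBeUsedInPipeline(%v) = %v, want %v", tc.policy.Request, got, tc.want)
		}
	}
}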

osscluster.go

Lines changed: 90 additions & 143 deletions
@@ -933,9 +933,6 @@ type ClusterClient struct {
 // NewClusterClient returns a Redis Cluster client as described in
 // http://redis.io/topics/cluster-spec.
 func NewClusterClient(opt *ClusterOptions) *ClusterClient {
-	if opt == nil {
-		panic("redis: NewClusterClient nil options")
-	}
 	opt.init()
 
 	c := &ClusterClient{
@@ -1293,23 +1290,11 @@ func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error)
 }
 
 func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
-	// Separate commands into those that can be batched vs those that need individual routing
-	batchableCmds := make([]Cmder, 0)
-	individualCmds := make([]Cmder, 0)
+	cmdsMap := newCmdsMap()
 
-	for _, cmd := range cmds {
-		policy := c.getCommandPolicy(ctx, cmd)
-
-		// Commands that need special routing should be handled individually
-		if policy != nil && (policy.Request == routing.ReqAllNodes ||
-			policy.Request == routing.ReqAllShards ||
-			policy.Request == routing.ReqMultiShard ||
-			policy.Request == routing.ReqSpecial) {
-			individualCmds = append(individualCmds, cmd)
-		} else {
-			// Single-node commands can be batched
-			batchableCmds = append(batchableCmds, cmd)
-		}
+	if err := c.mapCmdsByNode(ctx, cmdsMap, cmds); err != nil {
+		setCmdsErr(cmds, err)
+		return err
 	}
 
 	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
@@ -1320,68 +1305,74 @@ func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error
 			}
 		}
 
-		var allSucceeded = true
-		var failedBatchableCmds []Cmder
-		var failedIndividualCmds []Cmder
+		failedCmds := newCmdsMap()
+		var wg sync.WaitGroup
 
-		// Handle individual commands using existing router
-		for _, cmd := range individualCmds {
-			if err := c.routeAndRun(ctx, cmd, nil); err != nil {
-				allSucceeded = false
-				failedIndividualCmds = append(failedIndividualCmds, cmd)
-			}
+		for node, cmds := range cmdsMap.m {
+			wg.Add(1)
+			go func(node *clusterNode, cmds []Cmder) {
+				defer wg.Done()
+				c.processPipelineNode(ctx, node, cmds, failedCmds)
+			}(node, cmds)
 		}
 
-		// Handle batchable commands using original pipeline logic
-		if len(batchableCmds) > 0 {
-			cmdsMap := newCmdsMap()
+		wg.Wait()
+		if len(failedCmds.m) == 0 {
+			break
+		}
+		cmdsMap = failedCmds
+	}
 
-			if err := c.mapCmdsByNode(ctx, cmdsMap, batchableCmds); err != nil {
-				setCmdsErr(batchableCmds, err)
-				allSucceeded = false
-				failedBatchableCmds = append(failedBatchableCmds, batchableCmds...)
-			} else {
-				batchFailedCmds := newCmdsMap()
-				var wg sync.WaitGroup
-
-				for node, nodeCmds := range cmdsMap.m {
-					wg.Add(1)
-					go func(node *clusterNode, nodeCmds []Cmder) {
-						defer wg.Done()
-						c.processPipelineNode(ctx, node, nodeCmds, batchFailedCmds)
-					}(node, nodeCmds)
-				}
+	return cmdsFirstErr(cmds)
+}
 
-				wg.Wait()
+func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
+	state, err := c.state.Get(ctx)
+	if err != nil {
+		return err
+	}
 
-				if len(batchFailedCmds.m) > 0 {
-					allSucceeded = false
-					for _, nodeCmds := range batchFailedCmds.m {
-						failedBatchableCmds = append(failedBatchableCmds, nodeCmds...)
-					}
-				}
+	if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
+		for _, cmd := range cmds {
+			policy := c.getCommandPolicy(ctx, cmd)
+			if policy != nil && !policy.CanBeUsedInPipeline() {
+				return fmt.Errorf("redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard", cmd.Name())
+			}
+			slot := c.cmdSlot(ctx, cmd)
+			node, err := c.slotReadOnlyNode(state, slot)
+			if err != nil {
+				return err
 			}
+			cmdsMap.Add(node, cmd)
 		}
+		return nil
+	}
 
-		// If all commands succeeded, we're done
-		if allSucceeded {
-			break
+	for _, cmd := range cmds {
+		policy := c.getCommandPolicy(ctx, cmd)
+		if policy != nil && !policy.CanBeUsedInPipeline() {
+			return fmt.Errorf("redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard", cmd.Name())
 		}
-
-		// If this was the last attempt, return the error
-		if attempt == c.opt.MaxRedirects {
-			break
+		slot := c.cmdSlot(ctx, cmd)
+		node, err := state.slotMasterNode(slot)
+		if err != nil {
+			return err
 		}
-
-		// Update command lists for retry - no reclassification needed
-		batchableCmds = failedBatchableCmds
-		individualCmds = failedIndividualCmds
+		cmdsMap.Add(node, cmd)
 	}
+	return nil
+}
 
-	return cmdsFirstErr(cmds)
+func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool {
+	for _, cmd := range cmds {
+		cmdInfo := c.cmdInfo(ctx, cmd.Name())
+		if cmdInfo == nil || !cmdInfo.ReadOnly {
+			return false
+		}
+	}
+	return true
 }
 
-// processPipelineNode handles batched pipeline commands for a single node
 func (c *ClusterClient) processPipelineNode(
 	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
 ) {
@@ -1391,8 +1382,7 @@ func (c *ClusterClient) processPipelineNode(
 			if !isContextError(err) {
 				node.MarkAsFailing()
 			}
-			// Commands are already mapped to this node, just add them as failed
-			failedCmds.Add(node, cmds...)
+			_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
 			setCmdsErr(cmds, err)
 			return err
 		}
@@ -1417,8 +1407,7 @@ func (c *ClusterClient) processPipelineNodeConn(
 			node.MarkAsFailing()
 		}
 		if shouldRetry(err, true) {
-			// Commands are already mapped to this node, just add them as failed
-			failedCmds.Add(node, cmds...)
+			_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
 		}
 		setCmdsErr(cmds, err)
 		return err
@@ -1454,63 +1443,21 @@ func (c *ClusterClient) pipelineReadCmds(
 
 		if !isRedisError(err) {
 			if shouldRetry(err, true) {
-				// Commands are already mapped to this node, just add them as failed
-				failedCmds.Add(node, cmds[i:]...)
+				_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
 			}
 			setCmdsErr(cmds[i+1:], err)
 			return err
 		}
 	}
 
 	if err := cmds[0].Err(); err != nil && shouldRetry(err, true) {
-		// Commands are already mapped to this node, just add them as failed
-		failedCmds.Add(node, cmds...)
+		_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
 		return err
 	}
 
 	return nil
 }
 
-// Legacy functions needed for transaction pipeline processing
-func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
-	state, err := c.state.Get(ctx)
-	if err != nil {
-		return err
-	}
-
-	if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
-		for _, cmd := range cmds {
-			slot := c.cmdSlot(ctx, cmd)
-			node, err := c.slotReadOnlyNode(state, slot)
-			if err != nil {
-				return err
-			}
-			cmdsMap.Add(node, cmd)
-		}
-		return nil
-	}
-
-	for _, cmd := range cmds {
-		slot := c.cmdSlot(ctx, cmd)
-		node, err := state.slotMasterNode(slot)
-		if err != nil {
-			return err
-		}
-		cmdsMap.Add(node, cmd)
-	}
-	return nil
-}
-
-func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool {
-	for _, cmd := range cmds {
-		cmdInfo := c.cmdInfo(ctx, cmd.Name())
-		if cmdInfo == nil || !cmdInfo.ReadOnly {
-			return false
-		}
-	}
-	return true
-}
-
 func (c *ClusterClient) checkMovedErr(
 	ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
 ) bool {
@@ -1538,35 +1485,6 @@ func (c *ClusterClient) checkMovedErr(
 	panic("not reached")
 }
 
-func (c *ClusterClient) cmdsMoved(
-	ctx context.Context, cmds []Cmder,
-	moved, ask bool,
-	addr string,
-	failedCmds *cmdsMap,
-) error {
-	node, err := c.nodes.GetOrCreate(addr)
-	if err != nil {
-		return err
-	}
-
-	if moved {
-		c.state.LazyReload()
-		for _, cmd := range cmds {
-			failedCmds.Add(node, cmd)
-		}
-		return nil
-	}
-
-	if ask {
-		for _, cmd := range cmds {
-			failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
-		}
-		return nil
-	}
-
-	return nil
-}
-
 // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
 func (c *ClusterClient) TxPipeline() Pipeliner {
 	pipe := Pipeline{
@@ -1734,6 +1652,35 @@ func (c *ClusterClient) txPipelineReadQueued(
 	return nil
 }
 
+func (c *ClusterClient) cmdsMoved(
+	ctx context.Context, cmds []Cmder,
+	moved, ask bool,
+	addr string,
+	failedCmds *cmdsMap,
+) error {
+	node, err := c.nodes.GetOrCreate(addr)
+	if err != nil {
+		return err
+	}
+
+	if moved {
+		c.state.LazyReload()
+		for _, cmd := range cmds {
+			failedCmds.Add(node, cmd)
+		}
+		return nil
+	}
+
+	if ask {
+		for _, cmd := range cmds {
+			failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
+		}
+		return nil
+	}
+
+	return nil
+}
+
 func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
 	if len(keys) == 0 {
 		return fmt.Errorf("redis: Watch requires at least one key")
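To make the caller-visible effect of the new mapCmdsByNode check concrete, here is a hedged sketch of a cluster pipeline that now fails fast: if any queued command carries an all-nodes, all-shards, or multi-shard request policy, Exec returns the rejection error instead of routing that command individually as the removed classifier did. The addresses are placeholders, and DBSIZE is only assumed here to carry such a policy; which commands actually do depends on the command metadata the client loads.

package main

import (
	"context"
	"log"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	client := redis.NewClusterClient(&redis.ClusterOptions{
		// Placeholder addresses for a local test cluster.
		Addrs: []string{":7000", ":7001", ":7002"},
	})
	defer client.Close()

	pipe := client.Pipeline()
	pipe.Set(ctx, "k1", "v1", 0)
	pipe.DBSize(ctx) // assumed, for illustration, to have an all-shards request policy

	// With this commit, Exec is expected to fail fast with an error like:
	//   redis: cannot pipeline command "dbsize" with request policy
	//   ReqAllNodes/ReqAllShards/ReqMultiShard
	if _, err := pipe.Exec(ctx); err != nil {
		log.Println("pipeline rejected:", err)
	}
}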

osscluster_test.go

Lines changed: 1 addition & 30 deletions
@@ -6,9 +6,6 @@ import (
 	"errors"
 	"fmt"
 	"net"
-	"os"
-	"runtime"
-	"runtime/pprof"
 	"slices"
 	"strconv"
 	"strings"
@@ -17,19 +14,10 @@ import (
 
 	. "github.com/bsm/ginkgo/v2"
 	. "github.com/bsm/gomega"
-	"github.com/fortytw2/leaktest"
 	"github.com/redis/go-redis/v9"
 	"github.com/redis/go-redis/v9/internal/hashtag"
 )
 
-// leakCleanup holds the per-spec leak check function
-var leakCleanup func()
-
-// sanitizeFilename converts spaces and slashes into underscores
-func sanitizeFilename(s string) string {
-	return strings.NewReplacer(" ", "_", "/", "_").Replace(s)
-}
-
 type clusterScenario struct {
 	ports   []string
 	nodeIDs []string
@@ -266,7 +254,7 @@ func slotEqual(s1, s2 redis.ClusterSlot) bool {
 
 // ------------------------------------------------------------------------------
 
-var _ = Describe("ClusterClient", func() {
+var _ = FDescribe("ClusterClient", func() {
 	var failover bool
 	var opt *redis.ClusterOptions
 	var client *redis.ClusterClient
@@ -1391,8 +1379,6 @@ var _ = Describe("ClusterClient", func() {
 
 	Describe("ClusterClient with ClusterSlots with multiple nodes per slot", func() {
 		BeforeEach(func() {
-			leakCleanup = leaktest.Check(GinkgoT())
-			GinkgoWriter.Printf("[DEBUG] goroutines at start: %d\n", runtime.NumGoroutine())
 			failover = true
 
 			opt = redisClusterOptions()
@@ -1442,21 +1428,6 @@
 		})
 
 		AfterEach(func() {
-			leakCleanup()
-
-			// on failure, write out a full goroutine dump
-			if CurrentSpecReport().Failed() {
-				fname := fmt.Sprintf("goroutines-%s.txt", sanitizeFilename(CurrentSpecReport().LeafNodeText))
-				if f, err := os.Create(fname); err == nil {
-					pprof.Lookup("goroutine").WriteTo(f, 2)
-					f.Close()
-					GinkgoWriter.Printf("[DEBUG] wrote goroutine dump to %s\n", fname)
-				} else {
-					GinkgoWriter.Printf("[DEBUG] failed to write goroutine dump: %v\n", err)
-				}
-			}
-
-			GinkgoWriter.Printf("[DEBUG] goroutines at end: %d\n", runtime.NumGoroutine())
 			failover = false
 
 			err := client.Close()
