1 change: 1 addition & 0 deletions config/config.go
@@ -47,6 +47,7 @@ type FailOverConfig struct {
// propagates the updated topology. Requires kvrocks to support node status
// modification (new versions only). Defaults to false for backward compatibility.
EnableSlaveHAUpdate bool `yaml:"enable_slave_ha_update"`
// WaitForSync controls whether a failover waits for the candidate replica to
// catch up with the old master's replication offset before promotion. Defaults to false.
WaitForSync bool `yaml:"wait_for_sync"`
}

type ControllerConfig struct {
22 changes: 14 additions & 8 deletions controller/cluster.go
@@ -41,6 +41,7 @@ type ClusterCheckOptions struct {
pingInterval time.Duration
maxFailureCount int64
enableSlaveHAUpdate bool
failoverOpts store.FailoverOptions
}

type ClusterChecker struct {
@@ -72,6 +73,7 @@ func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker {
options: ClusterCheckOptions{
pingInterval: time.Second * 3,
maxFailureCount: 5,
failoverOpts: store.DefaultFailoverOptions(),
},
failureCounts: make(map[string]int64),
syncCh: make(chan struct{}, 1),
@@ -110,6 +112,11 @@ func (c *ClusterChecker) WithSlaveHAUpdate(enable bool) *ClusterChecker {
return c
}

func (c *ClusterChecker) WithFailoverOptions(opts store.FailoverOptions) *ClusterChecker {
c.options.failoverOpts = opts
return c
}

func (c *ClusterChecker) probeNode(ctx context.Context, node store.Node) (int64, error) {
clusterInfo, err := node.GetClusterInfo(ctx)
if err != nil {
@@ -174,20 +181,19 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) i
log.Error("Failed to get the cluster info", zap.Error(err))
return count
}
newMasterID, err := cluster.PromoteNewMaster(c.ctx, shardIndex, node.ID(), "")
if err != nil {
log.Error("Failed to promote the new master", zap.Error(err))
_, newMaster, promoteErr := cluster.PromoteNewMaster(c.ctx, shardIndex, node.ID(), "", c.options.failoverOpts)
if promoteErr != nil {
log.Error("Failed to promote the new master", zap.Error(promoteErr))
return count
}
err = c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster)
if err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
if updateErr := c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster); updateErr != nil {
log.Error("Failed to persist cluster after promoting new master", zap.Error(updateErr))
return count
}
// The node can be treated as healthy if it was elected as the new master,
// since the election requires a healthy node.
c.resetFailureCount(newMasterID)
log.With(zap.String("new_master_id", newMasterID)).Info("Promote the new master")
c.resetFailureCount(newMaster.ID())
log.With(zap.String("new_master_id", newMaster.ID())).Info("Promote the new master")
}
return count
}
6 changes: 5 additions & 1 deletion controller/controller.go
@@ -187,10 +187,14 @@ func (c *Controller) addCluster(namespace, clusterName string) {
return
}

failoverOpts := store.DefaultFailoverOptions()
failoverOpts.WaitForSync = c.config.FailOver.WaitForSync

cluster := NewClusterChecker(c.clusterStore, namespace, clusterName).
WithPingInterval(time.Duration(c.config.FailOver.PingIntervalSeconds) * time.Second).
WithMaxFailureCount(c.config.FailOver.MaxPingCount).
WithSlaveHAUpdate(c.config.FailOver.EnableSlaveHAUpdate)
WithSlaveHAUpdate(c.config.FailOver.EnableSlaveHAUpdate).
WithFailoverOptions(failoverOpts)
cluster.Start()

c.mu.Lock()
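Note: the controller maps the YAML-level wait_for_sync flag onto store.FailoverOptions before handing it to the checker. The options struct itself is not part of this diff; the sketch below is only a guess at its shape, inferred from the fields referenced elsewhere in the PR (WaitForSync, SyncTimeout, PauseDuration, ForceOnTimeout). It is a fragment of the store package (needs the time import), and the default durations are placeholder assumptions, not values from the repository.

// Illustrative sketch of what store.FailoverOptions might look like; the
// defaults below are assumptions, not the PR's actual values.
type FailoverOptions struct {
	// WaitForSync pauses writes on the old master and waits until the candidate
	// replica's offset catches up with the master's before promotion.
	WaitForSync bool
	// SyncTimeout bounds how long to wait for the replica to catch up.
	SyncTimeout time.Duration
	// PauseDuration is the timeout handed to PauseClient on the old master.
	PauseDuration time.Duration
	// ForceOnTimeout promotes anyway if the replica has not caught up in time.
	ForceOnTimeout bool
}

func DefaultFailoverOptions() FailoverOptions {
	return FailoverOptions{
		WaitForSync:    false,
		SyncTimeout:    5 * time.Second,  // assumption
		PauseDuration:  10 * time.Second, // assumption
		ForceOnTimeout: false,
	}
}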
68 changes: 63 additions & 5 deletions server/api/shard.go
@@ -23,10 +23,14 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"time"

"github.com/gin-gonic/gin"
"go.uber.org/zap"

"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/server/helper"
"github.com/apache/kvrocks-controller/store"
)
@@ -114,12 +118,21 @@ func (handler *ShardHandler) Remove(c *gin.Context) {
helper.ResponseNoContent(c)
}

// FailoverOpts holds optional parameters for manual failover.
type FailoverOpts struct {
WaitForSync bool `json:"wait_for_sync"`
ForceOnTimeout bool `json:"force_on_timeout"`
SyncTimeoutMs int `json:"sync_timeout_ms"` // 0 means use default
PauseTimeoutMs int `json:"pause_timeout_ms"` // 0 means use default
}

func (handler *ShardHandler) Failover(c *gin.Context) {
ns := c.Param("namespace")
cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)

var req struct {
PreferredNodeID string `json:"preferred_node_id"`
PreferredNodeID string `json:"preferred_node_id"`
Options *FailoverOpts `json:"options"`
}
if c.Request.Body != nil {
if err := c.ShouldBindJSON(&req); err != nil {
@@ -131,16 +144,61 @@ func (handler *ShardHandler) Failover(c *gin.Context) {
helper.ResponseBadRequest(c, fmt.Errorf("invalid node id: %s", req.PreferredNodeID))
return
}
// We have checked this if statement in middleware.RequiredClusterShard
shardIndex, _ := strconv.Atoi(c.Param("shard"))
newMasterNodeID, err := cluster.PromoteNewMaster(c, shardIndex, "", req.PreferredNodeID)

opts := store.DefaultFailoverOptions()
if req.Options != nil {
opts.WaitForSync = req.Options.WaitForSync
if req.Options.SyncTimeoutMs > 0 {
opts.SyncTimeout = time.Duration(req.Options.SyncTimeoutMs) * time.Millisecond
}
if req.Options.PauseTimeoutMs > 0 {
opts.PauseDuration = time.Duration(req.Options.PauseTimeoutMs) * time.Millisecond
}
opts.ForceOnTimeout = req.Options.ForceOnTimeout
}

shardIndex, err := strconv.Atoi(c.Param("shard"))
if err != nil {
helper.ResponseBadRequest(c, err)
return
}
oldMaster, newMaster, err := cluster.PromoteNewMaster(c, shardIndex, "", req.PreferredNodeID, opts)
if err != nil {
helper.ResponseError(c, err)
return
}

unpauseOldMaster := func() {
if !opts.WaitForSync {
return
}
if e := oldMaster.UnpauseClient(c); e != nil {
logger.Get().With(zap.Error(e), zap.String("node", oldMaster.Addr())).Error("Failed to unpause old master")
}
}

if err := handler.s.UpdateCluster(c, ns, cluster); err != nil {
unpauseOldMaster()
helper.ResponseError(c, err)
return
}
helper.ResponseOK(c, gin.H{"new_master_id": newMasterNodeID})

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
if e := oldMaster.SyncClusterInfo(c, cluster); e != nil {
logger.Get().With(zap.Error(e), zap.String("node", oldMaster.Addr())).Warn("Failed to sync cluster info to old master")
}
}()
go func() {
defer wg.Done()
if e := newMaster.SyncClusterInfo(c, cluster); e != nil {
logger.Get().With(zap.Error(e), zap.String("node", newMaster.Addr())).Warn("Failed to sync cluster info to new master")
}
}()
wg.Wait()

unpauseOldMaster()
helper.ResponseOK(c, gin.H{"new_master_id": newMaster.ID()})
}
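Note: a manual failover with the new options could be driven from a small client like the one below. The request body mirrors the handler's anonymous struct and FailoverOpts above; the route prefix, host, namespace, cluster, shard, and node ID are hypothetical placeholders, not values taken from this PR.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Body fields match the Failover handler: preferred_node_id plus options.
	body, _ := json.Marshal(map[string]any{
		"preferred_node_id": "0123456789abcdef0123456789abcdef01234567", // hypothetical node ID
		"options": map[string]any{
			"wait_for_sync":    true,
			"sync_timeout_ms":  5000,
			"pause_timeout_ms": 6000,
			"force_on_timeout": false,
		},
	})

	// Placeholder URL; adjust the prefix, namespace, cluster, and shard to your deployment.
	url := "http://127.0.0.1:9379/api/v1/namespaces/test-ns/clusters/test-cluster/shards/0/failover"
	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println("failover request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}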
11 changes: 5 additions & 6 deletions store/cluster.go
@@ -132,18 +132,17 @@ func (cluster *Cluster) RemoveNode(shardIndex int, nodeID string) error {
}

func (cluster *Cluster) PromoteNewMaster(ctx context.Context,
shardIdx int, masterNodeID, preferredNodeID string,
) (string, error) {
shardIdx int, masterNodeID, preferredNodeID string, opts FailoverOptions) (oldMasterNode Node, newMasterNode Node, err error) {
shard, err := cluster.GetShard(shardIdx)
if err != nil {
return "", err
return nil, nil, err
}
newMasterNodeID, err := shard.promoteNewMaster(ctx, masterNodeID, preferredNodeID)
oldMaster, newMaster, err := shard.promoteNewMaster(ctx, masterNodeID, preferredNodeID, opts)
if err != nil {
return "", err
return nil, nil, err
}
cluster.Shards[shardIdx] = shard
return newMasterNodeID, nil
return oldMaster, newMaster, nil
}

func (cluster *Cluster) SyncToNodes(ctx context.Context) error {
39 changes: 37 additions & 2 deletions store/cluster_mock_node.go
@@ -20,14 +20,20 @@

package store

import "context"
import (
"context"
"time"
)

// ClusterMockNode is a mock implementation of the Node interface;
// it is used for testing purposes.
type ClusterMockNode struct {
*ClusterNode

Sequence uint64
Sequence uint64
MasterReplOffset uint64 // used when simulating master in GetReplicationInfo
SlaveOffset uint64 // used when simulating slave offset in GetReplicationInfo
SlaveAddr string // when acting as master, the slave address reported for matching; empty means use mock.Addr()
}

var _ Node = (*ClusterMockNode)(nil)
@@ -53,3 +59,32 @@ func (mock *ClusterMockNode) SyncClusterInfo(ctx context.Context, cluster *Clust
func (mock *ClusterMockNode) Reset(ctx context.Context) error {
return nil
}

func (mock *ClusterMockNode) PauseClient(ctx context.Context, timeout time.Duration) error {
return nil
}

func (mock *ClusterMockNode) UnpauseClient(ctx context.Context) error {
return nil
}

func (mock *ClusterMockNode) GetReplicationInfo(ctx context.Context) (*ReplicationInfo, error) {
if mock.IsMaster() {
addr := mock.SlaveAddr
if addr == "" {
addr = mock.Addr()
}
return &ReplicationInfo{
Role: RoleMaster,
MasterReplOffset: mock.MasterReplOffset,
Slaves: []SlaveReplInfo{
{Addr: addr, Offset: mock.SlaveOffset},
},
}, nil
}
return &ReplicationInfo{
Role: RoleSlave,
MasterReplOffset: mock.SlaveOffset,
SlaveReplOffset: mock.SlaveOffset,
}, nil
}
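Note: the new mock fields make it possible to simulate a caught-up or lagging replica in unit tests. A minimal sketch of that usage follows; it assumes a node built with NewClusterNode does not report itself as master until a role is assigned, so GetReplicationInfo takes the slave branch. The real tests in the repository may construct the mock differently.

// Hypothetical test fragment: simulate a replica that is still 50 offsets behind
// an old master whose master_repl_offset is 1000.
replica := &ClusterMockNode{
	ClusterNode: NewClusterNode("127.0.0.1:6380", ""),
	SlaveOffset: 950, // applied offset reported by the mock's slave branch
}
info, err := replica.GetReplicationInfo(context.Background())
if err == nil && info.Role == RoleSlave && info.SlaveReplOffset < 1000 {
	// the replica has not caught up yet; a wait-for-sync failover would keep polling
}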
112 changes: 112 additions & 0 deletions store/cluster_node.go
@@ -88,6 +88,10 @@ type Node interface {
CheckClusterMode(ctx context.Context) (int64, error)
MigrateSlot(ctx context.Context, slot SlotRange, NodeID string) error

PauseClient(ctx context.Context, timeout time.Duration) error
UnpauseClient(ctx context.Context) error
GetReplicationInfo(ctx context.Context) (*ReplicationInfo, error)

MarshalJSON() ([]byte, error)
UnmarshalJSON(data []byte) error

@@ -114,6 +118,36 @@ type ClusterNodeInfo struct {
Role string `json:"role"`
}

// ReplicationInfo holds parsed output from INFO replication.
type ReplicationInfo struct {
Role string
MasterReplOffset uint64
// SlaveReplOffset is the replica's local applied offset (INFO field slave_repl_offset); only set when role is slave.
SlaveReplOffset uint64
// MasterLinkStatus is the replica's master_link_status (e.g. up/down); empty if absent.
MasterLinkStatus string
Slaves []SlaveReplInfo
}

// ReplicaAppliedReplOffset returns the replication offset on a node that should be compared against
// the old master's MasterReplOffset to decide whether the replica has caught up. On replicas,
// Kvrocks/Redis expose slave_repl_offset (preferred); if it is missing, MasterReplOffset is used.
func ReplicaAppliedReplOffset(info *ReplicationInfo) uint64 {
if info == nil {
return 0
}
if info.Role == RoleSlave && info.SlaveReplOffset > 0 {
return info.SlaveReplOffset
}
return info.MasterReplOffset
}

// SlaveReplInfo holds slave replication offset from master's perspective.
type SlaveReplInfo struct {
Addr string // "ip:port", matches node.Addr()
Offset uint64
}

func NewClusterNode(addr, password string) *ClusterNode {
return &ClusterNode{
id: util.GenerateNodeID(),
@@ -305,6 +339,84 @@ func (n *ClusterNode) MigrateSlot(ctx context.Context, slot SlotRange, targetNod
return n.GetClient().Do(ctx, "CLUSTERX", "MIGRATE", slot.String(), targetNodeID).Err()
}

func (n *ClusterNode) PauseClient(ctx context.Context, timeout time.Duration) error {
ms := timeout.Milliseconds()
if ms <= 0 {
ms = 1
}
return n.GetClient().Do(ctx, "CLIENT", "PAUSE", ms, "WRITE").Err()
}

func (n *ClusterNode) UnpauseClient(ctx context.Context) error {
return n.GetClient().Do(ctx, "CLIENT", "UNPAUSE").Err()
}

func (n *ClusterNode) GetReplicationInfo(ctx context.Context) (*ReplicationInfo, error) {
infoStr, err := n.GetClient().Info(ctx, "replication").Result()
if err != nil {
return nil, err
}

info := &ReplicationInfo{}
lines := strings.Split(infoStr, "\r\n")
for _, line := range lines {
fields := strings.SplitN(line, ":", 2)
if len(fields) != 2 {
continue
}
key := strings.TrimSpace(fields[0])
val := strings.TrimSpace(fields[1])

switch key {
case "role":
info.Role = val
case "master_repl_offset":
info.MasterReplOffset, err = strconv.ParseUint(val, 10, 64)
if err != nil {
return nil, err
}
case "slave_repl_offset":
info.SlaveReplOffset, err = strconv.ParseUint(val, 10, 64)
if err != nil {
return nil, err
}
case "master_link_status":
info.MasterLinkStatus = val
default:
if strings.HasPrefix(key, "slave") {
if slave, ok := parseSlaveReplInfo(val); ok {
info.Slaves = append(info.Slaves, slave)
}
}
}
}
return info, nil
}

// parseSlaveReplInfo parses "ip=127.0.0.1,port=6380,state=online,offset=N,lag=M" into SlaveReplInfo.
func parseSlaveReplInfo(val string) (SlaveReplInfo, bool) {
var ip, port string
var offset uint64
for _, part := range strings.Split(val, ",") {
kv := strings.SplitN(part, "=", 2)
if len(kv) != 2 {
continue
}
switch strings.TrimSpace(kv[0]) {
case "ip":
ip = strings.TrimSpace(kv[1])
case "port":
port = strings.TrimSpace(kv[1])
case "offset":
offset, _ = strconv.ParseUint(strings.TrimSpace(kv[1]), 10, 64)
}
}
if ip == "" || port == "" {
return SlaveReplInfo{}, false
}
return SlaveReplInfo{Addr: ip + ":" + port, Offset: offset}, true
}

func (n *ClusterNode) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]interface{}{
"id": n.id,
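Note: the promotion path itself (shard.promoteNewMaster) is not shown in this diff, but the primitives above suggest the shape of the wait-for-sync step: pause writes on the old master, poll replication offsets until the candidate catches up or the timeout expires, then either proceed or abort depending on ForceOnTimeout. The sketch below is an illustration of that flow under those assumptions, using the Node interface, ReplicaAppliedReplOffset, and the FailoverOptions fields referenced in this PR; it is not the PR's actual implementation, and it relies on the context, errors, and time imports.

// waitForReplicaSync is an illustrative helper, not code from this PR. It pauses
// writes on the old master, then polls both nodes until the candidate replica has
// applied the old master's replication offset, or opts.SyncTimeout elapses.
func waitForReplicaSync(ctx context.Context, oldMaster, candidate Node, opts FailoverOptions) error {
	if !opts.WaitForSync {
		return nil
	}
	if err := oldMaster.PauseClient(ctx, opts.PauseDuration); err != nil {
		return err
	}
	deadline := time.Now().Add(opts.SyncTimeout)
	for {
		masterInfo, err := oldMaster.GetReplicationInfo(ctx)
		if err != nil {
			return err
		}
		replicaInfo, err := candidate.GetReplicationInfo(ctx)
		if err != nil {
			return err
		}
		if ReplicaAppliedReplOffset(replicaInfo) >= masterInfo.MasterReplOffset {
			return nil // the candidate has caught up; safe to promote
		}
		if time.Now().After(deadline) {
			if opts.ForceOnTimeout {
				return nil // promote anyway, accepting possible data loss
			}
			return errors.New("replica did not catch up before the sync timeout")
		}
		time.Sleep(100 * time.Millisecond) // polling interval is an arbitrary choice
	}
}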