Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
28f01f4
docs: add admin UI and key visualizer design
bootjp Apr 19, 2026
07b6ad0
docs(admin-ui): address review feedback on sampler concurrency and sc…
bootjp Apr 19, 2026
7a8dbae
docs: address keyviz review feedback
bootjp Apr 21, 2026
8766865
Merge branch 'main' into feat/admin_ui_key_visualizer
bootjp Apr 21, 2026
d6ba24a
docs: address keyviz follow-up review
bootjp Apr 21, 2026
11af064
docs: clarify keyviz lineage retention
bootjp Apr 21, 2026
6a88d26
docs: address keyviz review comments on persistence and isolation
bootjp Apr 21, 2026
cdabf38
docs: address second-round keyviz review
bootjp Apr 21, 2026
ddd5039
docs: address coderabbit comments on leader HLC, lineageID, leadershi…
bootjp Apr 21, 2026
763fd3a
feat(admin): phase 0 skeleton for elastickv-admin and Admin gRPC service
bootjp Apr 21, 2026
498a370
fix(admin): harden phase 0 elastickv-admin after review
bootjp Apr 22, 2026
8bd34d9
fix(admin): address phase 0 round-2 gemini review
bootjp Apr 22, 2026
ffc21a0
docs,fix(admin): address round-3 reviews
bootjp Apr 22, 2026
1ffd21a
fix(admin),docs: address round-4 review
bootjp Apr 22, 2026
65c75b3
fix(admin): decouple membership singleflight from caller ctx
bootjp Apr 22, 2026
4d16c85
fix(admin): guard fanout clients map across Close
bootjp Apr 22, 2026
52ae3e2
fix(admin): bound admin binary resources
bootjp Apr 22, 2026
68d7842
Merge branch 'main' into feat/admin_ui_key_visualizer
bootjp Apr 22, 2026
a3e0dc3
feat(admin),fix(admin): wire AdminServer into main and split ctx budgets
bootjp Apr 22, 2026
4a3a49f
fix(admin): plaintext default + populate admin members
bootjp Apr 22, 2026
ac8d6c3
fix(admin): evict seed entries when cache has no non-seed victim
bootjp Apr 23, 2026
3356739
Merge remote-tracking branch 'origin/main' into feat/admin_ui_key_vis…
bootjp Apr 23, 2026
f2935fc
fix(admin),docs: round-11 review + main merge
bootjp Apr 23, 2026
a126e71
Merge branch 'main' into feat/admin_ui_key_visualizer
bootjp Apr 23, 2026
09a38ed
fix(admin),docs: address CodeRabbit round-2 on a126e71a
bootjp Apr 23, 2026
a3c88b4
fix(admin),refactor: address codex+gemini review on 09a38edf
bootjp Apr 23, 2026
1bd4725
fix(admin): chain admin interceptors centrally in startRaftServers
bootjp Apr 23, 2026
e1f0e53
feat(admin): dynamic membership + unknown-contact sentinel
bootjp Apr 23, 2026
37a0435
fix(admin): live Raft config wins over stale bootstrap seed in overview
bootjp Apr 24, 2026
5efc655
fix(admin): deterministic self-addr + duplicate-ID tie-break
bootjp Apr 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
345 changes: 345 additions & 0 deletions adapter/admin_grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
package adapter

import (
"context"
"crypto/subtle"
"sort"
"strings"
"sync"
"time"

"github.com/bootjp/elastickv/internal/raftengine"
pb "github.com/bootjp/elastickv/proto"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow
// subset of raftengine.Engine so tests can supply an in-memory fake without
// standing up a real Raft cluster. Configuration is polled on each
// GetClusterOverview to pick up scale-out / scale-in events without the
// operator having to restart the admin binary.
type AdminGroup interface {
Status() raftengine.Status
Configuration(ctx context.Context) (raftengine.Configuration, error)
}

// NodeIdentity is the value form of the protobuf NodeIdentity message used for
// AdminServer configuration. It avoids copying pb.NodeIdentity, which embeds a
// protoimpl.MessageState (and a mutex).
type NodeIdentity struct {
NodeID string
GRPCAddress string
}

func (n NodeIdentity) toProto() *pb.NodeIdentity {
return &pb.NodeIdentity{NodeId: n.NodeID, GrpcAddress: n.GRPCAddress}
}

// AdminServer implements the node-side Admin gRPC service described in
// docs/admin_ui_key_visualizer_design.md §4 (Layer A). Phase 0 only implements
// GetClusterOverview and GetRaftGroups; remaining RPCs return Unimplemented so
// the generated client can still compile against older nodes during rollout.
type AdminServer struct {
self NodeIdentity
members []NodeIdentity

groupsMu sync.RWMutex
groups map[uint64]AdminGroup

// now is the clock used for LastContactUnixMs and any other
// timestamping this service needs. It's a per-server field (not a
// package global) so `-race` tests that swap the clock on one server
// instance cannot contend with concurrent RPCs on another instance.
now func() time.Time

pb.UnimplementedAdminServer
}

// NewAdminServer constructs an AdminServer. `self` identifies the local node
// for responses that return node identity. `members` is the static membership
// snapshot shipped to the admin binary; callers that already have a membership
// source may pass nil and let the admin binary's fan-out layer discover peers
// by other means.
func NewAdminServer(self NodeIdentity, members []NodeIdentity) *AdminServer {
cloned := append([]NodeIdentity(nil), members...)
return &AdminServer{
self: self,
members: cloned,
groups: make(map[uint64]AdminGroup),
now: time.Now,
}
}

// SetClock overrides the clock used by GetRaftGroups, letting tests inject a
// fixed time without mutating any package-global state. Concurrent RPCs on
// other AdminServer instances are unaffected.
func (s *AdminServer) SetClock(now func() time.Time) {
if now == nil {
now = time.Now
}
s.groupsMu.Lock()
s.now = now
s.groupsMu.Unlock()
}

// RegisterGroup binds a Raft group ID to its engine so the Admin service can
// report leader and log state for that group.
func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup) {
if g == nil {
return
}
s.groupsMu.Lock()
s.groups[groupID] = g
s.groupsMu.Unlock()
}

// GetClusterOverview returns the local node identity, the current member
// list, and per-group leader identity collected from the engines registered
// via RegisterGroup. The member list is the union of (a) the bootstrap seed
// supplied to NewAdminServer and (b) the live Configuration of every
// registered Raft group — the latter picks up scale-out nodes added after
// startup so the admin binary's fan-out discovery does not miss them.
func (s *AdminServer) GetClusterOverview(
ctx context.Context,
_ *pb.GetClusterOverviewRequest,
) (*pb.GetClusterOverviewResponse, error) {
leaders := s.snapshotLeaders()
members := s.snapshotMembers(ctx)
return &pb.GetClusterOverviewResponse{
Self: s.self.toProto(),
Members: members,
GroupLeaders: leaders,
}, nil
}

// snapshotMembers unions the seed members with the live Configuration of each
// registered group, preferring the live address when the same NodeID appears
// in both sources. A stale bootstrap entry cannot outvote a readdressed node:
// if n2 was moved from 10.0.0.12 to 10.0.0.22, the overview reports the
// current 10.0.0.22 so fan-out dials the right target. Configuration errors
// on a single group do not fail the RPC — other groups plus the seed list
// still produce useful output.
func (s *AdminServer) snapshotMembers(ctx context.Context) []*pb.NodeIdentity {
groups := s.cloneGroupsSorted()
addrByID, order := collectLiveMembers(ctx, groups, s.self.NodeID)
mergeSeedMembers(s.members, s.self.NodeID, addrByID, &order)

out := make([]*pb.NodeIdentity, 0, len(order))
for _, id := range order {
out = append(out, &pb.NodeIdentity{NodeId: id, GrpcAddress: addrByID[id]})
}
return out
}

// groupEntry pairs a Raft group ID with its AdminGroup so callers can iterate
// in a deterministic (ID-ascending) order. Sorting matters for
// collectLiveMembers: when two groups report the same NodeID with different
// addresses (e.g., mid-readdress), the iteration order picks which address
// wins, and a Go map's range order is unspecified.
type groupEntry struct {
id uint64
group AdminGroup
}

// cloneGroupsSorted snapshots the registered groups under the read lock and
// returns them sorted by group ID so iteration and tie-break decisions are
// stable across calls.
func (s *AdminServer) cloneGroupsSorted() []groupEntry {
s.groupsMu.RLock()
defer s.groupsMu.RUnlock()
out := make([]groupEntry, 0, len(s.groups))
for id, g := range s.groups {
out = append(out, groupEntry{id: id, group: g})
}
sort.Slice(out, func(i, j int) bool { return out[i].id < out[j].id })
return out
}

// collectLiveMembers polls Configuration for each group (in ascending group
// ID order supplied by the caller) and returns the union of server IDs
// (excluding self) with their live addresses. When two groups report the
// same server ID with different addresses — e.g. mid-readdress before every
// group has converged — the lowest-ID group wins, which is stable across
// calls and matches "trust the primary group" intuition.
func collectLiveMembers(
ctx context.Context,
groups []groupEntry,
selfID string,
) (addrByID map[string]string, order []string) {
addrByID = make(map[string]string)
order = make([]string, 0)
for _, entry := range groups {
cfg, err := entry.group.Configuration(ctx)
if err != nil {
continue
}
for _, srv := range cfg.Servers {
if srv.ID == "" || srv.ID == selfID {
continue
}
if _, dup := addrByID[srv.ID]; dup {
continue
Comment on lines +184 to +185
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reconcile duplicate live member IDs deterministically

When two raft groups report the same srv.ID with different addresses (common during readdress/rolling config convergence), this code keeps whichever address is seen first and drops the rest. Because group iteration originates from a map snapshot, first-seen order is not guaranteed, so the selected address can flap between calls or stick to a stale endpoint, making admin fan-out intermittently target dead nodes.

Useful? React with 👍 / 👎.

}
addrByID[srv.ID] = srv.Address
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Ignore live members with empty gRPC addresses

collectLiveMembers currently treats the first seen NodeID as authoritative even if srv.Address is empty (addrByID[srv.ID] = srv.Address), and then mergeSeedMembers refuses to backfill that ID from seeds. This can drop reachable peers from discovery because the etcd engine can emit configuration entries with Address: "" when peer metadata is missing (internal/raftengine/etcd/engine.go), so GetClusterOverview returns blank grpc_address and cmd/elastickv-admin will trim that entry and stop fanning out to that node. Skipping empty live addresses (or letting seeds overwrite blank live values) avoids this regression.

Useful? React with 👍 / 👎.

order = append(order, srv.ID)
}
}
return addrByID, order
}

// mergeSeedMembers fills in seed entries for IDs no live Configuration
// reported. Seeds never overwrite a live address.
func mergeSeedMembers(
seeds []NodeIdentity,
selfID string,
addrByID map[string]string,
order *[]string,
) {
for _, m := range seeds {
if m.NodeID == "" || m.NodeID == selfID {
continue
}
if _, known := addrByID[m.NodeID]; known {
continue
}
addrByID[m.NodeID] = m.GRPCAddress
*order = append(*order, m.NodeID)
}
}

// GetRaftGroups returns per-group state snapshots. Phase 0 wires commit/applied
// indices only; per-follower contact and term history land in later phases.
func (s *AdminServer) GetRaftGroups(
_ context.Context,
_ *pb.GetRaftGroupsRequest,
) (*pb.GetRaftGroupsResponse, error) {
s.groupsMu.RLock()
defer s.groupsMu.RUnlock()
ids := sortedGroupIDs(s.groups)
out := make([]*pb.RaftGroupState, 0, len(ids))
now := s.now()
for _, id := range ids {
st := s.groups[id].Status()
// Translate LastContact (duration since the last contact with the
// leader, per raftengine.Status) into an absolute unix-ms so UI
// clients can diff against their own clock instead of having to
// reason about the server's uptime. The etcd engine returns a
// sentinel negative duration when contact is unknown (e.g., a
// follower that has never heard from a leader). Report that case
// as `LastContactUnixMs=0` (epoch) so the UI can render "unknown"
// / "never contacted" rather than treating it as "freshly
// contacted just now".
var lastContactUnixMs int64
if st.LastContact >= 0 {
lastContactUnixMs = now.Add(-st.LastContact).UnixMilli()
}
out = append(out, &pb.RaftGroupState{
RaftGroupId: id,
LeaderNodeId: st.Leader.ID,
LeaderTerm: st.Term,
CommitIndex: st.CommitIndex,
AppliedIndex: st.AppliedIndex,
LastContactUnixMs: lastContactUnixMs,
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
return &pb.GetRaftGroupsResponse{Groups: out}, nil
}

func (s *AdminServer) snapshotLeaders() []*pb.GroupLeader {
s.groupsMu.RLock()
defer s.groupsMu.RUnlock()
ids := sortedGroupIDs(s.groups)
out := make([]*pb.GroupLeader, 0, len(ids))
for _, id := range ids {
st := s.groups[id].Status()
out = append(out, &pb.GroupLeader{
RaftGroupId: id,
LeaderNodeId: st.Leader.ID,
LeaderTerm: st.Term,
})
}
return out
}

// sortedGroupIDs returns the map's keys in ascending order so Admin responses
// are deterministic across calls — admin tooling and tests both rely on stable
// ordering.
func sortedGroupIDs(m map[uint64]AdminGroup) []uint64 {
ids := make([]uint64, 0, len(m))
for id := range m {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
return ids
}

// adminMethodPrefix is "/Admin/" today but is derived from the generated
// service descriptor so a future proto package declaration (which would
// package-qualify the service name) does not silently bypass the auth gate.
var adminMethodPrefix = "/" + pb.Admin_ServiceDesc.ServiceName + "/"

// AdminTokenAuth builds a gRPC unary+stream interceptor pair enforcing
// "authorization: Bearer <token>" metadata against the supplied token. An
// empty token disables enforcement; callers should pair that mode with a
// --adminInsecureNoAuth flag so operators knowingly opt in.
func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) {
if token == "" {
return nil, nil
}
expected := []byte(token)
check := func(ctx context.Context) error {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return status.Error(codes.Unauthenticated, "missing authorization metadata")
}
values := md.Get("authorization")
if len(values) == 0 {
return status.Error(codes.Unauthenticated, "missing authorization header")
}
got, ok := strings.CutPrefix(values[0], "Bearer ")
if !ok {
return status.Error(codes.Unauthenticated, "authorization is not a bearer token")
}
if subtle.ConstantTimeCompare([]byte(got), expected) != 1 {
return status.Error(codes.Unauthenticated, "invalid admin token")
}
return nil
}
unary := func(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (any, error) {
if !strings.HasPrefix(info.FullMethod, adminMethodPrefix) {
return handler(ctx, req)
}
if err := check(ctx); err != nil {
return nil, err
}
return handler(ctx, req)
}
stream := func(
srv any,
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
if !strings.HasPrefix(info.FullMethod, adminMethodPrefix) {
return handler(srv, ss)
}
if err := check(ss.Context()); err != nil {
return err
}
return handler(srv, ss)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return unary, stream
}

// ErrAdminTokenRequired is returned by NewAdminServer helpers when the operator
// failed to supply a token and also did not opt into insecure mode.
var ErrAdminTokenRequired = errors.New("admin token file required; pass --adminInsecureNoAuth to run without")
Loading
Loading