Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions base/dcp_client_stream_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// to the DCPClient's workers to be processed, but performs the following additional functionality:
// - key-based filtering for document-based events (Deletion, Expiration, Mutation)
// - stream End handling, including restart on error
func (dc *DCPClient) SnapshotMarker(snapshotMarker gocbcore.DcpSnapshotMarker) {
func (dc *GoCBDCPClient) SnapshotMarker(snapshotMarker gocbcore.DcpSnapshotMarker) {

e := snapshotEvent{
streamEventCommon: streamEventCommon{
Expand All @@ -30,7 +30,7 @@ func (dc *DCPClient) SnapshotMarker(snapshotMarker gocbcore.DcpSnapshotMarker) {
dc.workerForVbno(snapshotMarker.VbID).Send(dc.ctx, e)
}

func (dc *DCPClient) Mutation(mutation gocbcore.DcpMutation) {
func (dc *GoCBDCPClient) Mutation(mutation gocbcore.DcpMutation) {

if dc.filteredKey(mutation.Key) {
return
Expand All @@ -56,7 +56,7 @@ func (dc *DCPClient) Mutation(mutation gocbcore.DcpMutation) {
dc.workerForVbno(mutation.VbID).Send(dc.ctx, e)
}

func (dc *DCPClient) Deletion(deletion gocbcore.DcpDeletion) {
func (dc *GoCBDCPClient) Deletion(deletion gocbcore.DcpDeletion) {

if dc.filteredKey(deletion.Key) {
return
Expand All @@ -80,7 +80,7 @@ func (dc *DCPClient) Deletion(deletion gocbcore.DcpDeletion) {

}

func (dc *DCPClient) End(end gocbcore.DcpStreamEnd, err error) {
func (dc *GoCBDCPClient) End(end gocbcore.DcpStreamEnd, err error) {

e := endStreamEvent{
streamEventCommon: streamEventCommon{
Expand All @@ -92,41 +92,41 @@ func (dc *DCPClient) End(end gocbcore.DcpStreamEnd, err error) {

}

func (dc *DCPClient) Expiration(expiration gocbcore.DcpExpiration) {
func (dc *GoCBDCPClient) Expiration(expiration gocbcore.DcpExpiration) {
// SG doesn't opt in to expirations, so they'll come through as deletion events
// (cf.https://github.com/couchbase/kv_engine/blob/master/docs/dcp/documentation/expiry-opcode-output.md)
WarnfCtx(dc.ctx, "Unexpected DCP expiration event (vb:%d) for key %v", expiration.VbID, UD(string(expiration.Key)))
}

func (dc *DCPClient) CreateCollection(creation gocbcore.DcpCollectionCreation) {
func (dc *GoCBDCPClient) CreateCollection(creation gocbcore.DcpCollectionCreation) {
// Not used by SG at this time
}

func (dc *DCPClient) DeleteCollection(deletion gocbcore.DcpCollectionDeletion) {
func (dc *GoCBDCPClient) DeleteCollection(deletion gocbcore.DcpCollectionDeletion) {
// Not used by SG at this time
}

func (dc *DCPClient) FlushCollection(flush gocbcore.DcpCollectionFlush) {
func (dc *GoCBDCPClient) FlushCollection(flush gocbcore.DcpCollectionFlush) {
// Not used by SG at this time
}

func (dc *DCPClient) CreateScope(creation gocbcore.DcpScopeCreation) {
func (dc *GoCBDCPClient) CreateScope(creation gocbcore.DcpScopeCreation) {
// Not used by SG at this time
}

func (dc *DCPClient) DeleteScope(deletion gocbcore.DcpScopeDeletion) {
func (dc *GoCBDCPClient) DeleteScope(deletion gocbcore.DcpScopeDeletion) {
// Not used by SG at this time
}

func (dc *DCPClient) ModifyCollection(modification gocbcore.DcpCollectionModification) {
func (dc *GoCBDCPClient) ModifyCollection(modification gocbcore.DcpCollectionModification) {
// Not used by SG at this time
}

func (dc *DCPClient) OSOSnapshot(snapshot gocbcore.DcpOSOSnapshot) {
func (dc *GoCBDCPClient) OSOSnapshot(snapshot gocbcore.DcpOSOSnapshot) {
// Not used by SG at this time
}

func (dc *DCPClient) SeqNoAdvanced(seqNoAdvanced gocbcore.DcpSeqNoAdvanced) {
func (dc *GoCBDCPClient) SeqNoAdvanced(seqNoAdvanced gocbcore.DcpSeqNoAdvanced) {
dc.workerForVbno(seqNoAdvanced.VbID).Send(dc.ctx, seqnoAdvancedEvent{
streamEventCommon: streamEventCommon{
vbID: seqNoAdvanced.VbID,
Expand All @@ -136,6 +136,6 @@ func (dc *DCPClient) SeqNoAdvanced(seqNoAdvanced gocbcore.DcpSeqNoAdvanced) {
})
}

func (dc *DCPClient) filteredKey(key []byte) bool {
func (dc *GoCBDCPClient) filteredKey(key []byte) bool {
return false
}
4 changes: 2 additions & 2 deletions base/dcp_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func TestContinuousDCPRollback(t *testing.T) {

// forceRollbackvBucket forces the rollback of vBucket IDs that are even
// Test helper function. This should not be used elsewhere.
func (dc *DCPClient) forceRollbackvBucket(uuid gocbcore.VbUUID) {
func (dc *GoCBDCPClient) forceRollbackvBucket(uuid gocbcore.VbUUID) {
metadata := make([]DCPMetadata, dc.numVbuckets)
for i := uint16(0); i < dc.numVbuckets; i++ {
// rollback roughly half the vBuckets
Expand All @@ -458,7 +458,7 @@ func TestResumeStoppedFeed(t *testing.T) {

dataStore := bucket.GetSingleDataStore()

var dcpClient *DCPClient
var dcpClient *GoCBDCPClient

// create callback
mutationCount := uint64(0)
Expand Down
56 changes: 29 additions & 27 deletions base/dcp_client.go → base/gocb_dcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type endStreamCallbackFunc func(e endStreamEvent)

var ErrVbUUIDMismatch = errors.New("VbUUID mismatch when failOnRollback set")

type DCPClient struct {
type GoCBDCPClient struct {
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The struct name has inconsistent casing: 'GoCB' should be 'Gocb' to follow Go naming conventions (GocbDCPClient).

Suggested change
type GoCBDCPClient struct {
type GocbDCPClient struct {

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aren't consistent with GoCB and Gocb. Most things use GoCB except GocbV2Bucket.

ctx context.Context
ID string // unique ID for DCPClient - used for DCP stream name, must be unique
agent *gocbcore.DCPAgent // SDK DCP agent, manages connections and calls back to DCPClient stream observer implementation
Expand Down Expand Up @@ -81,7 +81,7 @@ type DCPClientOptions struct {
CheckpointPrefix string
}

func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket) (*DCPClient, error) {
func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket) (*GoCBDCPClient, error) {

numVbuckets, err := bucket.GetMaxVbno()
if err != nil {
Expand All @@ -91,7 +91,7 @@ func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCal
return newDCPClientWithForBuckets(ctx, ID, callback, options, bucket, numVbuckets)
}

func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) {

numWorkers := DefaultNumWorkers
if options.NumWorkers > 0 {
Expand All @@ -106,7 +106,7 @@ func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucke
return nil, fmt.Errorf("callers must specify a checkpoint prefix when persisting metadata")
}
}
client := &DCPClient{
client := &GoCBDCPClient{
ctx: ctx,
workers: make([]*DCPWorker, numWorkers),
numVbuckets: numVbuckets,
Expand Down Expand Up @@ -155,7 +155,7 @@ func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucke
}

// getCollectionHighSeqNo returns the highSeqNo for a given KV collection ID.
func (dc *DCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, error) {
func (dc *GoCBDCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, error) {
vbucketSeqnoOptions := gocbcore.GetVbucketSeqnoOptions{}
if dc.supportsCollections {
vbucketSeqnoOptions.FilterOptions = &gocbcore.GetVbucketSeqnoFilterOptions{CollectionID: collectionID}
Expand Down Expand Up @@ -213,7 +213,7 @@ func (dc *DCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, err
}

// getHighSeqNos returns the maximum sequence number for every collection configured by the DCP agent.
func (dc *DCPClient) getHighSeqNos() ([]uint64, error) {
func (dc *GoCBDCPClient) getHighSeqNos() ([]uint64, error) {
highSeqNos := make([]uint64, dc.numVbuckets)
// Initialize highSeqNo to the current metadata's StartSeqNo - we don't want to use a value lower than what
// we've already processed
Expand All @@ -235,7 +235,7 @@ func (dc *DCPClient) getHighSeqNos() ([]uint64, error) {
}

// configureOneShot sets highSeqnos for a one shot feed.
func (dc *DCPClient) configureOneShot() error {
func (dc *GoCBDCPClient) configureOneShot() error {
highSeqNos, err := dc.getHighSeqNos()
if err != nil {
return err
Expand All @@ -251,7 +251,7 @@ func (dc *DCPClient) configureOneShot() error {
}

// Start returns an error and a channel to indicate when the DCPClient is done. If Start returns an error, DCPClient.Close() needs to be called.
func (dc *DCPClient) Start() (doneChan chan error, err error) {
func (dc *GoCBDCPClient) Start() (doneChan chan error, err error) {
err = dc.initAgent(dc.spec)
if err != nil {
return dc.doneChannel, err
Expand All @@ -274,13 +274,13 @@ func (dc *DCPClient) Start() (doneChan chan error, err error) {
}

// Close is used externally to stop the DCP client. If the client was already closed due to error, returns that error
func (dc *DCPClient) Close() error {
func (dc *GoCBDCPClient) Close() error {
dc.close()
return dc.getCloseError()
}

// GetMetadata returns metadata for all vbuckets
func (dc *DCPClient) GetMetadata() []DCPMetadata {
func (dc *GoCBDCPClient) GetMetadata() []DCPMetadata {
metadata := make([]DCPMetadata, dc.numVbuckets)
for i := uint16(0); i < dc.numVbuckets; i++ {
metadata[i] = dc.metadata.GetMeta(i)
Expand All @@ -290,7 +290,7 @@ func (dc *DCPClient) GetMetadata() []DCPMetadata {

// close is used internally to stop the DCP client. Sends any fatal errors to the client's done channel, and
// closes that channel.
func (dc *DCPClient) close() {
func (dc *GoCBDCPClient) close() {

// set dc.closing to true, avoid re-triggering close if it's already in progress
if !dc.closing.CompareAndSwap(false, true) {
Expand All @@ -316,7 +316,7 @@ func (dc *DCPClient) close() {
}

// getAgentConfig returns a gocbcore.DCPAgentConfig for the given BucketSpec
func (dc *DCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig, error) {
func (dc *GoCBDCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig, error) {
connStr, err := spec.GetGoCBConnStringForDCP()
if err != nil {
return nil, err
Expand Down Expand Up @@ -360,7 +360,7 @@ func (dc *DCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig,
}

// initAgent creates a DCP agent and waits for it to be ready
func (dc *DCPClient) initAgent(spec BucketSpec) error {
func (dc *GoCBDCPClient) initAgent(spec BucketSpec) error {
agentConfig, err := dc.getAgentConfig(spec)
if err != nil {
return err
Expand Down Expand Up @@ -405,13 +405,13 @@ func (dc *DCPClient) initAgent(spec BucketSpec) error {
return nil
}

func (dc *DCPClient) workerForVbno(vbNo uint16) *DCPWorker {
func (dc *GoCBDCPClient) workerForVbno(vbNo uint16) *DCPWorker {
workerIndex := int(vbNo % uint16(len(dc.workers)))
return dc.workers[workerIndex]
}

// startWorkers initializes the DCP workers to receive stream events from eventFeed
func (dc *DCPClient) startWorkers(ctx context.Context) {
func (dc *GoCBDCPClient) startWorkers(ctx context.Context) {

// vbuckets are assigned to workers as vbNo % NumWorkers. Create set of assigned vbuckets
assignedVbs := make(map[int][]uint16)
Expand All @@ -434,7 +434,7 @@ func (dc *DCPClient) startWorkers(ctx context.Context) {
}
}

func (dc *DCPClient) openStream(vbID uint16, maxRetries uint32) error {
func (dc *GoCBDCPClient) openStream(vbID uint16, maxRetries uint32) error {

var openStreamErr error
var attempts uint32
Expand Down Expand Up @@ -488,7 +488,7 @@ func (dc *DCPClient) openStream(vbID uint16, maxRetries uint32) error {
return fmt.Errorf("openStream failed to complete after %d attempts, last error: %w", attempts, openStreamErr)
}

func (dc *DCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.SeqNo) {
func (dc *GoCBDCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.SeqNo) {
if dc.dbStats != nil {
dc.dbStats.Add("dcp_rollback_count", 1)
}
Expand All @@ -497,7 +497,7 @@ func (dc *DCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.S

// openStreamRequest issues the OpenStream request, but doesn't perform any error handling. Callers
// should generally use openStream() for error and retry handling
func (dc *DCPClient) openStreamRequest(vbID uint16) error {
func (dc *GoCBDCPClient) openStreamRequest(vbID uint16) error {

vbMeta := dc.metadata.GetMeta(vbID)

Expand Down Expand Up @@ -548,7 +548,7 @@ func (dc *DCPClient) openStreamRequest(vbID uint16) error {
// verifyFailoverLog checks for VbUUID changes when failOnRollback is set, and
// writes the failover log to the client metadata store. If previous VbUUID is zero, it's
// not considered a rollback - it's not required to initialize vbUUIDs into meta.
func (dc *DCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry) error {
func (dc *GoCBDCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry) error {

if dc.failOnRollback {
previousMeta := dc.metadata.GetMeta(vbID)
Expand All @@ -566,7 +566,7 @@ func (dc *DCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry)
return nil
}

func (dc *DCPClient) deactivateVbucket(vbID uint16) {
func (dc *GoCBDCPClient) deactivateVbucket(vbID uint16) {
dc.activeVbucketLock.Lock()
delete(dc.activeVbuckets, vbID)
activeCount := len(dc.activeVbuckets)
Expand All @@ -580,7 +580,7 @@ func (dc *DCPClient) deactivateVbucket(vbID uint16) {
}
}

func (dc *DCPClient) onStreamEnd(e endStreamEvent) {
func (dc *GoCBDCPClient) onStreamEnd(e endStreamEvent) {
if e.err == nil {
DebugfCtx(dc.ctx, KeyDCP, "Stream (vb:%d) closed, all items streamed", e.vbID)
dc.deactivateVbucket(e.vbID)
Expand Down Expand Up @@ -616,12 +616,12 @@ func (dc *DCPClient) onStreamEnd(e endStreamEvent) {
}(e.vbID, retries)
}

func (dc *DCPClient) fatalError(err error) {
func (dc *GoCBDCPClient) fatalError(err error) {
dc.setCloseError(err)
dc.close()
}

func (dc *DCPClient) setCloseError(err error) {
func (dc *GoCBDCPClient) setCloseError(err error) {
dc.closeErrorLock.Lock()
defer dc.closeErrorLock.Unlock()
// If the DCPClient is already closing, don't update the error. If an initial error triggered the close,
Expand All @@ -635,7 +635,7 @@ func (dc *DCPClient) setCloseError(err error) {
}
}

func (dc *DCPClient) getCloseError() error {
func (dc *GoCBDCPClient) getCloseError() error {
dc.closeErrorLock.Lock()
defer dc.closeErrorLock.Unlock()
return dc.closeError
Expand All @@ -661,16 +661,18 @@ func getLatestVbUUID(failoverLog []gocbcore.FailoverEntry) (vbUUID gocbcore.VbUU
return entry.VbUUID
}

func (dc *DCPClient) GetMetadataKeyPrefix() string {
func (dc *GoCBDCPClient) GetMetadataKeyPrefix() string {
return dc.metadata.GetKeyPrefix()
}

// StartWorkersForTest will iterate through dcp workers to start them, to be used for caching testing purposes only.
func (dc *DCPClient) StartWorkersForTest(t *testing.T) {
func (dc *GoCBDCPClient) StartWorkersForTest(t *testing.T) {
dc.startWorkers(dc.ctx)
}

// NewDCPClientForTest is a test-only function to create a DCP client with a specific number of vbuckets.
func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) {
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name 'NewDCPClientForTest' no longer matches the struct type 'GoCBDCPClient'. Consider renaming to 'NewGoCBDCPClientForTest' for consistency.

Copilot uses AI. Check for mistakes.
return newDCPClientWithForBuckets(ctx, ID, callback, options, bucket, numVbuckets)
}

var _ gocbcore.StreamObserver = &GoCBDCPClient{}
4 changes: 2 additions & 2 deletions tools/cache_perf_tool/dcpDataGeneration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type dcpDataGen struct {
seqAlloc *sequenceAllocator
delays []time.Duration
dbCtx *db.DatabaseContext
client *base.DCPClient
client *base.GoCBDCPClient
numChannelsPerDoc int
numTotalChannels int
simRapidUpdate bool
Expand Down Expand Up @@ -327,7 +327,7 @@ func (dcp *dcpDataGen) mutateWithDedupe(seqs []uint64, chanCount int, casValue u
return encodedVal, chanCount, nil
}

func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, numWorkers, numVBuckets int) (*base.DCPClient, error) {
func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, numWorkers, numVBuckets int) (*base.GoCBDCPClient, error) {
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name 'createDCPClient' no longer matches the struct type 'GoCBDCPClient'. Consider renaming to 'createGoCBDCPClient' for consistency.

Suggested change
func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, numWorkers, numVBuckets int) (*base.GoCBDCPClient, error) {
func createGoCBDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, numWorkers, numVBuckets int) (*base.GoCBDCPClient, error) {

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't rename this because it is a test tool.

options := base.DCPClientOptions{
MetadataStoreType: base.DCPMetadataStoreInMemory,
GroupID: "",
Expand Down