
Conversation

@ggreer ggreer commented Dec 16, 2025

Based on #524

Still needs more changes. Probably not worth reviewing right now.

Summary by CodeRabbit

  • New Features

    • Added parallel synchronization for connectors with configurable worker behavior.
    • Introduced per-resource sync bucketing: resources in the same bucket are processed sequentially, while different buckets can be processed in parallel.
    • Added enhanced progress logging with rate-limited, per-type metrics and thread-safe updates.
  • Refactor

    • Simplified chunked-insert boundary logic.
    • Minor test adjustments and syncer API consolidation.


coderabbitai bot commented Dec 16, 2025

Walkthrough

Adds a parallel synchronization subsystem and wiring: new parallel syncer, progress logging, CLI/runner/task-manager flags and options, protobuf sync_bucket field, and plumbing to enable parallel processing across worker pools while retaining sequential paths.

Changes

• CLI & Runner (pkg/cli/commands.go, pkg/connectorrunner/runner.go): Adds --parallel-sync flag and WithParallelSyncEnabled() option; threads parallelSync through runner config and into NewC1TaskManager.
• Task Manager & Handlers (pkg/tasks/c1api/manager.go, pkg/tasks/c1api/full_sync.go, pkg/tasks/local/syncer.go): Adds parallelSync field to task manager and full-sync handler; passes the flag into the full-sync flow; introduces WithParallelSyncEnabled(parallel bool) and conditionally sets worker count when enabled.
• Core Sync Engine (pkg/sync/syncer.go, pkg/sync/sequential_syncer.go): Exports former private syncer as SequentialSyncer; introduces sequentialSync and parallelSync orchestration methods; adds WithWorkerCount option and branches to parallel/sequential execution.
• Parallel Sync Implementation (pkg/sync/parallel_syncer.go): New large parallel syncer: worker pool, bucketed task queue, task/state/result abstractions, worker lifecycle, retry/addition semantics, and parallel orchestration API.
• Progress Logging (pkg/sync/progresslog/progresslog.go): New ProgressLog type with thread-safe APIs, options for logger/mode/frequency, and methods to track/log resource/entitlement/grant progress.
• State Machine (pkg/sync/state.go): Adds CollectEntitlementsAndGrantsTasksOp action and updates the enum/string mapping.
• Configuration & Fields (pkg/field/defaults.go, proto/c1/connector/v2/resource.proto): Adds parallelSync BoolField to defaults; adds string sync_bucket = 7 to the ResourceType protobuf for bucket assignment.
• Utilities & Minor Edits (pkg/dotc1z/sql_helpers.go, pkg/sync/syncer_test.go, pb/c1/connector/v2/resource.pb.validate.go): Simplifies chunk boundary calculation; makes the test context cancelable; adds a validation comment (no functional change).

Sequence Diagram(s)

sequenceDiagram
    participant CLI as CLI
    participant Cmd as MakeMainCommand
    participant Runner as ConnectorRunner
    participant C1API as C1TaskManager
    participant FullSync as FullSyncHandler
    participant Syncer as NewSyncer / SequentialSyncer
    participant Parallel as ParallelSyncer
    participant Workers as Worker Pool

    CLI->>Cmd: run with --parallel-sync
    Cmd->>Runner: NewConnectorRunner(..., WithParallelSyncEnabled())
    Runner->>C1API: NewC1TaskManager(..., parallelSync=true)
    C1API->>FullSync: newFullSyncTaskHandler(..., parallelSync=true)
    FullSync->>Syncer: NewSyncer(..., WithWorkerCount(10))
    alt parallel enabled
        Syncer->>Parallel: NewParallelSyncerFromSyncer(baseSyncer, config)
        Parallel->>Workers: spawn workers & setup bucket queues
        Parallel->>Parallel: generateInitialTasks()
        loop worker processing
            Workers->>Parallel: fetch task from bucket
            Workers->>Workers: process resource/entitlement/grant
            Workers->>Parallel: add deferred tasks / results
        end
        Parallel->>Syncer: finalize and return
    else sequential
        Syncer->>Syncer: run sequentialSync flow
        Syncer-->>FullSync: return
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Suggested reviewers

  • kans
  • jirwin

Poem

🐰 Hopping through buckets, workers align,
Tasks split and scatter, then neatly combine,
Logs whisper progress in steady delight,
Parallel hops under moon and daylight,
A rabbit cheers: syncs run swift and bright!

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 33.33%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check (❓ Inconclusive): The title '(WIP) Parallel syncing' is partially related to the changeset; it references a real aspect of the changes (parallel synchronization infrastructure) but is generic and lacks specificity about the main implementation details. Resolution: replace with a more specific title that clearly describes the main feature or implementation, for example 'Implement parallel task-based synchronization with worker pools and bucketed task queues' or 'Add parallel syncer infrastructure with dynamic bucketing and progress logging'.
✅ Passed checks (1 passed)
  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/sync/syncer_test.go (1)

54-73: Fix invalid range over int in TestExpandGrants

for i := range groupCount { relies on Go 1.22+ range-over-int support; on older toolchains it won't compile, since range there only accepts an array, slice, map, channel, or string. If the module still needs to build on pre-1.22 Go, keep this as a standard counted loop.

Suggested fix:

-	for i := range groupCount {
+	for i := 0; i < groupCount; i++ {
🧹 Nitpick comments (10)
pkg/field/defaults.go (1)

92-92: Parallel-sync field wiring looks consistent; minor ordering nit only

parallel-sync is defined with Persistent storage and ExportTargetNone, matching how other internal sync-tuning flags (e.g. skip-full-sync) are treated, and adding it to DefaultFields ensures it’s always present in connector configs. If you care about readability, you might consider colocating parallelSync next to skipFullSync in DefaultFields so related sync controls stay grouped.

Also applies to: 308-308
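
For illustration of the colocating idea, a rough sketch follows; the exact BoolField and option signatures here are assumptions, not the actual defaults.go code:

// Illustrative only: keep the parallel-sync flag next to the other
// sync-tuning fields (e.g. skipFullSync) so related controls stay grouped.
parallelSync := field.BoolField(
	"parallel-sync",
	field.WithDescription("Enable the parallel sync engine"),
	field.WithPersistent(true),
	field.WithExportTarget(field.ExportTargetNone),
)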

pkg/tasks/c1api/full_sync.go (1)

39-40: Parallel sync wrapping is structured correctly; clean up stale TODO and consider configurability

The new parallelSync flag on fullSyncTaskHandler, plus the baseSyncer/NewParallelSyncer split, integrates parallel execution without changing the surrounding error/Close semantics, which is good. A couple of small follow-ups:

  • The // TODO: enable parallel sync here comment above the new logic is now stale and should be removed or reworded to avoid confusion.
  • DefaultParallelSyncConfig().WithWorkerCount(10) bakes in a fixed worker count; if you expect very different connector sizes, you may eventually want this configurable (e.g., via config or env) rather than hard‑coded.
  • Ensure all newFullSyncTaskHandler call sites now pass the appropriate parallelSync value, so the flag actually reaches this path and, in turn, the parallel syncer implementation that enforces the required WAL checkpointing behavior. Based on learnings, this matches the intended design where parallel sync always enables WAL checkpointing.

Also applies to: 96-111, 213-224

proto/c1/connector/v2/resource.proto (1)

49-53: Clarify “not specified” vs empty string semantics for sync_bucket.

In proto3 a string is always present (default ""), so “not specified” effectively means “empty string”. It would be clearer to phrase the comment as “If empty, the default bucket from ParallelSyncConfig will be used,” to match the actual behavior and avoid ambiguity for connector authors.

pkg/tasks/local/syncer.go (1)

19-30: Hard‑coded worker count for parallel sync limits configurability.

WithParallelSyncEnabled always maps to WithWorkerCount(10). That’s fine for an initial implementation, but you’ll likely want the worker count configurable (CLI flag or config) so different connectors / environments can tune concurrency rather than relying on a fixed 10.

Also applies to: 76-80, 109-124

pkg/connectorrunner/runner.go (1)

320-347: Parallel‑sync flag is consistently wired through runner configuration.

WithParallelSyncEnabled toggles runnerConfig.parallelSync, which is then passed both to local.NewSyncer (via local.WithParallelSyncEnabled) and c1api.NewC1TaskManager. This gives a single source of truth for enabling parallel sync across on‑demand and normal flows; once the worker‑count story stabilizes, you may also want to expose that alongside this flag.

Also applies to: 558-563, 817-826, 838-849

pkg/sync/sequential_syncer.go (1)

15-223: sequentialSync and parallelSync duplicate the same state‑machine logic.

Both methods appear line‑for‑line identical today. That’s fine for a WIP, but it will be fragile long‑term: any bugfix or behavior tweak will have to be applied twice. Consider extracting the core loop into a shared helper (e.g., runSync(mode, ...)) so sequentialSync/parallelSync only choose mode‑specific options while sharing the implementation.

Also applies to: 225-433
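
A minimal sketch of the suggested extraction, using hypothetical names (runSync, syncMode) that are not in the PR:

// Hypothetical refactor: both entry points delegate to one shared loop so a
// bugfix or behavior tweak only has to land once.
type syncMode int

const (
	modeSequential syncMode = iota
	modeParallel
)

func (s *SequentialSyncer) sequentialSync(ctx context.Context) error {
	return s.runSync(ctx, modeSequential)
}

func (s *SequentialSyncer) parallelSync(ctx context.Context) error {
	return s.runSync(ctx, modeParallel)
}

// runSync would hold the single copy of the state-machine loop, selecting any
// mode-specific behavior from the mode argument.
func (s *SequentialSyncer) runSync(ctx context.Context, mode syncMode) error {
	// ... shared loop over the state stack goes here ...
	_ = mode
	return nil
}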

pkg/sync/progresslog/progresslog.go (1)

15-21: Avoid runtime mutex swapping in SetSequentialMode; prefer construction‑time config.

SetSequentialMode replaces p.mu with a different implementation at runtime. That’s safe only if no other goroutine is concurrently using the logger, which is easy to violate once this type is used more broadly. You already have WithSequentialMode for construction‑time configuration; leaning on that and deprecating/removing SetSequentialMode (as the TODO suggests) would make the API safer and simpler.

Also applies to: 51-60, 257-265
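
Roughly, the construction-time path this prefers could look like the sketch below; aside from WithSequentialMode, the option and constructor names are assumptions about the package's API:

// Hypothetical shape: the mutex implementation is chosen exactly once, before
// any goroutine can touch the ProgressLog, so it never needs to be swapped.
func WithSequentialMode(sequential bool) Option {
	return func(p *ProgressLog) {
		if sequential {
			p.mu = &noOpMutex{} // single-goroutine sequential sync needs no locking
		} else {
			p.mu = &sync.RWMutex{}
		}
	}
}

func NewProgressLog(opts ...Option) *ProgressLog {
	p := &ProgressLog{mu: &sync.RWMutex{}} // thread-safe default
	for _, opt := range opts {
		opt(p)
	}
	return p
}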

pkg/sync/syncer.go (1)

55-86: WithWorkerCount currently only toggles between two sequential code paths.

Sync branches on workerCount == 0 to call sequentialSync vs parallelSync, but those methods are effectively identical and still run on a single goroutine. The separate worker‑pool implementation lives in parallel_syncer.go and isn’t wired in here yet, so WithWorkerCount doesn’t produce true parallelism today. Before shipping the --parallel-sync flag, you’ll likely want to either:

  • Have NewSyncer wrap the base SequentialSyncer in a parallelSyncer when workerCount > 0, or
  • Move the worker‑pool logic behind this workerCount branch instead of duplicating the current sequential loop.

Also applies to: 197-337, 2687-2691
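
A rough sketch of the first option, reusing the constructor names from the sequence diagram above (NewParallelSyncerFromSyncer, DefaultParallelSyncConfig); the exact signatures and the returned interface are assumptions:

// Hypothetical wiring: when a worker count is configured, NewSyncer hands back
// the worker-pool implementation instead of running a duplicated sequential loop.
func NewSyncer(ctx context.Context, opts ...SyncOpt) (Syncer, error) {
	base := &SequentialSyncer{}
	for _, opt := range opts {
		opt(base)
	}
	if base.workerCount > 0 {
		cfg := DefaultParallelSyncConfig().WithWorkerCount(base.workerCount)
		return NewParallelSyncerFromSyncer(base, cfg), nil
	}
	return base, nil
}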

pkg/sync/parallel_syncer.go (2)

1861-1911: ProgressLog updates for entitlements/grants count pages, not completed resources.

Both syncEntitlementsForResourceLogic and syncGrantsForResourceLogic call:

  • counts.AddEntitlementsProgress(resourceID.ResourceType, 1) /
  • counts.AddGrantsProgress(resourceID.ResourceType, 1)

every time a page is processed, regardless of whether more pages remain. In the sequential implementation, these counters are incremented only once per resource when the final page is consumed, so:

  • resources[resourceType] ≈ number of resources
  • entitlementsProgress[resourceType] / grantsProgress[resourceType] ≈ number of resources fully processed

With the current parallel logic, multi‑page resources will be over‑counted, leading to misleading percentages and spurious “more … than resources” warnings from ProgressLog. It would be better to only increment these counters when NextPageToken == "", mirroring the sequential path.

Also applies to: 1709-1859
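
A sketch of the per-resource accounting this suggests, using the counter method quoted above (the surrounding pagination variables are assumptions):

// Only count the resource once its final page has been consumed, mirroring the
// sequential path; intermediate pages just continue pagination.
if resp.NextPageToken == "" {
	counts.AddEntitlementsProgress(resourceID.ResourceType, 1)
}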


891-926: Parallel sync path currently lacks explicit checkpointing / WAL configuration.

parallelSyncer.Sync initializes state and starts workers, but it never calls SequentialSyncer.Checkpoint during processing and finalizeSync only calls EndSync and Cleanup—no final checkpoint, and no explicit WAL behavior. Prior work on the parallel sync design required WAL checkpointing to be always enabled for parallel operations to avoid checkpoint failures under high concurrency, and relied on periodic checkpoints for resumability. As this type is wired into the real --parallel-sync path, please verify that:

  • The underlying store is configured for WAL appropriately, and
  • Any required checkpoint/state semantics (including clearing large structures like the entitlement graph) are preserved for the parallel path.

Also applies to: 1198-1216

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d59a8d1 and d939bfa.

⛔ Files ignored due to path filters (2)
  • pb/c1/connector/v2/resource.pb.go is excluded by !**/*.pb.go
  • pb/c1/connector/v2/resource_protoopaque.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (15)
  • pb/c1/connector/v2/resource.pb.validate.go (1 hunks)
  • pkg/cli/commands.go (1 hunks)
  • pkg/connectorrunner/runner.go (5 hunks)
  • pkg/dotc1z/sql_helpers.go (1 hunks)
  • pkg/field/defaults.go (2 hunks)
  • pkg/sync/parallel_syncer.go (1 hunks)
  • pkg/sync/progresslog/progresslog.go (1 hunks)
  • pkg/sync/sequential_syncer.go (1 hunks)
  • pkg/sync/state.go (3 hunks)
  • pkg/sync/syncer.go (56 hunks)
  • pkg/sync/syncer_test.go (2 hunks)
  • pkg/tasks/c1api/full_sync.go (4 hunks)
  • pkg/tasks/c1api/manager.go (4 hunks)
  • pkg/tasks/local/syncer.go (4 hunks)
  • proto/c1/connector/v2/resource.proto (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-10-21T23:58:13.768Z
Learnt from: jirwin
Repo: ConductorOne/baton-sdk PR: 524
File: pkg/sync/parallel_syncer.go:827-829
Timestamp: 2025-10-21T23:58:13.768Z
Learning: In the parallel syncer implementation (pkg/sync/parallel_syncer.go), WAL checkpointing must always be enabled for parallel sync operations. The `NewParallelSyncer` function unconditionally sets `baseSyncer.enableWALCheckpoint = true` because high concurrency in parallel sync requires WAL checkpointing to prevent checkpoint failures. This is a design requirement, not a configurable option.

Applied to files:

  • pkg/tasks/c1api/manager.go
  • pkg/tasks/c1api/full_sync.go
  • pkg/connectorrunner/runner.go
  • pkg/sync/sequential_syncer.go
  • pkg/field/defaults.go
  • pkg/tasks/local/syncer.go
  • pkg/sync/syncer.go
  • pkg/sync/parallel_syncer.go
  • pkg/cli/commands.go
📚 Learning: 2025-09-02T15:06:42.596Z
Learnt from: kans
Repo: ConductorOne/baton-sdk PR: 410
File: pkg/sync/syncer.go:650-652
Timestamp: 2025-09-02T15:06:42.596Z
Learning: In the baton-sdk codebase, the syncIDClientWrapper middleware automatically adds ActiveSync annotations to all connector requests that support annotations. This eliminates the need to manually add ActiveSync annotations to individual ListResourceTypes, ListResources, ListEntitlements, ListGrants, and other connector calls in pkg/sync/syncer.go.

Applied to files:

  • pkg/sync/syncer.go
📚 Learning: 2025-06-02T21:28:25.367Z
Learnt from: MarcusGoldschmidt
Repo: ConductorOne/baton-sdk PR: 393
File: pkg/test/integration_wrapper.go:54-54
Timestamp: 2025-06-02T21:28:25.367Z
Learning: The `connectorbuilder.NewConnector` function in the baton-sdk already validates the connector parameter using a type switch and returns appropriate errors for nil or invalid connectors in the default case, so additional nil checks are not needed before calling this function.

Applied to files:

  • pkg/sync/syncer.go
🧬 Code graph analysis (3)
pkg/sync/sequential_syncer.go (3)
pkg/sync/syncer.go (2)
  • SequentialSyncer (56-86)
  • ErrSyncNotComplete (48-48)
pkg/retry/retry.go (1)
  • NewRetryer (31-45)
pkg/sync/state.go (10)
  • InitOp (130-130)
  • Action (145-152)
  • SyncTargetedResourceOp (139-139)
  • SyncResourceTypesOp (131-131)
  • SyncGrantExpansionOp (138-138)
  • SyncExternalResourcesOp (136-136)
  • SyncGrantsOp (135-135)
  • SyncStaticEntitlementsOp (140-140)
  • SyncResourcesOp (132-132)
  • SyncAssetsOp (137-137)
pkg/field/defaults.go (2)
pkg/field/fields.go (1)
  • BoolField (179-199)
pkg/field/field_options.go (4)
  • WithDescription (47-53)
  • WithPersistent (129-135)
  • WithExportTarget (103-111)
  • ExportTargetNone (97-97)
pkg/sync/parallel_syncer.go (3)
pkg/sync/state.go (7)
  • Action (145-152)
  • CollectEntitlementsAndGrantsTasksOp (141-141)
  • SyncEntitlementsOp (133-133)
  • SyncGrantsOp (135-135)
  • SyncResourcesOp (132-132)
  • SyncGrantExpansionOp (138-138)
  • SyncExternalResourcesOp (136-136)
pb/c1/connector/v2/resource_protoopaque.pb.go (6)
  • ResourceId (2629-2636)
  • ResourceId (2649-2649)
  • ResourceType (129-140)
  • ResourceType (153-153)
  • Resource (2714-2726)
  • Resource (2739-2739)
pb/c1/connector/v2/resource.pb.go (6)
  • ResourceId (2655-2662)
  • ResourceId (2675-2675)
  • ResourceType (129-144)
  • ResourceType (157-157)
  • Resource (2740-2752)
  • Resource (2765-2765)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: go-test (1.25.2, ubuntu-latest)
  • GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (6)
pkg/dotc1z/sql_helpers.go (1)

462-462: LGTM! Clean refactoring.

The use of min to clamp the end boundary is more concise and idiomatic than the previous manual check. The logic is functionally equivalent and clearer.
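
As a self-contained illustration of the clamped boundary (min is a Go 1.21+ built-in; variable names here are illustrative, not the sql_helpers.go code):

package main

import "fmt"

func main() {
	rows := make([]string, 10)
	chunkSize := 4
	for i := 0; i*chunkSize < len(rows); i++ {
		start := i * chunkSize
		end := min((i+1)*chunkSize, len(rows)) // clamp the last chunk to len(rows)
		fmt.Println(start, end)                // prints: 0 4, 4 8, 8 10
	}
}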

pkg/sync/syncer_test.go (1)

39-40: Using t.Context() is a good lifecycle improvement; verify Go version support

Switching to context.WithCancel(t.Context()) ties the sync to the test’s lifecycle, which is preferable to a background context, but it does rely on a Go toolchain where testing.T.Context() is available. Please confirm your minimum supported Go version exposes t.Context(); otherwise this will break builds on older Go releases.
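
For reference, the pattern in question looks roughly like this (the test name is illustrative); testing.T.Context was added in Go 1.24, so the module's minimum Go version needs to be at least that:

func TestSyncerLifecycle(t *testing.T) {
	// t.Context() is canceled automatically when the test ends (Go 1.24+);
	// wrapping it in WithCancel also lets the test cancel the sync explicitly.
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	_ = ctx // run the syncer under test with ctx here
}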

pkg/sync/state.go (1)

70-71: New CollectEntitlementsAndGrantsTasksOp enum is wired correctly

The new op is appended at the end of the iota block (preserving existing numeric values) and is round‑tripped consistently via String() and newActionOp, so existing serialized tokens remain compatible.

Also applies to: 119-120, 129-142
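
Purely to illustrate why appending preserves compatibility (the real type name and constant set in state.go may differ; ActionOp is a stand-in):

// With iota, each constant's value comes from its position in the block, so
// appending CollectEntitlementsAndGrantsTasksOp at the end leaves every existing
// op's value, and therefore any serialized token, unchanged.
type ActionOp int

const (
	InitOp ActionOp = iota + 1
	SyncResourceTypesOp
	SyncResourcesOp
	// ... other existing ops keep their positions ...
	CollectEntitlementsAndGrantsTasksOp // new op appended last, takes the next value
)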

pb/c1/connector/v2/resource.pb.validate.go (1)

168-169: Generated validator update for SyncBucket is fine

The added “no validation rules for SyncBucket” comment correctly reflects the absence of PGV constraints for the new field. Since this file is generated, just ensure it continues to be regenerated from the proto rather than hand-edited.

pkg/cli/commands.go (1)

336-338: parallel-sync CLI flag is correctly plumbed into runner options

Appending connectorrunner.WithParallelSyncEnabled() when parallel-sync is set cleanly threads the flag into the runner without disturbing existing mode selection (daemon, on‑demand sync, diff/compact, etc.). This aligns with the new default field and leaves room for the runner to enforce any parallel‑sync invariants internally (e.g., WAL checkpointing).

pkg/tasks/c1api/manager.go (1)

46-59: Parallel‑sync flag propagation into full‑sync handler looks solid.

The new parallelSync field is cleanly threaded from NewC1TaskManager into newFullSyncTaskHandler for FullSyncType while leaving other task types untouched. No issues from this change alone.

Also applies to: 245-256, 304-330

Comment on lines +1914 to +1925
func (ps *parallelSyncer) shouldSkipEntitlementsAndGrants(ctx context.Context, r *v2.Resource) (bool, error) {
	// This replicates the logic from the original shouldSkipEntitlementsAndGrants method
	// Check if the resource has the SkipEntitlementsAndGrants annotation

	for _, a := range r.Annotations {
		if a.MessageIs((*v2.SkipEntitlementsAndGrants)(nil)) {
			return true, nil
		}
	}

	return false, nil
}

🛠️ Refactor suggestion | 🟠 Major

Skip‑logic for entitlements/grants diverges from the SequentialSyncer behavior.

parallelSyncer.shouldSkipEntitlementsAndGrants only inspects SkipEntitlementsAndGrants on the resource annotations. The sequential path’s SequentialSyncer.shouldSkipEntitlementsAndGrants also honors:

  • Global state.ShouldSkipEntitlementsAndGrants(),
  • Cached per‑resource‑type skip decisions based on resource‑type annotations.

That means the parallel path may sync entitlements/grants that the sequential path would skip (e.g., when --skip-entitlements-and-grants is set or when only the resource type is annotated). Consider delegating to the existing SequentialSyncer method or replicating its full logic to keep semantics consistent.

jirwin and others added 9 commits January 7, 2026 16:49
Fix deadlock

Play around with concurrent resource tasks

A little log cleanup

More log cleanup

fix build

Add task with retry and backoff

default shouldFetchRelated to false

Check context when waiting to add task

Fix syntax error

fix logging

Fix deadlocks

Don't exit the sync early if we aren't making enough progress.

Call expand grants until it is complete

Wire up parallel sync config option to service mode sync tasks.

fix issue where only 10,000 entitlements are synced

add CollectEntitlementsAndGrantsTasksOp

set default worker count to 2

apply lock when updating entitlements progress during parallel sync

use exponential backoff in addTaskWithRetry

refactor so that AddTask* has minimal locking

fix entitlement and grant progress counts

we are tracking the number of resources processed, not the total number of entitlements and grants

lock when expanding queues

Use a transaction per chunk during bulk object put, also retry sqlite busy/dblocked errors.

WAL checkpoint on 5 minute interval

pause sqlite activity during checkpoint

manual WAL checkpoints only apply to parallel sync

update comment
update type for WithSessionStore

Don't check the bucket name when looking to see if tasks are complete. Check context when sleeping.

Check context when sleeping.

Lint fixes

More lint fixes.

Various review feedback fixes.
…ribute from progress logger. Use a fake mutex if we're in sequential mode, and a real mutex otherwise. This simplifies some logic.
…instead of modifying data directly in syncer.
@ggreer force-pushed the ggreer/kans/parallel branch from d939bfa to e7d626b on January 8, 2026 at 00:53
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🤖 Fix all issues with AI agents
In @pkg/sync/parallel_syncer.go:
- Around line 663-723: The worker currently calls w.taskQueue.Close() and
returns on critical failures (in the worker loop around addTasksAfterCompletion)
but only logs the error, so Sync/waitForCompletion think the run succeeded; add
an error propagation mechanism on parallelSyncer (e.g., an atomic.Pointer[error]
named syncError or an error channel) and have workers store the first failure
into that field before closing the queue (use CompareAndSwap to set first
error). Update waitForCompletion (and/or Sync) to check ps.syncError.Load() (or
read from the channel) and return that error immediately instead of nil so the
main sync loop surfaces worker failures.
- Around line 571-579: taskQueue.Close may panic if q.parallelSyncer.workers is
nil; update Close (method taskQueue.Close) to defensively check that
q.parallelSyncer is non-nil and q.parallelSyncer.workers is non-nil before
ranging over it (or safely guard the loop with a nil check), then still set
q.closed and release the lock; ensure this handles calls that occur before
startWorkers completes without dereferencing a nil slice/map.
- Around line 1218-1237: The syncResourceTypes method currently calls
ListResourceTypes only once and doesn't loop over pagination, so additional
pages (NextPageToken) are ignored; update parallelSyncer.syncResourceTypes to
mirror the sequential logic by repeatedly calling
ps.syncer.connector.ListResourceTypes with a request that sets PageToken from
the previous response until NextPageToken is empty, calling
ps.syncer.store.PutResourceTypes for each page (resp.List...), aggregating
counts via ps.syncer.counts.AddResourceTypes for each batch and returning any
errors from the connector or store as before.
- Around line 1177-1196: The cleanup call to ps.syncer.state.FinishAction(ctx)
runs unconditionally and can leave the state inconsistent on error; make the
function use a named error return (e.g., err error) and add a deferred closure
that only calls ps.syncer.state.FinishAction(ctx) when err == nil, then assign
the result of ps.syncer.SyncExternalResources(ctx) to that named err before
returning so FinishAction is skipped when SyncExternalResources fails.
- Around line 1913-1925: The parallelSyncer.shouldSkipEntitlementsAndGrants
implementation only inspects resource annotations and thus diverges from
SequentialSyncer behavior (it omits checks like
state.ShouldSkipEntitlementsAndGrants() and cached per-type decisions); fix it
by delegating to the existing sequential logic: call
ps.syncer.shouldSkipEntitlementsAndGrants(ctx, r) (or forward the call to
SequentialSyncer.shouldSkipEntitlementsAndGrants) so the global flag and cached
resource-type skip decisions are respected, returning whatever that method
returns.

In @pkg/sync/progresslog/progresslog.go:
- Around line 257-265: The SetSequentialMode method on ProgressLog is unsafe
because it swaps p.mu at runtime without synchronization and can race with
callers like AddResources; remove the SetSequentialMode method entirely and
ensure callers use the creation-time option WithSequentialMode to set the mutex
behavior on ProgressLog construction (ensure ProgressLog's constructor/factory
accepts and applies WithSequentialMode to initialize p.mu once), and update any
call sites that used SetSequentialMode to pass the option instead.
🧹 Nitpick comments (4)
pkg/sync/progresslog/progresslog.go (1)

99-121: Minor: Log frequency check and update are not atomic.

The pattern reads lastEntitlementLog under RLock, releases it, then acquires Lock to update. In high-concurrency scenarios, multiple goroutines could pass the time check simultaneously and all log. This is benign for logging but could result in slightly more frequent logs than intended.
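
One low-risk variant, sketched with the field names mentioned above (the frequency field name is an assumption), is to re-check the timestamp under the write lock so only one racing goroutine actually logs:

// Whichever goroutine wins the lock updates the timestamp; the others see the
// fresh value and skip logging, keeping the rate limit roughly intact.
p.mu.Lock()
shouldLog := time.Since(p.lastEntitlementLog) >= p.logFrequency
if shouldLog {
	p.lastEntitlementLog = time.Now()
}
p.mu.Unlock()

if shouldLog {
	// emit the rate-limited entitlement progress log here
}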

pkg/sync/syncer.go (1)

2725-2729: Consider validating worker count is non-negative.

WithWorkerCount accepts any int value. A negative count would trigger the parallel sync path (workerCount > 0 is false for negative, but the intent is unclear). Consider adding validation or using uint to make the API clearer.

Suggested validation
 func WithWorkerCount(count int) SyncOpt {
 	return func(s *SequentialSyncer) {
+		if count < 0 {
+			count = 0 // Fall back to sequential sync
+		}
 		s.workerCount = count
 	}
 }
pkg/sync/parallel_syncer.go (2)

428-444: Queue expansion replaces channel while workers may hold references.

When expandQueueAndRetry replaces q.bucketQueues[bucket] with a new channel, any worker that previously obtained a reference to the old queue via GetTask could miss tasks added to the new queue. However, since GetTask always reads from the map under lock, this appears safe.

The old channel is not closed, which is actually correct since it prevents panics on closed-channel sends, but leftover items (if any exist due to a race) would be orphaned. Consider adding a comment explaining this is intentional.


110-161: Recursive immediate processing could cause stack overflow.

processTaskImmediately recursively calls itself for sub-tasks (lines 126, 151). In connectors with deeply nested resource hierarchies, this could exhaust the stack. Consider using an iterative approach with a local task stack.

Iterative approach
func (w *worker) processTaskImmediately(initialTask *task) error {
    taskStack := []*task{initialTask}
    
    for len(taskStack) > 0 {
        // Pop task
        t := taskStack[len(taskStack)-1]
        taskStack = taskStack[:len(taskStack)-1]
        
        var subTasks []*task
        var err error
        
        switch t.Action.Op {
        case CollectEntitlementsAndGrantsTasksOp:
            subTasks, err = w.syncer.collectEntitlementsAndGrantsTasks(w.ctx, t.Action)
        // ... other cases ...
        }
        
        if err != nil {
            return err
        }
        taskStack = append(taskStack, subTasks...)
    }
    return nil
}
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d939bfa and e7d626b.

⛔ Files ignored due to path filters (2)
  • pb/c1/connector/v2/resource.pb.go is excluded by !**/*.pb.go
  • pb/c1/connector/v2/resource_protoopaque.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (15)
  • pb/c1/connector/v2/resource.pb.validate.go
  • pkg/cli/commands.go
  • pkg/connectorrunner/runner.go
  • pkg/dotc1z/sql_helpers.go
  • pkg/field/defaults.go
  • pkg/sync/parallel_syncer.go
  • pkg/sync/progresslog/progresslog.go
  • pkg/sync/sequential_syncer.go
  • pkg/sync/state.go
  • pkg/sync/syncer.go
  • pkg/sync/syncer_test.go
  • pkg/tasks/c1api/full_sync.go
  • pkg/tasks/c1api/manager.go
  • pkg/tasks/local/syncer.go
  • proto/c1/connector/v2/resource.proto
🚧 Files skipped from review as they are similar to previous changes (6)
  • pkg/tasks/c1api/manager.go
  • pb/c1/connector/v2/resource.pb.validate.go
  • pkg/sync/sequential_syncer.go
  • pkg/tasks/c1api/full_sync.go
  • pkg/cli/commands.go
  • pkg/sync/state.go
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2026-01-02T17:21:01.723Z
Learnt from: ggreer
Repo: ConductorOne/baton-sdk PR: 616
File: pkg/synccompactor/compactor_test.go:44-52
Timestamp: 2026-01-02T17:21:01.723Z
Learning: In tests that verify cleanup behavior (e.g., ensuring temporary artifacts are removed or directories are empty after an operation), treat cleanup failures as test failures by asserting on the cleanup call (e.g., require.NoError(t, err) or assert.NoError(t, err)). This ensures that the cleanup path is explicitly tested and any cleanup error fails the test, confirming correct behavior of the cleanup code.

Applied to files:

  • pkg/sync/syncer_test.go
📚 Learning: 2025-10-21T23:58:13.768Z
Learnt from: jirwin
Repo: ConductorOne/baton-sdk PR: 524
File: pkg/sync/parallel_syncer.go:827-829
Timestamp: 2025-10-21T23:58:13.768Z
Learning: In the parallel syncer implementation (pkg/sync/parallel_syncer.go), WAL checkpointing must always be enabled for parallel sync operations. The `NewParallelSyncer` function unconditionally sets `baseSyncer.enableWALCheckpoint = true` because high concurrency in parallel sync requires WAL checkpointing to prevent checkpoint failures. This is a design requirement, not a configurable option.

Applied to files:

  • pkg/tasks/local/syncer.go
  • pkg/field/defaults.go
  • pkg/sync/syncer.go
  • pkg/connectorrunner/runner.go
  • pkg/sync/parallel_syncer.go
📚 Learning: 2025-09-02T15:06:42.596Z
Learnt from: kans
Repo: ConductorOne/baton-sdk PR: 410
File: pkg/sync/syncer.go:650-652
Timestamp: 2025-09-02T15:06:42.596Z
Learning: In the baton-sdk codebase, the syncIDClientWrapper middleware automatically adds ActiveSync annotations to all connector requests that support annotations. This eliminates the need to manually add ActiveSync annotations to individual ListResourceTypes, ListResources, ListEntitlements, ListGrants, and other connector calls in pkg/sync/syncer.go.

Applied to files:

  • pkg/sync/syncer.go
📚 Learning: 2025-06-02T21:28:25.367Z
Learnt from: MarcusGoldschmidt
Repo: ConductorOne/baton-sdk PR: 393
File: pkg/test/integration_wrapper.go:54-54
Timestamp: 2025-06-02T21:28:25.367Z
Learning: The `connectorbuilder.NewConnector` function in the baton-sdk already validates the connector parameter using a type switch and returns appropriate errors for nil or invalid connectors in the default case, so additional nil checks are not needed before calling this function.

Applied to files:

  • pkg/sync/syncer.go
🧬 Code graph analysis (2)
pkg/sync/progresslog/progresslog.go (1)
pkg/sync/expand/graph.go (1)
  • EntitlementGraphAction (15-21)
pkg/sync/parallel_syncer.go (2)
pkg/sync/state.go (1)
  • Action (145-152)
pb/c1/connector/v2/resource.pb.go (8)
  • ResourceId (2759-2766)
  • ResourceId (2779-2779)
  • ResourceType (129-144)
  • ResourceType (157-157)
  • ResourcesServiceListResourcesRequest (3032-3042)
  • ResourcesServiceListResourcesRequest (3055-3055)
  • Resource (2844-2856)
  • Resource (2869-2869)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: go-test (1.25.2, ubuntu-latest)
  • GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (13)
pkg/dotc1z/sql_helpers.go (1)

454-454: LGTM: Clean refactor of boundary calculation.

Using min((i+1)*chunkSize, len(rows)) is more concise and idiomatic than the previous if-statement approach, while maintaining identical behavior.

pkg/field/defaults.go (2)

98-98: LGTM: Parallel sync field properly declared.

The field is correctly configured with appropriate options (persistent, no export target) and consistent with other default fields in this file.


368-368: LGTM: Field properly registered.

The parallelSync field is correctly added to the DefaultFields list.

pkg/sync/syncer_test.go (1)

39-40: LGTM: Improved context management.

Using a cancelable context gives the test explicit control over cancellation, which is good practice and aligns well with the parallel sync changes being introduced.

pkg/connectorrunner/runner.go (4)

353-353: LGTM: Parallel sync flag added to runner configuration.

The field is appropriately placed and typed consistently with other boolean flags in the struct.


582-587: LGTM: Option properly implemented.

The WithParallelSyncEnabled() option follows the established pattern for other runner options and correctly sets the configuration flag.


858-858: LGTM: Flag properly propagated to local syncer.

The cfg.parallelSync value is correctly passed through to the local syncer via the WithParallelSyncEnabled option.


871-882: LGTM: Flag properly propagated to C1 task manager.

The cfg.parallelSync parameter is correctly added to the NewC1TaskManager call, ensuring both on-demand and non-on-demand paths receive the parallel sync configuration.

pkg/tasks/local/syncer.go (3)

29-29: LGTM: Parallel sync field added.

The field is properly typed and positioned with the other configuration fields in the struct.


76-80: LGTM: Option properly implemented.

The WithParallelSyncEnabled option is correctly implemented and follows the established pattern for other options in this file.


109-126: Good incremental approach to building sync options.

The refactored option-building logic is clear and allows conditional addition of the worker count option. The hardcoded worker count of 10 has an appropriate TODO comment for future work.

Since this is a WIP PR, the hardcoded value is acceptable. When making this configurable, consider:

  • Exposing a WithWorkerCount() option in the local syncer API
  • Providing sensible defaults based on available resources (similar to how pkg/dotc1z/sql_helpers.go uses min(max(runtime.GOMAXPROCS(0), 1), 4) for worker pool sizing); a sketch follows this list
  • Documenting the performance implications of different worker counts
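
A sketch of such a default, following the sql_helpers.go sizing expression quoted above (the helper name is hypothetical; min and max are Go 1.21+ built-ins):

// defaultWorkerCount scales with available CPUs but stays within a small bound,
// mirroring the worker-pool sizing used in pkg/dotc1z/sql_helpers.go.
func defaultWorkerCount() int {
	return min(max(runtime.GOMAXPROCS(0), 1), 4)
}
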
proto/c1/connector/v2/resource.proto (1)

48-53: LGTM - Well-documented proto field addition.

The sync_bucket field with clear documentation about sequential-within-bucket and parallel-across-bucket semantics is a good addition. The field is optional with no validation, allowing flexible bucket naming.

pkg/sync/syncer.go (1)

55-86: LGTM - Clean refactoring to exported type.

The rename from syncer to SequentialSyncer with proper export is well-executed. The addition of workerCount to control sequential vs parallel mode is a clear design. The integration with progresslog.ProgressLog for thread-safe progress tracking is appropriate.

Comment on lines +571 to +579
func (q *taskQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()

	for _, w := range q.parallelSyncer.workers {
		w.cancel()
	}
	q.closed = true
}

⚠️ Potential issue | 🟡 Minor

Potential nil pointer dereference in taskQueue.Close.

Close() iterates over q.parallelSyncer.workers without checking if workers is initialized. If called before startWorkers completes (e.g., during early error handling), this could panic.

Suggested fix
 func (q *taskQueue) Close() {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
-	for _, w := range q.parallelSyncer.workers {
-		w.cancel()
+	if q.parallelSyncer != nil && q.parallelSyncer.workers != nil {
+		for _, w := range q.parallelSyncer.workers {
+			w.cancel()
+		}
 	}
 	q.closed = true
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change (current version, then suggested replacement):

func (q *taskQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, w := range q.parallelSyncer.workers {
		w.cancel()
	}
	q.closed = true
}

func (q *taskQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.parallelSyncer != nil && q.parallelSyncer.workers != nil {
		for _, w := range q.parallelSyncer.workers {
			w.cancel()
		}
	}
	q.closed = true
}
🤖 Prompt for AI Agents
In @pkg/sync/parallel_syncer.go around lines 571 - 579, taskQueue.Close may
panic if q.parallelSyncer.workers is nil; update Close (method taskQueue.Close)
to defensively check that q.parallelSyncer is non-nil and
q.parallelSyncer.workers is non-nil before ranging over it (or safely guard the
loop with a nil check), then still set q.closed and release the lock; ensure
this handles calls that occur before startWorkers completes without
dereferencing a nil slice/map.

Comment on lines +663 to +723
if err != nil {
	// Add pending tasks after task completion (even if failed, they might be valid)
	if taskResult != nil && len(taskResult.Tasks) > 0 {
		err = w.addTasksAfterCompletion(taskResult.Tasks)
		if err != nil {
			l.Error("failed to add tasks after completion",
				zap.Int("worker_id", w.id),
				zap.String("bucket", taskBucket),
				zap.Error(err))
			w.taskQueue.Close()
			return
		}
	}
	l.Error("failed to process task",
		zap.Int("worker_id", w.id),
		zap.String("bucket", taskBucket),
		zap.String("operation", task.Action.Op.String()),
		zap.String("resource_type", task.Action.ResourceTypeID),
		zap.Error(err))

	consecutiveFailures++

	// Check if this is a rate limit error
	if w.isRateLimitError(err) {
		w.rateLimited.Store(true)

		// If we're hitting rate limits in the current bucket, consider switching
		if consecutiveFailures >= maxConsecutiveFailures {
			l.Info("worker hitting rate limits in bucket, will try other buckets",
				zap.Int("worker_id", w.id),
				zap.String("bucket", taskBucket),
				zap.Int("consecutive_failures", consecutiveFailures))

			// Force bucket switch on next iteration
			currentBucket = ""
			consecutiveFailures = 0
		}

		// Wait before retrying with bucket-specific delay
		delay := w.getBucketRateLimitDelay(taskBucket)
		select {
		case <-w.ctx.Done():
			return
		case <-time.After(delay):
		}
	} else {
		// Non-rate-limit error, reset rate limit flag
		w.rateLimited.Store(false)
	}
} else {
	// Task succeeded, add any pending tasks after completion
	if taskResult != nil && len(taskResult.Tasks) > 0 {
		err = w.addTasksAfterCompletion(taskResult.Tasks)
		if err != nil {
			l.Error("failed to add tasks after completion",
				zap.Int("worker_id", w.id),
				zap.String("bucket", taskBucket),
				zap.Error(err))
			w.taskQueue.Close()
			return
		}

⚠️ Potential issue | 🟠 Major

Worker errors close queue but don't propagate to main sync loop.

When a worker encounters a critical error adding tasks (lines 672, 721), it calls w.taskQueue.Close() and returns. However:

  1. The error is only logged, not propagated to waitForCompletion or Sync.
  2. waitForCompletion will eventually detect the empty queue and return nil, masking the failure.
  3. The sync will complete "successfully" despite worker failures.

Consider adding an error channel or atomic error field to propagate worker failures to the main sync loop.

Suggested approach

Add an error field to parallelSyncer:

type parallelSyncer struct {
    // ... existing fields ...
    syncError atomic.Pointer[error] // First error encountered by any worker
}

In worker error handling:

if err != nil {
    syncErr := fmt.Errorf("worker %d failed: %w", w.id, err)
    w.syncer.syncError.CompareAndSwap(nil, &syncErr)
    w.taskQueue.Close()
    return
}

In waitForCompletion, check ps.syncError.Load() and return it if set.

🤖 Prompt for AI Agents
In @pkg/sync/parallel_syncer.go around lines 663 - 723, The worker currently
calls w.taskQueue.Close() and returns on critical failures (in the worker loop
around addTasksAfterCompletion) but only logs the error, so
Sync/waitForCompletion think the run succeeded; add an error propagation
mechanism on parallelSyncer (e.g., an atomic.Pointer[error] named syncError or
an error channel) and have workers store the first failure into that field
before closing the queue (use CompareAndSwap to set first error). Update
waitForCompletion (and/or Sync) to check ps.syncError.Load() (or read from the
channel) and return that error immediately instead of nil so the main sync loop
surfaces worker failures.

Comment on lines +1042 to +1141
// waitForCompletion waits for all tasks to complete with bucket-aware monitoring.
func (ps *parallelSyncer) waitForCompletion(ctx context.Context) error {
	ctx, span := parallelTracer.Start(ctx, "parallelSyncer.waitForCompletion")
	defer span.End()

	l := ctxzap.Extract(ctx)

	// Monitor task completion with periodic status updates
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	lastTaskCount := 0
	noProgressCount := 0
	maxNoProgressCount := 6 // 30 seconds without progress

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Get current bucket statistics
			bucketStats := ps.taskQueue.GetBucketStats()
			totalTasks := 0
			for _, count := range bucketStats {
				totalTasks += count
			}

			// Log progress
			if len(bucketStats) > 0 {
				// Debug: Log which buckets still have active tasks
				activeBuckets := make([]string, 0)
				for bucketName, taskCount := range bucketStats {
					if taskCount > 0 {
						activeBuckets = append(activeBuckets, fmt.Sprintf("%s:%d", bucketName, taskCount))
					}
				}
				l.Debug("active buckets", zap.Strings("active_buckets", activeBuckets))
			}

			// Check if we're making progress
			if totalTasks == lastTaskCount {
				noProgressCount++
				if noProgressCount >= maxNoProgressCount {
					l.Warn("no task progress detected",
						zap.Int("no_progress_count", noProgressCount),
						zap.Int("last_task_count", lastTaskCount),
						zap.Int("total_tasks", totalTasks))
				}
			} else {
				noProgressCount = 0
				lastTaskCount = totalTasks
			}

			// Check if all resource-specific tasks are complete
			// We need to ensure ALL resource types have finished processing
			if totalTasks == 0 {
				// Double-check that we're truly done with resource processing
				// Look for any active resource processing in the bucket stats
				allResourceProcessingComplete := true
				for _, taskCount := range bucketStats {
					if taskCount > 0 {
						allResourceProcessingComplete = false
						break
					}
				}

				if allResourceProcessingComplete {
					// Additional safety check: wait a bit more to ensure workers are truly idle
					select {
					case <-ctx.Done():
						return ctx.Err()
					case <-time.After(2 * time.Second):
					}

					// Check one more time to ensure no new tasks appeared
					finalBucketStats := ps.taskQueue.GetBucketStats()
					finalTotalTasks := 0
					for _, count := range finalBucketStats {
						finalTotalTasks += count
					}

					if finalTotalTasks == 0 {
						// Final check: ensure all workers are actually idle
						if ps.areWorkersIdle() {
							return nil
						} else {
							// Reset progress counters since we're not done yet
							noProgressCount = 0
							lastTaskCount = finalTotalTasks
						}
					} else {
						// Reset progress counters since we're not done yet
						noProgressCount = 0
						lastTaskCount = finalTotalTasks
					}
				}
			}
		}
	}
}

⚠️ Potential issue | 🟠 Major

Completion detection relies on polling and heuristics.

The waitForCompletion method uses a 5-second ticker with a "no progress" counter and arbitrary 2-second waits to detect completion. This approach has several risks:

  1. Delayed completion: The 2-second confirmation delay (line 1113) always adds latency even when sync is truly complete.
  2. False completion: If workers are slow and the queue appears empty, the function might return prematurely before workers finish their current task processing.
  3. No hard timeout: The maxNoProgressCount only logs a warning but doesn't fail, potentially allowing infinite loops.

Consider using a sync.WaitGroup or completion channel for more reliable completion signaling.
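
A minimal sketch of WaitGroup-based completion signaling, under the assumption that every enqueued task is counted before a worker can mark it done; taskWG and the method names are illustrative, not part of the PR:

// Count outstanding tasks instead of polling bucket stats: Add(1) when a task
// is enqueued, Done() after the worker finishes it (any follow-up tasks are
// Add(1)'d first), so the counter reaches zero only when the task tree drains.
func (ps *parallelSyncer) addTask(t *task) {
	ps.taskWG.Add(1)
	ps.taskQueue.AddTask(t)
}

func (w *worker) runTask(t *task) {
	defer w.syncer.taskWG.Done()
	// process t; deferred/sub-tasks go through addTask before Done() runs
}

func (ps *parallelSyncer) waitForCompletion(ctx context.Context) error {
	done := make(chan struct{})
	go func() {
		ps.taskWG.Wait()
		close(done)
	}()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-done:
		return nil
	}
}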

Comment on lines +1177 to +1196
// syncExternalResources handles external resources by delegating to the base syncer.
func (ps *parallelSyncer) syncExternalResources(ctx context.Context) error {
	ctx, span := parallelTracer.Start(ctx, "parallelSyncer.syncExternalResources")
	defer span.End()

	// The base syncer's SyncExternalResources expects to have actions in its state stack
	// We need to set up the proper state context before calling it
	ps.syncer.state.PushAction(ctx, Action{
		Op: SyncExternalResourcesOp,
	})

	// Delegate to the base syncer's external resources logic
	// This ensures we get the exact same behavior as the sequential sync
	err := ps.syncer.SyncExternalResources(ctx)

	// Clean up the state
	ps.syncer.state.FinishAction(ctx)

	return err
}

⚠️ Potential issue | 🟡 Minor

State cleanup happens even on error.

Line 1193 calls ps.syncer.state.FinishAction(ctx) unconditionally, even when SyncExternalResources returns an error. This could leave the state machine in an inconsistent state.

Suggested fix
 func (ps *parallelSyncer) syncExternalResources(ctx context.Context) error {
 	// ... setup code ...
 
 	err := ps.syncer.SyncExternalResources(ctx)
+	if err != nil {
+		return err
+	}
 
-	// Clean up the state
 	ps.syncer.state.FinishAction(ctx)
-
-	return err
+	return nil
 }
🤖 Prompt for AI Agents
In @pkg/sync/parallel_syncer.go around lines 1177 - 1196, The cleanup call to
ps.syncer.state.FinishAction(ctx) runs unconditionally and can leave the state
inconsistent on error; make the function use a named error return (e.g., err
error) and add a deferred closure that only calls
ps.syncer.state.FinishAction(ctx) when err == nil, then assign the result of
ps.syncer.SyncExternalResources(ctx) to that named err before returning so
FinishAction is skipped when SyncExternalResources fails.

Comment on lines +1218 to +1237
// syncResourceTypes syncs resource types (equivalent to SyncResourceTypes).
func (ps *parallelSyncer) syncResourceTypes(ctx context.Context) error {
	ctx, span := parallelTracer.Start(ctx, "parallelSyncer.syncResourceTypes")
	defer span.End()

	// This replicates the exact logic from the original SyncResourceTypes
	resp, err := ps.syncer.connector.ListResourceTypes(ctx, &v2.ResourceTypesServiceListResourceTypesRequest{})
	if err != nil {
		return err
	}

	err = ps.syncer.store.PutResourceTypes(ctx, resp.List...)
	if err != nil {
		return err
	}

	ps.syncer.counts.AddResourceTypes(len(resp.List))

	return nil
}

⚠️ Potential issue | 🟠 Major

syncResourceTypes doesn't handle pagination.

The method calls ListResourceTypes once without a page token and doesn't check for NextPageToken. If a connector returns more resource types than a single page, the parallel syncer will miss them while the sequential syncer handles this correctly via its pagination loop.

Suggested fix: Add pagination loop
 func (ps *parallelSyncer) syncResourceTypes(ctx context.Context) error {
 	ctx, span := parallelTracer.Start(ctx, "parallelSyncer.syncResourceTypes")
 	defer span.End()
 
-	resp, err := ps.syncer.connector.ListResourceTypes(ctx, &v2.ResourceTypesServiceListResourceTypesRequest{})
-	if err != nil {
-		return err
-	}
-
-	err = ps.syncer.store.PutResourceTypes(ctx, resp.List...)
-	if err != nil {
-		return err
-	}
-
-	ps.syncer.counts.AddResourceTypes(len(resp.List))
+	pageToken := ""
+	for {
+		resp, err := ps.syncer.connector.ListResourceTypes(ctx, &v2.ResourceTypesServiceListResourceTypesRequest{
+			PageToken: pageToken,
+		})
+		if err != nil {
+			return err
+		}
+
+		err = ps.syncer.store.PutResourceTypes(ctx, resp.List...)
+		if err != nil {
+			return err
+		}
+
+		ps.syncer.counts.AddResourceTypes(len(resp.List))
+
+		if resp.NextPageToken == "" {
+			break
+		}
+		pageToken = resp.NextPageToken
+	}
 
 	return nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change (current version, then suggested replacement):

// syncResourceTypes syncs resource types (equivalent to SyncResourceTypes).
func (ps *parallelSyncer) syncResourceTypes(ctx context.Context) error {
	ctx, span := parallelTracer.Start(ctx, "parallelSyncer.syncResourceTypes")
	defer span.End()
	// This replicates the exact logic from the original SyncResourceTypes
	resp, err := ps.syncer.connector.ListResourceTypes(ctx, &v2.ResourceTypesServiceListResourceTypesRequest{})
	if err != nil {
		return err
	}
	err = ps.syncer.store.PutResourceTypes(ctx, resp.List...)
	if err != nil {
		return err
	}
	ps.syncer.counts.AddResourceTypes(len(resp.List))
	return nil
}

// syncResourceTypes syncs resource types (equivalent to SyncResourceTypes).
func (ps *parallelSyncer) syncResourceTypes(ctx context.Context) error {
	ctx, span := parallelTracer.Start(ctx, "parallelSyncer.syncResourceTypes")
	defer span.End()
	pageToken := ""
	for {
		resp, err := ps.syncer.connector.ListResourceTypes(ctx, &v2.ResourceTypesServiceListResourceTypesRequest{
			PageToken: pageToken,
		})
		if err != nil {
			return err
		}
		err = ps.syncer.store.PutResourceTypes(ctx, resp.List...)
		if err != nil {
			return err
		}
		ps.syncer.counts.AddResourceTypes(len(resp.List))
		if resp.NextPageToken == "" {
			break
		}
		pageToken = resp.NextPageToken
	}
	return nil
}
🤖 Prompt for AI Agents
In @pkg/sync/parallel_syncer.go around lines 1218 - 1237, The syncResourceTypes
method currently calls ListResourceTypes only once and doesn't loop over
pagination, so additional pages (NextPageToken) are ignored; update
parallelSyncer.syncResourceTypes to mirror the sequential logic by repeatedly
calling ps.syncer.connector.ListResourceTypes with a request that sets PageToken
from the previous response until NextPageToken is empty, calling
ps.syncer.store.PutResourceTypes for each page (resp.List...), aggregating
counts via ps.syncer.counts.AddResourceTypes for each batch and returning any
errors from the connector or store as before.

Comment on lines +1913 to +1925
// shouldSkipEntitlementsAndGrants checks if entitlements and grants should be skipped for a resource.
func (ps *parallelSyncer) shouldSkipEntitlementsAndGrants(ctx context.Context, r *v2.Resource) (bool, error) {
	// This replicates the logic from the original shouldSkipEntitlementsAndGrants method
	// Check if the resource has the SkipEntitlementsAndGrants annotation

	for _, a := range r.Annotations {
		if a.MessageIs((*v2.SkipEntitlementsAndGrants)(nil)) {
			return true, nil
		}
	}

	return false, nil
}

⚠️ Potential issue | 🟠 Major

Skip-logic for entitlements/grants diverges from SequentialSyncer behavior.

This method only checks for SkipEntitlementsAndGrants annotation on the resource. The SequentialSyncer.shouldSkipEntitlementsAndGrants also checks:

  • state.ShouldSkipEntitlementsAndGrants() (global flag from CLI --skip-entitlements-and-grants)
  • Cached per-resource-type skip decisions from resource type annotations

This means parallel sync may process entitlements/grants that sequential sync would skip. Consider delegating to ps.syncer.shouldSkipEntitlementsAndGrants(ctx, r) for consistent behavior.

Suggested fix: Delegate to the existing method
 func (ps *parallelSyncer) shouldSkipEntitlementsAndGrants(ctx context.Context, r *v2.Resource) (bool, error) {
-	// This replicates the logic from the original shouldSkipEntitlementsAndGrants method
-	// Check if the resource has the SkipEntitlementsAndGrants annotation
-
-	for _, a := range r.Annotations {
-		if a.MessageIs((*v2.SkipEntitlementsAndGrants)(nil)) {
-			return true, nil
-		}
-	}
-
-	return false, nil
+	// Delegate to the base syncer to ensure consistent behavior
+	return ps.syncer.shouldSkipEntitlementsAndGrants(ctx, r)
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In @pkg/sync/parallel_syncer.go around lines 1913 - 1925, The
parallelSyncer.shouldSkipEntitlementsAndGrants implementation only inspects
resource annotations and thus diverges from SequentialSyncer behavior (it omits
checks like state.ShouldSkipEntitlementsAndGrants() and cached per-type
decisions); fix it by delegating to the existing sequential logic: call
ps.syncer.shouldSkipEntitlementsAndGrants(ctx, r) (or forward the call to
SequentialSyncer.shouldSkipEntitlementsAndGrants) so the global flag and cached
resource-type skip decisions are respected, returning whatever that method
returns.

Comment on lines +257 to +265
// SetSequentialMode enables/disables mutex protection for sequential sync.
// TODO: Remove this method and use the WithSequentialMode option instead.
func (p *ProgressLog) SetSequentialMode(sequential bool) {
	if sequential {
		p.mu = &noOpMutex{}
	} else {
		p.mu = &sync.RWMutex{}
	}
}

⚠️ Potential issue | 🟠 Major

Race condition when switching mutex modes at runtime.

SetSequentialMode replaces p.mu without synchronization. If called while other goroutines are actively using the mutex (e.g., via AddResources), this creates a data race. Since the TODO indicates this should be replaced with WithSequentialMode, consider removing this method or adding a warning that it must only be called before any concurrent access begins.

Suggested fix: Add documentation or remove the method
 // SetSequentialMode enables/disables mutex protection for sequential sync.
 // TODO: Remove this method and use the WithSequentialMode option instead.
+// WARNING: This method is NOT thread-safe. It must only be called before
+// any concurrent operations begin (e.g., during syncer initialization).
 func (p *ProgressLog) SetSequentialMode(sequential bool) {
 	if sequential {
 		p.mu = &noOpMutex{}
 	} else {
 		p.mu = &sync.RWMutex{}
 	}
 }
🤖 Prompt for AI Agents
In @pkg/sync/progresslog/progresslog.go around lines 257 - 265, The
SetSequentialMode method on ProgressLog is unsafe because it swaps p.mu at
runtime without synchronization and can race with callers like AddResources;
remove the SetSequentialMode method entirely and ensure callers use the
creation-time option WithSequentialMode to set the mutex behavior on ProgressLog
construction (ensure ProgressLog's constructor/factory accepts and applies
WithSequentialMode to initialize p.mu once), and update any call sites that used
SetSequentialMode to pass the option instead.
