Skip to content

Commit 4aaa2a3

Browse files
committed
Expose API to compact specific buckets.
1 parent bc9975d commit 4aaa2a3

File tree

2 files changed

+24
-2
lines changed

2 files changed

+24
-2
lines changed

packages/service-core/src/storage/BucketStorage.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,4 +426,11 @@ export interface CompactOptions {
426426
* not be compacted, to avoid invalidating checkpoints in use.
427427
*/
428428
maxOpId?: bigint;
429+
430+
/**
431+
* If specified, compact only the specific buckets.
432+
*
433+
* If not specified, compacts all buckets.
434+
*/
435+
compactBuckets?: string[];
429436
}

packages/service-core/src/storage/mongo/MongoCompactor.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,15 @@ export class MongoCompactor {
5555
private moveBatchQueryLimit: number;
5656
private clearBatchLimit: number;
5757
private maxOpId: bigint | undefined;
58+
private buckets: string[] | undefined;
5859

5960
constructor(private db: PowerSyncMongo, private group_id: number, options?: MongoCompactOptions) {
6061
this.idLimitBytes = (options?.memoryLimitMB ?? DEFAULT_MEMORY_LIMIT_MB) * 1024 * 1024;
6162
this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
6263
this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
6364
this.clearBatchLimit = options?.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
6465
this.maxOpId = options?.maxOpId;
66+
this.buckets = options?.compactBuckets;
6567
}
6668

6769
/**
@@ -70,21 +72,34 @@ export class MongoCompactor {
7072
* See /docs/compacting-operations.md for details.
7173
*/
7274
async compact() {
75+
if (this.buckets) {
76+
for (let bucket of this.buckets) {
77+
// We can make this more efficient later on by iterating
78+
// through the buckets in a single query.
79+
// That makes batching more tricky, so we leave for later.
80+
await this.compactInternal(bucket);
81+
}
82+
} else {
83+
await this.compactInternal(undefined);
84+
}
85+
}
86+
87+
async compactInternal(bucket: string | undefined) {
7388
const idLimitBytes = this.idLimitBytes;
7489

7590
let currentState: CurrentBucketState | null = null;
7691

7792
// Constant lower bound
7893
const lowerBound: BucketDataKey = {
7994
g: this.group_id,
80-
b: new MinKey() as any,
95+
b: bucket ?? (new MinKey() as any),
8196
o: new MinKey() as any
8297
};
8398

8499
// Upper bound is adjusted for each batch
85100
let upperBound: BucketDataKey = {
86101
g: this.group_id,
87-
b: new MaxKey() as any,
102+
b: bucket ?? (new MaxKey() as any),
88103
o: new MaxKey() as any
89104
};
90105

0 commit comments

Comments
 (0)