diff --git a/.changeset/kind-squids-rest.md b/.changeset/kind-squids-rest.md new file mode 100644 index 000000000..13539490b --- /dev/null +++ b/.changeset/kind-squids-rest.md @@ -0,0 +1,6 @@ +--- +"@tanstack/db-ivm": patch +"@tanstack/db": patch +--- + +Add `groupByKey` and `groupKeyFn` options to `orderByWithFractionalIndex` and `topKWithFractionalIndex`. This is groundwork for hierarchical “includes” projections in TanStack DB, where child collections need to enforce limits within each parent’s slice of the stream rather than across the entire dataset. ([Issue #288](https://github.com/TanStack/db/issues/288)) diff --git a/packages/db-ivm/src/operators/orderBy.ts b/packages/db-ivm/src/operators/orderBy.ts index ce399b3bc..0ae152be1 100644 --- a/packages/db-ivm/src/operators/orderBy.ts +++ b/packages/db-ivm/src/operators/orderBy.ts @@ -11,11 +11,17 @@ export interface OrderByOptions { offset?: number } -type OrderByWithFractionalIndexOptions = OrderByOptions & { +type OrderByWithFractionalIndexOptions< + Ve, + KeyType = unknown, + ValueType = unknown, +> = OrderByOptions & { setSizeCallback?: (getSize: () => number) => void setWindowFn?: ( windowFn: (options: { offset?: number; limit?: number }) => void ) => void + groupByKey?: boolean + groupKeyFn?: (key: KeyType, value: ValueType) => unknown } /** @@ -142,7 +148,11 @@ export function orderByWithFractionalIndexBase< valueExtractor: ( value: T extends KeyValue ? V : never ) => Ve, - options?: OrderByWithFractionalIndexOptions + options?: OrderByWithFractionalIndexOptions< + Ve, + T extends KeyValue ? K : never, + T extends KeyValue ? V : never + > ) { type KeyType = T extends KeyValue ? K : never type ValueType = T extends KeyValue ? V : never @@ -160,6 +170,19 @@ export function orderByWithFractionalIndexBase< return 1 }) + type GroupKeyFn = (key: KeyType, value: ValueType) => unknown + const shouldGroupByKey = + options?.groupKeyFn !== undefined ? true : (options?.groupByKey ?? true) + + const resolvedGroupKeyFn: GroupKeyFn | undefined = + options?.groupKeyFn ?? + (shouldGroupByKey + ? (((key: KeyType) => + Array.isArray(key) + ? (key as unknown as Array)[0] + : key) as GroupKeyFn) + : undefined) + return ( stream: IStreamBuilder ): IStreamBuilder<[KeyType, [ValueType, string]]> => { @@ -172,6 +195,7 @@ export function orderByWithFractionalIndexBase< offset, setSizeCallback, setWindowFn, + groupKeyFn: resolvedGroupKeyFn, } ), consolidate() diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index 858503f6f..f35273d5a 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -11,13 +11,14 @@ import type { HRange } from "../utils.js" import type { DifferenceStreamReader } from "../graph.js" import type { IStreamBuilder, PipedOperator } from "../types.js" -export interface TopKWithFractionalIndexOptions { +export interface TopKWithFractionalIndexOptions { limit?: number offset?: number setSizeCallback?: (getSize: () => number) => void setWindowFn?: ( windowFn: (options: { offset?: number; limit?: number }) => void ) => void + groupKeyFn?: (key: K, value: V) => unknown } export type TopKChanges = { @@ -36,6 +37,13 @@ export type TopKMoveChanges = { moveOuts: Array> } +const DEFAULT_GROUP = Symbol(`topk-default-group`) + +type GroupState = { + multiplicities: Map + topK: TopK> +} + /** * A topK data structure that supports insertions and deletions * and returns changes to the topK. @@ -243,40 +251,38 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< [K, T], [K, IndexedValue] > { - #index: Map = new Map() // maps keys to their multiplicity - - /** - * topK data structure that supports insertions and deletions - * and returns changes to the topK. - */ - #topK: TopK> + #groupStates: Map> = new Map() + #groupKeyFn?: (key: K, value: T) => unknown + #compareTaggedValues: (a: TaggedValue, b: TaggedValue) => number + #offset: number + #limit: number constructor( id: number, inputA: DifferenceStreamReader<[K, T]>, output: DifferenceStreamWriter<[K, IndexedValue]>, comparator: (a: T, b: T) => number, - options: TopKWithFractionalIndexOptions + options: TopKWithFractionalIndexOptions ) { super(id, inputA, output) - const limit = options.limit ?? Infinity - const offset = options.offset ?? 0 - const compareTaggedValues = ( + this.#groupKeyFn = options.groupKeyFn + this.#limit = options.limit ?? Infinity + this.#offset = options.offset ?? 0 + + this.#compareTaggedValues = ( a: TaggedValue, b: TaggedValue ) => { - // First compare on the value const valueComparison = comparator(getVal(a), getVal(b)) if (valueComparison !== 0) { return valueComparison } - // If the values are equal, compare on the tag (object identity) const tieBreakerA = getTag(a) const tieBreakerB = getTag(b) return tieBreakerA - tieBreakerB } - this.#topK = this.createTopK(offset, limit, compareTaggedValues) - options.setSizeCallback?.(() => this.#topK.size) + + options.setSizeCallback?.(() => this.getTotalSize()) options.setWindowFn?.(this.moveTopK.bind(this)) } @@ -288,28 +294,92 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< return new TopKArray(offset, limit, comparator) } - /** - * Moves the topK window based on the provided offset and limit. - * Any changes to the topK are sent to the output. - */ + private getTotalSize(): number { + let size = 0 + for (const state of this.#groupStates.values()) { + size += state.topK.size + } + return size + } + + private resolveGroupKey(key: K, value: T): unknown { + return this.#groupKeyFn ? this.#groupKeyFn(key, value) : DEFAULT_GROUP + } + + private getOrCreateGroupState(groupKey: unknown): GroupState { + let state = this.#groupStates.get(groupKey) + if (!state) { + state = { + multiplicities: new Map(), + topK: this.createTopK( + this.#offset, + this.#limit, + this.#compareTaggedValues + ), + } + this.#groupStates.set(groupKey, state) + } + return state + } + + private updateMultiplicity( + state: GroupState, + key: K, + multiplicity: number + ): { oldMultiplicity: number; newMultiplicity: number } { + if (multiplicity === 0) { + const current = state.multiplicities.get(key) ?? 0 + return { oldMultiplicity: current, newMultiplicity: current } + } + + const oldMultiplicity = state.multiplicities.get(key) ?? 0 + const newMultiplicity = oldMultiplicity + multiplicity + if (newMultiplicity === 0) { + state.multiplicities.delete(key) + } else { + state.multiplicities.set(key, newMultiplicity) + } + return { oldMultiplicity, newMultiplicity } + } + + private cleanupGroupIfEmpty(groupKey: unknown, state: GroupState) { + if (state.multiplicities.size === 0 && state.topK.size === 0) { + this.#groupStates.delete(groupKey) + } + } + moveTopK({ offset, limit }: { offset?: number; limit?: number }) { - if (!(this.#topK instanceof TopKArray)) { - throw new Error( - `Cannot move B+-tree implementation of TopK with fractional index` - ) + if (offset !== undefined) { + this.#offset = offset + } + if (limit !== undefined) { + this.#limit = limit } const result: Array<[[K, IndexedValue], number]> = [] + let hasChanges = false - const diff = this.#topK.move({ offset, limit }) + for (const state of this.#groupStates.values()) { + if (!(state.topK instanceof TopKArray)) { + throw new Error( + `Cannot move B+-tree implementation of TopK with fractional index` + ) + } + + const diff = state.topK.move({ + offset: this.#offset, + limit: this.#limit, + }) - diff.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result)) - diff.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result)) + diff.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result)) + diff.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result)) - if (diff.changes) { - // There are changes to the topK - // it could be that moveIns and moveOuts are empty - // because the collection is lazy, so we will run the graph again to load the data + if (diff.changes) { + hasChanges = true + } + } + + if (hasChanges) { this.output.sendData(new MultiSet(result)) } } @@ -334,32 +404,31 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< multiplicity: number, result: Array<[[K, IndexedValue], number]> ): void { - const { oldMultiplicity, newMultiplicity } = this.addKey(key, multiplicity) + const groupKey = this.resolveGroupKey(key, value) + const state = this.getOrCreateGroupState(groupKey) + + const { oldMultiplicity, newMultiplicity } = this.updateMultiplicity( + state, + key, + multiplicity + ) let res: TopKChanges> = { moveIn: null, moveOut: null, } if (oldMultiplicity <= 0 && newMultiplicity > 0) { - // The value was invisible but should now be visible - // Need to insert it into the array of sorted values const taggedValue = tagValue(key, value) - res = this.#topK.insert(taggedValue) + res = state.topK.insert(taggedValue) } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { - // The value was visible but should now be invisible - // Need to remove it from the array of sorted values const taggedValue = tagValue(key, value) - res = this.#topK.delete(taggedValue) - } else { - // The value was invisible and it remains invisible - // or it was visible and remains visible - // so it doesn't affect the topK + res = state.topK.delete(taggedValue) } this.handleMoveIn(res.moveIn, result) this.handleMoveOut(res.moveOut, result) - return + this.cleanupGroupIfEmpty(groupKey, state) } private handleMoveIn( @@ -387,24 +456,6 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< result.push([[k, [val, index]], -1]) } } - - private getMultiplicity(key: K): number { - return this.#index.get(key) ?? 0 - } - - private addKey( - key: K, - multiplicity: number - ): { oldMultiplicity: number; newMultiplicity: number } { - const oldMultiplicity = this.getMultiplicity(key) - const newMultiplicity = oldMultiplicity + multiplicity - if (newMultiplicity === 0) { - this.#index.delete(key) - } else { - this.#index.set(key, newMultiplicity) - } - return { oldMultiplicity, newMultiplicity } - } } /** @@ -419,9 +470,9 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< */ export function topKWithFractionalIndex( comparator: (a: T, b: T) => number, - options?: TopKWithFractionalIndexOptions + options?: TopKWithFractionalIndexOptions ): PipedOperator<[KType, T], [KType, IndexedValue]> { - const opts = options || {} + const opts: TopKWithFractionalIndexOptions = options ?? {} return ( stream: IStreamBuilder<[KType, T]> diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts index 7b094d7c4..218c866d4 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts @@ -277,9 +277,9 @@ export class TopKWithFractionalIndexBTreeOperator< */ export function topKWithFractionalIndexBTree( comparator: (a: T, b: T) => number, - options?: TopKWithFractionalIndexOptions + options?: TopKWithFractionalIndexOptions ): PipedOperator<[KType, T], [KType, IndexedValue]> { - const opts = options || {} + const opts: TopKWithFractionalIndexOptions = options ?? {} if (BTree === undefined) { throw new Error( diff --git a/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts index 5cf574757..c38567593 100644 --- a/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/orderByWithFractionalIndex.test.ts @@ -43,7 +43,7 @@ describe(`Operators`, () => { let latestMessage: any = null input.pipe( - orderBy((item) => item.value), + orderBy((item) => item.value, { groupByKey: false }), output((message) => { latestMessage = message }) @@ -93,6 +93,7 @@ describe(`Operators`, () => { input.pipe( orderBy((item) => item.value, { comparator: (a, b) => b.localeCompare(a), // reverse order + groupByKey: false, }), output((message) => { latestMessage = message @@ -141,7 +142,7 @@ describe(`Operators`, () => { let latestMessage: any = null input.pipe( - orderBy((item) => item.value, { limit: 3 }), + orderBy((item) => item.value, { limit: 3, groupByKey: false }), output((message) => { latestMessage = message }) @@ -190,6 +191,7 @@ describe(`Operators`, () => { orderBy((item) => item.value, { limit: 2, offset: 2, + groupByKey: false, }), output((message) => { latestMessage = message @@ -221,6 +223,157 @@ describe(`Operators`, () => { ]) }) + test(`per-key limits applied by default`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + [string, string], + { + id: string + value: number + } + > + >() + let latestMessage: any = null + + input.pipe( + orderBy((item) => item.value, { limit: 2 }), + output((message) => { + latestMessage = message + }) + ) + + graph.finalize() + + input.sendData( + new MultiSet([ + [[[`group1`, `a`], { id: `g1-a`, value: 5 }], 1], + [[[`group1`, `b`], { id: `g1-b`, value: 1 }], 1], + [[[`group1`, `c`], { id: `g1-c`, value: 3 }], 1], + [[[`group2`, `a`], { id: `g2-a`, value: 4 }], 1], + [[[`group2`, `b`], { id: `g2-b`, value: 2 }], 1], + [[[`group2`, `c`], { id: `g2-c`, value: 6 }], 1], + ]) + ) + + graph.run() + + expect(latestMessage).not.toBeNull() + + const result = latestMessage.getInner() + const groupedValues = new Map>() + + for (const [[key, [value]], multiplicity] of result) { + if (multiplicity !== 1) continue + const group = Array.isArray(key) + ? (key as Array)[0]! + : String(key) + const list = groupedValues.get(group) ?? [] + list.push((value as { value: number }).value) + groupedValues.set(group, list) + } + + for (const [group, values] of groupedValues) { + values.sort((a, b) => a - b) + groupedValues.set(group, values) + } + + expect(groupedValues.get(`group1`)).toEqual([1, 3]) + expect(groupedValues.get(`group2`)).toEqual([2, 4]) + }) + + test(`per-key limits stay correct through incremental updates`, () => { + const graph = new D2() + const input = graph.newInput< + KeyValue< + [string, string], + { + id: string + value: number + } + > + >() + const tracker = new MessageTracker< + [[string, string], [{ id: string; value: number }, string]] + >() + + input.pipe( + orderBy((item) => item.value, { limit: 2 }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + const seedData: Array< + [[[string, string], { id: string; value: number }], number] + > = [ + [[[`group1`, `a`], { id: `g1-a`, value: 5 }], 1], + [[[`group1`, `b`], { id: `g1-b`, value: 1 }], 1], + [[[`group1`, `c`], { id: `g1-c`, value: 3 }], 1], + [[[`group2`, `a`], { id: `g2-a`, value: 4 }], 1], + [[[`group2`, `b`], { id: `g2-b`, value: 2 }], 1], + [[[`group2`, `c`], { id: `g2-c`, value: 6 }], 1], + ] + + const collectStateFromResults = ( + entries: Array< + [[string, string], [{ id: string; value: number }, string]] + > + ) => { + const grouped = new Map>() + for (const [keyLike, [value]] of entries) { + const group = Array.isArray(keyLike) ? keyLike[0] : String(keyLike) + const list = grouped.get(group) ?? [] + list.push(value.value) + grouped.set(group, list) + } + for (const [group, values] of grouped) { + values.sort((a, b) => a - b) + grouped.set(group, values) + } + return grouped + } + + // Seed the data + input.sendData(new MultiSet(seedData)) + graph.run() + + const initial = tracker.getResult(compareFractionalIndex) + let messageOffset = initial.messages.length + let currentGroups = collectStateFromResults(initial.sortedResults) + expect(currentGroups.get(`group1`)).toEqual([1, 3]) + expect(currentGroups.get(`group2`)).toEqual([2, 4]) + + // Insert a better value into group1 - should evict the previous second entry + input.sendData( + new MultiSet([[[[`group1`, `d`], { id: `g1-d`, value: 0 }], 1]]) + ) + graph.run() + + const afterInsert = tracker.getResult(compareFractionalIndex) + const group1Delta = afterInsert.messages.slice(messageOffset) + messageOffset = afterInsert.messages.length + + const group1Messages = sortByKeyAndIndex( + group1Delta as Array< + [[[string, string], [{ id: string; value: number }, string]], number] + > + ).map(stripFractionalIndex) + expect(group1Messages).toHaveLength(2) + expect(group1Messages).toEqual( + expect.arrayContaining([ + [[`group1`, `c`], { id: `g1-c`, value: 3 }, -1], + [[`group1`, `d`], { id: `g1-d`, value: 0 }, 1], + ]) + ) + + currentGroups = collectStateFromResults(afterInsert.sortedResults) + expect(currentGroups.get(`group1`)).toEqual([0, 1]) + expect(currentGroups.get(`group2`)).toEqual([2, 4]) + }) + test(`ordering by numeric property`, () => { const graph = new D2() const input = graph.newInput< @@ -235,7 +388,7 @@ describe(`Operators`, () => { let latestMessage: any = null input.pipe( - orderBy((item) => item.id), + orderBy((item) => item.id, { groupByKey: false }), output((message) => { latestMessage = message }) @@ -283,7 +436,7 @@ describe(`Operators`, () => { let latestMessage: any = null input.pipe( - orderBy((item) => item.value, { limit: 3 }), + orderBy((item) => item.value, { limit: 3, groupByKey: false }), output((message) => { latestMessage = message }) @@ -349,7 +502,7 @@ describe(`Operators`, () => { >() input.pipe( - orderBy((item) => item.value, { limit: 3 }), + orderBy((item) => item.value, { limit: 3, groupByKey: false }), output((message) => { tracker.addMessage(message) }) @@ -421,7 +574,7 @@ describe(`Operators`, () => { >() input.pipe( - orderBy((item) => item.value, { limit: 3 }), + orderBy((item) => item.value, { limit: 3, groupByKey: false }), output((message) => { tracker.addMessage(message) }) @@ -504,6 +657,7 @@ describe(`Operators`, () => { orderByWithFractionalIndex((item) => item.value, { limit: 3, offset: 0, + groupByKey: false, setWindowFn: (fn) => { windowFn = fn }, @@ -578,6 +732,7 @@ describe(`Operators`, () => { orderByWithFractionalIndex((item) => item.value, { limit: 3, offset: 3, + groupByKey: false, setWindowFn: (fn) => { windowFn = fn }, @@ -652,6 +807,7 @@ describe(`Operators`, () => { orderByWithFractionalIndex((item) => item.value, { limit: 2, offset: 0, + groupByKey: false, setWindowFn: (fn) => { windowFn = fn }, @@ -741,6 +897,7 @@ describe(`Operators`, () => { orderByWithFractionalIndex((item) => item.value, { limit: 2, offset: 1, + groupByKey: false, setWindowFn: (fn) => { windowFn = fn }, @@ -820,6 +977,7 @@ describe(`Operators`, () => { orderByWithFractionalIndex((item) => item.value, { limit: 2, offset: 0, + groupByKey: false, setWindowFn: (fn) => { windowFn = fn }, @@ -912,15 +1070,13 @@ function sortByKeyAndIndex(results: Array) { ( [[aKey, [_aValue, _aIndex]], _aMultiplicity], [[bKey, [_bValue, _bIndex]], _bMultiplicity] - ) => aKey - bKey + ) => String(aKey).localeCompare(String(bKey)) ) .sort( ( [[_aKey, [_aValue, aIndex]], _aMultiplicity], [[_bKey, [_bValue, bIndex]], _bMultiplicity] ) => { - // lexically compare the index - // return aIndex.localeCompare(bIndex) return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 } ) diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index 679dceeda..6aaa4e2db 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -197,6 +197,7 @@ export function processOrderBy( limit, offset, comparator: compare, + groupByKey: false, setSizeCallback, setWindowFn: ( windowFn: (options: { offset?: number; limit?: number }) => void