Skip to content

Commit 0156d10

Browse files
authored
Streams: Support subqueries with multiple parameters (#416)
1 parent 57f7660 commit 0156d10

File tree

4 files changed

+177
-9
lines changed

4 files changed

+177
-9
lines changed

.changeset/old-buckets-lay.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-sync-rules': patch
3+
---
4+
5+
Fix compiling streams with multiple parameter matchers in a subquery.

packages/sync-rules/src/streams/from_sql.ts

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -407,18 +407,63 @@ class SyncStreamCompiler {
407407
tools.error('This subquery must return exactly one column', query);
408408
}
409409

410-
const column = tools.compileRowValueExtractor(query.columns?.[0]?.expr);
411-
if (isClauseError(column)) {
410+
const columnOrError = tools.compileRowValueExtractor(query.columns?.[0]?.expr);
411+
if (isClauseError(columnOrError)) {
412412
return;
413413
}
414+
const column = columnOrError;
414415

415-
const where = tools.compileClause(query.where);
416+
const where = this.whereClauseToFilters(tools, query.where);
417+
const filter = where.toDisjunctiveNormalForm(tools);
416418

419+
function checkValidSubqueryFilter(
420+
operator: FilterOperator
421+
): CompareRowValueWithStreamParameter | EvaluateSimpleCondition | null {
422+
if (operator instanceof CompareRowValueWithStreamParameter || operator instanceof EvaluateSimpleCondition) {
423+
return operator;
424+
}
425+
426+
tools.error('Unsupported condition for stream subqueries', operator.location ?? undefined);
427+
return null;
428+
}
429+
430+
function constructSubquery(filter: FilterOperator) {
431+
if (filter instanceof Or) {
432+
// Subqueries can't have variants, so the DNF must be a single conjunction.
433+
if (filter.inner.length != 1) {
434+
tools.error("Stream subqueries can't use OR filters", filter.location ?? undefined);
435+
}
436+
437+
return constructSubquery(filter.inner[0]);
438+
} else if (filter instanceof And) {
439+
const first = checkValidSubqueryFilter(filter.inner[0]);
440+
if (!first) {
441+
return;
442+
}
443+
const subquery = new Subquery(sourceTable, column, first);
444+
for (const rest of filter.inner.slice(1)) {
445+
const checked = checkValidSubqueryFilter(rest);
446+
if (checked) {
447+
subquery.addFilter(checked);
448+
}
449+
}
450+
451+
return subquery;
452+
} else {
453+
const validated = checkValidSubqueryFilter(filter);
454+
if (validated) {
455+
return new Subquery(sourceTable, column, validated);
456+
}
457+
}
458+
}
459+
460+
const compiledSubquery = constructSubquery(filter);
417461
this.errors.push(...tools.errors);
418-
return [
419-
new Subquery(sourceTable, column, this.compiledClauseToFilter(tools, query.where?._location, where)),
420-
tools
421-
];
462+
463+
if (!compiledSubquery) {
464+
return;
465+
}
466+
return [compiledSubquery, tools];
422467
}
423468

424469
private checkValidSelectStatement(stmt: Statement) {

packages/sync-rules/src/streams/variant.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,10 @@ export class StreamVariant {
214214
return [];
215215
}
216216

217-
// This will be an array of values (i.e. a total evaluation) because there are no dynamic parameters.
218-
return this.partiallyEvaluateParameters(params) as SqliteJsonValue[][];
217+
return this.cartesianProductOfParameterInstantiations(
218+
// This will be an array of values (i.e. a total evaluation) because there are no dynamic parameters.
219+
this.partiallyEvaluateParameters(params) as SqliteJsonValue[][]
220+
);
219221
}
220222

221223
/**

packages/sync-rules/test/src/streams.test.ts

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,33 @@ describe('streams', () => {
511511
]);
512512
});
513513

514+
test('OR in subquery', () => {
515+
const [_, errors] = syncStreamFromSql(
516+
's',
517+
`select * from comments where issue_id in (select id from issues where owner_id = auth.user_id() or name = 'test')`,
518+
options
519+
);
520+
521+
expect(errors).toMatchObject([
522+
expect.toBeSqlRuleError(`Stream subqueries can't use OR filters`, `owner_id = auth.user_id() or name = 'test'`)
523+
]);
524+
});
525+
526+
test('nested subqueries', () => {
527+
const [_, errors] = syncStreamFromSql(
528+
's',
529+
`select * from comments where issue_id in (select id from issues where owner_id in (select id from users where is_admin))`,
530+
options
531+
);
532+
533+
expect(errors).toMatchObject([
534+
expect.toBeSqlRuleError(
535+
`Unsupported condition for stream subqueries`,
536+
`owner_id in (select id from users where is_admin`
537+
)
538+
]);
539+
});
540+
514541
test('subquery with two columns', () => {
515542
const [_, errors] = syncStreamFromSql(
516543
's',
@@ -719,6 +746,95 @@ describe('streams', () => {
719746
stream.resolveResultSets(schema, outputSchema);
720747
expect(Object.keys(outputSchema)).toStrictEqual(['outer']);
721748
});
749+
750+
test('multiple matchers in subquery', async () => {
751+
// https://discord.com/channels/1138230179878154300/1422138173907144724/1443338137660031117
752+
const scene = new TestSourceTable('Scene');
753+
const projectInvitation = new TestSourceTable('ProjectInvitation');
754+
const schema = new StaticSchema([
755+
{
756+
tag: DEFAULT_TAG,
757+
schemas: [
758+
{
759+
name: 'test_schema',
760+
tables: [
761+
{
762+
name: 'Scene',
763+
columns: [
764+
{ name: '_id', pg_type: 'uuid' },
765+
{ name: 'project', pg_type: 'uuid' }
766+
]
767+
},
768+
{
769+
name: 'ProjectInvitation',
770+
columns: [
771+
{ name: 'project', pg_type: 'uuid' },
772+
{ name: 'appliedTo', pg_type: 'text' },
773+
{ name: 'appliedTo', pg_type: 'text' },
774+
{ name: 'status', pg_type: 'text' }
775+
]
776+
}
777+
]
778+
}
779+
]
780+
}
781+
]);
782+
783+
const desc = parseStream(
784+
`SELECT _id as id, *
785+
FROM "Scene"
786+
WHERE
787+
project IN (
788+
SELECT project
789+
FROM "ProjectInvitation"
790+
WHERE "appliedTo" != ''
791+
AND (auth.parameters() ->> 'haystack_id') IN "appliedTo"
792+
AND project = subscription.parameter('project')
793+
AND "status" = 'CLAIMED'
794+
)
795+
`,
796+
'stream',
797+
{ ...options, schema }
798+
);
799+
800+
expect(evaluateBucketIds(desc, scene, { _id: 'scene', project: 'foo' })).toStrictEqual(['1#stream|0["foo"]']);
801+
802+
expect(
803+
desc.evaluateParameterRow(projectInvitation, {
804+
project: 'foo',
805+
appliedTo: '[1,2]',
806+
status: 'CLAIMED'
807+
})
808+
).toStrictEqual([
809+
{
810+
lookup: ParameterLookup.normalized('stream', '0', [1n, 'foo']),
811+
bucketParameters: [
812+
{
813+
result: 'foo'
814+
}
815+
]
816+
},
817+
{
818+
lookup: ParameterLookup.normalized('stream', '0', [2n, 'foo']),
819+
bucketParameters: [
820+
{
821+
result: 'foo'
822+
}
823+
]
824+
}
825+
]);
826+
827+
expect(
828+
await queryBucketIds(desc, {
829+
token: { sub: 'user1', haystack_id: 1 },
830+
parameters: { project: 'foo' },
831+
getParameterSets(lookups) {
832+
expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', [1n, 'foo'])]);
833+
return [{ result: 'foo' }];
834+
}
835+
})
836+
).toStrictEqual(['1#stream|0["foo"]']);
837+
});
722838
});
723839
});
724840

0 commit comments

Comments
 (0)