Skip to content

Commit ce768cd

Browse files
authored
Merge pull request #74 from powersync-ja/fix-mutex
Fix hanging streams
2 parents a9ec391 + 306b6d8 commit ce768cd

File tree

7 files changed

+73
-37
lines changed

7 files changed

+73
-37
lines changed

.changeset/dull-foxes-explode.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/service-rsocket-router': patch
3+
'@powersync/service-core': patch
4+
---
5+
6+
Fix hanging streams

packages/rsocket-router/src/router/ReactiveSocketRouter.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,19 @@ export class ReactiveSocketRouter<C> {
8181
// RequestStream is currently the only supported connection type
8282
requestStream: (payload, initialN, responder) => {
8383
const observer = new SocketRouterObserver();
84+
const abortController = new AbortController();
8485

8586
// TODO: Consider limiting the number of active streams per connection to prevent abuse
86-
handleReactiveStream(context, { payload, initialN, responder }, observer, params).catch((ex) => {
87-
logger.error(ex);
88-
responder.onError(ex);
89-
responder.onComplete();
90-
});
87+
handleReactiveStream(context, { payload, initialN, responder }, observer, abortController, params).catch(
88+
(ex) => {
89+
logger.error(ex);
90+
responder.onError(ex);
91+
responder.onComplete();
92+
}
93+
);
9194
return {
9295
cancel: () => {
93-
observer.triggerCancel();
96+
abortController.abort();
9497
},
9598
onExtension: () => observer.triggerExtension(),
9699
request: (n) => observer.triggerRequest(n)
@@ -118,6 +121,7 @@ export async function handleReactiveStream<Context>(
118121
responder: SocketResponder;
119122
},
120123
observer: SocketRouterObserver,
124+
abortController: AbortController,
121125
params: CommonParams<Context>
122126
) {
123127
const { payload, responder, initialN } = request;
@@ -154,7 +158,13 @@ export async function handleReactiveStream<Context>(
154158
}
155159

156160
if (authorize) {
157-
const isAuthorized = await authorize({ params: requestPayload, context, observer, responder });
161+
const isAuthorized = await authorize({
162+
params: requestPayload,
163+
context,
164+
observer,
165+
signal: abortController.signal,
166+
responder
167+
});
158168
if (!isAuthorized.authorized) {
159169
return exitWithError(new errors.AuthorizationError(isAuthorized.errors));
160170
}
@@ -165,6 +175,7 @@ export async function handleReactiveStream<Context>(
165175
params: requestPayload,
166176
context,
167177
observer,
178+
signal: abortController.signal,
168179
responder,
169180
initialN
170181
});

packages/rsocket-router/src/router/SocketRouterListener.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
import { BaseObserver } from '../utils/BaseObserver.js';
22

33
export interface SocketRouterListener {
4-
cancel: () => void;
54
onExtension: () => void;
65
request: (n: number) => void;
76
}
87

98
export class SocketRouterObserver extends BaseObserver<SocketRouterListener> {
10-
triggerCancel() {
11-
this.iterateListeners((l) => l.cancel?.());
12-
}
13-
149
triggerExtension() {
1510
this.iterateListeners((l) => l.onExtension?.());
1611
}

packages/rsocket-router/src/router/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export type SocketResponder = OnTerminalSubscriber & OnNextSubscriber & OnExtens
2525
export type CommonStreamPayload = {
2626
observer: SocketRouterObserver;
2727
responder: SocketResponder;
28+
signal: AbortSignal;
2829
};
2930

3031
export type ReactiveStreamPayload<O> = CommonStreamPayload & {

packages/rsocket-router/tests/src/requests.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ async function handleRoute(path: string, endpoints: ReactiveEndpoint[], responde
2323
responder
2424
},
2525
createMockObserver(),
26+
new AbortController(),
2627
{
2728
contextProvider: async () => ({}),
2829
endpoints,

packages/service-core/src/routes/endpoints/socket-route.ts

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,26 @@ import { SyncRoutes } from './sync-stream.js';
1111
export const syncStreamReactive: SocketRouteGenerator = (router) =>
1212
router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
1313
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
14-
handler: async ({ context, params, responder, observer, initialN }) => {
14+
handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal }) => {
1515
const { system } = context;
1616

17+
// Create our own controller that we can abort directly
18+
const controller = new AbortController();
19+
upstreamSignal.addEventListener('abort', () => {
20+
controller.abort();
21+
});
22+
if (upstreamSignal.aborted) {
23+
controller.abort();
24+
}
25+
const signal = controller.signal;
26+
27+
let requestedN = initialN;
28+
const disposer = observer.registerListener({
29+
request(n) {
30+
requestedN += n;
31+
}
32+
});
33+
1734
if (system.closed) {
1835
responder.onError(
1936
new errors.JourneyError({
@@ -26,8 +43,6 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
2643
return;
2744
}
2845

29-
const controller = new AbortController();
30-
3146
const syncParams = new RequestParameters(context.token_payload!, params.parameters ?? {});
3247

3348
const storage = system.storage;
@@ -45,18 +60,8 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
4560
return;
4661
}
4762

48-
let requestedN = initialN;
49-
const disposer = observer.registerListener({
50-
request(n) {
51-
requestedN += n;
52-
},
53-
cancel: () => {
54-
controller.abort();
55-
}
56-
});
57-
5863
const removeStopHandler = system.addStopHandler(() => {
59-
observer.triggerCancel();
64+
controller.abort();
6065
});
6166

6267
Metrics.getInstance().concurrent_connections.add(1);
@@ -75,8 +80,11 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
7580
keep_alive: false
7681
},
7782
tracker,
78-
signal: controller.signal
83+
signal
7984
})) {
85+
if (signal.aborted) {
86+
break;
87+
}
8088
if (data == null) {
8189
// Empty value just to flush iterator memory
8290
continue;
@@ -93,22 +101,25 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
93101
tracker.addDataSynced(serialized.length);
94102
}
95103

96-
if (requestedN <= 0) {
104+
if (requestedN <= 0 && !signal.aborted) {
97105
await new Promise<void>((resolve) => {
98106
const l = observer.registerListener({
99107
request() {
100108
if (requestedN > 0) {
101109
// Management of updating the total requested items is done above
102110
resolve();
103111
l();
112+
signal.removeEventListener('abort', onAbort);
104113
}
105-
},
106-
cancel: () => {
107-
// Don't wait here if the request is cancelled
108-
resolve();
109-
l();
110114
}
111115
});
116+
const onAbort = () => {
117+
// Don't wait here if the request is cancelled
118+
resolve();
119+
l();
120+
signal.removeEventListener('abort', onAbort);
121+
};
122+
signal.addEventListener('abort', onAbort);
112123
});
113124
}
114125
}

packages/service-core/src/sync/sync.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,34 @@
11
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
22
import { RequestParameters } from '@powersync/service-sync-rules';
3-
import { Semaphore } from 'async-mutex';
3+
import { Semaphore, withTimeout } from 'async-mutex';
44
import { AbortError } from 'ix/aborterror.js';
55

66
import * as auth from '../auth/auth-index.js';
77
import * as storage from '../storage/storage-index.js';
88
import * as util from '../util/util-index.js';
99

1010
import { logger } from '@powersync/lib-services-framework';
11-
import { Metrics } from '../metrics/Metrics.js';
1211
import { mergeAsyncIterables } from './merge.js';
13-
import { TokenStreamOptions, tokenStream } from './util.js';
1412
import { RequestTracker } from './RequestTracker.js';
13+
import { TokenStreamOptions, tokenStream } from './util.js';
1514

1615
/**
1716
* Maximum number of connections actively fetching data.
1817
*/
1918
const MAX_ACTIVE_CONNECTIONS = 10;
20-
const syncSemaphore = new Semaphore(MAX_ACTIVE_CONNECTIONS);
19+
20+
/**
21+
* Maximum duration to wait for the mutex to become available.
22+
*
23+
* This gives an explicit error if there are mutex issues, rather than just hanging.
24+
*/
25+
const MUTEX_ACQUIRE_TIMEOUT = 30_000;
26+
27+
const syncSemaphore = withTimeout(
28+
new Semaphore(MAX_ACTIVE_CONNECTIONS),
29+
MUTEX_ACQUIRE_TIMEOUT,
30+
new Error(`Timeout while waiting for data`)
31+
);
2132

2233
export interface SyncStreamParameters {
2334
storage: storage.BucketStorageFactory;

0 commit comments

Comments
 (0)