Skip to content

Commit 6eee0b3

Browse files
feat: Sync Stream Metadata Logs (#413)
* add app metadata * rename variable * Improve truncation. Increase limits. Log at sync stream start and end. * remove from websocket general meta * cleanup * remove streams for now * changeset typo * Code cleanup * explicit client params in test * update logging limits
1 parent 0156d10 commit 6eee0b3

File tree

7 files changed

+192
-14
lines changed

7 files changed

+192
-14
lines changed

.changeset/seven-chefs-promise.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-core': minor
3+
---
4+
5+
Added ability to specify app_metadata for sync/stream requests

packages/service-core/src/routes/endpoints/socket-route.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { SocketRouteGenerator } from '../router-socket.js';
66
import { SyncRoutes } from './sync-stream.js';
77

88
import { APIMetric, event_types } from '@powersync/service-types';
9+
import { limitParamsForLogging } from '../../util/param-logging.js';
910

1011
export const syncStreamReactive: SocketRouteGenerator = (router) =>
1112
router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
@@ -97,6 +98,13 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
9798
// Must be set before we start the stream
9899
tracker.setCompressed(connection.tracker.encoding);
99100
}
101+
102+
const formattedAppMetadata = params.app_metadata ? limitParamsForLogging(params.app_metadata) : undefined;
103+
logger.info('Sync stream started', {
104+
app_metadata: formattedAppMetadata,
105+
client_params: params.parameters ? limitParamsForLogging(params.parameters) : undefined
106+
});
107+
100108
try {
101109
for await (const data of sync.streamResponse({
102110
syncContext: syncContext,
@@ -179,6 +187,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
179187
}
180188
logger.info(`Sync stream complete`, {
181189
...tracker.getLogMeta(),
190+
app_metadata: formattedAppMetadata,
182191
stream_ms: Date.now() - streamStart,
183192
close_reason: closeReason ?? 'unknown'
184193
});

packages/service-core/src/routes/endpoints/sync-stream.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ import { Readable } from 'stream';
55
import * as sync from '../../sync/sync-index.js';
66
import * as util from '../../util/util-index.js';
77

8+
import { APIMetric, event_types } from '@powersync/service-types';
89
import { authUser } from '../auth.js';
910
import { routeDefinition } from '../router.js';
10-
import { APIMetric, event_types } from '@powersync/service-types';
1111

12+
import { limitParamsForLogging } from '../../util/param-logging.js';
1213
import { maybeCompressResponseStream } from '../compression.js';
1314

1415
export enum SyncRoutes {
@@ -75,6 +76,16 @@ export const syncStreamed = routeDefinition({
7576

7677
const controller = new AbortController();
7778
const tracker = new sync.RequestTracker(metricsEngine);
79+
80+
const formattedAppMetadata = payload.params.app_metadata
81+
? limitParamsForLogging(payload.params.app_metadata)
82+
: undefined;
83+
84+
logger.info('Sync stream started', {
85+
app_metadata: formattedAppMetadata,
86+
client_params: payload.params.parameters ? limitParamsForLogging(payload.params.parameters) : undefined
87+
});
88+
7889
try {
7990
metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1);
8091
service_context.eventsEngine.emit(event_types.EventsEngineEventType.SDK_CONNECT_EVENT, sdkData);
@@ -149,6 +160,7 @@ export const syncStreamed = routeDefinition({
149160
});
150161
logger.info(`Sync stream complete`, {
151162
...tracker.getLogMeta(),
163+
app_metadata: formattedAppMetadata,
152164
stream_ms: Date.now() - streamStart,
153165
close_reason: closeReason ?? 'unknown'
154166
});

packages/service-core/src/sync/sync.ts

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
2-
import {
3-
BucketDescription,
4-
BucketPriority,
5-
RequestJwtPayload,
6-
RequestParameters,
7-
SqlSyncRules
8-
} from '@powersync/service-sync-rules';
2+
import { BucketDescription, BucketPriority, RequestJwtPayload } from '@powersync/service-sync-rules';
93

104
import { AbortError } from 'ix/aborterror.js';
115

@@ -14,11 +8,11 @@ import * as storage from '../storage/storage-index.js';
148
import * as util from '../util/util-index.js';
159

1610
import { Logger, logger as defaultLogger } from '@powersync/lib-services-framework';
17-
import { BucketChecksumState, CheckpointLine, VersionedSyncRules } from './BucketChecksumState.js';
1811
import { mergeAsyncIterables } from '../streams/streams-index.js';
19-
import { acquireSemaphoreAbortable, settledPromise, tokenStream, TokenStreamOptions } from './util.js';
20-
import { SyncContext } from './SyncContext.js';
12+
import { BucketChecksumState, CheckpointLine, VersionedSyncRules } from './BucketChecksumState.js';
2113
import { OperationsSentStats, RequestTracker, statsForBatch } from './RequestTracker.js';
14+
import { SyncContext } from './SyncContext.js';
15+
import { TokenStreamOptions, acquireSemaphoreAbortable, settledPromise, tokenStream } from './util.js';
2216

2317
export interface SyncStreamParameters {
2418
syncContext: SyncContext;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Options for {@link limitParamsForLogging}.
3+
*/
4+
export type ParamLoggingFormatOptions = {
5+
maxKeyCount: number;
6+
maxStringLength: number;
7+
};
8+
9+
/**
10+
* Default options for {@link limitParamsForLogging}.
11+
*/
12+
export const DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS: ParamLoggingFormatOptions = {
13+
maxKeyCount: 20,
14+
maxStringLength: 100
15+
};
16+
17+
/**
18+
* Formats potentially arbitrary parameters for logging.
19+
* This limits the number of keys and strings to a maximum length.
20+
* A warning key-value is added if the number of keys exceeds the maximum.
21+
* String values exceeding the maximum length are truncated.
22+
* Non-String values are stringified, the maximum length is then applied.
23+
* @param params - The parameters to format.
24+
* @param options - The options to use.
25+
* @default DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS
26+
* @returns The formatted parameters.
27+
*/
28+
export function limitParamsForLogging(
29+
params: Record<string, any>,
30+
options: Partial<ParamLoggingFormatOptions> = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS
31+
) {
32+
const {
33+
maxStringLength = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength,
34+
maxKeyCount = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxKeyCount
35+
} = options;
36+
37+
function trimString(value: string): string {
38+
if (value.length > maxStringLength) {
39+
return value.slice(0, maxStringLength - 3) + '...';
40+
}
41+
return value;
42+
}
43+
44+
return Object.fromEntries(
45+
Object.entries(params).map(([key, value], index) => {
46+
if (index == maxKeyCount) {
47+
return ['⚠️', 'Additional parameters omitted'];
48+
}
49+
50+
if (index > maxKeyCount) {
51+
return [];
52+
}
53+
54+
if (typeof value == 'string') {
55+
return [key, trimString(value)];
56+
}
57+
return [key, trimString(JSON.stringify(value))];
58+
})
59+
);
60+
}

packages/service-core/src/util/protocol-types.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import * as t from 'ts-codec';
2-
import { BucketPriority, SqliteJsonRow } from '@powersync/service-sync-rules';
31
import { JsonContainer } from '@powersync/service-jsonbig';
2+
import { BucketPriority, SqliteJsonRow } from '@powersync/service-sync-rules';
3+
import * as t from 'ts-codec';
44

55
export const BucketRequest = t.object({
66
name: t.string,
@@ -81,6 +81,11 @@ export const StreamingSyncRequest = t.object({
8181
*/
8282
parameters: t.record(t.any).optional(),
8383

84+
/**
85+
* Application metadata to be used in logging.
86+
*/
87+
app_metadata: t.record(t.string).optional(),
88+
8489
/**
8590
* Unique client id.
8691
*/

packages/service-core/test/src/routes/stream.test.ts

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import { BasicRouterRequest, Context, SyncRulesBucketStorage } from '@/index.js';
2-
import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-framework';
2+
import { RouterResponse, ServiceError, logger } from '@powersync/lib-services-framework';
33
import { SqlSyncRules } from '@powersync/service-sync-rules';
44
import { Readable, Writable } from 'stream';
55
import { pipeline } from 'stream/promises';
66
import { describe, expect, it } from 'vitest';
7+
import winston from 'winston';
78
import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js';
9+
import { DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS, limitParamsForLogging } from '../../../src/util/param-logging.js';
810
import { mockServiceContext } from './mocks.js';
911

1012
describe('Stream Route', () => {
@@ -77,6 +79,97 @@ describe('Stream Route', () => {
7779
const r = await drainWithTimeout(stream).catch((error) => error);
7880
expect(r.message).toContain('Simulated storage error');
7981
});
82+
83+
it('logs the application metadata', async () => {
84+
const storage = {
85+
getParsedSyncRules() {
86+
return new SqlSyncRules('bucket_definitions: {}');
87+
},
88+
watchCheckpointChanges: async function* (options) {
89+
throw new Error('Simulated storage error');
90+
}
91+
} as Partial<SyncRulesBucketStorage>;
92+
const serviceContext = mockServiceContext(storage);
93+
94+
// Create a custom format to capture log info objects (which include defaultMeta)
95+
const capturedLogs: any[] = [];
96+
const captureFormat = winston.format((info) => {
97+
// Capture the info object which includes defaultMeta merged in
98+
capturedLogs.push({ ...info });
99+
return info;
100+
});
101+
102+
// Create a test logger with the capture format
103+
const testLogger = winston.createLogger({
104+
format: winston.format.combine(captureFormat(), winston.format.json()),
105+
transports: [new winston.transports.Console()]
106+
});
107+
108+
const context: Context = {
109+
logger: testLogger,
110+
service_context: serviceContext,
111+
token_payload: {
112+
exp: new Date().getTime() / 1000 + 10000,
113+
iat: new Date().getTime() / 1000 - 10000,
114+
sub: 'test-user'
115+
}
116+
};
117+
118+
const request: BasicRouterRequest = {
119+
headers: {
120+
'accept-encoding': 'gzip'
121+
},
122+
hostname: '',
123+
protocol: 'http'
124+
};
125+
126+
const inputMeta = {
127+
test: 'test',
128+
long_meta: 'a'.repeat(1000)
129+
};
130+
131+
const response = await (syncStreamed.handler({
132+
context,
133+
params: {
134+
app_metadata: inputMeta,
135+
parameters: {
136+
user_name: 'bob',
137+
nested_object: {
138+
nested_key: 'b'.repeat(1000)
139+
}
140+
}
141+
},
142+
request
143+
}) as Promise<RouterResponse>);
144+
expect(response.status).toEqual(200);
145+
const stream = response.data as Readable;
146+
const r = await drainWithTimeout(stream).catch((error) => error);
147+
expect(r.message).toContain('Simulated storage error');
148+
149+
// Find the "Sync stream started" log entry
150+
const syncStartedLog = capturedLogs.find((log) => log.message === 'Sync stream started');
151+
expect(syncStartedLog).toBeDefined();
152+
153+
// Verify that app_metadata from defaultMeta is present in the log
154+
expect(syncStartedLog?.app_metadata).toBeDefined();
155+
expect(syncStartedLog?.app_metadata).toEqual(limitParamsForLogging(inputMeta));
156+
// Should trim long metadata
157+
expect(syncStartedLog?.app_metadata.long_meta.length).toEqual(
158+
DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength
159+
);
160+
161+
// Verify the explicit log parameters
162+
expect(syncStartedLog?.client_params).toEqual(
163+
expect.objectContaining({
164+
user_name: 'bob'
165+
})
166+
);
167+
168+
expect(typeof syncStartedLog?.client_params.nested_object).toEqual('string');
169+
expect(syncStartedLog?.client_params.nested_object.length).toEqual(
170+
DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength
171+
);
172+
});
80173
});
81174
});
82175

0 commit comments

Comments
 (0)