Skip to content
5 changes: 5 additions & 0 deletions .changeset/seven-chefs-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': minor
---

Added ability to specify app_metadata for sync/stream requests
9 changes: 9 additions & 0 deletions packages/service-core/src/routes/endpoints/socket-route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { SocketRouteGenerator } from '../router-socket.js';
import { SyncRoutes } from './sync-stream.js';

import { APIMetric, event_types } from '@powersync/service-types';
import { limitParamsForLogging } from '../../util/param-logging.js';

export const syncStreamReactive: SocketRouteGenerator = (router) =>
router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
Expand Down Expand Up @@ -97,6 +98,13 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
// Must be set before we start the stream
tracker.setCompressed(connection.tracker.encoding);
}

const formattedAppMetadata = params.app_metadata ? limitParamsForLogging(params.app_metadata) : undefined;
logger.info('Sync stream started', {
app_metadata: formattedAppMetadata,
client_params: params.parameters ? limitParamsForLogging(params.parameters) : undefined
});

try {
for await (const data of sync.streamResponse({
syncContext: syncContext,
Expand Down Expand Up @@ -179,6 +187,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
}
logger.info(`Sync stream complete`, {
...tracker.getLogMeta(),
app_metadata: formattedAppMetadata,
stream_ms: Date.now() - streamStart,
close_reason: closeReason ?? 'unknown'
});
Expand Down
14 changes: 13 additions & 1 deletion packages/service-core/src/routes/endpoints/sync-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import { Readable } from 'stream';
import * as sync from '../../sync/sync-index.js';
import * as util from '../../util/util-index.js';

import { APIMetric, event_types } from '@powersync/service-types';
import { authUser } from '../auth.js';
import { routeDefinition } from '../router.js';
import { APIMetric, event_types } from '@powersync/service-types';

import { limitParamsForLogging } from '../../util/param-logging.js';
import { maybeCompressResponseStream } from '../compression.js';

export enum SyncRoutes {
Expand Down Expand Up @@ -75,6 +76,16 @@ export const syncStreamed = routeDefinition({

const controller = new AbortController();
const tracker = new sync.RequestTracker(metricsEngine);

const formattedAppMetadata = payload.params.app_metadata
? limitParamsForLogging(payload.params.app_metadata)
: undefined;

logger.info('Sync stream started', {
app_metadata: formattedAppMetadata,
client_params: payload.params.parameters ? limitParamsForLogging(payload.params.parameters) : undefined
});

try {
metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1);
service_context.eventsEngine.emit(event_types.EventsEngineEventType.SDK_CONNECT_EVENT, sdkData);
Expand Down Expand Up @@ -149,6 +160,7 @@ export const syncStreamed = routeDefinition({
});
logger.info(`Sync stream complete`, {
...tracker.getLogMeta(),
app_metadata: formattedAppMetadata,
stream_ms: Date.now() - streamStart,
close_reason: closeReason ?? 'unknown'
});
Expand Down
14 changes: 4 additions & 10 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
import {
BucketDescription,
BucketPriority,
RequestJwtPayload,
RequestParameters,
SqlSyncRules
} from '@powersync/service-sync-rules';
import { BucketDescription, BucketPriority, RequestJwtPayload } from '@powersync/service-sync-rules';

import { AbortError } from 'ix/aborterror.js';

Expand All @@ -14,11 +8,11 @@ import * as storage from '../storage/storage-index.js';
import * as util from '../util/util-index.js';

import { Logger, logger as defaultLogger } from '@powersync/lib-services-framework';
import { BucketChecksumState, CheckpointLine, VersionedSyncRules } from './BucketChecksumState.js';
import { mergeAsyncIterables } from '../streams/streams-index.js';
import { acquireSemaphoreAbortable, settledPromise, tokenStream, TokenStreamOptions } from './util.js';
import { SyncContext } from './SyncContext.js';
import { BucketChecksumState, CheckpointLine, VersionedSyncRules } from './BucketChecksumState.js';
import { OperationsSentStats, RequestTracker, statsForBatch } from './RequestTracker.js';
import { SyncContext } from './SyncContext.js';
import { TokenStreamOptions, acquireSemaphoreAbortable, settledPromise, tokenStream } from './util.js';

export interface SyncStreamParameters {
syncContext: SyncContext;
Expand Down
60 changes: 60 additions & 0 deletions packages/service-core/src/util/param-logging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Options for {@link limitParamsForLogging}.
*/
export type ParamLoggingFormatOptions = {
maxKeyCount: number;
maxStringLength: number;
};

/**
* Default options for {@link limitParamsForLogging}.
*/
export const DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS: ParamLoggingFormatOptions = {
maxKeyCount: 20,
maxStringLength: 100
};

/**
* Formats potentially arbitrary parameters for logging.
* This limits the number of keys and strings to a maximum length.
* A warning key-value is added if the number of keys exceeds the maximum.
* String values exceeding the maximum length are truncated.
* Non-String values are stringified, the maximum length is then applied.
* @param params - The parameters to format.
* @param options - The options to use.
* @default DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS
* @returns The formatted parameters.
*/
export function limitParamsForLogging(
params: Record<string, any>,
options: Partial<ParamLoggingFormatOptions> = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS
) {
const {
maxStringLength = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength,
maxKeyCount = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxKeyCount
} = options;

function trimString(value: string): string {
if (value.length > maxStringLength) {
return value.slice(0, maxStringLength - 3) + '...';
}
return value;
}

return Object.fromEntries(
Object.entries(params).map(([key, value], index) => {
if (index == maxKeyCount) {
return ['⚠️', 'Additional parameters omitted'];
}

if (index > maxKeyCount) {
return [];
}

if (typeof value == 'string') {
return [key, trimString(value)];
}
return [key, trimString(JSON.stringify(value))];
})
);
}
9 changes: 7 additions & 2 deletions packages/service-core/src/util/protocol-types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as t from 'ts-codec';
import { BucketPriority, SqliteJsonRow } from '@powersync/service-sync-rules';
import { JsonContainer } from '@powersync/service-jsonbig';
import { BucketPriority, SqliteJsonRow } from '@powersync/service-sync-rules';
import * as t from 'ts-codec';

export const BucketRequest = t.object({
name: t.string,
Expand Down Expand Up @@ -81,6 +81,11 @@ export const StreamingSyncRequest = t.object({
*/
parameters: t.record(t.any).optional(),

/**
* Application metadata to be used in logging.
*/
app_metadata: t.record(t.string).optional(),

/**
* Unique client id.
*/
Expand Down
95 changes: 94 additions & 1 deletion packages/service-core/test/src/routes/stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { BasicRouterRequest, Context, SyncRulesBucketStorage } from '@/index.js';
import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-framework';
import { RouterResponse, ServiceError, logger } from '@powersync/lib-services-framework';
import { SqlSyncRules } from '@powersync/service-sync-rules';
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';
import { describe, expect, it } from 'vitest';
import winston from 'winston';
import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js';
import { DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS, limitParamsForLogging } from '../../../src/util/param-logging.js';
import { mockServiceContext } from './mocks.js';

describe('Stream Route', () => {
Expand Down Expand Up @@ -77,6 +79,97 @@ describe('Stream Route', () => {
const r = await drainWithTimeout(stream).catch((error) => error);
expect(r.message).toContain('Simulated storage error');
});

it('logs the application metadata', async () => {
const storage = {
getParsedSyncRules() {
return new SqlSyncRules('bucket_definitions: {}');
},
watchCheckpointChanges: async function* (options) {
throw new Error('Simulated storage error');
}
} as Partial<SyncRulesBucketStorage>;
const serviceContext = mockServiceContext(storage);

// Create a custom format to capture log info objects (which include defaultMeta)
const capturedLogs: any[] = [];
const captureFormat = winston.format((info) => {
// Capture the info object which includes defaultMeta merged in
capturedLogs.push({ ...info });
return info;
});

// Create a test logger with the capture format
const testLogger = winston.createLogger({
format: winston.format.combine(captureFormat(), winston.format.json()),
transports: [new winston.transports.Console()]
});

const context: Context = {
logger: testLogger,
service_context: serviceContext,
token_payload: {
exp: new Date().getTime() / 1000 + 10000,
iat: new Date().getTime() / 1000 - 10000,
sub: 'test-user'
}
};

const request: BasicRouterRequest = {
headers: {
'accept-encoding': 'gzip'
},
hostname: '',
protocol: 'http'
};

const inputMeta = {
test: 'test',
long_meta: 'a'.repeat(1000)
};

const response = await (syncStreamed.handler({
context,
params: {
app_metadata: inputMeta,
parameters: {
user_name: 'bob',
nested_object: {
nested_key: 'b'.repeat(1000)
}
}
},
request
}) as Promise<RouterResponse>);
expect(response.status).toEqual(200);
const stream = response.data as Readable;
const r = await drainWithTimeout(stream).catch((error) => error);
expect(r.message).toContain('Simulated storage error');

// Find the "Sync stream started" log entry
const syncStartedLog = capturedLogs.find((log) => log.message === 'Sync stream started');
expect(syncStartedLog).toBeDefined();

// Verify that app_metadata from defaultMeta is present in the log
expect(syncStartedLog?.app_metadata).toBeDefined();
expect(syncStartedLog?.app_metadata).toEqual(limitParamsForLogging(inputMeta));
// Should trim long metadata
expect(syncStartedLog?.app_metadata.long_meta.length).toEqual(
DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength
);

// Verify the explicit log parameters
expect(syncStartedLog?.client_params).toEqual(
expect.objectContaining({
user_name: 'bob'
})
);

expect(typeof syncStartedLog?.client_params.nested_object).toEqual('string');
expect(syncStartedLog?.client_params.nested_object.length).toEqual(
DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength
);
});
});
});

Expand Down