Skip to content

Add metronome and redis support#358

Closed
kdhillon-stripe wants to merge 13 commits intomainfrom
metronome-redis
Closed

Add metronome and redis support#358
kdhillon-stripe wants to merge 13 commits intomainfrom
metronome-redis

Conversation

@kdhillon-stripe
Copy link
Copy Markdown
Collaborator

@kdhillon-stripe kdhillon-stripe commented May 5, 2026

Summary

Adds

  • source-metronome connector
  • destination-redis connector
  • engine/service registration, generated OpenAPI updates
  • a PixelDraw demo that reads Metronome-derived state from Redis and writes usage back to Metronome.

How to test (optional)

see demo/README.md

@@ -0,0 +1,103 @@
import { describe, it, expect, beforeAll, afterAll } from 'vitest'
@kdhillon-stripe kdhillon-stripe changed the title new prototype Add metronome and redis support May 5, 2026
@kdhillon-stripe kdhillon-stripe marked this pull request as ready for review May 5, 2026 07:40
Copilot AI review requested due to automatic review settings May 5, 2026 07:40
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new source-metronome connector and destination-redis sink, wires both into the engine/service/OpenAPI surfaces, and adds demo tooling around a Metronome→Redis pipeline plus a PixelDraw example app.

Changes:

  • Add the new Metronome source connector, including API client, resource catalog, webhook handling, CLI entrypoint, and tests.
  • Add the new Redis destination connector, including config/spec, write path, teardown, CLI entrypoint, and tests.
  • Register both connectors across engine/service/OpenAPI generation and add demo/e2e scripts plus a PixelDraw Redis-backed demo app.

Reviewed changes

Copilot reviewed 50 out of 56 changed files in this pull request and generated 13 comments.

Show a summary per file
File Description
scripts/webhook-relay.sh Adds webhook.site polling/forwarding helper.
scripts/run-metronome-redis-pipeline.sh Adds foreground Metronome→Redis pipeline runner.
scripts/generate-openapi-specs.ts Registers new source/destination for spec generation.
scripts/e2e-metronome-redis.sh Adds sandbox E2E pipeline script.
pnpm-lock.yaml Locks new workspace packages and Redis deps.
packages/source-stripe/src/spec.ts Clarifies Stripe webhook config descriptions.
packages/source-metronome/tsconfig.json Adds TS config for new source package.
packages/source-metronome/src/webhook.ts Implements Metronome webhook server/signature checks.
packages/source-metronome/src/webhook.test.ts Adds webhook handling tests.
packages/source-metronome/src/spec.ts Defines Metronome source config/state/input spec.
packages/source-metronome/src/resources.ts Declares Metronome stream/resource catalog.
packages/source-metronome/src/logger.ts Adds package logger.
packages/source-metronome/src/index.ts Implements Metronome source behavior.
packages/source-metronome/src/index.test.ts Adds source behavior tests.
packages/source-metronome/src/client.ts Adds Metronome API client/pagination/retry logic.
packages/source-metronome/src/client.test.ts Adds client tests.
packages/source-metronome/src/bin.ts Adds connector CLI entrypoint.
packages/source-metronome/package.json Defines new source package metadata.
packages/destination-redis/tsconfig.json Adds TS config for new destination package.
packages/destination-redis/src/spec.ts Defines Redis destination config spec.
packages/destination-redis/src/logger.ts Adds package logger.
packages/destination-redis/src/integration.test.ts Adds Redis integration tests.
packages/destination-redis/src/index.ts Implements Redis destination write/check/teardown.
packages/destination-redis/src/index.test.ts Adds Redis unit tests.
packages/destination-redis/src/bin.ts Adds connector CLI entrypoint.
packages/destination-redis/package.json Defines new destination package metadata.
examples/pixel-app/start.sh Adds demo launcher/env loader.
examples/pixel-app/server.js Adds Redis-backed PixelDraw demo server.
examples/pixel-app/README.md Documents PixelDraw demo flow.
examples/pixel-app/public/index.html Adds PixelDraw demo UI.
examples/pixel-app/package.json Adds demo app package manifest.
examples/pixel-app/package-lock.json Locks demo app npm deps.
e2e/package.json Adds new connector packages to e2e workspace.
e2e/connector-loading.test.sh Packs/tests new connector tarballs.
Dockerfile Adds demo image target.
demo/webhooksite.sh Generalizes webhook.site helper text.
demo/README.md Documents Metronome→Redis demo flow.
demo/metronome-redis-mvp-catalog.json Adds curated Metronome demo catalog.
demo/compose.metronome-redis.yml Adds demo compose stack for Redis pipeline.
compose.yml Adds local Redis service.
apps/service/src/cli.ts Registers new connectors in service CLI.
apps/service/src/api/app.ts Extends webhook ingress for Metronome.
apps/service/src/api/app.test.ts Adds Metronome webhook API test.
apps/service/src/generated/openapi.json Regenerates service OpenAPI JSON.
apps/service/src/generated/openapi.d.ts Regenerates service OpenAPI types.
apps/service/package.json Adds new connector deps to service app.
apps/engine/src/request-context.ts Adds internal-request header constant.
apps/engine/src/lib/remote-engine.ts Wraps raw remote source input messages.
apps/engine/src/lib/remote-engine.test.ts Adds remote input wrapping tests.
apps/engine/src/lib/default-connectors.ts Registers new default connectors.
apps/engine/src/cli/command.ts Lazily builds API command/OpenAPI CLI.
apps/engine/src/api/app.ts Suppresses logging for internal OpenAPI fetches.
apps/engine/src/tests/cli-command.test.ts Tests new CLI command initialization behavior.
apps/engine/src/generated/openapi.json Regenerates engine OpenAPI JSON.
apps/engine/src/generated/openapi.d.ts Regenerates engine OpenAPI types.
apps/engine/package.json Adds new connector deps to engine app.
Files not reviewed (4)
  • apps/engine/src/generated/openapi.json: Language not supported
  • apps/service/src/generated/openapi.json: Language not supported
  • examples/pixel-app/package-lock.json: Language not supported
  • pnpm-lock.yaml: Language not supported

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +192 to +203
const pk = streamKeyColumns.get(stream) ?? ['id']
const recordData = data as Record<string, unknown>
const key = buildRecordKey(keyPrefix, stream, pk, recordData)
const softDeleteField = streamSoftDeleteFields.get(stream)
const deleteValue =
softDeleteField !== undefined ? recordData[softDeleteField] : recordData.deleted
const buffer = streamBuffers.get(stream)!
buffer.push(
isSoftDeleted(deleteValue)
? { op: 'del', key }
: { op: 'set', key, value: JSON.stringify(data) }
)
Comment on lines +137 to +146
try {
const pipeline = redis.pipeline()
for (const op of buffer) {
if (op.op === 'del') {
pipeline.del(op.key)
} else {
pipeline.set(op.key, op.value)
}
}
await pipeline.exec()
Comment on lines +90 to +95
await push({ event, raw_body: body, verified })
res.writeHead(200).end('{"received":true}')
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
log.error({ error: message }, 'metronome: webhook processing error')
res.writeHead(400).end(message)
Comment on lines +679 to +686
item.resolve()
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
log.error(
{ error: message, eventType: item.event.type },
'metronome: webhook event processing error'
)
item.resolve() // still resolve to unblock HTTP response
Comment on lines +618 to +655
// After backfill: start webhook server for live updates
if (config.webhook_port) {
log.info(
{ port: config.webhook_port, webhookUrl: config.webhook_url },
'metronome: starting webhook listener for live updates'
)

type QueueItem = { event: MetronomeWebhookEvent; resolve: () => void }
const queue: QueueItem[] = []
let waiter: ((item: QueueItem) => void) | null = null

const server = startWebhookServer(config.webhook_port, config.webhook_secret, (input) => {
if (!REFRESH_EVENT_TYPES.has(input.event.type)) {
log.debug({ eventType: input.event.type }, 'metronome: ignoring non-refresh event')
return
}
const { promise, resolve } = Promise.withResolvers<void>()
const item = { event: input.event, resolve }
if (waiter) {
const w = waiter
waiter = null
w(item)
} else {
queue.push(item)
}
// Block HTTP response until we've processed the event
return promise
})

try {
// Process webhook events forever (until abort)
while (true) {
const item: QueueItem = await new Promise((resolve) => {
if (queue.length > 0) {
resolve(queue.shift()!)
} else {
waiter = resolve
}
Comment on lines +148 to +157
curl -s -X POST "${METRONOME_API_ROOT}/v1/ingest" \
-H "Authorization: Bearer $METRONOME_API_TOKEN" \
-H "Content-Type: application/json" \
-d "[
{\"customer_id\": \"$CUSTOMER_ID\", \"event_type\": \"api_call\", \"timestamp\": \"$TS\", \"transaction_id\": \"e2e_$(date +%s)_1\"},
{\"customer_id\": \"$CUSTOMER_ID\", \"event_type\": \"api_call\", \"timestamp\": \"$TS\", \"transaction_id\": \"e2e_$(date +%s)_2\"},
{\"customer_id\": \"$CUSTOMER_ID\", \"event_type\": \"api_call\", \"timestamp\": \"$TS\", \"transaction_id\": \"e2e_$(date +%s)_3\"},
{\"customer_id\": \"$CUSTOMER_ID\", \"event_type\": \"api_call\", \"timestamp\": \"$TS\", \"transaction_id\": \"e2e_$(date +%s)_4\"},
{\"customer_id\": \"$CUSTOMER_ID\", \"event_type\": \"api_call\", \"timestamp\": \"$TS\", \"transaction_id\": \"e2e_$(date +%s)_5\"}
]" >/dev/null
Comment on lines +127 to +140
sleep 5
if ! kill -0 "$PIPE_PID" 2>/dev/null; then
echo "ERROR: Sync pipeline exited early. Last log lines:"
tail -80 "$PIPE_LOG" || true
exit 1
fi

echo "Step 1: Backfill complete."
echo ""

# Step 2: Check initial state
echo "Step 2: Initial Redis state after backfill:"
BALANCE_BEFORE=$(redis_cli GET "$NET_BALANCE_KEY" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('balance', '<missing>'))")
SYNCED_BEFORE=$(redis_cli GET "$NET_BALANCE_KEY" | python3 -c "import sys,json; print(json.load(sys.stdin)['_synced_at'])")
Comment on lines +118 to +128
primaryKey: [['customer_id'], ['_page_slot']],
perCustomer: true,
emitPageSnapshots: true,
pageLimit: 25,
postBodyMerge: {
include_balance: true,
include_contract_balances: true,
include_ledgers: false,
},
catalogNotes:
'One record per paginated page; primary key is customer + page index. Redis intent: metronome:customer:{customer_id}:balances:{page_tag}.',
Comment on lines +618 to +623
// After backfill: start webhook server for live updates
if (config.webhook_port) {
log.info(
{ port: config.webhook_port, webhookUrl: config.webhook_url },
'metronome: starting webhook listener for live updates'
)
Comment on lines +157 to +170
const now = Math.floor(Date.now() / 1000)
for await (const page of client.paginate(
'POST',
endpoint,
{ customer_id: customerId, ...body },
{ pageLimit }
)) {
for (const record of page.data) {
yield msg.record({
stream,
data: {
...(record as Record<string, unknown>),
customer_id: (record as Record<string, unknown>)['customer_id'] ?? customerId,
_synced_at: now,
Copy link
Copy Markdown
Collaborator

@tonyxiao tonyxiao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dockerfile should not be changed, but otherwise looks good

const openapiResponse = await Promise.resolve(app.request('/openapi.json'))
const openapiResponse = await Promise.resolve(
app.request('/openapi.json', {
headers: { [ENGINE_INTERNAL_REQUEST_HEADER]: 'true' },
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this change needed?

/** Default in-process connectors bundled with the engine. */
export const defaultConnectors: RegisteredConnectors = {
sources: { stripe: sourceStripe },
sources: { stripe: sourceStripe, metronome: sourceMetronome },
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still wonder if there is a way for us to do this without having it as part of the default. but can come later. Will require testing before going to prod to ensure we don't impact prod

if (input) {
stdin = []
for await (const m of input) stdin.push(m)
for await (const m of input) stdin.push(toRemoteInputMessage(m))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this change needed?

{ eventId: event.id, eventType: event.type, pipeline_id },
'webhook event ingested'
)
if (pipeline.source.type === 'stripe') {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

later this should be extracted into the source interface so we don't have to do stuff like this. // comment TODO would be nice

@@ -0,0 +1,31 @@
services:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's fine to consolidate into a single root compose file

* Start an HTTP server that receives Metronome webhook events.
* Verifies signatures if a secret is provided, then pushes parsed events.
*/
export function startWebhookServer(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is probably a bad pattern from source-stripe that we should get rid of.

@@ -0,0 +1,217 @@
#!/usr/bin/env bash
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should have an example pipeline.json for this as well

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should use the single pipeline runner for this

REDIS_URL="${REDIS_URL:-redis://localhost:$REDIS_PORT}"
WEBHOOK_PORT="${WEBHOOK_PORT:-4243}"
KEY_PREFIX="${KEY_PREFIX:-sync:}"
METRONOME_API_ROOT="${METRONOME_BASE_URL:-https://api.metronome.com}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above. custom runner script should not be needed.

Comment thread scripts/webhook-relay.sh
# Usage: ./scripts/webhook-relay.sh <webhook-site-token> <local-url>
set -euo pipefail

TOKEN="${1:?Usage: webhook-relay.sh <webhook-site-token> <local-url>}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine for now but feels like this should just be a .ts script if it's gonna be this involved and have so much inline py

Comment thread Dockerfile Outdated
CMD ["serve", "--temporal-address", "temporal:7233", "--temporal-task-queue", "sync-engine"]

# ===========================================================================
# Metronome Redis demo runner
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should definitely not be part of the docker file.

@cla-assistant
Copy link
Copy Markdown

cla-assistant Bot commented May 5, 2026

CLA assistant check
All committers have signed the CLA.

tonyxiao and others added 13 commits May 5, 2026 12:19
Add two new connector packages:

- **source-metronome**: Reads from the Metronome billing API (customers,
  contracts, products, rate cards, credit grants, invoices, entitlements).
  Supports cursor-based pagination, per-customer/per-contract fan-out,
  and webhook-driven live updates for credit balance and entitlement
  changes.

- **destination-redis**: Writes synced data to Redis as individual SET
  keys (`{prefix}{stream}:{pk}`). Pipelined batch writes, per-stream
  failure tracking, SCAN-based teardown.

Also:
- Register both connectors in the engine (default-connectors.ts)
- Add Redis service to compose.yml (port 56379)
- Regenerate OpenAPI specs with new connector config schemas
- Add e2e test script (scripts/e2e-metronome-redis.sh)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
PixelDraw demo app: each pixel drawn sends a usage event to Metronome,
credit balance is gated via Metronome-synced data in Redis (no local
state). The sync pipeline keeps Redis up to date via webhooks.

- Express server with /api/draw (hot path) and /api/credits
- Canvas UI with color picker and live credit balance display
- Usage events sent to Metronome ingest API per pixel
- Balance checked from Metronome-synced credit_grants in Redis only

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Snapshot the Metronome-to-Redis MVP so the connector, service webhook path, and PixelDraw demo can be validated together locally.

Committed-By-Agent: cursor
Committed-By-Agent: cursor
Add a Redis sidecar Compose flow and PixelDraw runbook so the Metronome demo can be started and validated consistently.

Committed-By-Agent: cursor
Wire the new Redis and Metronome connector packages into e2e conformance tests and refresh generated OpenAPI artifacts.

Committed-By-Agent: cursor
Use live-shaped Metronome fixtures and package the new connector tarballs in the connector-loading E2E so the PR protects both Metronome behavior and existing bundled connector installs.

Committed-By-Agent: cursor
Committed-By-Agent: cursor
@tonyxiao
Copy link
Copy Markdown
Collaborator

tonyxiao commented May 7, 2026

Merged in eb20481

@tonyxiao tonyxiao closed this May 7, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants