Skip to content

feat: add ProcessPlatformAdapter for Node.js process environments#627

Open
jumski wants to merge 1 commit into
portable-worker-startfrom
portable-worker-process-adapter
Open

feat: add ProcessPlatformAdapter for Node.js process environments#627
jumski wants to merge 1 commit into
portable-worker-startfrom
portable-worker-process-adapter

Conversation

@jumski
Copy link
Copy Markdown
Contributor

@jumski jumski commented Jun 7, 2026

Adds ProcessPlatformAdapter to support running pgflow workers in Node.js (or any process-based runtime), complementing the existing SupabasePlatformAdapter for Deno/Edge environments.

The adapter handles OS signal registration (SIGTERM, SIGINT, SIGQUIT) for graceful shutdown, including double-signal force-exit behavior. It manages worker lifecycle by calling startOnlyOnce with startMode: 'process', marking the worker stopped in the database on shutdown, and conditionally closing the SQL connection only when the adapter owns it.

A processDeps abstraction is introduced to make the adapter fully testable without a real Node.js process, allowing signal handlers, process.exit, exit codes, and crypto.randomUUID to be injected in tests.

DATABASE_URL is added as a connection resolution option, slotting in between connectionString and EDGE_WORKER_DB_URL in priority order. createAdapter is updated to detect a process environment and return a ProcessPlatformAdapter automatically. CurrentPlatformEnv is broadened from SupabaseEnv to Record<string, string | undefined> to accommodate non-Supabase environments.

@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented Jun 7, 2026

⚠️ No Changeset found

Latest commit: 5e9c0a9

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Copy Markdown
Contributor Author

jumski commented Jun 7, 2026

@nx-cloud
Copy link
Copy Markdown

nx-cloud Bot commented Jun 7, 2026

View your CI Pipeline Execution ↗ for commit 5e9c0a9

Command Status Duration Result
nx test:types:health dsl ✅ Succeeded 11s View ↗

💡 Verify your cache is correct by running tasks in a sandbox. Read docs ↗


☁️ Nx Cloud last updated this comment at 2026-06-08 18:25:28 UTC

Comment on lines +80 to +95
async stopWorker(): Promise<void> {
this.requestShutdown();

try {
if (this.worker) {
await this.worker.stop();
}
if (this.workerId) {
await this.queries.markWorkerStopped(this.workerId);
}
} finally {
if (this.ownsSql) {
await this._platformResources.sql.end();
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition: stopWorker() can be called concurrently

If stopWorker() is called manually and then a signal arrives before it completes, handleSignal() will call stopWorker() again since shutdownStarted is only set in handleSignal(), not in stopWorker(). This causes:

  1. worker.stop() called twice (may not be idempotent)
  2. markWorkerStopped() called twice (could fail or create duplicate entries)
  3. sql.end() potentially called twice (will error on second call)

Fix: Add a guard in stopWorker() or set the shutdown flag there as well:

async stopWorker(): Promise<void> {
  if (this.shutdownStarted) {
    return;
  }
  this.shutdownStarted = true;
  this.requestShutdown();
  // ... rest of implementation
}
Suggested change
async stopWorker(): Promise<void> {
this.requestShutdown();
try {
if (this.worker) {
await this.worker.stop();
}
if (this.workerId) {
await this.queries.markWorkerStopped(this.workerId);
}
} finally {
if (this.ownsSql) {
await this._platformResources.sql.end();
}
}
}
async stopWorker(): Promise<void> {
if (this.shutdownStarted) {
return;
}
this.shutdownStarted = true;
this.requestShutdown();
try {
if (this.worker) {
await this.worker.stop();
}
if (this.workerId) {
await this.queries.markWorkerStopped(this.workerId);
}
} finally {
if (this.ownsSql) {
await this._platformResources.sql.end();
}
}
}

Spotted by Graphite

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

@jumski jumski force-pushed the portable-worker-process-adapter branch from 3cb8c81 to efd6d99 Compare June 8, 2026 17:50
@jumski jumski force-pushed the portable-worker-start branch from ca1dd9b to 8f4bb3e Compare June 8, 2026 17:50
@jumski jumski force-pushed the portable-worker-process-adapter branch from efd6d99 to 6b47ef1 Compare June 8, 2026 18:08
@jumski jumski force-pushed the portable-worker-process-adapter branch from 6b47ef1 to 5e9c0a9 Compare June 8, 2026 18:24
@jumski jumski force-pushed the portable-worker-start branch from 8f4bb3e to 30f8253 Compare June 8, 2026 18:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant