Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 84 additions & 66 deletions packages/core/src/cross-spawn-spawner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ const toPlatformError = (
}

type ExitSignal = Deferred.Deferred<readonly [code: number | null, signal: NodeJS.Signals | null]>
// Node emits `exit` when the process ends and `close` only after shared stdio closes.
// Descendants can inherit those handles, so process and stdio lifetimes are distinct.
type ProcessLifecycle = {
readonly exited: ExitSignal
readonly closed: ExitSignal
}
type SpawnedProcess = {
readonly proc: NodeChildProcess.ChildProcess
readonly lifecycle: ProcessLifecycle
}

export const make = Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem
Expand Down Expand Up @@ -263,24 +273,24 @@ export const make = Effect.gen(function* () {
}

const spawn = (command: ChildProcess.StandardCommand, opts: NodeChildProcess.SpawnOptions) =>
Effect.callback<readonly [NodeChildProcess.ChildProcess, ExitSignal], PlatformError.PlatformError>((resume) => {
const signal = Deferred.makeUnsafe<readonly [code: number | null, signal: NodeJS.Signals | null]>()
Effect.callback<SpawnedProcess, PlatformError.PlatformError>((resume) => {
const lifecycle: ProcessLifecycle = {
exited: Deferred.makeUnsafe(),
closed: Deferred.makeUnsafe(),
}
const proc = launch(command.command, command.args, opts)
let end = false
let exit: readonly [code: number | null, signal: NodeJS.Signals | null] | undefined
proc.on("error", (err) => {
resume(Effect.fail(toPlatformError("spawn", err, command)))
})
proc.on("exit", (...args) => {
exit = args
Deferred.doneUnsafe(lifecycle.exited, Exit.succeed(args))
})
proc.on("close", (...args) => {
if (end) return
end = true
Deferred.doneUnsafe(signal, Exit.succeed(exit ?? args))
Deferred.doneUnsafe(lifecycle.closed, Exit.succeed(args))
Deferred.doneUnsafe(lifecycle.exited, Exit.succeed(args))
})
proc.on("spawn", () => {
resume(Effect.succeed([proc, signal]))
resume(Effect.succeed({ proc, lifecycle }))
})
return Effect.sync(() => {
proc.kill("SIGTERM")
Expand Down Expand Up @@ -319,26 +329,51 @@ export const make = Effect.gen(function* () {
return Effect.fail(toPlatformError("kill", new Error("Failed to kill child process"), command))
})

const timeout =
(
proc: NodeChildProcess.ChildProcess,
command: ChildProcess.StandardCommand,
opts: ChildProcess.KillOptions | undefined,
) =>
<A, E, R>(
f: (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
signal: NodeJS.Signals,
) => Effect.Effect<A, E, R>,
) => {
const signal = opts?.killSignal ?? "SIGTERM"
if (Predicate.isUndefined(opts?.forceKillAfter)) return f(command, proc, signal)
return Effect.timeoutOrElse(f(command, proc, signal), {
duration: opts.forceKillAfter,
orElse: () => f(command, proc, "SIGKILL"),
})
}
const signalProcessGroup = (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
signal: NodeJS.Signals,
) => Effect.catch(killGroup(command, proc, signal), () => killOne(command, proc, signal))

const terminateProcessGroup = (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
lifecycle: ProcessLifecycle,
opts?: ChildProcess.KillOptions,
) => {
const graceful = signalProcessGroup(command, proc, opts?.killSignal ?? "SIGTERM").pipe(
Effect.andThen(Deferred.await(lifecycle.closed)),
Effect.asVoid,
)
if (!opts?.forceKillAfter) return graceful
return Effect.timeoutOrElse(graceful, {
duration: opts.forceKillAfter,
// Preserve the existing escalation policy, but stop waiting for shared stdio:
// an escaped descendant can retain those handles after the process exits.
orElse: () =>
signalProcessGroup(command, proc, "SIGKILL").pipe(
Effect.andThen(Deferred.await(lifecycle.exited)),
Effect.asVoid,
),
})
}

const reapFailedProcessGroup = (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
lifecycle: ProcessLifecycle,
) => {
const signal = signalProcessGroup(command, proc, command.options.killSignal ?? "SIGTERM")
if (!command.options.forceKillAfter) return signal
return signal.pipe(
Effect.andThen(Deferred.await(lifecycle.closed)),
Effect.timeoutOrElse({
duration: command.options.forceKillAfter,
orElse: () => signalProcessGroup(command, proc, "SIGKILL"),
}),
Effect.asVoid,
)
}

const source = (handle: ChildProcessHandle, from: ChildProcess.PipeFromOption | undefined) => {
const opt = from ?? "stdout"
Expand Down Expand Up @@ -368,7 +403,7 @@ export const make = Effect.gen(function* () {
const extra = fds(command.options)
const dir = yield* cwd(command.options)

const [proc, signal] = yield* Effect.acquireRelease(
const spawned = yield* Effect.acquireRelease(
spawn(command, {
cwd: dir,
env: env(command.options),
Expand All @@ -377,42 +412,34 @@ export const make = Effect.gen(function* () {
shell: command.options.shell,
windowsHide: process.platform === "win32",
}),
Effect.fnUntraced(function* ([proc, signal]) {
const done = yield* Deferred.isDone(signal)
const kill = timeout(proc, command, command.options)
if (done) {
const [code] = yield* Deferred.await(signal)
Effect.fnUntraced(function* (spawned) {
const exited = yield* Deferred.isDone(spawned.lifecycle.exited)
if (exited) {
const [code] = yield* Deferred.await(spawned.lifecycle.exited)
if (process.platform === "win32") return yield* Effect.void
if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(kill(killGroup))
if (code !== 0 && Predicate.isNotNull(code))
return yield* Effect.ignore(reapFailedProcessGroup(command, spawned.proc, spawned.lifecycle))
return yield* Effect.void
}
const send = (s: NodeJS.Signals) =>
Effect.catch(killGroup(command, proc, s), () => killOne(command, proc, s))
const sig = command.options.killSignal ?? "SIGTERM"
const attempt = send(sig).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid)
const escalated = command.options.forceKillAfter
? Effect.timeoutOrElse(attempt, {
duration: command.options.forceKillAfter,
orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid),
})
: attempt
return yield* Effect.ignore(escalated)
return yield* Effect.ignore(
terminateProcessGroup(command, spawned.proc, spawned.lifecycle, command.options),
)
}),
)

const fd = yield* setupFds(command, proc, extra)
const out = setupOutput(command, proc, sout, serr)
const fd = yield* setupFds(command, spawned.proc, extra)
const out = setupOutput(command, spawned.proc, sout, serr)
let ref = true
return makeHandle({
pid: ProcessId(proc.pid!),
stdin: yield* setupStdin(command, proc, sin),
pid: ProcessId(spawned.proc.pid!),
stdin: yield* setupStdin(command, spawned.proc, sin),
stdout: out.stdout,
stderr: out.stderr,
all: out.all,
getInputFd: fd.getInputFd,
getOutputFd: fd.getOutputFd,
isRunning: Effect.map(Deferred.isDone(signal), (done) => !done),
exitCode: Effect.flatMap(Deferred.await(signal), ([code, signal]) => {
isRunning: Effect.map(Deferred.isDone(spawned.lifecycle.exited), (done) => !done),
exitCode: Effect.flatMap(Deferred.await(spawned.lifecycle.exited), ([code, signal]) => {
if (Predicate.isNotNull(code)) return Effect.succeed(ExitCode(code))
return Effect.fail(
toPlatformError(
Expand All @@ -422,25 +449,16 @@ export const make = Effect.gen(function* () {
),
)
}),
kill: (opts?: ChildProcess.KillOptions) => {
const sig = opts?.killSignal ?? "SIGTERM"
const send = (s: NodeJS.Signals) =>
Effect.catch(killGroup(command, proc, s), () => killOne(command, proc, s))
const attempt = send(sig).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid)
if (!opts?.forceKillAfter) return attempt
return Effect.timeoutOrElse(attempt, {
duration: opts.forceKillAfter,
orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid),
})
},
kill: (opts?: ChildProcess.KillOptions) =>
terminateProcessGroup(command, spawned.proc, spawned.lifecycle, opts),
unref: Effect.sync(() => {
if (ref) {
proc.unref()
spawned.proc.unref()
ref = false
}
return Effect.sync(() => {
if (!ref) {
proc.ref()
spawned.proc.ref()
ref = true
}
})
Expand Down
Loading
Loading