|
| 1 | +--- |
| 2 | +title: Input Streams |
| 3 | +sidebarTitle: Input Streams |
| 4 | +description: Send data into running tasks from your backend code |
| 5 | +--- |
| 6 | + |
| 7 | +The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while [output streams](/realtime/backend/streams) let you read data from tasks, input streams let you push data into them. |
| 8 | + |
| 9 | +<Note> |
| 10 | + To learn how to receive input stream data inside your tasks, see our [Input Streams](/tasks/input-streams) documentation. For frontend applications using React, see our [React hooks input streams documentation](/realtime/react-hooks/input-streams). |
| 11 | +</Note> |
| 12 | + |
| 13 | +## Sending data to a running task |
| 14 | + |
| 15 | +### Using defined input streams (Recommended) |
| 16 | + |
| 17 | +The recommended approach is to use [defined input streams](/tasks/input-streams#defining-input-streams) for full type safety: |
| 18 | + |
| 19 | +```ts |
| 20 | +import { cancelSignal, approval } from "./trigger/streams"; |
| 21 | + |
| 22 | +// Cancel a running AI stream |
| 23 | +await cancelSignal.send(runId, { reason: "User clicked stop" }); |
| 24 | + |
| 25 | +// Approve a draft |
| 26 | +await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); |
| 27 | +``` |
| 28 | + |
| 29 | +The `.send()` method is fully typed — the data parameter must match the generic type you defined on the input stream. |
| 30 | + |
| 31 | +<Note> |
| 32 | + `.send()` works the same regardless of how the task is listening — whether it uses `.wait()` (suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know how the task is consuming the data. See [Input Streams](/tasks/input-streams) for details on each receiving method. |
| 33 | +</Note> |
| 34 | + |
| 35 | +## Practical examples |
| 36 | + |
| 37 | +### Cancel from a Next.js API route |
| 38 | + |
| 39 | +```ts app/api/cancel/route.ts |
| 40 | +import { cancelStream } from "@/trigger/streams"; |
| 41 | + |
| 42 | +export async function POST(req: Request) { |
| 43 | + const { runId } = await req.json(); |
| 44 | + |
| 45 | + await cancelStream.send(runId, { reason: "User clicked stop" }); |
| 46 | + |
| 47 | + return Response.json({ cancelled: true }); |
| 48 | +} |
| 49 | +``` |
| 50 | + |
| 51 | +### Approval workflow API |
| 52 | + |
| 53 | +```ts app/api/approve/route.ts |
| 54 | +import { approval } from "@/trigger/streams"; |
| 55 | + |
| 56 | +export async function POST(req: Request) { |
| 57 | + const { runId, approved, reviewer } = await req.json(); |
| 58 | + |
| 59 | + await approval.send(runId, { |
| 60 | + approved, |
| 61 | + reviewer, |
| 62 | + }); |
| 63 | + |
| 64 | + return Response.json({ success: true }); |
| 65 | +} |
| 66 | +``` |
| 67 | + |
| 68 | +### Remix action handler |
| 69 | + |
| 70 | +```ts app/routes/api.approve.ts |
| 71 | +import { json, type ActionFunctionArgs } from "@remix-run/node"; |
| 72 | +import { approval } from "~/trigger/streams"; |
| 73 | + |
| 74 | +export async function action({ request }: ActionFunctionArgs) { |
| 75 | + const formData = await request.formData(); |
| 76 | + const runId = formData.get("runId") as string; |
| 77 | + const approved = formData.get("approved") === "true"; |
| 78 | + const reviewer = formData.get("reviewer") as string; |
| 79 | + |
| 80 | + await approval.send(runId, { approved, reviewer }); |
| 81 | + |
| 82 | + return json({ success: true }); |
| 83 | +} |
| 84 | +``` |
| 85 | + |
| 86 | +### Express handler |
| 87 | + |
| 88 | +```ts |
| 89 | +import express from "express"; |
| 90 | +import { cancelSignal } from "./trigger/streams"; |
| 91 | + |
| 92 | +const app = express(); |
| 93 | +app.use(express.json()); |
| 94 | + |
| 95 | +app.post("/api/cancel", async (req, res) => { |
| 96 | + const { runId, reason } = req.body; |
| 97 | + |
| 98 | + await cancelSignal.send(runId, { reason }); |
| 99 | + |
| 100 | + res.json({ cancelled: true }); |
| 101 | +}); |
| 102 | +``` |
| 103 | + |
| 104 | +### Sending from another task |
| 105 | + |
| 106 | +You can send input stream data from one task to another running task: |
| 107 | + |
| 108 | +```ts |
| 109 | +import { task } from "@trigger.dev/sdk"; |
| 110 | +import { approval } from "./streams"; |
| 111 | + |
| 112 | +export const reviewerTask = task({ |
| 113 | + id: "auto-reviewer", |
| 114 | + run: async (payload: { targetRunId: string }) => { |
| 115 | + // Perform automated review logic... |
| 116 | + const isApproved = await performReview(); |
| 117 | + |
| 118 | + // Send approval to the waiting task |
| 119 | + await approval.send(payload.targetRunId, { |
| 120 | + approved: isApproved, |
| 121 | + reviewer: "auto-reviewer", |
| 122 | + }); |
| 123 | + }, |
| 124 | +}); |
| 125 | +``` |
| 126 | + |
| 127 | +## Error handling |
| 128 | + |
| 129 | +The `.send()` method will throw if: |
| 130 | + |
| 131 | +- The run has already completed, failed, or been canceled |
| 132 | +- The payload exceeds the 1MB size limit |
| 133 | +- The run ID is invalid |
| 134 | + |
| 135 | +```ts |
| 136 | +import { cancelSignal } from "./trigger/streams"; |
| 137 | + |
| 138 | +try { |
| 139 | + await cancelSignal.send(runId, { reason: "User clicked stop" }); |
| 140 | +} catch (error) { |
| 141 | + console.error("Failed to send:", error); |
| 142 | + // Handle the error — the run may have already completed |
| 143 | +} |
| 144 | +``` |
| 145 | + |
| 146 | +## Important notes |
| 147 | + |
| 148 | +- Maximum payload size per `.send()` call is **1MB** |
| 149 | +- You cannot send data to a completed, failed, or canceled run |
| 150 | +- Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches |
| 151 | +- Input streams require [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+) |
0 commit comments