Commit 540e1c8

Authored by ericallam and Claude
feat: Input Streams - Bidirectional task communication (#3146)
Input streams enable sending typed data to executing tasks from external callers — backends, frontends, or other tasks. This unlocks interactive use cases like approval UIs, cancel buttons, chat interfaces, and human-in-the-loop AI workflows where the task needs to receive data while running.

Three consumption patterns inside a task:

* `.wait()` — Suspend the task until data arrives (process freed, most efficient)
* `.once()` — Wait for the next message (process stays alive)
* `.on()` — Subscribe to a continuous stream of messages

One send pattern from outside:

* `.send(runId, data)` — Send typed data to a specific run's input stream

## User-facing API

### Define a typed input stream

```ts
import { streams, task } from "@trigger.dev/sdk";

const approval = streams.input<{ approved: boolean; reviewer: string }>({
  id: "approval",
});
```

### Consume inside a task

```ts
export const myTask = task({
  id: "my-task",
  run: async () => {
    // Pattern 1: Suspend until data arrives (most efficient — frees the process)
    const result = await approval.wait({ timeout: "5m" });

    // Pattern 2: Wait for next message (process stays alive)
    const data = await approval.once().unwrap();

    // Pattern 3: Subscribe to multiple messages
    approval.on((data) => {
      /* handle each message */
    });
  },
});
```

### Send from outside

```ts
// From a backend (using secret API key)
await approval.send(runId, { approved: true, reviewer: "alice" });

// From a frontend (using public JWT token from trigger response)
const { send } = useInputStreamSend("approval", runId, { accessToken });
send({ approved: true, reviewer: "alice" });
```

---------

Co-authored-by: Claude <noreply@anthropic.com>
1 parent: 2af5c86

File tree

63 files changed: +2875 −155 lines changed

.changeset/input-stream-wait.md

Lines changed: 8 additions & 0 deletions

```diff
@@ -0,0 +1,8 @@
+---
+"@trigger.dev/sdk": patch
+"@trigger.dev/react-hooks": patch
+---
+
+Add input streams for bidirectional communication with running tasks. Define typed input streams with `streams.input<T>({ id })`, then consume inside tasks via `.wait()` (suspends the process), `.once()` (waits for next message), or `.on()` (subscribes to a continuous stream). Send data from backends with `.send(runId, data)` or from frontends with the new `useInputStreamSend` React hook.
+
+Upgrade S2 SDK from 0.17 to 0.22 with support for custom endpoints (s2-lite) via the new `endpoints` configuration, `AppendRecord.string()` API, and `maxInflightBytes` session option.
```

.gitignore

Lines changed: 2 additions & 1 deletion

```diff
@@ -67,4 +67,5 @@ apps/**/public/build
 **/.claude/settings.local.json
 .mcp.log
 .mcp.json
-.cursor/debug.log
+.cursor/debug.log
+ailogger-output.log
```
Lines changed: 6 additions & 0 deletions

```diff
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: feature
+---
+
+Add input streams with API routes for sending data to running tasks, SSE reading, and waitpoint creation. Includes Redis cache for fast `.send()` to `.wait()` bridging, dashboard span support for input stream operations, and s2-lite support with configurable S2 endpoint, access token skipping, and S2-Basin headers for self-hosted deployments. Adds s2-lite to Docker Compose for local development.
```
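The changeset above mentions a Redis cache that bridges `.send()` to `.wait()`. The function names `setInputStreamWaitpoint` and `deleteInputStreamWaitpoint` appear in the commit's route imports, but their signatures are not fully shown; the in-memory sketch below is an assumption about the cache's contract (the real module, `inputStreamWaitpointCache.server`, is backed by Redis with TTL eviction):

```typescript
// Minimal in-memory sketch of the waitpoint cache contract that lets a
// later .send() find the waitpoint a .wait() created. Signatures are
// assumptions; the real implementation uses Redis TTLs instead of Date.now().
const cache = new Map<string, { waitpointId: string; expiresAt?: number }>();

function cacheKey(runId: string, streamId: string): string {
  return `inputStream:waitpoint:${runId}:${streamId}`;
}

function setInputStreamWaitpoint(
  runId: string,
  streamId: string,
  waitpointId: string,
  ttlMs?: number
): void {
  cache.set(cacheKey(runId, streamId), {
    waitpointId,
    expiresAt: ttlMs !== undefined ? Date.now() + ttlMs : undefined,
  });
}

function getInputStreamWaitpoint(runId: string, streamId: string): string | undefined {
  const entry = cache.get(cacheKey(runId, streamId));
  if (!entry) return undefined;
  if (entry.expiresAt !== undefined && Date.now() > entry.expiresAt) {
    // Expired entries behave like Redis TTL eviction.
    cache.delete(cacheKey(runId, streamId));
    return undefined;
  }
  return entry.waitpointId;
}

function deleteInputStreamWaitpoint(runId: string, streamId: string): void {
  cache.delete(cacheKey(runId, streamId));
}
```

The TTL mirrors the waitpoint's own timeout, so a stale mapping cannot outlive the waitpoint it points at.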

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions

```diff
@@ -39,6 +39,7 @@ const S2EnvSchema = z.preprocess(
     S2_ENABLED: z.literal("1"),
     S2_ACCESS_TOKEN: z.string(),
     S2_DEPLOYMENT_LOGS_BASIN_NAME: z.string(),
+    S2_DEPLOYMENT_STREAMS_LOCAL: z.string().default("0"),
   }),
   z.object({
     S2_ENABLED: z.literal("0"),
@@ -1344,6 +1345,8 @@ const EnvironmentSchema = z

     REALTIME_STREAMS_S2_BASIN: z.string().optional(),
     REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(),
+    REALTIME_STREAMS_S2_ENDPOINT: z.string().optional(),
+    REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS: z.enum(["true", "false"]).default("false"),
     REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS: z.coerce
       .number()
       .int()
```
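The S2 schema is a union discriminated on `S2_ENABLED`: when it is `"1"`, the token and basin name become required and the new `S2_DEPLOYMENT_STREAMS_LOCAL` flag defaults to `"0"`. A dependency-free sketch of that validation idea (simplified, not the actual zod schema):

```typescript
// Discriminated union mirroring the S2 env schema in the diff above.
// This hand-rolled parser is illustrative only; the webapp uses zod.
type S2Env =
  | {
      S2_ENABLED: "1";
      S2_ACCESS_TOKEN: string;
      S2_DEPLOYMENT_LOGS_BASIN_NAME: string;
      S2_DEPLOYMENT_STREAMS_LOCAL: string; // zod: .default("0")
    }
  | { S2_ENABLED: "0" };

function parseS2Env(env: Record<string, string | undefined>): S2Env {
  if (env.S2_ENABLED === "1") {
    // When enabled, the token and basin name are mandatory.
    if (!env.S2_ACCESS_TOKEN || !env.S2_DEPLOYMENT_LOGS_BASIN_NAME) {
      throw new Error("S2_ENABLED=1 requires S2_ACCESS_TOKEN and S2_DEPLOYMENT_LOGS_BASIN_NAME");
    }
    return {
      S2_ENABLED: "1",
      S2_ACCESS_TOKEN: env.S2_ACCESS_TOKEN,
      S2_DEPLOYMENT_LOGS_BASIN_NAME: env.S2_DEPLOYMENT_LOGS_BASIN_NAME,
      S2_DEPLOYMENT_STREAMS_LOCAL: env.S2_DEPLOYMENT_STREAMS_LOCAL ?? "0",
    };
  }
  // Disabled branch carries no S2 settings at all.
  return { S2_ENABLED: "0" };
}
```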

apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts

Lines changed: 3 additions & 3 deletions

```diff
@@ -217,7 +217,7 @@ export class DeploymentPresenter {
     let eventStream = undefined;
     if (
       env.S2_ENABLED === "1" &&
-      (buildServerMetadata || gitMetadata?.source === "trigger_github_app")
+      (buildServerMetadata || gitMetadata?.source === "trigger_github_app" || env.S2_DEPLOYMENT_STREAMS_LOCAL === "1")
     ) {
       const [error, accessToken] = await tryCatch(this.getS2AccessToken(project.externalRef));

@@ -290,9 +290,9 @@
       return cachedToken;
     }

-    const { access_token: accessToken } = await s2.accessTokens.issue({
+    const { accessToken } = await s2.accessTokens.issue({
       id: `${projectRef}-${new Date().getTime()}`,
-      expires_at: new Date(Date.now() + 60 * 60 * 1000).toISOString(), // 1 hour
+      expiresAt: new Date(Date.now() + 60 * 60 * 1000), // 1 hour
       scope: {
         ops: ["read"],
         basins: {
```

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 35 additions & 0 deletions

```diff
@@ -629,6 +629,41 @@ export class SpanPresenter extends BasePresenter {
           },
         };
       }
+      case "input-stream": {
+        if (!span.entity.id) {
+          logger.error(`SpanPresenter: No input stream id`, {
+            spanId,
+            inputStreamId: span.entity.id,
+          });
+          return { ...data, entity: null };
+        }
+
+        const [runId, streamId] = span.entity.id.split(":");
+
+        if (!runId || !streamId) {
+          logger.error(`SpanPresenter: Invalid input stream id`, {
+            spanId,
+            inputStreamId: span.entity.id,
+          });
+          return { ...data, entity: null };
+        }
+
+        // Translate user-facing stream ID to internal S2 stream name
+        const s2StreamKey = `$trigger.input:${streamId}`;
+
+        return {
+          ...data,
+          entity: {
+            type: "realtime-stream" as const,
+            object: {
+              runId,
+              streamKey: s2StreamKey,
+              displayName: streamId,
+              metadata: undefined,
+            },
+          },
+        };
+      }
       default:
         return { ...data, entity: null };
     }
```

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments.$deploymentParam/route.tsx

Lines changed: 5 additions & 8 deletions

```diff
@@ -198,17 +198,14 @@ export default function Page() {

       const readSession = await stream.readSession(
         {
-          seq_num: 0,
-          wait: 60,
-          as: "bytes",
+          start: { from: { seqNum: 0 }, clamp: true },
+          stop: { waitSecs: 60 },
         },
         { signal: abortController.signal }
       );

-      const decoder = new TextDecoder();
-
       for await (const record of readSession) {
-        const decoded = decoder.decode(record.body);
+        const decoded = record.body;
         const result = DeploymentEventFromString.safeParse(decoded);

         if (!result.success) {
@@ -217,8 +214,8 @@
         const headers: Record<string, string> = {};

         if (record.headers) {
-          for (const [nameBytes, valueBytes] of record.headers) {
-            headers[decoder.decode(nameBytes)] = decoder.decode(valueBytes);
+          for (const [name, value] of record.headers) {
+            headers[name] = value;
           }
         }
         const level = (headers["level"]?.toLowerCase() as LogEntry["level"]) ?? "info";
```
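This diff shows the S2 0.22 read API migration: positional `seq_num`/`wait`/`as` options become a structured `start`/`stop` object, and record bodies and headers arrive as strings instead of bytes. A type sketch of the options shape as used here (reconstructed from this commit, not from S2's own typings):

```typescript
// Read-session options as they appear in the migrated call above.
// Field optionality is an assumption beyond what the diff shows.
interface ReadSessionOptions {
  start: { from: { seqNum: number }; clamp?: boolean };
  stop: { waitSecs: number };
}

const opts: ReadSessionOptions = {
  // Resume from the beginning; clamp keeps the position inside the retained range.
  start: { from: { seqNum: 0 }, clamp: true },
  // Long-poll up to 60 seconds for new records before the session ends.
  stop: { waitSecs: 60 },
};
```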
Lines changed: 148 additions & 0 deletions

```ts
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import {
  CreateInputStreamWaitpointRequestBody,
  type CreateInputStreamWaitpointResponseBody,
} from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import { $replica } from "~/db.server";
import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server";
import {
  deleteInputStreamWaitpoint,
  setInputStreamWaitpoint,
} from "~/services/inputStreamWaitpointCache.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { parseDelay } from "~/utils/delays";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { engine } from "~/v3/runEngine.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";

const ParamsSchema = z.object({
  runFriendlyId: z.string(),
});

const { action, loader } = createActionApiRoute(
  {
    params: ParamsSchema,
    body: CreateInputStreamWaitpointRequestBody,
    maxContentLength: 1024 * 10, // 10KB
    method: "POST",
  },
  async ({ authentication, body, params }) => {
    try {
      const run = await $replica.taskRun.findFirst({
        where: {
          friendlyId: params.runFriendlyId,
          runtimeEnvironmentId: authentication.environment.id,
        },
        select: {
          id: true,
          friendlyId: true,
          realtimeStreamsVersion: true,
        },
      });

      if (!run) {
        return json({ error: "Run not found" }, { status: 404 });
      }

      const idempotencyKeyExpiresAt = body.idempotencyKeyTTL
        ? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL)
        : undefined;

      const timeout = await parseDelay(body.timeout);

      // Process tags (same pattern as api.v1.waitpoints.tokens.ts)
      const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags;

      if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) {
        throw new ServiceValidationError(
          `Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.`
        );
      }

      if (bodyTags && bodyTags.length > 0) {
        for (const tag of bodyTags) {
          await createWaitpointTag({
            tag,
            environmentId: authentication.environment.id,
            projectId: authentication.environment.projectId,
          });
        }
      }

      // Step 1: Create the waitpoint
      const result = await engine.createManualWaitpoint({
        environmentId: authentication.environment.id,
        projectId: authentication.environment.projectId,
        idempotencyKey: body.idempotencyKey,
        idempotencyKeyExpiresAt,
        timeout,
        tags: bodyTags,
      });

      // Step 2: Cache the mapping in Redis for fast lookup from .send()
      const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
      await setInputStreamWaitpoint(
        run.friendlyId,
        body.streamId,
        result.waitpoint.id,
        ttlMs && ttlMs > 0 ? ttlMs : undefined
      );

      // Step 3: Check if data was already sent to this input stream (race condition handling).
      // If .send() landed before .wait(), the data is in the S2 stream but no waitpoint
      // existed to complete. We check from the client's last known position.
      if (!result.isCached) {
        try {
          const realtimeStream = getRealtimeStreamInstance(
            authentication.environment,
            run.realtimeStreamsVersion
          );

          const records = await realtimeStream.readRecords(
            run.friendlyId,
            `$trigger.input:${body.streamId}`,
            body.lastSeqNum
          );

          if (records.length > 0) {
            const record = records[0]!;

            // Record data is the raw user payload — no wrapper to unwrap
            await engine.completeWaitpoint({
              id: result.waitpoint.id,
              output: {
                value: record.data,
                type: "application/json",
                isError: false,
              },
            });

            // Clean up the Redis cache since we completed it ourselves
            await deleteInputStreamWaitpoint(run.friendlyId, body.streamId);
          }
        } catch {
          // Non-fatal: if the S2 check fails, the waitpoint is still PENDING.
          // The next .send() will complete it via the Redis cache path.
        }
      }

      return json<CreateInputStreamWaitpointResponseBody>({
        waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id),
        isCached: result.isCached,
      });
    } catch (error) {
      if (error instanceof ServiceValidationError) {
        return json({ error: error.message }, { status: 422 });
      } else if (error instanceof Error) {
        return json({ error: error.message }, { status: 500 });
      }

      return json({ error: "Something went wrong" }, { status: 500 });
    }
  }
);

export { action, loader };
```
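The handler above takes a `runFriendlyId` route param and a JSON body with `streamId`, `timeout`, `lastSeqNum`, idempotency fields, and tags. The exact URL path of this route is not shown in this view, so the path below is a hypothetical sketch of how a client request to it could be assembled:

```typescript
// Hypothetical request builder for the waitpoint-creation route above.
// The URL path is an assumption inferred from the `runFriendlyId` param;
// the body field names come from the handler's usage of `body`.
interface WaitpointRequest {
  url: string;
  method: "POST";
  body: {
    streamId: string;
    timeout: string;
    lastSeqNum: number;
  };
}

function buildWaitpointRequest(
  runFriendlyId: string,
  streamId: string,
  lastSeqNum: number
): WaitpointRequest {
  return {
    url: `/api/v1/runs/${runFriendlyId}/input-streams/waitpoints`, // assumed path
    method: "POST",
    body: {
      streamId, // which input stream the task is waiting on
      timeout: "5m", // parsed server-side by parseDelay
      lastSeqNum, // client's last known S2 position, used for race handling
    },
  };
}
```

On success the route responds with `{ waitpointId, isCached }`, where `isCached` signals an idempotency-key hit.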

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 1 addition & 1 deletion

```diff
@@ -166,7 +166,7 @@ async function responseHeaders(
   const claims = {
     sub: environment.id,
     pub: true,
-    scopes: [`read:runs:${run.friendlyId}`],
+    scopes: [`read:runs:${run.friendlyId}`, `write:inputStreams:${run.friendlyId}`],
     realtime,
   };

```
