Streaming
Receive streaming outputs

When streaming, Substrate returns individual node outputs as they complete, as well as incremental chunks from nodes that support streaming. The stream ends with the full graph result.

Substrate streams results using Server-Sent Events (SSE). Each event carries one of the following message types:

  • node.result: Completed result of a node.
  • node.delta: Incremental chunks of output from a running node.
  • node.error: Error encountered when running a node.
  • graph.result: The full result of the completed graph run.
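As a rough sketch of how the four message types can be told apart, the snippet below switches on the `object` field. The `StreamMessage` type is a simplified assumption based on the shapes documented under Streaming Event Types, and `describeMessage` is a hypothetical helper, not part of the Substrate SDK.

```typescript
// Simplified, assumed message shapes (payloads left as `unknown`).
type StreamMessage =
  | { object: "node.result"; nodeId: string; data: unknown }
  | { object: "node.delta"; nodeId: string; data: unknown }
  | { object: "node.error"; nodeId: string; data: { type: string; message: string } }
  | { object: "graph.result"; data: unknown };

// Hypothetical helper: return a one-line summary for each message type.
function describeMessage(message: StreamMessage): string {
  switch (message.object) {
    case "node.result":
      return `node ${message.nodeId} completed`;
    case "node.delta":
      return `node ${message.nodeId} sent a chunk`;
    case "node.error":
      return `node ${message.nodeId} failed: ${message.data.message}`;
    case "graph.result":
      return "graph run completed";
  }
}
```

Because the union is discriminated on `object`, the compiler narrows `message` in each branch, so `nodeId` is only accessed on messages that carry it.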

Server

To stream results from a graph run, call substrate.stream instead of substrate.run:

TypeScript

const stream = await substrate.stream(story, summary);

Depending on your use case, you can continue to work with the streaming response in your server-side code.
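One possible way to consume the stream on the server is sketched below. It assumes the value returned by substrate.stream can be iterated with `for await`, in the same way as the client-side iterator shown later on this page. `collectNodeResults` and `fakeStream` are hypothetical names; `fakeStream` only stands in for a real stream so the sketch runs on its own.

```typescript
// Loose, assumed message shape; see Streaming Event Types for the documented fields.
type StreamMessage = { object: string; nodeId?: string; data: unknown };

// Hypothetical helper: gather each node's completed output, keyed by node id.
async function collectNodeResults(
  stream: AsyncIterable<StreamMessage>,
): Promise<Map<string, unknown>> {
  const results = new Map<string, unknown>();
  for await (const message of stream) {
    // Deltas, errors, and the final graph.result are ignored in this sketch.
    if (message.object === "node.result" && message.nodeId !== undefined) {
      results.set(message.nodeId, message.data);
    }
  }
  return results;
}

// Stand-in for a real stream, used only for illustration.
async function* fakeStream(): AsyncGenerator<StreamMessage> {
  yield { object: "node.delta", nodeId: "story", data: { text: "Once" } };
  yield { object: "node.result", nodeId: "story", data: { text: "Once upon a time" } };
  yield { object: "graph.result", data: {} };
}
```

With a real stream, the call would look like `await collectNodeResults(stream)`, provided the assumption about iteration holds.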

You can also forward the streaming response directly to your client and handle it there.

Forward to client

To forward a streaming response, return the stream's apiResponse.body in your API response with Content-Type text/event-stream:

api/endpoint/route.ts

export async function POST(request: Request) {
  // ...
  // Pass the SSE body through unchanged so the client can parse it
  const body = stream.apiResponse.body;
  return new Response(body, {
    headers: { "Content-Type": "text/event-stream" },
  });
}

Client

To handle a forwarded streaming response in client-side JavaScript, use sb.streaming.fromSSEResponse to turn the response into an async iterator.

example.tsx

"use client";

import { useState } from "react";
import { sb } from "substrate";

export default function Demo() {
  const [output, setOutput] = useState<any[]>([]);
  const [text, setText] = useState<string>("");

  async function submitPrompt(event: any) {
    event.preventDefault();

    const request = new Request("/api/endpoint", {
      method: "POST",
      body: new FormData(event.currentTarget),
    });
    const response = await fetch(request);

    if (response.ok) {
      // Reset state from any previous run
      setOutput([]);
      setText("");

      const stream = await sb.streaming.fromSSEResponse(response);
      for await (const message of stream) {
        // node.result is received after a node runs
        // it contains the node's output in the `data` field
        if (message.object === "node.result") {
          setOutput((state) => [...state, message.data.json_object]);
        }

        // node.delta is received when a node outputs incremental data
        if (message.object === "node.delta") {
          setText((state) => state + message.data.choices.item.text);
        }
      }
    }
  }

  // ...
}

Streaming Event Types

node.result

Contains the node's entire output. Sent as soon as the node completes.


{
  object: "node.result";
  nodeId: string;
  data: $NodeOutput;
}

node.error

Contains an error that occurred during a node's run.


{
  object: "node.error";
  nodeId: string;
  data: {
    type: "api_error" | "invalid_request_error" | "dependency_error";
    message: string;
  }
}
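If you would rather surface node errors as exceptions, one option is to wrap the event in an `Error` subclass. The sketch below is illustrative only: `NodeRunError` and `isNodeError` are hypothetical names that do not come from the Substrate SDK, and the message format is an arbitrary choice.

```typescript
type NodeErrorType = "api_error" | "invalid_request_error" | "dependency_error";

type NodeErrorMessage = {
  object: "node.error";
  nodeId: string;
  data: { type: NodeErrorType; message: string };
};

// Hypothetical error class that keeps the node id and error type available to callers.
class NodeRunError extends Error {
  readonly nodeId: string;
  readonly type: NodeErrorType;

  constructor(event: NodeErrorMessage) {
    super(`[${event.nodeId}] ${event.data.type}: ${event.data.message}`);
    this.name = "NodeRunError";
    this.nodeId = event.nodeId;
    this.type = event.data.type;
  }
}

// Hypothetical type guard: narrows a streamed message to the node.error shape.
// It checks only the `object` discriminator, not the rest of the payload.
function isNodeError(message: { object: string }): message is NodeErrorMessage {
  return message.object === "node.error";
}
```

Inside a streaming loop, this could be used as `if (isNodeError(message)) throw new NodeRunError(message);`.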

node.delta

Nodes that support streaming emit delta events. Each delta contains a partial result with the same structure as the node's output, with one difference: where the output has an array field, the delta instead holds a single array.item object, which carries one array item and the index it belongs to.


{
  object: "node.delta";
  nodeId: string;
  data: $NodeOutputChunk;
}

The array.item object:


{
  object: "array.item";
  index: number;
  item: $NodeOutputArrayItem; // value of the array item
}
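To illustrate how the `index` field might be used, here is a minimal sketch that accumulates streamed text per array position. The `TextChunk` shape is an assumption inferred from the `message.data.choices.item.text` access in the client example; other nodes may use different field names. `appendChunk` is a hypothetical helper.

```typescript
type ArrayItem<T> = { object: "array.item"; index: number; item: T };

// Assumed chunk shape for a text node whose output has a `choices` array.
type TextChunk = { choices: ArrayItem<{ text: string }> };

// Hypothetical helper: append a chunk's text to the entry at its index.
// Returns a new array so it can be used directly in a React state updater.
function appendChunk(texts: string[], chunk: TextChunk): string[] {
  const { index, item } = chunk.choices;
  const next = [...texts];
  // An index seen for the first time starts from an empty string.
  next[index] = (next[index] ?? "") + item.text;
  return next;
}
```

Chunks for different indices can arrive interleaved, so keeping one string per index prevents text from separate array items being mixed together.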

graph.result

When the graph run completes, the entire graph result is sent as the final message.


{
  object: "graph.result";
  data: $GraphResult; // contains results from all nodes in the graph
}
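When only the final result matters, the stream can be drained until the graph.result message arrives. The following is a sketch under the assumption that the stream is an async iterable of messages. `finalGraphResult` is a hypothetical helper, and throwing when the stream ends early is a design choice made here, not documented Substrate behavior.

```typescript
// Loose, assumed message shape.
type StreamMessage = { object: string; data: unknown };

// Hypothetical helper: resolve with the data of the graph.result message.
async function finalGraphResult(
  stream: AsyncIterable<StreamMessage>,
): Promise<unknown> {
  for await (const message of stream) {
    if (message.object === "graph.result") {
      return message.data;
    }
  }
  // The stream closed without a final message, for example after a dropped connection.
  throw new Error("stream ended before a graph.result message arrived");
}
```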