SequentialWorkflow exposes two streaming methods that yield tokens from each agent in pipeline order, in real time. Each agent’s tokens are streamed the moment the LLM produces them; once an agent finishes, its full output is handed off to the next agent — same hand-off as run(), just streamed.
workflow.run_stream(task) — sync generator
workflow.arun_stream(task) — async generator
- Pass
with_events=True to either to receive structured agent_start / token / agent_end events instead of plain token strings.
Building the Pipeline
from swarms import Agent, SequentialWorkflow
def make_agent(name: str, system_prompt: str) -> Agent:
return Agent(
agent_name=name,
system_prompt=system_prompt,
model_name="gpt-4.1-mini",
max_loops=1,
persistent_memory=False,
print_on=False,
)
workflow = SequentialWorkflow(
agents=[
make_agent(
"Researcher",
"Research the topic and produce a concise factual brief.",
),
make_agent(
"Analyst",
"Take the brief and produce sharp analytical insights.",
),
make_agent(
"Writer",
"Take the analysis and produce a polished, reader-friendly summary.",
),
],
autosave=False,
)
Sync Streaming
Plain token strings, yielded in pipeline order. Agent 1’s tokens stream first, then Agent 2’s, then Agent 3’s.
for token in workflow.run_stream("the rise of solid-state batteries"):
print(token, end="", flush=True)
Async Streaming
import asyncio
async def main():
async for token in workflow.arun_stream(
"the rise of solid-state batteries"
):
print(token, end="", flush=True)
asyncio.run(main())
Structured Events with with_events=True
By default the stream yields plain token strings. Pass with_events=True to receive event dicts instead — useful when you want to render a separate panel per agent, attribute every token to the emitting agent, or know exactly when each agent starts and finishes.
import asyncio
async def main():
async for evt in workflow.arun_stream(
"the rise of solid-state batteries",
with_events=True,
):
if evt["type"] == "agent_start":
print(f"\n--- {evt['agent']} starting ---")
elif evt["type"] == "token":
print(evt["token"], end="", flush=True)
elif evt["type"] == "agent_end":
print(
f"\n--- {evt['agent']} finished "
f"({len(evt['output'])} chars) ---"
)
asyncio.run(main())
The three event types are:
| Type | Fields | When emitted |
|---|
agent_start | agent | Right before an agent begins streaming |
token | agent, token | For every token the agent emits |
agent_end | agent, output | After the agent finishes; carries the full output |
max_loops > 1 and drift_detection are not applied in streaming mode. Use workflow.run() if you need those.