
SequentialWorkflow exposes two streaming methods that yield tokens from each agent in pipeline order, in real time. Each agent’s tokens are streamed the moment the LLM produces them; once an agent finishes, its full output is handed off to the next agent — same hand-off as run(), just streamed.
  • workflow.run_stream(task) — sync generator
  • workflow.arun_stream(task) — async generator
  • Pass with_events=True to either to receive structured agent_start / token / agent_end events instead of plain token strings.
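The hand-off semantics can be sketched in plain Python. This is a conceptual illustration only — fake_agent, run_stream_sketch, and the word-level "tokens" are stand-ins, not Swarms APIs — but it shows the key property: each stage's tokens are yielded to the caller as they are produced, while the stage's full buffered output becomes the next stage's input.

```python
# Conceptual sketch of sequential streaming, in plain Python.
# fake_agent and run_stream_sketch are stand-ins, not Swarms APIs.
def fake_agent(prompt):
    """Pretend LLM: emits the prompt back one word-'token' at a time."""
    for word in prompt.split():
        yield word + " "


def run_stream_sketch(transforms, task):
    current = task
    for transform in transforms:
        pieces = []
        for token in fake_agent(transform(current)):
            pieces.append(token)  # buffer the agent's own output...
            yield token           # ...while streaming it to the caller
        current = "".join(pieces).strip()  # full output feeds the next agent
```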

Building the Pipeline

from swarms import Agent, SequentialWorkflow


def make_agent(name: str, system_prompt: str) -> Agent:
    return Agent(
        agent_name=name,
        system_prompt=system_prompt,
        model_name="gpt-4.1-mini",
        max_loops=1,
        persistent_memory=False,
        print_on=False,  # suppress the agent's own console printing; the stream consumer renders output
    )


workflow = SequentialWorkflow(
    agents=[
        make_agent(
            "Researcher",
            "Research the topic and produce a concise factual brief.",
        ),
        make_agent(
            "Analyst",
            "Take the brief and produce sharp analytical insights.",
        ),
        make_agent(
            "Writer",
            "Take the analysis and produce a polished, reader-friendly summary.",
        ),
    ],
    autosave=False,
)

Sync Streaming

Plain token strings, yielded in pipeline order: Agent 1’s tokens stream first, then Agent 2’s, then Agent 3’s.

for token in workflow.run_stream("the rise of solid-state batteries"):
    print(token, end="", flush=True)

Async Streaming

import asyncio


async def main():
    async for token in workflow.arun_stream(
        "the rise of solid-state batteries"
    ):
        print(token, end="", flush=True)


asyncio.run(main())
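The same async for consumption pattern works against any async generator, so it can be tried without a live workflow. In this sketch, fake_arun_stream is a hypothetical plain-asyncio stand-in for arun_stream, not part of Swarms:

```python
import asyncio


# Hypothetical stand-in for workflow.arun_stream, used only to
# demonstrate the async-for consumption pattern.
async def fake_arun_stream(task):
    for word in task.split():
        await asyncio.sleep(0)  # yield control, as a network-bound stream would
        yield word + " "


async def consume(task):
    out = []
    async for token in fake_arun_stream(task):
        out.append(token)  # same shape as consuming the real stream
    return "".join(out).strip()
```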

Structured Events with with_events=True

By default the stream yields plain token strings. Pass with_events=True to receive event dicts instead — useful when you want to render a separate panel per agent, attribute every token to the emitting agent, or know exactly when each agent starts and finishes.

import asyncio


async def main():
    async for evt in workflow.arun_stream(
        "the rise of solid-state batteries",
        with_events=True,
    ):
        if evt["type"] == "agent_start":
            print(f"\n--- {evt['agent']} starting ---")
        elif evt["type"] == "token":
            print(evt["token"], end="", flush=True)
        elif evt["type"] == "agent_end":
            print(
                f"\n--- {evt['agent']} finished "
                f"({len(evt['output'])} chars) ---"
            )


asyncio.run(main())

The three event types are:

Type         Fields          When emitted
agent_start  agent           Right before an agent begins streaming
token        agent, token    For every token the agent emits
agent_end    agent, output   After the agent finishes; carries the full output
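For example, the event stream can be folded into one text panel per agent. This sketch consumes hand-written sample events that mirror the documented dict shapes (it does not run a live workflow); panels_from_events is an illustrative helper, not a Swarms API:

```python
# Fold an event stream into per-agent panels.
# sample_events mirror the documented event shapes; they are not live output.
sample_events = [
    {"type": "agent_start", "agent": "Researcher"},
    {"type": "token", "agent": "Researcher", "token": "Brief "},
    {"type": "token", "agent": "Researcher", "token": "text"},
    {"type": "agent_end", "agent": "Researcher", "output": "Brief text"},
    {"type": "agent_start", "agent": "Analyst"},
    {"type": "token", "agent": "Analyst", "token": "Insight"},
    {"type": "agent_end", "agent": "Analyst", "output": "Insight"},
]


def panels_from_events(events):
    """Attribute every token to its emitting agent, keyed by agent name."""
    panels = {}
    for evt in events:
        if evt["type"] == "agent_start":
            panels[evt["agent"]] = []
        elif evt["type"] == "token":
            panels[evt["agent"]].append(evt["token"])
    return {name: "".join(toks) for name, toks in panels.items()}
```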
max_loops > 1 and drift_detection are not applied in streaming mode. Use workflow.run() if you need those.