[ PROMPT_NODE_23890 ]
streaming
[ SKILL_DOCUMENTATION ]
# 流式传输 — TypeScript
## 快速开始
typescript
const stream = client.messages.stream({
model: "claude-opus-4-7",
max_tokens: 64000,
messages: [{ role: "user", content: "写一个故事" }],
});
for await (const event of stream) {
if (
event.type === "content_block_delta" &&
event.delta.type === "text_delta"
) {
process.stdout.write(event.delta.text);
}
}
---
## 处理不同内容类型
> **Opus 4.7 / Opus 4.6:** 使用 `thinking: {type: "adaptive"}`。在旧模型上,请改用 `thinking: {type: "enabled", budget_tokens: N}`。
typescript
const stream = client.messages.stream({
model: "claude-opus-4-7",
max_tokens: 64000,
thinking: { type: "adaptive" },
messages: [{ role: "user", content: "分析这个问题" }],
});
for await (const event of stream) {
switch (event.type) {
case "content_block_start":
switch (event.content_block.type) {
case "thinking":
console.log("n[思考中...]");
break;
case "text":
console.log("n[回复:]");
break;
}
break;
case "content_block_delta":
switch (event.delta.type) {
case "thinking_delta":
process.stdout.write(event.delta.thinking);
break;
case "text_delta":
process.stdout.write(event.delta.text);
break;
}
break;
}
}
---
## 使用工具进行流式传输(工具运行器)
在 `stream: true` 时使用工具运行器。外层循环遍历工具运行器的迭代(消息),内层循环处理流事件:
typescript
import Anthropic from "@anthropic-ai/sdk";
import { betaZodTool } from "@anthropic-ai/sdk/helpers/beta/zod";
import { z } from "zod";
const client = new Anthropic();
const getWeather = betaZodTool({
name: "get_weather",
description: "获取指定地点的当前天气",
inputSchema: z.object({
location: z.string().describe("城市和州,例如:San Francisco, CA"),
}),
run: async ({ location }) => `在 ${location} 是 72°F 且晴天`,
});
const runner = client.beta.messages.toolRunner({
model: "claude-opus-4-7",
max_tokens: 64000,
tools: [getWeather],
messages: [
{ role: "user", content: "巴黎和伦敦的天气怎么样?" },
],
stream: true,
});
// 外层循环:每次工具运行器迭代
for await (const messageStream of runner) {
// 内层循环:本次迭代的流事件
for await (const event of messageStream) {
switch (event.type) {