"""Stream text from an Anthropic-style Messages endpoint and print it.

The API key is read from the ``ANTHROPIC_API_KEY`` environment variable;
secrets must never be committed to source control.
"""

import asyncio
import json
import os
from typing import AsyncIterator, List, Dict, Any, Optional

import httpx

# Read the credential from the environment instead of embedding it in the file.
API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
API_URL = "https://api.lingyaai.cn/v1/messages"
ANTHROPIC_VERSION = "2023-06-01"

# Seconds to wait while establishing the connection. Reads stay unbounded
# because a streamed response can legitimately pause between chunks.
CONNECT_TIMEOUT = 10.0

_DATA_PREFIX = "data:"


def _parse_event(line: str) -> Optional[Dict[str, Any]]:
    """Decode one line of the response stream into an event dict.

    A leading ``data:`` field name (with or without the single optional
    space after the colon) is stripped before decoding. Lines that are not
    valid JSON, or whose JSON value is not an object, yield ``None``.
    """
    if line.startswith(_DATA_PREFIX):
        line = line[len(_DATA_PREFIX):]
        if line.startswith(" "):
            line = line[1:]
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        # Non-JSON lines (e.g. "event: ..." fields) carry no payload to use.
        return None
    return event if isinstance(event, dict) else None


async def stream_messages(
    messages: List[Dict[str, Any]],
    model: str = "claude-sonnet-4-5-20250929",
    max_tokens: Optional[int] = None,
) -> AsyncIterator[str]:
    """Yield text fragments from a streamed Messages API response.

    Args:
        messages: Conversation turns, sent as the ``messages`` field.
        model: Model identifier, sent as the ``model`` field.
        max_tokens: Sent as ``max_tokens`` when not ``None``; omitted
            from the payload otherwise.

    Yields:
        Each non-empty ``delta.text`` of a ``content_block_delta`` event,
        in arrival order. Iteration ends at ``message_stop`` or when the
        response stream is exhausted.

    Raises:
        RuntimeError: If no API key is configured.
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    if not API_KEY:
        raise RuntimeError(
            "API key is not configured; set the ANTHROPIC_API_KEY "
            "environment variable"
        )

    headers = {
        "anthropic-version": ANTHROPIC_VERSION,
        "content-type": "application/json",
        "x-api-key": API_KEY,
    }
    payload: Dict[str, Any] = {
        "model": model,
        "messages": messages,
        "stream": True,
    }
    if max_tokens is not None:
        payload["max_tokens"] = max_tokens

    # Bound only the connect phase so an unreachable host fails promptly.
    timeout = httpx.Timeout(None, connect=CONNECT_TIMEOUT)
    async with httpx.AsyncClient(trust_env=False, timeout=timeout) as client:
        async with client.stream(
            "POST", API_URL, json=payload, headers=headers
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line:
                    continue
                event = _parse_event(line)
                if event is None:
                    continue
                kind = event.get("type")
                if kind == "content_block_delta":
                    delta = event.get("delta")
                    # "delta" may be absent, null, or not an object.
                    text = delta.get("text") if isinstance(delta, dict) else None
                    if text:
                        yield text
                elif kind == "message_stop":
                    break


async def stream_text(prompt: str, **kwargs) -> None:
    """Send *prompt* as a single user message and print the reply as it streams.

    Extra keyword arguments are forwarded to :func:`stream_messages`.
    """
    msgs = [{"role": "user", "content": prompt}]
    async for chunk in stream_messages(msgs, **kwargs):
        # Flush per chunk so the text appears as it arrives.
        print(chunk, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(stream_text("讲个故事"))