Show HN: Calfkit – an SDK to build distributed, event-driven AI agents on Kafka

Original link: https://github.com/calf-ai/calfkit-sdk

## Calfkit: Building Scalable, Distributed AI Agents

Calfkit is a Python SDK that aims to simplify the creation of asynchronous, distributed AI agents. It addresses the scalability problems inherent in traditional agent architectures, which stem from synchronous API calls and tight coupling, the same issues that plagued early microservices.

Rather than relying on direct communication, Calfkit takes an event-driven approach built on asynchronous streams, so agent components such as chat, tools, and routing can scale independently. This loose coupling makes it possible to add new capabilities without modifying existing code, and it lets agent data stream to systems like CRMs and data warehouses.

Key benefits include horizontal scalability, reliable message delivery through event persistence, high throughput, and team independence during development. Calfkit uses Kafka as its event broker and abstracts away the complexity of managing event-driven infrastructure.

With Calfkit, developers can build robust, scalable AI agents, paving the way for more sophisticated "AI employees" and fully AI-run companies, without heavy orchestration overhead. Getting started requires Python 3.10+, Docker, and an OpenAI API key.

From the author (ryanyu): I think agents should work like real teams: independent, distinct roles, asynchronous communication, and the ability to onboard new team members or tools without restructuring the whole organization. I built backend systems at Yahoo and TikTok, so event-driven agents felt natural to me. But no agent SDK used this pattern, so I made Calfkit.

Calfkit breaks an agent into independent services (LLM inference, tools, and routing) that communicate asynchronously through Kafka. Agents, tool services, and downstream consumers can be deployed, added, removed, and scaled independently.

If this interests you, please take a look! I'd love to hear what everyone thinks.

Original Article


The SDK to build asynchronous, distributed AI agents.

Calfkit lets you compose agents with independent services—chat, tools, routing—that communicate asynchronously. Add agent capabilities without coordination. Scale each component independently. Stream agent outputs to any downstream system. Build employees that integrate.

Building agents like traditional web applications, with tight coupling and synchronous API calls, creates the same scalability problems that plagued early microservices. Agents and workflows connected through APIs and RPC suffer from:

  • Tight coupling: Changing one tool or agent breaks dependent agents and tools
  • Scaling bottlenecks: Since all agents and tools live on one runtime, everything must scale together
  • Siloed outputs: Agent and tool outputs stay trapped in your AI layer; streaming them to external systems is unnatural

Event-driven architectures provide the solution. Instead of direct API calls between components, agents and tools interact through asynchronous streams. Each component runs independently, scales horizontally, and outputs can flow anywhere: CRMs, data warehouses, analytics platforms, other agents, or even more tools.
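To make this concrete, below is a minimal sketch of the pattern written directly against the aiokafka client (pip install aiokafka), not against Calfkit. The topic names and payloads are illustrative assumptions; this is exactly the plumbing Calfkit abstracts away:

# event_pattern_sketch.py (illustrative only, not Calfkit's API)
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

async def tool_service():
    # Consume tool requests from one topic and publish results to another.
    consumer = AIOKafkaConsumer("tool.requests", bootstrap_servers="localhost:9092")
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await consumer.start()
    await producer.start()
    try:
        async for msg in consumer:
            location = msg.value.decode()
            # Any number of consumers (agents, warehouses, CRMs) can read the
            # response topic without this service knowing about them.
            await producer.send_and_wait("tool.responses", f"It's sunny in {location}".encode())
    finally:
        await consumer.stop()
        await producer.stop()

if __name__ == "__main__":
    asyncio.run(tool_service())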

Calfkit is a Python SDK for building event-driven agents out of the box. You get all the benefits of an asynchronous, distributed system (loose coupling, horizontal scalability, durability) without the complexity of managing event-driven infrastructure and orchestration yourself.

  • Distributed agents out of the box: Build event-driven, multi-service agents without writing orchestration code or managing infrastructure
  • Add agent capabilities without touching existing code: Deploy new tool capabilities as independent services that agents can dynamically discover; there is no need to touch your agent code
  • Scale what you need, when you need it: Chat handling, tool execution, and routing each scale independently based on demand
  • Nothing gets lost: Event persistence ensures reliable message delivery and traceability, even during service failures or restarts
  • High throughput under pressure: Asynchronous communication decouples requests from processing, so Calfkit agents work through bursty traffic reliably, maximizing throughput
  • Real-time responses: Low-latency event processing enables agents to react instantly to incoming data
  • Team independence: Different teams can develop and deploy chat, tools, and routing concurrently without cross-team coordination overhead
  • Universal data flow: Decoupling enables data to flow freely in both directions (see the consumer sketch after this list).
    • Downstream, agent outputs can be streamed to any system (CRMs, customer data platforms, warehouses, or even another AI workflow).
    • Upstream, tools can wrap any data source and deploy independently, with no coordination needed.
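As referenced above, here is a sketch of a downstream consumer: a hypothetical CRM-sync process reading agent outputs from a Kafka topic with aiokafka. The topic name agent.outputs is an assumption for illustration, not a documented Calfkit topic:

# crm_consumer.py (hypothetical downstream consumer; "agent.outputs" is an assumed topic name)
import asyncio
from aiokafka import AIOKafkaConsumer

async def main():
    consumer = AIOKafkaConsumer(
        "agent.outputs",                 # assumed topic carrying agent outputs
        bootstrap_servers="localhost:9092",
        group_id="crm-sync",             # its own consumer group, so it scales independently
    )
    await consumer.start()
    try:
        async for msg in consumer:
            # Forward each agent output to a CRM, warehouse, or analytics platform.
            print("syncing to CRM:", msg.value.decode())
    finally:
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(main())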
Prerequisites

  • Python 3.10 or later
  • Docker installed and running (for local testing with a Calfkit broker)
  • OpenAI API key (or another OpenAI API compliant LLM provider)

Start the Kafka Broker Using Docker

Calfkit uses Kafka as the event broker. Run the following command to clone the calfkit-broker repo and start a local Kafka broker container:

$ git clone https://github.com/calf-ai/calfkit-broker && cd calfkit-broker && make dev-up

Once the broker is ready, open a new terminal tab to continue with the quickstart.
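If you want to confirm the broker is reachable before moving on, a quick connectivity check works with any Kafka client. Here is a minimal sketch using aiokafka; this helper script is not part of Calfkit:

# check_broker.py (optional readiness check, not part of Calfkit)
import asyncio
from aiokafka import AIOKafkaProducer

async def main():
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()  # raises a connection error if the broker is unreachable
    print("Kafka broker is reachable at localhost:9092")
    await producer.stop()

if __name__ == "__main__":
    asyncio.run(main())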

Deploy the Tool Node

Define and deploy a tool as an independent service.

# weather_tool.py
import asyncio
from calfkit.nodes import agent_tool
from calfkit.broker import BrokerClient
from calfkit.runners import NodesService

@agent_tool
def get_weather(location: str) -> str:
    """Get the current weather at a location"""
    return f"It's sunny in {location}"

async def main():
    broker_client = BrokerClient(bootstrap_servers="localhost:9092") # Connect to Kafka broker
    service = NodesService(broker_client) # Initialize a service instance
    service.register_node(get_weather) # Register the tool node in the service
    await service.run() # (Blocking call) Deploy the service to start serving traffic

if __name__ == "__main__":
    asyncio.run(main())

Run the file to deploy the tool service:

$ python weather_tool.py
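Because tools are independent services, adding a capability later means deploying a new file rather than editing an existing one. As a sketch, a hypothetical get_time tool could ship as its own service following the exact same pattern:

# time_tool.py (hypothetical second tool, following the same pattern)
import asyncio
from datetime import datetime, timedelta, timezone
from calfkit.nodes import agent_tool
from calfkit.broker import BrokerClient
from calfkit.runners import NodesService

@agent_tool
def get_time(utc_offset_hours: int = 0) -> str:
    """Get the current time at a given UTC offset"""
    now = datetime.now(timezone.utc) + timedelta(hours=utc_offset_hours)
    return now.strftime("%H:%M")

async def main():
    broker_client = BrokerClient(bootstrap_servers="localhost:9092")
    service = NodesService(broker_client)
    service.register_node(get_time)  # Register the new tool; existing services are untouched
    await service.run()

if __name__ == "__main__":
    asyncio.run(main())

To let the agent use it, the only change elsewhere would be adding get_time to the router's tool_nodes list (shown below); the running weather service is never touched.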

Deploy the Chat Node

Deploy the LLM chat node as its own service.

# chat_service.py
import asyncio
from calfkit.nodes import ChatNode
from calfkit.providers import OpenAIModelClient
from calfkit.broker import BrokerClient
from calfkit.runners import NodesService

async def main():
    broker_client = BrokerClient(bootstrap_servers="localhost:9092") # Connect to Kafka broker
    model_client = OpenAIModelClient(model_name="gpt-5-nano")
    chat_node = ChatNode(model_client) # Inject a model client into the chat node definition so the chat deployment can perform LLM calls
    service = NodesService(broker_client) # Initialize a service instance
    service.register_node(chat_node) # Register the chat node in the service
    await service.run() # (Blocking call) Deploy the service to start serving traffic

if __name__ == "__main__":
    asyncio.run(main())

Set your OpenAI API key:

$ export OPENAI_API_KEY=sk-...

Run the file to deploy the chat service:

$ python chat_service.py

Deploy the Agent Router Node

Deploy the agent router that orchestrates chat, tools, and conversation-level memory.

# agent_router_service.py
import asyncio
from calfkit.nodes import agent_tool, AgentRouterNode, ChatNode
from calfkit.stores import InMemoryMessageHistoryStore
from calfkit.broker import BrokerClient
from calfkit.runners import NodesService
from weather_tool import get_weather # Import the tool, the tool definition is reusable

async def main():
    broker_client = BrokerClient(bootstrap_servers="localhost:9092") # Connect to Kafka broker
    router_node = AgentRouterNode(
        chat_node=ChatNode(), # Provide the chat node definition for the router to orchestrate the nodes
        tool_nodes=[get_weather],
        system_prompt="You are a helpful assistant",
        message_history_store=InMemoryMessageHistoryStore(), # Stores messages in-memory in the deployment runtime
    )
    service = NodesService(broker_client) # Initialize a service instance
    service.register_node(router_node) # Register the router node in the service
    await service.run() # (Blocking call) Deploy the service to start serving traffic

if __name__ == "__main__":
    asyncio.run(main())

Run the file to deploy the agent router service:

$ python agent_router_service.py

Invoke the Agent

Send a request and receive the response.

When invoking an already-deployed agent, use the RouterServiceClient. The node is just a configuration object, so you don't need to redefine the deployment parameters.

# client.py
import asyncio
from calfkit.nodes import AgentRouterNode
from calfkit.broker import BrokerClient
from calfkit.runners import RouterServiceClient

async def main():
    broker_client = BrokerClient(bootstrap_servers="localhost:9092") # Connect to Kafka broker

    # Thin client - no deployment parameters needed
    router_node = AgentRouterNode()
    client = RouterServiceClient(broker_client, router_node)

    # Invoke and wait for response
    response = await client.invoke(user_prompt="What's the weather in Tokyo?")
    final_msg = await response.get_final_response()
    print(f"Assistant: {final_msg.text}")

if __name__ == "__main__":
    asyncio.run(main())

Run the file to invoke the agent:

$ python client.py

The RouterServiceClient handles ephemeral Kafka communication and cleanup automatically. You can also stream intermediate messages:

response = await client.invoke(user_prompt="What's the weather in Tokyo?")

# Stream all messages (tool calls, intermediate responses, etc.)
async for message in response.messages_stream():
    print(message)
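Since invocation is asynchronous end to end, several requests can be in flight at once. The following is a sketch assuming RouterServiceClient tolerates concurrent invoke calls, which the README does not state explicitly:

# Fan out several prompts concurrently over the same client (assumption: concurrent
# invocations are supported; if not, create one client per task).
import asyncio

async def ask(client, prompt: str) -> str:
    response = await client.invoke(user_prompt=prompt)
    final_msg = await response.get_final_response()
    return final_msg.text

async def fan_out(client):
    prompts = ["Weather in Tokyo?", "Weather in Paris?", "Weather in Cairo?"]
    answers = await asyncio.gather(*(ask(client, p) for p in prompts))
    for prompt, answer in zip(prompts, answers):
        print(f"{prompt} -> {answer}")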

To move toward AI employees and AI-run companies, teams of agents must progress beyond brittle, tightly coupled, synchronous coordination. This requires embracing event-driven, asynchronous communication patterns between agents and their dependencies.

License: Apache-2.0
