This AI agent should have been a SQL query
Building agents using streaming SQL queries

原始链接: https://www.morling.dev/blog/this-ai-agent-should-have-been-sql-query/

Build enterprise AI agents with Flink SQL, taking advantage of its ability to access both structured and unstructured data. For structured data such as customer details, use SQL joins against source connectors, or look-up joins for reference data, relying on Flink's caching. For unstructured data such as internal documents, implement retrieval-augmented generation (RAG): encode documents into embeddings and store them in a vector database (e.g. Pinecone, Elasticsearch, Postgres, MongoDB). Flink SQL natively supports representing vectors as ARRAY&lt;FLOAT&gt;. The agentic system consists of two jobs: one job uses `ML_PREDICT()` with an embedding model to create and update the embeddings in the vector store; the second job summarizes new data, creates an embedding of the summary, queries the vector store for relevant documents (using a user-defined function, UDF, for cosine similarity), and augments another `ML_PREDICT()` call to establish the relationship. For a true AI agent, where the large language model (LLM) decides which resources to use, Anthropic's MCP can be integrated via Process Table Functions (PTFs), which allow arbitrary Java logic, such as the LangChain4j API, to connect external tools. PTFs, available in Flink 2.1, operate on groups of rows and can thus also be used to implement agent memory.

This Hacker News thread discusses an article exploring the use of streaming SQL queries, specifically with Flink, to build "agents" by integrating LLMs. The core idea is to trigger LLM prompts for tasks like summarizing papers based on new data store entries, then routing alerts via Slack. While the author acknowledges that the term "agent" is broadly defined, some commenters feel the implementation more closely resembles functions within data processing pipelines than true AI agents with features like tool selection and memory. Criticisms include that the approach feels forced and requires workarounds to achieve agent-like behavior. Alternative perspectives propose that the post is a commentary on using immutability for building agents or suggest that a functional approach might be less restrictive than SQL. Another commenter argues that true "agentic workflows" require a more asynchronous and iterative process, with agents recursively finding new tasks, which is difficult to achieve with a single, predefined SQL query.

Original article

LLMs are general-purpose models created from huge bodies of publicly available datasets. However, many, if not most, AI Agents for enterprise use cases require access to context such as internal data and resources, tools and services. How can this be implemented when building an agentic system using Flink SQL?

First, let’s consider the case of structured data, for instance details about a given customer stored in an external database. SQL is a natural fit for accessing that kind of data: Flink SQL allows you to enrich the data to be sent to an LLM using SQL join semantics. One option is to join streams sourced from one of Flink’s wide range of source connectors (and by extension, also using the Kafka Connect source connector ecosystem). Alternatively, in particular for reference data which doesn’t change frequently, you can also use look-up joins, which let you retrieve data from external data sources, such as databases or CRM systems. In that case, Flink will take care of caching look-up results in a local RocksDB instance for the sake of efficiency, fetching data from the upstream source only when needed.
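To make this concrete, here is a minimal sketch of a look-up join, assuming a hypothetical customers reference table backed by the JDBC connector and an orders stream exposing a processing-time attribute; the table names, columns, and connection options are purely illustrative:

```sql
-- Hypothetical reference table in an external database, accessed via the JDBC connector
CREATE TABLE customers (
  id      BIGINT,
  name    STRING,
  segment STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:postgresql://crm-db:5432/crm',
  'table-name' = 'customers'
);

-- Enrich each order with reference data at processing time via a look-up join
SELECT o.order_id, o.amount, c.name, c.segment
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.id;
```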

When it comes to feeding non-public unstructured data—documentation and wiki pages, reports, knowledge bases, customer contracts, etc.—to an LLM, retrieval-augmented generation (RAG) is a proven solution. With the help of a language model, unstructured domain-specific information is encoded into embeddings, which are stored in a vector database such as Pinecone or Elasticsearch, or alternatively using a vector index of a more traditional data store like Postgres or MongoDB. Thanks to Flink SQL’s rich type system, vectors are natively supported as ARRAY&lt;FLOAT&gt;. When an agent is about to make a query to an LLM, the input data is used to query the vector store, allowing the agent to enrich the LLM prompt with relevant domain-specific information, yielding higher-quality results based on the latest data and information from your specific business context.
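As a rough illustration, the table facing the vector store could look like the following sketch; the table name, columns, and the Elasticsearch connector options are assumptions and would differ depending on the vector store you choose:

```sql
-- Hypothetical table holding wiki pages together with their embeddings;
-- ARRAY<FLOAT> stores the vector representation of each document
CREATE TABLE wiki_embeddings (
  page_id   STRING,
  title     STRING,
  content   STRING,
  embedding ARRAY<FLOAT>,
  PRIMARY KEY (page_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts'     = 'http://localhost:9200',
  'index'     = 'wiki_embeddings'
);
```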

What does that mean for our thought experiment of building AI Agents as Flink SQL queries? Following up on the example of summarizing research papers, let’s assume we’re also doing company-internal research, the results of which are documented in an internal wiki. Based on the summary of an incoming research paper, we’d like to identify relevant internal research and get some understanding of the relationship between the new paper and our own research, for instance providing new angles and perspectives for future research activities. To solve that task, we could think of having two streaming SQL jobs which, taken together, form an agentic system:

One job creates and updates the embeddings in the vector store, whenever there’s a change in the internal research wiki. In other scenarios, thanks to the rich ecosystem of Flink connectors, the data could also be retrieved in real time from a relational database using change data capture, through a webhook which receives notifications whenever the company’s wiki pages change, etc. To create the vector embeddings (A1), the ML_PREDICT() function can be used with an embedding model such as OpenAI’s text-embedding-3-small model. That way, the embedding representation in the vector store is continuously kept in sync with the original data (A2).
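As a sketch, this first job could look roughly like this, assuming the ML_PREDICT() table-function style available in Flink 2.1, a model resource text_embedding registered beforehand via CREATE MODEL, and a changelog source wiki_changes fed by CDC or a webhook; the name of the output column (embedding) is determined by the model’s declared output schema:

```sql
-- Job A: keep the vector store in sync with the internal wiki (A1/A2);
-- wiki_changes and text_embedding are hypothetical names
INSERT INTO wiki_embeddings
SELECT page_id, title, content, embedding
FROM ML_PREDICT(
  TABLE wiki_changes,
  MODEL text_embedding,
  DESCRIPTOR(content)
);
```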

In the actual agent job itself, we’d create a summary of each new paper as described above (B1). Next, we’d use ML_PREDICT() with the same embedding model for creating a vector representation of that summary (B2). This embedding is then used to query the vector store and identify the most relevant internal research documents, for instance based on cosine similarity (B3). Currently, there’s no support for this built into Apache Flink itself, so this is something you’d have to implement yourself with a user-defined function (UDF). When running on Confluent Cloud, there’s a ready-made function VECTOR_SEARCH(), which lets you execute queries against different vector stores; eventually, I’d expect this capability to also be available in upstream Flink. Finally, we’d use the results to augment another LLM invocation via ML_PREDICT() for establishing the relationship between the new paper and our own research (B4).
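The agent job could be sketched as a series of views, one per step. The model names, the COSINE_SIMILARITY() UDF, and the source and sink tables are again assumptions, and the cross join in step B3 is a naive stand-in for a proper vector search such as VECTOR_SEARCH():

```sql
-- B1: summarize each new paper
CREATE TEMPORARY VIEW paper_summaries AS
SELECT paper_id, summary
FROM ML_PREDICT(TABLE papers, MODEL summarization_model, DESCRIPTOR(full_text));

-- B2: embed the summary, using the same embedding model as job A
CREATE TEMPORARY VIEW summary_embeddings AS
SELECT paper_id, summary, embedding AS summary_embedding
FROM ML_PREDICT(TABLE paper_summaries, MODEL text_embedding, DESCRIPTOR(summary));

-- B3: find related internal research via a user-defined cosine-similarity function;
-- the cross join is a simplification, a real job would prune candidates or use VECTOR_SEARCH()
CREATE TEMPORARY VIEW related_research AS
SELECT s.paper_id, s.summary, w.content AS internal_doc,
       COSINE_SIMILARITY(s.summary_embedding, w.embedding) AS score
FROM summary_embeddings AS s
CROSS JOIN wiki_embeddings AS w;

-- B4: ask the LLM to relate the new paper to the matched internal research
CREATE TEMPORARY VIEW prompts AS
SELECT paper_id,
       CONCAT('New paper: ', summary, ' Internal research: ', internal_doc) AS prompt
FROM related_research
WHERE score > 0.8;

INSERT INTO paper_insights
SELECT paper_id, response
FROM ML_PREDICT(TABLE prompts, MODEL chat_model, DESCRIPTOR(prompt));
```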

Arguably, so far we’ve stayed on the workflow side of the workflow/agent dichotomy mentioned initially. For building a true AI Agent, it may be necessary to let the LLM itself decide which resources or tools to tap into for a given prompt. Anthropic’s MCP standard (Model Context Protocol) has seen a massive uptake over the last few months for exactly this use case, allowing you to integrate custom services and data sources into your agentic workflows.

Unfortunately, as of today, this is not something which is supported by Flink SQL out of the box. But you can close this gap by implementing a UDF. In particular, Process Table Functions (PTFs, defined in FLIP-440), a new kind of UDF available in Flink 2.1, come in very handy for this purpose. They allow you to integrate arbitrary logic written in Java into your SQL pipelines, which means you could build a PTF for the integration of external tools via MCP, for instance using the LangChain4j API.
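While the MCP integration itself would live in Java, on the SQL side such a PTF would simply be invoked as a table function. The function name call_mcp_tool, its argument, and the result column below are hypothetical and only illustrate the shape of the invocation, assuming the FLIP-440 call syntax:

```sql
-- Hypothetical PTF implemented in Java (e.g. wrapping a LangChain4j MCP client),
-- invoked from SQL; the LLM inside the function decides which MCP tools to call
SELECT paper_id, tool_result
FROM call_mcp_tool(input => TABLE paper_prompts);
```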

PTFs allow for very flexible customizations of the processing logic of Flink SQL jobs. The integration of MCP into a PTF may be a subject for a future post; in the meantime, refer to this post for a first look at using PTFs in the context of a change data capture pipeline for Postgres.

As PTFs are table-valued functions, they can not only operate on single rows and events, but also on groups of rows, for instance all the events pertaining to a specific customer or workflow instance. This makes them a candidate for implementing agent memory; more on that in the following.
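As a hypothetical sketch of that idea, partitioning the PTF’s input table by a session key would hand all events of one conversation to the same function instance, which could then keep prior context in Flink state; the function and table names below are made up for illustration:

```sql
-- Hypothetical stateful PTF acting as an agent with per-session memory:
-- PARTITION BY routes all events of a session to the same function instance
SELECT session_id, response
FROM agent_session(input => TABLE agent_requests PARTITION BY session_id);
```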
