LLMs are general-purpose models trained on huge bodies of publicly available data. However, many, if not most, AI Agents for enterprise use cases require access to additional context, such as internal data and resources, tools, and services. How can this be implemented when building an agentic system using Flink SQL?
First, let’s consider the case of structured data, for instance details about a given customer stored in an external database. SQL is a natural fit for accessing that kind of data: Flink SQL allows you to enrich the data to be sent to an LLM using SQL join semantics. One option is to join streams ingested via one of the wide range of source connectors (and, by extension, also the Kafka Connect source connector ecosystem). Alternatively, in particular for reference data which doesn’t change frequently, you can also use look-up joins, which let you retrieve data from external data sources, such as databases or CRM systems. In that case, Flink will take care of caching look-up results in a local RocksDB instance for the sake of efficiency, fetching data from the upstream source only when needed.
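To give a concrete idea, here is a minimal sketch of such a look-up join against a customer table in Postgres, using the JDBC lookup connector; the table and column names are made up, and the cache options shown are just one possible configuration:

```sql
-- Reference data in an external database, accessed via the JDBC connector
-- (table and column names are illustrative)
CREATE TABLE customers (
  customer_id INT,
  name        STRING,
  segment     STRING,
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector'    = 'jdbc',
  'url'          = 'jdbc:postgresql://crm-db:5432/crm',
  'table-name'   = 'customers',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-write' = '1 h'
);

-- Enrich the event stream via a look-up join; proc_time is a processing-time
-- attribute (PROCTIME()) defined on the orders table
SELECT o.order_id, o.amount, c.name, c.segment
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;
```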
When it comes to feeding non-public unstructured data—documentation and wiki pages, reports, knowledge bases, customer contracts, etc.—to an LLM, retrieval-augmented generation (RAG) is a proven solution. With the help of an embedding model, unstructured domain-specific information is encoded into embeddings, which are stored in a vector database such as Pinecone or Elasticsearch, or alternatively using a vector index of a more traditional data store like Postgres or MongoDB. Thanks to Flink SQL’s rich type system, vectors are natively supported as ARRAY<FLOAT>. When an agent is about to make a query to an LLM, the input data is used to query the vector store, allowing the agent to enrich the LLM prompt with relevant domain-specific information, yielding higher quality results based on the latest data and information from your specific business context.
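As a rough sketch, assuming Elasticsearch as the vector store and made-up table and field names, such an embeddings table could be declared in Flink SQL like this, with the vector held in an ARRAY<FLOAT> column:

```sql
-- Sink table for document embeddings; a suitable index mapping (e.g. dense_vector)
-- is assumed to exist on the Elasticsearch side
CREATE TABLE wiki_embeddings (
  doc_id    STRING,
  title     STRING,
  content   STRING,
  embedding ARRAY<FLOAT>,
  PRIMARY KEY (doc_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts'     = 'http://elasticsearch:9200',
  'index'     = 'wiki_embeddings'
);
```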
What does that mean for our thought experiment of building AI Agents as Flink SQL queries? Following up on the example of summarizing research papers, let’s assume we’re also doing company-internal research, the results of which are documented in an internal wiki. Based on the summary of an incoming research paper, we’d like to identify relevant internal research and get some understanding of the relationship between the new paper and our own work, for instance providing new angles and perspectives for future research activities. To solve that task, we could think of two streaming SQL jobs which, taken together, form an agentic system:
One job creates and updates the embeddings in the vector store whenever there’s a change in the internal research wiki. In other scenarios, thanks to the rich ecosystem of Flink connectors, the data could also be retrieved in real time from a relational database using change data capture, through a webhook which receives a notification whenever a company’s wiki pages change, etc. To create the vector embeddings (A1), the ML_PREDICT() function can be used with an embedding model such as OpenAI’s text-embedding-3-small model. That way, the embedding representation in the vector store is continuously kept in sync with the original data (A2).
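As a sketch of this first job, assuming Confluent Cloud’s CREATE MODEL/ML_PREDICT() syntax (the exact model options and the invocation style differ between Confluent Cloud and the upstream Flink 2.1 implementation, so the details are illustrative only):

```sql
-- An embedding model backed by OpenAI’s text-embedding-3-small;
-- provider-specific options (endpoint, credentials, etc.) are omitted here
CREATE MODEL wiki_embedding_model
INPUT (text STRING)
OUTPUT (embedding ARRAY<FLOAT>)
WITH (
  'provider' = 'openai',
  'task'     = 'embedding'
);

-- A1/A2: whenever a wiki page changes, (re-)compute its embedding and
-- upsert it into the vector store table declared above
INSERT INTO wiki_embeddings
SELECT doc_id, title, content, embedding
FROM wiki_changes,
     LATERAL TABLE(ML_PREDICT('wiki_embedding_model', content));
```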
In the actual agent job itself, we’d create a summary of each new paper as described above (B1). Next, we’d use ML_PREDICT() with the same embedding model to create a vector representation of that summary (B2). This embedding is then used to query the vector store and identify the most relevant internal research documents, for instance based on cosine similarity (B3). Currently, there’s no support for this built into Apache Flink itself, so this is something you’d have to implement yourself with a user-defined function (UDF). When running on Confluent Cloud, there’s a ready-made function, VECTOR_SEARCH(), which lets you execute queries against different vector stores; eventually, I’d expect this capability to also become available in upstream Flink. Finally, we’d use the results to augment another LLM invocation via ML_PREDICT(), establishing the relationship between the new paper and our own research (B4).
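To make the vector search step (B3) a bit more tangible, here is one possible sketch using a hypothetical user-defined table function SEARCH_WIKI_EMBEDDINGS(), which would encapsulate the query against the vector store and return the top-k matches; paper_summaries stands for a made-up intermediate view produced by steps B1/B2, carrying each summary and its embedding. On Confluent Cloud, VECTOR_SEARCH() would take over this role:

```sql
-- B3: for each summarized paper, retrieve the k most similar internal documents;
-- SEARCH_WIKI_EMBEDDINGS is a hypothetical user-defined table function
SELECT
  p.paper_id,
  p.summary,
  m.doc_id,
  m.title,
  m.score
FROM paper_summaries AS p,
     LATERAL TABLE(SEARCH_WIKI_EMBEDDINGS(p.summary_embedding, 5))
       AS m(doc_id, title, score);
```

The retrieved documents could then be folded into the prompt of the final ML_PREDICT() call (B4), relating the new paper to the internal research.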
Arguably, so far we’ve stayed on the workflow side of the workflow/agent dichotomy mentioned initially. For building a true AI Agent, it may be necessary to let the LLM itself decide which resources or tools to tap into for a given prompt. Anthropic’s MCP standard (Model Context Protocol) has seen a massive uptake over the last few months for exactly this use case, allowing you to integrate custom services and data sources into your agentic workflows.
Unfortunately, as of today, this is not supported by Flink SQL out of the box. But you can close this gap by implementing a UDF. In particular, Process Table Functions (PTFs, defined in FLIP-440), a new kind of UDF available in Flink 2.1, come in very handy for this purpose. They allow you to integrate arbitrary logic written in Java into your SQL pipelines, which means you could build a PTF for the integration of external tools via MCP, for instance using the LangChain4j API.
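To give an idea of what this could look like, here is a hypothetical PTF InvokeMcpTool, implemented in Java (e.g. with LangChain4j and an MCP client) and registered with the table environment, being called from SQL; the function name and its parameters are made up, and the call is only a rough sketch of Flink 2.1’s PTF invocation syntax:

```sql
-- Hypothetical: hand the agent’s requests to an MCP server and emit the tool results;
-- InvokeMcpTool would wrap the MCP/LangChain4j integration in Java
SELECT *
FROM InvokeMcpTool(
  input      => TABLE agent_requests,
  mcp_server => 'http://localhost:3000/mcp'
);
```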
PTFs allow for very flexible customizations of the processing logic of Flink SQL jobs. The integration of MCP into a PTF may be a subject for a future post; in the meantime, refer to this post for a first look at using PTFs in the context of a change data capture pipeline for Postgres.
As PTFs are table-valued functions, they can operate not only on single rows and events, but also on groups of rows, for instance all the events pertaining to a specific customer or workflow instance. This makes them a candidate for implementing agent memory; more on that in the following.
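For instance, partitioning the input table by customer would route all events of one customer to the same (hypothetical) PTF, which could then keep per-customer state; again just a sketch, with made-up names:

```sql
-- All events of one customer are processed together by the PTF,
-- allowing it to maintain state per customer
SELECT *
FROM TrackConversation(
  input => TABLE customer_events PARTITION BY customer_id
);
```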