Exa-d: 如何在S3中存储网页
Exa-d: How to store the web in S3

原始链接: https://exa.ai/blog/exa-d

## exa-d:用于Web索引的可扩展数据框架 exa-d是一个数据框架,旨在利用Lance高效地处理和更新存储在S3中的大规模Web索引。它解决了Web数据演化中固有的写入放大和复杂依赖性问题。 核心思想是将索引表示为**具有明确依赖关系的类型化列**,形成有向无环图(DAG)。该图决定了执行顺序——计算仅在满足依赖关系且输出缺失时运行——消除了硬编码的流水线。 exa-d的**存储层**利用Lance的分片模式来跟踪细粒度的完整性。更新涉及写入或删除特定分片内的*仅*更改的列数据,避免完全重写。 **执行层**通过比较期望的(完全填充的)状态与实际数据集状态来规划工作,然后使用拓扑排序来计算仅缺失或无效的列。这使用Ray Data实现,并利用Ray Actors进行资源优化(如GPU加载)和流水线并行化。 这种方法能够实现增量更新、有针对性的调试,并确保收敛——在失败后重新运行可以从中断的地方恢复——使exa-d成为动态Web索引的强大且可扩展的解决方案。

## Exa-d:在S3中进行Web规模数据存储 一篇Hacker News上的帖子详细介绍了**exa-d**,exa.ai内部开发的数据处理框架,用于在Amazon S3中存储Web数据。该系统通过声明式类型依赖和对稀疏更新的支持,解决了Web规模数据带来的挑战。 这篇文章引发了关于数据处理编排的讨论,评论者质疑Ray和Anyscale的市场地位。人们对依赖图失效的潜在问题以及对强大异常检测的需求表示担忧——不仅仅依赖于仪表盘和待命支持。 有趣的是,一些用户报告说仅仅*浏览*这篇博客文章时就出现了性能问题,并戏称访问者无意中贡献了计算能力! 这篇文章突出了构建和维护为海量数据集设计的系统的复杂性。
相关文章

原文

To handle these challenges, we built exa-d: a data framework that uses S3 to store the web. The code below roughly outlines what it does:

documents = Column(name="documents", type=str)

tokenized = Column(name="documents_tokenized", type=torch.Tensor).derive()._from(documents).impl(Tokenizer)

embeddings = Column(name="embeddings").derive()._from(tokenized, type=torch.Tensor).impl(EmbeddingModel)

dataset = Dataset(location="s3://exa-data/documents/")

execute_columns(dataset, [tokenized, embeddings])

#The Logical Layer: The Dependency Graph

Data gets transformed in a production web index not as a linear sequence but as a system of independently evolving derived fields. Each field has its own update schedule and dependency surface, such as multiple embedding versions or derived signals like structured extractions. exa-d represents the index as typed columns with declared dependencies. Base columns are ingested data, while derived columns declare intent, forming an explicit dependency graph.

This does two practical things immediately:

  • Execution order is determined by the dependency graph itself vs hardcoded scripts. If embeddings depend on tokenized output, the column declares that dependency and the system determines execution order automatically. Otherwise, a separate script specifying that order would need to be written and maintained for each pipeline variant.

  • Column definitions are contracts. The builder pattern enforces type guarantees, for example Tokenizer: str → Tensor, and makes column definitions reusable instead of relying on string names and ad hoc assumptions about shapes and schemas.

The graph determines what needs to be computed. For each derived column, the system checks whether its inputs exist and whether its output is already computed. Adding a new derived field means adding a node and its edges, not duplicating a pipeline and manually keeping them in sync.

#The Storage Layer: Structuring Data for Precise Updates

While we need to process a lot of data, the index is vast. This means that we are appending relatively small sets of data or replacing a minor fraction of the index. If modifying data required rewriting every column on every interaction or scanning large blocks of rows, this would result in significant write amplification.

exa-d's storage model was designed to account for this with a simple idea: track completeness at the granularity you want to update.

Data lives in Lance on S3. Lance stores the dataset as a collection of fragments with partial schemas. Not every fragment needs the same columns and missing derived columns are expected as updates occur incrementally across the dataset.

This is the core storage operation exa-d relies on: writing or deleting a single column for a specific fragment without rewriting the rest of the fragment.

def write_column_to_fragment(ds: LanceDataset, frag_id: int, col: str, data: pa.Array):

frag = ds.get_fragment(frag_id)

new_file = write_lance_file(

path=f"s3://bucket/{ds.name}/{frag_id}/{col}.lance",

schema=pa.schema([(col, data.type)]),

data=data,

)

patched_frag = bind_file_to_fragment(

frag.metadata,

new_file,

ds.schema,

)

return patched_frag

patched_frags = [

write_column_to_fragment(dataset, fid, "embedding_v2", embeddings[fid])

for fid in missing_frag_ids

]

commit_to_lance(dataset, patched_frags)

Incremental fragment updates lend themselves to a few advantageous properties:

  • Updates at precise granularity. Adding a new derived field or fixing a bug only affects files containing impacted columns. Patching a fragment doesn't rewrite unaffected columns, so efficiency is maintained as the number of columns increases.

  • Global view of column validity. Auxiliary tables, NULL-filled results or external backfill bookkeeping are not required because the fragment metadata records which columns are present. Using the dataset state directly as an atomic source of truth sidesteps tricky transactional logic and state management.

  • Targeted debugging. If a handful of fragments have incorrect values for a derived field, you can delete or invalidate that column for those fragments. The storage format could allow us to modify only the missing or invalid outputs.

#The Execution Layer: Compute Only What is Necessary

Now that we have a dependency graph that declares the workflow we want to execute and the Lance physical layout that shows us what data is already materialized, the last step before workflow execution is query planning: determining what to compute and where.

The bird's eye view provided by Lance allows us to build a detailed query plan with a simple algorithm: We take the difference between the ideal state (all columns are fully populated) and the actual state of the dataset.

ABCD
Fragment 0

1

2

3

-2

Fragment 1

2

4

6

-4

Fragment 2

4

8

93

284

Fragment 3

3

6

9

-6

Fragment 4

5

10

15

-10

Initial state

Step 1 of 6Hover to explore

With the dependency graph and Lance's view of materialized data, query planning becomes a diff: compare the ideal state (all columns populated) against actual state to find what's missing. A topological sort algorithm ensures each column computes after its dependencies, and per-fragment granularity means execution can parallelize across cores or machines. Checkpoints after each fragment avoid redoing work if interrupted.

This gives exa-d a single execution rule: compute missing or invalid columns. Whether a column is missing because it's a new document or because the embedding model changed, the codepath is the same. Backfills and incremental updates follow the same codepath.

#Pipelined Execution on Ray Data

Under the hood, exa-d translates the topologically sorted column graph into Ray Data jobs. Scheduling is gated by fragment completeness, so Ray only sees work items that actually need computation. Expressing each node in the dependency graph as a Ray Data pipeline stage creates separate workers for each Column.

Initial state: All fragments materialized

Step 1 of 5Hover to explore

Loading an embedding model into GPU memory can take seconds to minutes depending on model size and latency stacks across the scale of updated fragments. exa-d uses Ray Actors to load the embedding model once and wait in memory for the next batch of fragments that needs to be updated. Since scheduling is gated by fragment completeness, actors only receive fragments that require recomputation, avoiding redundant inference on already-materialized data.

Separate Actor stages give us pipeline parallelism. If a single worker computed all Columns, the GPU would sit idle during S3 downloads and tokenization. With separate Actors, each resource runs at capacity: the GPU embeds one fragment while the CPU tokenizes the next and the network fetches a third.

#DAG Example

A small synthetic example makes the execution model concrete: define a dependency DAG of derived columns, point it at a dataset where fragments have only some of those columns, and the system materializes only what's missing.

A = Column("A", int)

B = Column("B", int).derive().impl_from(A, lambda a: a * 2)

C = Column("C", int).derive().impl_actor_from(A, TimesThreeActor)

D = Column("D", int).derive().impl_batch_from(B, negate_batch)

E = Column("E", int).derive().from_(B).from_(C).impl(lambda b, c: b+c)

ds = Dataset("s3://bucket/index.lance")

execute_columns(

dataset=ds,

output_columns=[B, C, D, E],

)

The important property is convergence: if execution is rerun after a partial failure, it will eventually reach the same end state where all outputs are computed correctly. Same as usual, exa-d observes missing and valid outputs, recomputes the diff and picks up where it left off.

联系我们 contact @ memedata.com