使用 SQLite 构建持久化的执行引擎
Building a Durable Execution Engine with SQLite

原始链接: https://www.morling.dev/blog/building-durable-execution-engine-with-sqlite/

持久执行 (DE) 引擎依赖于持久日志来跟踪工作流步骤,从而实现重试和可重放性——类似于数据库的写前日志。该日志记录执行步骤的*意图*、其参数以及最终结果。 主要有两种方法:使用外部状态存储(如 Temporal 或 Restate),或将状态持久化到应用程序的数据库中(如 DBOS 与 Postgres)。Persistasaurus 选择后者,利用 SQLite 以简化操作,尽管 SQLite 也可以用于生产环境,尤其是在自包含的代理系统中。 SQLite 日志表存储流程 ID、步骤编号、时间戳、方法详情、状态(待处理、等待中、完成)、尝试次数以及序列化的参数/返回值。Persistasaurus 不使用直接的 API 调用来访问 DE 引擎,而是使用代理模式和 ByteBuddy 来透明地拦截方法调用,在执行前后记录它们。 这允许定义简洁的工作流(例如,`stepOne(); stepTwo();`)。拦截器从日志中重放已完成的步骤,以实现确定性执行,但承认一个潜在问题:执行*之后*但*在记录之前*发生的崩溃可能导致步骤重复执行——通过对产生副作用的操作使用幂等性键来缓解。

## 耐用执行引擎:摘要 这次Hacker News讨论围绕一篇关于使用SQLite构建耐用执行(DE)引擎的博客文章展开。核心思想是创建能够在失败后可靠地恢复长时间运行的操作的系统,而无需人工干预。 然而,对话强调DE并不能消除错误处理——它将重点转移到确保过程的每个步骤的*原子性*。一旦实现了强大的原子性,那么完整DE系统的额外复杂性可能就不那么有益了。 关键点包括**幂等性**(使操作可以安全重复)的重要性以及不同方法之间的权衡。一些框架,如Temporal和DBOS,旨在简化这一过程,其中DBOS特别强调一种轻量级方法,避免了Temporal的基础设施开销。人们对这些系统的性能影响和复杂性表示担忧,一些人提倡更简单的解决方案,如队列,甚至在规模要求时构建自定义系统。最终,其价值在于改善开发人员体验以及DE鼓励的架构模式——将任务分解为可靠可重复的步骤。
相关文章

原文

At the core of every DE engine there’s some form of persistent durable execution log. You can think of this a bit like the write-ahead log of a database. It captures the intent to execute a given flow step, which makes it possible to retry that step should it fail, using the same parameter values. Once successfully executed, a step’s result will also be recorded in the log, so that it can be replayed from there if needed, without having to actually re-execute the step itself.

DE logs come in two flavours largely speaking; one is in the form of an external state store which is accessed via some sort of SDK. Example frameworks taking this approach include Temporal, Restate, Resonate, and Inngest. The other option is to persist DE state in the local database of a given application or (micro)service. One solution in this category is DBOS, which implements DE on top of Postgres.

To keep things simple, I went with the local database model for Persistasaurus, using SQLite for storing the execution log. But as we’ll see later on, depending on your specific use case, SQLite actually might also be a great choice for a production scenario, for instance when building a self-contained agentic system.

The structure of the execution log table in SQLite is straight-forward. It contains one entry for each durable execution step:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
CREATE TABLE IF NOT EXISTS execution_log (
  flowId TEXT NOT NULL, (1)
  step INTEGER NOT NULL, (2)
  timestamp INTEGER NOT NULL, (3)
  class_name TEXT NOT NULL, (4)
  method_name TEXT NOT NULL, (5)
  delay INTEGER, (6)
  status TEXT (7)
      CHECK( status IN ('PENDING','WAITING_FOR_SIGNAL','COMPLETE') )
      NOT NULL,
  attempts INTEGER NOT NULL DEFAULT 1, (8)
  parameters BLOB, (9)
  return_value BLOB, (10)
  PRIMARY KEY (flowId, step)
)
1 The UUID of the flow
2 The sequence number of the step within the flow, in the order of execution
3 The timestamp of first running this step
4 The name of the class defining the step method
5 The name of the step method (currently ignoring overloaded methods for this PoC)
6 For delayed steps, the delay in milli-seconds
7 The current status of the step
8 A counter for keeping track of how many times the step has been tried
9 The serialized form of the step’s input parameters, if any
10 The serialized form of the step’s result, if any

This log table stores all information needed to capture execution intent and persist results. More details on the notion of delays and signals follow further down.

When running a flow, the engine needs to know when a given step gets executed so it can be logged. One common way for doing so is via explicit API calls into the engine, e.g. like so with DBOS Transact:

1
2
3
4
5
@Workflow
public void workflow() {
  DBOS.runStep(() -> stepOne(), "stepOne");
  DBOS.runStep(() -> stepTwo(), "stepTwo");
}

This works, but tightly couples workflows to the DE engine’s API. For Persistaurus I aimed to avoid this dependency as much as possible. Instead, the idea is to transparently intercept the invocations of all step methods and track them in the execution log, allowing for a very concise flow expression, without any API dependencies:

1
2
3
4
5
@Flow
public void workflow() {
  stepOne();
  stepTwo();
}

In order for the DE engine to know when a flow or step method gets invoked, the proxy pattern is being used: a proxy wraps the actually flow object and handles each of its method invocations, updating the state in the execution log before and after passing the call on to the flow itself. Thanks to Java’s dynamic nature, creating such a proxy is relatively easy, requiring just a little bit of bytecode generation. Unsurprisingly, I’m using the ByteBuddy library for this job:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
private static <T> T getFlowProxy(Class<T> clazz, UUID id) {
  try {
    return new ByteBuddy()
        .subclass(clazz) (1)
        .method(ElementMatchers.any()) (2)
        .intercept( (3)
            MethodDelegation.withDefaultConfiguration()
                .withBinders(
                    Morph.Binder.install(OverrideCallable.class))
                .to(new Interceptor(id)))
        .make()
        .load(Persistasaurus.class.getClassLoader()) (4)
        .getLoaded()
        .getDeclaredConstructor()
        .newInstance(); (5)
  }
  catch (Exception e) {
    throw new RuntimeException("Couldn't instantiate flow", e);
  }
}
1 Create a sub-class proxy for the flow type
2 Intercept all method invocations on this proxy…​
3 …​and delegate them to an Interceptor object
4 Load the generated proxy class
5 Instantiate the flow proxy

As an aside, Claude Code does an excellent job in creating code using the ByteBuddy API, which is not always self-explanatory. Now, whenever a method is invoked on the flow proxy, the call is delegated to the Interceptor class, which will record the step in the execution log before invoking the actual flow method. I am going to spare you the complete details of the method interceptor implementation (you can find it here on GitHub), but the high-level logic looks like so:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public Object intercept(@This Object instance,
    @Origin Method method,
    @AllArguments Object[] args,
    @Morph OverrideCallable callable) throws Throwable {

  if (!isFlowOrStep(method)) {
    return callable.call(args);
  }

  Invocation loggedInvocation = executionLog.getInvocation(id, step);

  if (loggedInvocation != null &&
      loggedInvocation.status() == InvocationStatus.COMPLETE) { (1)
    step++;
    return loggedInvocation.returnValue();
  }
  else {
    executionLog.logInvocationStart(
        id, step, method.getName(), InvocationStatus.PENDING, args); (2)

    int currentStep = step;
    step++;

    Object result = callable.call(args); (3)

    executionLog.logInvocationCompletion(id, currentStep, result); (4)

    return result;
  }
}
1 Replay completed step if present
2 Log invocation
3 Execute the actual step method
4 Log result

Replaying completed steps from the log is essential for ensuring deterministic execution. Each step typically runs exactly once, capturing non-deterministic values such as the current time or random numbers while doing so.

There’s an important failure mode, though: if the system crashes after a step has been executed but before the result can be recorded in the log, that step would be repeated when rerunning the flow. Odds for this to happen are pretty small, but whether it is acceptable or not depends on the particular use case. When executing steps with side-effects, such as remote API calls, it may be a good idea to add idempotency keys to the requests, which lets the invoked services detect and ignore any potential duplicate calls.

The actual execution log implementation isn’t that interesting, you can find its source code here. All it does is persist step invocations and their status in the execution_log SQLite table shown above.

联系我们 contact @ memedata.com