A worm in my Erlang cluster, and adventures in microfluidics

Original link: https://lucassifoni.info/blog/a-worm-in-my-erlang-cluster-and-adventures-in-microfluidics/

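In Erlang/Elixir, clusters usually use a fully meshed topology, but they can also be configured with sparse connections. This raises a challenge: when connections are uneven, how can a single node map the cluster's topology? The author explores "worming" through the cluster, using self-propagating code to perform a flood-fill traversal. Because cluster nodes do not automatically share code, the solution involves:

1. **Code injection**: using `Kernel.ParallelCompiler` and `:code.load_binary` to create a probe module (`ActualProbe`) that can be shipped to and loaded on remote nodes at runtime.
2. **Topology mapping**: implementing a probe that queries a node's neighbors and then recursively triggers itself on those neighbors, building a complete map of the cluster's edges.
3. **Local testing**: using the `:peer` module to simulate custom meshed clusters and validate the traversal.

The author notes that while this approach enables powerful introspection, it has to deal with the "group leader" problem to prevent unintended side-effect connections between nodes. Ultimately, the experiment is a deep dive into introspection of the BEAM runtime, inspired by Joe Armstrong's post "My favorite erlang program".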


Just to be clear, it should maybe be “minifluidics” or “millifluidics” but I’ll ask for forgiveness on this one, because microfluidics reads better. We’ll see what this has to do with my Erlang cluster a bit later.

Sparsely connected Erlang clusters

An Erlang cluster, by default, is fully meshed, meaning that every node maintains a connection to all the others. Since this can lead to excessive chatter and an explosion of edges (n(n-1)/2 connections for n nodes), it is possible to leave an Erlang cluster partially connected and connect some peers only to selected peers. This means that instead of having a full mesh, you can split an Erlang (or Elixir) cluster into sub-meshes and join them through bridges, i.e. single (or sparse) connections.

From now on, we will use the following notation for graphs, which is DOT and can be rendered by .dot visualizers such as Graphviz:

graph G {
a -- b;
b -- c;
a -- c;
}

The graph above describes a fully meshed 3-node cluster, while the one below describes a 4-node cluster where the “d” node only connects to “c”, so the nodes form a triangle with a tail.

graph G {
a -- b;
b -- c;
a -- c;
d -- c;
}

An Erlang node can list the nodes it sees by calling :erlang.nodes(), or Node.list() in Elixir. My question is then: how can you map an arbitrary cluster, with arbitrary connections, from a single node?

Walking an Erlang cluster like a graph

My answer is: by asking every node for its neighbors, and comparing the answers with the neighbors I can see. In the second graph above, if I am on node a, I will ask:

  • b, who answers [:a, :c]
  • c, who answers [:a, :b, :d]

My own neighbors are [:b, :c]. If I take the difference between the answers and what I already know (myself and my own neighbors), node c gives me new knowledge: there is [:d], which only c can reach. But what if d itself has neighbors? I can ask c to ask d to query its own neighbors; d would report to c, which would report to me. And what if d finds that some of its neighbors have neighbors it cannot see? The process must continue, recursively.
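The comparison step can be sketched as a set difference. This is a minimal illustration rather than code from the post: the NeighborDiff module and the shape of its `answers` argument are our own assumptions, and the data is the four-node "triangle with a tail" graph above.

```elixir
defmodule NeighborDiff do
  @moduledoc """
  Hypothetical helper (not the post's code) for one round of the comparison:
  which nodes does each neighbor see that I do not already know about?
  """

  # me: my node; my_neighbors: my own :erlang.nodes();
  # answers: %{neighbor => that neighbor's :erlang.nodes()}
  def new_knowledge(me, my_neighbors, answers) do
    known = MapSet.new([me | my_neighbors])

    answers
    |> Enum.map(fn {neighbor, reported} ->
      {neighbor, reported |> MapSet.new() |> MapSet.difference(known) |> MapSet.to_list()}
    end)
    # Keep only the neighbors that taught us something.
    |> Enum.reject(fn {_neighbor, unseen} -> unseen == [] end)
    |> Map.new()
  end
end
```

For node a, `NeighborDiff.new_knowledge(:a, [:b, :c], %{b: [:a, :c], c: [:a, :b, :d]})` returns `%{c: [:d]}`: only c brings news, and the news is d.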

We need to flood-fill the graph no matter its topology.
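To make the flood fill concrete without a cluster, here is an offline model under stated assumptions: in the hypothetical FloodFillModel module, a plain map stands in for what each node would return from :erlang.nodes(), and a local recursive call stands in for each remote call. The visited set is threaded through the walk so that no node is visited twice.

```elixir
defmodule FloodFillModel do
  @moduledoc """
  Hypothetical offline model of the worm (not the post's code): `topology`
  maps each node to the neighbors it would report, and a local recursive
  call replaces each hop to a remote node.
  """

  def walk(topology, start), do: visit(topology, start, MapSet.new())

  defp visit(topology, current, visited) do
    neighbors = Map.fetch!(topology, current)
    initial = {%{current => neighbors}, MapSet.put(visited, current)}

    {report, _seen} =
      Enum.reduce(neighbors, initial, fn neighbor, {acc, seen} ->
        if MapSet.member?(seen, neighbor) do
          {acc, seen}
        else
          # Recurse, then mark every node that subtree reported as visited.
          sub = visit(topology, neighbor, seen)
          {Map.merge(acc, sub), MapSet.union(seen, MapSet.new(Map.keys(sub)))}
        end
      end)

    report
  end
end
```

With the bowtie-shaped graph used later in the post, `bowtie = %{a: [:b, :c], b: [:a, :c, :d], c: [:a, :b], d: [:b, :e, :f], e: [:d, :f], f: [:d, :e]}`, calling `FloodFillModel.walk(bowtie, :a)` returns the whole map, whichever node the walk starts from.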

I promised I would talk about fluids: the graph traversal illustrations in this post were made by moving ink through channels, and there are a few details about that in this child post: behind the scenes post.

And to do this, I would like to ship code to only a single node.

The need for self-propagating code in the cluster

My goal is now clear: I want to build a mapping tool that works with any Erlang cluster and reports its full topology, no matter how sparse or dense the connections are, and I want this tool to be a single file that I can ship or paste to a single node. This is harder than it sounds, because clustered nodes have no obligation to share code, and hot-loading mechanisms only load code on one node. So, if you have a Probe module loaded on your node, you can’t :erpc.call(neighbor, Probe, :run, []) if the neighbor does not have this module.

Thankfully, Erlang has tools for us. :code.load_binary(module, filename, binary) loads object code from a binary; filename only tags the newly loaded module in the code server and does not trigger any filesystem operation. :code.get_object_code(module) returns the object code of a module, but it finds it by searching the code path for the module’s .beam file, so it cannot recover the object code of a module that was loaded from memory.

If you paste:

defmodule Probe do
  def run() do
  ...
  end
end

into IEx and call :code.get_object_code(Probe), you get :error, despite the module being defined.
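The failure is easy to reproduce in a script. In this sketch (the module name InMemoryProbe is arbitrary), the module is compiled from a string with Code.compile_string/2, so no .beam file exists on the code path and :code.get_object_code/1 returns :error. Code.compile_string/2 does hand back the binary at compile time, and :code.load_binary/3 accepts it under a made-up file name; this is an alternative route the post does not take, shown only for comparison.

```elixir
# Compile a module from a string, in memory only: no .beam file is written.
[{InMemoryProbe, binary}] =
  Code.compile_string("""
  defmodule InMemoryProbe do
    def run(), do: :ok
  end
  """)

# get_object_code/1 searches the code path for a .beam file and finds none.
:error = :code.get_object_code(InMemoryProbe)

# The binary we were handed can still be (re)loaded; the name is only a label.
{:module, InMemoryProbe} = :code.load_binary(InMemoryProbe, ~c"some.fakename", binary)
```

The catch is that the binary must then be kept around explicitly, since the code server will not give it back later.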

So, our first task is to create a module that we can paste into a first node, and that gives this first node access to the module binary. I settled on Kernel.ParallelCompiler.compile_to_path/2, which produces a .beam file, and added the temporary compilation path to the code server via Code.append_path/1. I resorted to this function from the Elixir compiler itself because I did not find an equivalent in :code that gave later access to the object code.

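defmodule ProbeWrapper do
  def load() do
    # Source of the module we want to ship around, kept as a string.
    payload = """
    defmodule ActualProbe do
      def run() do
        :ok
      end
    end
    """

    # Write it to a randomly named .ex file in the system temp dir...
    tmp = System.tmp_dir!()
    name = (:crypto.strong_rand_bytes(16) |> Base.encode16()) <> ".ex"
    path = Path.join(tmp, name)
    File.write!(path, payload)
    # ...compile it to Elixir.ActualProbe.beam in that same directory...
    Kernel.ParallelCompiler.compile_to_path([path], tmp)
    # ...and put the directory on the code path, so that
    # :code.get_object_code(ActualProbe) can find the .beam file.
    Code.append_path(tmp)
  end
end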

When you paste this, you get:

{:module, ProbeWrapper,
 <<70, 79, 82, 49, 0, 0, 8, 100, 66, 69, 65, 77, 65, 116, 85, 56, 0, 0, 1, 113,
   0, 0, 0, 34, 19, 69, 108, 105, 120, 105, 114, 46, 80, 114, 111, 98, 101, 87,
   114, 97, 112, 112, 101, 114, 8, 95, 95, ...>>, {:load, 0}}

Then you can run ActualProbe, and also access its object code:

iex(2)> ProbeWrapper.load
true
iex(3)> ActualProbe.run
:ok
iex(4)> :code.get_object_code(ActualProbe)
{ActualProbe,
 <<70, 79, 82, 49, 0, 0, 5, 224, 66, 69, 65, 77, 65, 116, 85, 56, 0, 0, 0, 176,
   0, 0, 0, 18, 18, 69, 108, 105, 120, 105, 114, 46, 65, 99, 116, 117, 97, 108,
   80, 114, 111, 98, 101, 8, 95, 95, 105, 110, ...>>,
 ~c"/var/folders/28/fbhkc24n215b3jkjvkwq5bs80000gn/T/Elixir.ActualProbe.beam"}

And now that we have an expression that returns the module binary, we can load it again:

iex(5)> {module, binary, _path} = :code.get_object_code(ActualProbe)
...
iex(6)> :code.load_binary(ActualProbe, ~c"some.fakename", binary)
{:module, ActualProbe}

So we have a first step: a pastable module that gives us access to object code we can ship around and load. We can rework ActualProbe so that it can fetch its own binary:

defmodule ProbeWrapper do
  def load() do
    payload = """
    defmodule ActualProbe do
      def run() do
       :ok
      end

      def self_code() do
        {_, bin, _} = :code.get_object_code(ActualProbe)
        bin
      end
    end
    """
    
    tmp = System.tmp_dir!
    name = (:crypto.strong_rand_bytes(16) |> Base.encode16) <> ".ex"
    path = Path.join(tmp, name)
    File.write!(path, payload)
    Kernel.ParallelCompiler.compile_to_path([path], tmp)
    Code.append_path(tmp)
  end
end
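One possible refinement, not from the post: ProbeWrapper.load/0 appends the whole system temp directory to the code path, so any .beam file lying there becomes loadable too. The hypothetical ProbeCompiler below compiles into a fresh subdirectory instead; the directory naming scheme is an assumption.

```elixir
defmodule ProbeCompiler do
  @moduledoc """
  Hypothetical variant of ProbeWrapper.load/0 (not the post's code): compile
  Elixir source into a dedicated directory and append only that directory
  to the code path, so :code.get_object_code/1 can find the .beam files.
  """

  def compile_to_fresh_dir(source) when is_binary(source) do
    # Assumed naming scheme: one "probe_<unique integer>" directory per call.
    dir = Path.join(System.tmp_dir!(), "probe_#{System.unique_integer([:positive])}")
    File.mkdir_p!(dir)

    file = Path.join(dir, "probe.ex")
    File.write!(file, source)

    # Writes one .beam per module defined in `source` into `dir`.
    {:ok, modules, _warnings} = Kernel.ParallelCompiler.compile_to_path([file], dir)
    Code.append_path(dir)
    modules
  end
end
```

It returns the list of compiled modules, so a caller can check that the probe it expects was actually produced.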

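We then need a graph traversal, a function to kick it off, a way to keep track of visited nodes, and a way to ship the probe module to each node. These functions live inside ActualProbe (that is, in the payload string), since visit/3 ships __MODULE__ to the next node: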

# Entry point on the first node: start the walk with an empty visited set.
def run_probe(binary) do
  IO.puts("Launching from #{node()}")
  traverse(node(), MapSet.new(), binary)
end

# Entry point on every other node, called remotely by visit/3.
def run_probe(binary, visited) do
  IO.puts("Hello from #{node()}")
  traverse(node(), visited, binary)
end

# Record this node's neighbors, then recurse into every neighbor that was
# not visited yet, threading the visited set through the walk.
def traverse(my_node, visited_nodes, binary) do
  visible_neighbors = :erlang.nodes()
  initial = {%{my_node => visible_neighbors}, MapSet.put(visited_nodes, my_node)}
  {adjacent, _} = Enum.reduce(visible_neighbors, initial, fn neighbor, {adj, visited} ->
    if MapSet.member?(visited, neighbor) do
      {adj, visited}
    else
      # Every node of the returned subgraph now counts as visited.
      subgraph = visit(neighbor, visited, binary)
      {Map.merge(adj, subgraph), MapSet.union(visited, MapSet.new(Map.keys(subgraph)))}
    end
  end)
  adjacent
end

# Ship this module's object code to `node`, then continue the walk there.
def visit(node, visited_nodes, binary) do
  :erpc.call(node, :code, :load_binary, [__MODULE__, ~c"actual_probe.module", binary])
  :erpc.call(node, __MODULE__, :run_probe, [binary, visited_nodes])
end

# Only works on the first node, where the .beam file is on the code path.
def self_code() do
  {_, bin, _} = :code.get_object_code(ActualProbe)
  bin
end

With this in place, we can run it:

iex(3)> ActualProbe.run_probe(ActualProbe.self_code)
%{nonode@nohost: []}

Well. We don’t have a cluster yet. Before we build one, note that the current worm only discovers nodes, but it could also perform work on each node: we could very well add a parameter to pass a function to be run. The traversal is also sequential. We could go parallel, with :erpc.multicall for example, but we would then have to either accept duplicate work, or engineer a distributed data structure over a non-fully connected cluster to prevent nodes from being visited multiple times. In other words, we are building a cluster discovery mechanism where we only ask each node “who can you see?”, but you could add parameters to answer other questions about the cluster (or even load code over the whole cluster) without caring about its topology.
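As a sketch of the "pass something to run" idea, here is a hypothetical WorkingProbe whose walk also applies an {m, f, a} tuple on each node and collects the results. It uses an MFA tuple rather than an anonymous function because a fun compiled inside a module can only run where that same module is loaded; the module m must be available on every node (standard modules such as :erlang are). Like ActualProbe, it would have to be compiled to a .beam file so its binary can be obtained, and it leaves out the group-leader reset discussed in the gotchas.

```elixir
defmodule WorkingProbe do
  @moduledoc """
  Hypothetical variant of ActualProbe's walk (not the post's code): every
  visited node reports its neighbors plus the result of apply(m, f, a).
  """

  def run_probe(binary, mfa, visited \\ MapSet.new()) do
    traverse(node(), visited, binary, mfa)
  end

  def traverse(my_node, visited, binary, {m, f, a} = mfa) do
    neighbors = :erlang.nodes()
    # The work runs locally, on the node currently holding the walk.
    entry = %{neighbors: neighbors, result: apply(m, f, a)}
    initial = {%{my_node => entry}, MapSet.put(visited, my_node)}

    {report, _seen} =
      Enum.reduce(neighbors, initial, fn neighbor, {acc, seen} ->
        if MapSet.member?(seen, neighbor) do
          {acc, seen}
        else
          sub = visit(neighbor, seen, binary, mfa)
          {Map.merge(acc, sub), MapSet.union(seen, MapSet.new(Map.keys(sub)))}
        end
      end)

    report
  end

  # Same shipping step as ActualProbe.visit/3: load this module's object
  # code on the neighbor, then continue the walk from there.
  defp visit(neighbor, seen, binary, mfa) do
    {:module, _} =
      :erpc.call(neighbor, :code, :load_binary, [__MODULE__, ~c"working_probe.module", binary])

    :erpc.call(neighbor, __MODULE__, :run_probe, [binary, mfa, seen])
  end
end
```

On a node with no connections, `WorkingProbe.run_probe(bin, {:erlang, :system_info, [:otp_release]})` reduces to a single entry for node() with an empty neighbor list.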

Building a cluster to worm in

To build our cluster, Erlang provides tools like the :peer module, which lets us spawn new Erlang peer nodes. The node spawning the other nodes is called the origin, and it will control its peers over their standard I/O (connection: :standard_io) rather than over Erlang distribution, so the origin itself does not join the cluster. We will also pass -connect_all false so that we mesh the cluster ourselves instead of getting the default, fully connected graph.

{:ok, pid, node} =
  :peer.start(%{
    name: :some_name,
    connection: :standard_io,
    args: [~c"-connect_all", ~c"false"]
  })

We will create a module that tracks the peers, nodes and edges, and sets up all the peers. To mesh the cluster, since we have access to every node, we call :net_kernel.connect_node on one node, targeting the other, through the peer control channel (:peer.call, wrapped in run_on/3).

defmodule LocalCluster do
  defstruct [:peers, :edges, :nodes]

  def start(string) when is_binary(string) do
    start(parse_graph(string))
  end

  def start(edges) do
    nodes = edges_to_nodes(edges)
    peers = for node <- nodes do
      {node, start_peer(node)} 
    end |> Enum.into(%{})
    
    local_cluster = %__MODULE__{
      nodes: nodes,
      edges: edges,
      peers: peers
    }

    mesh(local_cluster)

    local_cluster
  end

  def start_peer(node) do
    {:ok, pid, node2} =
      :peer.start(%{
        name: :"#{node}",
        connection: :standard_io,
        args: [~c"-connect_all", ~c"false"]
      })
    :peer.call(pid, :code, :add_pathsa, [:code.get_path()])
    %{name: node, node: node2, pid: pid}
  end

  def run_on(cluster, name, {m, f, a}) do
    :peer.call(cluster.peers[name].pid, m, f, a)
  end

  def mesh(%__MODULE__{peers: peers, edges: edges} = cluster) do
    for {n1, n2} <- unique_edges(edges) do
      run_on(cluster, n1, {:net_kernel, :connect_node, [peers[n2].node]})
    end
  end

  def sample() do
    """
    graph G {
      a -- b;
      b -- c;
      a -- c;
      b -- d;
      d -- e;
      e -- f;
      f -- d;
    }
    """
  end
end

A few utility functions (graph, parse_graph, unique_edges, edges_to_nodes…) are omitted here so we can focus on the mechanisms, but the full code is at the end of the post. When we paste the module into IEx, we can start a sample cluster:

{:module, LocalCluster,
 <<70, 79, 82, 49, 0, 0, 29, 4, 66, 69, 65, 77, 65, 116, 85, 56, 0, 0, 3, 24, 0,
   0, 0, 80, 19, 69, 108, 105, 120, 105, 114, 46, 76, 111, 99, 97, 108, 67, 108,
   117, 115, 116, 101, 114, 8, 95, 95, ...>>, {:unique_edges, 1}}
iex(2)> LocalCluster.start(LocalCluster.sample())
%LocalCluster{
  peers: %{
    c: %{name: :c, node: :c@mac, pid: #PID<0.120.0>},
    f: %{name: :f, node: :f@mac, pid: #PID<0.115.0>},
    a: %{name: :a, node: :a@mac, pid: #PID<0.119.0>},
    d: %{name: :d, node: :d@mac, pid: #PID<0.116.0>},
    e: %{name: :e, node: :e@mac, pid: #PID<0.117.0>},
    b: %{name: :b, node: :b@mac, pid: #PID<0.118.0>}
  },
  edges: [f: :d, e: :f, d: :e, b: :d, a: :c, b: :c, a: :b],
  nodes: [:f, :d, :e, :b, :a, :c]
}
[true, true, true, true, true, true, true]

Since the struct keeps track of every node we started, we can observe the cluster's actual topology by directly asking each node which neighbors it sees:

iex> cl = LocalCluster.start(LocalCluster.sample())
iex> LocalCluster.graph(cl)
"graph G {
  a@mac -- c@mac;
  b@mac -- c@mac;
  d@mac -- f@mac;
  e@mac -- f@mac;
  a@mac -- b@mac;
  d@mac -- e@mac;
  b@mac -- d@mac;
}"

We get back exactly the edges described in the sample, but this time by observation.
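The omitted helpers are not reproduced in this extract. Here is a hypothetical GraphHelpers module with one plausible way to write parse_graph/1, unique_edges/1 and edges_to_nodes/1 for the edge-only DOT subset used in this post; the author's versions evidently differ at least in ordering, since the edges in the LocalCluster output above come back reversed.

```elixir
defmodule GraphHelpers do
  @moduledoc """
  Hypothetical helpers (not the author's code) for the "a -- b;" subset of DOT.
  """

  # "a -- b;" lines -> [{:a, :b}, ...], in file order. String.to_atom/1 is
  # acceptable for a local tool fed trusted input only.
  def parse_graph(string) do
    ~r/(\w+)\s*--\s*(\w+)\s*;/
    |> Regex.scan(string, capture: :all_but_first)
    |> Enum.map(fn [l, r] -> {String.to_atom(l), String.to_atom(r)} end)
  end

  # Treat {a, b} and {b, a} as the same undirected edge.
  def unique_edges(edges) do
    Enum.uniq_by(edges, fn {l, r} -> Enum.sort([l, r]) end)
  end

  # Every node mentioned by an edge, in order of first appearance.
  def edges_to_nodes(edges) do
    edges |> Enum.flat_map(fn {l, r} -> [l, r] end) |> Enum.uniq()
  end
end
```

Parsing the sample graph yields `[a: :b, b: :c, a: :c, b: :d, d: :e, e: :f, f: :d]`, and edges_to_nodes/1 then gives `[:a, :b, :c, :d, :e, :f]`.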

Worming the graph

The full files as I ran them:

Now, in a fresh IEx session, we can, in order:

  • paste the LocalCluster module
  • start the sample cluster with cl = LocalCluster.start(LocalCluster.sample())
  • paste the ProbeWrapper module
  • call ProbeWrapper.load
  • call bin = ActualProbe.self_code()
  • call LocalCluster.run_on(cl, :a, {:code, :load_binary, [ActualProbe, ~c"some.name", bin]})
  • call LocalCluster.run_on(cl, :a, {ActualProbe, :run_probe, [bin]})
iex(10)> LocalCluster.run_on(cl, :a, {ActualProbe, :run_probe, [bin]})
 %{
   f@mac: [:d@mac, :e@mac],
   d@mac: [:f@mac, :e@mac, :b@mac],
   e@mac: [:f@mac, :d@mac],
   b@mac: [:d@mac, :c@mac, :a@mac],
   a@mac: [:c@mac, :b@mac],
   c@mac: [:a@mac, :b@mac]
 }

Now we can flood-fill complex graphs :-)

Flood filling a complex graph to traverse it from a single point of entry: see the behind the scenes post for details.
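The map returned by the worm can be turned back into the DOT notation used throughout the post. This is a hypothetical helper, not the author's graph function. It quotes node names because unquoted DOT IDs may only contain letters, digits and underscores, so a name such as a@mac needs quotes for Graphviz to parse it.

```elixir
defmodule TopologyDot do
  @moduledoc """
  Hypothetical helper (not the author's LocalCluster.graph/1): render a
  %{node => [neighbor]} report as an undirected DOT graph.
  """

  def to_dot(report) do
    # Each undirected edge appears twice (once per endpoint): normalize the
    # pair order and keep a single copy.
    edges =
      for {name, neighbors} <- report, neighbor <- neighbors, uniq: true do
        Enum.sort([name, neighbor])
      end

    # Quote IDs, since unquoted DOT IDs cannot contain "@".
    body = Enum.map_join(Enum.sort(edges), fn [l, r] -> "  \"#{l}\" -- \"#{r}\";\n" end)
    "graph G {\n" <> body <> "}"
  end
end
```

Feeding it the worm's result from node a prints each of the seven bowtie edges exactly once.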

A few gotchas

  • More than once, I forgot to call :peer.call(pid, :code, :add_pathsa, [:code.get_path()]), which makes the Elixir standard library available on the bare Erlang peers. Without it, MapSet, as used in the probe, is missing and the probe cannot run.
  • Everything looked fine until my first execution of the whole sequence: in the bowtie-shaped cluster, the worm reported a connection from a to f. This is because :erpc.call runs the remote function in a process that has the group leader of the calling process. A pid living on a therefore rides along the propagation, and nodes end up opening a connection to a as soon as they send it anything (the IO.puts in run_probe/2 does, before the node lists its neighbors). By default, the VM runs with dist_auto_connect set to always, which is orthogonal to connect_all: the latter only prevents automatic full meshing. So all nodes could see a during the worm propagation despite not being connected to it in the topology observed with LocalCluster.graph. The answer was to reset the group leader to a process of the current node by calling :erlang.group_leader(:erlang.whereis(:init), self()). But this still interferes with the cluster in a way. Can we really perform a measurement without interference? Your guess ;-)*
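The group-leader inheritance can be observed on a single node. In this sketch, a task inherits its caller's group leader (the post observes the same for processes spawned by :erpc.call), then applies the reset quoted above so that its group leader becomes the local init process. In the worm, we assume the reset belongs at the start of run_probe/2, before any IO or onward :erpc.call; the post does not show exactly where the author placed it.

```elixir
# Runs on a single node: no cluster is needed to see the inheritance.
caller_gl = Process.group_leader()

{inherited, after_reset} =
  Task.async(fn ->
    # A freshly spawned process starts with its parent's group leader.
    inherited = Process.group_leader()
    # The reset from the post: make the local init process our group leader.
    :erlang.group_leader(:erlang.whereis(:init), self())
    {inherited, Process.group_leader()}
  end)
  |> Task.await()
```

After the reset, any IO the process performs is addressed to a local process instead of a pid on the node that started the walk, which is what keeps the extra connections from appearing.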

This post is inspired by Joe Armstrong’s post “My favorite erlang program”, which I read a few years ago. It came back to my mind recently, and after those years on the VM, I feel that I am only starting to truly grasp why this code was so elegant to him. This made me want to continue exploring the introspection abilities of our runtime.

*(Yes, you could ignore mentions of the node that triggers the worm propagation, and only take into account the edges that emanate from its own :erlang.nodes call. But for a brief period, it would still connect to every node in this example.)
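A sketch of the footnote's post-processing, using a hypothetical OriginFilter module: the origin's neighbor list comes from its own first :erlang.nodes() call, made before the walk starts, and any other node's mention of the origin is dropped unless the origin listed that node too. This only cleans up the report; the transient connections still happen.

```elixir
defmodule OriginFilter do
  @moduledoc """
  Hypothetical post-processing (not the post's code) for a worm report of
  the form %{node => [neighbor]}.
  """

  # `origin` started the worm; its own entry was recorded by its first
  # :erlang.nodes() call, before the walk could add any connection.
  def drop_spurious_origin_edges(report, origin) do
    trusted = Map.get(report, origin, [])

    Map.new(report, fn {name, neighbors} ->
      if name == origin or name in trusted do
        {name, neighbors}
      else
        {name, List.delete(neighbors, origin)}
      end
    end)
  end
end
```

On a bowtie report polluted with spurious mentions of a, the lists of d, e and f lose their :a entry, while b and c, which a really sees, keep it.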
