The missing standard library for multithreading in JavaScript

Original link: https://github.com/W4G1/multithreading

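## Multithreading in JavaScript with TypeScript

Multithreading brings Rust-inspired concurrency to JavaScript. It uses Web Workers for true parallelism while simplifying complex worker management. It provides a managed thread pool for efficient resource use and prioritizes memory safety, offering synchronization primitives such as Mutexes, Read-Write Locks, and Condition Variables.

Key features include safely sharing state between threads through `SharedArrayBuffer` and `SharedJsonBuffer` (for JSON objects), and seamlessly importing modules (external libraries and relative files) inside worker tasks. Data is passed to workers with the `move()` function, which uses zero-copy transfer for compatible objects and cloning for all others.

The core function `spawn()` submits a task to the thread pool and returns a handle for awaiting the result. Synchronization primitives prevent race conditions when shared memory is accessed. The asynchronous lock-acquisition methods are recommended because they avoid blocking worker threads. Channels provide a robust, thread-safe queue for coordinating complex workflows.

The library abstracts away the complexity of Web Workers and offers a more intuitive, more powerful approach to concurrent programming in JavaScript. It is a valuable tool for CPU-intensive tasks, improving application responsiveness and performance.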

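A new JavaScript library that aims to be the missing standard library for multithreading was shared on Hacker News. Developed by W4G1 (github.com/w4g1), it makes creating and managing threads easier.

However, commenters pointed out a significant challenge: JavaScript's lexical scoping does not work as usual inside these threads, which can cause runtime errors involving imports, TypeScript, and variable shadowing. One user proposed a new syntax specifically for functions that are detached from their lexical scope, a potential but complex solution.

Despite this limitation, the library received positive feedback. Users consider it a potentially valuable tool that could make JavaScript (and TypeScript) competitive with languages like Go in scenarios that require efficient multithreading, and that might even influence the future of multithreaded WebAssembly. Many commenters noted that complex applications built on Web Workers often end up reinventing similar functionality.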

Original article


Multithreading is a TypeScript library that brings robust, Rust-inspired concurrency primitives to the JavaScript ecosystem. It provides a thread-pool architecture, strict memory safety semantics, and synchronization primitives like Mutexes, Read-Write Locks, and Condition Variables.

This library is designed to abstract away the complexity of managing Web Workers, serialization, and SharedArrayBuffer, allowing developers to write multi-threaded code that looks and feels like standard asynchronous JavaScript.

npm install multithreading

JavaScript is traditionally single-threaded. To achieve true parallelism, this library uses Web Workers. However, unlike standard Workers, this library offers:

  1. Managed Worker Pool: Automatically manages a pool of threads based on hardware concurrency.
  2. Shared Memory Primitives: Tools to safely share state between threads without race conditions.
  3. Scoped Imports: Support for importing external modules and relative files directly within worker tasks.
  4. Move Semantics: Explicit data ownership transfer to prevent cloning overhead.

The entry point for most operations is the spawn function. This submits a task to the thread pool and returns a handle to await the result.

import { spawn } from "multithreading";

// Spawn a task on a background thread
const handle = spawn(() => {
  // This code runs in a separate worker
  const result = Math.random();
  return result;
});

// Wait for the result
const result = await handle.join();

if (result.ok) {
  console.log("Result:", result.value); // 0.6378467071314606
} else {
  console.error("Worker error:", result.error);
}
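The ok/value/error checks above suggest that join() resolves to a result discriminated by its ok field. The sketch below models that shape locally so the narrowing is explicit. JoinResult, unwrapOr, and formatResult are hypothetical names inferred from the usage above. They are not types or helpers exported by the library.

```typescript
// Hypothetical local model of the join() result shape used above;
// the library's own exported type may differ.
type JoinResult<T> =
  | { ok: true; value: T }
  | { ok: false; error: Error };

// Checking `ok` narrows the union, so `value` and `error` are only
// reachable on the branch where they exist.
function unwrapOr<T>(result: JoinResult<T>, fallback: T): T {
  return result.ok ? result.value : fallback;
}

function formatResult<T>(result: JoinResult<T>): string {
  if (result.ok) {
    return `Result: ${String(result.value)}`;
  }
  return `Worker error: ${result.error.message}`;
}
```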

Passing Data: The move() Function

Because Web Workers run in a completely isolated context, functions passed to spawn cannot capture variables from their outer scope. If you attempt to use a variable inside the worker that was defined outside of it, the code will fail.
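The following shows one way capture fails. Suppose a task reaches the worker as source text; the Technical Implementation Details below mention that the library patches a worker code string. In that case only the function's text survives the trip, and its closure does not. The sketch simulates this in a single thread. It rebuilds a function from fn.toString() with new Function, which evaluates in the global scope. The helpers rebuildFromSource and makeTask are hypothetical and are not library code.

```typescript
// Hypothetical helper: simulate shipping a task to a worker as source text.
function rebuildFromSource<T>(task: () => T): () => T {
  // new Function evaluates in the global scope, so closed-over locals are gone
  return new Function(`return (${task.toString()});`)() as () => T;
}

function makeTask(): () => number {
  const secret = 42; // lives only in makeTask's scope
  return () => secret + 1; // a closure over `secret`
}
```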

To get data from your main thread into the worker, you have to use the move() function.

The move function accepts variadic arguments. These arguments are passed to the worker function in the order they were provided. Despite the name, move handles data in two ways:

  1. Transferable Objects (e.g., ArrayBuffer, Uint32Array): These are "moved" (zero-copy). Ownership transfers to the worker, and the original becomes unusable in the main thread.
  2. Non-Transferable Objects (e.g., JSON, numbers, strings): These are cloned via structured cloning. They remain usable in the main thread.
import { spawn, move } from "multithreading";

// Will be transferred
const largeData = new Uint8Array(1024 * 1024 * 10); // 10MB
// Will be cloned
const metaData = { id: 1 };

// We pass arguments as a comma-separated list.
const handle = spawn(move(largeData, metaData), (data, meta) => {
  console.log("Processing ID:", meta.id);
  return data.byteLength;
});

await handle.join();
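The two behaviors described for move() match the standard structured-clone algorithm with a transfer list. The sketch below reproduces them in a single thread using the built-in structuredClone, which is available in Node.js 17+ and modern browsers. It does not call the library. Whether move() is built on this exact mechanism is an assumption.

```typescript
// Single-threaded illustration of transfer vs clone with the standard
// structuredClone API; this is not the library's move().
const buffer = new ArrayBuffer(10 * 1024 * 1024); // 10MB
const largeData = new Uint8Array(buffer);
const metaData = { id: 1 };

// Transfer: the new copy takes ownership and the original buffer is detached
const movedData = structuredClone(largeData, { transfer: [buffer] });

// Clone: an independent deep copy; the original stays usable
const clonedMeta = structuredClone(metaData);
clonedMeta.id = 2;
```

After this runs, `buffer.byteLength` and `largeData.byteLength` are both 0, `movedData` holds all 10MB, and `metaData.id` is still 1.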

SharedJsonBuffer: Sharing Complex Objects

SharedJsonBuffer enables Mutex-protected shared memory for JSON objects, eliminating the overhead of postMessage data copying. Unlike standard buffers, it handles serialization automatically. It supports partial updates, re-serializing only changed bytes rather than the entire object tree for high-performance state synchronization.

import { move, Mutex, SharedJsonBuffer, spawn } from "multithreading";

const sharedState = new Mutex(new SharedJsonBuffer({
  score: 0,
  players: ["Main Thread"],
  level: {
    id: 1,
    title: "Start",
  },
}));

await spawn(move(sharedState), async (lock) => {
  using guard = await lock.acquire();

  const state = guard.value;

  console.log(`Current Score: ${state.score}`);

  // Modify the data
  state.score += 100;
  state.players.push("Worker1");

  // End of scope: Lock is automatically released here
}).join();

// Verify on main thread
using guard = await sharedState.acquire();

console.log(guard.value); // { score: 100, players: ["Main Thread", "Worker1"], ... }
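The sketch below makes the idea of JSON living in shared memory concrete. It is deliberately naive: it stores a value in a SharedArrayBuffer as a 4-byte length header followed by UTF-8 JSON. Unlike SharedJsonBuffer, it re-serializes the whole object on every write. It also does no locking of its own, so callers would hold a Mutex around it. writeJson and readJson are hypothetical helpers, and the layout is an assumption, not the library's format.

```typescript
// Hypothetical layout: [Int32 byte length][UTF-8 JSON bytes...]
// No synchronization here: callers are expected to hold a lock.
const HEADER_BYTES = 4;

function writeJson(buffer: SharedArrayBuffer, value: unknown): void {
  const bytes = new TextEncoder().encode(JSON.stringify(value));
  if (HEADER_BYTES + bytes.length > buffer.byteLength) {
    throw new RangeError("JSON payload does not fit in the shared buffer");
  }
  new Uint8Array(buffer, HEADER_BYTES, bytes.length).set(bytes);
  Atomics.store(new Int32Array(buffer, 0, 1), 0, bytes.length);
}

function readJson<T>(buffer: SharedArrayBuffer): T {
  const length = Atomics.load(new Int32Array(buffer, 0, 1), 0);
  // Copy the bytes out of shared memory so decoding works on a stable snapshot
  const copy = new Uint8Array(buffer, HEADER_BYTES, length).slice();
  return JSON.parse(new TextDecoder().decode(copy)) as T;
}
```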

Synchronization Primitives

When multiple threads access the same shared memory (via SharedArrayBuffer) without coordination, race conditions can occur. This library provides primitives to synchronize access safely.

Best Practice: It is highly recommended to use the asynchronous methods (e.g., acquire, read, write, wait) rather than their synchronous counterparts. Synchronous blocking halts the entire Worker thread, potentially pausing other tasks sharing that worker.

1. Mutex (Mutual Exclusion)

A Mutex ensures that only one thread can access a specific piece of data at a time.
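The Technical Implementation Details at the end of this README state that synchronization is built on Atomics.wait and Atomics.notify over an Int32Array. A classic futex-style mutex shows how such a lock can work; it uses the three-state design from Ulrich Drepper's "Futexes Are Tricky". The function names below are hypothetical, and this is not the library's Mutex. Because Atomics.wait blocks, it behaves like the synchronous acquireSync rather than the recommended async acquire.

```typescript
// Hypothetical futex-style mutex in one shared Int32 slot:
//   0 = unlocked, 1 = locked, 2 = locked and someone may be waiting.
const UNLOCKED = 0;
const LOCKED = 1;
const CONTENDED = 2;

function tryLockMutex(state: Int32Array): boolean {
  return Atomics.compareExchange(state, 0, UNLOCKED, LOCKED) === UNLOCKED;
}

function lockMutex(state: Int32Array): void {
  let c = Atomics.compareExchange(state, 0, UNLOCKED, LOCKED);
  if (c === UNLOCKED) return; // fast path: nobody held the lock
  // Slow path: advertise contention, then sleep while the slot reads CONTENDED
  if (c !== CONTENDED) c = Atomics.exchange(state, 0, CONTENDED);
  while (c !== UNLOCKED) {
    Atomics.wait(state, 0, CONTENDED); // blocks the calling thread
    c = Atomics.exchange(state, 0, CONTENDED);
  }
}

function unlockMutex(state: Int32Array): void {
  // Atomics.sub returns the previous value: LOCKED means nobody was waiting
  if (Atomics.sub(state, 0, 1) !== LOCKED) {
    Atomics.store(state, 0, UNLOCKED);
    Atomics.notify(state, 0, 1); // wake one sleeper so it can retry
  }
}
```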

Option A: Automatic Management (Recommended)

This library leverages the Explicit Resource Management proposal (using keyword). When you acquire a lock, it returns a guard. When that guard goes out of scope, the lock is automatically released.

import { spawn, move, Mutex } from "multithreading";

const buffer = new SharedArrayBuffer(4);
const counterMutex = new Mutex(new Int32Array(buffer));

spawn(move(counterMutex), async (mutex) => {
  // 'using' automatically calls guard[Symbol.dispose]() at the end of the scope
  using guard = await mutex.acquire();
  
  guard.value[0]++;
  
  // End of scope: Lock is automatically released here
});

Option B: Manual Management (Bun / Standard JS)

If you are using Bun (which does not natively support the using keyword and relies on a transpiler that is incompatible with this library), or if you prefer standard JavaScript syntax, you must release the lock manually with drop(). Always use a try...finally block so the lock is released even if an error occurs.

import { spawn, move, Mutex } from "multithreading";

const buffer = new SharedArrayBuffer(4);
const counterMutex = new Mutex(new Int32Array(buffer));

spawn(move(counterMutex), async (mutex) => {
  // Note that we have to import drop here, otherwise it wouldn't be available
  const { drop } = await import("multithreading");

  // 1. Acquire the lock manually
  const guard = await mutex.acquire();

  try {
    // 2. Critical Section
    guard.value[0]++;
  } finally {
    // 3. Explicitly release the lock
    drop(guard);
  }
});
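The try...finally shape matters because the release runs on every exit path, including a throw from the critical section. The single-threaded sketch below demonstrates that guarantee with a hypothetical ToyLock and ToyGuard. Its drop helper calls a release() method. The library's drop(), by contrast, is documented to call [Symbol.dispose].

```typescript
// Hypothetical single-threaded lock used only to show the release guarantee.
class ToyLock {
  locked = false;

  acquire(): ToyGuard {
    if (this.locked) throw new Error("lock is already held");
    this.locked = true;
    return new ToyGuard(this);
  }
}

class ToyGuard {
  private readonly lock: ToyLock;
  private released = false;

  constructor(lock: ToyLock) {
    this.lock = lock;
  }

  release(): void {
    if (this.released) return; // releasing twice is a no-op
    this.released = true;
    this.lock.locked = false;
  }
}

// Stand-in for a drop() helper: releases whatever the guard protects.
function drop(guard: ToyGuard): void {
  guard.release();
}

function incrementThenFail(lock: ToyLock, counter: Int32Array): void {
  const guard = lock.acquire();
  try {
    counter[0]++;
    throw new Error("simulated failure inside the critical section");
  } finally {
    drop(guard); // still runs, so the lock is not leaked
  }
}
```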

2. RwLock (Read-Write Lock)

A RwLock is optimized for scenarios where data is read often but written rarely. It allows either multiple simultaneous readers or a single writer, never both at once.

import { spawn, move, RwLock } from "multithreading";

const lock = new RwLock(new Int32Array(new SharedArrayBuffer(4)));

// Spawning a Writer
spawn(move(lock), async (l) => {
  // Blocks until all readers are finished (asynchronously)
  using guard = await l.write(); 
  guard.value[0] = 42;
});

// Spawning Readers
spawn(move(lock), async (l) => {
  // Multiple threads can hold this lock simultaneously
  using guard = await l.read(); 
  console.log(guard.value[0]);
});
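The admission rules of a read-write lock fit in a single shared Int32 slot: any number of readers, or exactly one writer. The sketch below implements non-blocking try variants with Atomics.compareExchange. tryRead, tryWrite, and the state encoding are hypothetical. The library's RwLock is asynchronous and may use a different layout.

```typescript
// Hypothetical reader-writer state in one shared Int32 slot:
//   0 = free, n > 0 = n active readers, -1 = one active writer.
const WRITER = -1;

function tryRead(state: Int32Array): boolean {
  for (;;) {
    const current = Atomics.load(state, 0);
    if (current === WRITER) return false; // a writer holds the lock
    // Add a reader only if the count did not change since we loaded it
    if (Atomics.compareExchange(state, 0, current, current + 1) === current) {
      return true;
    }
  }
}

function releaseRead(state: Int32Array): void {
  Atomics.sub(state, 0, 1);
}

function tryWrite(state: Int32Array): boolean {
  // A writer may enter only when there are no readers and no writer
  return Atomics.compareExchange(state, 0, 0, WRITER) === 0;
}

function releaseWrite(state: Int32Array): void {
  Atomics.store(state, 0, 0);
}
```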

3. Semaphore

A Semaphore limits the number of threads that can access a resource simultaneously. Unlike a Mutex (which allows exactly 1 owner), a Semaphore allows N owners. This is essential for rate limiting, managing connection pools, or bounding concurrency.

import { spawn, move, Semaphore } from "multithreading";

// Initialize with 3 permits (allowing 3 concurrent tasks)
const semaphore = new Semaphore(3);

for (let i = 0; i < 10; i++) {
  spawn(move(semaphore), async (sem) => {
    console.log("Waiting for slot...");
    
    // Will wait (async) if 3 threads are already working
    using _ = await sem.acquire(); 
    
    console.log("Acquired slot! Working...");

    await new Promise(r => setTimeout(r, 1000));
    
    // Guard is disposed automatically, releasing the permit for the next thread
  });
}

Like the Mutex, if you cannot use the using keyword, you can manually manage the lifecycle.

spawn(move(semaphore), async (sem) => {
  const { drop } = await import("multithreading");
  // Acquire 2 permits at once
  const guard = await sem.acquire(2);
  
  try {
    // Critical Section
  } finally {
    // Release the 2 permits
    drop(guard);
  }
});
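At its core, a semaphore is a shared permit count that is decremented atomically when enough permits are available. The sketch below shows that with Atomics. It includes a multi-permit acquire like sem.acquire(2) above and a blocking variant built on Atomics.wait. All function names are hypothetical, and this is not the library's Semaphore implementation.

```typescript
// Hypothetical permit counter kept in one Int32 slot of shared memory.
function tryAcquirePermits(permits: Int32Array, amount = 1): boolean {
  for (;;) {
    const available = Atomics.load(permits, 0);
    if (available < amount) return false; // not enough permits right now
    // Take the permits only if no other thread changed the count meanwhile
    if (Atomics.compareExchange(permits, 0, available, available - amount) === available) {
      return true;
    }
  }
}

function releasePermits(permits: Int32Array, amount = 1): void {
  Atomics.add(permits, 0, amount);
  // Wake threads that may be sleeping in Atomics.wait on this slot
  Atomics.notify(permits, 0);
}

// Blocking acquire: sleeps while the count is too low (halts the calling thread)
function acquirePermitsSync(permits: Int32Array, amount = 1): void {
  while (!tryAcquirePermits(permits, amount)) {
    const seen = Atomics.load(permits, 0);
    if (seen < amount) Atomics.wait(permits, 0, seen);
  }
}
```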

4. Condvar (Condition Variable)

A Condvar allows threads to wait for a specific condition to become true. It saves CPU resources by putting the task to sleep until it is notified, rather than constantly checking a value.

import { spawn, move, Mutex, Condvar } from "multithreading";

const mutex = new Mutex(new Int32Array(new SharedArrayBuffer(4)));
const cv = new Condvar();

spawn(move(mutex, cv), async (lock, cond) => {
  using guard = await lock.acquire();
  
  // Wait until value is not 0
  while (guard.value[0] === 0) {
    // wait() unlocks the mutex, waits for notification, then re-locks.
    await cond.wait(guard);
  }
  
  console.log("Received signal, value is:", guard.value[0]);
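});

// Main thread: update the value under the lock, then wake the waiting worker
{
  using guard = await mutex.acquire();
  guard.value[0] = 1;
}
cv.notifyOne();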
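Condition variables are easy to get wrong because a notification can arrive after the waiter releases its mutex but before it goes to sleep. A common fix on top of Atomics.wait and Atomics.notify is a sequence counter. The waiter snapshots the counter while still holding the mutex and sleeps only if the counter is unchanged. The names are hypothetical, and this is not the library's Condvar. Also note that Atomics.wait is not permitted on a browser's main thread, although Node.js allows it.

```typescript
// Hypothetical condition-variable core: a wake-up sequence number in shared memory.
// The waiter snapshots the sequence while holding its mutex, releases the mutex,
// then sleeps only if the sequence is unchanged, so a notify that lands between
// "unlock" and "sleep" is not lost.
function snapshot(seq: Int32Array): number {
  return Atomics.load(seq, 0);
}

function sleepUnlessNotified(seq: Int32Array, observed: number, timeoutMs: number) {
  return Atomics.wait(seq, 0, observed, timeoutMs);
}

function notifyAllWaiters(seq: Int32Array): number {
  Atomics.add(seq, 0, 1); // change the value first so late sleepers see "not-equal"
  return Atomics.notify(seq, 0); // then wake everyone already asleep; returns the count woken
}
```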

5. Channels

For higher-level communication, this library provides a Multi-Producer, Multi-Consumer (MPMC) bounded channel. This primitive mimics Rust's std::sync::mpsc but allows for multiple consumers. It acts as a thread-safe queue that handles backpressure, blocking receivers when empty and blocking senders when full.

Channels are the preferred way to coordinate complex workflows (like job queues or pipelines) between workers without manually managing locks.

  • Arbitrary JSON Data: Channels are backed by SharedJsonBuffer, allowing you to send any JSON-serializable value (objects, arrays, strings, numbers, booleans) through the channel, not just raw integers.
  • Bounded: You define a capacity. If the channel is full, send() waits. If empty, recv() waits.
  • Clonable: Both Sender and Receiver can be cloned and moved to different workers.
  • Reference Counted: The channel automatically closes when all Senders are dropped (indicating no more data will arrive) or all Receivers are dropped.

Example: Worker Pipeline with Objects

import { spawn, move, channel } from "multithreading";

// Create a channel that holds objects
const [tx, rx] = channel<{ hello: string }>();

// Producer Thread
spawn(move(tx), async (sender) => {
  await sender.send({ hello: "world" });
  await sender.send({ hello: "multithreading" });
  // Sender is destroyed here, automatically closing the channel
});

// Consumer Thread
spawn(move(rx.clone()), async (rx) => {
  for await (const value of rx) {
    console.log(value); // { hello: "world" }
  }
});

// Because we cloned rx, we can also receive on the main thread 
for await (const value of rx) {
  console.log(value); // { hello: "world" }
}
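The backpressure rules listed above can be modeled in a single thread with promises: send waits when full, recv waits when empty, and close ends reception once the queue drains. The sketch below is a hypothetical in-memory BoundedChannel, not the library's shared-memory channel. It only makes the waiting behavior concrete.

```typescript
// Hypothetical single-threaded bounded channel (not the library's
// shared-memory channel); it models only the waiting rules.
type RecvResult<T> = { done: true } | { done: false; value: T };

class BoundedChannel<T> {
  private readonly items: T[] = [];
  private readonly waitingSenders: Array<() => void> = [];
  private readonly waitingReceivers: Array<() => void> = [];
  private readonly capacity: number;
  private closed = false;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  async send(value: T): Promise<void> {
    // Full: park until a receiver frees a slot (re-check after every wake-up)
    while (this.items.length >= this.capacity && !this.closed) {
      await new Promise<void>((resolve) => this.waitingSenders.push(() => resolve()));
    }
    if (this.closed) throw new Error("send on a closed channel");
    this.items.push(value);
    this.waitingReceivers.shift()?.(); // wake one parked receiver, if any
  }

  async recv(): Promise<RecvResult<T>> {
    // Empty: park until a value arrives or the channel closes
    while (this.items.length === 0 && !this.closed) {
      await new Promise<void>((resolve) => this.waitingReceivers.push(() => resolve()));
    }
    if (this.items.length === 0) return { done: true }; // closed and drained
    const value = this.items.shift() as T;
    this.waitingSenders.shift()?.(); // a slot opened up: wake one parked sender
    return { done: false, value };
  }

  close(): void {
    this.closed = true;
    // Wake every parked task so it can observe the closed state
    for (const wake of this.waitingSenders.splice(0)) wake();
    for (const wake of this.waitingReceivers.splice(0)) wake();
  }
}
```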

Importing Modules in Workers

One of the most difficult aspects of Web Workers is handling imports. This library handles this automatically, enabling you to use dynamic await import() calls inside your spawned functions.

You can import:

  1. External Libraries: Packages from npm/CDN (depending on environment).
  2. Relative Files: Files relative to the file calling spawn.

Note: The function passed to spawn must be self-contained or explicitly import what it needs. It cannot access variables from the outer scope unless they are passed via move().

Example: Importing Relative Files and External Libraries

Assume you have a file structure:

  • main.ts
  • utils.ts (contains export const magicNumber = 42;)
// main.ts
import { spawn } from "multithreading";

spawn(async () => {
  // 1. Importing a relative file
  // This path is relative to 'main.ts' (the caller location)
  const utils = await import("./utils.ts");
  // 2. Importing an external library (e.g., from a URL or node_modules resolution)
  const _ = await import("lodash");

  console.log("Magic number from relative file:", utils.magicNumber);
  console.log("Random number via lodash:", _.default.random(1, 100));
  
  return utils.magicNumber;
});

API Reference

  • spawn(fn): Runs a function in a worker.
  • spawn(move(arg1, arg2, ...), fn): Runs a function in a worker with specific arguments transferred or copied.
  • initRuntime(config): Initializes the thread pool (optional, lazy loaded by default).
  • shutdown(): Terminates all workers in the pool.
  • move(...args): Marks arguments to be passed to the worker: transferable objects are moved (ownership transfer, zero-copy) and all other values are structured-cloned. Accepts a variable number of arguments, which map in order to the arguments of the worker function.
  • drop(resource): Explicitly disposes of a resource (calls [Symbol.dispose]). This is required for manual lock management in environments like Bun.
  • SharedJsonBuffer: A class for storing JSON objects in shared memory.
  • channel<T>(capacity): Creates a new channel. Returns [Sender<T>, Receiver<T>].
  • Sender<T>:
    • send(value): Async. Returns Promise<Result<void, Error>>.
    • sendSync(value): Blocking. Returns Result<void, Error>.
    • clone(): Creates a new handle to the same channel (increments ref count).
    • close(): Manually closes the channel for everyone.
  • Receiver<T>:
    • recv(): Async. Returns Promise<Result<T, Error>>.
    • recvSync(): Blocking. Returns Result<T, Error>.
    • clone(): Creates a new handle to the same channel.
    • close(): Manually drops this handle.
  • Mutex<T>:
    • acquire(): Async lock (Recommended). Returns Promise<MutexGuard>.
    • tryLock(): Non-blocking attempt. Returns boolean.
    • acquireSync(): Blocking lock (Halts Worker). Returns MutexGuard.
  • RwLock<T>:
    • read(): Async shared read access (Recommended).
    • write(): Async exclusive write access (Recommended).
    • readSync() / writeSync(): Synchronous/Blocking variants.
  • Semaphore:
    • acquire(amount?): Async wait for amount permits. Returns Promise<SemaphoreGuard>.
    • tryAcquire(amount?): Non-blocking. Returns SemaphoreGuard or null.
    • acquireSync(amount?): Blocking wait. Returns SemaphoreGuard.
  • Condvar:
    • wait(guard): Async wait (Recommended). Yields execution.
    • notifyOne(): Wake one waiting thread.
    • notifyAll(): Wake all waiting threads.
    • waitSync(guard): Blocking wait (Halts Worker).

Technical Implementation Details

For advanced users interested in the internal mechanics:

  • Serialization Protocol: The library uses a custom "Envelope" protocol (PayloadType.RAW vs PayloadType.LIB). This allows complex objects like Mutex handles to be serialized, sent to a worker, and rehydrated into a functional object connected to the same SharedArrayBuffer on the other side.
  • Atomics: Synchronization is built on Int32Array backed by SharedArrayBuffer using Atomics.wait and Atomics.notify.
  • Import Patching: The spawn function analyzes the stack trace to determine the caller's file path. It then regex-patches import() statements within the worker code string to ensure relative paths resolve correctly against the caller's location, rather than the worker's location.
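The Import Patching step can be sketched as a regex over the function's source. The regex rewrites relative import() specifiers into absolute URLs resolved against the caller's module URL. The regex and helper name are assumptions, not the library's actual pattern. This version only handles string-literal specifiers that start with ./ or ../. Bare specifiers such as "lodash" are left for the runtime to resolve.

```typescript
// Hypothetical patcher: matches import("./x") / import('../x') string literals.
const RELATIVE_IMPORT = /import\(\s*(["'])(\.{1,2}\/[^"']*)\1\s*\)/g;

function patchRelativeImports(source: string, callerUrl: string): string {
  return source.replace(RELATIVE_IMPORT, (_match: string, quote: string, specifier: string) => {
    // Resolve against the caller's file, not the worker's location
    const absolute = new URL(specifier, callerUrl).href;
    return `import(${quote}${absolute}${quote})`;
  });
}
```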