流式压缩优于帧式压缩。
Streaming compression beats framed compression

原始链接: https://bou.ke/blog/compressed/

## 带宽优化的流式压缩 传统的 WebSocket 压缩将每个消息独立处理。然而,一种更有效的方法——**流式压缩**——在多个消息之间共享单个编码器/解码器上下文。这使得压缩器能够在流上构建“上下文”,从而实现显著更好的压缩比。 作者使用 zstd 实现此功能,用于通过 Wi-Fi 连接控制机器人。最初使用 zstd 字典压缩的尝试证明过于繁琐,促使作者意识到可以随着编码器处理数据,动态地创建“字典”。这与帧间视频编码(如 H264)的优势类似,相比于每消息压缩,实现了**额外的 80% 带宽降低**。 这项技术不仅限于机器人技术。潜在的应用包括 OpenTelemetry Collector(提高批处理导出效率)和 HTTP 响应压缩(开发了一个自定义 Rust crate 来解决现有解决方案(如 `tower-http`)在流式响应方面的局限性)。虽然使用 gRPC 的每消息压缩限制实现起来具有挑战性,但作者的工作证明了通过流式压缩可以实现巨大的收益。

一个黑客新闻的讨论探讨了**流式压缩**相对于传统帧压缩在数据传输方面的优势。核心思想是,压缩连续的数据流,而不是单个“帧”,可以更有效率。 然而,评论者提出了重要的注意事项。在整个流中维护单一的压缩“上下文”需要在服务器和客户端上占用大量内存,并且压缩级别需要仔细考虑。至关重要的是,解码依赖于拥有*整个*先前的流,这阻碍了向多个接收者广播的优化。 讨论还涉及可靠性:流式压缩可能依赖于保证按顺序传输的协议(例如HTTP中使用的TCP),以在双方维护一致的压缩上下文。本质上,底层的传输层处理数据包丢失和重新排序。
相关文章

原文

Standard WebSocket compression uses framed compression where every message is compressed independently of any other. This makes the compression more effective for larger messages, since the compressor has more ‘context’ to work with. To control our robots we’re sending about 10 messages per second that are medium-sized, serde flexbuffer-encoded messages, about 100KB each. These compress fairly well with per-frame compression.

There is a more effective method I’ve figured out however: share a single encoder context across messages. Then for every message, compress the data and flush the output. On the other side the same is done to decompress: create a decoder context, feed received messages into it and whatever is yielded for every frame, that’s the decompressed message. In zstandard terminology, you start a frame but never finish it. Every time you flush it will end a block. Normally when doing streamed compression this is done after the input or output crosses a certain threshold, but flushing makes it work for a framed transport like WebSockets.

I came up with this because we were optimizing the bandwidth for our construction robots, as we work on-site using a Wi-Fi connection. The first attempt used zstd with dictionary compression, but sending around and maintaining the dictionary was tedious. So I realized: what if we could create the dictionary on the fly? This is essentially what happens: the encoder learns and get better at compressing the stream as more data is fed through it. In our case it decreased the bandwidth by another 80% compared to per-message zstandard compression.

This is why H264 interframe video encoding beats MJPEG intraframe encoding: the codec has a lot more context to work with.

Other uses for streaming compression

This approach could also be adopted for the OpenTelemetry Collector, currently every batch export is compressed individually, this could also use streaming compression for a free bandwidth reduction. This doesn’t seem to be easy to do in gRPC (it only supports per-message compression, even in a streaming RPC) so it would require a custom protocol.

I’m such a big fan of streaming compression that I also made a Rust crate to compress HTTP responses, supporting streaming for gRPC-web and SSE. I had to create this crate because the response compression built into tower-http (the most widely used implementation) doesn’t work with streaming responses.

Example Code

Here’s basically what I do to compress/decompress WebSocket frames:

use std::io::Write;
use zstd_safe::{CCtx, CParameter, DCtx, DParameter, FrameFormat, InBuffer, OutBuffer};

pub struct Decoder {
    dctx: DCtx<'static>,
}

impl Decoder {
    pub fn new() -> Self {
        let dctx = DCtx::create();
        Decoder {
            dctx,
        }
    }

    pub fn decode(&mut self, data: &[u8], dst: &mut impl Write) -> std::io::Result<()> {
	    let mut buf: [0u8; 16_384];
        let mut in_buffer = InBuffer::around(data);

        loop {
            let mut out_buffer = OutBuffer::around(&mut buf);
            self.dctx
                .decompress_stream(&mut out_buffer, &mut in_buffer)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            let pos = out_buffer.pos();
            if in_buffer.pos() >= data.len() && pos == 0 {
                break;
            }
            dst.write_all(&buf[..pos])?;
        }

        Ok(())
    }
}

pub struct Encoder {
    cctx: CCtx<'static>,
}

impl Encoder {
    pub fn new() -> Self {
        let cctx = CCtx::create();
        Encoder {
            cctx,
        }
    }

    pub fn encode(&mut self, data: &[u8], dst: &mut impl Write) -> std::io::Result<()> {
	    let mut buf: [0u8; 16_384];
        let mut in_buffer = InBuffer::around(data);
        while in_buffer.pos() < data.len() {
            let mut out_buffer = OutBuffer::around(&mut self.buf);
            self.cctx
                .compress_stream2(&mut out_buffer, &mut in_buffer, zstd_safe::zstd_sys::ZSTD_EndDirective::ZSTD_e_continue)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            dst.write_all(&buf[..out_buffer.pos()])?;
        }

        loop {
            let mut out_buffer = OutBuffer::around(&mut buf);
            let remaining = self.cctx
                .compress_stream2(&mut out_buffer, &mut in_buffer, zstd_safe::zstd_sys::ZSTD_EndDirective::ZSTD_e_flush)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
            dst.write_all(&buf[..out_buffer.pos()])?;
            if remaining == 0 {
                break;
            }
        }

        Ok(())
    }
}

Dec 2025

联系我们 contact @ memedata.com