I'm just learning about this tool now and had a brief question if you have the time:

The paper mentions support for zero-copy intranode object sharing, which links to serialization in the Ray docs: https://docs.ray.io/en/latest/ray-core/objects/serialization... I'm really curious how this is performant. I recently tried building a pipeline that leveraged substantial multiprocessing in Python, and found that my process was bottlenecked by the serialization/deserialization that occurs during Python multiprocessing. Would love any reading or explanation you can provide as to how this doesn't also bottleneck a process in Ray, since it seems that data transferred between workers and nodes will need to be serialized and deserialized.

Thanks in advance! Really cool tool, hopefully I'll be able to use it sooner rather than later.
Yeah, mmap. I think this is the relevant line [1].

Fun fact: very early on, we used to create one mmapped file per serialized object, but that very quickly broke down. Then we switched to mmapping one large file at the start and storing all of the serialized objects in that file. But then, as objects get allocated and deallocated, you need to manage the memory inside of that mmapped file, and we just repurposed a malloc implementation to handle that.

[1] https://github.com/ray-project/ray/blob/21202f6ddc3ceaf74fbc...
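For readers wondering how this answers the multiprocessing question above, here's a minimal sketch (assuming a local `ray.init()` and a NumPy payload, the case the zero-copy serialization docs describe): the array is serialized once into the node's shared-memory object store, and each worker on that node maps the same buffer read-only instead of pickling/unpickling its own copy.

```python
import numpy as np
import ray

ray.init()

arr = np.random.rand(10_000_000)   # ~80 MB payload
ref = ray.put(arr)                 # serialized once into the shared-memory object store

@ray.remote
def col_mean(a):
    # `a` arrives as a read-only NumPy view backed by the node's shared memory,
    # not a per-worker deserialized copy
    return a.mean()

# Fan out to several workers on the same node; no repeated copies of `arr`
print(ray.get([col_mean.remote(ref) for _ in range(8)]))
```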
I haven't thought about sqlite as a data interchange format, but I was looking at deploying sqlite as a data lake format some time ago, and found it wanting.

1. Dynamically typed (with type affinity) [1]. This causes problems when there are multiple data-generating processes. The new sqlite has a STRICT table type that enforces types, but only for the few basic types that it has.

2. Doesn't have a date/time type [1]. This is problematic because you can store dates as TEXT, REAL or INTEGER (it's up to the developer), and if you have sqlite files from > 1 source, date fields could be any of those types, and you have to convert between them.

3. Isn't columnar, so complex analytics at scale is not performant.

I guess one can use sqlite as a data interchange format, but it's not ideal. One area sqlite does excel in is as an application file format [2], and that's where it is mostly used [3].

[1] https://www.sqlite.org/datatype3.html
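To make points 1 and 2 concrete, here's a small sketch using Python's standard-library sqlite3 module (the table and values are made up for illustration): the same undeclared column happily accepts three different date representations, and typeof() shows the divergence a downstream consumer would have to reconcile.

```python
import sqlite3

con = sqlite3.connect(":memory:")
# No STRICT keyword and no declared type on created_at, so any value affinity is accepted
con.execute("CREATE TABLE events (id INTEGER, created_at)")

# Three hypothetical writers, three representations of the same instant
con.execute("INSERT INTO events VALUES (1, '2024-01-15')")   # TEXT (ISO-8601 date)
con.execute("INSERT INTO events VALUES (2, 1705276800)")     # INTEGER (unix epoch seconds)
con.execute("INSERT INTO events VALUES (3, 2460324.5)")      # REAL (julian day number)

for row in con.execute("SELECT id, created_at, typeof(created_at) FROM events"):
    print(row)
# (1, '2024-01-15', 'text')
# (2, 1705276800, 'integer')
# (3, 2460324.5, 'real')
```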
Some of the initial differentiators are described at the bottom of our design doc at https://github.com/ray-project/deltacat/blob/main/deltacat/c....

But yes, controlling file I/O was also an important part of this, since it allowed us to (1) run more targeted downloads/reads of only the Parquet row groups and columns participating in compaction and (2) track dirty/clean files to skip unnecessary re-writes of "clean" files that weren't altered by compaction. It also let us better leverage catalog metadata (e.g., primary key indexes if available) to filter out more files in the initial scan, and to copy clean files into the compacted variant by reference (when supported by the underlying catalog format).

The trick with doing compaction in Python was to ensure that the most performance-sensitive code was delegated to more optimal C++ (e.g., Ray and Arrow) and Rust (e.g., Daft) code paths. If we did all of our per-record processing ops in pure Python, compaction would indeed be much slower.
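As a rough illustration of point (1), here's a minimal PyArrow sketch (the file name and column names are hypothetical, not DeltaCAT's actual code): only the row groups and columns that participate in compaction get decoded, rather than the whole Parquet file.

```python
import pyarrow.parquet as pq

# Hypothetical Parquet part file produced by an upstream writer
pf = pq.ParquetFile("part-00001.parquet")

# Row-group metadata is cheap to read and tells you what actually needs fetching
meta = pf.metadata
print(meta.num_row_groups, meta.num_rows)

# Decode only the primary-key and sort columns of a single row group,
# instead of downloading and decoding every column of every row group
table = pf.read_row_group(0, columns=["pk", "event_time"])
```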
This is one of the first times I've heard of people using Daft in the wild. Would you be able to elaborate on where Daft came in handy?

Edit: Nvm, I kept reading! Thanks for the interesting post!
As someone who used Ray for ML, this is impressive as hell.

I did not even realize it was that viable a substitute for straight data processing tasks.
For those interested, this would be at a cost of:

- ~$1691/hour on demand
- ~$1065/hour reserved
- ~$521/hour spot

Not including any related data transfer costs.
Some of DeltaCAT's goals and use cases have been discussed in this 2022 talk: https://youtu.be/M3pZDp1zock?t=4676

Today, our immediate next goal for DeltaCAT is to ensure that the compactor, and similar procedures for Ray, can run on Apache Iceberg. So, if you're an Iceberg user relying on procedures like Spark's "rewrite_data_files" and/or "rewrite_positional_delete_files" to compact your datasets today, then DeltaCAT will let you easily run similar compaction procedures on Ray to realize similar efficiency/scale improvements (even if it winds up delegating some of the work to other projects like PyIceberg, Daft, etc. along the way).

Going forward, we'd like DeltaCAT to also provide better general-purpose abstractions (e.g., reading/writing/altering large datasets) to simplify writing Ray apps in Python that work across (1) different catalog formats like Iceberg, Hudi, and Delta and (2) different distributed data processing frameworks like Ray Data, Daft, Modin, etc.

From the perspective of an internal DeltaCAT developer, another goal is to just reduce the maintainability burden and dev hours required to write something like a compactor that works across multiple catalogs (i.e., by ensuring that all interfaces used by such a procedure can be readily implemented for multiple catalog formats like Iceberg, Hudi, Delta, etc.).
> From the typical Amazon EC2 customer’s perspective, this translates to saving over $120MM/year on Amazon EC2 on-demand R5 instance charges.

Does the sales team know about this? /jk
We wrote the compactor in Python but, as noted in my previous response to quadrature, most of the performance-sensitive code is written in C++ and Rust (but still invoked from Python).
Slightly flip, but it's interesting that no one believes in or brags about cost savings via statistical sampling techniques these days.
1. This is truly impressive work from AWS. Patrick Ames began speaking about this a couple years ago, though at this point the blog post is probably the best reference. https://www.youtube.com/watch?v=h7svj_oAY14
2. This is not a "typical" Ray use case. I'm not aware of any other exabyte scale data processing workloads. Our bread and butter is ML workloads: training, inference, and unstructured data processing.
3. We have a data processing library called Ray Data for ingesting and processing data, often done in conjunction with training and inference. However, I believe in this particular use case, the heavy lifting is largely done with Ray's core APIs (tasks & actors; see the sketch after this list), which are lower-level and more flexible, and that makes sense for highly custom use cases. Most Ray users use the Ray libraries (train, data, serve), but power users often use the Ray core APIs.
4. Since people often ask about data processing with Ray and Spark, Spark use cases tend to be more geared toward structured data and CPU processing. If you are joining a bunch of tables together or running SQL queries, Spark is going to be way better. If you're working with unstructured data (images, text, video, audio, etc), need mixed CPU & GPU compute, are doing deep learning and running inference, etc, then Ray is going to be much better.
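For readers unfamiliar with the core APIs mentioned in point 3, here's a minimal sketch of Ray tasks and actors (the function and class are made up, not part of the workload described in the post):

```python
import ray

ray.init()

# Task: a stateless function that Ray schedules on any worker in the cluster
@ray.remote
def double(batch):
    return [x * 2 for x in batch]

# Actor: a stateful worker process whose state persists across method calls
@ray.remote
class Counter:
    def __init__(self):
        self.n = 0

    def incr(self, k):
        self.n += k
        return self.n

# Fan out tasks, then block on their results
futures = [double.remote(list(range(10))) for _ in range(4)]
batches = ray.get(futures)

counter = Counter.remote()
print(ray.get(counter.incr.remote(len(batches))))  # -> 4
```

The higher-level libraries (Ray Data, Train, Serve) are built on these same primitives, which is why a highly custom pipeline like the one in the post can drop down to them directly.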