[History Server] Add compression library for history server #4828
| } | ||
|
|
||
| // ReadCompressedContent reads a gzip-compressed file from cloud storage and returns the decompressed byte slice. | ||
| func ReadCompressedContent(reader StorageContentReader, clusterId string, fileName string) ([]byte, error) { |
There was a problem hiding this comment.
Maybe it's on the followup PR, just curious on read, how do we plan to keep compatibility?
For example, if we change compression algorithms in the future (also existing objects are not compressed), how do we tell whether and which compression is needed?
There was a problem hiding this comment.
Yeah, so if there is support for multiple algorithms in the future, I'm assuming that each of them will also have a unique identifier at the beginning or end of the file, similar to what gzip has right now, i.e. the "1F 8B" at the beginning.
But you're right, a verification should be added here since the file also might not even be zipped.
There was a problem hiding this comment.
a verification should be added here since the file also might not even be zipped.
Likely not a verification, but metadata stored somewhere.
Please notice, we already have uncompressed logs in production, please don't make incompatible changes.
There was a problem hiding this comment.
Just to clarify, this won't be used for Ray logs; it will only be used for the events on the history server. The compression won't impact existing logs that are already in production.
There was a problem hiding this comment.
It has nothing to do with logs specifically: we have events in S3 as well.
There was a problem hiding this comment.
Please notice, we already have uncompressed logs in production, please don't make incompatible changes.
This line was confusing to me so I just wanted to clarify.
There was a problem hiding this comment.
Please notice, we already have uncompressed logs in production, please don't make incompatible changes.
For sure, I have a few questions:
- When do we enable compression? To me, it should only kick in on large objects (say, larger than 512MiB)
- Still, I think ZSTD is superior to GZIP in every respect; the only reasonable justification for GZIP is dependency: we don't have ZSTD in the codebase right now
- Since history server has already been released for a while (I even see blog post from google https://docs.cloud.google.com/kubernetes-engine/docs/add-on/ray-on-gke/how-to/enable-ray-history-server), so compatibility is something we should keep
  - The goal is for the new history server to read the uncompressed objects that are already persisted
- To me, we need to persist the compression metadata so the proper decompressor will be used on read
  - object metadata (GCS: https://docs.cloud.google.com/storage/docs/metadata) could be a proper place
There was a problem hiding this comment.
Since history server has already been released for a while (I even see blog post from google https://docs.cloud.google.com/kubernetes-engine/docs/add-on/ray-on-gke/how-to/enable-ray-history-server), so compatibility is something we should keep
There's a disclaimer that history server is alpha in this doc. We can break compatibility before the Beta release.
There was a problem hiding this comment.
I understand. The point I want to make is that if there's an easy way to maintain compatibility (which there is), we should do so.
| } | ||
|
|
||
| // ReadCompressedContent reads a gzip-compressed file from cloud storage and returns the decompressed byte slice. | ||
| func ReadCompressedContent(reader StorageContentReader, clusterId string, fileName string) ([]byte, error) { |
There was a problem hiding this comment.
Curious how we plan to use the function (since it's not in the PR, the PR description, or the issue): dumping to a local file? Reading it into memory and displaying it on the UI?
One thing that might be useful: compression algorithms usually support stream-based decompression (ref for zstd: https://facebook.github.io/zstd/zstd_manual.html#Chapter22), which can be used to stream decompressed content to the history server.
There was a problem hiding this comment.
Well, the plan is to compress the event files on the collector side and then read those on the history server side. I sincerely apologize if I am not getting the question.
So, it does seem like zstd is a vastly superior compression algorithm to gzip; support can be added in a follow-up PR.
There was a problem hiding this comment.
It looks to me like stream-based decompression is also possible for gzip.
For example: https://docs.rs/flate2/latest/flate2/
| // of file size: | ||
| // 1. Opens the uncompressed local file on disk. | ||
| // 2. Spools compressed output into an ephemeral temporary file via CompressStream. | ||
| // 3. Seeks the temp file to the beginning to satisfy io.ReadSeeker, which is required by cloud SDKs |
There was a problem hiding this comment.
TBH I'm not super convinced, since the current impl involves
- read local + compress + write
- read compressed + write compressed
It seems to me the missing piece is remote storage's append API. Why is it better?
pseudocode
    while True:
        bytes = read chunk
        if len(bytes) == 0:
            break
        writer.append(bytes)
        bytes_written += len(bytes)
For small objects, we could do whatever is easy: memory consumption and disk usage are not a concern at all.
For large objects, we should leverage multipart upload (or even resumable uploads). Under the hood, the writer.append API accumulates bytes and sends write_part requests.
pseudocode
    fn append(buf):
        buffer.append(required bytes for buf)
        if buffer > threshold:
            initiate a multipart upload if necessary
            send write_part request
There was a problem hiding this comment.
So are you suggesting leveraging cloud features that will essentially append to the storage? So just read local + write compressed?
There was a problem hiding this comment.
Yes, that way we don't increase local storage, while also being able to cap peak memory consumption. Wdyt?
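To make the append semantics from the pseudocode above concrete, here is a rough Go sketch. The partUploader interface and the appendWriter type are hypothetical stand-ins; they are not the history server's storage interface, and a real backend would also need to initiate and complete the multipart upload.

```go
package main

import (
	"bytes"
	"fmt"
)

// partUploader is a hypothetical interface standing in for a backend's
// "write part" call in a multipart upload.
type partUploader interface {
	UploadPart(partNumber int, data []byte) error
}

// appendWriter accumulates appended bytes and ships a part whenever the
// buffer reaches partSize, so buffered data stays bounded by roughly
// partSize plus the size of one Append call. partSize must be positive.
type appendWriter struct {
	up       partUploader
	partSize int
	buf      bytes.Buffer
	nextPart int
}

// Append buffers p and flushes as many full parts as are available.
func (w *appendWriter) Append(p []byte) error {
	w.buf.Write(p)
	for w.buf.Len() >= w.partSize {
		if err := w.flush(w.partSize); err != nil {
			return err
		}
	}
	return nil
}

// Close sends whatever is left as the final, possibly short, part.
func (w *appendWriter) Close() error {
	if w.buf.Len() == 0 {
		return nil
	}
	return w.flush(w.buf.Len())
}

// flush removes n bytes from the buffer and uploads them as the next part.
func (w *appendWriter) flush(n int) error {
	w.nextPart++
	part := make([]byte, n)
	copy(part, w.buf.Next(n))
	return w.up.UploadPart(w.nextPart, part)
}

// memUploader records parts in memory so the sketch can run without a cloud SDK.
type memUploader struct{ parts []string }

func (m *memUploader) UploadPart(partNumber int, data []byte) error {
	m.parts = append(m.parts, string(data))
	return nil
}

func main() {
	up := &memUploader{}
	w := &appendWriter{up: up, partSize: 4}
	w.Append([]byte("hello"))
	w.Append([]byte(" world"))
	w.Close()
	fmt.Printf("%q\n", up.parts)
}
```

A gzip writer could sit in front of such a type if it also exposed a Write method, which would give the "read local + write compressed" flow without a temporary file.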
There was a problem hiding this comment.
Just to add on, I don't think we should utilize storage specific features. The maintenance burden for utilizing the storage solution's append API (I'm assuming this is what you mean by remote storage API) would be large. Just implementing the append API alone for GCS could potentially introduce a lot of overhead and boilerplate in each storage backend.
There was a problem hiding this comment.
No, it has nothing to do with a specific storage backend -- I don't even think any object storage has native append support. append is the semantics our storage API should expose.
Check out https://github.com/apache/opendal/blob/main/core/core/src/raw/oio/write/multipart_write.rs
There was a problem hiding this comment.
Overall, I'm proposing to use opendal or some other battle-tested libraries instead of hand-written one.
Opendal seems cool, but I'm not sure we can remove the existing storage backends we've already developed at this point. While yes they are "hand-written", they have to be to encode logic that is specific to history server implementation. I don't see how opendal would solve this problem but let me know if I'm missing something.
If you're open to contributing, I would be open to reviewing a PR for a new storage backend that uses opendal that we can maintain in parallel to existing storage implementations. That would also help us compare and reconsider consolidation to something like opendal in the future.
A side topic for compression: there are a few other nice-to-have requirements
- The SDK should support object metadata, so we could store the compression attributes
- Compression would be better implemented as an interceptor for all read/write operations
These are good considerations. I also saw your comment about using zstd instead of gzip which I think is worth considering. @chiayi would be good to evaluate this in a follow-up PR. I think it's more important to land some sort of compression logic first. But we have a couple months before KubeRay v1.7 to change this :)
There was a problem hiding this comment.
I don't see how opendal would solve this problem but let me know if I'm missing something.
All the requirements mentioned above are natively supported by opendal :)
Usually the consideration is whether the sdk is battle tested. I work at Twitter and we use opendal for our internal object storage accessor.
The load test we have is 450GBps download / 400GBps upload at the same time for half a day, single digit millisecond latency within single dc. I think history server's load shouldn't be that harsh.
Overall, I'm proposing to use an existing SDK that already supports these IO features, instead of implementing everything ourselves.
There was a problem hiding this comment.
noted, the storage interface is actually pretty simple, so I don't feel like it's a burden to implement everything ourselves.
Can you open a PR to introduce an opendal-based storage implementation for the interface here: https://github.com/ray-project/kuberay/blob/master/historyserver/pkg/storage/interface.go?
There was a problem hiding this comment.
the storage interface is actually pretty simple, so I don't feel like it's a burden to implement everything ourselves.
Please don't overlook the difficulty of implementing a production-grade IO SDK; the current API is simple because we haven't started adding any perf features. A few questions I can think of:
- On concurrent read and write, how should we size the requests, considering both performance and API cost?
- To issue HTTP requests, do we plan to use HTTP/2 or HTTP/1.1? A lot of IO libraries don't even support connection pooling for HTTP/2 or only have broken support (e.g., Rust hyper).
- How do we limit the number of in-flight requests? We need a concurrency limit anyway, otherwise we exhaust file descriptors.
- On read, how do we prefetch, given that (1) object storage is slow and (2) our access pattern is fully sequential?
- How do we collect metrics (maybe tracing) for object-level operations and HTTP requests?
- The list goes on.
There was a problem hiding this comment.
those are all good considerations, I think reviewing a reference implementation using opendal that has all of that would be helpful
andrewsykim
left a comment
There was a problem hiding this comment.
Any particular reason we're just adding the library here and not implementing the compression in history server?
Maybe it's too forward-looking, but I wanted to "isolate" the compression part to future-proof it as well as make it maintainable. My thought was that implementing it in history server could create unnecessary bloat, but happy to change that if this is leading to being "too modular".
@chiayi the separate package makes sense, I meant why not use the package in the actual implementation in this PR?
| } | ||
|
|
||
| // WriteCompressedBytes compresses an in-memory byte slice and uploads it to any active cloud provider. | ||
| func WriteCompressedBytes(writer StorageWriter, remotePath string, data []byte) error { |
There was a problem hiding this comment.
We will move to storing the events in files before flushing to the remote. Maybe we can handle this in another PR.
There was a problem hiding this comment.
I think this will need further discussion. Let's discuss this offline.
| } | ||
|
|
||
| // ReadCompressedContent reads a gzip-compressed file from cloud storage and returns the decompressed byte slice. | ||
| func ReadCompressedContent(reader StorageContentReader, clusterId string, fileName string) ([]byte, error) { |
There was a problem hiding this comment.
| func ReadCompressedContent(reader StorageContentReader, clusterId string, fileName string) ([]byte, error) { | |
| func ReadCompressedContent(reader StorageContentReader, clusterId string, fileName string) (io.ReadCloser, error) { |
Is it possible that we just return an io.ReadCloser instead, so the caller can decide whether to read the entire decompressed content into memory or stream it?
There was a problem hiding this comment.
So just the io.ReadCloser alone wouldn't work, since closing the source reader doesn't close the gzip reader. I wrapped it in a gzipReadCloser that also holds the underlying io.Reader. Let me know what you think!
There was a problem hiding this comment.
LGTM. A small nit about error handling:
	var errs []error
	if err := gr.Close(); err != nil {
		errs = append(errs, fmt.Errorf("gzip close: %w", err))
	}
	if err := rc.Close(); err != nil {
		errs = append(errs, fmt.Errorf("source close: %w", err))
	}
	return errors.Join(errs...)
There was a problem hiding this comment.
Updated the error handling for Close(), PTAL!
Oh I see, I'm not using it yet because I'm waiting for the re-architecture to wrap up. I know it will be used, but I'm currently still not sure where in the code.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit d80fa06. Configure here.
andrewsykim
left a comment
There was a problem hiding this comment.
Merging this, but @chiayi please try to address remaining feedback in follow-up PR that uses the compression library

Why are these changes needed?
This PR adds a compression library that can be used with the current reader and writer to compress files and upload them to a cloud provider.
The current plan is to utilize compression for events only. Further discussion can be made if we want to compress logs as well.
Some implementation details:
WriteCompressedFile(writer StorageWriter, remotePath string, localFilePath string) can upload directly to the storage solution with constant memory usage.
Related issue number
Part of #4827
Checks