RFC: Spill-To-Disk Object Storage Download #2205
Comments
From what I know, transfer speeds that high are only achievable between EC2 instances in a cluster placement group. For example, if I remember correctly, an r4.2xlarge achieves ~160 MB/s between EC2 and S3. |
I thought placement groups were a mechanism to improve EC2-EC2 traffic, and not EC2-S3? I'll do some digging and report back. I had always assumed EC2-S3 was so much faster than EC2-EC2 because they had dedicated networking for it, but perhaps I was mistaken... |
This AWS blog post from 2018 would suggest up to 25 Gbps EC2-S3 is possible and also highlights placement groups as a way to accelerate EC2-EC2. This support question would suggest the EC2-S3 limit has since been raised to 100 Gbps. I also found this benchmark from 2019, which shows speeds in the 1000s of MB/s, including 1,135 MB/s for the r4.2xlarge. I have not been able to find anyone complaining about the network speeds being below what is advertised. FWIW if using VPC networking, you need to make sure you have configured a VPC Gateway and are using a region-specific endpoint for S3. Otherwise your traffic will transit an Internet Gateway or NAT gateway, which will make things a lot slower (and cost a LOT of money). |
You might be right, I don't recall testing against region specific endpoint. That's really interesting, I will have to check that. |
I see two major, and somewhat orthogonal, use cases:

- Multiple reads of unpredictable column / row group subsets of the same file (e.g. IOx)
- Single read of a subset of column/row groups (e.g. Cloud Fuse, other "analytics on S3 parquet files")

I have been hoping our ObjectStore interface would allow for both use cases.

In terms of the "many small requests to S3" problem, I was imagining that the S3 ObjectStore implementation would implement "pre-fetching" internally (the same way local filesystems do) to coalesce multiple small requests into fewer larger ones. This strategy is particularly effective if we know what parts of the file are likely to be needed. Conveniently, the parquet format is quite amenable to this (as once the reader has figured out it wants to scan a row group, it also knows what file data (offsets) it needs). |
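As a rough illustration of the coalescing idea above (not the actual ObjectStore implementation), the sketch below merges byte ranges that sit within some gap of each other, so that several column-chunk reads could be served by one larger range request. The function name `coalesce_ranges` and the `max_gap` parameter are hypothetical, chosen only for this example.

```rust
use std::ops::Range;

/// Merge byte ranges that overlap or are separated by at most `max_gap` bytes.
/// `coalesce_ranges` and `max_gap` are illustrative names, not an existing API.
pub fn coalesce_ranges(ranges: &[Range<u64>], max_gap: u64) -> Vec<Range<u64>> {
    // Drop empty ranges and sort by start offset so neighbours are adjacent.
    let mut sorted: Vec<Range<u64>> = ranges
        .iter()
        .filter(|r| r.start < r.end)
        .cloned()
        .collect();
    sorted.sort_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for range in sorted {
        if let Some(last) = merged.last_mut() {
            // Close enough to the previous range: extend it instead of
            // issuing a separate request.
            if range.start <= last.end.saturating_add(max_gap) {
                last.end = last.end.max(range.end);
                continue;
            }
        }
        merged.push(range);
    }
    merged
}
```

A larger `max_gap` trades fewer requests for more bytes fetched that nobody asked for, which is the trade-off discussed in this thread.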
Thus, if I were doing this I would probably make the following three things
With those components I think most usecases could be addressed and if someone needed custom caching logic they would likely get a good head start using the "buffered" or "cached" interfaces |
I think we all agree on where we would like to end up, however, I worry we are trying to run before we can walk here. I would much prefer an approach that does the simplest thing possible, namely downloads the entire file, and then iteratively add functionality, such as fetching to memory, selective fetching, etc... Currently we have an approach that isn't really very effective at either...
I'm not sure this is a fair comparison, object storage has vastly different performance and billing characteristics from a local filesystem?
Why would you implement this in the ObjectStore API, and not some |
I was thinking that keeping things behind an ObjectStore API makes sense because:
So in other words, binding details of caching / resource usage to DataFusion seemed to be unnecessary |
I think this makes sense, the situation I'm trying to avoid is:
Currently the trait returns If we can find some way to handle this, that sounds good to me 😀 |
FWIW within each Spark task, it currently processes each row group sequentially, and for each of these it'll read all the projected column chunks (with filtered pages after column index), buffer them in memory and then start decompressing + decoding. For interacting with S3/HDFS/etc, it relies on Hadoop's FileSystem API. @steveloughran is the expert here on the S3 client implementation. |
Choosing when/how to scan and prefetch in object stores is a real tricky business. The abfs and gcs connectors do forward prefetch in block sizes you can configure in Hadoop site/job settings, and cache into memory. The more prefetching you do, the more likely a large process will run out of memory. s3a doesn't, and we've been getting complaints about the lack of buffering in the client. It does have different seek policies; look at fs.s3a.experimental.fadvise and fs.s3a.readahead.range. You can set the seek policy cluster-wide, or, if you use the openFile() API, when opening specific files. We have two big bits of work ongoing there to help mitigate things, both in feature branches right now
Note also s3a and abfs connectors connect/report stats through the IOStatistics interface. Even if you build against Hadoop versions which don't have that,
|
if the api you came up with mapped well to that vectored api which is not yet too late to freeze, then it'd be really good, even if you don't yet compile against releases with that api. readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate) where each file range returns a completable future to a byte buffer allocated with the allocator function you supplied. |
Thank you @steveloughran. It seems like something any of those pre-fetching systems requires is a way to tell it what to prefetch (or what ranges are needed in scatter / gather or vectored IO). Maybe that is a good place to start @tustvold |
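To make the "tell the store which ranges are needed" idea concrete, here is a minimal sketch of what a vectored-read style interface might look like on the Rust side. The trait `RangeReader`, its methods, and the `InMemoryObject` type are all hypothetical names invented for this illustration; they are not the API proposed for DataFusion or the Hadoop `readVectored` API.

```rust
use std::io;
use std::ops::Range;

/// Hypothetical interface: the caller hands over every range it needs up
/// front, leaving the implementation free to coalesce or parallelise them.
pub trait RangeReader {
    /// Total size of the object in bytes.
    fn size(&self) -> u64;

    /// Fetch all requested ranges, returning one buffer per range, in order.
    fn read_ranges(&self, ranges: &[Range<u64>]) -> io::Result<Vec<Vec<u8>>>;
}

/// Stand-in for a remote object, backed by an in-memory buffer.
pub struct InMemoryObject(pub Vec<u8>);

impl RangeReader for InMemoryObject {
    fn size(&self) -> u64 {
        self.0.len() as u64
    }

    fn read_ranges(&self, ranges: &[Range<u64>]) -> io::Result<Vec<Vec<u8>>> {
        ranges
            .iter()
            .map(|r| {
                // Reject inverted or out-of-bounds ranges instead of panicking.
                if r.start > r.end || r.end > self.size() {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "range out of bounds",
                    ));
                }
                Ok(self.0[r.start as usize..r.end as usize].to_vec())
            })
            .collect()
    }
}
```

A real object store implementation would presumably be asynchronous; the synchronous signature here is only to keep the sketch small.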
Do try out the vectored API from the feature branch. Any feedback or improvements are highly appreciated. Thanks. |
Proposal in apache/arrow-rs#1605, thank you all for your very helpful feedback 👍 |
DataFusion is now using the async parquet interface, which automatically handles buffering, and so this can be closed |
Creating as high-level ticket to hopefully get consensus on the approach, before potentially creating lower level tickets
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Currently ObjectStore::file_reader returns an Arc<dyn ObjectReader>, this in turn has a method ObjectReader::sync_chunk_reader which takes a byte range.

In the case of parquet, a ChunkObjectReader wraps this ObjectReader and adapts it to the parquet::ChunkReader trait. The result is that the parquet reader calls ObjectReader::sync_chunk_reader for the byte range of each column chunk, of which there will be one per-column per-RowGroup, which in turn performs a range request to object storage to fetch the bytes.

As pointed out by @mateuszkj on datafusion-contrib/datafusion-objectstore-s3#53 this unfortunately results in a large number of small requests to S3 (there are also metadata requests which I will cover in a separate ticket concerning catalogs).

In the case of CSV, JSON, etc... ObjectReader::sync_reader is used which is equivalent to calling sync_chunk_reader with the length of the file, and will therefore buffer the entire file in memory.

This approach therefore has two aspects that could be improved:
Describe the solution you'd like
The simplest solution is to download the entire file to temporary local storage. This is what IOx currently does and it works well.
The next obvious improvement would then be to use the MemoryManager and DiskManager functionality added by @yjshen in #1526 to buffer in memory initially and only spill to disk under memory pressure.
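A minimal sketch of the "buffer in memory, spill to disk under pressure" behaviour described above. It does not use the MemoryManager or DiskManager from #1526; as an assumption for illustration, memory pressure is modelled as a fixed byte limit, and `SpillingBuffer`, `memory_limit`, and `spill_path` are hypothetical names.

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::PathBuf;

/// Accumulates downloaded bytes in memory and moves them to a file once
/// `memory_limit` would be exceeded. All names here are illustrative.
pub struct SpillingBuffer {
    memory_limit: usize,
    spill_path: PathBuf,
    memory: Vec<u8>,
    file: Option<File>,
}

impl SpillingBuffer {
    pub fn new(memory_limit: usize, spill_path: PathBuf) -> Self {
        Self { memory_limit, spill_path, memory: Vec::new(), file: None }
    }

    /// Append a downloaded chunk, spilling everything to disk if needed.
    pub fn write_chunk(&mut self, chunk: &[u8]) -> io::Result<()> {
        if self.file.is_none() && self.memory.len() + chunk.len() > self.memory_limit {
            // Over the limit: create the spill file and flush what we hold.
            let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(true)
                .open(&self.spill_path)?;
            file.write_all(&self.memory)?;
            self.memory = Vec::new(); // release the in-memory copy
            self.file = Some(file);
        }
        match self.file.as_mut() {
            Some(file) => file.write_all(chunk),
            None => {
                self.memory.extend_from_slice(chunk);
                Ok(())
            }
        }
    }

    pub fn is_spilled(&self) -> bool {
        self.file.is_some()
    }

    /// Read back everything written so far, from memory or from disk.
    pub fn read_all(&mut self) -> io::Result<Vec<u8>> {
        match self.file.as_mut() {
            Some(file) => {
                file.seek(SeekFrom::Start(0))?;
                let mut out = Vec::new();
                file.read_to_end(&mut out)?;
                Ok(out)
            }
            None => Ok(self.memory.clone()),
        }
    }
}
```

In a real integration the decision to spill would presumably come from the memory manager rather than a hard-coded threshold, and the spill file would be allocated by the disk manager.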
I suspect for many use-cases this will perform very well, the key observations being:
A final extension might be to add functionality to fetch smaller byte ranges based on projection and predicate pushdown, I started experimenting with an API of what this might look like here, but I don't have a good handle on how to balance the trade-offs of making too many requests vs requesting data we don't need, and I'm personally inclined to punt on this at least initially...
I'm not very familiar with how spark, etc... solve this problem, this is just based on my intuition, and so perhaps @sunchao or someone with more familiarity with that ecosystem might be able to provide some insight here.
Describe alternatives you've considered
One option we are likely to implement for IOx is having a shared, instance-local, read-through, disk-based Object Storage cache. The idea being to use the ephemeral NVMe disk that is available on cloud provider VMs as a shared cache for one or more query engines running on that instance. This effectively works around this problem by making all IO done by the query engine to very fast local disk, with a separate process handling interaction with object storage as required. It will also accelerate repeated queries to the same "hot" dataset. I would be very happy to write up some tickets if someone wanted to take this on.
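The read-through behaviour can be sketched as follows: look for the object on local disk first, and only on a miss call out to object storage and persist the result. This is a single-process simplification under stated assumptions, not the IOx design; `ReadThroughCache` and the `fetch` callback (standing in for the object storage client) are hypothetical.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

/// Disk-backed read-through cache keyed by object path. Illustrative only.
pub struct ReadThroughCache {
    dir: PathBuf,
}

impl ReadThroughCache {
    pub fn new(dir: PathBuf) -> io::Result<Self> {
        fs::create_dir_all(&dir)?;
        Ok(Self { dir })
    }

    /// Map an object key to a file name; hex-encoding avoids path separators.
    fn local_path(&self, key: &str) -> PathBuf {
        let name: String = key.bytes().map(|b| format!("{:02x}", b)).collect();
        self.dir.join(name)
    }

    /// Return the object bytes, calling `fetch` only on a cache miss.
    pub fn get<F>(&self, key: &str, fetch: F) -> io::Result<Vec<u8>>
    where
        F: FnOnce(&str) -> io::Result<Vec<u8>>,
    {
        let path = self.local_path(key);
        match fs::read(&path) {
            Ok(bytes) => Ok(bytes), // hit: served from local disk
            Err(e) if e.kind() == io::ErrorKind::NotFound => {
                let bytes = fetch(key)?;
                // Write to a temporary file and rename, so readers never
                // observe a partially written cache entry.
                let tmp = path.with_extension("tmp");
                fs::write(&tmp, &bytes)?;
                fs::rename(&tmp, &path)?;
                Ok(bytes)
            }
            Err(e) => Err(e),
        }
    }
}
```

Eviction, size limits, and coordination between multiple query engines sharing the cache are deliberately left out of this sketch.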
This blog post written by @jorgecarleitao proposes streaming files block-wise (thank you @xudong963 for the link). This is close to what the implementation currently does, however, it comes with the drawbacks listed above. FWIW I have also not found this approach to perform especially well on local files either, see here, but I could have been doing something wrong.
Additional context
FYI @alamb @houqp @matthewmturner