From deb7f831ebdb84b43d29d593104f6f113c1ecd2d Mon Sep 17 00:00:00 2001 From: Pieter Raubenheimer Date: Sun, 3 Apr 2022 11:52:58 +0100 Subject: [PATCH] Fix DNS Error for S3 files with many columns #12 --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/main.rs | 6 ++++-- src/s3_reader.rs | 16 ++++++++++------ 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b4f5b65..573bc16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1118,7 +1118,7 @@ dependencies = [ [[package]] name = "parquet2json" -version = "1.6.0" +version = "1.6.1" dependencies = [ "bytes", "chunked-bytes", diff --git a/Cargo.toml b/Cargo.toml index c6cae24..3210dee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "parquet2json" description = "A command-line tool for converting Parquet to newline-delimited JSON" -version = "1.6.0" +version = "1.6.1" edition = "2018" license = "MIT" authors = ["Pieter Raubenheimer "] diff --git a/src/main.rs b/src/main.rs index 9ee933b..5ed7f75 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ use parquet::record::reader::RowIter; use parquet::schema::printer::print_schema; use parquet::schema::types::Type as SchemaType; use rusoto_core::Region; +use rusoto_s3::S3Client; use url::Url; mod http_reader; @@ -140,13 +141,14 @@ async fn print_json_from( let url = Url::parse(&url_str).unwrap(); let host_str = url.host_str().unwrap(); let key = &url.path()[1..]; + let client = S3Client::new(Region::default()); let mut reader = S3ChunkReader::new_unknown_size( (String::from(host_str), String::from(key)), - Region::default(), + client.clone(), ) .await; - reader.start(Region::default(), timeout).await; + reader.start(client.clone(), timeout).await; let blocking_task = tokio::task::spawn_blocking(move || { let file_reader = SerializedFileReader::new(reader).unwrap(); diff --git a/src/s3_reader.rs b/src/s3_reader.rs index 9e6a228..d1a62a9 100644 --- a/src/s3_reader.rs +++ b/src/s3_reader.rs @@ -1,6 +1,7 @@ use core::panic; use core::time::Duration; use std::io::{self, Read}; +use std::sync::Arc; use bytes::buf::Reader; use bytes::{Buf, Bytes}; @@ -10,10 +11,10 @@ use parquet::data_type::AsBytes; use parquet::errors::Result; use parquet::file::reader::{ChunkReader, Length}; use regex::Regex; -use rusoto_core::Region; use rusoto_s3::{GetObjectOutput, GetObjectRequest, S3Client, S3}; use tokio::io::AsyncReadExt; use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::Semaphore; use tokio_stream::StreamExt; enum Range { @@ -54,13 +55,13 @@ async fn fetch_range(client: S3Client, url: (String, String), range: Range) -> G Range::FromEnd(length) => format!("bytes=-{}", length), }; + println!("{}, {}, {}", url.0, url.1, range_str); let get_obj_req = GetObjectRequest { bucket: url.0, key: url.1, range: Some(range_str), ..Default::default() }; - client.get_object(get_obj_req).await.unwrap() } @@ -93,8 +94,7 @@ impl S3ChunkReader { } } - pub async fn new_unknown_size(url: (String, String), region: Region) -> S3ChunkReader { - let client = S3Client::new(region); + pub async fn new_unknown_size(url: (String, String), client: S3Client) -> S3ChunkReader { let response = fetch_range(client.clone(), url.clone(), Range::FromEnd(4)).await; let content_range = get_content_range(&response); let mut magic_number: Vec = vec![]; @@ -111,15 +111,18 @@ impl S3ChunkReader { Self::new(url, content_range.total_length) } - pub async fn start(&mut self, region: Region, timeout: Duration) { + pub async fn start(&mut self, base_client: S3Client, timeout: Duration) { let (s, mut r) = channel(1); let url = self.url.clone(); self.coordinator = Some(s); + let semaphore = Arc::new(Semaphore::new(32)); tokio::spawn(async move { while let Some(download_part) = r.recv().await.unwrap_or(None) { - let client = S3Client::new(region.clone()); + let client = base_client.clone(); let url = url.clone(); + let semaphore_clone = Arc::clone(&semaphore); tokio::spawn(async move { + let permit = semaphore_clone.acquire().await.unwrap(); let response = fetch_range( client, url, @@ -133,6 +136,7 @@ impl S3ChunkReader { let body = response.body.unwrap().timeout(timeout); tokio::pin!(body); + drop(permit); while let Ok(Some(data)) = body.try_next().await { let reader_channel = download_part.reader_channel.clone(); reader_channel.send(data.unwrap()).await.unwrap_or(());