Skip to content

Commit

Permalink
Fix DNS Error for S3 files with many columns #12
Browse files Browse the repository at this point in the history
  • Loading branch information
jupiter committed Apr 3, 2022
1 parent b6a52e8 commit deb7f83
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "parquet2json"
description = "A command-line tool for converting Parquet to newline-delimited JSON"
-version = "1.6.0"
+version = "1.6.1"
edition = "2018"
license = "MIT"
authors = ["Pieter Raubenheimer <[email protected]>"]
Expand Down
6 changes: 4 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use parquet::record::reader::RowIter;
use parquet::schema::printer::print_schema;
use parquet::schema::types::Type as SchemaType;
use rusoto_core::Region;
use rusoto_s3::S3Client;
use url::Url;

mod http_reader;
Expand Down Expand Up @@ -140,13 +141,14 @@ async fn print_json_from(
let url = Url::parse(&url_str).unwrap();
let host_str = url.host_str().unwrap();
let key = &url.path()[1..];
let client = S3Client::new(Region::default());

let mut reader = S3ChunkReader::new_unknown_size(
(String::from(host_str), String::from(key)),
-Region::default(),
+client.clone(),
)
.await;
-reader.start(Region::default(), timeout).await;
+reader.start(client.clone(), timeout).await;

let blocking_task = tokio::task::spawn_blocking(move || {
let file_reader = SerializedFileReader::new(reader).unwrap();
Expand Down
16 changes: 10 additions & 6 deletions src/s3_reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::panic;
use core::time::Duration;
use std::io::{self, Read};
use std::sync::Arc;

use bytes::buf::Reader;
use bytes::{Buf, Bytes};
Expand All @@ -10,10 +11,10 @@ use parquet::data_type::AsBytes;
use parquet::errors::Result;
use parquet::file::reader::{ChunkReader, Length};
use regex::Regex;
-use rusoto_core::Region;
use rusoto_s3::{GetObjectOutput, GetObjectRequest, S3Client, S3};
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Semaphore;
use tokio_stream::StreamExt;

enum Range {
Expand Down Expand Up @@ -54,13 +55,13 @@ async fn fetch_range(client: S3Client, url: (String, String), range: Range) -> G
Range::FromEnd(length) => format!("bytes=-{}", length),
};

-println!("{}, {}, {}", url.0, url.1, range_str);
let get_obj_req = GetObjectRequest {
bucket: url.0,
key: url.1,
range: Some(range_str),
..Default::default()
};

client.get_object(get_obj_req).await.unwrap()
}

Expand Down Expand Up @@ -93,8 +94,7 @@ impl S3ChunkReader {
}
}

-pub async fn new_unknown_size(url: (String, String), region: Region) -> S3ChunkReader {
-let client = S3Client::new(region);
+pub async fn new_unknown_size(url: (String, String), client: S3Client) -> S3ChunkReader {
let response = fetch_range(client.clone(), url.clone(), Range::FromEnd(4)).await;
let content_range = get_content_range(&response);
let mut magic_number: Vec<u8> = vec![];
Expand All @@ -111,15 +111,18 @@ impl S3ChunkReader {
Self::new(url, content_range.total_length)
}

-pub async fn start(&mut self, region: Region, timeout: Duration) {
+pub async fn start(&mut self, base_client: S3Client, timeout: Duration) {
let (s, mut r) = channel(1);
let url = self.url.clone();
self.coordinator = Some(s);
let semaphore = Arc::new(Semaphore::new(32));
tokio::spawn(async move {
while let Some(download_part) = r.recv().await.unwrap_or(None) {
-let client = S3Client::new(region.clone());
+let client = base_client.clone();
let url = url.clone();
let semaphore_clone = Arc::clone(&semaphore);
tokio::spawn(async move {
let permit = semaphore_clone.acquire().await.unwrap();
let response = fetch_range(
client,
url,
Expand All @@ -133,6 +136,7 @@ impl S3ChunkReader {
let body = response.body.unwrap().timeout(timeout);
tokio::pin!(body);

drop(permit);
while let Ok(Some(data)) = body.try_next().await {
let reader_channel = download_part.reader_channel.clone();
reader_channel.send(data.unwrap()).await.unwrap_or(());
Expand Down

0 comments on commit deb7f83

Please sign in to comment.