Skip to content

Commit

Permalink
Add timestamp rendering options (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
rzheka authored Jun 25, 2019
1 parent b117a4d commit 8102e87
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pq2json/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ itertools = "0.8"
serde = "1"
serde_json = "1"
num-bigint = "0.2"
chrono = "0.4"
98 changes: 96 additions & 2 deletions pq2json/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use num_bigint::{BigInt, Sign};
use parquet::data_type::Decimal;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::{FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor};
use parquet::schema::printer::{print_file_metadata, print_parquet_metadata};
use serde_json::{Number, Value};
use std::error::Error;
use std::fs::File;
Expand Down Expand Up @@ -44,6 +45,21 @@ fn main() {
.takes_value(false)
.required(false),
)
.arg(
Arg::with_name("timestamp")
.short("t")
.long("timestamp")
.possible_values(&["isostr", "ticks", "unixms"])
.default_value("isostr")
.help(
"Timestamp rendering option. Either \
ticks (100ns ticks since 01-01-01), \
isostr (ISO8601 string) \
or unixms (milliseconds since Unix epoch)",
)
.takes_value(true)
.required(false),
)
.arg(
Arg::with_name("output")
.short("o")
Expand All @@ -53,6 +69,13 @@ fn main() {
.takes_value(true)
.required(false),
)
.arg(
Arg::with_name("schema")
.long("schema")
.help("Print schema")
.takes_value(false)
.required(false),
)
.arg(
Arg::with_name("INPUT")
.help("Input file to use")
Expand All @@ -69,13 +92,28 @@ fn main() {

let input = matches.value_of("INPUT").expect("INPUT must be provided");
let output = matches.value_of("OUT_FILE").unwrap_or("");

let timestamp_rendering = match matches.value_of("timestamp").unwrap_or("ticks") {
"ticks" => TimestampRendering::Ticks,
"isostr" => TimestampRendering::IsoStr,
"unixms" => TimestampRendering::UnixMs,
_ => TimestampRendering::IsoStr,
};

let settings = Settings {
omit_nulls: matches.is_present("omit-nulls") || matches.is_present("prune"),
omit_empty_bags: matches.is_present("omit-empty-bags") || matches.is_present("prune"),
omit_empty_lists: matches.is_present("omit-empty-lists") || matches.is_present("prune"),
timestamp_rendering,
};

let res = if matches.is_present("schema") {
print_schema(input)
} else {
convert(&settings, input, output)
};

match convert(&settings, input, output) {
match res {
Ok(()) => (),
Err(e) => {
eprintln!("ERROR: {:?}", e);
Expand All @@ -89,6 +127,14 @@ struct Settings {
omit_nulls: bool,
omit_empty_bags: bool,
omit_empty_lists: bool,
timestamp_rendering: TimestampRendering,
}

/// Selects how timestamp fields are written to the JSON output.
///
/// Each variant corresponds to one accepted value of the `--timestamp`
/// command-line option.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TimestampRendering {
    /// `ticks`: number of 100 ns ticks since 0001-01-01.
    Ticks,
    /// `isostr`: ISO 8601 formatted string.
    IsoStr,
    /// `unixms`: milliseconds since the Unix epoch.
    UnixMs,
}

const WRITER_BUF_CAP: usize = 256 * 1024;
Expand All @@ -114,6 +160,25 @@ fn convert(settings: &Settings, input: &str, output: &str) -> Result<(), Box<dyn
Ok(())
}

/// Prints the metadata of the Parquet file at `input` to standard output.
///
/// Two sections are written, each preceded by a title and a separator line:
/// first the Parquet metadata, then the file metadata. Both are rendered into
/// an in-memory buffer by the parquet schema printer and then echoed as text.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, if the Parquet reader
/// cannot be constructed, or if the rendered metadata is not valid UTF-8.
fn print_schema(input: &str) -> Result<(), Box<dyn Error>> {
    const SEPARATOR: &str = "=================================================";

    let reader = SerializedFileReader::new(File::open(&Path::new(input))?)?;
    let parquet_meta = reader.metadata();

    // Section 1: metadata of the whole Parquet file.
    let mut parquet_buf = Vec::new();
    print_parquet_metadata(&mut parquet_buf, &parquet_meta);
    println!("\n\nParquet metadata");
    println!("{}", SEPARATOR);
    println!("{}", String::from_utf8(parquet_buf)?);

    // Section 2: file-level metadata only.
    let mut file_buf = Vec::new();
    let file_meta = parquet_meta.file_metadata();
    print_file_metadata(&mut file_buf, &file_meta);
    println!("\n\nFile metadata");
    println!("{}", SEPARATOR);
    println!("{}", String::from_utf8(file_buf)?);

    Ok(())
}

macro_rules! element_to_value {
($ft:expr, $obj:ident, $i:ident, $settings:ident) => {
match $ft {
Expand All @@ -133,7 +198,7 @@ macro_rules! element_to_value {
FieldType::Str => Value::String($obj.get_string($i)?.to_string()),
FieldType::Bytes => bytes_to_value($obj.get_bytes($i)?.data()),
FieldType::Date => Value::Number($obj.get_date($i)?.into()),
FieldType::Timestamp => Value::Number($obj.get_timestamp($i)?.into()),
FieldType::Timestamp => timestamp_to_value($settings, $obj.get_timestamp($i)?)?,
FieldType::Group => row_to_value($settings, $obj.get_group($i)?)?,
FieldType::List => list_to_value($settings, $obj.get_list($i)?)?,
FieldType::Map => map_to_value($settings, $obj.get_map($i)?)?,
Expand Down Expand Up @@ -223,6 +288,35 @@ fn float_to_value(f: f64) -> Value {
.unwrap_or_else(|| Value::Null)
}

/// Offset, in 100 ns ticks, between 0001-01-01 and the Unix epoch.
///
/// Added to a Unix-millisecond timestamp (after scaling it to ticks) when
/// timestamps are rendered as `ticks`.
const TICKS_TILL_UNIX_TIME: u64 = 621355968000000000u64;

/// Converts a timestamp into a JSON value using the configured rendering.
///
/// `ts` is treated as milliseconds since the Unix epoch:
///
/// * `Ticks`  - scaled to 100 ns ticks and shifted by `TICKS_TILL_UNIX_TIME`;
///   yields `Value::Null` if the arithmetic overflows `u64`.
/// * `IsoStr` - formatted as `YYYY-MM-DDTHH:MM:SS.ffffffZ`; yields
///   `Value::Null` if the instant is outside the range chrono can represent.
/// * `UnixMs` - emitted unchanged as a JSON number.
///
/// The function currently never returns `Err`; the `Result` return type lets
/// callers use `?` uniformly with the other conversion helpers.
fn timestamp_to_value(settings: &Settings, ts: u64) -> Result<Value, Box<dyn Error>> {
    let rendered = match settings.timestamp_rendering {
        TimestampRendering::UnixMs => Value::Number(ts.into()),
        TimestampRendering::Ticks => {
            // 1 ms == 10_000 ticks of 100 ns each; both steps are overflow-checked.
            let total_ticks = ts
                .checked_mul(10000)
                .and_then(|scaled| scaled.checked_add(TICKS_TILL_UNIX_TIME));
            match total_ticks {
                Some(total) => Value::Number(total.into()),
                None => Value::Null,
            }
        }
        TimestampRendering::IsoStr => {
            // Split the millisecond count into whole seconds and a nanosecond remainder.
            let whole_secs = (ts / 1000) as i64;
            let sub_nanos = ((ts % 1000) * 1000000) as u32;
            chrono::NaiveDateTime::from_timestamp_opt(whole_secs, sub_nanos)
                .map(|dt| Value::String(dt.format("%Y-%m-%dT%H:%M:%S.%6fZ").to_string()))
                .unwrap_or(Value::Null)
        }
    };
    Ok(rendered)
}

fn decimal_to_string(decimal: &Decimal) -> String {
assert!(decimal.scale() >= 0 && decimal.precision() > decimal.scale());

Expand Down

0 comments on commit 8102e87

Please sign in to comment.