Skip to content

Commit

Permalink
feat(timestamp_ns): Implement timestamps with nanosecond precision (a…
Browse files Browse the repository at this point in the history
…pache#542)

* feat(timestamp_ns): first commit

* feat(timestamp_ns): Add mappings for timestamp_ns/timestamptz_ns

* feat(timestamp_ns): Remove unused dep

* feat(timestamp_ns): Fix unit test

* feat(timestamp_ns): Fix test_all_type_for_write()

* feat(timestamp_ns): fix test_transform_days_literal

* feat(timestamp_ns): fix math for timestamptz_nanos

* chore: formatting

* chore: formatting

* chore: Appease clippy

---------

Co-authored-by: Timothy Maloney <[email protected]>
  • Loading branch information
2 people authored and shaeqahmed committed Dec 9, 2024
1 parent e4e60d3 commit 97701dd
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 30 deletions.
2 changes: 2 additions & 0 deletions crates/catalog/glue/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ impl SchemaVisitor for GlueSchemaBuilder {
PrimitiveType::Double => "double".to_string(),
PrimitiveType::Date => "date".to_string(),
PrimitiveType::Timestamp => "timestamp".to_string(),
PrimitiveType::TimestampNs => "timestamp_ns".to_string(),
PrimitiveType::TimestamptzNs => "timestamptz_ns".to_string(),
PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => {
"string".to_string()
}
Expand Down
2 changes: 2 additions & 0 deletions crates/catalog/hms/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ impl SchemaVisitor for HiveSchemaBuilder {
PrimitiveType::Double => "double".to_string(),
PrimitiveType::Date => "date".to_string(),
PrimitiveType::Timestamp => "timestamp".to_string(),
PrimitiveType::TimestampNs => "timestamp_ns".to_string(),
PrimitiveType::TimestamptzNs => "timestamptz_ns".to_string(),
PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => {
"string".to_string()
}
Expand Down
7 changes: 7 additions & 0 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,13 @@ impl SchemaVisitor for ToArrowSchemaConverter {
// Timestamptz always stored as UTC
DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
)),
crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
DataType::Timestamp(TimeUnit::Nanosecond, None),
)),
crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
// Store timestamptz_ns as UTC
DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
)),
crate::spec::PrimitiveType::String => {
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
}
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg/src/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ impl SchemaVisitor for SchemaToAvroSchema {
PrimitiveType::Time => AvroSchema::TimeMicros,
PrimitiveType::Timestamp => AvroSchema::TimestampMicros,
PrimitiveType::Timestamptz => AvroSchema::TimestampMicros,
PrimitiveType::TimestampNs => AvroSchema::TimestampNanos,
PrimitiveType::TimestamptzNs => AvroSchema::TimestampNanos,
PrimitiveType::String => AvroSchema::String,
PrimitiveType::Uuid => avro_fixed_schema(UUID_BYTES, Some(UUID_LOGICAL_TYPE))?,
PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize, None)?,
Expand Down Expand Up @@ -530,6 +532,7 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
AvroSchema::Date => Type::Primitive(PrimitiveType::Date),
AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time),
AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp),
AvroSchema::TimestampNanos => Type::Primitive(PrimitiveType::TimestampNs),
AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean),
AvroSchema::Int => Type::Primitive(PrimitiveType::Int),
AvroSchema::Long => Type::Primitive(PrimitiveType::Long),
Expand Down
10 changes: 10 additions & 0 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ pub enum PrimitiveType {
Timestamp,
/// Timestamp in microsecond precision, with timezone
Timestamptz,
/// Timestamp in nanosecond precision, without timezone
TimestampNs,
/// Timestamp in nanosecond precision, with timezone
TimestamptzNs,
/// Arbitrary-length character sequences encoded in utf-8
String,
/// Universally Unique Identifiers, should use 16-byte fixed
Expand All @@ -250,6 +254,8 @@ impl PrimitiveType {
| (PrimitiveType::Time, PrimitiveLiteral::Long(_))
| (PrimitiveType::Timestamp, PrimitiveLiteral::Long(_))
| (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(_))
| (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(_))
| (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(_))
| (PrimitiveType::String, PrimitiveLiteral::String(_))
| (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(_))
| (PrimitiveType::Fixed(_), PrimitiveLiteral::Binary(_))
Expand Down Expand Up @@ -360,6 +366,8 @@ impl fmt::Display for PrimitiveType {
PrimitiveType::Time => write!(f, "time"),
PrimitiveType::Timestamp => write!(f, "timestamp"),
PrimitiveType::Timestamptz => write!(f, "timestamptz"),
PrimitiveType::TimestampNs => write!(f, "timestamp_ns"),
PrimitiveType::TimestamptzNs => write!(f, "timestamptz_ns"),
PrimitiveType::String => write!(f, "string"),
PrimitiveType::Uuid => write!(f, "uuid"),
PrimitiveType::Fixed(size) => write!(f, "fixed({})", size),
Expand Down Expand Up @@ -1158,6 +1166,8 @@ mod tests {
(PrimitiveType::Time, PrimitiveLiteral::Long(1)),
(PrimitiveType::Timestamptz, PrimitiveLiteral::Long(1)),
(PrimitiveType::Timestamp, PrimitiveLiteral::Long(1)),
(PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(1)),
(PrimitiveType::TimestampNs, PrimitiveLiteral::Long(1)),
(
PrimitiveType::Uuid,
PrimitiveLiteral::UInt128(Uuid::new_v4().as_u128()),
Expand Down
10 changes: 10 additions & 0 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1570,6 +1570,11 @@ mod tests {
"v_ts_ntz",
Type::Primitive(PrimitiveType::Timestamp),
)),
Arc::new(NestedField::optional(
12,
"v_ts_ns_ntz",
Type::Primitive(PrimitiveType::TimestampNs
))),
])
.build()
.unwrap(),
Expand Down Expand Up @@ -1678,6 +1683,11 @@ mod tests {
"v_ts_ntz",
Type::Primitive(PrimitiveType::Timestamp),
)),
Arc::new(NestedField::optional(
12,
"v_ts_ns_ntz",
Type::Primitive(PrimitiveType::TimestampNs
)))
])
.build()
.unwrap(),
Expand Down
11 changes: 8 additions & 3 deletions crates/iceberg/src/spec/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ impl Transform {
| PrimitiveType::Time
| PrimitiveType::Timestamp
| PrimitiveType::Timestamptz
| PrimitiveType::TimestampNs
| PrimitiveType::TimestamptzNs
| PrimitiveType::String
| PrimitiveType::Uuid
| PrimitiveType::Fixed(_)
Expand Down Expand Up @@ -200,6 +202,8 @@ impl Transform {
match p {
PrimitiveType::Timestamp
| PrimitiveType::Timestamptz
| PrimitiveType::TimestampNs
| PrimitiveType::TimestamptzNs
| PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Date)),
_ => Err(Error::new(
ErrorKind::DataInvalid,
Expand All @@ -216,9 +220,10 @@ impl Transform {
Transform::Hour => {
if let Type::Primitive(p) = input_type {
match p {
PrimitiveType::Timestamp | PrimitiveType::Timestamptz => {
Ok(Type::Primitive(PrimitiveType::Int))
}
PrimitiveType::Timestamp
| PrimitiveType::Timestamptz
| PrimitiveType::TimestampNs
| PrimitiveType::TimestamptzNs => Ok(Type::Primitive(PrimitiveType::Int)),
_ => Err(Error::new(
ErrorKind::DataInvalid,
format!("{input_type} is not a valid input type of {self} transform",),
Expand Down
71 changes: 70 additions & 1 deletion crates/iceberg/src/spec/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ use serde::ser::SerializeStruct;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use serde_json::{Map as JsonMap, Number, Value as JsonValue};
use timestamp::nanoseconds_to_datetime;
use uuid::Uuid;

use super::datatypes::{PrimitiveType, Type};
use crate::error::Result;
use crate::spec::values::date::{date_from_naive_date, days_to_date, unix_epoch};
use crate::spec::values::time::microseconds_to_time;
use crate::spec::values::timestamp::microseconds_to_datetime;
use crate::spec::values::timestamptz::microseconds_to_datetimetz;
use crate::spec::values::timestamptz::{microseconds_to_datetimetz, nanoseconds_to_datetimetz};
use crate::spec::MAX_DECIMAL_PRECISION;
use crate::{ensure_data_valid, Error, ErrorKind};

Expand Down Expand Up @@ -326,6 +327,12 @@ impl Display for Datum {
(PrimitiveType::Timestamptz, PrimitiveLiteral::Long(val)) => {
write!(f, "{}", microseconds_to_datetimetz(*val))
}
(PrimitiveType::TimestampNs, PrimitiveLiteral::Long(val)) => {
write!(f, "{}", nanoseconds_to_datetime(*val))
}
(PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(val)) => {
write!(f, "{}", nanoseconds_to_datetimetz(*val))
}
(_, PrimitiveLiteral::String(val)) => write!(f, r#""{}""#, val),
(PrimitiveType::Uuid, PrimitiveLiteral::UInt128(val)) => {
write!(f, "{}", Uuid::from_u128(*val))
Expand Down Expand Up @@ -401,6 +408,12 @@ impl Datum {
PrimitiveType::Timestamptz => {
PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?))
}
PrimitiveType::TimestampNs => {
PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?))
}
PrimitiveType::TimestamptzNs => {
PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?))
}
PrimitiveType::String => {
PrimitiveLiteral::String(std::str::from_utf8(bytes)?.to_string())
}
Expand Down Expand Up @@ -734,6 +747,23 @@ impl Datum {
}
}

/// Builds a `timestamp_ns` datum (nanosecond precision, without timezone)
/// from a count of nanoseconds relative to the unix epoch.
///
/// The value is stored unchanged as a long literal; no validation is done here.
///
/// # Examples
///
/// ```rust
/// use iceberg::spec::Datum;
/// let t = Datum::timestamp_nanos(1000);
///
/// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.000001");
/// ```
pub fn timestamp_nanos(value: i64) -> Self {
    // Nanosecond timestamps share the plain 64-bit long representation.
    let literal = PrimitiveLiteral::Long(value);
    Self {
        r#type: PrimitiveType::TimestampNs,
        literal,
    }
}

/// Creates a timestamp from [`DateTime`].
///
/// Example:
Expand Down Expand Up @@ -792,6 +822,23 @@ impl Datum {
}
}

/// Builds a `timestamptz_ns` datum (nanosecond precision, with timezone)
/// from a count of nanoseconds relative to the unix epoch.
///
/// The value is stored unchanged as a long literal; no validation is done here.
///
/// # Examples
///
/// ```rust
/// use iceberg::spec::Datum;
/// let t = Datum::timestamptz_nanos(1000);
///
/// assert_eq!(&format!("{t}"), "1970-01-01 00:00:00.000001 UTC");
/// ```
pub fn timestamptz_nanos(value: i64) -> Self {
    // Nanosecond timestamps share the plain 64-bit long representation.
    let literal = PrimitiveLiteral::Long(value);
    Self {
        r#type: PrimitiveType::TimestamptzNs,
        literal,
    }
}

/// Creates a timestamp with timezone from [`DateTime`].
/// Example:
///
Expand Down Expand Up @@ -1805,6 +1852,18 @@ impl Literal {
.format("%Y-%m-%dT%H:%M:%S%.f+00:00")
.to_string(),
)),
(PrimitiveType::TimestampNs, PrimitiveLiteral::Long(val)) => Ok(JsonValue::String(
timestamp::nanoseconds_to_datetime(val)
.format("%Y-%m-%dT%H:%M:%S%.f")
.to_string(),
)),
(PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(val)) => {
Ok(JsonValue::String(
timestamptz::nanoseconds_to_datetimetz(val)
.format("%Y-%m-%dT%H:%M:%S%.f+00:00")
.to_string(),
))
}
(PrimitiveType::String, PrimitiveLiteral::String(val)) => {
Ok(JsonValue::String(val.clone()))
}
Expand Down Expand Up @@ -1958,6 +2017,10 @@ mod timestamp {
// This shouldn't fail until the year 262000
DateTime::from_timestamp_micros(micros).unwrap().naive_utc()
}

/// Converts nanoseconds since the unix epoch into a timezone-less
/// [`NaiveDateTime`] by building the UTC instant and dropping its offset.
pub(crate) fn nanoseconds_to_datetime(nanos: i64) -> NaiveDateTime {
    let utc_instant = DateTime::from_timestamp_nanos(nanos);
    utc_instant.naive_utc()
}
}

mod timestamptz {
Expand All @@ -1972,6 +2035,12 @@ mod timestamptz {

DateTime::from_timestamp(secs, rem as u32 * 1_000).unwrap()
}

/// Converts nanoseconds since the unix epoch into a UTC [`DateTime`].
///
/// Negative inputs (instants before 1970-01-01T00:00:00Z) are supported.
///
/// The conversion is delegated to `DateTime::from_timestamp_nanos` rather than
/// splitting the value with truncating `/` and `%`: for a negative input the
/// truncating remainder is negative, which is not a valid sub-second
/// nanosecond count, and casting it to `u32` produces an out-of-range value
/// that makes `DateTime::from_timestamp(..).unwrap()` panic (e.g. `nanos = -1`).
pub(crate) fn nanoseconds_to_datetimetz(nanos: i64) -> DateTime<Utc> {
    DateTime::from_timestamp_nanos(nanos)
}
}

mod _serde {
Expand Down
4 changes: 3 additions & 1 deletion crates/iceberg/src/transform/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ mod test {
use crate::expr::PredicateOperator;
use crate::spec::PrimitiveType::{
Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp,
Timestamptz, Uuid,
TimestampNs, Timestamptz, TimestamptzNs, Uuid,
};
use crate::spec::Type::{Primitive, Struct};
use crate::spec::{Datum, NestedField, PrimitiveType, StructType, Transform, Type};
Expand Down Expand Up @@ -297,6 +297,8 @@ mod test {
(Primitive(Time), Some(Primitive(Int))),
(Primitive(Timestamp), Some(Primitive(Int))),
(Primitive(Timestamptz), Some(Primitive(Int))),
(Primitive(TimestampNs), Some(Primitive(Int))),
(Primitive(TimestamptzNs), Some(Primitive(Int))),
(
Struct(StructType::new(vec![NestedField::optional(
1,
Expand Down
4 changes: 3 additions & 1 deletion crates/iceberg/src/transform/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl TransformFunction for Identity {
mod test {
use crate::spec::PrimitiveType::{
Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp,
Timestamptz, Uuid,
TimestampNs, Timestamptz, TimestamptzNs, Uuid,
};
use crate::spec::Type::{Primitive, Struct};
use crate::spec::{NestedField, StructType, Transform};
Expand Down Expand Up @@ -81,6 +81,8 @@ mod test {
(Primitive(Time), Some(Primitive(Time))),
(Primitive(Timestamp), Some(Primitive(Timestamp))),
(Primitive(Timestamptz), Some(Primitive(Timestamptz))),
(Primitive(TimestampNs), Some(Primitive(TimestampNs))),
(Primitive(TimestamptzNs), Some(Primitive(TimestamptzNs))),
(
Struct(StructType::new(vec![NestedField::optional(
1,
Expand Down
Loading

0 comments on commit 97701dd

Please sign in to comment.