diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index 275ff690b041..d89216248747 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -286,6 +286,16 @@ bool HiveConfig::s3UseProxyFromEnv() const { return config_->get(kS3UseProxyFromEnv, false); } +uint8_t HiveConfig::readTimestampUnit(const Config* session) const { + const auto unit = session->get( + kReadTimestampUnitSession, + config_->get(kReadTimestampUnit, 3 /*milli*/)); + VELOX_CHECK( + unit == 3 || unit == 6 /*micro*/ || unit == 9 /*nano*/, + "Invalid timestamp unit."); + return unit; +} + uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const { const auto unit = session->get( kParquetWriteTimestampUnitSession, diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 312d810b8e60..0486991cf264 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -231,6 +231,12 @@ class HiveConfig { static constexpr const char* kS3UseProxyFromEnv = "hive.s3.use-proxy-from-env"; + // The unit for reading timestamps from files. + static constexpr const char* kReadTimestampUnit = + "hive.reader.timestamp-unit"; + static constexpr const char* kReadTimestampUnitSession = + "hive.reader.timestamp_unit"; + /// Timestamp unit for Parquet write through Arrow bridge. static constexpr const char* kParquetWriteTimestampUnit = "hive.parquet.writer.timestamp-unit"; @@ -333,6 +339,9 @@ class HiveConfig { bool s3UseProxyFromEnv() const; + // Returns the timestamp unit used when reading timestamps from files. + uint8_t readTimestampUnit(const Config* session) const; + /// Returns the timestamp unit used when writing timestamps into Parquet /// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano. uint8_t parquetWriteTimestampUnit(const Config* session) const; diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 57c29a3e1d39..7a3004db1a38 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -572,7 +572,9 @@ void configureRowReaderOptions( const std::shared_ptr& scanSpec, std::shared_ptr metadataFilter, const RowTypePtr& rowType, - const std::shared_ptr& hiveSplit) { + const std::shared_ptr& hiveSplit, + const std::shared_ptr& hiveConfig, + const Config* sessionProperties) { auto skipRowsIt = tableParameters.find(dwio::common::TableParameter::kSkipHeaderLineCount); if (skipRowsIt != tableParameters.end()) { @@ -582,6 +584,10 @@ void configureRowReaderOptions( rowReaderOptions.setMetadataFilter(std::move(metadataFilter)); rowReaderOptions.setRequestedType(rowType); rowReaderOptions.range(hiveSplit->start, hiveSplit->length); + if (hiveConfig && sessionProperties) { + rowReaderOptions.setTimestampPrecision(static_cast( + hiveConfig->readTimestampUnit(sessionProperties))); + } } namespace { diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 621a8c59aaf3..fd39c9cd1810 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -81,7 +81,9 @@ void configureRowReaderOptions( const std::shared_ptr& scanSpec, std::shared_ptr metadataFilter, const RowTypePtr& rowType, - const std::shared_ptr& hiveSplit); + const std::shared_ptr& hiveSplit, + const std::shared_ptr& hiveConfig = nullptr, + const Config* sessionProperties = nullptr); bool testFilters( const common::ScanSpec* scanSpec, diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 248d1594e820..03a3d6f75718 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -272,7 +272,9 @@ void SplitReader::createReader( scanSpec_, std::move(metadataFilter), ROW(std::move(columnNames), std::move(columnTypes)), - hiveSplit_); + hiveSplit_, + hiveConfig_, + connectorQueryCtx_->sessionProperties()); } bool SplitReader::checkIfSplitIsEmpty( diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 6b2729c0b710..004b0f6b801c 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -31,6 +31,11 @@ namespace facebook::velox::parquet { using thrift::Encoding; using thrift::PageHeader; +struct __attribute__((__packed__)) Int96Timestamp { + int32_t days; + uint64_t nanos; +}; + void PageReader::seekToPage(int64_t row) { defineDecoder_.reset(); repeatDecoder_.reset(); @@ -371,6 +376,42 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { } break; } + case thrift::Type::INT96: { + auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp); + dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); + auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp); + if (pageData_) { + memcpy(dictionary_.values->asMutable(), pageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, + inputStream_.get(), + dictionary_.values->asMutable(), + bufferStart_, + bufferEnd_); + } + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + auto values = dictionary_.values->asMutable(); + auto parquetValues = dictionary_.values->asMutable(); + + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Convert the timestamp into seconds and nanos since the Unix epoch, + // 00:00:00.000000 on 1 January 1970. + int64_t nanos; + memcpy( + &nanos, + parquetValues + i * sizeof(Int96Timestamp), + sizeof(int64_t)); + int32_t days; + memcpy( + &days, + parquetValues + i * sizeof(Int96Timestamp) + sizeof(int64_t), + sizeof(int32_t)); + values[i] = Timestamp::fromDaysAndNanos(days, nanos); + } + break; + } case thrift::Type::BYTE_ARRAY: { dictionary_.values = AlignedBuffer::allocate(dictionary_.numValues, &pool_); @@ -461,7 +502,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { VELOX_UNSUPPORTED( "Parquet type {} not supported for dictionary", parquetType); } - case thrift::Type::INT96: default: VELOX_UNSUPPORTED( "Parquet type {} not supported for dictionary", parquetType); @@ -488,6 +528,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) { case thrift::Type::INT64: case thrift::Type::DOUBLE: return 8; + case thrift::Type::INT96: + return 12; default: VELOX_FAIL("Type does not have a byte width {}", type); } diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index f87631235601..a87a295a787c 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -27,6 +27,7 @@ #include "velox/dwio/parquet/reader/RepeatedColumnReader.h" #include "velox/dwio/parquet/reader/StringColumnReader.h" #include "velox/dwio/parquet/reader/StructColumnReader.h" +#include "velox/dwio/parquet/reader/TimestampColumnReader.h" namespace facebook::velox::parquet { @@ -74,6 +75,10 @@ std::unique_ptr ParquetColumnReader::build( return std::make_unique( requestedType, fileType, params, scanSpec); + case TypeKind::TIMESTAMP: + return std::make_unique( + requestedType, fileType, params, scanSpec); + default: VELOX_FAIL( "buildReader unhandled type: " + diff --git a/velox/dwio/parquet/reader/ParquetData.h b/velox/dwio/parquet/reader/ParquetData.h index 15463b0f2fda..360fe4febc8d 100644 --- a/velox/dwio/parquet/reader/ParquetData.h +++ b/velox/dwio/parquet/reader/ParquetData.h @@ -36,17 +36,24 @@ class ParquetParams : public dwio::common::FormatParams { memory::MemoryPool& pool, dwio::common::ColumnReaderStatistics& stats, const FileMetaDataPtr metaData, - const date::time_zone* sessionTimezone) + const date::time_zone* sessionTimezone, + TimestampPrecision timestampPrecision) : FormatParams(pool, stats), metaData_(metaData), - sessionTimezone_(sessionTimezone) {} + sessionTimezone_(sessionTimezone), + timestampPrecision_(timestampPrecision) {} std::unique_ptr toFormatData( const std::shared_ptr& type, const common::ScanSpec& scanSpec) override; + TimestampPrecision timestampPrecision() const { + return timestampPrecision_; + } + private: const FileMetaDataPtr metaData_; const date::time_zone* sessionTimezone_; + const TimestampPrecision timestampPrecision_; }; /// Format-specific data created for each leaf column of a Parquet rowgroup. diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 987cb8d9f260..02eb14829166 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -764,7 +764,8 @@ class ParquetRowReader::Impl { pool_, columnReaderStats_, readerBase_->fileMetaData(), - readerBase->sessionTimezone()); + readerBase->sessionTimezone(), + options_.timestampPrecision()); requestedType_ = options_.requestedType() ? options_.requestedType() : readerBase_->schema(); columnReader_ = ParquetColumnReader::build( diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h new file mode 100644 index 000000000000..11eb00e24286 --- /dev/null +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -0,0 +1,85 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/parquet/reader/IntegerColumnReader.h" +#include "velox/dwio/parquet/reader/ParquetColumnReader.h" + +namespace facebook::velox::parquet { + +class TimestampColumnReader : public IntegerColumnReader { + public: + TimestampColumnReader( + const TypePtr& requestedType, + std::shared_ptr fileType, + ParquetParams& params, + common::ScanSpec& scanSpec) + : IntegerColumnReader(requestedType, fileType, params, scanSpec), + timestampPrecision_(params.timestampPrecision()) {} + + bool hasBulkPath() const override { + return false; + } + + void getValues(RowSet rows, VectorPtr* result) override { + getFlatValues(rows, result, requestedType_); + if (allNull_) { + return; + } + + // Adjust timestamp nanos to the requested precision. + VectorPtr resultVector = *result; + auto rawValues = + resultVector->asUnchecked>()->mutableRawValues(); + for (auto i = 0; i < numValues_; ++i) { + if (resultVector->isNullAt(i)) { + continue; + } + const auto timestamp = rawValues[i]; + uint64_t nanos = timestamp.getNanos(); + switch (timestampPrecision_) { + case TimestampPrecision::kMilliseconds: + nanos = nanos / 1'000'000 * 1'000'000; + break; + case TimestampPrecision::kMicroseconds: + nanos = nanos / 1'000 * 1'000; + break; + case TimestampPrecision::kNanoseconds: + break; + } + rawValues[i] = Timestamp(timestamp.getSeconds(), nanos); + } + } + + void read( + vector_size_t offset, + RowSet rows, + const uint64_t* /*incomingNulls*/) override { + auto& data = formatData_->as(); + // Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length. + prepareRead(offset, rows, nullptr); + readCommon(rows); + readOffset_ += rows.back() + 1; + } + + private: + // The requested precision can be specified from HiveConfig to read timestamp + // from Parquet. + TimestampPrecision timestampPrecision_; +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/examples/timestamp_int96.parquet b/velox/dwio/parquet/tests/examples/timestamp_int96.parquet new file mode 100644 index 000000000000..ea3a125aab60 Binary files /dev/null and b/velox/dwio/parquet/tests/examples/timestamp_int96.parquet differ diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index f94b5c4ca8db..52f53fa02e92 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -256,6 +256,19 @@ TEST_F(E2EFilterTest, integerDictionary) { 20); } +TEST_F(E2EFilterTest, timestampDictionary) { + options_.dataPageSize = 4 * 1024; + options_.writeInt96AsTimestamp = true; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() {}, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 20); +} + TEST_F(E2EFilterTest, floatAndDoubleDirect) { options_.enableDictionary = false; options_.dataPageSize = 4 * 1024; diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index f7a66ae03952..49c7136376f8 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -27,6 +27,7 @@ #include "velox/type/tests/SubfieldFiltersBuilder.h" #include "velox/connectors/hive/HiveConfig.h" +#include "velox/dwio/parquet/writer/Writer.h" using namespace facebook::velox; using namespace facebook::velox::exec; @@ -189,6 +190,33 @@ class ParquetTableScanTest : public HiveConnectorTestBase { infoColumns)[0]; } + // Write data to a parquet file on specified path. + // @param writeInt96AsTimestamp Write timestamp as Int96 if enabled. + void writeToParquetFile( + const std::string& path, + const std::vector& data, + bool writeInt96AsTimestamp) { + VELOX_CHECK_GT(data.size(), 0); + + WriterOptions options; + options.writeInt96AsTimestamp = writeInt96AsTimestamp; + + auto writeFile = std::make_unique(path, true, false); + auto sink = std::make_unique( + std::move(writeFile), path); + auto childPool = + rootPool_->addAggregateChild("ParquetTableScanTest.Writer"); + options.memoryPool = childPool.get(); + + auto writer = std::make_unique( + std::move(sink), options, asRowType(data[0]->type())); + + for (const auto& vector : data) { + writer->write(vector); + } + writer->close(); + } + private: RowTypePtr getRowType(std::vector&& outputColumnNames) const { std::vector types; @@ -674,6 +702,123 @@ TEST_F(ParquetTableScanTest, sessionTimezone) { assertSelectWithTimezone({"a"}, "SELECT a FROM tmp", "Asia/Shanghai"); } +TEST_F(ParquetTableScanTest, timestampFilter) { + // Timestamp-int96.parquet holds one column (t: TIMESTAMP) and + // 10 rows in one row group. Data is in SNAPPY compressed format. + // The values are: + // |t | + // +-------------------+ + // |2015-06-01 19:34:56| + // |2015-06-02 19:34:56| + // |2001-02-03 03:34:06| + // |1998-03-01 08:01:06| + // |2022-12-23 03:56:01| + // |1980-01-24 00:23:07| + // |1999-12-08 13:39:26| + // |2023-04-21 09:09:34| + // |2000-09-12 22:36:29| + // |2007-12-12 04:27:56| + // +-------------------+ + auto vector = makeFlatVector( + {Timestamp(1433187296, 0), + Timestamp(1433273696, 0), + Timestamp(981171246, 0), + Timestamp(888739266, 0), + Timestamp(1671767761, 0), + Timestamp(317521387, 0), + Timestamp(944660366, 0), + Timestamp(1682068174, 0), + Timestamp(968798189, 0), + Timestamp(1197433676, 0)}); + + loadData( + getExampleFilePath("timestamp_int96.parquet"), + ROW({"t"}, {TIMESTAMP()}), + makeRowVector( + {"t"}, + { + vector, + })); + + assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp"); + assertSelectWithFilter( + {"t"}, + {}, + "t < TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"); + assertSelectWithFilter( + {"t"}, + {}, + "t <= TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'"); + assertSelectWithFilter( + {"t"}, + {}, + "t > TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'"); + assertSelectWithFilter( + {"t"}, + {}, + "t >= TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'"); + assertSelectWithFilter( + {"t"}, + {}, + "t == TIMESTAMP '2022-12-23 03:56:01'", + "SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'"); +} + +TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) { + // Write timestamp data into parquet. + constexpr int kSize = 10; + auto vector = makeRowVector({ + makeFlatVector( + kSize, [](auto i) { return Timestamp(i, i * 1'001'001); }), + }); + auto schema = asRowType(vector->type()); + auto file = TempFilePath::create(); + writeToParquetFile(file->getPath(), {vector}, true); + auto plan = PlanBuilder().tableScan(schema).planNode(); + + // Read timestamp data from parquet with microsecond precision. + CursorParameters params; + std::shared_ptr executor = + std::make_shared( + std::thread::hardware_concurrency()); + std::shared_ptr queryCtx = + core::QueryCtx::create(executor.get()); + std::unordered_map session = { + {std::string(connector::hive::HiveConfig::kReadTimestampUnitSession), + "6"}}; + queryCtx->setConnectorSessionOverridesUnsafe( + kHiveConnectorId, std::move(session)); + params.queryCtx = queryCtx; + params.planNode = plan; + const int numSplitsPerFile = 1; + + bool noMoreSplits = false; + auto addSplits = [&](exec::Task* task) { + if (!noMoreSplits) { + auto const splits = HiveConnectorTestBase::makeHiveConnectorSplits( + {file->getPath()}, + numSplitsPerFile, + dwio::common::FileFormat::PARQUET); + for (const auto& split : splits) { + task->addSplit("0", exec::Split(split)); + } + task->noMoreSplits("0"); + } + noMoreSplits = true; + }; + auto result = readCursor(params, addSplits); + ASSERT_TRUE(waitForTaskCompletion(result.first->task().get())); + auto expected = makeRowVector({ + makeFlatVector( + kSize, [](auto i) { return Timestamp(i, i * 1'001'000); }), + }); + assertEqualResults({expected}, result.second); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); folly::Init init{&argc, &argv, false}; diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index 56f686126a11..ec36a3749a1e 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -240,6 +240,7 @@ Writer::Writer( arrowContext_->properties = getArrowParquetWriterOptions(options, flushPolicy_); setMemoryReclaimers(); + writeInt96AsTimestamp_ = options.writeInt96AsTimestamp; } Writer::Writer( @@ -257,7 +258,11 @@ Writer::Writer( void Writer::flush() { if (arrowContext_->stagingRows > 0) { if (!arrowContext_->writer) { - auto arrowProperties = ArrowWriterProperties::Builder().build(); + ArrowWriterProperties::Builder builder; + if (writeInt96AsTimestamp_) { + builder.enable_deprecated_int96_timestamps(); + } + auto arrowProperties = builder.build(); PARQUET_ASSIGN_OR_THROW( arrowContext_->writer, FileWriter::Open( diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 98e735af9b88..97b2d1d52f9f 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -105,6 +105,7 @@ struct WriterOptions { columnCompressionsMap; uint8_t parquetWriteTimestampUnit = static_cast(TimestampUnit::kNano); + bool writeInt96AsTimestamp = false; }; // Writes Velox vectors into a DataSink using Arrow Parquet writer. @@ -163,6 +164,9 @@ class Writer : public dwio::common::Writer { const RowTypePtr schema_; ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true}; + + // Whether to write Int96 timestamps in Arrow Parquet write. + bool writeInt96AsTimestamp_; }; class ParquetWriterFactory : public dwio::common::WriterFactory { diff --git a/velox/type/Filter.h b/velox/type/Filter.h index 961807358516..b5e573902444 100644 --- a/velox/type/Filter.h +++ b/velox/type/Filter.h @@ -557,6 +557,10 @@ class IsNotNull final : public Filter { return true; } + bool testInt128(int128_t /* unused */) const final { + return true; + } + bool testInt64Range(int64_t /*min*/, int64_t /*max*/, bool /*hasNull*/) const final { return true; @@ -1816,6 +1820,11 @@ class TimestampRange final : public Filter { nullAllowed_ ? "with nulls" : "no nulls"); } + bool testInt128(int128_t value) const final { + const auto& ts = reinterpret_cast(value); + return ts >= lower_ && ts <= upper_; + } + bool testTimestamp(Timestamp value) const override { return value >= lower_ && value <= upper_; } diff --git a/velox/type/Timestamp.cpp b/velox/type/Timestamp.cpp index 4a6aea4a960c..9ac0835fe4cb 100644 --- a/velox/type/Timestamp.cpp +++ b/velox/type/Timestamp.cpp @@ -36,6 +36,18 @@ inline int64_t getPrestoTZOffsetInSeconds(int16_t tzID) { } // namespace +// static +Timestamp Timestamp::fromDaysAndNanos(int32_t days, int64_t nanos) { + int64_t seconds = + (days - kJulianToUnixEpochDays) * kSecondsInDay + nanos / kNanosInSecond; + int64_t remainingNanos = nanos % kNanosInSecond; + if (remainingNanos < 0) { + remainingNanos += kNanosInSecond; + seconds--; + } + return Timestamp(seconds, remainingNanos); +} + // static Timestamp Timestamp::now() { auto now = std::chrono::system_clock::now(); diff --git a/velox/type/Timestamp.h b/velox/type/Timestamp.h index 9c06c907779a..29bdc0b526f8 100644 --- a/velox/type/Timestamp.h +++ b/velox/type/Timestamp.h @@ -83,6 +83,11 @@ struct Timestamp { static constexpr int64_t kMicrosecondsInMillisecond = 1'000; static constexpr int64_t kNanosecondsInMicrosecond = 1'000; static constexpr int64_t kNanosecondsInMillisecond = 1'000'000; + static constexpr int64_t kNanosInSecond = + kNanosecondsInMillisecond * kMillisecondsInSecond; + // The number of days between the Julian epoch and the Unix epoch. + static constexpr int64_t kJulianToUnixEpochDays = 2440588LL; + static constexpr int64_t kSecondsInDay = 86400LL; // Limit the range of seconds to avoid some problems. Seconds should be // in the range [INT64_MIN/1000 - 1, INT64_MAX/1000]. @@ -108,6 +113,10 @@ struct Timestamp { VELOX_USER_DCHECK_LE(nanos, kMaxNanos, "Timestamp nanos out of range"); } + /// Creates a timestamp from the number of days since the Julian epoch + /// and the number of nanoseconds. + static Timestamp fromDaysAndNanos(int32_t days, int64_t nanos); + // Returns the current unix timestamp (ms precision). static Timestamp now(); diff --git a/velox/type/tests/TimestampTest.cpp b/velox/type/tests/TimestampTest.cpp index 91f70bce7f10..759afe9d3910 100644 --- a/velox/type/tests/TimestampTest.cpp +++ b/velox/type/tests/TimestampTest.cpp @@ -36,6 +36,30 @@ std::string timestampToString( return result; } +TEST(TimestampTest, fromDaysAndNanos) { + EXPECT_EQ( + Timestamp(Timestamp::kSecondsInDay + 2, 1), + Timestamp::fromDaysAndNanos( + Timestamp::kJulianToUnixEpochDays + 1, + 2 * Timestamp::kNanosInSecond + 1)); + EXPECT_EQ( + Timestamp(Timestamp::kSecondsInDay + 2, 0), + Timestamp::fromDaysAndNanos( + Timestamp::kJulianToUnixEpochDays + 1, + 2 * Timestamp::kNanosInSecond)); + EXPECT_EQ( + Timestamp( + Timestamp::kSecondsInDay * 5 - 3, Timestamp::kNanosInSecond - 6), + Timestamp::fromDaysAndNanos( + Timestamp::kJulianToUnixEpochDays + 5, + -2 * Timestamp::kNanosInSecond - 6)); + EXPECT_EQ( + Timestamp(Timestamp::kSecondsInDay * 5 - 2, 0), + Timestamp::fromDaysAndNanos( + Timestamp::kJulianToUnixEpochDays + 5, + -2 * Timestamp::kNanosInSecond)); +} + TEST(TimestampTest, fromMillisAndMicros) { int64_t positiveSecond = 10'000; int64_t negativeSecond = -10'000;