[Kernel][Writes] Add support for writing data file stats
raveeram-db committed Feb 14, 2025
1 parent 5aee635 commit 2594130
Showing 14 changed files with 592 additions and 145 deletions.
@@ -61,6 +61,20 @@ public String toString() {
return "column(" + quoteColumnPath(names) + ")";
}

/**
* Returns a new column that appends the input column name to the current column. Corresponds to
* an additional level of nested reference.
*
* @param name the column name to append
* @return the new column
*/
public Column append(String name) {
String[] newNames = new String[names.length + 1];
System.arraycopy(names, 0, newNames, 0, names.length);
newNames[names.length] = name;
return new Column(newNames);
}

private static String quoteColumnPath(String[] names) {
return Arrays.stream(names)
.map(s -> format("`%s`", s.replace("`", "``")))
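A quick sketch of how the new append method composes nested column references (the field names here are illustrative, not from the commit):

import io.delta.kernel.expressions.Column;

public class ColumnAppendSketch {
  public static void main(String[] args) {
    Column address = new Column("address");
    Column city = address.append("city"); // one more level of nesting
    // append returns a new instance; `address` itself is unchanged.
    System.out.println(address); // column(`address`)
    System.out.println(city);    // column(`address`.`city`)
  }
}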
@@ -22,13 +22,20 @@
import static io.delta.kernel.internal.util.Preconditions.checkState;
import static io.delta.kernel.internal.util.Utils.toCloseableIterator;

import io.delta.kernel.*;
import io.delta.kernel.Meta;
import io.delta.kernel.Operation;
import io.delta.kernel.Transaction;
import io.delta.kernel.TransactionCommitResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.ConcurrentWriteException;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.hook.PostCommitHook;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.actions.SetTransaction;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.hook.CheckpointHook;
@@ -37,8 +44,14 @@
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.skipping.DataSkippingUtils;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.internal.util.Clock;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.InCommitTimestampUtils;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.metrics.TransactionReport;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
@@ -446,6 +459,23 @@ private void recordTransactionReport(
*/
public static List<Column> getStatisticsColumns(Engine engine, Row transactionState) {
// TODO: implement this once we start supporting collecting stats
return Collections.emptyList();
int numIndexedCols =
Integer.parseInt(
TransactionStateRow.getConfiguration(transactionState)
.getOrDefault(
DataSkippingUtils.DATA_SKIPPING_NUM_INDEXED_COLS,
String.valueOf(DataSkippingUtils.DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS)));

// Get the list of partition columns to exclude
Set<String> partitionColumns =
TransactionStateRow.getPartitionColumnsList(transactionState).stream()
.collect(Collectors.toSet());

// For now, only support the first numIndexedCols columns
return TransactionStateRow.getLogicalSchema(engine, transactionState).fields().stream()
.filter(p -> !partitionColumns.contains(p.getName()))
.limit(numIndexedCols)
.map(field -> new Column(field.getName()))
.collect(Collectors.toList());
}
}
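The new getStatisticsColumns body boils down to: drop partition columns, then keep the first delta.dataSkippingNumIndexedCols remaining top-level fields. A self-contained sketch of the same pipeline, with a made-up schema, partition set, and limit:

import io.delta.kernel.expressions.Column;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class StatsColumnSelectionSketch {
  public static void main(String[] args) {
    // Hypothetical table: "date" is a partition column; stats limited to 2 columns.
    List<String> fields = Arrays.asList("date", "id", "value", "comment");
    Set<String> partitionColumns = new HashSet<>(Arrays.asList("date"));
    int numIndexedCols = 2; // delta.dataSkippingNumIndexedCols

    List<Column> statsColumns =
        fields.stream()
            .filter(f -> !partitionColumns.contains(f))
            .limit(numIndexedCols)
            .map(Column::new)
            .collect(Collectors.toList());

    // Prints [column(`id`), column(`value`)]: "comment" is cut by the limit.
    System.out.println(statsColumns);
  }
}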
@@ -26,8 +26,8 @@
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.statistics.DataFileStatistics;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.DataFileStatistics;
import io.delta.kernel.utils.DataFileStatus;
import java.net.URI;
import java.util.HashMap;
@@ -25,7 +25,9 @@
import io.delta.kernel.internal.types.DataTypeJsonSerDe;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.*;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

public class TransactionStateRow extends GenericRow {
@@ -34,6 +34,9 @@

public class DataSkippingUtils {

public static final String DATA_SKIPPING_NUM_INDEXED_COLS = "delta.dataSkippingNumIndexedCols";
public static final int DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS = 32;

/**
* Given a {@code FilteredColumnarBatch} of scan files and the statistics schema to parse, return
* the parsed JSON stats from the scan files.
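These constants back the lookup in getStatisticsColumns above: the table property caps how many leading columns receive stats and falls back to 32 when unset. A minimal sketch of the resolution, with an illustrative configuration map:

import io.delta.kernel.internal.skipping.DataSkippingUtils;
import java.util.HashMap;
import java.util.Map;

public class NumIndexedColsSketch {
  public static void main(String[] args) {
    Map<String, String> tableConfig = new HashMap<>(); // e.g. from table metadata
    tableConfig.put("delta.dataSkippingNumIndexedCols", "8");

    int numIndexedCols =
        Integer.parseInt(
            tableConfig.getOrDefault(
                DataSkippingUtils.DATA_SKIPPING_NUM_INDEXED_COLS,
                String.valueOf(DataSkippingUtils.DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS)));

    System.out.println(numIndexedCols); // 8; would be 32 if the property were unset
  }
}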
@@ -0,0 +1,255 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.statistics;

import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.time.temporal.ChronoUnit.MILLIS;

import com.fasterxml.jackson.core.JsonGenerator;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.internal.util.JsonUtils;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.JsonUtil;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/** Statistics about a data file in a Delta Lake table. */
public class DataFileStatistics {
private final StructType dataSchema;
private final long numRecords;
private final Map<Column, Literal> minValues;
private final Map<Column, Literal> maxValues;
private final Map<Column, Long> nullCounts;

public static final int MICROSECONDS_PER_SECOND = 1_000_000;
public static final int NANOSECONDS_PER_MICROSECOND = 1_000;

private static final DateTimeFormatter TIMESTAMP_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");

/**
* Create a new instance of {@link DataFileStatistics}.
*
* @param dataSchema Schema of the data file; may be null when the statistics are deserialized
* from JSON, in which case only {@code numRecords} is serialized back out.
* @param numRecords Number of records in the data file.
* @param minValues Map from column to its minimum value in the data file. If the data file has
* all nulls for a column, the value is null or absent from the map.
* @param maxValues Map from column to its maximum value in the data file. If the data file has
* all nulls for a column, the value is null or absent from the map.
* @param nullCounts Map from column to the number of nulls in the data file.
*/
public DataFileStatistics(
StructType dataSchema,
long numRecords,
Map<Column, Literal> minValues,
Map<Column, Literal> maxValues,
Map<Column, Long> nullCounts) {
this.dataSchema = dataSchema;
this.numRecords = numRecords;
this.minValues = Collections.unmodifiableMap(minValues);
this.maxValues = Collections.unmodifiableMap(maxValues);
this.nullCounts = Collections.unmodifiableMap(nullCounts);
}

/**
* Get the number of records in the data file.
*
* @return Number of records in the data file.
*/
public long getNumRecords() {
return numRecords;
}

/**
* Get the minimum values of the columns in the data file. The map may contain statistics for only
* a subset of columns in the data file.
*
* @return Map from column to its minimum value in the data file.
*/
public Map<Column, Literal> getMinValues() {
return minValues;
}

/**
* Get the maximum values of the columns in the data file. The map may contain statistics for only
* a subset of columns in the data file.
*
* @return Map from column to its maximum value in the data file.
*/
public Map<Column, Literal> getMaxValues() {
return maxValues;
}

/**
* Get the number of nulls of columns in the data file. The map may contain statistics for only a
* subset of columns in the data file.
*
* @return Map from column to the number of nulls in the data file.
*/
public Map<Column, Long> getNullCounts() {
return nullCounts;
}

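/**
* Serializes these statistics as a JSON string: a {@code numRecords} field plus nested
* {@code minValues}, {@code maxValues}, and {@code nullCounts} objects that mirror the data
* schema. Columns with no collected value are omitted, and the nested objects are skipped
* entirely when no data schema is available.
*/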
public String serializeAsJson() {
return JsonUtil.generate(
gen -> {
gen.writeStartObject();
gen.writeNumberField("numRecords", numRecords);

// Only write detailed statistics if dataSchema is available
if (dataSchema != null) {
gen.writeObjectFieldStart("minValues");

writeJsonValues(
gen,
dataSchema,
minValues,
new Column(new String[0]),
(g, v) -> writeJsonValue(g, v));
gen.writeEndObject();

gen.writeObjectFieldStart("maxValues");
writeJsonValues(
gen,
dataSchema,
maxValues,
new Column(new String[0]),
(g, v) -> writeJsonValue(g, v));
gen.writeEndObject();

gen.writeObjectFieldStart("nullCounts");
writeJsonValues(
gen, dataSchema, nullCounts, new Column(new String[0]), (g, v) -> g.writeNumber(v));
gen.writeEndObject();
}

gen.writeEndObject();
});
}

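// Walks the schema depth-first: each StructType opens a nested JSON object, and leaf values
// are looked up in the values map by their full column path.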
private <T> void writeJsonValues(
JsonGenerator generator,
StructType schema,
Map<Column, T> values,
Column parentColPath,
JsonUtil.JsonValueWriter<T> writer)
throws IOException {
if (schema == null) {
return;
}
for (StructField field : schema.fields()) {
Column colPath = parentColPath.append(field.getName());
if (field.getDataType() instanceof StructType) {
generator.writeObjectFieldStart(field.getName());
writeJsonValues(generator, (StructType) field.getDataType(), values, colPath, writer);
generator.writeEndObject();
} else {
T value = values.get(colPath);
if (value != null) {
generator.writeFieldName(field.getName());
writer.write(generator, value);
}
}
}
}

private void writeJsonValue(JsonGenerator generator, Literal literal) throws IOException {
if (literal == null || literal.getValue() == null) {
return;
}
DataType type = literal.getDataType();
Object value = literal.getValue();
if (type instanceof BooleanType) {
generator.writeBoolean((Boolean) value);
} else if (type instanceof ByteType) {
generator.writeNumber(((Number) value).byteValue());
} else if (type instanceof ShortType) {
generator.writeNumber(((Number) value).shortValue());
} else if (type instanceof IntegerType) {
generator.writeNumber(((Number) value).intValue());
} else if (type instanceof LongType) {
generator.writeNumber(((Number) value).longValue());
} else if (type instanceof FloatType) {
generator.writeNumber(((Number) value).floatValue());
} else if (type instanceof DoubleType) {
generator.writeNumber(((Number) value).doubleValue());
} else if (type instanceof StringType) {
generator.writeString((String) value);
} else if (type instanceof BinaryType) {
generator.writeString(new String((byte[]) value, StandardCharsets.UTF_8));
} else if (type instanceof DecimalType) {
generator.writeNumber((BigDecimal) value);
} else if (type instanceof DateType) {
generator.writeString(
LocalDate.ofEpochDay(((Number) value).longValue()).format(ISO_LOCAL_DATE));
} else if (type instanceof TimestampType || type instanceof TimestampNTZType) {
long epochMicros = (long) value;
// Use floor semantics so pre-epoch (negative) timestamps split into a valid
// (seconds, nanos) pair; truncating division would misplace values falling in
// the second just before the epoch.
long epochSeconds = Math.floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanoAdjustment =
(int) (Math.floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND);
Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
generator.writeString(
TIMESTAMP_FORMATTER.format(ZonedDateTime.ofInstant(instant.truncatedTo(MILLIS), UTC)));
} else {
throw new IllegalArgumentException("Unsupported stats data type: " + type);
}
}

@Override
public String toString() {
return serializeAsJson();
}

/**
* Utility method to deserialize statistics from a JSON string. For now only the number of records
* is deserialized; the remaining statistics are not yet supported.
*
* @param json Data statistics JSON string to deserialize.
* @return An {@link Optional} containing the deserialized {@link DataFileStatistics} if present.
*/
public static Optional<DataFileStatistics> deserializeFromJson(String json) {
Map<String, String> keyValueMap = JsonUtils.parseJSONKeyValueMap(json);

// For now just deserialize the number of records
String numRecordsStr = keyValueMap.get("numRecords");
if (numRecordsStr == null) {
return Optional.empty();
}
long numRecords = Long.parseLong(numRecordsStr);
// TODO: Add support for inferring the statsSchema/dataSchema later.
DataFileStatistics stats =
new DataFileStatistics(
null,
numRecords,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap());
return Optional.of(stats);
}
}
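A hedged end-to-end sketch of the new class, assuming the Column(String) constructor, Literal.ofLong, and the StructType/LongType/StringType APIs from elsewhere in Kernel; the schema and values are made up:

import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.statistics.DataFileStatistics;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import java.util.HashMap;
import java.util.Map;

public class DataFileStatisticsSketch {
  public static void main(String[] args) {
    StructType schema =
        new StructType().add("id", LongType.LONG).add("name", StringType.STRING);

    Map<Column, Literal> minValues = new HashMap<>();
    Map<Column, Literal> maxValues = new HashMap<>();
    Map<Column, Long> nullCounts = new HashMap<>();
    minValues.put(new Column("id"), Literal.ofLong(1L));
    maxValues.put(new Column("id"), Literal.ofLong(100L));
    nullCounts.put(new Column("id"), 0L);

    DataFileStatistics stats =
        new DataFileStatistics(schema, 100L, minValues, maxValues, nullCounts);

    // "name" has no collected values, so it is omitted from the output:
    // {"numRecords":100,"minValues":{"id":1},"maxValues":{"id":100},"nullCounts":{"id":0}}
    String json = stats.serializeAsJson();
    System.out.println(json);

    // Round trip: for now only numRecords survives deserialization.
    long numRecords = DataFileStatistics.deserializeFromJson(json).get().getNumRecords();
    System.out.println(numRecords); // 100
  }
}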