Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add row group serialization #506

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/release-history.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
- Nullable `TimeSpan` support in `ParquetSerializer` by @cliedeman in #409.
- `DataFrame` support for `int16/uint16` types by @asmirnov82 in #469.
- Dropping build targets for .NET Core 3.1 and .NET 7.0 (STS). This should not affect anyone as .NET 6 and 8 are the LTS versions now.
- Upgraded to latest IronCompress dependency.
- Added convenience methods to serialize/deserialize collections into a single row group in #506 by @piiertho.
- Upgraded to latest IronCompress dependency.

### Bug fixes

- loop will read past the end of a block #487 by @alex-harper.
- decimal scale condition check fixed in #504 by @sierzput.

### Parquet Floor
- telemetry agreement made more clear to understand.
- telemetry agreement changed and made clearer to understand.

## 4.23.5

Expand Down
130 changes: 115 additions & 15 deletions src/Parquet/Serialization/ParquetSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ private static async Task SerializeRowGroupAsync(ParquetWriter writer,
}
}

/// <summary>
/// Serialize a collection into one row group using an existing writer.
/// </summary>
/// <param name="writer"></param>
/// <param name="objectInstances"></param>
/// <param name="cancellationToken"></param>
/// <typeparam name="T"></typeparam>
public static async Task SerializeRowGroupAsync<T>(ParquetWriter writer, IEnumerable<T> objectInstances,
CancellationToken cancellationToken) {
object boxedStriper = _typeToStriper.GetOrAdd(typeof(T), _ => new Striper<T>(typeof(T).GetParquetSchema(false)));
var striper = (Striper<T>)boxedStriper;

await SerializeRowGroupAsync(writer, striper, objectInstances, cancellationToken);
}

/// <summary>
/// Serialize
/// </summary>
Expand Down Expand Up @@ -187,6 +202,45 @@ public static async Task<IList<T>> DeserializeAsync<T>(Stream source,
return result;
}

/// <summary>
/// Deserialize row group into provided list
/// </summary>
/// <param name="source"></param>
/// <param name="rowGroupIndex"></param>
/// <param name="result"></param>
/// <param name="options"></param>
/// <param name="cancellationToken"></param>
/// <param name="resultsAlreadyAllocated">Set to true if provided result collection already contains allocated elements (like using a pool)</param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
public static async Task DeserializeAsync<T>(Stream source, int rowGroupIndex, IList<T> result,
ParquetOptions? options = null, CancellationToken cancellationToken = default,
bool resultsAlreadyAllocated = false) where T : new() {

using ParquetReader reader =
await ParquetReader.CreateAsync(source, options, cancellationToken: cancellationToken);

await DeserializeAsync(reader, rowGroupIndex, result, cancellationToken, resultsAlreadyAllocated);
}

/// <summary>
/// Deserialize row group into provided list, using provided Parquet reader
/// Useful to iterate over row group using a single parquet reader
/// </summary>
/// <param name="reader"></param>
/// <param name="rowGroupIndex"></param>
/// <param name="result"></param>
/// <param name="cancellationToken"></param>
/// <param name="resultsAlreadyAllocated">Set to true if provided result collection already contains allocated elements (like using a pool)</param>
/// <typeparam name="T"></typeparam>
public static async Task DeserializeAsync<T>(ParquetReader reader, int rowGroupIndex, IList<T> result,
CancellationToken cancellationToken = default, bool resultsAlreadyAllocated = false) where T : new() {
Assembler<T> asm = GetAssembler<T>();
await DeserializeRowGroupAsync(reader, rowGroupIndex, asm, result, cancellationToken,
resultsAlreadyAllocated);
}

/// <summary>
/// Deserialize a specific row group from a local file.
/// </summary>
Expand Down Expand Up @@ -306,6 +360,33 @@ public static async IAsyncEnumerable<T> DeserializeAllAsync<T>(Stream source,
result.Clear();
}
}

/// <summary>
/// Deserialize row group per row group as IAsyncEnumerable
/// </summary>
/// <param name="source"></param>
/// <param name="options"></param>
/// <param name="cancellationToken"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static async IAsyncEnumerable<IList<T>> DeserializeAllByGroupsAsync<T>(Stream source,
ParquetOptions? options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
where T : new()
{
Assembler<T> asm = GetAssembler<T>();

using ParquetReader reader = await ParquetReader.CreateAsync(source, options, cancellationToken: cancellationToken);

for (int rgi = 0; rgi < reader.RowGroupCount; rgi++)
{
ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(rgi);
List<T> result = GetList<T>(rowGroupReader.RowCount);
await DeserializeRowGroupAsync(rowGroupReader, reader.Schema, asm, result, cancellationToken);
yield return result;
}
}


/// <summary>
/// Deserialise
Expand All @@ -321,16 +402,33 @@ public static async Task<IList<T>> DeserializeAsync<T>(ParquetRowGroupReader row
ParquetSchema schema,
CancellationToken cancellationToken = default)
where T : new() {

Assembler<T> asm = GetAssembler<T>();

List<T> result = GetList<T>(rowGroupReader.RowGroup.NumRows);

await DeserializeRowGroupAsync(rowGroupReader, schema, asm, result, cancellationToken);
await DeserializeRowGroupAsync(rowGroupReader, schema, result, cancellationToken);

return result;
}

/// <summary>
/// Deserialize a single row group into a provided collection.
/// </summary>
/// <param name="rowGroupReader"></param>
/// <param name="schema"></param>
/// <param name="result"></param>
/// <param name="cancellationToken"></param>
/// <param name="resultsAlreadyAllocated">Set to true if provided result collection already contains allocated elements (like using a pool)</param>
/// <typeparam name="T"></typeparam>
public static async Task DeserializeRowGroupAsync<T>(ParquetRowGroupReader rowGroupReader,
ParquetSchema schema,
ICollection<T> result,
CancellationToken cancellationToken = default,
bool resultsAlreadyAllocated = false) where T : new() {
Assembler<T> asm = GetAssembler<T>();

await DeserializeRowGroupAsync(rowGroupReader, schema, asm, result, cancellationToken,
resultsAlreadyAllocated);
}

private static Assembler<T> GetAssembler<T>() where T : new() {
object boxedAssembler = _typeToAssembler.GetOrAdd(typeof(T), _ => new Assembler<T>(typeof(T).GetParquetSchema(true)));
return (Assembler<T>)boxedAssembler;
Expand All @@ -341,14 +439,13 @@ private static Assembler<Dictionary<string, object>> GetAssembler(ParquetSchema
return (Assembler<Dictionary<string, object>>)boxedAssembler;
}

private static async Task DeserializeRowGroupAsync<T>(ParquetReader reader, int rgi,
Assembler<T> asm,
ICollection<T> result,
CancellationToken cancellationToken) where T : new() {
private static async Task DeserializeRowGroupAsync<T>(ParquetReader reader, int rgi, Assembler<T> asm,
ICollection<T> result, CancellationToken cancellationToken, bool resultsAlreadyAllocated = false)
where T : new() {

using ParquetRowGroupReader rg = reader.OpenRowGroupReader(rgi);

await DeserializeRowGroupAsync(rg, reader.Schema, asm, result, cancellationToken);
await DeserializeRowGroupAsync(rg, reader.Schema, asm, result, cancellationToken, resultsAlreadyAllocated);
}

private static async Task DeserializeRowGroupAsync(ParquetReader reader, int rgi,
Expand All @@ -363,14 +460,17 @@ private static async Task DeserializeRowGroupAsync<T>(ParquetRowGroupReader rg,
ParquetSchema schema,
Assembler<T> asm,
ICollection<T> result,
CancellationToken cancellationToken = default) where T : new() {
CancellationToken cancellationToken = default,
bool resultsAlreadyAllocated = false) where T : new() {

// add more empty class instances to the result
int prevRowCount = result.Count;
for(int i = 0; i < rg.RowCount; i++) {
var ne = new T();
result.Add(ne);
}

if(!resultsAlreadyAllocated)
for(int i = 0; i < rg.RowCount; i++) {
var ne = new T();
result.Add(ne);
}

foreach(FieldAssembler<T> fasm in asm.FieldAssemblers) {

Expand All @@ -388,7 +488,7 @@ private static async Task DeserializeRowGroupAsync<T>(ParquetRowGroupReader rg,
DataColumn dc = await rg.ReadColumnAsync(fasm.Field, cancellationToken);

try {
fasm.Assemble(result.Skip(prevRowCount), dc);
fasm.Assemble(resultsAlreadyAllocated ? result : result.Skip(prevRowCount), dc);
} catch(Exception ex) {
throw new InvalidOperationException($"failed to deserialize column '{fasm.Field.Path}', pseudo code: ['{fasm.IterationExpression.GetPseudoCode()}']", ex);
}
Expand Down
Loading