Skip to content

Commit

Permalink
[CsvIO]: add CsvIORecordToObjects Class (#32006)
Browse files Browse the repository at this point in the history
* [CsvIO] add CsvIORecordToObjects class.

Co-authored-by: Lahari Guduru <[email protected]>

* Restart checks

* Restart checks again

Co-authored-by: Lahari Guduru <[email protected]>

---------

Co-authored-by: Lahari Guduru <[email protected]>
  • Loading branch information
francisohara24 and lahariguduru authored Jul 29, 2024
1 parent ffae5b5 commit 3bdf702
Show file tree
Hide file tree
Showing 5 changed files with 504 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public static DoublyNestedDataTypes doublyNestedDataTypes(
.build();
}

private static final TypeDescriptor<AllPrimitiveDataTypes>
public static final TypeDescriptor<AllPrimitiveDataTypes>
ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(AllPrimitiveDataTypes.class);

/** The schema for {@link AllPrimitiveDataTypes}. */
Expand All @@ -160,7 +160,7 @@ public static SerializableFunction<Row, AllPrimitiveDataTypes> allPrimitiveDataT
return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
}

private static final TypeDescriptor<NullableAllPrimitiveDataTypes>
public static final TypeDescriptor<NullableAllPrimitiveDataTypes>
NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR =
TypeDescriptor.of(NullableAllPrimitiveDataTypes.class);

Expand All @@ -187,7 +187,7 @@ public static SerializableFunction<Row, AllPrimitiveDataTypes> allPrimitiveDataT
NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
}

private static final TypeDescriptor<TimeContaining> TIME_CONTAINING_TYPE_DESCRIPTOR =
public static final TypeDescriptor<TimeContaining> TIME_CONTAINING_TYPE_DESCRIPTOR =
TypeDescriptor.of(TimeContaining.class);

/** The schema for {@link TimeContaining}. */
Expand Down Expand Up @@ -250,7 +250,7 @@ public static SerializableFunction<Row, ByteSequenceType> byteSequenceTypeFromRo
return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(BYTE_SEQUENCE_TYPE_TYPE_DESCRIPTOR);
}

private static final TypeDescriptor<ArrayPrimitiveDataTypes>
public static final TypeDescriptor<ArrayPrimitiveDataTypes>
ARRAY_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(ArrayPrimitiveDataTypes.class);

/** The schema for {@link ArrayPrimitiveDataTypes}. */
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.csv;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

/**
* {@link CsvIORecordToObjects} is a class that takes an input of {@link PCollection<List<String>>}
* and outputs custom type {@link PCollection<T>}.
*/
class CsvIORecordToObjects<T> extends PTransform<PCollection<List<String>>, PCollection<T>> {

/** The expected {@link Schema} of the target type. */
private final Schema schema;

/** A map of the {@link Schema.Field#getName()} to the custom CSV processing lambda. */
private final Map<String, SerializableFunction<String, Object>> customProcessingMap;

/** A {@link Map} of {@link Schema.Field}s to their expected positions within the CSV record. */
private final Map<Integer, Schema.Field> indexToFieldMap;

/**
* A {@link SerializableFunction} that converts from {@link Row} to {@link Schema} mapped custom
* type.
*/
private final SerializableFunction<Row, T> fromRowFn;

/** The expected coder of target type. */
private final Coder<T> coder;

CsvIORecordToObjects(CsvIOParseConfiguration<T> configuration) {
this.schema = configuration.getSchema();
this.customProcessingMap = configuration.getCustomProcessingMap();
this.indexToFieldMap =
CsvIOParseHelpers.mapFieldPositions(configuration.getCsvFormat(), schema);
this.fromRowFn = configuration.getFromRowFn();
this.coder = configuration.getCoder();
}

@Override
public PCollection<T> expand(PCollection<List<String>> input) {
return input.apply(ParDo.of(new RecordToObjectsFn())).setCoder(coder);
}

private class RecordToObjectsFn extends DoFn<List<String>, T> {
@ProcessElement
public void process(@Element List<String> record, OutputReceiver<T> receiver) {
Map<String, Object> fieldNamesToValues = new HashMap<>();
for (Map.Entry<Integer, Schema.Field> entry : indexToFieldMap.entrySet()) {
Schema.Field field = entry.getValue();
int index = entry.getKey();
String cell = record.get(index);
Object value = parseCell(cell, field);
fieldNamesToValues.put(field.getName(), value);
}
Row row = Row.withSchema(schema).withFieldValues(fieldNamesToValues).build();
receiver.output(fromRowFn.apply(row));
}
}

/** Parses cell to emit the value, as well as potential errors with filename. */
Object parseCell(String cell, Schema.Field field) {
if (cell == null) {
if (!field.getType().getNullable()) {
throw new IllegalArgumentException(
"Required org.apache.beam.sdk.schemas.Schema field "
+ field.getName()
+ " has null value");
}
return cell;
}
if (customProcessingMap.containsKey(field.getName())) {
return customProcessingMap.get(field.getName()).apply(cell);
}
return CsvIOParseHelpers.parseCell(cell, field);
}
}

This file was deleted.

Loading

0 comments on commit 3bdf702

Please sign in to comment.