Skip to content

Commit 1e8c091

Browse files
[CsvIO]: add Coder and FromRowFn to CsvIOParseConfiguration class. (#31989)
1 parent b9a0c2b commit 1e8c091

File tree

3 files changed

+26
-13
lines changed

3 files changed

+26
-13
lines changed

sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
class CsvIOParse<T> extends PTransform<PCollection<String>, PCollection<T>> {
3333

3434
/** Stores required parameters for parsing. */
35-
private final CsvIOParseConfiguration.Builder configBuilder;
35+
private final CsvIOParseConfiguration.Builder<T> configBuilder;
3636

37-
CsvIOParse(CsvIOParseConfiguration.Builder configBuilder) {
37+
CsvIOParse(CsvIOParseConfiguration.Builder<T> configBuilder) {
3838
this.configBuilder = configBuilder;
3939
}
4040

sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,31 @@
1818
package org.apache.beam.sdk.io.csv;
1919

2020
import com.google.auto.value.AutoValue;
21+
import java.io.Serializable;
2122
import java.util.HashMap;
2223
import java.util.Map;
2324
import java.util.Optional;
25+
import org.apache.beam.sdk.coders.Coder;
2426
import org.apache.beam.sdk.schemas.Schema;
2527
import org.apache.beam.sdk.transforms.DoFn;
2628
import org.apache.beam.sdk.transforms.PTransform;
2729
import org.apache.beam.sdk.transforms.ParDo;
2830
import org.apache.beam.sdk.transforms.SerializableFunction;
2931
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
3032
import org.apache.beam.sdk.values.PCollection;
33+
import org.apache.beam.sdk.values.Row;
3134
import org.apache.commons.csv.CSVFormat;
3235

3336
/** Stores parameters needed for CSV record parsing. */
3437
@AutoValue
35-
abstract class CsvIOParseConfiguration {
38+
abstract class CsvIOParseConfiguration<T> implements Serializable {
3639

3740
/** A Dead Letter Queue that returns potential errors with {@link BadRecord}. */
3841
final PTransform<PCollection<BadRecord>, PCollection<BadRecord>> errorHandlerTransform =
3942
new BadRecordOutput();
4043

41-
static Builder builder() {
42-
return new AutoValue_CsvIOParseConfiguration.Builder();
44+
static <T> Builder<T> builder() {
45+
return new AutoValue_CsvIOParseConfiguration.Builder<>();
4346
}
4447

4548
/** The expected {@link CSVFormat} of the parsed CSV record. */
@@ -51,20 +54,30 @@ static Builder builder() {
5154
/** A map of the {@link Schema.Field#getName()} to the custom CSV processing lambda. */
5255
abstract Map<String, SerializableFunction<String, Object>> getCustomProcessingMap();
5356

57+
/** The expected {@link Coder} of the target type. */
58+
abstract Coder<T> getCoder();
59+
60+
/** A {@link SerializableFunction} that converts from Row to the target type. */
61+
abstract SerializableFunction<Row, T> getFromRowFn();
62+
5463
@AutoValue.Builder
55-
abstract static class Builder {
56-
abstract Builder setCsvFormat(CSVFormat csvFormat);
64+
abstract static class Builder<T> implements Serializable {
65+
abstract Builder<T> setCsvFormat(CSVFormat csvFormat);
5766

58-
abstract Builder setSchema(Schema schema);
67+
abstract Builder<T> setSchema(Schema schema);
5968

60-
abstract Builder setCustomProcessingMap(
69+
abstract Builder<T> setCustomProcessingMap(
6170
Map<String, SerializableFunction<String, Object>> customProcessingMap);
6271

72+
abstract Builder<T> setCoder(Coder<T> coder);
73+
74+
abstract Builder<T> setFromRowFn(SerializableFunction<Row, T> fromRowFn);
75+
6376
abstract Optional<Map<String, SerializableFunction<String, Object>>> getCustomProcessingMap();
6477

65-
abstract CsvIOParseConfiguration autoBuild();
78+
abstract CsvIOParseConfiguration<T> autoBuild();
6679

67-
final CsvIOParseConfiguration build() {
80+
final CsvIOParseConfiguration<T> build() {
6881
if (!getCustomProcessingMap().isPresent()) {
6982
setCustomProcessingMap(new HashMap<>());
7083
}

sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOReadFiles.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
// dependencies are completed.
3232
class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
3333
/** Stores required parameters for parsing. */
34-
private final CsvIOParseConfiguration.Builder configBuilder;
34+
private final CsvIOParseConfiguration.Builder<T> configBuilder;
3535

36-
CsvIOReadFiles(CsvIOParseConfiguration.Builder configBuilder) {
36+
CsvIOReadFiles(CsvIOParseConfiguration.Builder<T> configBuilder) {
3737
this.configBuilder = configBuilder;
3838
}
3939

0 commit comments

Comments
 (0)