Support conversion of RFC4180 CSV #1514

Open · wants to merge 4 commits into base: main
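For context on what RFC 4180 support means here: the spec allows a field to contain the delimiter character as long as the field is quoted (e.g. "CA,AZ"), which is exactly the case a plain string split mishandles. A minimal standalone sketch of the difference, using the Apache Commons CSV API this PR adopts and the sample record from the new tests below:

import java.io.IOException;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class Rfc4180SplitVsParse {
  public static void main(String[] args) throws IOException {
    String line = "007,\"CA,AZ\",26.23";

    // A naive split breaks the quoted field in two: 007 | "CA | AZ" | 26.23
    String[] naive = line.split(",");
    System.out.println(naive.length); // 4 -- wrong

    // An RFC4180-aware parse keeps the quoted comma inside one field.
    try (CSVParser parser = CSVParser.parse(line, CSVFormat.DEFAULT)) {
      CSVRecord record = parser.getRecords().get(0);
      System.out.println(record.size()); // 3
      System.out.println(record.get(1)); // CA,AZ
    }
  }
}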
v2/common/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -27,7 +27,7 @@
<artifactId>common</artifactId>

<properties>
-    <commons.version>1.8</commons.version>
+    <commons.version>1.10.0</commons.version>
<commons-text.version>1.10.0</commons-text.version>
<nashorn.version>15.4</nashorn.version>
<truth-proto-extension.version>1.0.1</truth-proto-extension.version>
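Why the bump from 1.8 to 1.10.0 matters (my reading; the PR itself doesn't say): the string-delimiter API that the CsvConverters changes below rely on, CSVFormat.builder().setDelimiter(String) and CSVFormat.getDelimiterString(), was, as far as I can tell, introduced in commons-csv 1.9.0, so 1.8 only offers the single-char variants. A small sketch of the newer API:

// commons-csv 1.9+ treats the delimiter as a string, so multi-character
// delimiters become possible:
CSVFormat format = CSVFormat.DEFAULT.builder().setDelimiter("||").build();
String delimiter = format.getDelimiterString(); // "||"
// The older accessor returns a single char:
// char d = format.getDelimiter();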
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/CsvConverters.java
@@ -57,10 +57,10 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -641,7 +641,7 @@
this.linesTag = linesTag;
this.csvFormat = getCsvFormat(csvFormat, delimiter);
this.fileEncoding = fileEncoding;
-      this.delimiter = String.valueOf(this.csvFormat.getDelimiter());
+      this.delimiter = this.csvFormat.getDelimiterString();
}

@ProcessElement
@@ -651,7 +651,7 @@
try {
BufferedReader bufferedReader =
new BufferedReader(
-              Channels.newReader(filePath.open(), Charset.forName(this.fileEncoding).name()));
+              Channels.newReader(filePath.open(), Charset.forName(this.fileEncoding)));
CSVParser parser =
CSVParser.parse(bufferedReader, this.csvFormat.withFirstRecordAsHeader());
outputReceiver
@@ -675,17 +675,17 @@
*/
public static class StringToGenericRecordFn extends DoFn<String, GenericRecord> {
private String serializedSchema;
-    private final String delimiter;
+    private CSVFormat csvFormat = CSVFormat.DEFAULT;
private Schema schema;
private boolean logDetailedCsvConversionErrors = false;

public StringToGenericRecordFn(String schemaLocation, String delimiter) {
withSchemaLocation(schemaLocation);
-      this.delimiter = delimiter;
+      this.csvFormat = CSVFormat.DEFAULT.builder().setDelimiter(delimiter).build();
}

public StringToGenericRecordFn(String delimiter) {
-      this.delimiter = delimiter;
+      this.csvFormat = CSVFormat.DEFAULT.builder().setDelimiter(delimiter).build();
}

public StringToGenericRecordFn withSchemaLocation(String schemaLocation) {
@@ -712,8 +712,7 @@
@ProcessElement
public void processElement(ProcessContext context) throws IllegalArgumentException {
GenericRecord genericRecord = new GenericData.Record(schema);
-      String[] rowValue =
-          Splitter.on(delimiter).splitToList(context.element()).toArray(new String[0]);
+      CSVRecord csvRecord = parseString(context.element(), csvFormat);
List<Schema.Field> fields = schema.getFields();

try {
@@ -728,18 +727,18 @@

// Check if Csv data is null.
if ((dataType1.equals("null") || dataType2.equals("null"))
-              && rowValue[index].length() == 0) {
+              && csvRecord.get(index).isEmpty()) {
genericRecord.put(field.name(), null);
} else {
// Add valid data type to generic record.
if (dataType1.equals("null")) {
-              populateGenericRecord(genericRecord, dataType2, rowValue[index], field.name());
+              populateGenericRecord(genericRecord, dataType2, csvRecord.get(index), field.name());
} else {
-              populateGenericRecord(genericRecord, dataType1, rowValue[index], field.name());
+              populateGenericRecord(genericRecord, dataType1, csvRecord.get(index), field.name());
}
}
} else {
-            populateGenericRecord(genericRecord, fieldType, rowValue[index], field.name());
+            populateGenericRecord(genericRecord, fieldType, csvRecord.get(index), field.name());
}
}
} catch (ArrayIndexOutOfBoundsException e) {
@@ -750,6 +749,16 @@
context.output(genericRecord);
}

+    private static CSVRecord parseString(String element, CSVFormat format) {
+      try (CSVParser parser = CSVParser.parse(element, format)) {
+        List<CSVRecord> records = parser.getRecords();
+        return records.get(0);
+      } catch (IOException e) {
+        LOG.error(e.getMessage());
+        throw new RuntimeException(e);
+      }
+    }

private void populateGenericRecord(
GenericRecord genericRecord, String fieldType, String data, String fieldName) {

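A note on the new parseString helper (an observation, not something stated in the PR): CSVParser.parse treats the element as a complete CSV input, and the helper assumes each element holds exactly one record, hence records.get(0). Since CSVRecord.get(int) indexes the parsed values much like the old rowValue array did, short rows should still surface as the ArrayIndexOutOfBoundsException that processElement already catches. A self-contained usage sketch under those assumptions:

import java.io.IOException;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

class ParseStringSketch {
  // Mirrors the helper above: parse one element, return its single record.
  static CSVRecord parseOne(String element, CSVFormat format) throws IOException {
    try (CSVParser parser = CSVParser.parse(element, format)) {
      return parser.getRecords().get(0);
    }
  }

  public static void main(String[] args) throws IOException {
    CSVFormat format = CSVFormat.DEFAULT.builder().setDelimiter(",").build();
    CSVRecord record = parseOne("007,\"CA,AZ\",26.23", format);
    System.out.println(record.get(1)); // CA,AZ -- the quoted comma survives
    // record.get(3) would throw ArrayIndexOutOfBoundsException on this row.
  }
}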
CsvConvertersTest.java
@@ -69,12 +69,18 @@ public class CsvConvertersTest {
private static final String NO_HEADER_CSV_FILE_PATH =
Resources.getResource(CSV_RESOURCES_DIR + "no_header.csv").getPath();

+  private static final String RFC4180_NO_HEADER_CSV_FILE_PATH =
+      Resources.getResource(CSV_RESOURCES_DIR + "rfc4180_no_header.csv").getPath();

private static final String HEADER_CSV_FILE_PATH =
Resources.getResource(CSV_RESOURCES_DIR + "with_headers.csv").getPath();

private static final String TEST_JSON_SCHEMA_PATH =
Resources.getResource(CSV_RESOURCES_DIR + "testSchema.json").getPath();

+  private static final String TEST_RFC4180_AVRO_SCHEMA_PATH =
+      Resources.getResource(CSV_RESOURCES_DIR + "testRFC4180AvroSchema.json").getPath();

private static final String TEST_AVRO_SCHEMA_PATH =
Resources.getResource(CSV_RESOURCES_DIR + "testAvroSchema.json").getPath();

@@ -87,6 +93,8 @@

private static final String RECORD_STRING = "007,CA,26.23";

+  private static final String RFC4180_RECORD_STRING = "007,\"CA,AZ\",26.23";

private static final String JSON_STRING_RECORD =
"{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}";

@@ -144,6 +152,44 @@ public void testReadNoHeadersCsv() {
pipeline.run();
}

+  /** Tests {@link CsvConverters.ReadCsv} reads an RFC4180 Csv with no headers correctly. */
+  @Test
+  public void testReadRFC4180NoHeadersCsv() {
+
+    CsvConverters.CsvPipelineOptions options =
+        PipelineOptionsFactory.create().as(CsvConverters.CsvPipelineOptions.class);
+
+    options.setContainsHeaders(false);
+    options.setDelimiter(",");
+    options.setCsvFormat("Default");
+    options.setInputFileSpec(RFC4180_NO_HEADER_CSV_FILE_PATH);
+
+    // Build pipeline with no headers.
+    PCollectionTuple readCsvOut =
+        pipeline.apply(
+            "TestReadRFC4180CsvNoHeaders",
+            CsvConverters.ReadCsv.newBuilder()
+                .setCsvFormat(options.getCsvFormat())
+                .setDelimiter(options.getDelimiter())
+                .setHasHeaders(options.getContainsHeaders())
+                .setInputFileSpec(options.getInputFileSpec())
+                .setHeaderTag(CSV_HEADERS)
+                .setLineTag(CSV_LINES)
+                .setFileEncoding(options.getCsvFileEncoding())
+                .build());
+
+    PAssert.that(readCsvOut.get(CSV_LINES))
+        .satisfies(
+            collection -> {
+              String result = collection.iterator().next();
+              assertThat(result, is(equalTo(RFC4180_RECORD_STRING)));
+              return null;
+            });
+
+    // Execute pipeline
+    pipeline.run();
+  }

/** Tests {@link CsvConverters.ReadCsv} reads a Csv with headers correctly. */
@Test
public void testReadWithHeadersCsv() {
@@ -563,6 +609,44 @@ public void testStringToGenericRecord() {
pipeline.run();
}

+  /**
+   * Tests if {@link CsvConverters.StringToGenericRecordFn} creates a proper GenericRecord with an
+   * RFC4180 Csv record.
+   */
+  @Test
+  public void testStringToGenericRecordWithRFC4180Csv() {
+    Schema schema = SchemaUtils.getAvroSchema(TEST_RFC4180_AVRO_SCHEMA_PATH);
+
+    GenericRecord genericRecord = new GenericData.Record(schema);
+    genericRecord.put("id", "007");
+    genericRecord.put("states", "CA,AZ");
+    genericRecord.put("price", 26.23);
+
+    PCollection<GenericRecord> pCollection =
+        pipeline
+            .apply(
+                "ReadCsvFile",
+                CsvConverters.ReadCsv.newBuilder()
+                    .setHasHeaders(false)
+                    .setInputFileSpec(RFC4180_NO_HEADER_CSV_FILE_PATH)
+                    .setHeaderTag(CSV_HEADERS)
+                    .setLineTag(CSV_LINES)
+                    .setCsvFormat("Default")
+                    .setDelimiter(",")
+                    .setFileEncoding(CSV_FILE_ENCODING)
+                    .build())
+            .get(CSV_LINES)
+            .apply(
+                "ConvertStringToGenericRecord",
+                ParDo.of(
+                    new CsvConverters.StringToGenericRecordFn(TEST_RFC4180_AVRO_SCHEMA_PATH, ",")))
+            .setCoder(AvroCoder.of(GenericRecord.class, schema));
+
+    PAssert.that(pCollection).containsInAnyOrder(genericRecord);
+
+    pipeline.run();
+  }

/**
* Tests {@link CsvConverters.StringToGenericRecordFn} throws an exception if incorrect header
* information is provided. (for example, if a Csv file containing headers is passed and
rfc4180_no_header.csv (new file)
@@ -0,0 +1 @@
+007,"CA,AZ",26.23
testRFC4180AvroSchema.json (new file)
@@ -0,0 +1,19 @@
+{
+  "type" : "record",
+  "name" : "test_file",
+  "namespace" : "com.test.avro",
+  "fields" : [
+    {
+      "name": "id",
+      "type": "string"
+    },
+    {
+      "name": "states",
+      "type": "string"
+    },
+    {
+      "name": "price",
+      "type": "double"
+    }
+  ]
+}
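Putting the two new fixtures together: a self-contained sketch of the conversion the new unit test exercises, parsing the fixture row per RFC 4180 and populating a GenericRecord against the schema above. This only approximates what StringToGenericRecordFn does; it is not the template code.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class Rfc4180ToAvroSketch {
  // The schema fields mirror testRFC4180AvroSchema.json: id, states, price.
  private static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"test_file\",\"namespace\":\"com.test.avro\","
          + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"states\",\"type\":\"string\"},"
          + "{\"name\":\"price\",\"type\":\"double\"}]}";

  public static void main(String[] args) throws IOException {
    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
    try (CSVParser parser = CSVParser.parse("007,\"CA,AZ\",26.23", CSVFormat.DEFAULT)) {
      CSVRecord row = parser.getRecords().get(0);
      GenericRecord out = new GenericData.Record(schema);
      out.put("id", row.get(0));                        // "007"
      out.put("states", row.get(1));                    // "CA,AZ" stays one field
      out.put("price", Double.parseDouble(row.get(2))); // double per the schema
      System.out.println(out);
    }
  }
}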