Skip to content

Commit a88a0f6

Browse files
committed
feat(filters): process the same headers but different order of columns for CSV files
Support CSV files that share the same format but list their header columns in a different order. When a record's header line differs from the cached one (while having the same length), the schema is re-inferred so that each value is mapped to the correct column name.
1 parent 2b45914 commit a88a0f6

File tree

2 files changed

+112
-0
lines changed

2 files changed

+112
-0
lines changed

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public abstract class AbstractDelimitedRowFilter<T extends AbstractRecordFilter<
4242

4343
private StructSchema schema;
4444

45+
private String cachedHeaders;
46+
4547
private final Map<Integer, TypedField> columnsTypesByIndex = new HashMap<>();
4648

4749
/**
@@ -103,6 +105,11 @@ public RecordsIterable<TypedStruct> apply(final FilterContext context,
103105
if (schema == null || isSchemaDynamic()) {
104106
inferSchemaFromRecord(record, columnValues.length);
105107
}
108+
109+
if (schema != null && configs.extractColumnName() != null && shouldInferSchema(record)) {
110+
inferSchemaFromRecord(record, columnValues.length);
111+
}
112+
106113
final TypedStruct struct = buildStructForFields(columnValues);
107114
return RecordsIterable.of(struct);
108115
}
@@ -115,12 +122,23 @@ public boolean isSchemaDynamic() {
115122
configs.isAutoGenerateColumnNames();
116123
}
117124

125+
private boolean shouldInferSchema(TypedStruct record) {
126+
if (cachedHeaders == null) {
127+
return false;
128+
}
129+
final String fieldName = configs.extractColumnName();
130+
String field = record.first(fieldName).getString();
131+
return cachedHeaders.length() == field.length() && !cachedHeaders.equals(field);
132+
}
133+
118134
private void inferSchemaFromRecord(final TypedStruct record, int numColumns) {
119135
schema = Schema.struct();
120136

121137
if (configs.extractColumnName() != null) {
122138
final String fieldName = configs.extractColumnName();
123139
String field = record.first(fieldName).getString();
140+
cachedHeaders = field;
141+
124142
if (field == null) {
125143
throw new FilterException(
126144
"Cannot find field for name '" + fieldName + "' to determine columns names"

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,100 @@ public void setUp() {
3939
configs.put(CSVFilter.PARSER_SEPARATOR_CONFIG, ";");
4040
}
4141

42+
@Test
43+
public void should_extract_column_names_from_diff_order_headers() {
44+
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
45+
filter.configure(configs, alias -> null);
46+
47+
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
48+
Assert.assertNotNull(output);
49+
Assert.assertEquals(1, output.size());
50+
51+
final TypedStruct record = output.iterator().next();
52+
Assert.assertEquals("value1", record.getString("col1"));
53+
Assert.assertEquals("2", record.getString("col2"));
54+
Assert.assertEquals("true", record.getString("col3"));
55+
56+
final TypedStruct input1 = TypedStruct.create()
57+
.put("message", "false;3;value2")
58+
.put("headers", Arrays.asList("col3;col2;col1"));
59+
RecordsIterable<TypedStruct> output1 = filter.apply(null, input1, false);
60+
Assert.assertNotNull(output1);
61+
Assert.assertEquals(1, output1.size());
62+
63+
final TypedStruct record1 = output1.iterator().next();
64+
Assert.assertEquals("value2", record1.getString("col1"));
65+
Assert.assertEquals("3", record1.getString("col2"));
66+
Assert.assertEquals("false", record1.getString("col3"));
67+
68+
final TypedStruct input2 = TypedStruct.create()
69+
.put("message", "4;false;value3")
70+
.put("headers", Arrays.asList("col2;col3;col1"));
71+
72+
RecordsIterable<TypedStruct> output2 = filter.apply(null, input2, false);
73+
Assert.assertNotNull(output2);
74+
Assert.assertEquals(1, output2.size());
75+
76+
final TypedStruct record2 = output2.iterator().next();
77+
Assert.assertEquals("value3", record2.getString("col1"));
78+
Assert.assertEquals("4", record2.getString("col2"));
79+
Assert.assertEquals("false", record2.getString("col3"));
80+
}
81+
82+
@Test
83+
public void should_extract_column_names_from_diff_order_headers_and_null_value() {
84+
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
85+
filter.configure(configs, alias -> null);
86+
87+
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
88+
Assert.assertNotNull(output);
89+
Assert.assertEquals(1, output.size());
90+
91+
final TypedStruct record = output.iterator().next();
92+
Assert.assertEquals("value1", record.getString("col1"));
93+
Assert.assertEquals("2", record.getString("col2"));
94+
Assert.assertEquals("true", record.getString("col3"));
95+
96+
final TypedStruct input1 = TypedStruct.create()
97+
.put("message", "false;;")
98+
.put("headers", Arrays.asList("col3;col2;col1"));
99+
RecordsIterable<TypedStruct> output1 = filter.apply(null, input1, false);
100+
Assert.assertNotNull(output1);
101+
Assert.assertEquals(1, output1.size());
102+
103+
final TypedStruct record1 = output1.iterator().next();
104+
Assert.assertNull(record1.getString("col1"));
105+
Assert.assertNull(record1.getString("col2"));
106+
Assert.assertEquals("false", record1.getString("col3"));
107+
}
108+
109+
@Test
110+
public void should_extract_column_names_from_diff_order_headers_and_diff_size() {
111+
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
112+
filter.configure(configs, alias -> null);
113+
114+
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
115+
Assert.assertNotNull(output);
116+
Assert.assertEquals(1, output.size());
117+
118+
final TypedStruct record = output.iterator().next();
119+
Assert.assertEquals("value1", record.getString("col1"));
120+
Assert.assertEquals("2", record.getString("col2"));
121+
Assert.assertEquals("true", record.getString("col3"));
122+
123+
final TypedStruct input1 = TypedStruct.create()
124+
.put("message", "false;4;")
125+
.put("headers", Arrays.asList("col3;col2"));
126+
RecordsIterable<TypedStruct> output1 = filter.apply(null, input1, false);
127+
Assert.assertNotNull(output1);
128+
Assert.assertEquals(1, output1.size());
129+
130+
final TypedStruct record1 = output1.iterator().next();
131+
Assert.assertEquals("false", record1.getString("col1"));
132+
Assert.assertEquals("4", record1.getString("col2"));
133+
Assert.assertNull(record1.getString("col3"));
134+
}
135+
42136
@Test
43137
public void should_auto_generate_schema_given_no_schema_field() {
44138
filter.configure(configs, alias -> null);

0 commit comments

Comments
 (0)