Skip to content

Commit ec3da9d

Browse files
Kawronjavabrett
andauthored
Add support for parsing array in HEC formatted messages (#449)
* Initial test set-up, added exception to logging. * Issue #444: Changed all supplementary fields to Map<String,Object> to support multi-value fields/arrays. Fixed #444. * Added index and event values to test data. --------- Co-authored-by: Brett Randall <[email protected]>
1 parent 827376a commit ec3da9d

File tree

10 files changed

+70
-26
lines changed

10 files changed

+70
-26
lines changed

src/main/java/com/splunk/hecclient/Event.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,15 +240,15 @@ public final Object getTied() {
240240
return tied;
241241
}
242242

243-
public Event addFields(final Map<String, String> fields) {
243+
public Event addFields(final Map<String, Object> fields) {
244244
return this;
245245
}
246246

247-
public Event setFields(final Map<String, String> fields) {
247+
public Event setFields(final Map<String, Object> fields) {
248248
return this;
249249
}
250250

251-
public Map<String, String> getFields() {
251+
public Map<String, Object> getFields() {
252252
return null;
253253
}
254254

src/main/java/com/splunk/hecclient/EventBatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public abstract class EventBatch {
4747
public abstract void add(Event event);
4848
public abstract EventBatch createFromThis();
4949

50-
public final void addExtraFields(final Map<String, String> fields) {
50+
public final void addExtraFields(final Map<String, Object> fields) {
5151
// recalculate the batch length since we inject more meta data to each event
5252
int newLength = 0;
5353
for (final Event event: events) {

src/main/java/com/splunk/hecclient/HecChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
final class HecChannel {
2424
private String id;
25-
private Map<String, String> chField;
25+
private Map<String, Object> chField;
2626
private IndexerInf indexer;
2727
private boolean isAvailable;
2828

src/main/java/com/splunk/hecclient/JsonEvent.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
*/
3434
@JsonInclude(JsonInclude.Include.NON_NULL)
3535
public final class JsonEvent extends Event {
36-
private Map<String, String> fields;
36+
private Map<String, Object> fields;
3737

3838
/**
3939
* Creates a new json event.
@@ -67,7 +67,7 @@ public JsonEvent(Object data, Object tied) {
6767
* @since 1.0
6868
*/
6969
@Override
70-
public JsonEvent addFields(final Map<String, String> extraFields) {
70+
public JsonEvent addFields(final Map<String, Object> extraFields) {
7171
if (extraFields == null || extraFields.isEmpty()) {
7272
return this;
7373
}
@@ -93,7 +93,7 @@ public JsonEvent addFields(final Map<String, String> extraFields) {
9393
* @since 1.0
9494
*/
9595
@Override
96-
public JsonEvent setFields(final Map<String, String> extraFields) {
96+
public JsonEvent setFields(final Map<String, Object> extraFields) {
9797
fields = extraFields;
9898
invalidate();
9999
return this;
@@ -108,7 +108,7 @@ public JsonEvent setFields(final Map<String, String> extraFields) {
108108
* @since 1.0
109109
*/
110110
@Override
111-
public Map<String, String> getFields() {
111+
public Map<String, Object> getFields() {
112112
return fields;
113113
}
114114

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
245245

246246
final String lineBreaker;
247247
final boolean useRecordTimestamp;
248-
final Map<String, String> enrichments;
248+
final Map<String, Object> enrichments;
249249
final boolean trackData;
250250

251251
final boolean hasTrustStorePath;
@@ -461,13 +461,13 @@ private static String[] split(String data, String sep) {
461461
return null;
462462
}
463463

464-
private static Map<String, String> parseEnrichments(String enrichment) {
464+
private static Map<String, Object> parseEnrichments(String enrichment) {
465465
String[] kvs = split(enrichment, ",");
466466
if (kvs == null) {
467467
return null;
468468
}
469469

470-
Map<String, String> enrichmentKvs = new HashMap<>();
470+
Map<String, Object> enrichmentKvs = new HashMap<>();
471471
for (final String kv : kvs) {
472472
String[] kvPairs = split(kv, "=");
473473
if (kvPairs.length != 2) {

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ private Event createHecEventFrom(final SinkRecord record) {
404404
event.setTied(record);
405405
event.addFields(connectorConfig.enrichments);
406406
} catch(Exception e) {
407-
log.error("event does not follow correct HEC pre-formatted format: {}", record.value().toString());
407+
log.error("event does not follow correct HEC pre-formatted format: {}", record.value().toString(), e);
408408
event = createHECEventNonFormatted(record);
409409
}
410410
} else {
@@ -416,7 +416,7 @@ private Event createHecEventFrom(final SinkRecord record) {
416416
}
417417

418418
if (connectorConfig.trackData) {
419-
Map<String, String> trackMetas = new HashMap<>();
419+
Map<String, Object> trackMetas = new HashMap<>();
420420
trackMetas.put("kafka_offset", String.valueOf(record.kafkaOffset()));
421421
trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp()));
422422
trackMetas.put("kafka_topic", record.topic());
@@ -459,7 +459,7 @@ private Event addHeaders(Event event, SinkRecord record) {
459459
// "custom_header_1,custom_header_2,custom_header_3"
460460
if (!connectorConfig.headerCustom.isEmpty()) {
461461
String[] customHeaders = connectorConfig.headerCustom.split(",\\s?");
462-
Map<String, String> headerMap = new HashMap<>();
462+
Map<String, Object> headerMap = new HashMap<>();
463463
for (String header : customHeaders) {
464464
Header customHeader = headers.lastWithName(header);
465465
if (customHeader != null) {

src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void setterGetter() {
113113
Assert.assertEquals(1, events.size());
114114

115115
// Add extra fields
116-
Map<String, String> fields = new HashMap<>();
116+
Map<String, Object> fields = new HashMap<>();
117117
fields.put("hello", "world");
118118
batch.addExtraFields(fields);
119119

src/test/java/com/splunk/hecclient/JsonEventTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,14 @@ public void addFields() {
6565
Assert.assertNull(event.getFields());
6666

6767
// empty extra fields
68-
Map<String, String> fields = new HashMap<>();
68+
Map<String, Object> fields = new HashMap<>();
6969
event.addFields(fields);
7070
Assert.assertNull(event.getFields());
7171

7272
// one item
7373
fields.put("ni", "hao");
7474
event.addFields(fields);
75-
Map<String, String> fieldsGot = event.getFields();
75+
Map<String, Object> fieldsGot = event.getFields();
7676
Assert.assertNotNull(fieldsGot);
7777
Assert.assertEquals(false, fieldsGot.isEmpty());
7878
Assert.assertEquals(1, fieldsGot.size());
@@ -180,15 +180,15 @@ public void getterSetter() {
180180
event.setTied("hao");
181181
Assert.assertEquals("hao", event.getTied());
182182

183-
Map<String, String> fields = new HashMap<>();
183+
Map<String, Object> fields = new HashMap<>();
184184
fields.put("hello", "world");
185185
event.setFields(fields);
186186
Assert.assertEquals(fields, event.getFields());
187187

188-
Map<String, String> moreFields = new HashMap<>();
188+
Map<String, Object> moreFields = new HashMap<>();
189189
moreFields.put("ni", "hao");
190190
event.addFields(moreFields);
191-
Map<String, String> got = event.getFields();
191+
Map<String, Object> got = event.getFields();
192192
Assert.assertNotNull(got);
193193
Assert.assertEquals(2, got.size());
194194
Assert.assertEquals("world", got.get("hello"));
@@ -219,7 +219,7 @@ private void doSerialize(Object data, SerialAndDeserial sad) {
219219
String tied = "tied";
220220
Event event = new JsonEvent(data, tied);
221221

222-
Map<String, String> fields = new HashMap<>();
222+
Map<String, Object> fields = new HashMap<>();
223223
fields.put("ni", "hao");
224224
event.addFields(fields);
225225
event.setHost("localhost");
@@ -239,7 +239,7 @@ private void doSerialize(Object data, SerialAndDeserial sad) {
239239
Assert.assertEquals("test-sourcetype", deserialized.getSourcetype());
240240
Assert.assertEquals(event.getTime(), deserialized.getTime());
241241

242-
Map<String, String> fieldsGot = deserialized.getFields();
242+
Map<String, Object> fieldsGot = deserialized.getFields();
243243
Assert.assertEquals("hao", fieldsGot.get("ni"));
244244
}
245245
}

src/test/java/com/splunk/hecclient/RawEventTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ private void writeTo(final String lineBreaker) {
191191
@Test
192192
public void getterSetter() {
193193
Event event = new RawEvent("ni", null);
194-
Map<String, String> m = new HashMap<String, String>();
194+
Map<String, Object> m = new HashMap<>();
195195
m.put("hello", "world");
196196
event.setFields(m);
197197
Assert.assertNull(event.getFields()); // we ignore extra fields for raw event

src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,12 @@
2121
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2222
import org.apache.kafka.common.TopicPartition;
2323
import org.apache.kafka.common.config.ConfigException;
24-
import org.apache.kafka.common.header.Header;
2524
import org.apache.kafka.common.record.TimestampType;
2625
import org.apache.kafka.connect.errors.RetriableException;
2726
import org.apache.kafka.connect.sink.SinkRecord;
2827
import org.junit.Assert;
2928
import org.junit.Test;
3029

31-
import java.text.ParseException;
3230
import java.text.SimpleDateFormat;
3331
import java.time.Instant;
3432
import java.util.*;
@@ -267,6 +265,52 @@ public void putWithRawAndAck() {
267265
putWithSuccess(true, true);
268266
}
269267

268+
@Test
269+
public void checkFormattedEvent() {
270+
271+
SplunkSinkTask task = new SplunkSinkTask();
272+
UnitUtil uu = new UnitUtil(0);
273+
Map<String, String> config = uu.createTaskConfig();
274+
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false));
275+
config.put(SplunkSinkConnectorConfig.HEC_EVENT_FORMATTED_CONF, String.valueOf(true));
276+
277+
Collection<SinkRecord> record = createSinkRecords(
278+
1, "{" +
279+
"\"index\":\"main\"," +
280+
"\"event\":\"Hello, world!\"," +
281+
"\"host\":\"host-01\"," +
282+
"\"source\":\"bu\"," +
283+
"\"fields\":{\"foo\":\"bar\",\"CLASS\":\"class1\",\"cust_id\":[\"000013934\",\"000013935\"]}}"
284+
);
285+
286+
HecMock hec = new HecMock(task);
287+
hec.setSendReturnResult(HecMock.success);
288+
task.setHec(hec);
289+
task.start(config);
290+
task.put(record);
291+
292+
List<EventBatch> batches = hec.getBatches();
293+
for (Iterator<EventBatch> iter = batches.listIterator(); iter.hasNext();) {
294+
EventBatch batch = iter.next();
295+
List<Event> event_list = batch.getEvents();
296+
Iterator<Event> iterator = event_list.listIterator() ;
297+
Event event = iterator.next();
298+
299+
Assert.assertEquals("host-01", event.getHost());
300+
301+
Assert.assertEquals("bar", event.getFields().get("foo"));
302+
303+
Object custIdObject = event.getFields().get("cust_id");
304+
Assert.assertTrue(custIdObject instanceof List);
305+
@SuppressWarnings("unchecked")
306+
List<String> custIdList = (List<String>) custIdObject;
307+
final List<String> expectedCustIdList = Arrays.asList("000013934", "000013935");
308+
Assert.assertEquals(expectedCustIdList, custIdList);
309+
break;
310+
}
311+
task.stop();
312+
}
313+
270314
@Test
271315
public void checkExtractedTimestamp() {
272316

0 commit comments

Comments
 (0)