116 changes: 101 additions & 15 deletions docs/development/extensions-contrib/thrift.md
@@ -25,27 +25,114 @@ title: "Thrift"

To use this Apache Druid extension, [include](../../configuration/extensions.md#loading-extensions) `druid-thrift-extensions` in the extensions load list.

This extension enables Druid to ingest thrift compact data online (`ByteBuffer`) and offline (SequenceFile of type `<Writable, BytesWritable>` or LzoThriftBlock File).
This extension enables Druid to ingest Thrift-encoded data from streaming sources such as Kafka and Kinesis, as well as from Hadoop batch jobs reading SequenceFile or LzoThriftBlock files. The binary, compact, and JSON Thrift wire protocols are all supported, with optional Base64 encoding.

If you need a different version of Thrift, change the dependency in the extension's `pom.xml` and compile it yourself.

## Thrift input format

Thrift-encoded data for streaming ingestion (Kafka, Kinesis) can be ingested using the Thrift [input format](../../ingestion/data-formats.md#input-format). It supports `flattenSpec` for extracting fields from nested Thrift structs using JSONPath expressions.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | Must be `thrift` | yes |
| thriftClass | String | Fully qualified class name of the Thrift-generated `TBase` class to deserialize into. | yes |
| thriftJar | String | Path to a JAR file containing the Thrift class. If not provided, the class is looked up from the classpath. | no |
| flattenSpec | JSON Object | Specifies flattening of nested Thrift structs. See [Flattening nested data](../../ingestion/data-formats.md#flattenspec) for details. | no |
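
For example, a minimal `inputFormat` that loads the Thrift class from an external JAR rather than the classpath might look like the following sketch (the JAR path is a placeholder, and the class name reuses the `Book` example below):

```json
{
  "type": "thrift",
  "thriftClass": "com.example.druid.Book",
  "thriftJar": "/path/to/book.jar"
}
```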

### Example: Kafka ingestion

Consider the following Thrift schema definition:

```thrift
namespace java com.example.druid

struct Author {
  1: string firstName;
  2: string lastName;
}

struct Book {
  1: string date;
  2: double price;
  3: string title;
  4: Author author;
}
```

Compile it to produce `com.example.druid.Book` (and `com.example.druid.Author`) and make the resulting JAR available on the classpath of your Druid processes, or reference it via `thriftJar`.

The following Kafka supervisor spec ingests compact-encoded `Book` messages, using a `flattenSpec` to extract the nested `author.lastName` field:

```json
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "books",
      "timestampSpec": {
        "column": "date",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "title",
          "lastName"
        ]
      },
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE"
      }
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "topic": "books",
      "inputFormat": {
        "type": "thrift",
        "thriftClass": "com.example.druid.Book",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            {
              "type": "path",
              "name": "lastName",
              "expr": "$.author.lastName"
            }
          ]
        }
      },
      "taskCount": 1,
      "replicas": 1,
      "taskDuration": "PT1H"
    }
  }
}
```

## LZO Support

If you plan to read LZO-compressed Thrift files, you will need to download version 0.4.19 of the [hadoop-lzo JAR](https://mvnrepository.com/artifact/com.hadoop.gplcompression/hadoop-lzo/0.4.19) and place it in your `extensions/druid-thrift-extensions` directory.

## Thrift Parser

## Thrift parser (deprecated)

| Field | Type | Description | Required |
| ----------- | ----------- | ---------------------------------------- | -------- |
| type | String | This should say `thrift` | yes |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a JSON parseSpec. | yes |
| thriftJar | String | path of thrift jar, if not provided, it will try to find the thrift class in classpath. Thrift jar in batch ingestion should be uploaded to HDFS first and configure `jobProperties` with `"tmpjars":"/path/to/your/thrift.jar"` | no |
| thriftClass | String | classname of thrift | yes |
`ThriftInputRowParser` is the legacy parser-based approach to Thrift ingestion. It is deprecated in favor of `ThriftInputFormat` above and will be removed in a future release.

- Batch Ingestion example - `inputFormat` and `tmpjars` should be set.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | Must be `thrift` | yes |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a JSON parseSpec. | yes |
| thriftJar | String | Path to the Thrift JAR. If not provided, the class is looked up from the classpath. For Hadoop batch ingestion the JAR should be uploaded to HDFS first and `jobProperties` configured with `"tmpjars":"/path/to/your/thrift.jar"`. | no |
| thriftClass | String | Fully qualified class name of the Thrift-generated class. | yes |

This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inputSpec in ioConfig could be one of `"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"` and `com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat`. Be careful, when `LzoThriftBlockInputFormat` is used, thrift class must be provided twice.
Batch ingestion example using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` can be either `"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"` or `"com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat"`. When using `LzoThriftBlockInputFormat`, the Thrift class must be provided twice.
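
To make the two occurrences concrete, here is a hedged sketch of the relevant `ioConfig` and `tuningConfig` fragments for the LZO case. The paths are placeholders and the class name reuses the `Book` example above; once as the Hadoop `inputFormat`, and once via the `elephantbird.class.for.MultiInputFormat` job property:

```json
{
  "ioConfig": {
    "type": "hadoop",
    "inputSpec": {
      "type": "static",
      "inputFormat": "com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat",
      "paths": "/path/to/book.lzo"
    }
  },
  "tuningConfig": {
    "type": "hadoop",
    "jobProperties": {
      "tmpjars": "/path/to/your/thrift.jar",
      "elephantbird.class.for.MultiInputFormat": "com.example.druid.Book"
    }
  }
}
```

With `SequenceFileInputFormat`, only `tmpjars` is needed, as in the full example below.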

```json
{
@@ -60,7 +147,8 @@
"protocol": "compact",
"parseSpec": {
"format": "json",
...
"timestampSpec": {},
"dimensionsSpec": {}
}
},
"metricsSpec": [],
@@ -71,15 +159,13 @@
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
// "inputFormat": "com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat",
"paths": "/user/to/some/book.seq"
}
},
"tuningConfig": {
"type": "hadoop",
"jobProperties": {
"tmpjars":"/user/h_user_profile/du00/druid/test/book.jar",
// "elephantbird.class.for.MultiInputFormat" : "${YOUR_THRIFT_CLASS_NAME}"
"tmpjars": "/user/h_user_profile/du00/druid/test/book.jar"
}
}
}
19 changes: 18 additions & 1 deletion embedded-tests/pom.xml
@@ -137,6 +137,18 @@
      <version>${project.parent.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.druid.extensions.contrib</groupId>
      <artifactId>druid-thrift-extensions</artifactId>
      <version>${project.parent.version}</version>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <groupId>com.googlecode.json-simple</groupId>
          <artifactId>json-simple</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>joda-time</groupId>
@@ -591,7 +603,12 @@
      <version>5.5</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
      <version>0.13.0</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
@@ -42,6 +42,8 @@
import org.apache.druid.data.input.protobuf.ProtobufInputFormat;
import org.apache.druid.data.input.protobuf.ProtobufInputRowParser;
import org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder;
import org.apache.druid.data.input.thrift.ThriftExtensionsModule;
import org.apache.druid.data.input.thrift.ThriftInputFormat;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.DruidMetrics;
@@ -54,6 +56,8 @@
import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.StreamIngestResource;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.druid.testing.embedded.tools.ThriftEventSerializer;
import org.apache.druid.testing.embedded.tools.WikipediaThriftEvent;
import org.apache.druid.testing.tools.AvroEventSerializer;
import org.apache.druid.testing.tools.AvroSchemaRegistryEventSerializer;
import org.apache.druid.testing.tools.CsvEventSerializer;
@@ -81,6 +85,8 @@
* <li>CSV</li>
* <li>JSON</li>
* <li>Protobuf (with and without schema registry)</li>
* <li>Thrift</li>
* <li>TSV</li>
* </ul>
*
* This tests both InputFormat and Parser. Parser is deprecated for Streaming Ingestion,
@@ -138,6 +144,7 @@ public EmbeddedDruidCluster createCluster()
    coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced");
    cluster.addExtension(ProtobufExtensionsModule.class)
           .addExtension(AvroExtensionsModule.class)
           .addExtension(ThriftExtensionsModule.class)
           .useLatchableEmitter()
           .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
           .addResource(streamResource)
@@ -543,6 +550,28 @@ public void test_tsvDataFormat()
    stopSupervisor(supervisorSpec);
  }

  @Test
  @Timeout(30)
  public void test_thriftDataFormat()
  {
    streamResource.createTopicWithPartitions(dataSource, 3);
    EventSerializer serializer = new ThriftEventSerializer();
    int recordCount = generateStreamAndPublish(dataSource, serializer, false);

    ThriftInputFormat inputFormat = new ThriftInputFormat(
        new JSONPathSpec(true, null),
        null,
        WikipediaThriftEvent.class.getName()
    );

    SupervisorSpec supervisorSpec = createSupervisor(dataSource, dataSource, inputFormat);
    final String supervisorId = cluster.callApi().postSupervisor(supervisorSpec);
    Assertions.assertEquals(dataSource, supervisorId);

    waitForDataAndVerifyIngestedEvents(dataSource, recordCount);
    stopSupervisor(supervisorSpec);
  }

  private void waitForDataAndVerifyIngestedEvents(String dataSource, int expectedCount)
  {
    // Wait for the task to succeed
@@ -0,0 +1,111 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.testing.embedded.tools;

import org.apache.druid.java.util.common.Pair;
import org.apache.druid.testing.tools.EventSerializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;

import java.util.List;

/**
 * {@link EventSerializer} that serializes Wikipedia stream events as Thrift compact-encoded
 * {@link WikipediaThriftEvent} objects. All field values are converted to strings so that
 * integer values like {@code added}, {@code deleted}, and {@code delta} are serialized
 * uniformly without requiring a mixed-type Thrift struct.
 */
public class ThriftEventSerializer implements EventSerializer
{
  private static final TSerializer SERIALIZER = new TSerializer(new TCompactProtocol.Factory());

  @Override
  public byte[] serialize(List<Pair<String, Object>> event)
  {
    WikipediaThriftEvent wikiEvent = new WikipediaThriftEvent();
    for (Pair<String, Object> pair : event) {
      String value = pair.rhs == null ? null : String.valueOf(pair.rhs);
      switch (pair.lhs) {
        case "timestamp":
          wikiEvent.timestamp = value;
          break;
        case "page":
          wikiEvent.page = value;
          break;
        case "language":
          wikiEvent.language = value;
          break;
        case "user":
          wikiEvent.user = value;
          break;
        case "unpatrolled":
          wikiEvent.unpatrolled = value;
          break;
        case "newPage":
          wikiEvent.newPage = value;
          break;
        case "robot":
          wikiEvent.robot = value;
          break;
        case "anonymous":
          wikiEvent.anonymous = value;
          break;
        case "namespace":
          wikiEvent.namespace = value;
          break;
        case "continent":
          wikiEvent.continent = value;
          break;
        case "country":
          wikiEvent.country = value;
          break;
        case "region":
          wikiEvent.region = value;
          break;
        case "city":
          wikiEvent.city = value;
          break;
        case "added":
          wikiEvent.added = value;
          break;
        case "deleted":
          wikiEvent.deleted = value;
          break;
        case "delta":
          wikiEvent.delta = value;
          break;
        default:
          break;
      }
    }
    try {
      return SERIALIZER.serialize(wikiEvent);
    }
    catch (TException e) {
      throw new RuntimeException("Failed to serialize WikipediaThriftEvent", e);
    }
  }

  @Override
  public void close()
  {
  }
}