Strict json writer support #258

Open · wants to merge 2 commits into base: master
2 changes: 1 addition & 1 deletion build.sbt
@@ -13,7 +13,7 @@ val scalatestVersion = "2.2.6"
sparkVersion := sys.props.getOrElse("spark.version", "2.3.1")

val hadoopVersion = sys.props.getOrElse("hadoop.version", "3.1.0")
val hiveVersion = sys.props.getOrElse("hive.version", "3.0.0")
val hiveVersion = sys.props.getOrElse("hive.version", "3.1.0")
val log4j2Version = sys.props.getOrElse("log4j2.version", "2.4.1")
val tezVersion = sys.props.getOrElse("tez.version", "0.9.1")
val thriftVersion = sys.props.getOrElse("thrift.version", "0.9.3")
3 changes: 2 additions & 1 deletion src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataSource.java
100644 → 100755
@@ -26,6 +26,7 @@ public Optional<DataSourceWriter> createWriter(final String jobId, final StructT

private HiveStreamingDataSourceWriter createDataSourceWriter(final String id, final StructType schema,
final DataSourceOptions options) {
WriterType writerType = WriterType.parse(options.get("writer").orElse("delimited"));
String dbName;
if(options.get("default.db").isPresent()) {
dbName = options.get("default.db").get();
@@ -42,7 +43,7 @@ private HiveStreamingDataSourceWriter createDataSourceWriter(final String id, fi
LOG.info("OPTIONS - database: {} table: {} partition: {} commitIntervalRows: {} metastoreUri: {} " +
"metastoreKrbPrincipal: {}", dbName, tableName, partition, commitInterval,
metastoreUri, metastoreKrbPrincipal);
return new HiveStreamingDataSourceWriter(id, schema, commitInterval, dbName, tableName,
return new HiveStreamingDataSourceWriter(id, schema, commitInterval, writerType, dbName, tableName,
partitionValues, metastoreUri, metastoreKrbPrincipal);
}

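This hunk wires the new `writer` option into the batch data source. A minimal usage sketch in Scala follows; only the `writer` and `default.db` option keys appear verbatim in this diff, while the format class name and the remaining option keys (`table`, `metastoreUri`) are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical batch write through the Hive streaming data source with the new JSON writer.
// "writer" and "default.db" come from this PR; the format name and other option keys are assumed.
val spark = SparkSession.builder().appName("json-writer-example").getOrCreate()
val df = spark.createDataFrame(Seq((1, "alice"), (2, "bob"))).toDF("id", "name")

df.write
  .format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")  // assumed format name
  .option("writer", "json")             // new option: "delimited" (default) or "json"
  .option("default.db", "default")
  .option("table", "sink_table")        // assumed option key
  .option("metastoreUri", "thrift://metastore-host:9083")  // assumed option key
  .save()
```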
10 changes: 6 additions & 4 deletions src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataSourceWriter.java
100644 → 100755
@@ -15,18 +15,20 @@ public class HiveStreamingDataSourceWriter implements SupportsWriteInternalRow {

private String jobId;
private StructType schema;
private WriterType writerType;
private String db;
private String table;
private List<String> partition;
private long commitIntervalRows;
private String metastoreUri;
private String metastoreKrbPrincipal;

public HiveStreamingDataSourceWriter(String jobId, StructType schema, long commitIntervalRows, String db,
String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
public HiveStreamingDataSourceWriter(String jobId, StructType schema, long commitIntervalRows, WriterType writerType,
String db, String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
this.jobId = jobId;
this.schema = schema;
this.commitIntervalRows = commitIntervalRows;
this.writerType = writerType;
this.db = db;
this.table = table;
this.partition = partition;
@@ -36,8 +38,8 @@ public HiveStreamingDataSourceWriter(String jobId, StructType schema, long commi

@Override
public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
return new HiveStreamingDataWriterFactory(jobId, schema, commitIntervalRows, db, table, partition, metastoreUri,
metastoreKrbPrincipal);
return new HiveStreamingDataWriterFactory(jobId, schema, commitIntervalRows, writerType, db, table, partition,
metastoreUri, metastoreKrbPrincipal);
}

@Override
58 changes: 47 additions & 11 deletions src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataWriter.java
100644 → 100755
@@ -1,16 +1,17 @@
package com.hortonworks.spark.sql.hive.llap;

import java.io.ByteArrayInputStream;
import java.io.CharArrayWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.StrictDelimitedInputWriter;
import org.apache.hive.streaming.*;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.json.JacksonGenerator;
import org.apache.spark.sql.catalyst.json.JacksonGeneratorHelper;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
@@ -22,10 +23,13 @@
public class HiveStreamingDataWriter implements DataWriter<InternalRow> {
private static Logger LOG = LoggerFactory.getLogger(HiveStreamingDataWriter.class);

private final Charset UTF8_CHARSET = Charset.forName("UTF-8");

private String jobId;
private StructType schema;
private int partitionId;
private int attemptNumber;
private WriterType writerType;
private String db;
private String table;
private List<String> partition;
@@ -34,14 +38,18 @@ public class HiveStreamingDataWriter implements DataWriter<InternalRow> {
private long commitAfterNRows;
private long rowsWritten = 0;
private String metastoreKrbPrincipal;
private JacksonGenerator jacksonGenerator;
private CharArrayWriter charArrayWriter;


public HiveStreamingDataWriter(String jobId, StructType schema, long commitAfterNRows, int partitionId, int
attemptNumber, String db, String table, List<String> partition, final String metastoreUri,
attemptNumber, WriterType writerType, String db, String table, List<String> partition, final String metastoreUri,
final String metastoreKrbPrincipal) {
this.jobId = jobId;
this.schema = schema;
this.partitionId = partitionId;
this.attemptNumber = attemptNumber;
this.writerType = writerType;
this.db = db;
this.table = table;
this.partition = partition;
@@ -56,8 +64,13 @@ public HiveStreamingDataWriter(String jobId, StructType schema, long commitAfter
}

private void createStreamingConnection() throws StreamingException {
final StrictDelimitedInputWriter strictDelimitedInputWriter = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',').build();
AbstractRecordWriter inputWriter;
if (writerType == WriterType.JSON) {
inputWriter = StrictJsonWriter.newBuilder().build();
} else {
inputWriter = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',').build();
}
HiveConf hiveConf = new HiveConf();
hiveConf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri);
// isolated classloader and shadeprefix are required for reflective instantiation of outputformat class when
@@ -73,12 +86,17 @@ private void createStreamingConnection() throws StreamingException {
hiveConf.set(MetastoreConf.ConfVars.KERBEROS_PRINCIPAL.getHiveName(), metastoreKrbPrincipal);
}

if (writerType == WriterType.JSON) {
charArrayWriter = new CharArrayWriter();
jacksonGenerator = JacksonGeneratorHelper.createJacksonGenerator(schema, charArrayWriter, null);
}

LOG.info("Creating hive streaming connection..");
streamingConnection = HiveStreamingConnection.newBuilder()
.withDatabase(db)
.withTable(table)
.withStaticPartitionValues(partition)
.withRecordWriter(strictDelimitedInputWriter)
.withRecordWriter(inputWriter)
.withHiveConf(hiveConf)
.withAgentInfo(jobId + "(" + partitionId + ")")
.connect();
@@ -88,10 +106,20 @@

@Override
public void write(final InternalRow record) throws IOException {
String delimitedRow = Joiner.on(",").useForNull("")
.join(scala.collection.JavaConversions.seqAsJavaList(record.toSeq(schema)));
ByteArrayInputStream bais;
if (writerType == WriterType.JSON) {
jacksonGenerator.write(record);
jacksonGenerator.flush();
String jsonRow = charArrayWriter.toString();
charArrayWriter.reset();
bais = new ByteArrayInputStream(jsonRow.getBytes(UTF8_CHARSET));
} else {
String delimitedRow = Joiner.on(",").useForNull("")
.join(scala.collection.JavaConversions.seqAsJavaList(record.toSeq(schema)));
bais = new ByteArrayInputStream(delimitedRow.getBytes(UTF8_CHARSET));
}
try {
streamingConnection.write(delimitedRow.getBytes(Charset.forName("UTF-8")));
streamingConnection.write(bais);
rowsWritten++;
if (rowsWritten > 0 && commitAfterNRows > 0 && (rowsWritten % commitAfterNRows == 0)) {
LOG.info("Committing transaction after rows: {}", rowsWritten);
@@ -113,6 +141,10 @@ public WriterCommitMessage commit() throws IOException {
String msg = "Committed jobId: " + jobId + " partitionId: " + partitionId + " attemptNumber: " + attemptNumber +
" connectionStats: " + streamingConnection.getConnectionStats();
streamingConnection.close();
if (writerType == WriterType.JSON) {
jacksonGenerator.close();
charArrayWriter.close();
}
LOG.info("Closing streaming connection on commit. Msg: {} rowsWritten: {}", rowsWritten);
return new SimpleWriterCommitMessage(msg);
}
@@ -128,6 +160,10 @@ public void abort() throws IOException {
String msg = "Aborted jobId: " + jobId + " partitionId: " + partitionId + " attemptNumber: " + attemptNumber +
" connectionStats: " + streamingConnection.getConnectionStats();
streamingConnection.close();
if (writerType == WriterType.JSON) {
jacksonGenerator.close();
charArrayWriter.close();
}
LOG.info("Closing streaming connection on abort. Msg: {} rowsWritten: {}", msg, rowsWritten);
}
}
8 changes: 5 additions & 3 deletions src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataWriterFactory.java
100644 → 100755
@@ -14,16 +14,18 @@ public class HiveStreamingDataWriterFactory implements DataWriterFactory<Interna
private String jobId;
private StructType schema;
private long commitIntervalRows;
private WriterType writerType;
private String db;
private String table;
private List<String> partition;
private String metastoreUri;
private String metastoreKrbPrincipal;

public HiveStreamingDataWriterFactory(String jobId, StructType schema, long commitIntervalRows, String db,
String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
public HiveStreamingDataWriterFactory(String jobId, StructType schema, long commitIntervalRows, WriterType writerType,
String db, String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
this.jobId = jobId;
this.schema = schema;
this.writerType = writerType;
this.db = db;
this.table = table;
this.partition = partition;
@@ -38,7 +40,7 @@ public DataWriter<InternalRow> createDataWriter(int partitionId, int attemptNumb
ClassLoader isolatedClassloader = HiveIsolatedClassLoader.isolatedClassLoader();
try {
Thread.currentThread().setContextClassLoader(isolatedClassloader);
return new HiveStreamingDataWriter(jobId, schema, commitIntervalRows, partitionId, attemptNumber, db,
return new HiveStreamingDataWriter(jobId, schema, commitIntervalRows, partitionId, attemptNumber, writerType, db,
table, partition, metastoreUri, metastoreKrbPrincipal);
} finally {
Thread.currentThread().setContextClassLoader(restoredClassloader);
@@ -67,7 +67,7 @@ protected RecordReader<?, ArrowWrapperWritable> getRecordReader(LlapInputSplit s
attemptId,
childAllocatorReservation,
arrowAllocatorMax);
LlapBaseInputFormat input = new LlapBaseInputFormat(true, allocator);
LlapBaseInputFormat input = new LlapBaseInputFormat(true, arrowAllocatorMax);
return input.getRecordReader(split, conf, null);
}

18 changes: 18 additions & 0 deletions src/main/java/com/hortonworks/spark/sql/hive/llap/WriterType.java
@@ -0,0 +1,18 @@
package com.hortonworks.spark.sql.hive.llap;

public enum WriterType {
DELIMITED,
JSON;

public static WriterType parse(String writerTypeName){
WriterType type;
if(writerTypeName.equals("delimited")) {
type = DELIMITED;
} else if(writerTypeName.equals("json")) {
type = JSON;
} else {
throw new IllegalArgumentException("Use either delimited or json");
}
return type;
}
}
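For reference, the mapping from the `writer` option value to this enum, as implemented by `parse` above, looks like the following (Scala REPL-style sketch; note the comparison is case-sensitive):

```scala
import com.hortonworks.spark.sql.hive.llap.WriterType

WriterType.parse("delimited")  // WriterType.DELIMITED, also the default when "writer" is not set
WriterType.parse("json")       // WriterType.JSON
WriterType.parse("avro")       // throws IllegalArgumentException("Use either delimited or json")
```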
@@ -4,6 +4,7 @@
import java.util.List;

import com.hortonworks.spark.sql.hive.llap.HiveWarehouseSession;
import com.hortonworks.spark.sql.hive.llap.WriterType;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
@@ -25,6 +26,7 @@ public StreamWriter createStreamWriter(final String queryId, final StructType sc

private HiveStreamingDataSourceWriter createDataSourceWriter(final String id, final StructType schema,
final DataSourceOptions options) {
WriterType writerType = WriterType.parse(options.get("writer").orElse("delimited"));
String dbName = null;
if(options.get("default.db").isPresent()) {
dbName = options.get("default.db").get();
@@ -38,7 +40,7 @@ private HiveStreamingDataSourceWriter createDataSourceWriter(final String id, fi
String metastoreKerberosPrincipal = options.get("metastoreKrbPrincipal").orElse(null);
LOG.info("OPTIONS - database: {} table: {} partition: {} metastoreUri: {} metastoreKerberosPrincipal: {}",
dbName, tableName, partition, metastoreUri, metastoreKerberosPrincipal);
return new HiveStreamingDataSourceWriter(id, schema, dbName, tableName,
return new HiveStreamingDataSourceWriter(id, schema, writerType, dbName, tableName,
partitionValues, metastoreUri, metastoreKerberosPrincipal);
}

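The streaming sink parses the same `writer` option. A hedged structured-streaming sketch follows; the streaming format class name, the `table`/`metastoreUri` option keys, and the checkpoint path are assumptions, only `writer` and `default.db` appear verbatim in this file.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical streaming write; the rate source is used only to keep the example self-contained.
val spark = SparkSession.builder().appName("json-stream-sink").getOrCreate()
val rateDf = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

val query = rateDf.selectExpr("value AS id", "CAST(timestamp AS STRING) AS ts")
  .writeStream
  .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource") // assumed format name
  .option("writer", "json")
  .option("default.db", "default")
  .option("table", "sink_table")                           // assumed option key
  .option("metastoreUri", "thrift://metastore-host:9083")  // assumed option key
  .option("checkpointLocation", "/tmp/checkpoints/json-sink")
  .start()
```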
@@ -2,6 +2,7 @@

import java.util.List;

import com.hortonworks.spark.sql.hive.llap.WriterType;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
@@ -18,16 +19,18 @@ public class HiveStreamingDataSourceWriter implements SupportsWriteInternalRow,

private String jobId;
private StructType schema;
private WriterType writerType;
private String db;
private String table;
private List<String> partition;
private String metastoreUri;
private String metastoreKerberosPrincipal;

public HiveStreamingDataSourceWriter(String jobId, StructType schema, String db,
public HiveStreamingDataSourceWriter(String jobId, StructType schema, WriterType writerType, String db,
String table, List<String> partition, final String metastoreUri, final String metastoreKerberosPrincipal) {
this.jobId = jobId;
this.schema = schema;
this.writerType = writerType;
this.db = db;
this.table = table;
this.partition = partition;
@@ -38,8 +41,8 @@ public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
@Override
public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
// for the streaming case, commit transaction happens on task commit() (at-least-once), so interval is set to -1
return new HiveStreamingDataWriterFactory(jobId, schema, -1, db, table, partition, metastoreUri,
metastoreKerberosPrincipal);
return new HiveStreamingDataWriterFactory(jobId, schema, -1, writerType, db, table, partition,
metastoreUri, metastoreKerberosPrincipal);
}

@Override
@@ -0,0 +1,22 @@
/*

*/

package org.apache.spark.sql.catalyst.json

import java.io.Writer

import org.apache.spark.sql.types._

object JacksonGeneratorHelper {

def createJacksonGenerator(
dataType: DataType,
writer: Writer,
options: JSONOptions = new JSONOptions( Map.empty, "UTC")): JacksonGenerator = {

new JacksonGenerator(dataType, writer, options)

}

}
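Presumably this helper exists because `JacksonGenerator` is package-private (`private[sql]`) in Spark, so it cannot be constructed from the connector's own package in Scala; placing a public factory under `org.apache.spark.sql.catalyst.json` sidesteps that. That reading is an inference, the PR does not state it. A minimal round-trip sketch of how the writer's JSON path uses the helper, with an illustrative schema and row:

```scala
import java.io.CharArrayWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JacksonGeneratorHelper
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Illustrative schema and row; mirrors the write() path in HiveStreamingDataWriter.
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val buffer = new CharArrayWriter()
val generator = JacksonGeneratorHelper.createJacksonGenerator(schema, buffer)

val row = InternalRow(1, UTF8String.fromString("alice"))
generator.write(row)        // serialize one InternalRow using the schema
generator.flush()
val json = buffer.toString  // {"id":1,"name":"alice"}
buffer.reset()
```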