Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add log and metrics on TBase message dataconverter usage #986

Merged
merged 1 commit into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import com.uber.m3.tally.Scope;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
Expand All @@ -48,12 +49,22 @@ public final class JsonDataConverter implements DataConverter {
private static final String TYPE_FIELD_NAME = "type";
private static final String JSON_CONVERTER_TYPE = "JSON";
private static final String CLASS_NAME_FIELD_NAME = "className";
private static Scope metricsScope;
private final Gson gson;

public static DataConverter getInstance() {
return INSTANCE;
}

/**
* Used to set the metrics scope for this class.
*
* @param metricsScope metrics scope to set
*/
public static void setMetricsScope(Scope metricsScope) {
JsonDataConverter.metricsScope = metricsScope;
}

private JsonDataConverter() {
this((b) -> b);
}
Expand All @@ -68,7 +79,7 @@ public JsonDataConverter(Function<GsonBuilder, GsonBuilder> builderInterceptor)
new GsonBuilder()
.serializeNulls()
.registerTypeAdapterFactory(new ThrowableTypeAdapterFactory())
.registerTypeAdapterFactory(new TBaseTypeAdapterFactory())
.registerTypeAdapterFactory(new TBaseTypeAdapterFactory(metricsScope))
.registerTypeAdapterFactory(new TEnumTypeAdapterFactory());
GsonBuilder intercepted = builderInterceptor.apply(gsonBuilder);
gson = intercepted.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,30 @@
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import com.uber.m3.tally.Scope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TJSONProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Special handling of TBase message serialization and deserialization. This is to support for
* inline Thrift fields in Java class.
*/
public class TBaseTypeAdapterFactory implements TypeAdapterFactory {

private static final Logger logger = LoggerFactory.getLogger(TBaseTypeAdapterFactory.class);
private final Scope metricsScope;

public TBaseTypeAdapterFactory(Scope metricsScope) {
this.metricsScope = metricsScope;
}

@Override
public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> typeToken) {
// this class only serializes 'TBase' and its subtypes
Expand All @@ -47,19 +57,31 @@ public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> typeToken) {
new TypeAdapter<T>() {
@Override
public void write(JsonWriter jsonWriter, T value) throws IOException {
if (metricsScope != null) {
metricsScope.counter("tbase_message_write").inc(1);
}
try {
String result =
newThriftSerializer().toString((TBase) value, StandardCharsets.UTF_8.name());
jsonWriter.value(result);
logger.warn(
"TBase message will no longer be support in cadence-java-client V4, payload {}",
result);
} catch (TException e) {
throw new DataConverterException("Failed to serialize TBase", e);
}
}

@Override
public T read(JsonReader jsonReader) throws IOException {
if (metricsScope != null) {
metricsScope.counter("tbase_message_read").inc(1);
}
String value = jsonReader.nextString();
try {
logger.warn(
"TBase message will no longer be support in cadence-java-client V4, payload {}",
value);
@SuppressWarnings("unchecked")
T instance = (T) typeToken.getRawType().getConstructor().newInstance();
newThriftDeserializer()
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/uber/cadence/worker/WorkerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.uber.cadence.PollForDecisionTaskResponse;
import com.uber.cadence.client.WorkflowClient;
import com.uber.cadence.converter.DataConverter;
import com.uber.cadence.converter.JsonDataConverter;
import com.uber.cadence.internal.common.InternalUtils;
import com.uber.cadence.internal.metrics.MetricsTag;
import com.uber.cadence.internal.replay.DeciderCache;
Expand Down Expand Up @@ -107,6 +108,9 @@ public WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factory
return;
}

// initialize the JsonDataConverter with the metrics scope
JsonDataConverter.setMetricsScope(workflowClient.getOptions().getMetricsScope());

Scope stickyScope =
workflowClient
.getOptions()
Expand Down