diff --git a/hivemq-mqtt-client-1.0/bin/.gitignore b/hivemq-mqtt-client-1.0/bin/.gitignore new file mode 100644 index 0000000..7eed456 --- /dev/null +++ b/hivemq-mqtt-client-1.0/bin/.gitignore @@ -0,0 +1,2 @@ +/main/ +/test/ diff --git a/hivemq-mqtt-client-1.0/build.gradle b/hivemq-mqtt-client-1.0/build.gradle new file mode 100644 index 0000000..4cb513f --- /dev/null +++ b/hivemq-mqtt-client-1.0/build.gradle @@ -0,0 +1,30 @@ + +// Build.gradle generated for instrumentation module hivemq-mqtt-client-1.0 + +apply plugin: 'java' + +dependencies { + implementation 'com.hivemq:hivemq-mqtt-client:1.0.0' + + // New Relic Java Agent dependencies + implementation 'com.newrelic.agent.java:newrelic-agent:6.0.0' + implementation 'com.newrelic.agent.java:newrelic-api:6.0.0' + implementation fileTree(include: ['*.jar'], dir: '../libs') +} + +jar { + manifest { + attributes 'Implementation-Title': 'com.newrelic.instrumentation.hivemq-mqtt-client-1.0' + attributes 'Implementation-Vendor': 'New Relic' + attributes 'Implementation-Vendor-Id': 'com.newrelic' + attributes 'Implementation-Version': 1.0 + } +} + +verifyInstrumentation { + // Verifier plugin documentation: + // https://github.com/newrelic/newrelic-gradle-verify-instrumentation + // Example: + // passes 'javax.servlet:servlet-api:[2.2,2.5]' + // exclude 'javax.servlet:servlet-api:2.4.public_draft' +} \ No newline at end of file diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/MqttRxClient.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/MqttRxClient.class new file mode 100644 index 0000000..5b5042c Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/MqttRxClient.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.class new file mode 100644 index 0000000..db4de57 Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttIncomingAckFlowable.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttIncomingAckFlowable.class new file mode 100644 index 0000000..e0bd0eb Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttIncomingAckFlowable.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Base.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Base.class new file mode 100644 index 0000000..20172fa Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Base.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Default.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Default.class new file mode 100644 index 0000000..9a9618b Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Default.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Send.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Send.class new file mode 100644 index 0000000..1268769 Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Send.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.class new file mode 100644 index 0000000..890b309 Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/InboundWrapper.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/InboundWrapper.class new file mode 100644 index 0000000..0126863 Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/InboundWrapper.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.class new file mode 100644 index 0000000..39e2336 Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/OutboundWrapper.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/OutboundWrapper.class new file mode 100644 index 0000000..21c137f Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/OutboundWrapper.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/PublisherAdapter.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/PublisherAdapter.class new file mode 100644 index 0000000..6816f6e Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/PublisherAdapter.class differ diff --git a/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/SubscriberWrapper.class b/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/SubscriberWrapper.class new file mode 100644 index 0000000..b86d225 Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/classes/java/main/com/nr/instrumentation/hivemq/client/SubscriberWrapper.class differ diff --git a/hivemq-mqtt-client-1.0/build/libs/hivemq-mqtt-client-1.0.jar b/hivemq-mqtt-client-1.0/build/libs/hivemq-mqtt-client-1.0.jar new file mode 100644 index 0000000..c52915f Binary files /dev/null and b/hivemq-mqtt-client-1.0/build/libs/hivemq-mqtt-client-1.0.jar differ diff --git a/hivemq-mqtt-client-1.0/build/tmp/compileJava/source-classes-mapping.txt b/hivemq-mqtt-client-1.0/build/tmp/compileJava/source-classes-mapping.txt new file mode 100644 index 0000000..5fbe9ee --- /dev/null +++ b/hivemq-mqtt-client-1.0/build/tmp/compileJava/source-classes-mapping.txt @@ -0,0 +1,21 @@ +com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.java + com.nr.instrumentation.hivemq.client.Mqtt5PublisherWrapper +com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Base + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Default + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Send +com/nr/instrumentation/hivemq/client/OutboundWrapper.java + com.nr.instrumentation.hivemq.client.OutboundWrapper +com/nr/instrumentation/hivemq/client/PublisherAdapter.java + com.nr.instrumentation.hivemq.client.PublisherAdapter +com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java + com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlow +com/nr/instrumentation/hivemq/client/SubscriberWrapper.java + com.nr.instrumentation.hivemq.client.SubscriberWrapper +com/hivemq/client/internal/mqtt/MqttRxClient.java + com.hivemq.client.internal.mqtt.MqttRxClient +com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttIncomingAckFlowable.java + com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttIncomingAckFlowable +com/nr/instrumentation/hivemq/client/InboundWrapper.java + com.nr.instrumentation.hivemq.client.InboundWrapper diff --git a/hivemq-mqtt-client-1.0/build/tmp/jar/MANIFEST.MF b/hivemq-mqtt-client-1.0/build/tmp/jar/MANIFEST.MF new file mode 100644 index 0000000..464f199 --- /dev/null +++ b/hivemq-mqtt-client-1.0/build/tmp/jar/MANIFEST.MF @@ -0,0 +1,7 @@ +Manifest-Version: 1.0 +Implementation-Title: com.newrelic.instrumentation.hivemq-mqtt-client- + 1.0 +Implementation-Version: 1.0 +Implementation-Vendor-Id: com.newrelic +Implementation-Vendor: New Relic + diff --git a/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java b/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java new file mode 100644 index 0000000..45b78cf --- /dev/null +++ b/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java @@ -0,0 +1,21 @@ +package com.hivemq.client.internal.mqtt; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.PublisherAdapter; + +import io.reactivex.Flowable; + +@Weave +public abstract class MqttRxClient { + + @Trace + public Flowable publish(Flowable publishFlowable) { + + publishFlowable = publishFlowable.map(new PublisherAdapter()); + return Weaver.callOriginal(); + } +} diff --git a/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java b/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java new file mode 100644 index 0000000..a869f07 --- /dev/null +++ b/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java @@ -0,0 +1,91 @@ +package com.hivemq.client.internal.mqtt.handler.publish.incoming; + +import org.reactivestreams.Subscriber; + +import com.hivemq.client.internal.mqtt.MqttClientConfig; +import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageConsumeParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.InboundWrapper; + +@Weave(type=MatchType.BaseClass) +public abstract class MqttIncomingPublishFlow extends FlowWithEventLoop { + + @NewField + private Token token = null; + + MqttIncomingPublishFlow(final Subscriber subscriber, final MqttClientConfig clientConfig,final MqttIncomingQosHandler incomingQosHandler) { + super(clientConfig); + } + + @Trace(async=true) + public void onNext(Mqtt5Publish result) { + if(token !=null) { + token.link(); + } + MqttTopic topic = result.getTopic(); + String topicName = topic.toString().replace('/', '_'); //Utils.getTopicName(topic); + Mqtt5UserProperties userProperties = result.getUserProperties(); + MessageConsumeParameters params = MessageConsumeParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).inboundHeaders(new InboundWrapper(userProperties)).build(); + + NewRelic.getAgent().getTracedMethod().reportAsExternal(params); + Weaver.callOriginal(); + } + + public void onComplete() { + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } + + public void onError(Throwable t) { + NewRelic.noticeError(t); + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } + + @Trace + public void request(long n) { + if(token == null) { + Token t = NewRelic.getAgent().getTransaction().getToken(); + if(t != null && t.isActive()) { + token = t; + } else if(t != null) { + t.expire(); + t = null; + } + } + Weaver.callOriginal(); + } + + protected void onCancel() { + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } + + void runCancel() { + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } +} diff --git a/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttIncomingAckFlowable.java b/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttIncomingAckFlowable.java new file mode 100644 index 0000000..75ad42a --- /dev/null +++ b/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttIncomingAckFlowable.java @@ -0,0 +1,40 @@ +package com.hivemq.client.internal.mqtt.handler.publish.outgoing; + +import org.reactivestreams.Subscriber; + +import com.hivemq.client.internal.mqtt.MqttClientConfig; +import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.SubscriberWrapper; + +import io.reactivex.Flowable; + +@Weave +public class MqttIncomingAckFlowable { + + + public MqttIncomingAckFlowable(Flowable publishFlowable, MqttClientConfig clientConfig) { + + } + + @Trace(async=true) + protected void subscribeActual(Subscriber subscriber) { + Token t = NewRelic.getAgent().getTransaction().getToken(); + Token token = null; + if(t != null && t.isActive()) { + token = t; + } else if(t != null) { + t.expire(); + t = null; + } + SubscriberWrapper wrapper = new SubscriberWrapper(subscriber, token); + subscriber = wrapper; + Weaver.callOriginal(); + + } +} diff --git a/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java b/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java new file mode 100644 index 0000000..d2423ce --- /dev/null +++ b/hivemq-mqtt-client-1.0/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java @@ -0,0 +1,65 @@ +package com.hivemq.client.internal.mqtt.message.publish; + +import java.util.function.Function; + +import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageProduceParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.OutboundWrapper; + +@Weave +public class MqttPublishBuilder> { + + MqttTopicImpl topic = Weaver.callOriginal(); + + MqttUserPropertiesImpl userProperties = Weaver.callOriginal(); + + MqttPublishBuilder() {} + + MqttPublishBuilder(MqttPublish publish) { + + } + + MqttPublishBuilder(MqttPublishBuilder publishBuilder) { + + } + + @Weave + public static class Send

extends Base> { + + public Send(Function parentConsumer) { + + } + + @Trace(leaf=true) + public P send() { + OutboundWrapper wrapper = new OutboundWrapper(userProperties); + String topicName = topic.toString().replace('/', '_'); + MessageProduceParameters params = MessageProduceParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).outboundHeaders(wrapper).build(); + NewRelic.getAgent().getTracedMethod().reportAsExternal(params); + userProperties = wrapper.getCurrent(); + return Weaver.callOriginal(); + } + } + + @Weave + private static class Base> extends MqttPublishBuilder { + + Base() {} + + @SuppressWarnings("unused") + Base(MqttPublish publish) { + super(publish); + } + } + + @Weave + public static class Default extends Base { + + } +} diff --git a/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/InboundWrapper.java b/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/InboundWrapper.java new file mode 100644 index 0000000..7cce04d --- /dev/null +++ b/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/InboundWrapper.java @@ -0,0 +1,40 @@ +package com.nr.instrumentation.hivemq.client; + +import java.util.List; + +import com.hivemq.client.mqtt.datatypes.MqttUtf8String; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.newrelic.api.agent.HeaderType; +import com.newrelic.api.agent.InboundHeaders; + +public class InboundWrapper implements InboundHeaders { + + Mqtt5UserProperties userProperties = null; + + public InboundWrapper(Mqtt5UserProperties userProperties) { + this.userProperties = userProperties; + } + + @Override + public String getHeader(String name) { + List list = userProperties.asList(); + MqttUtf8String searchFor = MqttUtf8String.of(name); + for(Mqtt5UserProperty property : list) { + MqttUtf8String propertyName = property.getName(); + if(propertyName.equals(searchFor)) { + MqttUtf8String value = property.getValue(); + if(value != null) { + return value.toString(); + } + } + } + return null; + } + + @Override + public HeaderType getHeaderType() { + return HeaderType.MESSAGE; + } + +} diff --git a/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.java b/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.java new file mode 100644 index 0000000..5c627be --- /dev/null +++ b/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.java @@ -0,0 +1,98 @@ +package com.nr.instrumentation.hivemq.client; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.OptionalLong; + +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.datatypes.MqttUtf8String; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder.Complete; +import com.newrelic.api.agent.NewRelic; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublish; + +public class Mqtt5PublisherWrapper implements Mqtt5Publish { + + private Mqtt5Publish delegate = null; + + public Mqtt5PublisherWrapper(Mqtt5Publish p) { + delegate = p; + } + + @Override + public MqttTopic getTopic() { + return delegate.getTopic(); + } + + @Override + public Optional getPayload() { + return delegate.getPayload(); + } + + @Override + public byte[] getPayloadAsBytes() { + return delegate.getPayloadAsBytes(); + } + + @Override + public MqttQos getQos() { + return delegate.getQos(); + } + + @Override + public boolean isRetain() { + return delegate.isRetain(); + } + + @Override + public OptionalLong getMessageExpiryInterval() { + return delegate.getMessageExpiryInterval(); + } + + @Override + public Optional getPayloadFormatIndicator() { + return delegate.getPayloadFormatIndicator(); + } + + @Override + public Optional getContentType() { + return delegate.getContentType(); + } + + @Override + public Optional getResponseTopic() { + return delegate.getResponseTopic(); + } + + @Override + public Optional getCorrelationData() { + return delegate.getCorrelationData(); + } + + @Override + public Mqtt5UserProperties getUserProperties() { + Mqtt5UserProperties props = delegate.getUserProperties(); + if(props instanceof MqttUserPropertiesImpl) { + MqttUserPropertiesImpl propsImpl = (MqttUserPropertiesImpl)props; + OutboundWrapper wrapper = new OutboundWrapper(propsImpl); + NewRelic.getAgent().getTracedMethod().addOutboundRequestHeaders(wrapper); + return wrapper.getCurrent(); + } + return props; + } + + @Override + public Mqtt5WillPublish asWill() { + return delegate.asWill(); + } + + @Override + public Complete extend() { + return delegate.extend(); + } + +} diff --git a/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/OutboundWrapper.java b/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/OutboundWrapper.java new file mode 100644 index 0000000..994739c --- /dev/null +++ b/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/OutboundWrapper.java @@ -0,0 +1,42 @@ +package com.nr.instrumentation.hivemq.client; + +import java.util.logging.Level; + +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertyImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUtf8StringImpl; +import com.hivemq.client.internal.util.collections.ImmutableList; +import com.hivemq.client.internal.util.collections.ImmutableList.Builder; +import com.newrelic.api.agent.HeaderType; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.OutboundHeaders; + +public class OutboundWrapper implements OutboundHeaders { + + MqttUserPropertiesImpl userProperties = null; + + public OutboundWrapper(MqttUserPropertiesImpl up) { + userProperties = up; + } + + @Override + public HeaderType getHeaderType() { + return HeaderType.MESSAGE; + } + + @Override + public void setHeader(String name, String value) { + MqttUtf8StringImpl nameStr = MqttUtf8StringImpl.of(name); + MqttUtf8StringImpl valueStr = MqttUtf8StringImpl.of(value); + MqttUserPropertyImpl property = new MqttUserPropertyImpl(nameStr, valueStr); + + Builder builder = ImmutableList.builder(); + ImmutableList list = builder.addAll(userProperties.asList()).add(property).build(); + userProperties = MqttUserPropertiesImpl.of(list); + NewRelic.getAgent().getLogger().log(Level.INFO,"Set outbound headers to: {0}",userProperties); + } + + public MqttUserPropertiesImpl getCurrent() { + return userProperties; + } +} diff --git a/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/PublisherAdapter.java b/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/PublisherAdapter.java new file mode 100644 index 0000000..a3210aa --- /dev/null +++ b/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/PublisherAdapter.java @@ -0,0 +1,36 @@ +package com.nr.instrumentation.hivemq.client; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; + +import io.reactivex.functions.Function; + +public class PublisherAdapter implements Function { + + + private static boolean isTransformed = false; + + private Token token = null; + + public PublisherAdapter() { + token = NewRelic.getAgent().getTransaction().getToken(); + if(!isTransformed) { + isTransformed = true; + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + } + } + + @Override + @Trace(async=true) + public Mqtt5Publish apply(Mqtt5Publish source) throws Exception { + if(token != null) { + token.linkAndExpire(); + token = null; + } + return new Mqtt5PublisherWrapper(source); + } + +} diff --git a/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/SubscriberWrapper.java b/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/SubscriberWrapper.java new file mode 100644 index 0000000..86acb42 --- /dev/null +++ b/hivemq-mqtt-client-1.0/src/main/java/com/nr/instrumentation/hivemq/client/SubscriberWrapper.java @@ -0,0 +1,75 @@ +package com.nr.instrumentation.hivemq.client; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageProduceParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Segment; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; + +public class SubscriberWrapper implements Subscriber { + + private Subscriber delegate = null; + private Token token = null; + private static boolean isTransformed = false; + private Segment segment = null; + + public SubscriberWrapper(Subscriber d, Token t) { + delegate = d; + token = t; + segment = NewRelic.getAgent().getTransaction().startSegment("HiveMQ-Publish"); + if(!isTransformed) { + isTransformed = true; + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + } + } + + @Override + public void onSubscribe(Subscription s) { + delegate.onSubscribe(s); + } + + @Override + @Trace(async=true) + public void onNext(Mqtt5PublishResult t) { + if(token != null) { + token.link(); + } + Mqtt5Publish result = t.getPublish(); + String topicName = result.getTopic().toString(); + MessageProduceParameters params = MessageProduceParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).outboundHeaders(null).build(); + if(segment != null) { + segment.reportAsExternal(params); + segment.end(); + segment = null; + } else { + NewRelic.getAgent().getTracedMethod().reportAsExternal(params); + } + delegate.onNext(t); + } + + @Override + public void onError(Throwable t) { + if(token != null) { + token.expire(); + token = null; + } + delegate.onError(t); + } + + @Override + public void onComplete() { + if(token != null) { + token.expire(); + token = null; + } + delegate.onComplete(); + } + +} diff --git a/hivemq-mqtt-client-1.1/bin/.gitignore b/hivemq-mqtt-client-1.1/bin/.gitignore new file mode 100644 index 0000000..7eed456 --- /dev/null +++ b/hivemq-mqtt-client-1.1/bin/.gitignore @@ -0,0 +1,2 @@ +/main/ +/test/ diff --git a/hivemq-mqtt-client-1.1/build.gradle b/hivemq-mqtt-client-1.1/build.gradle new file mode 100644 index 0000000..d341feb --- /dev/null +++ b/hivemq-mqtt-client-1.1/build.gradle @@ -0,0 +1,30 @@ + +// Build.gradle generated for instrumentation module hivemq-mqtt-client-1.1 + +apply plugin: 'java' + +dependencies { + implementation 'com.hivemq:hivemq-mqtt-client:1.1.0' + + // New Relic Java Agent dependencies + implementation 'com.newrelic.agent.java:newrelic-agent:6.0.0' + implementation 'com.newrelic.agent.java:newrelic-api:6.0.0' + implementation fileTree(include: ['*.jar'], dir: '../libs') +} + +jar { + manifest { + attributes 'Implementation-Title': 'com.newrelic.instrumentation.hivemq-mqtt-client-1.1' + attributes 'Implementation-Vendor': 'New Relic' + attributes 'Implementation-Vendor-Id': 'com.newrelic' + attributes 'Implementation-Version': 1.0 + } +} + +verifyInstrumentation { + // Verifier plugin documentation: + // https://github.com/newrelic/newrelic-gradle-verify-instrumentation + // Example: + // passes 'javax.servlet:servlet-api:[2.2,2.5]' + // exclude 'javax.servlet:servlet-api:2.4.public_draft' +} \ No newline at end of file diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/MqttRxClient.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/MqttRxClient.class new file mode 100644 index 0000000..5b5042c Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/MqttRxClient.class differ diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.class new file mode 100644 index 0000000..db4de57 Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.class differ diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.class new file mode 100644 index 0000000..4fb980f Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.class differ diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Base.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Base.class new file mode 100644 index 0000000..20172fa Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Base.class differ diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Default.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Default.class new file mode 100644 index 0000000..9a9618b Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Default.class differ diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Send.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Send.class new file mode 100644 index 0000000..1268769 Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Send.class differ diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.class new file mode 100644 index 0000000..890b309 Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.class differ diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/InboundWrapper.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/InboundWrapper.class new file mode 100644 index 0000000..0126863 Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/InboundWrapper.class differ diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/OutboundWrapper.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/OutboundWrapper.class new file mode 100644 index 0000000..21c137f Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/OutboundWrapper.class differ diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/PublisherAdapter.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/PublisherAdapter.class new file mode 100644 index 0000000..cf37577 Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/PublisherAdapter.class differ diff --git a/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/SubscriberWrapper.class b/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/SubscriberWrapper.class new file mode 100644 index 0000000..b86d225 Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/classes/java/main/com/nr/instrumentation/hivemq/client/SubscriberWrapper.class differ diff --git a/hivemq-mqtt-client-1.1/build/libs/hivemq-mqtt-client-1.1.jar b/hivemq-mqtt-client-1.1/build/libs/hivemq-mqtt-client-1.1.jar new file mode 100644 index 0000000..c9adaf0 Binary files /dev/null and b/hivemq-mqtt-client-1.1/build/libs/hivemq-mqtt-client-1.1.jar differ diff --git a/hivemq-mqtt-client-1.1/build/tmp/compileJava/source-classes-mapping.txt b/hivemq-mqtt-client-1.1/build/tmp/compileJava/source-classes-mapping.txt new file mode 100644 index 0000000..92175d1 --- /dev/null +++ b/hivemq-mqtt-client-1.1/build/tmp/compileJava/source-classes-mapping.txt @@ -0,0 +1,19 @@ +com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Base + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Default + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Send +com/nr/instrumentation/hivemq/client/OutboundWrapper.java + com.nr.instrumentation.hivemq.client.OutboundWrapper +com/nr/instrumentation/hivemq/client/PublisherAdapter.java + com.nr.instrumentation.hivemq.client.PublisherAdapter +com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java + com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlow +com/nr/instrumentation/hivemq/client/SubscriberWrapper.java + com.nr.instrumentation.hivemq.client.SubscriberWrapper +com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java + com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlowable +com/hivemq/client/internal/mqtt/MqttRxClient.java + com.hivemq.client.internal.mqtt.MqttRxClient +com/nr/instrumentation/hivemq/client/InboundWrapper.java + com.nr.instrumentation.hivemq.client.InboundWrapper diff --git a/hivemq-mqtt-client-1.1/build/tmp/jar/MANIFEST.MF b/hivemq-mqtt-client-1.1/build/tmp/jar/MANIFEST.MF new file mode 100644 index 0000000..e80eae6 --- /dev/null +++ b/hivemq-mqtt-client-1.1/build/tmp/jar/MANIFEST.MF @@ -0,0 +1,7 @@ +Manifest-Version: 1.0 +Implementation-Title: com.newrelic.instrumentation.hivemq-mqtt-client- + 1.1 +Implementation-Version: 1.0 +Implementation-Vendor-Id: com.newrelic +Implementation-Vendor: New Relic + diff --git a/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java b/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java new file mode 100644 index 0000000..45b78cf --- /dev/null +++ b/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java @@ -0,0 +1,21 @@ +package com.hivemq.client.internal.mqtt; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.PublisherAdapter; + +import io.reactivex.Flowable; + +@Weave +public abstract class MqttRxClient { + + @Trace + public Flowable publish(Flowable publishFlowable) { + + publishFlowable = publishFlowable.map(new PublisherAdapter()); + return Weaver.callOriginal(); + } +} diff --git a/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java b/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java new file mode 100644 index 0000000..194c4a9 --- /dev/null +++ b/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java @@ -0,0 +1,91 @@ +package com.hivemq.client.internal.mqtt.handler.publish.incoming; + +import org.reactivestreams.Subscriber; + +import com.hivemq.client.internal.mqtt.MqttClientConfig; +import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageConsumeParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.InboundWrapper; + +@Weave(type=MatchType.BaseClass) +public abstract class MqttIncomingPublishFlow extends FlowWithEventLoop { + + @NewField + private Token token = null; + + MqttIncomingPublishFlow(final Subscriber subscriber, final MqttClientConfig clientConfig,final MqttIncomingQosHandler incomingQosHandler) { + super(clientConfig); + } + + @Trace(async=true) + public void onNext(Mqtt5Publish result) { + if(token !=null) { + token.link(); + } + MqttTopic topic = result.getTopic(); + String topicName = topic.toString().replace('/', '_'); + Mqtt5UserProperties userProperties = result.getUserProperties(); + MessageConsumeParameters params = MessageConsumeParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).inboundHeaders(new InboundWrapper(userProperties)).build(); + + NewRelic.getAgent().getTracedMethod().reportAsExternal(params); + Weaver.callOriginal(); + } + + public void onComplete() { + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } + + public void onError(Throwable t) { + NewRelic.noticeError(t); + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } + + @Trace + public void request(long n) { + if(token == null) { + Token t = NewRelic.getAgent().getTransaction().getToken(); + if(t != null && t.isActive()) { + token = t; + } else if(t != null) { + t.expire(); + t = null; + } + } + Weaver.callOriginal(); + } + + protected void onCancel() { + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } + + void runCancel() { + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } +} diff --git a/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java b/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java new file mode 100644 index 0000000..49ad5a7 --- /dev/null +++ b/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java @@ -0,0 +1,40 @@ +package com.hivemq.client.internal.mqtt.handler.publish.outgoing; + +import org.reactivestreams.Subscriber; + +import com.hivemq.client.internal.mqtt.MqttClientConfig; +import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.SubscriberWrapper; + +import io.reactivex.Flowable; + +@Weave +public class MqttAckFlowable { + + + public MqttAckFlowable(MqttClientConfig clientConfig, Flowable publishFlowable) { + + } + + @Trace(async=true) + protected void subscribeActual(Subscriber subscriber) { + Token t = NewRelic.getAgent().getTransaction().getToken(); + Token token = null; + if(t != null && t.isActive()) { + token = t; + } else if(t != null) { + t.expire(); + t = null; + } + SubscriberWrapper wrapper = new SubscriberWrapper(subscriber, token); + subscriber = wrapper; + Weaver.callOriginal(); + + } +} diff --git a/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java b/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java new file mode 100644 index 0000000..d2423ce --- /dev/null +++ b/hivemq-mqtt-client-1.1/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java @@ -0,0 +1,65 @@ +package com.hivemq.client.internal.mqtt.message.publish; + +import java.util.function.Function; + +import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageProduceParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.OutboundWrapper; + +@Weave +public class MqttPublishBuilder> { + + MqttTopicImpl topic = Weaver.callOriginal(); + + MqttUserPropertiesImpl userProperties = Weaver.callOriginal(); + + MqttPublishBuilder() {} + + MqttPublishBuilder(MqttPublish publish) { + + } + + MqttPublishBuilder(MqttPublishBuilder publishBuilder) { + + } + + @Weave + public static class Send

extends Base> { + + public Send(Function parentConsumer) { + + } + + @Trace(leaf=true) + public P send() { + OutboundWrapper wrapper = new OutboundWrapper(userProperties); + String topicName = topic.toString().replace('/', '_'); + MessageProduceParameters params = MessageProduceParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).outboundHeaders(wrapper).build(); + NewRelic.getAgent().getTracedMethod().reportAsExternal(params); + userProperties = wrapper.getCurrent(); + return Weaver.callOriginal(); + } + } + + @Weave + private static class Base> extends MqttPublishBuilder { + + Base() {} + + @SuppressWarnings("unused") + Base(MqttPublish publish) { + super(publish); + } + } + + @Weave + public static class Default extends Base { + + } +} diff --git a/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/InboundWrapper.java b/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/InboundWrapper.java new file mode 100644 index 0000000..7cce04d --- /dev/null +++ b/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/InboundWrapper.java @@ -0,0 +1,40 @@ +package com.nr.instrumentation.hivemq.client; + +import java.util.List; + +import com.hivemq.client.mqtt.datatypes.MqttUtf8String; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.newrelic.api.agent.HeaderType; +import com.newrelic.api.agent.InboundHeaders; + +public class InboundWrapper implements InboundHeaders { + + Mqtt5UserProperties userProperties = null; + + public InboundWrapper(Mqtt5UserProperties userProperties) { + this.userProperties = userProperties; + } + + @Override + public String getHeader(String name) { + List list = userProperties.asList(); + MqttUtf8String searchFor = MqttUtf8String.of(name); + for(Mqtt5UserProperty property : list) { + MqttUtf8String propertyName = property.getName(); + if(propertyName.equals(searchFor)) { + MqttUtf8String value = property.getValue(); + if(value != null) { + return value.toString(); + } + } + } + return null; + } + + @Override + public HeaderType getHeaderType() { + return HeaderType.MESSAGE; + } + +} diff --git a/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/OutboundWrapper.java b/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/OutboundWrapper.java new file mode 100644 index 0000000..994739c --- /dev/null +++ b/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/OutboundWrapper.java @@ -0,0 +1,42 @@ +package com.nr.instrumentation.hivemq.client; + +import java.util.logging.Level; + +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertyImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUtf8StringImpl; +import com.hivemq.client.internal.util.collections.ImmutableList; +import com.hivemq.client.internal.util.collections.ImmutableList.Builder; +import com.newrelic.api.agent.HeaderType; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.OutboundHeaders; + +public class OutboundWrapper implements OutboundHeaders { + + MqttUserPropertiesImpl userProperties = null; + + public OutboundWrapper(MqttUserPropertiesImpl up) { + userProperties = up; + } + + @Override + public HeaderType getHeaderType() { + return HeaderType.MESSAGE; + } + + @Override + public void setHeader(String name, String value) { + MqttUtf8StringImpl nameStr = MqttUtf8StringImpl.of(name); + MqttUtf8StringImpl valueStr = MqttUtf8StringImpl.of(value); + MqttUserPropertyImpl property = new MqttUserPropertyImpl(nameStr, valueStr); + + Builder builder = ImmutableList.builder(); + ImmutableList list = builder.addAll(userProperties.asList()).add(property).build(); + userProperties = MqttUserPropertiesImpl.of(list); + NewRelic.getAgent().getLogger().log(Level.INFO,"Set outbound headers to: {0}",userProperties); + } + + public MqttUserPropertiesImpl getCurrent() { + return userProperties; + } +} diff --git a/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/PublisherAdapter.java b/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/PublisherAdapter.java new file mode 100644 index 0000000..d7bf4e5 --- /dev/null +++ b/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/PublisherAdapter.java @@ -0,0 +1,100 @@ +package com.nr.instrumentation.hivemq.client; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; + +import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertyImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUtf8StringImpl; +import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; +import com.hivemq.client.internal.util.collections.ImmutableList; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.datatypes.MqttUtf8String; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; + +import io.reactivex.functions.Function; + +public class PublisherAdapter implements Function { + + + private static boolean isTransformed = false; + + private Token token = null; + + public PublisherAdapter() { + token = NewRelic.getAgent().getTransaction().getToken(); + if(!isTransformed) { + isTransformed = true; + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + } + } + + @Override + @Trace(async=true) + public Mqtt5Publish apply(Mqtt5Publish source) throws Exception { + if(token != null) { + token.linkAndExpire(); + token = null; + } + NewRelic.getAgent().getLogger().log(Level.FINE, "Entering PublisherAdapter"); + Mqtt5UserProperties userProperties = source.getUserProperties(); + NewRelic.getAgent().getLogger().log(Level.FINE, "PublisherAdapter, initial userProps: {0}",userProperties); + + MqttUserPropertiesImpl userPropertiesNew = convertProperties(userProperties); + NewRelic.getAgent().getLogger().log(Level.FINE, "PublisherAdapter, converted userProps: {0}",userPropertiesNew); + + OutboundWrapper wrapper = new OutboundWrapper(userPropertiesNew); + NewRelic.getAgent().getTracedMethod().addOutboundRequestHeaders(wrapper); + userPropertiesNew = wrapper.getCurrent(); + NewRelic.getAgent().getLogger().log(Level.FINE, "PublisherAdapter, populated userProps: {0}",userPropertiesNew); + + MqttTopic topic = source.getTopic(); + + MqttTopicImpl topicImpl = (MqttTopicImpl)topic; + ByteBuffer payload = source.getPayload().isPresent() ? source.getPayload().get() : null; + + long expiry = source.getMessageExpiryInterval().isPresent() ? source.getMessageExpiryInterval().getAsLong() : MqttPublish.NO_MESSAGE_EXPIRY; + + Mqtt5PayloadFormatIndicator format = source.getPayloadFormatIndicator().isPresent() ? source.getPayloadFormatIndicator().get() : null; + + MqttUtf8String ct = source.getContentType().isPresent() ? source.getContentType().get() : null; + + MqttTopicImpl response = source.getResponseTopic().isPresent() ? (MqttTopicImpl)source.getResponseTopic().get() : null; + + ByteBuffer correlationData = source.getCorrelationData().isPresent() ? source.getCorrelationData().get() : null; + MqttPublish pub = new MqttPublish(topicImpl, payload, source.getQos(), source.isRetain(), expiry, format, (MqttUtf8StringImpl) ct, response, correlationData, userPropertiesNew); + return pub; + } + + private MqttUserPropertiesImpl convertProperties(Mqtt5UserProperties props) { + MqttUserPropertiesImpl userProps = null; + + + + if(props instanceof MqttUserPropertiesImpl) { + userProps = (MqttUserPropertiesImpl)props; + } else { + List properties = props.asList(); + List list2 = new ArrayList(); + for(Mqtt5UserProperty prop : properties) { + + MqttUserPropertyImpl uProp = MqttUserPropertyImpl.of(prop.getName().toString(), prop.getValue().toString()); + list2.add(uProp); + } + ImmutableList list = ImmutableList.copyOf(list2); + userProps = MqttUserPropertiesImpl.of(list); + } + + return userProps; + } +} diff --git a/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/SubscriberWrapper.java b/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/SubscriberWrapper.java new file mode 100644 index 0000000..86acb42 --- /dev/null +++ b/hivemq-mqtt-client-1.1/src/main/java/com/nr/instrumentation/hivemq/client/SubscriberWrapper.java @@ -0,0 +1,75 @@ +package com.nr.instrumentation.hivemq.client; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageProduceParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Segment; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; + +public class SubscriberWrapper implements Subscriber { + + private Subscriber delegate = null; + private Token token = null; + private static boolean isTransformed = false; + private Segment segment = null; + + public SubscriberWrapper(Subscriber d, Token t) { + delegate = d; + token = t; + segment = NewRelic.getAgent().getTransaction().startSegment("HiveMQ-Publish"); + if(!isTransformed) { + isTransformed = true; + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + } + } + + @Override + public void onSubscribe(Subscription s) { + delegate.onSubscribe(s); + } + + @Override + @Trace(async=true) + public void onNext(Mqtt5PublishResult t) { + if(token != null) { + token.link(); + } + Mqtt5Publish result = t.getPublish(); + String topicName = result.getTopic().toString(); + MessageProduceParameters params = MessageProduceParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).outboundHeaders(null).build(); + if(segment != null) { + segment.reportAsExternal(params); + segment.end(); + segment = null; + } else { + NewRelic.getAgent().getTracedMethod().reportAsExternal(params); + } + delegate.onNext(t); + } + + @Override + public void onError(Throwable t) { + if(token != null) { + token.expire(); + token = null; + } + delegate.onError(t); + } + + @Override + public void onComplete() { + if(token != null) { + token.expire(); + token = null; + } + delegate.onComplete(); + } + +} diff --git a/hivemq-mqtt-client-1.2/bin/.gitignore b/hivemq-mqtt-client-1.2/bin/.gitignore new file mode 100644 index 0000000..7eed456 --- /dev/null +++ b/hivemq-mqtt-client-1.2/bin/.gitignore @@ -0,0 +1,2 @@ +/main/ +/test/ diff --git a/hivemq-mqtt-client-1.2/build.gradle b/hivemq-mqtt-client-1.2/build.gradle new file mode 100644 index 0000000..d124120 --- /dev/null +++ b/hivemq-mqtt-client-1.2/build.gradle @@ -0,0 +1,30 @@ + +// Build.gradle generated for instrumentation module hivemq-mqtt-client-1.2 + +apply plugin: 'java' + +dependencies { + implementation 'com.hivemq:hivemq-mqtt-client:1.2.0' + + // New Relic Java Agent dependencies + implementation 'com.newrelic.agent.java:newrelic-agent:6.0.0' + implementation 'com.newrelic.agent.java:newrelic-api:6.0.0' + implementation fileTree(include: ['*.jar'], dir: '../libs') +} + +jar { + manifest { + attributes 'Implementation-Title': 'com.newrelic.instrumentation.hivemq-mqtt-client-1.2' + attributes 'Implementation-Vendor': 'New Relic' + attributes 'Implementation-Vendor-Id': 'com.newrelic' + attributes 'Implementation-Version': 1.0 + } +} + +verifyInstrumentation { + // Verifier plugin documentation: + // https://github.com/newrelic/newrelic-gradle-verify-instrumentation + // Example: + // passes 'javax.servlet:servlet-api:[2.2,2.5]' + // exclude 'javax.servlet:servlet-api:2.4.public_draft' +} \ No newline at end of file diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/MqttRxClient.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/MqttRxClient.class new file mode 100644 index 0000000..5b5042c Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/MqttRxClient.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.class new file mode 100644 index 0000000..0690fe4 Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.class new file mode 100644 index 0000000..4fb980f Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Base.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Base.class new file mode 100644 index 0000000..20172fa Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Base.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Default.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Default.class new file mode 100644 index 0000000..9a9618b Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Default.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Send.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Send.class new file mode 100644 index 0000000..1268769 Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder$Send.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.class new file mode 100644 index 0000000..890b309 Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/InboundWrapper.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/InboundWrapper.class new file mode 100644 index 0000000..0126863 Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/InboundWrapper.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.class new file mode 100644 index 0000000..5d519e7 Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/OutboundWrapper.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/OutboundWrapper.class new file mode 100644 index 0000000..21c137f Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/OutboundWrapper.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/PublisherAdapter.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/PublisherAdapter.class new file mode 100644 index 0000000..0fc2cd7 Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/PublisherAdapter.class differ diff --git a/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/SubscriberWrapper.class b/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/SubscriberWrapper.class new file mode 100644 index 0000000..b86d225 Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/classes/java/main/com/nr/instrumentation/hivemq/client/SubscriberWrapper.class differ diff --git a/hivemq-mqtt-client-1.2/build/libs/hivemq-mqtt-client-1.2.jar b/hivemq-mqtt-client-1.2/build/libs/hivemq-mqtt-client-1.2.jar new file mode 100644 index 0000000..ebb7a91 Binary files /dev/null and b/hivemq-mqtt-client-1.2/build/libs/hivemq-mqtt-client-1.2.jar differ diff --git a/hivemq-mqtt-client-1.2/build/tmp/compileJava/source-classes-mapping.txt b/hivemq-mqtt-client-1.2/build/tmp/compileJava/source-classes-mapping.txt new file mode 100644 index 0000000..bc486fd --- /dev/null +++ b/hivemq-mqtt-client-1.2/build/tmp/compileJava/source-classes-mapping.txt @@ -0,0 +1,21 @@ +com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.java + com.nr.instrumentation.hivemq.client.Mqtt5PublisherWrapper +com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Base + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Default + com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Send +com/nr/instrumentation/hivemq/client/OutboundWrapper.java + com.nr.instrumentation.hivemq.client.OutboundWrapper +com/nr/instrumentation/hivemq/client/PublisherAdapter.java + com.nr.instrumentation.hivemq.client.PublisherAdapter +com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java + com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlow +com/nr/instrumentation/hivemq/client/SubscriberWrapper.java + com.nr.instrumentation.hivemq.client.SubscriberWrapper +com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java + com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlowable +com/hivemq/client/internal/mqtt/MqttRxClient.java + com.hivemq.client.internal.mqtt.MqttRxClient +com/nr/instrumentation/hivemq/client/InboundWrapper.java + com.nr.instrumentation.hivemq.client.InboundWrapper diff --git a/hivemq-mqtt-client-1.2/build/tmp/jar/MANIFEST.MF b/hivemq-mqtt-client-1.2/build/tmp/jar/MANIFEST.MF new file mode 100644 index 0000000..87c9933 --- /dev/null +++ b/hivemq-mqtt-client-1.2/build/tmp/jar/MANIFEST.MF @@ -0,0 +1,7 @@ +Manifest-Version: 1.0 +Implementation-Title: com.newrelic.instrumentation.hivemq-mqtt-client- + 1.2 +Implementation-Version: 1.0 +Implementation-Vendor-Id: com.newrelic +Implementation-Vendor: New Relic + diff --git a/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java b/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java new file mode 100644 index 0000000..45b78cf --- /dev/null +++ b/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java @@ -0,0 +1,21 @@ +package com.hivemq.client.internal.mqtt; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.PublisherAdapter; + +import io.reactivex.Flowable; + +@Weave +public abstract class MqttRxClient { + + @Trace + public Flowable publish(Flowable publishFlowable) { + + publishFlowable = publishFlowable.map(new PublisherAdapter()); + return Weaver.callOriginal(); + } +} diff --git a/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java b/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java new file mode 100644 index 0000000..c5a5960 --- /dev/null +++ b/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java @@ -0,0 +1,91 @@ +package com.hivemq.client.internal.mqtt.handler.publish.incoming; + +import org.reactivestreams.Subscriber; + +import com.hivemq.client.internal.mqtt.MqttClientConfig; +import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageConsumeParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.InboundWrapper; + +@Weave(type=MatchType.BaseClass) +abstract class MqttIncomingPublishFlow extends FlowWithEventLoop { + + @NewField + private Token token = null; + + MqttIncomingPublishFlow(final Subscriber subscriber, final MqttClientConfig clientConfig,final MqttIncomingQosHandler incomingQosHandler, final boolean manualAcknowledgement) { + super(clientConfig); + } + + @Trace(async=true) + public void onNext(Mqtt5Publish result) { + if(token !=null) { + token.link(); + } + MqttTopic topic = result.getTopic(); + String topicName = topic.toString().replace('/', '_'); + Mqtt5UserProperties userProperties = result.getUserProperties(); + MessageConsumeParameters params = MessageConsumeParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).inboundHeaders(new InboundWrapper(userProperties)).build(); + + NewRelic.getAgent().getTracedMethod().reportAsExternal(params); + Weaver.callOriginal(); + } + + public void onComplete() { + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } + + public void onError(Throwable t) { + NewRelic.noticeError(t); + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } + + @Trace + public void request(long n) { + if(token == null) { + Token t = NewRelic.getAgent().getTransaction().getToken(); + if(t != null && t.isActive()) { + token = t; + } else if(t != null) { + t.expire(); + t = null; + } + } + Weaver.callOriginal(); + } + + protected void onCancel() { + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } + + void runCancel() { + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } +} diff --git a/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java b/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java new file mode 100644 index 0000000..49ad5a7 --- /dev/null +++ b/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java @@ -0,0 +1,40 @@ +package com.hivemq.client.internal.mqtt.handler.publish.outgoing; + +import org.reactivestreams.Subscriber; + +import com.hivemq.client.internal.mqtt.MqttClientConfig; +import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.SubscriberWrapper; + +import io.reactivex.Flowable; + +@Weave +public class MqttAckFlowable { + + + public MqttAckFlowable(MqttClientConfig clientConfig, Flowable publishFlowable) { + + } + + @Trace(async=true) + protected void subscribeActual(Subscriber subscriber) { + Token t = NewRelic.getAgent().getTransaction().getToken(); + Token token = null; + if(t != null && t.isActive()) { + token = t; + } else if(t != null) { + t.expire(); + t = null; + } + SubscriberWrapper wrapper = new SubscriberWrapper(subscriber, token); + subscriber = wrapper; + Weaver.callOriginal(); + + } +} diff --git a/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java b/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java new file mode 100644 index 0000000..d2423ce --- /dev/null +++ b/hivemq-mqtt-client-1.2/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java @@ -0,0 +1,65 @@ +package com.hivemq.client.internal.mqtt.message.publish; + +import java.util.function.Function; + +import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageProduceParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.hivemq.client.OutboundWrapper; + +@Weave +public class MqttPublishBuilder> { + + MqttTopicImpl topic = Weaver.callOriginal(); + + MqttUserPropertiesImpl userProperties = Weaver.callOriginal(); + + MqttPublishBuilder() {} + + MqttPublishBuilder(MqttPublish publish) { + + } + + MqttPublishBuilder(MqttPublishBuilder publishBuilder) { + + } + + @Weave + public static class Send

extends Base> { + + public Send(Function parentConsumer) { + + } + + @Trace(leaf=true) + public P send() { + OutboundWrapper wrapper = new OutboundWrapper(userProperties); + String topicName = topic.toString().replace('/', '_'); + MessageProduceParameters params = MessageProduceParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).outboundHeaders(wrapper).build(); + NewRelic.getAgent().getTracedMethod().reportAsExternal(params); + userProperties = wrapper.getCurrent(); + return Weaver.callOriginal(); + } + } + + @Weave + private static class Base> extends MqttPublishBuilder { + + Base() {} + + @SuppressWarnings("unused") + Base(MqttPublish publish) { + super(publish); + } + } + + @Weave + public static class Default extends Base { + + } +} diff --git a/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/InboundWrapper.java b/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/InboundWrapper.java new file mode 100644 index 0000000..7cce04d --- /dev/null +++ b/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/InboundWrapper.java @@ -0,0 +1,40 @@ +package com.nr.instrumentation.hivemq.client; + +import java.util.List; + +import com.hivemq.client.mqtt.datatypes.MqttUtf8String; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.newrelic.api.agent.HeaderType; +import com.newrelic.api.agent.InboundHeaders; + +public class InboundWrapper implements InboundHeaders { + + Mqtt5UserProperties userProperties = null; + + public InboundWrapper(Mqtt5UserProperties userProperties) { + this.userProperties = userProperties; + } + + @Override + public String getHeader(String name) { + List list = userProperties.asList(); + MqttUtf8String searchFor = MqttUtf8String.of(name); + for(Mqtt5UserProperty property : list) { + MqttUtf8String propertyName = property.getName(); + if(propertyName.equals(searchFor)) { + MqttUtf8String value = property.getValue(); + if(value != null) { + return value.toString(); + } + } + } + return null; + } + + @Override + public HeaderType getHeaderType() { + return HeaderType.MESSAGE; + } + +} diff --git a/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.java b/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.java new file mode 100644 index 0000000..b7b72d1 --- /dev/null +++ b/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.java @@ -0,0 +1,103 @@ +package com.nr.instrumentation.hivemq.client; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.OptionalLong; + +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.datatypes.MqttUtf8String; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder.Complete; +import com.newrelic.api.agent.NewRelic; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublish; + +public class Mqtt5PublisherWrapper implements Mqtt5Publish { + + private Mqtt5Publish delegate = null; + + public Mqtt5PublisherWrapper(Mqtt5Publish p) { + delegate = p; + } + + @Override + public MqttTopic getTopic() { + return delegate.getTopic(); + } + + @Override + public Optional getPayload() { + return delegate.getPayload(); + } + + @Override + public byte[] getPayloadAsBytes() { + return delegate.getPayloadAsBytes(); + } + + @Override + public MqttQos getQos() { + return delegate.getQos(); + } + + @Override + public boolean isRetain() { + return delegate.isRetain(); + } + + @Override + public OptionalLong getMessageExpiryInterval() { + return delegate.getMessageExpiryInterval(); + } + + @Override + public Optional getPayloadFormatIndicator() { + return delegate.getPayloadFormatIndicator(); + } + + @Override + public Optional getContentType() { + return delegate.getContentType(); + } + + @Override + public Optional getResponseTopic() { + return delegate.getResponseTopic(); + } + + @Override + public Optional getCorrelationData() { + return delegate.getCorrelationData(); + } + + @Override + public Mqtt5UserProperties getUserProperties() { + Mqtt5UserProperties props = delegate.getUserProperties(); + if(props instanceof MqttUserPropertiesImpl) { + MqttUserPropertiesImpl propsImpl = (MqttUserPropertiesImpl)props; + OutboundWrapper wrapper = new OutboundWrapper(propsImpl); + NewRelic.getAgent().getTracedMethod().addOutboundRequestHeaders(wrapper); + return wrapper.getCurrent(); + } + return props; + } + + @Override + public void acknowledge() { + delegate.acknowledge(); + } + + @Override + public Mqtt5WillPublish asWill() { + return delegate.asWill(); + } + + @Override + public Complete extend() { + return delegate.extend(); + } + +} diff --git a/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/OutboundWrapper.java b/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/OutboundWrapper.java new file mode 100644 index 0000000..994739c --- /dev/null +++ b/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/OutboundWrapper.java @@ -0,0 +1,42 @@ +package com.nr.instrumentation.hivemq.client; + +import java.util.logging.Level; + +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertyImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUtf8StringImpl; +import com.hivemq.client.internal.util.collections.ImmutableList; +import com.hivemq.client.internal.util.collections.ImmutableList.Builder; +import com.newrelic.api.agent.HeaderType; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.OutboundHeaders; + +public class OutboundWrapper implements OutboundHeaders { + + MqttUserPropertiesImpl userProperties = null; + + public OutboundWrapper(MqttUserPropertiesImpl up) { + userProperties = up; + } + + @Override + public HeaderType getHeaderType() { + return HeaderType.MESSAGE; + } + + @Override + public void setHeader(String name, String value) { + MqttUtf8StringImpl nameStr = MqttUtf8StringImpl.of(name); + MqttUtf8StringImpl valueStr = MqttUtf8StringImpl.of(value); + MqttUserPropertyImpl property = new MqttUserPropertyImpl(nameStr, valueStr); + + Builder builder = ImmutableList.builder(); + ImmutableList list = builder.addAll(userProperties.asList()).add(property).build(); + userProperties = MqttUserPropertiesImpl.of(list); + NewRelic.getAgent().getLogger().log(Level.INFO,"Set outbound headers to: {0}",userProperties); + } + + public MqttUserPropertiesImpl getCurrent() { + return userProperties; + } +} diff --git a/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/PublisherAdapter.java b/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/PublisherAdapter.java new file mode 100644 index 0000000..8a96e65 --- /dev/null +++ b/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/PublisherAdapter.java @@ -0,0 +1,103 @@ +package com.nr.instrumentation.hivemq.client; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl; +import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertyImpl; +import com.hivemq.client.internal.util.collections.ImmutableList; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.datatypes.MqttUtf8String; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; + +import io.reactivex.functions.Function; + +public class PublisherAdapter implements Function { + + + private static boolean isTransformed = false; + + private Token token = null; + + public PublisherAdapter() { + token = NewRelic.getAgent().getTransaction().getToken(); + if(!isTransformed) { + isTransformed = true; + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + } + } + + @Override + @Trace(async=true) + public Mqtt5Publish apply(Mqtt5Publish source) throws Exception { + if(token != null) { + token.linkAndExpire(); + token = null; + } + + Optional contentOption = source.getContentType(); + MqttUtf8String ct = contentOption.isPresent() ? contentOption.get() : null; + Optional corrOption = source.getCorrelationData(); + ByteBuffer correlationData = corrOption.isPresent() ? corrOption.get() : null; + OptionalLong expiryOption = source.getMessageExpiryInterval(); + Long expiry = expiryOption.isPresent() ? expiryOption.getAsLong() : null; + Optional payloadOption = source.getPayload(); + ByteBuffer payload = payloadOption.isPresent() ? payloadOption.get() : null; + + Optional formatOption = source.getPayloadFormatIndicator(); + Mqtt5PayloadFormatIndicator payloadFormatIndicator = formatOption.isPresent() ? formatOption.get() : null; + + Optional responseOption = source.getResponseTopic(); + MqttTopic response = responseOption.isPresent() ? responseOption.get() : null; + + if(expiry != null) { + return Mqtt5Publish.builder().topic(source.getTopic()).contentType(ct).correlationData(correlationData).messageExpiryInterval(expiry) + .payload(payload).payloadFormatIndicator(payloadFormatIndicator).qos(source.getQos()).responseTopic(response).retain(source.isRetain()) + .userProperties(getUserProperties(source)).build(); + } else { + return Mqtt5Publish.builder().topic(source.getTopic()).contentType(ct).correlationData(correlationData).noMessageExpiry() + .payload(payload).payloadFormatIndicator(payloadFormatIndicator).qos(source.getQos()).responseTopic(response).retain(source.isRetain()) + .userProperties(getUserProperties(source)).build(); + } + } + + private Mqtt5UserProperties getUserProperties(Mqtt5Publish delegate) { + Mqtt5UserProperties props = delegate.getUserProperties(); + if(props == null) { + MqttUserPropertiesImpl propsImpl = MqttUserPropertiesImpl.NO_USER_PROPERTIES; + OutboundWrapper wrapper = new OutboundWrapper(propsImpl); + NewRelic.getAgent().getTracedMethod().addOutboundRequestHeaders(wrapper); + return wrapper.getCurrent(); + + } + if(props instanceof MqttUserPropertiesImpl) { + MqttUserPropertiesImpl propsImpl = (MqttUserPropertiesImpl)props; + OutboundWrapper wrapper = new OutboundWrapper(propsImpl); + NewRelic.getAgent().getTracedMethod().addOutboundRequestHeaders(wrapper); + return wrapper.getCurrent(); + } else { + List current = props.asList(); + List list = new ArrayList(); + for(Mqtt5UserProperty prop : current) { + MqttUserPropertyImpl newProp = MqttUserPropertyImpl.of(prop.getName().toString(), prop.getValue().toString()); + list.add(newProp); + } + ImmutableList iList = ImmutableList.copyOf(list); + MqttUserPropertiesImpl propsImpl = MqttUserPropertiesImpl.of(iList); + OutboundWrapper wrapper = new OutboundWrapper(propsImpl); + NewRelic.getAgent().getTracedMethod().addOutboundRequestHeaders(wrapper); + return wrapper.getCurrent(); + } + } + +} diff --git a/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/SubscriberWrapper.java b/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/SubscriberWrapper.java new file mode 100644 index 0000000..86acb42 --- /dev/null +++ b/hivemq-mqtt-client-1.2/src/main/java/com/nr/instrumentation/hivemq/client/SubscriberWrapper.java @@ -0,0 +1,75 @@ +package com.nr.instrumentation.hivemq.client; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageProduceParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Segment; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; + +public class SubscriberWrapper implements Subscriber { + + private Subscriber delegate = null; + private Token token = null; + private static boolean isTransformed = false; + private Segment segment = null; + + public SubscriberWrapper(Subscriber d, Token t) { + delegate = d; + token = t; + segment = NewRelic.getAgent().getTransaction().startSegment("HiveMQ-Publish"); + if(!isTransformed) { + isTransformed = true; + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + } + } + + @Override + public void onSubscribe(Subscription s) { + delegate.onSubscribe(s); + } + + @Override + @Trace(async=true) + public void onNext(Mqtt5PublishResult t) { + if(token != null) { + token.link(); + } + Mqtt5Publish result = t.getPublish(); + String topicName = result.getTopic().toString(); + MessageProduceParameters params = MessageProduceParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).outboundHeaders(null).build(); + if(segment != null) { + segment.reportAsExternal(params); + segment.end(); + segment = null; + } else { + NewRelic.getAgent().getTracedMethod().reportAsExternal(params); + } + delegate.onNext(t); + } + + @Override + public void onError(Throwable t) { + if(token != null) { + token.expire(); + token = null; + } + delegate.onError(t); + } + + @Override + public void onComplete() { + if(token != null) { + token.expire(); + token = null; + } + delegate.onComplete(); + } + +} diff --git a/settings.gradle b/settings.gradle index 6014da5..69970a2 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,3 +8,6 @@ git { } } rootProject.name = 'java-instrumentation-template' +include 'hivemq-mqtt-client-1.0' +include 'hivemq-mqtt-client-1.1' +include 'hivemq-mqtt-client-1.2'