Skip to content

Commit

Permalink
initial checkin
Browse files Browse the repository at this point in the history
  • Loading branch information
dhilpipre committed Feb 22, 2021
1 parent b56d74a commit 547ab81
Show file tree
Hide file tree
Showing 77 changed files with 1,743 additions and 0 deletions.
2 changes: 2 additions & 0 deletions hivemq-mqtt-client-1.0/bin/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/main/
/test/
30 changes: 30 additions & 0 deletions hivemq-mqtt-client-1.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

// Build.gradle generated for instrumentation module hivemq-mqtt-client-1.0

apply plugin: 'java'

dependencies {
implementation 'com.hivemq:hivemq-mqtt-client:1.0.0'

// New Relic Java Agent dependencies
implementation 'com.newrelic.agent.java:newrelic-agent:6.0.0'
implementation 'com.newrelic.agent.java:newrelic-api:6.0.0'
implementation fileTree(include: ['*.jar'], dir: '../libs')
}

jar {
manifest {
attributes 'Implementation-Title': 'com.newrelic.instrumentation.hivemq-mqtt-client-1.0'
attributes 'Implementation-Vendor': 'New Relic'
attributes 'Implementation-Vendor-Id': 'com.newrelic'
attributes 'Implementation-Version': 1.0
}
}

verifyInstrumentation {
// Verifier plugin documentation:
// https://github.com/newrelic/newrelic-gradle-verify-instrumentation
// Example:
// passes 'javax.servlet:servlet-api:[2.2,2.5]'
// exclude 'javax.servlet:servlet-api:2.4.public_draft'
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
com/nr/instrumentation/hivemq/client/Mqtt5PublisherWrapper.java
com.nr.instrumentation.hivemq.client.Mqtt5PublisherWrapper
com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java
com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder
com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Base
com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Default
com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder$Send
com/nr/instrumentation/hivemq/client/OutboundWrapper.java
com.nr.instrumentation.hivemq.client.OutboundWrapper
com/nr/instrumentation/hivemq/client/PublisherAdapter.java
com.nr.instrumentation.hivemq.client.PublisherAdapter
com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java
com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlow
com/nr/instrumentation/hivemq/client/SubscriberWrapper.java
com.nr.instrumentation.hivemq.client.SubscriberWrapper
com/hivemq/client/internal/mqtt/MqttRxClient.java
com.hivemq.client.internal.mqtt.MqttRxClient
com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttIncomingAckFlowable.java
com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttIncomingAckFlowable
com/nr/instrumentation/hivemq/client/InboundWrapper.java
com.nr.instrumentation.hivemq.client.InboundWrapper
7 changes: 7 additions & 0 deletions hivemq-mqtt-client-1.0/build/tmp/jar/MANIFEST.MF
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Manifest-Version: 1.0
Implementation-Title: com.newrelic.instrumentation.hivemq-mqtt-client-
1.0
Implementation-Version: 1.0
Implementation-Vendor-Id: com.newrelic
Implementation-Vendor: New Relic

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.hivemq.client.internal.mqtt;

import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.hivemq.client.PublisherAdapter;

import io.reactivex.Flowable;

@Weave
public abstract class MqttRxClient {

@Trace
public Flowable<Mqtt5PublishResult> publish(Flowable<Mqtt5Publish> publishFlowable) {

publishFlowable = publishFlowable.map(new PublisherAdapter());
return Weaver.callOriginal();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.hivemq.client.internal.mqtt.handler.publish.incoming;

import org.reactivestreams.Subscriber;

import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop;
import com.hivemq.client.mqtt.datatypes.MqttTopic;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.newrelic.api.agent.DestinationType;
import com.newrelic.api.agent.MessageConsumeParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.hivemq.client.InboundWrapper;

@Weave(type=MatchType.BaseClass)
public abstract class MqttIncomingPublishFlow extends FlowWithEventLoop {

@NewField
private Token token = null;

MqttIncomingPublishFlow(final Subscriber<? super Mqtt5Publish> subscriber, final MqttClientConfig clientConfig,final MqttIncomingQosHandler incomingQosHandler) {
super(clientConfig);
}

@Trace(async=true)
public void onNext(Mqtt5Publish result) {
if(token !=null) {
token.link();
}
MqttTopic topic = result.getTopic();
String topicName = topic.toString().replace('/', '_'); //Utils.getTopicName(topic);
Mqtt5UserProperties userProperties = result.getUserProperties();
MessageConsumeParameters params = MessageConsumeParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).inboundHeaders(new InboundWrapper(userProperties)).build();

NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
Weaver.callOriginal();
}

public void onComplete() {
if(token != null) {
token.expire();
token = null;
}
Weaver.callOriginal();
}

public void onError(Throwable t) {
NewRelic.noticeError(t);
if(token != null) {
token.expire();
token = null;
}
Weaver.callOriginal();
}

@Trace
public void request(long n) {
if(token == null) {
Token t = NewRelic.getAgent().getTransaction().getToken();
if(t != null && t.isActive()) {
token = t;
} else if(t != null) {
t.expire();
t = null;
}
}
Weaver.callOriginal();
}

protected void onCancel() {
if(token != null) {
token.expire();
token = null;
}
Weaver.callOriginal();
}

void runCancel() {
if(token != null) {
token.expire();
token = null;
}
Weaver.callOriginal();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.hivemq.client.internal.mqtt.handler.publish.outgoing;

import org.reactivestreams.Subscriber;

import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.hivemq.client.SubscriberWrapper;

import io.reactivex.Flowable;

@Weave
public class MqttIncomingAckFlowable {


public MqttIncomingAckFlowable(Flowable<MqttPublish> publishFlowable, MqttClientConfig clientConfig) {

}

@Trace(async=true)
protected void subscribeActual(Subscriber<? super Mqtt5PublishResult> subscriber) {
Token t = NewRelic.getAgent().getTransaction().getToken();
Token token = null;
if(t != null && t.isActive()) {
token = t;
} else if(t != null) {
t.expire();
t = null;
}
SubscriberWrapper wrapper = new SubscriberWrapper(subscriber, token);
subscriber = wrapper;
Weaver.callOriginal();

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.hivemq.client.internal.mqtt.message.publish;

import java.util.function.Function;

import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl;
import com.hivemq.client.internal.mqtt.datatypes.MqttUserPropertiesImpl;
import com.newrelic.api.agent.DestinationType;
import com.newrelic.api.agent.MessageProduceParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.hivemq.client.OutboundWrapper;

@Weave
public class MqttPublishBuilder<B extends MqttPublishBuilder<B>> {

MqttTopicImpl topic = Weaver.callOriginal();

MqttUserPropertiesImpl userProperties = Weaver.callOriginal();

MqttPublishBuilder() {}

MqttPublishBuilder(MqttPublish publish) {

}

MqttPublishBuilder(MqttPublishBuilder<?> publishBuilder) {

}

@Weave
public static class Send<P> extends Base<Send<P>> {

public Send(Function<? super MqttPublish, P> parentConsumer) {

}

@Trace(leaf=true)
public P send() {
OutboundWrapper wrapper = new OutboundWrapper(userProperties);
String topicName = topic.toString().replace('/', '_');
MessageProduceParameters params = MessageProduceParameters.library("HiveMQ").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).outboundHeaders(wrapper).build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
userProperties = wrapper.getCurrent();
return Weaver.callOriginal();
}
}

@Weave
private static class Base<B extends Base<B>> extends MqttPublishBuilder<B> {

Base() {}

@SuppressWarnings("unused")
Base(MqttPublish publish) {
super(publish);
}
}

@Weave
public static class Default extends Base<Default> {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.nr.instrumentation.hivemq.client;

import java.util.List;

import com.hivemq.client.mqtt.datatypes.MqttUtf8String;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
import com.newrelic.api.agent.HeaderType;
import com.newrelic.api.agent.InboundHeaders;

public class InboundWrapper implements InboundHeaders {

Mqtt5UserProperties userProperties = null;

public InboundWrapper(Mqtt5UserProperties userProperties) {
this.userProperties = userProperties;
}

@Override
public String getHeader(String name) {
List<? extends Mqtt5UserProperty> list = userProperties.asList();
MqttUtf8String searchFor = MqttUtf8String.of(name);
for(Mqtt5UserProperty property : list) {
MqttUtf8String propertyName = property.getName();
if(propertyName.equals(searchFor)) {
MqttUtf8String value = property.getValue();
if(value != null) {
return value.toString();
}
}
}
return null;
}

@Override
public HeaderType getHeaderType() {
return HeaderType.MESSAGE;
}

}
Loading

0 comments on commit 547ab81

Please sign in to comment.