Skip to content

Commit

Permalink
publishing code
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanfmartinez committed Mar 26, 2018
0 parents commit 97e082b
Show file tree
Hide file tree
Showing 6 changed files with 561 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
target
*~
# Arquivos do eclipse
.classpath
.project
.settings
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
MQTT basic implementation for traccar

Requires traccar 3.16-SNAPSHOT after 2018-03-25

global configuration parameters :

extra.handlers -
should point to : "com.ivanfm.traccar.MQTTHandler"

mqtt.url
URL for mqtt server
default value tcp://localhost:1883
mqtt.clientid
clientid used to connect
default value traccar.mqtt.handler
mqtt.topicRoot
topic where traccar data will be published
default value /traccar/
mqtt.alarmTopic
topic where alarms will be published
default - does not publish alarms in other topic

device configuration parameters :

mqtt.alias
alias to be used instead of device name
default - name of device
mqtt.alarmTopic
topic where alarms will be published
default - does not publish alarms in other topic
mqtt.position.process.enabled
enabled/disable publishing for the device
default - true
mqtt.position.process.alarms.enabled
enable/disable publishing of alarms for the device
default - true
mqtt.geofence.GEOFENCE_ALIAS.topics
where to publish geofence state changes for device
multiple topics can be used separated by ":"
default - does not publish changes in other topics



107 changes: 107 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ivanfm</groupId>
<artifactId>ivanfm-traccar-mqtt</artifactId>
<packaging>jar</packaging>
<version>2.2.1</version>
<name>ivanfm-traccar-mqtt</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j.version>1.7.25</slf4j.version>
<junit.version>4.12</junit.version>
<velocity.version>1.7</velocity.version>
<maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
<paho-mqttv3.version>1.2.0</paho-mqttv3.version>
</properties>


<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<!-- or whatever version you use -->
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>

<archive>
<manifest>
</manifest>
</archive>

</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<excludeScope>provided</excludeScope>
</configuration>
</execution>
</executions>
</plugin>

</plugins>

</build>

<dependencies>

<!-- http://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<!-- https://eclipse.org/paho/clients/java/ -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${paho-mqttv3.version}</version>
</dependency>

<dependency>
<groupId>org.traccar</groupId>
<artifactId>traccar</artifactId>
<version>3.16-SNAPSHOT</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
116 changes: 116 additions & 0 deletions src/main/java/com/ivanfm/mqtt/MQTTPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.ivanfm.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.traccar.helper.Log;

public class MQTTPublisher implements MqttCallback {

public static final int AT_LEAST_ONCE = 1;

private final MqttAsyncClient mqtt;
private final String rootTopic;
private transient IMqttToken connectToken;

public MQTTPublisher(String url, String clientId, String rootTopic) throws MqttException {
this.rootTopic = rootTopic;
mqtt = new MqttAsyncClient(url, clientId);
mqtt.setCallback(this);
doConnect();
connectToken.waitForCompletion(2000);
}

private final class ConnectThread extends Thread {

@Override
public void run() {
boolean running = true;
while (running) {
try {
Log.info("trying to connect to mqtt : " + mqtt.getServerURI());
final MqttConnectOptions opt = new MqttConnectOptions();
opt.setCleanSession(false);

connectToken = mqtt.connect(opt);
connectToken.waitForCompletion(2000);
running = false;
} catch (MqttException e) {
Log.warning("Connect error", e);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
break;
}
}
Log.info("connected to mqtt : " + mqtt.getServerURI());

}
}

private void doConnect() {
final Thread t = new ConnectThread();
t.start();
// Wait connection to start...
while (connectToken == null) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Log.warning(e);
}
}
}

@Override
public void connectionLost(Throwable arg0) {
doConnect();
}

@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}

@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
}



public void publish(String topic, byte[] msg) {
publishOnRoot(rootTopic + topic, msg, AT_LEAST_ONCE, false);
}

public void publishOnRoot(String topic, byte[] msg, boolean retained) {
publishOnRoot(topic, msg, AT_LEAST_ONCE, retained);
}

private final int MAX_RETRY = 5;

public void publishOnRoot(String topic, byte[] msg, int qos, boolean retained) {
int retryCount = 0;
boolean ok = false;
do {
try {
retryCount++;
mqtt.publish(topic, msg, qos , retained);
ok = true;
} catch (MqttPersistenceException e) {
Log.warning("Error in mqtt publish", e);
} catch (MqttException e) {
if ((e.getReasonCode() == MqttException.REASON_CODE_MAX_INFLIGHT) && (retryCount < MAX_RETRY)) {
// ignore
} else {
Log.warning("error publising to mqtt" , e);
}
}
} while (!ok && (retryCount < MAX_RETRY));
}


}
68 changes: 68 additions & 0 deletions src/main/java/com/ivanfm/traccar/mqtt/MQTTHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.ivanfm.traccar.mqtt;

import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.traccar.BaseDataHandler;
import org.traccar.Config;
import org.traccar.Context;
import org.traccar.model.Position;

import com.ivanfm.mqtt.MQTTPublisher;


public class MQTTHandler extends BaseDataHandler {

private final static Logger log = LoggerFactory.getLogger(MQTTHandler.class);

private static MQTTPublisher globalPublisher;

private synchronized MQTTPublisher getPublisher() {
if (globalPublisher == null) {
try {
final Config config = Context.getConfig();
globalPublisher = new MQTTPublisher(
config.getString("mqtt.url", "tcp://localhost:1883"),
config.getString("mqtt.clientid", "traccar.mqtt.handler"),
config.getString("mqtt.topicRoot", "/traccar/"));

globalPublisher.publish("start", (new Date()).toString().getBytes());
} catch (Exception e) {
log.error("", e);
}
}
return globalPublisher;
}


/**
* Multiple hanlders are created but only one publisher...
*
* @throws Exception
*/
public MQTTHandler() {
// publisher is already instantiated....
}


@Override
protected Position handlePosition(Position position) {
try {
publish(position);
} catch (Throwable t) {
log.error("", t);
}
return position;
}

public void publish(Position position) {
final MQTTPublisher publisher = getPublisher();
if (publisher != null) {
final ProcessPosition ld = new ProcessPosition(publisher, position);
ld.publish();
}
}


}
Loading

0 comments on commit 97e082b

Please sign in to comment.