
Commit

init
admin committed Jan 30, 2015
1 parent e822a71 commit 960b261
Showing 15 changed files with 1,131 additions and 0 deletions.
110 changes: 110 additions & 0 deletions pom.xml
@@ -0,0 +1,110 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>LogTopology</groupId>
    <artifactId>LogTopology</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.2-incubating</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>nl.minvenj.nfi.storm</groupId>
            <artifactId>kafka-spout</artifactId>
            <version>0.2-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <!-- 0.9.2-incubating matches storm-core; a plain 0.9.2 artifact is not published -->
            <version>0.9.2-incubating</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.drools</groupId>
            <artifactId>drools-core</artifactId>
            <version>5.5.0.Final</version>
        </dependency>

        <dependency>
            <groupId>org.drools</groupId>
            <artifactId>drools-compiler</artifactId>
            <version>5.5.0.Final</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.5.1</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>

            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.1</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!-- copy-dependencies takes includeTypes; a bare <type> is not a parameter of this goal -->
                    <includeTypes>jar</includeTypes>
                    <overWriteSnapshots>true</overWriteSnapshots>
                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
56 changes: 56 additions & 0 deletions src/main/java/storm/etl/log/ConsumerSample.java
@@ -0,0 +1,56 @@
package storm.etl.log;

/**
 * Created by endy on 14-9-25.
 */
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerSample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "10.132.174.98:2181");
        props.put("zookeeper.connection.timeout.ms", "1000000");
        props.put("group.id", "storm_group");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        // Ask for a single stream on the "fks" topic.
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        map.put("fks", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
                consumerConnector.createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("fks");

        ExecutorService executor = Executors.newFixedThreadPool(1);

        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    // Without explicit decoders the stream yields raw byte[] payloads,
                    // so decode them directly instead of casting to kafka.message.Message,
                    // which would throw a ClassCastException here.
                    for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) {
                        System.out.println("topic: " + msgAndMetadata.topic());
                        System.out.println("message content: " + new String(msgAndMetadata.message()));
                    }
                }
            });
        }
    }
}

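The commit contains no matching producer for smoke-testing this consumer. As a minimal sketch against the Kafka 0.8 producer API, assuming a broker listening on 10.132.174.98:9092 (only the ZooKeeper address appears in the source), a test producer could look like this:

package storm.etl.log;

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Hypothetical helper, not part of this commit.
public class ProducerSample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker address; the commit only shows the ZooKeeper host.
        props.put("metadata.broker.list", "10.132.174.98:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        // Push a handful of test messages into the topic the spout reads from.
        for (int i = 0; i < 10; i++) {
            producer.send(new KeyedMessage<String, String>("fks", "test message " + i));
        }
        producer.close();
    }
}
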
76 changes: 76 additions & 0 deletions src/main/java/storm/etl/log/LogTopology.java
@@ -0,0 +1,76 @@
package storm.etl.log;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import nl.minvenj.nfi.storm.kafka.KafkaSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.etl.log.bolt.LogRulesBolt;
import storm.etl.log.bolt.SaveToRedisCloudBolt;

import java.util.Arrays;

public class LogTopology {
    private static final Logger logger = LoggerFactory.getLogger(LogTopology.class);

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        if (args != null && args.length > 0) {
            // Cluster mode: args[0] is the Drools rule file path, args[1] the topology name.
            try {
                conf.put("kafka.spout.topic", "fks");
                conf.put("kafka.group.id", "storm_group");
                conf.put("kafka.zookeeper.connect", "10.132.174.98:2181");
                conf.put("kafka.consumer.timeout.ms", 100000);
                conf.put("kafka.zookeeper.session.timeout.ms", "40000");
                conf.put("kafka.spout.buffer.size.max", "512");

                TopologyBuilder topologyBuilder = new TopologyBuilder();
                topologyBuilder.setSpout("logSpout", new KafkaSpout(), 1);
                topologyBuilder.setBolt("logRules", new LogRulesBolt(args[0]), 15).shuffleGrouping("logSpout");
                topologyBuilder.setBolt("saveToDB", new SaveToRedisCloudBolt(), 15).shuffleGrouping("logRules");

                conf.setNumWorkers(5);

                conf.put(Config.NIMBUS_HOST, "10.132.176.117");
                conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
                conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("10.132.176.117", "10.132.175.107", "10.132.163.27"));

                conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
                conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
                conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
                conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
                StormSubmitter.submitTopology(args[1], conf, topologyBuilder.createTopology());
            } catch (Exception e) {
                // Log the exception itself rather than swallowing it.
                logger.error("Failed to submit the topology", e);
            }
        } else {
            // Local mode: run against an in-process cluster with a hard-coded rule file.
            conf.put("kafka.spout.topic", "fks");
            conf.put("kafka.group.id", "storm_group");
            conf.put("kafka.zookeeper.connect", "10.132.174.98:2181");
            conf.put("kafka.consumer.timeout.ms", 100000);
            conf.put("kafka.zookeeper.session.timeout.ms", "40000");
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("logSpout", new KafkaSpout(), 1);
            topologyBuilder.setBolt("logRules", new LogRulesBolt("D:\\etl log analysis\\logtopology\\target\\classes\\Syslog.drl"), 5).shuffleGrouping("logSpout");
            topologyBuilder.setBolt("saveToDB", new SaveToRedisCloudBolt(), 5).shuffleGrouping("logRules");

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, topologyBuilder.createTopology());
            Utils.sleep(500000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

}
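
LogRulesBolt and SaveToRedisCloudBolt are among the changed files not rendered on this page. Purely as an illustrative sketch, assuming the save bolt pushes each processed tuple onto a Redis list via the jedis 2.5.1 client declared in the POM (the class name, host, port, and key below are hypothetical, not taken from the commit), it could look something like this:

package storm.etl.log.bolt;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

// Hypothetical sketch; the real SaveToRedisCloudBolt in this commit is not shown here.
public class SaveToRedisCloudBoltSketch extends BaseBasicBolt {
    private transient Jedis jedis;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Assumed Redis endpoint; no Redis address appears in the rendered files.
        jedis = new Jedis("10.132.174.98", 6379);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Append the upstream bolt's first output field to a Redis list.
        jedis.lpush("log:results", input.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream.
    }
}

A BaseBasicBolt keeps the sketch short because acking is handled automatically; one Jedis connection per executor is safe here since each executor calls execute on a single thread.
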
81 changes: 81 additions & 0 deletions src/main/java/storm/etl/log/LogTopology1.java
@@ -0,0 +1,81 @@
package storm.etl.log;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.etl.log.bolt.LogRulesBolt;
import storm.etl.log.bolt.SaveToRedisCloudBolt;
import storm.kafka.*;

import java.util.Arrays;

public class LogTopology1 {
    private static final Logger logger = LoggerFactory.getLogger(LogTopology1.class);

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        if (args != null && args.length > 0) {
            // Cluster mode: args[0] is the Drools rule file path, args[1] the topology name.
            try {
                BrokerHosts brokerHosts = new ZkHosts("10.132.174.98");
                SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "fks", "", "storm");
                spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
                spoutConfig.startOffsetTime = -2; // -2 = start from the earliest available offset
                spoutConfig.forceFromStart = true;
                KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
                TopologyBuilder topologyBuilder = new TopologyBuilder();

                topologyBuilder.setSpout("logSpout", kafkaSpout, 2);
                topologyBuilder.setBolt("logRules", new LogRulesBolt(args[0]), 10).shuffleGrouping("logSpout");
                topologyBuilder.setBolt("saveToDB", new SaveToRedisCloudBolt(), 10).shuffleGrouping("logRules");

                conf.setNumWorkers(5);

                conf.put(Config.NIMBUS_HOST, "10.132.176.117");
                conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
                conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("10.132.176.117", "10.132.175.107", "10.132.163.27"));

                conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
                conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
                conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
                conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
                StormSubmitter.submitTopology(args[1], conf, topologyBuilder.createTopology());
            } catch (Exception e) {
                // Log the exception itself rather than swallowing it.
                logger.error("Failed to submit the topology", e);
            }
        } else {
            // Local mode: run against an in-process cluster with a hard-coded rule file.
            BrokerHosts brokerHosts = new ZkHosts("10.132.174.98");
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "fks", "", "storm");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConfig.startOffsetTime = -2; // -2 = start from the earliest available offset
            spoutConfig.forceFromStart = true;
            KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
            TopologyBuilder topologyBuilder = new TopologyBuilder();

            topologyBuilder.setSpout("logSpout", kafkaSpout, 2);
            topologyBuilder.setBolt("logRules", new LogRulesBolt("D:\\etl log analysis\\logtopology\\target\\classes\\Syslog.drl"), 10).shuffleGrouping("logSpout");
            topologyBuilder.setBolt("saveToDB", new SaveToRedisCloudBolt(), 10).shuffleGrouping("logRules");

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, topologyBuilder.createTopology());
            Utils.sleep(5000000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

}
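
The actual LogRulesBolt source is likewise not rendered here. Given the drools-core/drools-compiler 5.5.0.Final dependencies and a constructor that takes a .drl path (Syslog.drl above), a hedged sketch of such a rules bolt might look like the following; the emitted field name and the assumption that the rules mutate the inserted fact are illustrative, not taken from the commit:

package storm.etl.log.bolt;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatelessKnowledgeSession;

// Hypothetical sketch of a Drools-backed rules bolt; the real LogRulesBolt is not shown above.
public class LogRulesBoltSketch extends BaseBasicBolt {
    private final String drlPath;
    private transient StatelessKnowledgeSession session;

    public LogRulesBoltSketch(String drlPath) {
        this.drlPath = drlPath;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Compile the .drl file once per executor and build a stateless session.
        KnowledgeBuilder builder = KnowledgeBuilderFactory.newKnowledgeBuilder();
        builder.add(ResourceFactory.newFileResource(drlPath), ResourceType.DRL);
        if (builder.hasErrors()) {
            throw new RuntimeException(builder.getErrors().toString());
        }
        KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
        kbase.addKnowledgePackages(builder.getKnowledgePackages());
        session = kbase.newStatelessKnowledgeSession();
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Fire all rules against the incoming log line, then pass it downstream.
        String logLine = input.getString(0);
        session.execute(logLine);
        collector.emit(new Values(logLine));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }
}

A stateless session fits a bolt well: each execute call is an independent fire-all-rules pass, so no working-memory cleanup is needed between tuples.
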
(The remaining 11 changed files in this commit are not rendered on this page.)
