diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e7d53a0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,13 @@
+*.class
+*.log
+
+# sbt specific
+dist/*
+target/
+lib_managed/
+src_managed/
+project/boot/
+project/plugins/project/
+
+# Kafka
+logs/*
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..952b8cf
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,13 @@
+Copyright © 2014 Michael G. Noll
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..07fd587
--- /dev/null
+++ b/README.md
@@ -0,0 +1,790 @@
+# kafka-storm-starter
+
+Code examples that show how to integrate
+[Apache Kafka](http://kafka.apache.org/) 0.8+ (latest stable) with
+[Apache Storm](http://storm.incubator.apache.org/) 0.9+ (latest stable),
+while using [Apache Avro](http://avro.apache.org/) as the data serialization format.
+
+---
+
+Table of Contents
+
+* Quick start
+* Features
+* Implementation details
+* Development
+ * Build requirements
+ * Building the code
+ * Running the tests
+ * Creating code coverage reports
+ * Packaging the code
+ * IDE support
+* FAQ
+ * Kafka
+ * Storm
+* Known issues and limitations
+ * Upstream code
+ * kafka-storm-starter code
+* Contributing
+* License
+* References
+ * Wirbelsturm
+ * Kafka
+ * Storm
+ * Avro
+ * Kryo
+
+---
+
+
+
+
+# Quick start
+
+## Show me!
+
+ $ ./sbt test
+
+This command launches our test suite.
+
+Notably it will run end-to-end tests of Kafka, Storm, and Kafka-Storm integration. See this abridged version of the
+test output:
+
+```
+[...other tests removed...]
+
+[info] KafkaSpec:
+[info] Kafka
+[info] - should synchronously send and receive a Tweet in Avro format
+[info] + Given a ZooKeeper instance
+[info] + And a Kafka broker instance
+[info] + And some tweets
+[info] + And a single-threaded Kafka consumer group
+[info] + When I start a synchronous Kafka producer that sends the tweets in Avro binary format
+[info] + Then the consumer app should receive the tweets
+[info] - should asynchronously send and receive a Tweet in Avro format
+[info] + Given a ZooKeeper instance
+[info] + And a Kafka broker instance
+[info] + And some tweets
+[info] + And a single-threaded Kafka consumer group
+[info] + When I start an asynchronous Kafka producer that sends the tweets in Avro binary format
+[info] + Then the consumer app should receive the tweets
+[info] StormSpec:
+[info] Storm
+[info] - should start a local cluster
+[info] + Given no cluster
+[info] + When I start a LocalCluster instance
+[info] + Then the local cluster should start properly
+[info] - should run a basic topology
+[info] + Given a local cluster
+[info] + And a wordcount topology
+[info] + And the input words alice, bob, joe, alice
+[info] + When I submit the topology
+[info] + Then the topology should properly count the words
+[info] KafkaStormSpec:
+[info] Feature: AvroDecoderBolt[T]
+[info] Scenario: User creates a Storm topology that uses AvroDecoderBolt
+[info] Given a ZooKeeper instance
+[info] And a Kafka broker instance
+[info] And a Storm topology that uses AvroDecoderBolt and that reads tweets from topic testing-input and writes them as-is to topic testing-output
+[info] And some tweets
+[info] And a synchronous Kafka producer app that writes to the topic testing-input
+[info] And a single-threaded Kafka consumer app that reads from topic testing-output
+[info] And a Storm topology configuration that registers an Avro Kryo decorator for Tweet
+[info] When I run the Storm topology
+[info] And I use the Kafka producer app to Avro-encode the tweets and sent them to Kafka
+[info] Then the Kafka consumer app should receive the decoded, original tweets from the Storm topology
+[info] Feature: AvroScheme[T] for Kafka spout
+[info] Scenario: User creates a Storm topology that uses AvroScheme in Kafka spout
+[info] Given a ZooKeeper instance
+[info] And a Kafka broker instance
+[info] And a Storm topology that uses AvroScheme and that reads tweets from topic testing-input and writes them as-is to topic testing-output
+[info] And some tweets
+[info] And a synchronous Kafka producer app that writes to the topic testing-input
+[info] And a single-threaded Kafka consumer app that reads from topic testing-output
+[info] And a Storm topology configuration that registers an Avro Kryo decorator for Tweet
+[info] When I run the Storm topology
+[info] And I use the Kafka producer app to Avro-encode the tweets and sent them to Kafka
+[info] Then the Kafka consumer app should receive the decoded, original tweets from the Storm topology
+[info] Run completed in 21 seconds, 852 milliseconds.
+[info] Total number of tests run: 25
+[info] Suites: completed 8, aborted 0
+[info] Tests: succeeded 25, failed 0, canceled 0, ignored 0, pending 0
+[info] All tests passed.
+[success] Total time: 22 s, completed May 23, 2014 12:31:09 PM
+```
+
+
+## Show me one more time!
+
+ $ ./sbt run
+
+This command launches [KafkaStormDemo](src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala). This demo
+starts in-memory instances of ZooKeeper, Kafka, and Storm. It then runs a demo Storm topology that connects to and
+reads from the Kafka instance.
+
+You will see output similar to the following (some parts removed to improve readability):
+
+```
+7031 [Thread-19] INFO backtype.storm.daemon.worker - Worker 3f7f1a51-5c9e-43a5-b431-e39a7272215e for storm kafka-storm-starter-1-1400839826 on daa60807-d440-4b45-94fc-8dd7798453d2:1027 has finished loading
+7033 [Thread-29-kafka-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=127.0.0.1:9092}}
+7050 [Thread-29-kafka-spout] INFO backtype.storm.daemon.executor - Opened spout kafka-spout:(1)
+7051 [Thread-29-kafka-spout] INFO backtype.storm.daemon.executor - Activating spout kafka-spout:(1)
+7051 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections
+7065 [Thread-29-kafka-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=127.0.0.1:9092}}
+7066 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - Deleted partition managers: []
+7066 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - New partition managers: [Partition{host=127.0.0.1:9092, partition=0}]
+7083 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Read partition information from: /kafka-spout/kafka-storm-starter/partition_0 --> null
+7100 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
+7105 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 18
+7106 [Thread-29-kafka-spout] INFO storm.kafka.ZkCoordinator - Finished refreshing
+7126 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committing offset for Partition{host=127.0.0.1:9092, partition=0}
+7126 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committed offset 18 for Partition{host=127.0.0.1:9092, partition=0} for topology: 47e82e34-fb36-427e-bde6-8cd971db2527
+9128 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committing offset for Partition{host=127.0.0.1:9092, partition=0}
+9129 [Thread-29-kafka-spout] INFO storm.kafka.PartitionManager - Committed offset 18 for Partition{host=127.0.0.1:9092, partition=0} for topology: 47e82e34-fb36-427e-bde6-8cd971db2527
+```
+
+At this point Storm is connected to Kafka (more precisely: to the `testing` topic in Kafka). The last few lines from
+above -- "Committing offset ..." -- will be repeated again and again, because a) this demo Storm topology only reads
+from the Kafka topic but does nothing with the data it reads, and b) we are not sending any data to the Kafka topic.
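+
+If you want to see the topology consume some data, you can publish a few messages to the `testing` topic yourself.
+Below is a minimal sketch using the standard Kafka 0.8 producer API; the broker address matches the demo's defaults
+shown in the log output above, and because the demo topology does nothing with the payload, any bytes will do (in a
+real setup you would Avro-encode a `Tweet` first, see the Bijection sketch further below).
+
+```
+import java.util.Properties
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+
+// Point the producer at the demo's in-memory broker and send a plain byte payload.
+val props = new Properties()
+props.put("metadata.broker.list", "127.0.0.1:9092")
+props.put("serializer.class", "kafka.serializer.DefaultEncoder")
+
+val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
+producer.send(new KeyedMessage[Array[Byte], Array[Byte]]("testing", "hello, storm".getBytes("UTF-8")))
+producer.close()
+```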
+
+Note that this example will actually run _two_ in-memory instances of ZooKeeper: the first (listening at
+`127.0.0.1:2181/tcp`) is used by the Kafka instance, while the second (listening at `127.0.0.1:2000/tcp`) is
+automatically started and used by the in-memory Storm cluster. This is because, when running in local aka in-memory
+mode, Storm does not allow you to reconfigure or disable its own ZooKeeper instance (see the [Storm FAQ](#FAQ-Storm)
+below for further information).
+
+**To stop the demo application you must kill or `Ctrl-C` the process in the terminal.**
+
+You can use [KafkaStormDemo](src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala) as a starting point to
+create your own, "real" Storm topologies that read from a "real" Kafka, Storm, and ZooKeeper infrastructure. An easy
+way to get started with such an infrastructure is by deploying Kafka, Storm, and ZooKeeper via a tool such as
+[Wirbelsturm](https://github.com/miguno/wirbelsturm).
+
+
+
+
+
+# Features
+
+What features do we showcase in kafka-storm-starter? Note that we focus on showcasing, not necessarily on being
+"production ready".
+
+* How to integrate Kafka and Storm.
+* How to use [Avro](http://avro.apache.org/) with Kafka and Storm.
+* Kafka standalone code examples
+ * [KafkaProducerApp](src/main/scala/com/miguno/kafkastorm/kafka/KafkaProducerApp.scala):
+ A simple Kafka producer app for writing Avro-encoded data into Kafka.
+ [KafkaSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala) puts this producer to use and shows
+ how to use Twitter Bijection to Avro-encode the messages being sent to Kafka.
+ * [KafkaConsumer](src/main/scala/com/miguno/kafkastorm/kafka/KafkaConsumer.scala):
+ A simple Kafka consumer app for reading Avro-encoded data from Kafka.
+ [KafkaSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala) puts this consumer to use and shows
+ how to use Twitter Bijection to Avro-decode the messages being read from Kafka.
+* Storm standalone code examples
+ * [AvroDecoderBolt[T]](src/main/scala/com/miguno/kafkastorm/storm/AvroDecoderBolt.scala):
+ An `AvroDecoderBolt[T <: org.apache.avro.specific.SpecificRecordBase]` that can be parameterized with the type of
+ the Avro record `T` it will deserialize its data to (i.e. no need to write another decoder bolt just because the
+ bolt needs to handle a different Avro schema).
+ * [AvroScheme[T]](src/main/scala/com/miguno/kafkastorm/storm/AvroScheme.scala):
+ An `AvroScheme[T <: org.apache.avro.specific.SpecificRecordBase]` scheme, i.e. a custom
+ `backtype.storm.spout.Scheme` to auto-deserialize a spout's incoming data. The scheme can be parameterized with
+    the type of the Avro record `T` it will deserialize its data to (i.e. no need to write another scheme just
+ because the scheme needs to handle a different Avro schema).
+    * You can opt to configure a spout (such as the Kafka spout) with `AvroScheme` if you want to perform the Avro
+      decoding step directly in the spout instead of placing an `AvroDecoderBolt` after the Kafka spout. You may
+      want to profile your topology to determine which of the two approaches works best for your use case. A minimal
+      wiring sketch follows this list.
+ * [TweetAvroKryoDecorator](src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala):
+ A custom `backtype.storm.serialization.IKryoDecorator`, i.e. a custom
+ [Kryo serializer for Storm](http://storm.incubator.apache.org/documentation/Serialization.html).
+ * Unfortunately we have not figured out a way to implement a parameterized `AvroKryoDecorator[T]` variant yet.
+ (A "straight-forward" approach we tried -- similar to the other parameterized components -- compiled fine but
+ failed at runtime when running the tests). Code contributions are welcome!
+* Kafka and Storm integration
+ * [AvroKafkaSinkBolt[T]](src/main/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBolt.scala):
+      An `AvroKafkaSinkBolt[T <: org.apache.avro.specific.SpecificRecordBase]` that can be parameterized with the type
+      of the Avro record `T` it will serialize before sending the encoded data to Kafka (i.e. no need to write
+      another Kafka sink bolt just because the bolt needs to handle a different Avro schema).
+ * Storm topologies that read Avro-encoded data from Kafka:
+ [KafkaStormDemo](src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala) and
+ [KafkaStormSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala)
+ * A Storm topology that writes Avro-encoded data to Kafka:
+ [KafkaStormSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala)
+* Integration testing
+ * [KafkaSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala):
+ Tests for Kafka, which launch and run against in-memory instances of Kafka and ZooKeeper.
+ * [StormSpec](src/test/scala/com/miguno/kafkastorm/integration/StormSpec.scala):
+ Tests for Storm, which launch and run against in-memory instances of Storm and ZooKeeper.
+ * [KafkaStormSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala):
+ Tests for integrating Storm and Kafka, which launch and run against in-memory instances of Kafka, Storm, and
+ ZooKeeper.
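+
+Here is a minimal sketch of how these pieces could be wired together into a topology. The `storm.kafka` spout classes
+come from the storm-kafka-0.8-plus dependency described below; the constructor details of `AvroDecoderBolt` and the
+exact ZooKeeper/topic settings are illustrative assumptions, not the definitive API -- see the linked source files and
+[KafkaStormSpec](src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala) for the real wiring.
+
+```
+import backtype.storm.Config
+import backtype.storm.topology.TopologyBuilder
+import com.miguno.avro.Tweet
+import com.miguno.kafkastorm.storm.{AvroDecoderBolt, TweetAvroKryoDecorator}
+import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
+
+// A Kafka spout that reads raw Avro-encoded bytes, followed by a bolt that decodes them into Tweet instances.
+val zkHosts = new ZkHosts("127.0.0.1:2181")
+val spoutConfig = new SpoutConfig(zkHosts, "testing-input", "/kafka-spout", "kafka-storm-starter")
+
+val builder = new TopologyBuilder
+builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
+builder.setBolt("avro-decoder-bolt", new AvroDecoderBolt[Tweet]).shuffleGrouping("kafka-spout")
+
+// Register the custom Kryo decorator so that Tweet instances can be (de)serialized between Storm workers.
+val topologyConfig = new Config
+topologyConfig.registerDecorator(classOf[TweetAvroKryoDecorator])
+```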
+
+
+
+
+# Implementation details
+
+* We use [Twitter Bijection](https://github.com/twitter/bijection) for Avro encoding and decoding (a short
+  encode/decode sketch follows this list).
+* We use [Twitter Chill](https://github.com/twitter/chill/) (which in turn uses Bijection) to implement a
+ [custom Kryo serializer for Storm](src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala) that
+ handles our Avro-derived Java class `Tweet` from [twitter.avsc](src/main/avro/twitter.avsc).
+* Unit and integration tests are implemented with [ScalaTest](http://scalatest.org/).
+* We use [ZooKeeper 3.3.4](https://zookeeper.apache.org/) instead of the latest version 3.4.5.
+ See section _Known issues_ below for why we do that.
+* We use the Kafka spout [wurstmeister/storm-kafka-0.8-plus](https://github.com/wurstmeister/storm-kafka-0.8-plus).
+ Unfortunately that spout is not yet released for Scala 2.10. For that reason [@miguno](https://github.com/miguno/)
+ has [forked and branched](https://github.com/miguno/storm-kafka-0.8-plus/tree/miguno_clojars) the code to add Scala
+ 2.10 support, and released such a version to [Clojars](https://clojars.org/com.miguno/storm-kafka-0.8-plus_2.10).
+ See [build.sbt](build.sbt) for details.
+ * _Once Storm 0.9.2 is released we will migrate to the new_
+ _[Kafka spout that ships with Storm](https://github.com/apache/incubator-storm/tree/master/external/storm-kafka)_
+ _(which is based on the spout developed by wurstmeister)._
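+
+As a short illustration of the Bijection-based encoding and decoding mentioned above, the sketch below round-trips a
+`Tweet` through Avro binary format. The `Tweet` constructor arguments are illustrative and depend on the fields
+defined in [twitter.avsc](src/main/avro/twitter.avsc); `SpecificAvroCodecs` is Bijection's support for Avro's
+"specific" API.
+
+```
+import com.miguno.avro.Tweet
+import com.twitter.bijection.Injection
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import scala.util.Try
+
+// An Injection[Tweet, Array[Byte]] based on Avro's "specific" API.
+val injection: Injection[Tweet, Array[Byte]] = SpecificAvroCodecs.toBinary[Tweet]
+
+// Field names/values are illustrative -- they must match the schema in twitter.avsc.
+val tweet = new Tweet("miguno", "Rock: Nerf Paper. Scissors is fine.", 1366150681L)
+
+val bytes: Array[Byte] = injection(tweet)          // Avro-encode (e.g. before sending to Kafka)
+val decoded: Try[Tweet] = injection.invert(bytes)  // Avro-decode (e.g. after reading from Kafka)
+```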
+
+
+
+
+# Development
+
+
+
+
+## Git setup: git-flow
+
+This project follows the [git-flow](https://github.com/nvie/gitflow) approach. This means, for instance, that:
+
+* The branch `develop` is used for integration of the "next release".
+* The branch `master` is used for production releases.
+
+See [git-flow](https://github.com/nvie/gitflow) and the introduction article
+[Why aren't you using git-flow?](http://jeffkreeftmeijer.com/2010/why-arent-you-using-git-flow/) for details.
+
+
+
+
+## Build requirements
+
+* [Scala](http://www.scala-lang.org/) 2.10.4
+* [sbt](http://www.scala-sbt.org/) 0.13.2
+* Oracle Java JDK 6 (version 6 is still recommended for use with Kafka and Storm)
+ * The code _in this project_ works with Java 7, too. However, some dependencies we use are not published for Java 7
+ yet.
+
+
+### Install on Mac OS X
+
+_The instructions below assume you have [Homebrew](http://brew.sh/) installed on your Mac._
+
+First, install Oracle Java JDK 6 for Mac:
+
+* [Java 6 for Mac OS X](http://support.apple.com/downloads/DL1572/en_US/JavaForOSX2013-05.dmg) aka
+ "Java for OS X 2013-005". This will give you Java 1.6.0_65.
+
+Then, install Scala and sbt:
+
+ $ brew update
+ $ brew install scala210 sbt
+
+
+### Install on RHEL/CentOS 6
+
+First, install Oracle Java JDK 6:
+
+* Follow [these instructions](http://www.if-not-true-then-false.com/2010/install-sun-oracle-java-jdk-jre-6-on-fedora-centos-red-hat-rhel/)
+ (untested!).
+ * Note: As a RHEL 6 user you may have access to a ready-to-use RPM package of Oracle JDK 6 in your existing yum
+ repositories as part of the RedHat Network (RHN). If so, you do not need to follow the instructions in the link
+ above. Instead, you only need to run e.g. `sudo yum install java-1.6.0-sun-devel`
+ ([details](https://access.redhat.com/site/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/Installation_Guide/appe-install_jdk.html)).
+
+Then, install Scala and sbt:
+
+ $ sudo yum install http://www.scala-lang.org/files/archive/scala-2.10.4.rpm
+ $ sudo yum install http://dl.bintray.com/sbt/rpm/sbt-0.13.2.rpm
+
+
+See [Download Scala 2.10.4](http://www.scala-lang.org/download/2.10.4.html) and
+[Installing sbt](http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html) for details.
+
+
+### Install on Ubuntu/Debian
+
+First, install Oracle JDK 6:
+
+* Follow [these instructions](http://linuxg.net/how-to-install-oracle-java-jdk-678-on-ubuntu-13-04-12-10-12-04/)
+ (untested!). Note that by following these instructions you will install Oracle JDK/JRE from a third-party PPA package
+ repository (`ppa:webupd8team/java`, managed by [webupd8](http://www.webupd8.org/)). Unfortunately Oracle does not
+ provide official apt repositories for Ubuntu, and the Ubuntu team was required to remove "their" Oracle JDK/JRE
+ packages from the Ubuntu repositories because of licensing issues with Oracle.
+
+Then, install Scala and sbt:
+
+ $ wget http://www.scala-lang.org/files/archive/scala-2.10.4.deb
+ $ sudo dpkg -i scala-2.10.4.deb
+ $ wget http://dl.bintray.com/sbt/debian/sbt-0.13.2.deb
+    $ sudo dpkg -i sbt-0.13.2.deb
+
+See [Download Scala 2.10.4](http://www.scala-lang.org/download/2.10.4.html) and
+[Installing sbt](http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html) for details.
+
+
+
+
+## Building the code
+
+ $ ./sbt clean compile
+
+If you want to only (re)generate Java classes from Avro schemas:
+
+ $ ./sbt avro:generate
+
+Generated Java sources are stored under `target/scala-*/src_managed/main/compiled_avro/`.
+
+
+
+
+## Running the tests
+
+ $ ./sbt clean test
+
+Here are some examples that demonstrate how you can run only a certain subset of tests:
+
+ # Use `-l` to exclude tests by tag:
+ # Run all tests WITH THE EXCEPTION of those tagged as integration tests
+ $ ./sbt "test-only * -- -l com.miguno.kafkastorm.integration.IntegrationTest"
+
+ # Use `-n` to include tests by tag (and skip all tests that lack the tag):
+ # Run ONLY tests tagged as integration tests
+ $ ./sbt "test-only * -- -n com.miguno.kafkastorm.integration.IntegrationTest"
+
+ # Run only the tests in suite AvroSchemeSpec:
+ $ ./sbt "test-only com.miguno.kafkastorm.storm.AvroSchemeSpec"
+
+ # You can also combine the examples above, of course.
+
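+The `-n`/`-l` filters above select tests by their ScalaTest tag. As a sketch of how such a tag is attached to a test
+(the suite and test names here are made up; only the tag string `com.miguno.kafkastorm.integration.IntegrationTest` is
+taken from the commands above):
+
+```
+import org.scalatest.{FunSpec, Matchers, Tag}
+
+// The string passed to Tag must match the fully qualified name used with -n / -l above.
+object IntegrationTest extends Tag("com.miguno.kafkastorm.integration.IntegrationTest")
+
+class ExampleSpec extends FunSpec with Matchers {
+
+  describe("An integration-tagged test") {
+    // Runs only when integration tests are included via `-n`, and is skipped with `-l`.
+    it("should only run when the IntegrationTest tag is selected", IntegrationTest) {
+      (1 + 1) should be(2)
+    }
+  }
+}
+```
+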
+Test reports in JUnit XML format are written to `target/test-reports/junitxml/*.xml`. Make sure that your actual build
+steps run the `./sbt test` task, otherwise the JUnit XML reports will not be generated (note that, unfortunately,
+`./sbt scoverage:test` _will not_ generate the JUnit XML reports).
+
+Integration with CI servers:
+
+* Jenkins integration:
+ * Configure the build job.
+ * Go to _Post-build Actions_.
+ * Add a post-build action for _Publish JUnit test result report_.
+ * In the _Test report XMLs_ field add the pattern `**/target/test-reports/junitxml/*.xml`.
+ * Now each build of your job will have a _Test Result_ link.
+* TeamCity integration:
+ * Edit the build configuration.
+ * Select configuration step 3, _Build steps_.
+ * Under _Additional Build Features_ add a new build feature.
+ * Use the following build feature configuration:
+ * Report type: Ant JUnit
+ * Monitoring rules: `target/test-reports/junitxml/*.xml`
+ * Now each build of your job will have a _Tests_ tab.
+
+Further details are available at:
+
+* How to tag tests in ScalaTest: [Tagging your tests](http://www.scalatest.org/user_guide/tagging_your_tests)
+* How to selectively run tests: [Using ScalaTest with sbt](http://www.scalatest.org/user_guide/using_scalatest_with_sbt)
+ and [How to Run Tagged Scala Tests with SBT and ScalaTest](http://code.hootsuite.com/tagged-tests-with-sbt/)
+
+
+
+
+## Creating code coverage reports
+
+We are using [sbt-scoverage](https://github.com/scoverage/sbt-scoverage) to create code coverage reports for unit tests.
+
+Run the unit tests via:
+
+ $ ./sbt clean scoverage:test
+
+* An HTML report will be created at `target/scala-2.10/scoverage-report/index.html`.
+* An XML report will be created at `./target/scala-2.10/scoverage-report/scoverage.xml`.
+
+Integration with CI servers:
+
+* Jenkins integration:
+ * Configure the build.
+ * Go to _Post-build Actions_.
+ * Add a post-build action for _Publish Cobertura Coverage Report_.
+ * In the _Cobertura xml report pattern_ field add the pattern `**/target/scala-2.10/scoverage-report/scoverage.xml`.
+ * Now each build of your job will have a _Coverage Report_ link.
+* TeamCity integration:
+ * Edit the build configuration.
+ * Select configuration step 1, _General settings_.
+ * In the _Artifact Paths_ field add the path `target/scala-2.10/scoverage-report/** => coberturareport/`.
+ * Now each build of your job will have a _Cobertura Coverage Report_ tab.
+
+
+
+
+## Packaging the code
+
+To create a normal ("slim") jar:
+
+ $ ./sbt clean package
+
+ >>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.1.0-SNAPSHOT.jar`
+
+To create a fat jar, which includes any dependencies of kafka-storm-starter:
+
+ $ ./sbt assembly
+
+ >>> Generates `target/scala-2.10/kafka-storm-starter-assembly-0.1.0-SNAPSHOT.jar`
+
+To create a scaladoc/javadoc jar:
+
+ $ ./sbt packageDoc
+
+ >>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.1.0-SNAPSHOT-javadoc.jar`
+
+To create a sources jar:
+
+ $ ./sbt packageSrc
+
+ >>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.1.0-SNAPSHOT-sources.jar`
+
+To create API docs:
+
+ $ ./sbt doc
+
+ >>> Generates `target/scala-2.10/api/*` (HTML files)
+
+
+
+
+## IDE support
+
+### IntelliJ IDEA
+
+kafka-storm-starter integrates the [sbt-idea plugin](https://github.com/mpeltonen/sbt-idea). Use the following command
+to build IDEA project files:
+
+ $ ./sbt gen-idea
+
+You can then open kafka-storm-starter as a project in IDEA via _File > Open..._ and selecting the top-level directory
+of kafka-storm-starter.
+
+**Important note:** There is a bug when using the sbt plugins for Avro and for IntelliJ IDEA in combination. The sbt
+plugin for Avro reads the Avro `*.avsc` schemas stored under `src/main/avro` and generates the corresponding Java
+classes, which it stores under `target/scala-2.10/src_managed/main/compiled_avro` (in the case of kafka-storm-starter,
+a `Tweet.java` class will be generated from the Avro schema [twitter.avsc](src/main/avro/twitter.avsc)). The latter
+path must be added to IDEA's _Source Folders_ setting, which will happen automatically for you. However, the
+aforementioned bug will also add a second, incorrect path to _Source Folders_, which will cause IDEA to complain about
+not being able to find the Avro-generated Java classes (here: the `Tweet` class).
+
+Until this bug is fixed upstream you can use the following workaround, which you must perform every time you run
+`./sbt gen-idea`:
+
+1. In IntelliJ IDEA open the project structure for kafka-storm-starter via _File > Project Structure..._.
+2. Under _Project settings_ on the left-hand side select _Modules_.
+3. Select the _Sources_ tab on the right-hand side.
+4. Remove the problematic `target/scala-2.10/src_managed/main/compiled_avro/com` entry from the _Source Folders_ listing
+ (the source folders are colored in light-blue). Note the trailing `.../com`, which comes from
+ `com.miguno.avro.Tweet` in the [twitter.avsc](src/main/avro/twitter.avsc) Avro schema.
+5. Click Ok.
+
+See also this screenshot (click to enlarge):
+
+[![Workaround for the sbt-idea/sbt-avro bug](images/IntelliJ-IDEA-Avro-bug_400x216.png)](images/IntelliJ-IDEA-Avro-bug.png?raw=true)
+
+
+### Eclipse
+
+kafka-storm-starter integrates the [sbt-eclipse plugin](https://github.com/typesafehub/sbteclipse). Use the following
+command to build Eclipse project files:
+
+ $ ./sbt eclipse
+
+Then use the _Import Wizard_ in Eclipse to import _Existing Projects into Workspace_.
+
+
+
+
+# FAQ
+
+
+
+
+## Kafka
+
+### Where do the unit tests store broker logs in the local filesystem?
+
+The in-memory Kafka instances that are launched by the unit tests store their Kafka "log" files (i.e. the files that
+contain the messages that are being sent to the Kafka topics) under `/tmp/kafka-logs/`.
+
+You may need to remove this directory manually in case you want to start from a clean state. At the moment the unit
+tests do not remove this directory for you.
+
+### ZooKeeper exceptions "KeeperException: NoNode for /[ZK path]" logged at INFO level
+
+In short, you can normally ignore those errors safely -- they are logged at INFO level rather than at ERROR level for a
+reason.
+
+As described in the mailing list thread [Zookeeper exceptions](http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201204.mbox/%3CCAFbh0Q3BxaAkyBq1_yUHhUkkhxX4RBQZPAA2pkR4U9+m4VY8nA@mail.gmail.com%3E):
+
+"The reason you see those NoNode error code is the following. Every time we want to create a new [ZK] path, say
+`/brokers/ids/1`, we try to create it directly. If this fails because the parent path doesn't exist, we try to create
+the parent path first. This will happen recursively. However, the `NoNode` error should show up only once, not every
+time a broker is started (assuming ZK data hasn't been cleaned up)."
+
+A similar answer was given in the thread
+[Clean up kafka environment](http://grokbase.com/t/kafka/users/137qgfyga0/clean-up-kafka-environmet):
+
+"These info messages show up when Kafka tries to create new consumer groups. While trying to create the children of
+`/consumers/[group]`, if the parent path doesn't exist, the zookeeper server logs these messages. Kafka internally
+handles these cases correctly by first creating the parent node."
+
+
+
+
+## Storm
+
+### Storm `LocalCluster` and ZooKeeper
+
+[LocalCluster](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/LocalCluster.clj)
+starts an embedded ZooKeeper instance listening at `localhost:2000/tcp`. If a different process is already bound to
+`2000/tcp`, then Storm will increment the embedded ZooKeeper's port until it finds a free port (`2000` -> `2001` ->
+`2002`, and so on). `LocalCluster` then reads the Storm defaults and overrides some of Storm's configuration (see the
+`mk-local-storm-cluster` function in
+[testing.clj](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/testing.clj) and
+the `mk-inprocess-zookeeper` function in
+[zookeeper.clj](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/zookeeper.clj)
+for details):
+
+    STORM-CLUSTER-MODE "local"
+    STORM-ZOOKEEPER-PORT zk-port
+    STORM-ZOOKEEPER-SERVERS ["localhost"]
+
+where `zk-port` is the final port chosen.
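+
+For reference, here is a minimal sketch of starting and stopping a `LocalCluster` from Scala, which implicitly starts
+(and stops) the embedded ZooKeeper described above. The topology wired here uses Storm's built-in `TestWordSpout`
+purely as a placeholder; the topology and component names are made up.
+
+```
+import backtype.storm.{Config, LocalCluster}
+import backtype.storm.testing.TestWordSpout
+import backtype.storm.topology.TopologyBuilder
+
+// Starting LocalCluster also launches the embedded ZooKeeper (2000/tcp or the next free port).
+val cluster = new LocalCluster
+
+val builder = new TopologyBuilder
+builder.setSpout("word-spout", new TestWordSpout)
+
+cluster.submitTopology("zookeeper-demo", new Config, builder.createTopology())
+Thread.sleep(5 * 1000)                    // let the topology run briefly
+cluster.killTopology("zookeeper-demo")
+cluster.shutdown()                        // also stops the embedded ZooKeeper
+```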
+
+As of May 2014 it is not possible to launch a local Storm cluster via `LocalCluster` without its own embedded ZooKeeper.
+Likewise it is not possible to control on which port the embedded ZooKeeper process will listen -- it will always follow
+the `2000/tcp` based algorithm above to set the port. A JIRA ticket was opened to untangle this hard wiring between
+`LocalCluster` and ZooKeeper, cf.
+[STORM-213: Decouple In-Process ZooKeeper from LocalCluster](https://issues.apache.org/jira/browse/STORM-213).
+
+
+
+
+# Known issues and limitations
+
+This section lists known issues and limitations a) for the upstream projects such as Storm and Kafka, and b) for our
+own code.
+
+
+
+
+## Upstream code
+
+### Kryo version conflict in Storm
+
+_Note: This problem is resolved in the upcoming 0.9.2 version of Storm._
+
+There is a Kryo version conflict between Storm 0.9.1 (uses Kryo 2.17) and Twitter Chill (uses Kryo 2.21).
+
+In this project our workaround is to exclude Kryo (2.21) from the Twitter Chill dependency, but this may not be a
+universal fix. Twitter have apparently run into data corruption issues with Kryo 2.17, and for that reason have built
+their own version of Storm using Kryo 2.21.
+See [CHILL-173: Kryo version conflict between Chill and Storm 0.9.1-incubating causes Avro serialization to fail](https://github.com/twitter/chill/issues/173)
+for details.
+
+
+### ZooKeeper throws InstanceAlreadyExistsException during tests
+
+You will see the following exception when running the integration tests, which you can safely ignore:
+
+ [2014-03-07 11:56:59,250] WARN Failed to register with JMX (org.apache.zookeeper.server.ZooKeeperServer)
+ javax.management.InstanceAlreadyExistsException: org.apache.ZooKeeperService:name0=StandaloneServer_port-1
+
+The root cause is that in-memory ZooKeeper instances have a hardcoded JMX setup. Because we cannot prevent Storm's
+`LocalCluster` from starting its own ZooKeeper instance alongside "ours" (see the FAQ section above), there will be two
+ZK instances trying to use the same JMX setup. Since the JMX setup is not relevant to our testing, the exception can be
+safely ignored, although we would of course prefer a proper fix.
+
+
+### ZooKeeper version 3.3.x recommended for use with Storm 0.9.1 and Kafka 0.8.x
+
+_Note: The upcoming version 0.9.2 of Storm uses ZooKeeper 3.4.5._
+
+At the time of writing both Storm (<= 0.9.1) and Kafka (<= 0.8.1.1) are not officially compatible with ZooKeeper 3.4.x
+yet, which is the latest stable version of ZooKeeper. Instead the use of ZooKeeper 3.3.x is recommended.
+
+So which version of ZooKeeper should you pick, particularly if you are already running a ZooKeeper cluster for other
+parts of your infrastructure (such as a Hadoop cluster)?
+
+**The TL;DR version is:** Try using ZooKeeper 3.4.5 for both Kafka and Storm, but see the caveats and workarounds
+below. If you do run into problems, consider downgrading to ZooKeeper 3.3.6. If that fails, too, try 3.3.4. In the
+worst case use separate ZooKeeper clusters/versions for Storm (3.3.3) and Kafka (3.3.4).
+
+**The longer version is:** Storm versions up to and including 0.9.1 want ZK 3.3.3, but the upcoming 0.9.2 version
+relies on ZooKeeper 3.4.x.
+[All current versions of Kafka still prefer ZK 3.3.4](https://kafka.apache.org/documentation.html#zkversion).
+Generally speaking though, the best 3.3.x version of ZooKeeper is 3.3.6, which is the latest stable 3.3.x version. This
+is because 3.3.6 fixed a number of serious bugs that could lead to data corruption.
+
+_Tip: You can verify against which ZK version the code in this project is actually built by running_
+_`./sbt dependency-graph`._
+
+**The really long version is:** In the _code and tests_ of this project we cannot use ZK 3.4.x just yet because Storm
+0.9.1 is not 100% compatible with ZK 3.4.x. For instance, Storm will throw errors if you try to run a Storm
+`LocalCluster` (for unit testing) against ZK 3.4.x. At the same time, and somewhat surprisingly, you can run a "real"
+Storm cluster against ZK 3.4.x. For instance, Netflix have reportedly been using ZK 3.4.5 in production for some time.
+
+* Storm and ZooKeeper: Storm versions up to and including 0.9.1 are built against ZooKeeper 3.3.3 because of Storm's
+ dependency on [Netflix Curator 1.0.1](https://github.com/Netflix/curator). These versions of Zookeeper and Curator
+ are very old, and the upcoming Storm 0.9.2 therefore switches to Apache Curator 2.4.0 with ZooKeeper 3.4.x.
+* Kafka and ZooKeeper: LinkedIn recommend the use of ZK 3.3.x but warn against the use of 3.3.3 because that
+ version has known serious issues regarding ephemeral node deletion and session expirations. For these reasons
+ LinkedIn run ZK 3.3.4 in production.
+ See [ZooKeeper version](https://kafka.apache.org/documentation.html#zkversion) in the Kafka documentation.
+ Lastly, there is an open Kafka JIRA ticket that covers upgrading Kafka to ZK 3.4.5, see
+ [KAFKA-854: Upgrade dependencies for 0.8](https://issues.apache.org/jira/browse/KAFKA-854).
+* Storm and Cloudera CDH 4.5:
+ * [Storm cannot run in combination with a recent Hadoop/HBase version](http://mail-archives.apache.org/mod_mbox/storm-user/201402.mbox/%3CCADoiZqom8Wuzi9uiqT4d01cTNn2r_nOmXyZyCSqEko-vOyrQBA@mail.gmail.com%3E)
+ -- The author ran into problems when using Storm in combination with Cloudera CDH 4. It looks as if he is trying
+ to build a code project that lists both Storm and Hadoop/HBase as its dependencies (similar to how we combine
+ Storm with Kafka), and due to that runs into ZooKeeper version conflicts as CDH 4 runs ZooKeeper 3.4.5.
+* If in a production environment you run into problems when using ZooKeeper 3.4.5 with Storm <= 0.9.1, you can try
+ a [workaround using Google jarjar](https://groups.google.com/forum/#!topic/storm-user/TVVF_jqvD_A) in order to
+ deploy ZooKeeper 3.4.5 alongside Storm's/Curator's hard dependency on ZooKeeper 3.3.3.
+ [Another user reported](http://grokbase.com/t/gg/storm-user/134f2tw5gx/recommended-zookeeper-version-for-storm-0-8-2)
+ that he uses ZK 3.4.5 in production and ZK 3.3.3 for local testing by not including ZooKeeper in the uber jar
+ and putting the correct ZK version in the CLASSPATH at runtime.
+  See also [STORM-70: Use ZooKeeper 3.4.5](https://issues.apache.org/jira/browse/STORM-70).
+
+
+
+
+## kafka-storm-starter code
+
+* Some code in kafka-storm-starter does not look like idiomatic Scala code. While sometimes this may be our own fault,
+ there is one area where we cannot easily prevent this from happening: When the underlying Java APIs (here: the Java
+ API of Storm) do not lend themselves to a more Scala-like code style. You can see this, for instance, in the way we
+ wire the spouts and bolts of a topology. One alternative, of course, would be to create Scala-fied wrappers but this
+ seemed inappropriate for this project.
+* We are using `Thread.sleep()` in some tests instead of more robust approaches, so we may want to improve those tests
+  to prevent transient failures. In Kafka's test suites, for instance, tests use `waitUntilTrue()` to detect more
+  reliably when to proceed with (or fail/time out) the next step. See the related discussion in the
+  [review request 19696 for KAFKA-1317](https://reviews.apache.org/r/19696/#comment71202), and the sketch of such a
+  polling helper after this list.
+* We noticed that the tests may fail when using Oracle/Sun JDK 1.6.0_24. Later versions (e.g. 1.6.0_31) work fine.
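+
+The sketch below shows a polling helper in the spirit of Kafka's `waitUntilTrue()`, which a test could use instead of a
+fixed `Thread.sleep()`. This is an illustrative helper, not code that exists in this project or in Kafka's test
+utilities under this exact signature.
+
+```
+// Poll `condition` until it holds or `timeoutMillis` elapses; returns whether it eventually held.
+def waitUntilTrue(condition: () => Boolean, timeoutMillis: Long, pauseMillis: Long = 100L): Boolean = {
+  val deadline = System.currentTimeMillis() + timeoutMillis
+  while (System.currentTimeMillis() < deadline) {
+    if (condition()) return true
+    Thread.sleep(pauseMillis)
+  }
+  condition()
+}
+```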
+
+
+
+
+# Contributing to kafka-storm-starter
+
+Code contributions, bug reports, feature requests etc. are all welcome.
+
+If you are new to GitHub please read [Contributing to a project](https://help.github.com/articles/fork-a-repo) for how
+to send patches and pull requests to kafka-storm-starter.
+
+
+
+
+# License
+
+Copyright © 2014 Michael G. Noll
+
+See [LICENSE](LICENSE) for licensing information.
+
+
+
+
+# References
+
+
+
+
+## Wirbelsturm
+
+Want to perform 1-click deployments of Kafka clusters and/or Storm clusters (with a Graphite instance, with Redis,
+with...)? Take a look at [Wirbelsturm](https://github.com/miguno/wirbelsturm), with which you can deploy such
+environments locally and to Amazon AWS.
+
+
+
+
+## Kafka
+
+Unit testing:
+
+* [buildlackey/cep/kafka-0.8.x](https://github.com/buildlackey/cep/tree/master/kafka-0.8.x)
+ -- A simple Kafka producer/consumer example with in-memory Kafka and Zookeeper instances. For a number of reasons
+ we opted not to use that code. We list it in this section in case someone else may find it helpful.
+
+
+
+
+## Storm
+
+Storm in general:
+
+* [Storm FAQ](http://storm.incubator.apache.org/documentation/FAQ.html)
+* [Config (Java API)](http://storm.incubator.apache.org/apidocs/backtype/storm/Config.html)
+* [Understanding the Internal Message Buffers of Storm](http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/)
+* [Sending Metrics From Storm to Graphite](http://www.michael-noll.com/blog/2013/11/06/sending-metrics-from-storm-to-graphite/)
+
+Unit testing:
+
+* [TestingApiDemo.java](https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java)
+ -- Demonstrates in Java how to use Storm's built-in testing API. Unfortunately the code is more than a year old and
+ not well documented.
+ * Note that `backtype.storm.Testing` is apparently not well suited to test Trident topologies.
+ See [Any Java example to write test cases for storm Transactional topology](https://groups.google.com/forum/#!msg/storm-user/nZs2NwNqqn8/CjKaZK7eRFsJ)
+ (Mar 2013) for details.
+* [MockOutputCollector](https://gist.github.com/k2xl/1782187)
+ -- Code example on how to implement a mock `OutputCollector` for unit testing.
+* [Testing the logic of Storm topologies](https://groups.google.com/forum/#!topic/storm-user/Magc5-vt2Hg)
+ -- Discussion in the old storm-user mailing list, Dec 2011
+* [buildlackey/cep/storm-kafka](https://github.com/buildlackey/cep/tree/master/storm%2Bkafka)
+ -- Kafka spout integration test with an in-memory Storm cluster (`LocalCluster`), and in-memory Kafka and Zookeeper
+ instances. For a number of reasons we opted not to use that code. We list it in this section in case someone else
+ may find it helpful.
+* [buildlackey/cep/esper+storm+kafka](https://github.com/buildlackey/cep/tree/master/esper%2Bstorm%2Bkafka)
+ -- Example illustrating a Kafka consumer spout, a Kafka producer bolt, and an Esper streaming query bolt
+* [schleyfox/storm-test](https://github.com/schleyfox/storm-test)
+ -- Test utilities for Storm (in Clojure).
+
+Kafka spout [wurstmeister/storm-kafka-0.8-plus](https://github.com/wurstmeister/storm-kafka-0.8-plus):
+
+* [Example code on how to use the spout](https://github.com/wurstmeister/storm-kafka-0.8-plus-test)
+
+Kafka spout [HolmesNL/kafka-spout](https://github.com/HolmesNL/kafka-spout), written by the
+[Netherlands Forensics Institute](http://forensicinstitute.nl):
+
+* [Main documentation](https://github.com/HolmesNL/kafka-spout/wiki)
+* [KafkaSpout.java](https://github.com/HolmesNL/kafka-spout/blob/develop/src/main/java/nl/minvenj/nfi/storm/kafka/KafkaSpout.java)
+ -- Helpful to understand how the spout works.
+* [ConfigUtils.java](https://github.com/HolmesNL/kafka-spout/blob/develop/src/main/java/nl/minvenj/nfi/storm/kafka/util/ConfigUtils.java)
+ -- Helpful to understand how the Kafka spout can be configured.
+
+
+
+
+## Avro
+
+Twitter Bijection:
+
+* [SpecificAvroCodecsSpecification.scala](https://github.com/twitter/bijection/blob/develop/bijection-avro/src/test/scala/com/twitter/bijection/avro/SpecificAvroCodecsSpecification.scala)
+ -- How to use Bijection for Avro's `Specific*` API (which is what you would usually do)
+* [GenericAvroCodecsSpecification.scala](https://github.com/twitter/bijection/blob/develop/bijection-avro/src/test/scala/com/twitter/bijection/avro/GenericAvroCodecsSpecification.scala)
+ -- How to use Bijection for Avro's `Generic*` API
+
+Kafka:
+
+* [How to use Kafka and Avro](http://stackoverflow.com/questions/8298308/how-to-use-kafka-and-avro)
+
+
+
+
+## Kryo
+
+* [AdamKryoRegistrator.java](https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/serialization/AdamKryoRegistrator.scala)
+  -- Example of how to register serializers with Kryo.
+* Twitter Chill examples on how to create Avro-based serializers for Kryo:
+ * [AvroSerializerSpec.scala](https://github.com/twitter/chill/blob/develop/chill-avro/src/test/scala/com/twitter/chill/avro/AvroSerializerSpec.scala)
+ * [BijectionEnrichedKryo.scala](https://github.com/twitter/chill/blob/develop/chill-bijection/src/main/scala/com/twitter/chill/BijectionEnrichedKryo.scala)
diff --git a/assembly.sbt b/assembly.sbt
new file mode 100644
index 0000000..fd69c77
--- /dev/null
+++ b/assembly.sbt
@@ -0,0 +1,16 @@
+import AssemblyKeys._
+
+assemblySettings
+
+// Any customized settings must be written here, i.e. after 'assemblySettings' above.
+// See https://github.com/sbt/sbt-assembly for available parameters.
+
+// Include "provided" dependencies back to run/test tasks' classpath.
+// See:
+// https://github.com/sbt/sbt-assembly#-provided-configuration
+// http://stackoverflow.com/a/21803413/3827
+//
+// In our case, the Storm dependency must be set to "provided" (cf. `build.sbt`) because, when deploying and launching
+// our Storm topology code "for real" to a distributed Storm cluster, Storm wants us to exclude the Storm dependencies
+// (jars) as they are provided [no pun intended] by the Storm cluster.
+run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..2a3bb86
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,87 @@
+organization := "com.miguno.kafkastorm"
+
+name := "kafka-storm-starter"
+
+scalaVersion := "2.10.4"
+
+seq(sbtavro.SbtAvro.avroSettings : _*)
+
+// Configure the desired Avro version. sbt-avro automatically injects a libraryDependency.
+(version in avroConfig) := "1.7.6"
+
+// Look for *.avsc etc. files in src/main/avro
+(sourceDirectory in avroConfig) <<= (sourceDirectory in Compile)(_ / "avro")
+
+(stringType in avroConfig) := "String"
+
+// https://github.com/jrudolph/sbt-dependency-graph
+net.virtualvoid.sbt.graph.Plugin.graphSettings
+
+resolvers ++= Seq(
+ "typesafe-repository" at "http://repo.typesafe.com/typesafe/releases/",
+ "clojars-repository" at "https://clojars.org/repo",
+ // For retrieving Kafka release artifacts directly from Apache. The artifacts are also available via Maven Central.
+ "Apache releases" at "https://repository.apache.org/content/repositories/releases/"
+)
+
+libraryDependencies ++= Seq(
+ "com.twitter" %% "bijection-core" % "0.6.2",
+ "com.twitter" %% "bijection-avro" % "0.6.2",
+ // Chill uses Kryo 2.21, which is not fully compatible with 2.17 (used by Storm).
+ // We must exclude the newer Kryo version, otherwise we run into the problem described at
+ // https://github.com/thinkaurelius/titan/issues/301.
+ //
+ // TODO: Once Storm 0.9.2 is released we can update our dependencies to use Chill as-is (without excludes) because
+ // Storm then uses Kryo 2.21 (via Carbonite 1.3.3) just like Chill does.
+ "com.twitter" %% "chill" % "0.3.6"
+ exclude("com.esotericsoftware.kryo", "kryo"),
+ "com.twitter" % "chill-avro" % "0.3.6"
+ exclude("com.esotericsoftware.kryo", "kryo"),
+ "com.twitter" %% "chill-bijection" % "0.3.6"
+ exclude("com.esotericsoftware.kryo", "kryo"),
+ // The excludes of jms, jmxtools and jmxri are required as per https://issues.apache.org/jira/browse/KAFKA-974.
+ // The exclude of slf4j-simple is because it overlaps with our use of logback with slf4j facade; without the exclude
+ // we get slf4j warnings and logback's configuration is not picked up.
+ "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
+ exclude("javax.jms", "jms")
+ exclude("com.sun.jdmk", "jmxtools")
+ exclude("com.sun.jmx", "jmxri")
+ exclude("org.slf4j", "slf4j-simple"),
+ "org.apache.storm" % "storm-core" % "0.9.1-incubating" % "provided"
+ exclude("org.slf4j", "log4j-over-slf4j"),
+ // We exclude curator-framework because storm-kafka-0.8-plus recently switched from curator 1.0.1 to 1.3.3, which
+ // pulls in a newer version of ZooKeeper with which Storm 0.9.1 is not yet compatible.
+ //
+ // TODO: Remove the exclude once Storm 0.9.2 is released, because that version depends on a newer version (3.4.x) of
+ // ZooKeeper.
+ "com.miguno" %% "storm-kafka-0.8-plus" % "0.5.0-SNAPSHOT"
+ exclude("com.netflix.curator", "curator-framework"),
+ "com.netflix.curator" % "curator-test" % "1.0.1",
+ "com.101tec" % "zkclient" % "0.4",
+ // Logback with slf4j facade
+ "ch.qos.logback" % "logback-classic" % "1.1.2",
+ "ch.qos.logback" % "logback-core" % "1.1.2",
+ "org.slf4j" % "slf4j-api" % "1.7.7",
+ // Test dependencies
+ "org.scalatest" %% "scalatest" % "2.1.5" % "test",
+ "org.mockito" % "mockito-all" % "1.9.5" % "test"
+)
+
+// Required IntelliJ workaround. This tells `sbt gen-idea` to include scala-reflect as a compile dependency (and not
+// merely as a test dependency), which we need for TypeTag usage.
+libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _)
+
+publishArtifact in Test := false
+
+parallelExecution in Test := false
+
+// Write test results to file in JUnit XML format
+testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-u", "target/test-reports/junitxml")
+
+// Write test results to console/stdout
+testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-o")
+
+// See https://github.com/scoverage/scalac-scoverage-plugin
+ScoverageSbtPlugin.instrumentSettings
+
+mainClass in (Compile,run) := Some("com.miguno.kafkastorm.storm.KafkaStormDemo")
diff --git a/images/IntelliJ-IDEA-Avro-bug.png b/images/IntelliJ-IDEA-Avro-bug.png
new file mode 100644
index 0000000..455bb09
Binary files /dev/null and b/images/IntelliJ-IDEA-Avro-bug.png differ
diff --git a/images/IntelliJ-IDEA-Avro-bug_400x216.png b/images/IntelliJ-IDEA-Avro-bug_400x216.png
new file mode 100644
index 0000000..b5de7ef
Binary files /dev/null and b/images/IntelliJ-IDEA-Avro-bug_400x216.png differ
diff --git a/project/assembly.sbt b/project/assembly.sbt
new file mode 100644
index 0000000..54c3252
--- /dev/null
+++ b/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..8ac605a
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.2
diff --git a/project/build.sbt b/project/build.sbt
new file mode 100644
index 0000000..ccfbb29
--- /dev/null
+++ b/project/build.sbt
@@ -0,0 +1,2 @@
+// https://github.com/sbt/sbt-release
+addSbtPlugin("com.github.gseitz" % "sbt-release" % "0.8.3")
diff --git a/project/plugins.sbt b/project/plugins.sbt
new file mode 100644
index 0000000..a0fcddd
--- /dev/null
+++ b/project/plugins.sbt
@@ -0,0 +1,20 @@
+resolvers ++= Seq(
+ "sbt-plugin-releases-repo" at "http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases",
+ "sbt-idea-repository" at "http://mpeltonen.github.io/maven/"
+)
+
+// https://github.com/mpeltonen/sbt-idea
+addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")
+
+// https://github.com/typesafehub/sbteclipse
+addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")
+
+// https://github.com/cavorite/sbt-avro
+addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")
+
+// https://github.com/jrudolph/sbt-dependency-graph
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
+
+// See https://github.com/scoverage/scalac-scoverage-plugin
+// and https://github.com/scoverage/sbt-scoverage
+addSbtPlugin("org.scoverage" %% "sbt-scoverage" % "0.98.2")
diff --git a/sbt b/sbt
new file mode 100755
index 0000000..15a6f76
--- /dev/null
+++ b/sbt
@@ -0,0 +1,457 @@
+#!/usr/bin/env bash
+#
+# A more capable sbt runner, coincidentally also called sbt.
+# Author: Paul Phillips
+
+# todo - make this dynamic
+declare -r sbt_release_version="0.13.2"
+declare -r sbt_unreleased_version="0.13.5-RC1"
+declare -r buildProps="project/build.properties"
+
+declare sbt_jar sbt_dir sbt_create sbt_version
+declare scala_version java_home sbt_explicit_version
+declare verbose noshare batch trace_level log_level
+declare sbt_saved_stty
+
+echoerr () { echo >&2 "$@"; }
+vlog () { [[ -n "$verbose" ]] && echoerr "$@"; }
+
+# spaces are possible, e.g. sbt.version = 0.13.0
+build_props_sbt () {
+ [[ -r "$buildProps" ]] && \
+ grep '^sbt\.version' "$buildProps" | tr '=' ' ' | awk '{ print $2; }'
+}
+
+update_build_props_sbt () {
+ local ver="$1"
+ local old="$(build_props_sbt)"
+
+ [[ -r "$buildProps" ]] && [[ "$ver" != "$old" ]] && {
+ perl -pi -e "s/^sbt\.version\b.*\$/sbt.version=${ver}/" "$buildProps"
+ grep -q '^sbt.version[ =]' "$buildProps" || printf "\nsbt.version=%s\n" "$ver" >> "$buildProps"
+
+ vlog "!!!"
+ vlog "!!! Updated file $buildProps setting sbt.version to: $ver"
+ vlog "!!! Previous value was: $old"
+ vlog "!!!"
+ }
+}
+
+set_sbt_version () {
+ sbt_version="${sbt_explicit_version:-$(build_props_sbt)}"
+ [[ -n "$sbt_version" ]] || sbt_version=$sbt_release_version
+ export sbt_version
+}
+
+# restore stty settings (echo in particular)
+onSbtRunnerExit() {
+ [[ -n "$sbt_saved_stty" ]] || return
+ vlog ""
+ vlog "restoring stty: $sbt_saved_stty"
+ stty "$sbt_saved_stty"
+ unset sbt_saved_stty
+}
+
+# save stty and trap exit, to ensure echo is reenabled if we are interrupted.
+trap onSbtRunnerExit EXIT
+sbt_saved_stty="$(stty -g 2>/dev/null)"
+vlog "Saved stty: $sbt_saved_stty"
+
+# this seems to cover the bases on OSX, and someone will
+# have to tell me about the others.
+get_script_path () {
+ local path="$1"
+ [[ -L "$path" ]] || { echo "$path" ; return; }
+
+ local target="$(readlink "$path")"
+ if [[ "${target:0:1}" == "/" ]]; then
+ echo "$target"
+ else
+ echo "${path%/*}/$target"
+ fi
+}
+
+die() {
+ echo "Aborting: $@"
+ exit 1
+}
+
+make_url () {
+ version="$1"
+
+ case "$version" in
+ 0.7.*) echo "http://simple-build-tool.googlecode.com/files/sbt-launch-0.7.7.jar" ;;
+ 0.10.* ) echo "$sbt_launch_repo/org.scala-tools.sbt/sbt-launch/$version/sbt-launch.jar" ;;
+ 0.11.[12]) echo "$sbt_launch_repo/org.scala-tools.sbt/sbt-launch/$version/sbt-launch.jar" ;;
+ *) echo "$sbt_launch_repo/org.scala-sbt/sbt-launch/$version/sbt-launch.jar" ;;
+ esac
+}
+
+init_default_option_file () {
+ local overriding_var="${!1}"
+ local default_file="$2"
+ if [[ ! -r "$default_file" && "$overriding_var" =~ ^@(.*)$ ]]; then
+ local envvar_file="${BASH_REMATCH[1]}"
+ if [[ -r "$envvar_file" ]]; then
+ default_file="$envvar_file"
+ fi
+ fi
+ echo "$default_file"
+}
+
+declare -r cms_opts="-XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC"
+declare -r jit_opts="-XX:ReservedCodeCacheSize=256m -XX:+TieredCompilation"
+declare -r default_jvm_opts="-XX:MaxPermSize=384m -Xms512m -Xmx1536m -Xss2m $jit_opts $cms_opts"
+declare -r noshare_opts="-Dsbt.global.base=project/.sbtboot -Dsbt.boot.directory=project/.boot -Dsbt.ivy.home=project/.ivy"
+declare -r latest_28="2.8.2"
+declare -r latest_29="2.9.3"
+declare -r latest_210="2.10.4"
+declare -r latest_211="2.11.0"
+
+declare -r script_path="$(get_script_path "$BASH_SOURCE")"
+declare -r script_name="${script_path##*/}"
+
+# some non-read-onlies set with defaults
+declare java_cmd="java"
+declare sbt_opts_file="$(init_default_option_file SBT_OPTS .sbtopts)"
+declare jvm_opts_file="$(init_default_option_file JVM_OPTS .jvmopts)"
+declare sbt_launch_repo="http://typesafe.artifactoryonline.com/typesafe/ivy-releases"
+
+# pull -J and -D options to give to java.
+declare -a residual_args
+declare -a java_args
+declare -a scalac_args
+declare -a sbt_commands
+
+# args to jvm/sbt via files or environment variables
+declare -a extra_jvm_opts extra_sbt_opts
+
+# if set, use JAVA_HOME over java found in path
+[[ -e "$JAVA_HOME/bin/java" ]] && java_cmd="$JAVA_HOME/bin/java"
+
+# directory to store sbt launchers
+declare sbt_launch_dir="$HOME/.sbt/launchers"
+[[ -d "$sbt_launch_dir" ]] || mkdir -p "$sbt_launch_dir"
+[[ -w "$sbt_launch_dir" ]] || sbt_launch_dir="$(mktemp -d -t sbt_extras_launchers.XXXXXX)"
+
+build_props_scala () {
+ if [[ -r "$buildProps" ]]; then
+ versionLine="$(grep '^build.scala.versions' "$buildProps")"
+ versionString="${versionLine##build.scala.versions=}"
+ echo "${versionString%% .*}"
+ fi
+}
+
+execRunner () {
+ # print the arguments one to a line, quoting any containing spaces
+ vlog "# Executing command line:" && {
+ for arg; do
+ if [[ -n "$arg" ]]; then
+ if printf "%s\n" "$arg" | grep -q ' '; then
+ printf >&2 "\"%s\"\n" "$arg"
+ else
+ printf >&2 "%s\n" "$arg"
+ fi
+ fi
+ done
+ vlog ""
+ }
+
+ if [[ -n "$batch" ]]; then
+    exec </dev/null "$@"
+  else
+    exec "$@"
+  fi
+}
+
+jar_url ()  { make_url "$1"; }
+
+jar_file () { echo "$sbt_launch_dir/$1/sbt-launch.jar"; }
+
+download_url () {
+  local url="$1"
+  local jar="$2"
+
+  mkdir -p "${jar%/*}" && {
+    if which curl >/dev/null; then
+ curl --fail --silent "$url" --output "$jar"
+ elif which wget >/dev/null; then
+ wget --quiet -O "$jar" "$url"
+ fi
+ } && [[ -r "$jar" ]]
+}
+
+acquire_sbt_jar () {
+ sbt_url="$(jar_url "$sbt_version")"
+ sbt_jar="$(jar_file "$sbt_version")"
+
+ [[ -r "$sbt_jar" ]] || download_url "$sbt_url" "$sbt_jar"
+}
+
+usage () {
+  cat <<EOM
+Usage: $script_name [options]
+
+  -trace <level>      display stack traces with a max of <level> frames (default: -1, traces suppressed)
+ -no-colors disable ANSI color codes
+ -sbt-create start sbt even if current directory contains no sbt project
+ -sbt-dir path to global settings/plugins directory (default: ~/.sbt/)
+ -sbt-boot path to shared boot directory (default: ~/.sbt/boot in 0.11+)
+ -ivy path to local Ivy repository (default: ~/.ivy2)
+ -no-share use all local caches; no sharing
+ -offline put sbt in offline mode
+ -jvm-debug Turn on JVM debugging, open at the given port.
+ -batch Disable interactive mode
+ -prompt Set the sbt prompt; in expr, 's' is the State and 'e' is Extracted
+
+ # sbt version (default: sbt.version from $buildProps if present, otherwise $sbt_release_version)
+ -sbt-version use the specified version of sbt (default: $sbt_release_version)
+ -sbt-jar use the specified jar as the sbt launcher
+ -sbt-launch-dir directory to hold sbt launchers (default: ~/.sbt/launchers)
+ -sbt-launch-repo repo url for downloading sbt launcher jar (default: $sbt_launch_repo)
+
+ # scala version (default: as chosen by sbt)
+ -28 use $latest_28
+ -29 use $latest_29
+ -210 use $latest_210
+ -211 use $latest_211
+ -scala-home use the scala build at the specified directory
+ -scala-version use the specified version of scala
+ -binary-version use the specified scala version when searching for dependencies
+
+ # java version (default: java from PATH, currently $(java -version 2>&1 | grep version))
+ -java-home alternate JAVA_HOME
+
+ # passing options to the jvm - note it does NOT use JAVA_OPTS due to pollution
+ # The default set is used if JVM_OPTS is unset and no -jvm-opts file is found
+ $default_jvm_opts
+ JVM_OPTS environment variable holding either the jvm args directly, or
+ the reference to a file containing jvm args if given path is prepended by '@' (e.g. '@/etc/jvmopts')
+ Note: "@"-file is overridden by local '.jvmopts' or '-jvm-opts' argument.
+ -jvm-opts file containing jvm args (if not given, .jvmopts in project root is used if present)
+ -Dkey=val pass -Dkey=val directly to the jvm
+ -J-X pass option -X directly to the jvm (-J is stripped)
+
+ # passing options to sbt, OR to this runner
+ SBT_OPTS environment variable holding either the sbt args directly, or
+ the reference to a file containing sbt args if given path is prepended by '@' (e.g. '@/etc/sbtopts')
+ Note: "@"-file is overridden by local '.sbtopts' or '-sbt-opts' argument.
+ -sbt-opts file containing sbt args (if not given, .sbtopts in project root is used if present)
+ -S-X add -X to sbt's scalacOptions (-S is stripped)
+EOM
+}
+
+addJava () {
+ vlog "[addJava] arg = '$1'"
+ java_args=( "${java_args[@]}" "$1" )
+}
+addSbt () {
+ vlog "[addSbt] arg = '$1'"
+ sbt_commands=( "${sbt_commands[@]}" "$1" )
+}
+addScalac () {
+ vlog "[addScalac] arg = '$1'"
+ scalac_args=( "${scalac_args[@]}" "$1" )
+}
+addResidual () {
+ vlog "[residual] arg = '$1'"
+ residual_args=( "${residual_args[@]}" "$1" )
+}
+addResolver () {
+ addSbt "set resolvers += $1"
+}
+addDebugger () {
+ addJava "-Xdebug"
+ addJava "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=$1"
+}
+setScalaVersion () {
+ [[ "$1" == *"-SNAPSHOT" ]] && addResolver 'Resolver.sonatypeRepo("snapshots")'
+ addSbt "++ $1"
+}
+
+process_args ()
+{
+ require_arg () {
+ local type="$1"
+ local opt="$2"
+ local arg="$3"
+
+ if [[ -z "$arg" ]] || [[ "${arg:0:1}" == "-" ]]; then
+ die "$opt requires <$type> argument"
+ fi
+ }
+ while [[ $# -gt 0 ]]; do
+ case "$1" in
+ -h|-help) usage; exit 1 ;;
+ -v) verbose=true && shift ;;
+ -d) addSbt "--debug" && shift ;;
+ -w) addSbt "--warn" && shift ;;
+ -q) addSbt "--error" && shift ;;
+ -trace) require_arg integer "$1" "$2" && trace_level="$2" && shift 2 ;;
+ -ivy) require_arg path "$1" "$2" && addJava "-Dsbt.ivy.home=$2" && shift 2 ;;
+ -no-colors) addJava "-Dsbt.log.noformat=true" && shift ;;
+ -no-share) noshare=true && shift ;;
+ -sbt-boot) require_arg path "$1" "$2" && addJava "-Dsbt.boot.directory=$2" && shift 2 ;;
+ -sbt-dir) require_arg path "$1" "$2" && sbt_dir="$2" && shift 2 ;;
+ -debug-inc) addJava "-Dxsbt.inc.debug=true" && shift ;;
+ -offline) addSbt "set offline := true" && shift ;;
+ -jvm-debug) require_arg port "$1" "$2" && addDebugger "$2" && shift 2 ;;
+ -batch) batch=true && shift ;;
+ -prompt) require_arg "expr" "$1" "$2" && addSbt "set shellPrompt in ThisBuild := (s => { val e = Project.extract(s) ; $2 })" && shift 2 ;;
+
+ -sbt-create) sbt_create=true && shift ;;
+ -sbt-jar) require_arg path "$1" "$2" && sbt_jar="$2" && shift 2 ;;
+ -sbt-version) require_arg version "$1" "$2" && sbt_explicit_version="$2" && shift 2 ;;
+ -sbt-dev) sbt_explicit_version="$sbt_unreleased_version" && shift ;;
+-sbt-launch-dir) require_arg path "$1" "$2" && sbt_launch_dir="$2" && shift 2 ;;
+-sbt-launch-repo) require_arg path "$1" "$2" && sbt_launch_repo="$2" && shift 2 ;;
+ -scala-version) require_arg version "$1" "$2" && setScalaVersion "$2" && shift 2 ;;
+-binary-version) require_arg version "$1" "$2" && addSbt "set scalaBinaryVersion in ThisBuild := \"$2\"" && shift 2 ;;
+ -scala-home) require_arg path "$1" "$2" && addSbt "set every scalaHome := Some(file(\"$2\"))" && shift 2 ;;
+ -java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && shift 2 ;;
+ -sbt-opts) require_arg path "$1" "$2" && sbt_opts_file="$2" && shift 2 ;;
+ -jvm-opts) require_arg path "$1" "$2" && jvm_opts_file="$2" && shift 2 ;;
+
+ -D*) addJava "$1" && shift ;;
+ -J*) addJava "${1:2}" && shift ;;
+ -S*) addScalac "${1:2}" && shift ;;
+ -28) setScalaVersion "$latest_28" && shift ;;
+ -29) setScalaVersion "$latest_29" && shift ;;
+ -210) setScalaVersion "$latest_210" && shift ;;
+ -211) setScalaVersion "$latest_211" && shift ;;
+
+ *) addResidual "$1" && shift ;;
+ esac
+ done
+}
+
+# process the direct command line arguments
+process_args "$@"
+
+# skip #-styled comments and blank lines
+readConfigFile() {
+ while read line; do
+ [[ $line =~ ^# ]] || [[ -z $line ]] || echo "$line"
+ done < "$1"
+}
+
+# if there are file/environment sbt_opts, process again so we
+# can supply args to this runner
+if [[ -r "$sbt_opts_file" ]]; then
+ vlog "Using sbt options defined in file $sbt_opts_file"
+ while read opt; do extra_sbt_opts+=("$opt"); done < <(readConfigFile "$sbt_opts_file")
+elif [[ -n "$SBT_OPTS" && ! ("$SBT_OPTS" =~ ^@.*) ]]; then
+ vlog "Using sbt options defined in variable \$SBT_OPTS"
+ extra_sbt_opts=( $SBT_OPTS )
+else
+ vlog "No extra sbt options have been defined"
+fi
+
+[[ -n "${extra_sbt_opts[*]}" ]] && process_args "${extra_sbt_opts[@]}"
+
+# reset "$@" to the residual args
+set -- "${residual_args[@]}"
+argumentCount=$#
+
+# set sbt version
+set_sbt_version
+
+# only exists in 0.12+
+setTraceLevel() {
+ case "$sbt_version" in
+ "0.7."* | "0.10."* | "0.11."* ) echoerr "Cannot set trace level in sbt version $sbt_version" ;;
+ *) addSbt "set every traceLevel := $trace_level" ;;
+ esac
+}
+
+# set scalacOptions if we were given any -S opts
+[[ ${#scalac_args[@]} -eq 0 ]] || addSbt "set scalacOptions in ThisBuild += \"${scalac_args[@]}\""
+
+# Update build.properties on disk to set explicit version - sbt gives us no choice
+[[ -n "$sbt_explicit_version" ]] && update_build_props_sbt "$sbt_explicit_version"
+vlog "Detected sbt version $sbt_version"
+
+[[ -n "$scala_version" ]] && vlog "Overriding scala version to $scala_version"
+
+# no args - alert them there's stuff in here
+(( argumentCount > 0 )) || {
+ vlog "Starting $script_name: invoke with -help for other options"
+ residual_args=( shell )
+}
+
+# verify this is an sbt dir or -create was given
+[[ -r ./build.sbt || -d ./project || -n "$sbt_create" ]] || {
+ def startConsumers(f: (MessageAndMetadata[Array[Byte], Array[Byte]], ConsumerTaskContext) => Unit) {
+ val topicCountMap = Map(topic -> numThreads)
+ val valueDecoder = new DefaultDecoder
+ val keyDecoder = valueDecoder
+ val consumerMap = consumerConnector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder)
+ val consumerThreads = consumerMap.get(topic) match {
+ case Some(streams) => streams.view.zipWithIndex map {
+ case (stream, threadId) =>
+ new ConsumerTask(stream, new ConsumerTaskContext(threadId), f)
+ }
+ case _ => Seq()
+ }
+ consumerThreads foreach executor.submit
+ }
+
+ def shutdown() {
+ consumerConnector.shutdown()
+ executor.shutdown()
+ }
+
+ Runtime.getRuntime.addShutdownHook(new Thread() {
+ override def run() {
+ shutdown()
+ }
+ })
+
+}
+
+class ConsumerTask[K, V, C <: ConsumerTaskContext](stream: KafkaStream[K, V], context: C,
+ f: (MessageAndMetadata[K, V], C) => Unit) extends Runnable with Logging {
+
+ override def run() {
+ info(s"Consumer thread ${context.threadId} started")
+ stream foreach {
+ case msg: MessageAndMetadata[_, _] =>
+ trace(s"Thread ${context.threadId} received message: " + msg)
+ f(msg, context)
+ case _ => trace(s"Received unexpected message type from broker")
+ }
+ info(s"Shutting down consumer thread ${context.threadId}")
+ }
+
+}
+
+case class ConsumerTaskContext(threadId: Int)
\ No newline at end of file
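
The wrapper above is easiest to follow through a short usage sketch. This is a minimal sketch, not taken from the repository; it mirrors how the integration tests further below drive `KafkaConsumer`, and the topic name, ZooKeeper address, and consumer group id are illustrative assumptions.

```scala
import java.util.Properties
import kafka.message.MessageAndMetadata
import com.miguno.kafkastorm.kafka.{ConsumerTaskContext, KafkaConsumer}

object KafkaConsumerSketch extends App {
  val config = new Properties
  config.put("group.id", "sketch-consumer") // illustrative consumer group id
  // Assumes a Kafka broker registered in ZooKeeper at 127.0.0.1:2181 and an existing topic "testing".
  val consumer = new KafkaConsumer("testing", "127.0.0.1:2181", 1, config)
  consumer.startConsumers((m: MessageAndMetadata[Array[Byte], Array[Byte]], c: ConsumerTaskContext) =>
    println(s"thread ${c.threadId}: ${m.message().length} bytes from ${m.topic}/${m.partition} at offset ${m.offset}"))
  Thread.sleep(3000) // give the consumer threads a moment to read
  consumer.shutdown()
}
```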
diff --git a/src/main/scala/com/miguno/kafkastorm/kafka/KafkaEmbedded.scala b/src/main/scala/com/miguno/kafkastorm/kafka/KafkaEmbedded.scala
new file mode 100644
index 0000000..734e5c4
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/kafka/KafkaEmbedded.scala
@@ -0,0 +1,68 @@
+package com.miguno.kafkastorm.kafka
+
+import java.util.Properties
+import kafka.server.{KafkaServerStartable, KafkaConfig}
+import kafka.utils.Logging
+
+/**
+ * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092`.
+ *
+ * Requires a running ZooKeeper instance to connect to. By default, it expects a ZooKeeper instance running at
+ * `127.0.0.1:2181`. You can specify a different ZooKeeper instance by setting the `zookeeper.connect` parameter in the
+ * broker's configuration.
+ *
+ * @param config Broker configuration settings.
+ */
+class KafkaEmbedded(config: Properties = new Properties) extends Logging {
+
+ private val defaultZkConnect = "127.0.0.1:2181"
+
+ private val effectiveConfig = {
+ val c = new Properties
+ c.load(this.getClass.getResourceAsStream("/broker-defaults.properties"))
+ c.putAll(config)
+ c
+ }
+
+ private val kafkaConfig = new KafkaConfig(effectiveConfig)
+ private val kafka = new KafkaServerStartable(kafkaConfig)
+
+ /**
+ * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`.
+ *
+ * You can use this to tell Kafka producers and consumers how to connect to this instance.
+ */
+ val brokerList = kafka.serverConfig.hostName + ":" + kafka.serverConfig.port
+
+ /**
+ * The ZooKeeper connection string aka `zookeeper.connect`.
+ */
+ val zookeeperConnect = {
+ val zkConnectLookup = Option(effectiveConfig.getProperty("zookeeper.connect"))
+ zkConnectLookup match {
+ case Some(zkConnect) => zkConnect
+ case _ =>
+ warn(s"zookeeper.connect is not configured -- falling back to default setting $defaultZkConnect")
+ defaultZkConnect
+ }
+ }
+
+ /**
+ * Start the broker.
+ */
+ def start() {
+ debug(s"Starting embedded Kafka broker at $brokerList (using ZooKeeper server at $zookeeperConnect) ...")
+ kafka.startup()
+ debug("Embedded Kafka broker startup completed")
+ }
+
+ /**
+ * Stop the broker.
+ */
+ def stop() {
+ debug("Shutting down embedded Kafka broker...")
+ kafka.shutdown()
+ debug("Embedded Kafka broker shutdown completed")
+ }
+
+}
\ No newline at end of file
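
A minimal sketch of running this embedded broker against the embedded ZooKeeper server defined later in this changeset (`ZooKeeperEmbedded`); the port is that class's default, and the snippet simply starts and stops both services.

```scala
import java.util.Properties
import com.miguno.kafkastorm.kafka.KafkaEmbedded
import com.miguno.kafkastorm.zookeeper.ZooKeeperEmbedded

object KafkaEmbeddedSketch extends App {
  val zookeeper = new ZooKeeperEmbedded(2181) // the server starts as part of construction
  val brokerConfig = new Properties
  brokerConfig.put("zookeeper.connect", zookeeper.connectString)
  val kafka = new KafkaEmbedded(brokerConfig)
  kafka.start()
  println(s"Producers/consumers can reach the broker via metadata.broker.list=${kafka.brokerList}")
  kafka.stop()
  zookeeper.stop()
}
```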
diff --git a/src/main/scala/com/miguno/kafkastorm/kafka/KafkaProducerApp.scala b/src/main/scala/com/miguno/kafkastorm/kafka/KafkaProducerApp.scala
new file mode 100644
index 0000000..7b30db7
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/kafka/KafkaProducerApp.scala
@@ -0,0 +1,77 @@
+package com.miguno.kafkastorm.kafka
+
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
+import java.util.Properties
+
+/**
+ * Demonstrates how to implement a simple Kafka producer application to send data to Kafka.
+ *
+ * Don't read too much into the actual implementation of this class. Its sole purpose is to showcase the use of the
+ * Kafka API.
+ *
+ * @param topic The Kafka topic to send data to.
+ * @param brokerList Value for Kafka's `metadata.broker.list` setting.
+ * @param producerConfig Additional producer configuration settings.
+ */
+case class KafkaProducerApp(
+ val topic: String,
+ val brokerList: String,
+ producerConfig: Properties = new Properties
+ ) {
+
+ private val producer = {
+ val effectiveConfig = {
+ val c = new Properties
+ c.load(this.getClass.getResourceAsStream("/producer-defaults.properties"))
+ c.putAll(producerConfig)
+ c.put("metadata.broker.list", brokerList)
+ c
+ }
+ new Producer[Array[Byte], Array[Byte]](new ProducerConfig(effectiveConfig))
+ }
+
+ // The configuration field of the wrapped producer is immutable (including its nested fields), so it's safe to expose
+ // it directly.
+ val config = producer.config
+
+ private def toMessage(key: Option[Array[Byte]], value: Array[Byte]): KeyedMessage[Array[Byte], Array[Byte]] =
+ key match {
+ case Some(key) => new KeyedMessage(topic, key, value)
+ case _ => new KeyedMessage(topic, value)
+ }
+
+ def send(key: Array[Byte], value: Array[Byte]): Unit = producer.send(toMessage(Some(key), value))
+
+ def send(value: Array[Byte]): Unit = producer.send(toMessage(None, value))
+
+}
+
+/**
+ * Creates KafkaProducerApp instances.
+ *
+ * We require such a factory because of how Storm and notably
+ * [[http://storm.incubator.apache.org/documentation/Serialization.html serialization within Storm]] work.
+ * Without such a factory we cannot properly unit test Storm bolts that need to write to Kafka.
+ *
+ * Preferably we would simply pass a Kafka producer directly to a Storm bolt. During testing we could then mock this
+ * collaborator. However this intuitive approach fails at (Storm) runtime because Kafka producers are not serializable.
+ * The alternative approach of instantiating the Kafka producer from within the bolt (e.g. using a `@transient lazy val`
+ * field) does work at runtime but prevents us from verifying the correct interaction between our bolt's code and its
+ * collaborator, the Kafka producer, because we cannot easily mock the producer in this setup. The chosen approach of
+ * the factory method, while introducing some level of unwanted indirection and complexity, is a pragmatic approach to
+ * make our Storm code work correctly at runtime and to make it testable.
+ *
+ * @param topic The Kafka topic to send data to.
+ * @param brokerList Value for Kafka's `metadata.broker.list` setting.
+ * @param config Additional producer configuration settings.
+ */
+abstract class KafkaProducerAppFactory(topic: String, brokerList: String, config: Properties) extends Serializable {
+ def newInstance(): KafkaProducerApp
+}
+
+class BaseKafkaProducerAppFactory(topic: String, brokerList: String, config: Properties = new Properties)
+ extends KafkaProducerAppFactory(topic, brokerList, config) {
+
+ override def newInstance() = new KafkaProducerApp(topic, brokerList, config)
+
+}
\ No newline at end of file
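
To make the factory rationale above concrete, here is a hedged sketch of creating a producer through `BaseKafkaProducerAppFactory` and sending one Avro-encoded `Tweet`; the topic and broker list are illustrative, and the Avro encoding mirrors the tests further below.

```scala
import java.util.Properties
import com.miguno.avro.Tweet
import com.miguno.kafkastorm.kafka.BaseKafkaProducerAppFactory
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.SpecificAvroCodecs

object ProducerFactorySketch extends App {
  implicit val specificAvroBinaryInjectionForTweet = SpecificAvroCodecs.toBinary[Tweet]
  // The factory is Serializable and can thus be shipped to a Storm bolt; the producer itself is created only
  // when newInstance() is called, i.e. on the machine that will actually send data.
  val factory = new BaseKafkaProducerAppFactory("testing", "127.0.0.1:9092", new Properties) // illustrative values
  val producerApp = factory.newInstance()
  val tweet = new Tweet("ANY_USER", "ANY_TEXT", System.currentTimeMillis() / 1000)
  producerApp.send(Injection[Tweet, Array[Byte]](tweet))
}
```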
diff --git a/src/main/scala/com/miguno/kafkastorm/storm/AvroDecoderBolt.scala b/src/main/scala/com/miguno/kafkastorm/storm/AvroDecoderBolt.scala
new file mode 100644
index 0000000..5390e7a
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/storm/AvroDecoderBolt.scala
@@ -0,0 +1,106 @@
+package com.miguno.kafkastorm.storm
+
+import backtype.storm.topology.base.BaseBasicBolt
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.tuple.{Fields, Tuple, Values}
+import com.google.common.base.Throwables
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import com.twitter.bijection.Injection
+import org.apache.avro.specific.SpecificRecordBase
+import org.slf4j.{Logger, LoggerFactory}
+import scala.util.{Try, Failure, Success}
+
+/**
+ * A binaryAvro->pojoAvro converter bolt.
+ *
+ * This bolt expects incoming data in Avro-encoded binary format, serialized according to the Avro schema of `T`. It
+ * will deserialize the incoming data into a `T` pojo, and emit this pojo to downstream consumers. As such this bolt
+ * can be considered the Storm equivalent of Twitter Bijection's `Injection.invert[T, Array[Byte]](bytes)` for
+ * Avro data.
+ *
+ * By using this bolt you don't need to write a separate decoder bolt for every Avro schema your topology needs to
+ * handle.
+ *
+ * @example {{{
+ * import backtype.storm.topology.TopologyBuilder
+ * import com.miguno.avro.Tweet
+ *
+ * val builder = new TopologyBuilder
+ * // ...spout is set up here...
+ * val decoderBolt = new AvroDecoderBolt[Tweet]
+ * builder.setBolt(decoderBoltId, decoderBolt).shuffleGrouping(spoutId) // or whatever grouping you need
+ * }}}
+ *
+ * @param inputField The name of the field in the input tuple to read from. (Default: "bytes")
+ * @param outputField The name of the field in the output tuple to write to. (Default: "pojo")
+ * @tparam T The type of the Avro record (e.g. a `Tweet`) based on the underlying Avro schema being used. Must be
+ * a subclass of Avro's `SpecificRecordBase`.
+ */
+class AvroDecoderBolt[T <: SpecificRecordBase : Manifest](
+ inputField: String = "bytes",
+ outputField: String = "pojo")
+ extends BaseBasicBolt {
+
+ // Note: Ideally we would like to use TypeTag's instead of Manifest's here. Doing so would only require replacing
+ // `manifest[T]` with `typeOf[T]`, and adding AvroDecoderBolt[T : TypeTag]. Unfortunately there is a known
+ // serialization bug in Scala's TypeTag implementation that will trigger runtime exceptions when submitting/running
+ // this class in a Storm topology.
+ //
+ // See "SI-5919: Type tags (and Exprs as well) should be serializable" (https://issues.scala-lang.org/browse/SI-5919)
+ val tpe = manifest[T]
+
+ // Must be transient because Logger is not serializable
+ @transient lazy private val log: Logger = LoggerFactory.getLogger(classOf[AvroDecoderBolt[T]])
+
+ // Must be transient because Injection is not serializable. Must be implicit because that's how Injection works.
+ @transient lazy implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
+ SpecificAvroCodecs.toBinary[T]
+
+ override def execute(tuple: Tuple, collector: BasicOutputCollector) {
+ val readTry = Try(tuple.getBinaryByField(inputField))
+ readTry match {
+ case Success(bytes) if bytes != null => decodeAndEmit(bytes, collector)
+ case Success(_) => log.error("Reading from input tuple returned null")
+ case Failure(e) => log.error("Could not read from input tuple: " + Throwables.getStackTraceAsString(e))
+ }
+ }
+
+ private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) {
+ require(bytes != null, "bytes must not be null")
+ val decodeTry = Injection.invert[T, Array[Byte]](bytes)
+ decodeTry match {
+ case Success(pojo) =>
+ log.debug("Binary data decoded into pojo: " + pojo)
+ collector.emit(new Values(pojo))
+ case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
+ }
+ }
+
+ override def declareOutputFields(declarer: OutputFieldsDeclarer) {
+ declarer.declare(new Fields(outputField))
+ }
+
+}
+
+object AvroDecoderBolt {
+
+ /**
+ * Factory method for Java interoperability.
+ *
+ * @example {{{
+ * // in Java
+ * AvroDecoderBolt decoderBolt = AvroDecoderBolt.ofType(Tweet.class);
+ * }}}
+ *
+ * @param cls
+ * @tparam T
+ * @return
+ */
+ def ofType[T <: SpecificRecordBase](cls: java.lang.Class[T]) = {
+ val manifest = Manifest.classType[T](cls)
+ newInstance[T](manifest)
+ }
+
+ private def newInstance[T <: SpecificRecordBase : Manifest] = new AvroDecoderBolt[T]
+
+}
\ No newline at end of file
diff --git a/src/main/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBolt.scala b/src/main/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBolt.scala
new file mode 100644
index 0000000..7cd79d8
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBolt.scala
@@ -0,0 +1,104 @@
+package com.miguno.kafkastorm.storm
+
+import backtype.storm.task.TopologyContext
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.topology.base.BaseBasicBolt
+import backtype.storm.tuple.{Tuple, Fields}
+import com.miguno.kafkastorm.kafka.{KafkaProducerAppFactory, KafkaProducerApp}
+import com.twitter.bijection.Injection
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import java.util
+import org.apache.avro.specific.SpecificRecordBase
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * A Storm->Kafka writer bolt.
+ *
+ * This bolt expects Avro pojos of type `T` as incoming data. It will Avro-encode these pojos into a binary
+ * representation (bytes) according to the Avro schema of `T`, and then send these bytes to Kafka.
+ *
+ * @param producerFactory A factory to instantiate the required Kafka producer. We require such a factory because of
+ * unit testing and the way Storm code is (shipped and) executed in a Storm cluster. Because
+ * a bolt is instantiated on a different JVM we cannot simply pass the "final" Kafka producer
+ * directly to the bolt when we wire the topology. Instead we must enable each bolt instance to
+ * create its own Kafka producer when it is starting up (and this startup typically happens in a
+ * different JVM on a different machine).
+ * @param inputField The name of the field in the input tuple to read from. (Default: "pojo")
+ * @param outputField The name of the field in the output tuple to write to. (Default: "bytes")
+ * @tparam T The type of the Avro record (e.g. a `Tweet`) based on the underlying Avro schema being used. Must be
+ * a subclass of Avro's `SpecificRecordBase`.
+ */
+class AvroKafkaSinkBolt[T <: SpecificRecordBase : Manifest](
+ producerFactory: KafkaProducerAppFactory,
+ inputField: String = "pojo",
+ outputField: String = "bytes")
+ extends BaseBasicBolt {
+
+ // Note: Ideally we would like to use TypeTag's instead of Manifest's here. Doing so would only require replacing
+ // `manifest[T]` with `typeOf[T]`, and adding AvroKafkaSinkBolt[T : TypeTag]. Unfortunately there is a known
+ // serialization bug in Scala's TypeTag implementation that will trigger runtime exceptions when submitting/running
+ // this class in a Storm topology.
+ //
+ // See "SI-5919: Type tags (and Exprs as well) should be serializable" (https://issues.scala-lang.org/browse/SI-5919)
+ val tpe = manifest[T]
+
+ // Must be transient because Logger is not serializable
+ @transient lazy private val log: Logger = LoggerFactory.getLogger(classOf[AvroKafkaSinkBolt[T]])
+
+ // Must be transient because Injection is not serializable
+ @transient lazy implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
+ SpecificAvroCodecs.toBinary[T]
+
+ // Must be transient because KafkaProducerApp is not serializable. The factory approach to instantiate a Kafka producer
+ // unfortunately means we must use a var combined with `prepare()` -- a val would cause a NullPointerException at
+ // runtime for the producer.
+ @transient private var producer: KafkaProducerApp = _
+
+ override def prepare(stormConf: util.Map[_, _], context: TopologyContext) {
+ producer = producerFactory.newInstance()
+ }
+
+ override def execute(tuple: Tuple, collector: BasicOutputCollector) {
+ tuple.getValueByField(inputField) match {
+ case pojo: T =>
+ val bytes = Injection[T, Array[Byte]](pojo)
+ log.debug("Encoded pojo " + pojo + " to Avro binary format")
+ producer.send(bytes)
+ case _ => log.error("Input tuple did not contain a pojo of the expected Avro type")
+ }
+ }
+
+ override def declareOutputFields(declarer: OutputFieldsDeclarer) {
+ declarer.declare(new Fields())
+ }
+
+}
+
+object AvroKafkaSinkBolt {
+
+ /**
+ * Factory method for Java interoperability.
+ *
+ * @example {{{
+ * // Java example
+ * AvroKafkaSinkBolt kafkaSinkBolt = AvroKafkaSinkBolt.ofType(Tweet.class)(brokerList, ...);
+ * }}}
+ *
+ * @param cls
+ * @tparam T
+ * @return
+ */
+ def ofType[T <: SpecificRecordBase](cls: java.lang.Class[T])(
+ producerFactory: KafkaProducerAppFactory,
+ inputFieldName: String = "pojo") = {
+ val manifest = Manifest.classType[T](cls)
+ newInstance[T](producerFactory, inputFieldName)(manifest)
+ }
+
+ private def newInstance[T <: SpecificRecordBase](
+ producerFactory: KafkaProducerAppFactory,
+ inputFieldName: String = "pojo")
+ (implicit man: Manifest[T]) =
+ new AvroKafkaSinkBolt[T](producerFactory, inputFieldName)
+
+}
\ No newline at end of file
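
Unlike `AvroDecoderBolt`, this bolt's scaladoc carries no wiring example, so here is a hedged sketch based on the Kafka/Storm integration test further below; the bolt id, upstream component id, output topic, and broker list are assumptions.

```scala
import backtype.storm.topology.TopologyBuilder
import com.miguno.avro.Tweet
import com.miguno.kafkastorm.kafka.BaseKafkaProducerAppFactory
import com.miguno.kafkastorm.storm.AvroKafkaSinkBolt

object AvroKafkaSinkBoltWiringSketch {
  def addKafkaSink(builder: TopologyBuilder, upstreamComponentId: String): Unit = {
    // In the integration test the output topic and broker list come from the embedded Kafka instance.
    val producerAppFactory = new BaseKafkaProducerAppFactory("testing-output", "127.0.0.1:9092")
    val kafkaSinkBolt = new AvroKafkaSinkBolt[Tweet](producerAppFactory)
    builder.setBolt("avro-kafka-sink-bolt", kafkaSinkBolt).globalGrouping(upstreamComponentId)
  }
}
```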
diff --git a/src/main/scala/com/miguno/kafkastorm/storm/AvroScheme.scala b/src/main/scala/com/miguno/kafkastorm/storm/AvroScheme.scala
new file mode 100644
index 0000000..8a767e6
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/storm/AvroScheme.scala
@@ -0,0 +1,80 @@
+package com.miguno.kafkastorm.storm
+
+import backtype.storm.spout.Scheme
+import backtype.storm.tuple.{Fields, Values}
+import com.twitter.bijection.Injection
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import org.apache.avro.specific.SpecificRecordBase
+import scala.util.{Failure, Success}
+
+/**
+ * A custom binaryAvro->pojoAvro `backtype.storm.spout.Scheme` to auto-deserialize a spout's incoming data. You can
+ * parameterize this scheme with the Avro type `T` of the spout's expected input data.
+ *
+ * In the case of `storm.kafka.KafkaSpout` its default scheme is Storm's `backtype.storm.spout.RawMultiScheme`,
+ * which simply returns the raw bytes of the incoming data (i.e. leaving deserialization up to you in subsequent bolts
+ * such as [[AvroDecoderBolt]]). Alternatively, you can configure the spout to use this custom scheme. If you do, then the
+ * spout will automatically deserialize its incoming data into pojos. Note that you will need to register a custom
+ * Kryo decorator for the Avro type `T`, see [[TweetAvroKryoDecorator]] for an example.
+ *
+ * @example {{{
+ * import backtype.storm.spout.SchemeAsMultiScheme
+ * import com.miguno.avro.Tweet
+ * import storm.kafka.{KafkaSpout, SpoutConfig}
+ *
+ * val spoutConfig = new SpoutConfig(...)
+ * spoutConfig.scheme = new SchemeAsMultiScheme(new AvroScheme[Tweet])
+ * val kafkaSpout = new KafkaSpout(spoutConfig)
+ * }}}
+ *
+ * @tparam T The type of the Avro record (e.g. a `Tweet`) based on the underlying Avro schema being used. Must be
+ * a subclass of Avro's `SpecificRecordBase`.
+ */
+class AvroScheme[T <: SpecificRecordBase : Manifest] extends Scheme {
+
+ // Note: Ideally we would like to use TypeTag's instead of Manifest's here. Doing so would only require replacing
+ // `manifest[T]` with `typeOf[T]`, and adding AvroScheme[T : TypeTag]. Unfortunately there is a known serialization
+ // bug in the TypeTag implementation of Scala versions <= 2.11.1 that will trigger runtime exceptions when
+ // submitting/running this class in a Storm topology.
+ //
+ // See "SI-5919: Type tags (and Exprs as well) should be serializable" (https://issues.scala-lang.org/browse/SI-5919)
+ val tpe = manifest[T]
+
+ private val OutputFieldName = "pojo"
+
+ @transient lazy implicit private val specificAvroBinaryInjection = SpecificAvroCodecs.toBinary[T]
+
+ override def deserialize(bytes: Array[Byte]): java.util.List[AnyRef] = {
+ val result = Injection.invert[T, Array[Byte]](bytes)
+ result match {
+ case Success(pojo) => new Values(pojo)
+ case Failure(e) => throw new RuntimeException("Could not decode input bytes", e)
+ }
+ }
+
+ override def getOutputFields() = new Fields(OutputFieldName)
+
+}
+
+object AvroScheme {
+
+ /**
+ * Factory method for Java interoperability.
+ *
+ * @example {{{
+ * // in Java
+ * AvroScheme avroScheme = AvroScheme.ofType(Tweet.class);
+ * }}}
+ *
+ * @param cls
+ * @tparam T
+ * @return
+ */
+ def ofType[T <: SpecificRecordBase](cls: java.lang.Class[T]) = {
+ val manifest = Manifest.classType[T](cls)
+ newInstance[T](manifest)
+ }
+
+ private def newInstance[T <: SpecificRecordBase : Manifest] = new AvroScheme[T]
+
+}
\ No newline at end of file
diff --git a/src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala b/src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala
new file mode 100644
index 0000000..4a81ea9
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/storm/KafkaStormDemo.scala
@@ -0,0 +1,121 @@
+package com.miguno.kafkastorm.storm
+
+import backtype.storm.{Config, LocalCluster}
+import backtype.storm.generated.KillOptions
+import backtype.storm.topology.TopologyBuilder
+import com.miguno.kafkastorm.kafka.KafkaEmbedded
+import com.miguno.kafkastorm.zookeeper.ZooKeeperEmbedded
+import java.util.Properties
+import kafka.admin.AdminUtils
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
+import scala.concurrent.duration._
+import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
+
+/**
+ * Showcases how to create a Storm topology that reads data from Kafka. Because it's a demo this topology does not
+ * (yet?) do anything to the input data -- it just reads, that's it. If you want to add functionality you only need to
+ * put one or more Storm bolts after the spout that reads from Kafka.
+ *
+ * The default setup runs the topology against an in-memory instance of Kafka (that is backed by an in-memory instance
+ * of ZooKeeper). Alternatively, you can also point the topology to a "real" Kafka cluster. An easy and quick way to
+ * deploy such a Kafka and ZooKeeper infrastructure is to use a tool such as
+ * [[https://github.com/miguno/wirbelsturm Wirbelsturm]].
+ */
+class KafkaStormDemo(kafkaZkConnect: String, topic: String, numTopicPartitions: Int = 1,
+ topologyName: String = "kafka-storm-starter", runtime: Duration = 1.hour) {
+
+ def runTopologyLocally() {
+ val zkHosts = new ZkHosts(kafkaZkConnect)
+ val topic = "testing"
+ val zkRoot = "/kafka-spout"
+ // The spout appends this id to zkRoot when composing its ZooKeeper path. You don't need a leading `/`.
+ val zkSpoutId = "kafka-storm-starter"
+ val kafkaConfig = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId)
+ val kafkaSpout = new KafkaSpout(kafkaConfig)
+ val numSpoutExecutors = numTopicPartitions
+ val builder = new TopologyBuilder
+ val spoutId = "kafka-spout"
+ builder.setSpout(spoutId, kafkaSpout, numSpoutExecutors)
+
+ // Showcases how to customize the topology configuration
+ val topologyConfiguration = {
+ val c = new Config
+ c.setDebug(false)
+ c.setNumWorkers(4)
+ c.setMaxSpoutPending(1000)
+ c.setMessageTimeoutSecs(60)
+ c.setNumAckers(0)
+ c.setMaxTaskParallelism(50)
+ c.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384: Integer)
+ c.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384: Integer)
+ c.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8: Integer)
+ c.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32: Integer)
+ c.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.05: java.lang.Double)
+ c
+ }
+
+ // Now run the topology in a local, in-memory Storm cluster
+ val cluster = new LocalCluster
+ cluster.submitTopology(topologyName, topologyConfiguration, builder.createTopology())
+ Thread.sleep(runtime.toMillis)
+ val killOpts = new KillOptions()
+ killOpts.set_wait_secs(1)
+ cluster.killTopologyWithOpts(topologyName, killOpts)
+ cluster.shutdown()
+ }
+
+}
+
+object KafkaStormDemo {
+
+ private var zookeeperEmbedded: Option[ZooKeeperEmbedded] = None
+ private var zkClient: Option[ZkClient] = None
+ private var kafkaEmbedded: Option[KafkaEmbedded] = None
+
+ def main(args: Array[String]) {
+ val kafkaTopic = "testing"
+ startZooKeeperAndKafka(kafkaTopic)
+ for {z <- zookeeperEmbedded} {
+ val topology = new KafkaStormDemo(z.connectString, kafkaTopic)
+ topology.runTopologyLocally()
+ }
+ stopZooKeeperAndKafka()
+ }
+
+ /**
+ * Launches in-memory, embedded instances of ZooKeeper and Kafka so that our demo Storm topology can connect to and
+ * read from Kafka.
+ */
+ private def startZooKeeperAndKafka(topic: String, numTopicPartitions: Int = 1, numTopicReplicationFactor: Int = 1,
+ zookeeperPort: Int = 2181) {
+
+ zookeeperEmbedded = Some(new ZooKeeperEmbedded(zookeeperPort))
+ for {z <- zookeeperEmbedded} {
+ val brokerConfig = new Properties
+ brokerConfig.put("zookeeper.connect", z.connectString)
+ kafkaEmbedded = Some(new KafkaEmbedded(brokerConfig))
+ for {k <- kafkaEmbedded} {
+ k.start()
+ }
+
+ val sessionTimeout = 30.seconds
+ val connectionTimeout = 30.seconds
+ zkClient = Some(new ZkClient(z.connectString, sessionTimeout.toMillis.toInt, connectionTimeout.toMillis.toInt,
+ ZKStringSerializer))
+ for {
+ zc <- zkClient
+ } {
+ val topicConfig = new Properties
+ AdminUtils.createTopic(zc, topic, numTopicPartitions, numTopicReplicationFactor, topicConfig)
+ }
+ }
+ }
+
+ private def stopZooKeeperAndKafka() {
+ for {k <- kafkaEmbedded} k.stop()
+ for {zc <- zkClient} zc.close()
+ for {z <- zookeeperEmbedded} z.stop()
+ }
+
+}
\ No newline at end of file
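
The constructor already accepts an external ZooKeeper connect string, so pointing the demo at a "real" cluster (for example one provisioned with Wirbelsturm) should look roughly like the following sketch; the hostname and topic are illustrative, and the topic is assumed to exist already.

```scala
import scala.concurrent.duration._
import com.miguno.kafkastorm.storm.KafkaStormDemo

object ExternalClusterDemoSketch extends App {
  // "zookeeper1:2181" is a placeholder for the ZooKeeper ensemble that backs your Kafka cluster.
  val demo = new KafkaStormDemo("zookeeper1:2181", "testing", numTopicPartitions = 1, runtime = 15.minutes)
  demo.runTopologyLocally()
}
```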
diff --git a/src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala b/src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala
new file mode 100644
index 0000000..313a423
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala
@@ -0,0 +1,14 @@
+package com.miguno.kafkastorm.storm
+
+import backtype.storm.serialization.IKryoDecorator
+import com.esotericsoftware.kryo.Kryo
+import com.miguno.avro.Tweet
+import com.twitter.chill.KryoSerializer
+import com.twitter.chill.avro.AvroSerializer
+
+class TweetAvroKryoDecorator extends IKryoDecorator {
+ override def decorate(k: Kryo) {
+ k.register(classOf[Tweet], AvroSerializer.SpecificRecordSerializer[Tweet])
+ KryoSerializer.registerAll(k)
+ }
+}
\ No newline at end of file
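
A hedged sketch of registering this decorator with a topology `Config`, mirroring the serialization setup used in the Kafka/Storm integration test below.

```scala
import backtype.storm.Config
import com.miguno.kafkastorm.storm.TweetAvroKryoDecorator

object KryoRegistrationSketch {
  def tweetSerializationConfig: Config = {
    val conf = new Config
    // Fail fast if Kryo cannot handle a type instead of silently falling back to Java serialization.
    conf.setFallBackOnJavaSerialization(false)
    conf.registerDecorator(classOf[TweetAvroKryoDecorator])
    conf
  }
}
```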
diff --git a/src/main/scala/com/miguno/kafkastorm/storm/utils/StormRunner.scala b/src/main/scala/com/miguno/kafkastorm/storm/utils/StormRunner.scala
new file mode 100644
index 0000000..2a71e72
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/storm/utils/StormRunner.scala
@@ -0,0 +1,24 @@
+package com.miguno.kafkastorm.storm.utils
+
+import backtype.storm.{Config, StormSubmitter, LocalCluster}
+import backtype.storm.generated.StormTopology
+import scala.concurrent.duration._
+
+/**
+ * Provides convenience functions to run Storm topologies locally and remotely (i.e. in a "real" Storm cluster).
+ */
+object StormRunner {
+
+ def runTopologyLocally(topology: StormTopology, topologyName: String, conf: Config, runtime: Duration) {
+ val cluster: LocalCluster = new LocalCluster
+ cluster.submitTopology(topologyName, conf, topology)
+ Thread.sleep(runtime.toMillis)
+ cluster.killTopology(topologyName)
+ cluster.shutdown()
+ }
+
+ def runTopologyRemotely(topology: StormTopology, topologyName: String, conf: Config) {
+ StormSubmitter.submitTopology(topologyName, conf, topology)
+ }
+
+}
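
A minimal sketch of driving these helpers locally; the spout comes from Storm's own testing utilities, and the topology name and runtime are illustrative.

```scala
import backtype.storm.Config
import backtype.storm.testing.TestWordSpout
import backtype.storm.topology.TopologyBuilder
import com.miguno.kafkastorm.storm.utils.StormRunner
import scala.concurrent.duration._

object StormRunnerSketch extends App {
  val builder = new TopologyBuilder
  builder.setSpout("word-spout", new TestWordSpout) // emits random words, handy for smoke tests
  StormRunner.runTopologyLocally(builder.createTopology(), "storm-runner-sketch", new Config, 10.seconds)
}
```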
diff --git a/src/main/scala/com/miguno/kafkastorm/zookeeper/ZooKeeperEmbedded.scala b/src/main/scala/com/miguno/kafkastorm/zookeeper/ZooKeeperEmbedded.scala
new file mode 100644
index 0000000..f7a9e7a
--- /dev/null
+++ b/src/main/scala/com/miguno/kafkastorm/zookeeper/ZooKeeperEmbedded.scala
@@ -0,0 +1,36 @@
+package com.miguno.kafkastorm.zookeeper
+
+import com.netflix.curator.test.TestingServer
+import kafka.utils.Logging
+
+/**
+ * Runs an in-memory, "embedded" instance of a ZooKeeper server.
+ *
+ * The ZooKeeper server instance is automatically started when you create a new instance of this class.
+ *
+ * @param port The port (aka `clientPort`) to listen to. Default: 2181.
+ */
+class ZooKeeperEmbedded(port: Int) extends Logging {
+
+ debug(s"Starting embedded ZooKeeper server on port ${port}...")
+
+ private val server = new TestingServer(port)
+
+ /**
+ * Stop the instance.
+ */
+ def stop() {
+ debug("Shutting down embedded ZooKeeper server...")
+ server.close()
+ debug("Embedded ZooKeeper server shutdown completed")
+ }
+
+ /**
+ * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
+ * Example: `127.0.0.1:2181`.
+ *
+ * You can use this to e.g. tell Kafka and Storm how to connect to this instance.
+ */
+ val connectString = server.getConnectString
+
+}
\ No newline at end of file
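
A brief sketch of connecting a `ZkClient` to this embedded server, reusing the serializer and timeouts that the tests in this changeset use.

```scala
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import com.miguno.kafkastorm.zookeeper.ZooKeeperEmbedded
import scala.concurrent.duration._

object ZooKeeperEmbeddedSketch extends App {
  val zookeeper = new ZooKeeperEmbedded(2181) // the server is started during construction
  val sessionTimeout = 30.seconds
  val connectionTimeout = 30.seconds
  val zkClient = new ZkClient(zookeeper.connectString, sessionTimeout.toMillis.toInt,
    connectionTimeout.toMillis.toInt, ZKStringSerializer)
  println(s"Connected to embedded ZooKeeper at ${zookeeper.connectString}")
  zkClient.close()
  zookeeper.stop()
}
```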
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
new file mode 100644
index 0000000..25ae243
--- /dev/null
+++ b/src/test/resources/log4j.properties
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kafka.logs.dir=logs
+
+log4j.rootLogger=WARN, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=WARN, kafkaAppender
+# Set WARN to INFO to see e.g. effective Kafka broker/consumer/producer config properties (cf. VerifiableProperties)
+log4j.logger.kafka.utils=WARN, kafkaAppender
+
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=WARN, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+log4j.logger.kafka.log.Cleaner=WARN, cleanerAppender
+log4j.additivity.kafka.log.Cleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false
+
+# kafka-storm-starter settings
+log4j.logger.com.miguno.kafkastorm=DEBUG, stdout
+# If additivity is not set to false you will see log messages for com.miguno.kafkastorm.* twice.
+log4j.additivity.com.miguno.kafkastorm=false
diff --git a/src/test/scala/com/miguno/kafkastorm/integration/IntegrationSuite.scala b/src/test/scala/com/miguno/kafkastorm/integration/IntegrationSuite.scala
new file mode 100644
index 0000000..7e7b2b2
--- /dev/null
+++ b/src/test/scala/com/miguno/kafkastorm/integration/IntegrationSuite.scala
@@ -0,0 +1,9 @@
+package com.miguno.kafkastorm.integration
+
+import org.scalatest.Stepwise
+
+class IntegrationSuite extends Stepwise(
+ new KafkaSpec,
+ new StormSpec,
+ new KafkaStormSpec
+)
\ No newline at end of file
diff --git a/src/test/scala/com/miguno/kafkastorm/integration/IntegrationTest.scala b/src/test/scala/com/miguno/kafkastorm/integration/IntegrationTest.scala
new file mode 100644
index 0000000..c8c9ecf
--- /dev/null
+++ b/src/test/scala/com/miguno/kafkastorm/integration/IntegrationTest.scala
@@ -0,0 +1,5 @@
+package com.miguno.kafkastorm.integration
+
+import org.scalatest.Tag
+
+object IntegrationTest extends Tag("com.miguno.kafkastorm.integration.IntegrationTest")
diff --git a/src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala b/src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala
new file mode 100644
index 0000000..e58485a
--- /dev/null
+++ b/src/test/scala/com/miguno/kafkastorm/integration/KafkaSpec.scala
@@ -0,0 +1,218 @@
+package com.miguno.kafkastorm.integration
+
+import _root_.kafka.message.MessageAndMetadata
+import _root_.kafka.utils.{Logging, ZKStringSerializer}
+import com.miguno.avro.Tweet
+import com.miguno.kafkastorm.kafka.{KafkaProducerApp, ConsumerTaskContext, KafkaConsumer, KafkaEmbedded}
+import com.miguno.kafkastorm.zookeeper.ZooKeeperEmbedded
+import com.twitter.bijection.Injection
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import java.util.Properties
+import org.I0Itec.zkclient.ZkClient
+import org.scalatest._
+import scala.collection.mutable
+import scala.concurrent.duration._
+import kafka.admin.AdminUtils
+
+@DoNotDiscover
+class KafkaSpec extends FunSpec with Matchers with BeforeAndAfterAll with GivenWhenThen with Logging {
+
+ private val testTopic = "testing"
+ private val testTopicNumPartitions = 1
+ private val testTopicReplicationFactor = 1
+ private val zookeeperPort = 2181
+
+ private var zookeeperEmbedded: Option[ZooKeeperEmbedded] = None
+ private var zkClient: Option[ZkClient] = None
+ private var kafkaEmbedded: Option[KafkaEmbedded] = None
+
+ implicit val specificAvroBinaryInjectionForTweet = SpecificAvroCodecs.toBinary[Tweet]
+
+ override def beforeAll() {
+ // Start embedded ZooKeeper server
+ zookeeperEmbedded = Some(new ZooKeeperEmbedded(zookeeperPort))
+
+ for {z <- zookeeperEmbedded} {
+ // Start embedded Kafka broker
+ val brokerConfig = new Properties
+ brokerConfig.put("zookeeper.connect", z.connectString)
+ kafkaEmbedded = Some(new KafkaEmbedded(brokerConfig))
+ for {k <- kafkaEmbedded} {
+ k.start()
+ }
+
+ // Create test topic
+ val sessionTimeout = 30.seconds
+ val connectionTimeout = 30.seconds
+ zkClient = Some(new ZkClient(z.connectString, sessionTimeout.toMillis.toInt, connectionTimeout.toMillis.toInt,
+ ZKStringSerializer))
+ for {
+ zc <- zkClient
+ } {
+ val topicConfig = new Properties
+ AdminUtils.createTopic(zc, testTopic, testTopicNumPartitions, testTopicReplicationFactor, topicConfig)
+ }
+ }
+ }
+
+ override def afterAll() {
+ for {k <- kafkaEmbedded} k.stop()
+
+ for {
+ zc <- zkClient
+ } {
+ info("ZooKeeper client: shutting down...")
+ zc.close()
+ info("ZooKeeper client: shutdown completed")
+ }
+
+ for {z <- zookeeperEmbedded} z.stop()
+ }
+
+
+ val fixture = {
+ val BeginningOfEpoch = 0.seconds
+ val AnyTimestamp = 1234.seconds
+ val now = System.currentTimeMillis().millis
+
+ new {
+ val t1 = new Tweet("ANY_USER_1", "ANY_TEXT_1", now.toSeconds)
+ val t2 = new Tweet("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds)
+ val t3 = new Tweet("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds)
+
+ val messages = Seq(t1, t2, t3)
+ }
+ }
+
+ describe("Kafka") {
+
+ it("should synchronously send and receive a Tweet in Avro format", IntegrationTest) {
+ for {
+ z <- zookeeperEmbedded
+ k <- kafkaEmbedded
+ } {
+ Given("a ZooKeeper instance")
+ And("a Kafka broker instance")
+ And("some tweets")
+ val f = fixture
+ val tweets = f.messages
+ And("a single-threaded Kafka consumer group")
+ // The Kafka consumer group must be running before the first messages are sent to the topic.
+ val numConsumerThreads = 1
+ val consumerConfig = {
+ val c = new Properties
+ c.put("group.id", "test-consumer")
+ c
+ }
+ val consumer = new KafkaConsumer(testTopic, z.connectString, numConsumerThreads, consumerConfig)
+ val actualTweets = new mutable.SynchronizedQueue[Tweet]
+ consumer.startConsumers(
+ (m: MessageAndMetadata[Array[Byte], Array[Byte]], c: ConsumerTaskContext) => {
+ val tweet = Injection.invert[Tweet, Array[Byte]](m.message)
+ for {t <- tweet} {
+ info(s"Consumer thread ${c.threadId}: received Tweet ${t} from partition ${m.partition} of topic ${m.topic} (offset: ${m.offset})")
+ actualTweets += t
+ }
+ })
+ val waitForConsumerStartup = 300.millis
+ debug(s"Waiting $waitForConsumerStartup ms for Kafka consumer threads to launch")
+ Thread.sleep(waitForConsumerStartup.toMillis)
+ debug("Finished waiting for Kafka consumer threads to launch")
+
+ When("I start a synchronous Kafka producer that sends the tweets in Avro binary format")
+ val syncProducerConfig = {
+ val c = new Properties
+ c.put("producer.type", "sync")
+ c.put("client.id", "test-sync-producer")
+ c.put("request.required.acks", "1")
+ c
+ }
+ val producerApp = new KafkaProducerApp(testTopic, k.brokerList, syncProducerConfig)
+ tweets foreach {
+ case tweet => {
+ val bytes = Injection[Tweet, Array[Byte]](tweet)
+ info(s"Synchronously sending Tweet $tweet to topic ${producerApp.topic}")
+ producerApp.send(bytes)
+ }
+ }
+
+ Then("the consumer app should receive the tweets")
+ val waitForConsumerToReadStormOutput = 300.millis
+ debug(s"Waiting $waitForConsumerToReadStormOutput ms for Kafka consumer threads to read messages")
+ Thread.sleep(waitForConsumerToReadStormOutput.toMillis)
+ debug("Finished waiting for Kafka consumer threads to read messages")
+ debug("Shutting down Kafka consumer threads")
+ consumer.shutdown()
+
+ actualTweets.toSeq should be(f.messages.toSeq)
+ }
+ }
+
+ it("should asynchronously send and receive a Tweet in Avro format", IntegrationTest) {
+ for {
+ z <- zookeeperEmbedded
+ k <- kafkaEmbedded
+ } {
+ Given("a ZooKeeper instance")
+ And("a Kafka broker instance")
+ And("some tweets")
+ val f = fixture
+ val tweets = f.messages
+ And("a single-threaded Kafka consumer group")
+ // The Kafka consumer group must be running before the first messages are sent to the topic.
+ val numConsumerThreads = 1
+ val consumerConfig = {
+ val c = new Properties
+ c.put("group.id", "test-consumer")
+ c
+ }
+ val consumer = new KafkaConsumer(testTopic, z.connectString, numConsumerThreads, consumerConfig)
+ val actualTweets = new mutable.SynchronizedQueue[Tweet]
+ consumer.startConsumers(
+ (m: MessageAndMetadata[Array[Byte], Array[Byte]], c: ConsumerTaskContext) => {
+ val tweet = Injection.invert[Tweet, Array[Byte]](m.message)
+ for {t <- tweet} {
+ info(s"Consumer thread ${c.threadId}: received Tweet ${t} from partition ${m.partition} of topic ${m.topic} (offset: ${m.offset})")
+ actualTweets += t
+ }
+ })
+ val waitForConsumerStartup = 300.millis
+ debug(s"Waiting $waitForConsumerStartup ms for Kafka consumer threads to launch")
+ Thread.sleep(waitForConsumerStartup.toMillis)
+ debug("Finished waiting for Kafka consumer threads to launch")
+
+ When("I start an asynchronous Kafka producer that sends the tweets in Avro binary format")
+ val asyncProducerConfig = {
+ val c = new Properties
+ c.put("producer.type", "async")
+ c.put("client.id", "test-sync-producer")
+ c.put("request.required.acks", "1")
+ // We must set `batch.num.messages` and/or `queue.buffering.max.ms` so that the async producer will actually
+ // send our (typically few) test messages before the unit test finishes.
+ c.put("batch.num.messages", tweets.size.toString)
+ c
+ }
+ val producerApp = new KafkaProducerApp(testTopic, k.brokerList, asyncProducerConfig)
+ tweets foreach {
+ case tweet => {
+ val bytes = Injection[Tweet, Array[Byte]](tweet)
+ info(s"Asynchronously sending Tweet $tweet to topic ${producerApp.topic}")
+ producerApp.send(bytes)
+ }
+ }
+
+ Then("the consumer app should receive the tweets")
+ val waitForConsumerToReadStormOutput = 300.millis
+ debug(s"Waiting $waitForConsumerToReadStormOutput ms for Kafka consumer threads to read messages")
+ Thread.sleep(waitForConsumerToReadStormOutput.toMillis)
+ debug("Finished waiting for Kafka consumer threads to read messages")
+ debug("Shutting down Kafka consumer threads")
+ consumer.shutdown()
+
+ actualTweets.toSeq should be(f.messages.toSeq)
+ }
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala b/src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala
new file mode 100644
index 0000000..ed2d8e7
--- /dev/null
+++ b/src/test/scala/com/miguno/kafkastorm/integration/KafkaStormSpec.scala
@@ -0,0 +1,317 @@
+package com.miguno.kafkastorm.integration
+
+import kafka.admin.AdminUtils
+import _root_.kafka.utils.{Logging, ZKStringSerializer}
+import _root_.storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
+import backtype.storm.{Testing, ILocalCluster, Config}
+import backtype.storm.generated.StormTopology
+import backtype.storm.spout.SchemeAsMultiScheme
+import backtype.storm.testing._
+import backtype.storm.topology.TopologyBuilder
+import com.miguno.avro.Tweet
+import com.miguno.kafkastorm.kafka._
+import com.miguno.kafkastorm.storm.{AvroDecoderBolt, AvroKafkaSinkBolt, AvroScheme, TweetAvroKryoDecorator}
+import com.miguno.kafkastorm.zookeeper.ZooKeeperEmbedded
+import com.twitter.bijection.Injection
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import java.util.Properties
+import kafka.message.MessageAndMetadata
+import org.I0Itec.zkclient.ZkClient
+import org.scalatest._
+import scala.collection.mutable
+import scala.concurrent.duration._
+
+/**
+ * This Kafka/Storm integration test code is slightly more complicated than the other tests in this project. This is
+ * due to a number of reasons, such as: the way Storm topologies are "wired" and configured, the test facilities
+ * exposed by Storm, and -- on a higher level -- because there are quite a number of components involved (ZooKeeper,
+ * Kafka producers and consumers, Storm) which must be set up, run, and terminated in the correct order. For these
+ * reasons the integration tests are not simple "given/when/then" style tests.
+ */
+@DoNotDiscover
+class KafkaStormSpec extends FeatureSpec with Matchers with BeforeAndAfterAll with GivenWhenThen with Logging {
+
+ private val inputTopic = "testing-input"
+ private val inputTopicNumPartitions = 1
+ private val inputTopicReplicationFactor = 1
+ private val outputTopic = "testing-output"
+ private val outputTopicNumPartitions = 1
+ private val outputTopicReplicationFactor = 1
+ private val zookeeperPort = 2181
+ private var zookeeperEmbedded: Option[ZooKeeperEmbedded] = None
+ private var zkClient: Option[ZkClient] = None
+ private var kafkaEmbedded: Option[KafkaEmbedded] = None
+
+ implicit val specificAvroBinaryInjectionForTweet = SpecificAvroCodecs.toBinary[Tweet]
+
+ override def beforeAll() {
+ // Start embedded ZooKeeper server
+ zookeeperEmbedded = Some(new ZooKeeperEmbedded(zookeeperPort))
+
+ for {z <- zookeeperEmbedded} {
+ // Start embedded Kafka broker
+ val brokerConfig = new Properties
+ brokerConfig.put("zookeeper.connect", z.connectString)
+ kafkaEmbedded = Some(new KafkaEmbedded(brokerConfig))
+ for {k <- kafkaEmbedded} {
+ k.start()
+ }
+
+ // Create test topics
+ val sessionTimeout = 30.seconds
+ val connectionTimeout = 30.seconds
+ // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then createTopic() will only
+ // seem to work (it will return without error). The topic will exist only in ZooKeeper and will be returned when
+ // listing topics, but Kafka itself will not create it.
+ zkClient = Some(new ZkClient(z.connectString, sessionTimeout.toMillis.toInt, connectionTimeout.toMillis.toInt,
+ ZKStringSerializer))
+ for {
+ zc <- zkClient
+ } {
+ val inputTopicConfig = new Properties
+ AdminUtils.createTopic(zc, inputTopic, inputTopicNumPartitions, inputTopicReplicationFactor, inputTopicConfig)
+ val outputTopicConfig = new Properties
+ AdminUtils.createTopic(zc, outputTopic, outputTopicNumPartitions, outputTopicReplicationFactor,
+ outputTopicConfig)
+ }
+ }
+ }
+
+ override def afterAll() {
+ for {k <- kafkaEmbedded} k.stop()
+
+ for {
+ zc <- zkClient
+ } {
+ info("ZooKeeper client: shutting down...")
+ zc.close()
+ info("ZooKeeper client: shutdown completed")
+ }
+
+ for {z <- zookeeperEmbedded} z.stop()
+ }
+
+ val fixture = {
+ val BeginningOfEpoch = 0.seconds
+ val AnyTimestamp = 1234.seconds
+ val now = System.currentTimeMillis().millis
+
+ new {
+ val t1 = new Tweet("ANY_USER_1", "ANY_TEXT_1", now.toSeconds)
+ val t2 = new Tweet("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds)
+ val t3 = new Tweet("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds)
+
+ val messages = Seq(t1, t2, t3)
+ }
+ }
+
+ info("As a user of Storm")
+ info("I want to read Avro-encoded data from Kafka")
+ info("so that I can quickly build Kafka<->Storm data flows")
+
+ feature("AvroDecoderBolt[T]") {
+
+ scenario("User creates a Storm topology that uses AvroDecoderBolt", IntegrationTest) {
+ for {
+ k <- kafkaEmbedded
+ z <- zookeeperEmbedded
+ } {
+ Given("a ZooKeeper instance")
+ And("a Kafka broker instance")
+ And(s"a Storm topology that uses AvroDecoderBolt and that reads tweets from topic $inputTopic and writes " +
+ s"them as-is to topic $outputTopic")
+ // We create a topology instance that makes use of an Avro decoder bolt to deserialize the Kafka spout's output
+ // into pojos. Here, the data flow is KafkaSpout -> AvroDecoderBolt -> AvroKafkaSinkBolt.
+ val builder = new TopologyBuilder
+ val kafkaSpoutId = "kafka-spout"
+ val kafkaSpoutConfig = kafkaSpoutBaseConfig(z.connectString, inputTopic)
+ val kafkaSpout = new KafkaSpout(kafkaSpoutConfig)
+ val numSpoutExecutors = inputTopicNumPartitions
+ builder.setSpout(kafkaSpoutId, kafkaSpout, numSpoutExecutors)
+
+ val decoderBoltId = "avro-decoder-bolt"
+ val decoderBolt = new AvroDecoderBolt[Tweet]
+ // Note: Should test messages arrive out-of-order, we may want to enforce a parallelism of 1 for this bolt.
+ builder.setBolt(decoderBoltId, decoderBolt).globalGrouping(kafkaSpoutId)
+
+ val kafkaSinkBoltId = "avro-kafka-sink-bolt"
+ val producerAppFactory = new BaseKafkaProducerAppFactory(outputTopic, k.brokerList)
+ val kafkaSinkBolt = new AvroKafkaSinkBolt[Tweet](producerAppFactory)
+ // Note: Should test messages arrive out-of-order, we may want to enforce a parallelism of 1 for this bolt.
+ builder.setBolt(kafkaSinkBoltId, kafkaSinkBolt).globalGrouping(decoderBoltId)
+ val topology = builder.createTopology()
+
+ baseIntegrationTest(z, k, topology, inputTopic, outputTopic)
+ }
+ }
+ }
+
+ feature("AvroScheme[T] for Kafka spout") {
+ scenario("User creates a Storm topology that uses AvroScheme in Kafka spout", IntegrationTest) {
+ for {
+ k <- kafkaEmbedded
+ z <- zookeeperEmbedded
+ } {
+ Given("a ZooKeeper instance")
+ And("a Kafka broker instance")
+ And(s"a Storm topology that uses AvroScheme and that reads tweets from topic $inputTopic and writes them " +
+ s"as-is to topic $outputTopic")
+ // Creates a topology instance that adds an Avro decoder "scheme" to the Kafka spout, so that the spout's output
+ // are ready-to-use pojos. Here, the data flow is KafkaSpout -> AvroKafkaSinkBolt.
+ //
+ // Note that Storm will still need to re-serialize the spout's pojo output to send the data across the wire to
+ // downstream consumers/bolts, which will then deserialize the data again. In our case we have a custom Kryo
+ // serializer registered with Storm to make this serde step as fast as possible.
+ val builder = new TopologyBuilder
+ val kafkaSpoutId = "kafka-spout"
+ val kafkaSpoutConfig = kafkaSpoutBaseConfig(z.connectString, inputTopic)
+ // You can provide the Kafka spout with a custom `Scheme` to deserialize incoming messages in a particular way.
+ // The default scheme is Storm's `backtype.storm.spout.RawMultiScheme`, which simply returns the raw bytes of the
+ // incoming data (i.e. leaving deserialization up to you). In this example, we configure the spout to use
+ // a custom scheme, AvroScheme[Tweet], which will modify the spout to automatically deserialize incoming data
+ // into pojos.
+ kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new AvroScheme[Tweet])
+ val kafkaSpout = new KafkaSpout(kafkaSpoutConfig)
+ val numSpoutExecutors = inputTopicNumPartitions
+ builder.setSpout(kafkaSpoutId, kafkaSpout, numSpoutExecutors)
+
+ val kafkaSinkBoltId = "avro-kafka-sink-bolt"
+ val producerAppFactory = new BaseKafkaProducerAppFactory(outputTopic, k.brokerList)
+ val kafkaSinkBolt = new AvroKafkaSinkBolt[Tweet](producerAppFactory)
+ // Note: Should test messages arrive out-of-order, we may want to enforce a parallelism of 1 for this bolt.
+ builder.setBolt(kafkaSinkBoltId, kafkaSinkBolt).globalGrouping(kafkaSpoutId)
+ val topology = builder.createTopology()
+
+ baseIntegrationTest(z, k, topology, inputTopic, outputTopic)
+ }
+ }
+ }
+
+ private def kafkaSpoutBaseConfig(zookeeperConnect: String, inputTopic: String): SpoutConfig = {
+ val zkHosts = new ZkHosts(zookeeperConnect)
+ val zkRoot = "/kafka-storm-starter-spout"
+ // This id is appended to zkRoot for constructing a ZK path under which the spout stores partition information.
+ val zkId = "kafka-spout"
+ // To configure the spout to read from the very beginning of the topic (auto.offset.reset = smallest), you can use
+ // either of the following two equivalent approaches:
+ //
+ // 1. spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
+ // 2. spoutConfig.forceFromStart = true
+ //
+ // To configure the spout to read from the end of the topic (auto.offset.reset = largest), you can use either of
+ // the following two equivalent approaches:
+ //
+ // 1. Do nothing -- reading from the end of the topic is the default behavior.
+ // 2. spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime
+ //
+ val spoutConfig = new SpoutConfig(zkHosts, inputTopic, zkRoot, zkId)
+ spoutConfig
+ }
+
+ /**
+ * This method sends Avro-encoded test data into a Kafka "input" topic. This data is read from Kafka into Storm,
+ * which will then decode and re-encode the data, and then write the data to an "output" topic in Kafka (which is our
+ * means/workaround to "tap into" Storm's output, as we haven't been able yet to use Storm's built-in testing
+ * facilities for such integration tests). Lastly, we read the data from the "output" topic via a Kafka consumer
+ * group, and then compare the output data with the input data, with the latter serving the dual purpose of also
+ * being the expected output data.
+ */
+ private def baseIntegrationTest(zookeeper: ZooKeeperEmbedded, kafka: KafkaEmbedded, topology: StormTopology,
+ inputTopic: String, outputTopic: String) {
+ And("some tweets")
+ val f = fixture
+ val tweets = f.messages
+
+ And(s"a synchronous Kafka producer app that writes to the topic $inputTopic")
+ val kafkaSyncProducerConfig = {
+ val c = new Properties
+ c.put("producer.type", "sync")
+ c.put("client.id", "kafka-storm-test-sync-producer")
+ c.put("request.required.acks", "1")
+ c
+ }
+ val producerApp = new KafkaProducerApp(inputTopic, kafka.brokerList, kafkaSyncProducerConfig)
+
+ And(s"a single-threaded Kafka consumer app that reads from topic $outputTopic")
+ // We start the Kafka consumer group, which (in our case) must be running before the first messages are sent to the
+ // output Kafka topic. The Storm topology will write its output to this topic. We use the Kafka consumer
+ // group to learn which data was created by Storm, and compare this actual output data to the expected data (which
+ // in our case is the original input data).
+ val numConsumerThreads = 1
+ val kafkaConsumerConfig = {
+ val c = new Properties
+ c.put("group.id", "kafka-storm-test-consumer")
+ c
+ }
+ val consumer = new KafkaConsumer(outputTopic, zookeeper.connectString, numConsumerThreads, kafkaConsumerConfig)
+ val actualTweets = new mutable.SynchronizedQueue[Tweet]
+ consumer.startConsumers(
+ (m: MessageAndMetadata[Array[Byte], Array[Byte]], c: ConsumerTaskContext) => {
+ val tweet = Injection.invert[Tweet, Array[Byte]](m.message())
+ for {t <- tweet} {
+ info(s"Consumer thread ${c.threadId}: received Tweet $t from partition ${m.partition} of topic ${m.topic} " +
+ s"(offset: ${m.offset})")
+ actualTweets += t
+ }
+ })
+ val waitForConsumerStartup = 300.millis
+ Thread.sleep(waitForConsumerStartup.toMillis)
+
+ And("a Storm topology configuration that registers an Avro Kryo decorator for Tweet")
+ // We create the topology configuration here simply to clarify that it is part of the test's initial context defined
+ // under "Given".
+ val topologyConfig = {
+ val conf = new Config
+ // Use more than one worker thread. It looks as if serialization occurs only if you have actual parallelism in
+ // LocalCluster (i.e. numWorkers > 1).
+ conf.setNumWorkers(2)
+ // Never use Java's default serialization. This allows us to see whether Kryo serialization is properly
+ // configured and working for all types.
+ conf.setFallBackOnJavaSerialization(false)
+ // Serialization config, see http://storm.incubator.apache.org/documentation/Serialization.html
+ // Note: We have not yet been able to come up with a KryoDecorator[Tweet] approach.
+ conf.registerDecorator(classOf[TweetAvroKryoDecorator])
+ conf
+ }
+
+ When("I run the Storm topology")
+ val stormTestClusterParameters = {
+ val mkClusterParam = new MkClusterParam
+ mkClusterParam.setSupervisors(2)
+ val daemonConf = new Config
+ // STORM_LOCAL_MODE_ZMQ: Whether or not to use ZeroMQ for messaging in local mode. If this is set to false, then
+ // Storm will use a pure-Java messaging system. The purpose of this flag is to make it easy to run Storm in local
+ // mode by eliminating the need for native dependencies, which can be difficult to install.
+ daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false: java.lang.Boolean)
+ mkClusterParam.setDaemonConf(daemonConf)
+ mkClusterParam
+ }
+ Testing.withLocalCluster(stormTestClusterParameters, new TestJob() {
+ override def run(stormCluster: ILocalCluster) {
+ val topologyName = "storm-kafka-integration-test"
+ stormCluster.submitTopology(topologyName, topologyConfig, topology)
+ val waitForTopologyStartupMs = 3.seconds.toMillis
+ Thread.sleep(waitForTopologyStartupMs)
+
+ And("I use the Kafka producer app to Avro-encode the tweets and sent them to Kafka")
+ // Send the test input data to Kafka.
+ tweets foreach {
+ case tweet =>
+ val bytes = Injection[Tweet, Array[Byte]](tweet)
+ info(s"Synchronously sending Tweet $tweet to topic ${producerApp.topic}")
+ producerApp.send(bytes)
+ }
+
+ val waitForStormToReadFromKafka = 1.seconds
+ Thread.sleep(waitForStormToReadFromKafka.toMillis)
+ }
+ })
+
+ Then("the Kafka consumer app should receive the decoded, original tweets from the Storm topology")
+ val waitForConsumerToReadStormOutput = 300.millis
+ Thread.sleep(waitForConsumerToReadStormOutput.toMillis)
+ consumer.shutdown()
+ actualTweets.toSeq should be(tweets.toSeq)
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/scala/com/miguno/kafkastorm/integration/StormSpec.scala b/src/test/scala/com/miguno/kafkastorm/integration/StormSpec.scala
new file mode 100644
index 0000000..cd9ea94
--- /dev/null
+++ b/src/test/scala/com/miguno/kafkastorm/integration/StormSpec.scala
@@ -0,0 +1,113 @@
+package com.miguno.kafkastorm.integration
+
+import _root_.kafka.utils.Logging
+import backtype.storm.{Config, ILocalCluster, Testing}
+import backtype.storm.testing._
+import backtype.storm.topology.TopologyBuilder
+import backtype.storm.tuple.{Fields, Values}
+import org.scalatest._
+
+/**
+ * For more details on Storm unit testing please take a look at:
+ * https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java
+ */
+@DoNotDiscover
+class StormSpec extends FunSpec with Matchers with BeforeAndAfterAll with GivenWhenThen with Logging {
+
+ describe("Storm") {
+
+ it("should start a local cluster", IntegrationTest) {
+ Given("no cluster")
+
+ When("I start a LocalCluster instance")
+ val mkClusterParam = new MkClusterParam
+ mkClusterParam.setSupervisors(2)
+ mkClusterParam.setPortsPerSupervisor(2)
+ val daemonConf = new Config
+ daemonConf.put(Config.SUPERVISOR_ENABLE, false: java.lang.Boolean)
+ daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0: Integer)
+ mkClusterParam.setDaemonConf(daemonConf)
+
+ // When testing your topology, you need a `LocalCluster` to run it. Normally this would mean you'd have to manage
+ // the lifecycle of that local cluster yourself, i.e. you'd need to create it and, after using it, shut it down.
+ // With `Testing.withLocalCluster` you don't need to do any of this; just use the `cluster` instance provided
+ // through the parameter of `TestJob.run`.
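+ //
+ // For comparison, a minimal sketch of the manual lifecycle (hypothetical, not used in this test):
+ //
+ //   val cluster = new LocalCluster()
+ //   try {
+ //     // submit topologies and run assertions against `cluster` ...
+ //   } finally {
+ //     cluster.shutdown()
+ //   }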
+ Testing.withLocalCluster(mkClusterParam, new TestJob {
+ override def run(stormCluster: ILocalCluster) {
+ Then("the local cluster should start properly")
+ stormCluster.getState shouldNot be(null)
+ }
+ })
+ }
+
+ it("should run a basic topology", IntegrationTest) {
+ Given("a local cluster")
+ And("a wordcount topology")
+ val mkClusterParam = new MkClusterParam
+ mkClusterParam.setSupervisors(4)
+ val daemonConf = new Config
+ daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false: java.lang.Boolean)
+ mkClusterParam.setDaemonConf(daemonConf)
+
+ // Base topology setup
+ val builder = new TopologyBuilder
+ val spoutId = "wordSpout"
+ builder.setSpout(spoutId, new TestWordSpout(true), 3)
+ val wordCounterId = "wordCounterBolt"
+ builder.setBolt(wordCounterId, new TestWordCounter, 4).fieldsGrouping(spoutId, new Fields("word"))
+ val globalCountId = "globalCountBolt"
+ builder.setBolt(globalCountId, new TestGlobalCount).globalGrouping(spoutId)
+ val aggregatesCounterId = "aggregatesCounterBolt"
+ builder.setBolt(aggregatesCounterId, new TestAggregatesCounter).globalGrouping(wordCounterId)
+ val topology = builder.createTopology()
+ val completeTopologyParam = new CompleteTopologyParam
+
+ And("the input words alice, bob, joe, alice")
+ val mockedSources = new MockedSources()
+ mockedSources.addMockData(spoutId, new Values("alice"), new Values("bob"), new Values("joe"), new Values("alice"))
+ completeTopologyParam.setMockedSources(mockedSources)
+
+ // Finalize topology config
+ val conf = new Config
+ conf.setNumWorkers(2)
+ completeTopologyParam.setStormConf(conf)
+
+ When("I submit the topology")
+ var result: Option[java.util.Map[_, _]] = None
+ Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() {
+ override def run(stormCluster: ILocalCluster) {
+ // `completeTopology()` takes your topology, cluster, and configuration. It will mock out the spouts you
+ // specify, and will run the topology until it is idle and all tuples from the spouts have been either acked or
+ // failed, and return all the tuples that have been emitted from all the topology components.
+ result = Some(Testing.completeTopology(stormCluster, topology, completeTopologyParam))
+ }
+ })
+
+ // We could split this `Then()` into multiple ones, each covering one of the `Testing.multiseteq()` calls
+ // below. Left as an exercise for the reader. :-)
+ Then("the topology should properly count the words")
+ // Type ascription required for Scala-Java interoperability.
+ val one = 1: Integer
+ val two = 2: Integer
+ val three = 3: Integer
+ val four = 4: Integer
+
+ // Verify the expected behavior for each of the components (spout + bolts) in the topology by comparing
+ // their actual output tuples vs. the corresponding expected output tuples.
+ for {
+ r <- result
+ } {
+ Testing.multiseteq(Testing.readTuples(r, spoutId),
+ new Values(new Values("alice"), new Values("bob"), new Values("joe"), new Values("alice"))) should be(true)
+ Testing.multiseteq(Testing.readTuples(r, wordCounterId),
+ new Values(new Values("alice", one), new Values("alice", two), new Values("bob", one), new Values("joe", one))) should be(true)
+ Testing.multiseteq(Testing.readTuples(r, globalCountId),
+ new Values(new Values(one), new Values(two), new Values(three), new Values(four))) should be(true)
+ Testing.multiseteq(Testing.readTuples(r, aggregatesCounterId),
+ new Values(new Values(one), new Values(two), new Values(three), new Values(four))) should be(true)
+ }
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/scala/com/miguno/kafkastorm/kafka/KafkaProducerAppSpec.scala b/src/test/scala/com/miguno/kafkastorm/kafka/KafkaProducerAppSpec.scala
new file mode 100644
index 0000000..81a05ea
--- /dev/null
+++ b/src/test/scala/com/miguno/kafkastorm/kafka/KafkaProducerAppSpec.scala
@@ -0,0 +1,59 @@
+package com.miguno.kafkastorm.kafka
+
+import _root_.kafka.utils.Logging
+import java.util.Properties
+import org.scalatest.{FunSpec, GivenWhenThen, Matchers}
+
+class KafkaProducerAppSpec extends FunSpec with Matchers with GivenWhenThen with Logging {
+
+ private val AnyTopic = "some-topic"
+ private val AnyBrokerList = "a:9092,b:9093"
+ private val AnyConfigParam = "queue.buffering.max.ms"
+ private val AnyConfigValue = "12345"
+
+ describe("A KafkaProducerApp") {
+
+ it("should let the user configure the broker list") {
+ Given("no app")
+
+ When("I create an app with the broker list set to " + AnyBrokerList)
+ val producerApp = new KafkaProducerApp(AnyTopic, AnyBrokerList)
+
+ Then("the Kafka producer's metadata.broker.list config parameter should be set to this value")
+ producerApp.config.props.getString("metadata.broker.list") should be(AnyBrokerList)
+ }
+
+ it("should use the broker list constructor parameter as the authoritative setting for the broker list") {
+ Given("no app")
+
+ When("I create an app with a producer config that sets the broker list to notMe:1234")
+ val config = {
+ val c = new Properties
+ c.put("metadata.broker.list", "notMe:1234")
+ c
+ }
+ And("with the constructor parameter that sets the broker list to " + AnyBrokerList)
+ val producerApp = new KafkaProducerApp(AnyTopic, AnyBrokerList, config)
+
+ Then("the Kafka producer's actual broker list should be " + AnyBrokerList)
+ producerApp.config.props.getString("metadata.broker.list") should be(AnyBrokerList)
+ }
+
+ it("should let the user customize the Kafka producer configuration") {
+ Given("no app")
+
+ When(s"I create an app with a producer config that sets $AnyConfigParam to $AnyConfigValue")
+ val config = {
+ val c = new Properties
+ c.put(AnyConfigParam, AnyConfigValue)
+ c
+ }
+ val producerApp = new KafkaProducerApp(AnyTopic, AnyBrokerList, config)
+
+ Then(s"the Kafka producer's $AnyConfigParam parameter should be to set to $AnyConfigValue")
+ producerApp.config.props.getString(AnyConfigParam) should be(AnyConfigValue)
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/scala/com/miguno/kafkastorm/storm/AvroDecoderBoltSpec.scala b/src/test/scala/com/miguno/kafkastorm/storm/AvroDecoderBoltSpec.scala
new file mode 100644
index 0000000..fb98046
--- /dev/null
+++ b/src/test/scala/com/miguno/kafkastorm/storm/AvroDecoderBoltSpec.scala
@@ -0,0 +1,144 @@
+package com.miguno.kafkastorm.storm
+
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.tuple.{Fields, Tuple, Values}
+import com.miguno.avro.Tweet
+import com.twitter.bijection.Injection
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import org.mockito.Matchers._
+import org.mockito.Mockito.{when => mwhen, _}
+import org.scalatest.{FunSpec, GivenWhenThen, Matchers}
+import org.scalatest.mock.MockitoSugar
+import scala.concurrent.duration._
+
+class AvroDecoderBoltSpec extends FunSpec with Matchers with GivenWhenThen with MockitoSugar {
+
+ implicit val specificAvroBinaryInjection: Injection[Tweet, Array[Byte]] = SpecificAvroCodecs.toBinary[Tweet]
+
+ private type AnyAvroSpecificRecordBase = Tweet
+
+ private val AnyTweet = new Tweet("ANY_USER_1", "ANY_TEXT_1", 1234.seconds.toSeconds)
+ private val AnyTweetInAvroBytes = Injection[Tweet, Array[Byte]](AnyTweet)
+
+ describe("An AvroDecoderBolt") {
+
+ it("should read by default the input field 'bytes' from incoming tuples") {
+ Given("no bolt")
+
+ When("I create a bolt without customizing the input field name")
+ val bolt = new AvroDecoderBolt[AnyAvroSpecificRecordBase]
+ And("the bolt receives a tuple")
+ val tuple = mock[Tuple]
+ val collector = mock[BasicOutputCollector]
+ bolt.execute(tuple, collector)
+
+ Then("the bolt should read the field 'bytes' from the tuple")
+ verify(tuple, times(1)).getBinaryByField("bytes")
+ }
+
+ it("should let the user configure the name of the input field to read from incoming tuples") {
+ Given("no bolt")
+
+ When("I create a bolt with a custom input field name 'foobar'")
+ val bolt = new AvroDecoderBolt[AnyAvroSpecificRecordBase](inputField = "foobar")
+ And("the bolt receives a tuple")
+ val tuple = mock[Tuple]
+ val collector = mock[BasicOutputCollector]
+ bolt.execute(tuple, collector)
+
+ Then("the bolt should read the field 'foobar' from the tuple")
+ verify(tuple, times(1)).getBinaryByField("foobar")
+ }
+
+ it("should deserialize binary records into pojos and send the pojos to downstream bolts") {
+ Given("a bolt of type Tweet")
+ val bolt = new AvroDecoderBolt[Tweet]
+ And("a Tweet record")
+ val tuple = mock[Tuple]
+ mwhen(tuple.getBinaryByField(anyString)).thenReturn(AnyTweetInAvroBytes)
+
+ When("the bolt receives the Tweet record")
+ val collector = mock[BasicOutputCollector]
+ bolt.execute(tuple, collector)
+
+ Then("the bolt should send the decoded Tweet pojo to downstream bolts")
+ verify(collector, times(1)).emit(new Values(AnyTweet))
+ }
+
+ it("should skip over tuples that contain invalid binary records") {
+ Given("a bolt of type Tweet")
+ val bolt = new AvroDecoderBolt[Tweet]
+ And("an invalid binary record")
+ val tuple = mock[Tuple]
+ val invalidBinaryRecord = Array[Byte](1, 2, 3, 4)
+ mwhen(tuple.getBinaryByField(anyString)).thenReturn(invalidBinaryRecord)
+
+ When("the bolt receives the record")
+ val collector = mock[BasicOutputCollector]
+ bolt.execute(tuple, collector)
+
+ Then("the bolt should not send any data to downstream bolts")
+ verifyZeroInteractions(collector)
+ }
+
+ it("should skip over tuples for which reading fails") {
+ Given("a bolt")
+ val bolt = new AvroDecoderBolt[AnyAvroSpecificRecordBase]
+ And("a tuple from which one cannot read")
+ val tuple = mock[Tuple]
+ mwhen(tuple.getBinaryByField(anyString)).thenReturn(null)
+
+ When("the bolt receives the tuple")
+ val collector = mock[BasicOutputCollector]
+ bolt.execute(tuple, collector)
+
+ Then("the bolt should not send any data to downstream bolts")
+ verifyZeroInteractions(collector)
+ }
+
+ it("should declare a single output field with the default name 'pojo'") {
+ Given("no bolt")
+
+ When("I create a bolt without customizing the output field name")
+ val bolt = new AvroDecoderBolt[Tweet]
+
+ Then("the bolt should declare a single output field named 'pojo'")
+ val declarer = mock[OutputFieldsDeclarer]
+ bolt.declareOutputFields(declarer)
+ // We use ArgumentMatcher as a workaround because Storm's Field class does not implement a proper `equals()`
+ // method, and Mockito relies on `equals()` for verification. Because of that the following typical approach
+ // does NOT work: `verify(declarer, times(1)).declare(new Fields("pojo"))`.
+ verify(declarer, times(1)).declare(argThat(FieldsEqualTo(new Fields("pojo"))))
+ }
+
+ it("should let the user define the name of its output field") {
+ Given("no bolt")
+
+ When("I create a bolt with a custom output field name")
+ val bolt = new AvroDecoderBolt[Tweet](outputField = "myCustomFieldName")
+
+ Then("the bolt should declare a single output field with this custom name")
+ val declarer = mock[OutputFieldsDeclarer]
+ bolt.declareOutputFields(declarer)
+ verify(declarer, times(1)).declare(argThat(FieldsEqualTo(new Fields("myCustomFieldName"))))
+ }
+
+ }
+
+ describe("An AvroDecoderBolt companion object") {
+
+ it("should create an AvroDecoderBolt for the correct type") {
+ Given("a companion object")
+
+ When("I ask it to create a bolt for type Tweet")
+ val bolt = AvroDecoderBolt.ofType(classOf[Tweet])
+
+ Then("the bolt should be an AvroDecoderBolt")
+ bolt shouldBe an[AvroDecoderBolt[_]]
+ And("the bolt should be parameterized with the type Tweet")
+ bolt.tpe.shouldEqual(manifest[Tweet])
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBoltSpec.scala b/src/test/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBoltSpec.scala
new file mode 100644
index 0000000..6306234
--- /dev/null
+++ b/src/test/scala/com/miguno/kafkastorm/storm/AvroKafkaSinkBoltSpec.scala
@@ -0,0 +1,113 @@
+package com.miguno.kafkastorm.storm
+
+import backtype.storm.task.TopologyContext
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.tuple.{Fields, Tuple}
+import com.miguno.avro.Tweet
+import com.miguno.kafkastorm.kafka.{KafkaProducerApp, KafkaProducerAppFactory}
+import com.twitter.bijection.Injection
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import java.util
+import org.mockito.AdditionalMatchers
+import org.mockito.Matchers.argThat
+import org.mockito.Mockito.{when => mwhen, _}
+import org.scalatest.{FunSpec, GivenWhenThen, Matchers}
+import org.scalatest.mock.MockitoSugar
+import scala.concurrent.duration._
+
+class AvroKafkaSinkBoltSpec extends FunSpec with Matchers with GivenWhenThen with MockitoSugar {
+
+ implicit val specificAvroBinaryInjection: Injection[Tweet, Array[Byte]] = SpecificAvroCodecs.toBinary[Tweet]
+
+ private type AnyAvroSpecificRecordBase = Tweet
+
+ private val AnyTweet = new Tweet("ANY_USER_1", "ANY_TEXT_1", 1234.seconds.toSeconds)
+ private val AnyTweetInAvroBytes = Injection[Tweet, Array[Byte]](AnyTweet)
+ private val DummyStormConf = new util.HashMap[Object, Object]
+ private val DummyStormContext = mock[TopologyContext]
+
+ describe("An AvroKafkaSinkBolt") {
+
+ it("should send pojos of the configured type to Kafka in Avro-encoded binary format") {
+ Given("a bolt for type Tweet")
+ val producerApp = mock[KafkaProducerApp]
+ val producerAppFactory = mock[KafkaProducerAppFactory]
+ mwhen(producerAppFactory.newInstance()).thenReturn(producerApp)
+ val bolt = new AvroKafkaSinkBolt[Tweet](producerAppFactory)
+ bolt.prepare(DummyStormConf, DummyStormContext)
+
+ When("it receives a Tweet pojo")
+ val tuple = mock[Tuple]
+ // The `Nil: _*` is a required workaround for a known Scala-Java interop problem related to Scala's treatment
+ // of Java's varargs. See http://stackoverflow.com/a/13361530/1743580.
+ mwhen(tuple.getValueByField("pojo")).thenReturn(AnyTweet, Nil: _*)
+ val collector = mock[BasicOutputCollector]
+ bolt.execute(tuple, collector)
+
+ Then("it should send the Avro-encoded pojo to Kafka")
+ // Note: The simpler Mockito variant of `verify(producerApp).send(AnyTweetInAvroBytes)` is not enough because
+ // this variant will not verify whether the Array[Byte] parameter passed to `send()` has the correct value.
+ verify(producerApp).send(AdditionalMatchers.aryEq(AnyTweetInAvroBytes))
+ And("it should not send any data to downstream bolts")
+ verifyZeroInteractions(collector)
+ }
+
+ it("should ignore pojos of an unexpected type") {
+ Given("a bolt for type Tweet")
+ val producerApp = mock[KafkaProducerApp]
+ val producerAppFactory = mock[KafkaProducerAppFactory]
+ mwhen(producerAppFactory.newInstance()).thenReturn(producerApp)
+ val bolt = new AvroKafkaSinkBolt[Tweet](producerAppFactory)
+ bolt.prepare(DummyStormConf, DummyStormContext)
+
+ When("receiving a non-Tweet pojo")
+ val tuple = mock[Tuple]
+ val invalidPojo = "I am not of the expected type!"
+ // The `Nil: _*` is a required workaround for a known Scala-Java interop problem related to Scala's treatment
+ // of Java's varargs. See http://stackoverflow.com/a/13361530/1743580.
+ mwhen(tuple.getValueByField("pojo")).thenReturn(invalidPojo, Nil: _*)
+ val collector = mock[BasicOutputCollector]
+ bolt.execute(tuple, collector)
+
+ Then("it should not send any data to Kafka")
+ verifyZeroInteractions(producerApp)
+ And("it should not send any data to downstream bolts")
+ verifyZeroInteractions(collector)
+ }
+
+ it("should not declare any output fields") {
+ Given("no bolt")
+
+ When("I create a bolt")
+ val producerAppFactory = mock[KafkaProducerAppFactory]
+ val bolt = new AvroKafkaSinkBolt[AnyAvroSpecificRecordBase](producerAppFactory)
+
+ Then("it should declare zero output fields")
+ val declarer = mock[OutputFieldsDeclarer]
+ bolt.declareOutputFields(declarer)
+ // We use ArgumentMatcher as a workaround because Storm's Fields class does not implement a proper `equals()`
+ // method, and Mockito relies on `equals()` for verification. Because of that the following typical approach
+ // does NOT work: `verify(declarer, times(1)).declare(new Fields())`.
+ verify(declarer, times(1)).declare(argThat(FieldsEqualTo(new Fields())))
+ }
+
+ }
+
+ describe("An AvroKafkaSinkBolt companion object") {
+
+ it("should create an AvroKafkaSinkBolt for the correct type") {
+ Given("a companion object")
+
+ When("I ask it to create a bolt for type Tweet")
+ val producerAppFactory = mock[KafkaProducerAppFactory]
+ val bolt = AvroKafkaSinkBolt.ofType(classOf[Tweet])(producerAppFactory)
+
+ Then("the bolt should be an AvroKafkaSinkBolt")
+ bolt shouldBe an[AvroKafkaSinkBolt[_]]
+ And("the bolt should be parameterized with the type Tweet")
+ bolt.tpe.shouldEqual(manifest[Tweet])
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/scala/com/miguno/kafkastorm/storm/AvroSchemeSpec.scala b/src/test/scala/com/miguno/kafkastorm/storm/AvroSchemeSpec.scala
new file mode 100644
index 0000000..5b7a2be
--- /dev/null
+++ b/src/test/scala/com/miguno/kafkastorm/storm/AvroSchemeSpec.scala
@@ -0,0 +1,96 @@
+package com.miguno.kafkastorm.storm
+
+import com.miguno.avro.Tweet
+import com.twitter.bijection.Injection
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import org.scalatest.{FunSpec, GivenWhenThen, Matchers}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+class AvroSchemeSpec extends FunSpec with Matchers with GivenWhenThen {
+
+ implicit val specificAvroBinaryInjectionForTweet = SpecificAvroCodecs.toBinary[Tweet]
+
+ val fixture = {
+ val BeginningOfEpoch = 0.seconds
+ val AnyTimestamp = 1234.seconds
+ val now = System.currentTimeMillis().millis
+
+ new {
+ val t1 = new Tweet("ANY_USER_1", "ANY_TEXT_1", now.toSeconds)
+ val t2 = new Tweet("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds)
+ val t3 = new Tweet("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds)
+
+ val messages = Seq(t1, t2, t3)
+ }
+ }
+
+ describe("An AvroScheme") {
+
+ it("should have a single output field named 'pojo'") {
+ Given("a scheme")
+ val scheme = new AvroScheme
+
+ When("I get its output fields")
+ val outputFields = scheme.getOutputFields()
+
+ Then("there should only be a single field")
+ outputFields.size() should be(1)
+
+ And("this field should be named 'pojo'")
+ outputFields.contains("pojo") should be(true)
+ }
+
+
+ it("should deserialize binary records of the configured type into pojos") {
+ Given("a scheme for type Tweet ")
+ val scheme = new AvroScheme[Tweet]
+ And("some binary-encoded Tweet records")
+ val f = fixture
+ val encodedTweets = f.messages.map(Injection[Tweet, Array[Byte]])
+
+ When("I deserialize the records into pojos")
+ val actualTweets = for {
+ l <- encodedTweets.map(scheme.deserialize)
+ tweet <- l.asScala
+ } yield tweet
+
+ Then("the pojos should be equal to the original pojos")
+ actualTweets should be(f.messages)
+ }
+
+ it("should throw a runtime exception when serialization fails") {
+ Given("a scheme for type Tweet ")
+ val scheme = new AvroScheme[Tweet]
+ And("an invalid binary record")
+ val invalidBytes = Array[Byte](1, 2, 3, 4)
+
+ When("I deserialize the record into a pojo")
+
+ Then("the scheme should throw a runtime exception")
+ val exception = intercept[RuntimeException] {
+ scheme.deserialize(invalidBytes)
+ }
+ And("the exception should provide a meaningful explanation")
+ exception.getMessage should be("Could not decode input bytes")
+ }
+
+ }
+
+ describe("An AvroScheme companion object") {
+
+ it("should create an AvroScheme for the correct type") {
+ Given("a companion object")
+
+ When("I ask it to create a scheme for type Tweet")
+ val scheme = AvroScheme.ofType(classOf[Tweet])
+
+ Then("the scheme should be an AvroScheme")
+ scheme shouldBe an[AvroScheme[_]]
+ And("the scheme should be parameterized with the type Tweet")
+ scheme.tpe.shouldEqual(manifest[Tweet])
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/scala/com/miguno/kafkastorm/storm/FieldsEqualTo.scala b/src/test/scala/com/miguno/kafkastorm/storm/FieldsEqualTo.scala
new file mode 100644
index 0000000..7f8c741
--- /dev/null
+++ b/src/test/scala/com/miguno/kafkastorm/storm/FieldsEqualTo.scala
@@ -0,0 +1,30 @@
+package com.miguno.kafkastorm.storm
+
+import backtype.storm.tuple.Fields
+import org.mockito.ArgumentMatcher
+import scala.collection.JavaConverters._
+
+/**
+ * [[org.mockito.ArgumentMatcher]] for Storm's [[backtype.storm.tuple.Fields]].
+ *
+ * @example {{{
+ * // Verify that a single field named "pojo" is declared.
+ * verify(declarer).declare(argThat(FieldsEqualTo(new Fields("pojo"))))
+ * }}}
+ *
+ * ==Why this approach is required==
+ * We must use an ArgumentMatcher as a workaround because Storm's Fields class does not implement a proper `equals()`
+ * method, and Mockito relies on `equals()` for verification. Because of that the following intuitive approach for
+ * Mockito does not work: `verify(declarer, times(1)).declare(new Fields("bytes"))`.
+ * @param expectedFields
+ */
+class FieldsEqualTo(val expectedFields: Fields) extends ArgumentMatcher[Fields] {
+ override def matches(o: scala.Any): Boolean = {
+ val fields = o.asInstanceOf[Fields].toList.asScala
+ fields == expectedFields.toList.asScala
+ }
+}
+
+object FieldsEqualTo {
+ def apply(expFields: Fields) = new FieldsEqualTo(expFields)
+}
\ No newline at end of file
diff --git a/version.sbt b/version.sbt
new file mode 100644
index 0000000..57b0bcb
--- /dev/null
+++ b/version.sbt
@@ -0,0 +1 @@
+version in ThisBuild := "0.1.0-SNAPSHOT"