Skip to content
This repository was archived by the owner on Mar 23, 2022. It is now read-only.

Commit

Permalink
Use Storm 0.9.2
Browse files Browse the repository at this point in the history
  • Loading branch information
miguno committed Jun 30, 2014
1 parent a70c4dd commit ef3a727
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 98 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# 0.1.1 (unreleased)
# 0.2.0 (unreleased)

* Use Storm 0.9.2. This includes two notable improvements:
* We can and do use the Kafka 0.8 compatible Kafka spout included in Storm 0.9.2.
* We use ZooKeeper 3.4.5, up from 3.3.x before.
* AvroKafkaSinkBolt should not declare any output fields because it writes to Kafka only, it does not emit any tuples.


Expand Down
102 changes: 30 additions & 72 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ Kafka topic.

Note that this example will actually run _two_ in-memory instances of ZooKeeper: the first (listening at
`127.0.0.1:2181/tcp`) is used by the Kafka instance, the second (listening at `127.0.0.1:2000/tcp`) is automatically
started and used by the in-memory Storm cluster. This is because, when running in local aka in-memory mode, Storm does
not allow you to reconfigure or disable its own ZooKeeper instance (see the [Storm FAQ](#FAQ-Storm) below for further
information).
started and used by the in-memory Storm cluster. This is because, when running in local aka in-memory mode, Storm
until version 0.9.2 does not allow you to reconfigure or disable its own ZooKeeper instance (see the
[Storm FAQ](#FAQ-Storm) below for further information).

**To stop the demo application you must kill or `Ctrl-C` the process in the terminal.**

Expand All @@ -164,7 +164,6 @@ way to get started with such an infrastructure is by deploying Kafka, Storm, and
[Wirbelsturm](https://github.com/miguno/wirbelsturm).



<a name="Features"></a>

# Features
Expand Down Expand Up @@ -235,16 +234,9 @@ What features do we showcase in kafka-storm-starter? Note that we focus on show
[custom Kryo serializer for Storm](src/main/scala/com/miguno/kafkastorm/storm/TweetAvroKryoDecorator.scala) that
handles our Avro-derived Java class `Tweet` from [twitter.avsc](src/main/avro/twitter.avsc).
* Unit and integration tests are implemented with [ScalaTest](http://scalatest.org/).
* We use [ZooKeeper 3.3.4](https://zookeeper.apache.org/) instead of the latest version 3.4.5.
See section _Known issues_ below for why we do that.
* We use the Kafka spout [wurstmeister/storm-kafka-0.8-plus](https://github.com/wurstmeister/storm-kafka-0.8-plus).
Unfortunately that spout is not yet released for Scala 2.10. For that reason [@miguno](https://github.com/miguno/)
has [forked and branched](https://github.com/miguno/storm-kafka-0.8-plus/tree/miguno_clojars) the code to add Scala
2.10 support, and released such a version to [Clojars](https://clojars.org/com.miguno/storm-kafka-0.8-plus_2.10).
See [build.sbt](build.sbt) for details.
* _Once Storm 0.9.2 is released we will migrate to the new_
_[Kafka spout that ships with Storm](https://github.com/apache/incubator-storm/tree/master/external/storm-kafka)_
_(which is based on the spout developed by wurstmeister)._
* We use [ZooKeeper 3.4.5](https://zookeeper.apache.org/).
* We use the [official Kafka spout](https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) of the
Storm project, which is compatible with Kafka 0.8.


<a name="Development"></a>
Expand Down Expand Up @@ -433,25 +425,25 @@ To create a normal ("slim") jar:

$ ./sbt clean package

>>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.1.0-SNAPSHOT.jar`
>>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.2.0-SNAPSHOT.jar`

To create a fat jar, which includes any dependencies of kafka-storm-starter:

$ ./sbt assembly

>>> Generates `target/scala-2.10/kafka-storm-starter-assembly-0.1.0-SNAPSHOT.jar`
>>> Generates `target/scala-2.10/kafka-storm-starter-assembly-0.2.0-SNAPSHOT.jar`

To create a scaladoc/javadoc jar:

$ ./sbt packageDoc

>>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.1.0-SNAPSHOT-javadoc.jar`
>>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.2.0-SNAPSHOT-javadoc.jar`

To create a sources jar:

$ ./sbt packageSrc

>>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.1.0-SNAPSHOT-sources.jar`
>>> Generates `target/scala-2.10/kafka-storm-starter_2.10-0.2.0-SNAPSHOT-sources.jar`

To create API docs:

Expand Down Expand Up @@ -525,6 +517,7 @@ contain the messages that are being sent to the Kafka topics) under `/tmp/kafka-
You may need to manually remove this directory in case you want start from a clean state. At the moment the unit tests
do not remove this directory for you.


### ZooKeeper exceptions "KeeperException: NoNode for /[ZK path]" logged at INFO level

In short you can normally safely ignore those errors -- it's for a reason they are logged at INFO level and not at ERROR
Expand Down Expand Up @@ -567,11 +560,12 @@ for details):

where `zk-port` is the final port chosen.

As of May 2014 it is not possible to launch a local Storm cluster via `LocalCluster` without its own embedded ZooKeeper.
Likewise it is not possible to control on which port the embedded ZooKeeper process will listen -- it will always follow
the `2000/tcp` based algorithm above to set the port. A JIRA ticket was opened to untangle this hard wiring between
`LocalCluster` and ZooKeeper, cf.
[STORM-213: Decouple In-Process ZooKeeper from LocalCluster](https://issues.apache.org/jira/browse/STORM-213).
In Storm versions <= 0.9.2 it is not possible to launch a local Storm cluster via `LocalCluster` without its own embedded
ZooKeeper. Likewise it is not possible to control on which port the embedded ZooKeeper process will listen -- it will
always follow the `2000/tcp` based algorithm above to set the port.

In Storm 0.9.3 and later you can configure `LocalCluster` to use a custom ZooKeeper instance, thanks to
[STORM-213](https://issues.apache.org/jira/browse/STORM-213).


<a name="Known-issues"></a>
Expand All @@ -586,19 +580,6 @@ own code.

## Upstream code

### Kryo version conflict in Storm

_Note: This problem is resolved in the upcoming 0.9.2 version of Storm._

There is a Kryo version conflict between Storm 0.9.1 (uses Kryo 2.17) and Twitter Chill (uses Kryo 2.21).

In this code project we use the workaround to exclude Kryo (2.21) from the Twitter Chill dependency, but this may not
be a universal workaround. Twitter have apparently run into data corruption issues with Kryo 2.17, and for that reason
have built their own version of Storm using Kryo 2.21.
See [CHILL-173: Kryo version conflict between Chill and Storm 0.9.1-incubating causes Avro serialization to fail](https://github.com/twitter/chill/issues/173)
for details.


### ZooKeeper throws InstanceAlreadyExistsException during tests

You will see the following exception when running the integration tests, which you can safely ignore:
Expand All @@ -612,49 +593,26 @@ instances trying to use the same JMX setup. Since the JMX setup is not relevant
safely ignored, albeit we'd prefer to come up with a proper fix, of course.


### ZooKeeper version 3.3.x recommended for use with Storm 0.9.1 and Kafka 0.8.x

_Note: The upcoming version 0.9.2 of Storm uses ZooKeeper 3.4.5._
### ZooKeeper version 3.3.4 recommended for use with Kafka 0.8

At the time of writing both Storm (<= 0.9.1) and Kafka (<= 0.8.1.1) are not officially compatible with ZooKeeper 3.4.x
yet, which is the latest stable version of ZooKeeper. Instead the use of ZooKeeper 3.3.x is recommended.
At the time of writing Kafka 0.8 is not officially compatible with ZooKeeper 3.4.x, which is the latest stable version
of ZooKeeper. Instead the Kafka project
[recommends ZooKeeper 3.3.4](https://kafka.apache.org/documentation.html#zkversion).

So which version of ZooKeeper should you do pick, particularly if you are already running a ZooKeeper cluster for other
parts of your infrastructure (such as an Hadoop cluster)?

**The TL;DR version is:** Try using ZooKeeper 3.4.5 for both Kafka and Storm, but see the caveats and workarounds
below. If you do run into problems, consider downgrading to ZooKeeper 3.3.6. If that fails, too, try 3.3.4. In the
worst case use separate ZooKeeper clusters/versions for Storm (3.3.3) and Kafka (3.3.4).

**The longer version is:** Storm versions up to and including 0.9.1 want ZK 3.3.3, but the upcoming 0.9.2 version
relies on ZooKeeper 3.4.x.
[All current versions of Kafka still prefer ZK 3.3.4](https://kafka.apache.org/documentation.html#zkversion).
Generally speaking though, the best 3.3.x version of ZooKeeper is 3.3.6, which is the latest stable 3.3.x version. This
is because 3.3.6 fixed a number of serious bugs that could lead to data corruption.

_Tip: You can verify against which ZK version the code in this project is actually built by running_
_`./sbt dependency-graph`._

**The really long version is:** In the _code and tests_ of this project we cannot use ZK 3.4.x just yet because Storm
0.9.1 is not 100% incompatible with ZK 3.4.x. For instance, Storm will throw errors if you try to run a Storm
`LocalCluster` (for unit testing) against ZK 3.4.x. At the same time, and somewhat surprisingly, you can run a "real"
Storm cluster against ZK 3.4.x. For instance, Netflix have reportedly been using ZK 3.4.5 in production since some
time.

* Storm and ZooKeeper: Storm versions up to and including 0.9.1 are built against ZooKeeper 3.3.3 because of Storm's
dependency on [Netflix Curator 1.0.1](https://github.com/Netflix/curator). These versions of Zookeeper and Curator
are very old, and the upcoming Storm 0.9.2 therefore switches to Apache Curator 2.4.0 with ZooKeeper 3.4.x.
* Kafka and ZooKeeper: LinkedIn recommend the use of ZK 3.3.x but warn against the use of 3.3.3 because that
version has known serious issues regarding ephemeral node deletion and session expirations. For these reasons
LinkedIn run ZK 3.3.4 in production.
See [ZooKeeper version](https://kafka.apache.org/documentation.html#zkversion) in the Kafka documentation.
Lastly, there is an open Kafka JIRA ticket that covers upgrading Kafka to ZK 3.4.5, see
below. In the worst case use separate ZooKeeper clusters/versions for Storm (3.4.5) and Kafka (3.3.4). Generally
speaking though, the best 3.3.x version of ZooKeeper is 3.3.6, which is the latest stable 3.3.x version. This is
because 3.3.6 fixed a number of serious bugs that could lead to data corruption.

_Tip: You can verify the exact ZK version used in kafka-storm-starter by running `./sbt dependency-graph`._

Notes:

* There is an open Kafka JIRA ticket that covers upgrading Kafka to ZK 3.4.5, see
[KAFKA-854: Upgrade dependencies for 0.8](https://issues.apache.org/jira/browse/KAFKA-854).
* Storm and Cloudera CDH 4.5:
* [Storm cannot run in combination with a recent Hadoop/HBase version](http://mail-archives.apache.org/mod_mbox/storm-user/201402.mbox/%3CCADoiZqom8Wuzi9uiqT4d01cTNn2r_nOmXyZyCSqEko-vOyrQBA@mail.gmail.com%3E)
-- The author ran into problems when using Storm in combination with Cloudera CDH 4. It looks as if he is trying
to build a code project that lists both Storm and Hadoop/HBase as its dependencies (similar to how we combine
Storm with Kafka), and due to that runs into ZooKeeper version conflicts as CDH 4 runs ZooKeeper 3.4.5.
* If in a production environment you run into problems when using ZooKeeper 3.4.5 with Storm <= 0.9.1, you can try
a [workaround using Google jarjar](https://groups.google.com/forum/#!topic/storm-user/TVVF_jqvD_A) in order to
deploy ZooKeeper 3.4.5 alongside Storm's/Curator's hard dependency on ZooKeeper 3.3.3.
Expand Down
33 changes: 11 additions & 22 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,25 @@ resolvers ++= Seq(
libraryDependencies ++= Seq(
"com.twitter" %% "bijection-core" % "0.6.2",
"com.twitter" %% "bijection-avro" % "0.6.2",
// Chill uses Kryo 2.21, which is not fully compatible with 2.17 (used by Storm).
// We must exclude the newer Kryo version, otherwise we run into the problem described at
// https://github.com/thinkaurelius/titan/issues/301.
//
// TODO: Once Storm 0.9.2 is released we can update our dependencies to use Chill as-is (without excludes) because
// Storm then uses Kryo 2.21 (via Carbonite 1.3.3) just like Chill does.
"com.twitter" %% "chill" % "0.3.6"
exclude("com.esotericsoftware.kryo", "kryo"),
"com.twitter" % "chill-avro" % "0.3.6"
exclude("com.esotericsoftware.kryo", "kryo"),
"com.twitter" %% "chill-bijection" % "0.3.6"
exclude("com.esotericsoftware.kryo", "kryo"),
"com.twitter" %% "chill" % "0.3.6",
"com.twitter" % "chill-avro" % "0.3.6",
"com.twitter" %% "chill-bijection" % "0.3.6",
// The excludes of jms, jmxtools and jmxri are required as per https://issues.apache.org/jira/browse/KAFKA-974.
// The exclude of slf4j-simple is because it overlaps with our use of logback with slf4j facade; without the exclude
// we get slf4j warnings and logback's configuration is not picked up.
"org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("org.slf4j", "slf4j-simple"),
"org.apache.storm" % "storm-core" % "0.9.1-incubating" % "provided"
exclude("org.slf4j", "slf4j-simple")
exclude("log4j", "log4j")
exclude("org.apache.zookeeper", "zookeeper"),
"org.apache.storm" % "storm-core" % "0.9.2-incubating" % "provided"
exclude("org.apache.zookeeper", "zookeeper")
exclude("org.slf4j", "log4j-over-slf4j"),
// We exclude curator-framework because storm-kafka-0.8-plus recently switched from curator 1.0.1 to 1.3.3, which
// pulls in a newer version of ZooKeeper with which Storm 0.9.1 is not yet compatible.
//
// TODO: Remove the exclude once Storm 0.9.2 is released, because that version depends on a newer version (3.4.x) of
// ZooKeeper.
"com.miguno" %% "storm-kafka-0.8-plus" % "0.5.0-SNAPSHOT"
exclude("com.netflix.curator", "curator-framework"),
"com.netflix.curator" % "curator-test" % "1.0.1",
"org.apache.storm" % "storm-kafka" % "0.9.2-incubating"
exclude("org.apache.zookeeper", "zookeeper"),
"org.apache.curator" % "curator-test" % "2.4.0",
"com.101tec" % "zkclient" % "0.4",
// Logback with slf4j facade
"ch.qos.logback" % "logback-classic" % "1.1.2",
Expand Down
2 changes: 1 addition & 1 deletion sonar-project.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Required metadata
sonar.projectKey=com.miguno.kafkastorm:kafka-storm-starter
sonar.projectName=kafka-storm-starter
sonar.projectVersion=0.1.1-SNAPSHOT
sonar.projectVersion=0.2.0-SNAPSHOT

# Base configuration of paths
sonar.sources=src/main/java,src/main/scala
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.miguno.kafkastorm.zookeeper

import com.netflix.curator.test.TestingServer
import kafka.utils.Logging
import org.apache.curator.test.TestingServer

/**
* Runs an in-memory, "embedded" instance of a ZooKeeper server.
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.1.1-SNAPSHOT"
version in ThisBuild := "0.2.0-SNAPSHOT"

0 comments on commit ef3a727

Please sign in to comment.