Commit 0c03297

[SPARK-22142][BUILD][STREAMING] Move Flume support behind a profile, take 2
## What changes were proposed in this pull request?

Move flume behind a profile, take 2. See apache#19365 for most of the back-story. This change should fix the problem by removing the examples module dependency and moving the Flume examples to the module itself. It also adds deprecation messages, per a discussion on dev about deprecating for 2.3.0.

## How was this patch tested?

Existing tests, which still enable flume integration.

Author: Sean Owen <[email protected]>

Closes apache#19412 from srowen/SPARK-22142.2.
1 parent 83488cc commit 0c03297

16 files changed: +73, -40 lines

dev/create-release/release-build.sh

Lines changed: 2 additions & 2 deletions
@@ -84,9 +84,9 @@ MVN="build/mvn --force"
 # Hive-specific profiles for some builds
 HIVE_PROFILES="-Phive -Phive-thriftserver"
 # Profiles for publishing snapshots and release to Maven Central
-PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
 # Profiles for building binary releases
-BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr"
+BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pflume -Psparkr"
 # Scala 2.11 only profiles for some builds
 SCALA_2_11_PROFILES="-Pkafka-0-8"
 # Scala 2.12 only profiles for some builds

dev/mima

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"

-SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
 TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
 OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"

dev/scalastyle

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ ERRORS=$(echo -e "q\n" \
     -Pmesos \
     -Pkafka-0-8 \
     -Pyarn \
+    -Pflume \
     -Phive \
     -Phive-thriftserver \
     scalastyle test:scalastyle \

dev/sparktestsupport/modules.py

Lines changed: 19 additions & 1 deletion
@@ -279,6 +279,12 @@ def __hash__(self):
     source_file_regexes=[
         "external/flume-sink",
     ],
+    build_profile_flags=[
+        "-Pflume",
+    ],
+    environ={
+        "ENABLE_FLUME_TESTS": "1"
+    },
     sbt_test_goals=[
         "streaming-flume-sink/test",
     ]
@@ -291,6 +297,12 @@ def __hash__(self):
     source_file_regexes=[
         "external/flume",
     ],
+    build_profile_flags=[
+        "-Pflume",
+    ],
+    environ={
+        "ENABLE_FLUME_TESTS": "1"
+    },
     sbt_test_goals=[
         "streaming-flume/test",
     ]
@@ -302,7 +314,13 @@ def __hash__(self):
     dependencies=[streaming_flume, streaming_flume_sink],
     source_file_regexes=[
         "external/flume-assembly",
-    ]
+    ],
+    build_profile_flags=[
+        "-Pflume",
+    ],
+    environ={
+        "ENABLE_FLUME_TESTS": "1"
+    }
 )

dev/test-dependencies.sh

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ export LC_ALL=C
 # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.

 # NOTE: These should match those in the release publishing script
-HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Phive"
+HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Pflume -Phive"
 MVN="build/mvn"
 HADOOP_PROFILES=(
     hadoop-2.6

docs/building-spark.md

Lines changed: 7 additions & 0 deletions
@@ -100,6 +100,13 @@ Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.

 Kafka 0.10 support is still automatically built.

+## Building with Flume support
+
+Apache Flume support must be explicitly enabled with the `flume` profile.
+Note: Flume support is deprecated as of Spark 2.3.0.
+
+    ./build/mvn -Pflume -DskipTests clean package
+
 ## Building submodules individually

 It's possible to build Spark sub-modules using the `mvn -pl` option.

docs/streaming-flume-integration.md

Lines changed: 5 additions & 8 deletions
@@ -5,6 +5,8 @@ title: Spark Streaming + Flume Integration Guide

 [Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.

+**Note: Flume support is deprecated as of Spark 2.3.0.**
+
 ## Approach 1: Flume-style Push-based Approach
 Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps.

@@ -44,26 +46,23 @@ configuring Flume agents.

 	val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

-	See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$)
-	and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala).
+	See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$).
 	</div>
 	<div data-lang="java" markdown="1">
 	import org.apache.spark.streaming.flume.*;

 	JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
 		FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);

-	See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
-	and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
+	See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html).
 	</div>
 	<div data-lang="python" markdown="1">
 	from pyspark.streaming.flume import FlumeUtils

 	flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

 	By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type.
-	See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
-	and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/flume_wordcount.py).
+	See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
 	</div>
 	</div>

@@ -162,8 +161,6 @@ configuring Flume agents.
 	</div>
 	</div>

-See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
-
 Note that each input DStream can be configured to receive data from multiple sinks.

 3. **Deploying:** This is same as the first approach.
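Since the guide no longer links to the FlumeEventCount examples (they now live in the Flume module itself), a minimal push-based driver is sketched below for reference. This is only a sketch: the object name and package layout are hypothetical, and it assumes the standard `FlumeUtils.createStream(ssc, host, port)` API shown in the guide above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical minimal driver in the style of FlumeEventCount.
object SimpleFlumeEventCount {
  def main(args: Array[String]): Unit = {
    val Array(host, port) = args  // e.g. "localhost" "44444"
    val conf = new SparkConf().setAppName("SimpleFlumeEventCount")
    val ssc = new StreamingContext(conf, Milliseconds(2000))

    // Push-based receiver: a Flume Avro sink pushes events to host:port.
    val stream = FlumeUtils.createStream(ssc, host, port.toInt)
    stream.count().map(cnt => s"Received $cnt flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Such a job is still built against the `spark-streaming-flume` artifact (for example via `--packages` at submit time); only the way the module is built within Spark changes with this commit.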

examples/pom.xml

Lines changed: 0 additions & 7 deletions
@@ -34,7 +34,6 @@
     <sbt.project.name>examples</sbt.project.name>
     <build.testJarPhase>none</build.testJarPhase>
     <build.copyDependenciesPhase>package</build.copyDependenciesPhase>
-    <flume.deps.scope>provided</flume.deps.scope>
     <hadoop.deps.scope>provided</hadoop.deps.scope>
     <hive.deps.scope>provided</hive.deps.scope>
     <parquet.deps.scope>provided</parquet.deps.scope>
@@ -78,12 +77,6 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>

examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java renamed to external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java

Lines changed: 0 additions & 2 deletions
@@ -48,8 +48,6 @@ public static void main(String[] args) throws Exception {
       System.exit(1);
     }

-    StreamingExamples.setStreamingLogLevels();
-
     String host = args[0];
     int port = Integer.parseInt(args[1]);

examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala renamed to external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala

Lines changed: 0 additions & 2 deletions
@@ -47,8 +47,6 @@ object FlumeEventCount {
       System.exit(1)
     }

-    StreamingExamples.setStreamingLogLevels()
-
     val Array(host, IntParam(port)) = args

     val batchInterval = Milliseconds(2000)

examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala renamed to external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala

Lines changed: 0 additions & 2 deletions
@@ -44,8 +44,6 @@ object FlumePollingEventCount {
       System.exit(1)
     }

-    StreamingExamples.setStreamingLogLevels()
-
     val Array(host, IntParam(port)) = args

     val batchInterval = Milliseconds(2000)

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream

+@deprecated("Deprecated without replacement", "2.3.0")
 object FlumeUtils {
   private val DEFAULT_POLLING_PARALLELISM = 5
   private val DEFAULT_POLLING_BATCH_SIZE = 1000
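For context on the new annotation: Scala's `@deprecated` does not remove or change behavior; it only makes the compiler warn at each use site (visible with `-deprecation`), which is how the "deprecated as of 2.3.0" message reaches users. A minimal, self-contained illustration of the same pattern on a toy object; the names below are invented for the example and are not part of this commit.

```scala
// Same annotation pattern as the FlumeUtils change above, applied to a toy object.
@deprecated("Deprecated without replacement", "2.3.0")
object LegacyHelper {
  def endpoint(host: String, port: Int): String = s"$host:$port"
}

object Caller {
  // With scalac -deprecation, this reference is reported as a deprecation
  // warning, but it still compiles and runs.
  val target: String = LegacyHelper.endpoint("localhost", 44444)
}
```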

pom.xml

Lines changed: 10 additions & 3 deletions
@@ -98,15 +98,13 @@
     <module>sql/core</module>
     <module>sql/hive</module>
     <module>assembly</module>
-    <module>external/flume</module>
-    <module>external/flume-sink</module>
-    <module>external/flume-assembly</module>
     <module>examples</module>
     <module>repl</module>
     <module>launcher</module>
     <module>external/kafka-0-10</module>
     <module>external/kafka-0-10-assembly</module>
     <module>external/kafka-0-10-sql</module>
+    <!-- See additional modules enabled by profiles below -->
   </modules>

   <properties>
@@ -2583,6 +2581,15 @@
     </dependencies>
   </profile>

+  <profile>
+    <id>flume</id>
+    <modules>
+      <module>external/flume</module>
+      <module>external/flume-sink</module>
+      <module>external/flume-assembly</module>
+    </modules>
+  </profile>
+
   <!-- Ganglia integration is not included by default due to LGPL-licensed code -->
   <profile>
     <id>spark-ganglia-lgpl</id>

project/SparkBuild.scala

Lines changed: 9 additions & 8 deletions
@@ -43,11 +43,8 @@ object BuildCommons {
     "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10"
   ).map(ProjectRef(buildLocation, _))

-  val streamingProjects@Seq(
-    streaming, streamingFlumeSink, streamingFlume, streamingKafka010
-  ) = Seq(
-    "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-10"
-  ).map(ProjectRef(buildLocation, _))
+  val streamingProjects@Seq(streaming, streamingKafka010) =
+    Seq("streaming", "streaming-kafka-0-10").map(ProjectRef(buildLocation, _))

   val allProjects@Seq(
     core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, _*
@@ -56,9 +53,13 @@ object BuildCommons {
     "tags", "sketch", "kvstore"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects

-  val optionallyEnabledProjects@Seq(mesos, yarn, streamingKafka, sparkGangliaLgpl,
-    streamingKinesisAsl, dockerIntegrationTests, hadoopCloud) =
-    Seq("mesos", "yarn", "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
+  val optionallyEnabledProjects@Seq(mesos, yarn,
+    streamingFlumeSink, streamingFlume,
+    streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
+    dockerIntegrationTests, hadoopCloud) =
+    Seq("mesos", "yarn",
+      "streaming-flume-sink", "streaming-flume",
+      "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
       "docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))

   val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =

python/pyspark/streaming/flume.py

Lines changed: 4 additions & 0 deletions
@@ -53,6 +53,8 @@ def createStream(ssc, hostname, port,
     :param enableDecompression: Should netty server decompress input stream
     :param bodyDecoder: A function used to decode body (default is utf8_decoder)
     :return: A DStream object
+
+    .. note:: Deprecated in 2.3.0
     """
     jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
     helper = FlumeUtils._get_helper(ssc._sc)
@@ -79,6 +81,8 @@ def createPollingStream(ssc, addresses,
                            will result in this stream using more threads
     :param bodyDecoder: A function used to decode body (default is utf8_decoder)
     :return: A DStream object
+
+    .. note:: Deprecated in 2.3.0
     """
     jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
     hosts = []

python/pyspark/streaming/tests.py

Lines changed: 13 additions & 3 deletions
@@ -1478,7 +1478,7 @@ def search_kafka_assembly_jar():
             ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
             "You need to build Spark with "
             "'build/sbt assembly/package streaming-kafka-0-8-assembly/assembly' or "
-            "'build/mvn package' before running this test.")
+            "'build/mvn -Pkafka-0-8 package' before running this test.")
     elif len(jars) > 1:
         raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please "
                          "remove all but one") % (", ".join(jars)))
@@ -1495,7 +1495,7 @@ def search_flume_assembly_jar():
             ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
             "You need to build Spark with "
             "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
-            "'build/mvn package' before running this test.")
+            "'build/mvn -Pflume package' before running this test.")
     elif len(jars) > 1:
         raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
                          "remove all but one") % (", ".join(jars)))
@@ -1516,6 +1516,9 @@ def search_kinesis_asl_assembly_jar():
         return jars[0]


+# Must be same as the variable and condition defined in modules.py
+flume_test_environ_var = "ENABLE_FLUME_TESTS"
+are_flume_tests_enabled = os.environ.get(flume_test_environ_var) == '1'
 # Must be same as the variable and condition defined in modules.py
 kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS"
 are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1'
@@ -1538,9 +1541,16 @@ def search_kinesis_asl_assembly_jar():

     os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
     testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
-                 FlumeStreamTests, FlumePollingStreamTests,
                  StreamingListenerTests]

+    if are_flume_tests_enabled:
+        testcases.append(FlumeStreamTests)
+        testcases.append(FlumePollingStreamTests)
+    else:
+        sys.stderr.write(
+            "Skipped test_flume_stream (enable by setting environment variable %s=1"
+            % flume_test_environ_var)
+
     if are_kafka_tests_enabled:
         testcases.append(KafkaStreamTests)
