[SPARK-3912][Streaming] Fixed flaky FlumeStreamSuite
@harishreedharan @pwendell
See the JIRA ticket for a diagnosis of the problem: https://issues.apache.org/jira/browse/SPARK-3912

The solution was to reimplement the test:
1. Find a free port (by binding and releasing a server socket), and then use that port.
2. Remove the Thread.sleep() calls; instead, repeatedly try to create a sender and send data, and check whether the data was sent. Use eventually() to minimize the waiting time (see the sketch after this list).
3. Check whether all the data was received, without caring about how it was split into batches.
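
As a rough, self-contained illustration of these three ideas — not the committed code; the helper names findFreePort, sendWithRetries, and verifyAllReceived are made up for this sketch, and the real test drives an Avro NettyTransceiver and Utils.startServiceOnPort as shown in the diff below:

import java.net.ServerSocket

import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.concurrent.Eventually._

object FlakyTestPatterns {

  // Idea 1: find a free port by binding an ephemeral server socket and releasing it.
  // Port 0 asks the OS for any free port; the real fix additionally goes through
  // Utils.startServiceOnPort so that bind failures are retried.
  def findFreePort(): Int = {
    val socket = new ServerSocket(0)
    try socket.getLocalPort finally socket.close()
  }

  // Idea 2: no Thread.sleep(); retry the whole connect-and-send attempt until it
  // succeeds or the timeout expires. sendOnce is a hypothetical stand-in for
  // creating a transceiver and calling client.appendBatch(...).
  def sendWithRetries(sendOnce: () => Boolean): Unit = {
    eventually(timeout(10 seconds), interval(100 milliseconds)) {
      assert(sendOnce(), "sender could not deliver the batch yet")
    }
  }

  // Idea 3: verify that all the data arrived, without caring how the receiver
  // split it across batches. `batches` is by-name so it is re-read on each retry.
  def verifyAllReceived(batches: => Seq[Seq[String]], input: Seq[String]): Unit = {
    eventually(timeout(10 seconds), interval(100 milliseconds)) {
      assert(batches.flatten == input, "not all data received yet")
    }
  }
}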

Author: Tathagata Das <[email protected]>

Closes apache#2773 from tdas/flume-test-fix and squashes the following commits:

93cd7f6 [Tathagata Das] Reimplemented FlumeStreamSuite to be more robust.
tdas committed Oct 14, 2014
1 parent 9eb49d4 commit 4d26aca
Showing 1 changed file with 102 additions and 64 deletions.
external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,103 +17,141 @@
 
 package org.apache.spark.streaming.flume
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
-
-import java.net.InetSocketAddress
+import java.net.{InetSocketAddress, ServerSocket}
 import java.nio.ByteBuffer
 import java.nio.charset.Charset
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
 import org.apache.avro.ipc.NettyTransceiver
 import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.flume.source.avro
 import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually._
 
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
-import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
+import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
 import org.apache.spark.util.Utils
 
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.handler.codec.compression._
-
-class FlumeStreamSuite extends TestSuiteBase {
+class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
+  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
+
+  var ssc: StreamingContext = null
+  var transceiver: NettyTransceiver = null
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+    }
+    if (transceiver != null) {
+      transceiver.close()
+    }
+  }
 
   test("flume input stream") {
-    runFlumeStreamTest(false)
+    testFlumeStream(testCompression = false)
   }
 
   test("flume input compressed stream") {
-    runFlumeStreamTest(true)
+    testFlumeStream(testCompression = true)
  }
 
-  def runFlumeStreamTest(enableDecompression: Boolean) {
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val (flumeStream, testPort) =
-      Utils.startServiceOnPort(9997, (trialPort: Int) => {
-        val dstream = FlumeUtils.createStream(
-          ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
-        (dstream, trialPort)
-      })
+  /** Run test on flume stream */
+  private def testFlumeStream(testCompression: Boolean): Unit = {
+    val input = (1 to 100).map { _.toString }
+    val testPort = findFreePort()
+    val outputBuffer = startContext(testPort, testCompression)
+    writeAndVerify(input, testPort, outputBuffer, testCompression)
+  }
 
+  /** Find a free port */
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  /** Setup and start the streaming context */
+  private def startContext(
+      testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
+    ssc = new StreamingContext(conf, Milliseconds(200))
+    val flumeStream = FlumeUtils.createStream(
+      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
     ssc.start()
+    outputBuffer
+  }
 
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    val input = Seq(1, 2, 3, 4, 5)
-    Thread.sleep(1000)
-    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    var client: AvroSourceProtocol = null
-
-    if (enableDecompression) {
-      client = SpecificRequestor.getClient(
-        classOf[AvroSourceProtocol],
-        new NettyTransceiver(new InetSocketAddress("localhost", testPort),
-        new CompressionChannelFactory(6)))
-    } else {
-      client = SpecificRequestor.getClient(
-        classOf[AvroSourceProtocol], transceiver)
-    }
+  /** Send data to the flume receiver and verify whether the data was received */
+  private def writeAndVerify(
+      input: Seq[String],
+      testPort: Int,
+      outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
+      enableCompression: Boolean
+    ) {
+    val testAddress = new InetSocketAddress("localhost", testPort)
 
-    for (i <- 0 until input.size) {
+    val inputEvents = input.map { item =>
       val event = new AvroFlumeEvent
-      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
+      event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8")))
       event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
-      client.append(event)
-      Thread.sleep(500)
-      clock.addToTime(batchDuration.milliseconds)
+      event
     }
 
-    Thread.sleep(1000)
-
-    val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
-      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
-      Thread.sleep(100)
+    eventually(timeout(10 seconds), interval(100 milliseconds)) {
+      // if last attempted transceiver had succeeded, close it
+      if (transceiver != null) {
+        transceiver.close()
+        transceiver = null
+      }
+
+      // Create transceiver
+      transceiver = {
+        if (enableCompression) {
+          new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
+        } else {
+          new NettyTransceiver(testAddress)
+        }
+      }
+
+      // Create Avro client with the transceiver
+      val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
+      client should not be null
+
+      // Send data
+      val status = client.appendBatch(inputEvents.toList)
+      status should be (avro.Status.OK)
     }
-    Thread.sleep(1000)
-    val timeTaken = System.currentTimeMillis() - startTime
-    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
-    logInfo("Stopping context")
-    ssc.stop()
-
-    val decoder = Charset.forName("UTF-8").newDecoder()
 
-    assert(outputBuffer.size === input.length)
-    for (i <- 0 until outputBuffer.size) {
-      assert(outputBuffer(i).size === 1)
-      val str = decoder.decode(outputBuffer(i).head.event.getBody)
-      assert(str.toString === input(i).toString)
-      assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
+    val decoder = Charset.forName("UTF-8").newDecoder()
+    eventually(timeout(10 seconds), interval(100 milliseconds)) {
+      val outputEvents = outputBuffer.flatten.map { _.event }
+      outputEvents.foreach { event =>
+        event.getHeaders.get("test") should be ("header")
+      }
+      val output = outputEvents.map(event => decoder.decode(event.getBody()).toString)
+      output should be (input)
     }
   }
 
-  class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+  /** Class to create socket channel with compression */
+  private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
     override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
       val encoder = new ZlibEncoder(compressionLevel)
       pipeline.addFirst("deflater", encoder)
