# Contributing
You can contribute to the benchmark by creating or fixing issues, adding new applications to the benchmark, or implementing the benchmark on a new platform.
To implement the benchmark on another platform, follow the reference implementation `dspbench-threads`, both for the logic of each application and for the organization and code styling (if the new implementation is in Java).
Here are some guidelines for a new implementation:
- Follow the naming convention for applications: when using the full name of an application, remove the hyphens; for prefixes, follow the table.
- Stream processing platforms usually need a `main` method or class that creates all the artifacts required to run the application. As a good practice, we centralize that into a single class/main method (examples: `StormRunner`, `SparkStreamingRunner`, `LocalTaskRunner`).
- The benchmark main method should use a good argument-parsing library (in Java we use JCommander). The minimum list of arguments should be the name of the application (the list of applications can be defined in a registry like this) and the serialized configuration for the application (a comma-separated list of key/value pairs, e.g. `config1=value1,config2=value2`). Any configuration that is related to an application should go into the configuration, not into the arguments. A minimal runner sketch is shown after this list.
- Abstractions should be used to avoid boilerplate code in platform components such as sources and operators.
- Sources and sinks for all applications should be swappable. This means that through configuration one should be able to set the class of the source or sink that will be used and change it for another one if necessary. In Storm and Threads we define the fully qualified class name in the configuration and use reflection to instantiate it, and we use an auxiliary parser class to get the raw data from the source and convert it to the proper format. This way we can run an application with a file source and switch to Kafka without rewriting the application.
- Follow the package naming and organization of Storm, changing the component names as necessary (e.g. `bolt` to `operator` or `transformation`, and `spout` to `source`).
- If the platform also supports Java, a good amount of code can be copied to the new implementation, as the logic for data processing is still the same. In the future we are going to move common code to a separate library, `dspbench-commons`.
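To make the runner and argument conventions concrete, here is a minimal sketch in the spirit of `LocalTaskRunner`, using JCommander. The `Application` interface and the registry are hypothetical stand-ins for the benchmark's real abstractions, not the actual dspbench API:

```java
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

import java.util.HashMap;
import java.util.Map;

// Minimal runner sketch: parses -app and -config, then dispatches to the
// named application. Application and REGISTRY are illustrative placeholders.
public class LocalTaskRunner {

    interface Application {
        void run(Map<String, String> config);
    }

    // Registry of available applications, keyed by name.
    private static final Map<String, Application> REGISTRY = new HashMap<>();
    static {
        REGISTRY.put("word-count", cfg ->
                System.out.println("running word-count with " + cfg));
    }

    @Parameter(names = "-app", description = "Name of the application", required = true)
    private String application;

    @Parameter(names = "-config", description = "Comma-separated key=value pairs")
    private String config = "";

    public static void main(String[] args) {
        LocalTaskRunner runner = new LocalTaskRunner();
        JCommander.newBuilder().addObject(runner).build().parse(args);

        // Deserialize "config1=value1,config2=value2" into a map.
        Map<String, String> conf = new HashMap<>();
        for (String pair : runner.config.split(",")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) conf.put(kv[0].trim(), kv[1].trim());
        }

        REGISTRY.get(runner.application).run(conf);
    }
}
```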
Instead of each application having its own sources and sinks (bolts that send data to other systems), we have defined a few basic sources and sinks.
All but the `GeneratorSource` need a `Parser`. The parser receives a string and returns a list of values, following the schema defined in the task. To set up a source that reads from a file and parses the data as Common Log Format, the configuration file would look like this:

```
<app-prefix>.source.threads=1
<app-prefix>.source.class=io.dspbench.base.FileSource
<app-prefix>.source.path=./data/http-server.log
<app-prefix>.source.parser=io.dspbench.applications.logprocessing.CommonLogParser
```
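For illustration, here is a minimal sketch of the parser contract; the `Parser` interface shown is an assumption about the shape of the real one (one raw record in, tuple values out):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical minimal parser contract: one raw record in, tuple values out.
interface Parser {
    List<Object> parse(String input);
}

// Illustrative parser for a comma-separated sensor reading such as
// "sensor-42,1404086400,23.7", yielding (id, timestamp, value) as in the
// SensorParser row of the table below.
class CsvSensorParser implements Parser {
    @Override
    public List<Object> parse(String input) {
        String[] fields = input.split(",");
        return Arrays.asList(fields[0],
                Long.parseLong(fields[1]),
                Double.parseDouble(fields[2]));
    }
}
```

Because the configuration holds fully qualified class names, a source can obtain its parser with plain reflection, e.g. `(Parser) Class.forName(parserClassName).getDeclaredConstructor().newInstance()`, which is what makes sources and parsers swappable without code changes.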
Default parsers:

Parser | Output Fields |
---|---|
AdEventParser | (query_id, ad_id, event) |
BeijingTaxiTraceParser | (car_id, date, occ, speed, bearing, lat, lon) |
ClickStreamParser | (ip, url, client_key) |
CommonLogParser | (ip, timestamp, minute, request, response, byte_size) |
DublinBusTraceParser | (car_id, date, occ, speed, bearing, lat, lon) |
GoogleTracesParser | (timestamp, id, cpu, memory) |
JsonEmailParser | (id, message[, is_spam]) |
JsonParser | (json_object) |
SensorParser | (id, timestamp, value) |
SmartPlugParser | (id, timestamp, value, property, plugId, householdId, houseId) |
StringParser | (string) |
TransactionParser | (event_id, actions) |
The `GeneratorSource` doesn't need a parser; instead, it uses an instance of a class that extends the `Generator` class. Each time the generator is called it returns a new tuple.
```
<app-prefix>.spout.threads=1
<app-prefix>.spout.class=storm.applications.spout.GeneratorSpout
<app-prefix>.spout.generator=storm.applications.spout.generator.SensorGenerator
```
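As a sketch of what a custom generator might look like: the single-method `Generator` base class below is an assumption for illustration, and the real class may have a different signature:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical minimal Generator contract: each call produces one new tuple.
abstract class Generator {
    public abstract List<Object> generate();
}

// Illustrative sensor generator producing (id, timestamp, value) tuples,
// matching the schema of SensorParser above.
class SensorGenerator extends Generator {
    private final Random random = new Random();

    @Override
    public List<Object> generate() {
        String id = "sensor-" + random.nextInt(100);
        long timestamp = System.currentTimeMillis() / 1000L;
        double value = 15.0 + 10.0 * random.nextDouble();
        return Arrays.asList(id, timestamp, value);
    }
}
```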
Default generators:

Generator | Configurations |
---|---|
CDRGenerator | vs.generator.population, vs.generator.error_prob |
MachineMetadataGenerator | mo.generator.num_machines |
RandomSentenceGenerator | -- |
SensorGenerator | sd.generator.count |
SmartPlugGenerator | see below |
The `SmartPlugGenerator` is an adaptation of a generator built by Alessandro Sivieri:
Generates a dataset of a random set of smart plugs, each being part of a household, which is, in turn, part of a house. Each smart plug records the actual load (in Watts) every second. The generated dataset is inspired by the DEBS 2014 challenge and follows a similar format: a sequence of 6 comma-separated values per line (i.e., per reading):
- a unique identifier of the measurement [64 bit unsigned integer value]
- a timestamp of measurement (number of seconds since January 1, 1970, 00:00:00 GMT) [64 bit unsigned integer value]
- a unique identifier (within a household) of the smart plug [32 bit unsigned integer value]
- a unique identifier of a household (within a house) where the plug is located [32 bit unsigned integer value]
- a unique identifier of a house where the household with the plug is located [32 bit unsigned integer value]
- the measurement [32 bit unsigned integer]
This class generates smart plug readings at fixed time intervals, storing them into a queue that is consumed by a `GeneratorSpout`.
The readings are generated by a separate thread, and the interval resolution is one second. To increase the volume of readings, you can decrease the interval down to 1 second; if you need even more data volume, you will have to tune the other configuration parameters.
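A rough sketch of that producer/consumer setup, assuming readings are plain value lists and the spout polls the queue (class names and the reading layout are illustrative):

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the producer side: a scheduled thread fills a queue with
// readings at a fixed interval, and the spout drains it.
class SmartPlugReadingProducer {
    private final BlockingQueue<List<Object>> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void start(int intervalSeconds) {
        scheduler.scheduleAtFixedRate(
                () -> queue.offer(nextReading()),   // one reading per tick
                0, intervalSeconds, TimeUnit.SECONDS);
    }

    // Called by the spout/source on each emit cycle; null if nothing is ready.
    List<Object> poll() {
        return queue.poll();
    }

    private List<Object> nextReading() {
        // Placeholder: the real generator builds (id, timestamp, plugId,
        // householdId, houseId, value) per plug, per tick.
        return java.util.Arrays.asList(0L, System.currentTimeMillis() / 1000L,
                                       1, 1, 1, 42.0f);
    }
}
```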
Configuration parameters:

- `sg.generator.interval_seconds`: interval of record generation, in seconds.
- `sg.generator.houses.num`: number of houses in the scenario.
- `sg.generator.households.min` and `sg.generator.households.max`: the range of the number of households within a house.
- `sg.generator.plugs.min` and `sg.generator.plugs.max`: the range of the number of smart plugs within a household.
- `sg.generator.load.list`: a comma-separated list of peak loads that will be randomly assigned to smart plugs.
- `sg.generator.load.oscillation`: by how much the peak load of a smart plug will oscillate.
- `sg.generator.on.probability`: the probability of a smart plug being on.
- `sg.generator.on.lengths`: a comma-separated list of time lengths to choose from when setting how long a smart plug stays on.
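Putting these together, a hypothetical configuration for running the `SmartPlugGenerator` could look like this (all values are illustrative, not recommended defaults):

```
<app-prefix>.spout.class=storm.applications.spout.GeneratorSpout
<app-prefix>.spout.generator=storm.applications.spout.generator.SmartPlugGenerator
sg.generator.interval_seconds=1
sg.generator.houses.num=10
sg.generator.households.min=2
sg.generator.households.max=4
sg.generator.plugs.min=3
sg.generator.plugs.max=8
sg.generator.load.list=50,100,150
sg.generator.load.oscillation=10
sg.generator.on.probability=0.5
sg.generator.on.lengths=3600,7200,10800
```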
By using hooks and the metrics library it is possible to collect performance metrics from operators and sources. For operators, the number of received and emitted tuples and the execution time are collected; for sources, the complete latency and the number of emitted tuples are recorded. To enable metric collection, use the following configuration:
```
metrics.enabled=true
metrics.reporter=csv
metrics.interval.value=2
metrics.interval.unit=seconds
metrics.output=/tmp
```
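As a sketch of what such a hook might do with the Dropwizard (Coda Hale) Metrics library, mirroring the configuration above; the class and metric names here are illustrative, not the benchmark's actual hook API:

```java
import com.codahale.metrics.Counter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

import java.io.File;
import java.util.concurrent.TimeUnit;

// Illustrative hook: counts received/emitted tuples and times execution,
// reporting to CSV files every 2 seconds.
class MetricsHookSketch {
    private final MetricRegistry registry = new MetricRegistry();
    private final Counter received = registry.counter("operator.received");
    private final Counter emitted = registry.counter("operator.emitted");
    private final Timer execution = registry.timer("operator.execution");

    void start() {
        CsvReporter reporter = CsvReporter.forRegistry(registry)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build(new File("/tmp"));        // metrics.output
        reporter.start(2, TimeUnit.SECONDS);     // metrics.interval
    }

    void onTuple(Runnable process) {
        received.inc();
        try (Timer.Context ignored = execution.time()) {
            process.run();                       // measured operator logic
        }
        emitted.inc();
    }
}
```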