
Streaming Benchmarks


The Flow of Testing

Our testing framework consists of the following parts:

  • Data generator

The role of the Data Generator is to produce a steady stream of data to the Kafka cluster. Each record is labeled with a timestamp, and the feed name is Topic A.

  • Kafka cluster

Kafka is a distributed messaging system (message queue). Topic A flows from the Kafka cluster to the test cluster.

  • Test cluster

Get records (Ra) from the Kafka cluster, run the benchmark workloads, and generate records (Rb) for Topic B. Each Rb record carries the timestamp of its originating Ra record and the time at which Rb was generated. Rb is then sent back to the Kafka cluster asynchronously.

  • Metrics reader

Get records of Topic B from the Kafka cluster, calculate the difference between the two timestamps, and generate reports.
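
The end-to-end latency of a record is therefore the difference between the two timestamps carried in Topic B. Purely as an illustration, assuming each Topic B record were serialized as start_ms:end_ms (an assumption; the actual record format is handled by the bundled metrics reader), the average latency could be computed with a sketch like:

  # Hypothetical sketch only: record format "start_ms:end_ms" is assumed.
  # Consume Topic B and average (end_ms - start_ms) over all records.
  bin/kafka-console-consumer.sh --zookeeper zkhost:2181 --topic topic_b --from-beginning \
    | awk -F: '{ sum += $2 - $1; n++ } END { if (n > 0) print "avg latency (ms):", sum / n }'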

Getting Started for StreamingBench

Prerequisites

After you have finished the configuration described in Getting Started, the following steps are necessary:

  • Download & set up ZooKeeper (3.4.8 is preferred).

  • Download & set up Apache Kafka (0.8.2.2, Scala version 2.10 is preferred).
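
For reference, both archived releases can be downloaded from the Apache archive and unpacked as follows (the install prefix /opt is an arbitrary example):

  # Fetch and unpack the preferred ZooKeeper and Kafka releases.
  wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz
  wget https://archive.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
  tar -xzf zookeeper-3.4.8.tar.gz -C /opt
  tar -xzf kafka_2.10-0.8.2.2.tgz -C /opt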

You can choose the framework you want to test.

HiBench setup

The configuration for the streaming benchmarks is spread across two directories: conf/ and workloads/streambench/conf/; settings in the latter override those in the former.

  • The first file you need to know about is conf/01-default-streamingbench.conf. It holds the default values and should not be edited directly; if you need to adjust any parameters, do so in the user-defined file described below.
Param Name | Param Meaning
hibench.streambench.testCase | Available benchmark names: identity, repartition, wordcount, fixwindow.
hibench.streambench.zkHost | ZooKeeper address for the Kafka server, written as HOSTNAME:HOSTPORT.
hibench.streambench.sampleProbability | Probability used in the sample test case.
hibench.streambench.debugMode | Whether to run in debug mode for correctness verification (default: false).
hibench.streambench.kafka.home | /PATH/TO/KAFKA/HOME
hibench.streambench.kafka.topicPartitions | Number of partitions of the generated topic (default: 20).
hibench.streambench.kafka.consumerGroup | Consumer group of the Kafka consumer (default: HiBench).
hibench.streambench.kafka.brokerList | Kafka broker list, written as "host:port,host:port,..." (default: HOSTNAME:HOSTPORT).
hibench.streambench.kafka.offsetReset | Starting offset of the Kafka consumer (default: largest).
hibench.streambench.datagen.intervalSpan | Interval span in milliseconds (default: 50).
hibench.streambench.datagen.recordsPerInterval | Number of records to generate per interval span (default: 5).
hibench.streambench.datagen.totalRecords | Total number of records to generate (default: -1, meaning unbounded).
hibench.streambench.datagen.totalRounds | Total number of rounds of data to send (default: -1, meaning unbounded).
hibench.streambench.datagen.dir | Path to store seed files (default: ${hibench.hdfs.data.dir}/Streaming).
hibench.streambench.datagen.recordLength | Fixed length of each record (default: 200).
hibench.streambench.datagen.producerNumber | Number of KafkaProducer instances, each running on its own thread (default: 1). A single KafkaProducer is limited to about 100 Mb/s.
hibench.streambench.fixWindowDuration | Duration of the window (in ms).
hibench.streambench.fixWindowSlideStep | Slide step of the window (in ms).
hibench.streambench.spark.receiverNumber | Number of nodes that will receive Kafka input (default: 4).
hibench.streambench.spark.batchInterval | Spark Streaming batch interval in milliseconds (default: 100).
hibench.streambench.spark.storageLevel | RDD storage level (default: 2): 0 means StorageLevel.MEMORY_ONLY, 1 means StorageLevel.MEMORY_AND_DISK_SER, any other value means StorageLevel.MEMORY_AND_DISK_SER_2.
hibench.streambench.spark.enableWAL | Whether to test the write-ahead log feature (default: false).
hibench.streambench.spark.checkpointPath | If enableWAL is true, the HDFS path for storing the streaming context must be specified; if false, it can be empty (default: /var/tmp).
hibench.streambench.spark.useDirectMode | Whether to use the direct approach (default: true).
hibench.streambench.flink.home | /PATH/TO/FLINK/HOME
hibench.streambench.flink.parallelism | Default parallelism of the Flink job.
hibench.streambench.flink.bufferTimeout |
hibench.streambench.flink.checkpointDuration |
hibench.streambench.storm.home | /PATH/TO/STORM/HOME
hibench.streambench.storm.nimbus | Nimbus host of the Storm cluster.
hibench.streambench.storm.nimbusAPIPort | Nimbus port (default: 6627).
hibench.streambench.storm.nimbusContactInterval | Interval at which Nimbus is contacted to check whether the job has finished.
hibench.streambench.storm.worker_count | Number of Storm workers; most bolts also use this number of threads.
hibench.streambench.storm.spout_threads | Number of Kafka spout threads in Storm.
hibench.streambench.storm.bolt_threads | Total number of bolt threads.
hibench.streambench.storm.read_from_start | Whether to read data from Kafka from the start or continue from the last position (default: true).
hibench.streambench.storm.ackon | Whether to run with acking enabled (default: true).
hibench.streambench.gearpump.home | /PATH/TO/GEARPUMP/HOME
hibench.streambench.gearpump.executors |
hibench.streambench.gearpump.parallelism |
  • The second file, which you should configure, is workloads/streambench/conf/10-streamingbench-userdefine.conf. Parameters set in this file override the defaults above, so all the parameters you need to adjust should be set here (a sample override file is sketched below).
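
For example, a minimal override file might look like the following sketch; host names and values are placeholders, not recommendations:

  # workloads/streambench/conf/10-streamingbench-userdefine.conf (example values)
  hibench.streambench.testCase                   wordcount
  hibench.streambench.zkHost                     zkhost:2181
  hibench.streambench.kafka.home                 /opt/kafka_2.10-0.8.2.2
  hibench.streambench.kafka.brokerList           broker1:9092,broker2:9092
  hibench.streambench.datagen.intervalSpan       50
  hibench.streambench.datagen.recordsPerInterval 5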

Run.

Usually you need to run the streaming data generation scripts to push data to Kafka while the streaming job is running. Create the Kafka topics first, then generate the seed file, and finally generate the real data, by running the following three scripts:

  workloads/streambench/prepare/initTopics.sh

  workloads/streambench/prepare/genSeedDataset.sh

  workloads/streambench/prepare/gendata.sh
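
initTopics.sh creates the Kafka topics for you; for reference, the equivalent manual step with the stock Kafka 0.8.x tools would look roughly like the following, where the ZooKeeper host, topic name, partition count, and replication factor are placeholders:

  # Manual topic creation with Kafka 0.8.x tools; values are examples only.
  $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper zkhost:2181 \
    --topic topic_a --partitions 20 --replication-factor 1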

While the data are being sent to Kafka, start the streaming job (for example, Spark Streaming) to process the data:

  workloads/streambench/spark/bin/run.sh

View the report.

You can use the metrics reader to collect the report data. The metrics reader needs to read data from Kafka, so you need to provide the topic (topic_b).

  workloads/streambench/common/metrics_reader.sh

Note: You need to modify several variables before using this script (example values are sketched after this list):

  • KAFKA_FOLDER: The Kafka installation folder.
  • ZK_HOST: ZooKeeper address for the Kafka server, written as HOST:PORT,HOST:PORT.
  • NUM_OF_RECORDS: The number of records to read from Kafka.
  • NUM_OF_THREADS: The number of threads used to read data from Kafka.
  • REPORT_PATH: The location where the report is stored.
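
For instance, the variables might be set like this (all values are illustrative only):

  KAFKA_FOLDER=/opt/kafka_2.10-0.8.2.2
  ZK_HOST=zkhost1:2181,zkhost2:2181
  NUM_OF_RECORDS=1000000
  NUM_OF_THREADS=4
  REPORT_PATH=/tmp/streambench-reports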

Now you can see the report (topic_b.csv) in the ${REPORT_PATH} directory.

Stop the streaming workloads.

For Spark Streaming, pressing Ctrl+C stops the job. For Storm & Trident, you'll need to execute storm/bin/stop.sh to stop the jobs. For Samza, you currently have to kill all applications in YARN manually, or restart the YARN cluster directly.
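
For Samza, the standard YARN CLI can be used to find and kill the running applications; the application ID below is a placeholder:

  # List running YARN applications, then kill the Samza jobs by ID.
  yarn application -list
  yarn application -kill application_1476583438201_0042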