Commit bb03b3ec authored by Ioannis Tsafaras's avatar Ioannis Tsafaras

LAM-35 Make kafka topic and broker variables. LAM-44 Read input directly from Kafka in stream job. LAM-37 Make readme for demo, restructure example directories.
parent 256ed505
# lambda instance demo
## Batch process demo
Contains a Java class (Flink job) responsible for counting the hashtags of random tweets. The input tweets must be placed in HDFS, and the wordcount result is also written back to HDFS.
### Things to do before running
- Build the project using "mvn clean package" on the project root (Maven is required). Alternatively, use the precompiled jar file (in the target directory).
- Put the input files into the HDFS directory /user/root/input (see the sketch below).
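A minimal sketch of staging the input, assuming the Hadoop CLI is on the PATH and the tweets are in a local file tweets.txt (a hypothetical name):

```bash
# Create the HDFS input directory and upload the local tweet file
hdfs dfs -mkdir -p /user/root/input
hdfs dfs -put tweets.txt /user/root/input/
```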
### Prerequisites
- Hadoop (HDFS & YARN) must be installed and running (tested version 2.7.0)
- Flink must be installed and running (tested version 0.8.1)
### How to run
flink run hwc-1.0.jar -v -p <number of processes>
### How to get the output
The wordcount output will be placed in the HDFS directory /user/root/output. Each worker will have its own output file, named after its number.
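For example, the per-worker files can be inspected with the Hadoop CLI (the file name 1 is illustrative):

```bash
# List the per-worker output files, then print one of them
hdfs dfs -ls /user/root/output
hdfs dfs -cat /user/root/output/1
```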
@@ -47,6 +47,7 @@ import java.util.regex.Pattern;
* <li>write and use user-defined functions.
* </ul>
*
* @author Ioannis Tsafaras
*/
public class Hashtag_WordCount {
@@ -56,11 +57,14 @@ public class Hashtag_WordCount {
public static void main(String[] args) throws Exception {
String inputDir = "hdfs:///user/root/input", outputDir = "hdfs:///user/root/output",
outputTopic = "batch-output", kafkaBroker = "localhost:9092";
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
DataSet<String> text = env.readTextFile("hdfs:///user/root/input");
DataSet<String> text = env.readTextFile(inputDir);
DataSet<Tuple4<String, Integer, String, String>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
@@ -70,7 +74,7 @@ public class Hashtag_WordCount {
.sum(1);
// Write result to Kafka
KafkaBatch kb = new KafkaBatch("batch-output", "localhost:9092");
KafkaBatch kb = new KafkaBatch(outputTopic, kafkaBroker);
kb.initialize();
List<Tuple4<String, Integer, String, String>> elements = counts.collect();
for (Tuple4<String, Integer, String, String> e: elements) {
@@ -81,7 +85,7 @@ public class Hashtag_WordCount {
// emit result to hdfs
counts.writeAsText("hdfs:///user/root/output", FileSystem.WriteMode.OVERWRITE);
counts.writeAsText(outputDir, FileSystem.WriteMode.OVERWRITE);
//timestamp.writeAsText("hdfs:///user/root/output", FileSystem.WriteMode.OVERWRITE);
// execute program
...
# lambda instance demo
## Prerequisites
- Hadoop (HDFS & YARN) must be installed and running (tested version 2.7.0)
- Flink must be installed and running (batch job requires 0.9.0, streaming job requires 0.8.1)
- Kafka must be installed and running (tested version 2.10-0.8.2.1)
## Things to do before running the examples
- Place the Kafka configurations provided into /usr/local/kafka/config/.
- Run create-input-topic.sh script to create the input topic named "input".
- Create (touch) a file named "runrand" in /root.
- Run scripts/rand_kafka_producer.sh. The script will run until the runrand file is deleted, producing random tweets every second (see the sketch below).
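A rough sketch of these steps, assuming Kafka lives under /usr/local/kafka and ZooKeeper listens on localhost:2181 (create-input-topic.sh presumably wraps a similar kafka-topics.sh call):

```bash
# Create the input topic (what create-input-topic.sh presumably does;
# the partition and replication settings here are assumptions)
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic input

# Start the random-tweet producer; it keeps running while /root/runrand exists
touch /root/runrand
scripts/rand_kafka_producer.sh &

# Later, stop the producer by deleting the marker file:
#   rm /root/runrand
```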
## Batch process demo
Contains a Java class (Flink batch job) responsible for counting the hashtags of random tweets. The input tweets are read from HDFS. The wordcount result is sent to a Kafka topic and also written to HDFS.
### Things to do before running the batch process demo
- Build a self-contained (statically assembled) jar using "mvn clean compile assembly:single" on the project root (Maven is required).
- Create a Kafka topic named batch-output (see the sketch below).
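For example (the topic settings are assumptions, as above):

```bash
# Build a single jar that bundles all dependencies
mvn clean compile assembly:single

# Create the topic the batch job writes to
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic batch-output
```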
### How to run
- First, run scripts/kafka-batch-input-consumer.sh.
- After that, run flink run hwc-1.0-jar-with-dependencies.jar -v -p <number of processes>.
- The above can also be scheduled as a crontab entry, to run the batch job at regular intervals (see the sketch below).
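One possible way to schedule it, assuming Flink is installed under /usr/local/flink and the jar sits in /root (the interval and all paths are assumptions):

```bash
# Append a cron entry that runs the batch job every 15 minutes
( crontab -l 2>/dev/null; \
  echo '*/15 * * * * /usr/local/flink/bin/flink run /root/hwc-1.0-jar-with-dependencies.jar -v -p 2' \
) | crontab -
```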
### How to get the output
- The wordcount output will be sent to the Kafka topic named batch-output; it can be inspected with a console consumer (see below).
- The wordcount output will also be placed in the HDFS directory /user/root/output. Each worker will have its own output file, named after its number.
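To verify the Kafka side, attach a console consumer to the topic (the ZooKeeper address is an assumption):

```bash
# Print every message arriving on the batch-output topic
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
    --topic batch-output --from-beginning
```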
## Stream process demo
Contains a Java class (Flink stream job) responsible for counting the hashtags of random tweets. The input tweets are read from Kafka. The wordcount result is stored in the local filesystem.
### Things to do before running
- Replace the value of the String zookeeper in TagCount.java with your ZooKeeper hostname.
- Build the project using "mvn clean package" on the project root (maven is required).
- Create a Kafka topic named stream-output.
### How to run
flink run StreamingTagCount-1.0.jar -v -p <number of processes>
### How to get the output
- The wordcount output will be placed in the local file /root/streaming_output on one of the nodes (see below for a quick way to follow it).
- TODO: The wordcount output will be sent into the Kafka topic named stream-output.
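Until then, a quick way to watch the results, run on the node that hosts the sink file:

```bash
# Follow the word counts as the streaming job appends them
tail -f /root/streaming_output
```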
@@ -41,5 +41,10 @@
<version>0.8.1</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>0.8.1</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
@@ -4,6 +4,9 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.KafkaSource;
import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
import org.apache.flink.util.Collector;
/**
@@ -13,18 +16,28 @@ import org.apache.flink.util.Collector;
public class TagCount {
public static void main(String[] args) throws Exception {
String zookeeper = "snf-xxxxxx:2181", consGId = "consumer-stream", inputTopic = "input", outputTopic = "stream-output",
outputFile = "/root/streaming_output";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Read input message from Kafka topic input and send it to the splitter class
DataStream<Tuple2<String, Integer>> text = env
.addSource(new KafkaSource<String>(zookeeper, consGId, inputTopic, new SimpleStringSchema()))
.flatMap(new Splitter())
.groupBy(0)
.sum(1);
//read input message from port 9999 of host and send it to the splitter class
-        DataStream<Tuple2<String, Integer>> dataStream = env
+        /*DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("0.0.0.0", 9999)
.flatMap(new Splitter())
.groupBy(0)
-            .sum(1);
+            .sum(1);*/
//write results to this file
-        dataStream.writeAsText("/root/streaming_output");
+        text.writeAsText(outputFile);
//run the process
env.execute("Socket Stream WordCount");
...
# lambda instance demo
## Kafka data producer
Runs the random tweet generator script (currently /root/data-generator.py) and sends the output to the Kafka topic "input-tweets".
### Prerequisites
- Run on a Debian 8.0 node
- Python and Kafka must be installed
- data-generator.py must be placed in /root
### How to run
- Create (touch) a file named "runrand" in /root.
- Run the rand_kafka_producer.sh script.
- The script will run until the runrand file is deleted, producing random tweets every second (a sketch of what it presumably does is shown below).
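For reference, a minimal sketch of what rand_kafka_producer.sh presumably does; the generator path, broker address, and one-second cadence are assumptions based on this README:

```bash
# Keep producing while the /root/runrand marker file exists
while [ -f /root/runrand ]; do
    # Generate one batch of random tweets and pipe it to the Kafka topic
    python /root/data-generator.py | \
        /usr/local/kafka/bin/kafka-console-producer.sh \
            --broker-list localhost:9092 --topic input-tweets
    sleep 1
done
```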