Commit cf8d9fa1 authored by Efi

Merge pull request #21 from ioantsaf/batch-hashtag-wordcount-timestamp

+1
parents a1264e98 609579df
# lambda instance demo
## Batch process demo
Contains a Java class (Flink job) responsible for counting the hashtags of random tweets. The input tweets must be placed into HDFS, and the wordcount result is written back to HDFS.
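For orientation, the sketch below condenses what such a job looks like with Flink's Java batch API. The class name and the inline hashtag extraction are simplified illustrations; the actual Hashtag_WordCount class in this repository uses a regex-based LineSplitter and also attaches a date and time to each count.

```java
// A condensed, self-contained sketch of the batch job described above.
// Assumes flink-java is on the classpath; the anonymous flatMap below is a
// simplification of the repository's LineSplitter.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

public class HashtagWordCountSketch {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Tweets are read line by line from the HDFS input directory.
        DataSet<String> text = env.readTextFile("hdfs:///user/root/input");

        DataSet<Tuple2<String, Integer>> counts = text
                // Emit a (hashtag, 1) pair for every token that starts with '#'.
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String token : line.split("\\s+")) {
                            if (token.startsWith("#")) {
                                out.collect(new Tuple2<String, Integer>(token, 1));
                            }
                        }
                    }
                })
                // Group by hashtag (field 0) and sum the ones (field 1).
                .groupBy(0)
                .sum(1);

        // Each parallel worker writes its own file under the output directory.
        counts.writeAsText("hdfs:///user/root/output", FileSystem.WriteMode.OVERWRITE);
        env.execute("Hashtag WordCount (sketch)");
    }
}
```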
### Things to do before running
- Build the project using "mvn clean package" in the project root (Maven is required). Alternatively, use the pre-built jar file in the target directory.
- Put the input files into the HDFS directory /user/root/input.
### Prerequisites
- Hadoop (HDFS & YARN) must be installed and running (tested version 2.7.0)
- Flink must be installed and running (tested version 0.8.1)
### How to run
flink run hwc-1.0.jar -v -p <number of processes>
### How to get the output
The wordcount output will be placed into the HDFS directory /user/root/output. Each worker will have its own output file, named after its number.
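If you prefer to fetch those per-worker files programmatically rather than with the HDFS shell, a minimal sketch using the Hadoop FileSystem API could look like the following; the class name is made up for illustration, and it assumes the Hadoop client configuration (core-site.xml) is on the classpath.

```java
// Minimal sketch: print every per-worker output file under /user/root/output.
// Assumes the Hadoop client configuration is on the classpath so the default
// filesystem resolves to the running HDFS cluster.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class PrintWordcountOutput {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        for (FileStatus status : fs.listStatus(new Path("/user/root/output"))) {
            BufferedReader reader =
                    new BufferedReader(new InputStreamReader(fs.open(status.getPath())));
            String line;
            while ((line = reader.readLine()) != null) {
                // Prefix each line with the worker's file name (its number).
                System.out.println(status.getPath().getName() + "\t" + line);
            }
            reader.close();
        }
    }
}
```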
@@ -31,6 +31,19 @@
         <target>1.7</target> <!-- If you want to use Java 8, change this to "1.8" -->
       </configuration>
     </plugin>
+    <plugin>
+      <artifactId>maven-assembly-plugin</artifactId>
+      <configuration>
+        <archive>
+          <manifest>
+            <mainClass>com.john.hwc.Hashtag_WordCount</mainClass>
+          </manifest>
+        </archive>
+        <descriptorRefs>
+          <descriptorRef>jar-with-dependencies</descriptorRef>
+        </descriptorRefs>
+      </configuration>
+    </plugin>
   </plugins></build>
@@ -38,22 +51,17 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-java</artifactId>
-      <version>0.8.0</version>
+      <version>0.9.0</version>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-core</artifactId>
-      <version>0.8.0</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-connectors</artifactId>
-      <version>0.8.0</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients</artifactId>
-      <version>0.8.1</version>
+      <artifactId>flink-clients</artifactId>
+      <version>0.9.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>0.8.2.1</version>
     </dependency>
   </dependencies>
...
@@ -18,15 +18,20 @@ package com.john.hwc;
  * limitations under the License.
  */
 
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.util.Collector;
 
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
+import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -42,6 +47,7 @@ import java.util.regex.Pattern;
  * <li>write and use user-defined functions.
  * </ul>
  *
+ * @author Ioannis Tsafaras
  */
 public class Hashtag_WordCount {
@@ -51,26 +57,80 @@ public class Hashtag_WordCount {
 
     public static void main(String[] args) throws Exception {
 
+        String inputDir = "hdfs:///user/root/input", outputDir = "hdfs:///user/root/output",
+                outputTopic = "batch-output", kafkaBroker = "localhost:9092";
+
         // set up the execution environment
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
         // get input data
-        DataSet<String> text = env.readTextFile("hdfs:///user/root/input");
+        DataSet<String> text = env.readTextFile(inputDir);
 
-        DataSet<Tuple2<String, Integer>> counts =
+        DataSet<Tuple4<String, Integer, String, String>> counts =
                 // split up the lines in pairs (2-tuples) containing: (word,1)
                 text.flatMap(new LineSplitter())
                 // group by the tuple field "0" and sum up tuple field "1"
                 .groupBy(0)
                 .sum(1);
 
+        // Write result to Kafka
+        KafkaBatch kb = new KafkaBatch(outputTopic, kafkaBroker);
+        kb.initialize();
+        List<Tuple4<String, Integer, String, String>> elements = counts.collect();
+        for (Tuple4<String, Integer, String, String> e: elements) {
+            kb.write((e.toString()));
+        }
+
         // emit result to hdfs
-        counts.writeAsText("hdfs:///user/root/output", FileSystem.WriteMode.OVERWRITE);
+        counts.writeAsText(outputDir, FileSystem.WriteMode.OVERWRITE);
+        //timestamp.writeAsText("hdfs:///user/root/output", FileSystem.WriteMode.OVERWRITE);
 
         // execute program
         env.execute("Hashtag WordCount");
     }
 
+    public static class KafkaBatch {
+
+        private Producer<String, String> producer;
+        private Properties props;
+        private String topicId;
+        private String brokerAddr;
+        private boolean initDone = false;
+        //private SerializationSchema<IN, OUT> scheme;
+
+        public KafkaBatch(String topicId, String brokerAddr) {
+            //SerializationSchema<IN, OUT> serializationSchema) {
+            this.topicId = topicId;
+            this.brokerAddr = brokerAddr;
+            //this.scheme = serializationSchema;
+        }
+
+        /**
+         * Initializes the connection to Kafka.
+         */
+        public void initialize() {
+            props = new Properties();
+            props.put("metadata.broker.list", brokerAddr);
+            props.put("serializer.class", "kafka.serializer.StringEncoder");
+            props.put("request.required.acks", "1");
+            ProducerConfig config = new ProducerConfig(props);
+            producer = new Producer<String, String>(config);
+            initDone = true;
+        }
+
+        public void write(String s) {
+            producer.send(new KeyedMessage<String, String>(topicId, s));
+        }
+    }
+
     //
     // User Functions
     //
@@ -80,10 +140,10 @@ public class Hashtag_WordCount {
      * FlatMapFunction. The function takes a line (String) and splits it into
      * multiple pairs in the form of "(hashtag,1)" (Tuple2<String, Integer>).
      */
-    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+    public static final class LineSplitter implements FlatMapFunction<String, Tuple4<String, Integer, String, String>> {
 
         @Override
-        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+        public void flatMap(String value, Collector<Tuple4<String, Integer, String, String>> out) {
 
             // Acquire hashtags
             List<String> hashtags = new ArrayList<String>();
@@ -94,10 +154,17 @@ public class Hashtag_WordCount {
             }
             String[] tokens = hashtags.toArray(new String[hashtags.size()]);
 
+            //SimpleDateFormat sdf = new SimpleDateFormat("dd-MM-yyyy,HH:mm:ss");
+            SimpleDateFormat sdfd = new SimpleDateFormat(" dd-MM-yyyy");
+            SimpleDateFormat sdft = new SimpleDateFormat(" HH:mm:ss");
+            Date dateobj = new Date();
+            String date = sdfd.format(dateobj);
+            String time = sdft.format(dateobj);
+
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
-                    out.collect(new Tuple2<String, Integer>(token, 1));
+                    out.collect(new Tuple4<String, Integer, String, String>(token, 1, date, time));
                }
            }
        }
...
# lambda instance demo
## Prerequisites
- Hadoop (HDFS & YARN) must be installed and running (tested version 2.7.0)
- Flink must be installed and running (batch job requires 0.9.0, streaming job requires 0.8.1)
- Kafka must be installed and running (tested version 2.10-0.8.2.1)
## Things to do before running the examples
- Place the Kafka configurations provided into /usr/local/kafka/config/.
- Run the create-input-topic.sh script to create the input topic named "input".
- Create (touch) a file named "runrand" in /root.
- Run the scripts/rand_kafka_producer.sh script. It will run until the runrand file is deleted, producing a random tweet every second.
## Batch process demo
Contains a Java class (Flink batch job) responsible for counting the hashtags of random tweets. The input tweets are read from HDFS. The wordcount result is sent to a Kafka topic and also written to HDFS.
### Things to do before running the batch process demo
- Build a self-contained jar using "mvn clean compile assembly:single" in the project root (Maven is required).
- Create a Kafka topic named batch-output (for example with Kafka's kafka-topics.sh script, or programmatically as sketched below).
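Purely as an illustration, a programmatic sketch against the Kafka 0.8.2 admin API is shown below; the Zookeeper address, partition count, and replication factor are assumptions for a single-node setup, and the bundled kafka-topics.sh script does the same job.

```java
// Sketch: create the "batch-output" topic with the Kafka 0.8.2 admin API.
// Zookeeper address, partitions and replication factor are single-node assumptions.
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

import java.util.Properties;

public class CreateBatchOutputTopic {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000,
                ZKStringSerializer$.MODULE$);
        AdminUtils.createTopic(zkClient, "batch-output", 1, 1, new Properties());
        zkClient.close();
    }
}
```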
### How to run
- First, run scripts/kafka-batch-input-consumer.sh.
- After that, submit the job with flink run hwc-1.0-jar-with-dependencies.jar -v -p <number of processes>.
- Both commands can also be scheduled via crontab to run the batch job at regular intervals.
### How to get the output
- The wordcount output will be sent to the Kafka topic named batch-output (a minimal consumer sketch follows below).
- The wordcount output will also be placed into the HDFS directory /user/root/output. Each worker will have its own output file, named after its number.
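To inspect the Kafka side of the output, a minimal sketch of a high-level consumer for the batch-output topic could look like this; it targets the Kafka 0.8.2 Java API, and the Zookeeper address and group id are assumptions for a single-node setup.

```java
// Sketch: read everything from the "batch-output" topic and print it.
// Zookeeper address and group id are assumptions for a single-node setup.
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class BatchOutputConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "batch-output-reader");
        props.put("auto.offset.reset", "smallest"); // start from the earliest message

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("batch-output", 1));

        // Single stream, single thread: iterate and print each wordcount tuple.
        ConsumerIterator<byte[], byte[]> it = streams.get("batch-output").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}
```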
## Stream process demo
Contains a Java class (Flink streaming job) responsible for counting the hashtags of random tweets. The input tweets are read from Kafka. The wordcount result is stored in the local filesystem.
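The exact format of the generated tweets is not documented here; judging from the Splitter class in the diff below, each Kafka message is expected to contain the tweet text, then a comma, then a single-quoted, space-separated hashtag list. A small standalone sketch of that parsing step on a made-up sample message behaves like this:

```java
// Standalone sketch of the hashtag extraction performed by the streaming Splitter.
// The sample message format is an assumption inferred from the Splitter code below.
public class SplitterSketch {
    public static void main(String[] args) {
        String sentence = "just a random tweet, '#lambda #flink #kafka'";

        // Split off the quoted hashtag list and strip the quotes.
        String[] words = sentence.split(",");
        String tags = words[1].trim().replace("'", "");

        // Emits one (hashtag, 1) style record per tag: #lambda, #flink, #kafka
        for (String word : tags.split(" ")) {
            System.out.println("(" + word + ", 1)");
        }
    }
}
```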
### Things to do before running the stream process demo
- Replace the String zookeeper in TagCount.java with your Zookeeper hostname (the kafkaBroker string will likely need the same change).
- Build the project using "mvn clean package" on the project root (maven is required).
- Create a Kafka topic named stream-output.
### How to run
flink run StreamingTagCount-1.0.jar -v -p <number of processes>
### How to get the output
- The wordcount output will be placed in the local file /root/streaming_output on one of the nodes.
- TODO: The wordcount output will be sent into the Kafka topic named stream-output.
@@ -41,5 +41,10 @@
       <version>0.8.1</version>
       <type>jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-connectors</artifactId>
+      <version>0.8.1</version>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
@@ -2,10 +2,18 @@ package stream.streamingtagcount;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.KafkaSink;
+import org.apache.flink.streaming.connectors.kafka.KafkaSource;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
 import org.apache.flink.util.Collector;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
 /**
  *
  * @author Efi Kaltirimidou
@@ -13,31 +21,47 @@ import org.apache.flink.util.Collector;
 public class TagCount {
 
     public static void main(String[] args) throws Exception {
 
+        String zookeeper= "snf-xxxxxx:2181", consGId = "consumer-stream", kafkaBroker = "snf-xxxxxx:9092", inputTopic = "input", outputTopic = "streaming-output",
+                outputFile = "/root/streaming_output";
+
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        //read input message from port 9999 of host and send it to the splitter class
-        DataStream<Tuple2<String, Integer>> dataStream = env
-                .socketTextStream("0.0.0.0", 9999)
+        // Read input message from Kafka topic input and send it to the splitter class
+        DataStream<Tuple4<String, Integer, String, String>> text = env
+                .addSource(new KafkaSource<String>(zookeeper, consGId, inputTopic, new SimpleStringSchema()))
                 .flatMap(new Splitter())
                 .groupBy(0)
-                .sum(1);
+                .sum(1)
+                .addSink(new KafkaSink<Tuple4<String, Integer, String, String>, String>(outputTopic, kafkaBroker, new tuple4serialization()));
 
         //write results to this file
-        dataStream.writeAsText("/root/streaming_output");
+        //text.writeAsText(outputFile);
 
         //run the process
         env.execute("Socket Stream WordCount");
     }
 
+    public static class tuple4serialization implements SerializationSchema<Tuple4<String, Integer, String, String>, String> {
+        public String serialize(Tuple4<String, Integer, String, String> element) {
+            return element.toString();
+        }
+    }
+
-    //receives the messages, splits it between the words and the hashtags and then emits each hashtag and number of appearence
-    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
-        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
+    //receives the messages, splits it between the words and the hashtags and then emits each hashtag and number of appearance
+    public static class Splitter implements FlatMapFunction<String, Tuple4<String, Integer, String, String>> {
+        public void flatMap(String sentence, Collector<Tuple4<String, Integer, String, String>> out) throws Exception {
             String words[] = sentence.split(",");
             String tags = words[1].trim();
             tags = tags.replace("'", "");
 
+            SimpleDateFormat sdfd = new SimpleDateFormat(" dd-MM-yyyy");
+            SimpleDateFormat sdft = new SimpleDateFormat(" HH:mm:ss");
+            Date dateobj = new Date();
+            String date = sdfd.format(dateobj);
+            String time = sdft.format(dateobj);
+
             for (String word: tags.split(" ")) {
-                out.collect(new Tuple2<>(word, 1));
+                out.collect(new Tuple4<String, Integer, String, String>(word, 1, date, time));
             }
         }
     }
...
# lambda instance demo
## Kafka data producer
Runs the random tweet generator script (currently placed in /root/data-generator.py) and sends its output to the Kafka topic "input-tweets". An illustrative sketch of the produce loop appears at the end of this README.
### Prerequisites
- Must be run on a Debian 8.0 node
- Python and Kafka must be installed
- data-generator.py must be placed in /root
### How to run
- Create (touch) a file named "runrand" in /root.
- Run the rand_kafka_producer.sh script.
- The script will run until the runrand file is deleted, producing a random tweet every second.
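Purely as an illustration of what the generator/producer pair does, a hedged Java equivalent of the produce loop is sketched below; the real setup pipes /root/data-generator.py into Kafka via the shell script, and the broker address and tweet format here are assumptions.

```java
// Illustrative Java equivalent of the rand_kafka_producer loop. The broker
// address and the tweet format are assumptions; the real generator is a
// Python script piped into Kafka by a shell wrapper.
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.io.File;
import java.util.Properties;
import java.util.Random;

public class RandomTweetProducer {
    private static final String[] TAGS = {"#lambda", "#flink", "#kafka", "#hadoop"};

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // assumed single-node broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        Random rnd = new Random();
        // Produce one fake tweet per second, like the shell script does,
        // until the /root/runrand sentinel file is removed.
        while (new File("/root/runrand").exists()) {
            String tweet = "random tweet " + rnd.nextInt(1000) + ", '"
                    + TAGS[rnd.nextInt(TAGS.length)] + " "
                    + TAGS[rnd.nextInt(TAGS.length)] + "'";
            producer.send(new KeyedMessage<String, String>("input-tweets", tweet));
            Thread.sleep(1000);
        }
        producer.close();
    }
}
```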