Commit 256ed505 authored by Ioannis Tsafaras's avatar Ioannis Tsafaras
Browse files

LAM-35 Send batch job output to Kafka

parent 4f952d22
......@@ -31,6 +31,19 @@
<target>1.7</target> <!-- If you want to use Java 8, change this to "1.8" -->
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.john.hwc.Hashtag_WordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins></build>
......@@ -38,22 +51,17 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>0.8.0</version>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>0.8.0</version>
<artifactId>flink-clients</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.8.1</version>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
</dependencies>
......
......@@ -18,21 +18,20 @@ package com.john.hwc;
* limitations under the License.
*/
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -70,6 +69,16 @@ public class Hashtag_WordCount {
.groupBy(0)
.sum(1);
// Write result to Kafka
KafkaBatch kb = new KafkaBatch("batch-output", "localhost:9092");
kb.initialize();
List<Tuple4<String, Integer, String, String>> elements = counts.collect();
for (Tuple4<String, Integer, String, String> e: elements) {
kb.write((e.toString()));
}
// emit result to hdfs
counts.writeAsText("hdfs:///user/root/output", FileSystem.WriteMode.OVERWRITE);
......@@ -79,6 +88,45 @@ public class Hashtag_WordCount {
env.execute("Hashtag WordCount");
}
public static class KafkaBatch {
private Producer<String, String> producer;
private Properties props;
private String topicId;
private String brokerAddr;
private boolean initDone = false;
//private SerializationSchema<IN, OUT> scheme;
public KafkaBatch(String topicId, String brokerAddr) {
//SerializationSchema<IN, OUT> serializationSchema) {
this.topicId = topicId;
this.brokerAddr = brokerAddr;
//this.scheme = serializationSchema;
}
/**
* Initializes the connection to Kafka.
*/
public void initialize() {
props = new Properties();
props.put("metadata.broker.list", brokerAddr);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
initDone = true;
}
public void write(String s) {
producer.send(new KeyedMessage<String, String>(topicId, s));
}
}
//
// User Functions
//
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment