Commit 4f952d22 authored by Ioannis Tsafaras's avatar Ioannis Tsafaras
Browse files

LAM-35 Append timestamp to tuples

parent 78734435
......@@ -22,10 +22,16 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -57,15 +63,17 @@ public class Hashtag_WordCount {
// get input data
DataSet<String> text = env.readTextFile("hdfs:///user/root/input");
DataSet<Tuple2<String, Integer>> counts =
DataSet<Tuple4<String, Integer, String, String>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new LineSplitter())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// emit result to hdfs
counts.writeAsText("hdfs:///user/root/output", FileSystem.WriteMode.OVERWRITE);
//timestamp.writeAsText("hdfs:///user/root/output", FileSystem.WriteMode.OVERWRITE);
// execute program
env.execute("Hashtag WordCount");
......@@ -80,10 +88,10 @@ public class Hashtag_WordCount {
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple pairs in the form of "(hashtag,1)" (Tuple2<String, Integer>).
*/
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
public static final class LineSplitter implements FlatMapFunction<String, Tuple4<String, Integer, String, String>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
public void flatMap(String value, Collector<Tuple4<String, Integer, String, String>> out) {
// Acquire hashtags
List<String> hashtags = new ArrayList<String>();
......@@ -94,10 +102,17 @@ public class Hashtag_WordCount {
}
String[] tokens = hashtags.toArray(new String[hashtags.size()]);
//SimpleDateFormat sdf = new SimpleDateFormat("dd-MM-yyyy,HH:mm:ss");
SimpleDateFormat sdfd = new SimpleDateFormat(" dd-MM-yyyy");
SimpleDateFormat sdft = new SimpleDateFormat(" HH:mm:ss");
Date dateobj = new Date();
String date = sdfd.format(dateobj);
String time = sdft.format(dateobj);
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
out.collect(new Tuple4<String, Integer, String, String>(token, 1, date, time));
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment