Commit 6bcd49bc authored by Ioannis Tsafaras's avatar Ioannis Tsafaras
Browse files

Flink job responsible for word-counting the hashtags of random tweets. The...

Flink job responsible for word-counting the hashtags of random tweets. The input tweets must be placed into hdfs, and the wordcount result is also output into the hdfs.
parent 14ed8f92
# lambda instance demo
## Batch process demo
Contains a Java class (a Flink job) that counts the hashtags in random tweets. The input tweets must be placed in HDFS, and the word-count result is also written to HDFS.
### Things to do before running
- Build the project by running "mvn clean package" in the project root (Maven is required). Alternatively, use the precompiled jar file (in the target directory).
- Put the input files into the HDFS directory /user/root/input
### Prerequisites
- Hadoop (HDFS & YARN) must be installed and running (tested version 2.7.0)
- Flink must be installed and running (tested version 0.8.1)
### How to run
flink run hwc-1.0.jar -v -p <number of processes>
### How to get the output
The wordcount output will be placed into the HDFS directory /user/root/output. Each worker will have its own output file, named after its number.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns=""
<source>1.7</source> <!-- If you want to use Java 8, change this to "1.8" -->
<target>1.7</target> <!-- If you want to use Java 8, change this to "1.8" -->
\ No newline at end of file
package com.john.hwc;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
* Implements the "Hashtag_WordCount" program that computes a simple hashtag occurrence histogram
* over some randomly generated tweets
* <p>
* This example shows how to:
* <ul>
* <li>write a simple Flink program.
* <li>use Tuple data types.
* <li>write and use user-defined functions.
* </ul>
public class Hashtag_WordCount {
// Program
public static void main(String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
DataSet<String> text = env.readTextFile("hdfs:///user/root/input");
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new LineSplitter())
// group by the tuple field "0" and sum up tuple field "1"
// emit result to hdfs
counts.writeAsText("hdfs:///user/root/output", FileSystem.WriteMode.OVERWRITE);
// execute program
env.execute("Hashtag WordCount");
// User Functions
* Implements the hashtag extractor as a user-defined FlatMapFunction.
* The function takes a line (String), finds every hashtag matching the pattern "#(\w+)",
* and emits one pair in the form of "(hashtag,1)" (Tuple2<String, Integer>) per match.
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// Acquire hashtags
List<String> hashtags = new ArrayList<String>();
Matcher m = Pattern.compile("#(\\w+)")
while (m.find()) {
String[] tokens = hashtags.toArray(new String[hashtags.size()]);
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/java" isTestSource="false" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-core:0.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-java:0.8.0" level="project" />
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment