Commit 77a500d6 authored by Themis Zamani's avatar Themis Zamani

Revert "Batch hashtag wordcount"

parent 9ab96c97
hwc.iml
.idea
target/*
!target/hwc-1.0.jar
# lambda instance demo
## Batch process demo
Contains a java class (flink job) responsible for counting the hashtags of random tweets. The input tweets must be placed into hdfs, and the wordcount result is also output into the hdfs.
### Things to do before running
- Build the project using "mvn clean package" on the project root (maven is required). Alternatively, use the compiled jar file (target directory).
- Put the input files into hdfs directory /user/root/input
### Prerequisites
- Hadoop (HDFS & YARN) must be installed and running (tested version 2.7.0)
- Flink must be installed and running (tested version 0.8.1)
### How to run
flink run hwc-1.0.jar -v -p <number of processes>
### How to get the output
The wordcount output will be placed into the hdfs directory /user/root/output. Each worker will have it's own output file, named after it's number.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.john.hwc</groupId>
<artifactId>hwc</artifactId>
<version>1.0</version>
<build><plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Main-Class>com.john.hwc.Hashtag_WordCount</Main-Class>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source> <!-- If you want to use Java 8, change this to "1.8" -->
<target>1.7</target> <!-- If you want to use Java 8, change this to "1.8" -->
</configuration>
</plugin>
</plugins></build>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.8.1</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.john.hwc;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Implements the "Hashtag_WordCount" program that computes a simple hashtag occurrence histogram
* over some randomly generated tweets
*
* <p>
* This example shows how to:
* <ul>
* <li>write a simple Flink program.
* <li>use Tuple data types.
* <li>write and use user-defined functio±ns.
* </ul>
*
*/
public class Hashtag_WordCount {
//
// Program
//
public static void main(String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
DataSet<String> text = env.readTextFile("hdfs:///user/root/input");
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new LineSplitter())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// emit result to hdfs
counts.writeAsText("hdfs:///user/root/output", FileSystem.WriteMode.OVERWRITE);
// execute program
env.execute("Hashtag WordCount");
}
//
// User Functions
//
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple pairs in the form of "(hashtag,1)" (Tuple2<String, Integer>).
*/
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// Acquire hashtags
List<String> hashtags = new ArrayList<String>();
Matcher m = Pattern.compile("#(\\w+)")
.matcher(value.toLowerCase());
while (m.find()) {
hashtags.add(m.group(0));
}
String[] tokens = hashtags.toArray(new String[hashtags.size()]);
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/java" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-core:0.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-java:0.8.0" level="project" />
</component>
</module>
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment