Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
itminedu
okeanos-LoD
Commits
59473aa7
Commit
59473aa7
authored
Jun 25, 2015
by
efikalti
Browse files
LAM-21 java stream job
parent
e6132fb5
Changes
2
Hide whitespace changes
Inline
Side-by-side
StreamingTagCount/pom.xml
0 → 100644
View file @
59473aa7
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<groupId>
stream
</groupId>
<artifactId>
StreamingTagCount
</artifactId>
<version>
1.0
</version>
<packaging>
jar
</packaging>
<properties>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<maven.compiler.source>
1.7
</maven.compiler.source>
<maven.compiler.target>
1.7
</maven.compiler.target>
</properties>
<build>
<plugins>
<plugin>
<artifactId>
maven-assembly-plugin
</artifactId>
<version>
2.5.5
</version>
<configuration>
<descriptorRefs>
<descriptorRef>
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-jar-plugin
</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>
stream.streamingtagcount.TagCount
</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-streaming-core
</artifactId>
<version>
0.8.1
</version>
<type>
jar
</type>
</dependency>
</dependencies>
</project>
\ No newline at end of file
StreamingTagCount/src/main/java/stream/streamingtagcount/TagCount.java
0 → 100644
View file @
59473aa7
package
stream.streamingtagcount
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.java.tuple.Tuple2
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.util.Collector
;
/**
*
* @author Efi Kaltirimidou
*/
public
class
TagCount
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
DataStream
<
Tuple2
<
String
,
Integer
>>
dataStream
=
env
.
socketTextStream
(
"0.0.0.0"
,
9999
)
.
flatMap
(
new
Splitter
())
.
groupBy
(
0
)
.
sum
(
1
);
//dataStream.writeAsText("/tmp/streaming_output");
dataStream
.
print
();
env
.
execute
(
"Socket Stream WordCount"
);
}
public
static
class
Splitter
implements
FlatMapFunction
<
String
,
Tuple2
<
String
,
Integer
>>
{
public
void
flatMap
(
String
sentence
,
Collector
<
Tuple2
<
String
,
Integer
>>
out
)
throws
Exception
{
String
words
[]
=
sentence
.
split
(
","
);
String
tags
=
words
[
1
].
trim
();
tags
=
tags
.
replace
(
"'"
,
""
);
for
(
String
word:
tags
.
split
(
" "
))
{
out
.
collect
(
new
Tuple2
<>(
word
,
1
));
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment