Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
itminedu
okeanos-LoD
Commits
c257a5dc
Commit
c257a5dc
authored
Jun 26, 2015
by
efikalti
Browse files
LAM-21 added comments explaining the process
parent
a54ff743
Changes
2
Hide whitespace changes
Inline
Side-by-side
example/StreamingTagCount/Kafka-Streaming-1.0.jar
0 → 100644
View file @
c257a5dc
File added
example/StreamingTagCount/src/main/java/stream/streamingtagcount/TagCount.java
View file @
c257a5dc
...
...
@@ -16,17 +16,21 @@ public class TagCount {
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
//read input message from port 9999 of host and send it to the splitter class
DataStream
<
Tuple2
<
String
,
Integer
>>
dataStream
=
env
.
socketTextStream
(
"0.0.0.0"
,
9999
)
.
flatMap
(
new
Splitter
())
.
groupBy
(
0
)
.
sum
(
1
);
//dataStream.writeAsText("/tmp/streaming_output");
dataStream
.
print
();
//write results to this file
dataStream
.
writeAsText
(
"/root/streaming_output"
);
//run the process
env
.
execute
(
"Socket Stream WordCount"
);
}
//receives the messages, splits it between the words and the hashtags and then emits each hashtag and number of appearence
public
static
class
Splitter
implements
FlatMapFunction
<
String
,
Tuple2
<
String
,
Integer
>>
{
public
void
flatMap
(
String
sentence
,
Collector
<
Tuple2
<
String
,
Integer
>>
out
)
throws
Exception
{
String
words
[]
=
sentence
.
split
(
","
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment