1. Flink开发环境安装

1.1 前置安装

开发工具IDEA
Docker环境(Flink平台运行在docker中)
jdk支持
Maven支持
netcat支持

jdk,Maven推荐OSX用brew安装,Windows用Chocolatey安装
netcat在Windows上用Chocolatey安装,命令为”choco install netcat”

1.1 docker-compose 安装flink

docker-compose.yml

version: "2.1"
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

运行

docker-compose up -d

浏览器打开 http://127.0.0.1:8081 可以看到dashboard

2. 编写一个简单的流处理 job

假设这个job项目名称为helloword
这个job用nc程序监听一个tcp服务,使其可以输入文本流
计算单词出现次数

2.1 Maven 初始化项目

mvn archetype:generate \
      -DarchetypeGroupId=org.apache.flink \
      -DarchetypeArtifactId=flink-quickstart-java \
      -DarchetypeVersion=1.7.2

设建立的项目
groupId为com.iamle.flink
artifactId为helloword

如果是 Windows,建议使用 Git Bash 终端执行mvn

2.2 IDEA打开helloword

删除自动生成的 BatchJob.java、StreamingJob.java
新建SocketWindowWordCount.java

package com.iamle.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // 输入tcp流
        final int port;
        final String host;
        port = 9008; // nc监听的tcp端口
        host = "192.168.0.8"; // docker宿主机ip

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(host, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

pom.xml 入口类
com.iamle.flink.StreamingJob 改为
com.iamle.flink.SocketWindowWordCount

2.3 编译项目jar

cd  helloword
mvn clean package

在项目中的target目录中得到 helloword-1.0-SNAPSHOT-shaded.jar

3. 提交job

先运行流的输入服务

nc -l 9008
# nc -l -p 9008 # windows

方式1
打开 http://127.0.0.1:8081/#/submit dashboard后
在Submit new job中上传helloword-1.0-SNAPSHOT-shaded.jar
upload后>勾选>Submit

在nc终端,随便输入一些字符串后回车
在名为taskmanager的docker实例标准终端中可以看到计算结果(可以用kitematic打开docker容器实例查看其标准终端)

4. 官方示例

flink官方示例