简介
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/
Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。
概览
- 事件驱动(Event-driven)
- 基于流的处理
流分为两种:
- 有界流(离线数据)
- 无界流(实时数据)
- Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。
- 越顶层越抽象,表达含义越简明,使用越方便
- 越底层越具体,表达能力越丰富,使用越灵活
- Flink API 最底层的抽象为有状态实时流处理。
- Flink API 第二层抽象是 Core APIs。
- Flink API 第三层抽象是 Table API。
- Flink API 最顶层抽象是 SQL。
Flink vs Spark Streaming
- 数据模型
- Spark采用RDD模型,Spark Streaming的Dstream实际上也就是一组小批数据的集合。
- Flink基本数据模型是数据流,以及事件(Event)序列。
- 运行时的架构
- Spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算一个。
- Flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理。
快速上手
使用不同的方式实现WordCount
flink-streaming-scala_2.12 => org.apache.flink:flink-runtime_2.12:1.12.0 => com.typesafe.akka:akka-actor_2.12:2.5.21,akka就是用scala实现的。即使这里我们用java语言,还是用到了scala实现的包
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>org.flink</groupId> <artifactId>flink</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.12.0</flink.version> <scala.binary.version>2.12</scala.binary.version> </properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
</project>
|
批处理方式代码实现
批处理=>几组或所有数据到达后才处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;
public class WordCount { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> inputDataSet = env.readTextFile("/flink/src/main/resources/1.txt");
DataSet<Tuple2<String, Integer>> dataSet = inputDataSet
.flatMap(new MyFlatMapper())
.groupBy(0)
.sum(1); dataSet.print(); }
static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] s = value.split(" "); for (String s1 : s) { out.collect(new Tuple2<>(s1, 1)); } } } }
|
流处理方式代码实现
流处理=>有数据来就直接处理,不等数据堆叠到一定数量级
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StreamWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(8); DataStreamSource<String> streamSource = env.readTextFile("/flink/src/main/resources/1.txt"); DataStream<Tuple2<String, Integer>> dataStream = streamSource.flatMap(new WordCount.MyFlatMapper()) .keyBy(item->item.f0) .sum(1); dataStream.print(); env.execute(); } }
|
与批处理区别
- 这里不像批处理有groupBy => 所有数据统一处理,而是用流处理的keyBy => 每一个数据都对key进行hash计算,进行类似分区的操作,来一个数据就处理一次,所有中间过程都会输出!
- 并行度:开发环境的并行度默认就是计算机的CPU逻辑核数
- 流处理前面的序号就是并行执行任务的线程编号。
- setParallelism()可以设置线程数。
流式数据源
这里要借用NetCat工具
nc -lk 7777
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StreamDataSourceWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(8); DataStream<String> streamSource = env.socketTextStream("127.0.0.1", 7777); DataStream<Tuple2<String, Integer>> dataStream = streamSource.flatMap(new WordCount.MyFlatMapper()) .keyBy(item -> item.f0) .sum(1); dataStream.print(); env.execute(); } }
|
在本地开启的socket中输入数据,观察IDEA的console输出。
部署
下载地址
https://flink.apache.org/zh/downloads.html
Local 模式
Local Cluster模式是开箱即用的,直接解压安装包,然后启动即可。
1 2 3 4 5
| tar -zxvf flink-1.12.0-bin-scala_2.12.tgz
cd flink-1.12.0 ./start-cluster.sh
|
浏览器访问:http://localhost:8081
可以看到Flink的控制页面
Standalone模式
Stanalone CLuster是一种独立的集群模式,集群运行不需要依赖外部系统,完全自己独立进行管理。
部署步骤
- 修改conf目录下的
flink-conf.yaml
1 2
| jobmanager.rpc.address: server1
|
其他使用默认配置,其中有一些HA高可用、容错、安全、HistoryServer相关配置,按需进行配置即可,HistoryServer需单独运行启动脚本来启动服务。
- 修改
masters
文件
1 2 3 4
| vim masters
server:8081
|
- 修改
slaves
文件
1 2 3 4
| vim slaves
server2 server3
|
- 将安装目录复制到另外两台节点
- 配置环境变量,修改/etc/profile
1 2 3 4 5 6
| export FLINK_HOME=~/flink-1.10.0 export PATH=$PATH:$FLINK_HOME/bin //使配置文件生效 source /etc/profile
|
- 启动集群
start-cluster.sh
浏览器访问:http://localhost:8081
可以看到Flink的控制页面
yarn模式
YARN模式是使用YARN做为Flink运行平台,JobManager、TaskManager、用户提交的应用程序都运行在YARN上。
Kubernetes部署