Flink On Standalone 即Flink任务运行在Standalone集群中,Standlone集群部署时采用Session模式来构建集群,即:首先构建一个Flink集群,Flink集群资源就固定了,所有提交到该集群的Flink作业都运行在这一个集群中,如果集群中提交的任务多资源不够时,需要手动增加节点,所以Flink 基于Standalone运行任务一般用在开发测试或者企业实时业务较少的场景下。
(资料图片仅供参考)
Flink On Standalone 任务提交支持Session会话模式和Application应用模式,不支持Per-Job单作业模式。下面介绍基于Standalone 的Session会话模式和Application应用模式任务提交命令和原理,演示两类任务提交模式的代码还是以上一章节中读取Socket 数据进行实时WordCount统计代码为例,代码如下:
package com.lanson.flinkjava.code.chapter4;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;/** * 读取Socket数据进行实时WordCount统计 */public class SocketWordCount { public static void main(String[] args) throws Exception { //1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.读取Socket数据 DataStreamSource ds = env.socketTextStream("node3", 9999); //3.准备K,V格式数据 SingleOutputStreamOperator> tupleDS = ds.flatMap((String line, Collector> out) -> { String[] words = line.split(","); for (String word : words) { out.collect(Tuple2.of(word, 1)); } }).returns(Types.TUPLE(Types.STRING, Types.INT)); //4.聚合打印结果 tupleDS.keyBy(tp -> tp.f0).sum(1).print(); //5.execute触发执行 env.execute(); }}
将以上代码进行打包,名称为"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar",并在node3节点上启动socket服务(nc -lk 9999)。
在Standalone集群搭建完成后,基于Standalone集群提交Flink任务方式就是使用的Session模式,提交任务之前首先启动Standalone集群($FLINK_HOME/bin/start-cluster.sh),然后再提交任务,Standalone Session模式提交任务命令如下:
[root@node1 ~]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./flink run -m node1:8081 -d -c com.lanson.flinkjava.code.chapter4.SocketWordCount /root/flink-jar-test/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
以上提交任务的参数解释如下:
参数 | 解释 |
---|---|
-m | --jobmanager,指定提交任务连接的JobManager地址。 |
-c | --class,指定运行的class主类。 |
-d | --detached,任务提交后在后台独立运行,退出客户端,也可不指定。 |
-p | --parallelism,执行程序的并行度。 |
以上任务提交完成后,我们可以登录Flink WebUI(https://node1:8081)查看启动一个任务:
再次按照以上命令提交Flink任务可以看到集群中会有2个任务,说明Standalone Session模式下提交的所有Flink任务共享集群资源,如下:
以上提交Flink流任务的名称默认为"Flink Streaming Job",也可以通过参数"pipeline.name"来自定义指定Job 名称,提交命令如下:
./flink run -m node1:8081 -d -Dpipeline.name=socket-wc1 -c com.lanson.flinkjava.code.chapter4.SocketWordCount /root/flink-jar-test/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
提交之后,可以看到页面中有三个任务,最后一个任务提交的名称改成了自定义任务名称。
Standalone Session模式提交任务中首先需要创建Flink集群,集群创建启动的同时Dispatcher、JobMaster、ResourceManager对象一并创建、TaskManager也一并启动,TaskManager会向集群ResourceManager汇报Slot信息,Flink集群资源也就确定了。Standalone Session模式提交任务流程如下:
在客户端提交Flink任务,客户端会将任务转换成JobGraph提交给JobManager。Dispatcher将提交任务提交给JobMaster。JobMaster向ResourceManager申请Slot资源。ResourceManager会在对应的TaskManager上划分Slot资源。TaskManager向JobMaster offer Slot资源。JobMaster将任务对应的task发送到TaskManager上执行。Standalone Application模式中不会预先创建Flink集群,在提交Flink 任务的同时会创建JobManager,启动Flink集群,然后需要手动启动TaskManager连接该Flink集群,启动的TaskManager会根据$FLINK_HOME/conf/flink-conf.yaml配置文件中的"jobmanager.rpc.address"配置找JobManager,所以这里选择在node1节点上提交任务并启动JobManager,方便后续其他节点启动TaskManager后连接该节点。Standalone Appliction模式提交任务步骤和命令如下:
1.1、准备Flink jar包
在node1节点上将Flink 打好的"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"jar包放在 $FLINK_HOME/lib目录下。
1.2、提交任务,在node1 节点上启动 JobManager
cd /software/flink-1.16.0/bin/
#执行如下命令,启动JobManager ./standalone-job.sh start --job-classname com.lanson.flinkjava.code.chapter4.SocketWordCount
执行以上命令后会自动从$FLINK_HOME/lib中扫描所有jar包,执行指定的入口类。命令执行后可以访问对应的Flink WebUI:https://node1:8081,可以看到提交的任务,但是由于还没有执行TaskManager任务无法执行。
1.3、启动TaskManager
在node1、node2、node3任意一台节点上启动taskManager,根据$FLINK_HOME/conf/flink-conf.yaml配置文件中"jobmanager.rpc.address"配置项会找到对应node1 JobManager。
#在node1节点上启动TaskManager[root@node1 ~]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./taskmanager.sh start#在node2节点上启动TaskManager[root@node2 ~]# cd /software/flink-1.16.0/bin/[root@node2 bin]# ./taskmanager.sh start
启动两个TaskManager后可以看到Flink WebUI中对应的有2个TaskManager,可以根据自己任务使用资源的情况,手动启动多个TaskManager。
1.4、停止集群
#停止启动的JobManager[root@node1 bin]# ./standalone-job.sh stop#停止启动的TaskManager[root@node1 bin]# ./taskmanager.sh stop[root@node2 bin]# ./taskmanager.sh stop
我们可以以同样的方式在其他节点上以Standalone Application模式提交先的Flink任务,但是每次提交都是当前提交任务独享集群资源。
Standalone Application模式提交任务中提交任务的同时会启动JobManager创建Flink集群,但是需要手动启动TaskManager,这样提交的任务才能正常运行,如果提交的任务使用资源多,还可以启动多个TaskManager。Standalone Application模式提交任务流程如下:
在客户端提交Flink任务的同时启动JobManager,客户端会将任务转换成JobGraph提交给JobManager。Dispatcher会启动JobMaster,Dispatcher将提交任务提交给JobMaster。JobMaster向ResourceManager申请Slot资源。手动启动TaskManager,TaskManager会向ResourceManager注册Slot资源ResourceManager会在对应的TaskManager上划分Slot资源。TaskManager向JobMaster offer Slot资源。JobMaster将任务对应的task发送到TaskManager上执行。Standalone Application模式任务提交流程和Standalone Session模式类似,两者区别主要是Standalone Session模式中启动Flink集群时JobManager、TaskManager、JobMaster会预先启动;Standalone Application模式中提交任务时同时启动集群JobManager、JobMaster,需要手动启动TaskManager。
标签:
仓储物流“成渝圈”如何乘势而上? 12月3日,连接昆明和万象的中老铁路全线开通运营,被惠及的显...
两件西周青铜簋时隔三千年成功配对 考古工作者介绍,这个铜簋的盖、身分别时隔40余年出土,纹饰...
“医保砍价”不是一个人在战斗 晁星 “我眼泪都快掉下来了”“每一个小群体都不该被放弃”…...
“购物成瘾”真的是一种病 刘艳 牛雅娟 本周日即将迎来“双十二”促销季,很多人又开始摩拳...
因迷恋山间风景,一男子在甘孜州稻城县海拔4000多米的无人区迷失方向,随后与同伴失联。12月的稻城...
嫌疑人DNA信息比中后,成都市公安局刑侦支队技术处DNA实验室民警白小刚一下坐在凳子上,恍惚迟疑间...
一批反映南京大屠杀历史的新书发布 新华社南京12月7日电(记者邱冰清、蒋芳)“以史为鉴,开创未来...
我在现场·照片背后的故事|电影《亲爱的》里面没有的结局,在我眼前“上映” 12月6日,在深圳市...
冥想?泡脚?不如听听助眠音乐 晚上睡不着,白天睡不醒,成为最贴合都市人群的“睡眠画像”。随...
养老话题 老年教育面临缺口 “终身教育”潜力无限 【现实挑战】“新老年”群体愿意在培养兴...
孙海洋被拐14年儿子如何找到的? 警方侦办另一宗拐骗儿童案时发现线索,通过人像比对、DNA确认找...
北京天文馆、圆明园将对未成年人免费开放 12月6日,北京天文馆发布通知称,12月8日起试行对未成...
今年全国粮食总产量再创新高 连续7年保持在1 3万亿斤以上 根据对全国31个省(区、市)的抽样调...
斑块软的很危险 硬的就无碍? 血管里的“垃圾”分类 赶快学起来! 一项最新研究显示:中国...
诺西那生钠注射液大幅降价 聚焦医保谈判背后脊髓性肌萎缩症家庭 医保目录公布那天 好多家长都...
抖音“窗花剪剪”遭抄袭 被判获赔20万元 法院认为“窗花剪剪”的这种表达方式理应受到《著作权...
公安机关近日侦破3起拐卖儿童案件 失散十几年 3组家庭终于团圆了 北京青年报记者12月6日从公...
2021年度十大网络用语发布 本报讯(记者 路艳霞)作为年度“汉语盘点”活动最具网络特色的组成部...
北京天文馆向未成年人免费开放 本报讯(记者 牛伟坤)北京天文馆对票价免费及优惠政策作出调整:1...
2021北京百个网红打卡地发布 本报讯(记者 李洋)2021北京网红打卡地推荐榜单昨晚正式发布。自然...