Flink集群部署

Stella981
• 阅读 834

Flink集群部署

Flink集群部署

大数据成神之路:点我去成神之路系列目录^_^

Flink集群部署

1部署方式

一般来讲有三种方式:

  • Local

  • Standalone

  • Flink On Yarn/Mesos/K8s…

2Standalone部署

上一节我们讲了单机模式如何部署启动,这节我们基于CentOS 7虚拟机搭建一个3个节点的集群:

角色分配:

Master: 192.168.246.134Slave: 192.168.246.135Slave: 192.168.246.136192.168.246.134 jobmanager192.168.246.135 taskmanager192.168.246.136 taskmanager

假设三台机器都存在:

用户root 密码为123

192.168.246.134 master192.168.246.135 slave1192.168.246.136 slave2

三台机器首先要做ssh免登,具体方法很简单,可以百度。

下载一个包到本地,这里我选择了1.7.2版本+Hadoop2.8+Scala2.11版本,然后分发到三台机器上:

scp flink-1.7.2-bin-hadoop28-scala_2.11.tgz root@192.168.246.13X:~scp jdk-8u11-linux-x64.tar.gz root@192.168.246.13X:~注意:X代表4、5、6,分发到3台机器修改解压后目录属主:Chown -R  root:root flink/Chown -R root:root jdk8/export JAVA_HOME=/root/jdk8export JRE_HOME=${JAVA_HOME}/jreexport CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/libexport PATH=${JAVA_HOME}/bin:$PATH

分别修改master和slave的flink-conf.yaml文件

Vim flink/conf/flink-conf.yaml##配置master节点ipjobmanager.rpc.address: 192.168.1.100##配置slave节点可用内存,单位MBtaskmanager.heap.mb: 25600##配置每个节点的可用slot,1 核CPU对应 1 slot##the number of available CPUs per machine taskmanager.numberOfTaskSlots: 30##默认并行度 1 slot资源parallelism.default: 1修改slave节点配置文件slaves:192.168.246.135192.168.246.136

启动集群:

##在master节点上执行此脚本,就可以启动集群,前提要保证master节点到slaver节点可以免密登录,##因为它的启动过程是:先在master节点启动jobmanager进程,然后ssh到各slaver节点启动taskmanager进程./bin/start-cluster.sh停止集群:./bin/stop-cluster.sh

3Flink on yarn集群部署

名词解释:指事物的结构形态、运转模型和人们观念的根本性转变过程。

Yarn的简介:

Flink集群部署

ResourceManager

ResourceManager 负责整个集群的资源管理和分配,是一个全局的资源管理系统。 NodeManager 以心跳的方式向 ResourceManager 汇报资源使用情况(目前主要是 CPU 和内存的使用情况)。RM 只接受 NM 的资源回报信息,对于具体的资源处理则交给 NM 自己处理。

NodeManager

NodeManager 是每个节点上的资源和任务管理器,它是管理这台机器的代理,负责该节点程序的运行,以及该节点资源的管理和监控。YARN 集群每个节点都运行一个NodeManager。

NodeManager 定时向 ResourceManager 汇报本节点资源(CPU、内存)的使用情况和Container 的运行状态。当 ResourceManager 宕机时 NodeManager 自动连接 RM 备用节点。

NodeManager 接收并处理来自 ApplicationMaster 的 Container 启动、停止等各种请求。

ApplicationMaster

负责与 RM 调度器协商以获取资源(用 Container 表示)。

将得到的任务进一步分配给内部的任务(资源的二次分配)。

与 NM 通信以启动/停止任务。

监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务

Flink on yarn 集群启动步骤

  • 步骤1 用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。

  • 步骤2 ResourceManager为该应用程序分配第一个Container,并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。

  • 步骤3 ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。

  • 步骤4 ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。

  • 步骤5 一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。

  • 步骤6 NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。

  • 步骤7 各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。 在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。

  • 步骤8 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。

on yarn 集群部署 :

设置Hadoop环境变量:

[root@hadoop2 flink-1.7.2]# vi /etc/profileexport HADOOP_CONF_DIR=这里是你自己的hadoop路径

bin/yarn-session.sh -h 查看使用方法:

bin/yarn-session.sh -hUsage:   Required     -n,--container <arg>   为YARN分配容器的数量 (=Number of Task Managers)   Optional     -D <property=value>             动态属性      -d,--detached                   以分离模式运行作业     -h,--help                       Yarn session帮助.     -id,--applicationId <arg>       连接到一个正在运行的YARN session     -j,--jar <arg>                  Flink jar文件的路径     -jm,--jobManagerMemory <arg>    JobManager的内存大小,driver-memory [in MB]     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.     -n,--container <arg>            TaskManager的数量,相当于executor的数量     -nm,--name <arg>                设置YARN应用自定义名称      -q,--query                      显示可用的YARN资源 (memory, cores)     -qu,--queue <arg>               指定YARN队列     -s,--slots <arg>                每个JobManager的core的数量,executor-cores。建议将slot的数量设置每台机器的处理器数量     -st,--streaming                 在流模式下启动Flink     -t,--ship <arg>                 在指定目录中传送文件(t for transfer)     -tm,--taskManagerMemory <arg>   每个TaskManager的内存大小,executor-memory  [in MB]     -yd,--yarndetached              如果存在,则以分离模式运行作业 (deprecated; use non-YARN specific option instead)     -z,--zookeeperNamespace <arg>   为高可用性模式创建Zookeeper子路径的命名空间

在启动的是可以指定TaskManager的个数以及内存(默认是1G),也可以指定JobManager的内存,但是JobManager的个数只能是一个

我们开启动一个YARN session:

./bin/yarn-session.sh -n 4 -tm 8192 -s 8

上面命令启动了4个TaskManager,每个TaskManager内存为8G且占用了8个核(是每个TaskManager,默认是1个核)。在启动YARN session的时候会加载conf/flink-config.yaml配置文件,我们可以根据自己的需求去修改里面的相关参数.

YARN session启动之后就可以使用bin/flink来启动提交作业:

例如:

./bin/flink run -c com.demo.wangzhiwu.WordCount $DEMO_DIR/target/flink-demo-1.0.SNAPSHOT.jar --port 9000

flink run的用法如下:

用法: run [OPTIONS]

"run" 操作参数:

-c,--class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定-m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址使用这个参数可以指定一个不同于配置文件中的jobmanager-p,--parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值。

使用run 命令向yarn集群提交一个job。客户端可以确定jobmanager的地址。当然,你也可以通过-m参数指定jobmanager。jobmanager的地址在yarn控制台上可以看到。

值得注意的是:

上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业。这里我们还是使用./bin/flink,但是不需要事先启动YARN session:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar \--input hdfs://user/hadoop/input.txt \--output hdfs://user/hadoop/output.txt

上面的命令同样会启动一个类似于YARN session启动的页面。其中的-yn是指TaskManager的个数,必须要指定。

后台运行 yarn session

如果你不希望flink yarn client一直运行,也可以启动一个后台运行的yarn session。使用这个参数:-d 或者 --detached

在这种情况下,flink yarn client将会只提交任务到集群然后关闭自己。注意:在这种情况下,无法使用flink停止yarn session。

必须使用yarn工具来停止yarn session

yarn application -kill <applicationId>

flink on yarn的故障恢复

flink 的 yarn 客户端通过下面的配置参数来控制容器的故障恢复。这些参数可以通过conf/flink-conf.yaml 或者在启动yarn session的时候通过-D参数来指定。

* yarn.reallocate-failed:这个参数控制了flink是否应该重新分配失败的taskmanager容器。默认是true。* yarn.maximum-failed-containers:applicationMaster可以接受的容器最大失败次数,达到这个参数,就会认为yarn session失败。默认这个次数和初始化请求的taskmanager数量相等(-n 参数指定的)。* yarn.application-attempts:applicationMaster重试的次数。如果这个值被设置为1(默认就是1),当application master失败的时候,yarn session也会失败。设置一个比较大的值的话,yarn会尝试重启applicationMaster。

日志文件查看

在某种情况下,flink yarn session 部署失败是由于它自身的原因,用户必须依赖于yarn的日志来进行分析。最有用的就是yarn log aggregation 。启动它,用户必须在yarn-site.xml文件中设置yarn.log-aggregation-enable 属性为true。一旦启用了,用户可以通过下面的命令来查看一个失败的yarn session的所有详细日志。

yarn logs -applicationId <application ID>

Flink集群部署

文章整理在Github点击 原文链接 ,跳到原文观看。

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Java修道之路,问鼎巅峰,我辈代码修仙法力齐天
<center<fontcolor00FF7Fsize5face"黑体"代码尽头谁为峰,一见秃头道成空。</font<center<fontcolor00FF00size5face"黑体"编程修真路破折,一步一劫渡飞升。</font众所周知,编程修真有八大境界:1.Javase练气筑基2.数据库结丹3.web前端元婴4.Jav
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
4小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(