insert三种类型的操作
发布时间:2025-06-24 17:10:21 作者:北方职教升学中心 阅读量:387
资源、
注意:JAVA和Scala只是用来标识,没有区别,如果是Python开发的Flink则没有主函数的class,其他都是一样
http节点
- 拖动工具栏中的
任务节点到画板中,如下图所示:
- 节点名称:一个工作流定义中的节点名称是唯一的。
节点实例【createParam1】如下:
这里当然 “id” 的值会等于 12.
我们再来看节点实例【createParam2】的情况。insert三种类型的操作。worker.properties、
1.4 架构设计思想
一、
然后选择告警组管理,创建告警组,选择相应的告警实例即可。
- 点击“数据源中心->创建数据源”,根据需求创建不同类型的数据源。
- 描述信息:描述该节点的功能。
- API
API接口层,主要负责处理前端UI层的请求。application-api.properties、在管理后台 -> “通讯录” -> 点进某个成员的详情页,可以看到enterprise.wechat.users=zhangsan,lisi# 获取 access\_token 的地址,使用如下例子无需修改enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpId}&corpsecret={secret}# 发送应用消息地址,使用如下例子无需改动enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}# 发送消息格式,无需改动enterprise.wechat.user.send.msg={\"touser\":\"{toUser}\",\"agentid\":\"{agentId}\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"{msg}\"}}
资源中心
如果需要用到资源上传功能,针对单机可以选择本地文件目录作为上传文件夹(此操作不需要部署 Hadoop)。这几个指标的说明如下
- 任务状态统计:在指定时间范围内,统计任务实例中状态为提交成功、变量会选择 SQL 查询结果中的列名中与该变量名称相同的列对应的值。当然也可以选择上传到 Hadoop or MinIO 集群上,此时则需要有Hadoop (2.6+) 或者 MinIO 等相关环境
*注意:*
- 如果用到资源上传的功能,那么 安装部署中,部署用户需要有这部分的操作权限
- 如果 Hadoop 集群的 NameNode 配置了 HA 的话,需要开启 HDFS 类型的资源上传,同时需要将 Hadoop 集群下的
core-site.xml
和hdfs-site.xml
复制到/opt/dolphinscheduler/conf
,非 NameNode HA 跳过次步骤
hdfs资源配置
- 上传资源文件和udf函数,所有上传的文件和资源都会被存储到hdfs上,所以需要以下配置项:
conf/common.properties # Users who have permission to create directories under the HDFS root path hdfs.root.user=hdfs # data base dir, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。
- 去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的”管理者”,因此不存在单点故障问题。目前支持这个特性的任务类型有:
- Shell
- SQL
- Procedure
当定义上游节点时,如果有需要将该节点的结果传递给有依赖关系的下游节点,需要在【当前节点设置】的【自定义参数】设置一个方向是 OUT 的变量。自定义响应码、租户管理、如果Scheduler在Slave上,则一个DAG中所有的任务都只能在某一台机器上进行作业提交,则并行任务比较多的时候,Slave的压力可能会比较大。hql、工作流定义统计。datasource.properties、BOOLEAN九种数据类型
SQL节点
- 拖动工具栏中的
任务节点到画板中
- 非查询SQL功能:编辑非查询SQL任务信息,sql类型选择非查询,如下图所示:
- 查询SQL功能:编辑查询SQL任务信息,sql类型选择查询,选择表格或附件形式发送邮件到指定的收件人,如下图所示。
其中节点【createParam】在使用变量时直接使用即可。C流程为天任务,A任务需要B、区别就是伪集群部署针对的是一台机器,而集群部署(Cluster)需要针对多台机器,且两者“修改相关配置”步骤区别较大
前置准备工作 && 准备 DolphinScheduler 启动环境
其中除了伪集群部署中的“前置准备工作”,“准备启动环境”除了“启动zookeeper”以及“初始化数据库”外,别的都需要在每台机器中进行配置
修改相关配置
这个是与伪集群部署差异较大的一步,因为部署脚本会通过
scp
的方式将安装需要的资源传输到各个机器上,所以这一步我们仅需要修改运行install.sh
脚本的所在机器的配置即可。api server、当上游传递的参数名称相同时:- 下游节点会优先使用值为非空的参数
- 如果存在多个值为非空的参数,则按照上游任务的完成时间排序,选择完成时间最早的上游任务对应的参数
例子
下面例子向你展示如何使用任务参数传递的优先级问题
1:先以 shell 节点解释第一种情况
节点 【useParam】可以使用到节点【createParam】中设置的变量。
- 失败重试次数:任务失败重新提交的次数,支持下拉和手填。API server,所以服务器的IP均为机器IP或者localhostips="localhost"masters="localhost"workers="localhost:default"alertServer="localhost"apiServers="localhost"# DolphinScheduler安装路径,如果不存在会创建installPath="~/dolphinscheduler"# 部署用户,填写在 \*\*配置用户免密及权限\*\* 中创建的用户deployUser="dolphinscheduler"# ---------------------------------------------------------# DolphinScheduler ENV# ---------------------------------------------------------# JAVA\_HOME 的路径,是在 \*\*前置准备工作\*\* 安装的JDK中 JAVA\_HOME 所在的位置javaHome="/your/java/home/here"# ---------------------------------------------------------# Database# ---------------------------------------------------------# 数据库的类型,用户名,密码,IP,端口,元数据库db。如果你是在生产中使用,推荐使用集群部署或者kubernetes
部署步骤
集群部署(Cluster)使用的脚本和配置文件与伪集群部署中的配置一样,所以所需要的步骤也与伪集群部署大致一样。如果你的环境中已存在,可以跳过这步。-input -output格式,这里可以设置用户自定义参数的输入,比如:
- -mapper “mapper.py 1” -file mapper.py -reducer reducer.py -file reducer.py –input /journey/words.txt -output /journey/out/mr/${currentTimeMillis}
- 其中 -mapper 后的 mapper.py 1是两个参数,第一个参数是mapper.py,第二个参数是1
- 资源: 如果其他参数中引用了资源文件,需要在资源中选择指定
- 自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容
Python节点
- 使用python节点,可以直接执行python脚本,对于python节点,worker会使用
python **
方式提交任务。学习笔记、对于mr节点,worker会使用hadoop jar
方式提交任务
拖动工具栏中的
任务节点到画板中,如下图所示:
JAVA程序
- 主函数的class:是MR程序的入口Main Class的全路径
- 程序类型:选择JAVA语言
- 主jar包:是MR的jar包
- 命令行参数:是设置MR程序的输入参数,支持自定义参数变量的替换
- 其他参数:支持 –D、
- 数据源:选择对应的数据源
- sql类型:支持查询和非查询两种,查询是select类型的查询,是有结果集返回的,可以指定邮件通知为表格、
- UI
系统的前端页面,提供系统的各种可视化操作界面,详见系统使用手册部分。alert.properties、
- 普通用户登录后,点击用户名下拉框中的用户信息,进入修改密码页面,输入密码并确认密码后点击"编辑"按钮,则修改密码成功。Database、 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。可以是引用 “系统参数”
- 我们定义这种基准变量为
[
.
.
.
]
格式的,
[…] 格式的,
[…]格式的,[yyyyMMddHHmmss] 是可以任意分解组合的,比如:$[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd] 等
- 也可以通过以下两种方式:
1.使用add_months()函数,该函数用于加减月份, 第一个入口参数为[yyyyMMdd],表示返回时间的格式 第二个入口参数为月份偏移量,表示加减多少个月
+ 后 N 年:$[add\_months(yyyyMMdd,12\*N)]+ 前 N 年:$[add\_months(yyyyMMdd,-12\*N)]+ 后 N 月:$[add\_months(yyyyMMdd,N)]+ 前 N 月:$[add\_months(yyyyMMdd,-N)]
2.直接加减数字 在自定义格式后直接“+/-”数字
+ 后 N 周:$[yyyyMMdd+7\*N]+ 前 N 周:$[yyyyMMdd-7\*N]+ 后 N 天:$[yyyyMMdd+N]+ 前 N 天:$[yyyyMMdd-N]+ 后 N 小时:$[HHmmss+N/24]+ 前 N 小时:$[HHmmss-N/24]+ 后 N 分钟:$[HHmmss+N/24/60]+ 前 N 分钟:$[HHmmss-N/24/60]
全局参数
作用域
在工作流定义页面配置的参数,作用于该工作流中全部的任务
使用方式
全局参数配置方式如下:在工作流定义页面,点击“设置全局”右边的加号,填写对应的变量名称和对应的值,保存即可
这里定义的global_bizdate参数可以被其它任一节点的局部参数引用,并设置global_bizdate的value为通过引用系统参数system.biz.date获得的值
本地参数
作用域
在任务定义页面配置的参数,默认作用域仅限该任务,如果配置了参数传递则可将该参数作用到下游任务中。
- 后置sql:后置sql在sql语句之后执行(目标库执行)。
- 任务优先级:worker线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。但由于不存在” 管理者”节点所以每个节点都需要跟其他节点通信才得到必须要的机器信息,而分布式系统通信的不可靠性,则大大增加了上述功能的实现难度。
添加租户
- 租户对应的是Linux的用户,用于worker提交作业所使用的用户。Scala和Python三种语言
- 主函数的class:是Flink程序的入口Main Class的全路径
- 主jar包:是Flink的jar包
- 部署方式:支持cluster、
企业微信
如果您需要使用到企业微信进行告警,请在安装完成后,修改
alert.properties
文件,然后重启 alert 服务即可。任务优先级设计在早期调度设计中,如果没有优先级设计,采用公平调度设计的话,会遇到先行提交的任务可能会和后继提交的任务同时完成的情况,而不能做到设置流程或者任务的优先级,因此我们对此进行了重新设计,目前我们设计如下:
- 按照
不同流程实例优先级
优先于
同一个流程实例优先级
优先于
同一流程内任务优先级
优先于
同一流程内任务
提交顺序依次从高到低进行任务处理。源码讲义、
- 普通用户可以创建项目和对工作流定义的创建,编辑,执行等操作。目前我们主要针对 SQL 和 SHELL 节点做了可以向下传递参数的功能。区别在于SQL任务类型自定义参数会替换sql语句中${变量}。biz_date、
- 进入安全中心->用户管理页面,点击“创建用户”按钮,创建用户。实战项目、Hive、你可以通过修改
worker.properties
配置文件中参数worker.tenant.auto.create=true
实现当 linux 用户不存在时自动创建该用户。
令牌管理
由于后端接口有登录检查,令牌管理提供了一种可以通过调用接口的方式对系统进行各种操作。properties
- 上传文件
上传文件:点击"上传文件"按钮进行上传,将文件拖拽到上传区域,文件名会自动以上传的文件名称补全
- 文件查看
对可查看的文件类型,点击文件名称,可查看文件详情
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rUFXelKK-1683205976507)(null)]
- 下载文件
点击文件列表的"下载"按钮下载文件或者在文件详情中点击右上角"下载"按钮下载文件
- 文件重命名
- 删除
文件列表->点击"删除"按钮,删除指定文件
UDF管理
资源管理
资源管理和文件管理功能类似,不同之处是资源管理是上传的UDF函数,文件管理上传的是用户程序,脚本及配置文件 操作功能:重命名、
- Worker容错流程图:
Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。例如你是 MySQL ,运行
dolphinscheduler/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
,是 PostgreSQL 则运行dolphinscheduler/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql
*NOTICE:*最新版本目前通过运行命令
sh script/create-dolphinscheduler.sh
初始化数据库的方式暂不可用,我们创建了一个issue-6597 去追踪并修复这个问题。前置sql:前置sql在sql语句之前执行。 2.任务失败重试
这里首先要区分任务失败重试、
数据源中心
数据源
数据源中心支持MySQL、
- Alert
提供告警相关功能,仅支持单机服务。PUT、py、kill、
请求类型:支持GET、 这里的 “id” 是上游节点设置的变量,用户在节点【createParam1】、
DB监控
- 主要是DB的健康状况
统计管理
- 待执行命令数:统计t_ds_command表的数据
- 执行失败的命令数:统计t_ds_error_command表的数据
- 待运行任务数:统计Zookeeper中task_queue的数据
- 待杀死任务数:统计Zookeeper中task_kill的数据
安全中心(权限系统)
- 安全中心只有管理员账户才有权限操作,分别有队列管理、数据源和UDF函数进行授权。kill、等待线程的个数
- 工作流定义统计:统计用户创建的工作流定义及管理员授予该用户的工作流定义
任务类型
Shell节点
shell节点,在worker执行的时候,会生成一个临时shell脚本,使用租户同名的linux用户执行这个脚本。其中dbtype目前支持 mysql 和 postgresqldbtype="mysql"dbhost="localhost:3306"# 如果你不是以 dolphinscheduler/dolphinscheduler 作为用户名和密码的,需要进行修改username="dolphinscheduler"password="dolphinscheduler"dbname="dolphinscheduler"# ---------------------------------------------------------# Registry Server# ---------------------------------------------------------# 注册中心地址,zookeeper服务的地址registryServers="localhost:2181"
启动 DolphinScheduler
使用部署用户运行一下命令完成部署,部署后的运行日志将存放在 logs 文件夹内
sh install.sh
*注意:*第一次部署的话,可能出现 5 次
sh: bin/dolphinscheduler-daemon.sh: No such file or directory
相关信息,次为非重要信息直接忽略即可登录 DolphinScheduler
浏览器访问地址 http://localhost:12345/dolphinscheduler 即可登录系统UI。配置免密登陆的步骤如下
su dolphinschedulerssh-keygen -t rsa -P '' -f ~/.ssh/id_rsacat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keyschmod 600 ~/.ssh/authorized_keys
*注意:*配置完成后,可以通过运行命令
ssh localhost
判断是否成功,如果不需要输入密码就能ssh登陆则证明成功启动zookeeper
进入 zookeeper 的安装目录,将
zoo_sample.cfg
配置文件复制到conf/zoo.cfg
,并将conf/zoo.cfg
中 dataDir 中的值改成dataDir=./tmp/zookeeper
# 启动 zookeeper./bin/zkServer.sh start
初始化数据库
DolphinScheduler 元数据存储在关系型数据库中,目前支持 PostgreSQL 和 MySQL,如果使用 MySQL 则需要手动下载 mysql-connector-java 驱动 (5.1.47+) 并移动到 DolphinScheduler 的 lib目录下。–archives、
失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。DOUBLE、查询、初学习者不理解的话,完全可以暂时忽略这一点 如果发现 /etc/sudoers
文件中有 “Defaults requirett” 这行,也请注释掉配置机器SSH免密登陆
由于安装的时候需要向不同机器发送资源,所以要求各台机器间能实现SSH免密登陆。从该节点开始执行等等。企业微信的配置样例如下
# 设置企业微信告警功能是否开启:开启为 true,否则为 falseenterprise.wechat.enable="true"# 设置 corpid,每个企业都拥有唯一的 corpid,获取此信息可在管理后台 “我的企业” - “企业信息” 下查看 “企业 ID”(需要有管理员权限)enterprise.wechat.corp.id="xxx"# 设置 secret,secret 是企业应用里面用于保障数据安全的 “钥匙”,每一个应用都有一个独立的访问密钥enterprise.wechat.secret="xxx"# 设置 agentid,每个应用都有唯一的 agentid。ORACLE、Spark,但如果你运行的任务需要依赖他们,就需要有对应的环境支持
准备 DolphinScheduler 启动环境
配置用户免密及权限
创建部署用户,并且一定要配置 sudo
免密。
二、biz_curdate、同理,local_param通过{local_param}引用上一节中定义的全局参数。超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
worker.tenant.auto.create=true
参数会要求 worker 可以免密运行 sudo
命令。拖动工具栏中的
任务节点到画板中,如下图所示:
- 数据源:存储过程的数据源类型支持MySQL和POSTGRESQL两种,选择对应的数据源
- 方法:是存储过程的方法名称
- 自定义参数:存储过程的自定义参数类型支持IN、application-api.properties、
例如下图中, 通过
echo '${setValue(trans=hello trans)}'
, 将’trans’设置为"hello trans", 在下游任务中就可以使用trans这个变量了:shell 节点定义时当日志检测到 ${setValue(key=value1)} 的格式时,会将 value1 赋值给 key,下游节点便可以直接使用变量 key 的值。master.properties、SQLSERVER等数据源。
例如,A流程为周报任务,B、删除。 接口包括工作流的创建、
- 资源:是指脚本中需要调用的资源文件列表,资源中心-文件管理上传或创建的文件。因为有引用的存在,就涉及当参数名相同时,参数的优先级问题,详见参数优先级
本地任务引用全局参数
本地任务引用全局参数的前提是,你已经定义了全局参数,使用方式和本地参数中的使用方式类似,但是参数的值需要配置成全局参数中的key
如上图中的
${biz_date}
以及${curdate}
,就是本地参数引用全局参数的例子。–archives、为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性。数据源、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。抛弃了节点的自有变量的值。Registry Server与伪集群部署保持一致,下面对必须修改参数进行说明# ---------------------------------------------------------# INSTALL MACHINE# ---------------------------------------------------------# 需要配置master、
- 运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。MEDIUM、另外系统还基于注册中心进行事件监听和分布式锁。上图中只是以 shell 节点作为例子,其他类型节点具有相同的使用规则。成功、内容包含、system.datetime都是用户自定义的参数,通过{全局参数}进行赋值。
- 后置sql:后置sql在sql语句之后执行。
3.zookeeper.properties [zookeeper连接配置]
参数 默认值 描述 zookeeper.quorum localhost:2181 zk集群连接信息 zookeeper.dolphinscheduler.root /dolphinscheduler DS在zookeeper存储根目录 zookeeper.session.timeout 60000 session 超时 zookeeper.connection.timeout 30000 连接超时 zookeeper.retry.base.sleep 100 基本重试时间差 zookeeper.retry.max.sleep 30000 最大重试时间 zookeeper.retry.maxtime 10 最大重试次数 4.common.properties [hadoop、
- 我们使用自定义Logback的FileAppender和Filter功能,实现每个任务实例生成一个日志文件。那在该 SQL 执行时,status 的值为优先级更高的 -1。xml、DATE、LOW、Scala和Python三种语言
- 主函数的class:是Spark程序的入口Main Class的全路径
- 主jar包:是Spark的jar包
- 部署方式:支持yarn-cluster、yarn配置]
common.properties配置文件目前主要是配置hadoop/s3a相关的配置.
参数 默认值 描述 data.basedir.path /tmp/dolphinscheduler 本地工作目录,用于存放临时文件 resource.storage.type NONE 资源文件存储类型: HDFS,S3,NONE resource.upload.path /dolphinscheduler 资源文件存储路径 hadoop.security.authentication.startup.state false hadoop是否开启kerberos权限 java.security.krb5.conf.path /opt/krb5.conf kerberos配置目录 login.user.keytab.username hdfs-mycluster@ESZ.COM kerberos登录用户 login.user.keytab.path /opt/hdfs.headless.keytab kerberos登录用户keytab kerberos.expire.time 2 kerberos过期时间,整数,单位为小时 resource.view.suffixs txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties 资源中心支持的文件格式 hdfs.root.user hdfs 如果存储类型为HDFS,需要配置拥有对应操作权限的用户 fs.defaultFS hdfs://mycluster:8020 请求地址如果resource.storage.type=S3,该值类似为: s3a://dolphinscheduler. 如果resource.storage.type=HDFS, 如果 hadoop 配置了 HA,需要复制core-site.xml 和 hdfs-site.xml 文件到conf目录 fs.s3a.endpoint s3 endpoint地址 fs.s3a.access.key s3 access key fs.s3a.secret.key s3 secret key yarn.resourcemanager.ha.rm.ids yarn resourcemanager 地址, 如果resourcemanager开启了HA, 输入HA的IP地址(以逗号分隔),如果resourcemanager为单节点, 该值为空即可 yarn.application.status.address http://ds1:8088/ws/v1/cluster/apps/%s 如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname dolphinscheduler.env.path env/dolphinscheduler_env.sh 运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME …] development.state false 是否处于开发模式 5.application-api.properties [API服务配置]
参数 默认值 描述 server.port 12345 api服务通讯端口 server.servlet.session.timeout 7200 session超时时间 server.servlet.context-path /dolphinscheduler 请求路径 spring.servlet.multipart.max-file-size 1024MB 最大上传文件大小 spring.servlet.multipart.max-request-size 1024MB 最大请求大小 server.jetty.max-http-post-size 5000000 jetty服务最大发送请求大小 spring.messages.encoding UTF-8 请求编码 spring.jackson.time-zone GMT+8 设置时区 spring.messages.basename i18n/messages i18n配置 security.authentication.type PASSWORD 权限校验类型 6.master.properties [Master服务配置]
参数 默认值 描述 master.listen.port 5678 master监听端口 master.exec.threads 100 master工作线程数量,用于限制并行的流程实例数量 master.exec.task.num 20 master每个流程实例的并行任务数量 master.dispatch.task.num 3 master每个批次的派发任务数量 master.host.selector LowerWeight master host选择器,用于选择合适的worker执行任务,可选值: Random, RoundRobin, LowerWeight master.heartbeat.interval 10 master心跳间隔,单位为秒 master.task.commit.retryTimes 5 任务重试次数 master.task.commit.interval 1000 任务提交间隔,单位为毫秒 master.max.cpuload.avg -1 master最大cpuload均值,只有高于系统cpuload均值时,master服务才能调度任务. 默认值为-1: cpu cores * 2 master.reserved.memory 0.3 master预留内存,只有低于系统可用内存时,master服务才能调度任务,单位为G 7.worker.properties [Worker服务配置]
参数 默认值 描述 worker.listen.port 1234 worker监听端口 worker.exec.threads 100 worker工作线程数量,用于限制并行的任务实例数量 worker.heartbeat.interval 10 worker心跳间隔,单位为秒 worker.max.cpuload.avg -1 worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2 worker.reserved.memory 0.3 worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G worker.groups default worker分组配置,逗号分隔,例如’worker.groups=default,test’ worker启动时会根据该配置自动加入对应的分组 8.alert.properties [Alert 告警服务配置]
参数 默认值 描述 alert.type EMAIL 告警类型 mail.protocol SMTP 邮件服务器协议 mail.server.host xxx.xxx.com 邮件服务器地址 mail.server.port 25 邮件服务器端口 mail.sender xxx@xxx.com 发送人邮箱 mail.user xxx@xxx.com 发送人邮箱名称 mail.passwd 111111 发送人邮箱密码 mail.smtp.starttls.enable true 邮箱是否开启tls mail.smtp.ssl.enable false 邮箱是否开启ssl mail.smtp.ssl.trust xxx.xxx.com 邮箱ssl白名单 xls.file.path /tmp/xls 邮箱附件临时工作目录 以下为企业微信配置[选填] enterprise.wechat.enable false 企业微信是否启用 enterprise.wechat.corp.id xxxxxxx enterprise.wechat.secret xxxxxxx enterprise.wechat.agent.id xxxxxxx enterprise.wechat.users xxxxxxx enterprise.wechat.token.url https://qyapi.weixin.qq.com/cgi-bin/gettoken? corpid=corpId&corpsecret=secret enterprise.wechat.push.url https://qyapi.weixin.qq.com/cgi-bin/message/send? access_token=$token enterprise.wechat.user.send.msg 发送消息格式 enterprise.wechat.team.send.msg 群发消息格式 plugin.dir /Users/xx/your/path/to/plugin/dir 插件目录 9.quartz.properties [Quartz配置]
这里面主要是quartz配置,请结合实际业务场景&资源进行配置,本文暂时不做展开.
参数 默认值 描述 org.quartz.jobStore.driverDelegateClass org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.driverDelegateClass org.quartz.impl.jdbcjobstore.PostgreSQLDelegate org.quartz.scheduler.instanceName DolphinScheduler org.quartz.scheduler.instanceId AUTO org.quartz.scheduler.makeSchedulerThreadDaemon true org.quartz.jobStore.useProperties false org.quartz.threadPool.class org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.makeThreadsDaemons true org.quartz.threadPool.threadCount 25 org.quartz.threadPool.threadPriority 5 org.quartz.jobStore.class org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.tablePrefix QRTZ_ org.quartz.jobStore.isClustered true org.quartz.jobStore.misfireThreshold 60000 org.quartz.jobStore.clusterCheckinInterval 5000 org.quartz.jobStore.acquireTriggersWithinLock true org.quartz.jobStore.dataSource myDs org.quartz.dataSource.myDs.connectionProvider.class org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider 10.install_config.conf [DS环境变量配置脚本[用于DS安装/启动]]
install_config.conf这个配置文件比较繁琐,这个文件主要有两个地方会用到.
- 1.DS集群的自动安装.
调用install.sh脚本会自动加载该文件中的配置.并根据该文件中的内容自动配置上述的配置文件中的内容. 比如:dolphinscheduler-daemon.sh、对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。
参数
内置参数
基础内置参数
变量名 | 声明方式 | 含义 |
---|---|---|
system.biz.date | ${system.biz.date} | 日常调度实例定时的定时时间前一天,格式为 yyyyMMdd,补数据时,该日期 +1 |
system.biz.curdate | ${system.biz.curdate} | 日常调度实例定时的定时时间,格式为 yyyyMMdd,补数据时,该日期 +1 |
system.datetime | ${system.datetime} | 日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss,补数据时,该日期 +1 |
衍生内置参数
- 支持代码中自定义变量名,声明方式:${变量名}。delete、
- 工具栏中拖动[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-04KjxJw7-1683205953746)(https://dolphinscheduler.apache.org/img/shell.png)]到画板中,如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SAkHpS8X-1683205953749)(https://dolphinscheduler.apache.org/img/shell_dag.png)]
- 节点名称:一个工作流定义中的节点名称是唯一的。worker、
Worker分组
每个worker节点都会归属于自己的Worker分组,默认分组为default.
在任务执行时,可以将任务分配给指定worker分组,最终由该组中的worker节点执行该任务.
新增/更新 worker分组
- 打开要设置分组的worker节点上的"conf/worker.properties"配置文件. 修改worker.groups参数.
- worker.groups参数后面对应的为该worker节点对应的分组名称,默认为default.
- 如果该worker节点对应多个分组,则以逗号隔开.
示例: worker.groups=default,test
- 也可以在运行中修改worker所属的worker分组,如果修改成功,worker就会使用这个新建的分组,忽略
worker.properties
中的配置。如果您对创建项目的源码感兴趣,欢迎继续阅读下面内容
附:创建项目源码
Flink调用
调用 flink 操作步骤
创建队列
- 登录调度系统,点击 “安全中心”,再点击左侧的 “队列管理”,点击 “队列管理” 创建队列
- 填写队列名称和队列值,然后点击 “提交”
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QOXUFbby-1683205974909)(null)]
创建租户
1.租户对应的是 linux 用户, 用户 worker 提交作业所使用的的用户, 如果 linux 没有这个用户, worker 会在执行脚本的时候创建这个用户2.租户和租户编码都是唯一不能重复,好比一个人有名字有身份证号。如下图 
五、节点【createParam2】中设置了相同参数名 “id” 的参数。
- 描述信息:描述该节点的功能。worker、
创建告警组
- 告警组是在启动时设置的参数,在流程结束以后会将流程的状态和其他信息以邮件形式发送给告警组。观察上图的最后一行,local_param_bizdate通过
g
l
o
b
a
l
b
i
z
d
a
t
e
来引用全局参数,在
s
h
e
l
l
脚本中可以通过
{global_bizdate}来引用全局参数,在shell脚本中可以通过
globalbizdate来引用全局参数,在shell脚本中可以通过{local_param_bizdate}来引全局变量 global_bizdate的值,或通过JDBC直接将local_param_bizdate的值set进去。但是用户在保存时也同样设置了 “status” 变量,并且赋值为 -1。正在运行、
- 二进制包:在下载页面下载 DolphinScheduler 二进制包
- 数据库:PostgreSQL (8.2.15+) 或者 MySQL (5.7+),两者任选其一即可,如 MySQL 则需要 JDBC Driver 8.0.16
- 注册中心:ZooKeeper (3.4.6+),下载地址
- 进程树分析
- macOS安装
pstree
- Fedora/Red/Hat/CentOS/Ubuntu/Debian安装
psmisc
- macOS安装
*注意:*DolphinScheduler 本身不依赖 Hadoop、配置文件在路径在
conf/config/install_config.conf
下,一般部署只需要修改INSTALL MACHINE、 MasterServer基于netty提供监听服务。log、首先需要进入到安全中心,选择告警组管理,然后点击左侧的告警实例管理,然后创建一个告警实例,然后选择对应的告警插件,填写相关告警参数。sh、
用户需要传递参数,在定义 shell 脚本时,需要输出格式为 ${setValue(key=value)} 的语句,key 为对应参数的 prop,value 为该参数的值。默认的用户名和密码是 admin/dolphinscheduler123
启停服务
脚本
./bin/dolphinscheduler-daemon.sh
除了可以快捷启动 standalone 外,还能停止服务运行,全部命令如下# 启动 Standalone Server 服务sh ./bin/dolphinscheduler-daemon.sh start standalone-server# 停止 Standalone Server 服务sh ./bin/dolphinscheduler-daemon.sh stop standalone-server
伪集群部署
伪集群部署目的是在单台机器部署 DolphinScheduler 服务,该模式下master、DELETE。由于显示的原因,这里已经替您查好了该 list 的长度为 10。cfg、实战项目、大纲路线、
注意:如果该用户切换了租户,则该用户所在租户下所有资源将复制到切换的新租户下。 点击“测试连接”,测试数据源是否可以连接成功。zookeeper.properties、尽管用户定义的 sql 查到的是 “id” 和 “database_name” 两个字段,但是由于只定义了一个为 out 的变量 “id”,所以只会设置一个变量。各种不同命令类型的逻辑处理,处理任务状态和工作流状态事件+ **EventExecuteService**处理master负责的工作流实例所有的状态变化事件,使用线程池处理工作流的状态事件+ **StateWheelExecuteThread**处理依赖任务和超时任务的定时状态更新
- WorkerServer
WorkerServer也采用分布式无中心设计理念,支持自定义任务插件,主要负责任务的执行和提供日志服务。流程
操作步骤
创建 token
- 登录调度系统,点击 “安全中心”,再点击左侧的 “令牌管理”,点击 “令牌管理” 创建令牌
- 选择 “失效时间” (Token有效期),选择 “用户” (以指定的用户执行接口操作),点击 “生成令牌” ,拷贝 Token 字符串,然后点击 “提交”
使用 Token
- 打开 API文档页面
地址:http://{api server ip}:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn
- 选一个测试的接口,本次测试选取的接口是:查询所有项目
projects/query-project-list
- 打开 Postman,填写接口地址,并在 Headers 中填写 Token,发送请求后即可查看结果
token:刚刚生成的Token
创建项目
这里以创建名为 “wudl-flink-test” 的项目为例
返回 msg 信息为 “success”,说明我们已经成功通过 API 的方式创建了项目。LOWEST。如果你想体验更完整的功能,或者更大的任务量,推荐使用伪集群部署。有两种方案:
将日志放到ES搜索引擎上 通过netty通信获取远程日志信息 介于考虑到尽可能的DolphinScheduler的轻量级性,所以选择了gRPC实现远程访问日志信息。
这里只有 “id” 的值。重命名、HIVE/IMPALA、-reducer、
修改用户密码
- 管理员进入安全中心->用户管理页面,点击"编辑"按钮,编辑用户信息时,输入新密码修改用户密码。HIGH、
- 失败重试次数:任务失败重新提交的次数,支持下拉和手填。下载、如果你想体验更完整的功能,或者更大的任务量,推荐使用伪集群部署。因为项目、
- 创建文件
文件格式支持以下几种类型:txt、
- 资源、worker.properties、
子流程节点
- 子流程节点,就是把外部的某个工作流定义当做一个任务节点去执行。用户管理、
MySQL数据源
- 数据源:选择MYSQL
- 数据源名称:输入数据源的名称
- 描述:输入数据源的描述
- IP主机名:输入连接MySQL的IP
- 端口:输入连接MySQL的端口
- 用户名:设置连接MySQL的用户名
- 密码:设置连接MySQL的密码
- 数据库名:输入连接MySQL的数据库名称
- Jdbc连接参数:用于MySQL连接的参数设置,以JSON形式填写
POSTGRESQL数据源
- 数据源:选择POSTGRESQL
- 数据源名称:输入数据源的名称
- 描述:输入数据源的描述
- IP/主机名:输入连接POSTGRESQL的IP
- 端口:输入连接POSTGRESQL的端口
- 用户名:设置连接POSTGRESQL的用户名
- 密码:设置连接POSTGRESQL的密码
- 数据库名:输入连接POSTGRESQL的数据库名称
- Jdbc连接参数:用于POSTGRESQL连接的参数设置,以JSON形式填写
HIVE数据源
使用HiveServer2
- 数据源:选择HIVE
- 数据源名称:输入数据源的名称
- 描述:输入数据源的描述
- IP/主机名:输入连接HIVE的IP
- 端口:输入连接HIVE的端口
- 用户名:设置连接HIVE的用户名
- 密码:设置连接HIVE的密码
- 数据库名:输入连接HIVE的数据库名称
- Jdbc连接参数:用于HIVE连接的参数设置,以JSON形式填写
使用HiveServer2 HA Zookeeper
注意:如果开启了kerberos,则需要填写 Principal
Spark数据源
- 数据源:选择Spark
- 数据源名称:输入数据源的名称
- 描述:输入数据源的描述
- IP/主机名:输入连接Spark的IP
- 端口:输入连接Spark的端口
- 用户名:设置连接Spark的用户名
- 密码:设置连接Spark的密码
- 数据库名:输入连接Spark的数据库名称
- Jdbc连接参数:用于Spark连接的参数设置,以JSON形式填写
注意:如果开启了kerberos,则需要填写 Principal
告警
如何创建告警插件以及告警组
在2.0.0版本中,用户需要创建告警实例,然后同告警组进行关联,一个告警组可以使用多个告警实例,我们会逐一进行进行告警通知。
自定义参数:是SHELL局部的用户自定义参数,会替换脚本中以${变量}的内容。Registry Server部分即可完成部署,下面对必须修改参数进行说明 # ---------------------------------------------------------# INSTALL MACHINE# ---------------------------------------------------------# 因为是在单节点上部署master、
Zookeeper监控
- 主要是zookpeeper中各个worker和master的相关配置信息。上下游参数传递。-libjars、
- 一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点,MR节点、
- sql参数:输入参数格式为key1=value1;key2=value2…
- sql语句:SQL语句
- UDF函数:对于HIVE类型的数据源,可以引用资源中心中创建的UDF函数,其他类型的数据源暂不支持UDF函数。
- Worker的角色主要负责任务的执行工作并维护和Master的心跳,以便Master可以分配任务给Slave。HIGH、
项目首页
在项目管理页面点击项目名称链接,进入项目首页,如下图所示,项目首页包含该项目的任务状态统计、需要容错、
拖动工具栏中的
任务节点到画板中,如下图所示:
- 脚本:用户开发的Python程序
- 环境名称:执行Python程序的解释器路径,指定运行脚本的解释器。
- 管理员进入安全中心->令牌管理页面,点击“创建令牌”按钮,选择失效时间与用户,点击"生成令牌"按钮,点击"提交"按钮,则选择用户的token创建成功。
- 自定义参数:SQL任务类型,而存储过程是自定义参数顺序的给方法设置值自定义参数类型和数据类型同存储过程任务类型一样。资源、流程等,但是与第三方系统集成就需要通过调用 API 来管理项目、
该服务包含:
+ **WorkerManagerThread**主要通过netty领取master发送过来的任务,并根据不同任务类型调用**TaskExecuteThread**对应执行器。
该服务内主要包含:
+ **Distributed Quartz**分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作+ **MasterSchedulerService**是一个扫描线程,定时扫描数据库中的 **command** 表,生成工作流实例,根据不同的**命令类型**进行不同的业务操作+ **WorkflowExecuteThread**主要是负责DAG任务切分、准备停止、实战项目、
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MQj8OzXt-1683205975817)(null)]
- 普通用户登录后,点击用户名下拉框中的用户信息,进入令牌管理页面,选择失效时间,点击"生成令牌"按钮,点击"提交"按钮,则该用户创建token成功。令牌管理等功能,在用户管理模块可以对资源、-files、学习笔记、
如果 SQL 节点的结果为多行,一个或多个字段,prop 的名字需要和字段名称一致。删除等操作。quartz.properties 等文件.
- 2.DS集群的启动&关闭.
DS集群在启动&关闭的时候,会加载该配置文件中的masters,workers,alertServer,apiServers等参数,启动/关闭DS集群.
文件内容如下:
# 注意: 该配置文件中如果包含特殊字符,如: `.\*[]^${}\+?|()@#&`, 请转义,# 示例: `[` 转义为 `\[`# 数据库类型, 目前仅支持 postgresql 或者 mysqldbtype="mysql"# 数据库 地址 & 端口dbhost="192.168.xx.xx:3306"# 数据库 名称dbname="dolphinscheduler"**既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!****由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、Spark节点、手工启动、
- 管理员进入安全中心->告警组管理页面,点击“创建告警组”按钮,创建告警组。定义、logger server都在同一台机器上
如果你是新手,想要体验 DolphinScheduler 的功能,推荐使用Standalone方式体检。
spring.datasource.poolPreparedStatements true 开启PSCache spring.datasource.maxPoolPreparedStatementPerConnectionSize 20 要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。 - 租户编码:租户编码是Linux上的用户,唯一,不能重复
- 管理员进入安全中心->租户管理页面,点击“创建租户”按钮,创建租户。"/dolphinscheduler" is recommended resource.upload.path=/dolphinscheduler # resource storage type : HDFS,S3,NONE resource.storage.type=HDFS # whether kerberos starts hadoop.security.authentication.startup.state=false # java.security.krb5.conf path java.security.krb5.conf.path=/opt/krb5.conf # loginUserFromKeytab user login.user.keytab.username=hdfs-mycluster@ESZ.COM # loginUserFromKeytab path login.user.keytab.path=/opt/hdfs.headless.keytab # if resource.storage.type is HDFS,and your Hadoop Cluster NameNode has HA enabled, you need to put core-site.xml and hdfs-site.xml in the installPath/conf directory. In this example, it is placed under /opt/soft/dolphinscheduler/conf, and configure the namenode cluster name; if the NameNode is not HA, modify it to a specific IP or host name. # if resource.storage.type is S3,write S3 address,HA,for example :s3a://dolphinscheduler, # Note,s3 be sure to create the root directory /dolphinscheduler fs.defaultFS=hdfs://mycluster:8020 #resourcemanager ha note this need ips , this empty if single yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx # If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine yarn.application.status.address=http://xxxx:8088/ws/v1/cluster/apps/%s
文件管理
是对各种资源文件的管理,包括创建基本的txt/log/sh/conf/py/java等文件、C任务在上周的每一天都执行成功,如图示:
假如,周报A同时还需要自身在上周二执行成功:
存储过程节点
- 根据选择的数据源,执行存储过程。数据类型选择为LIST。暂停、告警组管理、
修改相关配置
完成了基础环境的准备后,在运行部署命令前,还需要根据环境修改配置文件。
拖动工具栏中的
任务节点到画板中,如下图所示:
依赖节点提供了逻辑判断功能,比如检查昨天的B流程是否成功,或者C流程是否执行成功。API server,所在服务器的IP均为机器IP或者localhost# 如果是配置hostname的话,需要保证机器间可以通过hostname相互链接# 如下图所示,部署 DolphinScheduler 机器的 hostname 为 ds1,ds2,ds3,ds4,ds5,其中 ds1,ds2 安装 master 服务,ds3,ds4,ds5安装 worker 服务,alert server安装在ds4中,api server 安装在ds5中ips="ds1,ds2,ds3,ds4,ds5"masters="ds1,ds2"workers="ds3:default,ds4:default,ds5:default"alertServer="ds4"apiServers="ds5"
启动 DolphinScheduler && 登录 DolphinScheduler && 启停服务
与伪集群部署保持一致
功能介绍
首页
首页包含用户所有项目的任务状态统计、支持自定义告警插件。流程状态统计、刷新和下载等功能
- Registry
注册中心,使用插件化实现,默认支持Zookeeper, 系统中的MasterServer和WorkerServer节点通过注册中心来进行集群管理和容错。准备暂停、Body、MEDIUM、
- 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
编辑用户信息
- 管理员进入安全中心->用户管理页面,点击"编辑"按钮,编辑用户信息。
拖动工具栏中的
任务节点到画板中,如下图所示:
- 节点名称:一个工作流定义中的节点名称是唯一的
- 运行标志:标识这个节点是否能正常调度
- 描述信息:描述该节点的功能
- 超时告警:勾选超时告警、
- FileAppender主要实现如下:
/\*\* \* task log appender \*/public class TaskLogAppender extends FileAppender<ILoggingEvent> { ... @Override protected void append(ILoggingEvent event) { if (currentlyActiveFile == null){ currentlyActiveFile = getFile(); } String activeFile = currentlyActiveFile; // thread name: taskThreadName-processDefineId\_processInstanceId\_taskInstanceId String threadName = event.getThreadName(); String[] threadNameArr = threadName.split("-"); // logId = processDefineId\_processInstanceId\_taskInstanceId String logId = threadNameArr[1]; ... super.subAppend(event); }}
以/流程定义id/流程实例id/任务实例id.log的形式生成日志
- 过滤匹配以TaskLogInfo开始的线程名称:
- TaskLogFilter实现如下:
/\*\*\* task log filter\*/public class TaskLogFilter extends Filter<ILoggingEvent> { @Override public FilterReply decide(ILoggingEvent event) { if (event.getThreadName().startsWith("TaskLogInfo-")){ return FilterReply.ACCEPT; } return FilterReply.DENY; }}
Dolphin Scheduler 2.0元数据文档
表概览
表名 表信息 t_ds_access_token 访问ds后端的token t_ds_alert 告警信息 t_ds_alertgroup 告警组 t_ds_command 执行命令 t_ds_datasource 数据源 t_ds_error_command 错误命令 t_ds_process_definition 流程定义 t_ds_process_instance 流程实例 t_ds_project 项目 t_ds_queue 队列 t_ds_relation_datasource_user 用户关联数据源 t_ds_relation_process_instance 子流程 t_ds_relation_project_user 用户关联项目 t_ds_relation_resources_user 用户关联资源 t_ds_relation_udfs_user 用户关联UDF函数 t_ds_relation_user_alertgroup 用户关联告警组 t_ds_resources 资源文件 t_ds_schedules 流程定时调度 t_ds_session 用户登录的session t_ds_task_instance 任务实例 t_ds_tenant 租户 t_ds_udfs UDF资源 t_ds_user 用户 t_ds_version ds版本信息 用户 队列 数据源
- 一个租户下可以有多个用户
- t_ds_user中的queue字段存储的是队列表中的queue_name信息,t_ds_tenant下存的是queue_id,在流程定义执行过程中,用户队列优先级最高,用户队列为空则采用租户队列
- t_ds_datasource表中的user_id字段表示创建该数据源的用户,t_ds_relation_datasource_user中的user_id表示,对数据源有权限的用户
项目 资源 告警
- 一个用户可以有多个项目,用户项目授权通过t_ds_relation_project_user表完成project_id和user_id的关系绑定
- t_ds_projcet表中的user_id表示创建该项目的用户,t_ds_relation_project_user表中的user_id表示对项目有权限的用户
- t_ds_resources表中的user_id表示创建该资源的用户,t_ds_relation_resources_user中的user_id表示对资源有权限的用户
- t_ds_udfs表中的user_id表示创建该UDF的用户,t_ds_relation_udfs_user表中的user_id表示对UDF有权限的用户
命令 流程 任务
- 一个项目有多个流程定义,一个流程定义可以生成多个流程实例,一个流程实例可以生成多个任务实例
- t_ds_schedulers表存放流程定义的定时调度信息
- t_ds_relation_process_instance表存放的数据用于处理流程定义中含有子流程的情况,parent_process_instance_id表示含有子流程的主流程实例id,process_instance_id表示子流程实例的id,parent_task_instance_id表示子流程节点的任务实例id,流程实例表和任务实例表分别对应t_ds_process_instance表和t_ds_task_instance表
核心表Schema
t_ds_process_definition
字段 类型 注释 id int 主键 name varchar 流程定义名称 version int 流程定义版本 release_state tinyint 流程定义的发布状态:0 未上线 1已上线 project_id int 项目id user_id int 流程定义所属用户id process_definition_json longtext 流程定义json串 description text 流程定义描述 global_params text 全局参数 flag tinyint 流程是否可用:0 不可用,1 可用 locations text 节点坐标信息 connects text 节点连线信息 receivers text 收件人 receivers_cc text 抄送人 create_time datetime 创建时间 timeout int 超时时间 tenant_id int 租户id update_time datetime 更新时间 modify_by varchar 修改用户 resource_ids varchar 资源id集 t_ds_process_instance
字段 类型 注释 id int 主键 name varchar 流程实例名称 process_definition_id int 流程定义id state tinyint 流程实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成 recovery tinyint 流程实例容错标识:0 正常,1 需要被容错重启 start_time datetime 流程实例开始时间 end_time datetime 流程实例结束时间 run_times int 流程实例运行次数 host varchar 流程实例所在的机器 command_type tinyint 命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程 command_param text 命令的参数(json格式) task_depend_type tinyint 节点依赖类型:0 当前节点,1 向前执行,2 向后执行 max_try_times tinyint 最大重试次数 failure_strategy tinyint 失败策略 0 失败后结束,1 失败后继续 warning_type tinyint 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 warning_group_id int 告警组id schedule_time datetime 预期运行时间 command_start_time datetime 开始命令时间 global_params text 全局参数(固化流程定义的参数) process_instance_json longtext 流程实例json(copy的流程定义的json) flag tinyint 是否可用,1 可用,0不可用 update_time timestamp 更新时间 is_sub_process int 是否是子工作流 1 是,0 不是 executor_id int 命令执行用户 locations text 节点坐标信息 connects text 节点连线信息 history_cmd text 历史命令,记录所有对流程实例的操作 dependence_schedule_times text 依赖节点的预估时间 process_instance_priority int 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest worker_group varchar 任务指定运行的worker分组 timeout int 超时时间 tenant_id int 租户id t_ds_task_instance
字段 类型 注释 id int 主键 name varchar 任务名称 task_type varchar 任务类型 process_definition_id int 流程定义id process_instance_id int 流程实例id task_json longtext 任务节点json state tinyint 任务实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成 submit_time datetime 任务提交时间 start_time datetime 任务开始时间 end_time datetime 任务结束时间 host varchar 执行任务的机器 execute_path varchar 任务执行路径 log_path varchar 任务日志路径 alert_flag tinyint 是否告警 retry_times int 重试次数 pid int 进程pid app_link varchar yarn app id flag tinyint 是否可用:0 不可用,1 可用 retry_interval int 重试间隔 max_retry_times int 最大重试次数 task_instance_priority int 任务实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest worker_group varchar 任务指定运行的worker分组 t_ds_schedules
字段 类型 注释 id int 主键 process_definition_id int 流程定义id start_time datetime 调度开始时间 end_time datetime 调度结束时间 crontab varchar crontab 表达式 failure_strategy tinyint 失败策略: 0 结束,1 继续 user_id int 用户id release_state tinyint 状态:0 未上线,1 上线 warning_type tinyint 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 warning_group_id int 告警组id process_instance_priority int 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest worker_group varchar 任务指定运行的worker分组 create_time datetime 创建时间 update_time datetime 更新时间 t_ds_command
字段 类型 注释 id int 主键 command_type tinyint 命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程 process_definition_id int 流程定义id command_param text 命令的参数(json格式) task_depend_type tinyint 节点依赖类型:0 当前节点,1 向前执行,2 向后执行 failure_strategy tinyint 失败策略:0结束,1继续 warning_type tinyint 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 warning_group_id int 告警组 schedule_time datetime 预期运行时间 start_time datetime 开始时间 executor_id int 执行用户id dependence varchar 依赖字段 update_time datetime 更新时间 process_instance_priority int 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest worker_group varchar 任务指定运行的worker分组 配置文件
前言
本文档为dolphinscheduler配置文件说明文档,针对版本为 dolphinscheduler-1.3.x 版本.
目录结构
目前dolphinscheduler 所有的配置文件都在 [conf ] 目录中. 为了更直观的了解[conf]目录所在的位置以及包含的配置文件,请查看下面dolphinscheduler安装目录的简化说明. 本文主要讲述dolphinscheduler的配置文件.其他部分先不做赘述.
[注:以下 dolphinscheduler 简称为DS.]
├─bin DS命令存放目录│ ├─dolphinscheduler-daemon.sh 启动/关闭DS服务脚本│ ├─start-all.sh 根据配置文件启动所有DS服务│ ├─stop-all.sh 根据配置文件关闭所有DS服务├─conf 配置文件目录│ ├─application-api.properties api服务配置文件│ ├─datasource.properties 数据库配置文件│ ├─zookeeper.properties zookeeper配置文件│ ├─master.properties master服务配置文件│ ├─worker.properties worker服务配置文件│ ├─quartz.properties quartz服务配置文件│ ├─common.properties 公共服务[存储]配置文件│ ├─alert.properties alert服务配置文件│ ├─config 环境变量配置文件夹│ ├─install_config.conf DS环境变量配置脚本[用于DS安装/启动]│ ├─env 运行脚本环境变量配置目录│ ├─dolphinscheduler_env.sh 运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]│ ├─org mybatis mapper文件目录│ ├─i18n i18n配置文件目录│ ├─logback-api.xml api服务日志配置文件│ ├─logback-master.xml master服务日志配置文件│ ├─logback-worker.xml worker服务日志配置文件│ ├─logback-alert.xml alert服务日志配置文件├─sql DS的元数据创建升级sql文件│ ├─create 创建SQL脚本目录│ ├─upgrade 升级SQL脚本目录│ ├─dolphinscheduler_postgre.sql postgre数据库初始化脚本│ ├─dolphinscheduler_mysql.sql mysql数据库初始化脚本│ ├─soft_version 当前DS版本标识文件├─script DS服务部署,数据库创建/升级脚本目录│ ├─create-dolphinscheduler.sh DS数据库初始化脚本 │ ├─upgrade-dolphinscheduler.sh DS数据库升级脚本 │ ├─monitor-server.sh DS服务监控启动脚本 │ ├─scp-hosts.sh 安装文件传输脚本 │ ├─remove-zk-node.sh 清理zookeeper缓存文件脚本 ├─ui 前端WEB资源目录├─lib DS依赖的jar存放目录├─install.sh 自动安装DS服务脚本
配置文件详解
序号 服务分类 配置文件 1 启动/关闭DS服务脚本 dolphinscheduler-daemon.sh 2 数据库连接配置 datasource.properties 3 zookeeper连接配置 zookeeper.properties 4 公共[存储]配置 common.properties 5 API服务配置 application-api.properties 6 Master服务配置 master.properties 7 Worker服务配置 worker.properties 8 Alert 服务配置 alert.properties 9 Quartz配置 quartz.properties 10 DS环境变量配置脚本[用于DS安装/启动] install_config.conf 11 运行脚本加载环境变量配置文件 [如: JAVA_HOME,HADOOP_HOME, HIVE_HOME …] dolphinscheduler_env.sh 12 各服务日志配置文件 api服务日志配置文件 : logback-api.xml master服务日志配置文件 : logback-master.xml worker服务日志配置文件 : logback-worker.xml alert服务日志配置文件 : logback-alert.xml 1.dolphinscheduler-daemon.sh [启动/关闭DS服务脚本]
dolphinscheduler-daemon.sh脚本负责DS的启动&关闭. start-all.sh/stop-all.sh最终也是通过dolphinscheduler-daemon.sh对集群进行启动/关闭操作. 目前DS只是做了一个基本的设置,JVM参数请根据各自资源的实际情况自行设置.
默认简化参数如下:
export DOLPHINSCHEDULER\_OPTS="-server -Xmx16g -Xms1g -Xss512k -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
不建议设置"-XX:DisableExplicitGC" , DS使用Netty进行通讯,设置该参数,可能会导致内存泄漏.
2.datasource.properties [数据库连接]
在DS中使用Druid对数据库连接进行管理,默认简化配置如下.
参数 默认值 描述 spring.datasource.driver-class-name 数据库驱动 spring.datasource.url 数据库连接地址 spring.datasource.username 数据库用户名 spring.datasource.password 数据库密码 spring.datasource.initialSize 5 初始连接池数量 spring.datasource.minIdle 5 最小连接池数量 spring.datasource.maxActive 5 最大连接池数量 spring.datasource.maxWait 60000 最大等待时长 spring.datasource.timeBetweenEvictionRunsMillis 60000 连接检测周期 spring.datasource.timeBetweenConnectErrorMillis 60000 重试间隔 spring.datasource.minEvictableIdleTimeMillis 300000 连接保持空闲而不被驱逐的最小时间 spring.datasource.validationQuery SELECT 1 检测连接是否有效的sql spring.datasource.validationQueryTimeout 3 检测连接是否有效的超时时间[seconds] spring.datasource.testWhileIdle true 申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。另外该节点设置了 “key” 和 “key1” 两个变量,这里用户用定义了一个与上游节点传递的变量名相同的变量 key1,并且复制了值为 “12”,但是由于我们设置的优先级的关系,这里的值 “12” 会被抛弃,最终使用上游节点设置的变量值。学习笔记、在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,并且集群在发生故障的时候,集群的节点会自发的举行"会议"来选举新的"管理者"去主持工作。而节点【use_create】中使用了最先结束的【createParam1】的值。 - Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6pUPPoeR-1683205971021)(null)]
参数优先级
DolphinScheduler 中所涉及的参数值的定义可能来自三种类型:
- 全局参数:在工作流保存页面定义时定义的变量
- 上游任务传递的参数:上游任务传递过来的参数
- 本地参数:节点的自有变量,用户在“自定义参数”定义的变量,并且用户可以在工作流定义时定义该部分变量的值
因为参数的值存在多个来源,当参数名相同时,就需要会存在参数优先级的问题。暂停、DolphinScheduler 参数的优先级从高到低为:
全局参数 > 上游任务传递的参数 > 本地参数
在上游任务传递的参数的情况下,由于上游可能存在多个任务向下游传递参数。datasource.properties、每个成员都有唯一的 userid,即所谓 “帐号”。Logback和netty实现日志访问
- 由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。Headers。
- 校验条件:支持默认响应码、如果linux没有这个用户,则会导致任务运行失败。–files、去中心化vs中心化
中心化思想
中心化的设计理念比较简单,分布式集群中的节点按照角色分工,大体上分为两种角色:
- Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。容错设计
容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况
1. 宕机容错
服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:
其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。内容不包含。同样,您可以在【工作流实例】页面,找到对应的节点实例,便可以查看该变量的值。最典型的案例就是ZooKeeper及Go语言实现的Etcd。停止、如果你是在生产中使用,推荐使用集群部署或者kubernetes
前置准备工作
伪分布式部署 DolphinScheduler 需要有外部软件的支持
- JDK:下载JDK (1.8+),并将 JAVA_HOME 配置到以及 PATH 变量中。–conf格式
- 资源:如果其他参数中引用了资源文件,需要在资源中选择指定
- 自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容
注意:JAVA和Scala只是用来标识,没有区别,如果是Python开发的Spark则没有主函数的class,其他都是一样
MapReduce(MR)节点
- 使用MR节点,可以直接执行MR程序。TIMESTAMP、 目前只支持HIVE的临时UDF函数
- UDF函数名称:输入UDF函数时的名称
- 包名类名:输入UDF函数的全路径
- UDF资源:设置创建的UDF对应的资源文件
监控中心
服务管理
- 服务管理主要是对系统中的各个服务的健康状况和基本信息的监控和显示
master监控
- 主要是master的相关信息。
如果 SQL 节点的结果只有一行,一个或多个字段,prop 的名字需要和字段名称一致。local三种模式
- slot数量:可以设置slot数
- taskManage数量:可以设置taskManage数
- jobManager内存数:可以设置jobManager内存数
- taskManager内存数:可以设置taskManager内存数
- 命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。需要容错、下面以 MySQL 为例,说明如何初始化数据库
mysql -uroot -pmysql> CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;# 修改 {user} 和 {password} 为你希望的用户名和密码mysql> GRANT ALL PRIVILEGES ON dolphinscheduler.* TO '{user}'@'%' IDENTIFIED BY '{password}';mysql> GRANT ALL PRIVILEGES ON dolphinscheduler.* TO '{user}'@'localhost' IDENTIFIED BY '{password}';mysql> flush privileges;
运行对应数据库的最新定义文件,位置在
dolphinscheduler/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_*.sql
。如下图  - 任务的优先级也分为5级,依次为HIGHEST、修改、sql、DolphinScheduler ENV、发布、大纲路线、当你需要使用 Python 虚拟环境时,可以通过创建不同的环境名称来实现。讲解视频,并且后续会持续更新****[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/topics/618545628)**ixin.qq.com/cgi-bin/gettoken? corpid=corpId&corpsecret=secret | || enterprise.wechat.push.url | https://qyapi.weixin.qq.com/cgi-bin/message/send? access\_token=$token | || enterprise.wechat.user.send.msg | | 发送消息格式 || enterprise.wechat.team.send.msg | | 群发消息格式 || plugin.dir | /Users/xx/your/path/to/plugin/dir | 插件目录 |##### 9.quartz.properties [Quartz配置]这里面主要是quartz配置,请结合实际业务场景&资源进行配置,本文暂时不做展开.| 参数 | 默认值 | 描述 || --- | --- | --- || org.quartz.jobStore.driverDelegateClass | org.quartz.impl.jdbcjobstore.StdJDBCDelegate | || org.quartz.jobStore.driverDelegateClass | org.quartz.impl.jdbcjobstore.PostgreSQLDelegate | || org.quartz.scheduler.instanceName | DolphinScheduler | || org.quartz.scheduler.instanceId | AUTO | || org.quartz.scheduler.makeSchedulerThreadDaemon | true | || org.quartz.jobStore.useProperties | false | || org.quartz.threadPool.class | org.quartz.simpl.SimpleThreadPool | || org.quartz.threadPool.makeThreadsDaemons | true | || org.quartz.threadPool.threadCount | 25 | || org.quartz.threadPool.threadPriority | 5 | || org.quartz.jobStore.class | org.quartz.impl.jdbcjobstore.JobStoreTX | || org.quartz.jobStore.tablePrefix | QRTZ\_ | || org.quartz.jobStore.isClustered | true | || org.quartz.jobStore.misfireThreshold | 60000 | || org.quartz.jobStore.clusterCheckinInterval | 5000 | || org.quartz.jobStore.acquireTriggersWithinLock | true | || org.quartz.jobStore.dataSource | myDs | || org.quartz.dataSource.myDs.connectionProvider.class | org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider | |##### 10.install\_config.conf [DS环境变量配置脚本[用于DS安装/启动]]install\_config.conf这个配置文件比较繁琐,这个文件主要有两个地方会用到.* 1.DS集群的自动安装.> > 调用install.sh脚本会自动加载该文件中的配置.并根据该文件中的内容自动配置上述的配置文件中的内容. 比如:dolphinscheduler-daemon.sh、比如A流程依赖昨天的B流程执行成功,依赖节点会去检查B流程在昨天是否有执行成功的实例。
中心化思想设计存在的问题:
- 一旦Master出现了问题,则群龙无首,整个集群就会崩溃。LOW、
- 管理员进入安全中心->队列管理页面,点击“创建队列”按钮,创建队列。OUT两种,数据类型支持VARCHAR、master.properties、
- 管理员进入安全中心->用户管理页面,点击需授权用户的“授权”按钮,如下图所示:
- 选择项目,进行项目授权。超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
- 子节点:是选择子流程的工作流定义,右上角进入该子节点可以跳转到所选子流程的工作流定义
依赖节点
- 依赖节点,就是依赖检查节点。工作流定义统计。默认的用户名和密码是 admin/dolphinscheduler123
启停服务
# 一键停止集群所有服务sh ./bin/stop-all.sh# 一键开启集群所有服务sh ./bin/start-all.sh# 启停 Mastersh ./bin/dolphinscheduler-daemon.sh stop master-serversh ./bin/dolphinscheduler-daemon.sh start master-server# 启停 Workersh ./bin/dolphinscheduler-daemon.sh start worker-serversh ./bin/dolphinscheduler-daemon.sh stop worker-server# 启停 Apish ./bin/dolphinscheduler-daemon.sh start api-serversh ./bin/dolphinscheduler-daemon.sh stop api-server# 启停 Loggersh ./bin/dolphinscheduler-daemon.sh start logger-serversh ./bin/dolphinscheduler-daemon.sh stop logger-server# 启停 Alertsh ./bin/dolphinscheduler-daemon.sh start alert-serversh ./bin/dolphinscheduler-daemon.sh stop alert-server
集群部署(Cluster)
集群部署目的是在多台机器部署 DolphinScheduler 服务,用于运行大量任务情况。
- 自定义参数:是http局部的用户自定义参数,会替换脚本中以${变量}的内容。恢复、+ **RetryReportTaskStatusThread**主要通过netty向master汇报任务状态,如果汇报失败,会一直重试汇报+ **LoggerServer**是一个日志服务,提供日志分片查看、准备暂停、
- 超时告警:勾选超时告警、LOWEST。以创建 dolphinscheduler 用户为例
# 创建用户需使用 root 登录useradd dolphinscheduler# 添加密码echo "dolphinscheduler" | passwd --stdin dolphinscheduler# 配置 sudo 免密sed -i '$adolphinscheduler ALL=(ALL) NOPASSWD: NOPASSWD: ALL' /etc/sudoerssed -i 's/Defaults requirett/#Defaults requirett/g' /etc/sudoers# 修改目录权限,使得部署用户对 dolphinscheduler-bin 目录有操作权限chown -R dolphinscheduler:dolphinscheduler dolphinscheduler-bin
*注意:*
- 因为任务执行服务是以
sudo -u {linux-user}
切换不同 linux 用户的方式来实现多租户运行作业,所以部署用户需要有 sudo 权限,而且是免密的。源码讲义、 - 普通用户登录后,点击用户名下拉框中的用户信息,进入用户信息页面,点击"编辑"按钮,编辑用户信息。alert.properties、
- 请求参数:支持Parameter、数据源、失败、而节点 【useParam】与节点【noUseParam】中并没有依赖关系,所以并不会获取到节点【noUseParam】的变量。附件或表格附件三种模板。下线、POSt、common.properties、
创建普通用户
- 用户分为管理员用户和普通用户
- 管理员有授权和用户管理等权限,没有创建项目和工作流定义的操作的权限。
- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。流程失败恢复、
DATAX节点
- 拖动工具栏中的
任务节点到画板中
- 自定义模板:打开自定义模板开关时,可以自定义datax节点的json配置文件内容(适用于控件配置不满足需求时)
- 数据源:选择抽取数据的数据源
- sql语句:目标库抽取数据的sql语句,节点执行时自动解析sql查询列名,映射为目标表同步列名,源表和目标表列名不一致时,可以通过列别名(as)转换
- 目标库:选择数据同步的目标库
- 目标表:数据同步的目标表名
- 前置sql:前置sql在sql语句之前执行(目标库执行)。
- 调用示例:
/\*\* \* test token \*/ public void doPOSTParam()throws Exception{ // create HttpClient CloseableHttpClient httpclient = HttpClients.createDefault(); // create http post request HttpPost httpPost = new HttpPost("http://127.0.0.1:12345/escheduler/projects/create"); httpPost.setHeader("token", "123"); // set parameters List<NameValuePair> parameters = new ArrayList<NameValuePair>(); parameters.add(new BasicNameValuePair("projectName", "qzw")); parameters.add(new BasicNameValuePair("desc", "qzw")); UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters); httpPost.setEntity(formEntity); CloseableHttpResponse response = null; try { // execute response = httpclient.execute(httpPost); // response status code 200 if (response.getStatusLine().getStatusCode() == 200) { String content = EntityUtils.toString(response.getEntity(), "UTF-8"); System.out.println(content); } } finally { if (response != null) { response.close(); } httpclient.close(); } }
授予权限
- 授予权限包括项目权限,资源权限,数据源权限,UDF函数权限。内容不包含时,需填写校验内容。获取到 SQL 查询结果后会将对应列转化为 LIST,并将该结果转化为 JSON 后作为对应变量的值。–files、依赖节点等。源码讲义、s3、worker、失败、流程失败重跑的概念:
- 任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次
- 流程失败恢复是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行或从当前节点开始执行
- 流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行
接下来说正题,我们将工作流中的任务节点分了两种类型。
如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作
四、POSTGRESQL、LONG、-archives格式
- 资源: 如果其他参数中引用了资源文件,需要在资源中选择指定
- 自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容
Python程序
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p1oexZeW-1683205971444)(null)]
- 程序类型:选择Python语言
- 主jar包:是运行MR的Python jar包
- 其他参数:支持 –D、
- 注意:对于用户自己创建的项目,该用户拥有所有的权限。
- 其他参数:支持 --jars、
注意:由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。反而动态中心化分布式系统正在不断涌出。修改步骤为"安全中心 -> worker分组管理 -> 点击 ‘新建worker分组’ -> 输入’组名称’ -> 选择已有worker -> 点击’提交’"
环境管理
- 在线配置Worker运行环境,一个Worker可以指定多个环境,每个环境等价于dolphinscheduler_env.sh文件.
- 默认环境为dolphinscheduler_env.sh文件.
- 在任务执行时,可以将任务分配给指定worker分组,根据worker分组选择对应的环境,最终由该组中的worker节点执行环境后执行该任务.
创建/更新 环境
- 环境配置等价于dolphinscheduler_env.sh文件内配置
使用 环境
- 在工作流定义中创建任务节点选择Worker分组和Worker分组对应的环境,任务执行时Worker会先执行环境在执行任务.
API 调用
背景
一般都是通过页面来创建项目、
- 校验内容:当校验条件选择自定义响应码、worker分组管理、
去中心化
- 在去中心化设计里,通常没有Master/Slave的概念,所有的角色都是一样的,地位是平等的,全球互联网就是一个典型的去中心化的分布式系统,联网的任意节点设备down机,都只会影响很小范围的功能。Database、
所有任务都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。
SQL
prop 为用户指定;方向选择为 OUT,只有当方向为 OUT 时才会被定义为变量输出;数据类型可以根据需要选择不同数据结构;value 部分不需要填写。conf、
我们再以上图中包含 SQL 节点的流程举例说明:
上图中节点【createParam1】的定义如下:
节点【createParam2】的定义如下:
您可以在【工作流实例】页面,找到对应的节点实例,便可以查看该变量的值。INTEGER、准备停止、暂停、zookeeper.properties、
使用方式
本地参数配置方式如下:在任务定义页面,点击“自定义参数”右边的加号,填写对应的变量名称和对应的值,保存即可
如果想要在本地参数中调用系统内置参数,将内置参数对应的值填到
value
中,如上图中的${biz_date}
以及${curdate}
参数的引用
DolphinScheduler 提供参数间相互引用的能力,包括:本地参数引用全局参数、流程状态统计、
- 管理员有授权和用户管理等权限,没有创建项目和工作流定义的操作的权限。
- 资源:是指脚本中需要调用的资源文件列表
- 自定义参数:是Python局部的用户自定义参数,会替换脚本中以${变量}的内容
- 注意:若引入资源目录树下的python文件,需添加
__init__.py
文件
Flink节点
- 拖动工具栏中的
任务节点到画板中,如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D8Zut6Pc-1683205976158)(null)]
- 程序类型:支持JAVA、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
- 脚本:用户开发的SHELL程序。成功、上传jar包等各种类型文件,可进行编辑、大纲路线、
- 超时告警:勾选超时告警、数据源和UDF函数授权方式都是一样的,所以以项目授权为例介绍。
2:我们再以 sql 节点来解释另外一种情况
节点【use_create】的定义如下:
“status” 是当前节点设置的节点的自有变量。mapreduce等程序,需要用到“队列”参数时使用的。停止、数据类型可选择为除 LIST 以外的其他类型。-mapper、
SPARK节点
- 通过SPARK节点,可以直接直接执行SPARK程序,对于spark节点,worker会使用
spark-submit
方式提交任务
拖动工具栏中的
任务节点到画板中,如下图所示:
- 程序类型:支持JAVA、
- 任务优先级:worker线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。下载、SPARK、内容包含、
- Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。
Shell
prop 为用户指定;方向选择为 OUT,只有当方向为 OUT 时才会被定义为变量输出;数据类型可以根据需要选择不同数据结构;value 部分不需要填写。
如果你是新手,想要体验 DolphinScheduler 的功能,推荐使用Standalone方式体检。FLOAT、
- 管理员可以对普通用户进行非其创建的项目、yarn-client和local三种模式
- Driver内核数:可以设置Driver内核数及内存数
- Executor数量:可以设置Executor数量、
上游任务传递给下游任务
DolphinScheduler 允许在任务间进行参数传递,目前传递方向仅支持上游单向传递给下游。该服务统一提供RESTful api向外部提供请求服务。
项目管理
创建项目
点击"项目管理"进入项目管理页面,点击“创建项目”按钮,输入项目名称,项目描述,点击“提交”,创建新的项目。讲解视频,并且后续会持续更新
需要这份系统化资料的朋友,可以戳这里获取
登录 DolphinScheduler
浏览器访问地址 http://localhost:12345/dolphinscheduler 即可登录系统UI。
- 上传udf资源
和上传文件相同。则项目列表和已选项目列表中不会显示。
创建用户
创建 Token
- 登录调度系统,点击 “安全中心”,再点击左侧的 “令牌管理”,点击 “令牌管理” 创建令牌
- 选择 “失效时间” (Token有效期),选择 “用户” (以指定的用户执行接口操作),点击 “生成令牌” ,拷贝 Token 字符串,然后点击 “提交”
使用 Token
- 打开 API文档页面
地址:http://{api server ip}:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn
- 选一个测试的接口,本次测试选取的接口是:查询所有项目
projects/query-project-list
- 打开 Postman,填写接口地址,并在 Headers 中填写 Token,发送请求后即可查看结果
token: 刚刚生成的 Token
用户授权
用户登录
http://192.168.1.163:12345/dolphinscheduler/ui/#/monitor/servers/master
资源上传
创建工作流
查看执行结果
查看日志结果
(二)高级指南
系统架构设计
本章节介绍Apache DolphinScheduler调度系统架构
1.系统架构
1.1 系统架构图
系统架构图
1.2 启动流程活动图
启动流程活动图
1.3 架构说明
- MasterServer
MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、TIME、
- 因为任务执行服务是以
- 另外一个问题是如果Scheduler在Master上,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。
函数管理
- 创建udf函数
点击“创建UDF函数”,输入udf函数参数,选择udf资源,点击“提交”,创建udf函数。Master执行流程
- DolphinScheduler使用分片算法将command取模,根据master的排序id分配,master将拿到的command转换成工作流实例,使用线程池处理工作流实例
- DolphinScheduler对工作流的处理流程:
- 通过UI或者API调用,启动工作流,持久化一条command到数据库中
- Master通过分片算法,扫描Command表,生成工作流实例ProcessInstance,同时删除Command数据
- Master使用线程池运行WorkflowExecuteThread,执行工作流实例的流程,包括构建DAG,创建任务实例TaskInstance,将TaskInstance通过netty发送给worker
- Worker收到任务以后,修改任务状态,并将执行信息返回Master
- Master收到任务信息,持久化到数据库,并且将状态变化事件存入EventExecuteService事件队列
- EventExecuteService根据事件队列调用WorkflowExecuteThread进行后续任务的提交和工作流状态的修改
三、
- 还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。非查询是没有结果集返回的,是针对update、
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、
- 实际上,真正去中心化的分布式系统并不多见。
- Master容错流程图:
ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。
- Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。容错设计
spring.datasource.testOnBorrow true 申请连接时执行validationQuery检测连接是否有效 spring.datasource.testOnReturn false 归还连接时执行validationQuery检测连接是否有效 spring.datasource.defaultAutoCommit true 是否开启自动提交 spring.datasource.keepAlive true 连接池中的minIdle数量以内的连接,空闲时间超过minEvictableIdleTimeMillis,则会执行keepAlive操作。 - 运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。java、在管理后台 -> “应用与小程序” -> “应用”,点进某个应用,即可看到 agentidenterprise.wechat.agent.id="xxxx"# 设置 userid,多个用逗号分隔。UDF函数授权同项目授权。项目等授权
- 管理员登录,默认用户名密码:admin/dolphinscheduler123
创建队列
- 队列是在执行spark、区别在于SQL任务类型自定义参数会替换sql语句中${变量}。CLICKHOUSE、HEAD、配置文件在路径在
conf/config/install_config.conf
下,此处我们仅需修改INSTALL MACHINE,DolphinScheduler ENV、等待线程的个数 - 流程状态统计:在指定时间范围内,统计工作流实例中状态为提交成功、正在运行、讲解视频,并且后续会持续更新
需要这份系统化资料的朋友,可以戳这里获取
停止、3.创建完租户会在 hdfs 对应的目录上有相关的文件夹。
worker监控
- 主要是worker的相关信息。
+ 具体实现是根据任务实例的json解析优先级,然后把**流程实例优先级\_流程实例id\_任务优先级\_任务id**信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务 - 其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST、Executor内存数和Executor内核数
- 命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。任务提交、common.properties、quartz.properties 等文件.> > > * 2.DS集群的启动&关闭.> > DS集群在启动&关闭的时候,会加载该配置文件中的masters,workers,alertServer,apiServers等参数,启动/关闭DS集群.> > > 文件内容如下:
注意: 该配置文件中如果包含特殊字符,如:
.\*[]^${}\+?|()@#&
, 请转义,示例:
[
转义为\[
数据库类型, 目前仅支持 postgresql 或者 mysql
dbtype=“mysql”
数据库 地址 & 端口
dbhost=“192.168.xx.xx:3306”
数据库 名称
dbname=“dolphinscheduler”
[外链图片转存中…(img-knKWkeSX-1714861567395)]
[外链图片转存中…(img-ecbq5gMY-1714861567395)]
[外链图片转存中…(img-IZOsaAjG-1714861567395)]既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、
上一篇:单机卡牌游戏下载量突破百万