ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

2021SC@SDUSC(dolphinscheduler- common)

2021-11-16 21:02:21  阅读:258  来源: 互联网

标签:task 2021SC 实例 dolphinscheduler processInstance dag processDag SDUSC DAG


executeProcess按顺序调用了prepareProcess、runProcess、endProcess三个方法,简单来说就是初始化、执行、释放资源。 prepareProcess又按顺序调用了initTaskQueue、buildFlowDag。

initTaskQueue就是一些资源的初始化操作,比如通过流程定义ID查询到当前的任务实例。下面是其核心逻辑,可以发现,就是查询了完成的任务列表,报错且不能重试的任务列表。

List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance task : taskInstanceList){
    if(task.isTaskComplete()){
        completeTaskList.put(task.getName(), task);
    }
    if(task.getState().typeIsFailure() && !task.taskCanRetry()){
        errorTaskList.put(task.getName(), task);
    }
}

buildFlowDag看名字应该是生成DAG实例的,代码虽短,但调用了好几个函数,我们只重点分析最后一个函数调用。

private void buildFlowDag() throws Exception {
    recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());

    forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
    // generate process to get DAG info
    List<String> recoveryNameList = getRecoveryNodeNameList();
    List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
    ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(),
            startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
    if(processDag == null){
        logger.error("processDag is null");
        return;
    }
    // generate process dag
    dag = DagHelper.buildDagGraph(processDag);
}

DagHelper.buildDagGraph生成了一个DAG对象实例,根据名字和注释猜测,这应该是对有向无环图的一个抽象。

/**
 * the object of DAG
 */
private DAG<String,TaskNode,TaskNodeRelation> dag;

来看下DAG类的定义

/**
 * analysis of DAG
 * Node: node
 * NodeInfo:node description information
 * EdgeInfo: edge description information
 */
public class DAG<Node, NodeInfo, EdgeInfo>

DAG有三个类型参数,分别代表节点key、节点信息、边信息。

下面是TaskNode的字段

TaskNode

发现TaskNode的字段跟UI一一对应

TaskNodeRelation

 

TaskNodeRelation代表边的信息,字段比较少,只有startNode、endNode两个String类型的字段。这其实是DAG类的第一个类型参数,节点的key。

public static DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {

    DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();

    /**
     * add vertex
     */
    if (CollectionUtils.isNotEmpty(processDag.getNodes())){
        for (TaskNode node : processDag.getNodes()){
            dag.addNode(node.getName(),node);
        }
    }

    /**
     * add edge
     */
    if (CollectionUtils.isNotEmpty(processDag.getEdges())){
        for (TaskNodeRelation edge : processDag.getEdges()){
            dag.addEdge(edge.getStartNode(),edge.getEndNode());
        }
    }
    return dag;
}

 

上面是buildDagGraph的源码。可以看出,增加节点时,第一个参数是TaskNode的getName。跟猜测的一样,DAG的第一个参数就是node的key,而key就是名称。

细心的读者一定发现,DAG对象是根据ProcessDag来创建的

ProcessDag

 

DAG是把节点、边的一个List转化成了一个Graph。

初始化完成之后,来看一下具体如何执行流程定义的。

runProcess

 

这个方法源码很长,我们首先从整体简要分析。

  1. submitPostNode(null)
  2. 起一个while循环,直至流程定义实例停止(成功、失败、取消、暂停、等待)
  3. 首先判断是否超时,超时则发送预警邮件
  4. 获取当前活动的任务节点的Map。key是MasterBaseTaskExecThread对象,value是Future<Boolean>。value其实是MasterBaseTaskExecThread线程的当前状态。
  5. 如果当前任务实例已经结束,则从Map中移除
  6. 如果当前任务实例成功,则put到completeTaskList且调用submitPostNode(task.getName())
  7. 如果当前任务实例失败,则重试;否则直接结束(比如手动停止或暂停)
  8. 更新当前流程定义实例的状态,进入下一个循环

标签:task,2021SC,实例,dolphinscheduler,processInstance,dag,processDag,SDUSC,DAG
来源: https://blog.csdn.net/Santan1412/article/details/121317599

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有