ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

2021SC@SDUSC(dolphinscheduler- common2)

2021-11-20 17:00:22  阅读:179  来源: 互联网

标签:task MasterBaseTaskExecThread taskInstance dolphinscheduler common2 processInsta


activeTaskNode是一个非常重要的对象,从上一篇文章的分析中,可以猜测,activeTaskNode是由submitPostNode间接生成赋值的,并通过while循环驱动了整个流程实例的执行。

private void submitPostNode(String parentNodeName){

    List<TaskInstance> submitTaskList = null;
    if(parentNodeName == null){
        submitTaskList = getStartSubmitTaskList();
    }else{
        submitTaskList = getPostTaskInstanceByNode(dag, parentNodeName);
    }
    // if previous node success , post node submit
    for(TaskInstance task : submitTaskList){
        if(readyToSubmitTaskList.containsKey(task.getName())){
            continue;
        }

        if(completeTaskList.containsKey(task.getName())){
            logger.info("task {} has already run success", task.getName());
            continue;
        }
        if(task.getState().typeIsPause() || task.getState().typeIsCancel()){
            logger.info("task {} stopped, the state is {}", task.getName(), task.getState().toString());
        }else{
            addTaskToStandByList(task);
        }
    }
}

submitPostNode的源码细节不再深入分析,大概就是从dag对象中找出入度为0的节点,放入到准备队列中。其实在runProcess方法中,还调用了submitStandByTask方法,该方法最终调起了可以执行的节点。从这点来看,整个流程实例由submitPostNode、submitStandByTask和while驱动。

那么问题来了,流程实例的任务具体是怎么调起来的呢?下面是submitStandByTask方法中调用的最重要的函数,也是由它调起来的。

/**
 * submit task to execute
 * @param taskInstance task instance
 * @return TaskInstance
 */
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
    MasterBaseTaskExecThread abstractExecThread = null;
    if(taskInstance.isSubProcess()){
        abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance);
    }else {
        abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance);
    }
    Future<Boolean> future = taskExecService.submit(abstractExecThread);
    activeTaskNode.putIfAbsent(abstractExecThread, future);
    return abstractExecThread.getTaskInstance();
}

逻辑也比较简单,就是把TaskInstance交给MasterTaskExecThread去执行;taskExecService提交之后,放到activeTaskNode列表,交由主逻辑判断任务是否完成。

MasterTaskExecThread

根据其定义,我们知道MasterTaskExecThread继承了MasterBaseTaskExecThread,且构造函数简单的调用了父类的构造函数。

public class MasterTaskExecThread extends MasterBaseTaskExecThread

MasterBaseTaskExecThread的构造函数也比较简单,给几个关键的字段赋初始值。

/**
 * constructor of MasterBaseTaskExecThread
 * @param taskInstance      task instance
 * @param processInstance   process instance
 */
public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
    this.processDao = BeanContext.getBean(ProcessDao.class);
    this.alertDao = BeanContext.getBean(AlertDao.class);
    this.processInstance = processInstance;
    this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
    this.cancel = false;
    this.taskInstance = taskInstance;
}

但processDao、alertDao居然是通过BeanContext.getBean获取到的!!!个人感觉这是一个非常恶心的设计。一个优秀的设计,应该是类的创建者负责子类的参数及其功能的边界。BeanContext.getBean扩展了所有类与SpringBoot的ApplicationContext间接打交道的能力,而且无法控制,因为只要调用BeanContext.getBean都可以获取到对应的bean进行操作。

MasterBaseTaskExecThread实现了Callable<Boolean>接口,call方法又调用了submitWaitComplete,MasterTaskExecThread类中对改方法进行了覆盖。

submitWaitComplete根据名称及其注释说明可以知道,它提交了一个任务实例,然后等待其完成。

/**
 * submit task instance and wait complete
 * @return true is task quit is true
 */
@Override
public Boolean submitWaitComplete() {
    Boolean result = false;
    this.taskInstance = submit();
    if(!this.taskInstance.getState().typeIsFinished()) {
        result = waitTaskQuit();
    }
    taskInstance.setEndTime(new Date());
    processDao.updateTaskInstance(taskInstance);
    logger.info("task :{} id:{}, process id:{}, exec thread completed ",
            this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
    return result;
}

该函数的逻辑简单来说就是,提交一个任务实例,等待任务完成,更新任务结束时间到数据。

我们可以看出,每个任务实例都可以更新数据库,加上其他线程,对数据库的压力可能很大。如果任务非常多,并发非常大的情况下,jdbc连接线程池需要适当调大。否则,数据库会成为系统瓶颈。如果worker节点个数过多,这种压力又会几何倍数的增长。

标签:task,MasterBaseTaskExecThread,taskInstance,dolphinscheduler,common2,processInsta
来源: https://blog.csdn.net/Santan1412/article/details/121441303

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有