ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

2021SC@SDUSC(dolphinscheduler- common)

2021-11-14 14:34:06  阅读:193  来源: 互联网

标签:线程 processDao 2021SC MasterExecThread dolphinscheduler processInstance command S


在深入分析run之前,先简单分析一下 Stopper.isRunning() 的逻辑。

/**
 *  if the process closes, a signal is placed as true, and all threads get this flag to stop working
 */
public class Stopper {

	private static volatile AtomicBoolean signal = new AtomicBoolean(false);
	
	public static final boolean isStoped(){
		return signal.get();
	}
	
	public static final boolean isRunning(){
		return !signal.get();
	}
	
	public static final void stop(){
		signal.getAndSet(true);
	}
}

其逻辑非常简单,就是用一个原子布尔值,标志当前进程是否要退出。如果收到了退出信号,则signal为true,该进程内所有的线程都退出当前循环。

下面我们来分析查询到一个Command之后的逻辑:

if (command != null) {
    logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));

    try{
        processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
        if (processInstance != null) {
            logger.info("start master exec thread , split DAG ...");
            masterExecService.execute(new MasterExecThread(processInstance,processDao));
        }
    }catch (Exception e){
        logger.error("scan command error ", e);
        processDao.moveToErrorCommand(command, e.toString());
    }
}

其实就是根据Command创建了一个ProcessInstance(流程实例),之前也分析过,流程定义是由Scheduler自动创建的,而Quartz已经根据Schedule信息创建了Command保存到了数据库。至此,流程定义与定时的关联逻辑就已经串起来了。

创建流程实例的时候传入了当前可用(masterExecThreadNum - activeCount)的线程数量,如果满足当前dag,则返回ProcessInstance,否则返回null。

ProcessInstance最终交由MasterExecThread去执行。

至此MasterSchedulerThread类的主要逻辑如下:

  1. 调用OSUtils.checkResource,检查当前资源(内存、CPU)。
  2. 资源超出阈值,则休眠1秒进入下一次循环。
  3. 检查zookeeper是否连接成功
  4. 获取一个InterProcessMutex锁(分布式的公平可重入互斥锁)。也就是只有一个master可以获取到这个锁
  5. 查询一个Command,如果当前线程数够用,则创建一个流程实例(ProcessInstance),交给MasterExecThread线程处理。
  6. 休眠1秒,进入下一次循环
  7. 进入下一次循环之前,释放InterProcessMutex锁

在结束MasterExecThread的源码分析之前,我们再简要分析一下这个类比较重要的一个字段:processDao。

ProcessDao

这个类,可以看成是与流程定义相关的操作集合,与流程定义存储相关的操作、逻辑的集合。

processDao.moveToErrorCommand需要稍微注意一下,在异常情况下,它把Command从原来的表中删除,然后插入到了t_ds_error_command表。

MasterExecThread

与MasterSchedulerThread一样,MasterExecThread也是实现了Runnable的线程类,不过我们先来看MasterExecThread的构造函数。

public MasterExecThread(ProcessInstance processInstance,ProcessDao processDao){
    this.processDao = processDao;

    this.processInstance = processInstance;

    int masterTaskExecNum = conf.getInt(Constants.MASTER_EXEC_TASK_THREADS,
            Constants.defaultMasterTaskExecNum);
    this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
            masterTaskExecNum);
}

taskExecService这个字段非常重要,它是一个固定大小(20)的后台线程池。这意味着,一个DAG最大的并发任务数就是20。

static {
    try {
        conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
    }catch (ConfigurationException e){
        logger.error("load configuration failed : " + e.getMessage(),e);
        System.exit(1);
    }
}

下面分析该类的run方法。

public void run() {

    // process instance is null
    if (processInstance == null){
        logger.info("process instance is not exists");
        return;
    }

    // check to see if it's done
    if (processInstance.getState().typeIsFinished()){
        logger.info("process instance is done : {}",processInstance.getId());
        return;
    }

    try {
        if (processInstance.isComplementData() &&  Flag.NO == processInstance.getIsSubProcess()){
            // sub process complement data
            executeComplementProcess();
        }else{
            // execute flow
            executeProcess();
        }
    }catch (Exception e){
        logger.error("master exec thread exception: " + e.getMessage(), e);
        logger.error("process execute failed, process id:{}", processInstance.getId());
        processInstance.setState(ExecutionStatus.FAILURE);
        processInstance.setEndTime(new Date());
        processDao.updateProcessInstance(processInstance);
    }finally {
        taskExecService.shutdown();
        // post handle
        postHandle();
    }
}

分析源码后,简要总结其逻辑如下:

  1. 判断processInstance是否为null。为null则退出
  2. 判断processInstance是否已经完成(成功、报错、取消、暂停、等待)
  3. 判断是否为补数。是则走补数的逻辑
  4. 执行当前流程定义实例(executeProcess)
  5. 调用taskExecService.shutdown(),等待所有线程正常退出

标签:线程,processDao,2021SC,MasterExecThread,dolphinscheduler,processInstance,command,S
来源: https://blog.csdn.net/Santan1412/article/details/121317128

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有