ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

dolphinscheduler源码解析-MasterSchedulerService

2021-07-29 22:35:14  阅读:236  来源: 互联网

标签:MasterSchedulerService dolphinscheduler private 源码 command master 线程 logger


dolphinscheduler 源码解析-MasterSchedulerService

文章目录

类定义

@Service
public class MasterSchedulerService extends Thread 

可以看出该类继承了线程基类,那该类就可以在线程池内执行。

类属性

    /**
     * logger of MasterSchedulerService
     */
    private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);

    /**
     * dolphinscheduler database interface
     */
    @Autowired
    private ProcessService processService;

    /**
     * zookeeper master client
     */
    @Autowired
    private MasterRegistryClient masterRegistryClient;

    /**
     * master config
     */
    @Autowired
    private MasterConfig masterConfig;

    /**
     * alert manager
     */
    @Autowired
    private ProcessAlertManager processAlertManager;

    /**
     *  netty remoting client
     */
    private NettyRemotingClient nettyRemotingClient;

    /**
     * master exec service
     */
    private ThreadPoolExecutor masterExecService;

可以看出它有一个ProcessService这个属性集成了很多mappers类,提供数据dao服务。还有一个ProcessAlertManager 告警的管理器,另外也有netty的客户端和一个线程池。

初始化方法

@PostConstruct
public void init() {
    this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
    NettyClientConfig clientConfig = new NettyClientConfig();
    this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}

对线程池进行赋值,并且创建一个netty的客户端。

既然是线程类,就必须有run方法,我们查看一下run方法

    /**
     * run of MasterSchedulerService
     */
    @Override
    public void run() {
        logger.info("master scheduler started");
        while (Stopper.isRunning()) {
            try {
                boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
                if (!runCheckFlag) {
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    continue;
                }
                scheduleProcess();
            } catch (Exception e) {
                logger.error("master scheduler thread error", e);
            }
        }
    }

该run方法会先检查一下资源,看是否有空闲资源,如果没有就让线程睡眠一会儿然后重新检查资源,当有了足够的资源就开始执行scheduleProcess方法

我们向下追踪scheduleProcess方法

    private void scheduleProcess() throws Exception {

        try {
            masterRegistryClient.blockAcquireMutex();

            int activeCount = masterExecService.getActiveCount();
            // make sure to scan and delete command  table in one transaction
            Command command = processService.findOneCommand();
            if (command != null) {
                logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());

                try {

                    ProcessInstance processInstance = processService.handleCommand(logger,
                            getLocalAddress(),
                            this.masterConfig.getMasterExecThreads() - activeCount, command);
                    if (processInstance != null) {
                        logger.info("start master exec thread , split DAG ...");
                        masterExecService.execute(
                                new MasterExecThread(
                                        processInstance
                                        , processService
                                        , nettyRemotingClient
                                        , processAlertManager
                                        , masterConfig));
                    }
                } catch (Exception e) {
                    logger.error("scan command error ", e);
                    processService.moveToErrorCommand(command, e.toString());
                }
            } else {
                //indicate that no command ,sleep for 1s
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
        } finally {
            masterRegistryClient.releaseLock();
        }
    }

首先获取一个master的分布式锁

查看master的executor线程池中有多少active状态的线程。

从数据库中拿到一个command命令

然后构造一个该命令对应的实体类ProcessInstance

然后在该类的线程池内执行MasterExecThread线程【点击打开MasterExecThread】。

标签:MasterSchedulerService,dolphinscheduler,private,源码,command,master,线程,logger
来源: https://blog.csdn.net/sinat_35045195/article/details/119222093

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有