ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

datax的启动文体datax.py解析

2021-08-07 13:03:35  阅读:587  来源: 互联网

标签:文体 option py job jobResource add datax options


datax生产环境启动运行是通过datax.py启动的,如下:

$ python datax.py job/{YOUR_JOB.json}

这篇文章就是打算解读下datax.py这个源码。

我们从main函数开始,沿着程序的执行流程慢慢解读。

if __name__ == "__main__":
    printCopyright()    //打印版权信息
    parser = getOptionParser()  //获取参数解析器
    options, args = parser.parse_args(sys.argv[1:]) //解析参数
    if options.reader is not None and options.writer is not None:
    //如果解析后,入参的 reader和writer不为空,在从github上构建出一个 json的样例模板
        generateJobConfigTemplate(options.reader,options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args) //构建job命令
    # print startCommand

    child_process = subprocess.Popen(startCommand, shell=True)  //启动java子进程执行真正的命令(也就是job进程)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)

我代码里加了注释,可以看到步骤都很清晰。一共有四个大步骤:

  • 1.打印datax版权信息
  • 2.获取参数解析器解析参数
  • 3.构建启动命令
  • 4.启动java子进程

下面依次展开这个4个流程详细解读。

打印版权信息

这个简单,省略。

获取参数解析器解析参数

def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)

    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader",type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer",type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)

    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser

这里使用python内置的OptionParser来构建parser,通过usage我们知道执行命令的姿势是

datax.py [options] job-url-or-path

options就再接下来的代码中通过add_option来添加。比如我们可以用类似如下的方式执行:

python datax.py -r txtReader -w txtFileWriter

注意到上面的示例并没有指定json文件,因为datax会自动从github拉取对应插件的json的模版给我们,这块的处理正是下面这个代码:

if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader,options.writer)

构建启动命令

def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print 'local ip: ', getLocalIp()

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource 可能是 URL,也可能是本地文件路径(相对,绝对)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)

流程如下:

  1. 处理jvm参数
  2. 处理datax参数(要执行的json,日志等)
  3. 组装java命令

可以通过

print startCommand

打印最终的命令看看,就是一个标准的java命令。类似下面这种:

    # java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\xxxx\github\DataX\target\datax\datax/log -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\xxxx\github\DataX\target\datax\datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=D:\xxxx\github\DataX\target\datax\datax -Dlogback.configurationFile=D:\xxxx\github\DataX\target\datax\datax/conf/logback.xml -classpath D:\xxxx\github\DataX\target\datax\datax/lib/*  -Dlog.file.name=x\datax\job\job_json com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job D:\xxxx\github\DataX\target\datax\datax\job\job.json

标签:文体,option,py,job,jobResource,add,datax,options
来源: https://blog.csdn.net/pony_maggie/article/details/119484902

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有