Datax 任务执行流程

Datax 任务执行流程

加载配置

Datax启动是从Engine类开始的。Engine会读取配置文件,并且初始化和运行JobContainer。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class Engine {
	public static void entry(final String[] args) throws Throwable {
    	// .....
        Configuration configuration = ConfigParser.parse(jobPath);
        Engine engine = new Engine();
        engine.start(configuration);
	}
    
    public void start(Configuration allConf) {
        // ......
        container = new JobContainer(allConf);
        container.start();
    }
}

JobContainer原理

JobContaier的源码涉及到插件的动态加载,可以参考此篇博客 {% post_link datax-plugin %} 。

首先看看JobContainer的start方法,它将任务的执行分为多个阶段。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public class JobContainer extends AbstractContainer {
    public void start() {
        LOG.debug("jobContainer starts to do preHandle ...");
        this.preHandle();
        LOG.debug("jobContainer starts to do init ...");
        this.init();
        LOG.info("jobContainer starts to do prepare ...");
        this.prepare();
        LOG.info("jobContainer starts to do split ...");
        this.totalStage = this.split();
        LOG.info("jobContainer starts to do schedule ...");
        this.schedule();
        LOG.debug("jobContainer starts to do post ...");
        this.post();
        LOG.debug("jobContainer starts to do postHandle ...");
        this.postHandle();
        LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
    }
}
        

加载 和运行 Handler的初始化函数

这里会根据配置,加载指定的Handler类,并且执行它的preHandler回调函数。相关的配置如下:

  • job.preHandler.pluginType, 插件类型
  • job.preHandler.pluginName, 插件名称
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void preHandle() {
    // 获取preHandler的类型,由job.preHandler.pluginType指定
    String handlerPluginTypeStr = this.configuration.getString(
            CoreConstant.DATAX_JOB_PREHANDLER_PLUGINTYPE);
    // 实例化PluginType
    PluginType handlerPluginType = PluginType.valueOf(handlerPluginTypeStr.toUpperCase());
	// 获取preHandler的名称
    String handlerPluginName = this.configuration.getString(
            CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
    // 切换类加载器
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            handlerPluginType, handlerPluginName));
	// 加载 Handler
    AbstractJobPlugin handler = LoadUtil.loadJobPlugin(
            handlerPluginType, handlerPluginName);
	// 初始化handler的jobPluginCollector, 通过它可以知道任务的详情
    JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
            this.getContainerCommunicator());
    handler.setJobPluginCollector(jobPluginCollector);

    //调用handler的preHandler
    handler.preHandler(configuration);
    // 切回类加载器
    classLoaderSwapper.restoreCurrentThreadClassLoader();
}

加载Reader和Writer

init方法会将加载配置中指定的Reader和Writer,来完成数据的读取和写入。Reader的初始化和Writer相同,这里以Reader为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void init() {
    JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());
    //必须先Reader ,后Writer
    this.jobReader = this.initJobReader(jobPluginCollector);
    this.jobWriter = this.initJobWriter(jobPluginCollector);
}

// 加载Reader
private Reader.Job initJobReader(JobPluginCollector jobPluginCollector) {
    // 读取 Reader的名称
    this.readerPluginName = this.configuration.getString(
        CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
    // 切换类加载器
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
        PluginType.READER, this.readerPluginName));
    // 加载Reader插件
    Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
        PluginType.READER, this.readerPluginName);
    // 更新Reader的配置
    jobReader.setPluginJobConf(this.configuration.getConfiguration(
        CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
    jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
        CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
    // 初始化Reader的jobPluginCollector
    jobReader.setJobPluginCollector(jobPluginCollector);
    // 调用Reader的init回调函数
    jobReader.init();
    // 切回类加载器
    classLoaderSwapper.restoreCurrentThreadClassLoader();
    return jobReader;
}
    

Reader和Writer的初始化回调函数

prepare方法会执行Reader和Writer的prepare回调函数。Reader和Writer相同,下面以Reader为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
private void prepare() {
	this.prepareJobReader();
    this.prepareJobWriter();
}
    
private void prepareJobReader() {
    // 切换类加载器
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.READER, this.readerPluginName));
    LOG.info(String.format("DataX Reader.Job [%s] do prepare work .",
            this.readerPluginName));
    // 调用jobReader的prepare方法
    this.jobReader.prepare();
    // 切回类加载器
    classLoaderSwapper.restoreCurrentThreadClassLoader();
}

切分任务

split方法会根据channel的数目,将整个job任务,划分成多个小的task。具体原理参见这篇博客 {% post_link datax-job %} 。

分配和执行任务

schedule方法会将task发送给各个Channel执行。具体原理参见这篇博客 {% post_link datax-job %} 。

调用Reader和Writer的完成回调函数

post会执行Reader和Writer的post回调函数。Reader和Writer相同,下面以Reader为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
private void post() {
    this.postJobWriter();
    this.postJobReader();
}

private void postJobReader() {
	// 切换类加载器
	classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
		PluginType.READER, this.readerPluginName));
    LOG.info("DataX Reader.Job [{}] do post work.", this.readerPluginName);
    // 调用jobReader的post方法
    this.jobReader.post();
    // 切回类加载器
    classLoaderSwapper.restoreCurrentThreadClassLoader();
}

加载 和运行 Handler的完成函数

这里会根据配置,加载指定的Handler类,并且执行它的postHandler回调函数。相关的配置如下:

  • job.postHandler.pluginType, 插件类型
  • job.postHandler.pluginName, 插件名称
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void postHandle() {
    // 获取插件类型
	String handlerPluginTypeStr = this.configuration.getString(
		CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINTYPE);
    PluginType handlerPluginType = PluginType.valueOf(handlerPluginTypeStr.toUpperCase());
    
    // 获取插件名称
    String handlerPluginName = this.configuration.getString(
        CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
    // 切换类加载器
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
        handlerPluginType, handlerPluginName));
    // 加载插件
    AbstractJobPlugin handler = LoadUtil.loadJobPlugin(
        handlerPluginType, handlerPluginName);
    
    // 配置jobPluginCollector, 通过它可以得到任务执行的详情
    JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
        this.getContainerCommunicator());
    handler.setJobPluginCollector(jobPluginCollector);
    
    // 调用handler的postHandler函数
    handler.postHandler(configuration);
    // 切回类加载器
    classLoaderSwapper.restoreCurrentThreadClassLoader();

扩展 handler 插件

如果有个需求,需要将任务的完成情况,记录下来。这个时候需要自定义handler。

修改插件加载

首先需要修改读取插件配置的源码,因为从ConfigParser的源码可以看到,datax已经从配置文件中,提取了posthandler的插件名称,但是在寻找插件的时候,只是从reader和writer目录下去寻找,所以需要增加从handler目录的寻找。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public class CoreConstant {
	public static String DATAX_PLUGIN_HANDLER_HOME = StringUtils.join(
			new String[] { DATAX_HOME, "plugin", "handler" }, File.separator);
}

public final class ConfigParser {
    public static Configuration parsePluginConfig(List<String> wantPluginNames) {
        .......
         for(final String each : ConfigParser
             	.getDirAsList(CoreConstant.DATAX_PLUGIN_HANDLER_HOME)) {
             Configuration eachHandlerConfig = ConfigParser.parseOnePluginConfig(each, 
                  		"handler", replicaCheckPluginSet, wantPluginNames);
             if (eachHandlerConfig != null) {
                 configuration.merge(eachHandlerConfig, true);
                 complete += 1;
             }
         }
    }
}
     

自定义Handler类

因为postHandler是属于Job运行类型的插件,所有必须在里面新建一个名称为Job的类,这个Job类必须继承AbstractJobPlugin。下面就是自定义插件的JobResultCollector的基本雏形

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class JobResultCollector {

    private static final Logger LOG = LoggerFactory.getLogger(Collector.class);

    public static class Job extends AbstractJobPlugin {

        @Override
        public void init() {
            LOG.info("plugin Collector start init");
        }

        @Override
        public void destroy() {
            LOG.info("plugin Collector destroy");
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            LOG.info("plugin Collector post handler");
        }
    }
}

增加结果数据读取功能

因为这个插件需要记录任务完成结果,所以需要了解下怎么获取统计数据。关于统计数据的原理,可以参见此篇文章

因为自定义Handler属于Job运行类型的插件,所以需要继承AbstractJobPlugin。AbstractJobPlugin有一个重要属性jobPluginCollector,通过它可以获得需要的数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public abstract class AbstractJobPlugin extends AbstractPlugin {
    
    private JobPluginCollector jobPluginCollector;

    public JobPluginCollector getJobPluginCollector() {
        return jobPluginCollector;
    }
    
    public void setJobPluginCollector(
            JobPluginCollector jobPluginCollector) {
		this.jobPluginCollector = jobPluginCollector;
	}   
}

通过加载posthandler的源码中,可以看到jobPluginCollector变量的初始化,实质上是DefaultJobPluginCollector的实例。并且每个插件都有着独立的JobPluginCollector, 但这些JobPluginCollector都共享者同一个AbstractContainerCommunicator

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public class JobContainer extends AbstractContainer {

	private void postHandle() {
        ........
        // 加载和实例化handler
        AbstractJobPlugin handler = LoadUtil.loadJobPlugin(
                handlerPluginType, handlerPluginName);
		// 实例化DefaultJobPluginCollector
        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
                this.getContainerCommunicator());
        // 设置handler的jobPluginCollector
        handler.setJobPluginCollector(jobPluginCollector);
        ........
        handler.postHandler(configuration);
}

接下来看看DefaultJobPluginCollector的源码,可以看到它只提供了关于返回消息的方法,却没有提供别的数据的访问。感觉像是阿里这边的源码并没有开放完全。所以如果想访问到其他的数据,就需要修改源码了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public final class DefaultJobPluginCollector implements JobPluginCollector {
    private AbstractContainerCommunicator jobCollector;

    public DefaultJobPluginCollector(AbstractContainerCommunicator containerCollector) {
        this.jobCollector = containerCollector;
    }

    @Override
    public Map<String, List<String>> getMessage() {
        Communication totalCommunication = this.jobCollector.collect();
        return totalCommunication.getMessage();
    }

    @Override
    public List<String> getMessage(String key) {
        Communication totalCommunication = this.jobCollector.collect();
        return totalCommunication.getMessage(key);
    }
}

原本我是想增加一个方法返回Communication。这样根据返回的Communication,可以查看里面任何的数据。因为DefaultJobPluginCollector实现JobPluginCollector接口,所以也需要加上。 代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public final class DefaultJobPluginCollector implements JobPluginCollector {
    
    public Communication getCommunication() {
        return this.jobCollector.collect();
    }
}

public interface JobPluginCollector extends PluginCollector {
    Communication getCommunication();
}

但是行不通,因为DefaultJobPluginCollector是属于datax-core模块的类,JobPluginCollector属于datax-common模块的类。datax-core是依赖于datax-common的。而Communication是属于datax-core模块的类,如果在JobPluginCollector引用Communication,就会产生循环依赖。

所以增加方法必须考虑依赖关系,因为Communication的数据类型比较简单,这样接口只需要返回简单的数据类型即可

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public final class DefaultJobPluginCollector implements JobPluginCollector {
    
    @Override
    public Map<String, Number> getCounter() {
        return this.jobCollector.collect().getCounter();
    }
}

public interface JobPluginCollector extends PluginCollector {
    Map<String, Number> getCounter();
}

现在解决了读取数据的需求了,接下来实现JobResultCollector的postHandler方法。可以结合CommunicationTool里面的变量,获取对应的数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Override
public void postHandler(Configuration jobConfiguration) {
    LOG.info("plugin Collector post handler");
    Map<String, Number> counter = getJobPluginCollector().getCounter();
    long successReadRecords = getLongCounter(counter, CommunicationTool.READ_SUCCEED_RECORDS);
    long failedReadRecords = getLongCounter(counter, CommunicationTool.READ_FAILED_RECORDS);
    long totalReadRecords = successReadRecords - failedReadRecords;
    LOG.info("total records : " + totalReadRecords);
}

private long getLongCounter(Map<String, Number> counter, String key) {
    Number value = counter.get(key);
    return value == null ? 0 : value.longValue();
}
updatedupdated2023-07-022023-07-02