Datax 插件加载原理
插件类型
Datax有好几种类型的插件,每个插件都有不同的作用。
- reader, 读插件。Reader就是属于这种类型的
- writer, 写插件。Writer就是属于这种类型的
- transformer, 目前还未知
- handler, 主要用于任务执行前的准备工作和完成的收尾工作。
插件类型由PluginType枚举表示
1
2
3
|
public enum PluginType {
READER("reader"), TRANSFORMER("transformer"), WRITER("writer"), HANDLER("handler");
}
|
根据运行类型,又可以分为Job级别的插件和Task级别的插件。uml如下图所示
插件配置读取
ConfigParser首先会读取配置文件,提取需要使用的reader,writer,prehandler 和 posthandler的名称。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public static Configuration parse(final String jobPath) {
Configuration configuration = ConfigParser.parseJobConfig(jobPath);
// 合并 conf/core.json文件的配置, false 表示不覆盖原有的配置
configuration.merge(
//CoreConstant.DATAX_CONF_PATH的值为conf/core.json
ConfigParser.parseCoreConfig
(CoreConstant.DATAX_CONF_PATH),
false);
// 获取job.content列表的第一个reader
String readerPluginName = configuration.getString(
//CoreConstant.DATAX_JOB_CONTENT_READER_NAME的值为job.content[0].reader.name
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
// 获取job.content列表的第一个writer
String writerPluginName = configuration.getString(
//CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME的值为job.content[0].writer.name
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
// 读取job.preHandler.pluginName
String preHandlerName = configuration.getString(
//CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME的值为job.preHandler.pluginName
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
// 读取job.postHandler.pluginName
String postHandlerName = configuration.getString(
//CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME的值为job.postHandler.pluginName
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
Set<String> pluginList = new HashSet<String>();
pluginList.add(readerPluginName);
pluginList.add(writerPluginName);
......
// 调用parsePluginConfig生成plugin的配置,然后合并
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
......
return configuration;
}
|
提取完插件名称后,会去reader目录和writer目录,寻找插件的位置。目前Datax只支持reader和writer插件,因为它只从这两个目录中寻找。如果想自己扩展其他类型插件的话,比如handler类型的, 需要修改parsePluginConfig的代码。每个插件目录会有一个重要的配置文件 plugin.json ,它定义了插件的名称和对应的类,在LoadUtils类加载插件的时候会使用到。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
public static Configuration parsePluginConfig(List<String> wantPluginNames) {
Configuration configuration = Configuration.newDefault();
......
// 遍历plugin.reader目录下的文件夹
for (final String each : ConfigParser
.getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {
// 调用 parseOnePluginConfig解析单个plugin配置
Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);
if(eachReaderConfig!=null) {
configuration.merge(eachReaderConfig, true);
complete += 1;
}
}
// 遍历plugin.writer目录下的文件夹
for (final String each : ConfigParser
.getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {
// 调用 parseOnePluginConfig解析单个plugin配置
Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames);
if(eachWriterConfig!=null) {
configuration.merge(eachWriterConfig, true);
complete += 1;
}
}
......
return configuration;
}
// 读取plugin目录下的plugin.json 文件
public static Configuration parseOnePluginConfig(final String path, final String type, Set<String> pluginSet, List<String> wantPluginNames) {
String filePath = path + File.separator + "plugin.json";
Configuration configuration = Configuration.from(new File(filePath));
String pluginPath = configuration.getString("path");
String pluginName = configuration.getString("name");
if(!pluginSet.contains(pluginName)) {
pluginSet.add(pluginName);
} else {
......
}
//不是想要的插件,就不生成配置,直接返回
if (wantPluginNames != null && wantPluginNames.size() > 0 && !wantPluginNames.contains(pluginName)) {
return null;
}
// plugin.json的path路径,是指插件的jar包。如果没有指定,则默认为和plugin.json文件在同一个目录下
boolean isDefaultPath = StringUtils.isBlank(pluginPath);
if (isDefaultPath) {
configuration.set("path", path);
}
Configuration result = Configuration.newDefault();
// 最后保存在puligin.{type}.{pluginName}路径下
result.set(
String.format("plugin.%s.%s", type, pluginName),
configuration.getInternal());
return result;
}
|
动态加载插件
插件的加载都是使用ClassLoader动态加载。 为了避免类的冲突,对于每个插件的加载,对应着独立的加载器。加载器由JarLoader实现,插件的加载接口由LoadUtil类负责。当要加载一个插件时,需要实例化一个JarLoader,然后切换thread class loader之后,才加载插件。
JarLoader 类
JarLoader继承URLClassLoader,扩充了可以加载目录的功能。可以从指定的目录下,把传入的路径、及其子路径、以及路径中的jar文件加入到class path。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
public class JarLoader extends URLClassLoader {
public JarLoader(String[] paths) {
this(paths, JarLoader.class.getClassLoader());
}
public JarLoader(String[] paths, ClassLoader parent) {
// 调用getURLS,获取所有的jar包路径
super(getURLs(paths), parent);
}
// 获取所有的jar包
private static URL[] getURLs(String[] paths) {
// 获取包括子目录的所有目录路径
List<String> dirs = new ArrayList<String>();
for (String path : paths) {
dirs.add(path);
// 获取path目录和其子目录的所有目录路径
JarLoader.collectDirs(path, dirs);
}
// 遍历目录,获取jar包的路径
List<URL> urls = new ArrayList<URL>();
for (String path : dirs) {
urls.addAll(doGetURLs(path));
}
return urls.toArray(new URL[0]);
}
// 递归的方式,获取所有目录
private static void collectDirs(String path, List<String> collector) {
// path为空,终止
if (null == path || StringUtils.isBlank(path)) {
return;
}
// path不为目录,终止
File current = new File(path);
if (!current.exists() || !current.isDirectory()) {
return;
}
// 遍历完子文件,终止
for (File child : current.listFiles()) {
if (!child.isDirectory()) {
continue;
}
collector.add(child.getAbsolutePath());
collectDirs(child.getAbsolutePath(), collector);
}
}
private static List<URL> doGetURLs(final String path) {
File jarPath = new File(path);
// 只寻找文件以.jar结尾的文件
FileFilter jarFilter = new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(".jar");
}
};
File[] allJars = new File(path).listFiles(jarFilter);
List<URL> jarURLs = new ArrayList<URL>(allJars.length);
for (int i = 0; i < allJars.length; i++) {
try {
jarURLs.add(allJars[i].toURI().toURL());
} catch (Exception e) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_INIT_ERROR,
"系统加载jar包出错", e);
}
}
return jarURLs;
}
}
|
LoadUtil 类
LoadUtil管理着插件的加载器,调用getJarLoader返回插件对应的加载器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public class LoadUtil {
// 加载器的HashMap, Key由插件类型和名称决定, 格式为plugin.{pulginType}.{pluginName}
private static Map<String, JarLoader> jarLoaderCenter = new HashMap<String, JarLoader>();
public static synchronized JarLoader getJarLoader(PluginType pluginType, String pluginName) {
Configuration pluginConf = getPluginConf(pluginType, pluginName);
JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
pluginName));
if (null == jarLoader) {
// 构建加载器JarLoader
// 获取jar所在的目录
String pluginPath = pluginConf.getString("path");
jarLoader = new JarLoader(new String[]{pluginPath});
//添加到HashMap中
jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
jarLoader);
}
return jarLoader;
}
private static final String pluginTypeNameFormat = "plugin.%s.%s";
// 生成HashMpa的key值
private static String generatePluginKey(PluginType pluginType,
String pluginName) {
return String.format(pluginTypeNameFormat, pluginType.toString(),
pluginName);
}
|
当获取类加载器,就可以调用LoadUtil来加载插件。LoadUtil提供了 loadJobPlugin 和 loadTaskPlugin 两个接口,加载Job 和 Task 的两种插件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
// 加载Job类型的Plugin
public static AbstractJobPlugin loadJobPlugin(PluginType pluginType, String pluginName) {
// 调用loadPluginClass方法,加载插件对应的class
Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Job);
// 实例化Plugin,转换为AbstractJobPlugin
AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz.newInstance();
// 设置Job的配置,路径为plugin.{pluginType}.{pluginName}
jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
return jobPlugin;
}
// 加载Task类型的Plugin
public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType, String pluginName) {
// 调用loadPluginClass方法,加载插件对应的class
Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Task);
// 实例化Plugin,转换为AbstractTaskPlugin
AbstractTaskPlugin taskPlugin = (AbstracTasktTaskPlugin) clazz.newInstance();
// 设置Task的配置,路径为plugin.{pluginType}.{pluginName}
taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
}
// 加载插件类
// pluginType 代表插件类型
// pluginName 代表插件名称
// pluginRunType 代表着运行类型,Job或者Task
private static synchronized Class<? extends AbstractPlugin> loadPluginClass(
PluginType pluginType, String pluginName,
ContainerType pluginRunType) {
// 获取插件配置
Configuration pluginConf = getPluginConf(pluginType, pluginName);
// 获取插件对应的ClassLoader
JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
try {
// 加载插件的class
return (Class<? extends AbstractPlugin>) jarLoader
.loadClass(pluginConf.getString("class") + "$"
+ pluginRunType.value());
} catch (Exception e) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
}
}
|
切换类加载器
ClassLoaderSwapper类,提供了比较方便的切换接口。
1
2
3
4
5
6
7
8
9
|
// 实例化
ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();
ClassLoader classLoader1 = new URLClassLoader();
// 切换加载器classLoader1
classLoaderSwapper.setCurrentThreadClassLoader(classLoader1);
Class<? extends MyClass> myClass = classLoader1.loadClass("MyClass");
// 切回加载器
classLoaderSwapper.restoreCurrentThreadClassLoader();
|
ClassLoaderSwapper的源码比较简单, 它有一个属性storeClassLoader, 用于保存着切换之前的ClassLoader。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public final class ClassLoaderSwapper {
// 保存切换之前的加载器
private ClassLoader storeClassLoader = null;
public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) {
// 保存切换前的加载器
this.storeClassLoader = Thread.currentThread().getContextClassLoader();
// 切换加载器到classLoader
Thread.currentThread().setContextClassLoader(classLoader);
return this.storeClassLoader;
}
public ClassLoader restoreCurrentThreadClassLoader() {
ClassLoader classLoader = Thread.currentThread()
.getContextClassLoader();
// 切换到原来的加载器
Thread.currentThread().setContextClassLoader(this.storeClassLoader);
// 返回切换之前的类加载器
return classLoader;
}
}
|