1. 需求说明
生产环境中有些数据需要在抽取的时候指定对某个字段进行过滤,判断等等。以将本地文件抽取到HDFS为例,当前我们需要导入的数据有2条,如下:
上面的数据中有uname字段,我们希望增加一个新的字段sex,该字段的值判断如果uname是wangwu,则sex字段的值就为female,否则为male,效果如下:
实现上面的效果需要2步:
- 编写过滤器代码。
- 将过滤器代码写到datax.json中。
2. 编写过滤器代码
导入datax的依赖(这里主要是因为要写日志,另一个是打包功能的配置,根据自己需要来添加依赖)
<dependencies> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.12</version> </dependency> </dependencies> <build> <finalName>gtmc-datax-utils-${project.version}</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.7</source> <target>1.7</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>
编写过滤代码(注意是静态代码块,replaceQuotationMark方法为过滤方法,传递进一个字符串进行替换)
public class StringUtils { private static Logger LOG = LoggerFactory.getLogger(StringUtils.class); /*** * 替换双引号 * @param content * @return * @throws Exception */ public static String replaceQuotationMark(String content) throws Exception { if (null == content || "".equals(content)) { return ""; } try { LOG.info("===" + content); return content.equals("wangwu") ? "female" : "male"; } catch (Exception e) { LOG.error("替换双引号,失败原因:" + e.getMessage()); return ""; } } }
编译打包,将安装包丢到 datax/lib 目录下。
3. 应用过滤器
将插件应用到 datax.json文件中,过滤器的插件配置位置如下:
{
"job": {
"setting": {
……},
"content": [
{
"reader": {
……},
"writer": {
……},
## 过滤器位置
"transformer": [
{
"name": "dx_substr",
"parameter": {
"columnIndex": 5,
"paras": [ "1", "3" ]
}
}
]
}
]
}
}
我们的应用代码如下:
"transformer": [
{
## 这里是固定的
"name": "dx_groovy",
"parameter": {
## 这里定义数据是怎么跟我们的函数关联的
"code": "record.setColumn(4, new StringColumn(StringUtils.replaceQuotationMark(record.getColumn(4).asString())));\nreturn record;",
"extraPackage": [
## 导入函数所在的类,注意如下字符串有双引号,类的全路径后面有分号
"import com.gtmc.datax.utils.StringUtils;"
]
}
}
]