Apache Nifi使用笔记

基本概念

虽然Nifi支持二进制数据(如视频流),但是我们一般还是拿来做可读数据(类似表)的处理。Nifi将这种数据称为面向记录的数据(record-oriented data),Nifi使用了一种类似XPath/JsonPath的抽象来定位数据,即RecordPath

首先理解一些基础概念:

  • Record: 记录,可以理解为一条数据,对应一个avro Schema(类似json schema);Record是一个抽象的概念,实际使用时需要落地为具体的格式(一般是avro);
  • FlowFile: 流文件,一段数据,包括多个Record,以及一组attribute;
  • Attribute:属性,流文件关联的一组kv值;如record总量、文件名称等;

使用Nifi做ETL,核心的东西都围绕Record来进行。由于Nifi已经发行了很多版本,为了保持兼容性,很多旧的processor是Record的落地实现(可以理解为接口和实现的关系),一般不用关注。

如果不记得processor的用法,可以在画布上右键组件,view usage查看文档。

一般流程

NIFI没有流程管理的概念,登陆进入主界面就是一个画布。每个ETL流程肯定要先建一个ProcessGroup,类似文件夹的概念;可以按着项目>流程名称这两级分组。页面左下角显示了层级,想要进入进程组,就右键点击后选择enter group(或者双击也行).

一般批量ETL流程是:

GetFile->QueryRecord -> UpdateRecord -> PutRecord

非文件类数据ETL,有一些专用的query方法,比如DBMS一般是ExecuteSQLRecord直接获取Record;Mongo有GetMongoRecord;如果后缀不是Record,一般是返回一个Avro流,后面还是可以跟UpdateRecord,Reader选择AvroSet即可。

基于CDC的ETL流程一般是:

CaptureChangeMySQL -> RouteOnAttribute -> UpdateRecord -> PutRecord

除了MySQL基于binlog的CDC,还支持直接启动HTTP服务接收数据(Listen HTTP)。事实上Nifi做CDC功能很残,窗口、水印之类的常用概念一概没有,一般还是用Flink来做。

Query和Put其实没啥好说的,一般选好对应的source和sink就行,关键是transform.

常用的Transform思路

  1. 首先在Query的时候就可以做简单的、初步的变换,即Select xxx as xxx这种,同时可以做一些简单的字符串拼接之类的操作;Query使用Apache Calcite实现,支持ANSI SQL,具体语法可以查询calcite的文档,注意很多Function和MySQL并不一致;
  2. 在UpdateRecord里,通过RecordPath和Expression Language来更新kv值。注意这两个是不能结合使用的,至少到1.14还不可以:
    • RPath本身有一些字符串处理的Function,但是不能做复杂计算、条件判断之类的;
    • EL可以通过${field.value}引用值,完成常见Transform,但是只能引用RecordPath指定的值;
    • 如果想要结合使用,一般需要用两个连续的UpdateRecord。一个简单的例子:需要使用两个字段来做加法,就先用concat把两个字段的值连起来,然后再通过substring取出来做计算;
    • 当然这个例子里,在QueryRecord里先用SQL做计算更好,事实上这玩意能用SQL就用SQL,EL不太好用;
  3. 使用JoltTransformRecord,这是一种DSL,能够完成简单的数据清洗、计算工作;可以参考这里
  4. 使用ScriptedTransformRecord,直接写js/lua/python脚本进行计算,比较灵活,但是性能理论上比其他的要慢一些。不过这个也不一定,对性能敏感的话可以先测试一下再决定用哪个。脚本语言里性能最好的是groovy,当然如果不熟的话用python也是可以的;
  5. 使用非Record类的processor。Nifi里面有针对特定数据格式的其他Transform,比如对Json有jsonPath计算,后者使用updateAttribute配合attributeToJson之类的。不过总体来说,主要使用上面4个processor做数据清洗、转换。

例子

如果源数据是一堆csv文件,没有header,目标是写到MySQL里。

以手头的某项目告警数据抽取到MySQL为例。

源数据是从oracle中抽取的,格式如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
CREATE TABLE [dbo].[baojing] (
  [Chrono] bigint  NOT NULL,
  [LogList] varchar(12) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [AssocLabel] varchar(255) COLLATE Chinese_PRC_CI_AS  NULL,
  [EvtNumber] bigint  NOT NULL,
  [EvtTitle] varchar(255) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [Name] varchar(255) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [Value] float(53)  NOT NULL,
  [ValueT] varchar(255) COLLATE Chinese_PRC_CI_AS  NULL,
  [Quality] smallint  NOT NULL,
  [AlarmLevel] smallint  NULL,
  [AlarmState] smallint  NULL,
  [UserComment] varchar(40) COLLATE Chinese_PRC_CI_AS  NULL,
  [Threshold] float(53)  NULL,
  [NumParam] float(53)  NULL,
  [TextParam] varchar(255) COLLATE Chinese_PRC_CI_AS  NULL,
  [Description] varchar(255) COLLATE Chinese_PRC_CI_AS  NULL,
  [TS] datetime  NULL
)

例子:

132743569098900000,baojing,NULL,1,报警打开尚未确认,TBDD.B15.RQ.O2_LO_WARNING,1,NULL,192,0,3,NULL,NULL,NULL,NULL,燃气舱氧气低报警状态,2021-08-25 09:21:50.000

可以拆分一下方便映射:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
['132743569098900000',
 'baojing',
 'NULL',
 '1',
 '报警打开尚未确认',
 'TBDD.B15.RQ.O2_LO_WARNING',
 '1',
 'NULL',
 '192',
 '0',
 '3',
 'NULL',
 'NULL',
 'NULL',
 'NULL',
 '燃气舱氧气低报警状态',
 '2021-08-25 09:21:50.000']

目标表:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
create table if not exists sync_remote_alarm
(
    id             varchar(100)                         not null comment 'prjId;subPrjId-remoteId'
        primary key,
    project_id     bigint                               not null comment '项目ID',
    sub_project_id bigint                               not null comment '子项目ID',
    type           tinyint                              not null comment '告警类型;字典TYPE_DEVICE_ALARM',
    level          tinyint                              not null comment '告警级别;LEVEL_DEVICE_ALARM',
    content        mediumtext                           not null comment '告警信息',
    is_valid       tinyint(1) default 1                 null comment '告警是否有效;默认值1,有效',
    is_solved      tinyint(1) default 0                 null comment '告警是否被解决;默认值0,未解决',
    is_blocked     tinyint(1) default 0                 null comment '告警信号是否屏蔽;默认值0,不屏蔽',
    start_ts       bigint                               null comment '告警开始时间戳',
    end_ts         bigint                               null comment '告警结束时间戳',
    end_by         varchar(100)                         null comment '如果是手动关闭;传入用户id',
    result         varchar(100)                         not null comment '告警处理结果;处理结果',
    device_names   text                                 null comment '告警时相关所有的设备名称',
    lng            decimal(10, 7)                       null comment '发生告警经度',
    lat            decimal(10, 7)                       null comment '发生告警时纬度',
    threshold      decimal(10, 7)                       null comment '阈值;如果有',
    realtime_value decimal(10, 7)                       null comment '实际值;如果有',
    remote_time    datetime                             null comment '远端时间',
    create_time    datetime   default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time    datetime   default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '远端系统告警';

映射方式:

源字段目标字段计算方式
id源表没有单个的主键,可以用concat( ‘1-0-‘, chrono, ‘-’, name)拼一个出来
project_id写死是1
sub_project_id写死是0,或者没有这个字段也行
type只有氧气(3),温度(1)和液位(24),可以通过Description字符匹配来映射
AlarmLevellevel已有数据里面全部是0
Descriptioncontent
AlarmStateis_solved
UserCommentresult
TSstart_ts需要将北京时间的字符串转成unix时间戳
TSremote_time对方只有一个时间
Namedevice_names
Thresholdthreshold
NumParamrealtime_value

没提到的就是没用上,使用默认值即可。

在Nifi中添加processor:

  1. 首先添加GetFile,配置文件路径;为了方便测试,我们改为ListFile+FetchFile,这样在文件夹里使用touch更新一下文件创建时间,就可以重新触发抽取;
  2. 添加QueryRecordRecordReader选择CSVReaderRecordWriter选择AvroRecordSetWriter
  3. CSVReader里面Schema Access Strategy选择Use 'Schema Text' Property,然后在下面的Schema Text里面填入avro的schema,注意允许为null的,type都要加上null。另外就是需要标记NULL string为NULL
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
{
    "type": "record",
    "name": "HuangshiWarn",
    "fields": [
        {
            "name": "chrono",
            "type": "long"
        },
        {
            "name": "loglist",
            "type": "string"
        },
        {
            "name": "assocLabel",
            "type": ["string", "null"]
        },
        {
            "name": "evtNumber",
            "type": "int"
        },
        {
            "type": "string",
            "name": "title"
        },
        {
            "type": "string",
            "name": "name"
        },
        {
            "type": "float",
            "name": "value"
        },
        {
            "type": ["string","null"],
            "name": "valueT"
        },
        {
            "type": "int",
            "name": "quality"
        },
        {
            "type": ["int","null"],
            "name": "alarmLevel"
        },
        {
            "type": ["int","null"],
            "name": "alarmState"
        },
        {
            "type": ["string", "null"],
            "name": "userComment"
        },
        {
            "type": ["double", "null"],
            "name": "threshold"
        },
        {
            "type": ["double", "null"],
            "name": "numParam"
        },
        {
            "type": ["string", "null"],
            "name": "textParam"
        },
        {
            "type": ["string", "null"],
            "name": "description"
        },
        {
            "type": "string",
            "name": "ts",
            "logicType": "date"
        }
    ]
}
  1. CSVReader的其他配置一般都保持默认即可,如果数据里面有中文,确认一下csv是不是已经转成了utf8字符集;
  2. QueryRecord里面,添加一个自定义property,假设名字叫extract,内容写SQL语句:
1
2
3
4
5
select '1-0-' || cast(chrono as char) || '-' || name as id, '1' as project_id,
  description as type, alarmLevel as "level", description as content, 
	case alarmState when 0 then 0 else 1 end as is_solved,
	userComment as "result", ts as start_ts,
	ts as remote_time, name as device_names, threshold, numParam as realtime_value from FLOWFILE

如果mysql的话,其实直接在这里就可以完成所有ETL,但是实际遇到了几个问题,所以必须要在后面追加一个UpdateRecord

  • ansi sql里面,没有什么很好的方法将string转成unix时间戳;配置\start_ts${field.value:toDate('yyyy-MM-dd HH:mm:ss.SSS'):toNumber()}进行转换;
  • 另外就是calcite默认不支持中文,目前没找到设置的办法,配置\type等于${field.value:contains('氧气'):ifElse(3, ${field.value:contains('温度'):ifElse(1, 24)})}来转换;
  1. 然后添加一个PutRecord,将QueryRecordPutRecord连接起来,关系就是extract
  2. 配置PutRecord属性,填入目标数据库表信息;

最后大概样子:

截屏2021-09-15 23.25.03

右边的LogAttribute是失败的时候在日志里打印出flowFile属性,方便调试。PutRecord测试的时候还是会报错,因为源表没有主键,不过整个链路还是跑通了。

最后依次启动各个组件,或者点击左侧进程组的启动,进行测试。

0%