0%

2023-1110-kettle学习

20231110-kettle学习

kettle用repositories

新建一个数据库的仓库,记着要把启动命令中增加 “-Dfile.encoding=UTF-8”,否则打开文件的时候会出现 Invalid byte 1 of 1-byte UTF-8 sequence.

然后新建的转换和作业就保存到数据库中

linux服务器操作

把pdi解压缩后复制到/usr/local/data-integration 目录

设置环境变量

1
export KETTLE_HOME=/usr/local/data-integration

把windows下做好的 repositories.xml 文件复制到/usr/local/data-integration/.kettle

目录下

查看repository:

1
2
3
4
5
./kitchen.sh -listrep
List of repositories:
#1 : birepo [bi-databases] id=KettleDatabaseRepository


查看所有的job

1
2
3
4
./kitchen.sh -rep:birepo  -user:admin -pass:admin   -listjobs
2023/11/10 17:40:59 - Kitchen - Start of run.
2023/11/10 17:40:59 - RepositoriesMeta - Reading repositories XML file: /usr/local/data-integration/.kettle/repositories.xml
zgcw

查看所有的trans

1
2
3
4
5
./pan.sh -rep:birepo  -user:admin -pass:admin   -listtrans
2023/11/10 17:43:09 - Pan - 开始运行.
2023/11/10 17:43:09 - RepositoriesMeta - Reading repositories XML file: /usr/local/data-integration/.kettle/repositories.xml
car_transfer_info
market

执行转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ ./pan.sh -rep:birepo  -user:admin -pass:admin   -trans:market
2023/11/10 17:45:35 - Pan - 开始运行.
2023/11/10 17:45:35 - RepositoriesMeta - Reading repositories XML file: /usr/local/data-integration/.kettle/repositories.xml
2023/11/10 17:45:36 - market - 为了转换解除补丁开始 [market]
2023/11/10 17:45:36 - read_from_market.0 - Finished reading query, closing connection
2023/11/10 17:45:36 - read_from_market.0 - 完成处理 (I=443, O=0, R=0, W=443, U=0, E=0)
2023/11/10 17:45:37 - write_to_market.0 - 完成处理 (I=443, O=0, R=443, W=443, U=0, E=0)
2023/11/10 17:45:37 - Pan - 完成!
2023/11/10 17:45:37 - Pan - 开始=2023/11/10 17:45:36.601, 停止=2023/11/10 17:45:37.081
2023/11/10 17:45:37 - Pan - 0 秒后处理结束.
2023/11/10 17:45:37 - market -
2023/11/10 17:45:37 - market - 进程 read_from_market.0 成功结束, 处理了 443 行. ( - 行/秒)
2023/11/10 17:45:37 - market - 进程 write_to_market.0 成功结束, 处理了 443 行. ( - 行/秒)

执行转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ ./pan.sh -rep:birepo  -user:admin -pass:admin   -trans:car_transfer_info
2023/11/10 17:46:18 - Pan - 开始运行.
2023/11/10 17:46:18 - RepositoriesMeta - Reading repositories XML file: /usr/local/data-integration/.kettle/repositories.xml
2023/11/10 17:46:18 - car_transfer_info - 为了转换解除补丁开始 [car_transfer_info]
2023/11/10 17:46:19 - get_lastast_time_of_car_transfer.0 - Finished reading query, closing connection
2023/11/10 17:46:19 - get_lastast_time_of_car_transfer.0 - 完成处理 (I=1, O=0, R=0, W=1, U=0, E=0)
2023/11/10 17:46:19 - read_modifed_record.0 - Finished reading query, closing connection
2023/11/10 17:46:19 - read_modifed_record.0 - 完成处理 (I=17, O=0, R=1, W=17, U=0, E=0)
2023/11/10 17:46:19 - update_car_transfer_info.0 - 完成处理 (I=17, O=15, R=17, W=17, U=2, E=0)
2023/11/10 17:46:19 - Pan - 完成!
2023/11/10 17:46:19 - Pan - 开始=2023/11/10 17:46:18.994, 停止=2023/11/10 17:46:19.542
2023/11/10 17:46:19 - Pan - 0 秒后处理结束.
2023/11/10 17:46:19 - car_transfer_info -
2023/11/10 17:46:19 - car_transfer_info - 进程 get_lastast_time_of_car_transfer.0 成功结束, 处理了 1 行. ( - 行/秒)
2023/11/10 17:46:19 - car_transfer_info - 进程 read_modifed_record.0 成功结束, 处理了 17 行. ( - 行/秒)
2023/11/10 17:46:19 - car_transfer_info - 进程 update_car_transfer_info.0 成功结束, 处理了 17 行. ( - 行/秒)

执行job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
$ ./kitchen.sh -rep:birepo  -user:admin -pass:admin   -job:zgcw
2023/11/10 17:54:09 - Kitchen - Start of run.
2023/11/10 17:54:09 - RepositoriesMeta - Reading repositories XML file: /usr/local/data-integration/.kettle/repositories.xml
2023/11/10 17:54:10 - zgcw - 开始执行任务
2023/11/10 17:54:11 - zgcw - 开始项[market]
2023/11/10 17:54:11 - market - Using run configuration [Pentaho local]
2023/11/10 17:54:11 - market - 为了转换解除补丁开始 [market]
2023/11/10 17:54:11 - read_from_market.0 - Finished reading query, closing connection
2023/11/10 17:54:11 - read_from_market.0 - 完成处理 (I=443, O=0, R=0, W=443, U=0, E=0)
2023/11/10 17:54:11 - write_to_market.0 - 完成处理 (I=443, O=0, R=443, W=443, U=0, E=0)
2023/11/10 17:54:11 - zgcw - 开始项[car_transfer_info]
2023/11/10 17:54:11 - car_transfer_info - Using run configuration [Pentaho local]
2023/11/10 17:54:11 - car_transfer_info - 为了转换解除补丁开始 [car_transfer_info]
2023/11/10 17:54:11 - get_lastast_time_of_car_transfer.0 - Finished reading query, closing connection
2023/11/10 17:54:11 - get_lastast_time_of_car_transfer.0 - 完成处理 (I=1, O=0, R=0, W=1, U=0, E=0)
2023/11/10 17:54:12 - read_modifed_record.0 - Finished reading query, closing connection
2023/11/10 17:54:12 - read_modifed_record.0 - 完成处理 (I=10, O=0, R=1, W=10, U=0, E=0)
2023/11/10 17:54:12 - update_car_transfer_info.0 - 完成处理 (I=10, O=9, R=10, W=10, U=1, E=0)
2023/11/10 17:54:12 - zgcw - 开始项[成功]
2023/11/10 17:54:12 - zgcw - 完成作业项[成功] (结果=[true])
2023/11/10 17:54:12 - zgcw - 完成作业项[car_transfer_info] (结果=[true])
2023/11/10 17:54:12 - zgcw - 完成作业项[market] (结果=[true])
2023/11/10 17:54:12 - zgcw - 任务执行完毕
2023/11/10 17:54:12 - Kitchen - Finished!
2023/11/10 17:54:12 - Kitchen - Start=2023/11/10 17:54:10.967, Stop=2023/11/10 17:54:12.271
2023/11/10 17:54:12 - Kitchen - Processing ended after 1 seconds.

执行job带参数

1
2
./kitchen.sh -rep:erp_kettle  -user:admin -pass:admin -parameter:start_date=2023-12-1 -job:guanbao

做第一个市场每日交易统计

新建表 transfer_daily_report

1
2
3
4
5
6
7
8
9
10
CREATE TABLE `transfer_daily_report` (
`market_id` int(10) DEFAULT NULL COMMENT '市场编号',
`day` DATE DEFAULT NULL COMMENT '天',
`vehicle_type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '车辆类型',
`amount` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '金额',
`cnt` int DEFAULT NULL COMMENT '数量',
`market` varchar(200) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '市场名称',
`province` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '省',
KEY `p` (`market_id`,`day`,`vehicle_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='日统计(每个市场、每天、分类型统计)';

新建重构作业流程

第一步先清库,新建执行sql的任务

1
delete  from transfer_daily_report;

新建一个获取全部交易执行的语句

1
2
3
4
5
6
7
8
SELECT date(  handle_time) as day,
SUM(vehicle_price) as amount , count(*) as cnt,
i.market_id,m.name as market, m.province_name as province,
i.vehicle_type
FROM zg_vehicle_transfer_info i inner join market m on m.id = i.market_id
where i.handle_time is not null
GROUP BY i.market_id,day ,i.vehicle_type;

新增一个生成车辆分类的脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//Script here
var vehicle_category= ""
if(vehicle_type.indexOf("客车")>=0){
vehicle_category="客车"
}
else if(vehicle_type.indexOf("轿车")>=0){
vehicle_category="客车"
}
else if(vehicle_type.indexOf("小型汽")>=0){
vehicle_category="客车"
}
else if(vehicle_type.indexOf("小型轿")>=0){
vehicle_category="客车"
}
else if(vehicle_type.indexOf("面包车")>=0){
vehicle_category="客车"
}
else if(vehicle_type.indexOf("小型越野")>=0){
vehicle_category="客车"
}

else if(vehicle_type.indexOf("货车")>=0){
vehicle_category="货车"
}
else if(vehicle_type.indexOf("载货")>=0){
vehicle_category="货车"
}
else if(vehicle_type.indexOf("重型")>=0){
vehicle_category="货车"
}
else if(vehicle_type.indexOf("货运")>=0){
vehicle_category="货车"
}
else if(vehicle_type.indexOf("摩托车")>=0){
vehicle_category="摩托车"
}
else if(vehicle_type.indexOf("二轮")>=0){
vehicle_category="摩托车"
}

最后添加一个表输出的节点。

新建日更新的作业

新增一个获取最近10天交易的sql

1
2
3
4
5
6
7
SELECT date(handle_time) as day,
SUM(vehicle_price) as amount , count(*) as cnt,
i.market_id,m.name as market, m.province_name as province,
i.vehicle_type
FROM zg_vehicle_transfer_info i inner join market m on m.id = i.market_id
where i.handle_time is not null and i.handle_time > date_sub( now(),interval 10 day)
GROUP BY i.market_id,day ,i.vehicle_type;

添加生成车辆类别的脚本(同上)

最后增加一个插入或者更新表的节点

这里要根据市场、日期、车辆类型来更新。