在RestCloud ETL中目前MongoDB的输入节点只能读取集合中的行列数据,而不能对集合中的数据进行聚合运算再传入后续节点,本文介绍如何通过脚本节点实现在MongoDB中对数据进行统计然后再把统计结果传入到mysql,postgresql中。
ETL数据管道如上图所示,在RestCloud 通过WEB界面画一个出来就可以了,主要是前面的MongoDB查询节点的代码实现如下:
package cn.restcloud.etl.rule.ext;import org.apache.commons.lang3.StringUtils;import org.bson.Document;import java.sql.Connection;import cn.restcloud.framework.core.context.*;import cn.restcloud.etl.base.IETLBaseEvent;import cn.restcloud.etl.base.IETLBaseProcessEngine;import cn.restcloud.framework.core.util.*;import cn.restcloud.framework.core.util.db.rdb.*;import java.util.*;import java.util.ArrayList;import java.util.List;import org.bson.BsonDocument;import org.bson.Document;import org.bson.conversions.Bson;import com.mongodb.client.AggregateIterable;import cn.restcloud.etl.base.IETLBaseProcessEngine;import cn.restcloud.framework.core.util.db.mongo.MongoUtil;/**indoc为流数据执行成功必须返回字符1,返回0表示终止流程*/public class ETL_T00002_1EJ0RJSV8LZ implements IETLBaseEvent {public String execute(IETLBaseProcessEngine engine, Document modelNodeDoc, Document indoc,String fieldId,String params) throws Exception { List filters=new ArrayList(); filters.add(BsonDocument.parse("{'$match':{\"actionTime\":{$gte:'2022-06-30 00:00',$lte:'2022-07-30 00:00'},appId:{$eq:'core'}}}")); //数据查询条件,相当于sql where条件 filters.add(BsonDocument.parse("{$group :" + " {" + " _id : {mapUrl:\"$actionMapUrl\",configName:'$actionName',appId:\"$appId\",methodType:'$methodType'}, \r
" + " total : {$sum : 1}," + " avgTime:{$avg:'$runTotalTime'}" + " min:{$min:'$runTotalTime'},"+ " max:{$max:'$runTotalTime'},"+ " }" + "}")); //增加一个聚合查询条件 filters.add(BsonDocument.parse("{'$sort':{total:-1}}"));//指定排序方式 filters.add(BsonDocument.parse("{ $limit : 300 }"));//指定最大返回数 List docs=new ArrayList(); AggregateIterable aggdocs=MongoUtil.getCollection("RC_ApiLog", "P_ActionUrlAccessLog").aggregate(filters); //执行查询 for(Document doc:aggdocs) { //把结果转换为二级结构的数据 Document newdoc=new Document(); newdoc.putAll((Document)doc.get("_id")); newdoc.put("accessTotal", doc.get("total")); newdoc.put("avgTime", doc.get("avgTime")); newdoc.put("min", doc.get("min")); newdoc.put("max",doc.get("max")); docs.add(newdoc); } indoc.put("data", docs); //传到后面的节点中去 return "1";}}
把上述代码贴到MongoDB数据查询节点中即可完成MongoDB的数据统计分析并把结果传入到其他数据库中的目的。
免费下载:RestCloud ETL https://club.restcloud.cn/download
留言与评论(共有 0 条评论) “” |