使用ETL实现MongoDB聚合查询数据并传输到mysql,postgresql中

在RestCloud ETL中目前MongoDB的输入节点只能读取集合中的行列数据,而不能对集合中的数据进行聚合运算再传入后续节点,本文介绍如何通过脚本节点实现在MongoDB中对数据进行统计然后再把统计结果传入到mysql,postgresql中。




ETL数据管道如上图所示,在RestCloud 通过WEB界面画一个出来就可以了,主要是前面的MongoDB查询节点的代码实现如下:

package cn.restcloud.etl.rule.ext;import org.apache.commons.lang3.StringUtils;import org.bson.Document;import java.sql.Connection;import cn.restcloud.framework.core.context.*;import cn.restcloud.etl.base.IETLBaseEvent;import cn.restcloud.etl.base.IETLBaseProcessEngine;import cn.restcloud.framework.core.util.*;import cn.restcloud.framework.core.util.db.rdb.*;import java.util.*;import java.util.ArrayList;import java.util.List;import org.bson.BsonDocument;import org.bson.Document;import org.bson.conversions.Bson;import com.mongodb.client.AggregateIterable;import cn.restcloud.etl.base.IETLBaseProcessEngine;import cn.restcloud.framework.core.util.db.mongo.MongoUtil;/**indoc为流数据执行成功必须返回字符1,返回0表示终止流程*/public class ETL_T00002_1EJ0RJSV8LZ implements IETLBaseEvent {public String execute(IETLBaseProcessEngine engine, Document modelNodeDoc, Document indoc,String fieldId,String params) throws Exception {        List filters=new ArrayList();        filters.add(BsonDocument.parse("{'$match':{\"actionTime\":{$gte:'2022-06-30 00:00',$lte:'2022-07-30 00:00'},appId:{$eq:'core'}}}")); //数据查询条件,相当于sql where条件        filters.add(BsonDocument.parse("{$group :" +         "  {" +         "      _id : {mapUrl:\"$actionMapUrl\",configName:'$actionName',appId:\"$appId\",methodType:'$methodType'}, \r
" +         "      total : {$sum : 1}," +         "      avgTime:{$avg:'$runTotalTime'}" +         "   min:{$min:'$runTotalTime'},"+        "   max:{$max:'$runTotalTime'},"+        "  }" +         "}")); //增加一个聚合查询条件                filters.add(BsonDocument.parse("{'$sort':{total:-1}}"));//指定排序方式        filters.add(BsonDocument.parse("{ $limit : 300 }"));//指定最大返回数                List docs=new ArrayList();        AggregateIterable aggdocs=MongoUtil.getCollection("RC_ApiLog", "P_ActionUrlAccessLog").aggregate(filters); //执行查询        for(Document doc:aggdocs) {        //把结果转换为二级结构的数据        Document newdoc=new Document();        newdoc.putAll((Document)doc.get("_id"));        newdoc.put("accessTotal", doc.get("total"));        newdoc.put("avgTime", doc.get("avgTime"));        newdoc.put("min", doc.get("min"));        newdoc.put("max",doc.get("max"));        docs.add(newdoc);        }                indoc.put("data", docs); //传到后面的节点中去                return "1";}}

把上述代码贴到MongoDB数据查询节点中即可完成MongoDB的数据统计分析并把结果传入到其他数据库中的目的。

免费下载:RestCloud ETL https://club.restcloud.cn/download

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章