DGIOT平台实时展示OPC上报数据全流程代码剖析

[小 迪 导读]:OPC软件作为工业自动化领域应用最广泛的软件,深受工业控制人员的喜爱。但也有许多情况下,OPC软件并不能满足实际的使用需求:
使用场景
1.OPC只在内网运行,希望可以将数据传递至外网,随时随地查看
2.OPC数据难以存库
3.希望可以更好展示数据,进行数据分析

整体交互图

1. dgiot_dtu从kepserver获取数据

在opc与kepserver完成连接之后,dgiot_dtu通过调用GetOpcDaService函数连接kepserver实现数据回调

 public OpcDaService GetOpcDaService(string host, string serviceProgId)        {           var service = hostCollection.Where(a => a.ServiceIds.Contains(serviceProgId) && a.Host == host)                      .FirstOrDefault();            if (service == null)            {                return null;            }            OpcDaService service1 = null;            if (CheckServiceExisted(service, serviceProgId))            {                service1 = opcDaServices.Find(item => { return item.Host == service.Host && item.ServiceId == serviceProgId; });            }            else            {                OpcDaServer daService = new OpcDaServer(serviceProgId, service.Host);                service1 = new OpcDaService()                {                    Host = service.Host,                    ServiceId = serviceProgId,                    Service = daService,                    OpcDaGroupS = new Dictionary()                };                opcDaServices.Add(service1);            }            if (service1.Service.IsConnected == false)            {                try                {                    service1.Service.ConnectionStateChanged += new EventHandler(ConnectionStateChanged);                    service1.Service.Connect();                }                catch (Exception e)                {                    LogHelper.Log("Connect " + service1.Host + ", ServiceId " + service1.ServiceId + "error!!" + e.Message);                }            }            return service1;        }COPY

2. dgiot_dtu数据发布

dgiot_dtu在完成数据收集后,和dgiot平台连接是通过mqtt来进行连接,dgiot_dtu作为设备端进行发布、dgiot平台作为服务端进行订阅

        public void ValueChangedCallBack(OpcDaGroup group, OpcDaItemValue[] values)        {            string groupKey = "";            JsonObject properties = new JsonObject();            values.ToList().ForEach(v =>            {                if (v.Item != null && v.Value != null)                {                    properties.Add(v.Item.ItemId, v.Value);                    groupKey = v.Item.UserData as string;                    OpcDa.setItems(groupKey, v.Item.ItemId, properties);                }            });            string topic = "$dg/thing/" + productId + "/" + devAddr + "/properties/report";            MqttClientHelper.Publish(topic, Encoding.UTF8.GetBytes(properties.ToString()));            return;            }COPY

3.平台通过dlink进行数据点位转换

在dgiot收到订阅信息时,在dink中调用on_message_publish来匹配topic类型进行点位转换

on_message_publish(Message = #message{topic = <<"$dg/thing/", Topic/binary>>, payload = Payload, from = _ClientId, headers = _Headers}, _State) ->    case re:split(Topic, <<"/">>) of        [ProductId, DevAddr, <<"properties">>, <<"report">>] ->            dgiot_dlink_proctol:properties_report(ProductId, DevAddr, get_payload(Payload));        [ProductId, DevAddr, <<"firmware">>, <<"report">>] ->            dgiot_dlink_proctol:firmware_report(ProductId, DevAddr, get_payload(Payload));        _ ->            pass    end,    {ok, Message};COPY

4. 平台通过task将数据存入TD,并使用mqtt上报给物模型

完成点位转换之后,将数据存入TD数据库、同时也将数据上报给物模型展示

save_td(ProductId, DevAddr, Ack, AppData) ->    case length(maps:to_list(Ack)) of        0 ->            #{};        _ ->            NewAck = dgiot_task:get_collection(ProductId, [], Ack, Ack),            NewData = dgiot_task:get_calculated(ProductId, NewAck),            Keys = dgiot_product:get_keys(ProductId),            DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),            Interval = maps:get(<<"interval">>, AppData, 3),            AllData = merge_cache_data(DeviceId, NewData, Interval),            AllDataKey = maps:keys(AllData),            case Keys -- AllDataKey of                List when length(List) == 0 andalso length(AllDataKey) =/= 0 ->                    ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>),                    dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId, DeviceId, AllData}),                    dgiot_tdengine_adapter:save(ProductId, DevAddr, AllData),                    Channel = dgiot_product:get_taskchannel(ProductId),                    dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(jsx:encode(AllData))]),                    dgiot_metrics:inc(dgiot_task, <<"task_save">>, 1),                    NotificationTopic = <<"$dg/user/alarm/", ProductId/binary, "/", DeviceId/binary, "/properties/report">>,                    dgiot_mqtt:publish(DeviceId, NotificationTopic, jsx:encode(AllData)),                    AllData;                _ ->                    save_cache_data(DeviceId, AllData),                    AllData            end    end.COPY

5.平台配置物模型并数据展示

[小 迪 点评]

  • 鉴此,dgiot专门提供了基于OPC通讯的OPC接口,以实现OPC数据的简单传输,解决行业痛点。

想了解更多 dgiot 的具体细节,欢迎大家在GitHub上查看相关源代码。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章