訓練家的快寫筆記

The legend of trainer's paper


搜尋:

     關閉     
標題:C# mqttnet 3.0.3 監聽全部
內容:

using Microsoft.SqlServer.Server;
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using utility;

namespace Wintec_Listen
{
    /// <summary>
    /// Console entry point of the MQTT listener. It subscribes to every topic on the
    /// configured broker and, for each "AA05" command it receives, merges the reported
    /// Vout value into the S02_STATUS JSON stored on the matching CHANNEL row.
    /// </summary>
    internal class Program
    {
        public string VERSION = "0.01";

        // Topic filter used for the single wildcard subscription (all topics).
        const string SUBSCRIBE_ALL_TOPICS = "#";
        // cmdId of the only command this listener reacts to.
        const string STATUS_REPORT_CMD_ID = "AA05";
        // Topic whose otherwise-ignored failures are still traced on the console.
        const string DEBUG_TOPIC = "gis/2533557726/EGuWVehfctccQcu";
        // Delay between a disconnect (or failed connect) and the next connect attempt.
        const int RECONNECT_DELAY_MS = 5000;
        const string SELECT_STATUS_SQL = "SELECT TOP 1 [S02_STATUS] FROM [CHANNEL] WHERE CHANNEL_ID = @CHANNEL_ID";

        // Settings copied verbatim from my.getSystemKey into GLOBAL_SETTING.
        static readonly string[] SYSTEM_SETTING_KEYS =
        {
            "MQTT_HOST",
            "MQTT_PORT",
            "MQTT_LOGIN_ID",
            "MQTT_LOGIN_PD",
            "RESEND_SECONDS",   // how long to wait before re-sending
            "RESEND_MAX_TIMES", // maximum number of send attempts
        };

        // Kept in a static field so the stream returned by my.lock_file stays referenced
        // for the lifetime of the process.
        static FileStream lockFileStream = null;
        public static myinclude my = new myinclude();
        public static IMqttClient mqttClient = null;
        public static ConcurrentDictionary<string, Thread> TASKS = new ConcurrentDictionary<string, Thread>(); // KEY = GROUP_CHANNEL_SN , CUSTOM_CHANNEL_SN
        public static Dictionary<string, string> GLOBAL_SETTING = new Dictionary<string, string>();

        /// <summary>
        /// Fills <see cref="GLOBAL_SETTING"/> with the working directory, the lock-file path
        /// and the MQTT/resend settings, then creates the lock file if it does not exist yet.
        /// </summary>
        static void getGlobals()
        {
            GLOBAL_SETTING["PWD"] = my.pwd();
            // Path.Combine instead of a hard-coded "\\" so a trailing separator in PWD
            // does not produce a doubled separator.
            GLOBAL_SETTING["LOCK_FILE"] = Path.Combine(GLOBAL_SETTING["PWD"], "lock.pid");
            foreach (string key in SYSTEM_SETTING_KEYS)
            {
                GLOBAL_SETTING[key] = my.getSystemKey(key);
            }
            if (!my.is_file(GLOBAL_SETTING["LOCK_FILE"]))
            {
                my.touch(GLOBAL_SETTING["LOCK_FILE"]);
            }
        }

        /// <summary>Writes <paramref name="data"/> to the console only.</summary>
        public static void logC(string data)
        {
            Console.WriteLine(data);
        }

        /// <summary>
        /// Writes <paramref name="data"/> to the console and also hands it to my.myLog
        /// (the file-writing logger, per the original author's note).
        /// </summary>
        public static void logF(string data)
        {
            logC(data);
            my.myLog(data);
        }

        /// <summary>
        /// Last-chance handler for unhandled exceptions: records the exception and ends
        /// the process with a non-zero exit code so a supervisor can tell a crash from a
        /// normal exit.
        /// </summary>
        private static void myCrash(object sender, UnhandledExceptionEventArgs args)
        {
            try
            {
                Exception ex = args.ExceptionObject as Exception;
                logF("Unhandled exception: " + (ex != null ? ex.ToString() : Convert.ToString(args.ExceptionObject)));
            }
            catch (Exception)
            {
                // Logging is best effort here; a logger failure must not prevent the exit below.
            }
            System.Environment.Exit(1);
        }

        /// <summary>
        /// Converts the configured port text into a TCP port.
        /// </summary>
        /// <returns>
        /// The port when <paramref name="text"/> is an integer in 1..65535; otherwise null,
        /// which lets the MQTT options builder fall back to its own default port.
        /// </returns>
        static int? parsePort(string text)
        {
            int port;
            if (int.TryParse(text, out port) && port > 0 && port <= 65535)
            {
                return port;
            }
            return null;
        }

        /// <summary>
        /// Processes one received message. Messages that are not an "AA05" command are
        /// ignored; for an AA05 command the channel id is taken from the second topic
        /// segment and the reported Vout pair is written into that channel's S02_STATUS.
        /// </summary>
        /// <param name="topic">Topic the message arrived on.</param>
        /// <param name="rawPayload">Payload text as decoded by my.b2s.</param>
        static void handleMessage(string topic, string rawPayload)
        {
            // Set once the message is known to be an AA05 command. The wildcard
            // subscription delivers arbitrary traffic, so failures before this point are
            // expected noise, while failures after it are real errors worth recording.
            bool isStatusReport = false;
            try
            {
                string payload = rawPayload.Trim().Replace("\r", "").Replace("\n", "");
                var jd = my.json_decode(payload)[0];
                // Example of an expected payload:
                // {"cmdId":"AA05","cmd":[{"pcode":"WW5H2B0000000284","Vout":[2,1]}]}
                if (jd["cmdId"] == null || jd["cmdId"].ToString() != STATUS_REPORT_CMD_ID)
                {
                    return;
                }
                isStatusReport = true;

                logC("Recive Topic: " + topic);
                logC("Recive Payload: " + payload);
                string channelId = my.explode("/", topic)[1];
                logC("CHANNEL_ID: " + channelId);

                var selectParams = new Dictionary<string, string> { { "CHANNEL_ID", channelId } };
                var rows = my.selectSQL_SAFE(SELECT_STATUS_SQL, selectParams);
                if (rows.Rows.Count == 0)
                {
                    return;
                }

                // Vout is [index, value]: store value under the key "S02_<index>".
                var status = my.json_decode(rows.Rows[0]["S02_STATUS"].ToString())[0];
                status["S02_" + jd["cmd"][0]["Vout"][0].ToString()] = jd["cmd"][0]["Vout"][1].ToString();

                // Write the merged status back to the database.
                var fields = new Dictionary<string, string> { { "S02_STATUS", my.json_encode(status) } };
                var whereParams = new Dictionary<string, string> { { "CHANNEL_ID", channelId } };
                my.updateSQL_SAFEretry("CHANNEL", fields, "CHANNEL_ID = @CHANNEL_ID", whereParams);
            }
            catch (Exception ex)
            {
                if (isStatusReport)
                {
                    logF("AA05 handling failed for topic " + topic + ": " + ex.Message + "\r\n" + ex.StackTrace);
                }
                else if (topic == DEBUG_TOPIC)
                {
                    logC("#124 Exception: " + "\r\n" + ex.Message + "\r\n" + ex.StackTrace);
                }
            }
        }

        /// <summary>
        /// Creates the MQTT client, wires its handlers and performs the first connect.
        /// The subscription is (re)issued from the connected handler, so it is restored
        /// after every reconnect. Intended to be called once per process; reconnects reuse
        /// the same client instead of creating a new one.
        /// </summary>
        static void register_connection()
        {
            string host = GLOBAL_SETTING["MQTT_HOST"];
            logC("連接MQTT Server: " + host + " ...");
            logC("連接MQTT Port: " + GLOBAL_SETTING["MQTT_PORT"] + " ...");

            // Create TCP based options using the builder; a null port selects the library default.
            var optionsBuilder = new MQTTnet.Client.Options.MqttClientOptionsBuilder()
                .WithTcpServer(host, parsePort(GLOBAL_SETTING["MQTT_PORT"]));
            if (!string.IsNullOrEmpty(GLOBAL_SETTING["MQTT_LOGIN_ID"]))
            {
                optionsBuilder.WithCredentials(GLOBAL_SETTING["MQTT_LOGIN_ID"], GLOBAL_SETTING["MQTT_LOGIN_PD"]);
            }
            var options = optionsBuilder.Build();

            // Handlers capture this local so they always act on the instance they belong to.
            var client = new MqttFactory().CreateMqttClient();
            mqttClient = client;

            // Registered before connecting so no message can arrive without a handler.
            client.UseApplicationMessageReceivedHandler(e =>
            {
                var message = e.ApplicationMessage;
                if (message == null || message.Payload == null)
                {
                    return; // an empty message cannot carry an AA05 command
                }
                handleMessage(message.Topic, my.b2s(message.Payload));
            });

            client.UseConnectedHandler(async e =>
            {
                logC("MQTT 連線成功...");
                try
                {
                    // Listen to every topic.
                    await client.SubscribeAsync(new TopicFilterBuilder().WithTopic(SUBSCRIBE_ALL_TOPICS).Build());
                }
                catch (Exception ex)
                {
                    logF("MQTT subscribe failed: " + ex.Message);
                }
            });

            Func<Task> reconnectAfterDelay = async () =>
            {
                await Task.Delay(RECONNECT_DELAY_MS);
                try
                {
                    await client.ConnectAsync(options);
                }
                catch (Exception ex)
                {
                    // NOTE(review): MQTTnet 3.x is expected to raise the disconnected handler
                    // again after a failed connect, which schedules the next attempt — confirm
                    // against the library version in use.
                    logC("MQTT reconnect failed: " + ex.Message);
                }
            };

            client.UseDisconnectedHandler(e =>
            {
                logC("MQTT 斷線...");
                // Scheduled rather than awaited so the handler returns immediately.
                Task.Run(reconnectAfterDelay);
            });

            try
            {
                // This method runs on its own dedicated thread, so waiting here is safe and
                // guarantees the outcome of the first attempt is observed.
                client.ConnectAsync(options).GetAwaiter().GetResult();
            }
            catch (Exception ex)
            {
                logC("MQTT initial connect failed: " + ex.Message);
            }
        }

        /// <summary>
        /// Program entry: installs the crash handler, loads settings, refuses to start when
        /// the lock file is already in use, then starts the MQTT listener thread and keeps
        /// the process alive.
        /// </summary>
        static void Main(string[] args)
        {
            AppDomain.CurrentDomain.UnhandledException += new UnhandledExceptionEventHandler(myCrash);

            getGlobals(); // load settings (also creates the lock file when missing)

            // Single-instance guard based on the lock file.
            if (my.is_file_in_use(GLOBAL_SETTING["LOCK_FILE"]))
            {
                logC("排程執行中...");
                System.Environment.Exit(0);
                return;
            }
            lockFileStream = my.lock_file(GLOBAL_SETTING["LOCK_FILE"]);

            // Program start.
            TASKS["MAIN"] = new Thread(() =>
            {
                register_connection();
            });
            TASKS["MAIN"].Start();

            // Park the main thread; all work happens on the listener thread and MQTT callbacks.
            while (true)
            {
                Thread.Sleep(3000);
            }
        }
    }
}