關閉
標題:C# mqttnet 3.0.3 監聽全部
內容:
using Microsoft.SqlServer.Server;
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using utility;
namespace Wintec_Listen
{
internal class Program
{
// Program version string.
// NOTE(review): this is an instance field and nothing in this file creates a Program instance or reads it — confirm whether it is used elsewhere.
public string VERSION = "0.01";
// Stream returned by my.lock_file(...) in Main. It is never closed in this file;
// presumably keeping it open is what marks the lock file as "in use" for other instances — confirm against myinclude.lock_file.
static FileStream lockFileStream = null;
// Shared instance of the project utility class (from the `utility` namespace) used for settings, logging, JSON and SQL helpers.
public static myinclude my = new myinclude();
// The MQTT client; assigned in register_connection().
public static IMqttClient mqttClient = null;
// Worker threads by name. In this file the only entry written is "MAIN" (see Main);
// the key names in the trailing comment are not used anywhere in this file.
public static ConcurrentDictionary<string, Thread> TASKS = new ConcurrentDictionary<string, Thread>(); // KEY = GROUP_CHANNEL_SN , CUSTOM_CHANNEL_SN
// Process-wide settings, filled by getGlobals() and read by register_connection() and Main.
public static Dictionary<string, string> GLOBAL_SETTING = new Dictionary<string, string>();
/// <summary>
/// Fills GLOBAL_SETTING with the working directory, the lock-file path and the
/// MQTT / resend settings read through my.getSystemKey, then makes sure the
/// lock file exists.
/// </summary>
static void getGlobals()
{
    GLOBAL_SETTING["PWD"] = my.pwd();
    GLOBAL_SETTING["LOCK_FILE"] = my.pwd() + "\\lock.pid";

    // Each of these settings is stored under the same name it is looked up by.
    string[] systemKeys =
    {
        "MQTT_HOST",
        "MQTT_PORT",
        "MQTT_LOGIN_ID",
        "MQTT_LOGIN_PD",
        "RESEND_SECONDS",   // how long to wait before resending
        "RESEND_MAX_TIMES", // maximum number of sends
    };
    foreach (string key in systemKeys)
    {
        GLOBAL_SETTING[key] = my.getSystemKey(key);
    }

    // Create the lock file only when it is not there yet.
    string lockPath = GLOBAL_SETTING["LOCK_FILE"];
    if (my.is_file(lockPath))
    {
        return;
    }
    my.touch(lockPath);
}
/// <summary>
/// Console-only logging: writes <paramref name="data"/> followed by a line
/// terminator to standard output. Nothing is written to the log file.
/// </summary>
/// <param name="data">Text to print.</param>
public static void logC(string data)
{
    Console.Out.WriteLine(data);
}
/// <summary>
/// Logging that also goes to the log file: prints <paramref name="data"/> to the
/// console, then hands it to my.myLog (the file-writing logger, per the original comment).
/// </summary>
/// <param name="data">Text to log.</param>
public static void logF(string data)
{
    // Console first (same output as logC), then the persistent log.
    Console.WriteLine(data);
    my.myLog(data);
}
/// <summary>
/// AppDomain.UnhandledException handler (registered in Main): records the
/// exception and terminates the process.
/// </summary>
/// <param name="sender">Source of the event; not used.</param>
/// <param name="args">Event data; <c>ExceptionObject</c> carries the unhandled exception.</param>
/// <remarks>
/// The process exits with status 1 so that a crash is distinguishable from the
/// normal "already running" exit in Main, which uses status 0.
/// </remarks>
private static void myCrash(object sender, UnhandledExceptionEventArgs args)
{
    try
    {
        // ExceptionObject is typed as object; concatenation calls ToString(), which
        // for an Exception includes the message and stack trace.
        logF("未處理例外: " + args.ExceptionObject);
    }
    catch (Exception)
    {
        // Logging is best-effort: the process is going down anyway, and a failure in
        // the logger must not prevent the exit below.
    }
    // A killAllThreads() call used to be sketched here but was disabled; no such
    // method is visible in this file.
    System.Environment.Exit(1);
}
/// <summary>
/// Creates the shared MQTT client (stored in <c>mqttClient</c>), wires its handlers and
/// starts connecting to the broker described by GLOBAL_SETTING["MQTT_HOST"] /
/// GLOBAL_SETTING["MQTT_PORT"], with credentials when MQTT_LOGIN_ID is set.
/// </summary>
/// <remarks>
/// All handlers are attached before the connection attempt is started:
/// - on connect, the client subscribes to every topic ("#");
/// - on disconnect, the same client waits 5 seconds and connects again, so no new
///   client or handler set is created per reconnect;
/// - every received message is passed to <c>handleStatusMessage</c>.
/// The method does not wait for the connection; connect and subscribe failures are
/// logged instead of being left unobserved. Retrying after a failed attempt relies on
/// MQTTnet raising the disconnected handler for failed connects as well, which is the
/// reconnect pattern shown in the MQTTnet client documentation — verify against the
/// exact MQTTnet version in use.
/// </remarks>
/// <exception cref="FormatException">GLOBAL_SETTING["MQTT_PORT"] is not a valid integer.</exception>
static void register_connection()
{
    logC("連接MQTT Server: " + GLOBAL_SETTING["MQTT_HOST"] + " ...");
    logC("連接MQTT Port: " + GLOBAL_SETTING["MQTT_PORT"] + " ...");

    // Create TCP based options using the builder.
    var optionsBuilder = new MQTTnet.Client.Options.MqttClientOptionsBuilder()
        .WithTcpServer(GLOBAL_SETTING["MQTT_HOST"], Convert.ToInt32(GLOBAL_SETTING["MQTT_PORT"]));
    // Credentials are optional; a missing (null) login id is treated like an empty one.
    if (!string.IsNullOrEmpty(GLOBAL_SETTING["MQTT_LOGIN_ID"]))
    {
        optionsBuilder.WithCredentials(GLOBAL_SETTING["MQTT_LOGIN_ID"], GLOBAL_SETTING["MQTT_LOGIN_PD"]);
    }
    var options = optionsBuilder.Build();

    // The handlers capture this local so they always act on the client they belong to.
    var client = new MqttFactory().CreateMqttClient();
    mqttClient = client;

    // Listen to all incoming data. Attached before connecting so no message can
    // arrive while no handler is set.
    client.UseApplicationMessageReceivedHandler(e =>
    {
        handleStatusMessage(e.ApplicationMessage.Topic, e.ApplicationMessage.Payload);
    });

    client.UseConnectedHandler(async e =>
    {
        logC("MQTT 連線成功...");
        try
        {
            // Subscribe here (also re-subscribes after every reconnect) instead of
            // after a fixed delay that only hoped the connection was up.
            await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
        }
        catch (Exception ex)
        {
            logC("MQTT 訂閱失敗: " + ex.Message);
        }
    });

    client.UseDisconnectedHandler(async e =>
    {
        logC("MQTT 斷線...");
        // Non-blocking wait before the next attempt.
        await Task.Delay(5000);
        try
        {
            await client.ConnectAsync(options);
        }
        catch (Exception ex)
        {
            logC("MQTT 重新連線失敗: " + ex.Message);
        }
    });

    // Start the first attempt without blocking the caller; a fault is logged rather
    // than silently dropped.
    client.ConnectAsync(options).ContinueWith(
        t => logC("MQTT 連線失敗: " + t.Exception.GetBaseException().Message),
        TaskContinuationOptions.OnlyOnFaulted);
}

/// <summary>
/// Processes one received MQTT message: when its JSON payload has cmdId "AA05", merges
/// the reported output value into the S02_STATUS JSON of the matching CHANNEL row.
/// </summary>
/// <param name="topic">Topic of the message; its second "/"-separated segment is used as CHANNEL_ID.</param>
/// <param name="rawPayload">Raw message payload, decoded with my.b2s.</param>
/// <remarks>
/// Because every topic is subscribed, most messages are expected not to match; anything
/// that fails before the message is recognised as AA05 is ignored quietly (except for
/// one debug topic). Failures after that point are always logged.
/// </remarks>
private static void handleStatusMessage(string topic, byte[] rawPayload)
{
    // Topic for which failures are always logged (kept from the original code).
    const string debugTopic = "gis/2533557726/EGuWVehfctccQcu";
    bool isStatusReport = false;
    try
    {
        string payload = my.b2s(rawPayload).Trim().Replace("\r", "").Replace("\n", "");
        // Original note: check whether the received topic is a registration command and,
        // if so, mark it as completed. NOTE(review): the code below updates S02_STATUS
        // instead — the note may be stale.
        // Decode the payload.
        var jd = my.json_decode(payload)[0];
        // Example of what may arrive:
        // {"cmdId":"AA05","cmd":[{"pcode":"WW5H2B0000000284","Vout":[2,1]}]}
        if (jd["cmdId"] != null && jd["cmdId"].ToString() == "AA05")
        {
            isStatusReport = true;
            logC("Recive Topic: " + topic);
            logC("Recive Payload: " + payload);
            string CHANNEL_ID = my.explode("/", topic)[1];
            logC("CHANNEL_ID: " + CHANNEL_ID);
            string SQL = @"
SELECT TOP 1 [S02_STATUS] FROM [CHANNEL] WHERE CHANNEL_ID = @CHANNEL_ID
";
            Dictionary<string, string> pa = new Dictionary<string, string>();
            pa["CHANNEL_ID"] = CHANNEL_ID;
            var ra = my.selectSQL_SAFE(SQL, pa);
            if (ra.Rows.Count == 0)
            {
                return;
            }
            var ra_jd = my.json_decode(ra.Rows[0]["S02_STATUS"].ToString())[0];
            // Vout[0] selects the "S02_<n>" entry, Vout[1] is the value stored in it.
            ra_jd["S02_" + jd["cmd"][0]["Vout"][0].ToString()] = jd["cmd"][0]["Vout"][1].ToString();
            // Write back to the database.
            var m = new Dictionary<string, string>();
            m["S02_STATUS"] = my.json_encode(ra_jd);
            var mpa = new Dictionary<string, string>();
            mpa["CHANNEL_ID"] = CHANNEL_ID;
            my.updateSQL_SAFEretry("CHANNEL", m, "CHANNEL_ID = @CHANNEL_ID", mpa);
        }
    }
    catch (Exception ex)
    {
        // Unrecognised messages stay silent; a failure while applying a recognised AA05
        // update is a real error and must be visible.
        if (isStatusReport || topic == debugTopic)
        {
            logC("#124 Exception: " + "\r\n" + ex.Message + "\r\n" + ex.StackTrace);
        }
    }
}
/// <summary>
/// Entry point: installs the unhandled-exception handler, loads the settings, exits
/// when the lock file is already in use, otherwise takes the lock, starts the MQTT
/// worker thread and keeps the main thread alive.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    AppDomain.CurrentDomain.UnhandledException += myCrash;

    // Load the settings.
    getGlobals();

    // Lock-file check: the message printed below says the job is already running.
    if (my.is_file_in_use(GLOBAL_SETTING["LOCK_FILE"]))
    {
        logC("排程執行中...");
        System.Environment.Exit(0);
        return;
    }
    lockFileStream = my.lock_file(GLOBAL_SETTING["LOCK_FILE"]);

    // Program start: the connection is set up on its own thread, tracked under "MAIN".
    Thread worker = new Thread(register_connection);
    TASKS["MAIN"] = worker;
    worker.Start();

    // Keep the main thread alive indefinitely.
    for (;;)
    {
        Thread.Sleep(3000);
    }
}
}
}