MQTT基础概念:

MQTT是一种通信架构;
MQTT是一个一对多的通信架构,一个服务端(Broker)可以有多个客户端(Client);

MQTT中有三个概念:

  1. 主题(Topic)
  2. 发布(Publish)
  3. 订阅(Subscribe)

主题是一种寻址方式,类似于Window的文件夹多级目录
Topic

例如 factory/line1/machine3/temperature 等等,不同的Topic指向不同的内容以及类别;

客户端Client可以即作为发布的来源也可以订阅发布的内容;
Client

而Broker作为信息的中转站,接收发布的信息并推送给订阅的用户端;

Broker

发布者无需担心是否将信息发送给了订阅者,准确来说发布者与订阅者之间不是直连的!

LuatOS中MQTT的API

常量:

const

订阅主题:

subscribe

1
2
3
4
-- 订阅单个topic, 且qos=0mqttc:subscribe("/luatos/123456", 0)
-- 订阅单个topic, 且qos=1mqttc:subscribe("/luatos/12345678", 1)
-- 订阅多个topic, 且使用不同的qos -> { , }区分不同topic
mqttc:subscribe({["/luatos/1234567"]=1,["/luatos/12345678"]=2})

创建Client:

Create Client

1
2
3
4
5
6
7
8
9
10
11
12
-- 普通TCP链接
mqttc = mqtt.create(nil,"120.55.137.106", 1884)
-- 普通TCP链接,mqtt接收缓冲区4096
mqttc = mqtt.create(nil,"120.55.137.106", 1884, nil, {rxSize = 4096})
-- 加密TCP链接,不验证服务器证书
mqttc = mqtt.create(nil,"120.55.137.106", 8883, true)
-- 加密TCPTCP链接,单服务器证书验证
mqttc = mqtt.create(nil,"120.55.137.106", 8883, {server_cert=io.readFile("/luadb/ca.crt")})
-- 加密TCPTCP链接,单服务器证书验证, 但可选认证
mqttc = mqtt.create(nil,"120.55.137.106", 8883, {server_cert=io.readFile("/luadb/ca.crt"), verify=1})
-- 加密TCPTCP链接,双向证书验证
mqttc = mqtt.create(nil,"120.55.137.106", 8883, {server_cert=io.readFile("/luadb/ca.crt"),client_cert=io.readFile("/luadb/client.pem"),client_key=io.readFile("/luadb/client.key"),client_password="123456",})

上述MQTT的通信协议为TCP,官方文档说不支持Websocket!

设置登录Client的信息:

auth

MQTT服务器连接:

connect

MQTT发布:

publish

1
2
-- 				  topic         data
mqttc:publish("/luatos/123456", "123")

检测状态:

Check State

Lua语言的整体框架


在Lua中,通过调用 require("sys") 定义一个实例sys,可以通过这个实例来创建系统任务:
example
可以同时创建多个任务,每个任务之间并行运行,且运行都限制在自己的上下文中,保障不同任务之间不会相互干扰。如果想要让不同的任务之间有先后执行的逻辑可以通过

1
2
3
sys.publish("net_ready", device_IMEI)
sys.waitUntil("net_ready")

这两句代码实现。一个用于发布一条事件,另一个等待该事件的完成,由此得到一个执行的先后顺序;

MQTT+Lua语言示例代码:

1
2
3
-- LuaTools需要PROJECT和VERSION这两个信息
PROJECT = "mqttdemo"
VERSION = "1.0.0"

.lua文件的开头必须要PROJECT的名称以及VERSION的版本;

1
2
3
4
5
6
7
8
9
10
11
12
13
--根据自己的服务器修改以下参数
local mqtt_host = "lbsmqtt.airm2m.com"
local mqtt_port = 1884
local mqtt_isssl = false
local ca_file = false

--必须跟服务器的配置一样
local client_id = "mqttx_b55c41b7"
local user_name = "user"
local password = "password"

local pub_topic = "/luatos/pub/123"-- .. (mcu.unique_id():toHex())
local sub_topic = "/luatos/sub/123"-- .. (mcu.unique_id():toHex())

用户的参数配置:
host以及port是自己服务器的参数,isssl表示是否加密连接;

1
2
3
-- 结尾总是这一句
sys.run()
-- sys.run()之后后面不要加任何语句!!!!!

结束的语句,启动sys并且循环运行任务列表里面的任务。

整体的流程:

  1. 设置两个系统内部任务,任务一为联网,4G模块提供SIM卡成功联网并且返回设备的IMEI号;
  2. 设置第二个任务,将设备的IMEI号分配给Client,并且设置发布、订阅的主题,确保独立性;
  3. 创建mqtt连接(不一定正常连接上),以设备号以及预设置的user name和password配置mqtt,可以设置自动重连;
  4. 编写mqtt回调函数,在mqtt回调函数中检测事件,当连接上mqtt时将数据打包并且通过publish发给broker;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
mqttc:on(function(mqtt_client, event, data, payload)
log.info("mqtt", "event", event) -- 可有
if event == "conack" then
-- MQTT 连接成功
sys.publish("mqtt_conack")
mqtt_client:subscribe(sub_topic)

-- 采集设备信息
local imei = mobile.imei()
local csq = mobile.csq() -- 信号强度
local voltage = mobile.getVbatt() -- 电池电压(单位 mV)

-- 构造 JSON 格式的 payload 数据
local device_data = json.encode({
imei = imei,
csq = csq,
voltage = voltage,
timestamp = os.time()
})

log.info("mqtt", "publish data", device_data)

-- 发布数据到 topic(发布一次)
mqtt_client:publish(pub_topic, device_data, 0)

-- 可选:配置一个上网指示灯亮起
gpio.set(11, 1) -- 设置 GPIO11 为高电平,表示联网成功
end
end)

一些Lua语言的细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- _G -> GLOBAL VALUE
_G.sys = require("sys")
_G.sysplus = require("sysplus")
_G.mobile = require("mobile")
_G.uart = require("uart")

-- uart init
uart.setup(
uart_id,
uart_baud,
uart_bitwidth,
uart_stopbit
)

-- system inner tasks
sys.taskInit(function()
local device_id = mobile.imei() -- read AIR780EG's IMEI
-- waiting for 4G connected
sys.waitUntil("IP_READY")
-- system publish event
sys.publish("net_ready", device_id)
end)

下面是一个完整的脚本,具体的内容如下:

  1. 任务一:实现联网功能并在联网后发布事件;
  2. 任务二:等待联网事件,并根据IMEI设置client-id以及发布、订阅主题;之后根据配置创建mqtt连接并尝试连接;在mqtt回调函数中,如果连接上了则通过串口向单片机发送信息,并发布mqtt连接事件;
  3. 任务三:等待mqtt连接事件,之后开始接收单片传入的数据并且定位数据位置,将数据打包并通过mqtt上传云端;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
PROJECT = "AIR780EG_DEMO"
VERSION = "1.0.0"

-- Setup instance
_G.sys = require("sys")
_G.sysplus = require("sysplus")

local wdt = require("wdt")
local log = require("log")
local uart = require("uart")
local mobile = require("mobile")
local mqtt = require("mqtt") -- 加载 mqtt 模块
local rtos = require("rtos") -- 加载 rtos 模块(用于识别平台)
local pm = require("pm") -- 加载电源管理模块(用于电源配置)

if wdt then
--添加硬狗防止程序卡死,在支持的设备上启用这个功能
wdt.init(9000)--初始化watchdog设置为9s
sys.timerLoopStart(wdt.feed, 3000)--3s喂一次狗
end

-- uart configuration
local uart_id = 1
local uart_baud = 115200
local uart_bitwidth = 8
local uart_stopbit = 1
uart.Setup(uart_id, uart_baud, uart_bitwidth, uart_stopbit)

-- server configuration
local mqtt_host = "lbsmqtt.airm2m.com"
local mqtt_port = 1884
local mqtt_isssl = false
local ca_file = false

-- mqtt client configuration
local client_id = "air780eg"
local user_name = "user"
local password = "password"

local mqttc = nil
local publish_topic = "/air780eg/publish/123"
local subscribe_topic = "/air780eg/subscribe/123"

local IsFirstConnect = true

-- Task list

-- Air780E的AT固件默认会为开机键防抖, 导致部分用户刷机很麻烦
if rtos.bsp() == "EC618" and pm and pm.PWK_MODE then
pm.power(pm.PWK_MODE, false)
end

-- network connect
sys.taskInit(function ()
local device_IMEI = mobile.imei() -- string
sys.waitUntil("IP_READY")
sys.publish("net_ready", device_IMEI)
end)

-- mqtt connect
sys.taskInit(function ()
local ret, IMEI = sys.waitUntil("net_ready")
if ret then
-- use IMEI to set unique topic
client_id = IMEI
publish_topic = client_id .. "/publish"
subscribe_topic = client_id .. "/subscribe"
if IsFirstConnect then
-- uart transmit logger to MCU
uart.write(uart_id, "IMEI: " .. IMEI .. "\r\n")
uart.write(uart_id, "publish_topic:" .. publish_topic .. "\r\n")
uart.write(uart_id, "subscribe_topic:" .. subscribe_topic .. "\r\n")
IsFirstConnect = false
end
end

-- create mqtt client
mqttc = mqtt.create(mqtt_host, mqtt_port, mqtt_isssl, ca_file)
mqttc:auth(client_id, user_name, password) -- set mqtt user-name and password(should be equal to the server setting)
mqttc:autoreconn(true, 3000) -- set auto-reconnect

-- define callback function
mqttc:on(function(mqtt_client, event, data, payload)
if event == "conack" then
uart.write(uart_id, "MQTT Connect\r\n")
sys.publish("mqtt_ready")
elseif event == "recv" then
log.info("mqtt", "收到消息", data, payload)
elseif event == "sent" then
log.info("mqtt", "发送成功", data)
elseif event == "disconnect" then
log.warn("mqtt", "断开连接")
end
end)

-- start to conneect server
mqttc:connect()
end)

-- UART 数据接收 + MQTT 上传
sys.taskInit(function()
sys.waitUntil("mqtt_ready")

log.info("uart", "开始监听 UART 数据上传")
local buffer = ""

-- uart接收触发中断 从id端口接收len长度的数据
uart.on(uart_id, "receive", function(id, len)
local data = uart.read(id, len)
buffer = buffer .. data

-- 示例:以换行符作为一条完整数据的结束
while buffer:find("\r\n") do
local packet, remain = buffer:match("^(.-)\r\n(.*)$")
buffer = remain or ""

-- 打印接收到的串口数据
log.info("uart", "收到数据", packet)
uart.write(uart_id, "Receive Finish!\r\n")

-- 发布到 MQTT(QoS=0)
if mqttc then
mqttc:publish(publish_topic, packet, 0)
end
end
end)
end)

--[[
说明:
1. 该代码示例展示了如何使用 UART 接收数据并通过 MQTT 上传。
2. 使用了 sysplus 库来处理系统任务和事件。
3. 使用了 mqtt 库来处理 MQTT 客户端的连接和消息发送。
4. 使用了 log 库来记录日志信息。
]]
sys.run()