MicroPython 提供有 umqtt 模組可以使用 MQTT 協定, 不過在 umqtt 裡面有再分為 simple 和 robust 模組, 這兩種模組功能基本相同, 但如果設計純訂閱端 (只收資料不送資料) 這樣的程式, 在使用上就必須多加留意。
簡易的純訂閱端範例
以下我們先以 simple 模組為例, 寫一個簡單的 MQTT 用戶端程式, 連接可免費無須帳密即可測試的 test.mosquitto.org:
import time
import network
from umqtt.simple import MQTTClient
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('FLAG-SCHOOL', '12345678')
while not sta_if.isconnected():
pass
print("connected")
client = MQTTClient(
client_id="clientXXX",
keepalive=5,
server="test.mosquitto.org",
ssl=False)
client.connect()
def get_msg(topic, msg):
print(msg)
client.set_callback(get_msg)
client.subscribe(b'meebox')
counter = 0
while True:
client.check_msg()
print(counter)
counter = counter + 1
time.sleep(1)
keepalive 參數
在連接 MQTT 時, 有個重要的參數叫做 keepalive, 它可以控制 MQTT 用戶端在多久 (規格上是 1.5 個 keepalive 指定的秒數) 沒有傳或收資料後會由 MQTT 伺服端主動斷線。像是上面的範例, keepalive 設為 5 秒, 實際執行結果如下:
>>> %Run -c $EDITOR_CONTENT
connected
0
1
2
3
4
5
6
7
8
9
10
11
12
Traceback (most recent call last):
File "<stdin>", line 28, in <module>
File "umqtt/simple.py", line 204, in check_msg
File "umqtt/simple.py", line 173, in wait_msg
OSError: -1
執行後約 12 秒 (比設定的 1.5×5 秒久) 就會發生叫用 check_msg 時因為伺服端主動斷線而引發例外。若設定為更短的 3 秒則是大約 9 秒會斷線:
>>> %Run -c $EDITOR_CONTENT
connected
0
1
2
3
4
5
6
7
8
9
Traceback (most recent call last):
File "<stdin>", line 28, in <module>
File "umqtt/simple.py", line 204, in check_msg
File "umqtt/simple.py", line 173, in wait_msg
OSError: -1
如果你不設定 keepalive, 預設值為 0, 等同於關閉自動斷線的功能, 採用伺服端的預設閒置時間, 像是 test.mosquitto.org 的預設值大概會在 7~8 分鐘後主動斷線;若是 Adafruit IO 的 MQTT 大約會在 4~5 分鐘後主動斷線。
ping
如果想要一直維持連線, 可以不定時 ping 伺服端, 這樣伺服端就知道雖然沒有傳輸資料, 但是這個用戶端是活著的:
while True:
client.check_msg()
print(counter)
counter = counter + 1
client.ping()
time.sleep(1)
只要加上 ping(), 就不會被斷線了:
>>> %Run -c $EDITOR_CONTENT
connected
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
自行處理例外
如果覺得不定時 ping 伺服端還要浪費網路流量, 那也可以自行處理例外, 並重新連線訂閱主題, 例如:
import time
import network
from umqtt.simple import MQTTClient
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('FLAG-SCHOOL', '12345678')
# sta_if.connect('MEE_MI', 'PinkFloyd1969')
while not sta_if.isconnected():
pass
print("connected")
client = MQTTClient(
client_id="clientXXX",
keepalive=3,
server="test.mosquitto.org",
ssl=False)
client.connect(False) # 連線時採用 False 不清除會談資料
def get_msg(topic, msg):
print(msg)
client.set_callback(get_msg)
client.subscribe(b'meebox')
counter = 0
while True:
try:
client.check_msg()
print(counter)
counter = counter + 1
time.sleep(1)
except OSError as e:
print("reconnecting...")
client.connect(False) # 重新連線時也採用 False 不清除會談資料
print("reconected.")
只要處理伺服端斷線的 OSError 例外, 就可以加上重新連線的程式, 繼續收到訊息。注意到在這個程式中, 不論是一開始連線或是重新連線時傳入的 False, 這表示不清除 (clean) 用戶端會談資訊 (session), 意思就是接續之前使用相同 client_id 的連線, 恢復訂閱, 才能繼續收到新的訊息。這個參數預設為 True, 會清除會談資訊, 也就是當成全新的用戶端。像是這樣利用處理例外的機制, 就可以自動重新連線了:
>>> %Run -c $EDITOR_CONTENT
connected
0
1
2
3
4
5
reconnecting...
reconected.
6
7
8
9
自動幫你處理斷線的 robust 模組
自行處理例外雖然可以解決問題, 但是程式就會比較複雜, 因此 MicroPython 提供了 robust 模組, 幫你自動處理斷線問題, 只要把原先的程式改成從 umqtt.robust 匯入 MQTTCLient 類別, 並且在連線時指定 False 不清除會談資料即可:
import time
import network
from umqtt.robust import MQTTClient
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('FLAG-SCHOOL', '12345678')
while not sta_if.isconnected():
pass
print("connected")
client = MQTTClient(
client_id="clientXXX",
keepalive=3,
server="test.mosquitto.org",
ssl=False)
client.connect(False) # 記得要指定 False
def get_msg(topic, msg):
print(msg)
client.set_callback(get_msg)
client.subscribe(b'meebox')
counter = 0
while True:
client.check_msg()
print(counter)
counter = counter + 1
time.sleep(1)
這和最一開始使用 simple 模組的範例是一樣的, 只是改用 robust 模組, 並且在連線時指定 False 不清除會談資訊。執行結果卻有點小怪:
>>> %Run -c $EDITOR_CONTENT
connected
b'hello'
0
1
2
3
4
5
停在 5 就不動了?這主要是因為 check_msg() 的實作如下:
def check_msg(self):
self.sock.setblocking(False)
return self.wait_msg()
check_msg() 會將 socket 設為非擱置 (non-blocking) 模式, 然後才叫用 wait_msg(), 這樣當伺服端沒有傳入資料時嘗試讀取的動作就會立刻結束而不會等待。wait_msg() 的實作如下:
def wait_msg(self):
while 1:
try:
return super().wait_msg()
except OSError as e:
self.log(False, e)
self.reconnect()
你可以看到它會處理例外, 並且重新連線, 不過因為重新連線後會再次叫用 wait_msg(), 但是卻又沒有先設定為非擱置模式, 所以若沒有收到資料, 在嘗試讀取時若沒有資料就會等待新資料而不會立即返回, 這也就是執行結果之所以停住不動的原因。如果這時有發佈端發佈新訊息, 就會看到程式繼續動作了:
>>> %Run -c $EDITOR_CONTENT
connected
b'hello'
0
1
2
3
4
5
b'hello'
6
綜合上述, 我建議最簡單的方式就是持續的 ping 伺服端來保持連線。
client_id 的陷阱
在使用 MQTT 時, 很重要的是個別的用戶端 client_id 不能相同, 甚至像是 Adafruit IO (AIO) 的 MQTT 中, 即使是登入到不同的帳號, 也不能使用相同的 client_id, 否則一定會有用戶端被踢出來, 以下就是以 robust 模組連接 AIO 使用相同 client_id 的結果:
>>> %Run -c $EDITOR_CONTENT
connected
Traceback (most recent call last):
File "<stdin>", line 52, in <module>
File "umqtt/simple.py", line 204, in check_msg
File "umqtt/robust.py", line 43, in wait_msg
File "umqtt/robust.py", line 23, in reconnect
File "umqtt/simple.py", line 99, in connect
MQTTException: 6
MQTTException 例外號碼 6 表示用戶端被斷線後想要重新連線, 但是遭到伺服端拒絕。
對於這個問題, 最簡單的解法就是把 client_id 設定為 "" 空字串, 這樣 AIO 會幫你自動產生一個獨一無二的名稱, 就不會跟別人重複了。不過這樣每次都會產生不一樣的 client_id, 就無法以不清除會談資料的方式接續連線了。
如果一定要自己指定 client_id, 那麼可以使用這類隨機產生獨一無二編碼的工具, 就可以避免與其他人重複的狀況。
通知訂閱端發佈端意外斷線的 will (遺囑) 訊息
訊息的發佈端可以在連線時設定 will 訊息的頻道與內容, 伺服端會記住發佈端的設定, 這樣一旦偵測到發佈端意外斷線時, 伺服端會將發佈端設定的頻道與內容發佈給所有的訂閱者。舉例來說, 如果有以下的發佈端:
import time
import network
from umqtt.simple import MQTTClient
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('FLAG-SCHOOL', '12345678')
while not sta_if.isconnected():
pass
print("connected")
client = MQTTClient(
client_id="clientXXXPUB",
keepalive=3,
server="test.mosquitto.org",
ssl=False)
client.set_last_will(b'disconnect', b'oh!oh!')
client.connect()
client.publish(b'meebox', b'online')
這個發佈端發佈完訊息就結束了, 並沒有透過 client.disconnect() 斷線, 如果我們執行前面範例的訂閱端程式, 就會看到類似這樣的結果:
0
1
2
3
b'online'
4
5
6
7
8
b'oh!oh!'
9
10
請特別注意, 這個 will 訊息並不需要訂閱, 就會自動接收到。
如果發佈端是使用 client.disconnect() 正常結束連線, 伺服端就不會發送 will 訊息。
Top comments (4)
非常感谢,确实是大佬推荐后找到的最详尽的资料了
還有大佬, 感覺很厲害
太感謝啦,mpy的中文資料太稀少了,實在是很好的文章。
這都是自己被雷到後才下定決心好好認識一下 MQTT