feat: Add MQTT testing utility for Tasmota devices, including a Python script, configuration, and firmware binary.

This commit is contained in:
2026-03-02 23:45:52 +08:00
commit 1e7b6d622d
3 changed files with 59 additions and 0 deletions

4
.env Normal file
View File

@@ -0,0 +1,4 @@
MQTT_HOST=mqtt.oncloud.my.id
MQTT_PORT=1883
MQTT_USER=esp01_NepMB8Yt
MQTT_PASSWORD=V9p3MgUbquQkQ8RB

BIN
tasmota.bin Normal file

Binary file not shown.

55
test_mqtt.py Normal file
View File

@@ -0,0 +1,55 @@
import paho.mqtt.client as mqtt
import time
import sys
import os
from dotenv import load_dotenv
# Load variables from .env file
load_dotenv()
def on_connect(client, userdata, flags, reason_code, properties=None):
if reason_code == 0:
print("Connected to MQTT Broker successfully!")
client.subscribe("tele/#")
client.subscribe("stat/#")
client.subscribe("+/tele/#")
client.subscribe("+/stat/#")
else:
print(f"Failed to connect, return code {reason_code}")
def on_message(client, userdata, msg):
try:
payload = msg.payload.decode('utf-8')
except Exception:
payload = str(msg.payload)
print(f"Topic: {msg.topic} | Message: {payload}")
# Use older Callback API version for wider compatibility or version 2 if new
try:
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
except AttributeError:
client = mqtt.Client()
# Get credentials from .env
mqtt_user = os.getenv("MQTT_USER")
mqtt_password = os.getenv("MQTT_PASSWORD")
mqtt_host = os.getenv("MQTT_HOST")
mqtt_port = int(os.getenv("MQTT_PORT", 1883))
client.username_pw_set(mqtt_user, mqtt_password)
client.on_connect = on_connect
client.on_message = on_message
print(f"Connecting to {mqtt_host}...")
try:
client.connect(mqtt_host, mqtt_port, 60)
except Exception as e:
print(f"Connection failed: {e}")
sys.exit(1)
client.loop_start()
# Wait for some messages to arrive
print("Listening for messages for 10 seconds...")
time.sleep(10)
client.loop_stop()
print("Done listening.")