commit 1e7b6d622d18915c78fcb11afd8e7c6c95c0a2b4 Author: wwartana Date: Mon Mar 2 23:45:52 2026 +0800 feat: Add MQTT testing utility for Tasmota devices, including a Python script, configuration, and firmware binary. diff --git a/.env b/.env new file mode 100644 index 0000000..d266856 --- /dev/null +++ b/.env @@ -0,0 +1,4 @@ +MQTT_HOST=mqtt.oncloud.my.id +MQTT_PORT=1883 +MQTT_USER=esp01_NepMB8Yt +MQTT_PASSWORD=V9p3MgUbquQkQ8RB diff --git a/tasmota.bin b/tasmota.bin new file mode 100644 index 0000000..5eec3b0 Binary files /dev/null and b/tasmota.bin differ diff --git a/test_mqtt.py b/test_mqtt.py new file mode 100644 index 0000000..afcd15b --- /dev/null +++ b/test_mqtt.py @@ -0,0 +1,55 @@ +import paho.mqtt.client as mqtt +import time +import sys +import os +from dotenv import load_dotenv + +# Load variables from .env file +load_dotenv() + +def on_connect(client, userdata, flags, reason_code, properties=None): + if reason_code == 0: + print("Connected to MQTT Broker successfully!") + client.subscribe("tele/#") + client.subscribe("stat/#") + client.subscribe("+/tele/#") + client.subscribe("+/stat/#") + else: + print(f"Failed to connect, return code {reason_code}") + +def on_message(client, userdata, msg): + try: + payload = msg.payload.decode('utf-8') + except Exception: + payload = str(msg.payload) + print(f"Topic: {msg.topic} | Message: {payload}") + +# Use older Callback API version for wider compatibility or version 2 if new +try: + client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) +except AttributeError: + client = mqtt.Client() + +# Get credentials from .env +mqtt_user = os.getenv("MQTT_USER") +mqtt_password = os.getenv("MQTT_PASSWORD") +mqtt_host = os.getenv("MQTT_HOST") +mqtt_port = int(os.getenv("MQTT_PORT", 1883)) + +client.username_pw_set(mqtt_user, mqtt_password) +client.on_connect = on_connect +client.on_message = on_message + +print(f"Connecting to {mqtt_host}...") +try: + client.connect(mqtt_host, mqtt_port, 60) +except Exception as e: + print(f"Connection failed: {e}") + sys.exit(1) + +client.loop_start() +# Wait for some messages to arrive +print("Listening for messages for 10 seconds...") +time.sleep(10) +client.loop_stop() +print("Done listening.")