Chociaż stworzenie pełnowymiarowej repliki Home Assistant na mikrokontrolerze z MicroPythonem jest nierealistyczne ze względu na ograniczenia sprzętowe (pamięć RAM, moc obliczeniowa), możliwe jest zbudowanie kompaktowego, inteligentnego węzła, który naśladuje kluczowe funkcje systemów automatyki domowej. Ten węzeł może działać samodzielnie lub integrować się z większym systemem (jak Home Assistant) za pomocą protokołów takich jak MQTT.
Home Assistant to potężne oprogramowanie, zazwyczaj uruchamiane na mocniejszym sprzęcie (Raspberry Pi, NUC, serwery). Zarządza setkami integracji, skomplikowanymi automatyzacjami i rozbudowanym interfejsem użytkownika. Mikrokontrolery, na których działa MicroPython, mają znacznie mniej zasobów.
Naszym celem jest stworzenie inteligentnego urządzenia końcowego (węzła), które realizuje konkretne zadania w ekosystemie smart home, a nie replikowanie całego centrum zarządzania. Skupimy się na:
Różne urządzenia komunikujące się w ramach ekosystemu IoT inteligentnego domu.
Do zbudowania naszego węzła potrzebujemy kilku podstawowych elementów:
Komponent | Opis | Rola w projekcie |
---|---|---|
Mikrokontroler (np. ESP32) | Płytka rozwojowa z Wi-Fi i Bluetooth, obsługująca MicroPython. | Mózg operacji, wykonuje kod MicroPython, zarządza komunikacją i peryferiami. |
Czujnik Temperatury/Wilgotności (np. DHT22, DS18B20) | Urządzenie mierzące warunki środowiskowe. | Dostarcza danych wejściowych do systemu (np. do automatyzacji ogrzewania/chłodzenia). |
Czujnik Ruchu (np. PIR HC-SR501) | Wykrywa ruch w swoim zasięgu. | Umożliwia automatyzacje oparte na obecności (np. włączanie światła). |
Moduł Przekaźnika | Pozwala na sterowanie urządzeniami zasilanymi napięciem sieciowym (np. oświetleniem). | Element wykonawczy (aktuator) sterowany przez mikrokontroler. |
Przewody połączeniowe, płytka stykowa | Do połączenia komponentów. | Umożliwiają prototypowanie układu. |
Powyższa tabela przedstawia przykładowy zestaw komponentów. Można go modyfikować w zależności od potrzeb projektu.
Przykład zestawu do nauki automatyki domowej z ESP32 i MicroPython.
Nasz program w MicroPython będzie składał się z kilku kluczowych modułów:
Powyższy diagram przedstawia strukturę oprogramowania naszego kompaktowego węzła smart home. Widzimy podział na konfigurację, obsługę urządzeń, komunikację, logikę automatyzacji oraz główną pętlę programu sterującą całością.
Poniżej znajduje się kompleksowy, ale wciąż kompaktowy, przykład programu dla ESP32 realizujący opisane funkcje. Zakłada on podłączenie czujnika DHT22 do GPIO 4, czujnika PIR do GPIO 5 oraz modułu przekaźnika do GPIO 15.
Ważne: Przed uruchomieniem upewnij się, że masz zainstalowane niezbędne biblioteki MicroPython (umqtt.simple
, dht
). Zastąp również dane konfiguracyjne (SSID, hasło WiFi, adres brokera MQTT) swoimi własnymi.
# Importowanie niezbędnych bibliotek
import network # Do obsługi WiFi
import time # Do obsługi czasu (opóźnienia, timery)
import machine # Do obsługi pinów GPIO i unikalnego ID
import ubinascii # Do konwersji ID na hex
import ujson # Do obsługi formatu JSON (dla MQTT)
from umqtt.simple import MQTTClient # Klient MQTT
import dht # Obsługa czujnika DHT
import socket # Do serwera WWW
import _thread # Do uruchomienia serwera WWW w osobnym wątku
# --- Konfiguracja ---
WIFI_SSID = 'TWOJA_SIEC_WIFI'
WIFI_PASSWORD = 'TWOJE_HASLO_WIFI'
MQTT_BROKER = 'ADRES_TWOJEGO_BROKERA_MQTT' # np. '192.168.1.100' lub adres publiczny
MQTT_PORT = 1883
CLIENT_ID = b'esp32_smarthome_' + ubinascii.hexlify(machine.unique_id()) # Unikalne ID klienta
TOPIC_PREFIX = b'home/esp32/' + CLIENT_ID[-4:] # Prefiks tematów MQTT
TOPIC_STATUS = TOPIC_PREFIX + b'/status'
TOPIC_COMMAND = TOPIC_PREFIX + b'/set'
TOPIC_TEMP = TOPIC_PREFIX + b'/temperature'
TOPIC_HUMIDITY = TOPIC_PREFIX + b'/humidity'
TOPIC_MOTION = TOPIC_PREFIX + b'/motion'
TOPIC_LIGHT_STATE = TOPIC_PREFIX + b'/light/state'
TOPIC_LIGHT_COMMAND = TOPIC_PREFIX + b'/light/set'
# Piny GPIO
DHT_PIN = 4
PIR_PIN = 5
RELAY_PIN = 15
# Zmienne globalne stanu
current_state = {
'temperature': None,
'humidity': None,
'motion': False,
'light': False,
'last_motion_time': 0
}
# Inicjalizacja urządzeń
dht_sensor = dht.DHT22(machine.Pin(DHT_PIN))
pir_sensor = machine.Pin(PIR_PIN, machine.Pin.IN)
relay = machine.Pin(RELAY_PIN, machine.Pin.OUT)
relay.value(0) # Domyślnie wyłączony
# --- Połączenie WiFi ---
def connect_wifi():
"""Łączy z siecią WiFi."""
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print('Łączenie z WiFi...')
sta_if.active(True)
sta_if.connect(WIFI_SSID, WIFI_PASSWORD)
while not sta_if.isconnected():
time.sleep(1)
print('Połączono z WiFi. IP:', sta_if.ifconfig()[0])
return sta_if.ifconfig()[0]
# --- Klient MQTT ---
mqtt_client = None
def mqtt_callback(topic, msg):
"""Obsługuje przychodzące wiadomości MQTT."""
global current_state
print(f'MQTT Otrzymano: Temat="{topic.decode()}", Wiadomość="{msg.decode()}"')
topic_str = topic.decode()
msg_str = msg.decode().upper() # Ujednolicenie do wielkich liter
if topic_str == TOPIC_LIGHT_COMMAND.decode():
if msg_str == 'ON':
current_state['light'] = True
relay.value(1)
print("Światło WŁĄCZONE przez MQTT")
publish_light_state()
elif msg_str == 'OFF':
current_state['light'] = False
relay.value(0)
print("Światło WYŁĄCZONE przez MQTT")
publish_light_state()
else:
print("Nieznana komenda dla światła")
elif topic_str == TOPIC_COMMAND.decode():
# Można dodać obsługę innych poleceń JSON
try:
command_data = ujson.loads(msg)
if 'light' in command_data:
# Obsługa jak wyżej...
pass
except ValueError:
print("Błąd parsowania JSON w poleceniu ogólnym")
def connect_mqtt():
"""Łączy z brokerem MQTT."""
global mqtt_client
try:
mqtt_client = MQTTClient(CLIENT_ID, MQTT_BROKER, port=MQTT_PORT)
mqtt_client.set_callback(mqtt_callback)
mqtt_client.connect()
print(f'Połączono z brokerem MQTT: {MQTT_BROKER}')
# Subskrypcja tematów
mqtt_client.subscribe(TOPIC_COMMAND)
mqtt_client.subscribe(TOPIC_LIGHT_COMMAND)
print(f'Zasubskrybowano tematy: {TOPIC_COMMAND.decode()}, {TOPIC_LIGHT_COMMAND.decode()}')
return True
except OSError as e:
print(f'Błąd połączenia MQTT: {e}')
time.sleep(5)
return False
def publish_state():
"""Publikuje pełny stan urządzeń w formacie JSON."""
if mqtt_client:
try:
payload = ujson.dumps(current_state)
mqtt_client.publish(TOPIC_STATUS, payload.encode('utf-8'))
# Publikuj również indywidualne stany dla łatwiejszej integracji np. z Home Assistant
if current_state['temperature'] is not None:
mqtt_client.publish(TOPIC_TEMP, str(current_state['temperature']).encode('utf-8'))
if current_state['humidity'] is not None:
mqtt_client.publish(TOPIC_HUMIDITY, str(current_state['humidity']).encode('utf-8'))
mqtt_client.publish(TOPIC_MOTION, b'ON' if current_state['motion'] else b'OFF')
publish_light_state()
except (OSError, AttributeError) as e:
print(f"Błąd publikacji MQTT: {e}")
# Spróbuj ponownie połączyć jeśli problem z klientem
if isinstance(e, AttributeError) or 'Broker' in str(e):
connect_mqtt()
def publish_light_state():
"""Publikuje stan światła."""
if mqtt_client:
try:
mqtt_client.publish(TOPIC_LIGHT_STATE, b'ON' if current_state['light'] else b'OFF')
except (OSError, AttributeError) as e:
print(f"Błąd publikacji stanu światła: {e}")
# --- Odczyt czujników ---
def read_sensors():
"""Odczytuje dane z czujników i aktualizuje stan."""
global current_state
# Odczyt DHT22
try:
dht_sensor.measure()
current_state['temperature'] = dht_sensor.temperature()
current_state['humidity'] = dht_sensor.humidity()
except OSError as e:
print(f'Błąd odczytu DHT22: {e}')
# Nie zeruj odczytów, zachowaj ostatnie poprawne
# current_state['temperature'] = None
# current_state['humidity'] = None
# Odczyt PIR
motion_detected = bool(pir_sensor.value())
if motion_detected and not current_state['motion']:
print("Wykryto ruch!")
current_state['last_motion_time'] = time.time()
current_state['motion'] = motion_detected
# --- Logika Automatyzacji ---
MOTION_LIGHT_TIMEOUT = 30 # Czas (w sekundach) przez jaki światło pozostaje włączone po ostatnim ruchu
def run_automations():
"""Wykonuje proste reguły automatyzacji."""
global current_state
# Automatyzacja: Włącz światło po wykryciu ruchu, wyłącz po timeout'cie
if current_state['motion']:
if not current_state['light']:
print("Automatyzacja: Włączam światło z powodu ruchu.")
current_state['light'] = True
relay.value(1)
publish_light_state() # Natychmiast publikuj zmianę stanu
else: # Brak ruchu
if current_state['light'] and (time.time() - current_state['last_motion_time'] > MOTION_LIGHT_TIMEOUT):
print(f"Automatyzacja: Wyłączam światło po {MOTION_LIGHT_TIMEOUT}s braku ruchu.")
current_state['light'] = False
relay.value(0)
publish_light_state() # Natychmiast publikuj zmianę stanu
# Tutaj można dodać więcej reguł, np.
# if current_state['temperature'] is not None and current_state['temperature'] > 28:
# print("Automatyzacja: Temperatura powyżej 28C!")
# # np. włącz wentylator (jeśli podłączony)
# --- Prosty serwer WWW ---
def start_web_server(ip_address):
"""Uruchamia prosty serwer WWW do podglądu stanu i sterowania."""
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Pozwala na szybkie restartowanie serwera
s.bind(addr)
s.listen(1)
print(f'Serwer WWW uruchomiony na http://{ip_address}')
while True:
try:
cl, addr = s.accept()
# print('Połączenie webowe od', addr)
request = cl.recv(1024)
request_str = request.decode('utf-8')
# Proste parsowanie żądania GET
light_command = None
if 'GET /?light=on' in request_str:
light_command = 'ON'
elif 'GET /?light=off' in request_str:
light_command = 'OFF'
if light_command:
global current_state
if light_command == 'ON':
current_state['light'] = True
relay.value(1)
print("Światło WŁĄCZONE przez Web")
else:
current_state['light'] = False
relay.value(0)
print("Światło WYŁĄCZONE przez Web")
publish_light_state() # Aktualizuj stan MQTT
# Przygotowanie odpowiedzi HTML
temp_str = f"{current_state['temperature']:.1f}" if current_state['temperature'] is not None else "N/A"
hum_str = f"{current_state['humidity']:.1f}" if current_state['humidity'] is not None else "N/A"
light_status = "WŁĄCZONE" if current_state['light'] else "WYŁĄCZONE"
motion_status = "WYKRYTO" if current_state['motion'] else "BRAK"
html = f"""<!DOCTYPE html>
<html>
<head>
<title>ESP32 SmartHome Node</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="refresh" content="10"> {/* Odświeżanie co 10 sekund */}
<style>
body {{ font-family: sans-serif; padding: 15px; }}
.button {{ padding: 10px 20px; font-size: 16px; cursor: pointer; margin: 5px; }}
.on {{ background-color: #4CAF50; color: white; }}
.off {{ background-color: #f44336; color: white; }}
.status {{ font-weight: bold; }}
.status-on {{ color: green; }}
.status-off {{ color: red; }}
.status-motion {{ color: orange; }}
</style>
</head>
<body>
<h1>Status Węzła ESP32</h1>
<p>Temperatura: <span class="status">{temp_str} °C</span></p>
<p>Wilgotność: <span class="status">{hum_str} %</span></p>
<p>Ruch: <span class="status {'status-motion' if current_state['motion'] else ''}">{motion_status}</span></p>
<p>Światło: <span class="status {'status-on' if current_state['light'] else 'status-off'}">{light_status}</span></p>
<hr>
<h2>Sterowanie</h2>
<a href="/?light=on"><button class="button on">Włącz Światło</button></a>
<a href="/?light=off"><button class="button off">Wyłącz Światło</button></a>
</body>
</html>"""
# Wysłanie odpowiedzi
cl.send('HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n')
cl.sendall(html.encode('utf-8'))
cl.close()
except OSError as e:
print(f"Błąd serwera WWW: {e}")
if cl: cl.close() # Zamknij połączenie w razie błędu
except Exception as e:
print(f"Inny błąd serwera WWW: {e}")
if cl: cl.close()
# --- Główna Pętla ---
def main():
"""Główna funkcja programu."""
ip = connect_wifi()
if not connect_mqtt():
print("Nie udało się połączyć z MQTT przy starcie. Spróbuję ponownie później.")
# Uruchomienie serwera WWW w osobnym wątku
try:
_thread.start_new_thread(start_web_server, (ip,))
except Exception as e:
print(f"Nie udało się uruchomić serwera WWW: {e}")
last_sensor_read = 0
last_mqtt_publish = 0
sensor_read_interval = 5 # Sekundy
mqtt_publish_interval = 30 # Sekundy
while True:
current_time = time.time()
# Sprawdzanie wiadomości MQTT (nieblokujące)
if mqtt_client:
try:
mqtt_client.check_msg()
except OSError as e:
print(f"Błąd sprawdzania wiadomości MQTT: {e}. Próba ponownego połączenia...")
time.sleep(5)
connect_mqtt() # Spróbuj połączyć ponownie
# Odczyt czujników w zadanym interwale
if current_time - last_sensor_read >= sensor_read_interval:
read_sensors()
last_sensor_read = current_time
# Wykonywanie automatyzacji (można częściej niż odczyt sensorów)
run_automations()
# Publikowanie stanu MQTT w zadanym interwale
if current_time - last_mqtt_publish >= mqtt_publish_interval:
if mqtt_client: # Publikuj tylko jeśli połączony
print("Publikowanie stanu przez MQTT...")
publish_state()
last_mqtt_publish = current_time
elif not mqtt_client: # Spróbuj połączyć jeśli niepołączony
print("Próba ponownego połączenia z MQTT przed publikacją...")
connect_mqtt()
time.sleep_ms(100) # Krótkie opóźnienie, aby nie obciążać procesora
# --- Uruchomienie programu ---
if __name__ == '__main__':
main()
connect_wifi()
i connect_mqtt()
zarządzają połączeniami sieciowymi.mqtt_callback
obsługuje polecenia przychodzące (np. włącz/wyłącz światło). publish_state
wysyła aktualne dane z czujników i stan urządzeń do brokera. Użyto formatu JSON dla statusu ogólnego oraz indywidualnych tematów dla łatwiejszej integracji.read_sensors()
odczytuje dane z DHT22 i PIR. Przekaźnik jest sterowany przez zmianę stanu pinu.run_automations()
implementuje prostą logikę (światło włącza się po wykryciu ruchu i wyłącza po 30 sekundach bezczynności).start_web_server()
(uruchamiana w osobnym wątku) tworzy prostą stronę HTML, która wyświetla aktualny stan i pozwala na ręczne sterowanie światłem. Strona automatycznie się odświeża.main()
inicjalizuje system, uruchamia serwer WWW i wchodzi w nieskończoną pętlę, która regularnie sprawdza wiadomości MQTT, odczytuje czujniki, wykonuje automatyzacje i publikuje stan. Dodano obsługę błędów i próby ponownego połączenia z MQTT.Aby lepiej zrozumieć różnice i możliwości naszego kompaktowego systemu w porównaniu do pełnego Home Assistant, spójrzmy na poniższy wykres:
Wykres radarowy ilustruje kompromisy. Nasz węzeł MicroPython jest znacznie prostszy, tańszy i mniej zasobożerny, ale oferuje ograniczoną funkcjonalność interfejsu, integracji i automatyzacji w porównaniu do rozbudowanego systemu Home Assistant. Jest jednak potencjalnie łatwiejszy do skonfigurowania dla podstawowych zadań i może być bardzo niezawodny w swojej ograniczonej roli.
Protokół MQTT (Message Queuing Telemetry Transport) jest kluczowy dla naszego projektu. Działa on na zasadzie publikowania (wysyłania) wiadomości do określonych "tematów" (topics) oraz subskrybowania (odbierania) wiadomości z interesujących nas tematów. Centralnym punktem jest "broker" MQTT, który pośredniczy w wymianie wiadomości.
Film demonstrujący podstawy wykorzystania MQTT w MicroPythonie na przykładzie platformy Adafruit IO (która działa jako broker MQTT).
W naszym kodzie:
home/esp32/xxxx/temperature
).home/esp32/xxxx/light/set
), aby odbierać komendy wysłane przez inne urządzenia lub aplikacje (np. Home Assistant, aplikacja mobilna MQTT).Dzięki MQTT, nasz kompaktowy węzeł może stać się częścią większego ekosystemu. Możesz skonfigurować Home Assistant, aby odczytywał dane z tematów publikowanych przez ESP32 i wysyłał polecenia do tematów, które ESP32 subskrybuje. To pozwala na wykorzystanie zaawansowanych funkcji HA (jak złożone automatyzacje, integracje z innymi usługami, interfejs graficzny) przy jednoczesnym zachowaniu prostoty i niskiego kosztu na poziomie samego urządzenia wykonawczego.
Nie. Jest to kompaktowy węzeł realizujący podstawowe funkcje smart home (odczyt sensorów, sterowanie, prosta automatyzacja, komunikacja MQTT). Home Assistant to znacznie bardziej rozbudowana platforma z tysiącami integracji, zaawansowanym interfejsem i skomplikowaną logiką, wymagająca mocniejszego sprzętu.
Podstawowe biblioteki potrzebne do uruchomienia przykładowego kodu to:
umqtt.simple
: Do komunikacji MQTT. Często wymaga doinstalowania (np. przez `upip` lub ręczne wgranie pliku).dht
: Do obsługi czujników DHT11/DHT22. Zazwyczaj wbudowana w firmware MicroPython dla ESP32/ESP8266 lub łatwo dostępna.network
, time
, machine
, ubinascii
, ujson
, socket
, _thread
są zazwyczaj wbudowane.Tak. Kod jest napisany w sposób modułowy. Możesz dodać obsługę nowych czujników (np. czujnik jakości powietrza, czujnik światła) i aktuatorów (np. serwomechanizmy, kolejne przekaźniki), definiując odpowiednie piny, funkcje odczytu/sterowania i dodając je do głównej pętli oraz publikacji MQTT. Pamiętaj o ograniczeniach liczby pinów GPIO i zasobów mikrokontrolera.
Najprostszym sposobem jest wykorzystanie integracji MQTT w Home Assistant. Możesz dodać sensory MQTT, które będą odczytywać dane z tematów publikowanych przez ESP32 (np. `home/esp32/xxxx/temperature`). Możesz również dodać przełączniki (switch) lub światła (light) MQTT, które będą wysyłać polecenia ('ON'/'OFF') na tematy subskrybowane przez ESP32 (np. `home/esp32/xxxx/light/set`). Dokumentacja Home Assistant zawiera szczegółowe instrukcje konfiguracji komponentów MQTT.