micro_mpc_kmk/main.py

165 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
from switch import Switch
import select
from machine import Pin
from time import sleep_ms
from ir_pair import IRRxTxPollPair
SEMINSU = dict()
SWITCHES = dict()
LED = Pin("LED", Pin.OUT) # "LED" — специальное имя для встроенного индикатора
def load_switches():
with open("config.json", "r") as file:
import json
config = json.load(file)
for sw_cfg in config["switches"]:
sw = Switch(
id=sw_cfg["id"],
pin=sw_cfg["pin"],
angle_minus=sw_cfg["angle_minus"],
angle_plus=sw_cfg["angle_plus"]
)
SWITCHES[sw.id] = sw
def load_seminsus():
with open("config.json", "r") as file:
import json
config = json.load(file)
for sem_cfg in config["seminsus"]:
seminsu = IRRxTxPollPair(
rx_pin=sem_cfg["pin_rx"],
tx_pin=sem_cfg["pin_tx"],
poll_period_ms=50,
tx_on_ms=3,
blinks_per_poll=10,
blink_off_ms=2,
freq_hz=38_000,
duty_percent=33,
min_edges=10,
count_rising=False,
count_falling=True,
)
SEMINSU[sem_cfg["id"]] = seminsu
def resolve_command(command: str):
parts = command.split()
if len(parts) == 4 and parts[0] == "SWITCH" and parts[2] == "TURN":
try:
sw_id = int(parts[1])
except ValueError:
return "ERROR Invalid ID"
direction = parts[3]
if direction not in ("1", "0"):
return "ERROR Invalid direction"
if sw_id not in SWITCHES:
return f"ERROR Switch {sw_id} not found"
# Выполняем действие
if direction == "0":
SWITCHES[sw_id].set_plus()
else:
SWITCHES[sw_id].set_minus()
# РОВНО ОДНА СТРОКА — подтверждение успеха
return f"EVENT SWITCH {sw_id} {direction}"
elif parts[0] == "GET" and parts[1] == "ALL":
evts = []
for id, sw in SWITCHES.items():
evts.append(f"EVENT SWITCH {id} {sw.pos}")
for id, seminsu in SEMINSU.items():
evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}")
return "\n".join(evts)
_seminsu_ids = []
_seminsu_idx = 0
_active_seminsu_id = None
def _next_seminsu_id():
global _seminsu_idx
if not _seminsu_ids:
return None
sid = _seminsu_ids[_seminsu_idx]
_seminsu_idx = (_seminsu_idx + 1) % len(_seminsu_ids)
return sid
def poll_seminsus_step():
"""Неблокирующий шаг опроса seminsu.
В каждый момент времени опрашивается только ОДНА пара.
"""
global _active_seminsu_id
if not SEMINSU:
return
if _active_seminsu_id is None:
_active_seminsu_id = _next_seminsu_id()
if _active_seminsu_id is None:
return
SEMINSU[_active_seminsu_id].start_poll()
seminsu = SEMINSU[_active_seminsu_id]
done = seminsu.update()
if not done:
return
# Цикл завершён — печатаем при изменении состояния.
if seminsu.prev_state is not None and seminsu.last_state is not None:
if seminsu.prev_state != seminsu.last_state:
# state: 1 = перекрыт, 0 = не перекрыт
print(f"EVENT IK_MODULE {_active_seminsu_id} {seminsu.last_state}")
_active_seminsu_id = None
def work():
poll = select.poll()
poll.register(sys.stdin, select.POLLIN)
# Готовим список id seminsu для round-robin.
global _seminsu_ids, _seminsu_idx, _active_seminsu_id
_seminsu_ids = list(SEMINSU.keys())
_seminsu_idx = 0
_active_seminsu_id = None
while True:
LED.toggle()
# 1) Обработка stdin НЕ блокирует цикл seminsu.
events = poll.poll(0)
for fd, event in events:
if event & select.POLLIN:
try:
line = sys.stdin.readline().strip()
if not line:
continue
result = resolve_command(line)
if result:
print(result)
except Exception as e:
print(f"ERROR {e}")
# 2) Один неблокирующий шаг опроса seminsu.
poll_seminsus_step()
# Небольшая пауза, чтобы не крутить CPU на 100%.
sleep_ms(1)
if __name__ == "__main__":
load_switches()
load_seminsus()
work()