diff --git a/config.json b/config.json index 664b9fe..7fdd93d 100644 --- a/config.json +++ b/config.json @@ -3,7 +3,7 @@ { "id": 1283, "pin": 12, - "angle_minus": 65, + "angle_minus": 70, "angle_plus": 125 }, { @@ -24,5 +24,17 @@ "angle_minus": 125, "angle_plus": 65 } + ], + "seminsus": [ + { + "id": 6000, + "pin_rx": 16, + "pin_tx": 6 + }, + { + "id": 6001, + "pin_rx": 17, + "pin_tx": 7 + } ] } \ No newline at end of file diff --git a/ir_pair.py b/ir_pair.py new file mode 100644 index 0000000..b0838ce --- /dev/null +++ b/ir_pair.py @@ -0,0 +1,302 @@ +from machine import Pin, PWM +from time import ticks_ms, ticks_diff, sleep_ms + + +class IRRxTxPollPair: + """Пара ИК-передатчик/приёмник с опросной логикой. + + Логика цикла: + 1) включаем передатчик (непрерывные 38кГц) + 2) ждём окно выборки и считаем фронты на RX + 3) выключаем передатчик + 4) ждём остаток периода + + Примечание: + Многие ИК-приёмники (TSOP/VS1838) демодулируют посылки и могут + подавлять "постоянный" carrier из-за AGC. В таком случае фронтов может + быть мало/не быть — это особенность модуля приёмника. + """ + + def __init__( + self, + *, + rx_pin: int, + tx_pin: int, + poll_period_ms: int = 500, + tx_on_ms: int = 50, + blinks_per_poll: int = 1, + blink_off_ms: int = 5, + freq_hz: int = 38_000, + duty_percent: int = 33, + min_edges: int = 1, + count_rising: bool = True, + count_falling: bool = True, + rx_pull: int | None = None, + ): + if poll_period_ms <= 0: + raise ValueError("poll_period_ms must be > 0") + if tx_on_ms <= 0 or tx_on_ms > poll_period_ms: + raise ValueError("tx_on_ms must be in 1..poll_period_ms") + if blinks_per_poll <= 0: + raise ValueError("blinks_per_poll must be > 0") + if blink_off_ms < 0: + raise ValueError("blink_off_ms must be >= 0") + if not (1 <= duty_percent <= 100): + raise ValueError("duty_percent must be in 1..100") + if min_edges < 0: + raise ValueError("min_edges must be >= 0") + if not (count_rising or count_falling): + raise ValueError("At least one of count_rising/count_falling must be True") + + active_ms = (blinks_per_poll * tx_on_ms) + ((blinks_per_poll - 1) * blink_off_ms) + if active_ms > poll_period_ms: + raise ValueError("poll_period_ms too small for blinks_per_poll/tx_on_ms/blink_off_ms") + + self.rx_pin_num = rx_pin + self.tx_pin_num = tx_pin + + self.poll_period_ms = poll_period_ms + self.tx_on_ms = tx_on_ms + self.blinks_per_poll = blinks_per_poll + self.blink_off_ms = blink_off_ms + self.freq_hz = freq_hz + self.duty_percent = duty_percent + self.min_edges = min_edges + self.count_rising = count_rising + self.count_falling = count_falling + + pull = 0 + if rx_pull is not None: + pull = rx_pull + + self._rx = Pin(rx_pin, Pin.IN, pull) + self._tx_pin = Pin(tx_pin, Pin.OUT) + self._pwm = PWM(self._tx_pin) + self._pwm.freq(freq_hz) + + self._edge_count = 0 + # Храним результат опросов внутри пары. + self._last_seen = None # type: bool | None + self._prev_seen = None # type: bool | None + self._last_edges = 0 + self._prev_edges = 0 + self._last_poll_ms = None # type: int | None + trigger = 0 + if count_rising: + trigger |= Pin.IRQ_RISING + if count_falling: + trigger |= Pin.IRQ_FALLING + self._rx.irq(trigger=trigger, handler=self._on_rx_edge) + + self._tx_enabled = False + self.disable_tx() # стартуем с выключенного излучения + + # Неблокирующий опрос: state machine. + self._poll_active = False + self._phase = 0 # 0=idle, 1=tx_on, 2=tx_off_gap, 3=rest + self._blink_idx = 0 + self._deadline_ms = 0 + + def _on_rx_edge(self, _pin): + self._edge_count += 1 + + @property + def edge_count(self) -> int: + """Количество фронтов на RX в последнем/текущем цикле опроса.""" + return self._edge_count + + @property + def last_seen(self) -> bool | None: + """Результат последнего poll_once(): True/False или None если ещё не вызывали.""" + return self._last_seen + + @property + def prev_seen(self) -> bool | None: + """Результат предпоследнего poll_once(): True/False или None если данных нет.""" + return self._prev_seen + + @property + def last_edges(self) -> int: + """Количество фронтов, зафиксированных в последнем poll_once().""" + return self._last_edges + + @property + def prev_edges(self) -> int: + """Количество фронтов, зафиксированных в предпоследнем poll_once().""" + return self._prev_edges + + @property + def last_poll_ms(self) -> int | None: + """Время (ticks_ms) когда завершился последний poll_once(), либо None.""" + return self._last_poll_ms + + @staticmethod + def _seen_to_state(seen: bool) -> int: + # 1 = луч перекрыт, 0 = луч не перекрыт + return 1 if seen else 0 + + @property + def last_state(self) -> int | None: + """Состояние последнего опроса: 1=перекрыт, 0=не перекрыт, None=нет данных.""" + if self._last_seen is None: + return None + return self._seen_to_state(self._last_seen) + + @property + def prev_state(self) -> int | None: + """Состояние предпоследнего опроса: 1=перекрыт, 0=не перекрыт, None=нет данных.""" + if self._prev_seen is None: + return None + return self._seen_to_state(self._prev_seen) + + @staticmethod + def _set_pwm_duty(pwm: PWM, duty_u16: int): + # Совместимость между портами MicroPython. + try: + pwm.duty_u16(duty_u16) + except AttributeError: + # ESP32/старые порты + duty_10bit = (duty_u16 * 1023) // 65535 + duty = getattr(pwm, "duty", None) + if duty is None: + raise + duty(duty_10bit) + + def enable_tx(self): + duty_u16 = int(65535 * self.duty_percent / 100) + self._set_pwm_duty(self._pwm, duty_u16) + self._tx_enabled = True + + def disable_tx(self): + self._set_pwm_duty(self._pwm, 0) + self._tx_enabled = False + + @property + def poll_in_progress(self) -> bool: + return self._poll_active + + def start_poll(self): + """Запустить один цикл опроса НЕБЛОКИРУЮЩЕ. + + Дальше нужно часто вызывать update(); когда update() вернёт True — цикл завершён. + """ + if self._poll_active: + return + + # Сдвигаем прошлое состояние. + self._prev_seen = self._last_seen + self._prev_edges = self._last_edges + + self._edge_count = 0 + self._blink_idx = 0 + + now = ticks_ms() + self.enable_tx() + self._phase = 1 # tx_on + self._deadline_ms = now + self.tx_on_ms + self._poll_active = True + + def update(self) -> bool: + """Продвинуть state machine опроса. + + Возвращает True, если цикл опроса ЗАВЕРШЁН в этом вызове. + """ + if not self._poll_active: + return False + + now = ticks_ms() + if ticks_diff(now, self._deadline_ms) < 0: + return False + + if self._phase == 1: # tx_on закончился + self.disable_tx() + if self._blink_idx < (self.blinks_per_poll - 1): + self._phase = 2 # tx_off_gap + self._deadline_ms = now + self.blink_off_ms + else: + self._phase = 3 # rest + active_ms = (self.blinks_per_poll * self.tx_on_ms) + ((self.blinks_per_poll - 1) * self.blink_off_ms) + rest = self.poll_period_ms - active_ms + self._deadline_ms = now + rest + return False + + if self._phase == 2: # gap закончился, следующий blink + self._blink_idx += 1 + self.enable_tx() + self._phase = 1 + self._deadline_ms = now + self.tx_on_ms + return False + + # rest закончился -> фиксируем результат + seen = self._edge_count >= self.min_edges + self._last_seen = seen + self._last_edges = self._edge_count + self._last_poll_ms = now + + self._poll_active = False + self._phase = 0 + return True + + def poll_once(self) -> bool: + """Один цикл опроса. Возвращает True если RX "видел" активность.""" + # Совместимость: блокирующий вариант поверх неблокирующего. + self.start_poll() + while not self.update(): + sleep_ms(1) + return bool(self._last_seen) + + def run(self, callback=None): + """Бесконечный опрос. + + callback(seen: bool, edge_count: int, now_ms: int) вызывается каждый цикл. + """ + next_t = ticks_ms() + while True: + now = ticks_ms() + # Держим период стабильным даже если callback/лог печатает долго. + if ticks_diff(now, next_t) < 0: + sleep_ms(1) + continue + + self._edge_count = 0 + for i in range(self.blinks_per_poll): + self.enable_tx() + sleep_ms(self.tx_on_ms) + self.disable_tx() + if self.blink_off_ms and i != (self.blinks_per_poll - 1): + sleep_ms(self.blink_off_ms) + + # Сдвигаем прошлое состояние и фиксируем текущее. + self._prev_seen = self._last_seen + self._prev_edges = self._last_edges + + seen = self._edge_count >= self.min_edges + self._last_seen = seen + self._last_edges = self._edge_count + self._last_poll_ms = ticks_ms() + if callback is not None: + callback(seen, self._edge_count, now) + + next_t = next_t + self.poll_period_ms + + def deinit(self): + try: + self._rx.irq(handler=None) + except Exception: + pass + try: + self.disable_tx() + except Exception: + pass + try: + self._pwm.deinit() + except Exception: + pass + + +# Пример использования: +# +# from ir_pair import IRRxTxPollPair +# +# pair = IRRxTxPollPair(rx_pin=16, tx_pin=28, poll_period_ms=200, tx_on_ms=30) +# pair.run(lambda seen, edges, now: print("OK" if seen else "NO", edges)) diff --git a/ir_test.py b/ir_test.py new file mode 100644 index 0000000..19a88e8 --- /dev/null +++ b/ir_test.py @@ -0,0 +1,71 @@ +from machine import Pin +from time import ticks_ms, ticks_diff + +from ir_pair import IRRxTxPollPair + + +# --- Настройки пинов --- +# RX_PIN: выход ИК-приёмника (обычно TTL сигнал с модуля VS1838/TSOP*) +# TX_PIN: пин ИК-светодиода/ключа (на него подаётся 38кГц PWM во время окна опроса) +RX_PIN = 16 +TX_PIN = 6 + + +# --- Тайминги опроса --- +POLL_PERIOD_MS = 500 # период опроса (как часто проверяем) +TX_ON_MS = 50 # сколько держим 38кГц включённым в каждом цикле +STATUS_PERIOD_MS = 1000 + + +# --- Условия теста --- +# N влияет сразу на: +# - сколько раз "мигнуть" ИК-передатчиком за 1 цикл опроса +# - сколько фронтов (edges) нужно увидеть, чтобы считать луч НЕ перекрытым +N = 5 + + +LED = Pin("LED", Pin.OUT) + + +def main(): + pair = IRRxTxPollPair( + rx_pin=RX_PIN, + tx_pin=TX_PIN, + poll_period_ms=POLL_PERIOD_MS, + tx_on_ms=TX_ON_MS, + blinks_per_poll=N, + blink_off_ms=5, + freq_hz=38_000, + duty_percent=33, + min_edges=N, + # Считаем только FALLING: обычно на каждом включении carrier RX даёт 1 спад. + # Тогда 5 миганий ~= 5 edges (а при подсчёте обоих фронтов было бы ~10). + count_rising=False, + count_falling=True, + ) + + last_status = ticks_ms() + + print("IR poll test started") + print("RX pin:", RX_PIN, "TX pin:", TX_PIN) + print("poll_period_ms:", POLL_PERIOD_MS, "tx_on_ms:", TX_ON_MS) + print("N:", N, "(blinks_per_poll and min_edges)") + + try: + while True: + pair.poll_once() # результат и счетчики сохраняются внутри pair + LED.toggle() + + now = ticks_ms() + if ticks_diff(now, last_status) >= STATUS_PERIOD_MS: + seen = bool(pair.last_seen) + print( + ("BEAM NOT BLOCKED" if seen else "BEAM BLOCKED") + + " | edges=%d" % pair.last_edges + ) + last_status = now + finally: + pair.deinit() + + +main() diff --git a/main.py b/main.py index 343bf58..200e07b 100644 --- a/main.py +++ b/main.py @@ -2,8 +2,11 @@ import sys from switch import Switch import select from machine import Pin +from time import sleep_ms +from ir_pair import IRRxTxPollPair +SEMINSU = dict() SWITCHES = dict() LED = Pin("LED", Pin.OUT) # "LED" — специальное имя для встроенного индикатора @@ -21,7 +24,28 @@ def load_switches(): angle_plus=sw_cfg["angle_plus"] ) SWITCHES[sw.id] = sw - # print(f"Loaded {len(SWITCHES)} switch{'es' if len(SWITCHES) > 1 else ''}.") + + +def load_seminsus(): + with open("config.json", "r") as file: + import json + config = json.load(file) + for sem_cfg in config["seminsus"]: + seminsu = IRRxTxPollPair( + rx_pin=sem_cfg["pin_rx"], + tx_pin=sem_cfg["pin_tx"], + poll_period_ms=200, + tx_on_ms=10, + blinks_per_poll=10, + blink_off_ms=5, + freq_hz=38_000, + duty_percent=33, + min_edges=10, + count_rising=False, + count_falling=True, + ) + SEMINSU[sem_cfg["id"]] = seminsu + def resolve_command(command: str): @@ -33,14 +57,14 @@ def resolve_command(command: str): return "ERROR Invalid ID" direction = parts[3] - if direction not in ("+", "-"): + if direction not in ("1", "0"): return "ERROR Invalid direction" if sw_id not in SWITCHES: return f"ERROR Switch {sw_id} not found" # Выполняем действие - if direction == "+": + if direction == "0": SWITCHES[sw_id].set_plus() else: SWITCHES[sw_id].set_minus() @@ -51,15 +75,70 @@ def resolve_command(command: str): evts = [] for id, sw in SWITCHES.items(): evts.append(f"EVENT SWITCH {id} {sw.pos}") + for id, seminsu in SEMINSU.items(): + evts.append(f"EVENT IK_MODULE {id} {seminsu.last_state}") return "\n".join(evts) +_seminsu_ids = [] +_seminsu_idx = 0 +_active_seminsu_id = None + + +def _next_seminsu_id(): + global _seminsu_idx + if not _seminsu_ids: + return None + sid = _seminsu_ids[_seminsu_idx] + _seminsu_idx = (_seminsu_idx + 1) % len(_seminsu_ids) + return sid + + +def poll_seminsus_step(): + """Неблокирующий шаг опроса seminsu. + + В каждый момент времени опрашивается только ОДНА пара. + """ + global _active_seminsu_id + + if not SEMINSU: + return + + if _active_seminsu_id is None: + _active_seminsu_id = _next_seminsu_id() + if _active_seminsu_id is None: + return + SEMINSU[_active_seminsu_id].start_poll() + + seminsu = SEMINSU[_active_seminsu_id] + done = seminsu.update() + if not done: + return + + # Цикл завершён — печатаем при изменении состояния. + if seminsu.prev_state is not None and seminsu.last_state is not None: + if seminsu.prev_state != seminsu.last_state: + # state: 1 = перекрыт, 0 = не перекрыт + print(f"EVENT IK_MODULE {_active_seminsu_id} {seminsu.last_state}") + + _active_seminsu_id = None + + def work(): poll = select.poll() poll.register(sys.stdin, select.POLLIN) + # Готовим список id seminsu для round-robin. + global _seminsu_ids, _seminsu_idx, _active_seminsu_id + _seminsu_ids = list(SEMINSU.keys()) + _seminsu_idx = 0 + _active_seminsu_id = None + while True: - events = poll.poll() + LED.toggle() + + # 1) Обработка stdin НЕ блокирует цикл seminsu. + events = poll.poll(0) for fd, event in events: if event & select.POLLIN: try: @@ -67,16 +146,20 @@ def work(): if not line: continue - parts = line.split() result = resolve_command(line) if result: print(result) except Exception as e: - # Любая неожиданная ошибка — тоже одна строка print(f"ERROR {e}") + # 2) Один неблокирующий шаг опроса seminsu. + poll_seminsus_step() + + # Небольшая пауза, чтобы не крутить CPU на 100%. + sleep_ms(1) + if __name__ == "__main__": - LED.on() load_switches() + load_seminsus() work() \ No newline at end of file diff --git a/switch.py b/switch.py index 686f7c6..1602463 100644 --- a/switch.py +++ b/switch.py @@ -8,13 +8,13 @@ class Switch: self.angle_minus = angle_minus self.angle_plus = angle_plus self.servo = Servo(pin) - self.pos = "-" + self.pos = "1" # 1 - минус, 0 - плюс self.set_minus() def set_minus(self): self.servo.angle(self.angle_minus) - self.pos = "-" + self.pos = "1" def set_plus(self): self.servo.angle(self.angle_plus) - self.pos = "+" \ No newline at end of file + self.pos = "0" \ No newline at end of file