from machine import Pin, PWM from time import ticks_ms, ticks_diff, sleep_ms class IRRxTxPollPair: """Пара ИК-передатчик/приёмник с опросной логикой. Логика цикла: 1) включаем передатчик (непрерывные 38кГц) 2) ждём окно выборки и считаем фронты на RX 3) выключаем передатчик 4) ждём остаток периода Примечание: Многие ИК-приёмники (TSOP/VS1838) демодулируют посылки и могут подавлять "постоянный" carrier из-за AGC. В таком случае фронтов может быть мало/не быть — это особенность модуля приёмника. """ def __init__( self, *, rx_pin: int, tx_pin: int, poll_period_ms: int = 500, tx_on_ms: int = 50, blinks_per_poll: int = 1, blink_off_ms: int = 5, freq_hz: int = 38_000, duty_percent: int = 33, min_edges: int = 1, count_rising: bool = True, count_falling: bool = True, rx_pull: int | None = None, ): if poll_period_ms <= 0: raise ValueError("poll_period_ms must be > 0") if tx_on_ms <= 0 or tx_on_ms > poll_period_ms: raise ValueError("tx_on_ms must be in 1..poll_period_ms") if blinks_per_poll <= 0: raise ValueError("blinks_per_poll must be > 0") if blink_off_ms < 0: raise ValueError("blink_off_ms must be >= 0") if not (1 <= duty_percent <= 100): raise ValueError("duty_percent must be in 1..100") if min_edges < 0: raise ValueError("min_edges must be >= 0") if not (count_rising or count_falling): raise ValueError("At least one of count_rising/count_falling must be True") active_ms = (blinks_per_poll * tx_on_ms) + ((blinks_per_poll - 1) * blink_off_ms) if active_ms > poll_period_ms: raise ValueError("poll_period_ms too small for blinks_per_poll/tx_on_ms/blink_off_ms") self.rx_pin_num = rx_pin self.tx_pin_num = tx_pin self.poll_period_ms = poll_period_ms self.tx_on_ms = tx_on_ms self.blinks_per_poll = blinks_per_poll self.blink_off_ms = blink_off_ms self.freq_hz = freq_hz self.duty_percent = duty_percent self.min_edges = min_edges self.count_rising = count_rising self.count_falling = count_falling pull = 0 if rx_pull is not None: pull = rx_pull self._rx = Pin(rx_pin, Pin.IN, pull) self._tx_pin = Pin(tx_pin, Pin.OUT) self._pwm = PWM(self._tx_pin) self._pwm.freq(freq_hz) self._edge_count = 0 # Храним результат опросов внутри пары. self._last_seen = None # type: bool | None self._prev_seen = None # type: bool | None self._last_edges = 0 self._prev_edges = 0 self._last_poll_ms = None # type: int | None trigger = 0 if count_rising: trigger |= Pin.IRQ_RISING if count_falling: trigger |= Pin.IRQ_FALLING self._rx.irq(trigger=trigger, handler=self._on_rx_edge) self._tx_enabled = False self.disable_tx() # стартуем с выключенного излучения # Неблокирующий опрос: state machine. self._poll_active = False self._phase = 0 # 0=idle, 1=tx_on, 2=tx_off_gap, 3=rest self._blink_idx = 0 self._deadline_ms = 0 def _on_rx_edge(self, _pin): self._edge_count += 1 @property def edge_count(self) -> int: """Количество фронтов на RX в последнем/текущем цикле опроса.""" return self._edge_count @property def last_seen(self) -> bool | None: """Результат последнего poll_once(): True/False или None если ещё не вызывали.""" return self._last_seen @property def prev_seen(self) -> bool | None: """Результат предпоследнего poll_once(): True/False или None если данных нет.""" return self._prev_seen @property def last_edges(self) -> int: """Количество фронтов, зафиксированных в последнем poll_once().""" return self._last_edges @property def prev_edges(self) -> int: """Количество фронтов, зафиксированных в предпоследнем poll_once().""" return self._prev_edges @property def last_poll_ms(self) -> int | None: """Время (ticks_ms) когда завершился последний poll_once(), либо None.""" return self._last_poll_ms @staticmethod def _seen_to_state(seen: bool) -> int: # 1 = луч перекрыт, 0 = луч не перекрыт return 1 if seen else 0 @property def last_state(self) -> int | None: """Состояние последнего опроса: 1=перекрыт, 0=не перекрыт, None=нет данных.""" if self._last_seen is None: return None return self._seen_to_state(self._last_seen) @property def prev_state(self) -> int | None: """Состояние предпоследнего опроса: 1=перекрыт, 0=не перекрыт, None=нет данных.""" if self._prev_seen is None: return None return self._seen_to_state(self._prev_seen) @staticmethod def _set_pwm_duty(pwm: PWM, duty_u16: int): # Совместимость между портами MicroPython. try: pwm.duty_u16(duty_u16) except AttributeError: # ESP32/старые порты duty_10bit = (duty_u16 * 1023) // 65535 duty = getattr(pwm, "duty", None) if duty is None: raise duty(duty_10bit) def enable_tx(self): duty_u16 = int(65535 * self.duty_percent / 100) self._set_pwm_duty(self._pwm, duty_u16) self._tx_enabled = True def disable_tx(self): self._set_pwm_duty(self._pwm, 0) self._tx_enabled = False @property def poll_in_progress(self) -> bool: return self._poll_active def start_poll(self): """Запустить один цикл опроса НЕБЛОКИРУЮЩЕ. Дальше нужно часто вызывать update(); когда update() вернёт True — цикл завершён. """ if self._poll_active: return # Сдвигаем прошлое состояние. self._prev_seen = self._last_seen self._prev_edges = self._last_edges self._edge_count = 0 self._blink_idx = 0 now = ticks_ms() self.enable_tx() self._phase = 1 # tx_on self._deadline_ms = now + self.tx_on_ms self._poll_active = True def update(self) -> bool: """Продвинуть state machine опроса. Возвращает True, если цикл опроса ЗАВЕРШЁН в этом вызове. """ if not self._poll_active: return False now = ticks_ms() if ticks_diff(now, self._deadline_ms) < 0: return False if self._phase == 1: # tx_on закончился self.disable_tx() if self._blink_idx < (self.blinks_per_poll - 1): self._phase = 2 # tx_off_gap self._deadline_ms = now + self.blink_off_ms else: self._phase = 3 # rest active_ms = (self.blinks_per_poll * self.tx_on_ms) + ((self.blinks_per_poll - 1) * self.blink_off_ms) rest = self.poll_period_ms - active_ms self._deadline_ms = now + rest return False if self._phase == 2: # gap закончился, следующий blink self._blink_idx += 1 self.enable_tx() self._phase = 1 self._deadline_ms = now + self.tx_on_ms return False # rest закончился -> фиксируем результат seen = self._edge_count >= self.min_edges self._last_seen = seen self._last_edges = self._edge_count self._last_poll_ms = now self._poll_active = False self._phase = 0 return True def poll_once(self) -> bool: """Один цикл опроса. Возвращает True если RX "видел" активность.""" # Совместимость: блокирующий вариант поверх неблокирующего. self.start_poll() while not self.update(): sleep_ms(1) return bool(self._last_seen) def run(self, callback=None): """Бесконечный опрос. callback(seen: bool, edge_count: int, now_ms: int) вызывается каждый цикл. """ next_t = ticks_ms() while True: now = ticks_ms() # Держим период стабильным даже если callback/лог печатает долго. if ticks_diff(now, next_t) < 0: sleep_ms(1) continue self._edge_count = 0 for i in range(self.blinks_per_poll): self.enable_tx() sleep_ms(self.tx_on_ms) self.disable_tx() if self.blink_off_ms and i != (self.blinks_per_poll - 1): sleep_ms(self.blink_off_ms) # Сдвигаем прошлое состояние и фиксируем текущее. self._prev_seen = self._last_seen self._prev_edges = self._last_edges seen = self._edge_count >= self.min_edges self._last_seen = seen self._last_edges = self._edge_count self._last_poll_ms = ticks_ms() if callback is not None: callback(seen, self._edge_count, now) next_t = next_t + self.poll_period_ms def deinit(self): try: self._rx.irq(handler=None) except Exception: pass try: self.disable_tx() except Exception: pass try: self._pwm.deinit() except Exception: pass # Пример использования: # # from ir_pair import IRRxTxPollPair # # pair = IRRxTxPollPair(rx_pin=16, tx_pin=28, poll_period_ms=200, tx_on_ms=30) # pair.run(lambda seen, edges, now: print("OK" if seen else "NO", edges))