micro_mpc_kmk/ir_pair.py

303 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from machine import Pin, PWM
from time import ticks_ms, ticks_diff, sleep_ms
class IRRxTxPollPair:
"""Пара ИК-передатчик/приёмник с опросной логикой.
Логика цикла:
1) включаем передатчик (непрерывные 38кГц)
2) ждём окно выборки и считаем фронты на RX
3) выключаем передатчик
4) ждём остаток периода
Примечание:
Многие ИК-приёмники (TSOP/VS1838) демодулируют посылки и могут
подавлять "постоянный" carrier из-за AGC. В таком случае фронтов может
быть мало/не быть — это особенность модуля приёмника.
"""
def __init__(
self,
*,
rx_pin: int,
tx_pin: int,
poll_period_ms: int = 500,
tx_on_ms: int = 50,
blinks_per_poll: int = 1,
blink_off_ms: int = 5,
freq_hz: int = 38_000,
duty_percent: int = 33,
min_edges: int = 1,
count_rising: bool = True,
count_falling: bool = True,
rx_pull: int | None = None,
):
if poll_period_ms <= 0:
raise ValueError("poll_period_ms must be > 0")
if tx_on_ms <= 0 or tx_on_ms > poll_period_ms:
raise ValueError("tx_on_ms must be in 1..poll_period_ms")
if blinks_per_poll <= 0:
raise ValueError("blinks_per_poll must be > 0")
if blink_off_ms < 0:
raise ValueError("blink_off_ms must be >= 0")
if not (1 <= duty_percent <= 100):
raise ValueError("duty_percent must be in 1..100")
if min_edges < 0:
raise ValueError("min_edges must be >= 0")
if not (count_rising or count_falling):
raise ValueError("At least one of count_rising/count_falling must be True")
active_ms = (blinks_per_poll * tx_on_ms) + ((blinks_per_poll - 1) * blink_off_ms)
if active_ms > poll_period_ms:
raise ValueError("poll_period_ms too small for blinks_per_poll/tx_on_ms/blink_off_ms")
self.rx_pin_num = rx_pin
self.tx_pin_num = tx_pin
self.poll_period_ms = poll_period_ms
self.tx_on_ms = tx_on_ms
self.blinks_per_poll = blinks_per_poll
self.blink_off_ms = blink_off_ms
self.freq_hz = freq_hz
self.duty_percent = duty_percent
self.min_edges = min_edges
self.count_rising = count_rising
self.count_falling = count_falling
pull = 0
if rx_pull is not None:
pull = rx_pull
self._rx = Pin(rx_pin, Pin.IN, pull)
self._tx_pin = Pin(tx_pin, Pin.OUT)
self._pwm = PWM(self._tx_pin)
self._pwm.freq(freq_hz)
self._edge_count = 0
# Храним результат опросов внутри пары.
self._last_seen = None # type: bool | None
self._prev_seen = None # type: bool | None
self._last_edges = 0
self._prev_edges = 0
self._last_poll_ms = None # type: int | None
trigger = 0
if count_rising:
trigger |= Pin.IRQ_RISING
if count_falling:
trigger |= Pin.IRQ_FALLING
self._rx.irq(trigger=trigger, handler=self._on_rx_edge)
self._tx_enabled = False
self.disable_tx() # стартуем с выключенного излучения
# Неблокирующий опрос: state machine.
self._poll_active = False
self._phase = 0 # 0=idle, 1=tx_on, 2=tx_off_gap, 3=rest
self._blink_idx = 0
self._deadline_ms = 0
def _on_rx_edge(self, _pin):
self._edge_count += 1
@property
def edge_count(self) -> int:
"""Количество фронтов на RX в последнем/текущем цикле опроса."""
return self._edge_count
@property
def last_seen(self) -> bool | None:
"""Результат последнего poll_once(): True/False или None если ещё не вызывали."""
return self._last_seen
@property
def prev_seen(self) -> bool | None:
"""Результат предпоследнего poll_once(): True/False или None если данных нет."""
return self._prev_seen
@property
def last_edges(self) -> int:
"""Количество фронтов, зафиксированных в последнем poll_once()."""
return self._last_edges
@property
def prev_edges(self) -> int:
"""Количество фронтов, зафиксированных в предпоследнем poll_once()."""
return self._prev_edges
@property
def last_poll_ms(self) -> int | None:
"""Время (ticks_ms) когда завершился последний poll_once(), либо None."""
return self._last_poll_ms
@staticmethod
def _seen_to_state(seen: bool) -> int:
# 0 = луч перекрыт, 1 = луч не перекрыт
return 0 if seen else 1
@property
def last_state(self) -> int | None:
"""Состояние последнего опроса: 1=перекрыт, 0=не перекрыт, None=нет данных."""
if self._last_seen is None:
return None
return self._seen_to_state(self._last_seen)
@property
def prev_state(self) -> int | None:
"""Состояние предпоследнего опроса: 1=перекрыт, 0=не перекрыт, None=нет данных."""
if self._prev_seen is None:
return None
return self._seen_to_state(self._prev_seen)
@staticmethod
def _set_pwm_duty(pwm: PWM, duty_u16: int):
# Совместимость между портами MicroPython.
try:
pwm.duty_u16(duty_u16)
except AttributeError:
# ESP32/старые порты
duty_10bit = (duty_u16 * 1023) // 65535
duty = getattr(pwm, "duty", None)
if duty is None:
raise
duty(duty_10bit)
def enable_tx(self):
duty_u16 = int(65535 * self.duty_percent / 100)
self._set_pwm_duty(self._pwm, duty_u16)
self._tx_enabled = True
def disable_tx(self):
self._set_pwm_duty(self._pwm, 0)
self._tx_enabled = False
@property
def poll_in_progress(self) -> bool:
return self._poll_active
def start_poll(self):
"""Запустить один цикл опроса НЕБЛОКИРУЮЩЕ.
Дальше нужно часто вызывать update(); когда update() вернёт True — цикл завершён.
"""
if self._poll_active:
return
# Сдвигаем прошлое состояние.
self._prev_seen = self._last_seen
self._prev_edges = self._last_edges
self._edge_count = 0
self._blink_idx = 0
now = ticks_ms()
self.enable_tx()
self._phase = 1 # tx_on
self._deadline_ms = now + self.tx_on_ms
self._poll_active = True
def update(self) -> bool:
"""Продвинуть state machine опроса.
Возвращает True, если цикл опроса ЗАВЕРШЁН в этом вызове.
"""
if not self._poll_active:
return False
now = ticks_ms()
if ticks_diff(now, self._deadline_ms) < 0:
return False
if self._phase == 1: # tx_on закончился
self.disable_tx()
if self._blink_idx < (self.blinks_per_poll - 1):
self._phase = 2 # tx_off_gap
self._deadline_ms = now + self.blink_off_ms
else:
self._phase = 3 # rest
active_ms = (self.blinks_per_poll * self.tx_on_ms) + ((self.blinks_per_poll - 1) * self.blink_off_ms)
rest = self.poll_period_ms - active_ms
self._deadline_ms = now + rest
return False
if self._phase == 2: # gap закончился, следующий blink
self._blink_idx += 1
self.enable_tx()
self._phase = 1
self._deadline_ms = now + self.tx_on_ms
return False
# rest закончился -> фиксируем результат
seen = self._edge_count >= self.min_edges
self._last_seen = seen
self._last_edges = self._edge_count
self._last_poll_ms = now
self._poll_active = False
self._phase = 0
return True
def poll_once(self) -> bool:
"""Один цикл опроса. Возвращает True если RX "видел" активность."""
# Совместимость: блокирующий вариант поверх неблокирующего.
self.start_poll()
while not self.update():
sleep_ms(1)
return bool(self._last_seen)
def run(self, callback=None):
"""Бесконечный опрос.
callback(seen: bool, edge_count: int, now_ms: int) вызывается каждый цикл.
"""
next_t = ticks_ms()
while True:
now = ticks_ms()
# Держим период стабильным даже если callback/лог печатает долго.
if ticks_diff(now, next_t) < 0:
sleep_ms(1)
continue
self._edge_count = 0
for i in range(self.blinks_per_poll):
self.enable_tx()
sleep_ms(self.tx_on_ms)
self.disable_tx()
if self.blink_off_ms and i != (self.blinks_per_poll - 1):
sleep_ms(self.blink_off_ms)
# Сдвигаем прошлое состояние и фиксируем текущее.
self._prev_seen = self._last_seen
self._prev_edges = self._last_edges
seen = self._edge_count >= self.min_edges
self._last_seen = seen
self._last_edges = self._edge_count
self._last_poll_ms = ticks_ms()
if callback is not None:
callback(seen, self._edge_count, now)
next_t = next_t + self.poll_period_ms
def deinit(self):
try:
self._rx.irq(handler=None)
except Exception:
pass
try:
self.disable_tx()
except Exception:
pass
try:
self._pwm.deinit()
except Exception:
pass
# Пример использования:
#
# from ir_pair import IRRxTxPollPair
#
# pair = IRRxTxPollPair(rx_pin=16, tx_pin=28, poll_period_ms=200, tx_on_ms=30)
# pair.run(lambda seen, edges, now: print("OK" if seen else "NO", edges))