303 lines
11 KiB
Python
303 lines
11 KiB
Python
from machine import Pin, PWM
|
||
from time import ticks_ms, ticks_diff, sleep_ms
|
||
|
||
|
||
class IRRxTxPollPair:
|
||
"""Пара ИК-передатчик/приёмник с опросной логикой.
|
||
|
||
Логика цикла:
|
||
1) включаем передатчик (непрерывные 38кГц)
|
||
2) ждём окно выборки и считаем фронты на RX
|
||
3) выключаем передатчик
|
||
4) ждём остаток периода
|
||
|
||
Примечание:
|
||
Многие ИК-приёмники (TSOP/VS1838) демодулируют посылки и могут
|
||
подавлять "постоянный" carrier из-за AGC. В таком случае фронтов может
|
||
быть мало/не быть — это особенность модуля приёмника.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
*,
|
||
rx_pin: int,
|
||
tx_pin: int,
|
||
poll_period_ms: int = 500,
|
||
tx_on_ms: int = 50,
|
||
blinks_per_poll: int = 1,
|
||
blink_off_ms: int = 5,
|
||
freq_hz: int = 38_000,
|
||
duty_percent: int = 33,
|
||
min_edges: int = 1,
|
||
count_rising: bool = True,
|
||
count_falling: bool = True,
|
||
rx_pull: int | None = None,
|
||
):
|
||
if poll_period_ms <= 0:
|
||
raise ValueError("poll_period_ms must be > 0")
|
||
if tx_on_ms <= 0 or tx_on_ms > poll_period_ms:
|
||
raise ValueError("tx_on_ms must be in 1..poll_period_ms")
|
||
if blinks_per_poll <= 0:
|
||
raise ValueError("blinks_per_poll must be > 0")
|
||
if blink_off_ms < 0:
|
||
raise ValueError("blink_off_ms must be >= 0")
|
||
if not (1 <= duty_percent <= 100):
|
||
raise ValueError("duty_percent must be in 1..100")
|
||
if min_edges < 0:
|
||
raise ValueError("min_edges must be >= 0")
|
||
if not (count_rising or count_falling):
|
||
raise ValueError("At least one of count_rising/count_falling must be True")
|
||
|
||
active_ms = (blinks_per_poll * tx_on_ms) + ((blinks_per_poll - 1) * blink_off_ms)
|
||
if active_ms > poll_period_ms:
|
||
raise ValueError("poll_period_ms too small for blinks_per_poll/tx_on_ms/blink_off_ms")
|
||
|
||
self.rx_pin_num = rx_pin
|
||
self.tx_pin_num = tx_pin
|
||
|
||
self.poll_period_ms = poll_period_ms
|
||
self.tx_on_ms = tx_on_ms
|
||
self.blinks_per_poll = blinks_per_poll
|
||
self.blink_off_ms = blink_off_ms
|
||
self.freq_hz = freq_hz
|
||
self.duty_percent = duty_percent
|
||
self.min_edges = min_edges
|
||
self.count_rising = count_rising
|
||
self.count_falling = count_falling
|
||
|
||
pull = 0
|
||
if rx_pull is not None:
|
||
pull = rx_pull
|
||
|
||
self._rx = Pin(rx_pin, Pin.IN, pull)
|
||
self._tx_pin = Pin(tx_pin, Pin.OUT)
|
||
self._pwm = PWM(self._tx_pin)
|
||
self._pwm.freq(freq_hz)
|
||
|
||
self._edge_count = 0
|
||
# Храним результат опросов внутри пары.
|
||
self._last_seen = None # type: bool | None
|
||
self._prev_seen = None # type: bool | None
|
||
self._last_edges = 0
|
||
self._prev_edges = 0
|
||
self._last_poll_ms = None # type: int | None
|
||
trigger = 0
|
||
if count_rising:
|
||
trigger |= Pin.IRQ_RISING
|
||
if count_falling:
|
||
trigger |= Pin.IRQ_FALLING
|
||
self._rx.irq(trigger=trigger, handler=self._on_rx_edge)
|
||
|
||
self._tx_enabled = False
|
||
self.disable_tx() # стартуем с выключенного излучения
|
||
|
||
# Неблокирующий опрос: state machine.
|
||
self._poll_active = False
|
||
self._phase = 0 # 0=idle, 1=tx_on, 2=tx_off_gap, 3=rest
|
||
self._blink_idx = 0
|
||
self._deadline_ms = 0
|
||
|
||
def _on_rx_edge(self, _pin):
|
||
self._edge_count += 1
|
||
|
||
@property
|
||
def edge_count(self) -> int:
|
||
"""Количество фронтов на RX в последнем/текущем цикле опроса."""
|
||
return self._edge_count
|
||
|
||
@property
|
||
def last_seen(self) -> bool | None:
|
||
"""Результат последнего poll_once(): True/False или None если ещё не вызывали."""
|
||
return self._last_seen
|
||
|
||
@property
|
||
def prev_seen(self) -> bool | None:
|
||
"""Результат предпоследнего poll_once(): True/False или None если данных нет."""
|
||
return self._prev_seen
|
||
|
||
@property
|
||
def last_edges(self) -> int:
|
||
"""Количество фронтов, зафиксированных в последнем poll_once()."""
|
||
return self._last_edges
|
||
|
||
@property
|
||
def prev_edges(self) -> int:
|
||
"""Количество фронтов, зафиксированных в предпоследнем poll_once()."""
|
||
return self._prev_edges
|
||
|
||
@property
|
||
def last_poll_ms(self) -> int | None:
|
||
"""Время (ticks_ms) когда завершился последний poll_once(), либо None."""
|
||
return self._last_poll_ms
|
||
|
||
@staticmethod
|
||
def _seen_to_state(seen: bool) -> int:
|
||
# 0 = луч перекрыт, 1 = луч не перекрыт
|
||
return 0 if seen else 1
|
||
|
||
@property
|
||
def last_state(self) -> int | None:
|
||
"""Состояние последнего опроса: 1=перекрыт, 0=не перекрыт, None=нет данных."""
|
||
if self._last_seen is None:
|
||
return None
|
||
return self._seen_to_state(self._last_seen)
|
||
|
||
@property
|
||
def prev_state(self) -> int | None:
|
||
"""Состояние предпоследнего опроса: 1=перекрыт, 0=не перекрыт, None=нет данных."""
|
||
if self._prev_seen is None:
|
||
return None
|
||
return self._seen_to_state(self._prev_seen)
|
||
|
||
@staticmethod
|
||
def _set_pwm_duty(pwm: PWM, duty_u16: int):
|
||
# Совместимость между портами MicroPython.
|
||
try:
|
||
pwm.duty_u16(duty_u16)
|
||
except AttributeError:
|
||
# ESP32/старые порты
|
||
duty_10bit = (duty_u16 * 1023) // 65535
|
||
duty = getattr(pwm, "duty", None)
|
||
if duty is None:
|
||
raise
|
||
duty(duty_10bit)
|
||
|
||
def enable_tx(self):
|
||
duty_u16 = int(65535 * self.duty_percent / 100)
|
||
self._set_pwm_duty(self._pwm, duty_u16)
|
||
self._tx_enabled = True
|
||
|
||
def disable_tx(self):
|
||
self._set_pwm_duty(self._pwm, 0)
|
||
self._tx_enabled = False
|
||
|
||
@property
|
||
def poll_in_progress(self) -> bool:
|
||
return self._poll_active
|
||
|
||
def start_poll(self):
|
||
"""Запустить один цикл опроса НЕБЛОКИРУЮЩЕ.
|
||
|
||
Дальше нужно часто вызывать update(); когда update() вернёт True — цикл завершён.
|
||
"""
|
||
if self._poll_active:
|
||
return
|
||
|
||
# Сдвигаем прошлое состояние.
|
||
self._prev_seen = self._last_seen
|
||
self._prev_edges = self._last_edges
|
||
|
||
self._edge_count = 0
|
||
self._blink_idx = 0
|
||
|
||
now = ticks_ms()
|
||
self.enable_tx()
|
||
self._phase = 1 # tx_on
|
||
self._deadline_ms = now + self.tx_on_ms
|
||
self._poll_active = True
|
||
|
||
def update(self) -> bool:
|
||
"""Продвинуть state machine опроса.
|
||
|
||
Возвращает True, если цикл опроса ЗАВЕРШЁН в этом вызове.
|
||
"""
|
||
if not self._poll_active:
|
||
return False
|
||
|
||
now = ticks_ms()
|
||
if ticks_diff(now, self._deadline_ms) < 0:
|
||
return False
|
||
|
||
if self._phase == 1: # tx_on закончился
|
||
self.disable_tx()
|
||
if self._blink_idx < (self.blinks_per_poll - 1):
|
||
self._phase = 2 # tx_off_gap
|
||
self._deadline_ms = now + self.blink_off_ms
|
||
else:
|
||
self._phase = 3 # rest
|
||
active_ms = (self.blinks_per_poll * self.tx_on_ms) + ((self.blinks_per_poll - 1) * self.blink_off_ms)
|
||
rest = self.poll_period_ms - active_ms
|
||
self._deadline_ms = now + rest
|
||
return False
|
||
|
||
if self._phase == 2: # gap закончился, следующий blink
|
||
self._blink_idx += 1
|
||
self.enable_tx()
|
||
self._phase = 1
|
||
self._deadline_ms = now + self.tx_on_ms
|
||
return False
|
||
|
||
# rest закончился -> фиксируем результат
|
||
seen = self._edge_count >= self.min_edges
|
||
self._last_seen = seen
|
||
self._last_edges = self._edge_count
|
||
self._last_poll_ms = now
|
||
|
||
self._poll_active = False
|
||
self._phase = 0
|
||
return True
|
||
|
||
def poll_once(self) -> bool:
|
||
"""Один цикл опроса. Возвращает True если RX "видел" активность."""
|
||
# Совместимость: блокирующий вариант поверх неблокирующего.
|
||
self.start_poll()
|
||
while not self.update():
|
||
sleep_ms(1)
|
||
return bool(self._last_seen)
|
||
|
||
def run(self, callback=None):
|
||
"""Бесконечный опрос.
|
||
|
||
callback(seen: bool, edge_count: int, now_ms: int) вызывается каждый цикл.
|
||
"""
|
||
next_t = ticks_ms()
|
||
while True:
|
||
now = ticks_ms()
|
||
# Держим период стабильным даже если callback/лог печатает долго.
|
||
if ticks_diff(now, next_t) < 0:
|
||
sleep_ms(1)
|
||
continue
|
||
|
||
self._edge_count = 0
|
||
for i in range(self.blinks_per_poll):
|
||
self.enable_tx()
|
||
sleep_ms(self.tx_on_ms)
|
||
self.disable_tx()
|
||
if self.blink_off_ms and i != (self.blinks_per_poll - 1):
|
||
sleep_ms(self.blink_off_ms)
|
||
|
||
# Сдвигаем прошлое состояние и фиксируем текущее.
|
||
self._prev_seen = self._last_seen
|
||
self._prev_edges = self._last_edges
|
||
|
||
seen = self._edge_count >= self.min_edges
|
||
self._last_seen = seen
|
||
self._last_edges = self._edge_count
|
||
self._last_poll_ms = ticks_ms()
|
||
if callback is not None:
|
||
callback(seen, self._edge_count, now)
|
||
|
||
next_t = next_t + self.poll_period_ms
|
||
|
||
def deinit(self):
|
||
try:
|
||
self._rx.irq(handler=None)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
self.disable_tx()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
self._pwm.deinit()
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
# Пример использования:
|
||
#
|
||
# from ir_pair import IRRxTxPollPair
|
||
#
|
||
# pair = IRRxTxPollPair(rx_pin=16, tx_pin=28, poll_period_ms=200, tx_on_ms=30)
|
||
# pair.run(lambda seen, edges, now: print("OK" if seen else "NO", edges))
|