# Source code for tslumd.sender

from __future__ import annotations
try:
    from loguru import logger
except ImportError: # pragma: no cover
    import logging
    logger = logging.getLogger(__name__)
import asyncio
import socket
import argparse
from typing import Tuple, Iterable

from pydispatch import Dispatcher, Property, DictProperty, ListProperty

from tslumd import (
    MessageType, Message, Display, TallyColor, TallyType, TallyKey,
    Tally, Screen,
)
from tslumd.tallyobj import StrOrTallyType, StrOrTallyColor
from tslumd.utils import logger_catch

# Type alias used for the entries of ``UmdSender.clients``
Client = Tuple[str, int] #: A network client as a tuple of ``(address, port)``

# Names exported by a star-import of this module
__all__ = ('Client', 'UmdSender')

class UmdProtocol(asyncio.DatagramProtocol):
    """Datagram protocol used by :class:`UmdSender` for its UDP endpoint

    The protocol only keeps track of the transport it is attached to and
    notifies the owning sender once the connection is established.

    Arguments:
        sender: The sender instance owning this protocol
    """

    def __init__(self, sender: 'UmdSender'):
        self.sender = sender

    def connection_made(self, transport):
        """Remember *transport* and set the sender's ``connected_evt``"""
        logger.debug(f'transport={transport}')
        owner = self.sender
        self.transport = transport
        owner.connected_evt.set()

    def datagram_received(self, data, addr): # pragma: no cover
        """Incoming datagrams are ignored"""
        return None

class UmdSender(Dispatcher):
    """Send UMD Messages

    Messages are sent immediately when a change is made to any of the
    :class:`~.Tally` objects in :attr:`tallies`. These can be added by using
    the :meth:`add_tally` method.

    Alternatively, the :meth:`set_tally_color` and :meth:`set_tally_text`
    methods may be used.

    Arguments:
        clients: Initial value for :attr:`clients`
        all_off_on_close: Initial value for :attr:`all_off_on_close`

    .. versionchanged:: 0.0.4
        The ``all_off_on_close`` parameter was added
    """

    screens: dict[int, Screen]
    """Mapping of :class:`~.Screen` objects by :attr:`~.Screen.index`

    .. versionadded:: 0.0.3
    """

    tallies: dict[TallyKey, Tally]
    """Mapping of :class:`~.Tally` objects by their :attr:`~.Tally.id`

    Note:
        This should not be altered directly. Use :meth:`add_tally` instead

    .. versionchanged:: 0.0.3
        The keys are now a combination of the :class:`~.Screen` and
        :class:`.Tally` indices
    """

    running: bool
    """``True`` if the client / server are running
    """

    tx_interval: float = .3
    """Interval to send tally messages, regardless of state changes
    """

    clients: set[Client]
    """Set of :data:`clients <Client>` to send messages to
    """

    all_off_on_close: bool
    """If ``True``, a broadcast message will be sent before shutdown to
    turn off all tally lights in the system. (default is ``False``)

    .. versionadded:: 0.0.4
    """

    def __init__(self,
                 clients: Iterable[Client]|None = None,
                 all_off_on_close: bool = False):
        self.clients = set(clients) if clients is not None else set()
        self.all_off_on_close = all_off_on_close
        self.screens = {}
        self.tallies = {}
        self.running = False
        # The asyncio primitives below are created lazily (see the matching
        # properties) instead of in the constructor.
        self.__loop: asyncio.AbstractEventLoop|None = None
        self.__broadcast_screen: Screen|None = None
        self.__update_queue: asyncio.PriorityQueue[TallyKey|tuple[int, bool]]|None = None
        self.update_task = None
        self.tx_task: asyncio.Task|None = None
        self.__connected_evt: asyncio.Event|None = None
        self.__tx_lock: asyncio.Lock|None = None

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """The :class:`asyncio.BaseEventLoop` associated with the instance"""
        loop = self.__loop
        if loop is None:
            loop = self.__loop = asyncio.get_running_loop()
        return loop

    @property
    def connected_evt(self) -> asyncio.Event:
        """Event set by :class:`UmdProtocol` once the transport is connected"""
        e = self.__connected_evt
        if e is None:
            e = self.__connected_evt = asyncio.Event()
        return e

    @property
    def _tx_lock(self) -> asyncio.Lock:
        """Lock that must be held while calling :meth:`send_message`"""
        l = self.__tx_lock
        if l is None:
            l = self.__tx_lock = asyncio.Lock()
        return l

    @property
    def update_queue(self) -> asyncio.PriorityQueue[TallyKey|tuple[int, bool]]:
        """Queue of tally ids awaiting transmission by :meth:`tx_loop`

        The tuple ``(0, False)`` is used as a sentinel telling :meth:`tx_loop`
        to stop.
        """
        q = self.__update_queue
        if q is None:
            q = self.__update_queue = asyncio.PriorityQueue()
        return q

    @property
    def broadcast_screen(self) -> Screen:
        """A :class:`~.Screen` instance created using :meth:`.Screen.broadcast`

        .. versionadded:: 0.0.3
        """
        return self._build_broadcast_screen()

    def _build_broadcast_screen(self) -> Screen:
        """Create, register and bind the broadcast screen on first use"""
        if self.__broadcast_screen is not None:
            return self.__broadcast_screen
        screen = self.__broadcast_screen = Screen.broadcast()
        assert screen.is_broadcast
        self.screens[screen.index] = screen
        self._bind_screen(screen)
        return screen

    async def open(self):
        """Open connections and begin data transmission
        """
        if self.running:
            return
        self._build_broadcast_screen()
        self.connected_evt.clear()
        assert self.tx_task is None
        logger.debug('UmdSender.open()')
        self.running = True
        try:
            self.transport, self.protocol = await self.loop.create_datagram_endpoint(
                lambda: UmdProtocol(self),
                family=socket.AF_INET,
            )
        except BaseException:
            # Do not report a running sender when no endpoint was created
            self.running = False
            raise
        self.tx_task = asyncio.create_task(self.tx_loop())
        logger.info('UmdSender running')

    async def close(self):
        """Stop sending to clients and close connections
        """
        if not self.running:
            return
        logger.debug('UmdSender.close()')
        self.running = False
        try:
            # Wake up tx_loop (if it is waiting on the queue) and stop it
            await self.update_queue.put((0, False))
            t = self.tx_task
            self.tx_task = None
            if t is not None:
                await t
            # tx_loop may have exited through its ``while self.running``
            # check without consuming the sentinel; anything left behind
            # would stop the loop immediately after the next open().
            self._drain_update_queue()
            if self.all_off_on_close:
                logger.debug('sending all off broadcast message')
                await self.send_broadcast_tally(0xffff)
        finally:
            # Always release the socket, even if shutdown raised
            self.transport.close()
        logger.info('UmdSender closed')

    def _drain_update_queue(self):
        """Discard every item remaining in :attr:`update_queue`"""
        q = self.update_queue
        while True:
            try:
                q.get_nowait()
            except asyncio.QueueEmpty:
                break
            q.task_done()

    async def send_scontrol(self, screen_index: int, data: bytes):
        """Send an :attr:`SCONTROL <.Message.scontrol>` message

        Arguments:
            screen_index: The :attr:`~.Message.screen` index for the message
            data: The data to send in the :attr:`~.Message.scontrol` field

        .. versionadded:: 0.0.2
        """
        screen = self.get_or_create_screen(screen_index)
        screen.scontrol = data

    async def send_broadcast_scontrol(self, data: bytes):
        """Send a :attr:`broadcast <.Message.is_broadcast>`
        :attr:`SCONTROL <.Message.scontrol>` message

        Arguments:
            data: The data to send in the :attr:`~.Message.scontrol` field

        .. versionadded:: 0.0.2
        """
        self.broadcast_screen.scontrol = data

    def add_tally(self, tally_id: TallyKey, **kwargs) -> Tally:
        """Create a :class:`~.Tally` object and add it to :attr:`tallies`
        if one does not exist

        If necessary, creates a :class:`~.Screen` using
        :meth:`get_or_create_screen`

        Arguments:
            tally_id: A tuple of (:attr:`screen_index <.Screen.index>`,
                :attr:`tally_index <.Tally.index>`)
            **kwargs: Keyword arguments passed to create the tally instance

        Raises:
            KeyError: If the given ``tally_id`` already exists

        .. versionchanged:: 0.0.3
            Changed the ``tally_index`` parameter to ``tally_id``
        """
        if tally_id in self.tallies:
            raise KeyError(f'Tally exists for id {tally_id}')
        screen_index, tally_index = tally_id
        screen = self.get_or_create_screen(screen_index)
        tally = screen.add_tally(tally_index, **kwargs)
        return tally

    def get_or_create_tally(self, tally_id: TallyKey) -> Tally:
        """If a :class:`~.Tally` object matching the given tally id exists,
        return it. Otherwise, create it using
        :meth:`.Screen.get_or_create_tally`

        This method is similar to :meth:`add_tally` and it can be used to
        avoid exception handling. It does not however take keyword arguments
        and is only intended for object creation.

        .. versionadded:: 0.0.3
        """
        tally = self.tallies.get(tally_id)
        if tally is not None:
            return tally
        screen_index, tally_index = tally_id
        screen = self.get_or_create_screen(screen_index)
        tally = screen.get_or_create_tally(tally_index)
        return tally

    def get_or_create_screen(self, index_: int) -> Screen:
        """Return the :class:`~.Screen` with the given index from
        :attr:`screens`, creating and adding it if it does not exist

        Arguments:
            index_: The screen :attr:`~.Screen.index`

        .. versionadded:: 0.0.3
        """
        if index_ in self.screens:
            return self.screens[index_]
        screen = Screen(index_)
        self.screens[screen.index] = screen
        self._bind_screen(screen)
        return screen

    def _bind_screen(self, screen: Screen):
        """Subscribe this sender's callbacks to the events of *screen*"""
        screen.bind(on_tally_added=self._on_screen_tally_added)
        screen.bind_async(self.loop,
            on_tally_update=self.on_tally_updated,
            on_tally_control=self.on_tally_control,
            on_control=self.on_screen_control,
        )

    def set_tally_color(self, tally_id: TallyKey, tally_type: StrOrTallyType, color: StrOrTallyColor):
        """Set the tally color for the given index and tally type

        Uses :meth:`.Tally.set_color`. See the method documentation for details

        Arguments:
            tally_id (TallyKey): A tuple of
                (:attr:`screen_index <.Screen.index>`,
                :attr:`tally_index <.Tally.index>`)
            tally_type (TallyType or str): :class:`~.common.TallyType` or
                member name as described in :meth:`.Tally.set_color`
            color (TallyColor or str): :class:`~.common.TallyColor` or
                color name as described in :meth:`.Tally.set_color`

        Raises:
            ValueError: If ``tally_type`` is :attr:`TallyType.no_tally`

        .. versionchanged:: 0.0.3
            Changed the ``tally_index`` parameter to ``tally_id``

        .. versionchanged:: 0.0.5
            Accept string arguments and match behavior of
            :meth:`.Tally.set_color`
        """
        if tally_type == TallyType.no_tally:
            raise ValueError('tally_type must not be TallyType.no_tally')
        tally = self.get_or_create_tally(tally_id)
        tally[tally_type] = color

    def set_tally_text(self, tally_id: TallyKey, text: str):
        """Set the tally text for the given id

        Arguments:
            tally_id: A tuple of (:attr:`screen_index <.Screen.index>`,
                :attr:`tally_index <.Tally.index>`)
            text: The :attr:`~.Tally.text` to set

        .. versionchanged:: 0.0.3
            Changed the ``tally_index`` parameter to ``tally_id``
        """
        tally = self.get_or_create_tally(tally_id)
        tally.text = text

    async def send_tally_control(self, tally_id: TallyKey, data: bytes):
        """Send :attr:`~.Display.control` data for the given screen and
        tally index

        Arguments:
            tally_id: A tuple of (:attr:`screen_index <.Screen.index>`,
                :attr:`tally_index <.Tally.index>`)
            data: The control data to send

        .. versionadded:: 0.0.2

        .. versionchanged:: 0.0.3
            Changed the ``tally_index`` parameter to ``tally_id``
        """
        tally = self.get_or_create_tally(tally_id)
        tally.control = data

    async def send_broadcast_tally_control(self, screen_index: int, data: bytes, **kwargs):
        """Send :attr:`~.Display.control` data as
        :attr:`broadcast <.Display.is_broadcast>` to all listening displays

        Arguments:
            screen_index: The screen :attr:`~.Screen.index`
            data: The control data to send
            **kwargs: Additional keyword arguments to pass to the
                :class:`~.Tally` constructor

        .. versionadded:: 0.0.2

        .. versionchanged:: 0.0.3
            Added the screen_index parameter
        """
        await self.send_broadcast_tally(screen_index, control=data, **kwargs)

    async def send_broadcast_tally(self, screen_index: int, **kwargs):
        """Send a :attr:`broadcast <.Display.is_broadcast>` update to all
        listening displays

        Arguments:
            screen_index: The screen :attr:`~.Screen.index`
            **kwargs: The keyword arguments to pass to the :class:`~.Tally`
                constructor

        .. versionadded:: 0.0.2

        .. versionchanged:: 0.0.3
            Added the screen_index parameter
        """
        screen = self.get_or_create_screen(screen_index)
        tally = screen.broadcast_tally(**kwargs)
        if tally.text == '' or tally.control != b'':
            msg_type = MessageType.control
        else:
            msg_type = MessageType.display
        msg = self._build_message(screen=screen_index)
        disp = Display.from_tally(tally, msg_type=msg_type)
        msg.displays.append(disp)
        async with self._tx_lock:
            await self.send_message(msg)
        # Apply the broadcast values to the local tallies with this sender's
        # callbacks detached from the screen, so the updates are not queued
        # for transmission again.
        screen.unbind(self)
        try:
            for oth_tally in screen:
                oth_tally.update_from_display(disp)
        finally:
            # Never leave the screen unbound, even if an update failed
            self._bind_screen(screen)

    async def on_tally_updated(self, tally: Tally, props_changed: set[str], **kwargs):
        """Queue the tally for transmission when any of its properties change

        Changes affecting only ``control`` are skipped here; they are sent
        by :meth:`on_tally_control`.
        """
        if not self.running:
            return
        if set(props_changed) == {'control'}:
            return
        logger.debug(f'tally update: {tally}')
        await self.update_queue.put(tally.id)

    async def on_tally_control(self, tally: Tally, data: bytes, **kwargs):
        """Send a control message for the given tally"""
        if not self.running:
            return
        async with self._tx_lock:
            disp = Display.from_tally(tally, msg_type=MessageType.control)
            assert tally.screen is not None
            msg = self._build_message(
                screen=tally.screen.index,
                displays=[disp],
            )
            await self.send_message(msg)

    async def on_screen_control(self, screen: Screen, data: bytes, **kwargs):
        """Send an ``SCONTROL`` message for the given screen"""
        if not self.running:
            return
        async with self._tx_lock:
            msg = self._build_message(
                screen=screen.index,
                type=MessageType.control,
                scontrol=data,
            )
            await self.send_message(msg)

    def _on_screen_tally_added(self, tally: Tally, **kwargs):
        """Register a tally newly added to one of the screens"""
        self.tallies[tally.id] = tally
        logger.debug(f'new tally: {tally}')

    @logger_catch
    async def tx_loop(self):
        """Transmit queued tally updates until the sender is stopped

        Waits for the transport to be connected, then repeatedly takes tally
        ids from :attr:`update_queue`, grouping the ids for one screen into a
        single message. If nothing is queued within :attr:`tx_interval`
        seconds, a full update of all screens is sent instead. The loop ends
        when :attr:`running` becomes ``False`` or the ``(0, False)`` sentinel
        is retrieved from the queue.
        """
        async def get_queue_item(timeout):
            # Returns ``None`` on timeout and ``False`` for the stop sentinel
            try:
                item = await asyncio.wait_for(self.update_queue.get(), timeout)
            except asyncio.TimeoutError:
                return None
            if item[1] is False:
                return False
            return item

        await self.connected_evt.wait()

        while self.running:
            item = await get_queue_item(self.tx_interval)
            if item is False:
                self.update_queue.task_done()
                break
            elif item is None:
                if not self._tx_lock.locked():
                    await self.send_full_update()
            else:
                screen_index, _ = item
                ids = {item}
                self.update_queue.task_done()

                # Collect every other queued update for the same screen
                while not self.update_queue.empty():
                    try:
                        item = self.update_queue.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                    # Account for the retrieved item right away so that a
                    # re-queued item is not counted as unfinished twice
                    self.update_queue.task_done()
                    # The sentinel is the tuple ``(0, False)``. Identity is
                    # required because ``0 == False``, so a plain comparison
                    # would also match tally key ``(0, 0)``.
                    if item[1] is False:
                        return
                    if item[0] == screen_index:
                        ids.add(item)
                    else:
                        # Belongs to another screen: put it back for later
                        await self.update_queue.put(item)
                        break

                msg = self._build_message(screen=screen_index)
                tallies = {i: self.tallies[i] for i in ids}
                async with self._tx_lock:
                    for key in sorted(tallies):
                        msg.displays.append(Display.from_tally(tallies[key]))
                    await self.send_message(msg)

    async def send_message(self, msg: Message):
        """Send the given message to every client in :attr:`clients`

        The caller must hold the transmit lock.
        """
        assert self._tx_lock.locked()
        for data in msg.build_messages():
            for client in self.clients:
                self.transport.sendto(data, client)

    async def send_full_update(self):
        """Send the current state of all tallies on all screens"""
        coros = [
            self.send_screen_update(screen)
            for screen in self.screens.values()
        ]
        if not coros: # pragma: no cover
            return
        async with self._tx_lock:
            await asyncio.gather(*coros)

    async def send_screen_update(self, screen: Screen):
        """Send the state of all tallies of one screen

        Broadcast screens are skipped. The caller must hold the transmit
        lock (required by :meth:`send_message`).
        """
        if screen.is_broadcast:
            return
        msg = self._build_message(screen=screen.index)
        for tally in screen:
            disp = Display.from_tally(tally)
            msg.displays.append(disp)
        await self.send_message(msg)

    def _build_message(self, **kwargs) -> Message:
        """Create a :class:`~.Message` from the given keyword arguments"""
        return Message(**kwargs)

    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()
class ClientArgAction(argparse._AppendAction):
    """Argparse action collecting clients given as ``<hostaddr>:<port>``

    Each occurrence of the option is parsed into an ``(address, port)``
    tuple and appended to the destination list. As long as no client was
    given on the command line, the destination holds the built-in default
    ``[('127.0.0.1', 65000)]``; the first client given replaces it.
    """
    _DEFAULT_CLIENT = ('127.0.0.1', 65000)
    _default_help = ' '.join([
        'Client(s) to send UMD messages to formatted as "<hostaddr>:<port>".',
        'Multiple arguments may be given.',
        'If nothing is provided, defaults to "127.0.0.1:65000"',
    ])

    def __init__(self, option_strings, dest, nargs=None, const=None,
                 default=[_DEFAULT_CLIENT], type_=str, choices=None,
                 required=False, help=_default_help, metavar=None):
        # The default list in the signature is shared by every call; hand
        # each action its own copy so a consumer mutating the parsed result
        # can never alter the default seen by other parsers.
        if isinstance(default, list):
            default = list(default)
        super().__init__(
            option_strings=option_strings,
            dest=dest,
            nargs=nargs,
            const=const,
            default=default,
            type=type_,
            choices=choices,
            required=required,
            help=help,
            metavar=metavar,
        )

    def __call__(self, parser, namespace, values, option_string=None):
        """Parse ``values`` and append the client to the destination list

        Raises:
            argparse.ArgumentError: If ``values`` is not formatted as
                ``<hostaddr>:<port>`` with an integer port
        """
        addr, sep, port = values.rpartition(':') # type: ignore
        if not sep or not addr:
            raise argparse.ArgumentError(
                self, f'expected "<hostaddr>:<port>", got {values!r}',
            )
        try:
            client = (addr, int(port))
        except ValueError:
            raise argparse.ArgumentError(
                self, f'invalid port in {values!r}',
            ) from None

        items = getattr(namespace, self.dest, None)
        # Only discard the list if it is the untouched built-in default.
        # Comparing by value alone would also drop a client explicitly
        # given as "127.0.0.1:65000" by an earlier occurrence of the option.
        is_builtin_default = (
            items is self.default and items == [self._DEFAULT_CLIENT]
        )
        if items is None or is_builtin_default:
            items = []
        else:
            items = list(items)
        items.append(client)
        setattr(namespace, self.dest, items)


def main():
    """Command line entry point: send UMD messages until interrupted"""
    p = argparse.ArgumentParser()
    # ``type`` is not passed: ClientArgAction parses the value itself and
    # its constructor does not accept a ``type`` keyword.
    p.add_argument(
        '-c', '--client', dest='clients', action=ClientArgAction,
    )
    args = p.parse_args()

    logger.info(f'Sending to clients: {args.clients!r}')

    # Create the loop explicitly; asyncio.get_event_loop() is deprecated
    # when no loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    sender = UmdSender(clients=args.clients)
    try:
        loop.run_until_complete(sender.open())
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            # Shut the sender down however run_forever() ended
            loop.run_until_complete(sender.close())
    finally:
        loop.close()


if __name__ == '__main__':
    main()