from __future__ import annotations
try:
from loguru import logger
except ImportError: # pragma: no cover
import logging
logger = logging.getLogger(__name__)
import asyncio
from typing import Tuple
from pydispatch import Dispatcher, Property, DictProperty, ListProperty
from tslumd import Tally, Screen, TallyKey, Message
__all__ = ('UmdReceiver',)
class UmdProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that forwards everything it receives to a receiver

    Arguments:
        receiver: The :class:`UmdReceiver` that owns this protocol. Its
            ``connected_evt`` is set once the transport is ready and its
            ``parse_incoming`` method is called for every datagram.
    """

    def __init__(self, receiver: 'UmdReceiver'):
        self.receiver = receiver

    def connection_made(self, transport) -> None:
        """Store the transport and signal the receiver that it is ready"""
        self.transport = transport
        logger.debug(f'transport={transport}')
        # Wakes up the coroutine waiting on the event in the receiver
        self.receiver.connected_evt.set()

    def datagram_received(self, data, addr) -> None:
        """Pass the raw payload and sender address on to the receiver"""
        receiver = self.receiver
        receiver.parse_incoming(data, addr)
[docs]
class UmdReceiver(Dispatcher):
    """Receiver for UMD messages

    Listens for datagrams on a local UDP address, parses them into
    :class:`~.Message` objects and applies them to :class:`~.Screen`
    instances, re-emitting the screens' events on this object.

    Can be used as an asynchronous context manager, in which case
    :meth:`open` is awaited on entry and :meth:`close` on exit.

    Arguments:
        hostaddr (str): The local host address to bind the server to. Defaults
            to :attr:`DEFAULT_HOST`
        hostport (int): The port to listen on. Defaults to :attr:`DEFAULT_PORT`

    :Events:
        .. event:: on_tally_added(tally: Tally)

            Fired when a :class:`~.Tally` instance is added to :attr:`tallies`

        .. event:: on_tally_updated(tally: Tally)

            Fired when any :class:`~.Tally` instance has been updated

        .. event:: on_tally_control(tally: Tally, data: bytes)

            Fired when control data has been received for a :class:`~.Tally`

            .. versionadded:: 0.0.3

        .. event:: on_screen_added(screen: Screen)

            Fired when a :class:`~.Screen` instance is added to :attr:`screens`

            .. versionadded:: 0.0.3

        .. event:: on_scontrol(screen: int, data: bytes)

            Fired when a message with :attr:`~.Message.scontrol` data is received

            * ``screen`` : The :attr:`~.Message.screen` from the incoming
              control message
            * ``data`` : The control data

            .. versionadded:: 0.0.2
    """

    DEFAULT_HOST: str = '0.0.0.0' #: The default host address to listen on
    DEFAULT_PORT: int = 65000 #: The default host port to listen on

    screens: dict[int, Screen]
    """Mapping of :class:`~.Screen` objects by :attr:`~.Screen.index`

    .. versionadded:: 0.0.3
    """

    broadcast_screen: Screen
    """A :class:`~.Screen` instance created using :meth:`.Screen.broadcast`

    .. versionadded:: 0.0.3
    """

    tallies: dict[TallyKey, Tally]
    """Mapping of :class:`~.Tally` objects by their :attr:`~.Tally.id`

    .. versionchanged:: 0.0.3
        The keys are now a combination of the :class:`~.Screen` and
        :class:`.Tally` indices
    """

    running: bool
    """``True`` if the client / server are running
    """

    _events_ = [
        'on_tally_added', 'on_tally_updated', 'on_tally_control',
        'on_screen_added', 'on_scontrol',
    ]

    def __init__(self, hostaddr: str = DEFAULT_HOST, hostport: int = DEFAULT_PORT):
        self.__hostaddr = hostaddr
        self.__hostport = hostport
        self.screens = {}
        # The broadcast screen always exists so broadcast messages have a target
        self.broadcast_screen = Screen(0xffff)
        self._bind_screen(self.broadcast_screen)
        self.screens[self.broadcast_screen.index] = self.broadcast_screen
        self.tallies = {}
        # The loop, lock and event are created lazily (see the properties
        # below) so the instance can be constructed outside a running loop
        self.__loop: asyncio.AbstractEventLoop|None = None
        self.running = False
        self.__connect_lock: asyncio.Lock|None = None
        self.__connected_evt: asyncio.Event|None = None

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """The :class:`asyncio.BaseEventLoop` associated with the instance"""
        loop = self.__loop
        if loop is None:
            # Cached on first access; requires a running loop at that point
            loop = self.__loop = asyncio.get_running_loop()
        return loop

    @property
    def connected_evt(self) -> asyncio.Event:
        """Event set by the protocol once its transport has been created

        The :class:`asyncio.Event` is created on first access.
        """
        e = self.__connected_evt
        if e is None:
            e = self.__connected_evt = asyncio.Event()
        return e

    @property
    def _connect_lock(self) -> asyncio.Lock:
        # Serializes open() and close(); created on first access
        l = self.__connect_lock
        if l is None:
            l = self.__connect_lock = asyncio.Lock()
        return l

    @property
    def hostaddr(self) -> str:
        """The local host address to bind the server to
        """
        return self.__hostaddr

    @property
    def hostport(self) -> int:
        """The port to listen on
        """
        return self.__hostport

    async def open(self):
        """Open the server

        Does nothing if the server is already :attr:`running`. Otherwise a
        datagram endpoint is created on (:attr:`hostaddr`, :attr:`hostport`)
        and the call returns once the transport is connected.

        If creating the endpoint fails, :attr:`running` is reset to ``False``
        and the exception is re-raised, so :meth:`open` may be retried and
        :meth:`close` remains a no-op.
        """
        async with self._connect_lock:
            if self.running:
                return
            logger.debug('UmdReceiver.open()')
            self.running = True
            self.connected_evt.clear()
            transport = None
            try:
                transport, protocol = await self.loop.create_datagram_endpoint(
                    lambda: UmdProtocol(self),
                    local_addr=(self.hostaddr, self.hostport),
                    reuse_port=True,
                )
                self.transport, self.protocol = transport, protocol
                await self.connected_evt.wait()
            except BaseException:
                # Roll back: without this a failed bind would leave `running`
                # stuck at True, turning later open() calls into no-ops and
                # making close() fail on a transport that was never created.
                if transport is not None:
                    transport.close()
                self.running = False
                raise
            logger.info('UmdReceiver running')

    async def close(self):
        """Close the server

        Does nothing if the server is not :attr:`running`.
        """
        async with self._connect_lock:
            if not self.running:
                return
            logger.debug('UmdReceiver.close()')
            self.running = False
            self.transport.close()
            self.connected_evt.clear()
            logger.info('UmdReceiver closed')

    async def set_bind_address(self, hostaddr: str, hostport: int):
        """Set the :attr:`hostaddr` and :attr:`hostport` and restart the server

        Nothing happens if both values are unchanged. The server is only
        reopened if it was running when this was called.
        """
        if hostaddr == self.hostaddr and hostport == self.hostport:
            return
        running = self.running
        if running:
            await self.close()
        self.__hostaddr = hostaddr
        self.__hostport = hostport
        if running:
            await self.open()

    async def set_hostaddr(self, hostaddr: str):
        """Set the :attr:`hostaddr` and restart the server
        """
        await self.set_bind_address(hostaddr, self.hostport)

    async def set_hostport(self, hostport: int):
        """Set the :attr:`hostport` and restart the server
        """
        await self.set_bind_address(self.hostaddr, hostport)

    def parse_incoming(self, data: bytes, addr: Tuple[str, int]):
        """Parse data received by the server

        :meth:`.Message.parse` returns the parsed message together with any
        bytes left over, so parsing is repeated on the leftover bytes until
        none remain. Each message is applied to the :class:`~.Screen` matching
        its index (created on demand), or to every known screen if it is a
        broadcast message.

        Arguments:
            data: The raw bytes received
            addr: The sender's address (not used by this method)
        """
        while True:
            message, remaining = Message.parse(data)
            if message.screen not in self.screens:
                screen = Screen(message.screen)
                self.screens[screen.index] = screen
                self._bind_screen(screen)
                self.emit('on_screen_added', screen)
                logger.debug(f'new screen: {screen.index}')
            else:
                screen = self.screens[message.screen]

            if message.is_broadcast:
                for each_screen in self.screens.values():
                    each_screen.update_from_message(message)
            else:
                screen.update_from_message(message)

            if not remaining:
                break
            # Continue with the unparsed tail; without this the same message
            # would be parsed again on every iteration and never terminate
            data = remaining

    def _bind_screen(self, screen: Screen):
        # Route the screen's events through this object's own handlers
        screen.bind(
            on_tally_added=self._on_screen_tally_added,
            on_tally_update=self._on_screen_tally_update,
            on_tally_control=self._on_screen_tally_control,
            on_control=self._on_screen_control,
        )

    def _on_screen_tally_added(self, tally: Tally, **kwargs):
        # Only the first sighting of a tally id is stored and announced
        if tally.id not in self.tallies:
            self.tallies[tally.id] = tally
            self.emit('on_tally_added', tally, **kwargs)

    def _on_screen_tally_update(self, *args, **kwargs):
        self.emit('on_tally_updated', *args, **kwargs)

    def _on_screen_tally_control(self, *args, **kwargs):
        self.emit('on_tally_control', *args, **kwargs)

    def _on_screen_control(self, *args, **kwargs):
        self.emit('on_scontrol', *args, **kwargs)

    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()
if __name__ == '__main__':
    # Run a receiver on the default address until interrupted with Ctrl-C.
    # The loop is created explicitly: calling asyncio.get_event_loop() with
    # no running loop is deprecated and fails on newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    umd = UmdReceiver()
    try:
        loop.run_until_complete(umd.open())
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            # Close the receiver however run_forever() ended, not only on Ctrl-C
            loop.run_until_complete(umd.close())
    finally:
        loop.close()