try:
from loguru import logger
except ImportError: # pragma: no cover
import logging
logger = logging.getLogger(__name__)
import asyncio
import socket
import argparse
from typing import Dict, Tuple, Set, Optional, Sequence
from pydispatch import Dispatcher, Property, DictProperty, ListProperty
from tslumd import MessageType, Message, Display, TallyColor, TallyType, Tally
from tslumd.utils import logger_catch
Client = Tuple[str, int] #: A network client as a tuple of ``(address, port)``
__all__ = ('Client', 'UmdSender')
class UmdProtocol(asyncio.DatagramProtocol):
def __init__(self, sender: 'UmdSender'):
self.sender = sender
def connection_made(self, transport):
logger.debug(f'transport={transport}')
self.transport = transport
self.sender.connected_evt.set()
def datagram_received(self, data, addr): # pragma: no cover
pass
[docs]class UmdSender(Dispatcher):
"""Send UMD Messages
Messages are sent immediately when a change is made to any of the
:class:`~.Tally` objects in :attr:`tallies`. These can be added by using
the :meth:`add_tally` method.
Alternatively, the :meth:`set_tally_color` and :meth:`set_tally_text` methods
may be used.
Arguments:
clients: Intitial value for :attr:`clients`
"""
tallies: Dict[int, Tally]
"""Mapping of :class:`Tally` objects using the :attr:`~Tally.index` as keys
Note:
This should not be altered directly. Use :meth:`add_tally` instead
"""
running: bool
"""``True`` if the client / server are running
"""
loop: asyncio.BaseEventLoop
"""The :class:`asyncio.BaseEventLoop` associated with the instance"""
tx_interval: float = .3
"""Interval to send tally messages, regardless of state changes
"""
clients: Set[Client]
"""Set of :data:`clients <Client>` to send messages to
"""
def __init__(self, clients: Optional[Set[Client]] = None):
self.clients = set()
if clients is not None:
for client in clients:
self.clients.add(client)
self.tallies = {}
self.running = False
self.loop = asyncio.get_event_loop()
self.update_queue = asyncio.Queue()
self.update_task = None
self.tx_task = None
self.connected_evt = asyncio.Event()
self._tx_lock = asyncio.Lock()
[docs] async def open(self):
"""Open connections and begin data transmission
"""
if self.running:
return
self.connected_evt.clear()
logger.debug('UmdSender.open()')
self.running = True
self.transport, self.protocol = await self.loop.create_datagram_endpoint(
lambda: UmdProtocol(self),
family=socket.AF_INET,
)
self.tx_task = asyncio.create_task(self.tx_loop())
logger.info('UmdSender running')
[docs] async def close(self):
"""Stop sending to clients and close connections
"""
if not self.running:
return
logger.debug('UmdSender.close()')
self.running = False
await self.update_queue.put(False)
await self.tx_task
self.tx_task = None
self.transport.close()
logger.info('UmdSender closed')
[docs] async def send_scontrol(self, screen: int, data: bytes):
"""Send an :attr:`SCONTROL <.Message.scontrol>` message
Arguments:
screen: The :attr:`~.Message.screen` for the message
data: The data to send in the :attr:`~.Message.scontrol` field
.. versionadded:: 0.0.2
"""
msg = self._build_message(screen=screen, scontrol=data)
await self.send_message(msg)
[docs] async def send_broadcast_scontrol(self, data: bytes):
"""Send a :attr:`broadcast <.Message.is_broadcast>`
:attr:`SCONTROL <.Message.scontrol>` message
Arguments:
data: The data to send in the :attr:`~.Message.scontrol` field
.. versionadded:: 0.0.2
"""
msg = Message.broadcast(scontrol=data)
await self.send_message(msg)
[docs] def add_tally(self, index_: int, **kwargs) -> Tally:
"""Create a :class:`~.Tally` object and add it to :attr:`tallies`
Arguments:
index_: The tally :attr:`~.Tally.index`
**kwargs: Keyword arguments passed to create the tally instance
Raises:
KeyError: If the given ``index_`` already exists
"""
if index_ in self.tallies:
raise KeyError(f'Tally exists for index {index_}')
tally = Tally(index_, **kwargs)
self.tallies[index_] = tally
tally.bind_async(self.loop, on_update=self.on_tally_updated)
logger.debug(f'new tally: {tally}')
return tally
[docs] def set_tally_color(self, index_: int, tally_type: TallyType, color: TallyColor):
"""Set the tally color for the given index and tally type
Arguments:
index_: The tally :attr:`~.Tally.index`
tally_type: A member of :class:`~.common.TallyType` specifying the
tally lamp within the display
color: The member of :class:`~.common.TallyColor` to set
"""
if tally_type == TallyType.no_tally:
raise ValueError()
if index_ not in self.tallies:
tally = self.add_tally(index_)
else:
tally = self.tallies[index_]
attr = tally_type.name
setattr(tally, attr, color)
[docs] def set_tally_text(self, index_: int, text: str):
"""Set the tally text for the given index
Arguments:
index_: The tally :attr:`~.Tally.index`
text: The :attr:`~.Tally.text` to set
"""
if index_ not in self.tallies:
tally = self.add_tally(index_)
else:
tally = self.tallies[index_]
tally.text = text
[docs] async def send_tally_control(self, index_: int, data: bytes):
"""Send :attr:`~.Display.control` data for the given tally index
Arguments:
index_: The tally :attr:`~.Tally.index`
control: The control data to send
.. versionadded:: 0.0.2
"""
if index_ not in self.tallies:
tally = self.add_tally(index_)
else:
tally = self.tallies[index_]
tally.control = data
msg = self._build_message()
disp = Display.from_tally(tally, msg_type=MessageType.control)
msg.displays.append(disp)
await self.send_message(msg)
[docs] async def send_broadcast_tally_control(self, data: bytes, **kwargs):
"""Send :attr:`~.Display.control` data as
:attr:`broadcast <.Display.is_broadcast>` to all listening displays
Arguments:
**kwargs: Additional keyword arguments to pass to the :class:`~.Tally`
constructor
.. versionadded:: 0.0.2
"""
tally = Tally.broadcast(**kwargs)
tally.control = data
msg = self._build_message()
disp = Display.from_tally(tally, msg_type=MessageType.control)
msg.displays.append(disp)
async with self._tx_lock:
await self.send_message(msg)
for tally in self.tallies.values():
tally.unbind(self)
tally.update_from_display(disp)
tally.bind_async(self.loop, on_update=self.on_tally_updated)
[docs] async def send_broadcast_tally(self, **kwargs):
"""Send a :attr:`broadcast <.Display.is_broadcast>` update
to all listening displays
Arguments:
**kwargs: The keyword arguments to pass to the :class:`~.Tally`
constructor
.. versionadded:: 0.0.2
"""
tally = Tally.broadcast(**kwargs)
if tally.text == '' or tally.control != b'':
msg_type = MessageType.control
else:
msg_type = MessageType.display
msg = self._build_message()
disp = Display.from_tally(tally, msg_type=msg_type)
msg.displays.append(disp)
async with self._tx_lock:
await self.send_message(msg)
for tally in self.tallies.values():
tally.unbind(self)
tally.update_from_display(disp)
tally.bind_async(self.loop, on_update=self.on_tally_updated)
async def on_tally_updated(self, tally: Tally, props_changed: Sequence[str], **kwargs):
if self.running:
if set(props_changed) == set(['control']):
return
logger.debug(f'tally update: {tally}')
await self.update_queue.put(tally.index)
@logger_catch
async def tx_loop(self):
async def get_queue_item(timeout):
try:
item = await asyncio.wait_for(self.update_queue.get(), timeout)
except asyncio.TimeoutError:
item = None
return item
await self.connected_evt.wait()
while self.running:
item = await get_queue_item(self.tx_interval)
if item is False:
self.update_queue.task_done()
break
elif item is None and not self._tx_lock.locked():
await self.send_full_update()
else:
indices = set()
indices.add(item)
self.update_queue.task_done()
while not self.update_queue.empty():
try:
item = self.update_queue.get_nowait()
except asyncio.QueueEmpty:
break
if item is False:
self.update_queue.task_done()
return
indices.add(item)
self.update_queue.task_done()
msg = self._build_message()
tallies = {i:self.tallies[i] for i in indices}
async with self._tx_lock:
for key in sorted(tallies.keys()):
tally = tallies[key]
msg.displays.append(Display.from_tally(tally))
await self.send_message(msg)
async def send_message(self, msg: Message):
data = msg.build_message()
for client in self.clients:
self.transport.sendto(data, client)
async def send_full_update(self):
async with self._tx_lock:
msg = self._build_message()
for tally in self.tallies.values():
disp = Display.from_tally(tally)
msg.displays.append(disp)
await self.send_message(msg)
def _build_message(self, **kwargs) -> Message:
return Message(**kwargs)
async def __aenter__(self):
await self.open()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
await self.close()
class ClientArgAction(argparse._AppendAction):
_default_help = ' '.join([
'Client(s) to send UMD messages to formatted as "<hostaddr>:<port>".',
'Multiple arguments may be given.',
'If nothing is provided, defaults to "127.0.0.1:65000"',
])
def __init__(self,
option_strings,
dest,
nargs=None,
const=None,
default=[('127.0.0.1', 65000)],
type_=str,
choices=None,
required=False,
help=_default_help,
metavar=None):
super().__init__(
option_strings, dest, nargs, const, default,
type_, choices, required, help, metavar,
)
def __call__(self, parser, namespace, values, option_string=None):
addr, port = values.split(':')
values = (addr, int(port))
items = getattr(namespace, self.dest, None)
if items == [('127.0.0.1', 65000)]:
items = []
else:
items = argparse._copy_items(items)
items.append(values)
setattr(namespace, self.dest, items)
def main():
p = argparse.ArgumentParser()
p.add_argument(
'-c', '--client', dest='clients', action=ClientArgAction#, type=str,
)
args = p.parse_args()
logger.info(f'Sending to clients: {args.clients!r}')
loop = asyncio.get_event_loop()
sender = UmdSender(clients=args.clients)
loop.run_until_complete(sender.open())
try:
loop.run_forever()
except KeyboardInterrupt:
loop.run_until_complete(sender.close())
finally:
loop.close()
if __name__ == '__main__':
main()