Source code for tslumd.messages

import asyncio
import dataclasses
from dataclasses import dataclass, field
import enum
import struct
from typing import List, Tuple, Dict, Iterable, Optional

from tslumd import MessageType, TallyColor, Tally

# Public API of this module (the names exported by ``from ... import *``)
__all__ = (
    'Display', 'Message', 'ParseError', 'MessageParseError',
    'DmsgParseError', 'DmsgControlParseError', 'MessageLengthError',
)

class ParseError(Exception):
    """Raised on errors during message parsing

    Arguments:
        msg: Description of the error
        value: The message bytes that could not be parsed

    .. versionadded:: 0.0.2
    """
    msg: str #: Error message
    value: bytes #: The relevant message bytes containing the error

    def __init__(self, msg: str, value: bytes):
        # Forward both values to ``Exception`` so that ``args`` is populated.
        # Without this, ``repr()`` shows no detail and the exception cannot be
        # rebuilt from ``__reduce__`` (as done by ``copy`` and ``pickle``).
        super().__init__(msg, value)
        self.msg = msg
        self.value = value

    def __str__(self):
        return f'{self.msg}: "{self.value!r}"'
class MessageParseError(ParseError):
    """Error raised when a :class:`Message` cannot be parsed

    .. versionadded:: 0.0.2
    """
class DmsgParseError(ParseError):
    """Error raised when a :class:`Display` cannot be parsed

    .. versionadded:: 0.0.2
    """
class DmsgControlParseError(ParseError):
    """Error raised when :attr:`Display.control` data cannot be parsed

    .. versionadded:: 0.0.2
    """
class MessageLengthError(ValueError):
    """Error raised when a message is longer than 2048 bytes

    .. versionadded:: 0.0.4
    """
class Flags(enum.IntFlag):
    """Bit flags carried in the header of a message"""

    #: No flags set
    NO_FLAGS = 0b00

    #: Text is formatted as ``UTF-16LE`` if set, otherwise ``UTF-8``
    UTF16 = 0b01

    #: The message contains ``SCONTROL`` data if set, otherwise ``DMESG``
    SCONTROL = 0b10
@dataclass
class Display:
    """A single tally "display"

    Instances are converted to and from the ``DMSG`` portion of a message
    with :meth:`to_dmsg` and :meth:`from_dmsg`.
    """
    index: int #: The display index from 0 to 65534 (``0xFFFE``)
    rh_tally: TallyColor = TallyColor.OFF #: Right hand tally indicator
    txt_tally: TallyColor = TallyColor.OFF #: Text tally indicator
    lh_tally: TallyColor = TallyColor.OFF #: Left hand tally indicator
    brightness: int = 3 #: Display brightness (from 0 to 3)
    text: str = '' #: Text to display
    control: bytes = b''
    """Control data (if :attr:`type` is :attr:`~.MessageType.control`)

    .. versionadded:: 0.0.2
    """

    type: MessageType = MessageType.display
    """The message type. One of :attr:`~.MessageType.display` or
    :attr:`~.MessageType.control`.

    * For :attr:`~.MessageType.display` (the default), the message contains
      :attr:`text` information and the :attr:`control` field must be empty.
    * For :attr:`~.MessageType.control`, the message contains :attr:`control`
      data and the :attr:`text` field must be empty

    .. versionadded:: 0.0.2
    """

    is_broadcast: bool = field(init=False)
    """``True`` if the display is to a "broadcast", meaning sent to all display
    indices. (if the :attr:`index` is ``0xffff``)

    .. versionadded:: 0.0.2
    """

    def __post_init__(self):
        self.is_broadcast = self.index == 0xffff
        # Any control data forces the control type, whatever was passed in
        if len(self.control):
            self.type = MessageType.control
        if self.type == MessageType.control and len(self.text):
            raise ValueError('Control message cannot contain text')

    @classmethod
    def broadcast(cls, **kwargs) -> 'Display':
        """Create a :attr:`broadcast <is_broadcast>` display
        (with :attr:`index` set to ``0xffff``)

        Arguments:
            **kwargs: Field values passed to the constructor. Any ``index``
                given is replaced by ``0xffff``

        .. versionadded:: 0.0.2
        """
        kwargs = kwargs.copy()
        kwargs['index'] = 0xffff
        return cls(**kwargs)

    @classmethod
    def from_dmsg(cls, flags: Flags, dmsg: bytes) -> Tuple['Display', bytes]:
        """Construct an instance from a ``DMSG`` portion of received message.

        Any remaining message data after the relevant ``DMSG`` is returned
        along with the instance.

        Arguments:
            flags: The flags of the enclosing message, used to select the
                text encoding
            dmsg: The message data beginning at the start of the ``DMSG``

        Returns:
            A tuple of the new instance and the unparsed remainder of ``dmsg``

        Raises:
            DmsgParseError: If the header, the text length field or the text
                itself is shorter than expected
            DmsgControlParseError: If the control data is malformed
        """
        if len(dmsg) < 4:
            raise DmsgParseError('Invalid dmsg length', dmsg)
        index, ctrl = struct.unpack('<2H', dmsg[:4])
        dmsg = dmsg[4:]

        # Control word layout: bits 0-1 right hand tally, 2-3 text tally,
        # 4-5 left hand tally, 6-7 brightness, bit 15 control data marker
        kw = dict(
            index=index,
            rh_tally=TallyColor(ctrl & 0b11),
            txt_tally=TallyColor(ctrl >> 2 & 0b11),
            lh_tally=TallyColor(ctrl >> 4 & 0b11),
            brightness=ctrl >> 6 & 0b11,
        )

        is_control_data = ctrl & 0x8000 == 0x8000
        if is_control_data:
            control_data, dmsg = cls._unpack_control_data(dmsg)
            kw['control'] = control_data
            kw['type'] = MessageType.control
            return cls(**kw), dmsg

        if len(dmsg) < 2:
            raise DmsgParseError('Invalid text length field', dmsg)
        txt_byte_len = struct.unpack('<H', dmsg[:2])[0]
        dmsg = dmsg[2:]
        txt_bytes = dmsg[:txt_byte_len]
        dmsg = dmsg[txt_byte_len:]
        if len(txt_bytes) != txt_byte_len:
            raise DmsgParseError(
                f'Invalid text bytes. Expected {txt_byte_len}',
                txt_bytes,
            )
        if Flags.UTF16 in flags:
            txt = txt_bytes.decode('utf-16-le')
        else:
            # Only the text before the first NUL byte is kept
            if b'\0' in txt_bytes:
                txt_bytes = txt_bytes.split(b'\0')[0]
            txt = txt_bytes.decode('UTF-8')
        kw['text'] = txt
        return cls(**kw), dmsg

    @staticmethod
    def _unpack_control_data(data: bytes) -> Tuple[bytes, bytes]:
        """Unpack control data (if control bit 15 is set)

        The data is read as a little-endian 16-bit length followed by that
        many bytes.

        Arguments:
            data: The portion of the ``dmsg`` at the start of the
                "Control Data" field

        Returns:
            A tuple of the control data and the remaining message data
            after the control data field

        Raises:
            DmsgControlParseError: If the length field is incomplete or fewer
                bytes than it announces are available

        Note:
            This is undefined as of UMDv5.0 and its implementation is
            the author's "best guess" based off of other areas of the protocol

        .. versionadded:: 0.0.2

        :meta public:
        """
        if len(data) < 2:
            raise DmsgControlParseError('Unknown control data format', data)
        length = struct.unpack('<H', data[:2])[0]
        data = data[2:]
        if len(data) < length:
            raise DmsgControlParseError('Unknown control data format', data)
        return data[:length], data[length:]

    @staticmethod
    def _pack_control_data(data: bytes) -> bytes:
        """Pack control data (if control bit 15 is set)

        The result is a little-endian 16-bit length followed by ``data``.

        Arguments:
            data: The control data to pack

        Returns:
            The packed control data

        Note:
            This is undefined as of UMDv5.0 and its implementation is
            the author's "best guess" based off of other areas of the protocol

        .. versionadded:: 0.0.2

        :meta public:
        """
        length = len(data)
        return struct.pack(f'<H{length}s', length, data)

    def to_dmsg(self, flags: Flags) -> bytes:
        """Build ``dmsg`` bytes to be included in a message
        (called from :meth:`Message.build_message`)

        Arguments:
            flags: The flags of the enclosing message, used to select the
                text encoding

        Returns:
            The index and control word, followed by either the packed control
            data or the length-prefixed encoded text
        """
        # Same bit layout as read by from_dmsg()
        ctrl = self.rh_tally & 0b11
        ctrl += (self.txt_tally & 0b11) << 2
        ctrl += (self.lh_tally & 0b11) << 4
        ctrl += (self.brightness & 0b11) << 6

        if self.type == MessageType.control:
            ctrl |= 0x8000
            data = bytearray(struct.pack('<2H', self.index, ctrl))
            data.extend(self._pack_control_data(self.control))
        else:
            # Canonical codec names, matching those used in from_dmsg()
            encoding = 'utf-16-le' if Flags.UTF16 in flags else 'UTF-8'
            txt_bytes = self.text.encode(encoding)
            txt_byte_len = len(txt_bytes)
            data = bytearray(struct.pack('<3H', self.index, ctrl, txt_byte_len))
            data.extend(txt_bytes)
        # Return an immutable value, as the annotation promises
        return bytes(data)

    def to_dict(self) -> Dict:
        """Return the fields as a dict, leaving out ``is_broadcast``
        (which is derived from :attr:`index`)
        """
        d = dataclasses.asdict(self)
        del d['is_broadcast']
        return d

    @classmethod
    def from_tally(cls, tally: Tally, msg_type: MessageType = MessageType.display) -> 'Display':
        """Create a :class:`Display` from the given :class:`~.Tally`

        Arguments:
            tally: The tally to copy values from
            msg_type: The :attr:`type` of the new display. For
                :attr:`~.MessageType.control` the tally's text is dropped,
                for :attr:`~.MessageType.display` its control data is dropped

        .. versionadded:: 0.0.2
            The msg_type argument
        """
        kw = tally.to_dict()
        del kw['id']
        if msg_type == MessageType.control:
            del kw['text']
        elif msg_type == MessageType.display:
            del kw['control']
        kw['type'] = msg_type
        return cls(**kw)

    def __eq__(self, other):
        if not isinstance(other, (Display, Tally)):
            return NotImplemented
        self_dict = self.to_dict()
        oth_dict = other.to_dict()
        if isinstance(other, Display):
            return self_dict == oth_dict

        # Comparing against a Tally: ignore the keys only one side defines
        # and the payload field that is unused for this display's type
        del oth_dict['id']
        del self_dict['type']
        unused_key = 'text' if self.type == MessageType.control else 'control'
        del self_dict[unused_key]
        del oth_dict[unused_key]
        return self_dict == oth_dict

    def __ne__(self, other):
        if not isinstance(other, (Display, Tally)):
            return NotImplemented
        return not self.__eq__(other)
@dataclass
class Message:
    """A single UMDv5 message packet
    """
    version: int = 0 #: Protocol minor version
    flags: int = Flags.NO_FLAGS #: The message :class:`Flags` field
    screen: int = 0 #: Screen index from 0 to 65534 (``0xFFFE``)
    displays: List[Display] = field(default_factory=list)
    """A list of :class:`Display` instances"""

    scontrol: bytes = b''
    """SCONTROL data (if :attr:`type` is :attr:`~.MessageType.control`)"""

    type: MessageType = MessageType.display
    """The message type. One of :attr:`~.MessageType.display` or
    :attr:`~.MessageType.control`.

    * For :attr:`~.MessageType.display` (the default), the contents of
      :attr:`displays` are used and the :attr:`scontrol` field must be empty.
    * For :attr:`~.MessageType.control`, the :attr:`scontrol` field is used and
      :attr:`displays` must be empty.

    .. versionadded:: 0.0.2
    """

    is_broadcast: bool = field(init=False)
    """``True`` if the message is to be "broadcast" to all screens.
    (if :attr:`screen` is ``0xffff``)

    .. versionadded:: 0.0.2
    """

    def __post_init__(self):
        self.is_broadcast = self.screen == 0xffff
        if not isinstance(self.flags, Flags):
            self.flags = Flags(self.flags)

        if len(self.scontrol) and len(self.displays):
            raise ValueError('SCONTROL message cannot contain displays')

        # Keep ``type`` and the SCONTROL flag consistent with each other
        if len(self.scontrol):
            self.type = MessageType.control

        if self.type == MessageType.control:
            self.flags |= Flags.SCONTROL
        elif self.type == MessageType._unset:
            # Used by parse(): the type is derived from the received flags
            if Flags.SCONTROL in self.flags:
                self.type = MessageType.control
            else:
                self.type = MessageType.display

    @classmethod
    def broadcast(cls, **kwargs) -> 'Message':
        """Create a :attr:`broadcast <is_broadcast>` message
        (with :attr:`screen` set to ``0xffff``)

        Arguments:
            **kwargs: Field values passed to the constructor. Any ``screen``
                given is replaced by ``0xffff``

        .. versionadded:: 0.0.2
        """
        kwargs = kwargs.copy()
        kwargs['screen'] = 0xffff
        return cls(**kwargs)

    @classmethod
    def parse(cls, msg: bytes) -> Tuple['Message', bytes]:
        """Parse incoming message data to create a :class:`Message` instance.

        Any remaining message data after parsing is returned along with the
        instance.

        Arguments:
            msg: The received data, beginning at the byte count field

        Returns:
            A tuple of the parsed message and the data following it

        Raises:
            MessageParseError: If the header is incomplete, or the byte count
                is too small to cover the header fields or larger than the
                data available
            DmsgParseError: If one of the contained displays is malformed
        """
        if len(msg) < 6:
            raise MessageParseError('Invalid header length', msg)
        byte_count, version, flags, screen = struct.unpack('<HBBH', msg[:6])
        kw = dict(
            version=version,
            flags=Flags(flags),
            screen=screen,
            type=MessageType._unset,
        )

        # The byte count covers everything after the count field itself
        msg = msg[2:]
        if byte_count < 4:
            # Too small to hold the version, flags and screen fields that
            # were just read; slicing on it would misalign ``remaining``
            raise MessageParseError(
                f'Invalid byte count. Expected at least 4, got {byte_count}',
                msg,
            )
        if len(msg) < byte_count:
            raise MessageParseError(
                f'Invalid byte count. Expected {byte_count}, got {len(msg)}',
                msg,
            )
        remaining = msg[byte_count:]
        msg = msg[4:byte_count]

        obj = cls(**kw)
        if obj.type == MessageType.control:
            obj.scontrol = msg
            return obj, remaining

        while len(msg):
            disp, msg = Display.from_dmsg(obj.flags, msg)
            obj.displays.append(disp)
        return obj, remaining

    def build_message(self, ignore_packet_length: Optional[bool] = False) -> bytes:
        """Build a message packet from data in this instance

        Arguments:
            ignore_packet_length (bool, optional): If ``False``, the message
                limit of 2048 bytes is respected, and if exceeded, an
                exception is raised. Otherwise, the limit is ignored.
                (default is False)

        Returns:
            The packet data

        Raises:
            MessageLengthError: If the message packet is larger than 2048
                bytes (and ``ignore_packet_length`` is False)

        Note:
            This method is retained for backwards compatibility. To properly
            handle the message limit, use :meth:`build_messages`

        .. versionchanged:: 0.0.4

            * The ``ignore_packet_length`` parameter was added
            * Message length is limited to 2048 bytes
        """
        packets = self.build_messages(ignore_packet_length=ignore_packet_length)
        data = next(packets)
        # A second packet means the data did not fit within a single one
        if not ignore_packet_length and next(packets, None) is not None:
            raise MessageLengthError()
        return data

    def build_messages(self, ignore_packet_length: Optional[bool] = False) -> Iterable[bytes]:
        """Build message packet(s) from data in this instance as an iterator

        The specified maximum packet length of 2048 is respected and if
        necessary, the data will be split into separate messages.

        This method will always function as a :term:`generator`, regardless
        of the number of message packets produced.

        Arguments:
            ignore_packet_length (bool, optional): If ``True``, the length
                limit is not checked and a single packet is produced.
                (default is False)

        Raises:
            MessageLengthError: If the limit is respected and either the
                ``SCONTROL`` data or a single display is too large to fit
                within one packet

        .. versionadded:: 0.0.4
        """
        header_len = 6      # byte count, version, flags and screen fields
        max_packet_len = 2048
        enforce_limit = not ignore_packet_length

        if self.type == MessageType.control:
            # SCONTROL data cannot be split between packets
            if enforce_limit and header_len + len(self.scontrol) > max_packet_len:
                raise MessageLengthError()
            yield self._pack_packet(self.scontrol)
            return

        payload = bytearray()
        for display in self.displays:
            dmsg = display.to_dmsg(self.flags)
            if enforce_limit and header_len + len(payload) + len(dmsg) > max_packet_len:
                # Send what has been collected so far, then start a new packet
                if payload:
                    yield self._pack_packet(payload)
                    payload = bytearray()
                # A display too large for an empty packet can never be sent
                if header_len + len(dmsg) > max_packet_len:
                    raise MessageLengthError()
            payload.extend(dmsg)
        # Also reached with no displays, producing a header-only packet
        yield self._pack_packet(payload)

    def _pack_packet(self, payload: bytes) -> bytes:
        """Prefix ``payload`` with the message header to form one packet"""
        # The byte count excludes its own two bytes but includes the
        # version (1), flags (1) and screen (2) fields
        byte_count = len(payload) + 4
        header = struct.pack(
            '<HBBH', byte_count, self.version, self.flags, self.screen,
        )
        return header + bytes(payload)