import asyncio
import dataclasses
from dataclasses import dataclass, field
import enum
import struct
from typing import List, Tuple, Dict
from tslumd import MessageType, TallyColor, Tally
# Names exported by a star-import of this module
__all__ = (
    'Display',
    'Message',
    'ParseError',
    'MessageParseError',
    'DmsgParseError',
    'DmsgControlParseError',
)
class ParseError(Exception):
    """Raised on errors during message parsing

    Carries a short description of the problem together with the bytes that
    were being parsed when it was detected.

    .. versionadded:: 0.0.2
    """
    msg: str #: Error message
    value: bytes #: The relevant message bytes containing the error

    def __init__(self, msg: str, value: bytes):
        # Hand both values to ``Exception`` explicitly so that ``args`` does
        # not depend on the implicit behavior of ``BaseException.__new__``
        super().__init__(msg, value)
        self.msg = msg
        self.value = value

    def __str__(self) -> str:
        # The ``repr`` of the offending bytes is quoted after the message
        return f'{self.msg}: "{self.value!r}"'
class MessageParseError(ParseError):
    """Error raised when a :class:`Message` cannot be parsed

    .. versionadded:: 0.0.2
    """
class DmsgParseError(ParseError):
    """Error raised when a :class:`Display` cannot be parsed

    .. versionadded:: 0.0.2
    """
class DmsgControlParseError(ParseError):
    """Error raised when :attr:`Display.control` data cannot be parsed

    .. versionadded:: 0.0.2
    """
class Flags(enum.IntFlag):
    """Bit flags stored in the ``flags`` field of a message
    """
    #: No flags set
    NO_FLAGS = 0

    #: Indicates text formatted as ``UTF-16LE`` if set, otherwise ``UTF-8``
    UTF16 = 1 << 0

    #: Indicates the message contains ``SCONTROL`` data if set,
    #: otherwise ``DMESG``
    SCONTROL = 1 << 1
@dataclass
class Display:
    """A single tally "display"

    Holds the tally indicators, brightness and either text or control data
    for one display index, and converts to and from the ``DMSG`` wire
    representation (see :meth:`from_dmsg` and :meth:`to_dmsg`).
    """
    index: int #: The display index from 0 to 65534 (``0xFFFE``)
    rh_tally: TallyColor = TallyColor.OFF #: Right hand tally indicator
    txt_tally: TallyColor = TallyColor.OFF #: Text tally indicator
    lh_tally: TallyColor = TallyColor.OFF #: Left hand tally indicator
    brightness: int = 3 #: Display brightness (from 0 to 3)
    text: str = '' #: Text to display
    control: bytes = b''
    """Control data (if :attr:`type` is :attr:`~.MessageType.control`)

    .. versionadded:: 0.0.2
    """
    type: MessageType = MessageType.display
    """The message type. One of :attr:`~.MessageType.display` or
    :attr:`~.MessageType.control`.

    * For :attr:`~.MessageType.display` (the default), the message contains
      :attr:`text` information and the :attr:`control` field must be empty.
    * For :attr:`~.MessageType.control`, the message contains :attr:`control`
      data and the :attr:`text` field must be empty

    .. versionadded:: 0.0.2
    """
    is_broadcast: bool = field(init=False)
    """``True`` if the display is to a "broadcast", meaning sent to all display
    indices.

    (if the :attr:`index` is ``0xffff``)

    .. versionadded:: 0.0.2
    """

    def __post_init__(self):
        """Derive :attr:`is_broadcast` and :attr:`type`, then validate

        Raises:
            ValueError: If the display is a control display and :attr:`text`
                is not empty
        """
        self.is_broadcast = self.index == 0xffff
        # Any control data forces the control type, whatever was passed in
        if len(self.control):
            self.type = MessageType.control
        if self.type == MessageType.control and len(self.text):
            raise ValueError('Control message cannot contain text')

    @classmethod
    def broadcast(cls, **kwargs) -> 'Display':
        """Create a :attr:`broadcast <is_broadcast>` display

        (with :attr:`index` set to ``0xffff``)

        Any ``index`` given in ``kwargs`` is overridden; all other keyword
        arguments are passed to the constructor.

        .. versionadded:: 0.0.2
        """
        kwargs = kwargs.copy()
        kwargs['index'] = 0xffff
        return cls(**kwargs)

    @classmethod
    def from_dmsg(cls, flags: Flags, dmsg: bytes) -> Tuple['Display', bytes]:
        """Construct an instance from a ``DMSG`` portion of received message.

        Any remaining message data after the relevant ``DMSG`` is returned along
        with the instance.

        Arguments:
            flags: The flags of the enclosing message. :attr:`Flags.UTF16`
                selects ``UTF-16LE`` text decoding, otherwise ``UTF-8``
            dmsg: The message bytes beginning at the start of the ``DMSG``

        Raises:
            DmsgParseError: If the header or text fields are truncated
            DmsgControlParseError: If the control data field is truncated
        """
        if len(dmsg) < 4:
            raise DmsgParseError('Invalid dmsg length', dmsg)
        # Two little-endian unsigned shorts: display index and control word
        hdr = struct.unpack('<2H', dmsg[:4])
        dmsg = dmsg[4:]
        ctrl = hdr[1]
        # Control word layout: bits 0-1 rh tally, 2-3 text tally,
        # 4-5 lh tally, 6-7 brightness, bit 15 control-data marker
        kw = dict(
            index=hdr[0],
            rh_tally=TallyColor(ctrl & 0b11),
            txt_tally=TallyColor(ctrl >> 2 & 0b11),
            lh_tally=TallyColor(ctrl >> 4 & 0b11),
            brightness=ctrl >> 6 & 0b11,
        )
        is_control_data = ctrl & 0x8000 == 0x8000
        if is_control_data:
            ctrl, dmsg = cls._unpack_control_data(dmsg)
            kw['control'] = ctrl
            kw['type'] = MessageType.control
        else:
            if len(dmsg) < 2:
                raise DmsgParseError('Invalid text length field', dmsg)
            txt_byte_len = struct.unpack('<H', dmsg[:2])[0]
            dmsg = dmsg[2:]
            txt_bytes = dmsg[:txt_byte_len]
            dmsg = dmsg[txt_byte_len:]
            # A short slice means the declared length runs past the data
            if len(txt_bytes) != txt_byte_len:
                raise DmsgParseError(
                    f'Invalid text bytes. Expected {txt_byte_len}',
                    txt_bytes,
                )
            if Flags.UTF16 in flags:
                txt = txt_bytes.decode('UTF-16le')
            else:
                # Keep only the bytes before the first NUL
                if b'\0' in txt_bytes:
                    txt_bytes = txt_bytes.split(b'\0')[0]
                txt = txt_bytes.decode('UTF-8')
            kw['text'] = txt
        return cls(**kw), dmsg

    @staticmethod
    def _unpack_control_data(data: bytes) -> Tuple[bytes, bytes]:
        """Unpack control data (if control bit 15 is set)

        The field is read as a little-endian unsigned short length followed
        by that many bytes of control data.

        Arguments:
            data: The portion of the ``dmsg`` at the start of the
                "Control Data" field

        Returns:
            (tuple): tuple containing:

                bytes: control
                    The control data
                bytes: remaining
                    The remaining message data after the control data field

        Raises:
            DmsgControlParseError: If the length field or the control data
                is truncated

        Note:
            This is undefined as of UMDv5.0 and its implementation is
            the author's "best guess" based off of other areas of the protocol

        .. versionadded:: 0.0.2

        :meta public:
        """
        if len(data) < 2:
            raise DmsgControlParseError('Unknown control data format', data)
        length = struct.unpack('<H', data[:2])[0]
        data = data[2:]
        if len(data) < length:
            raise DmsgControlParseError('Unknown control data format', data)
        return data[:length], data[length:]

    @staticmethod
    def _pack_control_data(data: bytes) -> bytes:
        """Pack control data (if control bit 15 is set)

        The inverse of :meth:`_unpack_control_data`: a little-endian unsigned
        short length followed by the data itself.

        Arguments:
            data: The control data to pack

        Returns:
            bytes: packed
                The packed control data

        Note:
            This is undefined as of UMDv5.0 and its implementation is
            the author's "best guess" based off of other areas of the protocol

        .. versionadded:: 0.0.2

        :meta public:
        """
        length = len(data)
        return struct.pack(f'<H{length}s', length, data)

    def to_dmsg(self, flags: Flags) -> bytes:
        """Build ``dmsg`` bytes to be included in a message

        (called from :meth:`Message.build_message`)

        Arguments:
            flags: The flags of the enclosing message. :attr:`Flags.UTF16`
                selects ``UTF-16LE`` text encoding, otherwise ``UTF-8``
        """
        # Same bit layout as decoded in from_dmsg
        ctrl = self.rh_tally & 0b11
        ctrl += (self.txt_tally & 0b11) << 2
        ctrl += (self.lh_tally & 0b11) << 4
        ctrl += (self.brightness & 0b11) << 6
        if self.type == MessageType.control:
            ctrl |= 0x8000
            data = bytearray(struct.pack('<2H', self.index, ctrl))
            data.extend(self._pack_control_data(self.control))
        else:
            # Use the exact codec name from_dmsg decodes with; the previous
            # 'UTF16-le' spelling is not a registered codec alias
            encoding = 'UTF-16le' if Flags.UTF16 in flags else 'UTF-8'
            txt_bytes = self.text.encode(encoding)
            txt_byte_len = len(txt_bytes)
            data = bytearray(struct.pack('<3H', self.index, ctrl, txt_byte_len))
            data.extend(txt_bytes)
        # Return an immutable value, matching the annotation
        return bytes(data)

    def to_dict(self) -> Dict:
        """Return the fields as a dict, leaving out :attr:`is_broadcast`
        """
        d = dataclasses.asdict(self)
        del d['is_broadcast']
        return d

    @classmethod
    def from_tally(cls, tally: Tally, msg_type: MessageType = MessageType.display) -> 'Display':
        """Create a :class:`Display` from the given :class:`~.Tally`

        The keyword arguments are taken from ``tally.to_dict()`` with ``id``
        removed, along with whichever of ``text`` / ``control`` does not
        apply to ``msg_type``.

        .. versionadded:: 0.0.2
            The msg_type argument
        """
        kw = tally.to_dict()
        del kw['id']
        if msg_type == MessageType.control:
            del kw['text']
        elif msg_type == MessageType.display:
            del kw['control']
        kw['type'] = msg_type
        return cls(**kw)

    def __eq__(self, other):
        """Compare against another :class:`Display` or a :class:`~.Tally`

        Two displays are equal when their :meth:`to_dict` results are equal.
        Against a tally, ``id`` and ``type`` are ignored, as is whichever of
        ``text`` / ``control`` is unused for this display's :attr:`type`.
        """
        if not isinstance(other, (Display, Tally)):
            return NotImplemented
        self_dict = self.to_dict()
        oth_dict = other.to_dict()
        if isinstance(other, Display):
            return self_dict == oth_dict
        else:
            del oth_dict['id']
            del self_dict['type']
            if self.type == MessageType.control:
                del self_dict['text']
                del oth_dict['text']
            else:
                del self_dict['control']
                del oth_dict['control']
            return self_dict == oth_dict

    def __ne__(self, other):
        if not isinstance(other, (Display, Tally)):
            return NotImplemented
        return not self.__eq__(other)
@dataclass
class Message:
    """A single UMDv5 message packet

    Use :meth:`parse` to build an instance from received bytes and
    :meth:`build_message` to serialize one.
    """
    version: int = 0 #: Protocol minor version
    flags: int = Flags.NO_FLAGS #: The message :class:`Flags` field
    screen: int = 0 #: Screen index from 0 to 65534 (``0xFFFE``)
    displays: List[Display] = field(default_factory=list)
    """A list of :class:`Display` instances"""
    scontrol: bytes = b''
    """SCONTROL data (if :attr:`type` is :attr:`~.MessageType.control`)"""
    type: MessageType = MessageType.display
    """The message type. One of :attr:`~.MessageType.display` or
    :attr:`~.MessageType.control`.

    * For :attr:`~.MessageType.display` (the default), the contents of
      :attr:`displays` are used and the :attr:`scontrol` field must be empty.
    * For :attr:`~.MessageType.control`, the :attr:`scontrol` field is used and
      :attr:`displays` must be empty.

    .. versionadded:: 0.0.2
    """
    is_broadcast: bool = field(init=False)
    """``True`` if the message is to be "broadcast" to all screens.

    (if :attr:`screen` is ``0xffff``)

    .. versionadded:: 0.0.2
    """

    def __post_init__(self):
        """Derive :attr:`is_broadcast`, coerce :attr:`flags` and settle :attr:`type`

        Raises:
            ValueError: If both :attr:`scontrol` and :attr:`displays` are
                non-empty
        """
        self.is_broadcast = self.screen == 0xffff
        if not isinstance(self.flags, Flags):
            self.flags = Flags(self.flags)
        if len(self.scontrol) and len(self.displays):
            raise ValueError('SCONTROL message cannot contain displays')
        # Any SCONTROL data forces the control type
        if len(self.scontrol):
            self.type = MessageType.control
        if self.type == MessageType.control:
            self.flags |= Flags.SCONTROL
        elif self.type == MessageType._unset:
            # Type not given (as when parsing): take it from the flags
            if Flags.SCONTROL in self.flags:
                self.type = MessageType.control
            else:
                self.type = MessageType.display

    @classmethod
    def broadcast(cls, **kwargs) -> 'Message':
        """Create a :attr:`broadcast <is_broadcast>` message

        (with :attr:`screen` set to ``0xffff``)

        Any ``screen`` given in ``kwargs`` is overridden; all other keyword
        arguments are passed to the constructor.

        .. versionadded:: 0.0.2
        """
        kwargs = kwargs.copy()
        kwargs['screen'] = 0xffff
        return cls(**kwargs)

    @classmethod
    def parse(cls, msg: bytes) -> Tuple['Message', bytes]:
        """Parse incoming message data to create a :class:`Message` instance.

        Any remaining message data after parsing is returned along with the instance.

        Raises:
            MessageParseError: If the header is truncated, the byte count is
                smaller than the header it was read from, or fewer bytes
                than the byte count are available
            DmsgParseError: Propagated from :meth:`Display.from_dmsg`
            DmsgControlParseError: Propagated from :meth:`Display.from_dmsg`
        """
        if len(msg) < 6:
            raise MessageParseError('Invalid header length', msg)
        # Header: byte count (H), version (B), flags (B), screen (H)
        data = struct.unpack('<HBBH', msg[:6])
        byte_count, version, flags, screen = data
        # The byte count excludes its own two bytes but includes the other
        # four header bytes, so anything below 4 cannot describe a message
        if byte_count < 4:
            raise MessageParseError(
                f'Invalid byte count. Expected at least 4, got {byte_count}',
                msg,
            )
        kw = dict(
            version=version,
            flags=Flags(flags),
            screen=screen,
            type=MessageType._unset,
        )
        msg = msg[2:]
        if len(msg) < byte_count:
            raise MessageParseError(
                f'Invalid byte count. Expected {byte_count}, got {len(msg)}',
                msg,
            )
        remaining = msg[byte_count:]
        # Skip the rest of the header, leaving only this message's payload
        msg = msg[4:byte_count]
        obj = cls(**kw)
        if obj.type == MessageType.control:
            obj.scontrol = msg
            return obj, remaining
        while len(msg):
            disp, msg = Display.from_dmsg(obj.flags, msg)
            obj.displays.append(disp)
        return obj, remaining

    def build_message(self) -> bytes:
        """Build a message packet from data in this instance

        The payload is :attr:`scontrol` for control messages, otherwise the
        concatenated :meth:`Display.to_dmsg` output of :attr:`displays`.
        """
        if self.type == MessageType.control:
            payload = bytearray(self.scontrol)
        else:
            payload = bytearray()
            for display in self.displays:
                payload.extend(display.to_dmsg(self.flags))
        # Byte count covers version, flags and screen (4 bytes) plus the
        # payload, but not the byte count field itself
        pbc = 4 + len(payload)
        data = bytearray(struct.pack('<HBBH', pbc, self.version, self.flags, self.screen))
        data.extend(payload)
        return bytes(data)