Source code for tslumd.messages

import asyncio
import dataclasses
from dataclasses import dataclass, field
import enum
import struct
from typing import List, Tuple, Dict

from tslumd import TallyColor, Tally

# Names exported by ``from tslumd.messages import *``
__all__ = ('Display', 'Message')

class Flags(enum.IntFlag):
    """Bit flags stored in the ``flags`` field of a message
    """
    #: No flags set
    NO_FLAGS = 0b00

    #: Text is formatted as ``UTF-16LE`` when set, otherwise ``UTF-8``
    UTF16 = 0b01

    #: The message contains ``SCONTROL`` data when set, otherwise ``DMSG``
    SCONTROL = 0b10

@dataclass
class Display:
    """A single tally "display"

    Instances can be parsed from, and serialized to, the ``DMSG`` section of
    a message (see :meth:`from_dmsg` and :meth:`to_dmsg`).
    """
    index: int #: The display index
    rh_tally: TallyColor = TallyColor.OFF #: Right hand tally indicator
    txt_tally: TallyColor = TallyColor.OFF #: Text tally indicator
    lh_tally: TallyColor = TallyColor.OFF #: Left hand tally indicator
    brightness: int = 3 #: Display brightness (from 0 to 3)
    text: str = '' #: Text to display

    @classmethod
    def from_dmsg(cls, flags: Flags, dmsg: bytes) -> Tuple['Display', bytes]:
        """Construct an instance from a ``DMSG`` portion of received message.

        Any remaining message data after the relevant ``DMSG`` is returned along
        with the instance.

        Arguments:
            flags: The flags of the enclosing message. :attr:`Flags.UTF16`
                selects the text encoding
            dmsg: The raw bytes, beginning at the start of a ``DMSG``

        Raises:
            ValueError: If the control word marks the section as control data,
                or if fewer text bytes are present than the section declares
            struct.error: If the fixed-size header fields are incomplete
        """
        # Two little-endian uint16 values: the display index and control word
        index, ctrl = struct.unpack('<2H', dmsg[:4])
        dmsg = dmsg[4:]

        # Control word layout: two bits per field, lowest bits first
        kw = dict(
            index=index,
            rh_tally=TallyColor(ctrl & 0b11),
            txt_tally=TallyColor(ctrl >> 2 & 0b11),
            lh_tally=TallyColor(ctrl >> 4 & 0b11),
            brightness=ctrl >> 6 & 0b11,
        )

        # The top bit of the control word signals control data instead of text
        if ctrl & 0x8000:
            raise ValueError('Control data undefined for UMDv5.0')

        txt_byte_len = struct.unpack('<H', dmsg[:2])[0]
        dmsg = dmsg[2:]
        txt_bytes = dmsg[:txt_byte_len]
        if len(txt_bytes) != txt_byte_len:
            # Slicing never fails, so a short buffer would otherwise be
            # accepted silently and yield truncated text
            raise ValueError(
                f'Truncated DMSG text: expected {txt_byte_len} bytes, '
                f'got {len(txt_bytes)}'
            )
        dmsg = dmsg[txt_byte_len:]

        if Flags.UTF16 in flags:
            txt = txt_bytes.decode('UTF-16le')
        else:
            # Everything from the first NUL byte onward is discarded
            txt = txt_bytes.split(b'\0')[0].decode('UTF-8')
        kw['text'] = txt
        return cls(**kw), dmsg

    def to_dmsg(self, flags: Flags) -> bytes:
        """Build ``dmsg`` bytes to be included in a message
        (called from :meth:`Message.build_message`)

        Arguments:
            flags: The flags of the enclosing message. :attr:`Flags.UTF16`
                selects the text encoding
        """
        # Pack the two-bit fields into the control word, lowest bits first
        ctrl = 0
        bit_fields = (
            (0, self.rh_tally),
            (2, self.txt_tally),
            (4, self.lh_tally),
            (6, self.brightness),
        )
        for shift, value in bit_fields:
            ctrl |= (value & 0b11) << shift

        # Same codec spelling that from_dmsg uses for decoding
        encoding = 'UTF-16le' if Flags.UTF16 in flags else 'UTF-8'
        txt_bytes = self.text.encode(encoding)
        header = struct.pack('<3H', self.index, ctrl, len(txt_bytes))
        return header + txt_bytes

    def to_dict(self) -> Dict:
        """Return the fields of this instance as a :class:`dict`
        """
        return dataclasses.asdict(self)

    @classmethod
    def from_tally(cls, tally: Tally) -> 'Display':
        """Create a :class:`Display` from the given :class:`~.Tally`
        """
        kw = tally.to_dict()
        return cls(**kw)

    def __eq__(self, other):
        # Equality is defined by field values so that a Display can be
        # compared against a Tally as well as another Display
        if not isinstance(other, (Display, Tally)):
            return NotImplemented
        return self.to_dict() == other.to_dict()

    def __ne__(self, other):
        if not isinstance(other, (Display, Tally)):
            return NotImplemented
        return self.to_dict() != other.to_dict()
@dataclass
class Message:
    """A single UMDv5 message packet

    The packet begins with a header of four little-endian fields: a
    ``uint16`` byte count (which does not include the count field itself),
    a ``uint8`` version, a ``uint8`` flags field and a ``uint16`` screen
    index. The header is followed by either ``SCONTROL`` data or zero or more
    ``DMSG`` sections, depending on :attr:`flags`.
    """
    version: int = 0 #: Protocol minor version
    flags: int = Flags.NO_FLAGS #: The message :class:`Flags` field
    screen: int = 0 #: Screen index
    displays: List[Display] = field(default_factory=list)
    """A list of :class:`Display` instances"""
    scontrol: bytes = b''
    """SCONTROL data (if present). Not currently implemented"""

    def __post_init__(self):
        # Accept plain integers for ``flags`` and normalize them to Flags
        if not isinstance(self.flags, Flags):
            self.flags = Flags(self.flags)

    @classmethod
    def parse(cls, msg: bytes) -> Tuple['Message', bytes]:
        """Parse incoming message data to create a :class:`Message` instance.

        Any remaining message data after parsing is returned along with the
        instance.

        Raises:
            ValueError: If ``msg`` holds fewer bytes than the byte count in
                its header declares
            struct.error: If ``msg`` is too short to contain the header
        """
        byte_count, version, flags, screen = struct.unpack('<HBBH', msg[:6])

        # The byte count covers everything after its own two bytes
        body = msg[2:]
        if len(body) < byte_count:
            # Slicing never fails, so a short buffer would otherwise be
            # parsed silently into an incomplete message
            raise ValueError(
                f'Truncated message: byte count is {byte_count}, '
                f'but only {len(body)} bytes follow it'
            )
        remaining = body[byte_count:]
        # Skip the rest of the header (version, flags and screen)
        payload = body[4:byte_count]

        obj = cls(version=version, flags=Flags(flags), screen=screen)
        if Flags.SCONTROL in obj.flags:
            obj.scontrol = payload
            return obj, remaining

        # Each call consumes one DMSG and hands back the unparsed rest
        while payload:
            disp, payload = Display.from_dmsg(obj.flags, payload)
            obj.displays.append(disp)
        return obj, remaining

    def build_message(self) -> bytes:
        """Build a message packet from data in this instance
        """
        if Flags.SCONTROL in self.flags:
            payload = bytes(self.scontrol)
        else:
            payload = b''.join(
                display.to_dmsg(self.flags) for display in self.displays
            )
        # Byte count: the payload plus the four header bytes that follow the
        # count field (version, flags and screen)
        pbc = len(payload) + 4
        header = struct.pack('<HBBH', pbc, self.version, self.flags, self.screen)
        return header + payload