blob: 574feabaf20f3fc97650d9c3f2e5de8ceae1ed14 [file] [log] [blame]
# File generated from <stdin>, with the command:
# ./pdl-compiler/scripts/generate_python_backend.py
# /!\ Do not edit by hand.
from dataclasses import dataclass, field, fields
from typing import Optional, List, Tuple, Union
import enum
import inspect
import math
@dataclass
class Packet:
payload: Optional[bytes] = field(repr=False, default_factory=bytes, compare=False)
@classmethod
def parse_all(cls, span: bytes) -> 'Packet':
packet, remain = getattr(cls, 'parse')(span)
if len(remain) > 0:
raise Exception('Unexpected parsing remainder')
return packet
@property
def size(self) -> int:
pass
def show(self, prefix: str = ''):
print(f'{self.__class__.__name__}')
def print_val(p: str, pp: str, name: str, align: int, typ, val):
if name == 'payload':
pass
# Scalar fields.
elif typ is int:
print(f'{p}{name:{align}} = {val} (0x{val:x})')
# Byte fields.
elif typ is bytes:
print(f'{p}{name:{align}} = [', end='')
line = ''
n_pp = ''
for (idx, b) in enumerate(val):
if idx > 0 and idx % 8 == 0:
print(f'{n_pp}{line}')
line = ''
n_pp = pp + (' ' * (align + 4))
line += f' {b:02x}'
print(f'{n_pp}{line} ]')
# Enum fields.
elif inspect.isclass(typ) and issubclass(typ, enum.IntEnum):
print(f'{p}{name:{align}} = {typ.__name__}::{val.name} (0x{val:x})')
# Struct fields.
elif inspect.isclass(typ) and issubclass(typ, globals().get('Packet')):
print(f'{p}{name:{align}} = ', end='')
val.show(prefix=pp)
# Array fields.
elif getattr(typ, '__origin__', None) == list:
print(f'{p}{name:{align}}')
last = len(val) - 1
align = 5
for (idx, elt) in enumerate(val):
n_p = pp + ('├── ' if idx != last else '└── ')
n_pp = pp + ('│ ' if idx != last else ' ')
print_val(n_p, n_pp, f'[{idx}]', align, typ.__args__[0], val[idx])
# Custom fields.
elif inspect.isclass(typ):
print(f'{p}{name:{align}} = {repr(val)}')
else:
print(f'{p}{name:{align}} = ##{typ}##')
last = len(fields(self)) - 1
align = max(len(f.name) for f in fields(self) if f.name != 'payload')
for (idx, f) in enumerate(fields(self)):
p = prefix + ('├── ' if idx != last else '└── ')
pp = prefix + ('│ ' if idx != last else ' ')
val = getattr(self, f.name)
print_val(p, pp, f.name, align, f.type, val)
class PacketBoundaryFlag(enum.IntEnum):
COMPLETE = 0x0
NOT_COMPLETE = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'PacketBoundaryFlag']:
try:
return PacketBoundaryFlag(v)
except ValueError as exn:
raise exn
class MessageType(enum.IntEnum):
DATA = 0x0
COMMAND = 0x1
RESPONSE = 0x2
NOTIFICATION = 0x3
@staticmethod
def from_int(v: int) -> Union[int, 'MessageType']:
try:
return MessageType(v)
except ValueError as exn:
raise exn
class GroupId(enum.IntEnum):
CORE = 0x0
SESSION_CONFIG = 0x1
SESSION_CONTROL = 0x2
DATA_CONTROL = 0x3
VENDOR_RESERVED_9 = 0x9
VENDOR_RESERVED_A = 0xa
VENDOR_RESERVED_B = 0xb
VENDOR_ANDROID = 0xc
TEST = 0xd
VENDOR_RESERVED_E = 0xe
VENDOR_RESERVED_F = 0xf
@staticmethod
def from_int(v: int) -> Union[int, 'GroupId']:
try:
return GroupId(v)
except ValueError as exn:
raise exn
class DataPacketFormat(enum.IntEnum):
DATA_SND = 0x1
DATA_RCV = 0x2
@staticmethod
def from_int(v: int) -> Union[int, 'DataPacketFormat']:
try:
return DataPacketFormat(v)
except ValueError as exn:
raise exn
class CoreOpcodeId(enum.IntEnum):
DEVICE_RESET = 0x0
DEVICE_STATUS = 0x1
GET_DEVICE_INFO = 0x2
GET_CAPS_INFO = 0x3
SET_CONFIG = 0x4
GET_CONFIG = 0x5
GENERIC_ERROR = 0x7
QUERY_UWBS_TIMESTAMP = 0x8
@staticmethod
def from_int(v: int) -> Union[int, 'CoreOpcodeId']:
try:
return CoreOpcodeId(v)
except ValueError as exn:
raise exn
class SessionConfigOpcodeId(enum.IntEnum):
INIT = 0x0
DEINIT = 0x1
STATUS = 0x2
SET_APP_CONFIG = 0x3
GET_APP_CONFIG = 0x4
GET_COUNT = 0x5
GET_STATE = 0x6
UPDATE_CONTROLLER_MULTICAST_LIST = 0x7
UPDATE_DT_ANCHOR_RANGING_ROUNDS = 0x8
UPDATE_DT_TAG_RANGING_ROUNDS = 0x9
QUERY_DATA_SIZE_IN_RANGING = 0xb
@staticmethod
def from_int(v: int) -> Union[int, 'SessionConfigOpcodeId']:
try:
return SessionConfigOpcodeId(v)
except ValueError as exn:
raise exn
class SessionControlOpcodeId(enum.IntEnum):
START = 0x0
STOP = 0x1
GET_RANGING_COUNT = 0x3
DATA_CREDIT = 0x4
DATA_TRANSFER_STATUS = 0x5
@staticmethod
def from_int(v: int) -> Union[int, 'SessionControlOpcodeId']:
try:
return SessionControlOpcodeId(v)
except ValueError as exn:
raise exn
class AndroidOpcodeId(enum.IntEnum):
GET_POWER_STATS = 0x0
SET_COUNTRY_CODE = 0x1
FIRA_RANGE_DIAGNOSTICS = 0x2
@staticmethod
def from_int(v: int) -> Union[int, 'AndroidOpcodeId']:
try:
return AndroidOpcodeId(v)
except ValueError as exn:
raise exn
class Status(enum.IntEnum):
OK = 0x0
REJECTED = 0x1
FAILED = 0x2
SYNTAX_ERROR = 0x3
INVALID_PARAM = 0x4
INVALID_RANGE = 0x5
INVALID_MESSAGE_SIZE = 0x6
UNKNOWN_GID = 0x7
UNKNOWN_OID = 0x8
READ_ONLY = 0x9
UCI_MESSAGE_RETRY = 0xa
UNKNOWN = 0xb
NOT_APPLICABLE = 0xc
ERROR_SESSION_NOT_EXIST = 0x11
ERROR_SESSION_DUPLICATE = 0x12
ERROR_SESSION_ACTIVE = 0x13
ERROR_MAX_SESSIONS_EXCEEDED = 0x14
ERROR_SESSION_NOT_CONFIGURED = 0x15
ERROR_ACTIVE_SESSIONS_ONGOING = 0x16
ERROR_MULTICAST_LIST_FULL = 0x17
ERROR_UWB_INITIATION_TIME_TOO_OLD = 0x1a
OK_NEGATIVE_DISTANCE_REPORT = 0x1b
RANGING_TX_FAILED = 0x20
RANGING_RX_TIMEOUT = 0x21
RANGING_RX_PHY_DEC_FAILED = 0x22
RANGING_RX_PHY_TOA_FAILED = 0x23
RANGING_RX_PHY_STS_FAILED = 0x24
RANGING_RX_MAC_DEC_FAILED = 0x25
RANGING_RX_MAC_IE_DEC_FAILED = 0x26
RANGING_RX_MAC_IE_MISSING = 0x27
ERROR_ROUND_INDEX_NOT_ACTIVATED = 0x28
ERROR_NUMBER_OF_ACTIVE_RANGING_ROUNDS_EXCEEDED = 0x29
ERROR_DL_TDOA_DEVICE_ADDRESS_NOT_MATCHING_IN_REPLY_TIME_LIST = 0x2a
@staticmethod
def from_int(v: int) -> Union[int, 'Status']:
try:
return Status(v)
except ValueError as exn:
return v
class DataRcvStatusCode(enum.IntEnum):
UCI_STATUS_SUCCESS = 0x0
UCI_STATUS_ERROR = 0x1
UCI_STATUS_UNKNOWN = 0x2
@staticmethod
def from_int(v: int) -> Union[int, 'DataRcvStatusCode']:
try:
return DataRcvStatusCode(v)
except ValueError as exn:
raise exn
class CreditAvailability(enum.IntEnum):
CREDIT_NOT_AVAILABLE = 0x0
CREDIT_AVAILABLE = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'CreditAvailability']:
try:
return CreditAvailability(v)
except ValueError as exn:
raise exn
class DataTransferNtfStatusCode(enum.IntEnum):
UCI_DATA_TRANSFER_STATUS_REPETITION_OK = 0x0
UCI_DATA_TRANSFER_STATUS_OK = 0x1
UCI_DATA_TRANSFER_STATUS_ERROR_DATA_TRANSFER = 0x2
UCI_DATA_TRANSFER_STATUS_ERROR_NO_CREDIT_AVAILABLE = 0x3
UCI_DATA_TRANSFER_STATUS_ERROR_REJECTED = 0x4
UCI_DATA_TRANSFER_STATUS_SESSION_TYPE_NOT_SUPPORTED = 0x5
UCI_DATA_TRANSFER_STATUS_ERROR_DATA_TRANSFER_IS_ONGOING = 0x6
UCI_DATA_TRANSFER_STATUS_INVALID_FORMAT = 0x7
@staticmethod
def from_int(v: int) -> Union[int, 'DataTransferNtfStatusCode']:
try:
return DataTransferNtfStatusCode(v)
except ValueError as exn:
raise exn
class ResetConfig(enum.IntEnum):
UWBS_RESET = 0x0
@staticmethod
def from_int(v: int) -> Union[int, 'ResetConfig']:
try:
return ResetConfig(v)
except ValueError as exn:
raise exn
class AppConfigTlvType(enum.IntEnum):
DEVICE_TYPE = 0x0
RANGING_ROUND_USAGE = 0x1
STS_CONFIG = 0x2
MULTI_NODE_MODE = 0x3
CHANNEL_NUMBER = 0x4
NUMBER_OF_CONTROLEES = 0x5
DEVICE_MAC_ADDRESS = 0x6
DST_MAC_ADDRESS = 0x7
SLOT_DURATION = 0x8
RANGING_DURATION = 0x9
STS_INDEX = 0xa
MAC_FCS_TYPE = 0xb
RANGING_ROUND_CONTROL = 0xc
AOA_RESULT_REQ = 0xd
SESSION_INFO_NTF_CONFIG = 0xe
NEAR_PROXIMITY_CONFIG = 0xf
FAR_PROXIMITY_CONFIG = 0x10
DEVICE_ROLE = 0x11
RFRAME_CONFIG = 0x12
RSSI_REPORTING = 0x13
PREAMBLE_CODE_INDEX = 0x14
SFD_ID = 0x15
PSDU_DATA_RATE = 0x16
PREAMBLE_DURATION = 0x17
LINK_LAYER_MODE = 0x18
DATA_REPETITION_COUNT = 0x19
RANGING_TIME_STRUCT = 0x1a
SLOTS_PER_RR = 0x1b
AOA_BOUND_CONFIG = 0x1d
PRF_MODE = 0x1f
CAP_SIZE_RANGE = 0x20
TX_JITTER_WINDOW_SIZE = 0x21
SCHEDULE_MODE = 0x22
KEY_ROTATION = 0x23
KEY_ROTATION_RATE = 0x24
SESSION_PRIORITY = 0x25
MAC_ADDRESS_MODE = 0x26
VENDOR_ID = 0x27
STATIC_STS_IV = 0x28
NUMBER_OF_STS_SEGMENTS = 0x29
MAX_RR_RETRY = 0x2a
UWB_INITIATION_TIME = 0x2b
HOPPING_MODE = 0x2c
BLOCK_STRIDE_LENGTH = 0x2d
RESULT_REPORT_CONFIG = 0x2e
IN_BAND_TERMINATION_ATTEMPT_COUNT = 0x2f
SUB_SESSION_ID = 0x30
BPRF_PHR_DATA_RATE = 0x31
MAX_NUMBER_OF_MEASUREMENTS = 0x32
STS_LENGTH = 0x35
MIN_FRAMES_PER_RR = 0x3a
MTU_SIZE = 0x3b
INTER_FRAME_INTERVAL = 0x3c
DL_TDOA_RANGING_METHOD = 0x3d
DL_TDOA_TX_TIMESTAMP_CONF = 0x3e
DL_TDOA_HOP_COUNT = 0x3f
DL_TDOA_ANCHOR_CFO = 0x40
DL_TDOA_ANCHOR_LOCATION = 0x41
DL_TDOA_TX_ACTIVE_RANGING_ROUNDS = 0x42
DL_TDOA_BLOCK_SKIPPING = 0x43
DL_TDOA_TIME_REFERENCE_ANCHOR = 0x44
SESSION_KEY = 0x45
SUB_SESSION_KEY = 0x46
SESSION_DATA_TRANSFER_STATUS_NTF_CONFIG = 0x47
SESSION_TIME_BASE = 0x48
DL_TDOA_RESPONDER_TOF = 0x49
SECURE_RANGING_NEFA_LEVEL = 0x4a
SECURE_RANGING_CSW_LENGTH = 0x4b
APPLICATION_DATA_ENDPOINT = 0x4c
OWR_AOA_MEASUREMENT_NTF_PERIOD = 0x4d
@staticmethod
def from_int(v: int) -> Union[int, 'AppConfigTlvType']:
try:
return AppConfigTlvType(v)
except ValueError as exn:
return v
class DeviceType(enum.IntEnum):
CONTROLEE = 0x0
CONTROLLER = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'DeviceType']:
try:
return DeviceType(v)
except ValueError as exn:
raise exn
class RangingRoundUsage(enum.IntEnum):
SS_TWR_DEFERRED_MODE = 0x1
DS_TWR_DEFERRED_MODE = 0x2
SS_TWR_NON_DEFERRED_MODE = 0x3
DS_TWR_NON_DEFERRED_MODE = 0x4
ON_WAY_RANGING_DL_TDOA = 0x5
OWR_AOA_MEASUREMENT = 0x6
ESS_TWR_NON_DEFERRED = 0x7
ADS_TWR_NON_DEFERRED = 0x8
@staticmethod
def from_int(v: int) -> Union[int, 'RangingRoundUsage']:
try:
return RangingRoundUsage(v)
except ValueError as exn:
raise exn
class StsConfig(enum.IntEnum):
STATIC = 0x0
DYNAMIC = 0x1
DYNAMIC_FOR_RESPONDER_SUB_SESSION_KEY = 0x2
PROVISIONED = 0x3
PROVISIONED_FOR_RESPONDER_SUB_SESSION_KEY = 0x4
@staticmethod
def from_int(v: int) -> Union[int, 'StsConfig']:
try:
return StsConfig(v)
except ValueError as exn:
raise exn
class MultiNodeMode(enum.IntEnum):
ONE_TO_ONE = 0x0
ONE_TO_MANY = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'MultiNodeMode']:
try:
return MultiNodeMode(v)
except ValueError as exn:
raise exn
class ChannelNumber(enum.IntEnum):
CHANNEL_NUMBER_5 = 0x5
CHANNEL_NUMBER_6 = 0x6
CHANNEL_NUMBER_8 = 0x8
CHANNEL_NUMBER_9 = 0x9
CHANNEL_NUMBER_10 = 0xa
CHANNEL_NUMBER_12 = 0xc
CHANNEL_NUMBER_13 = 0xd
CHANNEL_NUMBER_14 = 0xe
@staticmethod
def from_int(v: int) -> Union[int, 'ChannelNumber']:
try:
return ChannelNumber(v)
except ValueError as exn:
raise exn
class MacFcsType(enum.IntEnum):
CRC_16 = 0x0
CRC_32 = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'MacFcsType']:
try:
return MacFcsType(v)
except ValueError as exn:
raise exn
@dataclass
class RangingRoundControl(Packet):
rrrm: int = field(kw_only=True, default=0)
rcp: int = field(kw_only=True, default=0)
mrp: int = field(kw_only=True, default=0)
mrm: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['RangingRoundControl', bytes]:
fields = {'payload': None}
if len(span) < 1:
raise Exception('Invalid packet size')
fields['rrrm'] = (span[0] >> 0) & 0x1
if (span[0] >> 1) & 0x1 != 0x1:
raise Exception('Unexpected fixed field value')
fields['rcp'] = (span[0] >> 2) & 0x1
fields['mrp'] = (span[0] >> 6) & 0x1
fields['mrm'] = (span[0] >> 7) & 0x1
span = span[1:]
return RangingRoundControl(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.rrrm > 1:
print(f"Invalid value for field RangingRoundControl::rrrm: {self.rrrm} > 1; the value will be truncated")
self.rrrm &= 1
if self.rcp > 1:
print(f"Invalid value for field RangingRoundControl::rcp: {self.rcp} > 1; the value will be truncated")
self.rcp &= 1
if self.mrp > 1:
print(f"Invalid value for field RangingRoundControl::mrp: {self.mrp} > 1; the value will be truncated")
self.mrp &= 1
if self.mrm > 1:
print(f"Invalid value for field RangingRoundControl::mrm: {self.mrm} > 1; the value will be truncated")
self.mrm &= 1
_value = (
(self.rrrm << 0) |
(1 << 1) |
(self.rcp << 2) |
(self.mrp << 6) |
(self.mrm << 7)
)
_span.append(_value)
return bytes(_span)
@property
def size(self) -> int:
return 1
class AoaResultReq(enum.IntEnum):
AOA_DISABLED = 0x0
AOA_ENABLED = 0x1
AOA_ENABLED_AZIMUTH_ONLY = 0x2
AOA_ENABLED_ELEVATION_ONLY = 0x3
@staticmethod
def from_int(v: int) -> Union[int, 'AoaResultReq']:
try:
return AoaResultReq(v)
except ValueError as exn:
raise exn
class SessionInfoNtfConfig(enum.IntEnum):
DISABLE = 0x0
ENABLE = 0x1
ENABLE_PROXIMITY_TRIGGER = 0x2
ENABLE_AOA_TRIGGER = 0x3
ENABLE_PROXIMITY_AOA_TRIGGER = 0x4
ENABLE_PROXIMITY_EDGE_TRIGGER = 0x5
ENABLE_AOA_EDGE_TRIGGER = 0x6
ENABLE_PROXIMITY_AOA_EDGE_TRIGGER = 0x7
@staticmethod
def from_int(v: int) -> Union[int, 'SessionInfoNtfConfig']:
try:
return SessionInfoNtfConfig(v)
except ValueError as exn:
raise exn
class DeviceRole(enum.IntEnum):
RESPONDER = 0x0
INITIATOR = 0x1
ADVERTISER = 0x5
OBSERVER = 0x6
DT_ANCHOR = 0x7
DT_TAG = 0x8
@staticmethod
def from_int(v: int) -> Union[int, 'DeviceRole']:
try:
return DeviceRole(v)
except ValueError as exn:
raise exn
class RframeConfig(enum.IntEnum):
SP0 = 0x0
SP1 = 0x1
SP3 = 0x3
@staticmethod
def from_int(v: int) -> Union[int, 'RframeConfig']:
try:
return RframeConfig(v)
except ValueError as exn:
raise exn
class RssiReporting(enum.IntEnum):
DISABLE = 0x0
ENABLE = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'RssiReporting']:
try:
return RssiReporting(v)
except ValueError as exn:
raise exn
class PsduDataRate(enum.IntEnum):
DATA_RATE_6M81 = 0x0
DATA_RATE_7M80 = 0x1
DATA_RATE_27M2 = 0x2
DATA_RATE_31M2 = 0x3
@staticmethod
def from_int(v: int) -> Union[int, 'PsduDataRate']:
try:
return PsduDataRate(v)
except ValueError as exn:
raise exn
class PreambleDuration(enum.IntEnum):
DURATION_32_SYMBOLS = 0x0
DURATION_64_SYMBOLS = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'PreambleDuration']:
try:
return PreambleDuration(v)
except ValueError as exn:
raise exn
class LinkLayerMode(enum.IntEnum):
BYPASS_MODE = 0x0
@staticmethod
def from_int(v: int) -> Union[int, 'LinkLayerMode']:
try:
return LinkLayerMode(v)
except ValueError as exn:
raise exn
class RangingTimeStruct(enum.IntEnum):
BLOCK_BASED_SCHEDULING = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'RangingTimeStruct']:
try:
return RangingTimeStruct(v)
except ValueError as exn:
raise exn
class PrfMode(enum.IntEnum):
BPRF_MODE = 0x0
HPRF_MODE_124M8 = 0x1
HPRF_MODE_249M6 = 0x2
@staticmethod
def from_int(v: int) -> Union[int, 'PrfMode']:
try:
return PrfMode(v)
except ValueError as exn:
raise exn
class ScheduleMode(enum.IntEnum):
CONTENTION_BASED = 0x0
TIME_SCHEDULED = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'ScheduleMode']:
try:
return ScheduleMode(v)
except ValueError as exn:
raise exn
class KeyRotation(enum.IntEnum):
DISABLE = 0x0
ENABLE = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'KeyRotation']:
try:
return KeyRotation(v)
except ValueError as exn:
raise exn
class MacAddressMode(enum.IntEnum):
MODE_0 = 0x0
MODE_1 = 0x1
MODE_2 = 0x2
@staticmethod
def from_int(v: int) -> Union[int, 'MacAddressMode']:
try:
return MacAddressMode(v)
except ValueError as exn:
raise exn
class HoppingMode(enum.IntEnum):
DISABLE = 0x0
ENABLE = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'HoppingMode']:
try:
return HoppingMode(v)
except ValueError as exn:
raise exn
@dataclass
class ResultReportConfig(Packet):
tof: int = field(kw_only=True, default=0)
aoa_azimuth: int = field(kw_only=True, default=0)
aoa_elevation: int = field(kw_only=True, default=0)
aoa_fom: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['ResultReportConfig', bytes]:
fields = {'payload': None}
if len(span) < 1:
raise Exception('Invalid packet size')
fields['tof'] = (span[0] >> 0) & 0x1
fields['aoa_azimuth'] = (span[0] >> 1) & 0x1
fields['aoa_elevation'] = (span[0] >> 2) & 0x1
fields['aoa_fom'] = (span[0] >> 3) & 0x1
span = span[1:]
return ResultReportConfig(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.tof > 1:
print(f"Invalid value for field ResultReportConfig::tof: {self.tof} > 1; the value will be truncated")
self.tof &= 1
if self.aoa_azimuth > 1:
print(f"Invalid value for field ResultReportConfig::aoa_azimuth: {self.aoa_azimuth} > 1; the value will be truncated")
self.aoa_azimuth &= 1
if self.aoa_elevation > 1:
print(f"Invalid value for field ResultReportConfig::aoa_elevation: {self.aoa_elevation} > 1; the value will be truncated")
self.aoa_elevation &= 1
if self.aoa_fom > 1:
print(f"Invalid value for field ResultReportConfig::aoa_fom: {self.aoa_fom} > 1; the value will be truncated")
self.aoa_fom &= 1
_value = (
(self.tof << 0) |
(self.aoa_azimuth << 1) |
(self.aoa_elevation << 2) |
(self.aoa_fom << 3)
)
_span.append(_value)
return bytes(_span)
@property
def size(self) -> int:
return 1
class BprfPhrDataRate(enum.IntEnum):
DATA_RATE_850K = 0x0
DATA_RATE_6M81 = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'BprfPhrDataRate']:
try:
return BprfPhrDataRate(v)
except ValueError as exn:
raise exn
class StsLength(enum.IntEnum):
LENGTH_32_SYMBOLS = 0x0
LENGTH_64_SYMBOLS = 0x1
LENGTH_128_SYMBOLS = 0x2
@staticmethod
def from_int(v: int) -> Union[int, 'StsLength']:
try:
return StsLength(v)
except ValueError as exn:
raise exn
class DlTdoaRangingMethod(enum.IntEnum):
SS_TWR = 0x0
DS_TWR = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'DlTdoaRangingMethod']:
try:
return DlTdoaRangingMethod(v)
except ValueError as exn:
raise exn
class DlTdoaAnchorCfo(enum.IntEnum):
ANCHOR_CFO_NOT_INCLUDED = 0x0
ANCHOR_CFO_INCLUDED = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'DlTdoaAnchorCfo']:
try:
return DlTdoaAnchorCfo(v)
except ValueError as exn:
raise exn
class SessionDataTransferStatusNtfConfig(enum.IntEnum):
DISABLE = 0x0
ENABLE = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'SessionDataTransferStatusNtfConfig']:
try:
return SessionDataTransferStatusNtfConfig(v)
except ValueError as exn:
raise exn
class CapTlvType(enum.IntEnum):
SUPPORTED_FIRA_PHY_VERSION_RANGE = 0x0
SUPPORTED_FIRA_MAC_VERSION_RANGE = 0x1
SUPPORTED_DEVICE_ROLES = 0x2
SUPPORTED_RANGING_METHOD = 0x3
SUPPORTED_STS_CONFIG = 0x4
SUPPORTED_MULTI_NODE_MODES = 0x5
SUPPORTED_RANGING_TIME_STRUCT = 0x6
SUPPORTED_SCHEDULED_MODE = 0x7
SUPPORTED_HOPPING_MODE = 0x8
SUPPORTED_BLOCK_STRIDING = 0x9
SUPPORTED_UWB_INITIATION_TIME = 0xa
SUPPORTED_CHANNELS = 0xb
SUPPORTED_RFRAME_CONFIG = 0xc
SUPPORTED_CC_CONSTRAINT_LENGTH = 0xd
SUPPORTED_BPRF_PARAMETER_SETS = 0xe
SUPPORTED_HPRF_PARAMETER_SETS = 0xf
SUPPORTED_AOA = 0x10
SUPPORTED_EXTENDED_MAC_ADDRESS = 0x11
SUPPORTED_MAX_MESSAGE_SIZE = 0x12
SUPPORTED_MAX_DATA_PACKET_PAYLOAD_SIZE = 0x13
SUPPORTED_POWER_STATS = 0xc0
@staticmethod
def from_int(v: int) -> Union[int, 'CapTlvType']:
try:
return CapTlvType(v)
except ValueError as exn:
return v
class AoaResultReqType(enum.IntEnum):
AOA_DISABLE = 0x0
AOA_ENABLE = 0x1
AOA_ENABLE_AZIMUTH = 0x2
AOA_ENABLE_ELEVATION = 0x3
AOA_ENABLE_INTERLEAVED = 0xf0
@staticmethod
def from_int(v: int) -> Union[int, 'AoaResultReqType']:
try:
return AoaResultReqType(v)
except ValueError as exn:
raise exn
class DeviceState(enum.IntEnum):
DEVICE_STATE_READY = 0x1
DEVICE_STATE_ACTIVE = 0x2
DEVICE_STATE_ERROR = 0xff
@staticmethod
def from_int(v: int) -> Union[int, 'DeviceState']:
try:
return DeviceState(v)
except ValueError as exn:
raise exn
class SessionState(enum.IntEnum):
SESSION_STATE_INIT = 0x0
SESSION_STATE_DEINIT = 0x1
SESSION_STATE_ACTIVE = 0x2
SESSION_STATE_IDLE = 0x3
@staticmethod
def from_int(v: int) -> Union[int, 'SessionState']:
try:
return SessionState(v)
except ValueError as exn:
raise exn
class ReasonCode(enum.IntEnum):
STATE_CHANGE_WITH_SESSION_MANAGEMENT_COMMANDS = 0x0
MAX_RANGING_ROUND_RETRY_COUNT_REACHED = 0x1
MAX_NUMBER_OF_MEASUREMENTS_REACHED = 0x2
SESSION_SUSPENDED_DUE_TO_INBAND_SIGNAL = 0x3
SESSION_RESUMED_DUE_TO_INBAND_SIGNAL = 0x4
SESSION_STOPPED_DUE_TO_INBAND_SIGNAL = 0x5
ERROR_INVALID_UL_TDOA_RANDOM_WINDOW = 0x1d
ERROR_MIN_RFRAMES_PER_RR_NOT_SUPPORTED = 0x1e
ERROR_TX_DELAY_NOT_SUPPORTED = 0x1f
ERROR_SLOT_LENGTH_NOT_SUPPORTED = 0x20
ERROR_INSUFFICIENT_SLOTS_PER_RR = 0x21
ERROR_MAC_ADDRESS_MODE_NOT_SUPPORTED = 0x22
ERROR_INVALID_RANGING_DURATION = 0x23
ERROR_INVALID_STS_CONFIG = 0x24
ERROR_INVALID_RFRAME_CONFIG = 0x25
ERROR_HUS_NOT_ENOUGH_SLOTS = 0x26
ERROR_HUS_CFP_PHASE_TOO_SHORT = 0x27
ERROR_HUS_CAP_PHASE_TOO_SHORT = 0x28
ERROR_HUS_OTHERS = 0x29
ERROR_STATUS_SESSION_KEY_NOT_FOUND = 0x2a
ERROR_STATUS_SUB_SESSION_KEY_NOT_FOUND = 0x2b
ERROR_INVALID_PREAMBLE_CODE_INDEX = 0x2c
ERROR_INVALID_SFD_ID = 0x2d
ERROR_INVALID_PSDU_DATA_RATE = 0x2e
ERROR_INVALID_PHR_DATA_RATE = 0x2f
ERROR_INVALID_PREAMBLE_DURATION = 0x30
ERROR_INVALID_STS_LENGTH = 0x31
ERROR_INVALID_NUM_OF_STS_SEGMENTS = 0x32
ERROR_INVALID_NUM_OF_CONTROLEES = 0x33
ERROR_MAX_RANGING_REPLY_TIME_EXCEEDED = 0x34
ERROR_INVALID_DST_ADDRESS_LIST = 0x35
ERROR_INVALID_OR_NOT_FOUND_SUB_SESSION_ID = 0x36
ERROR_INVALID_RESULT_REPORT_CONFIG = 0x37
ERROR_INVALID_RANGING_ROUND_CONTROL_CONFIG = 0x38
ERROR_INVALID_RANGING_ROUND_USAGE = 0x39
ERROR_INVALID_MULTI_NODE_MODE = 0x3a
ERROR_RDS_FETCH_FAILURE = 0x3b
ERROR_REF_UWB_SESSION_DOES_NOT_EXIST = 0x3c
ERROR_REF_UWB_SESSION_RANGING_DURATION_MISMATCH = 0x3d
ERROR_REF_UWB_SESSION_INVALID_OFFSET_TIME = 0x3e
ERROR_REF_UWB_SESSION_LOST = 0x3f
VENDOR_SPECIFIC_REASON_CODE_2 = 0xff
@staticmethod
def from_int(v: int) -> Union[int, 'ReasonCode']:
try:
return ReasonCode(v)
except ValueError as exn:
return v
class MulticastUpdateStatus(enum.IntEnum):
OK_MULTICAST_LIST_UPDATE = 0x0
ERROR_MULTICAST_LIST_FULL = 0x1
ERROR_KEY_FETCH_FAIL = 0x2
ERROR_SUB_SESSION_ID_NOT_FOUND = 0x3
ERROR_SUB_SESSION_KEY_NOT_FOUND = 0x4
ERROR_SUB_SESSION_KEY_NOT_APPLICABLE = 0x5
ERROR_SESSION_KEY_NOT_FOUND = 0x6
ERROR_ADDRESS_NOT_FOUND = 0x7
ERROR_ADDRESS_ALREADY_PRESENT = 0x8
@staticmethod
def from_int(v: int) -> Union[int, 'MulticastUpdateStatus']:
try:
return MulticastUpdateStatus(v)
except ValueError as exn:
raise exn
class MacAddressIndicator(enum.IntEnum):
SHORT_ADDRESS = 0x0
EXTENDED_ADDRESS = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'MacAddressIndicator']:
try:
return MacAddressIndicator(v)
except ValueError as exn:
raise exn
class SessionType(enum.IntEnum):
FIRA_RANGING_SESSION = 0x0
FIRA_RANGING_AND_IN_BAND_DATA_SESSION = 0x1
FIRA_DATA_TRANSFER_SESSION = 0x2
FIRA_RANGING_ONLY_PHASE = 0x3
FIRA_IN_BAND_DATA_PHASE = 0x4
FIRA_RANGING_WITH_DATA_PHASE = 0x5
CCC = 0xa0
DEVICE_TEST_MODE = 0xd0
@staticmethod
def from_int(v: int) -> Union[int, 'SessionType']:
try:
return SessionType(v)
except ValueError as exn:
raise exn
@dataclass
class CommonPacketHeader(Packet):
pbf: PacketBoundaryFlag = field(kw_only=True, default=PacketBoundaryFlag.COMPLETE)
mt: MessageType = field(kw_only=True, default=MessageType.DATA)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['CommonPacketHeader', bytes]:
fields = {'payload': None}
if len(span) < 1:
raise Exception('Invalid packet size')
fields['pbf'] = PacketBoundaryFlag.from_int((span[0] >> 4) & 0x1)
fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7)
span = span[1:]
return CommonPacketHeader(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_value = (
(self.pbf << 4) |
(self.mt << 5)
)
_span.append(_value)
return bytes(_span)
@property
def size(self) -> int:
return 1
@dataclass
class ControlPacketHeader(Packet):
pbf: PacketBoundaryFlag = field(kw_only=True, default=PacketBoundaryFlag.COMPLETE)
mt: MessageType = field(kw_only=True, default=MessageType.DATA)
payload_length: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['ControlPacketHeader', bytes]:
fields = {'payload': None}
if len(span) < 4:
raise Exception('Invalid packet size')
fields['pbf'] = PacketBoundaryFlag.from_int((span[0] >> 4) & 0x1)
fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7)
value_ = int.from_bytes(span[1:3], byteorder='little')
fields['payload_length'] = span[3]
span = span[4:]
return ControlPacketHeader(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_value = (
(self.pbf << 4) |
(self.mt << 5)
)
_span.append(_value)
_span.extend([0] * 2)
if self.payload_length > 255:
print(f"Invalid value for field ControlPacketHeader::payload_length: {self.payload_length} > 255; the value will be truncated")
self.payload_length &= 255
_span.append((self.payload_length << 0))
return bytes(_span)
@property
def size(self) -> int:
return 4
@dataclass
class DataPacketHeader(Packet):
pbf: PacketBoundaryFlag = field(kw_only=True, default=PacketBoundaryFlag.COMPLETE)
mt: MessageType = field(kw_only=True, default=MessageType.DATA)
payload_length: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['DataPacketHeader', bytes]:
fields = {'payload': None}
if len(span) < 4:
raise Exception('Invalid packet size')
fields['pbf'] = PacketBoundaryFlag.from_int((span[0] >> 4) & 0x1)
fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7)
value_ = int.from_bytes(span[2:4], byteorder='little')
fields['payload_length'] = value_
span = span[4:]
return DataPacketHeader(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_value = (
(self.pbf << 4) |
(self.mt << 5)
)
_span.append(_value)
_span.extend([0] * 1)
if self.payload_length > 65535:
print(f"Invalid value for field DataPacketHeader::payload_length: {self.payload_length} > 65535; the value will be truncated")
self.payload_length &= 65535
_span.extend(int.to_bytes((self.payload_length << 0), length=2, byteorder='little'))
return bytes(_span)
@property
def size(self) -> int:
return 4
@dataclass
class ControlPacket(Packet):
gid: GroupId = field(kw_only=True, default=GroupId.CORE)
mt: MessageType = field(kw_only=True, default=MessageType.DATA)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['ControlPacket', bytes]:
fields = {'payload': None}
if len(span) < 1:
raise Exception('Invalid packet size')
fields['gid'] = GroupId.from_int((span[0] >> 0) & 0xf)
fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7)
span = span[1:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
return CorePacket.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionConfigPacket.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionControlPacket.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return AndroidPacket.parse(fields.copy(), payload)
except Exception as exn:
pass
return ControlPacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_value = (
(self.gid << 0) |
(self.mt << 5)
)
_span.append(_value)
_span.extend(payload or self.payload or [])
return bytes(_span)
@property
def size(self) -> int:
return len(self.payload) + 1
@dataclass
class DataPacket(Packet):
dpf: DataPacketFormat = field(kw_only=True, default=DataPacketFormat.DATA_SND)
pbf: PacketBoundaryFlag = field(kw_only=True, default=PacketBoundaryFlag.COMPLETE)
mt: MessageType = field(kw_only=True, default=MessageType.DATA)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['DataPacket', bytes]:
fields = {'payload': None}
if len(span) < 4:
raise Exception('Invalid packet size')
fields['dpf'] = DataPacketFormat.from_int((span[0] >> 0) & 0xf)
fields['pbf'] = PacketBoundaryFlag.from_int((span[0] >> 4) & 0x1)
fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7)
value_ = int.from_bytes(span[2:4], byteorder='little')
span = span[4:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
return DataMessageSnd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return DataMessageRcv.parse(fields.copy(), payload)
except Exception as exn:
pass
return DataPacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_value = (
(self.dpf << 0) |
(self.pbf << 4) |
(self.mt << 5)
)
_span.append(_value)
_span.extend([0] * 1)
_span.extend([0] * 2)
_span.extend(payload or self.payload or [])
return bytes(_span)
@property
def size(self) -> int:
return len(self.payload) + 4
@dataclass
class DataMessageSnd(DataPacket):
session_handle: int = field(kw_only=True, default=0)
destination_address: int = field(kw_only=True, default=0)
data_sequence_number: int = field(kw_only=True, default=0)
application_data: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.dpf = DataPacketFormat.DATA_SND
self.mt = MessageType.DATA
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['DataMessageSnd', bytes]:
if fields['dpf'] != DataPacketFormat.DATA_SND or fields['mt'] != MessageType.DATA:
raise Exception("Invalid constraint field values")
if len(span) < 16:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_handle'] = value_
value_ = int.from_bytes(span[4:12], byteorder='little')
fields['destination_address'] = value_
value_ = int.from_bytes(span[12:14], byteorder='little')
fields['data_sequence_number'] = value_
value_ = int.from_bytes(span[14:16], byteorder='little')
application_data_size = value_
span = span[16:]
if len(span) < application_data_size:
raise Exception('Invalid packet size')
fields['application_data'] = list(span[:application_data_size])
span = span[application_data_size:]
return DataMessageSnd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_handle > 4294967295:
print(f"Invalid value for field DataMessageSnd::session_handle: {self.session_handle} > 4294967295; the value will be truncated")
self.session_handle &= 4294967295
_span.extend(int.to_bytes((self.session_handle << 0), length=4, byteorder='little'))
if self.destination_address > 18446744073709551615:
print(f"Invalid value for field DataMessageSnd::destination_address: {self.destination_address} > 18446744073709551615; the value will be truncated")
self.destination_address &= 18446744073709551615
_span.extend(int.to_bytes((self.destination_address << 0), length=8, byteorder='little'))
if self.data_sequence_number > 65535:
print(f"Invalid value for field DataMessageSnd::data_sequence_number: {self.data_sequence_number} > 65535; the value will be truncated")
self.data_sequence_number &= 65535
_span.extend(int.to_bytes((self.data_sequence_number << 0), length=2, byteorder='little'))
_span.extend(int.to_bytes(((len(self.application_data) * 1) << 0), length=2, byteorder='little'))
_span.extend(self.application_data)
return DataPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.application_data) * 1 + 16
@dataclass
class DataMessageRcv(DataPacket):
session_handle: int = field(kw_only=True, default=0)
status: Status = field(kw_only=True, default=Status.OK)
source_address: int = field(kw_only=True, default=0)
data_sequence_number: int = field(kw_only=True, default=0)
application_data: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.dpf = DataPacketFormat.DATA_RCV
self.mt = MessageType.DATA
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['DataMessageRcv', bytes]:
if fields['dpf'] != DataPacketFormat.DATA_RCV or fields['mt'] != MessageType.DATA:
raise Exception("Invalid constraint field values")
if len(span) < 17:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_handle'] = value_
fields['status'] = Status.from_int(span[4])
value_ = int.from_bytes(span[5:13], byteorder='little')
fields['source_address'] = value_
value_ = int.from_bytes(span[13:15], byteorder='little')
fields['data_sequence_number'] = value_
value_ = int.from_bytes(span[15:17], byteorder='little')
application_data_size = value_
span = span[17:]
if len(span) < application_data_size:
raise Exception('Invalid packet size')
fields['application_data'] = list(span[:application_data_size])
span = span[application_data_size:]
return DataMessageRcv(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_handle > 4294967295:
print(f"Invalid value for field DataMessageRcv::session_handle: {self.session_handle} > 4294967295; the value will be truncated")
self.session_handle &= 4294967295
_span.extend(int.to_bytes((self.session_handle << 0), length=4, byteorder='little'))
_span.append((self.status << 0))
if self.source_address > 18446744073709551615:
print(f"Invalid value for field DataMessageRcv::source_address: {self.source_address} > 18446744073709551615; the value will be truncated")
self.source_address &= 18446744073709551615
_span.extend(int.to_bytes((self.source_address << 0), length=8, byteorder='little'))
if self.data_sequence_number > 65535:
print(f"Invalid value for field DataMessageRcv::data_sequence_number: {self.data_sequence_number} > 65535; the value will be truncated")
self.data_sequence_number &= 65535
_span.extend(int.to_bytes((self.data_sequence_number << 0), length=2, byteorder='little'))
_span.extend(int.to_bytes(((len(self.application_data) * 1) << 0), length=2, byteorder='little'))
_span.extend(self.application_data)
return DataPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.application_data) * 1 + 17
@dataclass
class CorePacket(ControlPacket):
oid: CoreOpcodeId = field(kw_only=True, default=CoreOpcodeId.DEVICE_RESET)
def __post_init__(self):
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CorePacket', bytes]:
if fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 3:
raise Exception('Invalid packet size')
fields['oid'] = CoreOpcodeId.from_int((span[0] >> 0) & 0x3f)
value_ = int.from_bytes(span[1:3], byteorder='little')
span = span[3:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
return CoreDeviceResetCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreDeviceResetRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreDeviceStatusNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreGetDeviceInfoCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreGetDeviceInfoRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreGetCapsInfoCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreGetCapsInfoRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreSetConfigCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreSetConfigRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreGetConfigCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreGetConfigRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreGenericErrorNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreQueryTimeStampCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return CoreQueryTimeStampRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
return CorePacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.oid << 0))
_span.extend([0] * 2)
_span.extend(payload or self.payload or [])
return ControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.payload) + 3
@dataclass
class SessionConfigPacket(ControlPacket):
oid: SessionConfigOpcodeId = field(kw_only=True, default=SessionConfigOpcodeId.INIT)
def __post_init__(self):
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionConfigPacket', bytes]:
if fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 3:
raise Exception('Invalid packet size')
fields['oid'] = SessionConfigOpcodeId.from_int((span[0] >> 0) & 0x3f)
value_ = int.from_bytes(span[1:3], byteorder='little')
span = span[3:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
return SessionInitCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionInitRsp_V2.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionInitRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionDeinitCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionDeinitRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionStatusNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionSetAppConfigCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionSetAppConfigRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionGetAppConfigCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionGetAppConfigRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionGetCountCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionGetCountRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionGetStateCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionGetStateRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionUpdateDtAnchorRangingRoundsCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionUpdateDtAnchorRangingRoundsRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionUpdateDtTagRangingRoundsCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionUpdateDtTagRangingRoundsRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionUpdateControllerMulticastListCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionUpdateControllerMulticastListRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionUpdateControllerMulticastListNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionQueryMaxDataSizeInRangingCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionQueryMaxDataSizeInRangingRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
return SessionConfigPacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.oid << 0))
_span.extend([0] * 2)
_span.extend(payload or self.payload or [])
return ControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.payload) + 3
@dataclass
class SessionControlPacket(ControlPacket):
oid: SessionControlOpcodeId = field(kw_only=True, default=SessionControlOpcodeId.START)
def __post_init__(self):
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionControlPacket', bytes]:
if fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 3:
raise Exception('Invalid packet size')
fields['oid'] = SessionControlOpcodeId.from_int((span[0] >> 0) & 0x3f)
value_ = int.from_bytes(span[1:3], byteorder='little')
span = span[3:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
return SessionDataCreditNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionDataTransferStatusNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionStartCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionStartRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionInfoNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionStopCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionStopRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionGetRangingCountCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return SessionGetRangingCountRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
return SessionControlPacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.oid << 0))
_span.extend([0] * 2)
_span.extend(payload or self.payload or [])
return ControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.payload) + 3
@dataclass
class AndroidPacket(ControlPacket):
oid: AndroidOpcodeId = field(kw_only=True, default=AndroidOpcodeId.GET_POWER_STATS)
def __post_init__(self):
self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidPacket', bytes]:
if fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
if len(span) < 3:
raise Exception('Invalid packet size')
fields['oid'] = AndroidOpcodeId.from_int((span[0] >> 0) & 0x3f)
value_ = int.from_bytes(span[1:3], byteorder='little')
span = span[3:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
return AndroidGetPowerStatsCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return AndroidGetPowerStatsRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return AndroidSetCountryCodeCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return AndroidSetCountryCodeRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return AndroidRangeDiagnosticsNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
return AndroidPacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.oid << 0))
_span.extend([0] * 2)
_span.extend(payload or self.payload or [])
return ControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.payload) + 3
@dataclass
class CoreDeviceResetCmd(CorePacket):
reset_config: ResetConfig = field(kw_only=True, default=ResetConfig.UWBS_RESET)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = CoreOpcodeId.DEVICE_RESET
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceResetCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.DEVICE_RESET or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['reset_config'] = ResetConfig.from_int(span[0])
span = span[1:]
return CoreDeviceResetCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.reset_config << 0))
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
class CoreDeviceResetRsp(CorePacket):
status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = CoreOpcodeId.DEVICE_RESET
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceResetRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.DEVICE_RESET or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
span = span[1:]
return CoreDeviceResetRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
class CoreDeviceStatusNtf(CorePacket):
device_state: DeviceState = field(kw_only=True, default=DeviceState.DEVICE_STATE_READY)
def __post_init__(self):
self.mt = MessageType.NOTIFICATION
self.oid = CoreOpcodeId.DEVICE_STATUS
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceStatusNtf', bytes]:
if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != CoreOpcodeId.DEVICE_STATUS or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['device_state'] = DeviceState.from_int(span[0])
span = span[1:]
return CoreDeviceStatusNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.device_state << 0))
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
class CoreGetDeviceInfoCmd(CorePacket):
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = CoreOpcodeId.GET_DEVICE_INFO
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreGetDeviceInfoCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_DEVICE_INFO or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
return CoreGetDeviceInfoCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 0
@dataclass
class CoreGetDeviceInfoRsp(CorePacket):
status: Status = field(kw_only=True, default=Status.OK)
uci_version: int = field(kw_only=True, default=0)
mac_version: int = field(kw_only=True, default=0)
phy_version: int = field(kw_only=True, default=0)
uci_test_version: int = field(kw_only=True, default=0)
vendor_spec_info: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = CoreOpcodeId.GET_DEVICE_INFO
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreGetDeviceInfoRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_DEVICE_INFO or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 10:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
value_ = int.from_bytes(span[1:3], byteorder='little')
fields['uci_version'] = value_
value_ = int.from_bytes(span[3:5], byteorder='little')
fields['mac_version'] = value_
value_ = int.from_bytes(span[5:7], byteorder='little')
fields['phy_version'] = value_
value_ = int.from_bytes(span[7:9], byteorder='little')
fields['uci_test_version'] = value_
vendor_spec_info_count = span[9]
span = span[10:]
if len(span) < vendor_spec_info_count:
raise Exception('Invalid packet size')
fields['vendor_spec_info'] = list(span[:vendor_spec_info_count])
span = span[vendor_spec_info_count:]
return CoreGetDeviceInfoRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if self.uci_version > 65535:
print(f"Invalid value for field CoreGetDeviceInfoRsp::uci_version: {self.uci_version} > 65535; the value will be truncated")
self.uci_version &= 65535
_span.extend(int.to_bytes((self.uci_version << 0), length=2, byteorder='little'))
if self.mac_version > 65535:
print(f"Invalid value for field CoreGetDeviceInfoRsp::mac_version: {self.mac_version} > 65535; the value will be truncated")
self.mac_version &= 65535
_span.extend(int.to_bytes((self.mac_version << 0), length=2, byteorder='little'))
if self.phy_version > 65535:
print(f"Invalid value for field CoreGetDeviceInfoRsp::phy_version: {self.phy_version} > 65535; the value will be truncated")
self.phy_version &= 65535
_span.extend(int.to_bytes((self.phy_version << 0), length=2, byteorder='little'))
if self.uci_test_version > 65535:
print(f"Invalid value for field CoreGetDeviceInfoRsp::uci_test_version: {self.uci_test_version} > 65535; the value will be truncated")
self.uci_test_version &= 65535
_span.extend(int.to_bytes((self.uci_test_version << 0), length=2, byteorder='little'))
if len(self.vendor_spec_info) > 255:
print(f"Invalid length for field CoreGetDeviceInfoRsp::vendor_spec_info: {len(self.vendor_spec_info)} > 255; the array will be truncated")
del self.vendor_spec_info[255:]
_span.append((len(self.vendor_spec_info) << 0))
_span.extend(self.vendor_spec_info)
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.vendor_spec_info) * 1 + 10
@dataclass
class CoreGetCapsInfoCmd(CorePacket):
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = CoreOpcodeId.GET_CAPS_INFO
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreGetCapsInfoCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_CAPS_INFO or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
return CoreGetCapsInfoCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 0
@dataclass
class CapTlv(Packet):
t: CapTlvType = field(kw_only=True, default=CapTlvType.SUPPORTED_FIRA_PHY_VERSION_RANGE)
v: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['CapTlv', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
fields['t'] = CapTlvType.from_int(span[0])
v_count = span[1]
span = span[2:]
if len(span) < v_count:
raise Exception('Invalid packet size')
fields['v'] = list(span[:v_count])
span = span[v_count:]
return CapTlv(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.t << 0))
if len(self.v) > 255:
print(f"Invalid length for field CapTlv::v: {len(self.v)} > 255; the array will be truncated")
del self.v[255:]
_span.append((len(self.v) << 0))
_span.extend(self.v)
return bytes(_span)
@property
def size(self) -> int:
return len(self.v) * 1 + 2
@dataclass
class CoreGetCapsInfoRsp(CorePacket):
status: Status = field(kw_only=True, default=Status.OK)
tlvs: List[CapTlv] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = CoreOpcodeId.GET_CAPS_INFO
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreGetCapsInfoRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_CAPS_INFO or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
tlvs_count = span[1]
span = span[2:]
tlvs = []
for n in range(tlvs_count):
element, span = CapTlv.parse(span)
tlvs.append(element)
fields['tlvs'] = tlvs
return CoreGetCapsInfoRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if len(self.tlvs) > 255:
print(f"Invalid length for field CoreGetCapsInfoRsp::tlvs: {len(self.tlvs)} > 255; the array will be truncated")
del self.tlvs[255:]
_span.append((len(self.tlvs) << 0))
for _elt in self.tlvs:
_span.extend(_elt.serialize())
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.tlvs]) + 2
class ConfigParameterId(enum.IntEnum):
DEVICE_STATE = 0x0
LOW_POWER_MODE = 0x1
@staticmethod
def from_int(v: int) -> Union[int, 'ConfigParameterId']:
try:
return ConfigParameterId(v)
except ValueError as exn:
return v
@dataclass
class ConfigParameter(Packet):
id: ConfigParameterId = field(kw_only=True, default=ConfigParameterId.DEVICE_STATE)
value: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['ConfigParameter', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
fields['id'] = ConfigParameterId.from_int(span[0])
value_size = span[1]
span = span[2:]
if len(span) < value_size:
raise Exception('Invalid packet size')
fields['value'] = list(span[:value_size])
span = span[value_size:]
return ConfigParameter(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.id << 0))
_span.append(((len(self.value) * 1) << 0))
_span.extend(self.value)
return bytes(_span)
@property
def size(self) -> int:
return len(self.value) * 1 + 2
@dataclass
class CoreSetConfigCmd(CorePacket):
parameters: List[ConfigParameter] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = CoreOpcodeId.SET_CONFIG
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreSetConfigCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.SET_CONFIG or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
parameters_count = span[0]
span = span[1:]
parameters = []
for n in range(parameters_count):
element, span = ConfigParameter.parse(span)
parameters.append(element)
fields['parameters'] = parameters
return CoreSetConfigCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if len(self.parameters) > 255:
print(f"Invalid length for field CoreSetConfigCmd::parameters: {len(self.parameters)} > 255; the array will be truncated")
del self.parameters[255:]
_span.append((len(self.parameters) << 0))
for _elt in self.parameters:
_span.extend(_elt.serialize())
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.parameters]) + 1
@dataclass
class ConfigParameterStatus(Packet):
id: ConfigParameterId = field(kw_only=True, default=ConfigParameterId.DEVICE_STATE)
status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['ConfigParameterStatus', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
fields['id'] = ConfigParameterId.from_int(span[0])
fields['status'] = Status.from_int(span[1])
span = span[2:]
return ConfigParameterStatus(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.id << 0))
_span.append((self.status << 0))
return bytes(_span)
@property
def size(self) -> int:
return 2
@dataclass
class CoreSetConfigRsp(CorePacket):
status: Status = field(kw_only=True, default=Status.OK)
parameters: List[ConfigParameterStatus] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = CoreOpcodeId.SET_CONFIG
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreSetConfigRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.SET_CONFIG or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
parameters_count = span[1]
span = span[2:]
if len(span) < parameters_count * 2:
raise Exception('Invalid packet size')
parameters = []
for n in range(parameters_count):
parameters.append(ConfigParameterStatus.parse_all(span[n * 2:(n + 1) * 2]))
fields['parameters'] = parameters
span = span[parameters_count * 2:]
return CoreSetConfigRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if len(self.parameters) > 255:
print(f"Invalid length for field CoreSetConfigRsp::parameters: {len(self.parameters)} > 255; the array will be truncated")
del self.parameters[255:]
_span.append((len(self.parameters) << 0))
for _elt in self.parameters:
_span.extend(_elt.serialize())
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.parameters]) + 2
@dataclass
class CoreGetConfigCmd(CorePacket):
parameter_ids: List[ConfigParameterId] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = CoreOpcodeId.GET_CONFIG
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreGetConfigCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_CONFIG or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
parameter_ids_count = span[0]
span = span[1:]
if len(span) < parameter_ids_count:
raise Exception('Invalid packet size')
parameter_ids = []
for n in range(parameter_ids_count):
parameter_ids.append(ConfigParameterId(int.from_bytes(span[n:n + 1], byteorder='little')))
fields['parameter_ids'] = parameter_ids
span = span[parameter_ids_count:]
return CoreGetConfigCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if len(self.parameter_ids) > 255:
print(f"Invalid length for field CoreGetConfigCmd::parameter_ids: {len(self.parameter_ids)} > 255; the array will be truncated")
del self.parameter_ids[255:]
_span.append((len(self.parameter_ids) << 0))
for _elt in self.parameter_ids:
_span.append(_elt)
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.parameter_ids) * 8 + 1
@dataclass
class CoreGetConfigRsp(CorePacket):
status: Status = field(kw_only=True, default=Status.OK)
parameters: List[ConfigParameter] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = CoreOpcodeId.GET_CONFIG
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreGetConfigRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_CONFIG or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
parameters_count = span[1]
span = span[2:]
parameters = []
for n in range(parameters_count):
element, span = ConfigParameter.parse(span)
parameters.append(element)
fields['parameters'] = parameters
return CoreGetConfigRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if len(self.parameters) > 255:
print(f"Invalid length for field CoreGetConfigRsp::parameters: {len(self.parameters)} > 255; the array will be truncated")
del self.parameters[255:]
_span.append((len(self.parameters) << 0))
for _elt in self.parameters:
_span.extend(_elt.serialize())
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.parameters]) + 2
@dataclass
class CoreGenericErrorNtf(CorePacket):
status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
self.mt = MessageType.NOTIFICATION
self.oid = CoreOpcodeId.GENERIC_ERROR
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreGenericErrorNtf', bytes]:
if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != CoreOpcodeId.GENERIC_ERROR or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
span = span[1:]
return CoreGenericErrorNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
class CoreQueryTimeStampCmd(CorePacket):
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = CoreOpcodeId.QUERY_UWBS_TIMESTAMP
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreQueryTimeStampCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.QUERY_UWBS_TIMESTAMP or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
return CoreQueryTimeStampCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 0
@dataclass
class CoreQueryTimeStampRsp(CorePacket):
status: Status = field(kw_only=True, default=Status.OK)
timeStamp: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = CoreOpcodeId.QUERY_UWBS_TIMESTAMP
self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreQueryTimeStampRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.QUERY_UWBS_TIMESTAMP or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 9:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
value_ = int.from_bytes(span[1:9], byteorder='little')
fields['timeStamp'] = value_
span = span[9:]
return CoreQueryTimeStampRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if self.timeStamp > 18446744073709551615:
print(f"Invalid value for field CoreQueryTimeStampRsp::timeStamp: {self.timeStamp} > 18446744073709551615; the value will be truncated")
self.timeStamp &= 18446744073709551615
_span.extend(int.to_bytes((self.timeStamp << 0), length=8, byteorder='little'))
return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 9
@dataclass
class SessionInitCmd(SessionConfigPacket):
session_id: int = field(kw_only=True, default=0)
session_type: SessionType = field(kw_only=True, default=SessionType.FIRA_RANGING_SESSION)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionConfigOpcodeId.INIT
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionInitCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_id'] = value_
fields['session_type'] = SessionType.from_int(span[4])
span = span[5:]
return SessionInitCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_id > 4294967295:
print(f"Invalid value for field SessionInitCmd::session_id: {self.session_id} > 4294967295; the value will be truncated")
self.session_id &= 4294967295
_span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little'))
_span.append((self.session_type << 0))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 5
@dataclass
class SessionInitRsp_V2(SessionConfigPacket):
status: Status = field(kw_only=True, default=Status.OK)
session_handle: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.INIT
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionInitRsp_V2', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
value_ = int.from_bytes(span[1:5], byteorder='little')
fields['session_handle'] = value_
span = span[5:]
return SessionInitRsp_V2(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if self.session_handle > 4294967295:
print(f"Invalid value for field SessionInitRsp_V2::session_handle: {self.session_handle} > 4294967295; the value will be truncated")
self.session_handle &= 4294967295
_span.extend(int.to_bytes((self.session_handle << 0), length=4, byteorder='little'))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 5
@dataclass
class SessionInitRsp(SessionConfigPacket):
status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.INIT
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionInitRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
span = span[1:]
return SessionInitRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
class SessionDeinitCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionConfigOpcodeId.DEINIT
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.DEINIT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 4:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
span = span[4:]
return SessionDeinitCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionDeinitCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 4
@dataclass
class SessionDeinitRsp(SessionConfigPacket):
status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.DEINIT
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.DEINIT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
span = span[1:]
return SessionDeinitRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
class SessionStatusNtf(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
session_state: SessionState = field(kw_only=True, default=SessionState.SESSION_STATE_INIT)
reason_code: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.NOTIFICATION
self.oid = SessionConfigOpcodeId.STATUS
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionStatusNtf', bytes]:
if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionConfigOpcodeId.STATUS or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 6:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
fields['session_state'] = SessionState.from_int(span[4])
fields['reason_code'] = span[5]
span = span[6:]
return SessionStatusNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionStatusNtf::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
_span.append((self.session_state << 0))
if self.reason_code > 255:
print(f"Invalid value for field SessionStatusNtf::reason_code: {self.reason_code} > 255; the value will be truncated")
self.reason_code &= 255
_span.append((self.reason_code << 0))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 6
@dataclass
class AppConfigTlv(Packet):
cfg_id: AppConfigTlvType = field(kw_only=True, default=AppConfigTlvType.DEVICE_TYPE)
v: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['AppConfigTlv', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
fields['cfg_id'] = AppConfigTlvType.from_int(span[0])
v_count = span[1]
span = span[2:]
if len(span) < v_count:
raise Exception('Invalid packet size')
fields['v'] = list(span[:v_count])
span = span[v_count:]
return AppConfigTlv(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.cfg_id << 0))
if len(self.v) > 255:
print(f"Invalid length for field AppConfigTlv::v: {len(self.v)} > 255; the array will be truncated")
del self.v[255:]
_span.append((len(self.v) << 0))
_span.extend(self.v)
return bytes(_span)
@property
def size(self) -> int:
return len(self.v) * 1 + 2
@dataclass
class SessionSetAppConfigCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
tlvs: List[AppConfigTlv] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionConfigOpcodeId.SET_APP_CONFIG
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.SET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
tlvs_count = span[4]
span = span[5:]
tlvs = []
for n in range(tlvs_count):
element, span = AppConfigTlv.parse(span)
tlvs.append(element)
fields['tlvs'] = tlvs
return SessionSetAppConfigCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionSetAppConfigCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
if len(self.tlvs) > 255:
print(f"Invalid length for field SessionSetAppConfigCmd::tlvs: {len(self.tlvs)} > 255; the array will be truncated")
del self.tlvs[255:]
_span.append((len(self.tlvs) << 0))
for _elt in self.tlvs:
_span.extend(_elt.serialize())
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.tlvs]) + 5
@dataclass
class AppConfigStatus(Packet):
cfg_id: AppConfigTlvType = field(kw_only=True, default=AppConfigTlvType.DEVICE_TYPE)
status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['AppConfigStatus', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
fields['cfg_id'] = AppConfigTlvType.from_int(span[0])
fields['status'] = Status.from_int(span[1])
span = span[2:]
return AppConfigStatus(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.cfg_id << 0))
_span.append((self.status << 0))
return bytes(_span)
@property
def size(self) -> int:
return 2
@dataclass
class SessionSetAppConfigRsp(SessionConfigPacket):
status: Status = field(kw_only=True, default=Status.OK)
cfg_status: List[AppConfigStatus] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.SET_APP_CONFIG
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.SET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
cfg_status_count = span[1]
span = span[2:]
if len(span) < cfg_status_count * 2:
raise Exception('Invalid packet size')
cfg_status = []
for n in range(cfg_status_count):
cfg_status.append(AppConfigStatus.parse_all(span[n * 2:(n + 1) * 2]))
fields['cfg_status'] = cfg_status
span = span[cfg_status_count * 2:]
return SessionSetAppConfigRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if len(self.cfg_status) > 255:
print(f"Invalid length for field SessionSetAppConfigRsp::cfg_status: {len(self.cfg_status)} > 255; the array will be truncated")
del self.cfg_status[255:]
_span.append((len(self.cfg_status) << 0))
for _elt in self.cfg_status:
_span.extend(_elt.serialize())
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.cfg_status]) + 2
@dataclass
class SessionGetAppConfigCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
app_cfg: List[AppConfigTlvType] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionConfigOpcodeId.GET_APP_CONFIG
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
app_cfg_count = span[4]
span = span[5:]
if len(span) < app_cfg_count:
raise Exception('Invalid packet size')
app_cfg = []
for n in range(app_cfg_count):
app_cfg.append(AppConfigTlvType(int.from_bytes(span[n:n + 1], byteorder='little')))
fields['app_cfg'] = app_cfg
span = span[app_cfg_count:]
return SessionGetAppConfigCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionGetAppConfigCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
if len(self.app_cfg) > 255:
print(f"Invalid length for field SessionGetAppConfigCmd::app_cfg: {len(self.app_cfg)} > 255; the array will be truncated")
del self.app_cfg[255:]
_span.append((len(self.app_cfg) << 0))
for _elt in self.app_cfg:
_span.append(_elt)
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.app_cfg) * 8 + 5
@dataclass
class SessionGetAppConfigRsp(SessionConfigPacket):
status: Status = field(kw_only=True, default=Status.OK)
tlvs: List[AppConfigTlv] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.GET_APP_CONFIG
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
tlvs_count = span[1]
span = span[2:]
tlvs = []
for n in range(tlvs_count):
element, span = AppConfigTlv.parse(span)
tlvs.append(element)
fields['tlvs'] = tlvs
return SessionGetAppConfigRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if len(self.tlvs) > 255:
print(f"Invalid length for field SessionGetAppConfigRsp::tlvs: {len(self.tlvs)} > 255; the array will be truncated")
del self.tlvs[255:]
_span.append((len(self.tlvs) << 0))
for _elt in self.tlvs:
_span.extend(_elt.serialize())
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.tlvs]) + 2
@dataclass
class SessionGetCountCmd(SessionConfigPacket):
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionConfigOpcodeId.GET_COUNT
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_COUNT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
return SessionGetCountCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 0
@dataclass
class SessionGetCountRsp(SessionConfigPacket):
status: Status = field(kw_only=True, default=Status.OK)
session_count: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.GET_COUNT
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_COUNT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
fields['session_count'] = span[1]
span = span[2:]
return SessionGetCountRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if self.session_count > 255:
print(f"Invalid value for field SessionGetCountRsp::session_count: {self.session_count} > 255; the value will be truncated")
self.session_count &= 255
_span.append((self.session_count << 0))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 2
@dataclass
class SessionGetStateCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionConfigOpcodeId.GET_STATE
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_STATE or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 4:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
span = span[4:]
return SessionGetStateCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionGetStateCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 4
@dataclass
class SessionGetStateRsp(SessionConfigPacket):
status: Status = field(kw_only=True, default=Status.OK)
session_state: SessionState = field(kw_only=True, default=SessionState.SESSION_STATE_INIT)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.GET_STATE
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_STATE or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
fields['session_state'] = SessionState.from_int(span[1])
span = span[2:]
return SessionGetStateRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
_span.append((self.session_state << 0))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 2
@dataclass
class SessionUpdateDtAnchorRangingRoundsCmd(SessionConfigPacket):
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtAnchorRangingRoundsCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
return SessionUpdateDtAnchorRangingRoundsCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 0
@dataclass
class SessionUpdateDtAnchorRangingRoundsRsp(SessionConfigPacket):
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtAnchorRangingRoundsRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
return SessionUpdateDtAnchorRangingRoundsRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 0
@dataclass
class SessionUpdateDtTagRangingRoundsCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
ranging_round_indexes: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtTagRangingRoundsCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
ranging_round_indexes_count = span[4]
span = span[5:]
if len(span) < ranging_round_indexes_count:
raise Exception('Invalid packet size')
fields['ranging_round_indexes'] = list(span[:ranging_round_indexes_count])
span = span[ranging_round_indexes_count:]
return SessionUpdateDtTagRangingRoundsCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionUpdateDtTagRangingRoundsCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
if len(self.ranging_round_indexes) > 255:
print(f"Invalid length for field SessionUpdateDtTagRangingRoundsCmd::ranging_round_indexes: {len(self.ranging_round_indexes)} > 255; the array will be truncated")
del self.ranging_round_indexes[255:]
_span.append((len(self.ranging_round_indexes) << 0))
_span.extend(self.ranging_round_indexes)
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.ranging_round_indexes) * 1 + 5
@dataclass
class SessionUpdateDtTagRangingRoundsRsp(SessionConfigPacket):
status: Status = field(kw_only=True, default=Status.OK)
ranging_round_indexes: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtTagRangingRoundsRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
ranging_round_indexes_count = span[1]
span = span[2:]
if len(span) < ranging_round_indexes_count:
raise Exception('Invalid packet size')
fields['ranging_round_indexes'] = list(span[:ranging_round_indexes_count])
span = span[ranging_round_indexes_count:]
return SessionUpdateDtTagRangingRoundsRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if len(self.ranging_round_indexes) > 255:
print(f"Invalid length for field SessionUpdateDtTagRangingRoundsRsp::ranging_round_indexes: {len(self.ranging_round_indexes)} > 255; the array will be truncated")
del self.ranging_round_indexes[255:]
_span.append((len(self.ranging_round_indexes) << 0))
_span.extend(self.ranging_round_indexes)
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.ranging_round_indexes) * 1 + 2
@dataclass
class Controlee(Packet):
short_address: bytearray = field(kw_only=True, default_factory=bytearray)
subsession_id: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['Controlee', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
fields['short_address'] = list(span[:2])
span = span[2:]
if len(span) < 4:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['subsession_id'] = value_
span = span[4:]
return Controlee(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.extend(self.short_address)
if self.subsession_id > 4294967295:
print(f"Invalid value for field Controlee::subsession_id: {self.subsession_id} > 4294967295; the value will be truncated")
self.subsession_id &= 4294967295
_span.extend(int.to_bytes((self.subsession_id << 0), length=4, byteorder='little'))
return bytes(_span)
@property
def size(self) -> int:
return 6
@dataclass
class Controlee_V2_0_16_Byte_Version(Packet):
short_address: bytearray = field(kw_only=True, default_factory=bytearray)
subsession_id: int = field(kw_only=True, default=0)
subsession_key: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['Controlee_V2_0_16_Byte_Version', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
fields['short_address'] = list(span[:2])
span = span[2:]
if len(span) < 4:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['subsession_id'] = value_
span = span[4:]
if len(span) < 16:
raise Exception('Invalid packet size')
fields['subsession_key'] = list(span[:16])
span = span[16:]
return Controlee_V2_0_16_Byte_Version(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.extend(self.short_address)
if self.subsession_id > 4294967295:
print(f"Invalid value for field Controlee_V2_0_16_Byte_Version::subsession_id: {self.subsession_id} > 4294967295; the value will be truncated")
self.subsession_id &= 4294967295
_span.extend(int.to_bytes((self.subsession_id << 0), length=4, byteorder='little'))
_span.extend(self.subsession_key)
return bytes(_span)
@property
def size(self) -> int:
return 22
@dataclass
class Controlee_V2_0_32_Byte_Version(Packet):
short_address: bytearray = field(kw_only=True, default_factory=bytearray)
subsession_id: int = field(kw_only=True, default=0)
subsession_key: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['Controlee_V2_0_32_Byte_Version', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
fields['short_address'] = list(span[:2])
span = span[2:]
if len(span) < 4:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['subsession_id'] = value_
span = span[4:]
if len(span) < 32:
raise Exception('Invalid packet size')
fields['subsession_key'] = list(span[:32])
span = span[32:]
return Controlee_V2_0_32_Byte_Version(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.extend(self.short_address)
if self.subsession_id > 4294967295:
print(f"Invalid value for field Controlee_V2_0_32_Byte_Version::subsession_id: {self.subsession_id} > 4294967295; the value will be truncated")
self.subsession_id &= 4294967295
_span.extend(int.to_bytes((self.subsession_id << 0), length=4, byteorder='little'))
_span.extend(self.subsession_key)
return bytes(_span)
@property
def size(self) -> int:
return 38
class UpdateMulticastListAction(enum.IntEnum):
ADD_CONTROLEE = 0x0
REMOVE_CONTROLEE = 0x1
ADD_CONTROLEE_WITH_SHORT_SUB_SESSION_KEY = 0x2
ADD_CONTROLEE_WITH_EXTENDED_SUB_SESSION_KEY = 0x3
@staticmethod
def from_int(v: int) -> Union[int, 'UpdateMulticastListAction']:
try:
return UpdateMulticastListAction(v)
except ValueError as exn:
raise exn
@dataclass
class SessionUpdateControllerMulticastListCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
action: UpdateMulticastListAction = field(kw_only=True, default=UpdateMulticastListAction.ADD_CONTROLEE)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
fields['action'] = UpdateMulticastListAction.from_int(span[4])
span = span[5:]
payload = span
span = bytes([])
fields['payload'] = payload
return SessionUpdateControllerMulticastListCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionUpdateControllerMulticastListCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
_span.append((self.action << 0))
_span.extend(payload or self.payload or [])
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.payload) + 5
@dataclass
class SessionUpdateControllerMulticastListCmdPayload(Packet):
controlees: List[Controlee] = field(kw_only=True, default_factory=list)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmdPayload', bytes]:
fields = {'payload': None}
if len(span) < 1:
raise Exception('Invalid packet size')
controlees_count = span[0]
span = span[1:]
if len(span) < controlees_count * 6:
raise Exception('Invalid packet size')
controlees = []
for n in range(controlees_count):
controlees.append(Controlee.parse_all(span[n * 6:(n + 1) * 6]))
fields['controlees'] = controlees
span = span[controlees_count * 6:]
return SessionUpdateControllerMulticastListCmdPayload(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if len(self.controlees) > 255:
print(f"Invalid length for field SessionUpdateControllerMulticastListCmdPayload::controlees: {len(self.controlees)} > 255; the array will be truncated")
del self.controlees[255:]
_span.append((len(self.controlees) << 0))
for _elt in self.controlees:
_span.extend(_elt.serialize())
return bytes(_span)
@property
def size(self) -> int:
return sum([elt.size for elt in self.controlees]) + 1
@dataclass
class SessionUpdateControllerMulticastListCmd_2_0_16_Byte_Payload(Packet):
controlees: List[Controlee_V2_0_16_Byte_Version] = field(kw_only=True, default_factory=list)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmd_2_0_16_Byte_Payload', bytes]:
fields = {'payload': None}
if len(span) < 1:
raise Exception('Invalid packet size')
controlees_count = span[0]
span = span[1:]
if len(span) < controlees_count * 22:
raise Exception('Invalid packet size')
controlees = []
for n in range(controlees_count):
controlees.append(Controlee_V2_0_16_Byte_Version.parse_all(span[n * 22:(n + 1) * 22]))
fields['controlees'] = controlees
span = span[controlees_count * 22:]
return SessionUpdateControllerMulticastListCmd_2_0_16_Byte_Payload(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if len(self.controlees) > 255:
print(f"Invalid length for field SessionUpdateControllerMulticastListCmd_2_0_16_Byte_Payload::controlees: {len(self.controlees)} > 255; the array will be truncated")
del self.controlees[255:]
_span.append((len(self.controlees) << 0))
for _elt in self.controlees:
_span.extend(_elt.serialize())
return bytes(_span)
@property
def size(self) -> int:
return sum([elt.size for elt in self.controlees]) + 1
@dataclass
class SessionUpdateControllerMulticastListCmd_2_0_32_Byte_Payload(Packet):
controlees: List[Controlee_V2_0_32_Byte_Version] = field(kw_only=True, default_factory=list)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmd_2_0_32_Byte_Payload', bytes]:
fields = {'payload': None}
if len(span) < 1:
raise Exception('Invalid packet size')
controlees_count = span[0]
span = span[1:]
if len(span) < controlees_count * 38:
raise Exception('Invalid packet size')
controlees = []
for n in range(controlees_count):
controlees.append(Controlee_V2_0_32_Byte_Version.parse_all(span[n * 38:(n + 1) * 38]))
fields['controlees'] = controlees
span = span[controlees_count * 38:]
return SessionUpdateControllerMulticastListCmd_2_0_32_Byte_Payload(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if len(self.controlees) > 255:
print(f"Invalid length for field SessionUpdateControllerMulticastListCmd_2_0_32_Byte_Payload::controlees: {len(self.controlees)} > 255; the array will be truncated")
del self.controlees[255:]
_span.append((len(self.controlees) << 0))
for _elt in self.controlees:
_span.extend(_elt.serialize())
return bytes(_span)
@property
def size(self) -> int:
return sum([elt.size for elt in self.controlees]) + 1
@dataclass
class SessionUpdateControllerMulticastListRsp(SessionConfigPacket):
status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
span = span[1:]
return SessionUpdateControllerMulticastListRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
class ControleeStatus(Packet):
mac_address: bytearray = field(kw_only=True, default_factory=bytearray)
subsession_id: int = field(kw_only=True, default=0)
status: MulticastUpdateStatus = field(kw_only=True, default=MulticastUpdateStatus.OK_MULTICAST_LIST_UPDATE)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['ControleeStatus', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
fields['mac_address'] = list(span[:2])
span = span[2:]
if len(span) < 5:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['subsession_id'] = value_
fields['status'] = MulticastUpdateStatus.from_int(span[4])
span = span[5:]
return ControleeStatus(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.extend(self.mac_address)
if self.subsession_id > 4294967295:
print(f"Invalid value for field ControleeStatus::subsession_id: {self.subsession_id} > 4294967295; the value will be truncated")
self.subsession_id &= 4294967295
_span.extend(int.to_bytes((self.subsession_id << 0), length=4, byteorder='little'))
_span.append((self.status << 0))
return bytes(_span)
@property
def size(self) -> int:
return 7
@dataclass
class SessionUpdateControllerMulticastListNtf(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
remaining_multicast_list_size: int = field(kw_only=True, default=0)
controlee_status: List[ControleeStatus] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.NOTIFICATION
self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListNtf', bytes]:
if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 6:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
fields['remaining_multicast_list_size'] = span[4]
controlee_status_count = span[5]
span = span[6:]
if len(span) < controlee_status_count * 7:
raise Exception('Invalid packet size')
controlee_status = []
for n in range(controlee_status_count):
controlee_status.append(ControleeStatus.parse_all(span[n * 7:(n + 1) * 7]))
fields['controlee_status'] = controlee_status
span = span[controlee_status_count * 7:]
return SessionUpdateControllerMulticastListNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionUpdateControllerMulticastListNtf::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
if self.remaining_multicast_list_size > 255:
print(f"Invalid value for field SessionUpdateControllerMulticastListNtf::remaining_multicast_list_size: {self.remaining_multicast_list_size} > 255; the value will be truncated")
self.remaining_multicast_list_size &= 255
_span.append((self.remaining_multicast_list_size << 0))
if len(self.controlee_status) > 255:
print(f"Invalid length for field SessionUpdateControllerMulticastListNtf::controlee_status: {len(self.controlee_status)} > 255; the array will be truncated")
del self.controlee_status[255:]
_span.append((len(self.controlee_status) << 0))
for _elt in self.controlee_status:
_span.extend(_elt.serialize())
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.controlee_status]) + 6
@dataclass
class SessionDataCreditNtf(SessionControlPacket):
session_token: int = field(kw_only=True, default=0)
credit_availability: CreditAvailability = field(kw_only=True, default=CreditAvailability.CREDIT_NOT_AVAILABLE)
def __post_init__(self):
self.mt = MessageType.NOTIFICATION
self.oid = SessionControlOpcodeId.DATA_CREDIT
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionDataCreditNtf', bytes]:
if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.DATA_CREDIT or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
fields['credit_availability'] = CreditAvailability.from_int(span[4])
span = span[5:]
return SessionDataCreditNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionDataCreditNtf::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
_span.append((self.credit_availability << 0))
return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 5
@dataclass
class SessionDataTransferStatusNtf(SessionControlPacket):
session_token: int = field(kw_only=True, default=0)
uci_sequence_number: int = field(kw_only=True, default=0)
status: DataTransferNtfStatusCode = field(kw_only=True, default=DataTransferNtfStatusCode.UCI_DATA_TRANSFER_STATUS_REPETITION_OK)
tx_count: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.NOTIFICATION
self.oid = SessionControlOpcodeId.DATA_TRANSFER_STATUS
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionDataTransferStatusNtf', bytes]:
if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.DATA_TRANSFER_STATUS or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 7:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
fields['uci_sequence_number'] = span[4]
fields['status'] = DataTransferNtfStatusCode.from_int(span[5])
fields['tx_count'] = span[6]
span = span[7:]
return SessionDataTransferStatusNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionDataTransferStatusNtf::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
if self.uci_sequence_number > 255:
print(f"Invalid value for field SessionDataTransferStatusNtf::uci_sequence_number: {self.uci_sequence_number} > 255; the value will be truncated")
self.uci_sequence_number &= 255
_span.append((self.uci_sequence_number << 0))
_span.append((self.status << 0))
if self.tx_count > 255:
print(f"Invalid value for field SessionDataTransferStatusNtf::tx_count: {self.tx_count} > 255; the value will be truncated")
self.tx_count &= 255
_span.append((self.tx_count << 0))
return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 7
@dataclass
class SessionQueryMaxDataSizeInRangingCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeInRangingCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 4:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
span = span[4:]
return SessionQueryMaxDataSizeInRangingCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionQueryMaxDataSizeInRangingCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 4
@dataclass
class SessionQueryMaxDataSizeInRangingRsp(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
max_data_size: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING
self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeInRangingRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 6:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
value_ = int.from_bytes(span[4:6], byteorder='little')
fields['max_data_size'] = value_
span = span[6:]
return SessionQueryMaxDataSizeInRangingRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field SessionQueryMaxDataSizeInRangingRsp::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
if self.max_data_size > 65535:
print(f"Invalid value for field SessionQueryMaxDataSizeInRangingRsp::max_data_size: {self.max_data_size} > 65535; the value will be truncated")
self.max_data_size &= 65535
_span.extend(int.to_bytes((self.max_data_size << 0), length=2, byteorder='little'))
return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 6
@dataclass
class SessionStartCmd(SessionControlPacket):
session_id: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionControlOpcodeId.START
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionStartCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 4:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_id'] = value_
span = span[4:]
return SessionStartCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_id > 4294967295:
print(f"Invalid value for field SessionStartCmd::session_id: {self.session_id} > 4294967295; the value will be truncated")
self.session_id &= 4294967295
_span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little'))
return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 4
@dataclass
class SessionStartRsp(SessionControlPacket):
status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionControlOpcodeId.START
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionStartRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
span = span[1:]
return SessionStartRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
class ShortAddressTwoWayRangingMeasurement(Packet):
mac_address: int = field(kw_only=True, default=0)
status: Status = field(kw_only=True, default=Status.OK)
nlos: int = field(kw_only=True, default=0)
distance: int = field(kw_only=True, default=0)
aoa_azimuth: int = field(kw_only=True, default=0)
aoa_azimuth_fom: int = field(kw_only=True, default=0)
aoa_elevation: int = field(kw_only=True, default=0)
aoa_elevation_fom: int = field(kw_only=True, default=0)
aoa_destination_azimuth: int = field(kw_only=True, default=0)
aoa_destination_azimuth_fom: int = field(kw_only=True, default=0)
aoa_destination_elevation: int = field(kw_only=True, default=0)
aoa_destination_elevation_fom: int = field(kw_only=True, default=0)
slot_index: int = field(kw_only=True, default=0)
rssi: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['ShortAddressTwoWayRangingMeasurement', bytes]:
fields = {'payload': None}
if len(span) < 31:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:2], byteorder='little')
fields['mac_address'] = value_
fields['status'] = Status.from_int(span[2])
fields['nlos'] = span[3]
value_ = int.from_bytes(span[4:6], byteorder='little')
fields['distance'] = value_
value_ = int.from_bytes(span[6:8], byteorder='little')
fields['aoa_azimuth'] = value_
fields['aoa_azimuth_fom'] = span[8]
value_ = int.from_bytes(span[9:11], byteorder='little')
fields['aoa_elevation'] = value_
fields['aoa_elevation_fom'] = span[11]
value_ = int.from_bytes(span[12:14], byteorder='little')
fields['aoa_destination_azimuth'] = value_
fields['aoa_destination_azimuth_fom'] = span[14]
value_ = int.from_bytes(span[15:17], byteorder='little')
fields['aoa_destination_elevation'] = value_
fields['aoa_destination_elevation_fom'] = span[17]
fields['slot_index'] = span[18]
fields['rssi'] = span[19]
value_ = int.from_bytes(span[20:28], byteorder='little')
value_ = int.from_bytes(span[28:31], byteorder='little')
span = span[31:]
return ShortAddressTwoWayRangingMeasurement(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.mac_address > 65535:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::mac_address: {self.mac_address} > 65535; the value will be truncated")
self.mac_address &= 65535
_span.extend(int.to_bytes((self.mac_address << 0), length=2, byteorder='little'))
_span.append((self.status << 0))
if self.nlos > 255:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::nlos: {self.nlos} > 255; the value will be truncated")
self.nlos &= 255
_span.append((self.nlos << 0))
if self.distance > 65535:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::distance: {self.distance} > 65535; the value will be truncated")
self.distance &= 65535
_span.extend(int.to_bytes((self.distance << 0), length=2, byteorder='little'))
if self.aoa_azimuth > 65535:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_azimuth: {self.aoa_azimuth} > 65535; the value will be truncated")
self.aoa_azimuth &= 65535
_span.extend(int.to_bytes((self.aoa_azimuth << 0), length=2, byteorder='little'))
if self.aoa_azimuth_fom > 255:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_azimuth_fom: {self.aoa_azimuth_fom} > 255; the value will be truncated")
self.aoa_azimuth_fom &= 255
_span.append((self.aoa_azimuth_fom << 0))
if self.aoa_elevation > 65535:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_elevation: {self.aoa_elevation} > 65535; the value will be truncated")
self.aoa_elevation &= 65535
_span.extend(int.to_bytes((self.aoa_elevation << 0), length=2, byteorder='little'))
if self.aoa_elevation_fom > 255:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_elevation_fom: {self.aoa_elevation_fom} > 255; the value will be truncated")
self.aoa_elevation_fom &= 255
_span.append((self.aoa_elevation_fom << 0))
if self.aoa_destination_azimuth > 65535:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_destination_azimuth: {self.aoa_destination_azimuth} > 65535; the value will be truncated")
self.aoa_destination_azimuth &= 65535
_span.extend(int.to_bytes((self.aoa_destination_azimuth << 0), length=2, byteorder='little'))
if self.aoa_destination_azimuth_fom > 255:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_destination_azimuth_fom: {self.aoa_destination_azimuth_fom} > 255; the value will be truncated")
self.aoa_destination_azimuth_fom &= 255
_span.append((self.aoa_destination_azimuth_fom << 0))
if self.aoa_destination_elevation > 65535:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_destination_elevation: {self.aoa_destination_elevation} > 65535; the value will be truncated")
self.aoa_destination_elevation &= 65535
_span.extend(int.to_bytes((self.aoa_destination_elevation << 0), length=2, byteorder='little'))
if self.aoa_destination_elevation_fom > 255:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_destination_elevation_fom: {self.aoa_destination_elevation_fom} > 255; the value will be truncated")
self.aoa_destination_elevation_fom &= 255
_span.append((self.aoa_destination_elevation_fom << 0))
if self.slot_index > 255:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::slot_index: {self.slot_index} > 255; the value will be truncated")
self.slot_index &= 255
_span.append((self.slot_index << 0))
if self.rssi > 255:
print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::rssi: {self.rssi} > 255; the value will be truncated")
self.rssi &= 255
_span.append((self.rssi << 0))
_span.extend([0] * 8)
_span.extend([0] * 3)
return bytes(_span)
@property
def size(self) -> int:
return 31
@dataclass
class ExtendedAddressTwoWayRangingMeasurement(Packet):
mac_address: int = field(kw_only=True, default=0)
status: Status = field(kw_only=True, default=Status.OK)
nlos: int = field(kw_only=True, default=0)
distance: int = field(kw_only=True, default=0)
aoa_azimuth: int = field(kw_only=True, default=0)
aoa_azimuth_fom: int = field(kw_only=True, default=0)
aoa_elevation: int = field(kw_only=True, default=0)
aoa_elevation_fom: int = field(kw_only=True, default=0)
aoa_destination_azimuth: int = field(kw_only=True, default=0)
aoa_destination_azimuth_fom: int = field(kw_only=True, default=0)
aoa_destination_elevation: int = field(kw_only=True, default=0)
aoa_destination_elevation_fom: int = field(kw_only=True, default=0)
slot_index: int = field(kw_only=True, default=0)
rssi: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['ExtendedAddressTwoWayRangingMeasurement', bytes]:
fields = {'payload': None}
if len(span) < 31:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:8], byteorder='little')
fields['mac_address'] = value_
fields['status'] = Status.from_int(span[8])
fields['nlos'] = span[9]
value_ = int.from_bytes(span[10:12], byteorder='little')
fields['distance'] = value_
value_ = int.from_bytes(span[12:14], byteorder='little')
fields['aoa_azimuth'] = value_
fields['aoa_azimuth_fom'] = span[14]
value_ = int.from_bytes(span[15:17], byteorder='little')
fields['aoa_elevation'] = value_
fields['aoa_elevation_fom'] = span[17]
value_ = int.from_bytes(span[18:20], byteorder='little')
fields['aoa_destination_azimuth'] = value_
fields['aoa_destination_azimuth_fom'] = span[20]
value_ = int.from_bytes(span[21:23], byteorder='little')
fields['aoa_destination_elevation'] = value_
fields['aoa_destination_elevation_fom'] = span[23]
fields['slot_index'] = span[24]
fields['rssi'] = span[25]
value_ = int.from_bytes(span[26:31], byteorder='little')
span = span[31:]
return ExtendedAddressTwoWayRangingMeasurement(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.mac_address > 18446744073709551615:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::mac_address: {self.mac_address} > 18446744073709551615; the value will be truncated")
self.mac_address &= 18446744073709551615
_span.extend(int.to_bytes((self.mac_address << 0), length=8, byteorder='little'))
_span.append((self.status << 0))
if self.nlos > 255:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::nlos: {self.nlos} > 255; the value will be truncated")
self.nlos &= 255
_span.append((self.nlos << 0))
if self.distance > 65535:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::distance: {self.distance} > 65535; the value will be truncated")
self.distance &= 65535
_span.extend(int.to_bytes((self.distance << 0), length=2, byteorder='little'))
if self.aoa_azimuth > 65535:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_azimuth: {self.aoa_azimuth} > 65535; the value will be truncated")
self.aoa_azimuth &= 65535
_span.extend(int.to_bytes((self.aoa_azimuth << 0), length=2, byteorder='little'))
if self.aoa_azimuth_fom > 255:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_azimuth_fom: {self.aoa_azimuth_fom} > 255; the value will be truncated")
self.aoa_azimuth_fom &= 255
_span.append((self.aoa_azimuth_fom << 0))
if self.aoa_elevation > 65535:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_elevation: {self.aoa_elevation} > 65535; the value will be truncated")
self.aoa_elevation &= 65535
_span.extend(int.to_bytes((self.aoa_elevation << 0), length=2, byteorder='little'))
if self.aoa_elevation_fom > 255:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_elevation_fom: {self.aoa_elevation_fom} > 255; the value will be truncated")
self.aoa_elevation_fom &= 255
_span.append((self.aoa_elevation_fom << 0))
if self.aoa_destination_azimuth > 65535:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_destination_azimuth: {self.aoa_destination_azimuth} > 65535; the value will be truncated")
self.aoa_destination_azimuth &= 65535
_span.extend(int.to_bytes((self.aoa_destination_azimuth << 0), length=2, byteorder='little'))
if self.aoa_destination_azimuth_fom > 255:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_destination_azimuth_fom: {self.aoa_destination_azimuth_fom} > 255; the value will be truncated")
self.aoa_destination_azimuth_fom &= 255
_span.append((self.aoa_destination_azimuth_fom << 0))
if self.aoa_destination_elevation > 65535:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_destination_elevation: {self.aoa_destination_elevation} > 65535; the value will be truncated")
self.aoa_destination_elevation &= 65535
_span.extend(int.to_bytes((self.aoa_destination_elevation << 0), length=2, byteorder='little'))
if self.aoa_destination_elevation_fom > 255:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_destination_elevation_fom: {self.aoa_destination_elevation_fom} > 255; the value will be truncated")
self.aoa_destination_elevation_fom &= 255
_span.append((self.aoa_destination_elevation_fom << 0))
if self.slot_index > 255:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::slot_index: {self.slot_index} > 255; the value will be truncated")
self.slot_index &= 255
_span.append((self.slot_index << 0))
if self.rssi > 255:
print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::rssi: {self.rssi} > 255; the value will be truncated")
self.rssi &= 255
_span.append((self.rssi << 0))
_span.extend([0] * 5)
return bytes(_span)
@property
def size(self) -> int:
return 31
@dataclass
class ShortAddressOwrAoaRangingMeasurement(Packet):
mac_address: int = field(kw_only=True, default=0)
status: Status = field(kw_only=True, default=Status.OK)
nlos: int = field(kw_only=True, default=0)
frame_sequence_number: int = field(kw_only=True, default=0)
block_index: int = field(kw_only=True, default=0)
aoa_azimuth: int = field(kw_only=True, default=0)
aoa_azimuth_fom: int = field(kw_only=True, default=0)
aoa_elevation: int = field(kw_only=True, default=0)
aoa_elevation_fom: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['ShortAddressOwrAoaRangingMeasurement', bytes]:
fields = {'payload': None}
if len(span) < 13:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:2], byteorder='little')
fields['mac_address'] = value_
fields['status'] = Status.from_int(span[2])
fields['nlos'] = span[3]
fields['frame_sequence_number'] = span[4]
value_ = int.from_bytes(span[5:7], byteorder='little')
fields['block_index'] = value_
value_ = int.from_bytes(span[7:9], byteorder='little')
fields['aoa_azimuth'] = value_
fields['aoa_azimuth_fom'] = span[9]
value_ = int.from_bytes(span[10:12], byteorder='little')
fields['aoa_elevation'] = value_
fields['aoa_elevation_fom'] = span[12]
span = span[13:]
return ShortAddressOwrAoaRangingMeasurement(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.mac_address > 65535:
print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::mac_address: {self.mac_address} > 65535; the value will be truncated")
self.mac_address &= 65535
_span.extend(int.to_bytes((self.mac_address << 0), length=2, byteorder='little'))
_span.append((self.status << 0))
if self.nlos > 255:
print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::nlos: {self.nlos} > 255; the value will be truncated")
self.nlos &= 255
_span.append((self.nlos << 0))
if self.frame_sequence_number > 255:
print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::frame_sequence_number: {self.frame_sequence_number} > 255; the value will be truncated")
self.frame_sequence_number &= 255
_span.append((self.frame_sequence_number << 0))
if self.block_index > 65535:
print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::block_index: {self.block_index} > 65535; the value will be truncated")
self.block_index &= 65535
_span.extend(int.to_bytes((self.block_index << 0), length=2, byteorder='little'))
if self.aoa_azimuth > 65535:
print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::aoa_azimuth: {self.aoa_azimuth} > 65535; the value will be truncated")
self.aoa_azimuth &= 65535
_span.extend(int.to_bytes((self.aoa_azimuth << 0), length=2, byteorder='little'))
if self.aoa_azimuth_fom > 255:
print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::aoa_azimuth_fom: {self.aoa_azimuth_fom} > 255; the value will be truncated")
self.aoa_azimuth_fom &= 255
_span.append((self.aoa_azimuth_fom << 0))
if self.aoa_elevation > 65535:
print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::aoa_elevation: {self.aoa_elevation} > 65535; the value will be truncated")
self.aoa_elevation &= 65535
_span.extend(int.to_bytes((self.aoa_elevation << 0), length=2, byteorder='little'))
if self.aoa_elevation_fom > 255:
print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::aoa_elevation_fom: {self.aoa_elevation_fom} > 255; the value will be truncated")
self.aoa_elevation_fom &= 255
_span.append((self.aoa_elevation_fom << 0))
return bytes(_span)
@property
def size(self) -> int:
return 13
@dataclass
class ExtendedAddressOwrAoaRangingMeasurement(Packet):
mac_address: int = field(kw_only=True, default=0)
status: Status = field(kw_only=True, default=Status.OK)
nlos: int = field(kw_only=True, default=0)
frame_sequence_number: int = field(kw_only=True, default=0)
block_index: int = field(kw_only=True, default=0)
aoa_azimuth: int = field(kw_only=True, default=0)
aoa_azimuth_fom: int = field(kw_only=True, default=0)
aoa_elevation: int = field(kw_only=True, default=0)
aoa_elevation_fom: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['ExtendedAddressOwrAoaRangingMeasurement', bytes]:
fields = {'payload': None}
if len(span) < 19:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:8], byteorder='little')
fields['mac_address'] = value_
fields['status'] = Status.from_int(span[8])
fields['nlos'] = span[9]
fields['frame_sequence_number'] = span[10]
value_ = int.from_bytes(span[11:13], byteorder='little')
fields['block_index'] = value_
value_ = int.from_bytes(span[13:15], byteorder='little')
fields['aoa_azimuth'] = value_
fields['aoa_azimuth_fom'] = span[15]
value_ = int.from_bytes(span[16:18], byteorder='little')
fields['aoa_elevation'] = value_
fields['aoa_elevation_fom'] = span[18]
span = span[19:]
return ExtendedAddressOwrAoaRangingMeasurement(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.mac_address > 18446744073709551615:
print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::mac_address: {self.mac_address} > 18446744073709551615; the value will be truncated")
self.mac_address &= 18446744073709551615
_span.extend(int.to_bytes((self.mac_address << 0), length=8, byteorder='little'))
_span.append((self.status << 0))
if self.nlos > 255:
print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::nlos: {self.nlos} > 255; the value will be truncated")
self.nlos &= 255
_span.append((self.nlos << 0))
if self.frame_sequence_number > 255:
print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::frame_sequence_number: {self.frame_sequence_number} > 255; the value will be truncated")
self.frame_sequence_number &= 255
_span.append((self.frame_sequence_number << 0))
if self.block_index > 65535:
print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::block_index: {self.block_index} > 65535; the value will be truncated")
self.block_index &= 65535
_span.extend(int.to_bytes((self.block_index << 0), length=2, byteorder='little'))
if self.aoa_azimuth > 65535:
print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::aoa_azimuth: {self.aoa_azimuth} > 65535; the value will be truncated")
self.aoa_azimuth &= 65535
_span.extend(int.to_bytes((self.aoa_azimuth << 0), length=2, byteorder='little'))
if self.aoa_azimuth_fom > 255:
print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::aoa_azimuth_fom: {self.aoa_azimuth_fom} > 255; the value will be truncated")
self.aoa_azimuth_fom &= 255
_span.append((self.aoa_azimuth_fom << 0))
if self.aoa_elevation > 65535:
print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::aoa_elevation: {self.aoa_elevation} > 65535; the value will be truncated")
self.aoa_elevation &= 65535
_span.extend(int.to_bytes((self.aoa_elevation << 0), length=2, byteorder='little'))
if self.aoa_elevation_fom > 255:
print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::aoa_elevation_fom: {self.aoa_elevation_fom} > 255; the value will be truncated")
self.aoa_elevation_fom &= 255
_span.append((self.aoa_elevation_fom << 0))
return bytes(_span)
@property
def size(self) -> int:
return 19
class RangingMeasurementType(enum.IntEnum):
ONE_WAY = 0x0
TWO_WAY = 0x1
DL_TDOA = 0x2
OWR_AOA = 0x3
@staticmethod
def from_int(v: int) -> Union[int, 'RangingMeasurementType']:
try:
return RangingMeasurementType(v)
except ValueError as exn:
raise exn
@dataclass
class SessionInfoNtf(SessionControlPacket):
sequence_number: int = field(kw_only=True, default=0)
session_token: int = field(kw_only=True, default=0)
rcr_indicator: int = field(kw_only=True, default=0)
current_ranging_interval: int = field(kw_only=True, default=0)
ranging_measurement_type: RangingMeasurementType = field(kw_only=True, default=RangingMeasurementType.ONE_WAY)
mac_address_indicator: MacAddressIndicator = field(kw_only=True, default=MacAddressIndicator.SHORT_ADDRESS)
def __post_init__(self):
self.mt = MessageType.NOTIFICATION
self.oid = SessionControlOpcodeId.START
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionInfoNtf', bytes]:
if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 24:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['sequence_number'] = value_
value_ = int.from_bytes(span[4:8], byteorder='little')
fields['session_token'] = value_
fields['rcr_indicator'] = span[8]
value_ = int.from_bytes(span[9:13], byteorder='little')
fields['current_ranging_interval'] = value_
fields['ranging_measurement_type'] = RangingMeasurementType.from_int(span[13])
fields['mac_address_indicator'] = MacAddressIndicator.from_int(span[15])
value_ = int.from_bytes(span[16:24], byteorder='little')
span = span[24:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
return ShortMacTwoWaySessionInfoNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return ExtendedMacTwoWaySessionInfoNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return ShortMacDlTDoASessionInfoNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return ExtendedMacDlTDoASessionInfoNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return ShortMacOwrAoaSessionInfoNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return ExtendedMacOwrAoaSessionInfoNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
return SessionInfoNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.sequence_number > 4294967295:
print(f"Invalid value for field SessionInfoNtf::sequence_number: {self.sequence_number} > 4294967295; the value will be truncated")
self.sequence_number &= 4294967295
_span.extend(int.to_bytes((self.sequence_number << 0), length=4, byteorder='little'))
if self.session_token > 4294967295:
print(f"Invalid value for field SessionInfoNtf::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
if self.rcr_indicator > 255:
print(f"Invalid value for field SessionInfoNtf::rcr_indicator: {self.rcr_indicator} > 255; the value will be truncated")
self.rcr_indicator &= 255
_span.append((self.rcr_indicator << 0))
if self.current_ranging_interval > 4294967295:
print(f"Invalid value for field SessionInfoNtf::current_ranging_interval: {self.current_ranging_interval} > 4294967295; the value will be truncated")
self.current_ranging_interval &= 4294967295
_span.extend(int.to_bytes((self.current_ranging_interval << 0), length=4, byteorder='little'))
_span.append((self.ranging_measurement_type << 0))
_span.extend([0] * 1)
_span.append((self.mac_address_indicator << 0))
_span.extend([0] * 8)
_span.extend(payload or self.payload or [])
return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.payload) + 24
@dataclass
class ShortMacTwoWaySessionInfoNtf(SessionInfoNtf):
two_way_ranging_measurements: List[ShortAddressTwoWayRangingMeasurement] = field(kw_only=True, default_factory=list)
vendor_data: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.TWO_WAY
self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS
self.mt = MessageType.NOTIFICATION
self.oid = SessionControlOpcodeId.START
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ShortMacTwoWaySessionInfoNtf', bytes]:
if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
two_way_ranging_measurements_count = span[0]
span = span[1:]
if len(span) < two_way_ranging_measurements_count * 31:
raise Exception('Invalid packet size')
two_way_ranging_measurements = []
for n in range(two_way_ranging_measurements_count):
two_way_ranging_measurements.append(ShortAddressTwoWayRangingMeasurement.parse_all(span[n * 31:(n + 1) * 31]))
fields['two_way_ranging_measurements'] = two_way_ranging_measurements
span = span[two_way_ranging_measurements_count * 31:]
fields['vendor_data'] = list(span)
span = bytes()
return ShortMacTwoWaySessionInfoNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if len(self.two_way_ranging_measurements) > 255:
print(f"Invalid length for field ShortMacTwoWaySessionInfoNtf::two_way_ranging_measurements: {len(self.two_way_ranging_measurements)} > 255; the array will be truncated")
del self.two_way_ranging_measurements[255:]
_span.append((len(self.two_way_ranging_measurements) << 0))
for _elt in self.two_way_ranging_measurements:
_span.extend(_elt.serialize())
_span.extend(self.vendor_data)
return SessionInfoNtf.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1 + (
sum([elt.size for elt in self.two_way_ranging_measurements]) +
len(self.vendor_data) * 1
)
@dataclass
class ExtendedMacTwoWaySessionInfoNtf(SessionInfoNtf):
two_way_ranging_measurements: List[ExtendedAddressTwoWayRangingMeasurement] = field(kw_only=True, default_factory=list)
vendor_data: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.TWO_WAY
self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS
self.mt = MessageType.NOTIFICATION
self.oid = SessionControlOpcodeId.START
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacTwoWaySessionInfoNtf', bytes]:
if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
two_way_ranging_measurements_count = span[0]
span = span[1:]
if len(span) < two_way_ranging_measurements_count * 31:
raise Exception('Invalid packet size')
two_way_ranging_measurements = []
for n in range(two_way_ranging_measurements_count):
two_way_ranging_measurements.append(ExtendedAddressTwoWayRangingMeasurement.parse_all(span[n * 31:(n + 1) * 31]))
fields['two_way_ranging_measurements'] = two_way_ranging_measurements
span = span[two_way_ranging_measurements_count * 31:]
fields['vendor_data'] = list(span)
span = bytes()
return ExtendedMacTwoWaySessionInfoNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if len(self.two_way_ranging_measurements) > 255:
print(f"Invalid length for field ExtendedMacTwoWaySessionInfoNtf::two_way_ranging_measurements: {len(self.two_way_ranging_measurements)} > 255; the array will be truncated")
del self.two_way_ranging_measurements[255:]
_span.append((len(self.two_way_ranging_measurements) << 0))
for _elt in self.two_way_ranging_measurements:
_span.extend(_elt.serialize())
_span.extend(self.vendor_data)
return SessionInfoNtf.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1 + (
sum([elt.size for elt in self.two_way_ranging_measurements]) +
len(self.vendor_data) * 1
)
@dataclass
class ShortMacDlTDoASessionInfoNtf(SessionInfoNtf):
no_of_ranging_measurements: int = field(kw_only=True, default=0)
dl_tdoa_measurements: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.DL_TDOA
self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS
self.mt = MessageType.NOTIFICATION
self.oid = SessionControlOpcodeId.START
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ShortMacDlTDoASessionInfoNtf', bytes]:
if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['no_of_ranging_measurements'] = span[0]
span = span[1:]
fields['dl_tdoa_measurements'] = list(span)
span = bytes()
return ShortMacDlTDoASessionInfoNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.no_of_ranging_measurements > 255:
print(f"Invalid value for field ShortMacDlTDoASessionInfoNtf::no_of_ranging_measurements: {self.no_of_ranging_measurements} > 255; the value will be truncated")
self.no_of_ranging_measurements &= 255
_span.append((self.no_of_ranging_measurements << 0))
_span.extend(self.dl_tdoa_measurements)
return SessionInfoNtf.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.dl_tdoa_measurements) * 1 + 1
@dataclass
class ExtendedMacDlTDoASessionInfoNtf(SessionInfoNtf):
no_of_ranging_measurements: int = field(kw_only=True, default=0)
dl_tdoa_measurements: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.DL_TDOA
self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS
self.mt = MessageType.NOTIFICATION
self.oid = SessionControlOpcodeId.START
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacDlTDoASessionInfoNtf', bytes]:
if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['no_of_ranging_measurements'] = span[0]
span = span[1:]
fields['dl_tdoa_measurements'] = list(span)
span = bytes()
return ExtendedMacDlTDoASessionInfoNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.no_of_ranging_measurements > 255:
print(f"Invalid value for field ExtendedMacDlTDoASessionInfoNtf::no_of_ranging_measurements: {self.no_of_ranging_measurements} > 255; the value will be truncated")
self.no_of_ranging_measurements &= 255
_span.append((self.no_of_ranging_measurements << 0))
_span.extend(self.dl_tdoa_measurements)
return SessionInfoNtf.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.dl_tdoa_measurements) * 1 + 1
@dataclass
class ShortMacOwrAoaSessionInfoNtf(SessionInfoNtf):
owr_aoa_ranging_measurements: List[ShortAddressOwrAoaRangingMeasurement] = field(kw_only=True, default_factory=list)
vendor_data: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.OWR_AOA
self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS
self.mt = MessageType.NOTIFICATION
self.oid = SessionControlOpcodeId.START
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ShortMacOwrAoaSessionInfoNtf', bytes]:
if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
owr_aoa_ranging_measurements_count = span[0]
span = span[1:]
if len(span) < owr_aoa_ranging_measurements_count * 13:
raise Exception('Invalid packet size')
owr_aoa_ranging_measurements = []
for n in range(owr_aoa_ranging_measurements_count):
owr_aoa_ranging_measurements.append(ShortAddressOwrAoaRangingMeasurement.parse_all(span[n * 13:(n + 1) * 13]))
fields['owr_aoa_ranging_measurements'] = owr_aoa_ranging_measurements
span = span[owr_aoa_ranging_measurements_count * 13:]
fields['vendor_data'] = list(span)
span = bytes()
return ShortMacOwrAoaSessionInfoNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if len(self.owr_aoa_ranging_measurements) > 255:
print(f"Invalid length for field ShortMacOwrAoaSessionInfoNtf::owr_aoa_ranging_measurements: {len(self.owr_aoa_ranging_measurements)} > 255; the array will be truncated")
del self.owr_aoa_ranging_measurements[255:]
_span.append((len(self.owr_aoa_ranging_measurements) << 0))
for _elt in self.owr_aoa_ranging_measurements:
_span.extend(_elt.serialize())
_span.extend(self.vendor_data)
return SessionInfoNtf.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1 + (
sum([elt.size for elt in self.owr_aoa_ranging_measurements]) +
len(self.vendor_data) * 1
)
@dataclass
class ExtendedMacOwrAoaSessionInfoNtf(SessionInfoNtf):
owr_aoa_ranging_measurements: List[ExtendedAddressOwrAoaRangingMeasurement] = field(kw_only=True, default_factory=list)
vendor_data: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.OWR_AOA
self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS
self.mt = MessageType.NOTIFICATION
self.oid = SessionControlOpcodeId.START
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacOwrAoaSessionInfoNtf', bytes]:
if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
owr_aoa_ranging_measurements_count = span[0]
span = span[1:]
if len(span) < owr_aoa_ranging_measurements_count * 19:
raise Exception('Invalid packet size')
owr_aoa_ranging_measurements = []
for n in range(owr_aoa_ranging_measurements_count):
owr_aoa_ranging_measurements.append(ExtendedAddressOwrAoaRangingMeasurement.parse_all(span[n * 19:(n + 1) * 19]))
fields['owr_aoa_ranging_measurements'] = owr_aoa_ranging_measurements
span = span[owr_aoa_ranging_measurements_count * 19:]
fields['vendor_data'] = list(span)
span = bytes()
return ExtendedMacOwrAoaSessionInfoNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if len(self.owr_aoa_ranging_measurements) > 255:
print(f"Invalid length for field ExtendedMacOwrAoaSessionInfoNtf::owr_aoa_ranging_measurements: {len(self.owr_aoa_ranging_measurements)} > 255; the array will be truncated")
del self.owr_aoa_ranging_measurements[255:]
_span.append((len(self.owr_aoa_ranging_measurements) << 0))
for _elt in self.owr_aoa_ranging_measurements:
_span.extend(_elt.serialize())
_span.extend(self.vendor_data)
return SessionInfoNtf.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1 + (
sum([elt.size for elt in self.owr_aoa_ranging_measurements]) +
len(self.vendor_data) * 1
)
@dataclass
class SessionStopCmd(SessionControlPacket):
session_id: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionControlOpcodeId.STOP
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionStopCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.STOP or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 4:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_id'] = value_
span = span[4:]
return SessionStopCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_id > 4294967295:
print(f"Invalid value for field SessionStopCmd::session_id: {self.session_id} > 4294967295; the value will be truncated")
self.session_id &= 4294967295
_span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little'))
return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 4
@dataclass
class SessionStopRsp(SessionControlPacket):
status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionControlOpcodeId.STOP
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionStopRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.STOP or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
span = span[1:]
return SessionStopRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
class SessionGetRangingCountCmd(SessionControlPacket):
session_id: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = SessionControlOpcodeId.GET_RANGING_COUNT
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetRangingCountCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.GET_RANGING_COUNT or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 4:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_id'] = value_
span = span[4:]
return SessionGetRangingCountCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_id > 4294967295:
print(f"Invalid value for field SessionGetRangingCountCmd::session_id: {self.session_id} > 4294967295; the value will be truncated")
self.session_id &= 4294967295
_span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little'))
return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 4
@dataclass
class SessionGetRangingCountRsp(SessionControlPacket):
status: Status = field(kw_only=True, default=Status.OK)
count: int = field(kw_only=True, default=0)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = SessionControlOpcodeId.GET_RANGING_COUNT
self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetRangingCountRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.GET_RANGING_COUNT or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
value_ = int.from_bytes(span[1:5], byteorder='little')
fields['count'] = value_
span = span[5:]
return SessionGetRangingCountRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if self.count > 4294967295:
print(f"Invalid value for field SessionGetRangingCountRsp::count: {self.count} > 4294967295; the value will be truncated")
self.count &= 4294967295
_span.extend(int.to_bytes((self.count << 0), length=4, byteorder='little'))
return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 5
@dataclass
class AndroidGetPowerStatsCmd(AndroidPacket):
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = AndroidOpcodeId.GET_POWER_STATS
self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != AndroidOpcodeId.GET_POWER_STATS or fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
return AndroidGetPowerStatsCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
return AndroidPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 0
@dataclass
class PowerStats(Packet):
status: Status = field(kw_only=True, default=Status.OK)
idle_time_ms: int = field(kw_only=True, default=0)
tx_time_ms: int = field(kw_only=True, default=0)
rx_time_ms: int = field(kw_only=True, default=0)
total_wake_count: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['PowerStats', bytes]:
fields = {'payload': None}
if len(span) < 17:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
value_ = int.from_bytes(span[1:5], byteorder='little')
fields['idle_time_ms'] = value_
value_ = int.from_bytes(span[5:9], byteorder='little')
fields['tx_time_ms'] = value_
value_ = int.from_bytes(span[9:13], byteorder='little')
fields['rx_time_ms'] = value_
value_ = int.from_bytes(span[13:17], byteorder='little')
fields['total_wake_count'] = value_
span = span[17:]
return PowerStats(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if self.idle_time_ms > 4294967295:
print(f"Invalid value for field PowerStats::idle_time_ms: {self.idle_time_ms} > 4294967295; the value will be truncated")
self.idle_time_ms &= 4294967295
_span.extend(int.to_bytes((self.idle_time_ms << 0), length=4, byteorder='little'))
if self.tx_time_ms > 4294967295:
print(f"Invalid value for field PowerStats::tx_time_ms: {self.tx_time_ms} > 4294967295; the value will be truncated")
self.tx_time_ms &= 4294967295
_span.extend(int.to_bytes((self.tx_time_ms << 0), length=4, byteorder='little'))
if self.rx_time_ms > 4294967295:
print(f"Invalid value for field PowerStats::rx_time_ms: {self.rx_time_ms} > 4294967295; the value will be truncated")
self.rx_time_ms &= 4294967295
_span.extend(int.to_bytes((self.rx_time_ms << 0), length=4, byteorder='little'))
if self.total_wake_count > 4294967295:
print(f"Invalid value for field PowerStats::total_wake_count: {self.total_wake_count} > 4294967295; the value will be truncated")
self.total_wake_count &= 4294967295
_span.extend(int.to_bytes((self.total_wake_count << 0), length=4, byteorder='little'))
return bytes(_span)
@property
def size(self) -> int:
return 17
@dataclass
class AndroidGetPowerStatsRsp(AndroidPacket):
stats: PowerStats = field(kw_only=True, default_factory=PowerStats)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = AndroidOpcodeId.GET_POWER_STATS
self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != AndroidOpcodeId.GET_POWER_STATS or fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
if len(span) < 17:
raise Exception('Invalid packet size')
fields['stats'] = PowerStats.parse_all(span[0:17])
span = span[17:]
return AndroidGetPowerStatsRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.extend(self.stats.serialize())
return AndroidPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 17
@dataclass
class AndroidSetCountryCodeCmd(AndroidPacket):
country_code: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.mt = MessageType.COMMAND
self.oid = AndroidOpcodeId.SET_COUNTRY_CODE
self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeCmd', bytes]:
if fields['mt'] != MessageType.COMMAND or fields['oid'] != AndroidOpcodeId.SET_COUNTRY_CODE or fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
fields['country_code'] = list(span[:2])
span = span[2:]
return AndroidSetCountryCodeCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.extend(self.country_code)
return AndroidPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 2
@dataclass
class AndroidSetCountryCodeRsp(AndroidPacket):
status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
self.mt = MessageType.RESPONSE
self.oid = AndroidOpcodeId.SET_COUNTRY_CODE
self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeRsp', bytes]:
if fields['mt'] != MessageType.RESPONSE or fields['oid'] != AndroidOpcodeId.SET_COUNTRY_CODE or fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['status'] = Status.from_int(span[0])
span = span[1:]
return AndroidSetCountryCodeRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
return AndroidPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
class FrameReportTlvType(enum.IntEnum):
RSSI = 0x0
AOA = 0x1
CIR = 0x2
@staticmethod
def from_int(v: int) -> Union[int, 'FrameReportTlvType']:
try:
return FrameReportTlvType(v)
except ValueError as exn:
raise exn
@dataclass
class FrameReportTlv(Packet):
t: FrameReportTlvType = field(kw_only=True, default=FrameReportTlvType.RSSI)
v: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['FrameReportTlv', bytes]:
fields = {'payload': None}
if len(span) < 3:
raise Exception('Invalid packet size')
fields['t'] = FrameReportTlvType.from_int(span[0])
value_ = int.from_bytes(span[1:3], byteorder='little')
v_size = value_
span = span[3:]
if len(span) < v_size:
raise Exception('Invalid packet size')
fields['v'] = list(span[:v_size])
span = span[v_size:]
return FrameReportTlv(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.t << 0))
_span.extend(int.to_bytes(((len(self.v) * 1) << 0), length=2, byteorder='little'))
_span.extend(self.v)
return bytes(_span)
@property
def size(self) -> int:
return len(self.v) * 1 + 3
@dataclass
class FrameReportTlvPacket(Packet):
t: FrameReportTlvType = field(kw_only=True, default=FrameReportTlvType.RSSI)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['FrameReportTlvPacket', bytes]:
fields = {'payload': None}
if len(span) < 3:
raise Exception('Invalid packet size')
fields['t'] = FrameReportTlvType.from_int(span[0])
value_ = int.from_bytes(span[1:3], byteorder='little')
_body__size = value_
span = span[3:]
if len(span) < _body__size:
raise Exception('Invalid packet size')
payload = span[:_body__size]
span = span[_body__size:]
fields['payload'] = payload
try:
return Rssi.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return Aoa.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return Cir.parse(fields.copy(), payload)
except Exception as exn:
pass
return FrameReportTlvPacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.t << 0))
_payload_size = len(payload or self.payload or [])
if _payload_size > 65535:
print(f"Invalid length for payload field: {_payload_size} > 65535; the packet cannot be generated")
raise Exception("Invalid payload length")
_span.extend(int.to_bytes((_payload_size << 0), length=2, byteorder='little'))
_span.extend(payload or self.payload or [])
return bytes(_span)
@property
def size(self) -> int:
return len(self.payload) + 3
@dataclass
class Rssi(FrameReportTlvPacket):
rssi: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
self.t = FrameReportTlvType.RSSI
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['Rssi', bytes]:
if fields['t'] != FrameReportTlvType.RSSI:
raise Exception("Invalid constraint field values")
fields['rssi'] = list(span)
span = bytes()
return Rssi(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.extend(self.rssi)
return FrameReportTlvPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.rssi) * 1
@dataclass
class AoaMeasurement(Packet):
tdoa: int = field(kw_only=True, default=0)
pdoa: int = field(kw_only=True, default=0)
aoa: int = field(kw_only=True, default=0)
fom: int = field(kw_only=True, default=0)
t: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['AoaMeasurement', bytes]:
fields = {'payload': None}
if len(span) < 8:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:2], byteorder='little')
fields['tdoa'] = value_
value_ = int.from_bytes(span[2:4], byteorder='little')
fields['pdoa'] = value_
value_ = int.from_bytes(span[4:6], byteorder='little')
fields['aoa'] = value_
fields['fom'] = span[6]
fields['t'] = span[7]
span = span[8:]
return AoaMeasurement(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.tdoa > 65535:
print(f"Invalid value for field AoaMeasurement::tdoa: {self.tdoa} > 65535; the value will be truncated")
self.tdoa &= 65535
_span.extend(int.to_bytes((self.tdoa << 0), length=2, byteorder='little'))
if self.pdoa > 65535:
print(f"Invalid value for field AoaMeasurement::pdoa: {self.pdoa} > 65535; the value will be truncated")
self.pdoa &= 65535
_span.extend(int.to_bytes((self.pdoa << 0), length=2, byteorder='little'))
if self.aoa > 65535:
print(f"Invalid value for field AoaMeasurement::aoa: {self.aoa} > 65535; the value will be truncated")
self.aoa &= 65535
_span.extend(int.to_bytes((self.aoa << 0), length=2, byteorder='little'))
if self.fom > 255:
print(f"Invalid value for field AoaMeasurement::fom: {self.fom} > 255; the value will be truncated")
self.fom &= 255
_span.append((self.fom << 0))
if self.t > 255:
print(f"Invalid value for field AoaMeasurement::t: {self.t} > 255; the value will be truncated")
self.t &= 255
_span.append((self.t << 0))
return bytes(_span)
@property
def size(self) -> int:
return 8
@dataclass
class Aoa(FrameReportTlvPacket):
aoa: List[AoaMeasurement] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.t = FrameReportTlvType.AOA
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['Aoa', bytes]:
if fields['t'] != FrameReportTlvType.AOA:
raise Exception("Invalid constraint field values")
if len(span) % 8 != 0:
raise Exception('Array size is not a multiple of the element size')
aoa_count = int(len(span) / 8)
aoa = []
for n in range(aoa_count):
aoa.append(AoaMeasurement.parse_all(span[n * 8:(n + 1) * 8]))
fields['aoa'] = aoa
span = bytes()
return Aoa(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
for _elt in self.aoa:
_span.extend(_elt.serialize())
return FrameReportTlvPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.aoa])
@dataclass
class CirValue(Packet):
first_path_index: int = field(kw_only=True, default=0)
first_path_snr: int = field(kw_only=True, default=0)
first_path_ns: int = field(kw_only=True, default=0)
peak_path_index: int = field(kw_only=True, default=0)
peak_path_snr: int = field(kw_only=True, default=0)
peak_path_ns: int = field(kw_only=True, default=0)
first_path_sample_offset: int = field(kw_only=True, default=0)
samples_number: int = field(kw_only=True, default=0)
sample_window: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['CirValue', bytes]:
fields = {'payload': None}
if len(span) < 16:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:2], byteorder='little')
fields['first_path_index'] = value_
value_ = int.from_bytes(span[2:4], byteorder='little')
fields['first_path_snr'] = value_
value_ = int.from_bytes(span[4:6], byteorder='little')
fields['first_path_ns'] = value_
value_ = int.from_bytes(span[6:8], byteorder='little')
fields['peak_path_index'] = value_
value_ = int.from_bytes(span[8:10], byteorder='little')
fields['peak_path_snr'] = value_
value_ = int.from_bytes(span[10:12], byteorder='little')
fields['peak_path_ns'] = value_
fields['first_path_sample_offset'] = span[12]
fields['samples_number'] = span[13]
value_ = int.from_bytes(span[14:16], byteorder='little')
sample_window_size = value_
span = span[16:]
if len(span) < sample_window_size:
raise Exception('Invalid packet size')
fields['sample_window'] = list(span[:sample_window_size])
span = span[sample_window_size:]
return CirValue(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.first_path_index > 65535:
print(f"Invalid value for field CirValue::first_path_index: {self.first_path_index} > 65535; the value will be truncated")
self.first_path_index &= 65535
_span.extend(int.to_bytes((self.first_path_index << 0), length=2, byteorder='little'))
if self.first_path_snr > 65535:
print(f"Invalid value for field CirValue::first_path_snr: {self.first_path_snr} > 65535; the value will be truncated")
self.first_path_snr &= 65535
_span.extend(int.to_bytes((self.first_path_snr << 0), length=2, byteorder='little'))
if self.first_path_ns > 65535:
print(f"Invalid value for field CirValue::first_path_ns: {self.first_path_ns} > 65535; the value will be truncated")
self.first_path_ns &= 65535
_span.extend(int.to_bytes((self.first_path_ns << 0), length=2, byteorder='little'))
if self.peak_path_index > 65535:
print(f"Invalid value for field CirValue::peak_path_index: {self.peak_path_index} > 65535; the value will be truncated")
self.peak_path_index &= 65535
_span.extend(int.to_bytes((self.peak_path_index << 0), length=2, byteorder='little'))
if self.peak_path_snr > 65535:
print(f"Invalid value for field CirValue::peak_path_snr: {self.peak_path_snr} > 65535; the value will be truncated")
self.peak_path_snr &= 65535
_span.extend(int.to_bytes((self.peak_path_snr << 0), length=2, byteorder='little'))
if self.peak_path_ns > 65535:
print(f"Invalid value for field CirValue::peak_path_ns: {self.peak_path_ns} > 65535; the value will be truncated")
self.peak_path_ns &= 65535
_span.extend(int.to_bytes((self.peak_path_ns << 0), length=2, byteorder='little'))
if self.first_path_sample_offset > 255:
print(f"Invalid value for field CirValue::first_path_sample_offset: {self.first_path_sample_offset} > 255; the value will be truncated")
self.first_path_sample_offset &= 255
_span.append((self.first_path_sample_offset << 0))
if self.samples_number > 255:
print(f"Invalid value for field CirValue::samples_number: {self.samples_number} > 255; the value will be truncated")
self.samples_number &= 255
_span.append((self.samples_number << 0))
_span.extend(int.to_bytes(((len(self.sample_window) * 1) << 0), length=2, byteorder='little'))
_span.extend(self.sample_window)
return bytes(_span)
@property
def size(self) -> int:
return len(self.sample_window) * 1 + 16
@dataclass
class Cir(FrameReportTlvPacket):
cir_value: List[CirValue] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.t = FrameReportTlvType.CIR
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['Cir', bytes]:
if fields['t'] != FrameReportTlvType.CIR:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
cir_value_count = span[0]
span = span[1:]
cir_value = []
for n in range(cir_value_count):
element, span = CirValue.parse(span)
cir_value.append(element)
fields['cir_value'] = cir_value
return Cir(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if len(self.cir_value) > 255:
print(f"Invalid length for field Cir::cir_value: {len(self.cir_value)} > 255; the array will be truncated")
del self.cir_value[255:]
_span.append((len(self.cir_value) << 0))
for _elt in self.cir_value:
_span.extend(_elt.serialize())
return FrameReportTlvPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.cir_value]) + 1
@dataclass
class FrameReport(Packet):
uwb_msg_id: int = field(kw_only=True, default=0)
action: int = field(kw_only=True, default=0)
antenna_set: int = field(kw_only=True, default=0)
frame_report_tlvs: List[FrameReportTlv] = field(kw_only=True, default_factory=list)
def __post_init__(self):
pass
@staticmethod
def parse(span: bytes) -> Tuple['FrameReport', bytes]:
fields = {'payload': None}
if len(span) < 4:
raise Exception('Invalid packet size')
fields['uwb_msg_id'] = span[0]
fields['action'] = span[1]
fields['antenna_set'] = span[2]
frame_report_tlvs_count = span[3]
span = span[4:]
frame_report_tlvs = []
for n in range(frame_report_tlvs_count):
element, span = FrameReportTlv.parse(span)
frame_report_tlvs.append(element)
fields['frame_report_tlvs'] = frame_report_tlvs
return FrameReport(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.uwb_msg_id > 255:
print(f"Invalid value for field FrameReport::uwb_msg_id: {self.uwb_msg_id} > 255; the value will be truncated")
self.uwb_msg_id &= 255
_span.append((self.uwb_msg_id << 0))
if self.action > 255:
print(f"Invalid value for field FrameReport::action: {self.action} > 255; the value will be truncated")
self.action &= 255
_span.append((self.action << 0))
if self.antenna_set > 255:
print(f"Invalid value for field FrameReport::antenna_set: {self.antenna_set} > 255; the value will be truncated")
self.antenna_set &= 255
_span.append((self.antenna_set << 0))
if len(self.frame_report_tlvs) > 255:
print(f"Invalid length for field FrameReport::frame_report_tlvs: {len(self.frame_report_tlvs)} > 255; the array will be truncated")
del self.frame_report_tlvs[255:]
_span.append((len(self.frame_report_tlvs) << 0))
for _elt in self.frame_report_tlvs:
_span.extend(_elt.serialize())
return bytes(_span)
@property
def size(self) -> int:
return sum([elt.size for elt in self.frame_report_tlvs]) + 4
@dataclass
class AndroidRangeDiagnosticsNtf(AndroidPacket):
session_token: int = field(kw_only=True, default=0)
sequence_number: int = field(kw_only=True, default=0)
frame_reports: List[FrameReport] = field(kw_only=True, default_factory=list)
def __post_init__(self):
self.mt = MessageType.NOTIFICATION
self.oid = AndroidOpcodeId.FIRA_RANGE_DIAGNOSTICS
self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidRangeDiagnosticsNtf', bytes]:
if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != AndroidOpcodeId.FIRA_RANGE_DIAGNOSTICS or fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
if len(span) < 9:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
value_ = int.from_bytes(span[4:8], byteorder='little')
fields['sequence_number'] = value_
frame_reports_count = span[8]
span = span[9:]
frame_reports = []
for n in range(frame_reports_count):
element, span = FrameReport.parse(span)
frame_reports.append(element)
fields['frame_reports'] = frame_reports
return AndroidRangeDiagnosticsNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
print(f"Invalid value for field AndroidRangeDiagnosticsNtf::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
if self.sequence_number > 4294967295:
print(f"Invalid value for field AndroidRangeDiagnosticsNtf::sequence_number: {self.sequence_number} > 4294967295; the value will be truncated")
self.sequence_number &= 4294967295
_span.extend(int.to_bytes((self.sequence_number << 0), length=4, byteorder='little'))
if len(self.frame_reports) > 255:
print(f"Invalid length for field AndroidRangeDiagnosticsNtf::frame_reports: {len(self.frame_reports)} > 255; the array will be truncated")
del self.frame_reports[255:]
_span.append((len(self.frame_reports) << 0))
for _elt in self.frame_reports:
_span.extend(_elt.serialize())
return AndroidPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.frame_reports]) + 9