Skip to content

De/serialization and Validation

The Simple Management Protocol (SMP) Message base class.

T = TypeVar('T', bound='_MessageBase') module-attribute

_counter = itertools.count() module-attribute

logger = logging.getLogger(__name__) module-attribute

_MessageBase

Bases: ABC, BaseModel

The base class for SMP messages.

Source code in smp/message.py
class _MessageBase(ABC, BaseModel):
    """The base class for SMP messages.

    A message is an `smpheader.Header` followed by a CBOR-encoded payload
    built from the model's own fields.  Instances are frozen and reject
    unknown fields (see `model_config`); `model_post_init` derives `header`,
    `sequence`, `version` and `smp_data` after validation.
    """

    model_config = ConfigDict(extra="forbid", frozen=True)

    # Class-level constants that `model_post_init` copies into a generated header.
    _OP: ClassVar[smpheader.OP]
    _FLAGS: ClassVar[smpheader.Flag] = smpheader.Flag(0)
    _GROUP_ID: ClassVar[smpheader.GroupIdField]
    _COMMAND_ID: ClassVar[
        smpheader.AnyCommandId
        | smpheader.CommandId.ImageManagement
        | smpheader.CommandId.OSManagement
        | smpheader.CommandId.ShellManagement
        | smpheader.CommandId.Intercreate
        | smpheader.CommandId.FileManagement
    ]

    # This is a dummy header that will be replaced in model_post_init
    header: smpheader.Header = None  # type: ignore
    version: smpheader.Version = smpheader.Version.V2
    # Placeholder; always overwritten with `header.sequence` in model_post_init.
    sequence: int = None  # type: ignore
    # Placeholder; set to `bytes(header) + payload` in model_post_init unless
    # the raw bytes were supplied by the caller (as `loads` does).
    smp_data: bytes = None  # type: ignore

    def __bytes__(self) -> bytes:
        """Return the serialized message stored in `smp_data`."""
        return self.smp_data

    @property
    def BYTES(self) -> bytes:
        """The serialized message; the same value as `bytes(self)`."""
        return self.smp_data

    @classmethod
    def loads(cls: Type[T], data: bytes) -> T:
        """Deserialize the SMP message.

        The first `smpheader.Header.SIZE` bytes are parsed as the header and
        the remaining bytes are decoded as CBOR; the decoded items are passed
        to the constructor as fields.  `data` itself is kept as `smp_data`.

        Raises:
            ValueError: if the constructed message has no header.
            SMPMismatchedGroupId: if the header's group ID is not
                `cls._GROUP_ID`.
        """
        header_size = smpheader.Header.SIZE
        header = smpheader.Header.loads(data[:header_size])
        payload = cast(dict, cbor2.loads(data[header_size:]))
        message = cls(header=header, **payload, smp_data=data)
        if message.header is None:  # pragma: no cover
            raise ValueError
        if message.header.group_id != cls._GROUP_ID:  # pragma: no cover
            raise SMPMismatchedGroupId(
                f"{cls.__name__} has {cls._GROUP_ID}, header has {message.header.group_id}"
            )
        return message

    @classmethod
    def load(cls: Type[T], header: smpheader.Header, data: dict) -> T:
        """Load an SMP header and CBOR dict.

        Raises:
            SMPMismatchedGroupId: if `header.group_id` is not `cls._GROUP_ID`.
        """
        if header.group_id == cls._GROUP_ID:
            return cls(header=header, **data)
        raise SMPMismatchedGroupId(  # pragma: no cover
            f"{cls.__name__} has {cls._GROUP_ID}, header has {header.group_id}"
        )

    def model_post_init(self, _: None) -> None:
        """Derive `header`, `sequence`, `version` and `smp_data`.

        When no header was provided, one is created from the class constants,
        the payload length and either the given `sequence` or the next value
        of the module-level counter.  When a header was provided, `sequence`
        and `version` are overwritten from it.

        Raises:
            SMPMalformed: a header was provided without `smp_data` and its
                `length` differs from the length of the encoded payload.
            ValueError: both a header and `sequence` were provided.
        """
        # Payload = the message's own fields only: bookkeeping attributes,
        # unset fields and None values are left out of the CBOR encoding.
        data_bytes = cbor2.dumps(
            self.model_dump(
                exclude_unset=True,
                exclude={'header', 'version', 'sequence', 'smp_data'},
                exclude_none=True,
            ),
            canonical=True,
        )
        # The model is frozen, so attributes are assigned via object.__setattr__.
        if self.header is None:  # create the header
            object.__setattr__(
                self,
                'header',
                smpheader.Header(
                    op=self._OP,
                    version=self.version,
                    flags=smpheader.Flag(self._FLAGS),
                    length=len(data_bytes),
                    group_id=self._GROUP_ID,
                    # The SMP sequence number is 8 bits wide, so wrap over the
                    # full 0..255 range; `% 0xFF` could never produce 255.
                    sequence=next(_counter) % 0x100 if self.sequence is None else self.sequence,
                    command_id=self._COMMAND_ID,
                ),
            )
            object.__setattr__(self, 'sequence', self.header.sequence)
        else:  # validate the header and update version & sequence
            # The length check is skipped when raw bytes were supplied.
            # NOTE(review): presumably because re-encoding is not guaranteed
            # to reproduce the received bytes exactly — confirm.
            if self.smp_data is None and self.header.length != len(data_bytes):
                raise SMPMalformed(
                    f"header.length {self.header.length} != len(data_bytes) {len(data_bytes)}"
                )
            if self.sequence is not None:  # pragma: no cover
                raise ValueError(
                    f"{self.sequence=} {self.header.sequence=} "
                    "Do not use the sequence attribute when the header is provided."
                )
            object.__setattr__(self, 'sequence', self.header.sequence)
            if self.version != self.header.version:
                logger.warning(
                    f"Overriding {self.version=} with {self.header.version=} "
                    "from the provided header."
                )
            object.__setattr__(self, 'version', self.header.version)
        if self.smp_data is None:
            object.__setattr__(self, 'smp_data', bytes(self.header) + data_bytes)

model_config = ConfigDict(extra='forbid', frozen=True) class-attribute instance-attribute

_OP: smpheader.OP class-attribute

_FLAGS: smpheader.Flag = smpheader.Flag(0) class-attribute

_GROUP_ID: smpheader.GroupIdField class-attribute

_COMMAND_ID: smpheader.AnyCommandId | smpheader.CommandId.ImageManagement | smpheader.CommandId.OSManagement | smpheader.CommandId.ShellManagement | smpheader.CommandId.Intercreate | smpheader.CommandId.FileManagement class-attribute

header: smpheader.Header = None class-attribute instance-attribute

version: smpheader.Version = smpheader.Version.V2 class-attribute instance-attribute

sequence: int = None class-attribute instance-attribute

smp_data: bytes = None class-attribute instance-attribute

BYTES: bytes property

__bytes__() -> bytes

Source code in smp/message.py
def __bytes__(self) -> bytes:
    """Return the serialized message stored in `smp_data`."""
    return self.smp_data

loads(data: bytes) -> T classmethod

Deserialize the SMP message.

Source code in smp/message.py
@classmethod
def loads(cls: Type[T], data: bytes) -> T:
    """Deserialize the SMP message.

    The first `smpheader.Header.SIZE` bytes are parsed as the header and the
    remaining bytes are decoded as CBOR; the decoded items are passed to the
    constructor as fields.  `data` itself is kept as `smp_data`.

    Raises:
        ValueError: if the constructed message has no header.
        SMPMismatchedGroupId: if the header's group ID is not `cls._GROUP_ID`.
    """
    header_size = smpheader.Header.SIZE
    header = smpheader.Header.loads(data[:header_size])
    payload = cast(dict, cbor2.loads(data[header_size:]))
    message = cls(header=header, **payload, smp_data=data)
    if message.header is None:  # pragma: no cover
        raise ValueError
    if message.header.group_id != cls._GROUP_ID:  # pragma: no cover
        raise SMPMismatchedGroupId(
            f"{cls.__name__} has {cls._GROUP_ID}, header has {message.header.group_id}"
        )
    return message

load(header: smpheader.Header, data: dict) -> T classmethod

Load an SMP header and CBOR dict.

Source code in smp/message.py
@classmethod
def load(cls: Type[T], header: smpheader.Header, data: dict) -> T:
    """Load an SMP header and CBOR dict.

    Raises:
        SMPMismatchedGroupId: if `header.group_id` is not `cls._GROUP_ID`.
    """
    if header.group_id == cls._GROUP_ID:
        return cls(header=header, **data)
    raise SMPMismatchedGroupId(  # pragma: no cover
        f"{cls.__name__} has {cls._GROUP_ID}, header has {header.group_id}"
    )

model_post_init(_: None) -> None

Source code in smp/message.py
def model_post_init(self, _: None) -> None:
    """Derive `header`, `sequence`, `version` and `smp_data`.

    When no header was provided, one is created from the class constants, the
    payload length and either the given `sequence` or the next value of the
    module-level counter.  When a header was provided, `sequence` and
    `version` are overwritten from it.

    Raises:
        SMPMalformed: a header was provided without `smp_data` and its
            `length` differs from the length of the encoded payload.
        ValueError: both a header and `sequence` were provided.
    """
    # Payload = the message's own fields only: bookkeeping attributes, unset
    # fields and None values are left out of the CBOR encoding.
    data_bytes = cbor2.dumps(
        self.model_dump(
            exclude_unset=True,
            exclude={'header', 'version', 'sequence', 'smp_data'},
            exclude_none=True,
        ),
        canonical=True,
    )
    # Attributes are assigned via object.__setattr__ rather than normal
    # attribute assignment.
    if self.header is None:  # create the header
        object.__setattr__(
            self,
            'header',
            smpheader.Header(
                op=self._OP,
                version=self.version,
                flags=smpheader.Flag(self._FLAGS),
                length=len(data_bytes),
                group_id=self._GROUP_ID,
                # The SMP sequence number is 8 bits wide, so wrap over the
                # full 0..255 range; `% 0xFF` could never produce 255.
                sequence=next(_counter) % 0x100 if self.sequence is None else self.sequence,
                command_id=self._COMMAND_ID,
            ),
        )
        object.__setattr__(self, 'sequence', self.header.sequence)
    else:  # validate the header and update version & sequence
        # The length check is skipped when raw bytes were supplied.
        # NOTE(review): presumably because re-encoding is not guaranteed to
        # reproduce the received bytes exactly — confirm.
        if self.smp_data is None and self.header.length != len(data_bytes):
            raise SMPMalformed(
                f"header.length {self.header.length} != len(data_bytes) {len(data_bytes)}"
            )
        if self.sequence is not None:  # pragma: no cover
            raise ValueError(
                f"{self.sequence=} {self.header.sequence=} "
                "Do not use the sequence attribute when the header is provided."
            )
        object.__setattr__(self, 'sequence', self.header.sequence)
        if self.version != self.header.version:
            logger.warning(
                f"Overriding {self.version=} with {self.header.version=} "
                "from the provided header."
            )
        object.__setattr__(self, 'version', self.header.version)
    if self.smp_data is None:
        object.__setattr__(self, 'smp_data', bytes(self.header) + data_bytes)

Request

Bases: _MessageBase, ABC

Base class for SMP Requests.

Source code in smp/message.py
class Request(_MessageBase, ABC):
    """Base class for SMP Requests.

    Adds no fields or methods of its own; it only marks a `_MessageBase`
    subclass as a request.
    """

ResponseType

Bases: IntEnum

An SMP Response to an SMP Request must be SUCCESS, ERROR_V1, or ERROR_V2.

Source code in smp/message.py
@unique
class ResponseType(IntEnum):
    """An SMP `Response` to an SMP `Request` must be `SUCCESS`, `ERROR_V1`, or `ERROR_V2`.

    `@unique` guarantees that no two members share a value.
    """

    SUCCESS = 0
    # NOTE(review): presumably the two error formats correspond to the SMP
    # protocol versions — confirm against the protocol specification.
    ERROR_V1 = 1
    ERROR_V2 = 2

SUCCESS = 0 class-attribute instance-attribute

ERROR_V1 = 1 class-attribute instance-attribute

ERROR_V2 = 2 class-attribute instance-attribute

Response

Bases: _MessageBase, ABC

Base class for SMP Responses.

Source code in smp/message.py
class Response(_MessageBase, ABC):
    """Base class for SMP Responses.

    Adds only the `RESPONSE_TYPE` class-level declaration on top of
    `_MessageBase`.
    """

    # Declared without a value here; concrete subclasses assign a `ResponseType`.
    RESPONSE_TYPE: ClassVar[ResponseType]

RESPONSE_TYPE: ResponseType class-attribute

ReadRequest

Bases: Request, ABC

A read request from an SMP client to an SMP server.

Source code in smp/message.py
class ReadRequest(Request, ABC):
    """A read request from an SMP client to an SMP server.

    Fixes the header operation to `smpheader.OP.READ`.
    """

    # Operation written into the header that `model_post_init` generates.
    _OP = smpheader.OP.READ

_OP = smpheader.OP.READ class-attribute instance-attribute

ReadResponse

Bases: Response, ABC

A response from an SMP server to an SMP client read request.

Source code in smp/message.py
class ReadResponse(Response, ABC):
    """A response from an SMP server to an SMP client read request.

    Fixes the response type to `ResponseType.SUCCESS` and the header operation
    to `smpheader.OP.READ_RSP`.
    """

    RESPONSE_TYPE = ResponseType.SUCCESS
    # Operation written into the header that `model_post_init` generates.
    _OP = smpheader.OP.READ_RSP

RESPONSE_TYPE = ResponseType.SUCCESS class-attribute instance-attribute

_OP = smpheader.OP.READ_RSP class-attribute instance-attribute

WriteRequest

Bases: Request, ABC

A write request from an SMP client to an SMP server.

Source code in smp/message.py
class WriteRequest(Request, ABC):
    """A write request from an SMP client to an SMP server.

    Fixes the header operation to `smpheader.OP.WRITE`.
    """

    # Operation written into the header that `model_post_init` generates.
    _OP = smpheader.OP.WRITE

_OP = smpheader.OP.WRITE class-attribute instance-attribute

WriteResponse

Bases: Response, ABC

A response from an SMP server to an SMP client write request.

Source code in smp/message.py
class WriteResponse(Response, ABC):
    """A response from an SMP server to an SMP client write request.

    Fixes the response type to `ResponseType.SUCCESS` and the header operation
    to `smpheader.OP.WRITE_RSP`.
    """

    RESPONSE_TYPE = ResponseType.SUCCESS
    # Operation written into the header that `model_post_init` generates.
    _OP = smpheader.OP.WRITE_RSP

RESPONSE_TYPE = ResponseType.SUCCESS class-attribute instance-attribute

_OP = smpheader.OP.WRITE_RSP class-attribute instance-attribute