Skip to content

Reference

SPA Distributed Application Tools package.

application

application

AbstractApplication

Bases: ApplicationLifeCycle

This provides a simple class for implementing a distributed application. It connects to a given message bus and provides a services for handling messages.

It provides a simple way to initialize ressources and handle sockets. It does not provide a concret implementation for how the service behaves. This is left up to subclasses (see run_async method).

Attributes:

Name Type Description
async_callback

A callback which is called upon receiving a message.

config

The configuration for the message bus. Can be any of the supported config types.

ressources dict[str, SupportedContextManagers]

A list of context managers which are entered and exited during the livecycle

_queue_in Queue

A queue for receiving messages from the message bus.

Source code in spa_dat/application/application.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class AbstractApplication(ApplicationLifeCycle):
    """
    This provides a simple class for implementing a distributed application.
    It connects to a given message bus and provides a services for handling messages.

    It provides a simple way to initialize ressources and handle sockets. It does not provide a concret implementation
    for how the service behaves. This is left up to subclasses (see `run_async` method).

    Attributes:
        async_callback: A callback which is called upon receiving a message.
        config: The configuration for the message bus. Can be any of the supported config types.
        ressources: A list of context managers which are entered and exited during the livecycle
        _queue_in (asyncio.Queue): A queue for receiving messages from the message bus.
    """

    def __init__(
        self,
        async_callback: Union[ProducerCallback, ConsumerCallback],
        socket_provider: SocketProvider,
        state: Any = None,
        ressources: dict[str, SupportedContextManagers] = {},
    ) -> None:
        self.exit_stack = AsyncExitStack()
        self.callback = async_callback
        self.socket_provider = socket_provider

        self.state = state
        self.ressources: dict[str, SupportedContextManagers] = ressources

    def setup(self):
        """
        Method for initializing non async components/ressources of the application.
        """
        logger.info(f"Starting '{self.callback.__name__}'")
        self.socket = self.socket_provider.create_socket(None)

    def teardown(self):
        """
        Method for cleanup non async components/ressources of the application.
        """
        logger.info(f"Halting '{self.callback.__name__}'")

    def start(self):
        """
        Start the asyncronous application. This method blocks until the application is stopped.
        """
        asyncio.run(self.start_async(), debug=True)

    async def start_async(self):
        """
        Initialize the application and its ressources.
        """
        self.setup()
        async with self.exit_stack:
            # enter fixed context
            await self.exit_stack.enter_async_context(self.socket)

            # enter dynamic ressource context
            for ressource in self.ressources:
                if isinstance(ressource, AbstractAsyncContextManager):
                    await self.exit_stack.enter_async_context(ressource)
                else:
                    self.exit_stack.enter_context(ressource)

            # shut down after leaving context
            self.exit_stack.callback(self.teardown)

            # run the logic for this application
            await self.run_async()

    async def run_async(self):
        """
        Contains the logic how the applicaiton behaves (e.g. endless loop, etc.)
        """
        raise NotImplementedError()
run_async async
run_async()

Contains the logic how the applicaiton behaves (e.g. endless loop, etc.)

Source code in spa_dat/application/application.py
87
88
89
90
91
async def run_async(self):
    """
    Contains the logic how the applicaiton behaves (e.g. endless loop, etc.)
    """
    raise NotImplementedError()
setup
setup()

Method for initializing non async components/ressources of the application.

Source code in spa_dat/application/application.py
46
47
48
49
50
51
def setup(self):
    """
    Method for initializing non async components/ressources of the application.
    """
    logger.info(f"Starting '{self.callback.__name__}'")
    self.socket = self.socket_provider.create_socket(None)
start
start()

Start the asyncronous application. This method blocks until the application is stopped.

Source code in spa_dat/application/application.py
59
60
61
62
63
def start(self):
    """
    Start the asyncronous application. This method blocks until the application is stopped.
    """
    asyncio.run(self.start_async(), debug=True)
start_async async
start_async()

Initialize the application and its ressources.

Source code in spa_dat/application/application.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
async def start_async(self):
    """
    Initialize the application and its ressources.
    """
    self.setup()
    async with self.exit_stack:
        # enter fixed context
        await self.exit_stack.enter_async_context(self.socket)

        # enter dynamic ressource context
        for ressource in self.ressources:
            if isinstance(ressource, AbstractAsyncContextManager):
                await self.exit_stack.enter_async_context(ressource)
            else:
                self.exit_stack.enter_context(ressource)

        # shut down after leaving context
        self.exit_stack.callback(self.teardown)

        # run the logic for this application
        await self.run_async()
teardown
teardown()

Method for cleanup non async components/ressources of the application.

Source code in spa_dat/application/application.py
53
54
55
56
57
def teardown(self):
    """
    Method for cleanup non async components/ressources of the application.
    """
    logger.info(f"Halting '{self.callback.__name__}'")

ConsumerApplication

Bases: AbstractApplication

Source code in spa_dat/application/application.py
102
103
104
105
106
107
108
109
110
111
112
113
114
class ConsumerApplication(AbstractApplication):
    def setup(self):
        super().setup()
        self._queue_in = asyncio.Queue()
        self.socket = self.socket_provider.create_socket(self._queue_in)

    async def run_async(self):
        """
        Repeats the callback on each received message
        """
        while True:
            message = await self._queue_in.get()
            await self.callback(message=message, socket=self.socket, state=self.state, **self.ressources)
run_async async
run_async()

Repeats the callback on each received message

Source code in spa_dat/application/application.py
108
109
110
111
112
113
114
async def run_async(self):
    """
    Repeats the callback on each received message
    """
    while True:
        message = await self._queue_in.get()
        await self.callback(message=message, socket=self.socket, state=self.state, **self.ressources)

DistributedApplication

This class provides a simple interface for creating distributed applications. It allows to create multiple applications which are connected to the same message bus. It does this by providing decorators for creating applications and producers.

Source code in spa_dat/application/application.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class DistributedApplication:
    """
    This class provides a simple interface for creating distributed applications. It allows to create multiple
    applications which are connected to the same message bus. It does this by providing decorators for creating
    applications and producers.
    """

    def __init__(self, default_socket_provider: SocketProvider) -> None:
        self.applications = []
        self.default_socket_provider = default_socket_provider

    def add_application(
        self,
        async_consumer_callback: ConsumerCallback,
        socket_provider: SocketProvider,
        state: Any = None,
        ressources: dict[str, SupportedContextManagers] = {},
    ):
        self.applications.append(
            ConsumerApplication(
                async_callback=async_consumer_callback,
                socket_provider=socket_provider,
                state=state,
                ressources=ressources,
            )
        )

    def add_producer_application(
        self,
        async_producer_callback: ProducerCallback,
        socket_provider: SocketProvider,
        state: Any = None,
        ressources: dict[str, SupportedContextManagers] = {},
    ):
        self.applications.append(
            ProducerApplication(
                async_callback=async_producer_callback,
                socket_provider=socket_provider,
                state=state,
                ressources=ressources,
            )
        )

    def application(
        self,
        topics: list[str] | str,
        *,
        socket_provider: SocketProvider | None = None,
        state: Any = None,
        ressources: dict[str, SupportedContextManagers] = {},
    ):
        socket_provider = socket_provider or self.default_socket_provider
        if socket_provider is None:
            raise ValueError("No socket provider found. Either set the default in the constructor or here!")

        socket_provider.overwrite_config(topics=topics)

        def inner(callback: ConsumerCallback):
            self.add_application(callback, socket_provider, state, ressources)

        return inner

    def producer(
        self,
        *,
        socket_provider: SocketProvider | None = None,
        state: Any = None,
        ressources: dict[str, SupportedContextManagers] = {},
    ):
        socket_provider = socket_provider or self.default_socket_provider
        if socket_provider is None:
            raise ValueError("No socket provider found. Either set the default in the constructor or here!")

        def inner(callback: ProducerCallback):
            self.add_producer_application(callback, socket_provider, state, ressources)

        return inner

    def start(self):
        asyncio.run(self.start_async())

    async def start_async(self):
        await asyncio.gather(*[app.start_async() for app in self.applications])

ProducerApplication

Bases: AbstractApplication

Source code in spa_dat/application/application.py
94
95
96
97
98
99
class ProducerApplication(AbstractApplication):
    async def run_async(self):
        """
        Starts the callback once and after it ends the producer is done.
        """
        await self.callback(socket=self.socket, state=self.state, **self.ressources)
run_async async
run_async()

Starts the callback once and after it ends the producer is done.

Source code in spa_dat/application/application.py
95
96
97
98
99
async def run_async(self):
    """
    Starts the callback once and after it ends the producer is done.
    """
    await self.callback(socket=self.socket, state=self.state, **self.ressources)

typedef

ApplicationLifeCycle

Bases: Protocol

Source code in spa_dat/application/typedef.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class ApplicationLifeCycle(Protocol):
    def setup(self):
        """
        Perform initialization tasks, such as setting up resources and dependencies
        """
        raise NotImplementedError()

    def start(self):
        """
        Execute your application - should block at this point
        """
        raise NotImplementedError()

    async def start_async(self):
        """
        Asyncronous method which starts the application itself, not blocking
        """
        raise NotImplementedError()

    async def run_async(self):
        """
        Asyncronous method which contains the logic of the application. (e.g. endless loop, etc.)
        """
        raise NotImplementedError()

    def teardown(self):
        """
        Handle cleanup
        """
        raise NotImplementedError()
run_async async
run_async()

Asyncronous method which contains the logic of the application. (e.g. endless loop, etc.)

Source code in spa_dat/application/typedef.py
36
37
38
39
40
async def run_async(self):
    """
    Asyncronous method which contains the logic of the application. (e.g. endless loop, etc.)
    """
    raise NotImplementedError()
setup
setup()

Perform initialization tasks, such as setting up resources and dependencies

Source code in spa_dat/application/typedef.py
18
19
20
21
22
def setup(self):
    """
    Perform initialization tasks, such as setting up resources and dependencies
    """
    raise NotImplementedError()
start
start()

Execute your application - should block at this point

Source code in spa_dat/application/typedef.py
24
25
26
27
28
def start(self):
    """
    Execute your application - should block at this point
    """
    raise NotImplementedError()
start_async async
start_async()

Asyncronous method which starts the application itself, not blocking

Source code in spa_dat/application/typedef.py
30
31
32
33
34
async def start_async(self):
    """
    Asyncronous method which starts the application itself, not blocking
    """
    raise NotImplementedError()
teardown
teardown()

Handle cleanup

Source code in spa_dat/application/typedef.py
42
43
44
45
46
def teardown(self):
    """
    Handle cleanup
    """
    raise NotImplementedError()

protocol

kafka

KafkaSocket

Bases: SpaSocket, AbstractAsyncContextManager

Source code in spa_dat/protocol/kafka.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class KafkaSocket(SpaSocket, AbstractAsyncContextManager):
    def __init__(
        self,
        config: KafkaConfig,
        message_queue: asyncio.Queue = asyncio.Queue(),
        message_decoder: MessageDecoder = _kafka_message_decoder,
        message_encoder: MessageEncoder = _kafka_message_encoder,
    ) -> None:
        self.config = config
        self.message_queue = message_queue
        self.message_decoder = message_decoder
        self.message_encoder = message_encoder
        self.client_id = config.client_id or str(uuid.uuid4())

        self.consumer = AIOKafkaConsumer(**KafkaSocket._build_consumer_config(config))
        self.admin_client = AIOKafkaAdminClient(**KafkaSocket._build_producer_config(config))
        self.producer = AIOKafkaProducer(**KafkaSocket._build_producer_config(config))

        self.reader_task = None

        # these are created by the client and deleted again
        self.managed_topics = set()
        if config.default_subscription_topics is not None:
            if isinstance(config.default_subscription_topics, str):
                self.managed_topics.update([config.default_subscription_topics])
            else:
                # we expect it to be a list or list like structure
                self.managed_topics.update(config.default_subscription_topics)

        # add topic for response messages
        self.managed_topics.add(self._get_ephemeral_response_topic())

    @staticmethod
    def _build_consumer_config(config: KafkaConfig) -> dict:
        return {
            "bootstrap_servers": config.bootstrap_servers,
            "group_id": config.group_id,
            "client_id": config.client_id,
        }

    @staticmethod
    def _build_producer_config(config) -> dict:
        return {
            "bootstrap_servers": config.bootstrap_servers,
            "client_id": config.client_id,
        }

    async def __aenter__(self):
        """Return `self` upon entering the runtime context."""
        await self.admin_client.start()
        await self.consumer.start()
        await self.producer.start()

        # spawn tasks which reads messages
        if self.reader_task is None:
            self.reader_task = asyncio.create_task(
                _read_messages(self.consumer, self.message_queue, self.message_decoder),
                name="task-kafka-reader",
            )

        # subscribe to default topic
        if self.managed_topics is not None:
            await self.admin_client.create_topics(
                [NewTopic(name=name, num_partitions=1, replication_factor=1) for name in self.managed_topics],
            )
        if self.config.default_subscription_topics is not None:
            await self.subscribe(self.config.default_subscription_topics)

        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Raise any exception triggered within the runtime context."""
        if self.reader_task is not None:
            self.reader_task.cancel()
            self.reader_task = None

        if self.managed_topics is not None:
            await self.admin_client.delete_topics(list(self.managed_topics))

        await self.consumer.stop()
        await self.producer.stop()
        await self.admin_client.close()
        return None

    async def publish(self, message: SpaMessage) -> None:
        await self.producer.send_and_wait(
            topic=message.topic,
            value=self.message_encoder(message),
        )

    async def subscribe(self, topic: list[str]) -> None:
        self.consumer.subscribe(topic)
        logger.info(f"Subscribed to topic: {topic}")

    async def unsubscribe(self, topic: list[str]) -> None:
        self.consumer.unsubscribe(topic)
        logger.info(f"Unsubscribed from topic: {topic}")

    def _get_ephemeral_response_topic(self) -> str:
        return f"{self.client_id}_request_response"

    async def request(self, message: SpaMessage) -> SpaMessage | None:
        """
        Publish a message and wait for a response. Returns none if no response was received / could not be parsed.
        """
        ephemeral_response_topic = self._get_ephemeral_response_topic()

        async with AIOKafkaConsumer(
            ephemeral_response_topic, **KafkaSocket._build_consumer_config(self.config)
        ) as consumer:
            # start listener for response
            listener = _read_response_message(consumer, self.message_decoder)

            # publish message and wait for response, set response topic
            message.response_topic = ephemeral_response_topic
            await self.publish(message)

            # wait for response
            response = await listener
        return response
__aenter__ async
__aenter__()

Return self upon entering the runtime context.

Source code in spa_dat/protocol/kafka.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
async def __aenter__(self):
    """Return `self` upon entering the runtime context."""
    await self.admin_client.start()
    await self.consumer.start()
    await self.producer.start()

    # spawn tasks which reads messages
    if self.reader_task is None:
        self.reader_task = asyncio.create_task(
            _read_messages(self.consumer, self.message_queue, self.message_decoder),
            name="task-kafka-reader",
        )

    # subscribe to default topic
    if self.managed_topics is not None:
        await self.admin_client.create_topics(
            [NewTopic(name=name, num_partitions=1, replication_factor=1) for name in self.managed_topics],
        )
    if self.config.default_subscription_topics is not None:
        await self.subscribe(self.config.default_subscription_topics)

    return self
__aexit__ async
__aexit__(exc_type, exc_value, traceback)

Raise any exception triggered within the runtime context.

Source code in spa_dat/protocol/kafka.py
132
133
134
135
136
137
138
139
140
141
142
143
144
async def __aexit__(self, exc_type, exc_value, traceback):
    """Raise any exception triggered within the runtime context."""
    if self.reader_task is not None:
        self.reader_task.cancel()
        self.reader_task = None

    if self.managed_topics is not None:
        await self.admin_client.delete_topics(list(self.managed_topics))

    await self.consumer.stop()
    await self.producer.stop()
    await self.admin_client.close()
    return None
request async
request(message: SpaMessage) -> SpaMessage | None

Publish a message and wait for a response. Returns none if no response was received / could not be parsed.

Source code in spa_dat/protocol/kafka.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
async def request(self, message: SpaMessage) -> SpaMessage | None:
    """
    Publish a message and wait for a response. Returns none if no response was received / could not be parsed.
    """
    ephemeral_response_topic = self._get_ephemeral_response_topic()

    async with AIOKafkaConsumer(
        ephemeral_response_topic, **KafkaSocket._build_consumer_config(self.config)
    ) as consumer:
        # start listener for response
        listener = _read_response_message(consumer, self.message_decoder)

        # publish message and wait for response, set response topic
        message.response_topic = ephemeral_response_topic
        await self.publish(message)

        # wait for response
        response = await listener
    return response

KafkaSocketProvider

Bases: SocketProvider

Source code in spa_dat/protocol/kafka.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class KafkaSocketProvider(SocketProvider):
    def __init__(
        self,
        config: KafkaConfig,
        message_decoder: MessageDecoder = _kafka_message_decoder,
        message_encoder: MessageEncoder = _kafka_message_encoder,
    ) -> None:
        self.config = config
        self.message_decoder = message_decoder
        self.message_encoder = message_encoder

    def overwrite_config(self, topics: str | list[str] | None = None, *kwargs) -> None:
        """
        Overwrites the given config from any defaults which were provided earlier. This is useful if you want to
        construct or change the default config
        """
        # normalize and set topics in config
        topics = topics or self.config.default_subscription_topics
        if topics is not None and isinstance(topics, str):
            topics = [topics]
        self.config.default_subscription_topics = topics

    def create_socket(self, queue: asyncio.Queue | None) -> KafkaSocket:
        return KafkaSocket(self.config, queue, self.message_decoder, self.message_encoder)
overwrite_config
overwrite_config(
    topics: str | list[str] | None = None,
    *kwargs: str | list[str] | None
) -> None

Overwrites the given config from any defaults which were provided earlier. This is useful if you want to construct or change the default config

Source code in spa_dat/protocol/kafka.py
195
196
197
198
199
200
201
202
203
204
def overwrite_config(self, topics: str | list[str] | None = None, *kwargs) -> None:
    """
    Overwrites the given config from any defaults which were provided earlier. This is useful if you want to
    construct or change the default config
    """
    # normalize and set topics in config
    topics = topics or self.config.default_subscription_topics
    if topics is not None and isinstance(topics, str):
        topics = [topics]
    self.config.default_subscription_topics = topics

mqtt

MqttSocket

Bases: SpaSocket, AbstractAsyncContextManager

Defines an interface for an mqtt broker. It implements all necessary methods from the SpaProtocol.

Source code in spa_dat/protocol/mqtt.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
class MqttSocket(SpaSocket, AbstractAsyncContextManager):
    """
    Defines an interface for an mqtt broker. It implements all necessary methods from the SpaProtocol.
    """

    def __init__(
        self,
        config: MqttConfig,
        message_queue: asyncio.Queue | None,
        message_decoder: MessageDecoder = _mqtt_message_decoder,
        message_encoder: MessageEncoder = _mqtt_message_encoder,
    ) -> None:
        self.config = config
        # if no message queue is given create an internal one (for later access,
        # it contains all messages which are received from the broker)
        self.message_queue = message_queue if message_queue is not None else asyncio.Queue()
        self.message_decoder = message_decoder
        self.message_encoder = message_encoder
        self._client_config = self.build_client_config()
        self.client = aiomqtt.Client(**self._client_config)
        self.reader_task = None

    def build_client_config(self, client_id: str | None = None) -> dict:
        """
        Build a client config for a new client. The client_id is overwritten from the default if specified.
        """
        return dict(
            hostname=self.config.host,
            port=self.config.port,
            keepalive=self.config.keepalive,
            client_id=self.config.client_id if client_id is None else client_id,
            username=self.config.username,
            password=self.config.password,
        )

    async def __aenter__(self):
        """Return `self` upon entering the runtime context."""
        await self.client.connect()

        # spawn tasks which reads messages
        if self.reader_task is None:
            self.reader_task = asyncio.create_task(
                _read_messages(self.client, self.message_queue, self.message_decoder),
                name="task-mqtt-reader",
            )

        # subscribe to default topic(s)
        if self.config.default_subscription_topics is not None:
            if isinstance(self.config.default_subscription_topics, str):
                await self.subscribe(self.config.default_subscription_topics)
            if isinstance(self.config.default_subscription_topics, list):
                for topic in self.config.default_subscription_topics:
                    await self.subscribe(topic)

        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Raise any exception triggered within the runtime context."""
        if self.reader_task is not None:
            self.reader_task.cancel()
            self.reader_task = None

        await self.client.disconnect()
        return None

    async def publish(self, message: SpaMessage) -> None:
        await self.client.publish(message.topic, payload=self.message_encoder(message), qos=self.config.qos)

    async def subscribe(self, topic: str) -> None:
        await self.client.subscribe(topic, self.config.qos)
        logger.info(f"Subscribed to topic: {topic}")

    async def unsubscribe(self, topic: str) -> None:
        await self.client.unsubscribe(topic)
        logger.info(f"Unsubscribed from topic: {topic}")

    def _get_ephemeral_response_topic(self, topic: str) -> str:
        return f"{topic}/request/{uuid.uuid4()}"

    async def request(self, message: SpaMessage) -> SpaMessage | None:
        """
        Publish a message and wait for a response. Returns none if no response was received / could not be parsed.
        """
        ephemeral_response_topic = self._get_ephemeral_response_topic(message.topic)

        # we must build a new client .. otherwise the background listener will receive the response
        config = self.build_client_config(client_id=f"{self.config.client_id}-response-{uuid.uuid4()}")
        async with aiomqtt.Client(**config) as client:
            await client.subscribe(ephemeral_response_topic)

            # start listener for response
            listener = _read_response_message(client, self.message_decoder)

            # publish message and wait for response, set response topic
            message.response_topic = ephemeral_response_topic

            await self.publish(message)

            # wait for response
            response = await listener
            await client.unsubscribe(ephemeral_response_topic)
        return response
__aenter__ async
__aenter__()

Return self upon entering the runtime context.

Source code in spa_dat/protocol/mqtt.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
async def __aenter__(self):
    """Return `self` upon entering the runtime context."""
    await self.client.connect()

    # spawn tasks which reads messages
    if self.reader_task is None:
        self.reader_task = asyncio.create_task(
            _read_messages(self.client, self.message_queue, self.message_decoder),
            name="task-mqtt-reader",
        )

    # subscribe to default topic(s)
    if self.config.default_subscription_topics is not None:
        if isinstance(self.config.default_subscription_topics, str):
            await self.subscribe(self.config.default_subscription_topics)
        if isinstance(self.config.default_subscription_topics, list):
            for topic in self.config.default_subscription_topics:
                await self.subscribe(topic)

    return self
__aexit__ async
__aexit__(exc_type, exc_value, traceback)

Raise any exception triggered within the runtime context.

Source code in spa_dat/protocol/mqtt.py
130
131
132
133
134
135
136
137
async def __aexit__(self, exc_type, exc_value, traceback):
    """Raise any exception triggered within the runtime context."""
    if self.reader_task is not None:
        self.reader_task.cancel()
        self.reader_task = None

    await self.client.disconnect()
    return None
build_client_config
build_client_config(client_id: str | None = None) -> dict

Build a client config for a new client. The client_id is overwritten from the default if specified.

Source code in spa_dat/protocol/mqtt.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def build_client_config(self, client_id: str | None = None) -> dict:
    """
    Build a client config for a new client. The client_id is overwritten from the default if specified.
    """
    return dict(
        hostname=self.config.host,
        port=self.config.port,
        keepalive=self.config.keepalive,
        client_id=self.config.client_id if client_id is None else client_id,
        username=self.config.username,
        password=self.config.password,
    )
request async
request(message: SpaMessage) -> SpaMessage | None

Publish a message and wait for a response. Returns none if no response was received / could not be parsed.

Source code in spa_dat/protocol/mqtt.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
async def request(self, message: SpaMessage) -> SpaMessage | None:
    """
    Publish a message and wait for a response. Returns none if no response was received / could not be parsed.
    """
    ephemeral_response_topic = self._get_ephemeral_response_topic(message.topic)

    # we must build a new client .. otherwise the background listener will receive the response
    config = self.build_client_config(client_id=f"{self.config.client_id}-response-{uuid.uuid4()}")
    async with aiomqtt.Client(**config) as client:
        await client.subscribe(ephemeral_response_topic)

        # start listener for response
        listener = _read_response_message(client, self.message_decoder)

        # publish message and wait for response, set response topic
        message.response_topic = ephemeral_response_topic

        await self.publish(message)

        # wait for response
        response = await listener
        await client.unsubscribe(ephemeral_response_topic)
    return response

MqttSocketProvider

Bases: SocketProvider

Defines an interface for a socket provider. A socket provider is a function which creates a socket and returns it.

Source code in spa_dat/protocol/mqtt.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class MqttSocketProvider(SocketProvider):
    """
    Defines an interface for a socket provider. A socket provider is a function which creates a socket and returns it.
    """

    def __init__(
        self,
        config: MqttConfig,
        message_decoder: MessageDecoder = _mqtt_message_decoder,
        message_encoder: MessageEncoder = _mqtt_message_encoder,
    ) -> None:
        self.config = config
        self.message_decoder = message_decoder
        self.message_encoder = message_encoder

    def overwrite_config(self, topics: str | list[str] | None = None, *kwargs) -> None:
        """
        Overwrites the given config from any defaults which were provided earlier. This is useful if you want to
        construct or change the default config
        """
        # normalize and set topics in config
        topics = topics or self.config.default_subscription_topics
        if topics is not None and isinstance(topics, str):
            topics = [topics]
        self.config.default_subscription_topics = topics

    def create_socket(
        self,
        queue: asyncio.Queue | None,
    ) -> None:
        return MqttSocket(self.config, queue, self.message_decoder, self.message_encoder)
overwrite_config
overwrite_config(
    topics: str | list[str] | None = None,
    *kwargs: str | list[str] | None
) -> None

Overwrites the given config from any defaults which were provided earlier. This is useful if you want to construct or change the default config

Source code in spa_dat/protocol/mqtt.py
193
194
195
196
197
198
199
200
201
202
def overwrite_config(self, topics: str | list[str] | None = None, *kwargs) -> None:
    """
    Overwrites the given config from any defaults which were provided earlier. This is useful if you want to
    construct or change the default config
    """
    # normalize and set topics in config
    topics = topics or self.config.default_subscription_topics
    if topics is not None and isinstance(topics, str):
        topics = [topics]
    self.config.default_subscription_topics = topics

typedef

SocketProvider

Bases: Protocol

A service provider is a class which creates a socket from a given configuration and returns it. It also allows to add a queue for communication

Source code in spa_dat/protocol/typedef.py
39
40
41
42
43
44
45
46
47
48
49
class SocketProvider(Protocol):
    """
    A service provider is a class which creates a socket from a given configuration and returns it.
    It also allows to add a queue for communication
    """

    def overwrite_config(self, topics: str | list[str] | None = None, *kwargs) -> None:
        raise NotImplementedError()

    def create_socket(self, queue: asyncio.Queue | None, topics: list[str] = None) -> SpaSocket:
        raise NotImplementedError()

SpaMessage

Bases: BaseModel

Defines the message for SPA applications

Source code in spa_dat/protocol/typedef.py
26
27
28
29
30
31
32
33
34
35
36
class SpaMessage(BaseModel):
    """
    Defines the message for SPA applications
    """

    payload: bytes
    topic: str
    content_type: str | None = None
    client_name: str | None = None
    response_topic: str | None = None
    timestamp: int = int(time.time())

SpaSocket

Bases: Protocol

Defines the interface for SPA applications to communicate with the message bus.

Source code in spa_dat/protocol/typedef.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SpaSocket(Protocol):
    """
    Defines the interface for SPA applications to communicate with the message bus.
    """

    async def publish():
        raise NotImplementedError()

    async def subscribe():
        raise NotImplementedError()

    async def unsubscribe():
        raise NotImplementedError()

    async def request():
        raise NotImplementedError()

provider

SocketProviderFactory

Bases: Protocol

A service provider factory is a function which returns a service provider.

Source code in spa_dat/provider.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class SocketProviderFactory(Protocol):
    """
    A service provider factory is a function which returns a service provider.
    """

    @staticmethod
    def from_config(config: Configuration) -> SocketProvider:
        # load necessary providers
        match config:
            case MqttConfig():
                return MqttSocketProvider(config)
            case KafkaConfig():
                return KafkaSocketProvider(config)
            case _:
                raise NotImplementedError(f"Config type {type(config)} not supported")

    @staticmethod
    def from_url(url: str) -> SocketProvider:
        # 1. Parse url contents and contents to valid config
        # 2. Return socket via from_config
        raise NotImplementedError()