In a Nutshell
Info
This project is part of the Secure Prescriptive Analytics Project. For more information visit the project website.
This package provides a wrapper for the message based communication of distributed applications. It can be used to implement a request-response pattern or a publish-subscribe pattern. It currently supports the following brokers:
- MQTT
- Kafka
It accomplishes this by providing a common interface for the message bus systems and abstracting the protocol for each broker. The package is designed for take advantage of the asnycio library and the async/await syntax.
Building an Application
Info
For more information see the examples in the examples section.
Configuration
Configs can be provided programmatically or loaded from an URL. To programmatically provide a config use the SocketProviderFactory.from_config method. To load a config from an URL use the SocketProviderFactory.from_url method. The following example shows available configurations for each broker:
Source code in spa_dat/protocol/mqtt.py
61 62 63 64 65 66 67 68 69 70 71 | |
Source code in spa_dat/protocol/kafka.py
54 55 56 57 58 59 | |
For more detailed examples see the examples section.
Defining a Producer or Consumer
Can be accomplished via decorators. Currently two decorators are supported: producer and application. The are implemented in the DistributionApplication class.
This class provides a simple interface for creating distributed applications. It allows to create multiple applications which are connected to the same message bus. It does this by providing decorators for creating applications and producers.
producer
producer(
*,
socket_provider: SocketProvider | None = None,
state: Any = None,
ressources: dict[str, SupportedContextManagers] = {}
)
application
application(
topics: list[str] | str,
*,
socket_provider: SocketProvider | None = None,
state: Any = None,
ressources: dict[str, SupportedContextManagers] = {}
)
Application Context and State
The application context and state are injected into the callback functions. The state is used to store the state of the application and can be user defined. An example of the state can be viewed in the examples. The SocketProvider provides a common interface for the interaction patterns. It is used to to ineract with other applications.
Ressource Handling
All ressources (e.g. database connections, ...) are handled via ContextManager. The application supports the following types:
module-attribute
SupportedContextManagers = Union[
AbstractAsyncContextManager, AbstractContextManager
]
To use a Contextmanager, it is added the ressource dict in the decorator function. Context managers are automatically started and cleaned up after the application shuts down.
Supported Interaction Patterns
Bases: Protocol
Defines the interface for SPA applications to communicate with the message bus.
Source code in spa_dat/protocol/typedef.py
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | |
It is implemented for each broker.