Skip to content

In a Nutshell

Info

This project is part of the Secure Prescriptive Analytics Project. For more information visit the project website.

This package provides a wrapper for the message based communication of distributed applications. It can be used to implement a request-response pattern or a publish-subscribe pattern. It currently supports the following brokers:

  • MQTT
  • Kafka

It accomplishes this by providing a common interface for the message bus systems and abstracting the protocol for each broker. The package is designed for take advantage of the asnycio library and the async/await syntax.

Building an Application

Info

For more information see the examples in the examples section.

Configuration

Configs can be provided programmatically or loaded from an URL. To programmatically provide a config use the SocketProviderFactory.from_config method. To load a config from an URL use the SocketProviderFactory.from_url method. The following example shows available configurations for each broker:

Source code in spa_dat/protocol/mqtt.py
61
62
63
64
65
66
67
68
69
70
71
@dataclass
class MqttConfig:
    host: str
    port: int
    username = None
    password = None
    keepalive: int = 60
    qos: int = 0
    retain = False
    default_subscription_topics: list[str] | str | None = None  # if set to none no subscription will be made
    client_id: str | None = None
Source code in spa_dat/protocol/kafka.py
54
55
56
57
58
59
@dataclass
class KafkaConfig:
    bootstrap_servers: str
    default_subscription_topics: str | None | list[str] = None  # if set to none no subscription will be made
    group_id: str | None = None
    client_id: str = None

For more detailed examples see the examples section.

Defining a Producer or Consumer

Can be accomplished via decorators. Currently two decorators are supported: producer and application. The are implemented in the DistributionApplication class.

This class provides a simple interface for creating distributed applications. It allows to create multiple applications which are connected to the same message bus. It does this by providing decorators for creating applications and producers.

producer
producer(
    *,
    socket_provider: SocketProvider | None = None,
    state: Any = None,
    ressources: dict[str, SupportedContextManagers] = {}
)
application
application(
    topics: list[str] | str,
    *,
    socket_provider: SocketProvider | None = None,
    state: Any = None,
    ressources: dict[str, SupportedContextManagers] = {}
)

Application Context and State

The application context and state are injected into the callback functions. The state is used to store the state of the application and can be user defined. An example of the state can be viewed in the examples. The SocketProvider provides a common interface for the interaction patterns. It is used to to ineract with other applications.

Ressource Handling

All ressources (e.g. database connections, ...) are handled via ContextManager. The application supports the following types:

SupportedContextManagers module-attribute
SupportedContextManagers = Union[
    AbstractAsyncContextManager, AbstractContextManager
]

To use a Contextmanager, it is added the ressource dict in the decorator function. Context managers are automatically started and cleaned up after the application shuts down.

Supported Interaction Patterns

Bases: Protocol

Defines the interface for SPA applications to communicate with the message bus.

Source code in spa_dat/protocol/typedef.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SpaSocket(Protocol):
    """
    Defines the interface for SPA applications to communicate with the message bus.
    """

    async def publish():
        raise NotImplementedError()

    async def subscribe():
        raise NotImplementedError()

    async def unsubscribe():
        raise NotImplementedError()

    async def request():
        raise NotImplementedError()

It is implemented for each broker.