Skip to content

Consumer

A consumer is triggered by a message and can then do arbitary work. It can also publish messages using the DistributedApplicationContext provided inside the method call.

The following shows a minimal example of such a consumer.

import logging

from spa_dat.application.application import DistributedApplication
from spa_dat.protocol.mqtt import MqttConfig
from spa_dat.protocol.typedef import SpaMessage
from spa_dat.provider import SocketProviderFactory

logging.basicConfig(level=logging.DEBUG)


socket_provider = SocketProviderFactory.from_config(MqttConfig(host="mqtt-dashboard.com", port=1883))
app = DistributedApplication(socket_provider)


@app.application("test/spa-dat")
async def consumer(
    message: SpaMessage,
    # socket: SpaSocket, <- not needed but available if communication is required
    **kwargs,
):
    logging.info(f"Received message: {message}")


app.start()
import logging

from spa_dat.application.application import DistributedApplication
from spa_dat.protocol.kafka import KafkaConfig
from spa_dat.protocol.typedef import SpaMessage
from spa_dat.provider import SocketProviderFactory

logging.basicConfig(level=logging.INFO)


socket_provider = SocketProviderFactory.from_config(KafkaConfig(bootstrap_servers="localhost:9092"))
app = DistributedApplication(socket_provider)


@app.application("spa-dat2")
async def consumer(
    message: SpaMessage,
    # socket: SpaSocket, <- not needed but available if communication is required
    **kwargs,
):
    logging.info(f"Received message: {message}")


app.start()