Skip to content

Request

import asyncio
import logging

from spa_dat.application.application import DistributedApplication
from spa_dat.protocol.mqtt import MqttConfig
from spa_dat.protocol.typedef import SpaMessage, SpaSocket
from spa_dat.provider import SocketProviderFactory

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


socket_provider = SocketProviderFactory.from_config(MqttConfig(host="mqtt-dashboard.com", port=1883))
app = DistributedApplication(default_socket_provider=socket_provider)


@app.producer()
async def producer(socket: SpaSocket, **kwargs):
    for i in range(10):
        logger.info("Sending Request")
        response = await socket.request(
            SpaMessage(
                payload=f"Producer Message {i}",
                topic="test/spa-dat-producer",
            )
        )
        logging.info(f"Received Response: {response.payload}")


@app.application("test/spa-dat-producer")
async def consumer(message: SpaMessage, socket: SpaSocket, **kwargs):
    logger.info(f"Received Request: {message.payload}")

    # simulate long running request
    await asyncio.sleep(1)

    await socket.publish(
        SpaMessage(
            payload=f"Response Message for {message.payload}",
            topic=message.response_topic,
        )
    )


app.start()
import asyncio
import logging

from spa_dat.application.application import DistributedApplication
from spa_dat.protocol.kafka import KafkaConfig
from spa_dat.protocol.typedef import SpaMessage, SpaSocket
from spa_dat.provider import SocketProviderFactory

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


socket_provider = SocketProviderFactory.from_config(KafkaConfig(bootstrap_servers="localhost:9092"))
app = DistributedApplication(default_socket_provider=socket_provider)


@app.application("test-spa-dat-producer")
async def consumer(message: SpaMessage, socket: SpaSocket, **kwargs):
    logger.info(f"Received Request: {message.payload}")

    # simulate long running request
    await asyncio.sleep(1)

    await socket.publish(
        SpaMessage(
            payload=f"Response Message for {message.payload}",
            topic=message.response_topic,
        )
    )


@app.producer()
async def producer(socket: SpaSocket, **kwargs):
    for i in range(10):
        logger.info("Sending Request")
        response = await socket.request(
            SpaMessage(
                payload=f"Producer Message {i}",
                topic="test-spa-dat-producer",
            )
        )
        logging.info(f"Received Response: {response.payload}")


app.start()