Skip to content

taskiq-python/taskiq-faststream

Repository files navigation

Taskiq - FastStream

Tests status Package version downloads Supported Python versions GitHub


The current package is just a wrapper for FastStream objects to make them compatible with Taskiq library.

The main goal of it - provide FastStream with a great Taskiq tasks scheduling feature.

Installation

If you already have FastStream project to interact with your Message Broker, you can add scheduling to it by installing just a taskiq-faststream

pip install taskiq-faststream

If you starting with a clear project, you can specify taskiq-faststream broker by the following distributions:

pip install taskiq-faststream[rabbit]
# or
pip install taskiq-faststream[kafka]
# or
pip install taskiq-faststream[nats]

Usage

The package gives you two classes: AppWrapper and BrokerWrapper

These are just containers for the related FastStream objects to make them taskiq-compatible

To create scheduling tasks for your broker, just wrap it to BrokerWrapper and use it like a regular taskiq Broker.

# regular FastStream code
from faststream.nats import NatsBroker

broker = NatsBroker()

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# taskiq-faststream scheduling
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler

# wrap FastStream object
taskiq_broker = BrokerWrapper(broker)

# create periodic task
taskiq_broker.task(
    message="Hi!",
    # If you are using RabbitBroker, then you need to replace subject with queue.
    # If you are using KafkaBroker, then you need to replace subject with topic.
    subject="test-subject",
    schedule=[{
        "cron": "* * * * *",
    }],
)

# create scheduler object
scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)

To run the scheduler, just use the following command

taskiq scheduler module:scheduler

Also, you can wrap your FastStream application the same way (allows to use lifespan events and AsyncAPI documentation):

# regular FastStream code
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# wrap FastStream object
from taskiq_faststream import AppWrapper
taskiq_broker = AppWrapper(app)

# Code below omitted 👇

A little feature: instead of using a final message argument, you can set a message callback to collect information right before sending:

async def collect_information_to_send():
    return "Message to send"

taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)

Also, you can send a multiple message by one task call just using generator message callback with yield

async def collect_information_to_send():
    """Sends 10 messages per task call."""
    for i in range(10):
        yield i

taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)