Python Distributed Task Queue - backed by Kafka
Kueue was born out of the lack of a library that leverages Kafka for distributed task processing
- simple
- intuitive api
- extensible
import time
from kueue import task, TaskExecutorConsumer, KueueConfig
KueueConfig(
kafka_bootstrap='localhost:9092'
)
@task(topic="my-topic")
def sleepy_task(sleep: int):
time.sleep(sleep)
print("done sleeping", sleep)
return sleep
sleepy_task.enqueue(args=(15,))
consumer = TaskExecutorConsumer(["my-topic"])
consumer.start()
# prints "done sleeping, 15"
pip install kueue
Install poetry and run poetry install
at the root of the repository. This should create a virtual environment with all the necessary python dependencies.
The test framework makes heavy use of pytest
fixtures in order to spin up full integration environment consisting of a kubernetes cluster using kind and pytest-kind and kafka using strimzi
# unit tests
pytest
# unit tests + integration tests
pytest --integration
MIT © Jose Rojas