Skip to content

Commit

Permalink
NatsJS release (#92, close #65)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Jul 3, 2023
1 parent dfe225b commit 432d7f6
Show file tree
Hide file tree
Showing 41 changed files with 1,184 additions and 111 deletions.
15 changes: 8 additions & 7 deletions .github/workflows/tests.yml
Expand Up @@ -44,7 +44,7 @@ jobs:
- 9324:9324

nats:
image: nats
image: diementros/nats:js
ports:
- 4222:4222

Expand Down Expand Up @@ -91,9 +91,10 @@ jobs:
runs-on: ubuntu-latest
services:
nats:
image: nats
image: diementros/nats:js
ports:
- 4222:4222

steps:
- uses: actions/checkout@v3
- name: Set up Python
Expand All @@ -107,7 +108,7 @@ jobs:
run: pip install -e .[testsuite,async-nats]
- run: mkdir coverage
- name: Test
run: coverage run -m pytest -m "run" -k test_run_nats_correct tests/cli/test_run.py
run: coverage run -m pytest -m "run and nats" tests/cli/test_run.py
env:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}
Expand Down Expand Up @@ -137,7 +138,7 @@ jobs:
run: pip install -e .[testsuite,async-rabbit]
- run: mkdir coverage
- name: Test
run: coverage run -m pytest -m "run" -k test_run_rabbit_correct tests/cli/test_run.py
run: coverage run -m pytest -m "run and rabbit" tests/cli/test_run.py
env:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}
Expand Down Expand Up @@ -178,7 +179,7 @@ jobs:
run: pip install -e .[testsuite,async-kafka]
- run: mkdir coverage
- name: Test
run: coverage run -m pytest -m "run" -k test_run_kafka_correct tests/cli/test_run.py
run: coverage run -m pytest -m "run and kafka" tests/cli/test_run.py
env:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}
Expand Down Expand Up @@ -208,7 +209,7 @@ jobs:
run: pip install -e .[testsuite,async-sqs]
- run: mkdir coverage
- name: Test
run: coverage run -m pytest -m "run" -k test_run_sqs_correct tests/cli/test_run.py
run: coverage run -m pytest -m "run and sqs" tests/cli/test_run.py
env:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}
Expand Down Expand Up @@ -238,7 +239,7 @@ jobs:
run: pip install -e .[testsuite,async-redis]
- run: mkdir coverage
- name: Test
run: coverage run -m pytest -m "run" -k test_run_redis_correct tests/cli/test_run.py
run: coverage run -m pytest -m "run and redis" tests/cli/test_run.py
env:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}
Expand Down
24 changes: 22 additions & 2 deletions README.md
Expand Up @@ -69,7 +69,7 @@ It is a modern, high-level framework on top of popular specific Python brokers l
| **Nats** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: |
| **Kafka** | :warning: **beta** :warning: | :mag: planning :mag: |
| **SQS** | :warning: **beta** :warning: | :mag: planning :mag: |
| **NatsJS** | :hammer_and_wrench: **WIP** :hammer_and_wrench: | :mag: planning :mag: |
| **NatsJS** | :warning: **beta** :warning: | :mag: planning :mag: |
| **MQTT** | :mag: planning :mag: | :mag: planning :mag: |
| **Redis Streams** | :mag: planning :mag: | :mag: planning :mag: |
| **Pulsar** | :mag: planning :mag: | :mag: planning :mag: |
Expand Down Expand Up @@ -232,7 +232,6 @@ and [more](https://github.com/Lancetnik/Propan/tree/main/examples/dependencies).
from propan import PropanApp, RabbitBroker, Context, Depends

rabbit_broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

app = PropanApp(rabbit_broker)

async def dependency(user_id: int) -> bool:
Expand All @@ -248,6 +247,27 @@ async def base_handler(user_id: int,

---

## RPC over MQ

Also, **Propan** allows you to use **RPC** requests over your broker with a simple way:

```python
from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(rabbit_broker)

@broker.handle("ping")
async def base_handler():
return "pong"

@app.after_startup
async def self_ping():
assert (await broker.publish("", "ping")) == "pong"
```

---

## Project Documentation

**Propan** automatically generates documentation for your project according to the <a href="https://www.asyncapi.com/" target="_blank">**AsyncAPI**</a> specification. You can work with both generated artifacts and place a Web view of your documentation on resources available to related teams.
Expand Down
8 changes: 8 additions & 0 deletions docs/docs/en/CHANGELOG.md
@@ -1,5 +1,13 @@
# CHANGELOG

## 2023-07-03 **0.1.5.0** NastJS

This update adds **NATS JetStream**(a persistent layer of **NATS**) supporting.

Now you can work with this great broker without fear of losing messages, using the acknowledgment confirmation mechanism and the built-in `key-value` and `object` storages.

Also, some internal classes were changed to further create synchronous interfaces based on them.

## 2023-06-26 **0.1.4.0** PydanticV2

The main change in this update is the support for the **PydanticV2** beta version.
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/en/index.md
Expand Up @@ -120,7 +120,7 @@ This is the **Propan** declarative way to write the same code. That is so much e
| **Nats** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: |
| **Kafka** | :warning: **beta** :warning: | :mag: planning :mag: |
| **SQS** | :warning: **beta** :warning: | :mag: planning :mag: |
| **NatsJS** | :hammer_and_wrench: **WIP** :hammer_and_wrench: | :mag: planning :mag: |
| **NatsJS** | :warning: **beta** :warning: | :mag: planning :mag: |
| **MQTT** | :mag: planning :mag: | :mag: planning :mag: |
| **Redis Streams** | :mag: planning :mag: | :mag: planning :mag: |
| **Pulsar** | :mag: planning :mag: | :mag: planning :mag: |
Expand Down
30 changes: 30 additions & 0 deletions docs/docs/en/nats/4_nats-js.md
@@ -0,0 +1,30 @@
# NATS JetStream

The default **NATS** usage is suitable for scenarios where:

* `publisher` and `consumer` are always online;
* a system tolerate to the messages loss.

If you need stricter restrictions, like:

* an availability of a message processing confirmation mechanism (`ack`/`nack');
* message persistence (will accumulate in the queue when `consumer` is offline).

You should use the **NATS JetStream** extension.

In fact, the **JetStream** extension is the same **NATS** with the addition a persistent layer above the file system. Therefore, all interfaces for publishing and consuming messages are similar to the refular **NATS** usage.

However, the **JetStream** layer has many possibilities for configuration: from the deleting old messages policy to the maximum stored messages number limit. You can find out more about all **JetStream** features in the official [documentation](https://docs.nats.io/using-nats/developer/develop_jetstream ){.external-link target="_blank"}.

!!! tip ""
If you have worked with other message brokers, then you should know that the logic of **JS** is closer to **Kafka** than to **RabbitMQ**: messages after confirmation are not deleted from the queue, but remain there until the queue is full and it will start deleting old messages (or in accordance with other logic that you can configure yourself).

When connecting a `consumer` (and, especially, when reconnecting), you must determine for yourself, according to what logic it will consume messages: from the subject beginning, starting with some message, starting from some time, only new ones, etc. Don't be surprised if a connection is restored, your `consumer` starts to process all messages received earlier again - you haven't defined the rule.

Also **NATS JetStream** has built-in `key-value` (close to **Redis**) and `object` (close to **Minio**) storages, which, in addition to interface *put/get* have the ability to subscribe to events, which can be extremely useful in vary scenarios.

**Propan** does not provide access to this functionality directly, but it is covered by the [nats-py] library used({{urls.nats_py }}){target="_blank"}. You can access the **JS** object from the application context:

```python linenums="1" hl_lines="2 8"
{!> docs_src/nats/js.py !}
```
8 changes: 8 additions & 0 deletions docs/docs/ru/CHANGELOG.md
@@ -1,5 +1,13 @@
# CHANGELOG

## 2023-07-03 **0.1.5.0** NastJS

В этом обновлении добавлена и протестирована полная поддержка **NATS JetStream** - персистентного слоя **NATS**.

Теперь вы можете работать с этим великолепным брокером, не боясь потери сообщений, используя механизм подтверждения получения, а также встроенные `key-value` и `object` хранилища.

Также, в этом обновлении были изменены некоторые внутренние классы для дальнейшего создания на их основе синхронных интерфейсов.

## 2023-06-26 **0.1.4.0** PydanticV2

Основное изменение в этом обновлении - поддержка бета-версии **PydanticV2**.
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/ru/index.md
Expand Up @@ -123,7 +123,7 @@ async def base_handler(body):
| **Nats** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: |
| **Kafka** | :warning: **beta** :warning: | :mag: planning :mag: |
| **SQS** | :warning: **beta** :warning: | :mag: planning :mag: |
| **NatsJS** | :hammer_and_wrench: **WIP** :hammer_and_wrench: | :mag: planning :mag: |
| **NatsJS** | :warning: **beta** :warning: | :mag: planning :mag: |
| **MQTT** | :mag: planning :mag: | :mag: planning :mag: |
| **Redis Streams** | :mag: planning :mag: | :mag: planning :mag: |
| **Pulsar** | :mag: planning :mag: | :mag: planning :mag: |
Expand Down
31 changes: 31 additions & 0 deletions docs/docs/ru/nats/4_nats-js.md
@@ -0,0 +1,31 @@
# NATS JetStream

Обычное использование **NATS** идеально подходит для сценариев, в которых:

* `publisher` и `consumer` всегда находятся онлайн;
* система допускает потерю сообщений.

Если вам нужны более строгие ограничения, а именно:

* наличие механизма подтверждения обработки сообщений (`ack`/`nack`);
* персистентность сообщений (при отсутствии `consumer`'а сообщения будут накапливаться в очереди).

Вам следует использование расширение **NATS JetStream**.

На самом деле расширение **JetStream** - это тот же самый **NATS** с добавлением
персистентного слоя над файловой системой, который обеспечивает хранение сообщений в очереди. Поэтому все интерфейсы публикации и потребления сообщений аналогичны обычному использованию **NATS**.

Однако, сама логика работы слоя **JetStream** имеет множество возможностей для конфигурации: от политики удаления старых сообщений до ограничения на максимальное число хранимых сообщений. Подробно со всеми возможностями **JetStream** вы можете ознакомиться в официальной [документации](https://docs.nats.io/using-nats/developer/develop_jetstream){.external-link target="_blank"}.

!!! tip ""
Если вы работали с другими брокерами сообщений, то вам следует знать, что логика работы **JS** ближе к **Kafka**, нежели к **RabbitMQ**: сообщения после подтверждения их обработки не удаляются из очереди, а остаются там до тех пор, пока очередь не наполнится и не начнет удалять старые сообщения (либо в соответсвии с другой логикой, которую вы можете сконфигурировать сами).

При подключении `consumer`'а (и, особенно, при переподключении) вы должны сами определить, в соотвествии с какой логикой он будет потреблять сообщения: с самого начала, начиная с какого-то сообщения, начиная с какого-то времени, только новые и т.д. Не удивляйтесь, если при восстановлении соединения ваш `consumer` начнет заново обрабатывать все сообщения, полученные ранее - вы просто не определили это правило.

Также **NATS JetStream** имеет встроенное `key-value`(cхоже с **Redis**) и `object`(схоже с **Minio**) хранилища, которые, помимо своего базового интерфейса *положить/прочитать* имеют возможность подписки на события, что может быть крайне полезно во многих сценариях.

**Propan** не предоставляет доступ к этому функционалу напрямую, однако он покрывается используемой библиотекой [nats-py]({{ urls.nats_py }}){target="_blank"}. Доступ к объекту **JS** вы можете получить из контекста приложения:

```python linenums="1" hl_lines="2 8"
{!> docs_src/nats/js.py !}
```
23 changes: 23 additions & 0 deletions docs/docs_src/nats/js.py
@@ -0,0 +1,23 @@
from propan import PropanApp, NatsJSBroker
from propan.annotations import NatsJS

broker = NatsJSBroker()
app = PropanApp(broker)

@app.after_startup
async def example(js: NatsJS):
# JS Key-Value Storage
storage = await js.create_key_value(bucket="propan_kv")

await storage.put("hello", b"propan!")
assert (await storage.get("hello")) == b"propan!"

# JS Object Storage
storage = await js.create_object_sotre("propan-obs")

obj_name = "file.mp4"
with open(obj_name) as f:
await storage.put(obj_name, f)

with open(f"copy-{obj_name}") as f:
await storage.get(obj_name, f)
1 change: 1 addition & 0 deletions docs/mkdocs.yml
Expand Up @@ -159,6 +159,7 @@ nav:
- Examples:
- Direct: nats/3_examples/1_direct.md
- Pattern: nats/3_examples/2_pattern.md
- NatsJS: nats/4_nats-js.md
- Integrations: integrations/1_integrations-index.md
- FastAPI Plugin: integrations/2_fastapi-plugin.md
- Contributing:
Expand Down
2 changes: 1 addition & 1 deletion propan/__about__.py
Expand Up @@ -2,7 +2,7 @@

from unittest.mock import Mock

__version__ = "0.1.4.6"
__version__ = "0.1.5.0"


INSTALL_MESSAGE = (
Expand Down
5 changes: 3 additions & 2 deletions propan/__init__.py
Expand Up @@ -11,9 +11,9 @@
RabbitBroker = RabbitRouter = about.INSTALL_RABBIT # type: ignore

try:
from propan.brokers.nats import NatsBroker, NatsRouter
from propan.brokers.nats import NatsBroker, NatsJSBroker, NatsRouter
except ImportError:
NatsBroker = NatsRouter = about.INSTALL_NATS # type: ignore
NatsJSBroker = NatsBroker = NatsRouter = about.INSTALL_NATS # type: ignore

try:
from propan.brokers.redis import RedisBroker, RedisRouter
Expand Down Expand Up @@ -52,6 +52,7 @@
"PropanMessage",
## nats
"NatsBroker",
"NatsJSBroker",
"NatsRouter",
## rabbit
"RabbitBroker",
Expand Down
12 changes: 10 additions & 2 deletions propan/annotations.py
Expand Up @@ -30,14 +30,22 @@


try:
from nats.aio.client import Client
from nats.aio.msg import Msg
from nats.js.client import JetStreamContext

from propan.brokers.nats import NatsBroker as NB
from propan.brokers.nats import NatsJSBroker as NJB

NatsBroker = Annotated[NB, ContextField("broker")]
NatsJSBroker = Annotated[NJB, ContextField("broker")]
NatsMessage = Annotated[Msg, ContextField("message")]
NatsConnection = Annotated[Client, ContextField("broker._connection")]
NatsJS = Annotated[JetStreamContext, ContextField("broker._connection")]
except ImportError:
NatsBroker = NatsMessage = about.INSTALL_NATS
NatsBroker = (
NatsMessage
) = NatsJSBroker = NatsJS = NatsConnection = about.INSTALL_NATS


try:
Expand Down Expand Up @@ -79,7 +87,7 @@
assert any(
(
all((RabbitBroker, RabbitMessage, Channel)),
all((NatsBroker, NatsMessage)),
all((NatsBroker, NatsJSBroker, NatsJS, NatsMessage)),
all((RedisBroker, Redis)),
all((KafkaBroker, KafkaMessage, Producer)),
all((SQSBroker, client, queue_url)),
Expand Down
5 changes: 5 additions & 0 deletions propan/brokers/nats/__init__.py
@@ -1,8 +1,13 @@
from nats.js.api import DeliverPolicy

from propan.brokers.nats.nats_broker import NatsBroker, NatsMessage
from propan.brokers.nats.nats_js_broker import NatsJSBroker
from propan.brokers.nats.routing import NatsRouter

__all__ = (
"NatsBroker",
"NatsMessage",
"DeliverPolicy",
"NatsJSBroker",
"NatsRouter",
)
23 changes: 23 additions & 0 deletions propan/brokers/nats/consts.py
@@ -0,0 +1,23 @@
from nats.js.api import (
ConsumerConfig,
DeliverPolicy,
DiscardPolicy,
Placement,
RePublish,
RetentionPolicy,
StorageType,
StreamConfig,
StreamSource,
)

__all__ = (
"StreamConfig",
"RetentionPolicy",
"DiscardPolicy",
"Placement",
"StorageType",
"StreamSource",
"RePublish",
"ConsumerConfig",
"DeliverPolicy",
)

0 comments on commit 432d7f6

Please sign in to comment.