Skip to content

Commit

Permalink
feat: add pause consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
yordis committed Feb 27, 2024
1 parent 62e12a3 commit b001247
Showing 1 changed file with 35 additions and 3 deletions.
38 changes: 35 additions & 3 deletions lib/gnat/jetstream/api/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ defmodule Gnat.Jetstream.API.Consumer do
deliver_policy: :all,
max_ack_pending: 20_000,
max_deliver: -1,
replay_policy: :instant
replay_policy: :instant,
pause_until: nil
]

@type t :: %__MODULE__{
Expand Down Expand Up @@ -142,7 +143,8 @@ defmodule Gnat.Jetstream.API.Consumer do
opt_start_time: nil | DateTime.t(),
rate_limit_bps: nil | non_neg_integer(),
replay_policy: :instant | :original,
sample_freq: nil | binary()
sample_freq: nil | binary(),
pause_until: nil | DateTime.t()
}

@type info :: %{
Expand Down Expand Up @@ -177,7 +179,9 @@ defmodule Gnat.Jetstream.API.Consumer do
num_redelivered: non_neg_integer(),
num_waiting: non_neg_integer(),
push_bound: nil | boolean(),
stream_name: binary()
stream_name: binary(),
paused: boolean(),
pause_remaining: nil | Time.t() # TODO: Verify type here
}

@type config :: %{
Expand Down Expand Up @@ -275,6 +279,34 @@ defmodule Gnat.Jetstream.API.Consumer do
end
end

@doc """
Pause a consumer until a specific time.
## Examples
iex> {:ok, _response} = Gnat.Jetstream.API.Stream.create(:gnat, %Gnat.Jetstream.API.Stream{name: "astream", subjects: ["subject"]})
iex> {:ok, _response} = Gnat.Jetstream.API.Consumer.create(:gnat, %Gnat.Jetstream.API.Consumer{durable_name: "consumer", stream_name: "astream"})
iex> pause_until = DateTime.add(DateTime.utc_now(), 1, :hour)
iex> Gnat.Jetstream.API.Consumer.pause(:gnat, "astream", "consumer", pause_until)
:ok
"""
@spec pause(
conn :: Gnat.t(),
stream_name :: binary(),
consumer_name :: binary(),
pause_until :: DateTime.t(),
opts :: [domain: nil | binary()]
) ::
:ok | {:error, any()}
def pause(conn, stream_name, consumer_name, pause_until, opts \\ []) do
topic = "#{js_api(opts[:domain])}.CONSUMER.PAUSE.#{stream_name}.#{consumer_name}"
payload = Jason.encode!(%{ pause_until: pause_until })
with {:ok, _response} <- request(conn, topic, payload) do
:ok
end
end

@doc """
Information about the consumer.
Expand Down

0 comments on commit b001247

Please sign in to comment.