Skip to content

JetStream Simplification #167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open

Conversation

vankiru
Copy link
Collaborator

@vankiru vankiru commented Mar 19, 2025

This is an implementation of JetStream Simplification.

Structure

The new API consists of several key objects:

  • Context is an entry point to the new API.
  • API defines JetStream API endpoints for operations on streams and consumers.
  • Stream represents a single stream and provides management methods.
  • Consumer represents a consumer and provides methods for its management.
  • Message is a set of wrappers for JetStream-specific messages.
  • Pull implements a base for pulling messages from a stream and is used to implement Fetch and Consume operations.
  • Publisher handles publishing messages to a stream.

API

The purpose of Api object is to simplify working with JetStream API:

class Api
extend DSL
endpoint :info, response: AccountInfoResponse, subject: false
group :stream do
endpoint :create, request: StreamCreateRequest, response: StreamCreateResponse
endpoint :update, request: StreamUpdateRequest, response: StreamUpdateResponse
endpoint :info, request: StreamInfoRequest, response: StreamInfoResponse
endpoint :delete, response: StreamDeleteResponse
endpoint :purge, request: StreamPurgeRequest, response: StreamPurgeResponse
endpoint :list, request: StreamListRequest, response: StreamListResponse, subject: false
endpoint :names, request: StreamNamesRequest, response: StreamNamesResponse, subject: false
group :msg do
endpoint :get, request: StreamMsgGetRequest, response: StreamMsgGetResponse
endpoint :delete, request: StreamMsgGetRequest, response: StreamMsgDeleteResponse
end

API endpoints are defined with custom dsl methods group and endpoint. The dsl magic is implemented in DSL, Group and Endpoint classes and uses Ruby meta programming.

The group method helps to create an endpoints hierarchy. The endpoint method defines an endpoint using:

  • request that defines what options can be sent to the endpoint,
  • response that describes the structure of the endpoint response,
  • subject defines whether or not the endpoint accepts a subject as its first field.

Different types of requests and responses are described in api/requests and api/response directories.

With the api object, making a request looks just like the request subject, which greatly simplifies the code that uses it. For example, to make an API request to $JS.API.STREAM.MSG.GET.* you just write

js = NATS.connect.js
js.api.stream.msg.get("stream", seq: 1)

Context

Since the Legacy JetStream API "occupies" the JetStream object, I made a new class, Context, to be an entry point to the simplified API.

It initializes an Api instance underneath, provides methods for accessing info and streams, and publishing messages with publish.

Stream

A single stream is represented by the Stream class. Most of the Stream subclasses define different internal parts of a stream, such as:

  • Stream::Config defines a stream config.
  • Stream::Info defines data that is returned by api.stream.info.
  • Stream::State defines a stream state object.
  • stream/schemas.rb contains definitions for Info sub-objects, such as SubjectTransfrom, ExternalStream, and StreamSource.

Each stream has methods to manage it: info, update, delete, and purge.

List

Stream::List encapsulates the logic of finding, creating and iterating over streams/names. The iteration is implemented via a Ruby Enumerator object that makes subsequent api.stream.list/names requests.

Consumer

The Consumer class represents a consumer. It has a set of sub-classes similar to a stream:

  • Consumer::Config defines a consumer config.
  • Consumer::Info defines data that is returned by api.consumer.info.
  • consumer/schemas.rb contains definitions for Info sub-objects, such as Limits and SequenceInfo.

A consumer has info, update and delete methods that uses api requests underneath.

List

Consumer::List encapsulates the logic of finding, creating and iterating over consumers/names. The iteration is implemented in the same way as for streams: via a Ruby Enumerator object that makes subsequent api.consumer.list/names requests.

Fetch

The Consumer::Fetch wraps up a Fetch pull and adds iterator like behaviour to it. The fetch operation blocks until all the messages are received or the pull expires. For more on Fetch see the Pull section.

Message

The Message class is a base for JetStream messages that can be divided into 4 groups:

  • Stream Message is returned by the stream.messages.find method, and it has only one method delete.
  • Consumer Message is a message you get inside the Fetch and Consume operations. It provides methods for message acknowledgement, such as ack, nack, term and in_progress.
  • Warning Message is used to signal to a consume pull that it needs to create a new message request. This group consists of NoMessages, RequestTimeout, MaxRequestBatchMessage, MaxRequestExpires, MaxRequestMaxBytes, MaxWaiting, MaxBytesExceeded and BatchCompleted
  • Error Message signals that something is wrong and we need to abort a fetch/consume request. This group consists of BadRequest, ConsumerDeleted, ConsumerLeadershipChanged, NoResponders.

Message.build method builds a JetStream message from a NATS message by checking its "Status" and "Description" headers.

Pull

Pull is the basis for the fetch and consume operations. A pull consists of

  • Config - defines pull options,
  • Subscription - a subscription that receives messages in the pull,
  • Handler - handles messages,
  • Buffer - tracks how many messages/bytes have been received/are pending,
  • Monitors, such as Heartbeats, Timeout, and Connection - monitor idle heartbeats, timeouts, and reconnections, respectively.

Each pull has a status field: pending, processing, draining or closed.

The workflow of the pull has only 2 steps:

  • start that starts the pull execution,
  • drain that starts draining the pull.

You can wait until a pull is closed with #wait method that blocks until the pull is closed or timeout seconds pass.

Fetch

Fetch::Handler handles messages in the following way:

  • ConsumerMessage - resets heartbeats, stores the message in the buffer, initiates drain if the buffer is full.
  • IdleHeartbeatMessage - resets heartbeats.
  • WarningMessage - stores the message description in last_error, drains the pull.
  • ErrorMessager- stores the message description in last_error, drains the pull.

Fetch::Buffer has three methods:

  • fetched(message) stores a message and tracks the number of messages/bytes fetched so far,
  • full? returns true if the number of messages/bytes fetched,
  • messages - an array of fetched messages.

Fetch::Heartbeats sets a Concurrent::ScheduledTask task for monitoring heartbeats. Every time a consumer or idle heartbeat message is received, the heartbeats task is rescheduled. If no messages are received or the pull request expires, the pull initiates drain.

Fetch::Timeout sets a Concurrent::ScheduledTask task for monitoring timeouts that will be executed after a fetch requests expires.

Consume

Consume::Handler handles messages in the following way:

  • ConsumerMessage - resets heartbeats, calls the user defined block, tracks the message in the buffer, requests new messages if the buffer is depleting.
  • IdleHeartbeatMessage - resets heartbeats.
  • WarningMessage - resets heartbeats, update the buffer if the message contains "Nats-Pending" headers, requests new messages if the buffer is depleting.
  • ErrorMessager- stores the message description in last_error, drains the pull.

Consume::Buffer has four methods:

  • consumed(message) tracks the number of messages/bytes pending,
  • depleting? returns true if the number of messages/bytes pending falls below the threshold,
  • refill - refills the number of messages/bytes pending indicating that a new batch of messages has been requested,
  • trim(message) - decrease the number of messages/bytes pending by "Nats-Pending" headers.

Consume::Heartbeats sets a Concurrent::ScheduledTask task for monitoring heartbeats. The heartbeats task is rescheduled whenever a consumer or idle heartbeat message is received. If no messages are received, the pull requests new messages.

Fetch::Connection registers a new status listener that listens to reconnecting/connected statuses (the idea is inspired by the Go client) and creates a separate thread that monitors the client connection via the status listener and stops/reschedules heartbeats depending on the client status.

Publish

A Publisher object implements publishing messages. It defines publish method that:

  1. processes options with Publisher::Options,
  2. makes a publish request,
  3. returns the result in the form of Publisher::Ack object.

def publish(subject, data, params = {})
options = Options.new(params)
message = begin
js.client.request(
subject,
data,
header: options.header,
timeout: options.timeout
)
rescue NATS::IO::NoRespondersError
raise JetStream::NoStreamResponseError
end
Ack.build(message)
end

Legacy vs Simplified API

The Legacy JetStream API is available as usual via jetstream and jsm methods

nats = NATS.connect

js = nats.jetstream
# or 
js = nats.jsm

The Simplified JetStream API can be accessed through the new js method or by creating a JetStream context object:

nats = NATS.connect

js = NATS::JetStream::Context.new(nats)
# or a shorter version
js = nats.js

@vankiru vankiru force-pushed the evl/jetstream-simplification branch from 108e261 to 8227b62 Compare March 20, 2025 12:38
@vankiru vankiru marked this pull request as ready for review March 21, 2025 18:09
@vankiru vankiru force-pushed the evl/jetstream-simplification branch from 174b8fe to 04cc96d Compare March 28, 2025 17:05
max = send("max_#{type}")

if threshold.nil?
instance_variable_set("@threshold_#{type}", max / 2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it needs to be possible to also be able to make a batch request with both max msgs and max bytes limits, in the Go client it is an option called: PullMaxMessagesWithBytesLimit that always makes fetch requests using the same max_bytes https://github.com/nats-io/nats.go/pull/1789/files

https://github.com/nats-io/nats.go/blob/5efde11c8be67a150de0de3ec3bd3df962646166/jetstream/README.md?plain=1#L558-L564

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the consume config, so now consume can accept both max_messages and max_bytes parameters.

@vankiru vankiru force-pushed the evl/jetstream-simplification branch from df550c3 to 7195367 Compare April 25, 2025 18:34
@vankiru vankiru force-pushed the evl/jetstream-simplification branch from 4a58ad4 to 595189b Compare April 30, 2025 16:57
Copy link
Member

@wallyqs wallyqs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ran into a few import issues running the examples, this should be ok usage right?

require 'nats'

nc = NATS.connect

jsctx = NATS::JetStream::Context.new(nc)

@@ -0,0 +1,45 @@
# frozen_string_literal: true

require "nats"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried running this example and got the following, maybe there is a missing require?

# bundle exec ruby examples/jetstream/consume.rb 

nats-pure.rb/lib/nats/io/client.rb:869:in 'NATS::Client#js': uninitialized constant NATS::JetStream::Context (NameError)

      ::NATS::JetStream::Context.new(self, options)
                       ^^^^^^^^^
	from examples/jetstream/consume.rb:6:in '<main>'

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed requires in nats.rb that caused the issue.

Please note that if you run examples with ruby path/to/example.rb you need to build and install the gem locally:

gem build
gem install nats-pure-2.5.0.gem

Or you can run bundle console and copy the code there.

end
alias_method :JetStream, :jetstream
alias_method :jsm, :jetstream

# Simplified JetStream API
def js(options = {})
::NATS::JetStream::Context.new(self, options)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running into this when running an example, maybe there is a missing require?

bundle exec ruby examples/jetstream/next.rb 
...nats-pure.rb/lib/nats/io/client.rb:869:in 'NATS::Client#js': uninitialized constant NATS::JetStream::Context (NameError)

      ::NATS::JetStream::Context.new(self, options)

Copy link
Collaborator Author

@vankiru vankiru May 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. See #167 (comment)

```ruby
client = NATS.connect

js = NATS::JetStream::Context.new(client)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you include the requires in the examples as well? I tried this but ran into loading issues

require 'nats'

nc = NATS.connect

jsctx = NATS::JetStream::Context.new(nc)
lib/nats/jetstream/stream/schemas.rb:7:in '<class:Stream>': uninitialized constant NATS::Utils::Config (NameError)

      class SubjectTransform < NATS::Utils::Config

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. See #167 (comment)

class JetStream
class Stream
# Subject transform to apply to matching messages going into the stream
class SubjectTransform < NATS::Utils::Config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could it be that this file missing some requires?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. See #167 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants