
[ADDED] Stream subject mapping transforms #3814

Merged: 4 commits merged into dev from jnm/streamsourcetransform on Jan 26, 2023

@jnmoyne (Contributor) commented Jan 25, 2023

  • Documentation added (if applicable)
  • Tests added
  • Branch rebased on top of current dev
  • Build is green in Travis CI
  • You have certified that the contribution is your original work and that you license the work to the project under the Apache 2 license

Adds support for subject mapping transforms in streams

Introduces the following changes:

  • Extracts the subject transform code and tests from accounts.go into their own separate set of files.
  • Ability to add a subject mapping transform destination to sourced streams, allowing message subjects to be mapped as they are received from the stream being sourced
  • Ability to source more than once from the same stream, each time using a specific subject filter and destination transform
  • Ability to add a stream input subject mapping transform to a stream. It does not act as a filter; instead, it transforms the subject of messages matching the transform's source according to the transform's destination. This mapping happens regardless of how the messages are received into the stream (mirror, or subject(s) and/or source(s))
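To make the idea of a subject mapping transform concrete, here is a minimal Go sketch under a simplified, assumed syntax: `*` and a trailing `>` in the source, and `$N` or `{{wildcard(N)}}` references plus a trailing `>` in the destination. `transformSubject` and `wildcardRef` are hypothetical names; this is not the server's implementation, which supports more mapping functions.

```go
package main

import (
	"errors"
	"strconv"
	"strings"
)

var errNoMatch = errors.New("subject does not match transform source")

// wildcardRef resolves a 1-based reference (the "N" in "$N" or
// "{{wildcard(N)}}") to the token captured by the Nth '*' in the source.
func wildcardRef(ref string, wildcards []string) (string, error) {
	n, err := strconv.Atoi(strings.TrimSpace(ref))
	if err != nil || n < 1 || n > len(wildcards) {
		return "", errors.New("invalid wildcard reference: " + ref)
	}
	return wildcards[n-1], nil
}

// transformSubject is a hypothetical, simplified subject mapping transform.
// src may contain '*' (exactly one token) and a final '>' (one or more tokens).
// dest may reference the Nth '*' of src as "$N" or "{{wildcard(N)}}", and may
// end in '>' to append whatever src's '>' matched.
func transformSubject(src, dest, subject string) (string, error) {
	srcToks := strings.Split(src, ".")
	subjToks := strings.Split(subject, ".")

	var wildcards, tail []string
	hasTail := false
	for i, st := range srcToks {
		if st == ">" {
			// '>' must be last in src and must match at least one token.
			if i != len(srcToks)-1 || i >= len(subjToks) {
				return "", errNoMatch
			}
			tail, hasTail = subjToks[i:], true
			break
		}
		if i >= len(subjToks) {
			return "", errNoMatch
		}
		if st == "*" {
			wildcards = append(wildcards, subjToks[i])
		} else if st != subjToks[i] {
			return "", errNoMatch
		}
	}
	if !hasTail && len(subjToks) != len(srcToks) {
		return "", errNoMatch
	}

	destToks := strings.Split(dest, ".")
	out := make([]string, 0, len(destToks)+len(tail))
	for i, dt := range destToks {
		switch {
		case dt == ">" && i == len(destToks)-1:
			if !hasTail {
				return "", errors.New("'>' in dest requires '>' in src")
			}
			out = append(out, tail...)
		case strings.HasPrefix(dt, "$"):
			tok, err := wildcardRef(dt[1:], wildcards)
			if err != nil {
				return "", err
			}
			out = append(out, tok)
		case strings.HasPrefix(dt, "{{wildcard(") && strings.HasSuffix(dt, ")}}"):
			ref := strings.TrimSuffix(strings.TrimPrefix(dt, "{{wildcard("), ")}}")
			tok, err := wildcardRef(ref, wildcards)
			if err != nil {
				return "", err
			}
			out = append(out, tok)
		default:
			out = append(out, dt) // literal token copied as-is
		}
	}
	return strings.Join(out, "."), nil
}
```

For example, source `orders.*.*` with destination `shipped.$2.$1` turns `orders.eu.42` into `shipped.42.eu`, and a subject that does not match the source is rejected rather than passed through.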

[Image: Stream transform-2023-01-17-1401-3]

This enables many interesting use cases, such as KV bucket sourcing, or inserting a partition number token in the subject in order to then create one consumer per partition, directly at the stream definition level, rather than having to resort to the Core NATS subject mapping functionality (which requires administrative access to the server config file, or to the account key, in order to be configured).

/cc @nats-io/core

Extract subject transformation code out of accounts.go
Stream sources can now have a subject mapping transform
You can source the same stream more than once
Remove limitation that the subject filter for a source, mirror or consumer must overlap with the sourced/mirrored stream's subjects or the stream's own subjects
@derekcollison (Member) left a comment

Just a few minor nits.

@@ -11647,6 +11631,14 @@ func TestJetStreamSourceBasics(t *testing.T) {
return nil
})

m, err := js.GetMsg("MS", 1)
if err != nil {
@derekcollison (Member):

require_NoError(t, err)

@jnmoyne (Contributor Author):

Done. Since this pattern was present in many places in the file, I made it consistent and replaced it with require_NoError in all those other places as well.

}

m, err := js.GetMsg("T1", 1)
if err != nil {
@derekcollison (Member):

require_NoError()

@jnmoyne (Contributor Author):

Done (see above)

server/stream.go (outdated)
@@ -58,6 +58,9 @@ type StreamConfig struct {
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`

// Allow applying a subject transform to incoming messages before doing anything else
InputSubjectTransform *InputSubjectTransform `json:"input_subject_transform,omitempty"`
@derekcollison (Member):

Shorter json name possible?

@jnmoyne (Contributor Author) commented Jan 26, 2023:

I thought of just input_transform, but then thought we may want to have transforms on things other than subjects later. I also figured that since it only appears in the stream config / stream info, it's not critical to optimize its size, and as humans will be editing those JSONs a lot of the time, I erred on the side of being a bit too verbose, figuring it couldn't hurt 🤷‍♂️. But yeah, 'input' is the best I could come up with, and I'm not especially attached to it... Open to anything you'd rather use.

Contributor:

Given that the field setting the stream's interest subjects is already called “subjects”, naming this “input” something is confusing, so maybe subject_transform, or even subjects_transform, to be very clear.

@jnmoyne (Contributor Author) commented Jan 26, 2023:

Yeah, I put 'input' in there to try to denote that it's not just messages from the 'subjects' that get transformed, but also those from sources or the mirror, though really for lack of a better term. Just "subjects_transform" may lead people to forget that it also applies to the mirror/sources.

@jnmoyne (Contributor Author):

Trying to convey it's a 'stream pre-processing' transform that happens after the subjects/mirror/sources step (which itself can already have another transform, at least for sources)

Contributor:

We can't possibly convey all this through the choice of a single word :) So it seems we should at least pick a term and stick with it, even better if it's one we already use. It's subject(s).

@jnmoyne (Contributor Author):

Maybe trying to cram too much meaning in a field name (picking good names can be hard 😅) and "subject_transform" is enough... @derekcollison let me know what name you like best and I'll adjust everywhere (companion branches) for it.

@jnmoyne (Contributor Author):

@ripienaar fair enough, I will go with "subject_transform" then (unless someone else comes with a better one by tomorrow morning)

server/stream.go (outdated)
// Lock should be held
func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
if si == nil {
return nil
}

- ssi := &StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err}
+ var ssi *StreamSourceInfo
@derekcollison (Member):

Maybe keep same original assignment and just add the field if si.tr != nil?

@jnmoyne (Contributor Author):

Done
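The reviewer's suggestion can be sketched as follows: keep the original single composite-literal assignment and only set the transform-related fields when a transform exists. The types and fields below (`subjectTransform`, `tr`, `FilterSubject`, `SubjectTransformDest`, `toStreamSourceInfo`) are hypothetical, trimmed-down stand-ins, and the server's version is a method on `*stream`.

```go
package main

// subjectTransform is a hypothetical stand-in for a compiled transform.
type subjectTransform struct {
	src, dest string
}

// sourceInfo is a trimmed-down stand-in for the internal source state.
type sourceInfo struct {
	name string
	lag  uint64
	tr   *subjectTransform // nil when the source has no transform
}

// StreamSourceInfo is a trimmed-down stand-in for the reported info.
type StreamSourceInfo struct {
	Name                 string
	Lag                  uint64
	FilterSubject        string
	SubjectTransformDest string
}

// toStreamSourceInfo keeps the original single assignment and only fills
// in the transform fields when si.tr != nil.
func toStreamSourceInfo(si *sourceInfo) *StreamSourceInfo {
	if si == nil {
		return nil
	}
	ssi := &StreamSourceInfo{Name: si.name, Lag: si.lag}
	if si.tr != nil {
		ssi.FilterSubject = si.tr.src
		ssi.SubjectTransformDest = si.tr.dest
	}
	return ssi
}
```

This keeps the common no-transform path identical to the original code and confines the new logic to one guarded block.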


// Helper to ingest and index the subjectTransform destination token (e.g. $x or {{}}) in the token
// returns a transformation type, and three function arguments: an array of source subject token indexes, and a single number (e.g. number of partitions, or a slice size), and a string (e.g.a split delimiter)

@derekcollison (Member):

Remove the empty line, and split the line above into two lines.

@jnmoyne (Contributor Author):

Done
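The helper's comment describes returning a transformation type plus three arguments (source token indexes, a number, a string). A hedged Go sketch of that shape, assuming a small hypothetical subset of function tokens (`wildcard`, `partition`, `split`); the server's actual token names, argument order and validation may differ.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type transformType int

const (
	literalToken   transformType = iota // copied as-is
	wildcardToken                       // $N or {{wildcard(N)}}
	partitionToken                      // {{partition(numPartitions, N...)}}
	splitToken                          // {{split(N, delimiter)}}
)

// destToken holds a type, source token indexes, a single number and a string.
type destToken struct {
	kind    transformType
	indexes []int  // 1-based source wildcard indexes
	number  int    // e.g. number of partitions
	str     string // e.g. a split delimiter
}

func positiveInt(s string) (int, error) {
	n, err := strconv.Atoi(s)
	if err != nil || n < 1 {
		return 0, fmt.Errorf("expected a positive integer, got %q", s)
	}
	return n, nil
}

// parseDestToken parses one destination token. As usual in Go, callers
// must ignore the returned value when err != nil.
func parseDestToken(tok string) (destToken, error) {
	if strings.HasPrefix(tok, "$") {
		n, err := positiveInt(tok[1:])
		return destToken{kind: wildcardToken, indexes: []int{n}}, err
	}
	if !strings.HasPrefix(tok, "{{") || !strings.HasSuffix(tok, "}}") {
		return destToken{kind: literalToken}, nil
	}
	body := strings.TrimSpace(tok[2 : len(tok)-2])
	open := strings.IndexByte(body, '(')
	if open < 0 || !strings.HasSuffix(body, ")") {
		return destToken{}, fmt.Errorf("malformed function token %q", tok)
	}
	name := strings.ToLower(strings.TrimSpace(body[:open]))
	args := strings.Split(body[open+1:len(body)-1], ",")
	for i := range args {
		args[i] = strings.TrimSpace(args[i])
	}
	switch {
	case name == "wildcard" && len(args) == 1:
		n, err := positiveInt(args[0])
		return destToken{kind: wildcardToken, indexes: []int{n}}, err
	case name == "partition" && len(args) >= 2:
		num, err := positiveInt(args[0])
		if err != nil {
			return destToken{}, err
		}
		idx := make([]int, 0, len(args)-1)
		for _, a := range args[1:] {
			n, err := positiveInt(a)
			if err != nil {
				return destToken{}, err
			}
			idx = append(idx, n)
		}
		return destToken{kind: partitionToken, number: num, indexes: idx}, nil
	case name == "split" && len(args) == 2 && args[1] != "":
		n, err := positiveInt(args[0])
		return destToken{kind: splitToken, indexes: []int{n}, str: args[1]}, err
	}
	return destToken{}, fmt.Errorf("unsupported function token %q", tok)
}
```

Because arguments are split on commas, this sketch cannot express a comma delimiter; a real parser would need quoting or escaping for that.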

… and calling Fatalf throughout all the tests in the file

Improvements to readability
@derekcollison (Member):

LMK when you want me to take another look for review.

@jnmoyne (Contributor Author) commented Jan 26, 2023 via email

@jnmoyne (Contributor Author) commented Jan 26, 2023

Changed the StreamConfig's input subject transform to just subject transform. Because I was already using "subject_transform" inside the source config/source info, where it is not a complete transform (source + destination) but just the destination (since the source is in the filter_subject field), I in turn had to rename that one to something else.

Commits for these changes have also been pushed to the companion branches in nats.go, jsm.go, and natscli.

@derekcollison self-requested a review January 26, 2023 22:06
@derekcollison (Member) left a comment

LGTM

@derekcollison merged commit c2d3d9c into dev Jan 26, 2023
@derekcollison deleted the jnm/streamsourcetransform branch January 26, 2023 22:09
@bruth (Member) commented Jan 29, 2023

@derekcollison @jnmoyne Playing around, I was able to construct a cycle with a two-stream setup, with this PR and #3825 applied, by setting subject_transform_dest to the local stream's subject, e.g.

{
  "name": "n1",
  "subjects": ["n1.>"],
  "sources": [
    {
      "name": "n2",
      "filter_subject": "n2.>",
      "subject_transform_dest": "n1.>"
    }
  ]
}

And the equivalent for n2 (swapping n1 and n2). This is to be expected since the transform occurs before sourcing occurs, but just wanted to call it out.
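Why this configuration loops can be sketched by following a single message through the sourcing graph: stored in `n1` as `n1.a`, sourced by `n2` and rewritten to `n2.a`, then sourced back by `n1` and rewritten to `n1.a` again. The Go below is a hypothetical simulation (`source`, `hasSourcingCycle` and the hop limit are assumptions) that only understands `prefix.>` filters and destinations; it is not how the server detects cycles.

```go
package main

import "strings"

// source means: stream `into` sources from stream `from`, taking messages
// that match filter and rewriting them to dest ("prefix.>" forms only).
type source struct {
	from, into, filter, dest string
}

type hop struct{ stream, subj string }

// maxHops bounds the walk, since a transform can keep growing the subject
// (e.g. "a.>" -> "a.x.>") without ever repeating exactly.
const maxHops = 64

// mapPrefix rewrites subj from filter "p.>" to dest "q.>", reporting
// whether subj matched (">" must match at least one more character).
func mapPrefix(filter, dest, subj string) (string, bool) {
	fp := strings.TrimSuffix(filter, ">")
	if !strings.HasPrefix(subj, fp) || len(subj) == len(fp) {
		return "", false
	}
	return strings.TrimSuffix(dest, ">") + subj[len(fp):], true
}

// hasSourcingCycle follows a message stored in stream under subj through
// every source that picks it up, and reports whether it comes back to a
// (stream, subject) already on its path, or keeps propagating past maxHops.
func hasSourcingCycle(sources []source, stream, subj string) bool {
	return visit(sources, hop{stream, subj}, map[hop]bool{}, 0)
}

func visit(sources []source, h hop, onPath map[hop]bool, depth int) bool {
	if onPath[h] || depth > maxHops {
		return true
	}
	onPath[h] = true
	defer delete(onPath, h)
	for _, s := range sources {
		if s.from != h.stream {
			continue
		}
		if out, ok := mapPrefix(s.filter, s.dest, h.subj); ok {
			if visit(sources, hop{s.into, out}, onPath, depth+1) {
				return true
			}
		}
	}
	return false
}
```

Without the transforms (each source keeping its original subjects), the message sourced into the other stream no longer matches that stream's back-sourcing filter, so the walk stops.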

@jnmoyne (Contributor Author) commented Jan 29, 2023

Will take a look

@derekcollison (Member):

I added the ability to have two streams source from each other, as long as their subjects do not overlap.

We just need to add the transform logic in the dev branch.
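The rule described here (two streams may source each other when their subjects do not overlap) rests on a subject-overlap test. A minimal sketch, assuming standard NATS wildcard semantics (`*` is exactly one token, `>` is one or more trailing tokens); `subjectsOverlap` and `canSourceEachOther` are hypothetical names. On its own this check does not account for subject transforms, which is the remaining work mentioned in the comment.

```go
package main

import "strings"

// subjectsOverlap reports whether some concrete subject could match both
// patterns. '*' matches exactly one token; '>' (last token only) matches
// one or more tokens.
func subjectsOverlap(a, b string) bool {
	at, bt := strings.Split(a, "."), strings.Split(b, ".")
	for i := 0; i < len(at) && i < len(bt); i++ {
		if at[i] == ">" || bt[i] == ">" {
			// The other pattern still has at least one token here,
			// and '>' can absorb it and everything after it.
			return true
		}
		if at[i] != "*" && bt[i] != "*" && at[i] != bt[i] {
			return false
		}
	}
	return len(at) == len(bt)
}

// canSourceEachOther applies the rule from the comment: two streams may
// source each other only if none of their subjects overlap.
func canSourceEachOther(subjectsA, subjectsB []string) bool {
	for _, a := range subjectsA {
		for _, b := range subjectsB {
			if subjectsOverlap(a, b) {
				return false
			}
		}
	}
	return true
}
```

With the configuration from the earlier report, `n1.>` and `n2.>` do not overlap, so this check passes even though the transforms create a loop.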

@bruth (Member) commented Jan 29, 2023

I had created a branch called bjr-suse that rebased this one on top of main, incorporating both sets of changes.

@jnmoyne (Contributor Author) commented Jan 30, 2023

Can't reproduce using the current top of dev, which also has #3825 merged in


4 participants