[ADDED] Stream subject mapping transforms #3814
- Extract the subject transformation code out of accounts.go
- Stream sources can now have a subject mapping transform
- You can source the same stream more than once
- Remove the limitation that the subject filter for a source, mirror or consumer must overlap with the subjects of the sourced/mirrored stream or of the stream itself
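To make "subject mapping transform" concrete, here is a minimal Go sketch of how a destination can reference the tokens matched by wildcards in a source subject, using `$N` and `{{wildcard(N)}}` destination tokens (the forms assumed here). The function name `transformSubject` is hypothetical, it only supports single-token `*` wildcards (no `>`), and it is not the code extracted from accounts.go.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// transformSubject is a hypothetical, simplified subject mapping: src may
// contain single-token '*' wildcards, and dest may reference the i-th
// wildcard (1-based) as "$i" or "{{wildcard(i)}}". It returns ok=false when
// the subject does not match src or dest references a missing wildcard.
// This is not nats-server's implementation.
func transformSubject(src, dest, subject string) (string, bool) {
	srcToks := strings.Split(src, ".")
	subjToks := strings.Split(subject, ".")
	if len(srcToks) != len(subjToks) {
		return "", false
	}
	// Collect, in order, the subject tokens matched by each '*'.
	var wildcards []string
	for i, t := range srcToks {
		switch {
		case t == "*":
			wildcards = append(wildcards, subjToks[i])
		case t != subjToks[i]:
			return "", false
		}
	}
	destToks := strings.Split(dest, ".")
	for i, t := range destToks {
		var idxStr string
		switch {
		case strings.HasPrefix(t, "$"):
			idxStr = t[1:]
		case strings.HasPrefix(t, "{{wildcard(") && strings.HasSuffix(t, ")}}"):
			idxStr = strings.TrimSuffix(strings.TrimPrefix(t, "{{wildcard("), ")}}")
		default:
			continue // literal token, copied as-is
		}
		n, err := strconv.Atoi(strings.TrimSpace(idxStr))
		if err != nil || n < 1 || n > len(wildcards) {
			return "", false
		}
		destToks[i] = wildcards[n-1]
	}
	return strings.Join(destToks, "."), true
}

func main() {
	out, ok := transformSubject("orders.*.*", "region.$2.orders.{{wildcard(1)}}", "orders.123.eu")
	fmt.Println(out, ok) // region.eu.orders.123 true
}
```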
Branch updated from fbd63d9 to a953e84
Just a few minor nits.
server/jetstream_test.go (outdated)
@@ -11647,6 +11631,14 @@ func TestJetStreamSourceBasics(t *testing.T) {
return nil
})

m, err := js.GetMsg("MS", 1)
if err != nil {
require_NoError(t, err)
Done. Since this pattern appeared in many places in the file, I made it consistent and replaced it with require_NoError everywhere else in the file as well.
server/jetstream_test.go (outdated)
}

m, err := js.GetMsg("T1", 1)
if err != nil {
require_NoError(t, err)
Done (see above)
server/stream.go (outdated)
@@ -58,6 +58,9 @@ type StreamConfig struct {
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`

// Allow applying a subject transform to incoming messages before doing anything else
InputSubjectTransform *InputSubjectTransform `json:"input_subject_transform,omitempty"`
Shorter JSON name possible?
I thought of just input_transform, but then figured we might want transforms on things other than subjects later. Also, since it only appears in the stream config/stream info, size isn't critical to optimize, and since humans will be editing those JSON documents much of the time, I erred on the side of being a bit too verbose, figuring it couldn't hurt 🤷♂️. 'input' was the best I could come up with, but I'm not especially attached to it. Open to anything you'd rather use.
Given that the field for setting the stream's interest subjects is called "subjects", naming this "input" something is confusing. Maybe subject_transform, or even subjects_transform, to be very clear.
Yeah, I put 'input' in there to try to denote that it's not just messages from the 'subjects' that get transformed but also those from the sources or the mirror, though really for lack of a better term. But plain "subjects_transform" may lead people to forget that it also applies to the mirror/sources.
I'm trying to convey that it's a 'stream pre-processing' transform that happens after the subjects/mirror/sources step (which can itself already have another transform, at least for sources).
We can't possibly convey all this through the choice of a single word :) So it seems we should at least pick a term and stick with it, even better if it's one we already use. It's subject(s).
Maybe I'm trying to cram too much meaning into a field name (picking good names can be hard 😅) and "subject_transform" is enough... @derekcollison let me know which name you like best and I'll adjust it everywhere (including the companion branches).
@ripienaar fair enough, I will go with "subject_transform" then (unless someone comes up with a better one by tomorrow morning).
server/stream.go (outdated)
// Lock should be held
func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
if si == nil {
return nil
}

-ssi := &StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err}
+var ssi *StreamSourceInfo
Maybe keep the original assignment and just add the field if si.tr != nil?
Done
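A minimal sketch of the suggested shape: build `StreamSourceInfo` with the original single assignment, then set the transform-related fields only when `si.tr != nil`. The trimmed-down types, the `buildSourceInfo` name, and the `FilterSubject`/`SubjectTransformDest` field names are hypothetical stand-ins, and a plain `error` is used in place of the server's own error type.

```go
package main

import "fmt"

// Hypothetical, trimmed-down stand-ins for the server's internal types.
type transform struct{ src, dest string }

type sourceInfo struct {
	name string
	lag  uint64
	err  error
	tr   *transform // nil when the source has no subject transform
}

type StreamSourceInfo struct {
	Name                 string
	Lag                  uint64
	Error                error
	FilterSubject        string
	SubjectTransformDest string
}

// buildSourceInfo keeps the original composite-literal assignment and only
// fills in the transform fields when a transform is configured.
// (In the server, the stream lock is expected to be held here.)
func buildSourceInfo(si *sourceInfo) *StreamSourceInfo {
	if si == nil {
		return nil
	}
	ssi := &StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err}
	if si.tr != nil {
		ssi.FilterSubject = si.tr.src
		ssi.SubjectTransformDest = si.tr.dest
	}
	return ssi
}

func main() {
	plain := buildSourceInfo(&sourceInfo{name: "A", lag: 3})
	mapped := buildSourceInfo(&sourceInfo{name: "B", tr: &transform{src: "foo.*", dest: "bar.$1"}})
	fmt.Printf("%+v\n%+v\n", *plain, *mapped)
}
```

Keeping the single literal means the common, untransformed case reads exactly as before, and the conditional block is the only new code path.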
server/subject_transform.go (outdated)

// Helper to ingest and index the subjectTransform destination token (e.g. $x or {{}}) in the token
// returns a transformation type, and three function arguments: an array of source subject token indexes, and a single number (e.g. number of partitions, or a slice size), and a string (e.g.a split delimiter)

Remove the empty line, and split the line above into two.
Done
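The helper described in that comment could be sketched roughly as follows in Go. The `transformType` constants, the `parseDestToken` name, and the exact argument syntax (for example an unquoted split delimiter) are assumptions; only `$N`, `wildcard`, `partition` and `split` are handled, and arguments must not themselves contain commas.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type transformType int

const (
	noTransform transformType = iota // literal token, copied unchanged
	wildcard                         // $N or {{wildcard(N)}}
	partition                        // {{partition(numPartitions,N...)}}
	split                            // {{split(N,delimiter)}}
)

// atoiAll converts every string in ss to an int.
func atoiAll(ss []string) ([]int, error) {
	out := make([]int, 0, len(ss))
	for _, s := range ss {
		n, err := strconv.Atoi(s)
		if err != nil {
			return nil, err
		}
		out = append(out, n)
	}
	return out, nil
}

// parseDestToken returns the transform type of one destination token plus
// its three arguments: source wildcard indexes, an integer (e.g. number of
// partitions, -1 when unused) and a string (e.g. a split delimiter).
func parseDestToken(tok string) (transformType, []int, int, string, error) {
	if strings.HasPrefix(tok, "$") {
		idx, err := atoiAll([]string{tok[1:]})
		if err != nil {
			return noTransform, nil, -1, "", err
		}
		return wildcard, idx, -1, "", nil
	}
	if !strings.HasPrefix(tok, "{{") || !strings.HasSuffix(tok, "}}") {
		return noTransform, nil, -1, "", nil
	}
	body := strings.TrimSpace(tok[2 : len(tok)-2])
	open := strings.Index(body, "(")
	if open < 0 || !strings.HasSuffix(body, ")") {
		return noTransform, nil, -1, "", fmt.Errorf("malformed function %q", tok)
	}
	name := strings.ToLower(strings.TrimSpace(body[:open]))
	args := strings.Split(body[open+1:len(body)-1], ",")
	for i := range args {
		args[i] = strings.TrimSpace(args[i])
	}
	switch {
	case name == "wildcard" && len(args) == 1:
		idx, err := atoiAll(args)
		return wildcard, idx, -1, "", err
	case name == "partition" && len(args) >= 2:
		num, err := strconv.Atoi(args[0])
		if err != nil {
			return noTransform, nil, -1, "", err
		}
		idx, err := atoiAll(args[1:])
		return partition, idx, num, "", err
	case name == "split" && len(args) == 2:
		idx, err := atoiAll(args[:1])
		return split, idx, -1, args[1], err
	}
	return noTransform, nil, -1, "", fmt.Errorf("unsupported function %q", tok)
}

func main() {
	for _, tok := range []string{"$1", "{{wildcard(2)}}", "{{partition(4,1,2)}}", "{{split(1,-)}}", "orders"} {
		tt, idx, num, s, err := parseDestToken(tok)
		fmt.Printf("%-22s type=%d idx=%v num=%d str=%q err=%v\n", tok, tt, idx, num, s, err)
	}
}
```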
… and calling Fatalf throughout all the tests in the file
Improvements to readability
LMK when you want me to take another look for review.
I still need to make the input_subject_transform naming change, but I'm in meetings most of the time until sometime this afternoon; I'll let you know. I believe that name change is the only thing left.
(In reply to Derek Collison, Jan 26, 2023, 9:44 AM.)
…ce's SubjectTransform)
Changed the StreamConfig's input subject transform to just subject transform. Because I was already using "subject_transform" inside the source config/source info, where it is not a complete transform (source + destination) but only the destination (the source being in the subject_filter field), I in turn had to rename that one to something else. Commits for these changes have also been pushed to the companion branches in nats.go, jsm.go, and natscli.
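To illustrate the distinction drawn here, a hypothetical Go sketch in which a source-level transform stores only a destination, and the complete transform is assembled from the source's filter subject. It also shows the same stream being sourced twice with different filters. `streamSource`, `TransformDest` and `fullTransform` are illustrative names, not the renamed field from this PR, and treating an empty filter as `>` is an assumption.

```go
package main

import "fmt"

// subjectTransformConfig is a complete (source + destination) transform,
// like the one on the stream config.
type subjectTransformConfig struct {
	Src  string
	Dest string
}

// streamSource is a hypothetical, trimmed-down source config: the source
// side of its transform is the FilterSubject, so only the destination is
// stored separately.
type streamSource struct {
	Name          string
	FilterSubject string
	TransformDest string
}

// fullTransform assembles the complete transform implied by a source config.
// It returns ok=false when no destination is configured.
func (ss streamSource) fullTransform() (subjectTransformConfig, bool) {
	if ss.TransformDest == "" {
		return subjectTransformConfig{}, false
	}
	src := ss.FilterSubject
	if src == "" {
		src = ">" // assumption: no filter means "all subjects"
	}
	return subjectTransformConfig{Src: src, Dest: ss.TransformDest}, true
}

func main() {
	// The same stream sourced twice, each time with its own filter and transform.
	sources := []streamSource{
		{Name: "ORDERS", FilterSubject: "orders.eu.*", TransformDest: "eu.orders.$1"},
		{Name: "ORDERS", FilterSubject: "orders.us.*", TransformDest: "us.orders.$1"},
	}
	for _, ss := range sources {
		if tr, ok := ss.fullTransform(); ok {
			fmt.Printf("%s: %s -> %s\n", ss.Name, tr.Src, tr.Dest)
		}
	}
	// ORDERS: orders.eu.* -> eu.orders.$1
	// ORDERS: orders.us.* -> us.orders.$1
}
```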
LGTM
@derekcollison @jnmoyne Playing around, I was able to construct a cycle with the two-stream setup, with this PR and #3825 applied, by setting the
And the equivalent for
Will take a look.
I added the ability for two sources to reference each other as long as their subjects do not overlap. I just need to add the transform logic in the dev branch.
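The non-overlap rule can be sketched with a token-by-token subject comparison in which `*` matches exactly one token and a trailing `>` matches one or more. `subjectsOverlap` and `canSourceEachOther` are hypothetical helpers, not the server's code, and they compare subjects as configured; since the transform logic was still to be added at this point, transformed subjects are not considered here.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectsOverlap reports whether some concrete subject could match both
// filters a and b. "*" matches exactly one token; ">" (only valid as the
// last token) matches one or more tokens.
func subjectsOverlap(a, b string) bool {
	at, bt := strings.Split(a, "."), strings.Split(b, ".")
	for i := 0; i < len(at) && i < len(bt); i++ {
		x, y := at[i], bt[i]
		if x == ">" || y == ">" {
			// Earlier tokens are compatible and the other filter still has
			// at least one token left, which ">" can absorb.
			return true
		}
		if x != "*" && y != "*" && x != y {
			return false
		}
	}
	// Without ">", both filters must have the same number of tokens.
	return len(at) == len(bt)
}

// canSourceEachOther sketches the rule above: two streams may source each
// other only if none of A's subjects overlaps any of B's.
func canSourceEachOther(aSubjects, bSubjects []string) bool {
	for _, x := range aSubjects {
		for _, y := range bSubjects {
			if subjectsOverlap(x, y) {
				return false
			}
		}
	}
	return true
}

func main() {
	fmt.Println(canSourceEachOther([]string{"a.>"}, []string{"b.>"})) // true
	fmt.Println(canSourceEachOther([]string{"a.*"}, []string{"*.x"})) // false
}
```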
I had created a branch that rebased this one on top of main, incorporating both sets of changes, called
Can't reproduce using the current top of dev, which also has #3825 merged in.
Adds support for subject mapping transforms in streams
Introduces the following changes:
This enables many interesting use cases, such as sourcing KV buckets, or inserting a partition number token into the subject so that one consumer per partition can then be created, all directly at the stream definition level rather than having to resort to the Core NATS subject mapping functionality (which requires administrative access to either the server configuration file or the account key in order to be configured).
/cc @nats-io/core