Skip to content

Commit

Permalink
Add original message timestamp to republished message headers (#3933)
Browse files Browse the repository at this point in the history
Adds original message timestamp to republished message as
"Nats-Time-Stamp" header

 - [ ] Link to issue, e.g. `Resolves #NNN`
 - [ ] Documentation added (if applicable)
 - [x] Tests added
- [x] Branch rebased on top of current main (`git pull --rebase origin
main`)
- [x] Changes squashed to a single commit (described
[here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html))
 - [ ] Build is green in Travis CI
- [x] You have certified that the contribution is your original work and
that you license the work to the project under the [Apache 2
license](https://github.com/nats-io/nats-server/blob/main/LICENSE)

Resolves #

### Changes proposed in this pull request:

- Add original message timestamp to republished message as header
"Nats-Time-Stamp"
 -
 -
  • Loading branch information
derekcollison committed Apr 2, 2023
2 parents ac96d75 + 2d4c12c commit 299d9b2
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
6 changes: 6 additions & 0 deletions server/jetstream_cluster_2_test.go
Expand Up @@ -5651,6 +5651,12 @@ func TestJetStreamClusterStreamRepublish(t *testing.T) {
seq, err := strconv.Atoi(m.Header.Get(JSSequence))
require_NoError(t, err)
require_True(t, seq == i)
// Make sure timestamp is correct
ts, err := time.Parse(time.RFC3339Nano, m.Header.Get(JSTimeStamp))
require_NoError(t, err)
origMsg, err := js.GetMsg("RP", uint64(seq))
require_NoError(t, err)
require_True(t, ts == origMsg.Time)
// Make sure last sequence matches last seq we received on this subject.
last, err := strconv.Atoi(m.Header.Get(JSLastSequence))
require_NoError(t, err)
Expand Down
10 changes: 6 additions & 4 deletions server/stream.go
Expand Up @@ -4070,21 +4070,23 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,

// Check for republish.
if republish {
tsStr := time.Unix(0, ts).UTC().Format(time.RFC3339Nano)
var rpMsg []byte
if len(hdr) == 0 {
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\n\r\n"
const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n"
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\nNats-Last-Sequence: %d\r\n\r\n"
const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n"
if !thdrsOnly {
hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tlseq))
hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tsStr, tlseq))
rpMsg = copyBytes(msg)
} else {
hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tlseq, len(msg)))
hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tsStr, tlseq, len(msg)))
}
} else {
// Slow path.
hdr = genHeader(hdr, JSStream, name)
hdr = genHeader(hdr, JSSubject, subject)
hdr = genHeader(hdr, JSSequence, strconv.FormatUint(seq, 10))
hdr = genHeader(hdr, JSTimeStamp, tsStr)
hdr = genHeader(hdr, JSLastSequence, strconv.FormatUint(tlseq, 10))
if !thdrsOnly {
rpMsg = copyBytes(msg)
Expand Down

0 comments on commit 299d9b2

Please sign in to comment.