Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Concurrent stream creation of the same stream could return not found #4600

Merged
merged 1 commit into from Sep 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 18 additions & 11 deletions server/jetstream_api.go
Expand Up @@ -1731,14 +1731,13 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s

var clusterWideConsCount int

js, cc := s.getJetStreamCluster()
if js == nil {
return
}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
if cc != nil {
// Check to make sure the stream is assigned.
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}

js.mu.RLock()
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
var offline bool
Expand Down Expand Up @@ -1833,15 +1832,23 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
}

mset, err := acc.lookupStream(streamName)
// Error is not to be expected at this point, but could happen if same stream trying to be created.
if err != nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
if cc != nil {
// This could be inflight, pause for a short bit and try again.
// This will not be inline, so ok.
time.Sleep(10 * time.Millisecond)
mset, err = acc.lookupStream(streamName)
}
// Check again.
if err != nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}
config := mset.config()

js, _ := s.getJetStreamCluster()

resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.stateWithDetail(details),
Expand Down