diff --git a/pkg/drivers/jetstream/jetstream.go b/pkg/drivers/jetstream/jetstream.go index e44456d5..4918b7cb 100644 --- a/pkg/drivers/jetstream/jetstream.go +++ b/pkg/drivers/jetstream/jetstream.go @@ -16,9 +16,9 @@ import ( ) const ( - kineBucket = "kine" - revHistory = 12 - kSlowMethodMilliseconds = 500 + kineBucket = "kine" + revHistory = 12 + slowMethodMilliseconds = 500 ) var ( @@ -127,7 +127,7 @@ func (j *JetStream) Get(ctx context.Context, key string, revision int64) (revRet size = len(kvRet.Value) } fStr := "GET %s, rev=%d => revRet=%d, kv=%v, size=%d, err=%v, duration=%d" - if duration.Milliseconds() > kSlowMethodMilliseconds { + if duration.Milliseconds() > slowMethodMilliseconds { logrus.Warnf(fStr, key, revision, revRet, kvRet != nil, size, errRet, duration.Milliseconds()) } else { logrus.Tracef(fStr, key, revision, revRet, kvRet != nil, size, errRet, duration.Milliseconds()) @@ -214,7 +214,7 @@ func (j *JetStream) Create(ctx context.Context, key string, value []byte, lease defer func() { duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) fStr := "CREATE %s, size=%d, lease=%d => rev=%d, err=%v, duration=%d" - if duration.Milliseconds() > kSlowMethodMilliseconds { + if duration.Milliseconds() > slowMethodMilliseconds { logrus.Warnf(fStr, key, len(value), lease, revRet, errRet, duration.Milliseconds()) } else { logrus.Tracef(fStr, key, len(value), lease, revRet, errRet, duration.Milliseconds()) @@ -269,13 +269,12 @@ func (j *JetStream) Create(ctx context.Context, key string, value []byte, lease return 0, err } return int64(seq), nil - } else { - seq, err := j.kvBucket.Create(key, event) - if err != nil { - return 0, err - } - return int64(seq), nil } + seq, err := j.kvBucket.Create(key, event) + if err != nil { + return 0, err + } + return int64(seq), nil } func (j *JetStream) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) { @@ -284,7 +283,7 @@ func (j *JetStream) Delete(ctx context.Context, key string, revision int64) (rev defer func() { duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) fStr := "DELETE %s, rev=%d => rev=%d, kv=%v, deleted=%v, err=%v, duration=%d" - if duration.Milliseconds() > kSlowMethodMilliseconds { + if duration.Milliseconds() > slowMethodMilliseconds { logrus.Warnf(fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, duration.Milliseconds()) } else { logrus.Tracef(fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, duration.Milliseconds()) @@ -305,9 +304,8 @@ func (j *JetStream) Delete(ctx context.Context, key string, revision int64) (rev if err != nil { if err == nats.ErrKeyNotFound { return rev, nil, true, nil - } else { - return rev, nil, false, err } + return rev, nil, false, err } if value == nil { @@ -357,7 +355,7 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re defer func() { duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) fStr := "LIST %s, start=%s, limit=%d, rev=%d => rev=%d, kvs=%d, err=%v, duration=%d" - if duration.Milliseconds() > kSlowMethodMilliseconds { + if duration.Milliseconds() > slowMethodMilliseconds { logrus.Warnf(fStr, prefix, startKey, limit, revision, revRet, len(kvRet), errRet, duration.Milliseconds()) } else { logrus.Tracef(fStr, prefix, startKey, limit, revision, revRet, len(kvRet), errRet, duration.Milliseconds()) @@ -413,17 +411,17 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re } } } - var nextRevId = minRev + var nextRevID = minRev var nextRevision nats.KeyValueEntry for k, v := range histories { logrus.Debugf("Checking %s history", k) for i := len(v) - 1; i >= 0; i-- { - if int64(v[i].Revision()) > nextRevId && int64(v[i].Revision()) <= revision { - nextRevId = int64(v[i].Revision()) + if int64(v[i].Revision()) > nextRevID && int64(v[i].Revision()) <= revision { + nextRevID = int64(v[i].Revision()) nextRevision = v[i] - logrus.Debugf("found next rev=%d", nextRevId) + logrus.Debugf("found next rev=%d", nextRevID) break - } else if int64(v[i].Revision()) <= nextRevId { + } else if int64(v[i].Revision()) <= nextRevID { break } } @@ -437,67 +435,65 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re } return rev, kvs, nil + } - } else { - - current := true + current := true - if revision != 0 { - rev = revision - current = false - } + if revision != 0 { + rev = revision + current = false + } - if current { + if current { - entries, err := j.getKeyValues(ctx, prefix, true) - if err != nil { - return 0, nil, err - } - for _, e := range entries { - if count < limit || limit == 0 { - kv, err := decode(e) - if !j.isKeyExpired(ctx, e.Created(), &kv) && err == nil { - kvs = append(kvs, kv.KV) - count++ - } - } else { - break + entries, err := j.getKeyValues(ctx, prefix, true) + if err != nil { + return 0, nil, err + } + for _, e := range entries { + if count < limit || limit == 0 { + kv, err := decode(e) + if !j.isKeyExpired(ctx, e.Created(), &kv) && err == nil { + kvs = append(kvs, kv.KV) + count++ } + } else { + break } + } - } else { - keys, err := j.getKeys(ctx, prefix, true) - if err != nil { - return 0, nil, err - } - if revision == 0 && len(keys) == 0 { - return rev, nil, nil - } + } else { + keys, err := j.getKeys(ctx, prefix, true) + if err != nil { + return 0, nil, err + } + if revision == 0 && len(keys) == 0 { + return rev, nil, nil + } - for _, key := range keys { - if count < limit || limit == 0 { - if history, err := j.kvBucket.History(key, nats.Context(ctx)); err == nil { - for i := len(history) - 1; i >= 0; i-- { - if int64(history[i].Revision()) <= revision { - if entry, err := decode(history[i]); err == nil { - kvs = append(kvs, entry.KV) - count++ - } else { - logrus.Warnf("Could not decode %s rev=> %d", key, history[i].Revision()) - } - break + for _, key := range keys { + if count < limit || limit == 0 { + if history, err := j.kvBucket.History(key, nats.Context(ctx)); err == nil { + for i := len(history) - 1; i >= 0; i-- { + if int64(history[i].Revision()) <= revision { + if entry, err := decode(history[i]); err == nil { + kvs = append(kvs, entry.KV) + count++ + } else { + logrus.Warnf("Could not decode %s rev=> %d", key, history[i].Revision()) } + break } - } else { - // should not happen - logrus.Warnf("no history for %s", key) } + } else { + // should not happen + logrus.Warnf("no history for %s", key) } } - } - return rev, kvs, nil + } + return rev, kvs, nil } func (j *JetStream) listAfter(ctx context.Context, prefix string, revision int64) (revRet int64, eventRet []*server.Event, errRet error) { @@ -543,7 +539,7 @@ func (j *JetStream) Count(ctx context.Context, prefix string) (revRet int64, cou defer func() { duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) fStr := "COUNT %s => rev=%d, count=%d, err=%v, duration=%d" - if duration.Milliseconds() > kSlowMethodMilliseconds { + if duration.Milliseconds() > slowMethodMilliseconds { logrus.Warnf(fStr, prefix, revRet, count, err, duration.Milliseconds()) } else { logrus.Tracef(fStr, prefix, revRet, count, err, duration.Milliseconds()) @@ -579,7 +575,7 @@ func (j *JetStream) Update(ctx context.Context, key string, value []byte, revisi kvRev = kvRet.ModRevision } fStr := "UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v, duration=%d" - if duration.Milliseconds() > kSlowMethodMilliseconds { + if duration.Milliseconds() > slowMethodMilliseconds { logrus.Warnf(fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, duration.Milliseconds()) } else { logrus.Tracef(fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, duration.Milliseconds()) @@ -648,16 +644,16 @@ func (j *JetStream) Update(ctx context.Context, key string, value []byte, revisi func (j *JetStream) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { - watchCtx, _ := context.WithCancel(ctx) + //watchCtx, _ := context.WithCancel(ctx) //logrus.Tracef("WATCH %s, rev=%d", prefix, revision) - watcher, err := j.kvBucket.(*kv.EncodedKV).WatchWithCtx(watchCtx, prefix, nats.IgnoreDeletes()) + watcher, err := j.kvBucket.(*kv.EncodedKV).WatchWithCtx(ctx, prefix, nats.IgnoreDeletes()) if revision > 0 { revision-- } - _, events, err := j.listAfter(watchCtx, prefix, revision) + _, events, err := j.listAfter(ctx, prefix, revision) if err != nil { logrus.Errorf("failed to create watcher %s for revision %d", prefix, revision) @@ -702,7 +698,7 @@ func (j *JetStream) Watch(ctx context.Context, prefix string, revision int64) <- if err != nil { logrus.Warnf("watch event: could not decode %s seq %d", i.Key(), i.Revision()) } - if _, prevEntry, prevErr := j.get(watchCtx, i.Key(), value.PrevRevision, false); prevErr == nil { + if _, prevEntry, prevErr := j.get(ctx, i.Key(), value.PrevRevision, false); prevErr == nil { if prevEntry != nil { prevValue = *prevEntry } @@ -737,7 +733,7 @@ func (j *JetStream) Watch(ctx context.Context, prefix string, revision int64) <- } // } } - case <-watchCtx.Done(): + case <-ctx.Done(): logrus.Infof("watcher: %s context cancelled", prefix) if err := watcher.Stop(); err != nil && err != nats.ErrBadSubscription { logrus.Warnf("error stopping %s watcher: %v", prefix, err)