Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add TTL support #25

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 1 addition & 10 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
os: [macOS-latest, ubuntu-latest]
goversion: [1.13, 1.14, 1.15]
goversion: [1.17]
steps:
- name: Set up Go ${{matrix.goversion}} on ${{matrix.os}}
uses: actions/setup-go@v1
Expand All @@ -18,20 +18,11 @@ jobs:
- name: Check out code into the Go module directory
uses: actions/checkout@v1

- name: gofmt
run: |
[[ -z $(gofmt -l $(find . -name '*.go') ) ]]

- name: Get dependencies
env:
GO111MODULE: on
run: go mod download

- name: Vet
env:
GO111MODULE: on
run: go vet -mod=readonly ./...

- name: Test
env:
GO111MODULE: on
Expand Down
46 changes: 46 additions & 0 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: golangci-lint
on:
push:
tags:
- v*
branches:
- master
- main
pull_request:
permissions:
contents: read
# Optional: allow read access to pull request. Use with `only-new-issues` option.
pull-requests: read
jobs:
golangci:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
with:
go-version: 1.17
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: v1.29

# Optional: working directory, useful for monorepos
# working-directory: somedir

# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0

# Optional: show only new issues if it's a pull request. The default value is `false`.
only-new-issues: true

# Optional: if set to true then the all caching functionality will be complete disabled,
# takes precedence over all other caching options.
# skip-cache: true

# Optional: if set to true then the action don't cache or restore ~/go/pkg.
# skip-pkg-cache: true

# Optional: if set to true then the action don't cache or restore ~/.cache/go-build.
# skip-build-cache: true
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
issues:
exclude-rules:
- linters:
- staticcheck
text: "SA1019:"
48 changes: 31 additions & 17 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ limitations under the License.

package galaxycache

import "time"

// Codec includes both the BinaryMarshaler and BinaryUnmarshaler
// interfaces
type Codec interface {
MarshalBinary() ([]byte, error)
UnmarshalBinary(data []byte) error
MarshalBinary() ([]byte, time.Time, error)
UnmarshalBinary(data []byte, expire time.Time) error
Comment on lines +24 to +25
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very disruptive way to side-channel the expiration time around that breaks all existing Codec implementations as well as all BackendGetter implementations.

}

// Note: to ensure that unmarshaling is a read-only operation, bytes
Expand All @@ -32,48 +34,60 @@ func cloneBytes(b []byte) []byte {
}

// ByteCodec is a byte slice type that implements Codec
type ByteCodec []byte
type ByteCodec struct {
bytes []byte
expire time.Time
}
Comment on lines +37 to +40
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change breaks the common use-case of ByteCodec. (same with the other two public Codec implementations in this file)

This change makes it impossible to extract the value out for anyone outside this package.


// MarshalBinary on a ByteCodec returns the bytes
func (c *ByteCodec) MarshalBinary() ([]byte, error) {
return *c, nil
func (c *ByteCodec) MarshalBinary() ([]byte, time.Time, error) {
return c.bytes, c.expire, nil
}

// UnmarshalBinary on a ByteCodec sets the ByteCodec to
// a copy of the provided data
func (c *ByteCodec) UnmarshalBinary(data []byte) error {
*c = cloneBytes(data)
func (c *ByteCodec) UnmarshalBinary(data []byte, expire time.Time) error {
c.bytes = cloneBytes(data)
c.expire = expire
return nil
}

// CopyingByteCodec is a byte slice type that implements Codec
// and returns a copy of the bytes when marshaled
type CopyingByteCodec []byte
type CopyingByteCodec struct {
bytes []byte
expire time.Time
}

// MarshalBinary on a CopyingByteCodec returns a copy of the bytes
func (c *CopyingByteCodec) MarshalBinary() ([]byte, error) {
return cloneBytes(*c), nil
func (c *CopyingByteCodec) MarshalBinary() ([]byte, time.Time, error) {
return cloneBytes(c.bytes), c.expire, nil
}

// UnmarshalBinary on a CopyingByteCodec sets the ByteCodec to
// a copy of the provided data
func (c *CopyingByteCodec) UnmarshalBinary(data []byte) error {
*c = cloneBytes(data)
func (c *CopyingByteCodec) UnmarshalBinary(data []byte, expire time.Time) error {
c.bytes = cloneBytes(data)
c.expire = expire
return nil
}

// StringCodec is a string type that implements Codec
type StringCodec string
type StringCodec struct {
str string
expire time.Time
}

// MarshalBinary on a StringCodec returns the bytes underlying
// the string
func (c *StringCodec) MarshalBinary() ([]byte, error) {
return []byte(*c), nil
func (c *StringCodec) MarshalBinary() ([]byte, time.Time, error) {
return []byte(c.str), c.expire, nil
}

// UnmarshalBinary on a StringCodec sets the StringCodec to
// a stringified copy of the provided data
func (c *StringCodec) UnmarshalBinary(data []byte) error {
*c = StringCodec(data)
func (c *StringCodec) UnmarshalBinary(data []byte, expire time.Time) error {
c.str = string(data)
c.expire = expire
return nil
}
15 changes: 12 additions & 3 deletions codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package galaxycache
import (
"bytes"
"testing"
"time"

"github.com/stretchr/testify/require"
)

const testBytes = "some bytes"
Expand Down Expand Up @@ -51,25 +54,31 @@ func TestCodec(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
inBytes := []byte(testBytes)
tc.codec.UnmarshalBinary(inBytes)
require.NoError(t, tc.codec.UnmarshalBinary(inBytes, time.Time{}))
inBytes[0] = 'a' // change the original byte slice to ensure copy was made
marshaledBytes, err := tc.codec.MarshalBinary()
marshaledBytes, expTm, err := tc.codec.MarshalBinary()
if err != nil {
t.Errorf("Error marshaling from byteCodec: %s", err)
}
if !expTm.Equal(time.Time{}) {
t.Errorf("Expected empty expiration time")
}
if string(marshaledBytes) != testBytes {
t.Errorf("Unmarshal/Marshal resulted in %q; want %q", marshaledBytes, testBytes)
}

if tc.checkCopy {
marshaledBytes[0] = 'a' // change marshaled bytes to ensure full copy on marshal
secondMarshaledBytes, errM := tc.codec.MarshalBinary()
secondMarshaledBytes, expTm, errM := tc.codec.MarshalBinary()
if errM != nil {
t.Errorf("Error marshaling from byteCodec: %s", errM)
}
if bytes.Equal(marshaledBytes, secondMarshaledBytes) {
t.Errorf("Marshaling did not copy the bytes")
}
if !expTm.Equal(time.Time{}) {
t.Errorf("Expected empty expiration time")
}
}
})
}
Expand Down
61 changes: 46 additions & 15 deletions galaxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,35 @@ func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel, localAuthoritati
}
}

// GetMultiple is like Get but fetches multiple keys at once into the respective
// destinations (codecs).
func (g *Galaxy) GetMultiple(ctx context.Context, keys []string, destinations []Codec) error {
if len(keys) != len(destinations) {
return fmt.Errorf("number of keys vs. codecs doesn't match (%d vs. %d)", len(keys), len(destinations))
}
ctx, tagErr := tag.New(ctx, tag.Upsert(GalaxyKey, g.name))
if tagErr != nil {
return fmt.Errorf("Error tagging context: %s", tagErr)
}

ctx, span := trace.StartSpan(ctx, "galaxycache.(*Galaxy).GetMultiple on "+g.name)
startTime := time.Now()
defer func() {
g.recordStats(ctx, nil, MRoundtripLatencyMilliseconds.M(sinceInMilliseconds(startTime)))
span.End()
}()

g.Stats.Gets.Add(1)
g.recordStats(ctx, nil, MGets.M(1))

// TODO:
// Group each key by peer.
// Request all of those keys from that one peer concurrently.
// Try to load what is missing.

return nil
}

// Get as defined here is the primary "get" called on a galaxy to
// find the value for the given key, using the following logic:
// - First, try the local cache; if its a cache hit, we're done
Expand Down Expand Up @@ -436,7 +465,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error {
value.stats.touch()
g.recordRequest(ctx, hlvl, false)
g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data))))
return dest.UnmarshalBinary(value.data)
return dest.UnmarshalBinary(value.data, value.expire)
}

span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", false)}, "Cache miss")
Expand All @@ -456,7 +485,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error {
if destPopulated {
return nil
}
return dest.UnmarshalBinary(value.data)
return dest.UnmarshalBinary(value.data, value.expire)
}

type valWithLevel struct {
Expand Down Expand Up @@ -525,7 +554,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
// probably boring (normal task movement), so not
// worth logging I imagine.
}
data, err := g.getLocally(ctx, key, dest)
data, expTm, err := g.getLocally(ctx, key, dest)
if err != nil {
g.Stats.BackendLoadErrors.Add(1)
g.recordStats(ctx, nil, MBackendLoadErrors.M(1))
Expand All @@ -535,7 +564,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
g.Stats.CoalescedBackendLoads.Add(1)
g.recordStats(ctx, nil, MCoalescedBackendLoads.M(1))
destPopulated = true // only one caller of load gets this return value
value = newValWithStat(data, nil)
value = newValWithStat(data, nil, expTm)
g.populateCache(ctx, key, value, &g.mainCache)
return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil
})
Expand All @@ -548,22 +577,23 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
return
}

func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, error) {
func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, time.Time, error) {
err := g.getter.Get(ctx, key, dest)
if err != nil {
return nil, err
return nil, time.Time{}, err
}
return dest.MarshalBinary()
}

func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string) (*valWithStat, error) {
data, err := peer.Fetch(ctx, g.name, key)
data, err := peer.Fetch(ctx, g.name, []string{key})
if err != nil {
return nil, err
}
expire := data[0].TTL
vi, ok := g.candidateCache.get(key)
if !ok {
vi = g.addNewToCandidateCache(key)
vi = g.addNewToCandidateCache(key, expire)
}

g.maybeUpdateHotCacheStats() // will update if at least a second has passed since the last update
Expand All @@ -573,7 +603,7 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string
KeyQPS: kStats.val(),
HCStats: g.hcStatsWithTime.hcs,
}
value := newValWithStat(data, kStats)
value := newValWithStat(data[0].Data, kStats, expire)
if g.opts.promoter.ShouldPromote(key, value.data, stats) {
g.populateCache(ctx, key, value, &g.hotCache)
}
Expand Down Expand Up @@ -627,7 +657,7 @@ func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithSt
}

func (g *Galaxy) recordStats(ctx context.Context, mutators []tag.Mutator, measurements ...stats.Measurement) {
stats.RecordWithOptions(
_ = stats.RecordWithOptions(
ctx,
stats.WithMeasurements(measurements...),
stats.WithTags(mutators...),
Expand Down Expand Up @@ -692,8 +722,9 @@ func (c *cache) stats() CacheStats {
}

type valWithStat struct {
data []byte
stats *keyStats
data []byte
stats *keyStats
expire time.Time
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit memory-inefficient to add the expiration time to both to the LRU's value struct and the value we're storing inside the LRU.

}

// sizeOfValWithStats returns the total size of the value in the hot/main
Expand All @@ -704,21 +735,21 @@ func (v *valWithStat) size() int64 {
return int64(unsafe.Sizeof(*v.stats)) + int64(len(v.data)) + int64(unsafe.Sizeof(v)) + int64(unsafe.Sizeof(*v))
}

func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats)) {
func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats, ttl time.Time)) {
c.lru.OnEvicted = func(key lru.Key, value interface{}) {
val := value.(*valWithStat)
c.nbytes -= int64(len(key.(string))) + val.size()
c.nevict++
if f != nil {
f(key.(string), val.stats)
f(key.(string), val.stats, val.expire)
}
}
}

func (c *cache) add(key string, value *valWithStat) {
c.mu.Lock()
defer c.mu.Unlock()
c.lru.Add(key, value)
c.lru.Add(key, value, value.expire)
c.nbytes += int64(len(key)) + value.size()
}

Expand Down