cmd/blobstore: add test stubs, implement bucket creation (#45686)
* cmd/blobstore: add uploadstore testing structure

Once these tests are implemented and passing, `cmd/blobstore` can
replace the old s3proxy implementation.

* cmd/blobstore: add basic request parsing, createBucket stub
* cmd/blobstore: implement bucket creation, bucket locking
* cmd/blobstore: make TestGetNotExists pass
* blobstore: improve "bucket already exists" behavior matching

Signed-off-by: Stephen Gutekanst <stephen@sourcegraph.com>
slimsag committed Dec 15, 2022
1 parent 9361967 commit 5dec121
Showing 2 changed files with 218 additions and 5 deletions.
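
For orientation before the diff: the new handler implements the S3 CreateBucket shape, an empty-body PUT to /<bucket>. Below is a minimal client sketch; the address, bucket name, and the snippet itself are illustrative assumptions, not part of the commit.

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// S3 CreateBucket is an empty-body PUT to /<bucket>; the handler in this
	// commit rejects any request with a non-zero Content-Length.
	req, err := http.NewRequest(http.MethodPut, "http://127.0.0.1:9000/my-bucket", nil)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // "200 OK", or "409 Conflict" if the bucket already exists
}

The 409 branch corresponds to the improved "bucket already exists" behavior matching noted in the commit message.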
106 changes: 104 additions & 2 deletions cmd/blobstore/internal/blobstore/blobstore.go
@@ -2,31 +2,133 @@
package blobstore

import (
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
sglog "github.com/sourcegraph/log"

"github.com/sourcegraph/log"

"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/lib/errors"
)

// Service is the blobstore service. It is an http.Handler.
type Service struct {
DataDir string
Log log.Logger
ObservationCtx *observation.Context

initOnce sync.Once
bucketLocksMu sync.Mutex
bucketLocks map[string]*sync.RWMutex
}

func (s *Service) init() {
s.initOnce.Do(func() {
s.bucketLocks = map[string]*sync.RWMutex{}

if err := os.MkdirAll(filepath.Join(s.DataDir, "buckets"), os.ModePerm); err != nil {
s.Log.Fatal("cannot create buckets directory:", sglog.Error(err))
}
})
}

// ServeHTTP handles HTTP requests made to the blobstore service
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.init()
metricRunning.Inc()
defer metricRunning.Dec()

// TODO(blobstore): handle requests
err := s.serve(w, r)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
s.Log.Error("serving request", sglog.Error(err))
fmt.Fprintf(w, "error: %v", err)
return
}
}

func (s *Service) serve(w http.ResponseWriter, r *http.Request) error {
ctx := r.Context()
path := strings.FieldsFunc(r.URL.Path, func(r rune) bool { return r == '/' })
switch r.Method {
case "PUT":
if len(path) == 1 {
// PUT /<bucket>
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateBucket.html
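// Example exchange (host and bucket name are illustrative):
//
//	PUT /my-bucket HTTP/1.1
//	Content-Length: 0
//
// responds 200 OK, or 409 Conflict ("bucket already exists") on a repeat.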
if r.ContentLength != 0 {
return errors.Newf("expected CreateBucket request to have content length 0: %s %s", r.Method, r.URL)
}
if err := s.createBucket(ctx, path[0]); err != nil {
if err == ErrBucketAlreadyExists {
w.WriteHeader(http.StatusConflict)
fmt.Fprint(w, "bucket already exists")
return nil
}
return errors.Wrap(err, "createBucket")
}
w.WriteHeader(http.StatusOK)
return nil
}
return errors.Newf("unexpected PUT request: %s", r.URL)
case "GET":
if len(path) == 2 && r.URL.Query().Get("x-id") == "GetObject" {
// GET /<bucket>/<key>?x-id=GetObject
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
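// Example request shape (names illustrative):
//
//	GET /my-bucket/my-key?x-id=GetObject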
// TODO(blobstore): implement me!
w.WriteHeader(http.StatusNotFound)
return nil
}
return errors.Newf("unexpected GET request: %s", r.URL)
default:
return errors.Newf("unexpected request: %s %s", r.Method, r.URL)
}
}

var (
ErrBucketAlreadyExists = errors.New("bucket already exists")
)

func (s *Service) createBucket(ctx context.Context, name string) error {
_ = ctx

// Lock the bucket so nobody can read or write to the same bucket while we create it.
bucketLock := s.bucketLock(name)
bucketLock.Lock()
defer bucketLock.Unlock()

// Create the bucket storage directory.
bucketDir := filepath.Join(s.DataDir, "buckets", name)
if _, err := os.Stat(bucketDir); err == nil {
return ErrBucketAlreadyExists
}
if err := os.Mkdir(bucketDir, os.ModePerm); err != nil {
return errors.Wrap(err, "Mkdir")
}
s.Log.Info("created bucket", sglog.String("name", name))
return nil
}

// bucketLock returns a bucket-level lock. Hold the read side to read objects in the
// bucket; hold the write side to create or delete the bucket with the given name.
func (s *Service) bucketLock(name string) *sync.RWMutex {
s.bucketLocksMu.Lock()
defer s.bucketLocksMu.Unlock()

lock, ok := s.bucketLocks[name]
if !ok {
lock = &sync.RWMutex{}
s.bucketLocks[name] = lock
}
return lock
}
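
// Sketch of how an object read could take the shared side of the lock
// (an assumption for illustration; object reads are not implemented in
// this commit):
//
//	lock := s.bucketLock(bucket)
//	lock.RLock()
//	defer lock.RUnlock()
//	f, err := os.Open(filepath.Join(s.DataDir, "buckets", bucket, key))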

var (
…
117 changes: 114 additions & 3 deletions cmd/blobstore/internal/blobstore/blobstore_test.go
@@ -1,20 +1,131 @@
package blobstore_test

import (
"context"
"io"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/sourcegraph/log/logtest"

"github.com/sourcegraph/sourcegraph/cmd/blobstore/internal/blobstore"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/uploadstore"
)

// Tests that initializing an uploadstore with blobstore as the backend works (performing no operations).
func TestInit(t *testing.T) {
ctx := context.Background()
store, server := initTestStore(ctx, t)
defer server.Close()

_ = store
}

// Tests that getting an object that does not exist works.
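// The store hands back a lazy reader, so the NotFound error surfaces when
// the reader is consumed (the ReadAll below), not from Get itself.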
func TestGetNotExists(t *testing.T) {
ctx := context.Background()
store, server := initTestStore(ctx, t)
defer server.Close()

reader, err := store.Get(ctx, "does-not-exist-key")
if err != nil {
t.Fatal("expected a reader, got an error", err)
}
defer reader.Close()
data, err := io.ReadAll(reader)
if err == nil {
t.Fatal("expected error")
}
if !strings.Contains(err.Error(), "NotFound") {
t.Fatalf("expected NotFound error, got %+v", err)
}
if len(data) != 0 {
t.Fatal("expected no data")
}
}

// Tests uploading an object works.
func TestUpload(t *testing.T) {
ctx := context.Background()
store, server := initTestStore(ctx, t)
defer server.Close()

// TODO(blobstore): call store.Upload(ctx context.Context, key string, r io.Reader) (int64, error)
_ = store
}

// Tests uploading an object and getting it back works.
func TestGetExists(t *testing.T) {
ctx := context.Background()
store, server := initTestStore(ctx, t)
defer server.Close()

// TODO(blobstore): call store.Get(ctx context.Context, key string) (io.ReadCloser, error)
_ = store
}

// Tests uploading two objects and then composing them together works.
func TestCompose(t *testing.T) {
ctx := context.Background()
store, server := initTestStore(ctx, t)
defer server.Close()

// TODO(blobstore): call store.Compose(ctx context.Context, destination string, sources ...string) (int64, error)
//
// Compose will concatenate the given source objects together and write to the given
// destination object. The source objects will be removed if the composed write is
// successful.
_ = store
}
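
// Sketch of what this test might do once Upload and Compose are implemented,
// using only the signatures named in the TODOs above (the exact assertions
// are assumptions):
//
//	if _, err := store.Upload(ctx, "part-1", strings.NewReader("hello ")); err != nil {
//		t.Fatal(err)
//	}
//	if _, err := store.Upload(ctx, "part-2", strings.NewReader("world")); err != nil {
//		t.Fatal(err)
//	}
//	size, err := store.Compose(ctx, "combined", "part-1", "part-2")
//	if err != nil || size != int64(len("hello world")) {
//		t.Fatal("unexpected compose result", size, err)
//	}
//	// the source objects should have been removed by the successful compose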

// Tests deleting an object works.
func TestDelete(t *testing.T) {
ctx := context.Background()
store, server := initTestStore(ctx, t)
defer server.Close()

// TODO(blobstore): call store.Delete(ctx context.Context, key string) error
_ = store
}

// Tests expiring objects works.
func TestExpireObjects(t *testing.T) {
ctx := context.Background()
store, server := initTestStore(ctx, t)
defer server.Close()

// TODO(blobstore): call store.ExpireObjects(ctx context.Context, prefix string, maxAge time.Duration) error
_ = store
}

func initTestStore(ctx context.Context, t *testing.T) (uploadstore.Store, *httptest.Server) {
observationCtx := observation.TestContextTB(t)
ts := httptest.NewServer(&blobstore.Service{
DataDir: t.TempDir(),
Log: logtest.Scoped(t),
ObservationCtx: observationCtx,
})

config := uploadstore.Config{
Backend: "blobstore",
ManageBucket: false,
Bucket: "lsif-uploads",
TTL: 168 * time.Hour,
S3: uploadstore.S3Config{
Region: "us-east-1",
Endpoint: ts.URL,
UsePathStyle: false,
},
}
store, err := uploadstore.CreateLazy(ctx, config, uploadstore.NewOperations(observationCtx, "test", "lsifstore"))
if err != nil {
t.Fatal("CreateLazy", err)
}
if err := store.Init(ctx); err != nil {
t.Fatal("Init", err)
}
return store, ts
}
