Skip to content

Commit

Permalink
Split clients and consistency changes
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed May 8, 2024
1 parent a40cddb commit e2fdadb
Show file tree
Hide file tree
Showing 15 changed files with 188 additions and 105 deletions.
10 changes: 4 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ build: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go internal/pb/cache.pb.go in
lint:
golangci-lint run



release/%_linux_amd64: cmd/%/main.go $(PKG_GO_FILES) $(INTERNAL_GO_FILES) go.sum
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build $(BUILD_FLAGS) -o $@ $<

Expand Down Expand Up @@ -177,10 +175,10 @@ client-getcache: export DL_SKIP_SSL_VERIFICATION=1
client-getcache:
go run cmd/client/main.go getcache --host $(GRPC_HOST) --path input/cache

client-getcache-from-daemon: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-getcache-from-daemon: export DL_SKIP_SSL_VERIFICATION=1
client-getcache-from-daemon:
mkdir -p tmp/pods/test-pod/volumes/example && go run cmd/client/main.go getcache-from-daemon --host $(GRPC_HOST) --port $(GRPC_CACHED_PORT) input/cache
client-getcached: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-getcached: export DL_SKIP_SSL_VERIFICATION=1
client-getcached:
go run cmd/client/main.go getcached --host $(GRPC_HOST) --port $(GRPC_CACHED_PORT) --path input/cache

client-gc-contents: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-gc-contents: export DL_SKIP_SSL_VERIFICATION=1
Expand Down
24 changes: 12 additions & 12 deletions internal/pb/cache.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions internal/pb/cache.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ package pb;

option go_package = "github.com/gadget-inc/dateilager/pkg/pb";

service Cache {
rpc PopulateDiskCache(PopulateDiskCacheRequest)
returns (PopulateDiskCacheResponse);
service Cached {
rpc PopulateDiskCache(PopulateDiskCacheRequest) returns (PopulateDiskCacheResponse);
}

message PopulateDiskCacheRequest { string path = 1; }
Expand Down
62 changes: 31 additions & 31 deletions internal/pb/cache_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions internal/testutil/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gadget-inc/dateilager/internal/db"
"github.com/gadget-inc/dateilager/internal/environment"
"github.com/gadget-inc/dateilager/pkg/api"
"github.com/gadget-inc/dateilager/pkg/client"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -90,3 +91,11 @@ func (tc *TestCtx) FsApi() *api.Fs {
ContentLookup: tc.ContentLookup(),
}
}

func (tc *TestCtx) CachedApi(cl *client.Client, stagingPath string) *api.Cached {
return &api.Cached{
Env: environment.Test,
Client: cl,
StagingPath: stagingPath,
}
}
12 changes: 11 additions & 1 deletion pkg/api/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,35 @@ import (
"path"
"time"

"github.com/gadget-inc/dateilager/internal/environment"
"github.com/gadget-inc/dateilager/internal/files"
"github.com/gadget-inc/dateilager/internal/key"
"github.com/gadget-inc/dateilager/internal/logger"
"github.com/gadget-inc/dateilager/internal/pb"
"github.com/gadget-inc/dateilager/pkg/client"
"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Cached struct {
pb.UnimplementedCacheServer
pb.UnimplementedCachedServer

Env environment.Env
Client *client.Client
StagingPath string

// the current directory holding a fully formed downloaded cache
currentDir string
// the current version of the cache on disk at currentDir
currentVersion int64
}

func (c *Cached) PopulateDiskCache(ctx context.Context, req *pb.PopulateDiskCacheRequest) (*pb.PopulateDiskCacheResponse, error) {
if c.Env != environment.Dev && c.Env != environment.Test {
return nil, status.Errorf(codes.Unimplemented, "Cached populateDiskCache only implemented in dev and test environments")
}

err := requireAdminAuth(ctx)
if err != nil {
return nil, err
Expand Down
15 changes: 5 additions & 10 deletions pkg/cached/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/gadget-inc/dateilager/internal/logger"
"github.com/gadget-inc/dateilager/internal/pb"
"github.com/gadget-inc/dateilager/pkg/api"
"github.com/gadget-inc/dateilager/pkg/client"
"github.com/gadget-inc/dateilager/pkg/server"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
Expand All @@ -24,10 +23,9 @@ import (
type CacheServer struct {
Grpc *grpc.Server
Health *health.Server
Cached *api.Cached
}

func NewServer(ctx context.Context, client *client.Client, cert *tls.Certificate, stagingPath string, pasetoKey ed25519.PublicKey) *CacheServer {
func NewServer(ctx context.Context, cert *tls.Certificate, pasetoKey ed25519.PublicKey) *CacheServer {
creds := credentials.NewServerTLSFromCert(cert)
validator := auth.NewAuthValidator(pasetoKey)

Expand All @@ -53,21 +51,18 @@ func NewServer(ctx context.Context, client *client.Client, cert *tls.Certificate
healthServer := health.NewServer()
healthpb.RegisterHealthServer(grpcServer, healthServer)

cached := &api.Cached{
Client: client,
StagingPath: stagingPath,
}
pb.RegisterCacheServer(grpcServer, cached)

server := &CacheServer{
Grpc: grpcServer,
Health: healthServer,
Cached: cached,
}

return server
}

func (s *CacheServer) RegisterCachedServer(ctx context.Context, cached *api.Cached) {
pb.RegisterCachedServer(s.Grpc, cached)
}

func (s *CacheServer) Serve(lis net.Listener) error {
return s.Grpc.Serve(lis)
}
26 changes: 18 additions & 8 deletions pkg/cli/cachedaemon.go → pkg/cli/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/gadget-inc/dateilager/internal/key"
"github.com/gadget-inc/dateilager/internal/logger"
"github.com/gadget-inc/dateilager/internal/telemetry"
"github.com/gadget-inc/dateilager/pkg/api"
"github.com/gadget-inc/dateilager/pkg/cached"
"github.com/gadget-inc/dateilager/pkg/client"
"github.com/gadget-inc/dateilager/pkg/version"
Expand Down Expand Up @@ -109,7 +110,14 @@ func NewCacheDaemonCommand() *cobra.Command {
return fmt.Errorf("failed to listen on TCP port %d: %w", port, err)
}

s := cached.NewServer(ctx, cl, &cert, stagingPath, pasetoKey)
s := cached.NewServer(ctx, &cert, pasetoKey)
logger.Info(ctx, "register Cached")
cached := &api.Cached{
Env: env,
Client: cl,
StagingPath: stagingPath,
}
s.RegisterCachedServer(ctx, cached)

osSignals := make(chan os.Signal, 1)
signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM)
Expand All @@ -118,12 +126,12 @@ func NewCacheDaemonCommand() *cobra.Command {
s.Grpc.GracefulStop()
}()

err = s.Cached.Prepare(ctx)
err = cached.Prepare(ctx)
if err != nil {
return fmt.Errorf("failed to prepare cache daemon in %s: %w", stagingPath, err)
}

logger.Info(ctx, "start server", key.Port.Field(port), key.Environment.Field(env.String()))
logger.Info(ctx, "start cached server", key.Port.Field(port), key.Environment.Field(env.String()))
return s.Serve(listen)
},
PostRunE: func(cmd *cobra.Command, _ []string) error {
Expand All @@ -146,17 +154,19 @@ func NewCacheDaemonCommand() *cobra.Command {
flags.StringVar(&encoding, "log-encoding", "console", "Log encoding (console | json)")
flags.BoolVar(&tracing, "tracing", false, "Whether tracing is enabled")
flags.StringVar(&profilePath, "profile", "", "CPU profile output path (profiling enabled if set)")
flags.StringVar(&certFile, "cert", "development/server.crt", "TLS cert file")
flags.StringVar(&keyFile, "key", "development/server.key", "TLS key file")
flags.StringVar(&pasetoFile, "paseto", "development/paseto.pub", "Paseto public key file")

flags.IntVar(&port, "port", 5053, "cache API port")
flags.StringVar(&upstreamHost, "upstream-host", "localhost", "GRPC server hostname")
flags.Uint16Var(&upstreamPort, "upstream-port", 5051, "GRPC server port")
flags.StringVar(&headlessHost, "headless-host", "", "Alternative headless hostname to use for round robin connections")
flags.StringVar(&certFile, "cert", "development/server.crt", "TLS cert file")
flags.StringVar(&keyFile, "key", "development/server.key", "TLS key file")
flags.StringVar(&pasetoFile, "paseto", "development/paseto.pub", "Paseto public key file")
flags.UintVar(&timeout, "timeout", 0, "GRPC client timeout (ms)")
flags.IntVar(&port, "port", 5053, "cache API port")
flags.StringVar(&stagingPath, "staging-path", "", "path for staging downloaded caches")

flags.StringVar(&stagingPath, "staging-path", "", "path for staging downloaded caches")
_ = cmd.MarkPersistentFlagRequired("staging-path")

return cmd
}

Expand Down

0 comments on commit e2fdadb

Please sign in to comment.