Skip to content

Commit

Permalink
wip: gitserver: grpc: create changed files impl for grpc server
Browse files Browse the repository at this point in the history
  • Loading branch information
ggilmore committed May 7, 2024
1 parent b4c835b commit 53c8d36
Show file tree
Hide file tree
Showing 16 changed files with 3,216 additions and 461 deletions.
20 changes: 19 additions & 1 deletion cmd/gitserver/internal/git/gitcli/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,18 @@ func newGitDiffIterator(rc io.ReadCloser) git.ChangedFilesIterator {
scanner := bufio.NewScanner(rc)
scanner.Split(byteutils.ScanNullLines)

closeChan := make(chan struct{})
closer := sync.OnceValue(func() error {
return rc.Close()
err := rc.Close()
close(closeChan)

return err
})

return &gitDiffIterator{
rc: rc,
scanner: scanner,
closeChan: closeChan,
onceFuncCloser: closer,
}
}
Expand All @@ -102,11 +107,24 @@ type gitDiffIterator struct {
rc io.ReadCloser
scanner *bufio.Scanner

closeChan chan struct{}
onceFuncCloser func() error
}

func (i *gitDiffIterator) Next() (gitdomain.PathStatus, error) {
select {
case <-i.closeChan:
return gitdomain.PathStatus{}, io.EOF
default:
}

for i.scanner.Scan() {
select {
case <-i.closeChan:
return gitdomain.PathStatus{}, io.EOF
default:
}

status := i.scanner.Text()
if len(status) == 0 {
continue
Expand Down
26 changes: 26 additions & 0 deletions cmd/gitserver/internal/git/gitcli/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,32 @@ func TestGitDiffIterator(t *testing.T) {

err := iter.Close()
require.NoError(t, err)

// Verify that Next returns io.EOF after the iterator is closed
_, err = iter.Next()
require.Equal(t, io.EOF, err)
})

t.Run("close iterator during iteration", func(t *testing.T) {
input := combineBytes(
[]byte("A"), []byte{NUL}, []byte("file1.txt"), []byte{NUL},
[]byte("M"), []byte{NUL}, []byte("file2.txt"), []byte{NUL},
[]byte("D"), []byte{NUL}, []byte("file3.txt"), []byte{NUL},
)
rc := io.NopCloser(bytes.NewReader(input))
iter := newGitDiffIterator(rc)

// Iterate over the first change
_, err := iter.Next()
require.NoError(t, err)

// Close the iterator during iteration
err = iter.Close()
require.NoError(t, err)

// Verify that Next returns io.EOF after the iterator is closed
_, err = iter.Next()
require.Equal(t, io.EOF, err)
})
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/gitserver/internal/git/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ type ChangedFilesIterator interface {
// If there are no more files, io.EOF is returned.
Next() (gitdomain.PathStatus, error)
// Close releases resources associated with the iterator.
//
// After Close() is called, Next() will always return io.EOF.
Close() error
}

Expand Down
81 changes: 81 additions & 0 deletions cmd/gitserver/internal/server_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,87 @@ func (gs *grpcServer) BehindAhead(ctx context.Context, req *proto.BehindAheadReq
return behindAhead.ToProto(), nil
}

// ChangedFiles streams the files that changed between the request's base and
// head revisions back to the client in chunks.
//
// It returns an InvalidArgument status when the repo name or head is missing,
// and a NotFound status carrying a RevisionNotFoundPayload when the backend
// reports a RevisionNotFoundError. Other backend, iteration, context, and
// send errors are returned to the caller.
func (gs *grpcServer) ChangedFiles(req *proto.ChangedFilesRequest, ss proto.GitserverService_ChangedFilesServer) error {
	ctx := ss.Context()

	accesslog.Record(
		ctx,
		req.GetRepoName(),
		log.String("base", string(req.GetBase())),
		log.String("head", string(req.GetHead())),
	)

	if req.GetRepoName() == "" {
		return status.New(codes.InvalidArgument, "repo must be specified").Err()
	}

	if len(req.GetHead()) == 0 {
		return status.New(codes.InvalidArgument, "head (<tree-ish>) must be specified").Err()
	}

	repoName := api.RepoName(req.GetRepoName())
	repoDir := gs.fs.RepoDir(repoName)

	if err := gs.checkRepoExists(ctx, repoName); err != nil {
		return err
	}

	backend := gs.getBackendFunc(repoDir, repoName)

	iterator, err := backend.ChangedFiles(ctx, string(req.GetBase()), string(req.GetHead()))
	if err != nil {
		// Report the revision spec itself in the payload rather than the full
		// error message, so the payload's Spec field holds only the spec.
		var revErr *gitdomain.RevisionNotFoundError
		if errors.As(err, &revErr) {
			s, err := status.New(codes.NotFound, "revision not found").WithDetails(&proto.RevisionNotFoundPayload{
				Repo: req.GetRepoName(),
				Spec: revErr.Spec,
			})
			if err != nil {
				return err
			}
			return s.Err()
		}

		gs.svc.LogIfCorrupt(ctx, repoName, err)
		return err
	}
	// The Close error is deliberately ignored: by the time it runs, the result
	// of the RPC has already been decided by the code below.
	defer iterator.Close()

	chunker := chunk.New(func(paths []*proto.ChangedFile) error {
		out := &proto.ChangedFilesResponse{
			Files: paths,
		}

		return ss.Send(out)
	})

	for {
		// Stop promptly if the stream's context has been cancelled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		file, err := iterator.Next()
		if errors.Is(err, io.EOF) {
			// io.EOF (possibly wrapped) marks the end of the changed files.
			break
		}

		if err != nil {
			return errors.Wrap(err, "failed to get changed file")
		}

		if err := chunker.Send(file.ToProto()); err != nil {
			return errors.Wrapf(err, "failed to send changed file %s", file)
		}
	}

	// Send whatever is still buffered in the chunker.
	if err := chunker.Flush(); err != nil {
		return errors.Wrap(err, "failed to flush file chunks")
	}

	return nil
}

// checkRepoExists checks if a given repository is cloned on disk, and returns an
// error otherwise.
// On Sourcegraph.com, not all repos are managed by the scheduler. We thus
Expand Down
35 changes: 33 additions & 2 deletions cmd/gitserver/internal/server_grpc_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc/codes"

"github.com/sourcegraph/sourcegraph/cmd/gitserver/internal/urlredactor"
"github.com/sourcegraph/sourcegraph/internal/grpc/grpcutil"
"github.com/sourcegraph/sourcegraph/internal/vcs"
"google.golang.org/grpc/codes"

"github.com/sourcegraph/log"
"google.golang.org/grpc/status"

proto "github.com/sourcegraph/sourcegraph/internal/gitserver/v1"
"github.com/sourcegraph/sourcegraph/internal/trace"
"google.golang.org/grpc/status"
)

// loggingGRPCServer is a wrapper around the provided GitserverServiceServer
Expand Down Expand Up @@ -1004,6 +1006,35 @@ func BehindAheadRequestToLogFields(req *proto.BehindAheadRequest) []log.Field {
}
}

// ChangedFiles delegates to the wrapped server's ChangedFiles and, once the
// call has finished, logs the method name, resulting status code, trace ID,
// elapsed time, and the request's log fields via doLog.
func (l *loggingGRPCServer) ChangedFiles(req *proto.ChangedFilesRequest, ss proto.GitserverService_ChangedFilesServer) (err error) {
	startedAt := time.Now()

	// err is a named result so this deferred closure observes the final value
	// returned by the wrapped call.
	defer func() {
		took := time.Since(startedAt)

		doLog(
			l.logger,
			proto.GitserverService_ChangedFiles_FullMethodName,
			status.Code(err),
			trace.Context(ss.Context()).TraceID,
			took,

			changedFilesRequestToLogFields(req)...,
		)
	}()

	err = l.base.ChangedFiles(req, ss)
	return err
}

// changedFilesRequestToLogFields converts a ChangedFilesRequest into log
// fields: the repository name, the base, and the head (in that order).
func changedFilesRequestToLogFields(req *proto.ChangedFilesRequest) []log.Field {
	repoName := req.GetRepoName()
	base := string(req.GetBase())
	head := string(req.GetHead())

	fields := make([]log.Field, 0, 3)
	fields = append(fields, log.String("repoName", repoName))
	fields = append(fields, log.String("base", base))
	fields = append(fields, log.String("head", head))
	return fields
}

type loggingRepositoryServiceServer struct {
base proto.GitserverRepositoryServiceServer
logger log.Logger
Expand Down
116 changes: 116 additions & 0 deletions cmd/gitserver/internal/server_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,105 @@ func TestGRPCServer_ContributorCounts(t *testing.T) {
})
}

// TestGRPCServer_ChangedFiles exercises grpcServer.ChangedFiles: argument
// validation, the uncloned-repo check, the revision-not-found mapping, and an
// end-to-end stream through a spawned server.
func TestGRPCServer_ChangedFiles(t *testing.T) {
// A single mock server stream, shared by the subtests that call the handler
// directly; it only needs to hand back a context.
mockSS := gitserver.NewMockGitserverService_ChangedFilesServer()
mockSS.ContextFunc.SetDefaultReturn(context.Background())
t.Run("argument validation", func(t *testing.T) {
t.Run("repo must be specified", func(t *testing.T) {
gs := &grpcServer{}
err := gs.ChangedFiles(&v1.ChangedFilesRequest{RepoName: "", Head: []byte("HEAD")}, mockSS)
require.ErrorContains(t, err, "repo must be specified")
assertGRPCStatusCode(t, err, codes.InvalidArgument)
})

t.Run("head (<tree-ish>) must be specified", func(t *testing.T) {
gs := &grpcServer{}
err := gs.ChangedFiles(&v1.ChangedFilesRequest{RepoName: "therepo"}, mockSS)
require.ErrorContains(t, err, "head (<tree-ish>) must be specified")
assertGRPCStatusCode(t, err, codes.InvalidArgument)
})
})
t.Run("checks for uncloned repo", func(t *testing.T) {
// The mock FS reports the repo as not cloned and the locker reports a
// "cloning" status; the handler is expected to answer NotFound with a
// RepoNotFoundPayload detail.
fs := gitserverfs.NewMockFS()
fs.RepoClonedFunc.SetDefaultReturn(false, nil)
locker := NewMockRepositoryLocker()
locker.StatusFunc.SetDefaultReturn("cloning", true)
gs := &grpcServer{svc: NewMockService(), fs: fs, locker: locker}
err := gs.ChangedFiles(&v1.ChangedFilesRequest{RepoName: "therepo", Base: []byte("base"), Head: []byte("head")}, mockSS)
require.Error(t, err)
assertGRPCStatusCode(t, err, codes.NotFound)
assertHasGRPCErrorDetailOfType(t, err, &proto.RepoNotFoundPayload{})
require.Contains(t, err.Error(), "repo not found")
mockassert.Called(t, fs.RepoClonedFunc)
mockassert.Called(t, locker.StatusFunc)
})
t.Run("revision not found", func(t *testing.T) {
fs := gitserverfs.NewMockFS()
// Repo is cloned, proceed!
fs.RepoClonedFunc.SetDefaultReturn(true, nil)
gs := &grpcServer{
svc: NewMockService(),
fs: fs,
getBackendFunc: func(common.GitDir, api.RepoName) git.GitBackend {
// The backend fails with a RevisionNotFoundError, which the handler
// should translate into a NotFound status with a
// RevisionNotFoundPayload detail.
b := git.NewMockGitBackend()
b.ChangedFilesFunc.SetDefaultReturn(nil, &gitdomain.RevisionNotFoundError{Repo: "therepo", Spec: "base...head"})
return b
},
}
err := gs.ChangedFiles(&v1.ChangedFilesRequest{RepoName: "therepo", Base: []byte("base"), Head: []byte("head")}, mockSS)
require.Error(t, err)
assertGRPCStatusCode(t, err, codes.NotFound)
assertHasGRPCErrorDetailOfType(t, err, &proto.RevisionNotFoundPayload{})
require.Contains(t, err.Error(), "revision not found")
})
t.Run("e2e", func(t *testing.T) {
fs := gitserverfs.NewMockFS()
// Repo is cloned, proceed!
fs.RepoClonedFunc.SetDefaultReturn(true, nil)
// The backend yields three changed files (added, modified, deleted) from
// an in-memory iterator.
b := git.NewMockGitBackend()
b.ChangedFilesFunc.SetDefaultReturn(&testChangedFilesIterator{
paths: []gitdomain.PathStatus{
{Path: "file1.txt", Status: gitdomain.AddedAMD},
{Path: "file2.txt", Status: gitdomain.ModifiedAMD},
{Path: "file3.txt", Status: gitdomain.DeletedAMD},
},
}, nil)
gs := &grpcServer{
svc: NewMockService(),
fs: fs,
getBackendFunc: func(common.GitDir, api.RepoName) git.GitBackend {
return b
},
}

// Call the handler through a client obtained from spawnServer rather than
// invoking it directly.
cli := spawnServer(t, gs)
r, err := cli.ChangedFiles(context.Background(), &v1.ChangedFilesRequest{
RepoName: "therepo",
Base: []byte("base"),
Head: []byte("head"),
})
require.NoError(t, err)
// Drain the stream, collecting the files from every response message until
// io.EOF; any other receive error fails the test.
var paths []*proto.ChangedFile
for {
msg, err := r.Recv()
if err != nil {
if err == io.EOF {
break
}
require.NoError(t, err)
}
paths = append(paths, msg.GetFiles()...)
}
// The collected files must match the iterator's contents, in order.
if diff := cmp.Diff([]*proto.ChangedFile{
{Path: []byte("file1.txt"), Status: proto.ChangedFile_STATUS_ADDED},
{Path: []byte("file2.txt"), Status: proto.ChangedFile_STATUS_MODIFIED},
{Path: []byte("file3.txt"), Status: proto.ChangedFile_STATUS_DELETED},
}, paths, protocmp.Transform()); diff != "" {
t.Fatalf("unexpected response (-want +got):\n%s", diff)
}
})
}

func TestGRPCServer_FirstCommitEver(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -1106,3 +1205,20 @@ func spawnServer(t *testing.T, server *grpcServer) proto.GitserverServiceClient

return proto.NewGitserverServiceClient(cc)
}

// testChangedFilesIterator is an in-memory changed-files iterator for tests
// that yields the entries of paths in order.
type testChangedFilesIterator struct {
	paths []gitdomain.PathStatus
}

// Next pops and returns the first remaining path, or returns io.EOF once no
// paths are left.
func (t *testChangedFilesIterator) Next() (gitdomain.PathStatus, error) {
	if len(t.paths) > 0 {
		head := t.paths[0]
		t.paths = t.paths[1:]
		return head, nil
	}

	return gitdomain.PathStatus{}, io.EOF
}

// Close is a no-op and always returns nil.
func (t *testChangedFilesIterator) Close() error {
	return nil
}
18 changes: 18 additions & 0 deletions internal/gitserver/errwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,4 +329,22 @@ func (r *errorTranslatingClient) BehindAhead(ctx context.Context, in *proto.Behi
return res, convertGRPCErrorToGitDomainError(err)
}

// errorTranslatingChangedFilesClient wraps a ChangedFiles stream client so
// that errors returned by Recv are passed through
// convertGRPCErrorToGitDomainError.
type errorTranslatingChangedFilesClient struct {
	proto.GitserverService_ChangedFilesClient
}

// Recv receives the next response from the embedded stream and returns it
// along with the stream's error after conversion by
// convertGRPCErrorToGitDomainError.
func (r *errorTranslatingChangedFilesClient) Recv() (*proto.ChangedFilesResponse, error) {
	msg, recvErr := r.GitserverService_ChangedFilesClient.Recv()
	translated := convertGRPCErrorToGitDomainError(recvErr)
	return msg, translated
}

// ChangedFiles implements v1.GitserverServiceClient.
//
// It opens the ChangedFiles stream on the base client. On success the stream
// is wrapped in an errorTranslatingChangedFilesClient; on failure the error
// is converted with convertGRPCErrorToGitDomainError and no stream is
// returned.
func (r *errorTranslatingClient) ChangedFiles(ctx context.Context, in *proto.ChangedFilesRequest, opts ...grpc.CallOption) (proto.GitserverService_ChangedFilesClient, error) {
	stream, err := r.base.ChangedFiles(ctx, in, opts...)
	if err == nil {
		return &errorTranslatingChangedFilesClient{GitserverService_ChangedFilesClient: stream}, nil
	}

	return nil, convertGRPCErrorToGitDomainError(err)
}

var _ proto.GitserverServiceClient = &errorTranslatingClient{}
5 changes: 4 additions & 1 deletion internal/gitserver/gitdomain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ go_library(
go_test(
name = "gitdomain_test",
timeout = "short",
srcs = ["common_test.go"],
srcs = [
"common_test.go",
"log_test.go",
],
embed = [":gitdomain"],
deps = [
"//internal/api",
Expand Down

0 comments on commit 53c8d36

Please sign in to comment.