Skip to content

Commit

Permalink
Merge pull request #799 from pjbgf/perf2
Browse files Browse the repository at this point in the history
plumbing: Optimise memory consumption for filesystem storage
  • Loading branch information
pjbgf committed Nov 6, 2023
2 parents 8c1e3e2 + 1c361ad commit 48bec23
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 61 deletions.
195 changes: 145 additions & 50 deletions plumbing/format/packfile/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package packfile
import (
"bytes"
"errors"
"fmt"
"io"

"github.com/go-git/go-git/v5/plumbing"
Expand Down Expand Up @@ -174,13 +175,25 @@ func (p *Parser) init() error {
return nil
}

// objectHeaderWriter is a func that writes an object header for an
// object of type typ and size sz, returning any error encountered.
type objectHeaderWriter func(typ plumbing.ObjectType, sz int64) error

// lazyObjectWriter is an optional capability that the parser probes for
// on its storage via a type assertion. When the storage implements it,
// object content is streamed through the returned writer instead of
// being collected in a plumbing.MemoryObject.
type lazyObjectWriter interface {
// LazyWriter enables an object to be lazily written.
// It returns:
// - w: a writer to receive the object's content.
// - lwh: a func to write the object header.
// - err: any error from the initial writer creation process.
//
// Note that if the object header is not written BEFORE the writer
// is used, this will result in an invalid object.
LazyWriter() (w io.WriteCloser, lwh objectHeaderWriter, err error)
}

func (p *Parser) indexObjects() error {
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

for i := uint32(0); i < p.count; i++ {
buf.Reset()

oh, err := p.scanner.NextObjectHeader()
if err != nil {
return err
Expand Down Expand Up @@ -220,21 +233,60 @@ func (p *Parser) indexObjects() error {
ota = newBaseObject(oh.Offset, oh.Length, t)
}

buf.Grow(int(oh.Length))
_, crc, err := p.scanner.NextObject(buf)
hasher := plumbing.NewHasher(oh.Type, oh.Length)
writers := []io.Writer{hasher}
var obj *plumbing.MemoryObject

// Lazy writing is only available for non-delta objects.
if p.storage != nil && !delta {
// When a storage is set and supports lazy writing,
// use that instead of creating a memory object.
if low, ok := p.storage.(lazyObjectWriter); ok {
ow, lwh, err := low.LazyWriter()
if err != nil {
return err
}

if err = lwh(oh.Type, oh.Length); err != nil {
return err
}

defer ow.Close()
writers = append(writers, ow)
} else {
obj = new(plumbing.MemoryObject)
obj.SetSize(oh.Length)
obj.SetType(oh.Type)

writers = append(writers, obj)
}
}
if delta && !p.scanner.IsSeekable {
buf.Reset()
buf.Grow(int(oh.Length))
writers = append(writers, buf)
}

mw := io.MultiWriter(writers...)

_, crc, err := p.scanner.NextObject(mw)
if err != nil {
return err
}

// Non-delta objects need to be added to the storage. This
// is only required when lazy writing is not supported.
if obj != nil {
if _, err := p.storage.SetEncodedObject(obj); err != nil {
return err
}
}

ota.Crc32 = crc
ota.Length = oh.Length

data := buf.Bytes()
if !delta {
sha1, err := getSHA1(ota.Type, data)
if err != nil {
return err
}
sha1 := hasher.Sum()

// Move children of placeholder parent into actual parent, in case this
// was a non-external delta reference.
Expand All @@ -249,20 +301,8 @@ func (p *Parser) indexObjects() error {
p.oiByHash[ota.SHA1] = ota
}

if p.storage != nil && !delta {
obj := new(plumbing.MemoryObject)
obj.SetSize(oh.Length)
obj.SetType(oh.Type)
if _, err := obj.Write(data); err != nil {
return err
}

if _, err := p.storage.SetEncodedObject(obj); err != nil {
return err
}
}

if delta && !p.scanner.IsSeekable {
data := buf.Bytes()
p.deltas[oh.Offset] = make([]byte, len(data))
copy(p.deltas[oh.Offset], data)
}
Expand All @@ -280,23 +320,29 @@ func (p *Parser) resolveDeltas() error {

for _, obj := range p.oi {
buf.Reset()
buf.Grow(int(obj.Length))
err := p.get(obj, buf)
if err != nil {
return err
}
content := buf.Bytes()

if err := p.onInflatedObjectHeader(obj.Type, obj.Length, obj.Offset); err != nil {
return err
}

if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, content); err != nil {
if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, nil); err != nil {
return err
}

if !obj.IsDelta() && len(obj.Children) > 0 {
// Dealing with an io.ReaderAt object means we can
// create it once and reuse it across all children.
r := bytes.NewReader(buf.Bytes())
for _, child := range obj.Children {
if err := p.resolveObject(io.Discard, child, content); err != nil {
// Even though we are discarding the output, we still need to read it
// so that the scanner can advance to the next object, and the SHA1 can be
// calculated.
if err := p.resolveObject(io.Discard, child, r); err != nil {
return err
}
p.resolveExternalRef(child)
Expand Down Expand Up @@ -361,13 +407,13 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
if o.DiskType.IsDelta() {
b := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(b)
buf.Grow(int(o.Length))
err := p.get(o.Parent, b)
if err != nil {
return err
}
base := b.Bytes()

err = p.resolveObject(buf, o, base)
err = p.resolveObject(buf, o, bytes.NewReader(b.Bytes()))
if err != nil {
return err
}
Expand All @@ -378,6 +424,13 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
}
}

// If the scanner is seekable, caching this data into
// memory by offset seems wasteful.
// There is a trade-off to be considered here in terms
// of execution time vs memory consumption.
//
// TODO: improve seekable execution time, so that we can
// skip this cache.
if len(o.Children) > 0 {
data := make([]byte, buf.Len())
copy(data, buf.Bytes())
Expand All @@ -386,10 +439,25 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
return nil
}

// resolveObject resolves an object from base, using information
// provided by o.
//
// This call has the side-effect of changing field values
// from the object info o:
// - Type: OFSDeltaObject may become the target type (e.g. Blob).
// - Size: The size may be updated with the target size.
// - Hash: Zero hashes will be calculated as part of the object
// resolution. Hence why this process can't be avoided even when w
// is an io.Discard.
//
// base must be an io.ReaderAt, which is a requirement from
// patchDeltaStream. The main reason being that reversing a
// delta object may lead to going back and forth within base,
// which is not supported by io.Reader.
func (p *Parser) resolveObject(
w io.Writer,
o *objectInfo,
base []byte,
base io.ReaderAt,
) error {
if !o.DiskType.IsDelta() {
return nil
Expand All @@ -400,26 +468,46 @@ func (p *Parser) resolveObject(
if err != nil {
return err
}
data := buf.Bytes()

data, err = applyPatchBase(o, data, base)
writers := []io.Writer{w}
var obj *plumbing.MemoryObject
var lwh objectHeaderWriter

if p.storage != nil {
if low, ok := p.storage.(lazyObjectWriter); ok {
ow, wh, err := low.LazyWriter()
if err != nil {
return err
}
lwh = wh

defer ow.Close()
writers = append(writers, ow)
} else {
obj = new(plumbing.MemoryObject)
ow, err := obj.Writer()
if err != nil {
return err
}

writers = append(writers, ow)
}
}

mw := io.MultiWriter(writers...)

err = applyPatchBase(o, base, buf, mw, lwh)
if err != nil {
return err
}

if p.storage != nil {
obj := new(plumbing.MemoryObject)
obj.SetSize(o.Size())
if obj != nil {
obj.SetType(o.Type)
if _, err := obj.Write(data); err != nil {
return err
}

obj.SetSize(o.Size()) // Size here is correct as it was populated by applyPatchBase.
if _, err := p.storage.SetEncodedObject(obj); err != nil {
return err
}
}
_, err = w.Write(data)
return err
}

Expand All @@ -443,24 +531,31 @@ func (p *Parser) readData(w io.Writer, o *objectInfo) error {
return nil
}

func applyPatchBase(ota *objectInfo, data, base []byte) ([]byte, error) {
patched, err := PatchDelta(base, data)
if err != nil {
return nil, err
// applyPatchBase applies the patch to target.
//
// Note that ota will be updated based on the description in resolveObject.
func applyPatchBase(ota *objectInfo, base io.ReaderAt, delta io.Reader, target io.Writer, wh objectHeaderWriter) error {
if target == nil {
return fmt.Errorf("cannot apply patch against nil target")
}

typ := ota.Type
if ota.SHA1 == plumbing.ZeroHash {
ota.Type = ota.Parent.Type
sha1, err := getSHA1(ota.Type, patched)
if err != nil {
return nil, err
}
typ = ota.Parent.Type
}

sz, h, err := patchDeltaWriter(target, base, delta, typ, wh)
if err != nil {
return err
}

ota.SHA1 = sha1
ota.Length = int64(len(patched))
if ota.SHA1 == plumbing.ZeroHash {
ota.Type = typ
ota.Length = int64(sz)
ota.SHA1 = h
}

return patched, nil
return nil
}

func getSHA1(t plumbing.ObjectType, data []byte) (plumbing.Hash, error) {
Expand Down

0 comments on commit 48bec23

Please sign in to comment.