plumbing: Optimise memory consumption for filesystem storage
Previously, as part of building the index representation, the resolveObject
func would create an interim plumbing.MemoryObject, which would then be
saved into storage via storage.SetEncodedObject. This meant that objects
were unnecessarily loaded into memory, only to then be saved to disk.

The changes streamline this process by:
- Introducing the LazyObjectWriter interface, which enables the write
  operation to take place directly against the filesystem-based storage.
- Leveraging multi-writers to process the input data once while targeting
  multiple writers (e.g. hasher and storage), as sketched below.
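
Conceptually, the single-pass write boils down to an io.MultiWriter
feeding the hasher and the storage writer at the same time. A minimal
standalone sketch (writeObject and the /tmp/object path are made up for
illustration; the real code also writes the object header before any
content and compresses what it stores):

    package main

    import (
    	"crypto/sha1"
    	"fmt"
    	"io"
    	"os"
    	"strings"
    )

    // writeObject streams content through a hasher and a file in a
    // single pass, so the input is read exactly once.
    func writeObject(path string, content io.Reader) ([]byte, error) {
    	f, err := os.Create(path)
    	if err != nil {
    		return nil, err
    	}
    	defer f.Close()

    	h := sha1.New()
    	// Every Write is duplicated to both targets.
    	mw := io.MultiWriter(h, f)
    	if _, err := io.Copy(mw, content); err != nil {
    		return nil, err
    	}
    	return h.Sum(nil), nil
    }

    func main() {
    	sum, err := writeObject("/tmp/object", strings.NewReader("hello"))
    	if err != nil {
    		panic(err)
    	}
    	fmt.Printf("%x\n", sum)
    }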

An additional change relates to the caching of object info children within
Parser.get. The cache is now skipped when a seekable filesystem is being
used.
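
A minimal sketch of that guard, using hypothetical names
(cacheChildrenData is an illustration, not a go-git function):

    package main

    import "fmt"

    // cacheChildrenData copies inflated object data into an in-memory
    // cache only when the scanner cannot seek back and re-read it from
    // the packfile. (Sketch only: seekable and children stand in for
    // the parser's p.scanner.IsSeekable and o.Children.)
    func cacheChildrenData(cache map[int64][]byte, offset int64, data []byte, seekable bool, children int) {
    	if seekable || children == 0 {
    		return // a seekable scanner can re-read the data on demand
    	}
    	cached := make([]byte, len(data))
    	copy(cached, data)
    	cache[offset] = cached
    }

    func main() {
    	cache := map[int64][]byte{}
    	cacheChildrenData(cache, 42, []byte("inflated object data"), false, 2)
    	fmt.Println(len(cache)) // 1: cached, since the scanner is not seekable
    }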

The impact of the changes can be observed when using seekable filesystem
storages, especially when cloning large repositories.

The stats below were captured by adapting the BenchmarkPlainClone test
to clone https://github.com/torvalds/linux.git:
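
A sketch of what that adaptation might look like (an assumption for
illustration; the actual benchmark lives in go-git's test suite and may
differ):

    package git_test

    import (
    	"testing"

    	git "github.com/go-git/go-git/v5"
    )

    func BenchmarkPlainClone(b *testing.B) {
    	b.ReportAllocs() // needed for the B/op and allocs/op columns below
    	for i := 0; i < b.N; i++ {
    		dir := b.TempDir()
    		_, err := git.PlainClone(dir, false, &git.CloneOptions{
    			URL: "https://github.com/torvalds/linux.git",
    		})
    		if err != nil {
    			b.Fatal(err)
    		}
    	}
    }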

pkg: github.com/go-git/go-git/v5
cpu: Intel(R) Core(TM) i9-10885H CPU @ 2.40GHz
              │  /tmp/old   │             /tmp/new              │
              │   sec/op    │   sec/op    vs base               │
PlainClone-16   41.68 ± 17%   48.04 ± 9%  +15.27% (p=0.015 n=6)

              │   /tmp/old    │               /tmp/new               │
              │     B/op      │     B/op       vs base               │
PlainClone-16   1127.8Mi ± 7%   256.7Mi ± 50%  -77.23% (p=0.002 n=6)

              │  /tmp/old   │              /tmp/new              │
              │  allocs/op  │  allocs/op   vs base               │
PlainClone-16   3.125M ± 0%   3.800M ± 0%  +21.60% (p=0.002 n=6)

Notice that the average memory consumption per operation is over 75%
smaller. The time per operation increased by 15%, although the penalty
may be smaller in long-running applications, thanks to the decreased GC
pressure and lower garbage collection costs.

Signed-off-by: Paulo Gomes <pjbgf@linux.com>
pjbgf committed Jul 8, 2023
1 parent 69eb5af commit 60ef5df
Showing 3 changed files with 256 additions and 50 deletions.
191 changes: 141 additions & 50 deletions plumbing/format/packfile/parser.go
@@ -3,6 +3,7 @@ package packfile
import (
"bytes"
"errors"
"fmt"
"io"

"github.com/go-git/go-git/v5/plumbing"
@@ -174,13 +175,25 @@ func (p *Parser) init() error {
return nil
}

type objectHeaderWriter func(typ plumbing.ObjectType, sz int64) error

type lazyObjectWriter interface {
// LazyWriter enables an object to be lazily written.
// It returns:
// - w: a writer to receive the object's content.
// - lwh: a func to write the object header.
// - err: any error from the initial writer creation process.
//
// Note that if the object header is not written BEFORE the writer
// is used, this will result in an invalid object.
LazyWriter() (w io.WriteCloser, lwh objectHeaderWriter, err error)
}

func (p *Parser) indexObjects() error {
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

for i := uint32(0); i < p.count; i++ {
buf.Reset()

oh, err := p.scanner.NextObjectHeader()
if err != nil {
return err
@@ -220,21 +233,60 @@ func (p *Parser) indexObjects() error {
ota = newBaseObject(oh.Offset, oh.Length, t)
}

buf.Grow(int(oh.Length))
_, crc, err := p.scanner.NextObject(buf)
hasher := plumbing.NewHasher(oh.Type, oh.Length)
writers := []io.Writer{hasher}
var obj *plumbing.MemoryObject

// Lazy writing is only available for non-delta objects.
if p.storage != nil && !delta {
// When a storage is set and supports lazy writing,
// use that instead of creating a memory object.
if low, ok := p.storage.(lazyObjectWriter); ok {
ow, lwh, err := low.LazyWriter()
if err != nil {
return err
}

if err = lwh(oh.Type, oh.Length); err != nil {
return err
}

defer ow.Close()
writers = append(writers, ow)
} else {
obj = new(plumbing.MemoryObject)
obj.SetSize(oh.Length)
obj.SetType(oh.Type)

writers = append(writers, obj)
}
}
if delta && !p.scanner.IsSeekable {
buf.Reset()
buf.Grow(int(oh.Length))
writers = append(writers, buf)
}

mw := io.MultiWriter(writers...)

_, crc, err := p.scanner.NextObject(mw)
if err != nil {
return err
}

// Non-delta objects need to be added to the storage. This
// is only required when lazy writing is not supported.
if obj != nil {
if _, err := p.storage.SetEncodedObject(obj); err != nil {
return err
}
}

ota.Crc32 = crc
ota.Length = oh.Length

data := buf.Bytes()
if !delta {
sha1, err := getSHA1(ota.Type, data)
if err != nil {
return err
}
sha1 := hasher.Sum()

// Move children of placeholder parent into actual parent, in case this
// was a non-external delta reference.
@@ -249,20 +301,8 @@ func (p *Parser) indexObjects() error {
p.oiByHash[ota.SHA1] = ota
}

if p.storage != nil && !delta {
obj := new(plumbing.MemoryObject)
obj.SetSize(oh.Length)
obj.SetType(oh.Type)
if _, err := obj.Write(data); err != nil {
return err
}

if _, err := p.storage.SetEncodedObject(obj); err != nil {
return err
}
}

if delta && !p.scanner.IsSeekable {
data := buf.Bytes()
p.deltas[oh.Offset] = make([]byte, len(data))
copy(p.deltas[oh.Offset], data)
}
@@ -280,23 +320,29 @@ func (p *Parser) resolveDeltas() error {

for _, obj := range p.oi {
buf.Reset()
buf.Grow(int(obj.Length))
err := p.get(obj, buf)
if err != nil {
return err
}
content := buf.Bytes()

if err := p.onInflatedObjectHeader(obj.Type, obj.Length, obj.Offset); err != nil {
return err
}

if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, content); err != nil {
if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, nil); err != nil {
return err
}

if !obj.IsDelta() && len(obj.Children) > 0 {
// Dealing with an io.ReaderAt object means we can
// create it once and reuse it across all children.
r := bytes.NewReader(buf.Bytes())
for _, child := range obj.Children {
if err := p.resolveObject(io.Discard, child, content); err != nil {
// Even though we are discarding the output, we still need to read it
// so that the scanner can advance to the next object and the SHA1 can
// be calculated.
if err := p.resolveObject(io.Discard, child, r); err != nil {
return err
}
p.resolveExternalRef(child)
@@ -361,13 +407,13 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
if o.DiskType.IsDelta() {
b := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(b)
buf.Grow(int(o.Length))
err := p.get(o.Parent, b)
if err != nil {
return err
}
base := b.Bytes()

err = p.resolveObject(buf, o, base)
err = p.resolveObject(buf, o, bytes.NewReader(b.Bytes()))
if err != nil {
return err
}
@@ -378,6 +424,9 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
}
}

// If the scanner is seekable, caching this data into
// memory by offset seems wasteful.
// !p.scanner.IsSeekable &&
if len(o.Children) > 0 {
data := make([]byte, buf.Len())
copy(data, buf.Bytes())
@@ -386,10 +435,25 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
return nil
}

// resolveObject resolves an object from base, using information
// provided by o.
//
// This call has the side effect of changing field values
// from the object info o:
// - Type: OFSDeltaObject may become the target type (e.g. Blob).
// - Size: The size may be updated with the target size.
// - Hash: Zero hashes will be calculated as part of the object
// resolution. Hence this process can't be avoided even when w
// is io.Discard.
//
// base must be an io.ReaderAt, which is a requirement from
// patchDeltaStream. The main reason is that resolving a
// delta object may require going back and forth within base,
// which io.Reader does not support.
func (p *Parser) resolveObject(
w io.Writer,
o *objectInfo,
base []byte,
base io.ReaderAt,
) error {
if !o.DiskType.IsDelta() {
return nil
@@ -400,26 +464,46 @@ func (p *Parser) resolveObject(
if err != nil {
return err
}
data := buf.Bytes()

data, err = applyPatchBase(o, data, base)
writers := []io.Writer{w}
var obj *plumbing.MemoryObject
var lwh objectHeaderWriter

if p.storage != nil {
if low, ok := p.storage.(lazyObjectWriter); ok {
ow, wh, err := low.LazyWriter()
if err != nil {
return err
}
lwh = wh

defer ow.Close()
writers = append(writers, ow)
} else {
obj = new(plumbing.MemoryObject)
ow, err := obj.Writer()
if err != nil {
return err
}

writers = append(writers, ow)
}
}

mw := io.MultiWriter(writers...)

err = applyPatchBase(o, base, buf, mw, lwh)
if err != nil {
return err
}

if p.storage != nil {
obj := new(plumbing.MemoryObject)
obj.SetSize(o.Size())
if obj != nil {
obj.SetType(o.Type)
if _, err := obj.Write(data); err != nil {
return err
}

obj.SetSize(o.Size()) // Size here is correct as it was populated by applyPatchBase.
if _, err := p.storage.SetEncodedObject(obj); err != nil {
return err
}
}
_, err = w.Write(data)
return err
}

@@ -443,24 +527,31 @@ func (p *Parser) readData(w io.Writer, o *objectInfo) error {
return nil
}

func applyPatchBase(ota *objectInfo, data, base []byte) ([]byte, error) {
patched, err := PatchDelta(base, data)
if err != nil {
return nil, err
// applyPatchBase applies the delta patch against base, writing the
// result to target.
//
// Note that ota will be updated as described in resolveObject.
func applyPatchBase(ota *objectInfo, base io.ReaderAt, delta io.Reader, target io.Writer, wh objectHeaderWriter) error {
if target == nil {
return fmt.Errorf("cannot apply patch against nil target")
}

typ := ota.Type
if ota.SHA1 == plumbing.ZeroHash {
ota.Type = ota.Parent.Type
sha1, err := getSHA1(ota.Type, patched)
if err != nil {
return nil, err
}
typ = ota.Parent.Type
}

sz, h, err := patchDeltaWriter(target, base, delta, typ, wh)
if err != nil {
return err
}

ota.SHA1 = sha1
ota.Length = int64(len(patched))
if ota.SHA1 == plumbing.ZeroHash {
ota.Type = typ
ota.Length = int64(sz)
ota.SHA1 = h
}

return patched, nil
return nil
}

func getSHA1(t plumbing.ObjectType, data []byte) (plumbing.Hash, error) {
