Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plumbing: Optimise memory consumption for filesystem storage #799

Merged
merged 2 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
195 changes: 145 additions & 50 deletions plumbing/format/packfile/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package packfile
import (
"bytes"
"errors"
"fmt"
"io"

"github.com/go-git/go-git/v5/plumbing"
Expand Down Expand Up @@ -174,13 +175,25 @@ func (p *Parser) init() error {
return nil
}

type objectHeaderWriter func(typ plumbing.ObjectType, sz int64) error

type lazyObjectWriter interface {
// LazyWriter enables an object to be lazily written.
// It returns:
// - w: a writer to receive the object's content.
// - lwh: a func to write the object header.
// - err: any error from the initial writer creation process.
//
// Note that if the object header is not written BEFORE the writer
// is used, this will result in an invalid object.
LazyWriter() (w io.WriteCloser, lwh objectHeaderWriter, err error)
}

func (p *Parser) indexObjects() error {
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

for i := uint32(0); i < p.count; i++ {
buf.Reset()

oh, err := p.scanner.NextObjectHeader()
if err != nil {
return err
Expand Down Expand Up @@ -220,21 +233,60 @@ func (p *Parser) indexObjects() error {
ota = newBaseObject(oh.Offset, oh.Length, t)
}

buf.Grow(int(oh.Length))
_, crc, err := p.scanner.NextObject(buf)
hasher := plumbing.NewHasher(oh.Type, oh.Length)
writers := []io.Writer{hasher}
var obj *plumbing.MemoryObject

// Lazy writing is only available for non-delta objects.
if p.storage != nil && !delta {
// When a storage is set and supports lazy writing,
// use that instead of creating a memory object.
if low, ok := p.storage.(lazyObjectWriter); ok {
ow, lwh, err := low.LazyWriter()
if err != nil {
return err
}

if err = lwh(oh.Type, oh.Length); err != nil {
return err
}

defer ow.Close()
writers = append(writers, ow)
} else {
obj = new(plumbing.MemoryObject)
obj.SetSize(oh.Length)
obj.SetType(oh.Type)

writers = append(writers, obj)
}
}
if delta && !p.scanner.IsSeekable {
buf.Reset()
buf.Grow(int(oh.Length))
writers = append(writers, buf)
}

mw := io.MultiWriter(writers...)

_, crc, err := p.scanner.NextObject(mw)
if err != nil {
return err
}

// Non delta objects needs to be added into the storage. This
// is only required when lazy writing is not supported.
if obj != nil {
if _, err := p.storage.SetEncodedObject(obj); err != nil {
return err
}
}

ota.Crc32 = crc
ota.Length = oh.Length

data := buf.Bytes()
if !delta {
sha1, err := getSHA1(ota.Type, data)
if err != nil {
return err
}
sha1 := hasher.Sum()

// Move children of placeholder parent into actual parent, in case this
// was a non-external delta reference.
Expand All @@ -249,20 +301,8 @@ func (p *Parser) indexObjects() error {
p.oiByHash[ota.SHA1] = ota
}

if p.storage != nil && !delta {
obj := new(plumbing.MemoryObject)
obj.SetSize(oh.Length)
obj.SetType(oh.Type)
if _, err := obj.Write(data); err != nil {
return err
}

if _, err := p.storage.SetEncodedObject(obj); err != nil {
return err
}
}

if delta && !p.scanner.IsSeekable {
data := buf.Bytes()
p.deltas[oh.Offset] = make([]byte, len(data))
copy(p.deltas[oh.Offset], data)
}
Expand All @@ -280,23 +320,29 @@ func (p *Parser) resolveDeltas() error {

for _, obj := range p.oi {
buf.Reset()
buf.Grow(int(obj.Length))
err := p.get(obj, buf)
if err != nil {
return err
}
content := buf.Bytes()

if err := p.onInflatedObjectHeader(obj.Type, obj.Length, obj.Offset); err != nil {
return err
}

if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, content); err != nil {
if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, nil); err != nil {
return err
}

if !obj.IsDelta() && len(obj.Children) > 0 {
// Dealing with an io.ReaderAt object, means we can
// create it once and reuse across all children.
r := bytes.NewReader(buf.Bytes())
for _, child := range obj.Children {
if err := p.resolveObject(io.Discard, child, content); err != nil {
// Even though we are discarding the output, we still need to read it to
// so that the scanner can advance to the next object, and the SHA1 can be
// calculated.
if err := p.resolveObject(io.Discard, child, r); err != nil {
return err
}
p.resolveExternalRef(child)
Expand Down Expand Up @@ -361,13 +407,13 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
if o.DiskType.IsDelta() {
b := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(b)
buf.Grow(int(o.Length))
err := p.get(o.Parent, b)
if err != nil {
return err
}
base := b.Bytes()

err = p.resolveObject(buf, o, base)
err = p.resolveObject(buf, o, bytes.NewReader(b.Bytes()))
if err != nil {
return err
}
Expand All @@ -378,6 +424,13 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
}
}

// If the scanner is seekable, caching this data into
// memory by offset seems wasteful.
// There is a trade-off to be considered here in terms
// of execution time vs memory consumption.
//
// TODO: improve seekable execution time, so that we can
// skip this cache.
if len(o.Children) > 0 {
data := make([]byte, buf.Len())
copy(data, buf.Bytes())
Expand All @@ -386,10 +439,25 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
return nil
}

// resolveObject resolves an object from base, using information
// provided by o.
//
// This call has the side-effect of changing field values
// from the object info o:
// - Type: OFSDeltaObject may become the target type (e.g. Blob).
// - Size: The size may be update with the target size.
// - Hash: Zero hashes will be calculated as part of the object
// resolution. Hence why this process can't be avoided even when w
// is an io.Discard.
//
// base must be an io.ReaderAt, which is a requirement from
// patchDeltaStream. The main reason being that reversing an
// delta object may lead to going backs and forths within base,
// which is not supported by io.Reader.
func (p *Parser) resolveObject(
w io.Writer,
o *objectInfo,
base []byte,
base io.ReaderAt,
) error {
if !o.DiskType.IsDelta() {
return nil
Expand All @@ -400,26 +468,46 @@ func (p *Parser) resolveObject(
if err != nil {
return err
}
data := buf.Bytes()

data, err = applyPatchBase(o, data, base)
writers := []io.Writer{w}
var obj *plumbing.MemoryObject
var lwh objectHeaderWriter

if p.storage != nil {
if low, ok := p.storage.(lazyObjectWriter); ok {
ow, wh, err := low.LazyWriter()
if err != nil {
return err
}
lwh = wh

defer ow.Close()
writers = append(writers, ow)
} else {
obj = new(plumbing.MemoryObject)
ow, err := obj.Writer()
if err != nil {
return err
}

writers = append(writers, ow)
}
}

mw := io.MultiWriter(writers...)

err = applyPatchBase(o, base, buf, mw, lwh)
if err != nil {
return err
}

if p.storage != nil {
obj := new(plumbing.MemoryObject)
obj.SetSize(o.Size())
if obj != nil {
obj.SetType(o.Type)
if _, err := obj.Write(data); err != nil {
return err
}

obj.SetSize(o.Size()) // Size here is correct as it was populated by applyPatchBase.
if _, err := p.storage.SetEncodedObject(obj); err != nil {
return err
}
}
_, err = w.Write(data)
return err
}

Expand All @@ -443,24 +531,31 @@ func (p *Parser) readData(w io.Writer, o *objectInfo) error {
return nil
}

func applyPatchBase(ota *objectInfo, data, base []byte) ([]byte, error) {
patched, err := PatchDelta(base, data)
if err != nil {
return nil, err
// applyPatchBase applies the patch to target.
//
// Note that ota will be updated based on the description in resolveObject.
func applyPatchBase(ota *objectInfo, base io.ReaderAt, delta io.Reader, target io.Writer, wh objectHeaderWriter) error {
if target == nil {
return fmt.Errorf("cannot apply patch against nil target")
}

typ := ota.Type
if ota.SHA1 == plumbing.ZeroHash {
ota.Type = ota.Parent.Type
sha1, err := getSHA1(ota.Type, patched)
if err != nil {
return nil, err
}
typ = ota.Parent.Type
}

sz, h, err := patchDeltaWriter(target, base, delta, typ, wh)
if err != nil {
return err
}

ota.SHA1 = sha1
ota.Length = int64(len(patched))
if ota.SHA1 == plumbing.ZeroHash {
ota.Type = typ
ota.Length = int64(sz)
ota.SHA1 = h
}

return patched, nil
return nil
}

func getSHA1(t plumbing.ObjectType, data []byte) (plumbing.Hash, error) {
Expand Down