Skip to content

Commit

Permalink
[blog] binarylog: export Sink
Browse files Browse the repository at this point in the history
So the sink can be updated (instead of only being noop)
  • Loading branch information
menghanl committed Sep 16, 2020
1 parent ff9dd65 commit 978c7d7
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 94 deletions.
Expand Up @@ -29,11 +29,11 @@ import (
"time"

"github.com/golang/protobuf/proto"

"google.golang.org/grpc"
"google.golang.org/grpc/binarylog"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/binarylog"
iblog "google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/metadata"
testpb "google.golang.org/grpc/stats/grpc_testing"
Expand All @@ -53,7 +53,7 @@ func Test(t *testing.T) {
func init() {
// Setting environment variable in tests doesn't work because of the init
// orders. Set the loggers directly here.
binarylog.SetLogger(binarylog.AllLogger)
iblog.SetLogger(iblog.AllLogger)
binarylog.SetDefaultSink(testSink)
}

Expand Down Expand Up @@ -503,7 +503,7 @@ func (ed *expectedData) newClientHeaderEntry(client bool, rpcID, inRPCID uint64)
Logger: logger,
Payload: &pb.GrpcLogEntry_ClientHeader{
ClientHeader: &pb.ClientHeader{
Metadata: binarylog.MdToMetadataProto(testMetadata),
Metadata: iblog.MdToMetadataProto(testMetadata),
MethodName: ed.method,
Authority: ed.te.srvAddr,
},
Expand Down Expand Up @@ -535,7 +535,7 @@ func (ed *expectedData) newServerHeaderEntry(client bool, rpcID, inRPCID uint64)
Logger: logger,
Payload: &pb.GrpcLogEntry_ServerHeader{
ServerHeader: &pb.ServerHeader{
Metadata: binarylog.MdToMetadataProto(testMetadata),
Metadata: iblog.MdToMetadataProto(testMetadata),
},
},
Peer: peer,
Expand Down Expand Up @@ -643,7 +643,7 @@ func (ed *expectedData) newServerTrailerEntry(client bool, rpcID, inRPCID uint64
Logger: logger,
Payload: &pb.GrpcLogEntry_Trailer{
Trailer: &pb.Trailer{
Metadata: binarylog.MdToMetadataProto(testTrailerMetadata),
Metadata: iblog.MdToMetadataProto(testTrailerMetadata),
// st will be nil if err was not a status error, but nil is ok.
StatusCode: uint32(st.Code()),
StatusMessage: st.Message(),
Expand Down
123 changes: 123 additions & 0 deletions binarylog/sink.go
@@ -0,0 +1,123 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package binarylog implementation binary logging as defined in
// https://github.com/grpc/proposal/blob/master/A16-binary-logging.md.
//
// Notice: All APIs in this package are experimental.
package binarylog

import (
"bufio"
"fmt"
"io"
"io/ioutil"
"sync"
"time"

pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
iblog "google.golang.org/grpc/internal/binarylog"
)

// SetDefaultSink sets the sink where binary logs will be written to.
//
// Not thread safe. Only set during initialization.
func SetDefaultSink(s Sink) {
if iblog.DefaultSink != nil {
iblog.DefaultSink.Close()
}
iblog.DefaultSink = s
}

// Sink writes log entry into the binary log sink.
type Sink interface {
// Write will be called to write the log entry into the sink.
//
// It should be thread-safe so it can be called in parallel.
Write(*pb.GrpcLogEntry) error
// Close will be called when the Sink is replaced by a new Sink.
Close() error
}

type bufWriteCloserSink struct {
mu sync.Mutex
closer io.Closer
out Sink // out is built on buf.
buf *bufio.Writer // buf is kept for flush.

writeStartOnce sync.Once
writeTicker *time.Ticker
}

func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error {
// Start the write loop when Write is called.
fs.writeStartOnce.Do(fs.startFlushGoroutine)
fs.mu.Lock()
if err := fs.out.Write(e); err != nil {
fs.mu.Unlock()
return err
}
fs.mu.Unlock()
return nil
}

const (
bufFlushDuration = 60 * time.Second
)

func (fs *bufWriteCloserSink) startFlushGoroutine() {
fs.writeTicker = time.NewTicker(bufFlushDuration)
go func() {
for range fs.writeTicker.C {
fs.mu.Lock()
fs.buf.Flush()
fs.mu.Unlock()
}
}()
}

func (fs *bufWriteCloserSink) Close() error {
if fs.writeTicker != nil {
fs.writeTicker.Stop()
}
fs.mu.Lock()
fs.buf.Flush()
fs.closer.Close()
fs.out.Close()
fs.mu.Unlock()
return nil
}

func newBufWriteCloserSink(o io.WriteCloser) Sink {
bufW := bufio.NewWriter(o)
return &bufWriteCloserSink{
closer: o,
out: iblog.NewWriterSink(bufW),
buf: bufW,
}
}

// NewTempFileSink creates a temp file and returns a Sink that writes to this
// file.
func NewTempFileSink() (Sink, error) {
tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt")
if err != nil {
return nil, fmt.Errorf("failed to create temp file: %v", err)
}
return newBufWriteCloserSink(tempFile), nil
}
2 changes: 1 addition & 1 deletion internal/binarylog/method_logger.go
Expand Up @@ -65,7 +65,7 @@ func newMethodLogger(h, m uint64) *MethodLogger {
callID: idGen.next(),
idWithinCallGen: &callIDGenerator{},

sink: defaultSink, // TODO(blog): make it plugable.
sink: DefaultSink, // TODO(blog): make it plugable.
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/binarylog/method_logger_test.go
Expand Up @@ -37,7 +37,7 @@ func (s) TestLog(t *testing.T) {
ml := newMethodLogger(10, 10)
// Set sink to testing buffer.
buf := bytes.NewBuffer(nil)
ml.sink = newWriterSink(buf)
ml.sink = NewWriterSink(buf)

addr := "1.2.3.4"
port := 790
Expand Down
93 changes: 7 additions & 86 deletions internal/binarylog/sink.go
Expand Up @@ -19,33 +19,22 @@
package binarylog

import (
"bufio"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"sync"
"time"

"github.com/golang/protobuf/proto"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
)

var (
defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
// DefaultSink is the sink where the logs will be written to. It's exported
// for the binarylog package to update.
DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
)

// SetDefaultSink sets the sink where binary logs will be written to.
//
// Not thread safe. Only set during initialization.
func SetDefaultSink(s Sink) {
if defaultSink != nil {
defaultSink.Close()
}
defaultSink = s
}

// Sink writes log entry into the binary log sink.
//
// sink is a copy of the exported binarylog.Sink, to avoid circular dependency.
type Sink interface {
// Write will be called to write the log entry into the sink.
//
Expand All @@ -60,13 +49,13 @@ type noopSink struct{}
func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil }
func (ns *noopSink) Close() error { return nil }

// newWriterSink creates a binary log sink with the given writer.
// NewWriterSink creates a binary log sink with the given writer.
//
// Write() marshals the proto message and writes it to the given writer. Each
// message is prefixed with a 4 byte big endian unsigned integer as the length.
//
// No buffer is done, Close() doesn't try to close the writer.
func newWriterSink(w io.Writer) *writerSink {
func NewWriterSink(w io.Writer) Sink {
return &writerSink{out: w}
}

Expand All @@ -91,71 +80,3 @@ func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
}

func (ws *writerSink) Close() error { return nil }

type bufWriteCloserSink struct {
mu sync.Mutex
closer io.Closer
out *writerSink // out is built on buf.
buf *bufio.Writer // buf is kept for flush.

writeStartOnce sync.Once
writeTicker *time.Ticker
}

func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error {
// Start the write loop when Write is called.
fs.writeStartOnce.Do(fs.startFlushGoroutine)
fs.mu.Lock()
if err := fs.out.Write(e); err != nil {
fs.mu.Unlock()
return err
}
fs.mu.Unlock()
return nil
}

const (
bufFlushDuration = 60 * time.Second
)

func (fs *bufWriteCloserSink) startFlushGoroutine() {
fs.writeTicker = time.NewTicker(bufFlushDuration)
go func() {
for range fs.writeTicker.C {
fs.mu.Lock()
fs.buf.Flush()
fs.mu.Unlock()
}
}()
}

func (fs *bufWriteCloserSink) Close() error {
if fs.writeTicker != nil {
fs.writeTicker.Stop()
}
fs.mu.Lock()
fs.buf.Flush()
fs.closer.Close()
fs.out.Close()
fs.mu.Unlock()
return nil
}

func newBufWriteCloserSink(o io.WriteCloser) Sink {
bufW := bufio.NewWriter(o)
return &bufWriteCloserSink{
closer: o,
out: newWriterSink(bufW),
buf: bufW,
}
}

// NewTempFileSink creates a temp file and returns a Sink that writes to this
// file.
func NewTempFileSink() (Sink, error) {
tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt")
if err != nil {
return nil, fmt.Errorf("failed to create temp file: %v", err)
}
return newBufWriteCloserSink(tempFile), nil
}

0 comments on commit 978c7d7

Please sign in to comment.