Skip to content

Commit

Permalink
binarylog: export Sink (#3879)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Sep 22, 2020
1 parent d81def4 commit 400b4a0
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 43 deletions.
Expand Up @@ -29,11 +29,11 @@ import (
"time"

"github.com/golang/protobuf/proto"

"google.golang.org/grpc"
"google.golang.org/grpc/binarylog"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/binarylog"
iblog "google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/metadata"
testpb "google.golang.org/grpc/stats/grpc_testing"
Expand All @@ -53,8 +53,8 @@ func Test(t *testing.T) {
func init() {
// Setting environment variable in tests doesn't work because of the init
// orders. Set the loggers directly here.
binarylog.SetLogger(binarylog.AllLogger)
binarylog.SetDefaultSink(testSink)
iblog.SetLogger(iblog.AllLogger)
binarylog.SetSink(testSink)
}

var testSink = &testBinLogSink{}
Expand Down Expand Up @@ -503,7 +503,7 @@ func (ed *expectedData) newClientHeaderEntry(client bool, rpcID, inRPCID uint64)
Logger: logger,
Payload: &pb.GrpcLogEntry_ClientHeader{
ClientHeader: &pb.ClientHeader{
Metadata: binarylog.MdToMetadataProto(testMetadata),
Metadata: iblog.MdToMetadataProto(testMetadata),
MethodName: ed.method,
Authority: ed.te.srvAddr,
},
Expand Down Expand Up @@ -535,7 +535,7 @@ func (ed *expectedData) newServerHeaderEntry(client bool, rpcID, inRPCID uint64)
Logger: logger,
Payload: &pb.GrpcLogEntry_ServerHeader{
ServerHeader: &pb.ServerHeader{
Metadata: binarylog.MdToMetadataProto(testMetadata),
Metadata: iblog.MdToMetadataProto(testMetadata),
},
},
Peer: peer,
Expand Down Expand Up @@ -643,7 +643,7 @@ func (ed *expectedData) newServerTrailerEntry(client bool, rpcID, inRPCID uint64
Logger: logger,
Payload: &pb.GrpcLogEntry_Trailer{
Trailer: &pb.Trailer{
Metadata: binarylog.MdToMetadataProto(testTrailerMetadata),
Metadata: iblog.MdToMetadataProto(testTrailerMetadata),
// st will be nil if err was not a status error, but nil is ok.
StatusCode: uint32(st.Code()),
StatusMessage: st.Message(),
Expand Down
68 changes: 68 additions & 0 deletions binarylog/sink.go
@@ -0,0 +1,68 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package binarylog implementation binary logging as defined in
// https://github.com/grpc/proposal/blob/master/A16-binary-logging.md.
//
// Notice: All APIs in this package are experimental.
package binarylog

import (
"fmt"
"io/ioutil"

pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
iblog "google.golang.org/grpc/internal/binarylog"
)

// SetSink sets the destination for the binary log entries.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe.
func SetSink(s Sink) {
if iblog.DefaultSink != nil {
iblog.DefaultSink.Close()
}
iblog.DefaultSink = s
}

// Sink represents the destination for the binary log entries.
type Sink interface {
// Write marshals the log entry and writes it to the destination. The format
// is not specified, but should have sufficient information to rebuild the
// entry. Some options are: proto bytes, or proto json.
//
// Note this function needs to be thread-safe.
Write(*pb.GrpcLogEntry) error
// Close closes this sink and cleans up resources (e.g. the flushing
// goroutine).
Close() error
}

// NewTempFileSink creates a temp file and returns a Sink that writes to this
// file.
func NewTempFileSink() (Sink, error) {
// Two other options to replace this function:
// 1. take filename as input.
// 2. export NewBufferedSink().
tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt")
if err != nil {
return nil, fmt.Errorf("failed to create temp file: %v", err)
}
return iblog.NewBufferedSink(tempFile), nil
}
2 changes: 1 addition & 1 deletion internal/binarylog/method_logger.go
Expand Up @@ -65,7 +65,7 @@ func newMethodLogger(h, m uint64) *MethodLogger {
callID: idGen.next(),
idWithinCallGen: &callIDGenerator{},

sink: defaultSink, // TODO(blog): make it plugable.
sink: DefaultSink, // TODO(blog): make it plugable.
}
}

Expand Down
68 changes: 33 additions & 35 deletions internal/binarylog/sink.go
Expand Up @@ -21,9 +21,7 @@ package binarylog
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"sync"
"time"

Expand All @@ -32,20 +30,14 @@ import (
)

var (
defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
// DefaultSink is the sink where the logs will be written to. It's exported
// for the binarylog package to update.
DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
)

// SetDefaultSink sets the sink where binary logs will be written to.
//
// Not thread safe. Only set during initialization.
func SetDefaultSink(s Sink) {
if defaultSink != nil {
defaultSink.Close()
}
defaultSink = s
}

// Sink writes log entry into the binary log sink.
//
// sink is a copy of the exported binarylog.Sink, to avoid circular dependency.
type Sink interface {
// Write will be called to write the log entry into the sink.
//
Expand All @@ -66,7 +58,7 @@ func (ns *noopSink) Close() error { return nil }
// message is prefixed with a 4 byte big endian unsigned integer as the length.
//
// No buffer is done, Close() doesn't try to close the writer.
func newWriterSink(w io.Writer) *writerSink {
func newWriterSink(w io.Writer) Sink {
return &writerSink{out: w}
}

Expand All @@ -92,17 +84,17 @@ func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {

func (ws *writerSink) Close() error { return nil }

type bufWriteCloserSink struct {
type bufferedSink struct {
mu sync.Mutex
closer io.Closer
out *writerSink // out is built on buf.
out Sink // out is built on buf.
buf *bufio.Writer // buf is kept for flush.

writeStartOnce sync.Once
writeTicker *time.Ticker
}

func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error {
func (fs *bufferedSink) Write(e *pb.GrpcLogEntry) error {
// Start the write loop when Write is called.
fs.writeStartOnce.Do(fs.startFlushGoroutine)
fs.mu.Lock()
Expand All @@ -118,44 +110,50 @@ const (
bufFlushDuration = 60 * time.Second
)

func (fs *bufWriteCloserSink) startFlushGoroutine() {
func (fs *bufferedSink) startFlushGoroutine() {
fs.writeTicker = time.NewTicker(bufFlushDuration)
go func() {
for range fs.writeTicker.C {
fs.mu.Lock()
fs.buf.Flush()
if err := fs.buf.Flush(); err != nil {
grpclogLogger.Warningf("failed to flush to Sink: %v", err)
}
fs.mu.Unlock()
}
}()
}

func (fs *bufWriteCloserSink) Close() error {
func (fs *bufferedSink) Close() error {
if fs.writeTicker != nil {
fs.writeTicker.Stop()
}
fs.mu.Lock()
fs.buf.Flush()
fs.closer.Close()
fs.out.Close()
if err := fs.buf.Flush(); err != nil {
grpclogLogger.Warningf("failed to flush to Sink: %v", err)
}
if err := fs.closer.Close(); err != nil {
grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
}
if err := fs.out.Close(); err != nil {
grpclogLogger.Warningf("failed to close the Sink: %v", err)
}
fs.mu.Unlock()
return nil
}

func newBufWriteCloserSink(o io.WriteCloser) Sink {
// NewBufferedSink creates a binary log sink with the given WriteCloser.
//
// Write() marshals the proto message and writes it to the given writer. Each
// message is prefixed with a 4 byte big endian unsigned integer as the length.
//
// Content is kept in a buffer, and is flushed every 60 seconds.
//
// Close closes the WriteCloser.
func NewBufferedSink(o io.WriteCloser) Sink {
bufW := bufio.NewWriter(o)
return &bufWriteCloserSink{
return &bufferedSink{
closer: o,
out: newWriterSink(bufW),
buf: bufW,
}
}

// NewTempFileSink creates a temp file and returns a Sink that writes to this
// file.
func NewTempFileSink() (Sink, error) {
tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt")
if err != nil {
return nil, fmt.Errorf("failed to create temp file: %v", err)
}
return newBufWriteCloserSink(tempFile), nil
}

0 comments on commit 400b4a0

Please sign in to comment.