From 400b4a0a6d5a2f8f2c917a9fb46174a4705015ae Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 22 Sep 2020 09:52:20 -0700 Subject: [PATCH] binarylog: export Sink (#3879) --- .../binarylog_end2end_test.go | 14 ++-- binarylog/sink.go | 68 +++++++++++++++++++ internal/binarylog/method_logger.go | 2 +- internal/binarylog/sink.go | 68 +++++++++---------- 4 files changed, 109 insertions(+), 43 deletions(-) rename {internal/binarylog => binarylog}/binarylog_end2end_test.go (98%) create mode 100644 binarylog/sink.go diff --git a/internal/binarylog/binarylog_end2end_test.go b/binarylog/binarylog_end2end_test.go similarity index 98% rename from internal/binarylog/binarylog_end2end_test.go rename to binarylog/binarylog_end2end_test.go index 54dd3da7419..b8caa828a68 100644 --- a/internal/binarylog/binarylog_end2end_test.go +++ b/binarylog/binarylog_end2end_test.go @@ -29,11 +29,11 @@ import ( "time" "github.com/golang/protobuf/proto" - "google.golang.org/grpc" + "google.golang.org/grpc/binarylog" pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/internal/binarylog" + iblog "google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/metadata" testpb "google.golang.org/grpc/stats/grpc_testing" @@ -53,8 +53,8 @@ func Test(t *testing.T) { func init() { // Setting environment variable in tests doesn't work because of the init // orders. Set the loggers directly here. - binarylog.SetLogger(binarylog.AllLogger) - binarylog.SetDefaultSink(testSink) + iblog.SetLogger(iblog.AllLogger) + binarylog.SetSink(testSink) } var testSink = &testBinLogSink{} @@ -503,7 +503,7 @@ func (ed *expectedData) newClientHeaderEntry(client bool, rpcID, inRPCID uint64) Logger: logger, Payload: &pb.GrpcLogEntry_ClientHeader{ ClientHeader: &pb.ClientHeader{ - Metadata: binarylog.MdToMetadataProto(testMetadata), + Metadata: iblog.MdToMetadataProto(testMetadata), MethodName: ed.method, Authority: ed.te.srvAddr, }, @@ -535,7 +535,7 @@ func (ed *expectedData) newServerHeaderEntry(client bool, rpcID, inRPCID uint64) Logger: logger, Payload: &pb.GrpcLogEntry_ServerHeader{ ServerHeader: &pb.ServerHeader{ - Metadata: binarylog.MdToMetadataProto(testMetadata), + Metadata: iblog.MdToMetadataProto(testMetadata), }, }, Peer: peer, @@ -643,7 +643,7 @@ func (ed *expectedData) newServerTrailerEntry(client bool, rpcID, inRPCID uint64 Logger: logger, Payload: &pb.GrpcLogEntry_Trailer{ Trailer: &pb.Trailer{ - Metadata: binarylog.MdToMetadataProto(testTrailerMetadata), + Metadata: iblog.MdToMetadataProto(testTrailerMetadata), // st will be nil if err was not a status error, but nil is ok. StatusCode: uint32(st.Code()), StatusMessage: st.Message(), diff --git a/binarylog/sink.go b/binarylog/sink.go new file mode 100644 index 00000000000..db79346a291 --- /dev/null +++ b/binarylog/sink.go @@ -0,0 +1,68 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package binarylog implementation binary logging as defined in +// https://github.com/grpc/proposal/blob/master/A16-binary-logging.md. +// +// Notice: All APIs in this package are experimental. +package binarylog + +import ( + "fmt" + "io/ioutil" + + pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" + iblog "google.golang.org/grpc/internal/binarylog" +) + +// SetSink sets the destination for the binary log entries. +// +// NOTE: this function must only be called during initialization time (i.e. in +// an init() function), and is not thread-safe. +func SetSink(s Sink) { + if iblog.DefaultSink != nil { + iblog.DefaultSink.Close() + } + iblog.DefaultSink = s +} + +// Sink represents the destination for the binary log entries. +type Sink interface { + // Write marshals the log entry and writes it to the destination. The format + // is not specified, but should have sufficient information to rebuild the + // entry. Some options are: proto bytes, or proto json. + // + // Note this function needs to be thread-safe. + Write(*pb.GrpcLogEntry) error + // Close closes this sink and cleans up resources (e.g. the flushing + // goroutine). + Close() error +} + +// NewTempFileSink creates a temp file and returns a Sink that writes to this +// file. +func NewTempFileSink() (Sink, error) { + // Two other options to replace this function: + // 1. take filename as input. + // 2. export NewBufferedSink(). + tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt") + if err != nil { + return nil, fmt.Errorf("failed to create temp file: %v", err) + } + return iblog.NewBufferedSink(tempFile), nil +} diff --git a/internal/binarylog/method_logger.go b/internal/binarylog/method_logger.go index 5e1083539b4..0cdb4183150 100644 --- a/internal/binarylog/method_logger.go +++ b/internal/binarylog/method_logger.go @@ -65,7 +65,7 @@ func newMethodLogger(h, m uint64) *MethodLogger { callID: idGen.next(), idWithinCallGen: &callIDGenerator{}, - sink: defaultSink, // TODO(blog): make it plugable. + sink: DefaultSink, // TODO(blog): make it plugable. } } diff --git a/internal/binarylog/sink.go b/internal/binarylog/sink.go index 835f51040cb..7d7a3056b71 100644 --- a/internal/binarylog/sink.go +++ b/internal/binarylog/sink.go @@ -21,9 +21,7 @@ package binarylog import ( "bufio" "encoding/binary" - "fmt" "io" - "io/ioutil" "sync" "time" @@ -32,20 +30,14 @@ import ( ) var ( - defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp). + // DefaultSink is the sink where the logs will be written to. It's exported + // for the binarylog package to update. + DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp). ) -// SetDefaultSink sets the sink where binary logs will be written to. -// -// Not thread safe. Only set during initialization. -func SetDefaultSink(s Sink) { - if defaultSink != nil { - defaultSink.Close() - } - defaultSink = s -} - // Sink writes log entry into the binary log sink. +// +// sink is a copy of the exported binarylog.Sink, to avoid circular dependency. type Sink interface { // Write will be called to write the log entry into the sink. // @@ -66,7 +58,7 @@ func (ns *noopSink) Close() error { return nil } // message is prefixed with a 4 byte big endian unsigned integer as the length. // // No buffer is done, Close() doesn't try to close the writer. -func newWriterSink(w io.Writer) *writerSink { +func newWriterSink(w io.Writer) Sink { return &writerSink{out: w} } @@ -92,17 +84,17 @@ func (ws *writerSink) Write(e *pb.GrpcLogEntry) error { func (ws *writerSink) Close() error { return nil } -type bufWriteCloserSink struct { +type bufferedSink struct { mu sync.Mutex closer io.Closer - out *writerSink // out is built on buf. + out Sink // out is built on buf. buf *bufio.Writer // buf is kept for flush. writeStartOnce sync.Once writeTicker *time.Ticker } -func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error { +func (fs *bufferedSink) Write(e *pb.GrpcLogEntry) error { // Start the write loop when Write is called. fs.writeStartOnce.Do(fs.startFlushGoroutine) fs.mu.Lock() @@ -118,44 +110,50 @@ const ( bufFlushDuration = 60 * time.Second ) -func (fs *bufWriteCloserSink) startFlushGoroutine() { +func (fs *bufferedSink) startFlushGoroutine() { fs.writeTicker = time.NewTicker(bufFlushDuration) go func() { for range fs.writeTicker.C { fs.mu.Lock() - fs.buf.Flush() + if err := fs.buf.Flush(); err != nil { + grpclogLogger.Warningf("failed to flush to Sink: %v", err) + } fs.mu.Unlock() } }() } -func (fs *bufWriteCloserSink) Close() error { +func (fs *bufferedSink) Close() error { if fs.writeTicker != nil { fs.writeTicker.Stop() } fs.mu.Lock() - fs.buf.Flush() - fs.closer.Close() - fs.out.Close() + if err := fs.buf.Flush(); err != nil { + grpclogLogger.Warningf("failed to flush to Sink: %v", err) + } + if err := fs.closer.Close(); err != nil { + grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err) + } + if err := fs.out.Close(); err != nil { + grpclogLogger.Warningf("failed to close the Sink: %v", err) + } fs.mu.Unlock() return nil } -func newBufWriteCloserSink(o io.WriteCloser) Sink { +// NewBufferedSink creates a binary log sink with the given WriteCloser. +// +// Write() marshals the proto message and writes it to the given writer. Each +// message is prefixed with a 4 byte big endian unsigned integer as the length. +// +// Content is kept in a buffer, and is flushed every 60 seconds. +// +// Close closes the WriteCloser. +func NewBufferedSink(o io.WriteCloser) Sink { bufW := bufio.NewWriter(o) - return &bufWriteCloserSink{ + return &bufferedSink{ closer: o, out: newWriterSink(bufW), buf: bufW, } } - -// NewTempFileSink creates a temp file and returns a Sink that writes to this -// file. -func NewTempFileSink() (Sink, error) { - tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt") - if err != nil { - return nil, fmt.Errorf("failed to create temp file: %v", err) - } - return newBufWriteCloserSink(tempFile), nil -}