From 978c7d761d8b9e878b9e58d4368e2b02d53fc83b Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 15 Sep 2020 13:10:01 -0700 Subject: [PATCH] [blog] binarylog: export Sink So the sink can be updated (instead of only being noop) --- .../binarylog_end2end_test.go | 12 +- binarylog/sink.go | 123 ++++++++++++++++++ internal/binarylog/method_logger.go | 2 +- internal/binarylog/method_logger_test.go | 2 +- internal/binarylog/sink.go | 93 +------------ 5 files changed, 138 insertions(+), 94 deletions(-) rename {internal/binarylog => binarylog}/binarylog_end2end_test.go (99%) create mode 100644 binarylog/sink.go diff --git a/internal/binarylog/binarylog_end2end_test.go b/binarylog/binarylog_end2end_test.go similarity index 99% rename from internal/binarylog/binarylog_end2end_test.go rename to binarylog/binarylog_end2end_test.go index 54dd3da7419..f4a9febc389 100644 --- a/internal/binarylog/binarylog_end2end_test.go +++ b/binarylog/binarylog_end2end_test.go @@ -29,11 +29,11 @@ import ( "time" "github.com/golang/protobuf/proto" - "google.golang.org/grpc" + "google.golang.org/grpc/binarylog" pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/internal/binarylog" + iblog "google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/metadata" testpb "google.golang.org/grpc/stats/grpc_testing" @@ -53,7 +53,7 @@ func Test(t *testing.T) { func init() { // Setting environment variable in tests doesn't work because of the init // orders. Set the loggers directly here. - binarylog.SetLogger(binarylog.AllLogger) + iblog.SetLogger(iblog.AllLogger) binarylog.SetDefaultSink(testSink) } @@ -503,7 +503,7 @@ func (ed *expectedData) newClientHeaderEntry(client bool, rpcID, inRPCID uint64) Logger: logger, Payload: &pb.GrpcLogEntry_ClientHeader{ ClientHeader: &pb.ClientHeader{ - Metadata: binarylog.MdToMetadataProto(testMetadata), + Metadata: iblog.MdToMetadataProto(testMetadata), MethodName: ed.method, Authority: ed.te.srvAddr, }, @@ -535,7 +535,7 @@ func (ed *expectedData) newServerHeaderEntry(client bool, rpcID, inRPCID uint64) Logger: logger, Payload: &pb.GrpcLogEntry_ServerHeader{ ServerHeader: &pb.ServerHeader{ - Metadata: binarylog.MdToMetadataProto(testMetadata), + Metadata: iblog.MdToMetadataProto(testMetadata), }, }, Peer: peer, @@ -643,7 +643,7 @@ func (ed *expectedData) newServerTrailerEntry(client bool, rpcID, inRPCID uint64 Logger: logger, Payload: &pb.GrpcLogEntry_Trailer{ Trailer: &pb.Trailer{ - Metadata: binarylog.MdToMetadataProto(testTrailerMetadata), + Metadata: iblog.MdToMetadataProto(testTrailerMetadata), // st will be nil if err was not a status error, but nil is ok. StatusCode: uint32(st.Code()), StatusMessage: st.Message(), diff --git a/binarylog/sink.go b/binarylog/sink.go new file mode 100644 index 00000000000..d122159f622 --- /dev/null +++ b/binarylog/sink.go @@ -0,0 +1,123 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package binarylog implementation binary logging as defined in +// https://github.com/grpc/proposal/blob/master/A16-binary-logging.md. +// +// Notice: All APIs in this package are experimental. +package binarylog + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "sync" + "time" + + pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" + iblog "google.golang.org/grpc/internal/binarylog" +) + +// SetDefaultSink sets the sink where binary logs will be written to. +// +// Not thread safe. Only set during initialization. +func SetDefaultSink(s Sink) { + if iblog.DefaultSink != nil { + iblog.DefaultSink.Close() + } + iblog.DefaultSink = s +} + +// Sink writes log entry into the binary log sink. +type Sink interface { + // Write will be called to write the log entry into the sink. + // + // It should be thread-safe so it can be called in parallel. + Write(*pb.GrpcLogEntry) error + // Close will be called when the Sink is replaced by a new Sink. + Close() error +} + +type bufWriteCloserSink struct { + mu sync.Mutex + closer io.Closer + out Sink // out is built on buf. + buf *bufio.Writer // buf is kept for flush. + + writeStartOnce sync.Once + writeTicker *time.Ticker +} + +func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error { + // Start the write loop when Write is called. + fs.writeStartOnce.Do(fs.startFlushGoroutine) + fs.mu.Lock() + if err := fs.out.Write(e); err != nil { + fs.mu.Unlock() + return err + } + fs.mu.Unlock() + return nil +} + +const ( + bufFlushDuration = 60 * time.Second +) + +func (fs *bufWriteCloserSink) startFlushGoroutine() { + fs.writeTicker = time.NewTicker(bufFlushDuration) + go func() { + for range fs.writeTicker.C { + fs.mu.Lock() + fs.buf.Flush() + fs.mu.Unlock() + } + }() +} + +func (fs *bufWriteCloserSink) Close() error { + if fs.writeTicker != nil { + fs.writeTicker.Stop() + } + fs.mu.Lock() + fs.buf.Flush() + fs.closer.Close() + fs.out.Close() + fs.mu.Unlock() + return nil +} + +func newBufWriteCloserSink(o io.WriteCloser) Sink { + bufW := bufio.NewWriter(o) + return &bufWriteCloserSink{ + closer: o, + out: iblog.NewWriterSink(bufW), + buf: bufW, + } +} + +// NewTempFileSink creates a temp file and returns a Sink that writes to this +// file. +func NewTempFileSink() (Sink, error) { + tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt") + if err != nil { + return nil, fmt.Errorf("failed to create temp file: %v", err) + } + return newBufWriteCloserSink(tempFile), nil +} diff --git a/internal/binarylog/method_logger.go b/internal/binarylog/method_logger.go index 5e1083539b4..0cdb4183150 100644 --- a/internal/binarylog/method_logger.go +++ b/internal/binarylog/method_logger.go @@ -65,7 +65,7 @@ func newMethodLogger(h, m uint64) *MethodLogger { callID: idGen.next(), idWithinCallGen: &callIDGenerator{}, - sink: defaultSink, // TODO(blog): make it plugable. + sink: DefaultSink, // TODO(blog): make it plugable. } } diff --git a/internal/binarylog/method_logger_test.go b/internal/binarylog/method_logger_test.go index a99360bd92d..866458a946a 100644 --- a/internal/binarylog/method_logger_test.go +++ b/internal/binarylog/method_logger_test.go @@ -37,7 +37,7 @@ func (s) TestLog(t *testing.T) { ml := newMethodLogger(10, 10) // Set sink to testing buffer. buf := bytes.NewBuffer(nil) - ml.sink = newWriterSink(buf) + ml.sink = NewWriterSink(buf) addr := "1.2.3.4" port := 790 diff --git a/internal/binarylog/sink.go b/internal/binarylog/sink.go index 835f51040cb..7c91f4f4fc6 100644 --- a/internal/binarylog/sink.go +++ b/internal/binarylog/sink.go @@ -19,33 +19,22 @@ package binarylog import ( - "bufio" "encoding/binary" - "fmt" "io" - "io/ioutil" - "sync" - "time" "github.com/golang/protobuf/proto" pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" ) var ( - defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp). + // DefaultSink is the sink where the logs will be written to. It's exported + // for the binarylog package to update. + DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp). ) -// SetDefaultSink sets the sink where binary logs will be written to. -// -// Not thread safe. Only set during initialization. -func SetDefaultSink(s Sink) { - if defaultSink != nil { - defaultSink.Close() - } - defaultSink = s -} - // Sink writes log entry into the binary log sink. +// +// sink is a copy of the exported binarylog.Sink, to avoid circular dependency. type Sink interface { // Write will be called to write the log entry into the sink. // @@ -60,13 +49,13 @@ type noopSink struct{} func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil } func (ns *noopSink) Close() error { return nil } -// newWriterSink creates a binary log sink with the given writer. +// NewWriterSink creates a binary log sink with the given writer. // // Write() marshals the proto message and writes it to the given writer. Each // message is prefixed with a 4 byte big endian unsigned integer as the length. // // No buffer is done, Close() doesn't try to close the writer. -func newWriterSink(w io.Writer) *writerSink { +func NewWriterSink(w io.Writer) Sink { return &writerSink{out: w} } @@ -91,71 +80,3 @@ func (ws *writerSink) Write(e *pb.GrpcLogEntry) error { } func (ws *writerSink) Close() error { return nil } - -type bufWriteCloserSink struct { - mu sync.Mutex - closer io.Closer - out *writerSink // out is built on buf. - buf *bufio.Writer // buf is kept for flush. - - writeStartOnce sync.Once - writeTicker *time.Ticker -} - -func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error { - // Start the write loop when Write is called. - fs.writeStartOnce.Do(fs.startFlushGoroutine) - fs.mu.Lock() - if err := fs.out.Write(e); err != nil { - fs.mu.Unlock() - return err - } - fs.mu.Unlock() - return nil -} - -const ( - bufFlushDuration = 60 * time.Second -) - -func (fs *bufWriteCloserSink) startFlushGoroutine() { - fs.writeTicker = time.NewTicker(bufFlushDuration) - go func() { - for range fs.writeTicker.C { - fs.mu.Lock() - fs.buf.Flush() - fs.mu.Unlock() - } - }() -} - -func (fs *bufWriteCloserSink) Close() error { - if fs.writeTicker != nil { - fs.writeTicker.Stop() - } - fs.mu.Lock() - fs.buf.Flush() - fs.closer.Close() - fs.out.Close() - fs.mu.Unlock() - return nil -} - -func newBufWriteCloserSink(o io.WriteCloser) Sink { - bufW := bufio.NewWriter(o) - return &bufWriteCloserSink{ - closer: o, - out: newWriterSink(bufW), - buf: bufW, - } -} - -// NewTempFileSink creates a temp file and returns a Sink that writes to this -// file. -func NewTempFileSink() (Sink, error) { - tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt") - if err != nil { - return nil, fmt.Errorf("failed to create temp file: %v", err) - } - return newBufWriteCloserSink(tempFile), nil -}