hooks: PATCH V10
Adhityaa Chandrasekar committed Dec 6, 2019
1 parent fe71f4a commit 28093c8
Showing 6 changed files with 95 additions and 96 deletions.
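The change repeated across all six files is a shift in the internal profiling API: instead of handing a timer (or its index from AppendTimer) back to its parent Stat via stat.Egress(timer), call sites now record egress on the timer itself with timer.Egress(). The sketch below illustrates the two calling conventions with stand-in types; only the method names visible in this diff (NewTimer, AppendTimer, Egress) are mirrored, and none of this is the real internal/profiling implementation.

```go
package main

import (
	"fmt"
	"time"
)

// Timer and Stat are illustrative stand-ins, not the real profiling types.
type Timer struct {
	Tag   string
	Begin time.Time
	End   time.Time
}

// Egress is the new convention: the Timer records its own end time.
func (t *Timer) Egress() { t.End = time.Now() }

type Stat struct {
	Timers []*Timer
}

// NewTimer starts a timer and attaches it to the Stat.
func (s *Stat) NewTimer(tag string) *Timer {
	t := &Timer{Tag: tag, Begin: time.Now()}
	s.Timers = append(s.Timers, t)
	return t
}

// Egress mirrors the old convention, where egress went through the Stat.
func (s *Stat) Egress(t *Timer) { t.End = time.Now() }

func main() {
	stat := &Stat{}

	// Old: keep the timer around just to pass it back to the Stat.
	t := stat.NewTimer("/old/convention")
	stat.Egress(t)

	// New: the one-liner defer form used throughout this patch.
	func() {
		defer stat.NewTimer("/new/convention").Egress()
		time.Sleep(2 * time.Millisecond) // stand-in for real work
	}()

	for _, t := range stat.Timers {
		fmt.Println(t.Tag, t.End.Sub(t.Begin))
	}
}
```

With Egress on the timer, call sites no longer need to keep the timer or an index around purely for bookkeeping, which is what most of the deletions below amount to.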
8 changes: 4 additions & 4 deletions internal/transport/controlbuf.go
@@ -602,7 +602,7 @@ func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
wq: h.wq,
}
l.estdStreams[h.streamID] = str
- h.stat.Egress(timer)
+ timer.Egress()
return nil
}

@@ -704,7 +704,7 @@ func (l *loopyWriter) preprocessData(df *dataFrame) error {
timer := df.stat.NewTimer("/http2/send/dataFrame/loopyWriter/preprocess")
str, ok := l.estdStreams[df.streamID]
if !ok {
- df.stat.Egress(timer)
+ timer.Egress()
return nil
}
// If we got data for a stream it means that
@@ -714,7 +714,7 @@ func (l *loopyWriter) preprocessData(df *dataFrame) error {
str.state = active
l.activeStreams.enqueue(str)
}
- df.stat.Egress(timer)
+ timer.Egress()
return nil
}

@@ -845,7 +845,7 @@ func (l *loopyWriter) processData() (bool, error) {
// maximum possible HTTP2 frame size.

if dataItem.stat != nil {
- defer dataItem.stat.Egress(dataItem.stat.NewTimer("/http2/send/dataFrame/loopyWriter"))
+ defer dataItem.stat.NewTimer("/http2/send/dataFrame/loopyWriter").Egress()
}

if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
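The `defer dataItem.stat.NewTimer("/http2/send/dataFrame/loopyWriter").Egress()` one-liner in processData leans on a Go detail worth noting: the receiver expression of a deferred call is evaluated at the defer statement, so NewTimer runs (and the clock starts) immediately, while Egress only runs when the function returns. A small stand-in sketch (not the real profiling package) makes the ordering visible:

```go
package main

import "fmt"

// timer and stat are stand-ins that just print, so the evaluation
// order of the deferred one-liner is easy to see.
type timer struct{ tag string }

func (t *timer) Egress() { fmt.Println("egress:", t.tag) }

type stat struct{}

func (s *stat) NewTimer(tag string) *timer {
	fmt.Println("ingress:", tag)
	return &timer{tag: tag}
}

func processData(s *stat) {
	// NewTimer is evaluated here, at the defer statement;
	// Egress fires when processData returns.
	defer s.NewTimer("/http2/send/dataFrame/loopyWriter").Egress()
	fmt.Println("sending data frame")
}

func main() {
	processData(&stat{})
	// Prints: ingress, sending data frame, egress.
}
```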
13 changes: 7 additions & 6 deletions internal/transport/http2_client.go
@@ -568,14 +568,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if profiling.IsEnabled() {
s.stat = profiling.NewStat("client")
s.stat.Metadata = make([]byte, 12)
- binary.BigEndian.PutUint64(s.stat.Metadata[0:8], t.connectionCounter)
+ binary.BigEndian.PutUint64(s.stat.Metadata[0:8], t.connectionID)
// Stream ID will be set when loopy writer actually establishes the stream
// and obtains a stream ID
profiling.StreamStats.Push(s.stat)
// Usually, performance suffers when we use defers to record egress out of
// functions when profiling is disabled, but it's fine here because this is
// executed only when profiling is enabled.
- defer s.stat.Egress(s.stat.AppendTimer(timer))
+ s.stat.AppendTimer(timer)
+ defer timer.Egress()
}

cleanup := func(err error) {
@@ -985,11 +986,11 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
if s == nil {
return
}
- timerIdx := s.stat.AppendTimer(timer)
+ s.stat.AppendTimer(timer)
if size > 0 {
if err := s.fc.onData(size); err != nil {
t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
- s.stat.Egress(timerIdx)
+ timer.Egress()
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
@@ -1012,7 +1013,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
}
- s.stat.Egress(timerIdx)
+ timer.Egress()
}

func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
@@ -1200,7 +1201,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}

if s.stat != nil {
defer s.stat.Egress(s.stat.NewTimer("/http2/recv/header/loopyReader"))
defer s.stat.NewTimer("/http2/recv/header/loopyReader").Egress()
}

endStream := frame.StreamEnded()
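The comment in NewStream above points out that defers have a small cost even when profiling is disabled, which is why the defer sits inside the profiling.IsEnabled() branch. This works because a defer declared inside an if block still runs when the surrounding function returns, not when the block ends. A stand-in sketch (the isEnabled flag and timer type are hypothetical, not the real profiling API):

```go
package main

import "fmt"

// isEnabled stands in for profiling.IsEnabled().
var isEnabled = true

// timer is a stand-in with the same Egress shape as in the patch.
type timer struct{ tag string }

func (t *timer) Egress() { fmt.Println("egress recorded:", t.tag) }

func newStream() {
	if isEnabled {
		// Registered only when profiling is on, so the disabled path
		// pays for nothing. It still fires at function return, after
		// all the work below, not at the end of this if block.
		defer (&timer{tag: "/transport/stream"}).Egress() // tag is illustrative
	}

	fmt.Println("establishing stream, sending headers")
}

func main() {
	newStream()
}
```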
19 changes: 8 additions & 11 deletions internal/transport/http2_server.go
@@ -338,10 +338,10 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// server, so let's create an associated Stat object.
s.stat = profiling.NewStat("server")
s.stat.Metadata = make([]byte, 12)
- binary.BigEndian.PutUint64(s.stat.Metadata[0:8], t.connectionCounter)
+ binary.BigEndian.PutUint64(s.stat.Metadata[0:8], t.connectionID)
binary.BigEndian.PutUint32(s.stat.Metadata[8:12], streamID)
profiling.StreamStats.Push(s.stat)
defer s.stat.Egress(s.stat.NewTimer("/http2/recv/header"))
defer s.stat.NewTimer("/http2/recv/header").Egress()
}

if frame.StreamEnded() {
@@ -628,11 +628,11 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
if !ok {
return
}
- timerIdx := s.stat.AppendTimer(timer)
+ s.stat.AppendTimer(timer)
if size > 0 {
if err := s.fc.onData(size); err != nil {
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
- s.stat.Egress(timerIdx)
+ timer.Egress()
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
@@ -655,7 +655,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
}
- s.stat.Egress(timerIdx)
+ timer.Egress()
}

func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
@@ -853,9 +853,10 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
timer := s.stat.NewTimer("/WriteStatus")
if s.stat != nil {
defer s.stat.NewTimer("/WriteStatus").Egress()
}
if s.getState() == streamDone {
- s.stat.Egress(timer)
return nil
}
s.hdrMu.Lock()
@@ -866,7 +867,6 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
if len(s.header) > 0 { // Send a separate header frame.
if err := t.writeHeaderLocked(s); err != nil {
s.hdrMu.Unlock()
- s.stat.Egress(timer)
return err
}
} else { // Send a trailer only response.
@@ -899,11 +899,9 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
if !success {
if err != nil {
- s.stat.Egress(timer)
return err
}
t.closeStream(s, true, http2.ErrCodeInternal, false)
- s.stat.Egress(timer)
return ErrHeaderListSizeLimitViolation
}
// Send a RST_STREAM after the trailers if the client has not already half-closed.
@@ -912,7 +910,6 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
if t.stats != nil {
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
- s.stat.Egress(timer)
return nil
}

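WriteStatus is where the new convention pays off most clearly: rather than creating a timer unconditionally and calling s.stat.Egress(timer) before every early return, the function now registers a single nil-guarded defer up front and every return path is covered. A compressed before/after sketch of that shape, with stand-in types rather than the real Stream and Stat:

```go
package main

import "fmt"

// timer and stat are stand-ins mirroring only the calls seen in the diff.
type timer struct{ tag string }

func (t *timer) Egress() { fmt.Println("egress:", t.tag) }

type stat struct{}

func (s *stat) NewTimer(tag string) *timer { return &timer{tag: tag} }
func (s *stat) Egress(t *timer)            { t.Egress() }

// Before (shape only): every return needs its own Egress call.
func writeStatusOld(s *stat, done bool) error {
	t := s.NewTimer("/WriteStatus")
	if done {
		s.Egress(t) // easy to miss on any new early return
		return nil
	}
	// ... write trailers ...
	s.Egress(t)
	return nil
}

// After: one nil-guarded defer covers every return path, and a nil
// stat (profiling disabled) skips the timer entirely.
func writeStatusNew(s *stat, done bool) error {
	if s != nil {
		defer s.NewTimer("/WriteStatus").Egress()
	}
	if done {
		return nil
	}
	// ... write trailers ...
	return nil
}

func main() {
	_ = writeStatusOld(&stat{}, false)
	_ = writeStatusNew(&stat{}, false)
	_ = writeStatusNew(nil, true) // no profiling, no timer
}
```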
47 changes: 24 additions & 23 deletions rpc_util.go
@@ -508,10 +508,10 @@ type parser struct {
func (p *parser) recvMsg(maxReceiveMessageSize int, stat *profiling.Stat) (pf payloadFormat, msg []byte, err error) {
timer := stat.NewTimer("/header")
if _, err := p.r.Read(p.header[:]); err != nil {
- stat.Egress(timer)
+ timer.Egress()
return 0, nil, err
}
- stat.Egress(timer)
+ timer.Egress()

pf = payloadFormat(p.header[0])
length := binary.BigEndian.Uint32(p.header[1:])
@@ -534,10 +534,10 @@ func (p *parser) recvMsg(maxReceiveMessageSize int, stat *profiling.Stat) (pf pa
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
- stat.Egress(timer)
+ timer.Egress()
return 0, nil, err
}
- stat.Egress(timer)
+ timer.Egress()
return pf, msg, nil
}

@@ -574,31 +574,31 @@ func compress(in []byte, cp Compressor, compressor encoding.Compressor, stat *pr
timer := stat.NewTimer("/compresslib/init")
z, err := compressor.Compress(cbuf)
if err != nil {
- stat.Egress(timer)
+ timer.Egress()
return nil, wrapErr(err)
}
- stat.Egress(timer)
+ timer.Egress()

timer = stat.NewTimer("/compresslib/write")
if _, err := z.Write(in); err != nil {
- stat.Egress(timer)
+ timer.Egress()
return nil, wrapErr(err)
}
- stat.Egress(timer)
+ timer.Egress()

timer = stat.NewTimer("/compresslib/close")
if err := z.Close(); err != nil {
- stat.Egress(timer)
+ timer.Egress()
return nil, wrapErr(err)
}
- stat.Egress(timer)
+ timer.Egress()
} else {
timer := stat.NewTimer("/compressor")
if err := cp.Do(cbuf, in); err != nil {
- stat.Egress(timer)
+ timer.Egress()
return nil, wrapErr(err)
}
- stat.Egress(timer)
+ timer.Egress()
}
return cbuf.Bytes(), nil
}
@@ -661,9 +661,9 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
overallTimer := stat.NewTimer("/recvAndDecompress")
timer := stat.NewTimer("/recvMsg")
pf, d, err := p.recvMsg(maxReceiveMessageSize, stat)
- stat.Egress(timer)
+ timer.Egress()
if err != nil {
- stat.Egress(overallTimer)
+ overallTimer.Egress()
return nil, err
}

@@ -673,11 +673,11 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei

timer = stat.NewTimer("/checkRecvPayload")
if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
- stat.Egress(timer)
- stat.Egress(overallTimer)
+ timer.Egress()
+ overallTimer.Egress()
return nil, st.Err()
}
- stat.Egress(timer)
+ timer.Egress()

timer = stat.NewTimer("/compression")
var size int
@@ -691,22 +691,22 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
d, size, err = decompress(compressor, d, maxReceiveMessageSize)
}
if err != nil {
- stat.Egress(timer)
- stat.Egress(overallTimer)
+ timer.Egress()
+ overallTimer.Egress()
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
} else {
size = len(d)
}
- stat.Egress(timer)
+ timer.Egress()

if size > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
- stat.Egress(overallTimer)
+ overallTimer.Egress()
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", size, maxReceiveMessageSize)
}
- stat.Egress(overallTimer)
+ overallTimer.Egress()
return d, nil
}

@@ -749,9 +749,10 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf

timer := stat.NewTimer("/encoding")
if err := c.Unmarshal(d, m); err != nil {
+ timer.Egress()
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
- stat.Egress(timer)
+ timer.Egress()

if payInfo != nil {
payInfo.uncompressedBytes = d
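recvAndDecompress now nests two levels of timers: overallTimer spans the whole call, while shorter stage timers (/recvMsg, /checkRecvPayload, /compression) end partway through and therefore call Egress explicitly instead of via defer. A stand-in sketch of that nesting (not the real gRPC receive path):

```go
package main

import (
	"fmt"
	"time"
)

// timer is a stand-in: it records a start time and prints on Egress.
type timer struct {
	tag   string
	begin time.Time
}

func (t *timer) Egress() { fmt.Printf("%-20s %v\n", t.tag, time.Since(t.begin)) }

func newTimer(tag string) *timer { return &timer{tag: tag, begin: time.Now()} }

// receive mirrors the recvAndDecompress shape: one deferred timer for the
// whole call, explicit Egress for stages that end before the return.
func receive() error {
	defer newTimer("/recvAndDecompress").Egress()

	t := newTimer("/recvMsg")
	time.Sleep(2 * time.Millisecond) // stand-in for reading the frame
	t.Egress()                       // ends here, so defer would be wrong

	t = newTimer("/compression")
	time.Sleep(3 * time.Millisecond) // stand-in for decompressing
	t.Egress()

	return nil
}

func main() {
	_ = receive()
}
```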