Skip to content

Commit

Permalink
Refine error for HTTP stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Sep 1, 2024
1 parent d4ba8be commit e4c80cf
Showing 1 changed file with 94 additions and 17 deletions.
111 changes: 94 additions & 17 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package main
import (
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
Expand Down Expand Up @@ -126,7 +126,10 @@ func (v *httpServer) Run(ctx context.Context) error {
}

type HTTPStreaming struct {
// The context for HTTP streaming.
ctx context.Context
// Whether has written response to client.
written bool
}

func NewHTTPStreaming(opts ...func(streaming *HTTPStreaming)) *HTTPStreaming {
Expand All @@ -138,22 +141,74 @@ func NewHTTPStreaming(opts ...func(streaming *HTTPStreaming)) *HTTPStreaming {
}

func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := v.serve(v.ctx, w, r); err != nil {
apiError(v.ctx, w, r, err)
defer r.Body.Close()
ctx := logger.WithContext(v.ctx)

var backendClosedErr, clientClosedErr bool

handleBackendErr := func(err error) {
if isPeerClosedError(err) {
if !backendClosedErr {
backendClosedErr = true
logger.Df(ctx, "HTTP backend peer closed")
}
} else {
logger.Wf(ctx, "HTTP backend err %+v", err)
}
}

handleClientErr := func(err error) {
if isPeerClosedError(err) {
if !clientClosedErr {
clientClosedErr = true
logger.Df(ctx, "HTTP client peer closed")
}
} else {
logger.Wf(ctx, "HTTP client %v err %+v", r.RemoteAddr, err)
}
}

if err := v.serve(ctx, w, r); err != nil {
if perr, ok := err.(*RTMPProxyError); ok {
if perr.isBackend {
handleBackendErr(perr.err)
} else {
handleClientErr(perr.err)
}
} else {
handleClientErr(err)
}

if !v.written {
apiError(ctx, w, r, err)
}
}
}

func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
// Build the stream URL in vhost/app/stream schema.
scheme := "http"
if r.TLS != nil {
scheme = "https"
var requestURL, originalURL string
if true {
scheme := "http"
if r.TLS != nil {
scheme = "https"
}

hostname, _, err := net.SplitHostPort(r.Host)
if err != nil {
return errors.Wrapf(err, "split host %v", r.Host)
}

streamExt := path.Ext(r.URL.Path)
streamName := strings.TrimSuffix(r.URL.Path, streamExt)
requestURL = fmt.Sprintf("%v://%v%v", scheme, hostname, streamName)
originalURL = fmt.Sprintf("%v%v", requestURL, streamExt)
logger.Df(ctx, "Got HTTP client from %v for %v", r.RemoteAddr, originalURL)
}
streamName := strings.TrimSuffix(r.URL.Path, path.Ext(r.URL.Path))
streamURL, err := buildStreamURL(fmt.Sprintf("%v://%v%v", scheme, r.URL.Hostname(), streamName))

streamURL, err := buildStreamURL(requestURL)
if err != nil {
return errors.Wrapf(err, "build stream url scheme=%v, hostname=%v, stream=%v",
scheme, r.URL.Hostname(), streamName)
return errors.Wrapf(err, "build stream url %v", requestURL)
}

// Pick a backend SRS server to proxy the RTMP stream.
Expand All @@ -163,8 +218,11 @@ func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *htt
}

if err = v.serveByBackend(ctx, w, r, backend, streamURL); err != nil {
return errors.Wrapf(err, "serve %v by backend %+v for stream %v",
r.URL.String(), backend, streamURL)
wrappedErr := errors.Wrapf(err, "serve %v by backend %+v", originalURL, backend)
if perr, ok := err.(*RTMPProxyError); ok {
return &RTMPProxyError{perr.isBackend, wrappedErr}
}
return wrappedErr
}

return nil
Expand All @@ -187,21 +245,40 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite
backendURL := fmt.Sprintf("http://%v:%v%s", backend.IP, httpPort, r.URL.Path)
req, err := http.NewRequestWithContext(ctx, "GET", backendURL, nil)
if err != nil {
return errors.Wrapf(err, "create request to %v", backendURL)
return &RTMPProxyError{true, errors.Wrapf(err, "create request to %v", backendURL)}
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return errors.Wrapf(err, "proxy stream to %v", backendURL)
return &RTMPProxyError{true, errors.Wrapf(err, "do request to %v", backendURL)}
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return errors.Errorf("proxy stream to %v failed, status=%v", backendURL, resp.Status)
return &RTMPProxyError{true, errors.Errorf("proxy stream to %v failed, status=%v", backendURL, resp.Status)}
}

// Copy all headers from backend to client.
w.WriteHeader(resp.StatusCode)
for k, v := range resp.Header {
for _, vv := range v {
w.Header().Add(k, vv)
}
}

v.written = true

// Copy all data from backend to client.
if _, err := io.Copy(w, resp.Body); err != nil {
return errors.Wrapf(err, "copy stream from %v", backendURL)
buf := make([]byte, 4096)
for {
n, err := resp.Body.Read(buf)
if err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "read stream from %v", backendURL)}
}

if _, err := w.Write(buf[:n]); err != nil {
return &RTMPProxyError{false, errors.Wrapf(err, "write stream client")}
}
}

return nil
Expand Down

0 comments on commit e4c80cf

Please sign in to comment.