Skip to content

Commit

Permalink
Merge branch 'master' into ja/auth-error-event
Browse files Browse the repository at this point in the history
  • Loading branch information
j0sh authored Dec 20, 2024
2 parents ba1a388 + 0bfd28b commit a669dae
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 5 deletions.
14 changes: 13 additions & 1 deletion server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,26 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
if seq != currentSeq {
clog.Infof(ctx, "Next segment has already started; skipping this one seq=%d currentSeq=%d", seq, currentSeq)
params.liveParams.sendErrorEvent(fmt.Errorf("Next segment has started"))
segment.Close()
return
}
n, err := segment.Write(r)
if err == nil {
// no error, all done, let's leave
return
}
if errors.Is(err, trickle.StreamNotFoundErr) {
clog.Infof(ctx, "Stream no longer exists on orchestrator; terminating")
params.liveParams.stopPipeline(fmt.Errorf("Stream does not exist"))
return
}
// Retry segment only if nothing has been sent yet
// and the next segment has not yet started
// otherwise drop
if n > 0 {
clog.Infof(ctx, "Error publishing segment; dropping remainder wrote=%d err=%v", n, err)
params.liveParams.sendErrorEvent(fmt.Errorf("Error publishing, wrote %d dropping %v", n, err))
segment.Close()
return
}
clog.Infof(ctx, "Error publishing segment before writing; retrying err=%v", err)
Expand Down Expand Up @@ -142,10 +149,15 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
clog.V(8).Infof(ctx, "trickle subscribe read data begin")
segment, err = subscriber.Read()
if err != nil {
if errors.Is(err, trickle.EOS) {
if errors.Is(err, trickle.EOS) || errors.Is(err, trickle.StreamNotFoundErr) {
params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe end of stream: %w", err))
return
}
var sequenceNonexistent *trickle.SequenceNonexistent
if errors.As(err, &sequenceNonexistent) {
// stream exists but segment doesn't, so skip to leading edge
subscriber.SetSeq(sequenceNonexistent.Latest)
}
// TODO if not EOS then signal a new orchestrator is needed
err = fmt.Errorf("trickle subscribe error reading: %w", err)
clog.Infof(ctx, "%s", err)
Expand Down
33 changes: 33 additions & 0 deletions trickle/trickle_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,39 @@ func (p *pendingPost) Write(data io.Reader) (int64, error) {
return n, nil
}

/*
Close a segment. This is a polite action to notify any
subscribers that might be waiting for this segment.
Only needed if the segment is dropped or otherwise errored;
not required if the segment is written normally.
Note that subscribers still work fine even without this call;
it would just take longer for them to stop waiting when
the current segment drops out of the window of active segments.
*/
func (p *pendingPost) Close() error {
p.writer.Close()
url := fmt.Sprintf("%s/%d", p.client.baseURL, p.index)
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return err
}
resp, err := (&http.Client{Transport: &http.Transport{
// ignore orch certs for now
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}}).Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return &HTTPError{Code: resp.StatusCode, Body: string(body)}
}
return nil
}

// Write sends data to the current segment, sets up the next segment concurrently, and blocks until completion
func (c *TricklePublisher) Write(data io.Reader) error {
pp, err := c.Next()
Expand Down
45 changes: 41 additions & 4 deletions trickle/trickle_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Stream struct {
name string
mimeType string
writeTime time.Time
closed bool
}

type Segment struct {
Expand Down Expand Up @@ -115,6 +116,7 @@ func ConfigureServer(config TrickleServerConfig) *Server {
mux.HandleFunc("POST "+basePath+"{streamName}", streamManager.handleCreate)
mux.HandleFunc("GET "+basePath+"{streamName}/{idx}", streamManager.handleGet)
mux.HandleFunc("POST "+basePath+"{streamName}/{idx}", streamManager.handlePost)
mux.HandleFunc("DELETE "+basePath+"{streamName}/{idx}", streamManager.closeSeq)
mux.HandleFunc("DELETE "+basePath+"{streamName}", streamManager.handleDelete)
return streamManager
}
Expand Down Expand Up @@ -185,7 +187,7 @@ func (sm *Server) clearAllStreams() {
// TODO update changefeed

for _, stream := range sm.streams {
stream.clear()
stream.close()
}
sm.streams = make(map[string]*Stream)
}
Expand Down Expand Up @@ -213,13 +215,14 @@ func (sm *Server) sweepIdleChannels() {
}
}

func (s *Stream) clear() {
func (s *Stream) close() {
s.mutex.Lock()
defer s.mutex.Unlock()
for _, segment := range s.segments {
segment.close()
}
s.segments = make([]*Segment, maxSegmentsPerStream)
s.closed = true
}

func (sm *Server) closeStream(streamName string) error {
Expand All @@ -230,7 +233,7 @@ func (sm *Server) closeStream(streamName string) error {

// TODO there is a bit of an issue around session reuse

stream.clear()
stream.close()
sm.mutex.Lock()
delete(sm.streams, streamName)
sm.mutex.Unlock()
Expand Down Expand Up @@ -258,6 +261,28 @@ func (sm *Server) handleDelete(w http.ResponseWriter, r *http.Request) {
}
}

func (sm *Server) closeSeq(w http.ResponseWriter, r *http.Request) {
s, exists := sm.getStream(r.PathValue("streamName"))
if !exists {
http.Error(w, "Stream not found", http.StatusNotFound)
return
}
idx, err := strconv.Atoi(r.PathValue("idx"))
if err != nil {
http.Error(w, "Invalid idx", http.StatusBadRequest)
return
}
slog.Info("DELETE closing seq", "channel", s.name, "seq", idx)
s.mutex.RLock()
seg := s.segments[idx%maxSegmentsPerStream]
s.mutex.RUnlock()
if seg == nil || seg.idx != idx {
http.Error(w, "Nonexistent segment", http.StatusBadRequest)
return
}
seg.close()
}

func (sm *Server) handleCreate(w http.ResponseWriter, r *http.Request) {
stream := sm.getOrCreateStream(r.PathValue("streamName"), r.Header.Get("Expect-Content"), false)
if stream == nil {
Expand Down Expand Up @@ -489,8 +514,20 @@ func (s *Stream) handleGet(w http.ResponseWriter, r *http.Request, idx int) {
}
if eof {
if totalWrites <= 0 {
// check if the channel was closed; sometimes we drop / skip a segment
s.mutex.RLock()
closed := s.closed
latestSeq := s.latestWrite
s.mutex.RUnlock()
w.Header().Set("Lp-Trickle-Seq", strconv.Itoa(segment.idx))
w.Header().Set("Lp-Trickle-Closed", "terminated")
if closed {
w.Header().Set("Lp-Trickle-Closed", "terminated")
} else {
// if the segment was dropped, its probably slow
// send over latest seq so the client can grab leading edge
w.Header().Set("Lp-Trickle-Latest", strconv.Itoa(latestSeq))
w.WriteHeader(470)
}
}
return totalWrites, nil
}
Expand Down
40 changes: 40 additions & 0 deletions trickle/trickle_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ import (

var EOS = errors.New("End of stream")

type SequenceNonexistent struct {
Latest int
Seq int
}

func (e *SequenceNonexistent) Error() string {
return fmt.Sprintf("Channel exists but sequence does not: requested %d latest %d", e.Seq, e.Latest)
}

const preconnectRefreshTimeout = 20 * time.Second

// TrickleSubscriber represents a trickle streaming reader that always fetches from index -1
Expand Down Expand Up @@ -51,10 +60,28 @@ func GetSeq(resp *http.Response) int {
return i
}

func GetLatest(resp *http.Response) int {
if resp == nil {
return -99 // TODO hmm
}
v := resp.Header.Get("Lp-Trickle-Latest")
i, err := strconv.Atoi(v)
if err != nil {
return -1 // Use the latest index on the server
}
return i
}

func IsEOS(resp *http.Response) bool {
return resp.Header.Get("Lp-Trickle-Closed") != ""
}

func (c *TrickleSubscriber) SetSeq(seq int) {
c.mu.Lock()
defer c.mu.Unlock()
c.idx = seq
}

func (c *TrickleSubscriber) connect(ctx context.Context) (*http.Response, error) {
url := fmt.Sprintf("%s/%d", c.url, c.idx)
slog.Debug("preconnecting", "url", url)
Expand All @@ -76,6 +103,9 @@ func (c *TrickleSubscriber) connect(ctx context.Context) (*http.Response, error)
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
resp.Body.Close() // Ensure we close the body to avoid leaking connections
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == 470 {
return resp, nil
}
return nil, fmt.Errorf("failed GET segment, status code: %d, msg: %s", resp.StatusCode, string(body))
}

Expand Down Expand Up @@ -152,9 +182,19 @@ func (c *TrickleSubscriber) Read() (*http.Response, error) {
c.pendingGet = nil

if IsEOS(conn) {
conn.Body.Close() // because this is a 200; maybe use a custom status code
return nil, EOS
}

if conn.StatusCode == http.StatusNotFound {
return nil, StreamNotFoundErr
}

if conn.StatusCode == 470 {
// stream exists but segment dosn't
return nil, &SequenceNonexistent{Seq: GetSeq(conn), Latest: GetLatest(conn)}
}

// Set to use the next index for the next (pre-)connection
idx := GetSeq(conn)
if idx >= 0 {
Expand Down

0 comments on commit a669dae

Please sign in to comment.