diff --git a/report_artifact.go b/report_artifact.go index 17db93b7..d142ef22 100644 --- a/report_artifact.go +++ b/report_artifact.go @@ -100,10 +100,10 @@ func (a *TestArtifact) Upload(ctx context.Context, conf BucketConfiguration, dry } opts := pail.S3Options{ - Name: a.Bucket, - Prefix: a.Prefix, - Region: conf.Region, - MaxRetries: 10, + Name: a.Bucket, + Prefix: a.Prefix, + Region: conf.Region, + // MaxRetries: 10, Permissions: pail.S3Permissions(a.Permissions), DryRun: dryRun, } diff --git a/rpc/internal/collector.service.go b/rpc/internal/collector.service.go index f38bc488..ce7990e0 100644 --- a/rpc/internal/collector.service.go +++ b/rpc/internal/collector.service.go @@ -1,9 +1,11 @@ package internal import ( + "bytes" + "compress/gzip" "container/heap" - "container/list" "context" + "encoding/gob" "io" "sync" @@ -157,7 +159,10 @@ type streamGroup struct { type stream struct { inHeap bool closed bool - buffer *list.List + + buffer bytes.Buffer + encoder *gob.Encoder + decoder *gob.Decoder } // addStream adds a new stream to the group for the given collector. If the @@ -188,7 +193,16 @@ func (sc *streamsCoordinator) addStream(name string, registry *poplar.RecorderRe defer group.mu.Unlock() id := utility.RandomString() - group.streams[id] = &stream{buffer: list.New()} + + buffer := bytes.Buffer{} + encoder := gob.NewEncoder(gzip.NewWriter(&buffer)) + reader, _ := gzip.NewReader(&buffer) + decoder := gob.NewDecoder(reader) + group.streams[id] = &stream{ + buffer: bytes.Buffer{}, + encoder: encoder, + decoder: decoder, + } group.availableStreams = append(group.availableStreams, id) return nil @@ -232,7 +246,10 @@ func (sg *streamGroup) addEvent(ctx context.Context, id string, event *events.Pe } if stream.inHeap { - stream.buffer.PushBack(event) + err := stream.encoder.Encode(event) + if err != nil { + panic(err) + } return nil } sg.eventHeap.SafePush(&performanceHeapItem{id: id, event: event}) @@ -279,12 +296,15 @@ func (sg *streamGroup) flush() error { } else { stream.inHeap = false } - if event := stream.buffer.Front(); event != nil { - // Get next event from stream's buffer and add - // it to the min heap. - sg.eventHeap.SafePush(&performanceHeapItem{id: item.id, event: event.Value.(*events.Performance)}) - stream.inHeap = true - stream.buffer.Remove(event) + if stream.buffer.Len() > 0 { + event := new(events.Performance) + stream.decoder.Decode(&event) + if event != nil { + // Get next event from stream's buffer and add + // it to the min heap. + sg.eventHeap.SafePush(&performanceHeapItem{id: item.id, event: event}) + stream.inHeap = true + } } } diff --git a/rpc/internal/collector.service_test.go b/rpc/internal/collector.service_test.go index 7aa2f503..bd3ca1d3 100644 --- a/rpc/internal/collector.service_test.go +++ b/rpc/internal/collector.service_test.go @@ -2,7 +2,6 @@ package internal import ( "bytes" - "container/list" "context" fmt "fmt" "io/ioutil" @@ -96,7 +95,7 @@ func TestCloseCollector(t *testing.T) { svc.coordinator.groups["group"] = &streamGroup{ streams: map[string]*stream{ "id1": { - buffer: &list.List{}, + // buffer: &list.List{}, }, }, eventHeap: &PerformanceHeap{},