Skip to content

Commit

Permalink
[#29760] Only respond to sampling request while data sampling is ena…
Browse files Browse the repository at this point in the history
…bled (#29764)

* Only respond to sampling request while data sampling is enabled

* do not fail runner when data sampling is not enabled

---------

Co-authored-by: Zechen Jiang <[email protected]>
  • Loading branch information
lostluck and zechenj18 authored Dec 14, 2023
1 parent 7e6fa65 commit f773123
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,19 +665,21 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
case req.GetSampleData() != nil:
msg := req.GetSampleData()
var samples = make(map[string]*fnpb.SampleDataResponse_ElementList)
var elementsMap = c.dataSampler.GetSamples(msg.GetPcollectionIds())

for pid, elements := range elementsMap {
var elementList fnpb.SampleDataResponse_ElementList
for i := range elements {
var sampledElement = &fnpb.SampledElement{
Element: elements[i].Element,
SampleTimestamp: timestamppb.New(elements[i].Timestamp),
if c.dataSampler != nil {
var elementsMap = c.dataSampler.GetSamples(msg.GetPcollectionIds())
for pid, elements := range elementsMap {
var elementList fnpb.SampleDataResponse_ElementList
for i := range elements {
var sampledElement = &fnpb.SampledElement{
Element: elements[i].Element,
SampleTimestamp: timestamppb.New(elements[i].Timestamp),
}
elementList.Elements = append(elementList.Elements, sampledElement)
}
elementList.Elements = append(elementList.Elements, sampledElement)
samples[pid] = &elementList
}
samples[pid] = &elementList
}

return &fnpb.InstructionResponse{
InstructionId: string(instID),
Response: &fnpb.InstructionResponse_SampleData{
Expand Down

0 comments on commit f773123

Please sign in to comment.