-
Notifications
You must be signed in to change notification settings - Fork 1.2k
ScrollParallel
tom10271 edited this page Apr 3, 2018
·
11 revisions
The ScrollService allows you to iterate through a large result set. The example below (for elastic v3.0.50+) shows how to do that efficiently as a pipeline of multiple goroutines. It uses the excellent golang.org/x/sync/errgroup package to run the goroutines, propagate the first error, and cancel the remaining stages.
// RunScrollRequest streams every hit of a scroll request through a
// three-stage pipeline and returns the errgroup that runs it.
//
// Stage 1 pages through scrollRequest until it reports io.EOF. Stage 2 hands
// each page to resultCallback and then fans the page's hits out. Stage 3 is a
// fixed pool of workers that call hitCallback once per hit source. Because
// hitCallback runs on several goroutines at once, it must be safe for
// concurrent use.
//
// The function returns immediately; call Wait on the returned group to block
// until the scroll is exhausted. Wait returns the first non-nil error produced
// by the scroll request or by a callback. That error cancels the context
// shared by all stages, which makes the remaining stages stop.
func (c *HBESClient) RunScrollRequest(
	scrollRequest *elastic.ScrollService,
	resultCallback func(result *elastic.SearchResult) error,
	hitCallback func(message *json.RawMessage) error,
) *errgroup.Group {
	// Number of goroutines that invoke hitCallback concurrently.
	const hitWorkers = 10

	resultsChannel := make(chan elastic.SearchResult)
	hitsChannel := make(chan json.RawMessage)
	group, ctx := errgroup.WithContext(context.Background())

	// Stage 1: fetch pages. As the only sender it owns resultsChannel and
	// closes it on every exit path so stage 2 always terminates.
	group.Go(func() error {
		defer close(resultsChannel)
		for {
			// Use the group context so an in-flight request is aborted as
			// soon as any other stage fails.
			results, err := scrollRequest.Do(ctx)
			if err == io.EOF {
				return nil // scroll exhausted
			}
			if err != nil {
				return err
			}
			select {
			case resultsChannel <- *results:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	// Stage 2: report each page, then fan out its hits. As the only sender
	// it owns hitsChannel; closing it here (never from a receiver) is what
	// lets the workers' range loops end without a double-close panic.
	group.Go(func() error {
		defer close(hitsChannel)
		for results := range resultsChannel {
			results := results // per-iteration copy; the callback receives its address
			if err := resultCallback(&results); err != nil {
				return err
			}
			for _, hit := range results.Hits.Hits {
				// Hits without a source carry nothing to process; skipping
				// them avoids a nil dereference.
				if hit == nil || hit.Source == nil {
					continue
				}
				select {
				case hitsChannel <- *hit.Source:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
		return nil
	})

	// Stage 3: process hits in parallel until hitsChannel is closed or the
	// group is cancelled.
	for i := 0; i < hitWorkers; i++ {
		group.Go(func() error {
			for hit := range hitsChannel {
				hit := hit // per-iteration copy; the callback receives its address
				if err := hitCallback(&hit); err != nil {
					return err
				}
				// Stop early once another stage has failed.
				if err := ctx.Err(); err != nil {
					return err
				}
			}
			return nil
		})
	}

	return group
}
Notice: You can sort the results with ScrollService, but that comes with a big performance hit: Elasticsearch has to sort the results before returning them to you. So if all you need is to visit each result in a result set (potentially filtered with a Query), do not add a sort order. See the Elasticsearch scroll documentation for details.