Skip to content
tom10271 edited this page Apr 3, 2018 · 11 revisions

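The ScrollService lets you iterate through a large result set. The example below (for elastic v3.0.50+) shows how to do this efficiently as a pipeline that uses multiple goroutines. It relies on the excellent golang.org/x/sync/errgroup package: the function returns an errgroup.Group, and the caller must call Wait on it to block until the scroll is finished and to receive the first error, if any.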

// RunScrollRequest drains scrollRequest through a three-stage pipeline and
// returns the errgroup.Group running it. The caller must call Wait on the
// returned group to block until the scroll is exhausted and to obtain the
// first error (if any) returned by any stage.
//
//   - Stage 1 pages through the scroll with scrollRequest.Do until it
//     returns io.EOF.
//   - Stage 2 calls resultCallback once per page, sequentially, and then
//     fans that page's hit sources out to the workers.
//   - Stage 3 runs scrollWorkers goroutines that each call hitCallback once
//     per hit they receive. hitCallback is therefore invoked concurrently
//     and must be safe for concurrent use.
//
// The first non-nil error from Do or from either callback cancels the shared
// group context, which stops the remaining stages. Hits whose Source is nil
// (e.g. when _source is excluded from the response) are skipped.
func (c *HBESClient) RunScrollRequest(
	scrollRequest *elastic.ScrollService,
	resultCallback func(result *elastic.SearchResult) error,
	hitCallback func(message *json.RawMessage) error,
) *errgroup.Group {
	// Number of goroutines concurrently invoking hitCallback.
	const scrollWorkers = 10

	resultsChannel := make(chan elastic.SearchResult)
	hitsChannel := make(chan json.RawMessage)
	group, ctx := errgroup.WithContext(context.Background())

	// Stage 1: fetch scroll pages. This goroutine is the only sender on
	// resultsChannel, so it owns closing it.
	group.Go(func() error {
		defer close(resultsChannel)
		for {
			// Use the group context so a failure in another stage aborts an
			// in-flight request instead of waiting for it to finish.
			results, err := scrollRequest.Do(ctx)
			if err == io.EOF {
				return nil
			}
			if err != nil {
				return err
			}

			select {
			case resultsChannel <- *results:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	// Stage 2: per-page callback, then fan out hit sources. This goroutine is
	// the only sender on hitsChannel; closing it on every exit path is what
	// lets the workers' range loops terminate so that Wait can return.
	group.Go(func() error {
		defer close(hitsChannel)
		for results := range resultsChannel {
			// Fresh variable per page so a callback that retains the pointer
			// is not affected by the next iteration.
			page := results
			if err := resultCallback(&page); err != nil {
				return err
			}
			if page.Hits == nil {
				continue
			}

			for _, hit := range page.Hits.Hits {
				// Dereferencing a nil Source would panic; skip such hits.
				if hit.Source == nil {
					continue
				}
				select {
				case hitsChannel <- *hit.Source:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
		return nil
	})

	// Stage 3: hit workers. They exit when stage 2 closes hitsChannel, or as
	// soon as the group context is cancelled by another stage's error.
	for i := 0; i < scrollWorkers; i++ {
		group.Go(func() error {
			for hit := range hitsChannel {
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}

				// Fresh variable per hit so a retained pointer stays valid.
				source := hit
				if err := hitCallback(&source); err != nil {
					return err
				}
			}
			return nil
		})
	}

	return group
}

Note: You can sort the results with ScrollService, but that comes with a big performance cost, because Elasticsearch has to sort the results before returning them to you. So if all you need is to retrieve every document in a result set (optionally filtered by a query), do not add a sort order. See the Elasticsearch scroll API documentation for details.

Clone this wiki locally