Skip to content

Commit

Permalink
executor: fix hash join sqllogic test block, data race. (pingcap#8392)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and zz-jason committed Nov 22, 2018
1 parent df466a8 commit d39d382
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C
select {
case <-doneCh:
return
case <-e.closeCh:
return
default:
if e.finished.Load().(bool) {
return
Expand All @@ -275,7 +277,12 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C
if chk.NumRows() == 0 {
return
}
chkCh <- chk
select {
case chkCh <- chk:
break
case <-e.closeCh:
return
}
e.innerResult.Add(chk)
}
}
Expand Down Expand Up @@ -527,19 +534,14 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) {
doneCh := make(chan struct{})
go util.WithRecovery(func() { e.fetchInnerRows(ctx, innerResultCh, doneCh) }, nil)

if e.finished.Load().(bool) {
return
}
// TODO: Parallel build hash table. Currently not support because `mvmap` is not thread-safe.
err := e.buildHashTableForList(innerResultCh)
if err != nil {
e.innerFinished <- errors.Trace(err)
close(doneCh)
// fetchInnerRows may be blocked by this channel, so read from the channel to unblock it.
select {
case <-innerResultCh:
default:
}
}
// wait fetchInnerRows be finished.
for range innerResultCh {
}
}

Expand Down

0 comments on commit d39d382

Please sign in to comment.