Skip to content

Commit

Permalink
[SYSTEMDS-325] Bug fix in transformencode post-processing
Browse files Browse the repository at this point in the history
This patch fixes a bug in the multithreaded compaction logic.
  • Loading branch information
phaniarnab committed Feb 10, 2022
1 parent b8d4897 commit 2439f75
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -352,8 +354,12 @@ public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int
return new ColumnApplyTask<>(this, in, out, outputCol, startRow, blk);
}

public List<Integer> getSparseRowsWZeros(){
return _sparseRowsWZeros;
/**
 * Returns a copy of the {@code Integer} values currently held in {@code _sparseRowsWZeros}.
 *
 * The result is a freshly allocated {@link HashSet}: changes a caller makes to it do not
 * write through to this encoder's own collection, and repeated values collapse into a
 * single entry.
 * NOTE(review): judging by the name, the values are presumably indexes of sparse output rows
 * that contain explicit zeros -- confirm against the code that fills the field.
 *
 * @return a new set holding a copy of the stored values, or {@code null} when
 *         {@code _sparseRowsWZeros} is {@code null}
 */
public Set<Integer> getSparseRowsWZeros(){
if (_sparseRowsWZeros != null) {
// Hand out a copy rather than the field itself.
return new HashSet<Integer>(_sparseRowsWZeros);
}
else
return null;
}

protected void addSparseRowsWZeros(ArrayList<Integer> sparseRowsWZeros){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -357,12 +358,12 @@ public void shiftCol(int columnOffset) {
}

@Override
public List<Integer> getSparseRowsWZeros(){
public Set<Integer> getSparseRowsWZeros(){
return _columnEncoders.stream().map(ColumnEncoder::getSparseRowsWZeros).flatMap(l -> {
if(l == null)
return null;
return l.stream();
}).collect(Collectors.toList());
}).collect(Collectors.toSet());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,22 +427,28 @@ private void outputMatrixPostProcessing(MatrixBlock output){
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
int k = OptimizerUtils.getTransformNumThreads();
ForkJoinPool myPool = new ForkJoinPool(k);
List<Integer> indexSet = _columnEncoders.stream().parallel()
.map(ColumnEncoderComposite::getSparseRowsWZeros).flatMap(l -> {
if(l == null)
return null;
return l.stream();
}).collect(Collectors.toList());

if (k == 1) {
if(!indexSet.stream().parallel().allMatch(Objects::isNull)) {
Set<Integer> indexSet = _columnEncoders.stream()
.map(ColumnEncoderComposite::getSparseRowsWZeros).flatMap(l -> {
if(l == null)
return null;
return l.stream();
}).collect(Collectors.toSet());

if(!indexSet.stream().allMatch(Objects::isNull)) {
for(Integer row : indexSet)
output.getSparseBlock().get(row).compact();
}
}
else {
try {
if(!indexSet.stream().allMatch(Objects::isNull)) {
Set<Integer> indexSet = _columnEncoders.stream().parallel()
.map(ColumnEncoderComposite::getSparseRowsWZeros).flatMap(l -> {
if(l == null)
return null;
return l.stream();
}).collect(Collectors.toSet());
if(!indexSet.stream().parallel().allMatch(Objects::isNull)) {
myPool.submit(() -> {
indexSet.stream().parallel().forEach(row -> {
output.getSparseBlock().get(row).compact();
Expand Down

0 comments on commit 2439f75

Please sign in to comment.