Skip to content

Commit

Permalink
[SYSTEMDS-3288] CLA SDC isolated DefaultTuple
Browse files Browse the repository at this point in the history
This commit changes the SDC groups with a default tuple to isolate the
default into an array separate from the dictionary. This leads to cheap
"morphing" between SDC-like column groups to improve performance of
matrix multiplications and other similar operations that benefit from
the morphing of column groups to more efficient types.

Minor additions (the listed speedups apply only to the specific operation named)

- Improved CLA rexpand (one hot encode) by ~10-50x
- BitSet DDC preAggregate now uses bit set operations ~10x
- MapToData getCounts specialization ~3x
- Full integration of centralMoment (previously extracted MatrixBlock)
- Hardening interface for AColGroup, reduce inefficient extractions
- Compression time reduced by ~1 sec for census_enc, now ~5.5-6.5

Closes #1533
  • Loading branch information
Baunsgaard committed Feb 8, 2022
1 parent 549f634 commit b8d4897
Show file tree
Hide file tree
Showing 63 changed files with 2,011 additions and 1,387 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.colgroup.AColGroup;
import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
import org.apache.sysds.runtime.compress.colgroup.AColGroupValue;
import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
import org.apache.sysds.runtime.compress.colgroup.ColGroupIO;
import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
import org.apache.sysds.runtime.compress.lib.CLALibAppend;
import org.apache.sysds.runtime.compress.lib.CLALibBinaryCellOp;
import org.apache.sysds.runtime.compress.lib.CLALibCMOps;
import org.apache.sysds.runtime.compress.lib.CLALibCompAgg;
import org.apache.sysds.runtime.compress.lib.CLALibDecompress;
import org.apache.sysds.runtime.compress.lib.CLALibLeftMultBy;
import org.apache.sysds.runtime.compress.lib.CLALibMMChain;
import org.apache.sysds.runtime.compress.lib.CLALibMatrixMult;
import org.apache.sysds.runtime.compress.lib.CLALibReExpand;
import org.apache.sysds.runtime.compress.lib.CLALibRexpand;
import org.apache.sysds.runtime.compress.lib.CLALibScalar;
import org.apache.sysds.runtime.compress.lib.CLALibSlice;
import org.apache.sysds.runtime.compress.lib.CLALibSquash;
Expand Down Expand Up @@ -275,10 +275,8 @@ public long recomputeNonZeros() {
nonZeros = nnz;
}

if(nonZeros == 0) {
ColGroupEmpty cg = ColGroupEmpty.generate(getNumColumns());
allocateColGroup(cg);
}
if(nonZeros == 0) // If there is no nonzeros then reallocate into single empty column group.
allocateColGroup(ColGroupEmpty.create(getNumColumns()));

return nonZeros;
}
Expand Down Expand Up @@ -468,7 +466,8 @@ public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, Matri
}

@Override
public MatrixBlock aggregateBinaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, AggregateBinaryOperator op) {
public MatrixBlock aggregateBinaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret,
AggregateBinaryOperator op) {
checkAggregateBinaryOperations(m1, m2, op);
return CLALibMatrixMult.matrixMultiply(m1, m2, ret, op.getNumThreads(), false, false);
}
Expand Down Expand Up @@ -636,13 +635,7 @@ public double mean() {
@Override
public MatrixBlock rexpandOperations(MatrixBlock ret, double max, boolean rows, boolean cast, boolean ignore,
int k) {
if(rows) {
printDecompressWarning("rexpandOperations");
MatrixBlock tmp = getUncompressed();
return tmp.rexpandOperations(ret, max, rows, cast, ignore, k);
}
else
return CLALibReExpand.reExpand(this, ret, max, cast, ignore, k);
return CLALibRexpand.rexpand(this, ret, max, rows, cast, ignore, k);
}

@Override
Expand Down Expand Up @@ -713,29 +706,7 @@ public MatrixBlock zeroOutOperations(MatrixValue result, IndexRange range, boole

@Override
public CM_COV_Object cmOperations(CMOperator op) {
if(isEmpty())
return super.cmOperations(op);
else if(_colGroups.size() == 1 && _colGroups.get(0) instanceof AColGroupValue) {
AColGroupValue g = (AColGroupValue) _colGroups.get(0);
MatrixBlock vals = g.getValuesAsBlock();
MatrixBlock counts = getCountsAsBlock(g.getCounts());
if(counts.isEmpty())
return vals.cmOperations(op);
return vals.cmOperations(op, counts);
}
else
return getUncompressed("cmOperations").cmOperations(op);
}

private static MatrixBlock getCountsAsBlock(int[] counts) {
if(counts != null) {
MatrixBlock ret = new MatrixBlock(counts.length, 1, false);
for(int i = 0; i < counts.length; i++)
ret.quickSetValue(i, 0, counts[i]);
return ret;
}
else
return new MatrixBlock(1, 1, false);
return CLALibCMOps.centralMoment(this, op);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.compress.cocode.CoCoderFactory;
import org.apache.sysds.runtime.compress.colgroup.AColGroup;
import org.apache.sysds.runtime.compress.colgroup.AColGroupValue;
import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
import org.apache.sysds.runtime.compress.colgroup.AColGroupValue;
import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
import org.apache.sysds.runtime.compress.cost.CostEstimatorBuilder;
import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory;
Expand Down Expand Up @@ -205,7 +206,7 @@ public static CompressedMatrixBlock genUncompressedCompressedMatrixBlock(MatrixB
*/
public static CompressedMatrixBlock createConstant(int numRows, int numCols, double value) {
CompressedMatrixBlock block = new CompressedMatrixBlock(numRows, numCols);
AColGroup cg = ColGroupFactory.genColGroupConst(numCols, value);
AColGroup cg = ColGroupConst.create(numCols, value);
block.allocateColGroup(cg);
block.recomputeNonZeros();
if(block.getNumRows() == 0 || block.getNumColumns() == 0) {
Expand All @@ -223,7 +224,7 @@ private Pair<MatrixBlock, CompressionStatistics> compressMatrix() {
else if(mb.isEmpty()) {
LOG.info("Empty input to compress, returning a compressed Matrix block with empty column group");
CompressedMatrixBlock ret = new CompressedMatrixBlock(mb.getNumRows(), mb.getNumColumns());
ColGroupEmpty cg = ColGroupEmpty.generate(mb.getNumColumns());
ColGroupEmpty cg = ColGroupEmpty.create(mb.getNumColumns());
ret.allocateColGroup(cg);
ret.setNonZeros(0);
return new ImmutablePair<>(ret, null);
Expand Down Expand Up @@ -440,7 +441,7 @@ private void logPhase() {
DMLCompressionStatistics.addCompressionTime(getLastTimePhase(), phase);
if(LOG.isDebugEnabled()) {
if(compSettings.isInSparkInstruction) {
if(phase == 5)
if(phase == 4)
LOG.debug(_stats);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
package org.apache.sysds.runtime.compress.cocode;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.DMLCompressionException;
import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
import org.apache.sysds.runtime.compress.cost.ICostEstimate;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
Expand Down Expand Up @@ -132,7 +133,6 @@ private static List<CompressedSizeInfoColGroup> coCodeBruteForce(List<Compressed

protected static void parallelFirstJoin(List<ColIndexes> workSet, Memorizer mem, ICostEstimate cEst, int k) {
try {

ExecutorService pool = CommonThreadPool.get(k);
List<JoinTask> tasks = new ArrayList<>();
for(int i = 0; i < workSet.size(); i++)
Expand All @@ -154,7 +154,7 @@ protected static void parallelFirstJoin(List<ColIndexes> workSet, Memorizer mem,
pool.shutdown();
}
catch(Exception e) {
throw new DMLRuntimeException("failed to join column groups", e);
throw new DMLCompressionException("Failed parallelize first level all join all", e);
}
}

Expand All @@ -170,8 +170,14 @@ protected JoinTask(ColIndexes c1, ColIndexes c2, Memorizer m) {

@Override
public Object call() {
_m.getOrCreate(_c1, _c2);
return null;
try {
_m.getOrCreate(_c1, _c2);
return null;
}
catch(Exception e) {
throw new DMLCompressionException(
"Failed to join columns : " + Arrays.toString(_c1._indexes) + " + " + Arrays.toString(_c2._indexes), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.instructions.cp.CM_COV_Object;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
import org.apache.sysds.runtime.matrix.operators.CMOperator;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
import org.apache.sysds.utils.MemoryEstimates;

Expand Down Expand Up @@ -501,8 +503,8 @@ public double get(int r, int c) {
public abstract double getMax();

/**
* Get a copy of this column group. Depending on which column group is copied it is a deep or shallow copy. If the
* primitives for the underlying column groups is Immutable then only shallow copies is performed.
* Get a copy of this column group. Note that this is only a shallow copy, meaning that only the object wrapping the
* index structures, column indexes and dictionaries is copied.
*
* @return Get a copy of this column group.
*/
Expand Down Expand Up @@ -542,6 +544,26 @@ public double get(int r, int c) {
*/
public abstract void computeColSums(double[] c, int nRows);

/**
* Central Moment instruction executed on a column group.
*
* @param op The Operator to use.
* @param nRows The number of rows contained in the ColumnGroup.
* @return A Central Moment object.
*/
public abstract CM_COV_Object centralMoment(CMOperator op, int nRows);

/**
* Expand the column group to multiple columns. (one hot encode the column group)
*
* @param max The number of columns to expand to and cutoff values at.
* @param ignore If zero and negative values should be ignored.
* @param cast If the double values contained should be cast to whole numbers.
* @param nRows The number of rows in the column group.
* @return A new column group containing max number of columns.
*/
public abstract AColGroup rexpandCols(int max, boolean ignore, boolean cast, int nRows);

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ protected static void tsmmDense(double[] result, int numColumns, double[] values
if(values == null)
return;
final int nCol = colIndexes.length;
final int nRow = values.length / colIndexes.length;
final int nRow = counts.length;
for(int k = 0; k < nRow; k++) {
final int offTmp = nCol * k;
final int scale = counts[k];
Expand All @@ -204,7 +204,7 @@ protected static void tsmmDense(double[] result, int numColumns, double[] values
}

protected static void tsmmSparse(double[] result, int numColumns, SparseBlock sb, int[] counts, int[] colIndexes) {
for(int row = 0; row < sb.numRows(); row++) {
for(int row = 0; row < counts.length; row++) {
if(sb.isEmpty(row))
continue;
final int apos = sb.pos(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.instructions.cp.CM_COV_Object;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.CMOperator;

/**
* Base class for column groups encoded with value dictionary. This includes column groups such as DDC, OLE and RLE.
Expand All @@ -51,6 +53,8 @@ public abstract class AColGroupValue extends AColGroupCompressed implements Clon
/**
* ColGroup Implementation Contains zero tuple. Note this is not if it contains a zero value. If false then the
* stored values are filling the ColGroup making it a dense representation, that can be leveraged in operations.
*
* TODO remove
*/
protected boolean _zeros = false;

Expand Down Expand Up @@ -173,21 +177,10 @@ public int getNumValues() {
return _dict.getNumberOfValues(_colIndexes.length);
}

public final ADictionary getDictionary() {
public ADictionary getDictionary() {
return _dict;
}

public final MatrixBlock getValuesAsBlock() {
_dict = _dict.getMBDict(_colIndexes.length);
MatrixBlock ret = ((MatrixBlockDictionary) _dict).getMatrixBlock();
if(_zeros) {
MatrixBlock tmp = new MatrixBlock();
ret.append(new MatrixBlock(1, _colIndexes.length, 0), tmp, false);
return tmp;
}
return ret;
}

/**
* Returns the counts of values inside the dictionary. If already calculated it will return the previous counts. This
* produce an overhead in cases where the count is calculated, but the overhead will be limited to number of distinct
Expand All @@ -202,7 +195,7 @@ public final int[] getCounts() {
int[] ret = getCachedCounts();

if(ret == null) {
ret = getCounts(new int[getNumValues() + (_zeros ? 1 : 0)]);
ret = getCounts(new int[getNumValues()]);
counts = new SoftReference<>(ret);
}

Expand Down Expand Up @@ -318,7 +311,6 @@ public long getExactSizeOnDisk() {
long ret = super.getExactSizeOnDisk();
ret += 1; // zeros boolean
ret += _dict.getExactSizeOnDisk();

return ret;
}

Expand Down Expand Up @@ -349,6 +341,11 @@ protected void computeProduct(double[] c, int nRows) {
c[0] *= _dict.product(getCounts(), _colIndexes.length);
}

@Override
protected void computeColProduct(double[] c, int nRows) {
_dict.colProduct(c, getCounts(), _colIndexes);
}

@Override
protected void computeRowProduct(double[] c, int rl, int ru, double[] preAgg) {
throw new NotImplementedException();
Expand All @@ -375,10 +372,6 @@ protected double[] preAggBuiltinRows(Builtin builtin) {
}

@Override
protected void computeColProduct(double[] c, int nRows) {
_dict.colProduct(c, getCounts(), _colIndexes);
}

protected Object clone() {
try {
return super.clone();
Expand All @@ -388,21 +381,17 @@ protected Object clone() {
}
}

public AColGroup copyAndSet(double[] newDictionary) {
return copyAndSet(new Dictionary(newDictionary));
}

public AColGroup copyAndSet(ADictionary newDictionary) {
protected AColGroup copyAndSet(ADictionary newDictionary) {
AColGroupValue clone = (AColGroupValue) this.clone();
clone._dict = newDictionary;
return clone;
}

public AColGroup copyAndSet(int[] colIndexes, double[] newDictionary) {
private AColGroup copyAndSet(int[] colIndexes, double[] newDictionary) {
return copyAndSet(colIndexes, new Dictionary(newDictionary));
}

public AColGroup copyAndSet(int[] colIndexes, ADictionary newDictionary) {
private AColGroup copyAndSet(int[] colIndexes, ADictionary newDictionary) {
AColGroupValue clone = (AColGroupValue) this.clone();
clone._dict = newDictionary;
clone.setColIndices(colIndexes);
Expand All @@ -416,7 +405,7 @@ public AColGroupValue copy() {

@Override
protected AColGroup sliceSingleColumn(int idx) {
final AColGroupValue ret = (AColGroupValue) copy();
final AColGroupValue ret = (AColGroupValue) this.clone();
ret._colIndexes = new int[] {0};
if(_colIndexes.length == 1)
ret._dict = ret._dict.clone();
Expand All @@ -428,7 +417,7 @@ protected AColGroup sliceSingleColumn(int idx) {

@Override
protected AColGroup sliceMultiColumns(int idStart, int idEnd, int[] outputCols) {
final AColGroupValue ret = (AColGroupValue) copy();
final AColGroupValue ret = (AColGroupValue) this.clone();
ret._dict = ret._dict.sliceOutColumnRange(idStart, idEnd, _colIndexes.length);
ret._colIndexes = outputCols;
return ret;
Expand Down Expand Up @@ -504,6 +493,20 @@ public AColGroup replace(double pattern, double replace) {
return copyAndSet(replaced);
}

@Override
public CM_COV_Object centralMoment(CMOperator op, int nRows) {
return _dict.centralMoment(op.fn, getCounts(), nRows);
}

@Override
public AColGroup rexpandCols(int max, boolean ignore, boolean cast, int nRows){
ADictionary d = _dict.rexpandCols(max, ignore, cast, _colIndexes.length);
if(d == null)
return ColGroupEmpty.create(max);
else
return copyAndSet(d);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Loading

0 comments on commit b8d4897

Please sign in to comment.