[MINOR] CLA Factory logging cleanup #2211

Closed
wants to merge 3 commits
@@ -226,6 +226,7 @@ public void allocateColGroup(AColGroup cg) {
* @param colGroups new ColGroups in the MatrixBlock
*/
public void allocateColGroupList(List<AColGroup> colGroups) {
cachedMemorySize = -1;
_colGroups = colGroups;
}

@@ -351,7 +352,6 @@ public long recomputeNonZeros(int k) {
List<Future<Long>> tasks = new ArrayList<>();
for(AColGroup g : _colGroups)
tasks.add(pool.submit(() -> g.getNumberNonZeros(rlen)));

long nnz = 0;
for(Future<Long> t : tasks)
nnz += t.get();
@@ -398,7 +398,6 @@ public long estimateSizeInMemory() {
public long estimateCompressedSizeInMemory() {

if(cachedMemorySize <= -1L) {

long total = baseSizeInMemory();
// take into consideration duplicate dictionaries
Set<IDictionary> dicts = new HashSet<>();
@@ -413,7 +412,6 @@ public long estimateCompressedSizeInMemory() {
}
cachedMemorySize = total;
return total;

}
else
return cachedMemorySize;
@@ -1002,6 +1000,10 @@ public MatrixBlock getUncompressed() {
return getUncompressed((String) null);
}

public MatrixBlock getUncompressed(int k){
return getUncompressed((String) null, k);
}

public MatrixBlock getUncompressed(String operation) {
return getUncompressed(operation,
ConfigurationManager.isParallelMatrixOperations() ? InfrastructureAnalyzer.getLocalParallelism() : 1);
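Note on the changes above: allocateColGroupList now resets cachedMemorySize to -1 so that estimateCompressedSizeInMemory recomputes and re-caches the estimate instead of serving a stale value. A minimal sketch of this invalidate-on-write, memoize-on-read pattern, using hypothetical names rather than the real SystemDS classes:

// Sketch only: hypothetical CachedSizeExample, not the actual CompressedMatrixBlock.
import java.util.List;

class CachedSizeExample {
	private List<Object> colGroups;
	private long cachedMemorySize = -1; // -1 marks the estimate as stale

	// Any write that replaces the column groups invalidates the cache.
	public void setColGroups(List<Object> groups) {
		cachedMemorySize = -1;
		colGroups = groups;
	}

	// Reads recompute only when the cached value is stale, then memoize it.
	public long estimateSizeInMemory() {
		if(cachedMemorySize <= -1L) {
			long total = 16; // illustrative base overhead
			for(Object g : colGroups)
				total += 8; // illustrative per-group cost
			cachedMemorySize = total;
		}
		return cachedMemorySize;
	}
}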
@@ -37,6 +37,7 @@
import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
import org.apache.sysds.runtime.compress.cost.ACostEstimate;
import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
import org.apache.sysds.runtime.compress.cost.CostEstimatorBuilder;
import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory;
import org.apache.sysds.runtime.compress.cost.InstructionTypeCounter;
@@ -159,7 +160,7 @@ public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb,
return compress(mb, k, compSettings, (WTreeRoot) null);
}

public static Future<Void> compressAsync(ExecutionContext ec, String varName) {
public static Future<Void> compressAsync(ExecutionContext ec, String varName) {
return compressAsync(ec, varName, null);
}

@@ -168,7 +169,7 @@ public static Future<Void> compressAsync(ExecutionContext ec, String varName, In
final ExecutorService pool = CommonThreadPool.get(); // We have to guarantee that a thread pool is allocated.
return CompletableFuture.runAsync(() -> {
// method call or code to be async
try{
try {
CacheableData<?> data = ec.getCacheableData(varName);
if(data instanceof MatrixObject) {
MatrixObject mo = (MatrixObject) data;
@@ -178,10 +179,11 @@ public static Future<Void> compressAsync(ExecutionContext ec, String varName, In
ExecutionContext.createCacheableData(mb);
mo.acquireModify(mbc);
mo.release();
mbc.sum(); // calculate sum to forcefully materialize counts
}
}
}
finally{
finally {
pool.shutdown();
}
}, pool);
@@ -288,11 +290,16 @@ else if(mb instanceof CompressedMatrixBlock && ((CompressedMatrixBlock) mb).isOv
_stats.originalSize = mb.getInMemorySize();
_stats.originalCost = costEstimator.getCost(mb);

final double orgSum;
if(CompressedMatrixBlock.debug)
orgSum = mb.sum(k).getDouble(0, 0);
else
orgSum = 0;
if(mb.isEmpty()) // empty input return empty compression
return createEmpty();

res = new CompressedMatrixBlock(mb); // copy metadata and allocate soft reference

logInit();
classifyPhase();
if(compressionGroups == null)
return abortCompression();
@@ -308,6 +315,12 @@ else if(mb instanceof CompressedMatrixBlock && ((CompressedMatrixBlock) mb).isOv
if(res == null)
return abortCompression();

if(CompressedMatrixBlock.debug) {
final double afterComp = mb.sum(k).getDouble(0, 0);
final double deltaSum = Math.abs(orgSum - afterComp);
LOG.debug("compression Sum: Before:" + orgSum + " after: " + afterComp + " |delta|: " + deltaSum);
}

return new ImmutablePair<>(res, _stats);
}

@@ -334,7 +347,8 @@ private void classifyPhase() {
final double scale = Math.sqrt(nCols);
final double threshold = _stats.estimatedCostCols / scale;

if(threshold < _stats.originalCost) {
if(threshold < _stats.originalCost *
((costEstimator instanceof ComputationCostEstimator) && !(mb instanceof CompressedMatrixBlock) ? 15 : 0.8)) {
if(nCols > 1)
coCodePhase();
else // LOG a short cocode phase (since there is one column we don't cocode)
@@ -406,7 +420,7 @@ private void transposeHeuristics() {
compSettings.transposed = false;
break;
default:
compSettings.transposed = transposeHeuristics(compressionGroups.getNumberColGroups() , mb);
compSettings.transposed = transposeHeuristics(compressionGroups.getNumberColGroups(), mb);
}
}

@@ -442,20 +456,20 @@ private void finalizePhase() {

_stats.compressedSize = res.getInMemorySize();
_stats.compressedCost = costEstimator.getCost(res.getColGroups(), res.getNumRows());

final double ratio = _stats.getRatio();
final double denseRatio = _stats.getDenseRatio();

_stats.setColGroupsCounts(res.getColGroups());
if(ratio < 1 && denseRatio < 100.0) {

if(_stats.compressedCost > _stats.originalCost) {
LOG.info("--dense size: " + _stats.denseSize);
LOG.info("--original size: " + _stats.originalSize);
LOG.info("--compressed size: " + _stats.compressedSize);
LOG.info("--compression ratio: " + ratio);
LOG.info("--compression ratio: " + _stats.getRatio());
LOG.info("--original Cost: " + _stats.originalCost);
LOG.info("--Compressed Cost: " + _stats.compressedCost);
LOG.info("--Cost Ratio: " + _stats.getCostRatio());
LOG.debug("--col groups types " + _stats.getGroupsTypesString());
LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
logLengths();
LOG.info("Abort block compression because compression ratio is less than 1.");
LOG.info("Abort block compression because cost ratio is less than 1. ");
res = null;
setNextTimePhase(time.stop());
DMLCompressionStatistics.addCompressionTime(getLastTimePhase(), phase);
@@ -472,9 +486,23 @@ private void finalizePhase() {

private Pair<MatrixBlock, CompressionStatistics> abortCompression() {
LOG.warn("Compression aborted at phase: " + phase);
if(mb instanceof CompressedMatrixBlock && mb.getInMemorySize() > _stats.denseSize) {
MatrixBlock ucmb = ((CompressedMatrixBlock) mb).getUncompressed("Decompressing for abort: ", k);
return new ImmutablePair<>(ucmb, _stats);
}
return new ImmutablePair<>(mb, _stats);
}

private void logInit() {
if(LOG.isDebugEnabled()) {
LOG.debug("--Seed used for comp : " + compSettings.seed);
LOG.debug(String.format("--number columns to compress: %10d", mb.getNumColumns()));
LOG.debug(String.format("--number rows to compress : %10d", mb.getNumRows()));
LOG.debug(String.format("--sparsity : %10.5f", mb.getSparsity()));
LOG.debug(String.format("--nonZeros : %10d", mb.getNonZeros()));
}
}

private void logPhase() {
setNextTimePhase(time.stop());
DMLCompressionStatistics.addCompressionTime(getLastTimePhase(), phase);
@@ -486,7 +514,6 @@ private void logPhase() {
else {
switch(phase) {
case 0:
LOG.debug("--Seed used for comp : " + compSettings.seed);
LOG.debug("--compression phase " + phase + " Classify : " + getLastTimePhase());
LOG.debug("--Individual Columns Estimated Compression: " + _stats.estimatedSizeCols);
if(mb instanceof CompressedMatrixBlock) {
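Note on the factory changes above: when CompressedMatrixBlock.debug is enabled, the input block is summed before compression and again afterwards, and the absolute delta is logged, presumably as a cheap sanity check that the run did not silently disturb the input values. A hedged sketch of that pattern in plain Java (placeholder names, not the SystemDS API):

// Sketch only: illustrates the debug-time sum comparison, not the real factory code.
class DebugSumCheckExample {
	static final boolean DEBUG = true;

	static double[] compress(double[] input) {
		final double before = DEBUG ? sumOf(input) : 0;

		final double[] compressed = input.clone(); // stand-in for the real compression step

		if(DEBUG) {
			// Re-sum the input afterwards; a non-zero delta flags accidental mutation.
			final double after = sumOf(input);
			System.out.println("compression sum: before=" + before
				+ " after=" + after + " |delta|=" + Math.abs(before - after));
		}
		return compressed;
	}

	static double sumOf(double[] v) {
		double s = 0;
		for(double d : v)
			s += d;
		return s;
	}
}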
@@ -35,10 +35,7 @@
*/
public class CompressionSettingsBuilder {
private double samplingRatio;
// private double samplePower = 0.6;
private double samplePower = 0.65;
// private double samplePower = 0.68;
// private double samplePower = 0.7;
private boolean allowSharedDictionary = false;
private String transposeInput;
private int seed = -1;
@@ -108,6 +108,10 @@ public double getRatio() {
return compressedSize == 0.0 ? Double.POSITIVE_INFINITY : (double) originalSize / compressedSize;
}

public double getCostRatio() {
return compressedSize == 0.0 ? Double.POSITIVE_INFINITY : (double) originalCost / compressedCost;
}

public double getDenseRatio() {
return compressedSize == 0.0 ? Double.POSITIVE_INFINITY : (double) denseSize / compressedSize;
}
@@ -121,7 +125,7 @@ public String toString() {
sb.append("\nCompressed Size : " + compressedSize);
sb.append("\nCompressionRatio : " + getRatio());
sb.append("\nDenseCompressionRatio : " + getDenseRatio());

if(colGroupCounts != null) {
sb.append("\nCompressionTypes : " + getGroupsTypesString());
sb.append("\nCompressionGroupSizes : " + getGroupsSizesString());
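The new getCostRatio follows the same guarded-division idiom as getRatio and getDenseRatio, reporting positive infinity when there is no compressed result to divide by. A tiny sketch of the idiom with illustrative fields, not the actual CompressionStatistics class:

// Sketch only: illustrates the guarded-ratio idiom.
class RatioExample {
	long originalSize = 4096;
	long compressedSize = 0; // e.g. nothing has been compressed yet

	double getRatio() {
		// Return +Infinity rather than dividing by a zero-sized result.
		return compressedSize == 0 ? Double.POSITIVE_INFINITY : (double) originalSize / compressedSize;
	}
}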
@@ -34,7 +34,7 @@
* This column group is handy in cases where sparse unsafe operations are executed on very sparse columns. Then the zeros
* would be materialized in the group without any overhead.
*/
public abstract class ASDC extends AMorphingMMColGroup implements AOffsetsGroup , IContainDefaultTuple {
public abstract class ASDC extends AMorphingMMColGroup implements AOffsetsGroup, IContainDefaultTuple {
private static final long serialVersionUID = 769993538831949086L;

/** Sparse row indexes for the data */
@@ -62,7 +62,7 @@ public AOffset getOffsets() {
@Override
public final CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
EstimationFactors ef = new EstimationFactors(getNumValues(), _numRows, getNumberOffsets(), _dict.getSparsity());
return new CompressedSizeInfoColGroup(_colIndexes, ef, nRow, getCompType(),getEncoding());
return new CompressedSizeInfoColGroup(_colIndexes, ef, estimateInMemorySize(), getCompType(), getEncoding());
}

@Override
@@ -74,12 +74,12 @@ public ICLAScheme getCompressionScheme() {
public AColGroup morph(CompressionType ct, int nRow) {
if(ct == getCompType())
return this;
else if (ct == CompressionType.SDCFOR)
else if(ct == CompressionType.SDCFOR)
return this; // it does not make sense to change to FOR.
else
return super.morph(ct, nRow);
}

@Override
protected boolean allowShallowIdentityRightMult() {
return false;
@@ -67,7 +67,7 @@ private final void leftMultByMatrixNoPreAggSingleRow(MatrixBlock mb, MatrixBlock
AIterator it) {
if(mb.isEmpty()) // early abort.
return;

final DenseBlock res = result.getDenseBlock();
final double[] resV = res.values(r);
final int offRet = res.pos(r);
@@ -108,11 +108,11 @@ private final void leftMultByMatrixNoPreAggSingleRowSparse(final SparseBlock sb,
final int v = it.value();
while(apos < alen && aix[apos] < v)
apos++; // go though sparse block until offset start.
if(cu < last)
if(cu < last)
leftMultByMatrixNoPreAggSingleRowSparseInside(v, it, apos, alen, aix, aval, resV, offRet, cu);
else if(aix[alen - 1] < last)
else if(aix[alen - 1] < last)
leftMultByMatrixNoPreAggSingleRowSparseLessThan(v, it, apos, alen, aix, aval, resV, offRet);
else
else
leftMultByMatrixNoPreAggSingleRowSparseTail(v, it, apos, alen, aix, aval, resV, offRet, cu, last);
}

@@ -245,7 +245,7 @@ public double[] getDefaultTuple() {
@Override
public final CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
EstimationFactors ef = new EstimationFactors(getNumValues(), _numRows, getNumberOffsets(), _dict.getSparsity());
return new CompressedSizeInfoColGroup(_colIndexes, ef, nRow, getCompType(), getEncoding());
return new CompressedSizeInfoColGroup(_colIndexes, ef, this.estimateInMemorySize(), getCompType(), getEncoding());
}

@Override
@@ -257,12 +257,12 @@ public ICLAScheme getCompressionScheme() {
public AColGroup morph(CompressionType ct, int nRow) {
if(ct == getCompType())
return this;
else if (ct == CompressionType.SDCFOR)
else if(ct == CompressionType.SDCFOR)
return this; // it does not make sense to change to FOR.
else
return super.morph(ct, nRow);
}

@Override
protected boolean allowShallowIdentityRightMult() {
return true;
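The SDC group changes above are mostly formatting, plus getCompressionInfo now passing the group's estimated in-memory size; the morph override keeps its early returns, handing the group back unchanged when the requested type is already the current one or is SDCFOR. A hedged sketch of that early-return dispatch with a placeholder enum rather than the real CompressionType:

// Sketch only: placeholder enum and class, not the SystemDS column-group hierarchy.
class MorphExample {
	enum Type { SDC, SDCFOR, DDC }

	Type current = Type.SDC;

	MorphExample morph(Type target) {
		if(target == current)
			return this; // already in the requested format
		else if(target == Type.SDCFOR)
			return this; // per the PR comment: it does not make sense to change to FOR
		else
			return convertTo(target); // fall back to a generic conversion
	}

	MorphExample convertTo(Type target) {
		MorphExample m = new MorphExample();
		m.current = target;
		return m;
	}
}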