adds tests to verify estimated split file sizes (#4625)
keith-turner authored Jun 3, 2024
1 parent c76f5f7 commit 1123f12
Showing 1 changed file with 96 additions and 6 deletions.
test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java (102 changes: 96 additions & 6 deletions)
@@ -22,6 +22,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.LOCK_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -36,7 +37,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -58,10 +58,12 @@
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
@@ -85,8 +87,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.MoreCollectors;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

public class SplitIT extends AccumuloClusterHarness {
private static final Logger log = LoggerFactory.getLogger(SplitIT.class);

@@ -135,6 +135,98 @@ public void resetConfig() throws Exception {
}
}

// Test that checks the estimated file sizes created by a split are reasonable
@Test
public void testEstimatedSizes() throws Exception {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
String table = getUniqueNames(1)[0];

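// set a high compaction ratio so the five similar sized files flushed below are unlikely to be
// merged by an automatic major compaction before the test reads their metadata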
Map<String,String> props = new HashMap<>();
props.put(Property.TABLE_MAJC_RATIO.getKey(), "10");

c.tableOperations().create(table, new NewTableConfiguration().setProperties(props));

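// write 1000 byte random values; random bytes are effectively incompressible, so the file
// sizes checked later should closely track the bytes written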
var random = RANDOM.get();
byte[] data = new byte[1000];

try (var writer = c.createBatchWriter(table)) {
// create 5 files, with each successive file covering a larger range of rows
for (int batch = 1; batch <= 5; batch++) {
for (int i = 0; i < 100; i++) {
Mutation m = new Mutation(String.format("%09d", i * batch));
random.nextBytes(data);
m.at().family("data").qualifier("random").put(data);
writer.addMutation(m);
}
writer.flush();
c.tableOperations().flush(table, null, null, true);
}
}

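// the table should now be a single tablet holding five files; a KeyExtent with null end row
// and null prev end row addresses that whole-table tablet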
var tableId = getServerContext().getTableId(table);
var files = getServerContext().getAmple().readTablet(new KeyExtent(tableId, null, null))
.getFilesMap();

// map of file name to the size estimates for that file from the original tablet
Map<String,DataFileValue> filesSizes1 = new HashMap<>();
files.forEach((file, dfv) -> filesSizes1.put(file.getFileName(), dfv));

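// split at rows 100, 200, 300, and 400; the file written in batch N covers rows 0 through
// 99*N, so it should span N of the resulting five tablets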
TreeSet<Text> splits = new TreeSet<>();
for (int batch = 1; batch < 5; batch++) {
splits.add(new Text(String.format("%09d", batch * 100)));
}

c.tableOperations().addSplits(table, splits);

// map of file name to the size estimates for that file from all of the split tablets
Map<String,List<DataFileValue>> filesSizes2 = new HashMap<>();
try (var tablets =
getServerContext().getAmple().readTablets().forTable(tableId).fetch(FILES).build()) {
for (var tablet : tablets) {
tablet.getFilesMap().forEach((file, dfv) -> filesSizes2
.computeIfAbsent(file.getFileName(), k -> new ArrayList<>()).add(dfv));
}
}

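// the split tablets should collectively reference exactly the same five files as the original
// tablet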
assertEquals(5, filesSizes1.size());
assertEquals(filesSizes1.keySet(), filesSizes2.keySet());

// given where the data falls relative to the splits, each file should overlap a different
// number of tablets, from 1 up to 5
assertEquals(Set.of(1, 2, 3, 4, 5),
filesSizes2.values().stream().map(List::size).collect(Collectors.toSet()));

// compare the estimates from the split tablets to the estimates from the original tablet
for (var fileName : filesSizes1.keySet()) {
var origSize = filesSizes1.get(fileName);

// wrote 100 entries to each file
assertEquals(100, origSize.getNumEntries());
// wrote 100 values of 1000 random bytes each, which should not compress
assertTrue(origSize.getSize() > 100_000);
assertTrue(origSize.getSize() < 110_000);

var splitSize = filesSizes2.get(fileName).stream().mapToLong(DataFileValue::getSize).sum();
var diff = 1 - splitSize / (double) origSize.getSize();
// the sum of the sizes in the split should be very close to the original
assertTrue(diff < .02,
"diff:" + diff + " file:" + fileName + " orig:" + origSize + " split:" + splitSize);

var splitEntries =
filesSizes2.get(fileName).stream().mapToLong(DataFileValue::getNumEntries).sum();
diff = 1 - splitEntries / (double) origSize.getNumEntries();
// the sum of the entries in the split should be very close to the original
assertTrue(diff < .02,
"diff:" + diff + " file:" + fileName + " orig:" + origSize + " entries:" + splitEntries);

// all of the tablets sharing a file should report the same estimated size and entry count
// because the file's data is spread evenly across them
assertEquals(1, filesSizes2.get(fileName).stream().distinct().count(),
"" + filesSizes2.get(fileName));
}
}
}

@Test
public void tabletShouldSplit() throws Exception {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
@@ -338,8 +430,6 @@ private String getDir() throws Exception {
return dir;
}

@SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"},
justification = "predictable random with specific seed is intended for this test")
@Test
public void bulkImportThatCantSplitHangsCompaction() throws Exception {

@@ -361,7 +451,7 @@ public void bulkImportThatCantSplitHangsCompaction() throws Exception {
c.tableOperations().create(tableName, new NewTableConfiguration()
.setProperties(singletonMap(Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K")));

Random random = new Random();
var random = RANDOM.get();
byte[] val = new byte[100];

String dir = getDir();