Skip to content

Commit

Permalink
feat: toward direct reading of InMemoryShard
Browse files Browse the repository at this point in the history
  • Loading branch information
bogovicj committed Jan 8, 2025
1 parent 6e3cbe5 commit 0647755
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package org.janelia.saalfeldlab.n5.shard;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.io.output.CountingOutputStream;
import org.apache.commons.io.output.ProxyOutputStream;
import org.checkerframework.checker.units.qual.A;
import org.janelia.saalfeldlab.n5.DataBlock;
import org.janelia.saalfeldlab.n5.DatasetAttributes;
import org.janelia.saalfeldlab.n5.DefaultBlockReader;
import org.janelia.saalfeldlab.n5.DefaultBlockWriter;
import org.janelia.saalfeldlab.n5.KeyValueAccess;
import org.janelia.saalfeldlab.n5.LockedChannel;
import org.janelia.saalfeldlab.n5.shard.ShardingCodec.IndexLocation;
import org.janelia.saalfeldlab.n5.util.GridIterator;

public class InMemoryShard<T> extends AbstractShard<T> {

Expand All @@ -27,7 +34,6 @@ public class InMemoryShard<T> extends AbstractShard<T> {
* Use morton- or c-ording instead of writing blocks out in the order they're added?
* (later)
*/

public <A extends DatasetAttributes & ShardParameters> InMemoryShard(final A datasetAttributes, final long[] shardPosition) {

this( datasetAttributes, shardPosition, null);
Expand Down Expand Up @@ -99,6 +105,61 @@ public void write(final OutputStream out) throws IOException {
writeShardStart(out, this);
}

public static <T, A extends DatasetAttributes & ShardParameters> InMemoryShard<T> readShard(
final KeyValueAccess kva, final String key, final long[] gridPosition, final A attributes)
throws IOException {

try (final LockedChannel lockedChannel = kva.lockForReading(key)) {
try (final InputStream is = lockedChannel.newInputStream()) {
return readShard(is, gridPosition, attributes);
}
}
}

public static <T, A extends DatasetAttributes & ShardParameters> InMemoryShard<T> readShard(
final InputStream inputStream, final long[] gridPosition, final A attributes) throws IOException {

try (ByteArrayOutputStream result = new ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
for (int length; (length = inputStream.read(buffer)) != -1;) {
result.write(buffer, 0, length);
}
return readShard(result.toByteArray(), gridPosition, attributes);
}
}

public static <T, A extends DatasetAttributes & ShardParameters> InMemoryShard<T> readShard(final byte[] data,
long[] shardPosition, final A attributes) throws IOException {

final ShardIndex index = attributes.createIndex();
ShardIndex.read(data, index);

final InMemoryShard<T> shard = new InMemoryShard<T>(attributes, shardPosition, index);
final GridIterator it = new GridIterator(attributes.getBlocksPerShard());
while (it.hasNext()) {

final long[] p = it.next();
final int[] pInt = GridIterator.long2int(p);

if (index.exists(pInt)) {

final ByteArrayInputStream is = new ByteArrayInputStream(data);
is.skip(index.getOffset(pInt));
BoundedInputStream bIs = BoundedInputStream.builder().setInputStream(is)
.setMaxCount(index.getNumBytes(pInt)).get();

final long[] blockGridPosition = attributes.getBlockPositionFromShardPosition(shardPosition, p);
@SuppressWarnings("unchecked")
final DataBlock<T> blk = (DataBlock<T>) DefaultBlockReader.readBlock(bIs, attributes,
blockGridPosition);
shard.addBlock(blk);
bIs.close();
}
}

return shard;
}

public static <T> void writeShard(final OutputStream out, final Shard<T> shard) throws IOException {

fromShard(shard).write(out);
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/janelia/saalfeldlab/n5/shard/VirtualShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

import org.checkerframework.checker.units.qual.A;
import org.janelia.saalfeldlab.n5.DataBlock;
Expand All @@ -28,6 +29,20 @@ public <A extends DatasetAttributes & ShardParameters> VirtualShard(final A data
this.path = path;
}

public <A extends DatasetAttributes & ShardParameters> VirtualShard(final A datasetAttributes, long[] gridPosition) {

this(datasetAttributes, gridPosition, null, null);
}

@SuppressWarnings("unchecked")
public DataBlock<T> getBlock(Supplier<InputStream> inputSupplier, long... blockGridPosition) throws IOException {

// TODO this method is just a wrapper around readBlock and probably not worth keeping
try (InputStream is = inputSupplier.get()) {
return (DataBlock<T>) DefaultBlockReader.readBlock(is, datasetAttributes, blockGridPosition);
}
}

@SuppressWarnings("unchecked")
@Override
public DataBlock<T> getBlock(long... blockGridPosition) {
Expand Down
11 changes: 4 additions & 7 deletions src/main/java/org/janelia/saalfeldlab/n5/util/GridIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public long[] next() {
return position;
}

public int[] nextAsInt() {
return long2int(next());
}

public int getIndex() {
return index;
}
Expand Down Expand Up @@ -93,11 +97,4 @@ final static public long[] int2long(final int[] i) {
return l;
}

public static void main(String[] args) {

final GridIterator it = new GridIterator(new int[]{2, 2, 2});
while (it.hasNext()) {
System.out.println(Arrays.toString(it.next()));
}
}
}

0 comments on commit 0647755

Please sign in to comment.