feat: sharded meshes (single res using trivial multi-res) #112

Merged: 37 commits merged into master from wms_mesh_shards on Dec 14, 2021.

Commits (37 total, all by william-silversmith):
- `9a9a505` wip: adding a shard generation routine for meshes (Nov 13, 2021)
- `47bc636` fix: iterator returned must have len (Nov 13, 2021)
- `8b53124` fix: import errors (Nov 13, 2021)
- `f85faff` feat(cli): add --shared to mesh forging (Nov 13, 2021)
- `5a57b7f` feat: add --spatial-index-db to sharded generation cli commands (Nov 13, 2021)
- `648c54d` chore: bump cloud-volume to 6.0.0 (Nov 13, 2021)
- `a9f6521` wip: write shard files with manifests and data offsets (Nov 16, 2021)
- `9b10f8e` feat(cli): adds view command to open an on-disk dataset in neuroglancer (Nov 17, 2021)
- `5700a8b` fix: mesh is already in the right binary encoding (Nov 17, 2021)
- `83ff269` feat: normalize cli paths so file:// is assumed (Nov 17, 2021)
- `3bcb909` fix: imports and missing definitions (Nov 17, 2021)
- `339c331` fix: missing imports and arguments (Nov 17, 2021)
- `c265c35` feat: allow alternative neuroglancer servers (Nov 17, 2021)
- `7399e31` fix: wrong data offset and meshes are not gzip encoded (fix spec) (Nov 17, 2021)
- `e1262ce` fix: removed data_encoding from cli (Nov 17, 2021)
- `d01e393` perf: avoid serializing the manifest binary multiple times (Nov 17, 2021)
- `927549f` fix: make sure file handles are cleaned up (Nov 17, 2021)
- `1f9e2f0` refactor: remove pickle logic from sharded meshes (Nov 17, 2021)
- `bf60910` docs: change comment to reflect meshes vs skeletons (Nov 17, 2021)
- `bfcd229` feat: automatically calculate sharding parameters for meshes (Nov 17, 2021)
- `d418d5a` fix: logic to handle small volumes of data in a shard (Nov 18, 2021)
- `2492d78` test: make sure sharding calculation is reasonable (Nov 18, 2021)
- `b0fb4c5` feat: check capacity utilization and halve shards below 55% (Nov 18, 2021)
- `79011d1` refactor: move the computation to tc.common (Nov 18, 2021)
- `1206efe` feat: make skeleton sharding as easy as mesh sharding too (Nov 18, 2021)
- `2c2bb2f` fix: remove trimesh import (Nov 18, 2021)
- `a8c8be9` chore: add constraint on cloud-files >= 4.4.0 (Nov 18, 2021)
- `bccb10e` feat(cli): add option to create spatial index db (Nov 19, 2021)
- `b335f07` fix: missed a spot for normalizing paths in mesh merge (Nov 19, 2021)
- `b3c160f` docs: describing the hash rate on an M1 (Nov 20, 2021)
- `01d1f2f` fix: use more appropriate quantization range and origin (Dec 4, 2021)
- `51d69af` chore: latest cloud-volume has enhanced multi-res processing (Dec 4, 2021)
- `54f7526` reqs: bump draco and cloud-volume (Dec 14, 2021)
- `947e60f` docs: describe sharded meshing (Dec 14, 2021)
- `666e460` Merge branch 'master' into wms_mesh_shards (Dec 14, 2021)
- `c850d85` docs: note that integer type is required (Dec 14, 2021)
- `eb422c7` refactor: use new draco interface (Dec 14, 2021)
23 changes: 20 additions & 3 deletions README.md
@@ -395,21 +395,38 @@ tasks = create_deletion_tasks(

Meshing is a two stage process. First, the dataset is divided up into a regular grid of tasks that will be meshed independently of
each other using the `MeshTask`. The resulting mesh fragments are uploaded to the destination layer's meshing directory
(named something like `mesh_mip_3_err_40`).

There are two ways to conduct meshing. The standard "unsharded" way can generate a lot of mesh fragment files. It scales to about 100M labels before it starts incurring unreasonable costs on cloud systems. To handle larger volumes, there is the somewhat more difficult to use sharded meshing process, which condenses the number of files by orders of magnitude.

#### Unsharded Meshing

Without additional processing, Neuroglancer has no way of
knowing the names of these chunks (which will be named something like `$SEGID:0:$BOUNDING_BOX` e.g. `1052:0:0-512_0-512_0-512`).
The `$BOUNDING_BOX` part of the name is arbitrary and is the convention used by igneous because it is convenient for debugging.

The manually actuated second stage runs the `MeshManifestTask`, which generates files named `$SEGID:0` containing a short JSON snippet like `{ "fragments": [ "1052:0:0-512_0-512_0-512" ] }`. This file tells Neuroglancer and CloudVolume which mesh files to download when accessing a given segment ID.

#### Sharded Meshing

Sharded meshes are not only condensed, but also draco encoded with an integer position attribute. The dataset must first be meshed, and then a set of meshes gathered into the memory of a single machine, which synthesizes the shard file. This requires more time and memory than unsharded meshing, but simplifies management of the resulting dataset by creating far fewer files. The shard files have names like `a31.shard`. A sharded dataset is indicated by the info file in the mesh directory containing `{ "@type": "neuroglancer_multilod_draco" }`. In the future, multiscale meshes will be supported, but for now only a single resolution is generated.
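
As a quick check from Python (a minimal sketch; the dataset path is hypothetical):

```python
from cloudvolume import CloudVolume

cv = CloudVolume("gs://bucket/dataset")  # hypothetical path

info = cv.mesh.meta.info
print(info.get("@type"))   # "neuroglancer_multilod_draco" for multires meshes
print("sharding" in info)  # True once a sharding spec has been committed
```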

#### CLI Meshing

The CLI supports only standard Precomputed; Graphene is not currently supported. There are many more options: check out `igneous mesh --help`, `igneous mesh forge --help`, and `igneous mesh merge --help`.

```bash
# Standard Unsharded Meshing
igneous mesh forge $PATH --mip 2 --queue $QUEUE
igneous execute $QUEUE
igneous mesh merge $PATH --magnitude 2 --queue $QUEUE
igneous execute $QUEUE

# Sharded Meshing
igneous mesh forge $PATH --mip 2 --queue $QUEUE --sharded
igneous execute $QUEUE
igneous mesh merge-sharded $PATH --queue $QUEUE
igneous execute $QUEUE
```

#### Scripting Meshing
@@ -440,7 +457,7 @@ an additional 10^magnitude. A high magnitude (3-5+) is appropriate for horizonta

In the future, a third stage might be introduced that fuses all the small fragments into a single file.

Of note: meshing is a memory intensive operation. The underlying zmesh library has an optimization for meshing volumes smaller than 1024 voxels in the X and Y dimensions, which can be worth taking advantage of. Meshing time scales with the number of labels contained in the volume.
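
One way to exploit that optimization when scripting (a sketch; the dataset path is hypothetical):

```python
import igneous.task_creation as tc

# task shapes at or below 1024 voxels in X and Y stay inside
# zmesh's optimized code path
tasks = tc.create_meshing_tasks(
  "gs://bucket/dataset", mip=2, shape=(512, 512, 512)
)
```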

### Skeletonization (SkeletonTask, SkeletonMergeTask)

64 changes: 64 additions & 0 deletions igneous/task_creation/common.py
@@ -136,3 +136,67 @@ def graphene_prefixes(
prefixes.add(num)

return prefixes

def compute_shard_params_for_hashed(
num_labels, shard_index_bytes=2**13, minishard_index_bytes=2**15
):
"""
Computes the shard parameters for objects that
have been randomly hashed (e.g. murmurhash) so
that the keys are evenly distributed. This is
applicable to skeletons and meshes.

The equations come from the following assumptions.
a. The keys are approximately uniformly randomly distributed.
b. Preshift bits aren't useful for random keys so are zero.
c. Our goal is to keep the shard index and the minishard
indices reasonably sized. The default values are set
for a 100 Mbps connection.
d. The equations below come from finding a solution to
these equations given the constraints provided.

num_shards * num_minishards_per_shard
= 2^(shard_bits) * 2^(minishard_bits)
= num_labels_in_dataset / labels_per_minishard

# from the definition of the minishard index:
# 3 uint64s (24 bytes) per label
labels_per_minishard = minishard_index_bytes / 3 / 8

# from the definition of the shard index:
# 2 uint64s (16 bytes) per minishard
minishard_bits = ceil(log2(shard_index_bytes / 2 / 8))

Returns: (shard_bits, minishard_bits, preshift_bits)
"""
if num_labels <= 0:
return (0,0,0)

num_minishards_per_shard = shard_index_bytes / 2 / 8
labels_per_minishard = minishard_index_bytes / 3 / 8
labels_per_shard = num_minishards_per_shard * labels_per_minishard

if num_labels >= labels_per_shard:
minishard_bits = np.ceil(np.log2(num_minishards_per_shard))
shard_bits = np.ceil(np.log2(
num_labels / (labels_per_minishard * (2 ** minishard_bits))
))
elif num_labels >= labels_per_minishard:
minishard_bits = np.ceil(np.log2(
num_labels / labels_per_minishard
))
shard_bits = 0
else:
minishard_bits = 0
shard_bits = 0

capacity = labels_per_shard * (2 ** shard_bits)
utilized_capacity = num_labels / capacity

# Pack shards closer to capacity: at 55% utilization or less,
# drop one shard bit, which lets shards run about 10% over
# the target index sizes.
if utilized_capacity <= 0.55:
shard_bits -= 1

minishard_bits = max(minishard_bits, 0)
shard_bits = max(shard_bits, 0)

return (int(shard_bits), int(minishard_bits), 0)
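
As a sanity check of the equations above (a usage sketch, not part of the diff; the expected output follows from the default index budgets):

```python
from igneous.task_creation.common import compute_shard_params_for_hashed

# defaults: shard_index_bytes=2**13 -> 512 minishards per shard,
# minishard_index_bytes=2**15 -> ~1365 labels per minishard,
# so one shard holds ~700k labels
params = compute_shard_params_for_hashed(num_labels=10_000_000)
print(params)  # (4, 9, 0): 16 shards x 512 minishards, ~89% utilization
```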
145 changes: 131 additions & 14 deletions igneous/task_creation/mesh.py
@@ -1,4 +1,5 @@
import copy
from collections import defaultdict
from functools import reduce, partial
from typing import (
Any, Dict, Optional,
@@ -9,20 +10,24 @@
from time import strftime

import numpy as np
from tqdm import tqdm

import cloudvolume
import cloudvolume.exceptions
from cloudvolume import CloudVolume
from cloudvolume.lib import Vec, Bbox, max2, min2, xyzrange, find_closest_divisor, yellow, jsonify
from cloudvolume.datasource.precomputed.sharding import ShardingSpecification
from cloudfiles import CloudFiles

from igneous.tasks import (
MeshTask, MeshManifestTask, GrapheneMeshTask,
MeshSpatialIndex, MultiResShardedMeshMergeTask,
MultiResUnshardedMeshMergeTask
)
from .common import (
operator_contact, FinelyDividedTaskIterator,
get_bounds, num_tasks, graphene_prefixes,
compute_shard_params_for_hashed
)

__all__ = [
@@ -32,6 +37,7 @@
"create_graphene_hybrid_mesh_manifest_tasks",
"create_spatial_index_mesh_tasks",
"create_unsharded_multires_mesh_tasks",
"create_sharded_multires_mesh_tasks",
]

# split the work up into ~1000 tasks (magnitude 3)
@@ -297,23 +303,20 @@ def on_finish(self):

return SpatialIndexMeshTaskIterator(vol.bounds, shape)

def configure_multires_info(
cloudpath:str,
vertex_quantization_bits:int,
mesh_dir:str
):
"""
Computes properties and uploads a multires
mesh info file
"""
assert vertex_quantization_bits in (10, 16)

vol = CloudVolume(cloudpath)

mesh_dir = mesh_dir or vol.info.get("mesh", None)

if not "mesh" in vol.info:
vol.info['mesh'] = mesh_dir
@@ -340,6 +343,26 @@ def create_unsharded_multires_mesh_tasks(
cache_control="no-cache"
)

def create_unsharded_multires_mesh_tasks(
cloudpath:str, num_lod:int = 1,
magnitude:int = 3, mesh_dir:str = None,
vertex_quantization_bits:int = 16
) -> Iterator:
"""
vertex_quantization_bits: 10 or 16. Adjusts the precision
of mesh vertices.
"""
# split the work up into ~1000 tasks (magnitude 3)
assert int(magnitude) == magnitude

configure_multires_info(
cloudpath,
vertex_quantization_bits,
mesh_dir
)

vol = CloudVolume(cloudpath) # mip is no longer a parameter; the mesh info determines resolution

start = 10 ** (magnitude - 1)
end = 10 ** magnitude

@@ -365,3 +388,97 @@ def __iter__(self):
)

return UnshardedMultiResTaskIterator()

def create_sharded_multires_mesh_tasks(
cloudpath:str,
shard_index_bytes=2**13,
minishard_index_bytes=2**15,
num_lod:int = 1,
draco_compression_level:int = 1,
vertex_quantization_bits:int = 16,
minishard_index_encoding="gzip",
mesh_dir:Optional[str] = None,
spatial_index_db:Optional[str] = None
) -> Iterator[MultiResShardedMeshMergeTask]:

configure_multires_info(
cloudpath,
vertex_quantization_bits,
mesh_dir
)

# rebuild b/c sharding changes the mesh source class
cv = CloudVolume(cloudpath, progress=True, spatial_index_db=spatial_index_db)
cv.mip = cv.mesh.meta.mip

# 17 sec to download for pinky100
all_labels = cv.mesh.spatial_index.query(cv.bounds * cv.resolution)

(shard_bits, minishard_bits, preshift_bits) = \
compute_shard_params_for_hashed(
num_labels=len(all_labels),
shard_index_bytes=int(shard_index_bytes),
minishard_index_bytes=int(minishard_index_bytes),
)

spec = ShardingSpecification(
type='neuroglancer_uint64_sharded_v1',
preshift_bits=preshift_bits,
hash='murmurhash3_x86_128',
minishard_bits=minishard_bits,
shard_bits=shard_bits,
minishard_index_encoding=minishard_index_encoding,
data_encoding="raw", # draco encoded meshes
)

cv.mesh.meta.info['sharding'] = spec.to_dict()
cv.mesh.meta.commit_info()

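# rebuild cv so the mesh reader picks up the just-committed sharding spec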
cv = CloudVolume(cloudpath)

# perf: ~66.5k hashes/sec on M1 ARM64
shardfn = lambda lbl: cv.mesh.reader.spec.compute_shard_location(lbl).shard_number

shard_labels = defaultdict(list)
for label in tqdm(all_labels, desc="Hashes"):
shard_labels[shardfn(label)].append(label)
del all_labels

cf = CloudFiles(cv.mesh.meta.layerpath, progress=True) # mesh, not skeleton: the labels files belong in the mesh directory
files = (
(str(shardno) + '.labels', labels)
for shardno, labels in shard_labels.items()
)
cf.put_jsons(
files, compress="gzip",
cache_control="no-cache", total=len(shard_labels)
)

cv.provenance.processing.append({
'method': {
'task': 'MultiResShardedMeshMergeTask',
'cloudpath': cloudpath,
'mip': cv.mesh.meta.mip,
'num_lod': num_lod,
'vertex_quantization_bits': vertex_quantization_bits,
'preshift_bits': preshift_bits,
'minishard_bits': minishard_bits,
'shard_bits': shard_bits,
'mesh_dir': mesh_dir,
'draco_compression_level': draco_compression_level,
},
'by': operator_contact(),
'date': strftime('%Y-%m-%d %H:%M %Z'),
})
cv.commit_provenance()

return [
partial(MultiResShardedMeshMergeTask,
cloudpath, shard_no,
num_lod=num_lod,
mesh_dir=mesh_dir,
spatial_index_db=spatial_index_db,
draco_compression_level=draco_compression_level,
)
for shard_no in shard_labels.keys()
]
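
For context, a driver for the returned tasks might look like this (a sketch: the `LocalTaskQueue` usage and dataset path are assumptions, not part of this PR):

```python
from taskqueue import LocalTaskQueue
import igneous.task_creation as tc

tasks = tc.create_sharded_multires_mesh_tasks(
  "gs://bucket/dataset",  # hypothetical path
  vertex_quantization_bits=16,
)

tq = LocalTaskQueue(parallel=8)
tq.insert(tasks)  # tasks is a list, so the queue knows its length
tq.execute()
```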
31 changes: 21 additions & 10 deletions igneous/task_creation/skeleton.py
@@ -19,7 +19,7 @@

from .common import (
operator_contact, FinelyDividedTaskIterator,
get_bounds, num_tasks, compute_shard_params_for_hashed
)

__all__ = [
@@ -267,10 +267,24 @@ def __iter__(self):

def create_sharded_skeleton_merge_tasks(
layer_path, dust_threshold, tick_threshold,
shard_index_bytes=2**13,
minishard_index_bytes=2**15,
minishard_index_encoding='gzip', data_encoding='gzip',
max_cable_length=None, spatial_index_db=None
):
cv = CloudVolume(layer_path, progress=True, spatial_index_db=spatial_index_db)
cv.mip = cv.skeleton.meta.mip

# 17 sec to download for pinky100
all_labels = cv.skeleton.spatial_index.query(cv.bounds * cv.resolution)

(shard_bits, minishard_bits, preshift_bits) = \
compute_shard_params_for_hashed(
num_labels=len(all_labels),
shard_index_bytes=int(shard_index_bytes),
minishard_index_bytes=int(minishard_index_bytes),
)

spec = ShardingSpecification(
type='neuroglancer_uint64_sharded_v1',
preshift_bits=preshift_bits,
Expand All @@ -280,16 +294,13 @@ def create_sharded_skeleton_merge_tasks(
minishard_index_encoding=minishard_index_encoding,
data_encoding=data_encoding,
)

cv.skeleton.meta.info['sharding'] = spec.to_dict()
cv.skeleton.meta.commit_info()

# rebuild b/c sharding changes the skeleton source
cv = CloudVolume(layer_path, progress=True, spatial_index_db=spatial_index_db)
cv.mip = cv.skeleton.meta.mip

# perf: ~36k hashes/sec
shardfn = lambda lbl: cv.skeleton.reader.spec.compute_shard_location(lbl).shard_number

@@ -324,14 +335,14 @@
})
cv.commit_provenance()

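# return a materialized list, not a generator: the task queue needs len()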
return [
ShardedSkeletonMergeTask(
layer_path, shard_no,
dust_threshold, tick_threshold,
max_cable_length=max_cable_length
)
for shard_no in shard_labels.keys()
]

# split the work up into ~1000 tasks (magnitude 3)
def create_unsharded_skeleton_merge_tasks(
1 change: 1 addition & 0 deletions igneous/tasks/__init__.py
@@ -6,6 +6,7 @@
MeshTask, MeshManifestTask,
GrapheneMeshTask, MeshSpatialIndex,
MultiResUnshardedMeshMergeTask,
MultiResShardedMeshMergeTask,
)
from .image import (
HyperSquareConsensusTask, #HyperSquareTask,