Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spatial index database #93

Merged
merged 4 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cloud/google/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ def GenerateConfig(context):

if "nfsServer" in context.properties:
worker_subnetworks.add(context.properties['nfsServer']['subnetwork'])
metadata = context.properties['nfsServer']
metadata['hostname'] = hostname_nfs_server
worker_metadata.append({
'key': 'nfs-server',
'value': json.dumps(context.properties['nfsServer'])
'value': json.dumps(metadata)
})

manager_resource = GenerateManager(context, hostname_manager, hostname_nfs_server, worker_metadata)
Expand Down
2 changes: 2 additions & 0 deletions cloud/google/nfs_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def GenerateNFSServerStartupScript(context, hostname_manager):
mkfs.ext4 -F /dev/sdb
mount /dev/sdb /share
chmod 777 /share
mkdir -p /share/mariadb
apt-get install nfs-kernel-server -y
echo "/share 172.31.0.0/16(insecure,rw,async,no_subtree_check)" >> /etc/exports
echo "ALL: 172.31.0.0/16" >> /etc/hosts.allow
Expand All @@ -47,6 +48,7 @@ def GenerateNFSServerStartupScript(context, hostname_manager):
mount /dev/sdb /share
chmod 777 /share
systemctl restart nfs-kernel-server.service
docker run --rm -p 3306:3306 --tmpfs /tmp:rw -v /share/mariadb:/var/lib/mysql --env MARIADB_ROOT_PASSWORD=igneous --env MARIADB_USER=igneous --env MARIADB_PASSWORD=igneous mariadb:latest --max-connections=10000 --innodb-buffer-pool-size=10737418240 >& /dev/null &
{oom_canary_cmd} &
{worker_cmd}

Expand Down
7 changes: 7 additions & 0 deletions dags/dag_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,10 @@ def remove_workers(queue):
tis = query_task_instances(queue=queue)
for ti in tis:
ti.set_state(State.SUCCESS)


def db_name(run_name, data_ext):
    """Build a MySQL-safe database name from *run_name* and *data_ext*.

    The run name is trimmed, lowercased, every character outside
    ``[a-z0-9]`` is replaced with an underscore, and leading underscores
    are stripped.  The sanitized prefix is then truncated so the final
    ``<prefix>_<data_ext>`` fits MySQL's 64-character identifier limit.
    """
    import re
    sanitized = re.sub(r'[^a-z0-9]', '_', run_name.strip().lower())
    sanitized = sanitized.lstrip('_')
    # Reserve room for the joining underscore and the extension suffix.
    keep = 63 - len(data_ext)
    return f"{sanitized[:keep]}_{data_ext}"
40 changes: 38 additions & 2 deletions dags/igneous_and_cloudvolume.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,14 +566,43 @@ def mesh(run_name, seg_cloudpath, mesh_quality, sharded, frag_path=None):
return tasks_with_metadata(metadata, tasks)


@mount_secrets
def ingest_spatial_index(run_name, seg_cloudpath, sql_url, db_ext):
    """Ingest the mesh or skeleton spatial index into the SQL database.

    Joins *sql_url* with the run-specific database name and calls the
    CloudVolume ``spatial_index.to_sql`` ingester for the requested
    ``db_ext`` ("mesh" or "skeleton").  Posts progress / error messages
    to Slack.  When *sql_url* is falsy the step is skipped entirely.
    """
    import os
    from cloudvolume import CloudVolume
    from dag_utils import db_name
    from slack_message import slack_message

    if not sql_url:
        slack_message(":arrow_forward: No database configured, skip ingesting spatial index")
        return

    database = db_name(run_name, db_ext)
    spatial_index_db = os.path.join(sql_url, database)
    vol = CloudVolume(seg_cloudpath)
    if db_ext == "mesh":
        slack_message(f":arrow_forward: Ingesting spatial_index for meshes to {database}")
        vol.mesh.spatial_index.to_sql(spatial_index_db, parallel=8)
    elif db_ext == "skeleton":
        slack_message(f":arrow_forward: Ingesting spatial_index for skeletons to {database}")
        vol.skeleton.spatial_index.to_sql(spatial_index_db, parallel=8)
    else:
        slack_message(f":exclamation:*Error* Cannot ingest spatial_index for {db_ext}")



@mount_secrets
@kombu_tasks(cluster_name="igneous", init_workers=8)
def merge_mesh_fragments(run_name, seg_cloudpath, concurrency, frag_path=None):
def merge_mesh_fragments(run_name, seg_cloudpath, concurrency, frag_path=None, sql_url=None):
import os
import igneous.task_creation as tc
from slack_message import slack_message
from dag_utils import db_name
if sql_url:
spatial_index_db = os.path.join(sql_url, db_name(run_name, "mesh"))
else:
spatial_index_db = None
tasks = tc.create_sharded_multires_mesh_tasks(seg_cloudpath,
num_lod=8,
frag_path=frag_path,
spatial_index_db=spatial_index_db,
cache=True,
max_labels_per_shard=10000)
slack_message(":arrow_forward: Merge mesh fragments `{}`: {} tasks in total".format(seg_cloudpath, len(tasks)))
Expand Down Expand Up @@ -679,14 +708,21 @@ def create_skeleton_fragments(run_name, seg_cloudpath, teasar_param, frag_path=N

@mount_secrets
@kombu_tasks(cluster_name="igneous", init_workers=4)
def merge_skeleton_fragments(run_name, seg_cloudpath, frag_path=None):
def merge_skeleton_fragments(run_name, seg_cloudpath, frag_path=None, sql_url=None):
import os
import igneous.task_creation as tc
from slack_message import slack_message
from dag_utils import db_name
if sql_url:
spatial_index_db = os.path.join(sql_url, db_name(run_name, "skeleton"))
else:
spatial_index_db = None
tasks = tc.create_sharded_skeleton_merge_tasks(seg_cloudpath,
dust_threshold=1000,
tick_threshold=3500,
minishard_index_encoding='gzip', # or None
frag_path=frag_path,
spatial_index_db=spatial_index_db,
cache=True,
data_encoding='gzip', # or None
max_labels_per_shard=10000,
Expand Down
42 changes: 37 additions & 5 deletions dags/igneous_ops.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from airflow.operators.python_operator import PythonOperator
from airflow.utils.weight_rule import WeightRule
from slack_message import task_retry_alert
from igneous_and_cloudvolume import downsample, downsample_for_meshing, mesh, mesh_manifest, merge_mesh_fragments, create_skeleton_fragments, merge_skeleton_fragments
from igneous_and_cloudvolume import downsample, downsample_for_meshing, ingest_spatial_index, mesh, mesh_manifest, merge_mesh_fragments, create_skeleton_fragments, merge_skeleton_fragments
from helper_ops import placeholder_op
from dag_utils import get_connection
from dag_utils import get_connection, db_name


def create_igneous_ops(param, dag):
import os
Expand All @@ -12,11 +13,16 @@ def create_igneous_ops(param, dag):
ops = [placeholder_op(dag, "start_igneous_tasks")]
run_name = f'{param["NAME"]}.segmentation'

if get_connection("NFSServer"):
nfs_kwargs = {"frag_path": f"file:///share/{run_name}"}
nfs_conn = get_connection("NFSServer")

if nfs_conn:
extra_args = nfs_conn.extra_dejson
sql_url = f"mysql://root:igneous@{extra_args['hostname']}"
nfs_kwargs = {"frag_path": f"file:///share/{run_name}", "sql_url": sql_url}
queue = "nfs"
else:
nfs_kwargs = {"frag_path": None}
nfs_kwargs = {"frag_path": None, "sql_url": None}
sql_url = None
queue = "manager"

if not param.get("SKIP_DOWNSAMPLE", False):
Expand Down Expand Up @@ -48,6 +54,19 @@ def create_igneous_ops(param, dag):
ops[-1] >> current_op
ops.append(current_op)

if sql_url and param.get("SHARDED_MESH", True):
current_op = PythonOperator(
task_id="ingest_spatial_index_mesh",
python_callable=ingest_spatial_index,
op_args=[run_name, seg_cloudpath, sql_url, "mesh",],
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
dag=dag
)
ops[-1] >> current_op
ops.append(current_op)

if param.get("SHARDED_MESH", True):
current_op = PythonOperator(
task_id="merge_mesh_fragments",
Expand Down Expand Up @@ -107,6 +126,19 @@ def create_igneous_ops(param, dag):
ops[-1] >> current_op
ops.append(current_op)

if sql_url:
current_op = PythonOperator(
task_id="ingest_spatial_index_skeleton",
python_callable=ingest_spatial_index,
op_args=[run_name, seg_cloudpath, sql_url, "skeleton",],
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
dag=dag
)
ops[-1] >> current_op
ops.append(current_op)

current_op = PythonOperator(
task_id="merge_skeleton",
python_callable=merge_skeleton_fragments,
Expand Down
1 change: 1 addition & 0 deletions pipeline/init_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def parse_metadata():
elif item["key"] == "nfs-server":
worker = json.loads(item["value"])
metadata["nfs-server"] = {
'hostname': worker['hostname'],
'zone': worker['zone'],
}

Expand Down
Loading