Skip to content

Commit

Permalink
Fix duplicate node iteration in GraphAssigner (#2857)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaokunyang authored Mar 24, 2022
1 parent e12963d commit fabe7fa
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
46 changes: 46 additions & 0 deletions asv_bench/benchmarks/graph_assigner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mars.tensor as mt
import mars.dataframe as md
from mars.core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
from mars.services.task.analyzer import GraphAnalyzer
from mars.services.task.analyzer.assigner import GraphAssigner


class ChunkGraphAssignerSuite:
    """
    Benchmark suite measuring how long ``GraphAssigner`` takes to assign
    the start ops of a chunk graph to bands.
    """

    def setup(self):
        """Build the chunk graph of a merge between two random DataFrames.

        The resulting chunk graph (built with fusion disabled) is stored on
        ``self.chunk_graph`` for use by the timed method.
        """
        num_rows = 10000
        # Two identically shaped 4-column frames, created in the same order
        # as before (left first, then right).
        left, right = [
            md.DataFrame(
                mt.random.rand(num_rows, 4, chunk_size=10), columns=list("abcd")
            )
            for _ in range(2)
        ]
        merged = left.merge(right, left_on="a", right_on="a")

        tileable_graph = TileableGraph([merged.data])
        # Advance the tileable graph builder once before building chunks.
        next(TileableGraphBuilder(tileable_graph).build())
        chunk_graph_builder = ChunkGraphBuilder(tileable_graph, fuse_enabled=False)
        self.chunk_graph = next(chunk_graph_builder.build())

    def time_assigner(self):
        """Time assigning all start ops across 50 bands with 16 slots each."""
        initial_ops = list(GraphAnalyzer._iter_start_ops(self.chunk_graph))
        slots_per_band = dict(
            ((f"worker-{idx}", "numa-0"), 16) for idx in range(50)
        )
        graph_assigner = GraphAssigner(self.chunk_graph, initial_ops, slots_per_band)
        # Start from an empty assignment so that every start op gets assigned.
        assignment = graph_assigner.assign({})
        assert len(assignment) == len(initial_ops)
7 changes: 4 additions & 3 deletions mars/services/task/analyzer/assigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from operator import itemgetter
from typing import List, Dict, Set
from typing import List, Dict, Set, Union

import numpy as np

Expand Down Expand Up @@ -130,7 +130,7 @@ def _assign_by_bfs(
initial_sizes: Dict[BandType, int],
spread_limits: Dict[BandType, float],
key_to_assign: Set[str],
assigned_record: Dict[str, int],
assigned_record: Dict[str, Union[str, BandType]],
):
"""
Assign initial nodes using breadth-first search given initial sizes and
Expand All @@ -151,9 +151,10 @@ def _assign_by_bfs(
if op_key in assigned_record:
continue
spread_range += 1
# `op_key` may not be in `key_to_assign`, but we still need to record it to avoid iterating over the node repeatedly.
assigned_record[op_key] = band
if op_key not in key_to_assign:
continue
assigned_record[op_key] = band
assigned += 1
if spread_range >= spread_limits[band] or assigned >= initial_sizes[band]:
break
Expand Down

0 comments on commit fabe7fa

Please sign in to comment.