Graph creation fails when copying the dask input dataframe #3790

Closed
1 task
jnke2016 opened this issue Aug 14, 2023 · 4 comments · Fixed by #3895
Labels
bug Something isn't working CRITICAL BUG! BUG that needs to be FIX NOW !!!!

Comments

@jnke2016
Contributor

jnke2016 commented Aug 14, 2023

Describe the bug.

cuGraph graph creation from a dask_cudf dataframe fails when the input dask dataframe is copied with dask map_partitions right before it is persisted in equal parts across the workers, in cases where some partitions hold an empty dataframe.

Minimum reproducible example

    import cugraph
    import dask_cudf

    # The dataset must have fewer edges than there are workers
    ddf = dask_cudf.read_csv(
        input_data_path,
        chunksize=chunksize,
        delimiter=" ",
        names=["src", "dst", "value"],
        dtype=["int32", "int32", "float32"],
    )

    dg1 = cugraph.Graph(directed=True)
    dg1.from_dask_cudf_edgelist(ddf, source="src", destination="dst", edge_attr="value")

    dg2 = cugraph.Graph(directed=True)
    dg2.from_dask_cudf_edgelist(ddf, source="src", destination="dst", edge_attr="value")
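To see why a dataset with fewer edges than workers leaves some workers holding nothing but empty partitions, here is a small pure-Python sketch. It borrows the round-robin slicing used by the `_chunk_lst` helper in the reproducer further down this thread. The worker count, the partition count, and the per-partition row counts are made-up illustration values (assumptions), not measurements from a real cluster.

```python
def chunk_round_robin(items, num_parts):
    # Same slicing as _chunk_lst in the reproducer: item i goes to part i % num_parts
    return [items[i::num_parts] for i in range(num_parts)]


# Hypothetical setup: 4 workers and 8 partitions, of which only 2 hold an edge.
num_workers = 4
partition_lengths = [1, 1, 0, 0, 0, 0, 0, 0]

# Assign partitions to workers in round-robin order.
per_worker = chunk_round_robin(partition_lengths, num_workers)

# Workers whose partitions are all empty.
workers_with_only_empty = [
    w for w, parts in enumerate(per_worker) if sum(parts) == 0
]

print(per_worker)
print(workers_with_only_empty)
```

In this made-up layout, half of the workers receive only empty partitions, which is the situation the bug report depends on.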

Relevant log output

        def set_by_label(self, key: Any, value: Any, validate: bool = True):
            """
            Add (or modify) column by name.
    
            Parameters
            ----------
            key
                name of the column
            value : column-like
                The value to insert into the column.
            validate : bool
                If True, the provided value will be coerced to a column and
                validated before setting (Default value = True).
            """
            key = self._pad_key(key)
            if validate:
                value = column.as_column(value)
                if len(self._data) > 0:
                    if len(value) != self._column_length:
                        raise ValueError("All columns must be of equal length")
                else:
                    self._column_length = len(value)
    
            self._data[key] = value
            self._clear_cache()
    
        def _select_by_names(self, names: abc.Sequence) -> Self:
            return self.__class__(
                {key: self[key] for key in names},
                multiindex=self.multiindex,
                level_names=self.level_names,
            )
    
        def _select_by_label_list_like(self, key: Any) -> ColumnAccessor:
            # Might be a generator
            key = tuple(key)
            # Special-casing for boolean mask
            if (bn := len(key)) > 0 and all(map(is_bool, key)):
                if bn != (n := len(self.names)):
                    raise IndexError(
                        f"Boolean mask has wrong length: {bn} not {n}"
                    )
                data = dict(
                    item
                    for item, keep in zip(self._grouped_data.items(), key)
                    if keep
                )
            else:
                data = {k: self._grouped_data[k] for k in key}
            if self.multiindex:
                data = _to_flat_dict(data)
            return self.__class__(
                data,
                multiindex=self.multiindex,
                level_names=self.level_names,
            )
    
        def _select_by_label_grouped(self, key: Any) -> ColumnAccessor:
>           result = self._grouped_data[key]
E           KeyError: 'src'

@jnke2016 jnke2016 added ? - Needs Triage Need team to review and classify bug Something isn't working labels Aug 14, 2023
@kingmesal kingmesal removed the ? - Needs Triage Need team to review and classify label Aug 14, 2023
@rlratzel rlratzel added the CRITICAL BUG! BUG that needs to be FIX NOW !!!! label Aug 18, 2023
@jnke2016
Contributor Author

jnke2016 commented Aug 22, 2023

Below is a more detailed description of the issue. After creating the PLC graph from a dask_cudf dataframe, we eventually delete what seems to be a copy of the dataframe to save memory. When another PLC graph is then created from the same input dataframe that was used for the first graph, some partitions of that dask dataframe turn out to be empty with no metadata, which causes a lazy failure during the copy with map_partitions. It appears that things are not cleaned up properly. Below is a reproducer that uses only cudf and dask.

import cudf
from dask.distributed import wait, futures_of, default_client
from dask import delayed
import dask_cudf
import gc


def setup_function():
    gc.collect()


def _create_empty_dask_df_future(meta_df, client, worker):
    df_future = client.scatter(meta_df.head(0), workers=[worker])
    wait(df_future)
    return [df_future]


def get_persisted_df_worker_map(dask_df, client):
    ddf_keys = futures_of(dask_df)
    output_map = {}
    for w, w_keys in client.has_what().items():
        output_map[w] = [ddf_k for ddf_k in ddf_keys if str(ddf_k.key) in w_keys]
        if len(output_map[w]) == 0:
            output_map[w] = _create_empty_dask_df_future(dask_df._meta, client, w)
    return output_map


def _chunk_lst(ls, num_parts):
    return [ls[i::num_parts] for i in range(num_parts)]


def persist_dask_df_equal_parts_per_worker(dask_df, client):
    ddf_keys = dask_df.to_delayed()
    workers = client.scheduler_info()["workers"].keys()
    ddf_keys_ls = _chunk_lst(ddf_keys, len(workers))
    persisted_keys = []
    for w, ddf_k in zip(workers, ddf_keys_ls):
        persisted_keys.extend(
            client.persist(ddf_k, workers=w, allow_other_workers=False)
        )
    dask_df = dask_cudf.from_delayed(persisted_keys, meta=dask_df._meta).persist()
    wait(dask_df)
    client.rebalance(dask_df)
    return dask_df


def _get_column_from_ls_dfs_(lst_df, col_name):
    len_df = sum(len(df) for df in lst_df)
    if len_df == 0:
        # Every dataframe is empty: return the column without dropping it
        return lst_df[0][col_name]
    output_col = cudf.concat([df[col_name] for df in lst_df], ignore_index=True)
    # Drop the extracted column in place from every dataframe
    for df in lst_df:
        df.drop(columns=[col_name], inplace=True)

    #print("lst_df = ", lst_df)
    gc.collect()
    return output_col


class graph:
    def __init__(self, directed):
        self.directed = directed
        self.input_df = None
        self._plc_graph = None

    @staticmethod
    def _make_plc_graph_(
        edata_x,
        src_col_name,
        dst_col_name,
        edge_attr,
    ):
        # Extract (and drop in place) each column from this worker's dataframes
        _get_column_from_ls_dfs_(edata_x, edge_attr)
        _get_column_from_ls_dfs_(edata_x, src_col_name)
        _get_column_from_ls_dfs_(edata_x, dst_col_name)
        print("input ddf after deletion\n", edata_x)
        return None

    def from_edgelist_(
            self,
            input_ddf,
            source="source",
            destination="destination",
            edge_attr=None,
    ):
        
        _client = default_client()
        workers = _client.scheduler_info()["workers"]

        ddf = input_ddf
        ddf = ddf.repartition(npartitions=len(workers) * 2)

        src_col_name = source
        dst_col_name = destination

        # Copy every partition of the input dataframe
        ddf = ddf.map_partitions(lambda df: df.copy())
        #ddf = ddf.copy()
        print("Number of partitions after the copy ", ddf.npartitions)
        for n in range(ddf.npartitions):
            print("n = ", n, " len = ", len(ddf.get_partition(n)))
        ddf = persist_dask_df_equal_parts_per_worker(ddf, _client)
        ddf = get_persisted_df_worker_map(ddf, _client)

        delayed_tasks_d = {
            w: delayed(graph._make_plc_graph_)(
                edata,
                src_col_name,
                dst_col_name,
                edge_attr,
            )
            for w, edata in ddf.items()
        }
        del ddf
        self._plc_graph = {
            w: _client.compute(delayed_task, workers=w, allow_other_workers=False)
            for w, delayed_task in delayed_tasks_d.items()
        }
        wait(list(self._plc_graph.values()))
        del delayed_tasks_d
        _client.run(gc.collect)


def test_reproducer_mg(dask_client):
    # Read graph
    input_data_path = "path_to_csv_file"

    # Below are the edges that should be in the CSV file
    """
    1 0 1.0
    2 0 1.0
    """
    chunksize = 8
    print("the chunksize = ", chunksize)
    ddf = dask_cudf.read_csv(
        input_data_path,
        chunksize=chunksize,
        delimiter=" ",
        names=["src", "dst", "value"],
        dtype=["int32", "int32", "float32"],
    )

    G_1 = graph(directed=True)
    print("**********first graph creation************")
    G_1.from_edgelist_(ddf, source="src", destination="dst", edge_attr="value")

    G_2 = graph(directed=True)
    print("**********second graph creation************")
    G_2.from_edgelist_(ddf, source="src", destination="dst", edge_attr="value")

    print("done creating the graph")
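As a rough illustration of how the in-place column drops in the reproducer could lead to the `KeyError: 'src'` from the log, here is a minimal single-process sketch. It uses pandas as a CPU stand-in for cudf and skips dask entirely, and the helper name `take_column` is hypothetical. Whether the persisted partitions on the dask workers are shared between the two graph creations in this way is an assumption; the thread only reports that partitions end up empty with no metadata.

```python
import pandas as pd


def take_column(frames, col_name):
    # Concatenate one column across frames, then drop it in place,
    # mirroring what _get_column_from_ls_dfs_ does in the reproducer.
    out = pd.concat([df[col_name] for df in frames], ignore_index=True)
    for df in frames:
        df.drop(columns=[col_name], inplace=True)
    return out


def make_edges():
    return pd.DataFrame({"src": [1, 2], "dst": [0, 0], "value": [1.0, 1.0]})


# Case 1: the same object is consumed twice (no copy in between).
edges = make_edges()
for name in ("value", "src", "dst"):
    take_column([edges], name)

remaining_columns = list(edges.columns)  # columns left after the first pass

try:
    take_column([edges], "src")  # second "graph creation" on the same object
    second_pass_error = None
except KeyError as exc:
    second_pass_error = exc

# Case 2: each pass works on its own copy, so the original is untouched.
safe_edges = make_edges()
for name in ("value", "src", "dst"):
    take_column([safe_edges.copy()], name)

print(remaining_columns)
print(repr(second_pass_error))
print(list(safe_edges.columns))
```

If the copy made with map_partitions does not actually isolate the second graph creation from the first one's deletions, the second pass would behave like case 1 rather than case 2.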

@jnke2016
Contributor Author

jnke2016 commented Aug 23, 2023

it turns out that some of the partitions of that dask dataframe are empty with no metadata, causing a lazy failure during the copy with map_partitions

Copying the dask graph with ddf.copy() yields the same result. After the ddf.copy() variant failed, a PR that temporarily disables the deletion of the dask dataframe passed CI twice in a row.

rlratzel added a commit to rlratzel/cugraph that referenced this issue Aug 29, 2023
rapids-bot bot pushed a commit that referenced this issue Aug 30, 2023
This PR is on top of the changes from #3831.

Temporarily disables single-GPU "MG" tests in CI until #3790 is closed.
This will unblock CI for PRs unrelated to the issue in #3790, at the risk of reduced coverage for MG code paths. Hopefully nightly MG testing will minimize the risk.
A follow-up PR will be submitted that re-enables the tests and must be merged prior to 23.10 burndown.

Authors:
  - Naim (https://github.com/naimnv)
  - Rick Ratzel (https://github.com/rlratzel)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)
  - Ray Douglass (https://github.com/raydouglass)

URL: #3833
rlratzel pushed a commit to rlratzel/cugraph that referenced this issue Sep 8, 2023
@rlratzel
Contributor

This issue was created for the dask-cudf team.

rapids-bot bot pushed a commit that referenced this issue Sep 26, 2023
Enables the temporarily disabled single-GPU "MG" tests
and skips deleting the copied dataframe while creating a distributed graph from cudf edge lists.
Ideally we would like to merge this PR once the [issue 3790](#3790) is closed, but we might need to merge it if the issue is not resolved before the next release.

Authors:
  - Naim (https://github.com/naimnv)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)
  - AJ Schmidt (https://github.com/ajschmidt8)

URL: #3837
@rlratzel
Contributor

The above issue was closed after this PR was merged.
The above issue has additional details, specifically why we may need to reopen this, but for now it's closed.
