From f63a2903f1c45ebb81d98bfc6d80a08a4c9a97e2 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Tue, 22 Aug 2023 08:05:48 -0700 Subject: [PATCH 1/8] remove deprecated sampling options, support other sequence types --- .../dask/sampling/uniform_neighbor_sample.py | 333 +----------------- .../sampling/uniform_neighbor_sample.py | 179 +--------- .../graph_implementation/simpleGraph.py | 2 +- 3 files changed, 28 insertions(+), 486 deletions(-) diff --git a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py index 88fab60120d..f48d2c25263 100644 --- a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py @@ -287,112 +287,6 @@ def _call_plc_uniform_neighbor_sample( ) -def _call_plc_uniform_neighbor_sample_legacy( - sID, - mg_graph_x, - st_x, - label_list, - label_to_output_comm_rank, - fanout_vals, - with_replacement, - weight_t, - with_edge_properties, - random_state=None, - return_offsets=False, - return_hops=True, -): - start_list_x = st_x[start_col_name] - batch_id_list_x = st_x[batch_col_name] if batch_col_name in st_x else None - cp_arrays = pylibcugraph_uniform_neighbor_sample( - resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()), - input_graph=mg_graph_x, - start_list=start_list_x, - label_list=label_list, - label_to_output_comm_rank=label_to_output_comm_rank, - h_fan_out=fanout_vals, - with_replacement=with_replacement, - do_expensive_check=False, - with_edge_properties=with_edge_properties, - batch_id_list=batch_id_list_x, - random_state=random_state, - return_hops=return_hops, - ) - - output = convert_to_cudf( - cp_arrays, weight_t, with_edge_properties, return_offsets=return_offsets - ) - - if isinstance(output, (list, tuple)) and len(output) == 1: - return output[0] - return output - - -def _mg_call_plc_uniform_neighbor_sample_legacy( - client, - session_id, - input_graph, - ddf, - label_list, - label_to_output_comm_rank, - fanout_vals, - with_replacement, - weight_t, - indices_t, - with_edge_properties, - random_state, - return_offsets=False, - return_hops=True, -): - result = [ - client.submit( - _call_plc_uniform_neighbor_sample_legacy, - session_id, - input_graph._plc_graph[w], - ddf[w][0], - label_list, - label_to_output_comm_rank, - fanout_vals, - with_replacement, - weight_t=weight_t, - with_edge_properties=with_edge_properties, - # FIXME accept and properly transmute a numpy/cupy random state. - random_state=hash((random_state, i)), - workers=[w], - allow_other_workers=False, - pure=False, - return_offsets=return_offsets, - return_hops=return_hops, - ) - for i, w in enumerate(Comms.get_workers()) - ] - - empty_df = ( - create_empty_df_with_edge_props( - indices_t, weight_t, return_offsets=return_offsets - ) - if with_edge_properties - else create_empty_df(indices_t, weight_t) - ) - - if return_offsets: - result = [delayed(lambda x: x, nout=2)(r) for r in result] - ddf = dask_cudf.from_delayed( - [r[0] for r in result], meta=empty_df[0], verify_meta=False - ).persist() - ddf_offsets = dask_cudf.from_delayed( - [r[1] for r in result], meta=empty_df[1], verify_meta=False - ).persist() - wait(ddf) - wait(ddf_offsets) - wait([r.release() for r in result]) - return ddf, ddf_offsets - else: - ddf = dask_cudf.from_delayed(result, meta=empty_df, verify_meta=False).persist() - wait(ddf) - wait([r.release() for r in result]) - return ddf - - def _mg_call_plc_uniform_neighbor_sample( client, session_id, @@ -501,170 +395,12 @@ def _mg_call_plc_uniform_neighbor_sample( return tuple(return_dfs) -def _uniform_neighbor_sample_legacy( - input_graph: Graph, - start_list: Sequence, - fanout_vals: List[int], - with_replacement: bool = True, - with_edge_properties: bool = False, - batch_id_list: Sequence = None, - label_list: Sequence = None, - label_to_output_comm_rank: bool = None, - random_state: int = None, - return_offsets: bool = False, - return_hops: bool = False, - _multiple_clients: bool = False, -) -> Union[dask_cudf.DataFrame, Tuple[dask_cudf.DataFrame, dask_cudf.DataFrame]]: - warnings.warn( - "The batch_id_list, label_list, and label_to_output_comm_rank " - "parameters are deprecated. Consider using with_batch_ids, " - "keep_batches_together, min_batch_id, and max_batch_id instead." - ) - - if isinstance(start_list, int): - start_list = [start_list] - - if isinstance(start_list, list): - start_list = cudf.Series( - start_list, - dtype=input_graph.edgelist.edgelist_df[ - input_graph.renumber_map.renumbered_src_col_name - ].dtype, - ) - - elif with_edge_properties and batch_id_list is None: - batch_id_list = cudf.Series(cp.zeros(len(start_list), dtype="int32")) - - # fanout_vals must be a host array! - # FIXME: ensure other sequence types (eg. cudf Series) can be handled. - if isinstance(fanout_vals, list): - fanout_vals = numpy.asarray(fanout_vals, dtype="int32") - else: - raise TypeError("fanout_vals must be a list, " f"got: {type(fanout_vals)}") - - if "value" in input_graph.edgelist.edgelist_df: - weight_t = input_graph.edgelist.edgelist_df["value"].dtype - else: - weight_t = "float32" - - if "_SRC_" in input_graph.edgelist.edgelist_df: - indices_t = input_graph.edgelist.edgelist_df["_SRC_"].dtype - elif src_n in input_graph.edgelist.edgelist_df: - indices_t = input_graph.edgelist.edgelist_df[src_n].dtype - else: - indices_t = numpy.int32 - - start_list = start_list.rename(start_col_name) - if batch_id_list is not None: - batch_id_list = batch_id_list.rename(batch_col_name) - if hasattr(start_list, "compute"): - # mg input - start_list = start_list.to_frame() - batch_id_list = batch_id_list.to_frame() - ddf = start_list.merge( - batch_id_list, - how="left", - left_index=True, - right_index=True, - ) - else: - # sg input - ddf = cudf.concat( - [ - start_list, - batch_id_list, - ], - axis=1, - ) - else: - ddf = start_list.to_frame() - - if input_graph.renumbered: - ddf = input_graph.lookup_internal_vertex_id(ddf, column_name=start_col_name) - - if hasattr(ddf, "compute"): - ddf = get_distributed_data(ddf) - wait(ddf) - ddf = ddf.worker_to_parts - else: - splits = cp.array_split(cp.arange(len(ddf)), len(Comms.get_workers())) - ddf = {w: [ddf.iloc[splits[i]]] for i, w in enumerate(Comms.get_workers())} - - client = get_client() - session_id = Comms.get_session_id() - if _multiple_clients: - # Distributed centralized lock to allow - # two disconnected processes (clients) to coordinate a lock - # https://docs.dask.org/en/stable/futures.html?highlight=lock#distributed.Lock - lock = Lock("plc_graph_access") - if lock.acquire(timeout=100): - try: - ddf = _mg_call_plc_uniform_neighbor_sample_legacy( - client=client, - session_id=session_id, - input_graph=input_graph, - ddf=ddf, - label_list=label_list, - label_to_output_comm_rank=label_to_output_comm_rank, - fanout_vals=fanout_vals, - with_replacement=with_replacement, - weight_t=weight_t, - indices_t=indices_t, - with_edge_properties=with_edge_properties, - random_state=random_state, - return_offsets=return_offsets, - return_hops=return_hops, - ) - finally: - lock.release() - else: - raise RuntimeError( - "Failed to acquire lock(plc_graph_access) while trying to sampling" - ) - else: - ddf = _mg_call_plc_uniform_neighbor_sample_legacy( - client=client, - session_id=session_id, - input_graph=input_graph, - ddf=ddf, - label_list=label_list, - label_to_output_comm_rank=label_to_output_comm_rank, - fanout_vals=fanout_vals, - with_replacement=with_replacement, - weight_t=weight_t, - indices_t=indices_t, - with_edge_properties=with_edge_properties, - random_state=random_state, - return_offsets=return_offsets, - return_hops=return_hops, - ) - - if return_offsets: - ddf, offsets_ddf = ddf - if input_graph.renumbered: - ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True) - ddf = input_graph.unrenumber(ddf, "destinations", preserve_order=True) - - if return_offsets: - return ddf, offsets_ddf - - return ddf - - -uniform_neighbor_sample_legacy = deprecated_warning_wrapper( - _uniform_neighbor_sample_legacy -) - - def uniform_neighbor_sample( input_graph: Graph, start_list: Sequence, fanout_vals: List[int], with_replacement: bool = True, - with_edge_properties: bool = False, - batch_id_list: Sequence = None, # deprecated - label_list: Sequence = None, # deprecated - label_to_output_comm_rank: bool = None, # deprecated + with_edge_properties: bool = False, # deprecated with_batch_ids: bool = False, keep_batches_together=False, min_batch_id=None, @@ -698,27 +434,10 @@ def uniform_neighbor_sample( Flag to specify if the random sampling is done with replacement with_edge_properties: bool, optional (default=False) + Deprecated. Flag to specify whether to return edge properties (weight, edge id, edge type, batch id, hop id) with the sampled edges. - batch_id_list: cudf.Series or dask_cudf.Series (int32), optional (default=None) - Deprecated. - List of batch ids that will be returned with the sampled edges if - with_edge_properties is set to True. - - label_list: cudf.Series or dask_cudf.Series (int32), optional (default=None) - Deprecated. - List of unique batch id labels. Used along with - label_to_output_comm_rank to assign batch ids to GPUs. - - label_to_out_comm_rank: cudf.Series or dask_cudf.Series (int32), - optional (default=None) - Deprecated. - List of output GPUs (by rank) corresponding to batch - id labels in the label list. Used to assign each batch - id to a GPU. - Must be in ascending order (i.e. [0, 0, 1, 2]). - with_batch_ids: bool, optional (default=False) Flag to specify whether batch ids are present in the start_list @@ -831,41 +550,6 @@ def uniform_neighbor_sample( Contains the batch offsets for the renumber maps """ - if ( - batch_id_list is not None - or label_list is not None - or label_to_output_comm_rank is not None - ): - if prior_sources_behavior or deduplicate_sources: - raise ValueError( - "unique sources, carry_over_sources, and deduplicate_sources" - " are not supported with batch_id_list, label_list, and" - " label_to_output_comm_rank. Consider using with_batch_ids" - " and keep_batches_together instead." - ) - - if renumber: - raise ValueError( - "renumber is not supported with batch_id_list, label_list, " - "and label_to_output_comm_rank. Consider using " - "with_batch_ids and keep_batches_together instead." - ) - - return uniform_neighbor_sample_legacy( - input_graph, - start_list, - fanout_vals, - with_replacement=with_replacement, - with_edge_properties=with_edge_properties, - batch_id_list=batch_id_list, - label_list=label_list, - label_to_output_comm_rank=label_to_output_comm_rank, - random_state=random_state, - return_offsets=return_offsets, - return_hops=return_hops, - _multiple_clients=_multiple_clients, - ) - if isinstance(start_list, int): start_list = [start_list] @@ -906,12 +590,17 @@ def uniform_neighbor_sample( "when performing renumbering." ) - # fanout_vals must be a host array! - # FIXME: ensure other sequence types (eg. cudf Series) can be handled. - if isinstance(fanout_vals, list): + # fanout_vals must be passed to pylibcugraph as a host array + if isinstance(fanout_vals, numpy.ndarray): + fanout_vals = fanout_vals.astype('int32') + elif isinstance(fanout_vals, list): fanout_vals = numpy.asarray(fanout_vals, dtype="int32") + elif isinstance(fanout_vals, cp.ndarray): + fanout_vals = fanout_vals.get().astype('int32') + elif isinstance(fanout_vals, cudf.Series): + fanout_vals = fanout_vals.values_host.astype('int32') else: - raise TypeError("fanout_vals must be a list, " f"got: {type(fanout_vals)}") + raise TypeError("fanout_vals must be a sequence, " f"got: {type(fanout_vals)}") if "value" in input_graph.edgelist.edgelist_df: weight_t = input_graph.edgelist.edgelist_df["value"].dtype diff --git a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py index 96f40090a34..a6e6c022881 100644 --- a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py @@ -55,143 +55,12 @@ def ensure_valid_dtype(input_graph, start_list): return start_list -def _uniform_neighbor_sample_legacy( - G: Graph, - start_list: Sequence, - fanout_vals: List[int], - with_replacement: bool = True, - with_edge_properties: bool = False, - batch_id_list: Sequence = None, - random_state: int = None, - return_offsets: bool = False, - return_hops: bool = True, -) -> Union[cudf.DataFrame, Tuple[cudf.DataFrame, cudf.DataFrame]]: - - warnings.warn( - "The batch_id_list parameter is deprecated. " - "Consider passing a DataFrame where the last column " - "is the batch ids and setting with_batch_ids=True" - ) - - if isinstance(start_list, int): - start_list = [start_list] - - if isinstance(start_list, list): - start_list = cudf.Series( - start_list, dtype=G.edgelist.edgelist_df[G.srcCol].dtype - ) - - if with_edge_properties and batch_id_list is None: - batch_id_list = cp.zeros(len(start_list), dtype="int32") - - # fanout_vals must be a host array! - # FIXME: ensure other sequence types (eg. cudf Series) can be handled. - if isinstance(fanout_vals, list): - fanout_vals = numpy.asarray(fanout_vals, dtype="int32") - else: - raise TypeError("fanout_vals must be a list, " f"got: {type(fanout_vals)}") - - if "weights" in G.edgelist.edgelist_df: - weight_t = G.edgelist.edgelist_df["weights"].dtype - else: - weight_t = "float32" - - start_list = ensure_valid_dtype(G, start_list) - - if G.renumbered is True: - if isinstance(start_list, cudf.DataFrame): - start_list = G.lookup_internal_vertex_id(start_list, start_list.columns) - else: - start_list = G.lookup_internal_vertex_id(start_list) - - sampling_result = pylibcugraph_uniform_neighbor_sample( - resource_handle=ResourceHandle(), - input_graph=G._plc_graph, - start_list=start_list, - h_fan_out=fanout_vals, - with_replacement=with_replacement, - do_expensive_check=False, - with_edge_properties=with_edge_properties, - batch_id_list=batch_id_list, - return_hops=return_hops, - random_state=random_state, - ) - - df = cudf.DataFrame() - - if with_edge_properties: - ( - sources, - destinations, - weights, - edge_ids, - edge_types, - batch_ids, - offsets, - hop_ids, - ) = sampling_result - - df["sources"] = sources - df["destinations"] = destinations - df["weight"] = weights - df["edge_id"] = edge_ids - df["edge_type"] = edge_types - df["hop_id"] = hop_ids - - if return_offsets: - offsets_df = cudf.DataFrame( - { - "batch_id": batch_ids, - "offsets": offsets[:-1], - } - ) - - else: - if len(batch_ids) > 0: - batch_ids = cudf.Series(batch_ids).repeat(cp.diff(offsets)) - batch_ids.reset_index(drop=True, inplace=True) - - df["batch_id"] = batch_ids - - else: - sources, destinations, indices = sampling_result - - df["sources"] = sources - df["destinations"] = destinations - - if indices is None: - df["indices"] = None - else: - df["indices"] = indices - if weight_t == "int32": - df["indices"] = indices.astype("int32") - elif weight_t == "int64": - df["indices"] = indices.astype("int64") - else: - df["indices"] = indices - - if G.renumbered: - df = G.unrenumber(df, "sources", preserve_order=True) - df = G.unrenumber(df, "destinations", preserve_order=True) - - if return_offsets: - return df, offsets_df - - return df - - -uniform_neighbor_sample_legacy = deprecated_warning_wrapper( - _uniform_neighbor_sample_legacy -) - - def uniform_neighbor_sample( G: Graph, start_list: Sequence, fanout_vals: List[int], with_replacement: bool = True, - with_edge_properties: bool = False, - batch_id_list: Sequence = None, # deprecated + with_edge_properties: bool = False, # deprecated with_batch_ids: bool = False, random_state: int = None, return_offsets: bool = False, @@ -221,14 +90,10 @@ def uniform_neighbor_sample( Flag to specify if the random sampling is done with replacement with_edge_properties: bool, optional (default=False) + Deprecated. Flag to specify whether to return edge properties (weight, edge id, edge type, batch id, hop id) with the sampled edges. - batch_id_list: list (int32) - Deprecated. - List of batch ids that will be returned with the sampled edges if - with_edge_properties is set to True. - with_batch_ids: bool, optional (default=False) Flag to specify whether batch ids are present in the start_list Assumes they are the last column in the start_list dataframe @@ -329,29 +194,12 @@ def uniform_neighbor_sample( Contains the batch offsets for the renumber maps """ - if batch_id_list is not None: - if prior_sources_behavior or deduplicate_sources: - raise ValueError( - "prior_sources_behavior and deduplicate_sources" - " are not supported with batch_id_list." - " Consider using with_batch_ids instead." - ) - if renumber: - raise ValueError( - "renumber is not supported with batch_id_list." - " Consider using with_batch_ids instead." - ) - return uniform_neighbor_sample_legacy( - G, - start_list, - fanout_vals, - with_replacement=with_replacement, - with_edge_properties=with_edge_properties, - batch_id_list=batch_id_list, - random_state=random_state, - return_offsets=return_offsets, - return_hops=return_hops, + if with_edge_properties: + warning_msg = ( + "The with_edge_properties flag is deprecated" + " and will be removed in the next release." ) + warnings.warn(warning_msg, DeprecationWarning) if isinstance(start_list, int): start_list = [start_list] @@ -369,12 +217,17 @@ def uniform_neighbor_sample( cp.zeros(len(start_list), dtype="int32") ) - # fanout_vals must be a host array! - # FIXME: ensure other sequence types (eg. cudf Series) can be handled. - if isinstance(fanout_vals, list): + # fanout_vals must be passed to pylibcugraph as a host array + if isinstance(fanout_vals, numpy.ndarray): + fanout_vals = fanout_vals.astype('int32') + elif isinstance(fanout_vals, list): fanout_vals = numpy.asarray(fanout_vals, dtype="int32") + elif isinstance(fanout_vals, cp.ndarray): + fanout_vals = fanout_vals.get().astype('int32') + elif isinstance(fanout_vals, cudf.Series): + fanout_vals = fanout_vals.values_host.astype('int32') else: - raise TypeError("fanout_vals must be a list, " f"got: {type(fanout_vals)}") + raise TypeError("fanout_vals must be a sequence, " f"got: {type(fanout_vals)}") if "weights" in G.edgelist.edgelist_df: weight_t = G.edgelist.edgelist_df["weights"].dtype diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py index 2690ab88c13..532ac5d9eeb 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py @@ -136,7 +136,7 @@ def __from_edgelist( warning_msg = ( "The parameter 'legacy_renum_only' is deprecated and will be removed." ) - warnings.warn(warning_msg, DeprecationWarning) + warnings.warn(warning_msg, ) # Verify column names present in input DataFrame s_col = source From bc2d3724256e0e0ecfb7b1dd9531d594bc1cec30 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Tue, 22 Aug 2023 08:06:41 -0700 Subject: [PATCH 2/8] add deprecation warning for with_edge_properties --- .../cugraph/dask/sampling/uniform_neighbor_sample.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py index f48d2c25263..a63eb2834e4 100644 --- a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py @@ -550,6 +550,13 @@ def uniform_neighbor_sample( Contains the batch offsets for the renumber maps """ + if with_edge_properties: + warning_msg = ( + "The with_edge_properties flag is deprecated" + " and will be removed in the next release." + ) + warnings.warn(warning_msg, DeprecationWarning) + if isinstance(start_list, int): start_list = [start_list] From 6cac7628dd69dac43adee2a8654638b4e64914b8 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Tue, 22 Aug 2023 08:07:31 -0700 Subject: [PATCH 3/8] fix style --- .../cugraph/dask/sampling/uniform_neighbor_sample.py | 10 ++++------ .../cugraph/sampling/uniform_neighbor_sample.py | 9 ++++----- .../structure/graph_implementation/simpleGraph.py | 4 +++- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py index a63eb2834e4..9e50169b4a7 100644 --- a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py @@ -27,10 +27,8 @@ from pylibcugraph import ResourceHandle from pylibcugraph import uniform_neighbor_sample as pylibcugraph_uniform_neighbor_sample -from pylibcugraph.utilities.api_tools import deprecated_warning_wrapper from cugraph.dask.comms import comms as Comms -from cugraph.dask.common.input_utils import get_distributed_data from cugraph.dask import get_n_workers from typing import Sequence, List, Union, Tuple @@ -400,7 +398,7 @@ def uniform_neighbor_sample( start_list: Sequence, fanout_vals: List[int], with_replacement: bool = True, - with_edge_properties: bool = False, # deprecated + with_edge_properties: bool = False, # deprecated with_batch_ids: bool = False, keep_batches_together=False, min_batch_id=None, @@ -599,13 +597,13 @@ def uniform_neighbor_sample( # fanout_vals must be passed to pylibcugraph as a host array if isinstance(fanout_vals, numpy.ndarray): - fanout_vals = fanout_vals.astype('int32') + fanout_vals = fanout_vals.astype("int32") elif isinstance(fanout_vals, list): fanout_vals = numpy.asarray(fanout_vals, dtype="int32") elif isinstance(fanout_vals, cp.ndarray): - fanout_vals = fanout_vals.get().astype('int32') + fanout_vals = fanout_vals.get().astype("int32") elif isinstance(fanout_vals, cudf.Series): - fanout_vals = fanout_vals.values_host.astype('int32') + fanout_vals = fanout_vals.values_host.astype("int32") else: raise TypeError("fanout_vals must be a sequence, " f"got: {type(fanout_vals)}") diff --git a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py index a6e6c022881..219854bb002 100644 --- a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py @@ -15,7 +15,6 @@ from pylibcugraph import ResourceHandle from pylibcugraph import uniform_neighbor_sample as pylibcugraph_uniform_neighbor_sample -from pylibcugraph.utilities.api_tools import deprecated_warning_wrapper import numpy @@ -60,7 +59,7 @@ def uniform_neighbor_sample( start_list: Sequence, fanout_vals: List[int], with_replacement: bool = True, - with_edge_properties: bool = False, # deprecated + with_edge_properties: bool = False, # deprecated with_batch_ids: bool = False, random_state: int = None, return_offsets: bool = False, @@ -219,13 +218,13 @@ def uniform_neighbor_sample( # fanout_vals must be passed to pylibcugraph as a host array if isinstance(fanout_vals, numpy.ndarray): - fanout_vals = fanout_vals.astype('int32') + fanout_vals = fanout_vals.astype("int32") elif isinstance(fanout_vals, list): fanout_vals = numpy.asarray(fanout_vals, dtype="int32") elif isinstance(fanout_vals, cp.ndarray): - fanout_vals = fanout_vals.get().astype('int32') + fanout_vals = fanout_vals.get().astype("int32") elif isinstance(fanout_vals, cudf.Series): - fanout_vals = fanout_vals.values_host.astype('int32') + fanout_vals = fanout_vals.values_host.astype("int32") else: raise TypeError("fanout_vals must be a sequence, " f"got: {type(fanout_vals)}") diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py index 532ac5d9eeb..2b23d3a26b7 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py @@ -136,7 +136,9 @@ def __from_edgelist( warning_msg = ( "The parameter 'legacy_renum_only' is deprecated and will be removed." ) - warnings.warn(warning_msg, ) + warnings.warn( + warning_msg, + ) # Verify column names present in input DataFrame s_col = source From ac3d0d037bac6548c8eadcaea1c66b89118a7f50 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Wed, 23 Aug 2023 07:00:43 -0700 Subject: [PATCH 4/8] fix pytests --- .../tests/mg/test_mg_cugraph_sampler.py | 18 ++++++++++++++---- .../cugraph_pyg/tests/test_cugraph_sampler.py | 17 +++++++++++++---- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py index 93687c4a107..96e153df51b 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py @@ -33,14 +33,19 @@ def test_neighbor_sample(dask_client, basic_graph_1): F, G, N = basic_graph_1 cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) + batches = cudf.DataFrame({ + 'start': cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + 'batch': cudf.Series(cupy.zeros(5, dtype="int32")), + }) + sampling_results = ( uniform_neighbor_sample( cugraph_store._subgraph(), - cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + batches, + with_batch_ids=True, fanout_vals=[-1], with_replacement=False, with_edge_properties=True, - batch_id_list=cudf.Series(cupy.zeros(5, dtype="int32")), random_state=62, return_offsets=False, return_hops=True, @@ -90,16 +95,21 @@ def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph F, G, N = multi_edge_multi_vertex_graph_1 cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) + batches = cudf.DataFrame({ + 'start': cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + 'batches': cudf.Series(cupy.zeros(5, dtype="int32")), + }) + sampling_results = ( uniform_neighbor_sample( cugraph_store._subgraph(), - cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + batches, fanout_vals=[-1], with_replacement=False, with_edge_properties=True, - batch_id_list=cudf.Series(cupy.zeros(5, dtype="int32")), random_state=62, return_offsets=False, + with_batch_ids=True, ) .sort_values(by=["sources", "destinations"]) .compute() diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py index c1949f495e4..7e9cc024a07 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py @@ -31,13 +31,18 @@ def test_neighbor_sample(basic_graph_1): F, G, N = basic_graph_1 cugraph_store = CuGraphStore(F, G, N) + batches = cudf.DataFrame({ + 'start': cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + 'batch': cudf.Series(cupy.zeros(5, dtype="int32")), + }) + sampling_results = uniform_neighbor_sample( cugraph_store._subgraph(), - cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + batches, fanout_vals=[-1], with_replacement=False, with_edge_properties=True, - batch_id_list=cudf.Series(cupy.zeros(5, dtype="int32")), + with_batch_ids=True, random_state=62, return_offsets=False, ).sort_values(by=["sources", "destinations"]) @@ -82,13 +87,17 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): F, G, N = multi_edge_multi_vertex_graph_1 cugraph_store = CuGraphStore(F, G, N) + batches = cudf.DataFrame({ + 'start': cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + 'batch': cudf.Series(cupy.zeros(5, dtype="int32")), + }) + sampling_results = uniform_neighbor_sample( cugraph_store._subgraph(), - cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + batches, fanout_vals=[-1], with_replacement=False, with_edge_properties=True, - batch_id_list=cudf.Series(cupy.zeros(5, dtype="int32")), random_state=62, return_offsets=False, ).sort_values(by=["sources", "destinations"]) From 567ea66c28acf19127ff6278380f952ec3c7491b Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Wed, 23 Aug 2023 07:02:51 -0700 Subject: [PATCH 5/8] add with_batch_ids option --- python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py index 7e9cc024a07..5dd39804c0a 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py @@ -100,6 +100,7 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): with_edge_properties=True, random_state=62, return_offsets=False, + with_batch_ids=True, ).sort_values(by=["sources", "destinations"]) out = _sampler_output_from_sampling_results( From b540997b9bd1877a0e2defdfefb5ad331b32c6fd Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Wed, 23 Aug 2023 07:03:26 -0700 Subject: [PATCH 6/8] fix style --- .../tests/mg/test_mg_cugraph_sampler.py | 20 +++++++++++-------- .../cugraph_pyg/tests/test_cugraph_sampler.py | 20 +++++++++++-------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py index 96e153df51b..550852a3303 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py @@ -33,10 +33,12 @@ def test_neighbor_sample(dask_client, basic_graph_1): F, G, N = basic_graph_1 cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) - batches = cudf.DataFrame({ - 'start': cudf.Series([0, 1, 2, 3, 4], dtype="int64"), - 'batch': cudf.Series(cupy.zeros(5, dtype="int32")), - }) + batches = cudf.DataFrame( + { + "start": cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + "batch": cudf.Series(cupy.zeros(5, dtype="int32")), + } + ) sampling_results = ( uniform_neighbor_sample( @@ -95,10 +97,12 @@ def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph F, G, N = multi_edge_multi_vertex_graph_1 cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) - batches = cudf.DataFrame({ - 'start': cudf.Series([0, 1, 2, 3, 4], dtype="int64"), - 'batches': cudf.Series(cupy.zeros(5, dtype="int32")), - }) + batches = cudf.DataFrame( + { + "start": cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + "batches": cudf.Series(cupy.zeros(5, dtype="int32")), + } + ) sampling_results = ( uniform_neighbor_sample( diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py index 5dd39804c0a..08a8625b33b 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py @@ -31,10 +31,12 @@ def test_neighbor_sample(basic_graph_1): F, G, N = basic_graph_1 cugraph_store = CuGraphStore(F, G, N) - batches = cudf.DataFrame({ - 'start': cudf.Series([0, 1, 2, 3, 4], dtype="int64"), - 'batch': cudf.Series(cupy.zeros(5, dtype="int32")), - }) + batches = cudf.DataFrame( + { + "start": cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + "batch": cudf.Series(cupy.zeros(5, dtype="int32")), + } + ) sampling_results = uniform_neighbor_sample( cugraph_store._subgraph(), @@ -87,10 +89,12 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): F, G, N = multi_edge_multi_vertex_graph_1 cugraph_store = CuGraphStore(F, G, N) - batches = cudf.DataFrame({ - 'start': cudf.Series([0, 1, 2, 3, 4], dtype="int64"), - 'batch': cudf.Series(cupy.zeros(5, dtype="int32")), - }) + batches = cudf.DataFrame( + { + "start": cudf.Series([0, 1, 2, 3, 4], dtype="int64"), + "batch": cudf.Series(cupy.zeros(5, dtype="int32")), + } + ) sampling_results = uniform_neighbor_sample( cugraph_store._subgraph(), From 47c3e4b0a455b87678a08218ddc6cbcd0fe0251e Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Wed, 23 Aug 2023 17:52:32 -0700 Subject: [PATCH 7/8] add whitespace --- python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py index e3eb4a85a85..f82721690ad 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py @@ -367,6 +367,7 @@ def __construct_graph( ------- A newly-constructed directed cugraph.MultiGraph object. """ + # Ensure the original dict is not modified. edge_info_cg = {} From 84ce3c93347b9825051d6d6ea099ffb0c5ee9140 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Thu, 24 Aug 2023 14:28:30 -0700 Subject: [PATCH 8/8] fix style --- python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py index f82721690ad..8d5d2fd4894 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py @@ -367,7 +367,7 @@ def __construct_graph( ------- A newly-constructed directed cugraph.MultiGraph object. """ - + # Ensure the original dict is not modified. edge_info_cg = {}