Skip to content

Add NearestNeighbors SPMD API #2557

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
5 changes: 2 additions & 3 deletions onedal/neighbors/neighbors.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,6 @@ def __init__(
self,
n_neighbors=5,
*,
weights="uniform",
algorithm="auto",
p=2,
metric="minkowski",
Expand All @@ -745,7 +744,7 @@ def __init__(
metric_params=metric_params,
**kwargs,
)
self.weights = weights
self.requires_y = False

@bind_default_backend("neighbors.search")
def train(self, *args, **kwargs): ...
Expand Down Expand Up @@ -792,7 +791,7 @@ def _onedal_predict(self, model, X, params):
return self.infer(params, model, X)

@supports_queue
def fit(self, X, y, queue=None):
def fit(self, X, y=None, queue=None):
return self._fit(X, y)

@supports_queue
Expand Down
4 changes: 2 additions & 2 deletions onedal/spmd/neighbors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
# limitations under the License.
# ==============================================================================

from .neighbors import KNeighborsClassifier, KNeighborsRegressor
from .neighbors import KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors

__all__ = ["KNeighborsClassifier", "KNeighborsRegressor"]
__all__ = ["KNeighborsClassifier", "KNeighborsRegressor", "NearestNeighbors"]
18 changes: 18 additions & 0 deletions onedal/spmd/neighbors/neighbors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ...common._backend import bind_spmd_backend
from ...neighbors import KNeighborsClassifier as KNeighborsClassifier_Batch
from ...neighbors import KNeighborsRegressor as KNeighborsRegressor_Batch
from ...neighbors import NearestNeighbors as NearestNeighbors_Batch


class KNeighborsClassifier(KNeighborsClassifier_Batch):
Expand Down Expand Up @@ -84,3 +85,20 @@ def _get_onedal_params(self, X, y=None):
if "responses" not in params["result_option"]:
params["result_option"] += "|responses"
return params


class NearestNeighbors(NearestNeighbors_Batch):

@bind_spmd_backend("neighbors.search")
def train(self, *args, **kwargs): ...

@bind_spmd_backend("neighbors.search")
def infer(self, *args, **kwargs): ...

@support_input_format
def fit(self, X, y=None, queue=None):
return super().fit(X, y, queue=queue)

@support_input_format
def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None):
return super().kneighbors(X, n_neighbors, return_distance, queue=queue)
8 changes: 6 additions & 2 deletions sklearnex/spmd/neighbors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
# limitations under the License.
# ==============================================================================

from onedal.spmd.neighbors import KNeighborsClassifier, KNeighborsRegressor
from onedal.spmd.neighbors import (
KNeighborsClassifier,
KNeighborsRegressor,
NearestNeighbors,
)

__all__ = ["KNeighborsClassifier", "KNeighborsRegressor"]
__all__ = ["KNeighborsClassifier", "KNeighborsRegressor", "NearestNeighbors"]
81 changes: 81 additions & 0 deletions sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
_assert_unordered_allclose,
_generate_classification_data,
_generate_regression_data,
_generate_statistic_data,
_get_local_tensor,
_mpi_libs_and_gpu_available,
_spmd_assert_allclose,
Expand Down Expand Up @@ -308,3 +309,83 @@ def test_knnreg_spmd_synthetic(
spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol
)
_spmd_assert_allclose(spmd_result, batch_result, rtol=tol, atol=tol)


@pytest.mark.skipif(
not _mpi_libs_and_gpu_available,
reason="GPU device and MPI libs required for test",
)
@pytest.mark.parametrize(
"dataframe,queue",
get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"),
)
@pytest.mark.mpi
def test_knnsearch_spmd_gold(dataframe, queue):
# Import spmd and batch algo
from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch
from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD

# Create gold data and convert to dataframe
X_train = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])
local_dpt_X_train = _convert_to_dataframe(
_get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe
)

# Ensure predictions of batch algo match spmd
spmd_model = NearestNeighbors_SPMD(n_neighbors=2, algorithm="brute").fit(
local_dpt_X_train
)
batch_model = NearestNeighbors_Batch(n_neighbors=2, algorithm="brute").fit(X_train)
spmd_dists, spmd_indcs = spmd_model.kneighbors(local_dpt_X_train)
batch_dists, batch_indcs = batch_model.kneighbors(X_train)

_assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True)
_assert_unordered_allclose(spmd_dists, batch_dists, localize=True)


@pytest.mark.skipif(
not _mpi_libs_and_gpu_available,
reason="GPU device and MPI libs required for test",
)
@pytest.mark.parametrize(
"dimensions", [{"n": 100, "m": 10, "k": 2}, {"n": 100000, "m": 100, "k": 100}]
)
@pytest.mark.parametrize(
"dataframe,queue",
get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"),
)
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.mpi
def test_knnsearch_spmd_synthetic(
dimensions,
dataframe,
queue,
dtype,
):
if dimensions["n"] > 10000 and dtype == np.float32:
pytest.skip("Skipping large float32 test due to expected precision issues")

# Import spmd and batch algo
from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch
from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD

# Generate data and convert to dataframe
X_train = _generate_statistic_data(dimensions["n"], dimensions["m"], dtype=dtype)

local_dpt_X_train = _convert_to_dataframe(
_get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe
)

# Ensure search results of batch algo match spmd
spmd_model = NearestNeighbors_SPMD(
n_neighbors=dimensions["k"], algorithm="brute"
).fit(local_dpt_X_train)
batch_model = NearestNeighbors_Batch(
n_neighbors=dimensions["k"], algorithm="brute"
).fit(X_train)
spmd_dists, spmd_indcs = spmd_model.kneighbors(local_dpt_X_train)
batch_dists, batch_indcs = batch_model.kneighbors(X_train)

tol = 0.005 if dtype == np.float32 else 1e-6
_assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True)
_assert_unordered_allclose(spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol)
Loading