Skip to content

Commit 052e82e

Browse files
REFACTOR-#7427: Track engine and storage format at the query compiler level.
Signed-off-by: sfc-gh-mvashishtha <[email protected]>
1 parent 02515bb commit 052e82e

File tree

23 files changed

+210 additions, -92 deletions (lines changed)

23 files changed

+210 additions, -92 deletions (lines changed)

modin/conftest.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -165,6 +165,9 @@ class TestQC(BaseQueryCompiler):
165165
def __init__(self, modin_frame):
166166
self._modin_frame = modin_frame
167167

168+
storage_format = property(lambda self: "Base")
169+
engine = property(lambda self: "Python")
170+
168171
def finalize(self):
169172
self._modin_frame.finalize()
170173

modin/core/dataframe/algebra/binary.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -393,7 +393,7 @@ def caller(
393393
and query_compiler.columns.equals(other.columns)
394394
):
395395
shape_hint = "column"
396-
return query_compiler.__constructor__(
396+
return query_compiler._constructor(
397397
query_compiler._modin_frame.broadcast_apply(
398398
axis,
399399
lambda left, right: func(
@@ -417,7 +417,7 @@ def caller(
417417
and query_compiler.columns.equals(other.columns)
418418
):
419419
shape_hint = "column"
420-
return query_compiler.__constructor__(
420+
return query_compiler._constructor(
421421
query_compiler._modin_frame.n_ary_op(
422422
lambda x, y: func(x, y, *args, **kwargs),
423423
[other._modin_frame],
@@ -453,7 +453,7 @@ def caller(
453453
dtypes=dtypes,
454454
lazy=True,
455455
)
456-
return query_compiler.__constructor__(
456+
return query_compiler._constructor(
457457
new_modin_frame, shape_hint=shape_hint
458458
)
459459

modin/core/dataframe/algebra/fold.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,7 @@ def caller(
8282
A new query compiler representing the result of executing the
8383
function.
8484
"""
85-
return query_compiler.__constructor__(
85+
return query_compiler._constructor(
8686
query_compiler._modin_frame.fold(
8787
cls.validate_axis(fold_axis),
8888
lambda x: fold_function(x, *args, **kwargs),

modin/core/dataframe/algebra/groupby.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -446,7 +446,7 @@ def caller(
446446
new_index=new_index,
447447
)
448448

449-
result = query_compiler.__constructor__(new_modin_frame)
449+
result = query_compiler._constructor(new_modin_frame)
450450
return result
451451

452452
@classmethod

modin/core/dataframe/algebra/map.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ def caller(
6060
) -> PandasQueryCompiler:
6161
"""Execute Map function against passed query compiler."""
6262
shape_hint = call_kwds.pop("shape_hint", None) or query_compiler._shape_hint
63-
return query_compiler.__constructor__(
63+
return query_compiler._constructor(
6464
query_compiler._modin_frame.map(
6565
lambda x: function(x, *args, **kwargs), *call_args, **call_kwds
6666
),

modin/core/dataframe/algebra/reduce.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ def caller(
6060
) -> PandasQueryCompiler:
6161
"""Execute Reduce function against passed query compiler."""
6262
_axis = kwargs.get("axis") if axis is None else axis
63-
return query_compiler.__constructor__(
63+
return query_compiler._constructor(
6464
query_compiler._modin_frame.reduce(
6565
cls.validate_axis(_axis),
6666
lambda x: reduce_function(x, *args, **kwargs),

modin/core/dataframe/algebra/tree_reduce.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,7 @@ def caller(
7070
if compute_dtypes and query_compiler.frame_has_materialized_dtypes:
7171
new_dtypes = str(compute_dtypes(query_compiler.dtypes, *args, **kwargs))
7272

73-
return query_compiler.__constructor__(
73+
return query_compiler._constructor(
7474
query_compiler._modin_frame.tree_reduce(
7575
cls.validate_axis(_axis),
7676
lambda x: map_function(x, *args, **kwargs),

modin/core/dataframe/pandas/dataframe/dataframe.py

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,6 @@
3333
from pandas.core.indexes.api import Index, RangeIndex
3434

3535
from modin.config import (
36-
Engine,
3736
IsRayCluster,
3837
MinColumnPartitionSize,
3938
MinRowPartitionSize,
@@ -1677,7 +1676,9 @@ def copy(self):
16771676
)
16781677

16791678
@lazy_metadata_decorator(apply_axis="both")
1680-
def astype(self, col_dtypes, errors: str = "raise"):
1679+
def astype(
1680+
self, col_dtypes, engine: str, storage_format: str, errors: str = "raise"
1681+
):
16811682
"""
16821683
Convert the columns dtypes to given dtypes.
16831684
@@ -1707,7 +1708,7 @@ def astype(self, col_dtypes, errors: str = "raise"):
17071708
new_dtypes = self_dtypes.copy()
17081709
# Update the new dtype series to the proper pandas dtype
17091710
new_dtype = pandas.api.types.pandas_dtype(dtype)
1710-
if Engine.get() == "Dask" and hasattr(dtype, "_is_materialized"):
1711+
if engine == "Dask" and hasattr(dtype, "_is_materialized"):
17111712
# FIXME: https://github.com/dask/distributed/issues/8585
17121713
_ = dtype._materialize_categories()
17131714

@@ -1736,7 +1737,7 @@ def astype_builder(df):
17361737
if not (col_dtypes == self_dtypes).all():
17371738
new_dtypes = self_dtypes.copy()
17381739
new_dtype = pandas.api.types.pandas_dtype(col_dtypes)
1739-
if Engine.get() == "Dask" and hasattr(new_dtype, "_is_materialized"):
1740+
if engine == "Dask" and hasattr(new_dtype, "_is_materialized"):
17401741
# FIXME: https://github.com/dask/distributed/issues/8585
17411742
_ = new_dtype._materialize_categories()
17421743
if isinstance(new_dtype, pandas.CategoricalDtype):

modin/core/execution/dispatching/factories/factories.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -629,6 +629,8 @@ def _to_pickle_glob(cls, *args, **kwargs):
629629
**kwargs : kwargs
630630
Arguments to the writer method.
631631
"""
632+
# TODO(https://github.com/modin-project/modin/issues/7429): Use
633+
# frame-level execution instead of the global, default execution.
632634
current_execution = get_current_execution()
633635
if current_execution not in supported_executions:
634636
raise NotImplementedError(

modin/core/execution/modin_aqp.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -91,6 +91,8 @@ def call_progress_bar(result_parts, line_no):
9191

9292
threading.Thread(target=_show_time_updates, args=(progress_bars[pbar_id],)).start()
9393

94+
# TODO(https://github.com/modin-project/modin/issues/7429): Use
95+
# frame-level engine config.
9496
modin_engine = Engine.get()
9597
engine_wrapper = None
9698
if modin_engine == "Ray":

0 commit comments

Comments
 (0)