
Conversation

@dancingactor

Description

  • Increased workload: dummy_map_batches_sleep(0.01) -> dummy_map_batches_sleep(0.03)
  • Added a tolerance rate of 0.9 to the throughput assertions (see the sketch below)
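A minimal sketch of how such a tolerance works (the constant and helper names below are illustrative assumptions, not the exact code in this PR):

# Sketch only: TOLERANCE_RATE and this helper are illustrative, not the
# exact names used in the PR.
TOLERANCE_RATE = 0.9

def assert_throughput_within_tolerance(ray_data_throughput: float, single_node_estimate: float) -> None:
    # Instead of requiring Ray Data throughput to strictly exceed the
    # single-node estimate, allow it to fall short by up to 10%.
    assert ray_data_throughput >= TOLERANCE_RATE * single_node_estimate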

Related issues

Closes #58565

Additional information

Verification

Run the following script to verify that test_dataset_throughput passes consistently after this modification:

#!/bin/bash
pass=0
fail=0

echo "Starting 20 test runs..."

for i in {1..20}; do
  echo "Run $i ..."
  if pytest -s python/ray/data/tests/test_stats.py::test_dataset_throughput -xvs > /dev/null 2>&1; then
    echo "  Result: PASS"
    ((pass++))
  else
    echo "  Result: FAIL"
    ((fail++))
  fi
done

echo "Finished."
echo "Passed: $pass, Failed: $fail"
[Screenshot: output of the 20 verification runs, 2025-11-17]

Benchmark

The following statistics show the performance before and after the change. Execution time increases slightly, but the results are now more deterministic.

Before (dummy_map_batches_sleep(0.01), no tolerance)

  • Operator 1

    • Ray Data throughput: 72.72917246615148 rows/s
    • Estimated single node throughput: 90.58423664907902 rows/s
    • Execution time: 1.37s
  • Operator 2

    • Ray Data throughput: 73.90465679066125 rows/s
    • Estimated single node throughput: 91.43369873780887 rows/s
    • Execution time: 1.35s
  • Dataset

    • Ray Data throughput: 50.12984150299628 rows/s
    • Estimated single node throughput: 45.50349275504925 rows/s

After (dummy_map_batches_sleep(0.03), tolerance rate 0.9)

  • Operator 1

    • Ray Data throughput: 46.21329599716785 rows/s
    • Estimated single node throughput: 32.232319387714035 rows/s
    • Execution time: 2.16s
  • Operator 2

    • Ray Data throughput: 42.99239308718863 rows/s
    • Estimated single node throughput: 32.23841945286399 rows/s
    • Execution time: 2.33s
  • Dataset

    • Ray Data throughput: 25.102150849555628 rows/s
    • Estimated single node throughput: 16.117684565851157 rows/s

@dancingactor dancingactor requested a review from a team as a code owner November 17, 2025 04:10
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request aims to make test_dataset_throughput deterministic by increasing the workload and introducing a tolerance for the throughput assertions. The changes look good and should help improve the test's stability. I've added a couple of minor style suggestions to align a new variable name with PEP 8 conventions.

@dancingactor dancingactor force-pushed the master branch 2 times, most recently from 08f8c78 to 7f86e78 on November 17, 2025 05:17
ray.init(num_cpus=2)

- f = dummy_map_batches_sleep(0.01)
+ f = dummy_map_batches_sleep(0.03)
Author

@dancingactor dancingactor Nov 17, 2025


A shorter sleep time is better because it reduces the execution time. However, we chose 0.03 instead of 0.02 because using 0.02 resulted in 1 failure across 20 test runs.

@dancingactor
Author

@owenowenisme @bveeramani PTAL, thanks!

Member

@bveeramani bveeramani left a comment


This PR decreases the likelihood that this test fails, but ultimately, the test still relies on nondeterministic timing. It's also brittle because it uses regexes that can break with minor formatting changes.

Rather than adjusting the parameters, could you rewrite this test as a unit test?

Separately, I realized that the "per node" throughputs actually represent the per-task throughput. Based on this, I think we should:

  1. Remove the "per node throughput" for the "Dataset throughput" section, because the average per-task throughput across all operators isn't really useful, and
  2. Rename "per node throughput" to "per task throughput" in the "Operator throughput" sections

The tests could look something like this:

def test_dataset_throughput_calculation():
    """Test throughput calculations using mock block stats."""
    from ray.data._internal.stats import DatasetStats
    from ray.data.block import BlockStats, BlockExecStats

    # Helper to create minimal block stats with only timing fields
    def create_block_stats(start_time, end_time, num_rows):
        exec_stats = BlockExecStats()
        exec_stats.start_time_s = start_time
        exec_stats.end_time_s = end_time
        exec_stats.wall_time_s = end_time - start_time

        return BlockStats(
            num_rows=num_rows,
            size_bytes=None,
            exec_stats=exec_stats
        )

    # Simulate 3 overlapping blocks
    blocks = [
        create_block_stats(0.0, 2.0, 100),
        create_block_stats(0.5, 2.5, 100),
        create_block_stats(1.0, 3.0, 100),
    ]

    stats = DatasetStats(metadata={"Map": blocks}, parent=None)
    summary = stats.to_summary()

    # Throughput: total rows / total execution duration
    # Total rows = 300
    # Duration = max end_time - min start_time = 3.0s
    # 300 rows / 3s = 100 rows/s
    # TODO: You'll need to expose this as a property so that it's testable.
    assert summary.num_rows_per_s == 100

def test_operator_throughput_calculation():
    ...  # A similar unit test. You might need to do some refactoring.

    # summary is a OperatorStatsSummary here, not DatasetStatsSummary
    # TODO: You'll need to similarly expose this property.
    assert summary.num_rows_per_s == 100
    assert summary.num_rows_per_task_s == 100
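
For illustration, the property that the TODOs above refer to might look roughly like this; the internal attributes are assumptions for the sketch, not Ray's actual implementation:

# Hypothetical sketch of exposing dataset throughput as a testable property.
# The internals here are illustrative; Ray's real summary class aggregates
# per-operator stats rather than storing these three scalars.
class DatasetStatsSummary:
    def __init__(self, total_output_rows: int, start_time_s: float, end_time_s: float):
        self._total_output_rows = total_output_rows
        self._start_time_s = start_time_s
        self._end_time_s = end_time_s

    @property
    def num_rows_per_s(self) -> float:
        # Total rows divided by the wall-clock span across all blocks
        # (max block end time minus min block start time).
        return self._total_output_rows / (self._end_time_s - self._start_time_s)

# With the mock blocks above: 300 rows over a 3.0 s span -> 100 rows/s.
assert DatasetStatsSummary(300, 0.0, 3.0).num_rows_per_s == 100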

@bveeramani
Member

@dancingactor lemme know if you have any questions.

@ray-gardener ray-gardener bot added the data (Ray Data-related issues) and community-contribution (Contributed by the community) labels Nov 17, 2025
@dancingactor
Copy link
Author

dancingactor commented Nov 17, 2025

Thanks for your detailed feedback! I have two questions:

1. My understanding is that we should remove the original test_dataset_throughput performance test, and instead add two unit tests, test_dataset_throughput_calculation and test_operator_throughput_calculation, to verify the correctness of the dataset and operator throughput calculations.

2. Regarding:

> Separately, I realized that the "per node" throughputs actually represent the per-task throughput. Based on this, I think we should:
>   1. Remove the "per node throughput" for the "Dataset throughput" section, because the average per-task throughput across all operators isn't really useful, and
>   2. Rename "per node throughput" to "per task throughput" in the "Operator throughput" sections

May I confirm that this means we should modify the current ds.stats() output as follows:

Operator 1 Map(f): 4 tasks executed, 4 blocks produced in 2.23s                                                                 
 ...                                                                                                                             
* Operator throughput:                                                                                                          
        * Total input num rows: 0 rows                                                                                          
        * Total output num rows: 100 rows                                                                                       
        * Ray Data throughput: 44.8881328745759 rows/s                                               
        * Estimated single node throughput: 32.05117589203472 rows/s    <-- change node to task

Dataset throughput:
        * Ray Data throughput: 24.66899248263124 rows/s
        * Estimated single node throughput: 16.076964501040045 rows/s.  <-- remove this line

@bveeramani
Member

@dancingactor that's right!

@dancingactor
Author

dancingactor commented Nov 18, 2025

Just to confirm, I need to do the following things in this PR:

  1. Remove the test_dataset_throughput test, and add test_dataset_throughput_calculation and test_operator_throughput_calculation tests
  2. Modify stats.py
  3. Modify other tests in test_stats.py that are related to the change in stats.py

@bveeramani
Member

> Just to confirm, I need to do the following things in this PR:
>   1. Remove the test_dataset_throughput test, and add test_dataset_throughput_calculation and test_operator_throughput_calculation tests
>   2. Modify stats.py
>   3. Modify other tests in test_stats.py that are related to the change in stats.py

That sounds right.

One note of warning -- test_stats.py is extremely brittle!

@bveeramani
Member

Hey @dancingactor, just following up here. Lemme know if I can provide any info or help to move this along!

@dancingactor
Author

Hi @bveeramani, since I am new to Ray, I spent some time understanding the context and the codebase. I have almost completed test_dataset_throughput_calculation and test_operator_throughput_calculation, and will work on the ds.stats() output very soon.

@dancingactor dancingactor force-pushed the master branch 2 times, most recently from 6f974a2 to fabe20e on November 20, 2025 14:17
@dancingactor
Author

dancingactor commented Nov 20, 2025

Hi @bveeramani, could you please advise on how to correctly test the new test_stats.py? 🙏

Currently, when I try to directly execute pytest /ray/python/ray/data/tests/test_stats.py, I run into an error during environment setup. The error message looks like:

2025-11-21 00:50:44,720 INFO worker.py:2023 -- Started a local Ray instance.
                                                                                                                        6% ▋         

―――――――――――――――――――――――――――――――――――― ERROR at setup of test_large_args_scheduling_strategy[True] ――――――――――――――――――――――――――――――――――――

request = <SubRequest 'ray_start_regular_shared' for <Function test_streaming_split_stats>>

    @pytest.fixture(scope="module")
    def ray_start_regular_shared(request):
        param = getattr(request, "param", {})
>       with _ray_start(**param) as res:

python/ray/tests/conftest.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/contextlib.py:117: in __enter__
    return next(self.gen)
python/ray/tests/conftest.py:547: in _ray_start
    address_info = ray.init("local", **init_kwargs)
python/ray/_private/client_mode_hook.py:104: in wrapper
    return func(*args, **kwargs)
python/ray/_private/worker.py:2025: in init
    connect(
python/ray/_private/worker.py:1163: in wrapper
    return func(*args, **kwargs)
python/ray/_private/worker.py:2662: in connect
    worker.core_worker = ray._raylet.CoreWorker(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   self._gc_thread = PythonGCThread()
E   AttributeError: module 'ray._private.ray_constants' has no attribute 'RAY_GC_MIN_COLLECT_INTERVAL'

python/ray/_raylet.pyx:2709: AttributeError

@bveeramani
Member

Ah, this sounds like your Ray Core version is out-of-date.

Are you building Ray Core from source, or using the setup-dev.py script? I think you might need to either rebuild Ray (if building from source) or reinstall the Ray nightly wheel (if using setup-dev.py).
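
A rough sketch of both options (the nightly wheel URL is a placeholder; grab the one for your platform from the Ray installation docs):

# Option A: building from source -- rebuild the Python package from the repo root.
pip install -e python/

# Option B: using setup-dev.py -- reinstall the nightly wheel, then re-link
# your local Python sources over it.
pip uninstall -y ray
pip install -U "ray @ <link-to-nightly-wheel-for-your-platform>"
python python/ray/setup-dev.py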

@dancingactor
Author

Thanks! I will try the setup-dev.py approach

@bveeramani
Member

> Thanks! I will try the setup-dev.py approach

Awesome! Lemme know if you run into any problems. Happy to help you out

@dancingactor
Author

dancingactor commented Nov 23, 2025

Hi @bveeramani,
I have verified that the modified code works, PTAL

tests % pytest /Users/ryanchen/github/ray/python/ray/data/tests/test_stats.py         
Test session starts (platform: darwin, Python 3.10.19, pytest 7.4.4, pytest-sugar 0.9.5)
rootdir: /Users/ryanchen/github/ray
configfile: pytest.ini
plugins: docker-tools-3.1.3, sphinx-0.5.1.dev0, forked-1.4.0, anyio-4.11.0, asyncio-0.17.2, sugar-0.9.5, timeout-2.1.0, shutil-1.8.1, lazy-fixtures-1.1.2, rerunfailures-11.1.2, pytest_httpserver-1.1.3, virtualenv-1.8.1, mock-3.14.0, aiohttp-1.1.0
asyncio: mode=auto
timeout: 180.0s
timeout method: signal
timeout func_only: False
collecting ... 
 python/ray/data/tests/test_stats.py ss✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓s✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓                                                                100% ██████████

Results (729.00s):
      44 passed
       3 skipped

@dancingactor dancingactor force-pushed the master branch 2 times, most recently from 7f7199b to 7efd4f6 on November 24, 2025 04:30
@dancingactor dancingactor force-pushed the master branch 2 times, most recently from 616744c to 963260c on November 24, 2025 05:44
…oad and applying tolerance

Signed-off-by: dancingactor <[email protected]>
2. Rename "per node throughput" to "per task throughput" in the "Operator throughput" sections

Signed-off-by: dancingactor <[email protected]>
Member

@bveeramani bveeramani left a comment


Thanks for the PR! Left a few comments

Comment on lines 1211 to 1213
output_num_rows = self.operators_stats[-1].output_num_rows
total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
wall_time = self.get_total_wall_time()
Member


Is this dead code now?

out += "Dataset throughput:\n"
out += (
"\t* Ray Data throughput:"
f" {total_num_out_rows / wall_time} "
Member


We should be using num_rows_per_s here, right?
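
For example, roughly (a sketch of the suggestion, assuming num_rows_per_s is exposed as a property):

out += f"\t* Ray Data throughput: {self.num_rows_per_s} rows/s\n"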

# total number of rows produced divided by the sum of the wall times across
# all blocks of all operators. This assumes that on a single node the work
# done would be equivalent, with no concurrency.
output_num_rows = self.operators_stats[-1].output_num_rows
Member


Should we move the comments closer to where the calculations are actually made? (e.g., num_rows_per_s)

)
out += (
-     indent + "\t* Estimated single node throughput:"
+     indent + "\t* Estimated single task throughput:"
Member


Here and in the above statement -- I think we want to use num_rows_per_s and num_rows_per_task_s?
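
i.e., roughly (again a sketch, assuming both properties exist as discussed above):

out += indent + f"\t* Ray Data throughput: {self.num_rows_per_s} rows/s\n"
out += indent + f"\t* Estimated single task throughput: {self.num_rows_per_task_s} rows/s\n"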

def test_dataset_throughput(shutdown_only):
    ray.shutdown()
    ray.init(num_cpus=2)
    blocks = [
Member


Nit: Here and for test_dataset_throughput_calculation -- I think `block_stats` might be a clearer name to avoid confusion with the actual blocks

Comment on lines +1738 to +1739
from ray.data._internal.stats import DatasetStats
from ray.data.block import BlockExecStats, BlockStats
Member


Nit: Move to top for consistency with Python convention?


# NOTE: All tests above share a Ray cluster, while the tests below do not. These
# tests should only be carefully reordered to retain this invariant!
def test_dataset_throughput_calculation(ray_start_regular_shared):
Member


For this and test_operator_throughput_calculation -- I think (?) we don't need ray_start_regular_shared anymore

@bveeramani bveeramani self-assigned this Nov 25, 2025
Development

Successfully merging this pull request may close these issues:

[Data][Flaky] test_dataset_throughput fails when Ray Data throughput is slightly lower than single node estimate