Skip to content
Merged
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c5dcc11
wip
sven1977 Aug 13, 2024
bf7a10e
Merge branch 'master' of https://github.com/ray-project/ray into docs…
sven1977 Aug 22, 2024
32b33ec
wip
sven1977 Aug 22, 2024
e7a6f24
wip
sven1977 Aug 23, 2024
30272c1
wip
sven1977 Aug 26, 2024
f097d1a
Merge branch 'master' of https://github.com/ray-project/ray into docs…
sven1977 Aug 26, 2024
f2a2359
wip
sven1977 Aug 26, 2024
bba9fbb
wip
sven1977 Aug 26, 2024
328e0c1
wip
sven1977 Aug 26, 2024
345ee78
wip
sven1977 Aug 26, 2024
ebdd51b
Merge branch 'master' of https://github.com/ray-project/ray into docs…
sven1977 Jan 19, 2025
5a822cb
wip
sven1977 Jan 19, 2025
e7468dd
Merge branch 'master' of https://github.com/ray-project/ray into docs…
sven1977 Jun 11, 2025
2550c03
wip
sven1977 Jun 11, 2025
bc9fb66
LINT
sven1977 Jun 11, 2025
b870d4f
LINT
sven1977 Jun 12, 2025
9df3bdf
Merge branch 'master' of https://github.com/ray-project/ray into docs…
sven1977 Jun 13, 2025
d513f61
wip
sven1977 Jun 13, 2025
f273b72
wip
sven1977 Jun 17, 2025
7bfb131
Merge branch 'master' of https://github.com/ray-project/ray into docs…
sven1977 Jun 17, 2025
b96eaa0
wip
sven1977 Jun 17, 2025
8b9a981
wip
sven1977 Jun 18, 2025
bf6a086
Merge branch 'master' of https://github.com/ray-project/ray into docs…
sven1977 Jun 18, 2025
68f48a6
wip
sven1977 Jun 23, 2025
30bde82
Merge branch 'master' of https://github.com/ray-project/ray into docs…
sven1977 Jun 25, 2025
f7df27d
Merge branch 'master' of https://github.com/ray-project/ray into docs…
sven1977 Jun 30, 2025
99da912
wip
sven1977 Jun 30, 2025
ad3c2eb
wip
sven1977 Jun 30, 2025
ac04d49
wip
sven1977 Jun 30, 2025
801549a
wip
sven1977 Jun 30, 2025
befdb79
wip
sven1977 Jul 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 116 additions & 12 deletions rllib/connectors/env_to_module/observation_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,24 @@

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.core.rl_module.rl_module import RLModule
from ray.rllib.env.multi_agent_episode import MultiAgentEpisode
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType
from ray.util.annotations import PublicAPI


@PublicAPI(stability="alpha")
class ObservationPreprocessor(ConnectorV2, abc.ABC):
"""Env-to-module connector performing one preprocessor step on the last observation.
class SingleAgentObservationPreprocessor(ConnectorV2, abc.ABC):
"""Env-to-module connector preprocessing the most recent single-agent observation.

This is a convenience class that simplifies the writing of few-step preprocessor
connectors.

Note that this class also works in a multi-agent setup, in which case RLlib
separately calls this connector piece with each agent's observation and
`SingleAgentEpisode` object.

Users must implement the `preprocess()` method, which simplifies the usual procedure
of extracting some data from a list of episodes and adding it to the batch to a mere
"old-observation --transform--> return new-observation" step.
Expand All @@ -28,23 +34,27 @@ def recompute_output_observation_space(
input_observation_space: gym.Space,
input_action_space: gym.Space,
) -> gym.Space:
# Users should override this method only in case the `ObservationPreprocessor`
# changes the observation space of the pipeline. In this case, return the new
# observation space based on the incoming one (`input_observation_space`).
# Users should override this method only in case the
# `SingleAgentObservationPreprocessor` changes the observation space of the
# pipeline. In this case, return the new observation space based on the
# incoming one (`input_observation_space`).
return super().recompute_output_observation_space(
input_observation_space, input_action_space
)

@abc.abstractmethod
def preprocess(self, observation):
def preprocess(self, observation, episode: SingleAgentEpisode):
"""Override to implement the preprocessing logic.

Args:
observation: A single (non-batched) observation item for a single agent to
be processed by this connector.
be preprocessed by this connector.
episode: The `SingleAgentEpisode` instance, from which `observation` was
taken. You can extract information on the particular AgentID and the
ModuleID through `episode.agent_id` and `episode.module_id`.

Returns:
The new observation after `observation` has been preprocessed.
The new observation for the agent after `observation` has been preprocessed.
"""

@override(ConnectorV2)
Expand All @@ -67,14 +77,108 @@ def __call__(

# Process the observation and write the new observation back into the
# episode.
new_observation = self.preprocess(observation=observation)
new_observation = self.preprocess(
observation=observation,
episode=sa_episode,
)
sa_episode.set_observations(at_indices=-1, new_data=new_observation)
# We set the Episode's observation space to ours so that we can safely
# set the last obs to the new value (without causing a space mismatch
# error).
sa_episode.observation_space = self.observation_space

# Leave `batch` as is. RLlib's default connector will automatically
# populate the OBS column therein from the episodes' now transformed
# observations.
# Leave `batch` as is. RLlib's default connector automatically populates
# the OBS column therein from the episodes' now transformed observations.
return batch


@PublicAPI(stability="alpha")
class MultiAgentObservationPreprocessor(ConnectorV2, abc.ABC):
    """Env-to-module connector that preprocesses the latest multi-agent observation.

    The observation handed to `preprocess()` is always a dict holding the
    individual agents' observations.

    This convenience base class reduces the usual connector work (pulling data out
    of a list of episodes and writing results back) to a single
    "old-observations --transform--> new-observations" step, which subclasses
    provide by implementing `preprocess()`.
    """

    @override(ConnectorV2)
    def recompute_output_observation_space(
        self,
        input_observation_space: gym.Space,
        input_action_space: gym.Space,
    ) -> gym.Space:
        # Only override this method if your `MultiAgentObservationPreprocessor`
        # alters the pipeline's observation space. If it does, derive and return
        # the new space from the incoming `input_observation_space`.
        output_space = super().recompute_output_observation_space(
            input_observation_space, input_action_space
        )
        return output_space

    @abc.abstractmethod
    def preprocess(self, observations, episode: MultiAgentEpisode):
        """Transforms one multi-agent observation dict; must be overridden.

        Args:
            observations: The observation dict holding the (non-batched)
                observation of each stepping agent, to be preprocessed by this
                connector.
            episode: The `MultiAgentEpisode` instance from which the
                `observations` dict was taken.

        Returns:
            The new multi-agent observation dict resulting from preprocessing
            `observations`.
        """

    @override(ConnectorV2)
    def __call__(
        self,
        *,
        rl_module: RLModule,
        batch: Dict[str, Any],
        episodes: List[EpisodeType],
        explore: Optional[bool] = None,
        persistent_data: Optional[dict] = None,
        **kwargs,
    ) -> Any:
        # Observations are replaced in place inside the episodes. Any connector
        # running after this one therefore only sees the preprocessed
        # observations and can no longer access the original ones.
        for ma_episode in episodes:
            # Fetch the most recent multi-agent observation and hand it to the
            # user-defined preprocessing logic.
            processed = self.preprocess(
                observations=ma_episode.get_observations(-1),
                episode=ma_episode,
            )

            # TODO (sven): Implement set_observations API for multi-agent episodes.
            #  For now, we'll hack it through the single agent APIs.
            # ma_episode.set_observations(at_indices=-1, new_data=processed)
            for agent_id, agent_obs in processed.items():
                sa_episode = ma_episode.agent_episodes[agent_id]
                sa_episode.set_observations(at_indices=-1, new_data=agent_obs)
                # Keep this agent's episode space in line with this connector's
                # per-agent output space.
                sa_episode.observation_space = self.observation_space[agent_id]

            # Assign our own observation space to the episode, so that the last
            # observation can safely hold the new value (without causing a space
            # mismatch error).
            ma_episode.observation_space = self.observation_space

        # `batch` remains untouched. RLlib's default connector automatically
        # fills its OBS column from the episodes' now transformed observations.
        return batch


# Backward compatibility: keep the former class name `ObservationPreprocessor`
# available as an alias of `SingleAgentObservationPreprocessor`.
ObservationPreprocessor = SingleAgentObservationPreprocessor