
Commit c2861df

Use shared memory to write partial DataFrames (#33)
- Improve performance of report extraction and features calculation by writing partial DataFrames to the shared memory (or temp directory, if shared memory is not available); a rough sketch of the fallback follows below.
- Use zstd compression instead of snappy when writing parquet files.
- When repo is pickled, extract the DataFrames only if they aren't already stored in the cache.
- Remove fastparquet extra dependency.
1 parent 5602fd3 commit c2861df
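
As an illustration of the first change, the sketch below shows one way to pick a shared-memory directory with a fallback to the regular temp location. The find_shm_dir helper is hypothetical; the actual blueetl.utils.get_shmdir used by this commit may resolve the directory differently (for example from an environment variable).

import tempfile
from pathlib import Path
from typing import Optional

def find_shm_dir() -> Optional[Path]:
    """Return a shared-memory directory if available, else None (hypothetical helper)."""
    shm = Path("/dev/shm")
    return shm if shm.is_dir() else None

# With dir=None, TemporaryDirectory falls back to the default temp directory,
# so the partial files can still be written when shared memory is unavailable.
with tempfile.TemporaryDirectory(prefix="blueetl_", dir=find_shm_dir()) as temp_folder:
    print("partial DataFrames would be written under", temp_folder)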

14 files changed (+416, -218 lines)

CHANGELOG.rst

Lines changed: 12 additions & 0 deletions

@@ -1,6 +1,18 @@
 Changelog
 =========
 
+Version 0.9.1
+-------------
+
+Improvements
+~~~~~~~~~~~~
+
+- Improve performance of report extraction and features calculation by writing partial DataFrames to the shared memory (or temp directory, if shared memory is not available).
+  Both the used memory and the execution time should be lower than before, when processing large DataFrames.
+- Use zstd compression instead of snappy when writing parquet files.
+- When ``repo`` is pickled, extract the DataFrames only if they aren't already stored in the cache.
+- Remove fastparquet extra dependency.
+
 Version 0.9.0
 -------------
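
The switch from snappy to zstd is a one-argument change when parquet files are written through pandas with pyarrow; a minimal sketch (the file name and DataFrame are illustrative, not taken from the commit):

import pandas as pd

df = pd.DataFrame({"gid": [1, 2, 3], "value": [0.1, 0.2, 0.3]})
# snappy is the pyarrow default; zstd usually produces smaller files
# at a comparable (de)compression speed for this kind of tabular data.
df.to_parquet("partial_00000000.parquet", engine="pyarrow", compression="zstd")
restored = pd.read_parquet("partial_00000000.parquet")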

pyproject.toml

Lines changed: 0 additions & 2 deletions

@@ -41,8 +41,6 @@ dynamic = ["version"]
 [project.optional-dependencies]
 extra = [
     # extra requirements that may be dropped at some point
-    "fastparquet>=0.8.3,!=2023.1.0", # needed by pandas to read and write parquet files
-    "orjson", # faster json decoder used by fastparquet
     "tables>=3.6.1", # needed by pandas to read and write hdf files
 ]
 external = [
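
With fastparquet dropped from the extras, parquet I/O relies on pyarrow: pandas' default engine="auto" tries pyarrow first, so existing read/write calls keep working unchanged. A minimal sketch (the file name is illustrative):

import pandas as pd

# equivalent to engine="pyarrow" when pyarrow is installed
df = pd.read_parquet("partial_00000000.parquet")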

src/blueetl/extract/report.py

Lines changed: 55 additions & 28 deletions
@@ -1,9 +1,12 @@
 """Generic Report extractor."""
 
 import logging
+import tempfile
 from abc import ABCMeta, abstractmethod
 from dataclasses import dataclass
-from typing import NamedTuple, Optional, TypeVar
+from functools import partial
+from pathlib import Path
+from typing import Callable, NamedTuple, Optional, TypeVar
 
 import pandas as pd
 from blueetl_core.utils import smart_concat
@@ -16,6 +19,8 @@
 from blueetl.extract.simulations import Simulations
 from blueetl.extract.windows import Windows
 from blueetl.parallel import merge_filter
+from blueetl.store.parquet import ParquetStore
+from blueetl.utils import ensure_dtypes, get_shmdir, timed
 
 L = logging.getLogger(__name__)
 ReportExtractorT = TypeVar("ReportExtractorT", bound="ReportExtractor")
@@ -96,33 +101,55 @@ def from_simulations(
         Returns:
             New instance.
         """
-
-        def _func(key: NamedTuple, df_list: list[pd.DataFrame]) -> tuple[NamedTuple, pd.DataFrame]:
-            # executed in a subprocess
-            simulations_df, neurons_df, windows_df = df_list
-            simulation_id, simulation = simulations_df.etl.one()[[SIMULATION_ID, SIMULATION]]
-            assert simulation_id == key.simulation_id  # type: ignore[attr-defined]
-            df_list = []
-            for inner_key, df in neurons_df.etl.groupby_iter([CIRCUIT_ID, NEURON_CLASS]):
-                population = neuron_classes.df.etl.one(
-                    circuit_id=inner_key.circuit_id, neuron_class=inner_key.neuron_class
-                )[POPULATION]
-                result_df = cls._load_values(
-                    simulation=simulation,
-                    population=population,
-                    gids=df[GID],
-                    windows_df=windows_df,
-                    name=name,
-                )
-                result_df[[SIMULATION_ID, *inner_key._fields]] = [simulation_id, *inner_key]
-                df_list.append(result_df)
-            return smart_concat(df_list, ignore_index=True)
-
-        all_df = merge_filter(
-            df_list=[simulations.df, neurons.df, windows.df],
-            groupby=[SIMULATION_ID, CIRCUIT_ID],
-            func=_func,
-            parallel=True,
-        )
-        df = smart_concat(all_df, ignore_index=True)
-        return cls(df, cached=False, filtered=False)
+        with tempfile.TemporaryDirectory(prefix="blueetl_", dir=get_shmdir()) as _temp_folder:
+            with timed(L.info, "Executing merge_filter "):
+                func = partial(
+                    _merge_filter_func,
+                    temp_folder=Path(_temp_folder),
+                    name=name,
+                    neuron_classes_df=neuron_classes.df,
+                    dataframe_builder=cls._load_values,
+                )
+                merge_filter(
+                    df_list=[simulations.df, neurons.df, windows.df],
+                    groupby=[SIMULATION_ID, CIRCUIT_ID],
+                    func=func,
+                )
+            with timed(L.info, "Executing concatenation"):
+                df = ParquetStore(Path(_temp_folder)).load()
+                df = ensure_dtypes(df)
+            return cls(df, cached=False, filtered=False)
+
+
+def _merge_filter_func(
+    task_index: int,
+    key: NamedTuple,
+    df_list: list[pd.DataFrame],
+    temp_folder: Path,
+    name: str,
+    neuron_classes_df: pd.DataFrame,
+    dataframe_builder: Callable[..., pd.DataFrame],
+) -> None:
+    """Executed in a subprocess, write a partial DataFrame to temp_folder."""
+    # pylint: disable=too-many-locals
+    simulations_df, neurons_df, windows_df = df_list
+    simulation_id, simulation = simulations_df.etl.one()[[SIMULATION_ID, SIMULATION]]
+    assert simulation_id == key.simulation_id  # type: ignore[attr-defined]
+    df_list = []
+    for inner_key, df in neurons_df.etl.groupby_iter([CIRCUIT_ID, NEURON_CLASS]):
+        population = neuron_classes_df.etl.one(
+            circuit_id=inner_key.circuit_id, neuron_class=inner_key.neuron_class
+        )[POPULATION]
+        result_df = dataframe_builder(
+            simulation=simulation,
+            population=population,
+            gids=df[GID],
+            windows_df=windows_df,
+            name=name,
+        )
+        result_df[[SIMULATION_ID, *inner_key._fields]] = [simulation_id, *inner_key]
+        df_list.append(result_df)
+    result_df = smart_concat(df_list, ignore_index=True)
+    # the conversion to the desired dtype here is important to reduce memory usage and cpu time
+    result_df = ensure_dtypes(result_df)
+    ParquetStore(temp_folder).dump(result_df, name=f"{task_index:08d}")
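
The refactoring above is essentially a scatter/gather through the filesystem: each merge_filter task dumps its partial result as a numbered parquet file, and the parent process loads everything back and concatenates it into a single DataFrame. Below is a minimal self-contained sketch of the same pattern without the blueetl-specific helpers (ParquetStore, merge_filter, ensure_dtypes); names and data are illustrative only.

import tempfile
from pathlib import Path

import pandas as pd

def worker(task_index: int, temp_folder: Path) -> None:
    """Stand-in for one merge_filter task writing its partial DataFrame."""
    partial = pd.DataFrame({"task": [task_index], "value": [task_index * 0.5]})
    # zero-padded names keep the partial files in task order when globbed
    partial.to_parquet(temp_folder / f"{task_index:08d}.parquet", compression="zstd")

with tempfile.TemporaryDirectory(prefix="blueetl_") as tmp:
    temp_folder = Path(tmp)
    for i in range(3):  # in blueetl the tasks run in subprocesses
        worker(i, temp_folder)
    # gather: read the partial files back and concatenate into one DataFrame
    df = pd.concat(
        (pd.read_parquet(p) for p in sorted(temp_folder.glob("*.parquet"))),
        ignore_index=True,
    )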
