Skip to content

Commit 4548266

Browse files
committed
Merge branch 'master' into introduce-polars
2 parents c62b9b0 + e99c591 commit 4548266

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

43 files changed

+349
-182
lines changed

docs/conf.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
# -*- coding: utf-8 -*-
2-
31
# https://github.com/sphinx-doc/sphinx/issues/6211
42
import luigi
53

gokart/build.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
from __future__ import annotations
2+
13
import enum
24
import logging
35
import sys
46
from dataclasses import dataclass
57
from functools import partial
68
from logging import getLogger
7-
from typing import Literal, Optional, Protocol, TypeVar, cast, overload
9+
from typing import Literal, Protocol, TypeVar, cast, overload
810

911
import backoff
1012
import luigi
@@ -62,7 +64,7 @@ def add(self, task: TaskOnKart) -> bool: ...
6264

6365
def run(self) -> bool: ...
6466

65-
def __enter__(self) -> 'WorkerProtocol': ...
67+
def __enter__(self) -> WorkerProtocol: ...
6668

6769
def __exit__(self, type, value, traceback) -> Literal[False]: ...
6870

@@ -162,7 +164,7 @@ def build(
162164
task_lock_exception_max_wait_seconds: int = 600,
163165
task_dump_config: TaskDumpConfig = TaskDumpConfig(),
164166
**env_params,
165-
) -> Optional[T]:
167+
) -> T | None:
166168
"""
167169
Run gokart task for local interpreter.
168170
Shares most of its parameters with luigi.build (see https://luigi.readthedocs.io/en/stable/api/luigi.html?highlight=build#luigi.build)

gokart/build_process_task_info.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import io
24

35
import gokart

gokart/config_params.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
from typing import Dict, Optional, Type
1+
from __future__ import annotations
22

33
import luigi
44

55
import gokart
66

77

88
class inherits_config_params:
9-
def __init__(self, config_class: Type[luigi.Config], parameter_alias: Optional[Dict[str, str]] = None):
9+
def __init__(self, config_class: type[luigi.Config], parameter_alias: dict[str, str] | None = None):
1010
"""
1111
Decorates task to inherit parameter value of `config_class`.
1212
@@ -15,10 +15,10 @@ def __init__(self, config_class: Type[luigi.Config], parameter_alias: Optional[D
1515
key: config_class's parameter name. value: decorated task's parameter name.
1616
"""
1717

18-
self._config_class: Type[luigi.Config] = config_class
19-
self._parameter_alias: Dict[str, str] = parameter_alias if parameter_alias is not None else {}
18+
self._config_class: type[luigi.Config] = config_class
19+
self._parameter_alias: dict[str, str] = parameter_alias if parameter_alias is not None else {}
2020

21-
def __call__(self, task_class: Type[gokart.TaskOnKart]):
21+
def __call__(self, task_class: type[gokart.TaskOnKart]):
2222
# wrap task to prevent task name from being changed
2323
@luigi.task._task_wraps(task_class)
2424
class Wrapped(task_class): # type: ignore
@@ -29,6 +29,6 @@ def get_param_values(cls, params, args, kwargs):
2929

3030
if hasattr(cls, task_param_key) and task_param_key not in kwargs:
3131
kwargs[task_param_key] = param_value
32-
return super(Wrapped, cls).get_param_values(params, args, kwargs)
32+
return super().get_param_values(params, args, kwargs)
3333

3434
return Wrapped

gokart/conflict_prevention_lock/task_lock.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
from __future__ import annotations
2+
13
import functools
24
import os
35
from logging import getLogger
4-
from typing import NamedTuple, Optional
6+
from typing import NamedTuple
57

68
import redis
79
from apscheduler.schedulers.background import BackgroundScheduler
@@ -10,9 +12,9 @@
1012

1113

1214
class TaskLockParams(NamedTuple):
13-
redis_host: Optional[str]
14-
redis_port: Optional[int]
15-
redis_timeout: Optional[int]
15+
redis_host: str | None
16+
redis_port: int | None
17+
redis_timeout: int | None
1618
redis_key: str
1719
should_task_lock: bool
1820
raise_task_lock_exception_on_collision: bool
@@ -31,10 +33,10 @@ def __new__(cls, *args, **kwargs):
3133
if cls not in cls._instances:
3234
cls._instances[cls] = {}
3335
if key not in cls._instances[cls]:
34-
cls._instances[cls][key] = super(RedisClient, cls).__new__(cls)
36+
cls._instances[cls][key] = super().__new__(cls)
3537
return cls._instances[cls][key]
3638

37-
def __init__(self, host: Optional[str], port: Optional[int]) -> None:
39+
def __init__(self, host: str | None, port: int | None) -> None:
3840
if not hasattr(self, '_redis_client'):
3941
host = host or 'localhost'
4042
port = port or 6379
@@ -72,17 +74,17 @@ def set_lock_scheduler(task_lock: redis.lock.Lock, task_lock_params: TaskLockPar
7274
return scheduler
7375

7476

75-
def make_task_lock_key(file_path: str, unique_id: Optional[str]):
77+
def make_task_lock_key(file_path: str, unique_id: str | None):
7678
basename_without_ext = os.path.splitext(os.path.basename(file_path))[0]
7779
return f'{basename_without_ext}_{unique_id}'
7880

7981

8082
def make_task_lock_params(
8183
file_path: str,
82-
unique_id: Optional[str],
83-
redis_host: Optional[str] = None,
84-
redis_port: Optional[int] = None,
85-
redis_timeout: Optional[int] = None,
84+
unique_id: str | None,
85+
redis_host: str | None = None,
86+
redis_port: int | None = None,
87+
redis_timeout: int | None = None,
8688
raise_task_lock_exception_on_collision: bool = False,
8789
lock_extend_seconds: int = 10,
8890
) -> TaskLockParams:

gokart/conflict_prevention_lock/task_lock_wrappers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import functools
24
from logging import getLogger
35
from typing import Any, Callable

gokart/file_processor.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import os
24
import xml.etree.ElementTree as ET
35
from abc import abstractmethod
@@ -63,7 +65,7 @@ def dump(self, obj, file):
6365
file.write(obj)
6466

6567

66-
class _ChunkedLargeFileReader(object):
68+
class _ChunkedLargeFileReader:
6769
def __init__(self, file) -> None:
6870
self._file = file
6971

@@ -183,8 +185,11 @@ def dump(self, obj, file):
183185

184186

185187
class JsonFileProcessor(FileProcessor):
188+
def __init__(self, orient: str | None = None):
189+
self._orient = orient
190+
186191
def format(self):
187-
return None
192+
return luigi.format.Nop
188193

189194
def load(self, file):
190195
...
@@ -196,6 +201,8 @@ def dump(self, obj, file):
196201
class PolarsJsonFileProcessor(JsonFileProcessor):
197202
def load(self, file):
198203
try:
204+
if self._orient == 'records':
205+
return self.read_ndjson(file)
199206
return self.read_json(file)
200207
except pl.exceptions.NoDataError:
201208
return pl.DataFrame()
@@ -206,13 +213,17 @@ def dump(self, obj, file):
206213
)
207214
if isinstance(obj, dict):
208215
obj = pl.from_dict(obj)
209-
obj.write_json(file)
216+
217+
if self._orient == 'records':
218+
obj_write_ndjson(file)
219+
else:
220+
obj.write_json(file)
210221

211222

212223
class PandasJsonFileProcessor(JsonFileProcessor):
213224
def load(self, file):
214225
try:
215-
return self.read_json(file)
226+
return pd.read_json(file, orient=self._orient, lines=True if self._orient == 'records' else False)
216227
except pd.errors.EmptyDataError:
217228
return pd.DataFrame()
218229

@@ -222,7 +233,7 @@ def dump(self, obj, file):
222233
)
223234
if isinstance(obj, dict):
224235
obj = pd.DataFrame.from_dict(obj)
225-
obj.to_json(file)
236+
obj.to_json(file, orient=self._orient, lines=True if self._orient == 'records' else False)
226237

227238

228239
class XmlFileProcessor(FileProcessor):
@@ -386,6 +397,7 @@ def make_file_processor(file_path: str, store_index_in_feather: bool) -> FilePro
386397
'.pkl': PickleFileProcessor(),
387398
'.gz': GzipFileProcessor(),
388399
'.json': JsonFileProcessor(),
400+
'.ndjson': JsonFileProcessor(orient='records'),
389401
'.xml': XmlFileProcessor(),
390402
'.npz': NpzFileProcessor(),
391403
'.parquet': ParquetFileProcessor(compression='gzip'),

gokart/gcs_config.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
from __future__ import annotations
2+
13
import json
24
import os
3-
from typing import Optional
45

56
import luigi
67
import luigi.contrib.gcs
@@ -19,7 +20,7 @@ def get_gcs_client(self) -> luigi.contrib.gcs.GCSClient:
1920
def _get_gcs_client(self) -> luigi.contrib.gcs.GCSClient:
2021
return luigi.contrib.gcs.GCSClient(oauth_credentials=self._load_oauth_credentials())
2122

22-
def _load_oauth_credentials(self) -> Optional[Credentials]:
23+
def _load_oauth_credentials(self) -> Credentials | None:
2324
json_str = os.environ.get(self.gcs_credential_name)
2425
if not json_str:
2526
return None

gokart/gcs_obj_metadata_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import copy
44
import re
55
from logging import getLogger
6-
from typing import Any, Union
6+
from typing import Any
77
from urllib.parse import urlsplit
88

99
from googleapiclient.model import makepatch
@@ -84,7 +84,7 @@ def _get_patched_obj_metadata(
8484
metadata: Any,
8585
task_params: dict[str, str] | None = None,
8686
custom_labels: dict[str, Any] | None = None,
87-
) -> Union[dict, Any]:
87+
) -> dict | Any:
8888
# If the metadata returned when fetching bucket and object information is not a dictionary,
8989
# something may have gone wrong, so return the original metadata unpatched.
9090
if not isinstance(metadata, dict):

gokart/gcs_zip_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import os
24
import shutil
35

0 commit comments

Comments
 (0)