Skip to content

Commit 9302c27

Browse files
authored
[HOPSWORKS-1971] Add HSFS python support (#206)
1 parent b2a7e0b commit 9302c27

27 files changed

+1014
-200
lines changed

python/hsfs/client/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def _send_request(
9191
headers=None,
9292
data=None,
9393
stream=False,
94+
files=None,
9495
):
9596
"""Send REST request to Hopsworks.
9697
@@ -110,6 +111,8 @@ def _send_request(
110111
:type data: dict, optional
111112
:param stream: Set if response should be a stream, defaults to False
112113
:type stream: boolean, optional
114+
:param files: dictionary for multipart encoding upload
115+
:type files: dict, optional
113116
:raises RestAPIError: Raised when request wasn't correctly received, understood or accepted
114117
:return: Response json
115118
:rtype: dict
@@ -126,6 +129,7 @@ def _send_request(
126129
data=data,
127130
params=query_params,
128131
auth=self._auth,
132+
files=files,
129133
)
130134

131135
prepped = self._session.prepare_request(request)

python/hsfs/client/external.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,12 @@ def __init__(
8080
os.makedirs(self._cert_folder, exist_ok=True)
8181
credentials = self._get_credentials(self._project_id)
8282
self._write_b64_cert_to_bytes(
83-
str(credentials["kStore"]), path=self._get_jks_key_store_path(),
83+
str(credentials["kStore"]),
84+
path=self._get_jks_key_store_path(),
8485
)
8586
self._write_b64_cert_to_bytes(
86-
str(credentials["tStore"]), path=self._get_jks_trust_store_path(),
87+
str(credentials["tStore"]),
88+
path=self._get_jks_trust_store_path(),
8789
)
8890

8991
self._cert_key = str(credentials["password"])

python/hsfs/constructor/on_demand_feature_group_alias.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
class OnDemandFeatureGroupAlias:
2323
def __init__(self, on_demand_feature_group, alias):
24-
self._on_demand_feature_group = feature_group.OnDemandFeatureGroup.from_response_json(
25-
on_demand_feature_group
24+
self._on_demand_feature_group = (
25+
feature_group.OnDemandFeatureGroup.from_response_json(
26+
on_demand_feature_group
27+
)
2628
)
2729
self._alias = alias
2830

python/hsfs/constructor/query.py

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#
1616

1717
import json
18+
import humps
1819
from typing import Optional, List, Union
1920

2021
from hsfs import util, engine
@@ -25,21 +26,23 @@
2526
class Query:
2627
def __init__(
2728
self,
28-
feature_store_name,
29-
feature_store_id,
3029
left_feature_group,
3130
left_features,
32-
left_featuregroup_start_time=None,
33-
left_featuregroup_end_time=None,
31+
feature_store_name=None,
32+
feature_store_id=None,
33+
left_feature_group_start_time=None,
34+
left_feature_group_end_time=None,
35+
joins=[],
36+
filter=None,
3437
):
3538
self._feature_store_name = feature_store_name
3639
self._feature_store_id = feature_store_id
3740
self._left_feature_group = left_feature_group
3841
self._left_features = util.parse_features(left_features)
39-
self._left_featuregroup_start_time = left_featuregroup_start_time
40-
self._left_featuregroup_end_time = left_featuregroup_end_time
41-
self._joins = []
42-
self._filter = None
42+
self._left_feature_group_start_time = left_feature_group_start_time
43+
self._left_feature_group_end_time = left_feature_group_end_time
44+
self._joins = joins
45+
self._filter = filter
4346
self._query_constructor_api = query_constructor_api.QueryConstructorApi()
4447
self._storage_connector_api = storage_connector_api.StorageConnectorApi(
4548
feature_store_id
@@ -155,13 +158,13 @@ def join(
155158

156159
def as_of(self, wallclock_time):
157160
for join in self._joins:
158-
join.query.left_featuregroup_end_time = wallclock_time
159-
self.left_featuregroup_end_time = wallclock_time
161+
join.query.left_feature_group_end_time = wallclock_time
162+
self.left_feature_group_end_time = wallclock_time
160163
return self
161164

162165
def pull_changes(self, wallclock_start_time, wallclock_end_time):
163-
self.left_featuregroup_start_time = wallclock_start_time
164-
self.left_featuregroup_end_time = wallclock_end_time
166+
self.left_feature_group_start_time = wallclock_start_time
167+
self.left_feature_group_end_time = wallclock_end_time
165168
return self
166169

167170
def filter(self, f: Union[filter.Filter, filter.Logic]):
@@ -211,14 +214,32 @@ def json(self):
211214

212215
def to_dict(self):
213216
return {
217+
"featureStoreName": self._feature_store_name,
218+
"featureStoreId": self._feature_store_id,
214219
"leftFeatureGroup": self._left_feature_group,
215220
"leftFeatures": self._left_features,
216-
"leftFeatureGroupStartTime": self._left_featuregroup_start_time,
217-
"leftFeatureGroupEndTime": self._left_featuregroup_end_time,
221+
"leftFeatureGroupStartTime": self._left_feature_group_start_time,
222+
"leftFeatureGroupEndTime": self._left_feature_group_end_time,
218223
"joins": self._joins,
219224
"filter": self._filter,
220225
}
221226

227+
@classmethod
228+
def _hopsworks_json(cls, json_dict):
229+
"""
230+
This method is used by the Hopsworks helper job.
231+
It does not fully deserialize the message as the usecase is to
232+
send it straight back to Hopsworks to read the content of the query
233+
234+
Args:
235+
json_dict (str): a json string containing a query object
236+
237+
Returns:
238+
A partially deserialize query object
239+
"""
240+
json_decamelized = humps.decamelize(json_dict)
241+
return cls(**json_decamelized)
242+
222243
def to_string(self, online=False):
223244
fs_query_instance = self._query_constructor_api.construct_query(self)
224245
return fs_query_instance.query_online if online else fs_query_instance.query
@@ -232,24 +253,25 @@ def _register_on_demand(self, on_demand_fg_aliases):
232253

233254
for on_demand_fg_alias in on_demand_fg_aliases:
234255
engine.get_instance().register_on_demand_temporary_table(
235-
on_demand_fg_alias.on_demand_feature_group, on_demand_fg_alias.alias,
256+
on_demand_fg_alias.on_demand_feature_group,
257+
on_demand_fg_alias.alias,
236258
)
237259

238260
@property
239-
def left_featuregroup_start_time(self):
240-
return self._left_featuregroup_start_time
261+
def left_feature_group_start_time(self):
262+
return self._left_feature_group_start_time
241263

242264
@property
243-
def left_featuregroup_end_time(self):
244-
return self._left_featuregroup_start_time
265+
def left_feature_group_end_time(self):
266+
return self._left_feature_group_start_time
245267

246-
@left_featuregroup_start_time.setter
247-
def left_featuregroup_start_time(self, left_featuregroup_start_time):
248-
self._left_featuregroup_start_time = left_featuregroup_start_time
268+
@left_feature_group_start_time.setter
269+
def left_feature_group_start_time(self, left_feature_group_start_time):
270+
self._left_feature_group_start_time = left_feature_group_start_time
249271

250-
@left_featuregroup_end_time.setter
251-
def left_featuregroup_end_time(self, left_featuregroup_start_time):
252-
self._left_featuregroup_end_time = left_featuregroup_start_time
272+
@left_feature_group_end_time.setter
273+
def left_feature_group_end_time(self, left_feature_group_start_time):
274+
self._left_feature_group_end_time = left_feature_group_start_time
253275

254276
def _register_hudi_tables(
255277
self, hudi_feature_groups, feature_store_id, feature_store_name, read_options

python/hsfs/core/dataset_api.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#
2+
# Copyright 2020 Logical Clocks AB
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import math
18+
19+
from hsfs import client, util
20+
21+
22+
class DatasetApi:
23+
DEFAULT_FLOW_CHUNK_SIZE = 1048576
24+
25+
def upload(self, feature_group, path, dataframe):
26+
# Convert the dataframe into CSV for upload
27+
df_csv = dataframe.to_csv(index=False)
28+
num_chunks = math.ceil(len(df_csv) / self.DEFAULT_FLOW_CHUNK_SIZE)
29+
30+
base_params = self._get_flow_base_params(feature_group, num_chunks, len(df_csv))
31+
32+
chunks = [
33+
df_csv[i : i + self.DEFAULT_FLOW_CHUNK_SIZE]
34+
for i in range(0, len(df_csv), self.DEFAULT_FLOW_CHUNK_SIZE)
35+
]
36+
37+
chunk_number = 1
38+
for chunk in chunks:
39+
query_params = base_params
40+
query_params["flowCurrentChunkSize"] = len(chunk)
41+
query_params["flowChunkNumber"] = chunk_number
42+
43+
self._upload_request(
44+
query_params, path, util.feature_group_name(feature_group), chunk
45+
)
46+
47+
chunk_number += 1
48+
49+
def _get_flow_base_params(self, feature_group, num_chunks, size):
50+
# TODO(fabio): flow identifier is not unique
51+
return {
52+
"templateId": -1,
53+
"flowChunkSize": self.DEFAULT_FLOW_CHUNK_SIZE,
54+
"flowTotalSize": size,
55+
"flowIdentifier": util.feature_group_name(feature_group),
56+
"flowFilename": util.feature_group_name(feature_group),
57+
"flowRelativePath": util.feature_group_name(feature_group),
58+
"flowTotalChunks": num_chunks,
59+
}
60+
61+
def _upload_request(self, params, path, file_name, chunk):
62+
_client = client.get_instance()
63+
path_params = ["project", _client._project_id, "dataset", "upload", path]
64+
65+
# Flow configuration params are sent as form data
66+
_client._send_request(
67+
"POST", path_params, data=params, files={"file": (file_name, chunk)}
68+
)

python/hsfs/core/feature_group_api.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from hsfs import client
1818
from hsfs import feature_group, feature_group_commit
19+
from hsfs.core import ingestion_job
1920

2021

2122
class FeatureGroupApi:
@@ -221,3 +222,30 @@ def commit_details(self, feature_group_instance, limit):
221222
return feature_group_commit.FeatureGroupCommit.from_response_json(
222223
_client._send_request("GET", path_params, query_params, headers=headers),
223224
)
225+
226+
def ingestion(self, feature_group_instance, ingestion_conf):
227+
"""
228+
Setup a Hopsworks job for dataframe ingestion
229+
Args:
230+
feature_group_instance: FeatureGroup, required
231+
metadata object of feature group.
232+
ingestion_conf: the configuration for the ingestion job application
233+
"""
234+
235+
_client = client.get_instance()
236+
path_params = [
237+
"project",
238+
_client._project_id,
239+
"featurestores",
240+
self._feature_store_id,
241+
"featuregroups",
242+
feature_group_instance.id,
243+
"ingestion",
244+
]
245+
246+
headers = {"content-type": "application/json"}
247+
return ingestion_job.IngestionJob.from_response_json(
248+
_client._send_request(
249+
"POST", path_params, headers=headers, data=ingestion_conf.json()
250+
),
251+
)

python/hsfs/core/hudi_engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def __init__(
7171
self._feature_store_id = feature_store_id
7272
self._feature_store_name = feature_store_name
7373
self._base_path = self._feature_group.location
74-
self._table_name = feature_group.name + "_" + str(feature_group.version)
74+
self._table_name = util.feature_group_name(feature_group)
7575

7676
self._primary_key = ",".join(feature_group.primary_key)
7777
self._partition_key = (

python/hsfs/core/ingestion_job.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#
2+
# Copyright 2020 Logical Clocks AB
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import humps
18+
from hsfs.core.job import Job
19+
20+
21+
class IngestionJob:
22+
def __init__(
23+
self,
24+
data_path,
25+
job,
26+
href=None,
27+
expand=None,
28+
items=None,
29+
count=None,
30+
type=None,
31+
):
32+
self._data_path = data_path
33+
self._job = Job.from_response_json(job)
34+
35+
@classmethod
36+
def from_response_json(cls, json_dict):
37+
json_decamelized = humps.decamelize(json_dict)
38+
return cls(**json_decamelized)
39+
40+
@property
41+
def data_path(self):
42+
return self._data_path
43+
44+
@property
45+
def job(self):
46+
return self._job

0 commit comments

Comments
 (0)