
Commit 61fd065

fix: add safety check for _jvm and _jsc in _list_hdfs_files

1 parent: 916788e

File tree

1 file changed (+17, -14 lines):
  • sdk/python/feast/infra/offline_stores/contrib/spark_offline_store


sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 17 additions & 14 deletions
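A note on the motivation (hedged; this is context, not part of the commit message): _jvm and _jsc are internal PySpark attributes that type checkers treat as potentially None, so the old unguarded dereference could fail with an opaque AttributeError on a session where no JVM gateway is available. The commit replaces that failure mode with an explicit RuntimeError. It also moves _list_hdfs_files from an instance method to a module-level function, since the helper never used self; that is why the call site changes from self._list_hdfs_files(...) to _list_hdfs_files(...).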
@@ -442,7 +442,6 @@ def persist(
             self.to_spark_df().write.format(file_format).saveAsTable(table_name)
         else:
             self.to_spark_df().createOrReplaceTempView(table_name)
-
 
     def _has_remote_warehouse_in_config(self) -> bool:
         """
@@ -506,7 +505,7 @@ def to_remote_storage(self) -> List[str]:
             spark_session = get_spark_session_or_start_new_with_repoconfig(
                 store_config=self._config.offline_store
             )
-            return self._list_hdfs_files(spark_session, output_uri)
+            return _list_hdfs_files(spark_session, output_uri)
         else:
             raise NotImplementedError(
                 "to_remote_storage is only implemented for file://, s3:// and hdfs:// uri schemes"
@@ -515,18 +514,6 @@ def to_remote_storage(self) -> List[str]:
         else:
             raise NotImplementedError()
 
-    def _list_hdfs_files(self, spark_session: SparkSession, uri: str) -> List[str]:
-        jvm = spark_session._jvm
-        conf = spark_session._jsc.hadoopConfiguration()
-        path = jvm.org.apache.hadoop.fs.Path(uri)
-        fs = jvm.org.apache.hadoop.fs.FileSystem.get(path.toUri(), conf)
-        statuses = fs.listStatus(path)
-        files = []
-        for f in statuses:
-            if f.isFile():
-                files.append(f.getPath().toString())
-        return files
-
     @property
     def metadata(self) -> Optional[RetrievalMetadata]:
         """
@@ -650,6 +637,22 @@ def _list_files_in_folder(folder):
     return files
 
 
+def _list_hdfs_files(spark_session: SparkSession, uri: str) -> List[str]:
+    jvm = spark_session._jvm
+    jsc = spark_session._jsc
+    if jvm is None or jsc is None:
+        raise RuntimeError("Spark JVM or JavaSparkContext is not available")
+    conf = jsc.hadoopConfiguration()
+    path = jvm.org.apache.hadoop.fs.Path(uri)
+    fs = jvm.org.apache.hadoop.fs.FileSystem.get(path.toUri(), conf)
+    statuses = fs.listStatus(path)
+    files = []
+    for f in statuses:
+        if f.isFile():
+            files.append(f.getPath().toString())
+    return files
+
+
 def _cast_data_frame(
     df_new: pyspark.sql.DataFrame, df_existing: pyspark.sql.DataFrame
 ) -> pyspark.sql.DataFrame:
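For illustration, a minimal sketch of how the now module-level helper could be exercised; the local session and the hdfs:// URI below are hypothetical stand-ins, not part of this commit (the real call site in to_remote_storage obtains its session via get_spark_session_or_start_new_with_repoconfig and passes output_uri):

from pyspark.sql import SparkSession

from feast.infra.offline_stores.contrib.spark_offline_store.spark import (
    _list_hdfs_files,  # private helper; imported here only for illustration
)

# Hypothetical local session; a real deployment would target a cluster.
spark = SparkSession.builder.master("local[*]").getOrCreate()

# On a JVM-backed session this lists the plain files (not directories)
# under the path; if _jvm or _jsc were unavailable, the new guard raises
# a clear RuntimeError instead of an AttributeError on None.
files = _list_hdfs_files(spark, "hdfs://namenode:8020/feast/output")  # hypothetical URI
for name in files:
    print(name)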
