Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
// ARROW-12481.
std::string internal_path;
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<fs::FileSystem> filesystem,
fs::FileSystemFromUri(uri, &internal_path))
fs::FileSystemFromUriAndFs(uri, &internal_path, options.file_system_java))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what you would do is, if a filesystem is provided, call FileSystem::PathFromUri to set internal_path. If it succeeds, then proceed, else raise an error.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds correct, will do later

ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path))
if (file_info.IsDirectory()) {
fs::FileSelector selector;
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ struct FileSystemFactoryOptions {
".",
"_",
};

/// when java context have a file system reference to be used
// then use this file system reference in the context
std::shared_ptr<void> file_system_java = nullptr;
Comment on lines +195 to +198
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be Java-specific at all.

This should be shared_ptr<FileSystem>.

};

/// \brief FileSystemDatasetFactory creates a Dataset from a vector of
Expand Down
17 changes: 16 additions & 1 deletion cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,8 @@ namespace {
Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
const std::string& uri_string,
const io::IOContext& io_context,
std::string* out_path) {
std::string* out_path,
std::shared_ptr<void> file_system_java) {
const auto scheme = uri.scheme();

if (scheme == "file") {
Expand All @@ -703,6 +704,7 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
if (scheme == "hdfs" || scheme == "viewfs") {
#ifdef ARROW_HDFS
ARROW_ASSIGN_OR_RAISE(auto options, HdfsOptions::FromUri(uri));
options.connection_config.filesystem_java = filesystem_java;
if (out_path != nullptr) {
*out_path = uri.path();
}
Expand Down Expand Up @@ -736,13 +738,26 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
return Status::Invalid("Unrecognized filesystem type in URI: ", uri_string);
}

Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
const std::string& uri_string,
const io::IOContext& io_context,
std::string* out_path) {
return FileSystemFromUriReal(uri, uri_string, io_context, out_path, nullptr);
}

} // namespace

Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
std::string* out_path) {
return FileSystemFromUri(uri_string, io::default_io_context(), out_path);
}

Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndFs(const std::string& uri_string,
std::string* out_path, std::shared_ptr<void> file_system_java) {
ARROW_ASSIGN_OR_RAISE(auto fsuri, ParseFileSystemUri(uri_string))
return FileSystemFromUriReal(fsuri, uri_string, io::default_io_context(), out_path, file_system_java);
}

Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri_string,
const io::IOContext& io_context,
std::string* out_path) {
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ ARROW_EXPORT
Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
std::string* out_path = NULLPTR);

ARROW_EXPORT
Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndFs(const std::string& uri_string,
std::string* out_path,
std::shared_ptr<void> file_system_java);

/// \brief Create a new FileSystem by URI with a custom IO context
///
/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/io/hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,11 @@ class HadoopFileSystem::HadoopFileSystemImpl {
}

driver_->BuilderSetForceNewInstance(builder);
fs_ = driver_->BuilderConnect(builder);
if (config -> filesystem_java == nullptr) {
fs_ = driver_->BuilderConnect(builder);
} else {
fs_ = *(static_cast<hdfsFS*>(config -> filesystem_java.get()));
}

if (fs_ == nullptr) {
return Status::IOError("HDFS connection failed");
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/io/hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct HdfsConnectionConfig {
std::string user;
std::string kerb_ticket;
std::unordered_map<std::string, std::string> extra_conf;
std::shared_ptr<void> filesystem_java = nullptr;
};

class ARROW_EXPORT HadoopFileSystem : public FileSystem {
Expand Down
22 changes: 22 additions & 0 deletions java/dataset/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,28 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: makeFileSystemDatasetFactory
* Signature: (Ljava/lang/String;ILjava/lang/Object;)J
*/
JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2ILjava_lang_Object_2(
JNIEnv* env, jobject, jstring uri, jint file_format_id, jobject fs) {
JNI_METHOD_START
std::shared_ptr<arrow::dataset::FileFormat> file_format =
JniGetOrThrow(GetFileFormat(file_format_id));
arrow::dataset::FileSystemFactoryOptions options;
jobject fsObj = env->NewGlobalRef(fs);
options.file_system_java = &fsObj;

std::shared_ptr<arrow::dataset::DatasetFactory> d =
JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make(
JStringToCString(env, uri), file_format, options));
return CreateNativeRef(d);
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: makeFileSystemDatasetFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ private JniWrapper() {
*/
public native long makeFileSystemDatasetFactory(String uri, int fileFormat);

/**
* Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a
* intermediate shared_ptr of the factory instance.
*
* @param uri file uri to read, either a file or a directory
* @param fileFormat file format ID
* @param fs the existing java context file system
* @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
* @see FileFormat
*/
public native long makeFileSystemDatasetFactory(String uri, int fileFormat, Object fs);

/**
* Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a
* intermediate shared_ptr of the factory instance.
Expand Down