Skip to content
Merged
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 86 additions & 7 deletions src/fsspec_xrootd/xrootd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import io
import logging
import os.path
import warnings
from collections import defaultdict
Expand Down Expand Up @@ -168,10 +169,21 @@ def __init__(
If true, synchronous methods will not be available in this instance
loop:
Bring your own loop (for sync methods)
storage_options:
Options for the XRootD file system object. Includes (not limited to):
- locate_all_sources = True: bool
- Defaults to True. Finds all locations at which the file is hosted, and chooses
from those. Does not let the redirector pick the first to respond.
- valid_sources = []: list
- If given and locate_all_sources is True, fsspec will only reject any file host
not in this list. Entries should be of the form ie: `cmsxrootd-site1.fnal.gov`
(no port number)
"""
super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
self.timeout = storage_options.get("timeout", XRootDFileSystem.default_timeout)
self.hostid = hostid
self.locate_all_sources = storage_options.get("locate_all_sources", True)
self.valid_sources = storage_options.get("valid_sources", [])
self._myclient = client.FileSystem("root://" + hostid)
if not self._myclient.url.is_valid():
raise ValueError(f"Invalid hostid: {hostid!r}")
Expand Down Expand Up @@ -702,23 +714,37 @@ def __init__(
if not isinstance(path, str):
raise ValueError(f"Path expected to be string, path: {path}")

self._myFile = client.File()
status, _n = self._myFile.open(
fs.unstrip_protocol(path),
self.mode,
timeout=self.timeout,
)
self.fs = fs

if "r" in mode and self.fs.locate_all_sources:
self._hosts = self._locate_sources(path)
else:
self._hosts = [fs.storage_options["hostid"]]

# Try hosts until you find an openable file
for _i_host in range(len(self._hosts)):
self._myFile = client.File()
status, _n = self._myFile.open(
fs.unstrip_protocol(path),
self.mode,
timeout=self.timeout,
)
if status.ok:
break

if not status.ok:
raise OSError(f"File did not open properly: {status.message}")

# Move hosts that tried and failed to self._dismissed_hosts
self._dismissed_hosts = self._hosts[:_i_host]
self._hosts = self._hosts[_i_host:]

self.metaOffset = 0
if "a" in mode:
_stats, _deets = self._myFile.stat(timeout=self.timeout)
self.metaOffset = _deets.size

self.path = path
self.fs = fs
self.mode = mode
self.blocksize = (
self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
Expand Down Expand Up @@ -760,6 +786,59 @@ def __init__(
self.location = None
self.offset = 0

def _locate_sources(self, logical_filename: str) -> list[str]:
"""Find hosts that have the desired file.

Gets a list of hosts from the XRootD server that was provided when the
XRootDFile object was instantiated. Note that this implies it will only find
more hosts of the given file if self.fs is a redirector. Implementation of a
solution from the Pepper project in this issue:

(https://github.com/CoffeaTeam/fsspec-xrootd/issues/36).

If valid_sources is a non-empty list in fs.storage_options, will only return domain names
that are also in valid_sources

Parameters
----------
logical_filename: The logical filename of the file. (ex: "//store/mc/other/stuff/file.root")

Returns
-------
List of domain names that host the file
"""
myclient = self.fs._myclient
# From Pepper:
# The flag PrefName (to get domain names instead of IP addresses) does
# not exist in the Python bindings. However, MAKEPATH has the same value
status, loc = myclient.locate(logical_filename, client.flags.OpenFlags.MAKEPATH)
if loc is None:
raise OSError("XRootD error: " + status.message)
hosts = []
for r in loc:
if len(r.address.split(":")) > 1:
# Strip off the port number if necessary
clean_address = "".join(r.address.split(":")[:-1])
else:
clean_address = r.address
if (clean_address in self.fs.valid_sources) or (
len(self.fs.valid_sources) == 0
):
hosts.append(clean_address)
logging.debug(f"Added host {clean_address} to _hosts")
else:
logging.debug(
f"Host {clean_address} not in valid_sources {self.fs.valid_sources}"
)
if len(hosts) == 0:
err_msg = f"XRootD error: No hosts for file {logical_filename} found using XRootD server {self.fs.storage_options['hostid']}"
if len(self.fs.valid_sources) > 0:
vld_src_msg = f" and valid sources {self.fs.valid_sources}"
raise OSError(err_msg + vld_src_msg)
else:
raise OSError(err_msg)
return hosts

def _fetch_range(self, start: int, end: int) -> Any:
status, data = self._myFile.read(
self.metaOffset + start, self.metaOffset + end - start, timeout=self.timeout
Expand Down