Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ def read(fname):
azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core']
http_deps = ['requests']
ssh_deps = ['paramiko']
lakefs_deps = ['lakefs_client']

all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps
all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + lakefs_deps
tests_require = all_deps + [
'moto[server]',
'responses',
Expand All @@ -65,7 +66,7 @@ def read(fname):
url='https://github.com/piskvorky/smart_open',
download_url='http://pypi.python.org/pypi/smart_open',

keywords='file streaming, s3, hdfs, gcs, azure blob storage',
keywords='file streaming, s3, hdfs, gcs, azure blob storage, lakefs',

license='MIT',
platforms='any',
Expand All @@ -80,6 +81,7 @@ def read(fname):
'http': http_deps,
'webhdfs': http_deps,
'ssh': ssh_deps,
'lakefs': lakefs_deps,
},
python_requires=">=3.6,<4.0",

Expand Down
255 changes: 255 additions & 0 deletions smart_open/lakefs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
from __future__ import annotations

import dataclasses
import functools
import io
import logging
import os
import re
from typing import IO

try:
from lakefs_client import apis, configuration, models
from lakefs_client import client as lfs_client
except ImportError:
MISSING_DEPS = True

from smart_open import constants, utils

SCHEME = "lakefs"

# Example URIs surfaced in smart_open's documentation/help.  Both must match
# the lakefs://<REPO>/<REF>/<KEY> format that parse_uri() accepts; a triple
# slash (empty netloc) would parse to an empty repo name.
URI_EXAMPLES = (
    "lakefs://REPO/REF/file",
    "lakefs://REPO/main/path/to/file.bz2",
)

# Default buffer size (4 MiB) for the buffered reader/writer wrappers.
DEFAULT_BUFFER_SIZE = 4 * 1024**2

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class ParsedURI:
    """Components of a parsed ``lakefs://<REPO>/<REF>/<KEY>`` URI."""

    # URI scheme; always "lakefs" for URIs accepted by parse_uri().
    scheme: str
    # Repository name (the URI's network-location component).
    repo: str
    # Branch name or commit reference.
    ref: str
    # Object key (path) within the repository at the given ref.
    key: str


def parse_uri(uri_as_string: str) -> ParsedURI:
    """Parse a lakefs protocol URI into its components.

    lakeFS uses a specific format for path URIs. The URI lakefs://<REPO>/<REF>/<KEY>
    is a path to objects in the given repo and ref expression under key. This is used
    both for path prefixes and for full paths. In similar fashion, lakefs://<REPO>/<REF>
    identifies the repository at a ref expression, and lakefs://<REPO> identifies a repo.

    :param str uri_as_string: The URI to parse, e.g. ``lakefs://repo/main/a/b.txt``.

    :returns: The parsed components.
    :rtype: ParsedURI

    :raises ValueError: If the scheme is not ``lakefs`` or the URI is missing
        the repo, ref, or key component.
    """
    split_uri = utils.safe_urlsplit(uri_as_string)
    if split_uri.scheme != SCHEME:
        raise ValueError(f"Scheme is not `lakefs` in {uri_as_string}")
    # The netloc is the repo; an empty netloc (e.g. `lakefs:///repo/...`)
    # would silently produce a nonsense ParsedURI, so reject it explicitly.
    if not split_uri.netloc:
        raise ValueError(
            f"Missing `repo` in {uri_as_string}. "
            "The URI should have the format of `lakefs://<REPO>/<REF>/<KEY>`"
        )
    # fullmatch anchors the pattern at both ends, so no ^/$ are needed.
    match = re.fullmatch(r"/(?P<ref>[^/]+)/(?P<key>.+)", split_uri.path)
    if match is None:
        raise ValueError(
            f"Missing `branch/commit` and `path` in {uri_as_string}. "
            "The URI should have the format of `lakefs://<REPO>/<REF>/<KEY>`"
        )
    return ParsedURI(
        scheme=split_uri.scheme,
        repo=split_uri.netloc,
        ref=match["ref"],
        key=match["key"],
    )


def open_uri(uri: str, mode: str, transport_params: dict) -> IO:
    """Return a file-like object pointing to the URI.

    :param str uri: The URI to open
    :param str mode: Either "rb" or "wb".
    :param dict transport_params: Any additional parameters to pass to `open`.

    :returns: file-like object.
    :rtype: file-like
    """
    parsed = parse_uri(uri)
    # Drop any transport params that this module's open() does not accept.
    accepted_kwargs = utils.check_kwargs(open, transport_params)
    return open(parsed.repo, parsed.ref, parsed.key, mode, **accepted_kwargs)


def open(
    repo: str,
    ref: str,
    key: str,
    mode: str,
    client: lfs_client.LakeFSClient | None = None,
    commit_message: str | None = None,
    buffer_size: int = DEFAULT_BUFFER_SIZE,
):
    """Open a lakefs object for reading or writing.

    Parameters
    ----------
    repo: str
        The name of the repository this object resides in.
    ref: str
        The name of the branch or commit.
    key: str
        The path to the object for a given repo and branch.
    mode: str
        The mode for opening the object. Must be either "rb" or "wb".
    client: lakefs_client.client.LakeFSClient
        The lakefs client to use.  When None, a client is built from the
        LAKECTL_* environment variables.
    commit_message: str
        Only when writing. The message to include in the commit.
    buffer_size: int, optional
        The buffer size to use when performing I/O.

    Returns
    -------
    io.BufferedReader or io.BufferedWriter wrapping the raw lakefs stream.

    Raises
    ------
    NotImplementedError
        If ``mode`` is not "rb" or "wb".
    ValueError
        If no client is given and the LAKECTL_* environment variables are
        incomplete.
    """
    # Fail fast on an unsupported mode, before doing any credential work.
    if mode not in (constants.READ_BINARY, constants.WRITE_BINARY):
        raise NotImplementedError(f"Lakefs support for mode {mode} not implemented")

    if client is None:
        client = _client_from_env()

    if mode == constants.READ_BINARY:
        raw = _RawReader(client, repo, ref, key)
        return io.BufferedReader(raw, buffer_size)
    raw_writer = _RawWriter(client, repo, ref, key, commit_message)
    return io.BufferedWriter(raw_writer, buffer_size)


def _client_from_env() -> lfs_client.LakeFSClient:
    """Build a LakeFSClient from the standard LAKECTL_* environment variables.

    :raises ValueError: If any of the three required variables is unset.
    """
    try:
        conf = configuration.Configuration(
            host=os.environ["LAKECTL_SERVER_ENDPOINT_URL"],
            username=os.environ["LAKECTL_CREDENTIALS_ACCESS_KEY_ID"],
            password=os.environ["LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"],
        )
    except KeyError as e:
        raise ValueError(
            "Missing lakectl credentials. Please set "
            "LAKECTL_SERVER_ENDPOINT_URL, LAKECTL_CREDENTIALS_ACCESS_KEY_ID, "
            "and LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"
        ) from e
    return lfs_client.LakeFSClient(conf)


class _RawReader(io.RawIOBase):
    """Read a lakeFS object.

    Provides low-level access to the underlying lakefs api.
    High level primitives are implemented using io.BufferedReader.
    """

    def __init__(
        self,
        client: lfs_client.LakeFSClient,
        repo: str,
        ref: str,
        key: str,
    ):
        self._client = client
        self._repo = repo
        self._ref = ref
        self._path = key
        # Current read offset, in bytes from the start of the object.
        self._position = 0
        self.name = key

    def seekable(self) -> bool:
        return True

    def readable(self) -> bool:
        return True

    @functools.cached_property
    def content_length(self) -> int:
        """Total object size in bytes, fetched once via a stat call."""
        objects: apis.ObjectsApi = self._client.objects
        obj_stats: models.ObjectStats = objects.stat_object(
            self._repo, self._ref, self._path
        )
        return obj_stats.size_bytes

    @property
    def eof(self) -> bool:
        """True when the read position has reached the end of the object."""
        return self._position == self.content_length

    def seek(self, __offset: int, __whence: int = constants.WHENCE_START) -> int:
        """Seek to the specified position.

        :param int __offset: The byte offset.
        :param int __whence: Where the offset is from (start, current, or end).

        :returns: The position after seeking.
        :rtype: int
        """
        if __whence not in constants.WHENCE_CHOICES:
            raise ValueError(
                "invalid whence, expected one of %r" % constants.WHENCE_CHOICES
            )

        if __whence == constants.WHENCE_START:
            start = max(0, __offset)
        elif __whence == constants.WHENCE_CURRENT:
            start = max(0, self._position + __offset)
        elif __whence == constants.WHENCE_END:
            start = max(0, self.content_length + __offset)

        # Clamp so the position never moves past the end of the object.
        self._position = min(start, self.content_length)

        return self._position

    def readinto(self, __buffer) -> int | None:
        """Read up to ``len(__buffer)`` bytes into the pre-allocated __buffer.

        :param __buffer: A writable, pre-allocated bytes-like object.

        :returns: the number of bytes read from lakefs (0 at EOF)
        :rtype: int
        """
        if self._position >= self.content_length:
            return 0
        size = len(__buffer)
        start_range = self._position
        # HTTP-style byte range; the end offset is inclusive.
        end_range = min(self.content_length, (start_range + size)) - 1
        byte_range = f"bytes={start_range}-{end_range}"
        objects: apis.ObjectsApi = self._client.objects
        data = objects.get_object(
            self._repo, self._ref, self._path, range=byte_range
        ).read()
        if not data:
            return 0
        self._position += len(data)
        __buffer[: len(data)] = data
        return len(data)


class _RawWriter(io.RawIOBase):
    """Write a lakefs object.

    Provides low-level access to the underlying lakefs api.
    High level primitives are implemented using io.BufferedWriter.

    Writes are staged in memory and uploaded as a single object — followed by
    exactly one commit — when the stream is closed.  Uploading from write()
    directly would be wrong: io.BufferedWriter may call write() once per
    flushed chunk, so each chunk would overwrite the object with only that
    chunk's bytes and create one commit per chunk.
    """

    def __init__(
        self,
        client: lfs_client.LakeFSClient,
        repo: str,
        ref: str,
        key: str,
        commit_message: str | None,
    ):
        self._client = client
        self._repo = repo
        self._ref = ref
        self._path = key
        if commit_message:
            self._message = commit_message
        else:
            self._message = f"Update {self._path}."
        self.name = key
        # In-memory staging buffer; flushed to lakefs once, in close().
        self._buffer = io.BytesIO()

    def writable(self) -> bool:
        return True

    def write(self, __b) -> int | None:
        # Stage the chunk locally; the actual upload happens in close().
        return self._buffer.write(__b)

    def close(self) -> None:
        if self.closed:
            return
        try:
            # Upload and commit only if something was actually written.
            if self._buffer.tell():
                self._upload_and_commit()
        finally:
            self._buffer.close()
            super().close()

    def _upload_and_commit(self) -> None:
        """Upload all staged bytes as one object and create a single commit."""
        objects: apis.ObjectsApi = self._client.objects
        commits: apis.CommitsApi = self._client.commits
        stream = io.BytesIO(self._buffer.getvalue())
        # lakefs uses the stream's name attribute as the upload filename.
        stream.name = self._path
        objects.upload_object(self._repo, self._ref, self._path, content=stream)
        message = models.CommitCreation(self._message)
        _ = commits.commit(self._repo, self._ref, message)
31 changes: 31 additions & 0 deletions smart_open/tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
version: '3'
services:
  # lakeFS server used by the integration tests; exposes the API on :8000.
  lakefs:
    image: "treeverse/lakefs:0.89.0"
    ports:
      - "8000:8000"
    depends_on:
      - "postgres"
    environment:
      - LAKEFS_AUTH_ENCRYPT_SECRET_KEY=${LAKEFS_AUTH_ENCRYPT_SECRET_KEY:-some random secret string}
      - LAKEFS_DATABASE_TYPE=${LAKEFS_DATABASE_TYPE:-postgres}
      - LAKEFS_DATABASE_POSTGRES_CONNECTION_STRING=${LAKEFS_DATABASE_POSTGRES_CONNECTION_STRING:-postgres://lakefs:lakefs@postgres/postgres?sslmode=disable}
      - LAKEFS_BLOCKSTORE_TYPE=${LAKEFS_BLOCKSTORE_TYPE:-local}
      - LAKEFS_BLOCKSTORE_LOCAL_PATH=${LAKEFS_BLOCKSTORE_LOCAL_PATH:-/home/lakefs}
      - LAKEFS_GATEWAYS_S3_DOMAIN_NAME=${LAKEFS_GATEWAYS_S3_DOMAIN_NAME:-s3.local.lakefs.io:8000}
      - LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
      - LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_SECRET_KEY=${AWS_SECRET_ACCESS_KEY:-}
      - LAKEFS_LOGGING_LEVEL=${LAKEFS_LOGGING_LEVEL:-INFO}
      - LAKEFS_STATS_ENABLED
      - LAKEFS_BLOCKSTORE_S3_ENDPOINT
      - LAKEFS_BLOCKSTORE_S3_FORCE_PATH_STYLE
      - LAKEFS_COMMITTED_LOCAL_CACHE_DIR=${LAKEFS_COMMITTED_LOCAL_CACHE_DIR:-/home/lakefs/.local_tier}
    # Wait for postgres to accept connections before starting lakefs.
    entrypoint: ["/app/wait-for", "postgres:5432", "--", "/app/lakefs", "run"]
  # Metadata database backing the lakeFS server.
  postgres:
    image: "postgres"
    command: "-c log_min_messages=FATAL"
    environment:
      POSTGRES_USER: lakefs
      POSTGRES_PASSWORD: lakefs
    # logging:
    #   driver: none
Loading