14 changes: 14 additions & 0 deletions HISTORY.md
@@ -1,5 +1,19 @@
# cloudpathlib Changelog

## Unreleased

- Added streaming I/O support for S3, Azure Blob Storage, Google Cloud Storage, and HTTP/HTTPS via `FileCacheMode.streaming`. (Issue [#XXX](https://github.com/drivendataorg/cloudpathlib/issues/XXX), PR [#XXX](https://github.com/drivendataorg/cloudpathlib/pull/XXX))
- Added `FileCacheMode.streaming` enum value to enable direct streaming I/O without local caching (usage sketch after this list).
- Added `CloudBufferedIO` class implementing `io.BufferedIOBase` for binary streaming operations.
- Added `CloudTextIO` class implementing `io.TextIOBase` for text streaming operations.
- Added provider-specific raw I/O implementations: `_S3StorageRaw`, `_AzureBlobStorageRaw`, `_GSStorageRaw`, `_HttpStorageRaw`.
- Added `register_raw_io_class` decorator for registering streaming I/O implementations.
- Added `buffer_size` parameter to `CloudPath.open()` for controlling streaming buffer size.
- Added abstract methods to `Client` for streaming operations: `_range_download`, `_get_content_length`, `_initiate_multipart_upload`, `_upload_part`, `_complete_multipart_upload`, `_abort_multipart_upload`.
- Updated `CloudPath.__fspath__()` to raise `CloudPathNotImplementedError` when `file_cache_mode` is set to `streaming`, as streaming mode does not create cached files on disk.
- Updated documentation with streaming I/O guide and examples.
- Updated caching documentation to include streaming mode.
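
A minimal usage sketch of the new mode, assuming the names introduced in the entries above (`FileCacheMode.streaming`, the `buffer_size` keyword, and the `CloudBufferedIO` return type); the bucket, key, and buffer size are placeholders:

```python
# A minimal sketch, assuming the names introduced in this changelog entry;
# the bucket, key, and buffer size are placeholders.
from cloudpathlib import S3Client
from cloudpathlib.enums import FileCacheMode

client = S3Client(file_cache_mode=FileCacheMode.streaming)
path = client.CloudPath("s3://my-bucket/big-file.bin")

# Binary reads go through CloudBufferedIO; only the requested ranges are
# fetched, and no cache file is written to disk.
with path.open("rb", buffer_size=8 * 1024 * 1024) as f:
    header = f.read(1024)
```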

## v0.23.0 (2025-10-07)

- Added support for Python 3.14 (Issue [#529](https://github.com/drivendataorg/cloudpathlib/issues/529), PR [#530](https://github.com/drivendataorg/cloudpathlib/pull/530))
3 changes: 3 additions & 0 deletions cloudpathlib/__init__.py
@@ -4,6 +4,7 @@
from .anypath import AnyPath
from .azure.azblobclient import AzureBlobClient
from .azure.azblobpath import AzureBlobPath
from .cloud_io import CloudBufferedIO, CloudTextIO
from .cloudpath import CloudPath, implementation_registry
from .patches import patch_open, patch_os_functions, patch_glob, patch_all_builtins
from .gs.gsclient import GSClient
@@ -27,7 +28,9 @@
"AnyPath",
"AzureBlobClient",
"AzureBlobPath",
"CloudBufferedIO",
"CloudPath",
"CloudTextIO",
"implementation_registry",
"GSClient",
"GSPath",
1 change: 1 addition & 0 deletions cloudpathlib/azure/__init__.py
@@ -1,5 +1,6 @@
from .azblobclient import AzureBlobClient
from .azblobpath import AzureBlobPath
from .azure_io import _AzureBlobStorageRaw # noqa: F401 - imported for registration

__all__ = [
"AzureBlobClient",
81 changes: 81 additions & 0 deletions cloudpathlib/azure/azblobclient.py
@@ -498,6 +498,87 @@ def _generate_presigned_url(
url = f"{self._get_public_url(cloud_path)}?{sas_token}"
return url

# ====================== STREAMING I/O METHODS ======================

def _range_download(self, cloud_path: AzureBlobPath, start: int, end: int) -> bytes:
"""Download a byte range from Azure Blob Storage."""
blob_client = self.service_client.get_blob_client(
container=cloud_path.container, blob=cloud_path.blob
)
try:
length = end - start + 1
downloader = blob_client.download_blob(offset=start, length=length)
return downloader.readall()
except Exception as e:
error_str = str(e)
if "ResourceNotFound" in error_str or "BlobNotFound" in error_str:
raise FileNotFoundError(f"Azure blob not found: {cloud_path}")
elif "InvalidRange" in error_str or "out of range" in error_str.lower():
return b""
elif hasattr(e, "error_code"):
if e.error_code in ("ResourceNotFound", "BlobNotFound"):
raise FileNotFoundError(f"Azure blob not found: {cloud_path}")
elif e.error_code == "InvalidRange":
return b""
raise

def _get_content_length(self, cloud_path: AzureBlobPath) -> int:
"""Get the size of an Azure blob."""
blob_client = self.service_client.get_blob_client(
container=cloud_path.container, blob=cloud_path.blob
)
try:
properties = blob_client.get_blob_properties()
return properties.size
except Exception as e:
error_str = str(e)
if "ResourceNotFound" in error_str or "BlobNotFound" in error_str:
raise FileNotFoundError(f"Azure blob not found: {cloud_path}")
elif hasattr(e, "error_code") and e.error_code in (
"ResourceNotFound",
"BlobNotFound",
):
raise FileNotFoundError(f"Azure blob not found: {cloud_path}")
raise

def _initiate_multipart_upload(self, cloud_path: AzureBlobPath) -> str:
"""Start an Azure block blob upload.

        Azure doesn't need explicit initialization, so an empty string is returned.
"""
return ""

def _upload_part(
self, cloud_path: AzureBlobPath, upload_id: str, part_number: int, data: bytes
) -> dict:
"""Upload a block in an Azure block blob upload."""
import base64

blob_client = self.service_client.get_blob_client(
container=cloud_path.container, blob=cloud_path.blob
)
# Azure uses base64-encoded block IDs
block_id = base64.b64encode(f"block-{part_number:06d}".encode()).decode()
blob_client.stage_block(block_id=block_id, data=data, length=len(data))
return {"block_id": block_id}

def _complete_multipart_upload(
self, cloud_path: AzureBlobPath, upload_id: str, parts: list
) -> None:
"""Complete an Azure block blob upload."""
blob_client = self.service_client.get_blob_client(
container=cloud_path.container, blob=cloud_path.blob
)
block_ids = [part["block_id"] for part in parts]
blob_client.commit_block_list(block_ids)

def _abort_multipart_upload(self, cloud_path: AzureBlobPath, upload_id: str) -> None:
"""Abort an Azure block blob upload.

        Uncommitted Azure blocks are cleaned up automatically, so there is nothing to do.
"""
pass
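
Taken together, these methods map onto Azure's block blob protocol (`stage_block` per part, `commit_block_list` to finish). The function below is an illustrative composition, not code from this diff; `client`, `path`, and `chunks` are placeholders supplied by the caller.

```python
def upload_in_blocks(client, path, chunks):
    """Illustrative driver for the Azure streaming-write methods above.

    `client` is an AzureBlobClient, `path` an AzureBlobPath, and `chunks`
    any iterable of bytes (all placeholders).
    """
    upload_id = client._initiate_multipart_upload(path)  # "" -- Azure needs no session ID
    parts = []
    try:
        for number, chunk in enumerate(chunks, start=1):  # part numbers are 1-indexed
            parts.append(client._upload_part(path, upload_id, number, chunk))  # stage_block
        client._complete_multipart_upload(path, upload_id, parts)  # commit_block_list
    except Exception:
        client._abort_multipart_upload(path, upload_id)  # no-op: blocks auto-expire
        raise
```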


def _hns_rmtree(data_lake_client, container, directory):
"""Stateless implementation so can be used in test suite cleanup as well.
122 changes: 122 additions & 0 deletions cloudpathlib/azure/azure_io.py
@@ -0,0 +1,122 @@
"""
Azure Blob Storage-specific streaming I/O implementations.

Provides efficient streaming I/O for Azure using range requests and block uploads.
"""

from typing import Optional, Dict, Any

from ..cloud_io import _CloudStorageRaw
from ..cloudpath import register_raw_io_class
from ..enums import FileCacheMode


@register_raw_io_class("azure")
class _AzureBlobStorageRaw(_CloudStorageRaw):
"""
Azure Blob Storage-specific raw I/O adapter.

Implements efficient range-based reads and block blob uploads for Azure.
"""

def __init__(self, client, cloud_path, mode: str = "rb"):
"""
Initialize Azure raw adapter.

Args:
client: AzureBlobClient instance
cloud_path: AzureBlobPath instance
mode: File mode
"""
super().__init__(client, cloud_path, mode)

# For block blob uploads
self._upload_id: str = "" # Azure doesn't use upload IDs
self._parts: list = []

def _range_get(self, start: int, end: int) -> bytes:
"""
Fetch a byte range from Azure Blob Storage.

Args:
start: Start byte position (inclusive)
end: End byte position (inclusive)

Returns:
Bytes in the requested range
"""
return self._client._range_download(self._cloud_path, start, end)

def _get_size(self) -> int:
"""
Get the total size of the Azure blob.

Returns:
Size in bytes
"""
return self._client._get_content_length(self._cloud_path)

def _is_eof_error(self, error: Exception) -> bool:
"""Check if error indicates EOF/out of range."""
error_str = str(error)
if "InvalidRange" in error_str or "out of range" in error_str.lower():
return True
if hasattr(error, "error_code") and error.error_code == "InvalidRange":
return True
return False

# ---- Write support (block blob upload) ----

def _upload_chunk(self, data: bytes, upload_state: Optional[Dict[str, Any]] = None) -> None:
"""
Upload a chunk of data as a block.

Args:
data: Bytes to upload
upload_state: Upload state dictionary
"""
if not data:
return

# Initialize upload if needed (Azure doesn't need explicit init)
if not self._upload_id:
self._upload_id = self._client._initiate_multipart_upload(self._cloud_path)

# Upload block using client method (part_number is 1-indexed)
part_number = len(self._parts) + 1
part_info = self._client._upload_part(self._cloud_path, self._upload_id, part_number, data)
self._parts.append(part_info)

def _finalize_upload(self, upload_state: Optional[Dict[str, Any]] = None) -> None:
"""
Finalize Azure block blob upload.

Args:
upload_state: Upload state dictionary
"""
if not self._parts:
# No blocks uploaded - create empty file
# Temporarily disable streaming mode to avoid recursion
original_mode = self._client.file_cache_mode
try:
self._client.file_cache_mode = FileCacheMode.tmp_dir
self._cloud_path.write_bytes(b"")
finally:
self._client.file_cache_mode = original_mode
return

        try:
            # Complete upload using client method; if this fails, Azure will
            # auto-expire the uncommitted blocks after 7 days
            self._client._complete_multipart_upload(self._cloud_path, self._upload_id, self._parts)
        finally:
            self._upload_id = ""
            self._parts = []

def close(self) -> None:
"""Close and clean up."""
# Note: The base class close() will call _finalize_upload which handles completion
# Azure auto-expires uncommitted blocks after 7 days, so no need for explicit cleanup
super().close()
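
For context, a hedged sketch of how this adapter is reached in practice: with the client in streaming mode, opening an Azure path for writing routes data through `_upload_chunk` and `_finalize_upload` above. The connection string, container, and blob name are placeholders.

```python
# A minimal sketch, assuming streaming mode as introduced in this PR;
# credentials and the blob path are placeholders.
from cloudpathlib import AzureBlobClient
from cloudpathlib.enums import FileCacheMode

client = AzureBlobClient(
    connection_string="<connection-string>",  # placeholder credentials
    file_cache_mode=FileCacheMode.streaming,
)
log_path = client.CloudPath("az://my-container/logs/run.txt")

# Text writes go through CloudTextIO over this raw adapter; blocks are staged
# as the buffer fills and committed on close.
with log_path.open("w") as f:
    f.write("streamed without a local cache file\n")
```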
87 changes: 87 additions & 0 deletions cloudpathlib/client.py
@@ -185,3 +185,90 @@ def _generate_presigned_url(
self, cloud_path: BoundedCloudPath, expire_seconds: int = 60 * 60
) -> str:
pass

# ====================== STREAMING I/O METHODS ======================
# Methods to support efficient streaming without local caching

@abc.abstractmethod
def _range_download(self, cloud_path: BoundedCloudPath, start: int, end: int) -> bytes:
"""Download a byte range from cloud storage.

Args:
cloud_path: Path to download from
start: Start byte position (inclusive)
end: End byte position (inclusive)

Returns:
Bytes in the requested range

Raises:
FileNotFoundError: If object doesn't exist
"""
pass

@abc.abstractmethod
def _get_content_length(self, cloud_path: BoundedCloudPath) -> int:
"""Get the size of an object without downloading it.

Args:
cloud_path: Path to query

Returns:
Size in bytes

Raises:
FileNotFoundError: If object doesn't exist
"""
pass

@abc.abstractmethod
def _initiate_multipart_upload(self, cloud_path: BoundedCloudPath) -> str:
"""Start a multipart/chunked upload session.

Args:
cloud_path: Destination path

Returns:
Upload session ID/handle (provider-specific, may be empty string)
"""
pass

@abc.abstractmethod
def _upload_part(
self, cloud_path: BoundedCloudPath, upload_id: str, part_number: int, data: bytes
) -> dict:
"""Upload a single part/chunk in a multipart upload.

Args:
cloud_path: Destination path
upload_id: Upload session ID from _initiate_multipart_upload
part_number: Sequential part number (1-indexed)
data: Bytes to upload

Returns:
Provider-specific metadata needed for finalization
"""
pass

@abc.abstractmethod
def _complete_multipart_upload(
self, cloud_path: BoundedCloudPath, upload_id: str, parts: list
) -> None:
"""Finalize a multipart upload.

Args:
cloud_path: Destination path
upload_id: Upload session ID
parts: List of part metadata from _upload_part calls
"""
pass

@abc.abstractmethod
def _abort_multipart_upload(self, cloud_path: BoundedCloudPath, upload_id: str) -> None:
"""Cancel a multipart upload and clean up.

Args:
cloud_path: Destination path
upload_id: Upload session ID
"""
pass
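
To make the read-side contract concrete, here is an illustrative consumer of these abstract methods (not part of this diff); `client`, `path`, and `process` are placeholders, and the inclusive start/end convention follows the docstrings above.

```python
def read_in_ranges(client, path, process, buffer_size=8 * 1024 * 1024):
    """Illustrative read loop over the abstract streaming methods above.

    `client`, `path`, and `process` are placeholders; `buffer_size` is an
    arbitrary chunk size. Byte ranges are inclusive on both ends.
    """
    size = client._get_content_length(path)
    pos = 0
    while pos < size:
        end = min(pos + buffer_size, size) - 1
        chunk = client._range_download(path, pos, end)
        process(chunk)
        pos += len(chunk)
```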