8 changes: 4 additions & 4 deletions libraries/dagster-iceberg/kitchen-sink/Dockerfile
@@ -31,16 +31,16 @@ RUN apt-get -qq update && \
# Optional env variables
ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"}
-ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
+ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.9-src.zip:$PYTHONPATH

RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events
WORKDIR ${SPARK_HOME}

# Remember to also update `tests/conftest`'s spark setting
-ENV SPARK_VERSION=3.5.5
+ENV SPARK_VERSION=3.5.6
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
-ENV ICEBERG_VERSION=1.8.0
-ENV PYICEBERG_VERSION=0.8.1
+ENV ICEBERG_VERSION=1.9.1
+ENV PYICEBERG_VERSION=0.9.1

RUN curl --retry 5 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
&& tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
2 changes: 1 addition & 1 deletion libraries/dagster-iceberg/kitchen-sink/compose.yaml
@@ -82,7 +82,7 @@ services:
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
-until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
+until (/usr/bin/mc alias set minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
4 changes: 2 additions & 2 deletions libraries/dagster-iceberg/pyproject.toml
@@ -42,7 +42,7 @@ polars = [
"polars>=1.0.0",
]
spark = [
"pyspark[connect]>=3.4.0",
"pyspark[connect]~=3.4",
]

[tool.ruff]
@@ -128,7 +128,7 @@ docs = [
dev = [
"dagit>=1.8.8",
"dagster>=1.10.5",
"dagster-polars>=0.26.1",
"dagster-polars[patito]>=0.26.1",
"dagster-pyspark>=0.26.1",
"docker",
"fsspec[http]>=2025.2.0",
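The Spark extra now uses a compatible-release pin instead of a plain lower bound. Under PEP 440, "~=3.4" is equivalent to ">=3.4, ==3.*", so future Spark 4.x releases are excluded while 3.5.x updates are still allowed. A quick illustrative check (not part of this PR) using the packaging library:

# Illustrative only: what the new pyspark pin accepts under PEP 440.
from packaging.specifiers import SpecifierSet

spec = SpecifierSet("~=3.4")  # compatible release: >=3.4, ==3.*
print("3.5.6" in spec)  # True  -- the Spark version used in the kitchen-sink image
print("4.0.0" in spec)  # False -- major-version bumps are excluded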
@@ -1,5 +1,4 @@
-from collections.abc import Mapping
-from typing import Any, cast
+from typing import TYPE_CHECKING, Any, cast

from dagster import (
InputContext,
@@ -19,6 +18,9 @@
generate_single_partition_dimension,
)

+if TYPE_CHECKING:
+    from collections.abc import Mapping


class CustomDbIOManager(DbIOManager):
"""Works exactly like the DbIOManager, but overrides the _get_table_slice method
@@ -39,7 +41,7 @@ def _get_schema(
asset_key_path = context.asset_key.path
# schema order of precedence: metadata, I/O manager 'schema' config, key_prefix
if output_context_metadata.get("schema"):
-schema = cast(str, output_context_metadata["schema"])
+schema = cast("str", output_context_metadata["schema"])
elif self._schema:
schema = self._schema
elif len(asset_key_path) > 1:
@@ -75,13 +77,13 @@ def _get_table_slice(
partition_dimensions = generate_multi_partitions_dimension(
asset_partition_keys=context.asset_partition_keys,
asset_partitions_def=context.asset_partitions_def,
-partition_expr=cast(Mapping[str, str], partition_expr),
+partition_expr=cast("Mapping[str, str]", partition_expr),
asset_key=context.asset_key,
)
else:
partition_dimensions = [
generate_single_partition_dimension(
-partition_expr=cast(str, partition_expr),
+partition_expr=cast("str", partition_expr),
asset_partition_keys=context.asset_partition_keys,
asset_partitions_time_window=(
context.asset_partitions_time_window
@@ -96,7 +98,7 @@
else:
table = output_context.name
if output_context_metadata.get("schema"):
-schema = cast(str, output_context_metadata["schema"])
+schema = cast("str", output_context_metadata["schema"])
elif self._schema:
schema = self._schema
else:
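The Python changes in this PR repeat one pattern: the type argument of cast() is quoted and typing-only imports move under TYPE_CHECKING. Because cast() never evaluates its first argument at runtime, the quoted form stays valid even when the named type is only imported for the type checker. A minimal sketch of the idea, with illustrative names rather than code from this repo:

# Illustrative sketch of the cast()/TYPE_CHECKING pattern used throughout this PR.
from typing import TYPE_CHECKING, Any, cast

if TYPE_CHECKING:
    from collections.abc import Mapping  # imported only when type checking

def schema_from_metadata(metadata: dict[str, Any]) -> "Mapping[str, str]":
    # cast() returns its second argument unchanged; the string form of the type
    # means Mapping does not need to exist at runtime.
    return cast("Mapping[str, str]", metadata["schema"])

print(schema_from_metadata({"schema": {"name": "warehouse"}}))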
@@ -96,7 +96,7 @@ def generate_multi_partitions_dimension(
"""
partition_dimensions: list[TablePartitionDimension] = []
multi_partition_key_mappings = [
-cast(MultiPartitionKey, partition_key).keys_by_dimension
+cast("MultiPartitionKey", partition_key).keys_by_dimension
for partition_key in asset_partition_keys
]
for part in asset_partitions_def.partitions_defs:
@@ -122,7 +122,7 @@
partitions_: TimeWindow | Sequence[str]
if all(isinstance(partition, TimeWindow) for partition in partitions):
checker = MultiTimePartitionsChecker(
-partitions=cast(list[TimeWindow], partitions),
+partitions=cast("list[TimeWindow]", partitions),
)
if not checker.is_consecutive():
raise ValueError("Dates are not consecutive.")
@@ -131,12 +131,12 @@
end=checker.end,
)
elif all(isinstance(partition, str) for partition in partitions):
-partitions_ = list(set(cast(list[str], partitions)))
+partitions_ = list(set(cast("list[str]", partitions)))
else:
raise ValueError("Unknown partition type")
partition_dimensions.append(
TablePartitionDimension(
-partition_expr=cast(str, partition_expr_str),
+partition_expr=cast("str", partition_expr_str),
partitions=partitions_,
),
)
@@ -60,7 +60,7 @@ def _partition_filter(
) -> K:
return self._partition_filters_to_predicates(
partition_expr=table_partition.partition_expr,
-partition_filters=cast(Sequence[str], table_partition.partitions),
+partition_filters=cast("Sequence[str]", table_partition.partitions),
)

def _time_window_partition_filter(
@@ -81,7 +81,7 @@ def _time_window_partition_filter(
Returns:
List[E.BooleanExpression]: List of iceberg filters with start and end dates
"""
-partition = cast(TimeWindow, table_partition.partitions)
+partition = cast("TimeWindow", table_partition.partitions)
start_dt, end_dt = partition
if isinstance(start_dt, dt.datetime):
start_dt = start_dt.replace(tzinfo=None)
@@ -213,7 +213,7 @@ def update_table_partition_spec(
partition_spec_update_mode: str,
):
partition_dimensions = cast(
-Sequence[TablePartitionDimension],
+"Sequence[TablePartitionDimension]",
table_slice.partition_dimensions,
)
PyIcebergPartitionSpecUpdaterWithRetry(table=table).execute(
@@ -378,7 +378,7 @@ def updated_dagster_time_partition_field(self) -> str | None:
time_partition = next(iter(self.dagster_time_partitions), None)
updated_field_name: str | None = None
if time_partition is not None:
-time_partition_partitions = cast(TimeWindow, time_partition.partitions)
+time_partition_partitions = cast("TimeWindow", time_partition.partitions)
time_partition_transformation = diff_to_transformation(
time_partition_partitions.start,
time_partition_partitions.end,
@@ -489,21 +489,21 @@ def update_table_spec(self, table: Table):
match type_:
case "new":
partition_ = cast(
-TablePartitionDimension,
+"TablePartitionDimension",
partition,
)
self._spec_new(update=update, partition=partition_)
case "updated":
partition_ = cast(
-TablePartitionDimension,
+"TablePartitionDimension",
partition,
)
self._spec_update(
update=update,
partition=partition_,
)
case "deleted":
-partition_ = cast(PartitionField, partition)
+partition_ = cast("PartitionField", partition)
self._spec_delete(
update=update,
partition_name=partition_.name,
8 changes: 5 additions & 3 deletions libraries/dagster-iceberg/src/dagster_iceberg/handler.py
@@ -1,5 +1,5 @@
from abc import abstractmethod
-from typing import Generic, TypeVar, cast
+from typing import TYPE_CHECKING, Generic, TypeVar, cast

import pyarrow as pa
from dagster import (
@@ -13,10 +13,12 @@
from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice
from pyiceberg import table as ibt
from pyiceberg.catalog import Catalog
-from pyiceberg.table.snapshots import Snapshot

from dagster_iceberg._utils import preview, table_writer

+if TYPE_CHECKING:
+    from pyiceberg.table.snapshots import Snapshot

U = TypeVar("U")

ArrowTypes = pa.Table | pa.RecordBatchReader
@@ -69,7 +71,7 @@ def handle_output(

table_ = connection.load_table(f"{table_slice.schema}.{table_slice.table}")

-current_snapshot = cast(Snapshot, table_.current_snapshot())
+current_snapshot = cast("Snapshot", table_.current_snapshot())

context.add_output_metadata(
{
@@ -88,7 +88,7 @@ def get_select_statement(table_slice: TableSlice) -> str:
@contextmanager
def connect(context, table_slice: TableSlice) -> Iterator[Catalog]:
resource_config = cast(
-_IcebergTableIOManagerResourceConfig,
+"_IcebergTableIOManagerResourceConfig",
context.resource_config,
)
# Config passed as env variables or using config file.
@@ -236,7 +236,7 @@ def _partition_where_clause(


def _time_window_where_clause(table_partition: TablePartitionDimension) -> str:
-partition = cast(TimeWindow, table_partition.partitions)
+partition = cast("TimeWindow", table_partition.partitions)
start_dt, end_dt = partition
start_dt_str = start_dt.isoformat()
end_dt_str = end_dt.isoformat()
@@ -114,15 +114,15 @@ def connect(
context: OutputContext | InputContext,
table_slice: TableSlice,
) -> Iterator[SparkSession]:
-builder = cast(SparkSession.Builder, SparkSession.builder)
+builder = cast("SparkSession.Builder", SparkSession.builder)
if context.resource_config is not None:
if (spark_config := context.resource_config["spark_config"]) is not None:
builder.config(
-map=cast(dict[str, "OptionalPrimitiveType"], spark_config)
+map=cast("dict[str, OptionalPrimitiveType]", spark_config)
)

if (remote_url := context.resource_config["remote_url"]) is not None:
-builder.remote(cast(str, remote_url))
+builder.remote(cast("str", remote_url))

yield builder.getOrCreate()

@@ -218,7 +218,7 @@ def _partition_where_clause(


def _time_window_where_clause(table_partition: TablePartitionDimension) -> str:
-partition = cast(TimeWindow, table_partition.partitions)
+partition = cast("TimeWindow", table_partition.partitions)
start_dt, end_dt = partition
start_dt_str = start_dt.isoformat()
end_dt_str = end_dt.isoformat()
2 changes: 1 addition & 1 deletion libraries/dagster-iceberg/tests/docker/Dockerfile
@@ -37,7 +37,7 @@ RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/
WORKDIR ${SPARK_HOME}

# Remember to also update `tests/conftest`'s spark setting
-ENV SPARK_VERSION=3.5.5
+ENV SPARK_VERSION=3.5.6
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
ENV ICEBERG_VERSION=1.8.0
ENV PYICEBERG_VERSION=0.8.1