Skip to content

Commit 8ad06e7

Browse files
committed
Add test for tables
1 parent 157031b commit 8ad06e7

File tree

1 file changed

+86
-1
lines changed
  • tests/integration/test_database_iceberg

1 file changed

+86
-1
lines changed

tests/integration/test_database_iceberg/test.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,21 @@
1414
import pytz
1515
from minio import Minio
1616
from pyiceberg.catalog import load_catalog
17-
from pyiceberg.partitioning import PartitionField, PartitionSpec
17+
from pyiceberg.partitioning import PartitionField, PartitionSpec, UNPARTITIONED_PARTITION_SPEC
1818
from pyiceberg.schema import Schema
1919
from pyiceberg.table.sorting import SortField, SortOrder
2020
from pyiceberg.transforms import DayTransform, IdentityTransform
2121
from pyiceberg.types import (
2222
DoubleType,
23+
LongType,
2324
FloatType,
2425
NestedField,
2526
StringType,
2627
StructType,
2728
TimestampType,
2829
TimestamptzType
2930
)
31+
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
3032

3133
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
3234
from helpers.config_cluster import minio_secret_key, minio_access_key
@@ -485,3 +487,86 @@ def test_non_existing_tables(started_cluster):
485487
assert "DB::Exception: Table" in str(e)
486488
assert "doesn't exist" in str(e)
487489

490+
491+
def test_cluster_joins(started_cluster):
492+
node = started_cluster.instances["node1"]
493+
494+
test_ref = f"test_join_tables_{uuid.uuid4()}"
495+
table_name = f"{test_ref}_table"
496+
table_name_2 = f"{test_ref}_table_2"
497+
498+
root_namespace = f"{test_ref}_namespace"
499+
500+
catalog = load_catalog_impl(started_cluster)
501+
catalog.create_namespace(root_namespace)
502+
503+
schema = Schema(
504+
NestedField(
505+
field_id=1,
506+
name="tag",
507+
field_type=LongType(),
508+
required=False
509+
),
510+
NestedField(
511+
field_id=2,
512+
name="name",
513+
field_type=StringType(),
514+
required=False,
515+
),
516+
)
517+
table = create_table(catalog, root_namespace, table_name, schema,
518+
partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER)
519+
data = [{"tag": 1, "name": "John"}, {"tag": 2, "name": "Jack"}]
520+
df = pa.Table.from_pylist(data)
521+
table.append(df)
522+
523+
schema2 = Schema(
524+
NestedField(
525+
field_id=1,
526+
name="id",
527+
field_type=LongType(),
528+
required=False
529+
),
530+
NestedField(
531+
field_id=2,
532+
name="second_name",
533+
field_type=StringType(),
534+
required=False,
535+
),
536+
)
537+
table2 = create_table(catalog, root_namespace, table_name_2, schema2,
538+
partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER)
539+
data = [{"id": 1, "second_name": "Dow"}, {"id": 2, "second_name": "Sparrow"}]
540+
df = pa.Table.from_pylist(data)
541+
table2.append(df)
542+
543+
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
544+
545+
res = node.query(
546+
f"""
547+
SELECT t1.name,t2.second_name
548+
FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1
549+
JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2
550+
ON t1.tag=t2.id
551+
ORDER BY ALL
552+
SETTINGS object_storage_cluster_join_mode='local'
553+
"""
554+
)
555+
556+
assert res == "Jack\tSparrow\nJohn\tDow\n"
557+
558+
res = node.query(
559+
f"""
560+
SELECT name
561+
FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
562+
WHERE tag in (
563+
SELECT id
564+
FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
565+
)
566+
ORDER BY ALL
567+
SETTINGS object_storage_cluster_join_mode='local'
568+
"""
569+
)
570+
571+
assert res == "Jack\nJohn\n"
572+

0 commit comments

Comments
 (0)