PR2 (nullability bug): adding new OH SparkCatalog which enables preserving non-nullable schemas #288

Open · wants to merge 1 commit into base: main
File: CTASNonNullTest.java (@@ -3,38 +3,61 @@)
import static org.junit.jupiter.api.Assertions.*;

import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.Map;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;

public class CTASNonNullTest extends OpenHouseSparkITest {
  @Test
  public void testCTASPreservesNonNull() throws Exception {
    String overrideCatalogName = "opensource_iceberg_spark_catalog";
    try (SparkSession spark =
        getSparkSession(
            overrideCatalogName,
            Map.of(
                String.format("spark.sql.catalog.%s", overrideCatalogName),
                "org.apache.iceberg.spark.SparkCatalog"))) {
      // Create a source table with NOT NULL columns
      spark.sql(
          "CREATE TABLE openhouse.ctasNonNull.test_table (id INT NOT NULL, name STRING NOT NULL, value DOUBLE NOT NULL)");
      // Create a target table using CTAS through the OpenHouse catalog
      spark.sql(
          "CREATE TABLE openhouse.ctasNonNull.test_tableCtas USING iceberg AS SELECT * FROM openhouse.ctasNonNull.test_table");
      // Create a target table using CTAS through the stock Iceberg catalog
      spark.sql(
          "CREATE TABLE opensource_iceberg_spark_catalog.ctasNonNull.test_tableCtasDefaultIceberg USING iceberg AS SELECT * FROM openhouse.ctasNonNull.test_table");

      // Get schemas for the source and both targets
      StructType sourceSchema = spark.table("openhouse.ctasNonNull.test_table").schema();
      StructType targetSchemaGood = spark.table("openhouse.ctasNonNull.test_tableCtas").schema();
      StructType targetSchemaBroken =
          spark
              .table("opensource_iceberg_spark_catalog.ctasNonNull.test_tableCtasDefaultIceberg")
              .schema();

      // Verify the Spark catalogs have the expected classes configured
      assertEquals(
          "com.linkedin.openhouse.spark.SparkCatalog",
          spark.conf().get("spark.sql.catalog.openhouse"));
      assertEquals(
          "org.apache.iceberg.spark.SparkCatalog",
          spark.conf().get("spark.sql.catalog.opensource_iceberg_spark_catalog"));

      // Verify nullability of the id column in each target
      assertFalse(sourceSchema.apply("id").nullable(), "Source table id column should be required");
      assertTrue(
          targetSchemaBroken.apply("id").nullable(),
          "Target table id column's NOT NULL constraint should not be preserved, because 1) CTAS non-nullable preservation is off by default");
      assertTrue(
          targetSchemaGood.apply("id").nullable(),
          "Target table id column's NOT NULL constraint should not be preserved, because 2) the OSS Spark 3.1 catalyst connector lacks support for non-null CTAS");

      // Clean up
      spark.sql("DROP TABLE openhouse.ctasNonNull.test_table");
      spark.sql("DROP TABLE openhouse.ctasNonNull.test_tableCtas");
      spark.sql(
          "DROP TABLE opensource_iceberg_spark_catalog.ctasNonNull.test_tableCtasDefaultIceberg");
    }
  }
}

Collaborator (on the targetSchemaBroken assertion): if both are nullable, how is the SparkCatalog helping?
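A plausible answer, judging from the Spark 3.5 test later in this diff: the new catalog overrides Spark's TableCatalog#useNullableQuerySchema hook, which newer Spark versions consult during CTAS but Spark 3.1 does not, so on this client both targets end up nullable either way. A sketch of the hook's assumed shape in newer Spark (the name is taken from this PR's override; the default and placement are my reading of Spark, not code from this PR):

  // Assumed shape of the hook in newer Spark's TableCatalog; Spark 3.1 lacks it,
  // so an override there is never consulted and CTAS targets stay nullable.
  interface TableCatalogSketch {
    // Default behavior: relax the CTAS/RTAS query schema to all-nullable.
    default boolean useNullableQuerySchema() {
      return true;
    }
  }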
File: SparkCatalog.java (new file, @@ -0,0 +1,8 @@)
package com.linkedin.openhouse.spark;

public class SparkCatalog extends org.apache.iceberg.spark.SparkCatalog {
  public boolean useNullableQuerySchema() {
    // Preserve DataFrame nullability when writing to OH tables
    return false;
  }
}

Collaborator: I'd strongly prefer we not introduce this layer (i.e. SparkCatalog) in the OSS codebase (if it's a must, a better place would be li-wrapper). Reasons:

  • Iceberg solves this in 1.7.x with a configuration (https://iceberg.apache.org/docs/1.7.1/spark-configuration/#catalog-configuration); see the sketch below. Ideally we upgrade to 1.7.x and this problem is auto-solved for us.
  • If we override this conf, Iceberg's behavior will be ignored.
    • Ideally we do not touch Iceberg connector code (i.e. SparkCatalog, FlinkCatalog, etc.); it's crucial that OHCatalog stays minimal in its interfacing, so it's easy for us to upgrade Iceberg, Spark, and other dependencies.
  • Adding this layer would allow other Iceberg confs to be overridden, which we shouldn't allow.
    • I'm concerned about it getting misused, and about our forks/behavior thereby deviating from OSS.

Collaborator (Author): I also don't see this as a perfect option, and I appreciate the points here.

  1. We can't upgrade to 1.7.x, because it depends on higher Spark versions than our Spark clients use.
  2. What exactly will be ignored or changed? Since we are simply extending and adding, everything should be preserved.
  3. This can be codified among the contributors as a rule not to edit it.
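For reference, a minimal sketch of the 1.7.x alternative the reviewer points to, assuming the use-nullable-query-schema catalog property described in the linked Iceberg 1.7.1 docs (the property name and its default come from those docs, not from this PR):

  import org.apache.spark.sql.SparkSession;

  // Minimal sketch, not part of this PR: with Iceberg 1.7.x the same effect
  // should be achievable via catalog configuration alone, without subclassing.
  SparkSession spark =
      SparkSession.builder()
          .config("spark.sql.catalog.openhouse", "org.apache.iceberg.spark.SparkCatalog")
          .config(
              "spark.sql.catalog.openhouse.catalog-impl",
              "com.linkedin.openhouse.spark.OpenHouseCatalog")
          // Assumed per the 1.7.1 docs: when false, CTAS/RTAS preserve the
          // source schema's nullability instead of relaxing it to nullable.
          .config("spark.sql.catalog.openhouse.use-nullable-query-schema", "false")
          .getOrCreate();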
File: CTASNonNullTestSpark3_5.java (@@ -3,38 +3,61 @@)
import static org.junit.jupiter.api.Assertions.*;

import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.Map;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;

public class CTASNonNullTestSpark3_5 extends OpenHouseSparkITest {
  @Test
  public void testCTASPreservesNonNull() throws Exception {
    String overrideCatalogName = "opensource_iceberg_spark_catalog";
    try (SparkSession spark =
        getSparkSession(
            overrideCatalogName,
            Map.of(
                String.format("spark.sql.catalog.%s", overrideCatalogName),
                "org.apache.iceberg.spark.SparkCatalog"))) {
      // Create a source table with NOT NULL columns
      spark.sql(
          "CREATE TABLE openhouse.ctasNonNull.test_table (id INT NOT NULL, name STRING NOT NULL, value DOUBLE NOT NULL)");
      // Create a target table using CTAS through the OpenHouse catalog
      spark.sql(
          "CREATE TABLE openhouse.ctasNonNull.test_tableCtas USING iceberg AS SELECT * FROM openhouse.ctasNonNull.test_table");
      // Create a target table using CTAS through the stock Iceberg catalog
      spark.sql(
          "CREATE TABLE opensource_iceberg_spark_catalog.ctasNonNull.test_tableCtasDefaultIceberg USING iceberg AS SELECT * FROM openhouse.ctasNonNull.test_table");

      // Get schemas for the source and both targets
      StructType sourceSchema = spark.table("openhouse.ctasNonNull.test_table").schema();
      StructType targetSchemaGood = spark.table("openhouse.ctasNonNull.test_tableCtas").schema();
      StructType targetSchemaBroken =
          spark
              .table("opensource_iceberg_spark_catalog.ctasNonNull.test_tableCtasDefaultIceberg")
              .schema();

      // Verify the Spark catalogs have the expected classes configured
      assertEquals(
          "com.linkedin.openhouse.spark.SparkCatalog",
          spark.conf().get("spark.sql.catalog.openhouse"));
      assertEquals(
          "org.apache.iceberg.spark.SparkCatalog",
          spark.conf().get("spark.sql.catalog.opensource_iceberg_spark_catalog"));

      // Verify the id column's constraint is preserved by the good catalog,
      // not preserved by the stock catalog
      assertFalse(sourceSchema.apply("id").nullable(), "Source table id column should be required");
      assertTrue(
          targetSchemaBroken.apply("id").nullable(),
          "Target table id column's NOT NULL constraint should not be preserved, because CTAS non-nullable preservation is off by default");
      assertFalse(
          targetSchemaGood.apply("id").nullable(),
          "Target table id column's NOT NULL constraint should be preserved");

      // Clean up
      spark.sql("DROP TABLE openhouse.ctasNonNull.test_table");
      spark.sql("DROP TABLE openhouse.ctasNonNull.test_tableCtas");
      spark.sql(
          "DROP TABLE opensource_iceberg_spark_catalog.ctasNonNull.test_tableCtasDefaultIceberg");
    }
  }
}
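To make the Spark 3.5 expectation concrete, a hypothetical spot-check (illustrative only; this output is not captured from the PR's CI):

  // Under com.linkedin.openhouse.spark.SparkCatalog on Spark 3.5, the CTAS
  // target keeps the source's NOT NULL columns.
  spark.table("openhouse.ctasNonNull.test_tableCtas").printSchema();
  // root
  //  |-- id: integer (nullable = false)
  //  |-- name: string (nullable = false)
  //  |-- value: double (nullable = false)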
File: SparkSession test helper (@@ -37,7 +37,7 @@ public static void configureCatalogs)
    builder
        .config(
            String.format("spark.sql.catalog.%s", catalogName),
            "com.linkedin.openhouse.spark.SparkCatalog")
        .config(
            String.format("spark.sql.catalog.%s.catalog-impl", catalogName),
            "com.linkedin.openhouse.spark.OpenHouseCatalog")

Collaborator (on the switch to com.linkedin.openhouse.spark.SparkCatalog): We'd need to change the Docker code too, and all other references to this connector.