
Performance Issue with rewrite_data_files Spark procedure #14679

@VinhVu95

Description


Apache Iceberg version

None

Query engine

Spark

Please describe the bug 🐞

Setup

I am running the rewrite_data_files procedure with the following setup:

  • Spark 3.5.3 on EMR 7.7.0 - 30 executor nodes of m5.8xlarge instance type
  • The procedure runs with mostly default settings, plus target-file-size-bytes=1G and max-file-group-size-bytes=10GB (roughly the call I run is sketched after this list)
  • Spark configuration:
    spark.dynamicAllocation.enabled=true
    spark.driver.memory=96g # a smaller number caused OOM on driver 
    spark.executor.memory=96g
    spark.executor.cores=4 # default setting in EMR
    
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.glue_catalog.warehouse", f"s3://{ICEBERG_S3_LOCATION}/{ICEBERG_DATABASE}/")
    conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    
  • Iceberg table with > 100K data files (~800 GB total, ~4 billion rows, unpartitioned)
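
This is roughly how I invoke the procedure (a sketch; my_db.my_table is a placeholder for the actual table name, glue_catalog is the catalog configured above):

# Sketch of the procedure call; table name is a placeholder
spark.sql("""
    CALL glue_catalog.system.rewrite_data_files(
        table => 'my_db.my_table',
        options => map(
            'target-file-size-bytes',    '1073741824',   -- 1 GB
            'max-file-group-size-bytes', '10737418240'   -- 10 GB
        )
    )
""")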

Problem

At first I was encountering the following exception in executor tasks:

Caused by: software.amazon.awssdk.thirdparty.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
	at software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:316)
	at software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:282)
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionRequestFactory$DelegatingConnectionRequest.get(ClientConnectionRequestFactory.java:92)
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionRequestFactory$InstrumentedConnectionRequest.get(ClientConnectionRequestFactory.java:69)
	at software.amazon.awssdk.thirdparty.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
	at software.amazon.awssdk.thirdparty.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
	at software.amazon.awssdk.thirdparty.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at software.amazon.awssdk.thirdparty.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
	at software.amazon.awssdk.thirdparty.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
	at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
	at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
	at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
	at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:102)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:79)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:57)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:74)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:43)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:79)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:41)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.executeRequest(RetryableStage2.java:93)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:56)

After that I followed the recommendation in https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/performance.html#Thread_and_connection_pool_settings ("Heavy load. Increase pool size and acquisition timeout fs.s3a.connection.acquisition.timeout") and specified the Apache HTTP client configuration as instructed in https://iceberg.apache.org/docs/latest/aws/#apache-http-client-configurations for the AWS integration and the S3FileIO connector:

conf.set("spark.sql.catalog.glue_catalog.http-client.apache.max-connections", "1000")
conf.set("spark.sql.catalog.glue_catalog.http-client.apache.connection-timeout-ms", "60000")
conf.set("spark.sql.catalog.glue_catalog.http-client.apache.socket-timeout-ms", "120000")
conf.set("spark.sql.catalog.glue_catalog.http-client.apache.connection-acquisition-timeout-ms", "60000")

All Spark tasks then still failed, with containers exiting with status code 137 (OOM), even after increasing executor memory up to 96g. My guess was that too many S3 connections were open at the same time: each job tries to rewrite 1500-3000 files, and with max-concurrent-file-group-rewrites defaulting to 5, five file groups are rewritten concurrently. As the next step I changed the compaction options and the connection pool size to reduce the number of concurrent S3 connections:

COMPACTION_OPTIONS = {
    'target-file-size-bytes': '536870912',         # 512 MB, half the previous value
    'max-file-group-size-bytes': '5368709120',     # 5 GB, half the previous value
    'partial-progress.max-commits': '10',
    'partial-progress.enabled': 'true',
    'min-input-files': '5',
    'max-concurrent-file-group-rewrites': '1'      # reduced from the default of 5
}

# set the connection pool size to an even smaller number
conf.set("spark.sql.catalog.glue_catalog.http-client.apache.max-connections", "100")

There were then a few successful tasks, but most failed; even though the rewrite jobs were scheduled sequentially (each rewriting ~800 files), they were eventually unable to complete.
So I would like to understand what causes the memory pressure on the executors (and how to avoid it), and how to run compaction efficiently on a table this large.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
