Description
Affected module
Issue affects the lineage ingestion framework, specifically the openmetadata-spark-agent.
Describe the bug
I set up S3 (MinIO) and OpenMetadata in my Minikube. I am able to ingest metadata from the bucket into OpenMetadata.
However, transformations via Spark do not create lineage between buckets in S3. I followed the documentation here: https://docs.open-metadata.org/latest/connectors/ingestion/lineage/spark-lineage (added the jar and set up the config). Running the Spark application does create the pipeline service in OpenMetadata, but no lineage data is stored. I can see OpenLineage logs emitting lineage events, but openmetadata-spark-agent runs into `log.debug("Failed to get id of table {} from OpenMetadata.", tableName)`, which causes it to never send the data to OpenMetadata.
To Reproduce
- Set up Minikube with MinIO and OpenMetadata.
- Prepare the Spark image:
```dockerfile
FROM apache/spark:3.5.6-python3

ARG HADOOP_AWS_VERSION=3.3.4
ARG AWS_SDK_VERSION=1.12.262

RUN mkdir -p /opt/spark/jars && \
    wget -qO /opt/spark/jars/hadoop-aws-${HADOOP_AWS_VERSION}.jar \
        https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_AWS_VERSION}/hadoop-aws-${HADOOP_AWS_VERSION}.jar && \
    wget -qO /opt/spark/jars/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar \
        https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar

ARG OPENMETADATA_SPARK_AGENT_VERSION=1.0
RUN wget -qO /opt/spark/jars/openmetadata-spark-agent.jar \
        https://github.com/open-metadata/openmetadata-spark-agent/releases/download/${OPENMETADATA_SPARK_AGENT_VERSION}/openmetadata-spark-agent.jar

USER 185
```
- Create the configuration (spark-defaults.conf):
```
spark.hadoop.fs.s3a.endpoint http://minio.local:80
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.connection.ssl.enabled false
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

# override all of S3A's built-in "60s" string defaults with pure longs in seconds
spark.hadoop.fs.s3a.connection.timeout 60
spark.hadoop.fs.s3a.connection.establish.timeout 60
spark.hadoop.fs.s3a.threads.keepalivetime 60
spark.hadoop.fs.obs.multipart.purge.age 86400
spark.hadoop.fs.s3a.multipart.purge.age 86400

spark.hadoop.fs.s3a.access.key minio
spark.hadoop.fs.s3a.secret.key minio123

spark.driver.extraJavaOptions -Dlog4j.configuration=file:/opt/spark/conf/log4j.properties
spark.executor.extraJavaOptions -Dlog4j.configuration=file:/opt/spark/conf/log4j.properties

spark.extraListeners io.openlineage.spark.agent.OpenLineageSparkListener

# OpenMetadata transport settings
spark.openmetadata.transport.hostPort http://open-metadata.local/
spark.openmetadata.transport.type openmetadata
spark.openmetadata.transport.pipelineServiceName my_pipeline_service2
spark.openmetadata.transport.pipelineName my_pipeline_name2
spark.openmetadata.transport.pipelineDescription My ETL Pipeline
# spark.openmetadata.transport.databaseServiceNames minio-source,minio-target
spark.openmetadata.transport.timeout 30
spark.openmetadata.transport.jwtToken eyJraWQi...
```
- Build and run the Spark image:
```sh
docker build -t custom-spark-py:0.1 .

docker run --rm -it \
  --network host \
  -v "$(pwd)/spark-defaults.conf":/opt/spark/conf/spark-defaults.conf:ro \
  -v "$(pwd)/log4j.properties":/opt/spark/conf/log4j.properties:ro \
  -e AWS_ACCESS_KEY_ID=minio \
  -e AWS_SECRET_ACCESS_KEY=minio123 \
  custom-spark-py:0.1 \
  /opt/spark/bin/pyspark --master 'local[*]'
```
- Create buckets and insert test data.
- Run the Spark code:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, trim, col

spark = (
    SparkSession.builder
    .master("local")
    .appName("CSV Processing with OpenMetadata")
    .getOrCreate()
)

# Read from the source bucket, trim all columns, add a processing date
df = spark.read.option("header", "true").csv("s3a://my-minio-bucket/test/helloworld.csv")
for c in df.columns:
    df = df.withColumn(c, trim(col(c)))
df = df.withColumn("processed_date", current_date())

# Write to the target bucket; this is the step that should produce lineage
df.write.mode("overwrite").csv("s3a://my-other-bucket/silver", header=True)
spark.stop()
```
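For reference, the sketch below is roughly how the result can be confirmed from the API (assuming the standard OpenMetadata REST endpoints and the names from the config above; `$TOKEN` stands in for the JWT):

```sh
TOKEN="<jwt token>"

# The pipeline service from spark.openmetadata.transport.pipelineServiceName is created:
curl -s -H "Authorization: Bearer $TOKEN" \
  "http://open-metadata.local/api/v1/services/pipelineServices/name/my_pipeline_service2"

# But no lineage edges show up for the pipeline:
curl -s -H "Authorization: Bearer $TOKEN" \
  "http://open-metadata.local/api/v1/lineage/pipeline/name/my_pipeline_service2.my_pipeline_name2"
```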
Expected behavior
I expect lineage between the source and target buckets to be ingested.
Version:
OpenMetadata Helm chart 1.8.3
openmetadata-spark-agent 1.0
Additional context
In my ingress I can see requests against the `table_search_index` in OpenSearch. The code in openmetadata-spark-agent suggests it should query the `container_search_index`. If I query the `container_search_index` myself I get results; if I query the `table_search_index` myself I get no results.
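For reference, the manual index queries were along these lines (a sketch of equivalent calls, assuming the standard `/api/v1/search/query` endpoint and reusing `$TOKEN` from above; the query term is only an example):

```sh
# Returns hits for the MinIO buckets that were ingested as containers:
curl -s -H "Authorization: Bearer $TOKEN" \
  "http://open-metadata.local/api/v1/search/query?q=my-minio-bucket&index=container_search_index"

# Returns no hits, which matches the agent's failed table lookup:
curl -s -H "Authorization: Bearer $TOKEN" \
  "http://open-metadata.local/api/v1/search/query?q=my-minio-bucket&index=table_search_index"
```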