Use openlineage-airflow in airflow example and fix debugging step (#1635)

* Use openlineage-airflow in airflow example

  Signed-off-by: wslulciuc <[email protected]>

* Fix alter column for step 5 in airflow example

  Signed-off-by: wslulciuc <[email protected]>
Changed file: `examples/airflow/README.md` (+62, -43)
@@ -1,14 +1,14 @@
# [Airflow](https://airflow.apache.org) Example

In this example, we'll walk you through how to enable Airflow DAGs to send lineage metadata to Marquez using [OpenLineage](https://openlineage.io/). The example will help demonstrate some of the features of Marquez.

### What you’ll learn:

* Enable OpenLineage in Airflow
* Write your very first OpenLineage-enabled DAG
* Troubleshoot a failing DAG using Marquez

# Prerequisites

Before you begin, make sure you have installed:
@@ -17,29 +17,28 @@ Before you begin, make sure you have installed:
17
17
18
18
> **Note:** We recommend that you have allocated at least **2 CPUs** and **8 GB** of memory to Docker.
19
19
20
-
##Step 1: Prepare the Environment
20
+
# Step 1: Setup
21
21
22
-
First, if you haven't already, clone the Marquez repository and enter the `examples/airflow` directory.
22
+
First, if you haven't already, clone the Marquez repository and change into the `examples/airflow` directory:
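The clone commands themselves fall outside this diff's context lines; a likely sketch, assuming the standard Marquez repository URL:

```shell
# Clone the Marquez repository and enter the Airflow example
# (repository URL assumed; verify against the project's GitHub page)
git clone https://github.com/MarquezProject/marquez.git
cd marquez/examples/airflow
```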
To make sure the latest [`openlineage-airflow`](https://pypi.org/project/openlineage-airflow) library is downloaded and installed when starting Airflow, you'll need to create a `requirements.txt` file with the following content:

```
openlineage-airflow
```

Next, we'll need to specify where we want Airflow to send DAG metadata. To do so, create a config file named `openlineage.env` with the following environment variables and values:

```bash
OPENLINEAGE_URL=http://marquez:5000    # The URL of the HTTP backend
OPENLINEAGE_NAMESPACE=example          # The namespace associated with the collected DAG metadata
```
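If you'd rather create the config file from the command line, a heredoc works. The inline comments are dropped here, since some env-file parsers treat everything after `=` as part of the value:

```shell
# Write the OpenLineage config file used by the Airflow containers
cat > openlineage.env <<'EOF'
OPENLINEAGE_URL=http://marquez:5000
OPENLINEAGE_NAMESPACE=example
EOF
```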
> **Note:** The `openlineage.env` config file will be used by the `airflow`, `airflow_scheduler`, and `airflow_worker` containers to send lineage metadata to Marquez.

Your `examples/airflow/` directory should now contain the following:
@@ -49,36 +48,36 @@ Your `examples/airflow/` directory should now contain the following:
```
├── docker
├── docker-compose.yml
├── docs
├── openlineage.env
└── requirements.txt
```

# Step 2: Write Airflow DAGs using OpenLineage

In this step, we'll create two new Airflow DAGs that perform simple tasks. The `counter` DAG will generate a random number every minute, while the `sum` DAG calculates a sum every five minutes. This will result in a simple pipeline containing two jobs and two datasets.

First, let's create the `dags/` folder where our example DAGs will be located:

```bash
$ mkdir dags
```

When writing our DAGs, we'll use [`openlineage-airflow`](https://pypi.org/project/openlineage-airflow), enabling OpenLineage to observe the DAG and automatically collect task-level metadata. Notice that the only change required to begin collecting DAG metadata is to use `openlineage.airflow` instead of `airflow`:

```diff
- from airflow import DAG
+ from openlineage.airflow import DAG
```

# Step 2.1: Create DAG `counter`

Under `dags/`, create a file named `counter.py` and add the following code:

```python
import random

from openlineage.airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago
@@ -124,15 +123,14 @@ t2 = PostgresOperator(
)

t1 >> t2
```
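The body of `counter` is elided in this diff view, but the task shown in Step 5 binds a random value into its `INSERT` via `parameters`. A minimal, Airflow-free sketch of that parameter logic:

```python
import random

# Sketch of the parameters the `counter` DAG's `inc` task binds into its
# INSERT each run (the `random.randint(1, 10)` call appears in Step 5's diff).
parameters = {"value": random.randint(1, 10)}

assert 1 <= parameters["value"] <= 10
```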
# Step 2.2: Create DAG `sum`

Under `dags/`, create a file named `sum.py` and add the following code:

```python
from openlineage.airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago
@@ -175,7 +173,6 @@ t2 = PostgresOperator(
)

t1 >> t2
```

At this point, you should have the following under your `examples/airflow/` directory:
@@ -189,13 +186,13 @@ At this point, you should have the following under your `examples/airflow/` dire
```
├── docker/
├── docker-compose.yml
├── docs/
├── openlineage.env
└── requirements.txt
```

# Step 3: Start Airflow with Marquez

Now that we have our DAGs defined and OpenLineage is enabled in Airflow, we can run the example! To start Airflow, run:

```bash
$ docker-compose up
```
@@ -205,20 +202,19 @@ $ docker-compose up
**The above command will:**

* Start Airflow and install `openlineage-airflow`
* Start Marquez
* Start Postgres

To view the Airflow UI and verify it's running, open [http://localhost:8080](http://localhost:8080). Then, log in using the username and password: `airflow` / `airflow`. You can also browse to [http://localhost:3000](http://localhost:3000) to view the Marquez UI.

# Step 4: View Collected Metadata

To ensure that Airflow is executing `counter` and `sum`, navigate to the DAGs tab in Airflow and verify that they are both enabled and are in a _running_ state:

![image](./docs/airflow-view-dag.png)

To view DAG metadata collected by Marquez from Airflow, browse to the Marquez UI by visiting [http://localhost:3000](http://localhost:3000). Then, use the _search_ bar in the upper right side of the page and search for the `counter.inc` job. To view lineage metadata for `counter.inc`, click on the job from the drop-down list:

> **Note:** If the `counter.inc` job is not in the drop-down list, check to see if Airflow has successfully executed the DAG.
@@ -228,7 +224,7 @@ If you take a quick look at the lineage graph for `counter.inc`, you should see
![image](./docs/current-search-count.png)

# Step 5: Troubleshoot a Failing DAG with Marquez

In this step, let's quickly walk through a simple troubleshooting scenario where DAG `sum` begins to fail as the result of an upstream schema change for table `counts`. So, let's get to it!
@@ -241,13 +237,36 @@ t1 = PostgresOperator(
```diff
t1 = PostgresOperator(
-   task_id='if_not_exists',
+   task_id='alter_name_of_column',
    postgres_conn_id='example_db',
    sql='''
-   CREATE TABLE IF NOT EXISTS counts (
-     value INTEGER
-   );''',
+   DO $$
+   BEGIN
+     IF EXISTS(SELECT *
+       FROM information_schema.columns
+       WHERE table_name='counts' and column_name='value')
+     THEN
+       ALTER TABLE "counts" RENAME COLUMN "value" TO "value_1_to_10";
+     END IF;
+   END $$;
    ''',
    dag=dag
)
```

```diff
t2 = PostgresOperator(
    task_id='inc',
    postgres_conn_id='example_db',
    sql='''
-   INSERT INTO counts (value)
+   INSERT INTO counts (value_1_to_10)
    VALUES (%(value)s)
    ''',
    parameters={
      'value': random.randint(1, 10)
    },
    dag=dag
)
```
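To see why the rename breaks the downstream job, here is a hedged, standalone sketch of the failure mode, using SQLite in place of Postgres (the example itself uses `PostgresOperator`):

```python
import sqlite3

# Simulate the upstream schema change: `value` renamed to `value_1_to_10`.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE counts (value_1_to_10 INTEGER)")

# An insert that still references the old column name errors out,
# just as the pipeline does after the rename.
try:
    conn.execute("INSERT INTO counts (value) VALUES (?)", (7,))
except sqlite3.OperationalError as err:
    print("old column name fails:", err)

# The corrected insert succeeds, and downstream sums work again.
conn.execute("INSERT INTO counts (value_1_to_10) VALUES (?)", (7,))
print(conn.execute("SELECT SUM(value_1_to_10) FROM counts").fetchone()[0])  # 7
```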
@@ -281,11 +300,11 @@ With the code change, the DAG `sum` begins to run successfully:
_Congrats_! You successfully stepped through a troubleshooting scenario of a failing DAG using metadata collected with Marquez! You can now add your own DAGs to `dags/` to build more complex data lineage graphs.

# Next Steps

* Review the Marquez [HTTP API](https://marquezproject.github.io/marquez/openapi.html) used to collect Airflow DAG metadata and learn how to build your own integrations using OpenLineage
* Take a look at the [`openlineage-spark`](https://openlineage.io/integration/apache-spark) integration that can be used with Airflow
# Feedback

What did you think of this example? You can reach out to us on [slack](http://bit.ly/MarquezSlack) and leave us feedback, or [open a pull request](https://github.com/MarquezProject/marquez/blob/main/CONTRIBUTING.md#submitting-a-pull-request) with your suggestions!