Skip to content

Commit f73f115

Browse files
authored
[FIX] Dataset query to get only the latest facet for each version (#2859)
* fix dataset query to get only the latest facet for each version

  Signed-off-by: sophiely <[email protected]>

* small fix in query

  Signed-off-by: sophiely <[email protected]>

---------

Signed-off-by: sophiely <[email protected]>
1 parent 801831c commit f73f115

File tree

1 file changed

+45
-24
lines changed

1 file changed

+45
-24
lines changed

api/src/main/java/marquez/db/DatasetDao.java

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -121,30 +121,51 @@ default void setFields(Dataset ds) {
121121

122122
@SqlQuery(
123123
"""
124-
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
125-
FROM datasets_view d
126-
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
127-
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
128-
LEFT JOIN (
129-
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
130-
FROM tags AS t
131-
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
132-
GROUP BY m.dataset_uuid
133-
) t ON t.dataset_uuid = d.uuid
134-
LEFT JOIN (
135-
SELECT
136-
df.dataset_version_uuid,
137-
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
138-
FROM dataset_facets AS df
139-
WHERE df.facet IS NOT NULL AND
140-
(df.type ILIKE 'dataset' OR df.type ILIKE 'unknown' OR df.type ILIKE 'input') AND
141-
df.dataset_uuid IN (SELECT uuid FROM datasets_view WHERE namespace_name = :namespaceName ORDER BY name LIMIT :limit OFFSET :offset)
142-
GROUP BY df.dataset_version_uuid
143-
) f ON f.dataset_version_uuid = d.current_version_uuid
144-
WHERE d.namespace_name = :namespaceName
145-
ORDER BY d.name
146-
LIMIT :limit OFFSET :offset
147-
""")
124+
WITH facets_t AS
125+
(SELECT df.dataset_version_uuid,
126+
df.facet,
127+
df."name",
128+
df.created_at,
129+
rank() OVER (PARTITION BY df.dataset_version_uuid, "name"
130+
ORDER BY created_at DESC) AS r
131+
FROM dataset_facets AS df
132+
WHERE df.facet IS NOT NULL
133+
AND (df.type ILIKE 'dataset'
134+
OR df.type ILIKE 'unknown'
135+
OR df.type ILIKE 'input')
136+
AND df.dataset_uuid IN
137+
(SELECT UUID
138+
FROM datasets_view
139+
WHERE namespace_name = :namespaceName
140+
ORDER BY name
141+
LIMIT 10
142+
OFFSET :offset))
143+
SELECT d.*,
144+
dv.fields,
145+
dv.lifecycle_state,
146+
sv.schema_location,
147+
t.tags,
148+
facets
149+
FROM datasets_view d
150+
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
151+
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
152+
LEFT JOIN
153+
(SELECT ARRAY_AGG(t.name) AS tags,
154+
m.dataset_uuid
155+
FROM tags AS t
156+
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
157+
GROUP BY m.dataset_uuid) t ON t.dataset_uuid = d.uuid
158+
LEFT JOIN
159+
(SELECT df.dataset_version_uuid,
160+
JSONB_AGG(df.facet) AS facets
161+
FROM facets_t AS df
162+
WHERE r = 1
163+
GROUP BY df.dataset_version_uuid) f ON f.dataset_version_uuid = d.current_version_uuid
164+
WHERE d.namespace_name = :namespaceName
165+
ORDER BY d.name
166+
LIMIT :limit
167+
OFFSET :offset
168+
""")
148169
List<Dataset> findAll(String namespaceName, int limit, int offset);
149170

150171
@SqlQuery("SELECT count(*) FROM datasets_view")

0 commit comments

Comments
 (0)