Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 138 additions & 139 deletions pkg/jaeger/store/trace_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,35 @@ import (
)

const (
subqueryFormat = `
SELECT
trace_sub.trace_id, start_time_max - $%[1]d::interval as time_low, start_time_max + $%[1]d::interval as time_high
FROM (
SELECT
s.trace_id,
max(start_time) as start_time_max
FROM _ps_trace.span s
WHERE
%[2]s
GROUP BY s.trace_id
) as trace_sub
ORDER BY trace_sub.start_time_max DESC
`
subqueryTimeRangeForTraceID = `
SELECT
s.trace_id,
s.start_time - $1::interval as time_low,
s.start_time + $1::interval as time_high
FROM _ps_trace.span s
WHERE
s.trace_id = $2
LIMIT 1
`

// PostgreSQL badly overestimates the number of rows returned if the complete trace query
// uses an IN clause on trace_id, but gives good estimates for equality conditions. So, leverage an INNER
// JOIN LATERAL to provide an equality condition on the complete trace.
//
// A lateral join is used for the `link` and `event` table instead of using
// directly `array_agg` calls in the main SELECT clause to avoid returning
// duplicated values when the cartesian product of event x link is greater
Expand All @@ -31,109 +60,6 @@ const (
//
// - With GROUP BY https://explain.dalibo.com/plan/e6c74995bc36begd
// - Without GROUP BY https://explain.dalibo.com/plan/f09259cd21g57dh3
completeTraceSQLFormat = `
SELECT
s.trace_id,
s.span_id,
s.parent_span_id,
s.start_time start_times,
s.end_time end_times,
o.span_kind,
s.dropped_tags_count dropped_tags_counts,
s.dropped_events_count dropped_events_counts,
s.dropped_link_count dropped_link_counts,
s.status_code status_code_string,
s.status_message,
s.trace_state trace_states,
s_url.url schema_urls,
o.span_name span_names,
_ps_trace.tag_map_denormalize(s.resource_tags) resource_tags,
_ps_trace.tag_map_denormalize(s.span_tags) span_tags,
event.event_names,
event.event_times,
event.event_dropped_tags_count,
event.event_tags,
inst_lib.name library_name,
inst_lib.version library_version,
inst_lib_url.url library_schema_url,
links_linked_trace_ids,
links_linked_span_ids,
links_trace_states,
links_dropped_tags_count,
links_tags
FROM
_ps_trace.span s
INNER JOIN
_ps_trace.operation o ON (s.operation_id = o.id)
LEFT JOIN
_ps_trace.schema_url s_url ON s.resource_schema_url_id = s_url.id
LEFT JOIN
_ps_trace.instrumentation_lib inst_lib ON s.instrumentation_lib_id = inst_lib.id
LEFT JOIN
_ps_trace.schema_url inst_lib_url ON inst_lib_url.id = inst_lib.schema_url_id
LEFT JOIN LATERAL (
SELECT
array_agg(e.name ORDER BY e.event_nbr) event_names,
array_agg(e.time ORDER BY e.event_nbr) event_times,
array_agg(e.dropped_tags_count ORDER BY e.event_nbr) event_dropped_tags_count,
array_agg(_ps_trace.tag_map_denormalize(e.tags) ORDER BY e.event_nbr) event_tags
FROM _ps_trace.event as e
WHERE e.trace_id = s.trace_id AND e.span_id = s.span_id %[3]s
) as event ON (TRUE)
LEFT JOIN LATERAL (
SELECT
array_agg(lk.linked_trace_id ORDER BY lk.link_nbr) links_linked_trace_ids,
array_agg(lk.linked_span_id ORDER BY lk.link_nbr) links_linked_span_ids,
array_agg(lk.trace_state ORDER BY lk.link_nbr) links_trace_states,
array_agg(lk.dropped_tags_count ORDER BY lk.link_nbr) links_dropped_tags_count,
array_agg(_ps_trace.tag_map_denormalize(lk.tags) ORDER BY lk.link_nbr) links_tags
FROM _ps_trace.link as lk
WHERE lk.trace_id = s.trace_id AND lk.span_id = s.span_id %[4]s
) as link ON (TRUE)
WHERE
%[1]s %[2]s
GROUP BY
s.trace_id,
s.span_id,
s.parent_span_id,
s.start_time,
s.end_time,
s.resource_tags,
s.span_tags,
o.span_name,
o.span_kind,
s_url.url,
inst_lib.name,
inst_lib.version,
inst_lib_url.url,
event_names,
event_times,
event_dropped_tags_count,
event_tags,
links_linked_trace_ids,
links_linked_span_ids,
links_trace_states,
links_dropped_tags_count,
links_tags`

subqueryFormat = `
SELECT
trace_sub.trace_id, start_time_max - $%[1]d::interval as time_low, start_time_max + $%[1]d::interval as time_high
FROM (
SELECT
s.trace_id,
max(start_time) as start_time_max
FROM _ps_trace.span s
WHERE
%[2]s
GROUP BY s.trace_id
) as trace_sub
ORDER BY trace_sub.start_time_max DESC
`

/* PostgreSQL badly overestimates the number of rows returned if the complete trace query
uses an IN clause on trace_id, but gives good estimates for equality conditions. So, leverage an INNER
JOIN LATERAL to provide an equality condition on the complete trace. */
findTraceSQLFormat = `
WITH trace_ids AS (
%s
Expand All @@ -143,9 +69,94 @@ const (
FROM
trace_ids
INNER JOIN LATERAL (
%s
SELECT
s.trace_id,
s.span_id,
s.parent_span_id,
s.start_time start_times,
s.end_time end_times,
o.span_kind,
s.dropped_tags_count dropped_tags_counts,
s.dropped_events_count dropped_events_counts,
s.dropped_link_count dropped_link_counts,
s.status_code status_code_string,
s.status_message,
s.trace_state trace_states,
s_url.url schema_urls,
o.span_name span_names,
_ps_trace.tag_map_denormalize(s.resource_tags) resource_tags,
_ps_trace.tag_map_denormalize(s.span_tags) span_tags,
event.event_names,
event.event_times,
event.event_dropped_tags_count,
event.event_tags,
inst_lib.name library_name,
inst_lib.version library_version,
inst_lib_url.url library_schema_url,
links_linked_trace_ids,
links_linked_span_ids,
links_trace_states,
links_dropped_tags_count,
links_tags
FROM
_ps_trace.span s
INNER JOIN
_ps_trace.operation o ON (s.operation_id = o.id)
LEFT JOIN
_ps_trace.schema_url s_url ON s.resource_schema_url_id = s_url.id
LEFT JOIN
_ps_trace.instrumentation_lib inst_lib ON s.instrumentation_lib_id = inst_lib.id
LEFT JOIN
_ps_trace.schema_url inst_lib_url ON inst_lib_url.id = inst_lib.schema_url_id
LEFT JOIN LATERAL (
SELECT
array_agg(e.name ORDER BY e.event_nbr) event_names,
array_agg(e.time ORDER BY e.event_nbr) event_times,
array_agg(e.dropped_tags_count ORDER BY e.event_nbr) event_dropped_tags_count,
array_agg(_ps_trace.tag_map_denormalize(e.tags) ORDER BY e.event_nbr) event_tags
FROM _ps_trace.event as e
WHERE e.trace_id = s.trace_id AND e.span_id = s.span_id
AND e.time > trace_ids.time_low AND e.time < trace_ids.time_high
) as event ON (TRUE)
LEFT JOIN LATERAL (
SELECT
array_agg(lk.linked_trace_id ORDER BY lk.link_nbr) links_linked_trace_ids,
array_agg(lk.linked_span_id ORDER BY lk.link_nbr) links_linked_span_ids,
array_agg(lk.trace_state ORDER BY lk.link_nbr) links_trace_states,
array_agg(lk.dropped_tags_count ORDER BY lk.link_nbr) links_dropped_tags_count,
array_agg(_ps_trace.tag_map_denormalize(lk.tags) ORDER BY lk.link_nbr) links_tags
FROM _ps_trace.link as lk
WHERE lk.trace_id = s.trace_id AND lk.span_id = s.span_id
AND lk.span_start_time > trace_ids.time_low AND lk.span_start_time < trace_ids.time_high
) as link ON (TRUE)
WHERE
s.trace_id = trace_ids.trace_id
AND s.start_time > trace_ids.time_low AND s.start_time < trace_ids.time_high
GROUP BY
s.trace_id,
s.span_id,
s.parent_span_id,
s.start_time,
s.end_time,
s.resource_tags,
s.span_tags,
o.span_name,
o.span_kind,
s_url.url,
inst_lib.name,
inst_lib.version,
inst_lib_url.url,
event_names,
event_times,
event_dropped_tags_count,
event_tags,
links_linked_trace_ids,
links_linked_span_ids,
links_trace_states,
links_dropped_tags_count,
links_tags
) AS complete_trace ON (TRUE)
`
`

// Keys used to represent OTLP constructs from Jaeger tags which are then dropped from the tag map.
TagError = "error"
Expand All @@ -164,56 +175,39 @@ func NewBuilder(cfg *Config) *Builder {
return &Builder{cfg}
}

func (b *Builder) buildCompleteTraceQuery(traceIDClause string, timeLowRef string, timeHighRef string) string {
spanTimeClause := ""
eventTimeClause := ""
linkTimeClause := ""
if timeLowRef != "" {
spanTimeClause += " AND s.start_time > " + timeLowRef
eventTimeClause += " AND e.time > " + timeLowRef
linkTimeClause += " AND lk.span_start_time > " + timeLowRef
}
if timeHighRef != "" {
spanTimeClause += " AND s.start_time < " + timeHighRef
eventTimeClause += " AND e.time < " + timeHighRef
linkTimeClause += " AND lk.span_start_time < " + timeHighRef
}

return fmt.Sprintf(
completeTraceSQLFormat,
traceIDClause,
spanTimeClause,
eventTimeClause,
linkTimeClause,
)
}

func (b *Builder) findTracesQuery(q *spanstore.TraceQueryParameters, tInfo *tagsInfo) (string, []interface{}) {
subquery, params := b.BuildTraceIDSubquery(q, tInfo)
traceIDClause := "s.trace_id = trace_ids.trace_id"
completeTraceSQL := b.buildCompleteTraceQuery(traceIDClause, "trace_ids.time_low", "trace_ids.time_high")
return fmt.Sprintf(findTraceSQLFormat, subquery, completeTraceSQL), params
return fmt.Sprintf(findTraceSQLFormat, subquery), params
}

func (b *Builder) findTraceIDsQuery(q *spanstore.TraceQueryParameters, tInfo *tagsInfo) (string, []interface{}) {
return b.BuildTraceIDSubquery(q, tInfo)
}

func (b *Builder) getTraceQuery(traceID model.TraceID) (string, []interface{}, error) {
func getUUIDFromTraceID(traceID model.TraceID) (pgtype.UUID, error) {
var buf [16]byte
var uuid pgtype.UUID
n, err := traceID.MarshalTo(buf[:])
if n != 16 || err != nil {
return "", nil, fmt.Errorf("marshaling TraceID: %w", err)
return uuid, fmt.Errorf("marshaling TraceID: %w", err)
}

var uuid pgtype.UUID
if err := uuid.Set(buf); err != nil {
return "", nil, fmt.Errorf("setting TraceID: %w", err)
return uuid, fmt.Errorf("setting TraceID: %w", err)
}
params := []interface{}{uuid}
return uuid, nil
}

traceIDClause := "s.trace_id = $1"
return b.buildCompleteTraceQuery(traceIDClause, "", ""), params, nil
func (b *Builder) getTraceQuery(traceID model.TraceID) (string, []interface{}, error) {
traceUUID, err := getUUIDFromTraceID(traceID)
if err != nil {
return "", nil, fmt.Errorf("TraceID to UUID conversion: %w", err)
}

//it may seem silly to build a traceID subquery when we know the traceID
//but, this allows us to get the time range of the trace for the rest of the query.
subquery, params := b.BuildTraceTimeRangeSubqueryForTraceID(traceUUID)
return fmt.Sprintf(findTraceSQLFormat, subquery), params, nil
}

func (b *Builder) buildOperationSubquery(q *spanstore.TraceQueryParameters, tInfo *tagsInfo, params []interface{}) (string, []interface{}) {
Expand Down Expand Up @@ -288,6 +282,11 @@ func (b *Builder) buildTagClauses(q *spanstore.TraceQueryParameters, tInfo *tags

}

func (b *Builder) BuildTraceTimeRangeSubqueryForTraceID(traceID pgtype.UUID) (string, []interface{}) {
params := []interface{}{b.cfg.MaxTraceDuration, traceID}
return subqueryTimeRangeForTraceID, params
}

func (b *Builder) BuildTraceIDSubquery(q *spanstore.TraceQueryParameters, tInfo *tagsInfo) (string, []interface{}) {
clauses := make([]string, 0, 15)
params := tInfo.params
Expand Down