Skip to content

Commit 61fdf00

Browse files
authored
Adding PollingEvents (#2291)
* Adding PollingEvents draft * Change mocking mechanisms for trace poller * Updating code with PR suggestions and integration env fixes
1 parent 5a82464 commit 61fdf00

File tree

14 files changed

+221
-183
lines changed

14 files changed

+221
-183
lines changed

TEST_RUN_EVENTS.csv

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,36 @@
1-
Test Run Events,,,
2-
,,,
3-
EVENT_SUFFIX,Description,,
4-
_INFO,Adding extra context for an specific task,,
5-
_SUCCESS,The task has finalized with no errors,,
6-
_ERROR,An execution problem was found,,
7-
_START,A new task section has begun,,
8-
,,,
9-
,,,
10-
,,,
11-
,,,
12-
,,,
13-
Stage ,Event Type,Description,Definition
14-
Trigger,CREATED_INFO,Trigger Run has been created,
15-
Trigger,RESOLVE_ERROR,Resolving trigger details failed,
16-
Trigger,RESOLVE_SUCCESS,Successful resolving of trigger details,
17-
Trigger,RESOLVE_START,Resolving trigger details based on environment variables,
18-
Trigger,EXECUTION_START,Initial trigger execution,
19-
Trigger,EXECUTION_SUCCESS,Successful trigger execution,
20-
Trigger,HTTP_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger,
21-
Trigger,DOCKER_COMPOSE_HOST_MISMATCH_ERROR,"We identified Tracetest is running inside a docker compose container, so if you are trying to access your local host machine please use the host.docker.internal hostname. For more information, see https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds",
22-
Trigger,GRPC_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger,
23-
Trace,FETCHING_START,Starting the trace fetching process,
24-
Trace,QUEUED_INFO,Trace Run has been queued to start the fetching process,
25-
Trace,DATA_STORE_CONNECTION_INFO,A Data store connection request has been executed,test connection result information
26-
Trace,POLLING_START,Starting the trace polling process,
27-
Trace,POLLING_ITERATION_INFO,A polling iteration has been executed,# of spans - iteration # - reason of next iteration
28-
Trace,POLLING_SUCCESS,The polling strategy has succeeded in fetching the trace from the Data Store,
29-
Trace,POLLING_ERROR,The polling strategy has failed to fetch the trace,
30-
Trace,FETCHING_SUCCESS,The trace was successfully processed by the backend,
31-
Trace,FETCHING_ERROR,The trace was not able to be fetched,
32-
Trace,STOPPED_INFO,The test run was stopped during its execution,
33-
Test,OUTPUT_GENERATION_WARNING,The value for output <output_name> could not be generated,
34-
Test,TEST_SPECS_RUN_SUCCESS,Test Specs were successfully executed,
35-
Test,TEST_SPECS_RUN_ERROR,Test specs execution error,
36-
Test,TEST_SPECS_RUN_START,Test specs execution start,
37-
Test,TEST_SPECS_ASSERTION_ERROR,An assertion in the test spec failed
1+
Test Run Events,,,
2+
,,,
3+
EVENT_SUFFIX,Description,,
4+
_INFO,Adding extra context for an specific task,,
5+
_SUCCESS,The task has finalized with no errors,,
6+
_ERROR,An execution problem was found,,
7+
_START,A new task section has begun,,
8+
,,,
9+
,,,
10+
,,,
11+
,,,
12+
,,,
13+
Stage ,Event Type,Description,Definition
14+
Trigger,CREATED_INFO,Trigger Run has been created,
15+
Trigger,RESOLVE_ERROR,Resolving trigger details failed,
16+
Trigger,RESOLVE_SUCCESS,Successful resolving of trigger details,
17+
Trigger,RESOLVE_START,Resolving trigger details based on environment variables,
18+
Trigger,EXECUTION_START,Initial trigger execution,
19+
Trigger,EXECUTION_SUCCESS,Successful trigger execution,
20+
Trigger,HTTP_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger,
21+
Trigger,DOCKER_COMPOSE_HOST_MISMATCH_ERROR,"We identified Tracetest is running inside a docker compose container, so if you are trying to access your local host machine please use the host.docker.internal hostname. For more information, see https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds",
22+
Trigger,GRPC_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger,
23+
Trace,DATA_STORE_CONNECTION_INFO,A Data store connection request has been executed,test connection result information
24+
Trace,POLLING_START,Starting the trace polling process,
25+
Trace,POLLING_ITERATION_INFO,A polling iteration has been executed,# of spans - iteration # - reason of next iteration
26+
Trace,POLLING_SUCCESS,The polling strategy has succeeded in fetching the trace from the Data Store,
27+
Trace,POLLING_ERROR,The polling strategy has failed to fetch the trace,
28+
Trace,FETCHING_START,Starting the trace fetching process,
29+
Trace,FETCHING_SUCCESS,The trace was successfully processed by the backend,
30+
Trace,FETCHING_ERROR,The trace was not able to be fetched,
31+
Trace,STOPPED_INFO,The test run was stopped during its execution,
32+
Test,OUTPUT_GENERATION_WARNING,The value for output <output_name> could not be generated,
33+
Test,TEST_SPECS_RUN_SUCCESS,Test Specs were successfully executed,
34+
Test,TEST_SPECS_RUN_ERROR,Test specs execution error,
35+
Test,TEST_SPECS_RUN_START,Test specs execution start,
36+
Test,TEST_SPECS_ASSERTION_ERROR,An assertion in the test spec failed

api/testEvents.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ components:
3939
type: string
4040
enum:
4141
- periodic
42-
reasonNextIteration:
43-
type: string
4442
isComplete:
4543
type: boolean
4644
periodic:

cli/openapi/model_polling_info.go

Lines changed: 3 additions & 39 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/app/facade.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func newRunnerFacades(
5959
execTestUpdater,
6060
tracedb.Factory(testDB),
6161
testDB,
62+
eventEmitter,
6263
)
6364

6465
tracePoller := executor.NewTracePoller(
@@ -67,6 +68,7 @@ func newRunnerFacades(
6768
execTestUpdater,
6869
assertionRunner,
6970
subscriptionManager,
71+
eventEmitter,
7072
)
7173

7274
runner := executor.NewPersistentRunner(

server/executor/poller_executor.go

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package executor
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"log"
78

89
"github.com/kubeshop/tracetest/server/model"
10+
"github.com/kubeshop/tracetest/server/model/events"
911
"github.com/kubeshop/tracetest/server/tracedb"
12+
"github.com/kubeshop/tracetest/server/tracedb/connection"
1013
"go.opentelemetry.io/otel/attribute"
1114
"go.opentelemetry.io/otel/trace"
1215
)
@@ -18,18 +21,19 @@ type DefaultPollerExecutor struct {
1821
updater RunUpdater
1922
newTraceDBFn traceDBFactoryFn
2023
dsRepo model.DataStoreRepository
24+
eventEmitter EventEmitter
2125
}
2226

2327
type InstrumentedPollerExecutor struct {
2428
tracer trace.Tracer
2529
pollerExecutor PollerExecutor
2630
}
2731

28-
func (pe InstrumentedPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, model.Run, error) {
32+
func (pe InstrumentedPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, string, model.Run, error) {
2933
_, span := pe.tracer.Start(request.ctx, "Fetch trace")
3034
defer span.End()
3135

32-
finished, run, err := pe.pollerExecutor.ExecuteRequest(request)
36+
finished, finishReason, run, err := pe.pollerExecutor.ExecuteRequest(request)
3337

3438
spanCount := 0
3539
if run.Trace != nil {
@@ -44,13 +48,17 @@ func (pe InstrumentedPollerExecutor) ExecuteRequest(request *PollingRequest) (bo
4448
attribute.Int("tracetest.run.trace_poller.amount_retrieved_spans", spanCount),
4549
}
4650

51+
if finishReason != "" {
52+
attrs = append(attrs, attribute.String("tracetest.run.trace_poller.finish_reason", finishReason))
53+
}
54+
4755
if err != nil {
4856
attrs = append(attrs, attribute.String("tracetest.run.trace_poller.error", err.Error()))
4957
span.RecordError(err)
5058
}
5159

5260
span.SetAttributes(attrs...)
53-
return finished, run, err
61+
return finished, finishReason, run, err
5462
}
5563

5664
func NewPollerExecutor(
@@ -59,13 +67,15 @@ func NewPollerExecutor(
5967
updater RunUpdater,
6068
newTraceDBFn traceDBFactoryFn,
6169
dsRepo model.DataStoreRepository,
70+
eventEmitter EventEmitter,
6271
) PollerExecutor {
6372

6473
pollerExecutor := &DefaultPollerExecutor{
6574
ppGetter: ppGetter,
6675
updater: updater,
6776
newTraceDBFn: newTraceDBFn,
6877
dsRepo: dsRepo,
78+
eventEmitter: eventEmitter,
6979
}
7080

7181
return &InstrumentedPollerExecutor{
@@ -88,21 +98,52 @@ func (pe DefaultPollerExecutor) traceDB(ctx context.Context) (tracedb.TraceDB, e
8898
return tdb, nil
8999
}
90100

91-
func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, model.Run, error) {
101+
func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, string, model.Run, error) {
92102
log.Printf("[PollerExecutor] Test %s Run %d: ExecuteRequest\n", request.test.ID, request.run.ID)
93103
run := request.run
94104

95105
traceDB, err := pe.traceDB(request.ctx)
96106
if err != nil {
97107
log.Printf("[PollerExecutor] Test %s Run %d: GetDataStore error: %s\n", request.test.ID, request.run.ID, err.Error())
98-
return false, model.Run{}, err
108+
return false, "", model.Run{}, err
109+
}
110+
111+
if request.IsFirstRequest() {
112+
connectionResult := traceDB.TestConnection(request.ctx)
113+
114+
err = pe.eventEmitter.Emit(request.ctx, events.TraceDataStoreConnectionInfo(request.test.ID, request.run.ID, connectionResult))
115+
if err != nil {
116+
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceDataStoreConnectionInfo event: error: %s\n", request.test.ID, request.run.ID, err.Error())
117+
}
118+
}
119+
120+
err = pe.eventEmitter.Emit(request.ctx, events.TraceFetchingStart(request.test.ID, request.run.ID))
121+
if err != nil {
122+
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingStart event: error: %s\n", request.test.ID, request.run.ID, err.Error())
99123
}
100124

101125
traceID := run.TraceID.String()
102126
trace, err := traceDB.GetTraceByID(request.ctx, traceID)
103127
if err != nil {
128+
connectionResult := model.ConnectionResult{}
129+
130+
if !errors.Is(err, connection.ErrTraceNotFound) {
131+
// run test connection to give a diagnostic when an unknown error happens
132+
connectionResult = traceDB.TestConnection(request.ctx)
133+
}
134+
135+
anotherErr := pe.eventEmitter.Emit(request.ctx, events.TraceFetchingError(request.test.ID, request.run.ID, connectionResult, err))
136+
if anotherErr != nil {
137+
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingError event: error: %s\n", request.test.ID, request.run.ID, anotherErr.Error())
138+
}
139+
104140
log.Printf("[PollerExecutor] Test %s Run %d: GetTraceByID (traceID %s) error: %s\n", request.test.ID, request.run.ID, traceID, err.Error())
105-
return false, model.Run{}, err
141+
return false, "", model.Run{}, err
142+
}
143+
144+
err = pe.eventEmitter.Emit(request.ctx, events.TraceFetchingSuccess(request.test.ID, request.run.ID))
145+
if err != nil {
146+
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingSuccess event: error: %s\n", request.test.ID, request.run.ID, err.Error())
106147
}
107148

108149
trace.ID = run.TraceID
@@ -113,7 +154,7 @@ func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, m
113154
log.Printf("[PollerExecutor] Test %s Run %d: Not done polling. (%s)\n", request.test.ID, request.run.ID, reason)
114155
run.Trace = &trace
115156
request.run = run
116-
return false, run, nil
157+
return false, "", run, nil
117158
}
118159

119160
log.Printf("[PollerExecutor] Test %s Run %d: Done polling. (%s)\n", request.test.ID, request.run.ID, reason)
@@ -138,10 +179,10 @@ func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, m
138179
err = pe.updater.Update(request.ctx, run)
139180
if err != nil {
140181
log.Printf("[PollerExecutor] Test %s Run %d: Update error: %s\n", request.test.ID, request.run.ID, err.Error())
141-
return false, model.Run{}, err
182+
return false, "", model.Run{}, err
142183
}
143184

144-
return true, run, nil
185+
return true, reason, run, nil
145186
}
146187

147188
func (pe DefaultPollerExecutor) donePollingTraces(job *PollingRequest, traceDB tracedb.TraceDB, trace model.Trace) (bool, string) {

server/executor/poller_executor_test.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import (
1010
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
1111
"github.com/kubeshop/tracetest/server/id"
1212
"github.com/kubeshop/tracetest/server/model"
13+
"github.com/kubeshop/tracetest/server/subscription"
1314
"github.com/kubeshop/tracetest/server/testdb"
1415
"github.com/kubeshop/tracetest/server/tracedb"
1516
"github.com/kubeshop/tracetest/server/tracedb/connection"
1617
"github.com/kubeshop/tracetest/server/tracing"
18+
"github.com/stretchr/testify/mock"
1719
"github.com/stretchr/testify/require"
1820

1921
"go.opentelemetry.io/otel/trace"
@@ -455,15 +457,17 @@ func executeAndValidatePollingRequests(t *testing.T, pollerExecutor executor.Pol
455457
for i, value := range expectedValues {
456458
request := executor.NewPollingRequest(ctx, test, run, i)
457459

458-
finished, anotherRun, err := pollerExecutor.ExecuteRequest(request)
460+
finished, finishReason, anotherRun, err := pollerExecutor.ExecuteRequest(request)
459461
run = anotherRun // should store a run to use in another iteration
460462

461463
require.NotNilf(t, run, "The test run should not be nil on iteration %d", i)
462464

463465
if value.finished {
464466
require.Truef(t, finished, "The poller should have finished on iteration %d", i)
467+
require.NotEmptyf(t, finishReason, "The poller should not have finish reason on iteration %d", i)
465468
} else {
466469
require.Falsef(t, finished, "The poller should have not finished on iteration %d", i)
470+
require.Emptyf(t, finishReason, "The poller should have finish reason on iteration %d", i)
467471
}
468472

469473
if value.expectNoTraceError {
@@ -505,13 +509,15 @@ func getPollerExecutorWithMocks(t *testing.T, retryDelay, maxWaitTimeForTrace ti
505509
tracer := getTracerMock(t)
506510
testDB := getDataStoreRepositoryMock(t)
507511
traceDBFactory := getTraceDBMockFactory(t, tracePerIteration, &traceDBState{})
512+
eventEmitter := getEventEmitterMock(t, testDB)
508513

509514
return executor.NewPollerExecutor(
510515
defaultProfileGetter{retryDelay, maxWaitTimeForTrace},
511516
tracer,
512517
updater,
513518
traceDBFactory,
514519
testDB,
520+
eventEmitter,
515521
)
516522
}
517523

@@ -527,23 +533,22 @@ func getRunUpdaterMock(t *testing.T) executor.RunUpdater {
527533
}
528534

529535
// DataStoreRepository
530-
type dataStoreRepositoryMock struct {
531-
testdb.MockRepository
532-
// ...
533-
}
536+
func getDataStoreRepositoryMock(t *testing.T) model.Repository {
537+
t.Helper()
538+
539+
testDB := testdb.MockRepository{}
534540

535-
func (m dataStoreRepositoryMock) DefaultDataStore(_ context.Context) (model.DataStore, error) {
536-
return model.DataStore{}, nil
541+
testDB.Mock.On("DefaultDataStore", mock.Anything).Return(model.DataStore{Type: model.DataStoreTypeOTLP}, nil)
542+
testDB.Mock.On("CreateTestRunEvent", mock.Anything).Return(noError)
543+
544+
return &testDB
537545
}
538546

539-
func getDataStoreRepositoryMock(t *testing.T) model.Repository {
547+
// EventEmitter
548+
func getEventEmitterMock(t *testing.T, db model.Repository) executor.EventEmitter {
540549
t.Helper()
541550

542-
mock := new(dataStoreRepositoryMock)
543-
mock.T = t
544-
mock.Test(t)
545-
546-
return mock
551+
return executor.NewEventEmitter(db, subscription.NewManager())
547552
}
548553

549554
// Tracer

0 commit comments

Comments
 (0)