Skip to content

Commit 6e64d9c

Browse files
authored
chore(server): Updating data store test to use queues (#3166)
* chore(server): Updating data store test to use queues * chore(server): Updating data store test to use queues * fix: build * Adding postgres pipelines * fix: agent unit tests * using testconnection job instead of executor job * cleanup * PR comments
1 parent 23aff08 commit 6e64d9c

20 files changed

+2339
-439
lines changed

agent/client/client.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ type Client struct {
2626
sessionConfig *SessionConfig
2727
done chan bool
2828

29-
triggerListener func(context.Context, *proto.TriggerRequest) error
30-
pollListener func(context.Context, *proto.PollingRequest) error
31-
shutdownListener func(context.Context, *proto.ShutdownRequest) error
29+
triggerListener func(context.Context, *proto.TriggerRequest) error
30+
pollListener func(context.Context, *proto.PollingRequest) error
31+
shutdownListener func(context.Context, *proto.ShutdownRequest) error
32+
dataStoreConnectionListener func(context.Context, *proto.DataStoreConnectionTestRequest) error
3233
}
3334

3435
func (c *Client) Start(ctx context.Context) error {
@@ -61,6 +62,11 @@ func (c *Client) Start(ctx context.Context) error {
6162
return err
6263
}
6364

65+
err = c.startDataStoreConnectionTestListener(ctx)
66+
if err != nil {
67+
return err
68+
}
69+
6470
c.startHearthBeat(ctx)
6571

6672
return nil
@@ -88,6 +94,10 @@ func (c *Client) OnTriggerRequest(listener func(context.Context, *proto.TriggerR
8894
c.triggerListener = listener
8995
}
9096

97+
func (c *Client) OnDataStoreTestConnectionRequest(listener func(context.Context, *proto.DataStoreConnectionTestRequest) error) {
98+
c.dataStoreConnectionListener = listener
99+
}
100+
91101
func (c *Client) OnPollingRequest(listener func(context.Context, *proto.PollingRequest) error) {
92102
c.pollListener = listener
93103
}

agent/client/mocks/grpc_server.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,22 @@ import (
1313

1414
type GrpcServerMock struct {
1515
proto.UnimplementedOrchestratorServer
16-
port int
17-
triggerChannel chan *proto.TriggerRequest
18-
pollingChannel chan *proto.PollingRequest
19-
terminationChannel chan *proto.ShutdownRequest
16+
port int
17+
triggerChannel chan *proto.TriggerRequest
18+
pollingChannel chan *proto.PollingRequest
19+
terminationChannel chan *proto.ShutdownRequest
20+
dataStoreTestChannel chan *proto.DataStoreConnectionTestRequest
2021

2122
lastTriggerResponse *proto.TriggerResponse
2223
lastPollingResponse *proto.PollingResponse
2324
}
2425

2526
func NewGrpcServer() *GrpcServerMock {
2627
server := &GrpcServerMock{
27-
triggerChannel: make(chan *proto.TriggerRequest),
28-
pollingChannel: make(chan *proto.PollingRequest),
29-
terminationChannel: make(chan *proto.ShutdownRequest),
28+
triggerChannel: make(chan *proto.TriggerRequest),
29+
pollingChannel: make(chan *proto.PollingRequest),
30+
terminationChannel: make(chan *proto.ShutdownRequest),
31+
dataStoreTestChannel: make(chan *proto.DataStoreConnectionTestRequest),
3032
}
3133
var wg sync.WaitGroup
3234
wg.Add(1)
@@ -108,6 +110,20 @@ func (s *GrpcServerMock) RegisterPollerAgent(id *proto.AgentIdentification, stre
108110
}
109111
}
110112

113+
func (s *GrpcServerMock) RegisterDataStoreConnectionTestAgent(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterDataStoreConnectionTestAgentServer) error {
114+
if id.Token != "token" {
115+
return fmt.Errorf("could not validate token")
116+
}
117+
118+
for {
119+
dsTestRequest := <-s.dataStoreTestChannel
120+
err := stream.Send(dsTestRequest)
121+
if err != nil {
122+
log.Println("could not send polling request to agent: %w", err)
123+
}
124+
}
125+
}
126+
111127
func (s *GrpcServerMock) SendPolledSpans(ctx context.Context, result *proto.PollingResponse) (*proto.Empty, error) {
112128
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
113129
return nil, fmt.Errorf("could not validate token")
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"log"
8+
9+
"github.com/kubeshop/tracetest/agent/proto"
10+
)
11+
12+
func (c *Client) startDataStoreConnectionTestListener(ctx context.Context) error {
13+
client := proto.NewOrchestratorClient(c.conn)
14+
15+
stream, err := client.RegisterDataStoreConnectionTestAgent(ctx, c.sessionConfig.AgentIdentification)
16+
if err != nil {
17+
return fmt.Errorf("could not open agent stream: %w", err)
18+
}
19+
20+
go func() {
21+
for {
22+
req := proto.DataStoreConnectionTestRequest{}
23+
err := stream.RecvMsg(&req)
24+
if err == io.EOF {
25+
return
26+
}
27+
28+
if err != nil {
29+
log.Fatal("could not get message from trigger stream: %w", err)
30+
}
31+
32+
// TODO: Get ctx from request
33+
c.dataStoreConnectionListener(context.Background(), &req)
34+
}
35+
}()
36+
return nil
37+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/kubeshop/tracetest/agent/proto"
8+
)
9+
10+
func (c *Client) SendDataStoreConnectionResult(ctx context.Context, response *proto.DataStoreConnectionTestResponse) error {
11+
client := proto.NewOrchestratorClient(c.conn)
12+
13+
response.AgentIdentification = c.sessionConfig.AgentIdentification
14+
15+
_, err := client.SendDataStoreConnectionTestResult(ctx, response)
16+
if err != nil {
17+
return fmt.Errorf("could not send data store connection result request: %w", err)
18+
}
19+
20+
return nil
21+
}

agent/initialization/start.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ func NewClient(ctx context.Context, config config.Config) (*client.Client, error
2121

2222
triggerWorker := workers.NewTriggerWorker(client)
2323
pollingWorker := workers.NewPollerWorker(client)
24+
dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(client)
2425

26+
client.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test)
2527
client.OnTriggerRequest(triggerWorker.Trigger)
2628
client.OnPollingRequest(pollingWorker.Poll)
2729
client.OnConnectionClosed(func(ctx context.Context, sr *proto.ShutdownRequest) error {

0 commit comments

Comments
 (0)