Skip to content

Commit 819ea66

Browse files
sguiheuxbnjjj
authored andcommitted
fix(api): reduce locktime + avoid deadlock (#3010)
* fix close channel * fix: init broker
1 parent 2b5eece commit 819ea66

File tree

2 files changed

+73
-102
lines changed

2 files changed

+73
-102
lines changed

engine/api/api_routes.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ func (api *API) InitRouter() {
2424
api.lastUpdateBroker.Init(api.Router.Background)
2525

2626
api.eventsBroker = &eventsBroker{
27-
cache: api.Cache,
28-
clients: make(map[string]eventsBrokerSubscribe),
29-
dbFunc: api.DBConnectionFactory.GetDBMap,
30-
messages: make(chan sdk.Event),
31-
mutex: &sync.Mutex{},
27+
cache: api.Cache,
28+
clients: make(map[string]eventsBrokerSubscribe),
29+
dbFunc: api.DBConnectionFactory.GetDBMap,
30+
messages: make(chan sdk.Event),
31+
mutex: &sync.Mutex{},
32+
disconnectedMutex: &sync.Mutex{},
33+
disconnected: make(map[string]bool),
3234
}
3335
api.eventsBroker.Init(context.Background())
3436

engine/api/events.go

Lines changed: 66 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,25 @@ import (
2323
type eventsBrokerSubscribe struct {
2424
UUID string
2525
User *sdk.User
26-
Queue chan string
26+
Queue chan sdk.Event
2727
}
2828

2929
// lastUpdateBroker keeps connected client of the current route,
3030
type eventsBroker struct {
31-
clients map[string]eventsBrokerSubscribe
32-
messages chan sdk.Event
33-
mutex *sync.Mutex
34-
dbFunc func() *gorp.DbMap
35-
cache cache.Store
31+
clients map[string]eventsBrokerSubscribe
32+
messages chan sdk.Event
33+
mutex *sync.Mutex
34+
disconnected map[string]bool
35+
disconnectedMutex *sync.Mutex
36+
dbFunc func() *gorp.DbMap
37+
cache cache.Store
3638
}
3739

3840
// AddClient add a client to the client map
39-
func (b *eventsBroker) addClient(uuid string, messageChan eventsBrokerSubscribe) {
41+
func (b *eventsBroker) addClient(client eventsBrokerSubscribe) {
4042
b.mutex.Lock()
4143
defer b.mutex.Unlock()
42-
b.clients[uuid] = messageChan
44+
b.clients[client.UUID] = client
4345
}
4446

4547
// CleanAll cleans all clients
@@ -54,37 +56,10 @@ func (b *eventsBroker) cleanAll() {
5456
}
5557
}
5658

57-
// CleanClient cleans a client
58-
func (b *eventsBroker) cleanClient(client eventsBrokerSubscribe) {
59-
b.mutex.Lock()
60-
defer b.mutex.Unlock()
61-
62-
// Close channel
63-
close(client.Queue)
64-
// Delete client from map
65-
delete(b.clients, client.UUID)
66-
}
67-
68-
func (b *eventsBroker) setUser(user *sdk.User) {
69-
b.mutex.Lock()
70-
defer b.mutex.Unlock()
71-
for _, c := range b.clients {
72-
if c.User.Username == user.Username {
73-
c.User = user
74-
break
75-
}
76-
}
77-
}
78-
79-
func (b *eventsBroker) getUser(username string) *sdk.User {
80-
b.mutex.Lock()
81-
defer b.mutex.Unlock()
82-
for _, c := range b.clients {
83-
if c.User.Username == username {
84-
return c.User
85-
}
86-
}
87-
return nil
59+
func (b *eventsBroker) disconnectClient(uuid string) {
60+
b.disconnectedMutex.Lock()
61+
defer b.disconnectedMutex.Unlock()
62+
b.disconnected[uuid] = true
8863
}
8964

9065
//Init the eventsBroker
@@ -133,24 +108,6 @@ func cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cach
133108
}
134109
}
135110

136-
func (b *eventsBroker) UpdateUserPermissions(username string) {
137-
var user *sdk.User
138-
139-
user = b.getUser(username)
140-
141-
if user == nil {
142-
return
143-
}
144-
// load permission without being in the mutex lock
145-
if err := loadUserPermissions(b.dbFunc(), b.cache, user); err != nil {
146-
log.Error("eventsBroker.UpdateUserPermissions> Cannot load user permission:%s", err)
147-
}
148-
149-
// then, relock map and update user
150-
b.setUser(user)
151-
152-
}
153-
154111
// Start the broker
155112
func (b *eventsBroker) Start(c context.Context) {
156113
for {
@@ -162,12 +119,7 @@ func (b *eventsBroker) Start(c context.Context) {
162119
return
163120
}
164121
case receivedEvent := <-b.messages:
165-
bEvent, err := json.Marshal(receivedEvent)
166-
if err != nil {
167-
log.Warning("eventsBroker.Start> Unable to marshal event: %+v", receivedEvent)
168-
continue
169-
}
170-
b.manageEvent(receivedEvent, string(bEvent))
122+
b.manageEvent(receivedEvent)
171123
}
172124
}
173125
}
@@ -191,14 +143,14 @@ func (b *eventsBroker) ServeHTTP() Handler {
191143
return sdk.WrapError(err, "eventsBroker.Serve Cannot load user permission")
192144
}
193145

194-
messageChan := eventsBrokerSubscribe{
146+
client := eventsBrokerSubscribe{
195147
UUID: uuid,
196148
User: user,
197-
Queue: make(chan string, 10), // chan buffered, to avoid goroutine Start() wait on push in queue
149+
Queue: make(chan sdk.Event, 10), // chan buffered, to avoid goroutine Start() wait on push in queue
198150
}
199151

200152
// Add this client to the map of those that should receive updates
201-
b.addClient(uuid, messageChan)
153+
b.addClient(client)
202154

203155
// Set the headers related to event streaming.
204156
w.Header().Set("Content-Type", "text/event-stream")
@@ -219,16 +171,26 @@ func (b *eventsBroker) ServeHTTP() Handler {
219171
select {
220172
case <-ctx.Done():
221173
log.Info("events.Http: context done")
222-
b.cleanClient(messageChan)
174+
b.disconnectClient(client.UUID)
223175
break leave
224176
case <-r.Context().Done():
225177
log.Info("events.Http: client disconnected")
226-
b.cleanClient(messageChan)
178+
b.disconnectClient(client.UUID)
227179
break leave
228-
case msg := <-messageChan.Queue:
180+
case event := <-client.Queue:
181+
if ok := client.manageEvent(event); !ok {
182+
continue
183+
}
184+
185+
msg, errJ := json.Marshal(event)
186+
if errJ != nil {
187+
log.Warning("sendevent> Unavble to marshall event: %v", errJ)
188+
continue
189+
}
190+
229191
var buffer bytes.Buffer
230192
buffer.WriteString("data: ")
231-
buffer.WriteString(msg)
193+
buffer.Write(msg)
232194
buffer.WriteString("\n\n")
233195

234196
if _, err := w.Write(buffer.Bytes()); err != nil {
@@ -247,57 +209,64 @@ func (b *eventsBroker) ServeHTTP() Handler {
247209
}
248210
}
249211

250-
func (b *eventsBroker) manageEvent(receivedEvent sdk.Event, eventS string) {
212+
func (b *eventsBroker) manageEvent(receivedEvent sdk.Event) {
251213
b.mutex.Lock()
252214
defer b.mutex.Unlock()
253215
for _, i := range b.clients {
254-
if i.Queue != nil {
255-
b.handleEvent(receivedEvent, eventS, i)
256-
} else {
257-
log.Warning("eventsBroker.manageEvent > Queue is null for client %+v/%s", i.User, i.UUID)
216+
if b.canSend(i) {
217+
i.Queue <- receivedEvent
258218
}
259-
260219
}
261220
}
262221

263-
func (b *eventsBroker) handleEvent(event sdk.Event, eventS string, subscriber eventsBrokerSubscribe) {
264-
if strings.HasPrefix(event.EventType, "sdk.EventAction") {
265-
subscriber.Queue <- eventS
222+
// canSend Test if client is connected. If not, close channel and remove client from map
223+
func (b *eventsBroker) canSend(client eventsBrokerSubscribe) bool {
224+
b.disconnectedMutex.Lock()
225+
defer b.disconnectedMutex.Unlock()
226+
if _, ok := b.disconnected[client.UUID]; !ok {
227+
return true
266228
}
229+
close(client.Queue)
230+
delete(b.clients, client.UUID)
231+
return false
232+
}
233+
234+
func (s *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool {
267235
if strings.HasPrefix(event.EventType, "sdk.EventProject") {
268-
if subscriber.User.Admin || permission.ProjectPermission(event.ProjectKey, subscriber.User) >= permission.PermissionRead {
269-
subscriber.Queue <- eventS
236+
if s.User.Admin || permission.ProjectPermission(event.ProjectKey, s.User) >= permission.PermissionRead {
237+
return true
270238
}
271-
return
239+
return false
272240
}
273241
if strings.HasPrefix(event.EventType, "sdk.EventWorkflow") || strings.HasPrefix(event.EventType, "sdk.EventRunWorkflow") {
274-
if subscriber.User.Admin || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, subscriber.User) >= permission.PermissionRead {
275-
subscriber.Queue <- eventS
242+
if s.User.Admin || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, s.User) >= permission.PermissionRead {
243+
return true
276244
}
277-
return
245+
return false
278246
}
279247
if strings.HasPrefix(event.EventType, "sdk.EventApplication") {
280-
if subscriber.User.Admin || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, subscriber.User) >= permission.PermissionRead {
281-
subscriber.Queue <- eventS
248+
if s.User.Admin || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, s.User) >= permission.PermissionRead {
249+
return true
282250
}
283-
return
251+
return false
284252
}
285253
if strings.HasPrefix(event.EventType, "sdk.EventPipeline") {
286-
if subscriber.User.Admin || permission.PipelinePermission(event.ProjectKey, event.PipelineName, subscriber.User) >= permission.PermissionRead {
287-
subscriber.Queue <- eventS
254+
if s.User.Admin || permission.PipelinePermission(event.ProjectKey, event.PipelineName, s.User) >= permission.PermissionRead {
255+
return true
288256
}
289-
return
257+
return false
290258
}
291259
if strings.HasPrefix(event.EventType, "sdk.EventEnvironment") {
292-
if subscriber.User.Admin || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, subscriber.User) >= permission.PermissionRead {
293-
subscriber.Queue <- eventS
260+
if s.User.Admin || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, s.User) >= permission.PermissionRead {
261+
return true
294262
}
295-
return
263+
return false
296264
}
297265
if strings.HasPrefix(event.EventType, "sdk.EventBroadcast") {
298-
if subscriber.User.Admin || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, subscriber.User, permission.PermissionRead) {
299-
subscriber.Queue <- eventS
266+
if s.User.Admin || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, s.User, permission.PermissionRead) {
267+
return true
300268
}
301-
return
269+
return false
302270
}
271+
return false
303272
}

0 commit comments

Comments
 (0)