@@ -17,6 +17,7 @@ package datastore
1717import (
1818 "context"
1919 "errors"
20+ "strings"
2021 "sync"
2122 "time"
2223
@@ -316,10 +317,6 @@ func (d *Datastore) WatchDeviations(req *sdcpb.WatchDeviationRequest, stream sdc
316317 }
317318 pName := p .Addr .String ()
318319
319- if oStream , ok := d .deviationClients [pName ]; ok {
320- _ = oStream // TODO:
321- }
322-
323320 d .deviationClients [pName ] = stream
324321 return nil
325322}
@@ -328,6 +325,7 @@ func (d *Datastore) StopDeviationsWatch(peer string) {
328325 d .m .Lock ()
329326 defer d .m .Unlock ()
330327 delete (d .deviationClients , peer )
328+ log .Debugf ("deviation watcher %s removed" , peer )
331329}
332330
333331func (d * Datastore ) DeviationMgr (ctx context.Context ) {
@@ -342,37 +340,47 @@ func (d *Datastore) DeviationMgr(ctx context.Context) {
342340 return
343341 case <- ticker .C :
344342 d .m .RLock ()
345- deviationClients := make ([]sdcpb.DataServer_WatchDeviationsServer , 0 , len (d .deviationClients ))
346- for _ , devStream := range d .deviationClients {
347- deviationClients = append (deviationClients , devStream )
343+ deviationClientNames := make ([]string , 0 , len (d .deviationClients ))
344+ deviationClients := map [string ]sdcpb.DataServer_WatchDeviationsServer {}
345+ for clientIdentifier , devStream := range d .deviationClients {
346+ deviationClients [clientIdentifier ] = devStream
347+ deviationClientNames = append (deviationClientNames , clientIdentifier )
348348 }
349349 d .m .RUnlock ()
350350 if len (deviationClients ) == 0 {
351+ log .Debugf ("no deviation clients present %s" , d .config .Name )
351352 continue
352353 }
353- for _ , dc := range deviationClients {
354- dc .Send (& sdcpb.WatchDeviationResponse {
354+ log .Debugf ("deviations clients for %s: [ %s ]" , d .config .Name , strings .Join (deviationClientNames , ", " ))
355+ for clientIdentifier , dc := range deviationClients {
356+ err := dc .Send (& sdcpb.WatchDeviationResponse {
355357 Name : d .config .Name ,
356358 Event : sdcpb .DeviationEvent_START ,
357359 })
360+ if err != nil {
361+ log .Errorf ("error sending deviation to %s: %v" , clientIdentifier , err )
362+ }
358363 }
359364 deviationChan , err := d .calculateDeviations (ctx )
360365 if err != nil {
361366 log .Error (err )
362367 continue
363368 }
364369 d .SendDeviations (deviationChan , deviationClients )
365- for _ , dc := range deviationClients {
366- dc .Send (& sdcpb.WatchDeviationResponse {
370+ for clientIdentifier , dc := range deviationClients {
371+ err := dc .Send (& sdcpb.WatchDeviationResponse {
367372 Name : d .config .Name ,
368373 Event : sdcpb .DeviationEvent_END ,
369374 })
375+ if err != nil {
376+ log .Errorf ("error sending deviation to %s: %v" , clientIdentifier , err )
377+ }
370378 }
371379 }
372380 }
373381}
374382
375- func (d * Datastore ) SendDeviations (ch <- chan * treetypes.DeviationEntry , deviationClients [ ]sdcpb.DataServer_WatchDeviationsServer ) {
383+ func (d * Datastore ) SendDeviations (ch <- chan * treetypes.DeviationEntry , deviationClients map [ string ]sdcpb.DataServer_WatchDeviationsServer ) {
376384 wg := & sync.WaitGroup {}
377385 for {
378386 select {
@@ -382,9 +390,9 @@ func (d *Datastore) SendDeviations(ch <-chan *treetypes.DeviationEntry, deviatio
382390 return
383391 }
384392 wg .Add (1 )
385- go func (de DeviationEntry , dcs [ ]sdcpb.DataServer_WatchDeviationsServer ) {
386- for _ , dc := range dcs {
387- dc .Send (& sdcpb.WatchDeviationResponse {
393+ go func (de DeviationEntry , dcs map [ string ]sdcpb.DataServer_WatchDeviationsServer ) {
394+ for clientIdentifier , dc := range dcs {
395+ err := dc .Send (& sdcpb.WatchDeviationResponse {
388396 Name : d .config .Name ,
389397 Intent : de .IntentName (),
390398 Event : sdcpb .DeviationEvent_UPDATE ,
@@ -393,6 +401,9 @@ func (d *Datastore) SendDeviations(ch <-chan *treetypes.DeviationEntry, deviatio
393401 ExpectedValue : de .ExpectedValue (),
394402 CurrentValue : de .CurrentValue (),
395403 })
404+ if err != nil {
405+ log .Errorf ("error sending deviation to %s: %v" , clientIdentifier , err )
406+ }
396407 }
397408 wg .Done ()
398409 }(de , deviationClients )
0 commit comments