Skip to content

Commit 45d56f5

Browse files
committed
init fields
1 parent f5d7fa3 commit 45d56f5

File tree

2 files changed

+32
-10
lines changed

2 files changed

+32
-10
lines changed

client/cmd/datastore_watchdeviations.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,31 @@ var datastoreWatchDeviationCmd = &cobra.Command{
4141
if err != nil {
4242
return err
4343
}
44+
count := 0
4445
for {
4546
rsp, err := stream.Recv()
4647
if err != nil {
4748
return err
4849
}
4950
switch format {
5051
case "":
51-
fmt.Printf("%s: %s: %s: %s: %s : %s -> %s\n", rsp.GetName(), rsp.GetIntent(), rsp.GetEvent(), rsp.GetReason(), utils.ToXPath(rsp.GetPath(), false), rsp.GetExpectedValue(), rsp.GetCurrentValue())
52-
rsp.GetName()
52+
switch rsp.Event {
53+
case sdcpb.DeviationEvent_START:
54+
fmt.Printf("%s: %s: %s\n",
55+
rsp.GetName(), rsp.GetIntent(), rsp.GetEvent())
56+
case sdcpb.DeviationEvent_END:
57+
fmt.Printf("%s: %s: %s\n",
58+
rsp.GetName(), rsp.GetIntent(), rsp.GetEvent())
59+
60+
fmt.Printf("Received %d deviations\n", count)
61+
count = 0
62+
default:
63+
count++
64+
fmt.Printf("%s: %s: %s: %s: %s : %s -> %s\n",
65+
rsp.GetName(), rsp.GetIntent(), rsp.GetEvent(),
66+
rsp.GetReason(), utils.ToXPath(rsp.GetPath(), false),
67+
rsp.GetExpectedValue(), rsp.GetCurrentValue())
68+
}
5369
case "json":
5470
b, err := json.MarshalIndent(rsp, "", " ")
5571
if err != nil {
@@ -58,8 +74,6 @@ var datastoreWatchDeviationCmd = &cobra.Command{
5874
fmt.Println(string(b))
5975
}
6076
}
61-
62-
return nil
6377
},
6478
}
6579

pkg/datastore/datastore_rpc.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,14 @@ type Datastore struct {
8585
// func New(c *config.DatastoreConfig, schemaServer *config.RemoteSchemaServer) *Datastore {
8686
func New(ctx context.Context, c *config.DatastoreConfig, scc schema.Client, cc cache.Client, opts ...grpc.DialOption) *Datastore {
8787
ds := &Datastore{
88-
config: c,
89-
schemaClient: scc,
90-
cacheClient: cc,
91-
intentMutex: new(sync.Mutex),
92-
m: new(sync.RWMutex),
93-
deviationClients: make(map[string]sdcpb.DataServer_WatchDeviationsServer),
88+
config: c,
89+
schemaClient: scc,
90+
cacheClient: cc,
91+
intentMutex: new(sync.Mutex),
92+
m: new(sync.RWMutex),
93+
deviationClients: make(map[string]sdcpb.DataServer_WatchDeviationsServer),
94+
md: new(sync.RWMutex),
95+
currentIntentsDeviations: make(map[string][]*sdcpb.WatchDeviationResponse),
9496
}
9597
if c.Sync != nil {
9698
ds.synCh = make(chan *target.SyncUpdate, c.Sync.Buffer)
@@ -1102,6 +1104,9 @@ func (d *Datastore) runDeviationUpdate(ctx context.Context, dm map[string]sdcpb.
11021104
}
11031105
}
11041106
xp := utils.ToXPath(sp, false)
1107+
if _, ok := newDeviations[xp]; !ok {
1108+
newDeviations[xp] = make([]*sdcpb.WatchDeviationResponse, 0, 1)
1109+
}
11051110
newDeviations[xp] = append(newDeviations[xp], rsp)
11061111
}
11071112
// remaining intents
@@ -1148,6 +1153,9 @@ func (d *Datastore) runDeviationUpdate(ctx context.Context, dm map[string]sdcpb.
11481153
}
11491154
}
11501155
xp := utils.ToXPath(sp, false)
1156+
if _, ok := newDeviations[xp]; !ok {
1157+
newDeviations[xp] = make([]*sdcpb.WatchDeviationResponse, 0, 1)
1158+
}
11511159
newDeviations[xp] = append(newDeviations[xp], rsp)
11521160
}
11531161
}

0 commit comments

Comments
 (0)