Skip to content

Commit 7d4148c

Browse files
authored
Merge pull request #83 from iptecharch/optimize-intent-up-del
optimize cache interactions during intent update and delete
2 parents 50e7644 + 230aeaa commit 7d4148c

File tree

9 files changed

+269
-766
lines changed

9 files changed

+269
-766
lines changed

.dockerignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ proto/*proto
77
schema-server
88
cached
99
intra
10+
schema-dir

collocated.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ grpc-server:
9898
schema-store:
9999
# type is either memory or persistent (default)
100100
type: persistent
101+
cache:
102+
ttl: 60s
103+
capacity: 100
101104
schemas:
102105
- name: sros
103106
vendor: Nokia

pkg/datastore/data_rpc.go

Lines changed: 139 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -100,131 +100,7 @@ func (d *Datastore) Set(ctx context.Context, req *sdcpb.SetDataRequest) (*sdcpb.
100100
case sdcpb.Type_INTENDED:
101101
return nil, status.Error(codes.InvalidArgument, "cannot set fields in INTENDED datastore")
102102
case sdcpb.Type_CANDIDATE:
103-
//
104-
ok, err := d.cacheClient.HasCandidate(ctx, req.GetName(), req.GetDatastore().GetName())
105-
if err != nil {
106-
return nil, err
107-
}
108-
if !ok {
109-
return nil, status.Errorf(codes.InvalidArgument, "unknown candidate %s", req.GetDatastore().GetName())
110-
}
111-
112-
replaces := make([]*sdcpb.Update, 0, len(req.GetReplace()))
113-
updates := make([]*sdcpb.Update, 0, len(req.GetUpdate()))
114-
115-
// expand json/json_ietf values
116-
for _, upd := range req.GetReplace() {
117-
rs, err := d.expandUpdate(ctx, upd, false)
118-
if err != nil {
119-
return nil, status.Errorf(codes.InvalidArgument, "failed expand replace: %v", err)
120-
}
121-
replaces = append(replaces, rs...)
122-
}
123-
124-
for _, upd := range req.GetUpdate() {
125-
rs, err := d.expandUpdate(ctx, upd, false)
126-
if err != nil {
127-
return nil, status.Errorf(codes.InvalidArgument, "failed expand update: %v", err)
128-
}
129-
updates = append(updates, rs...)
130-
}
131-
132-
// debugging
133-
if log.GetLevel() >= log.DebugLevel {
134-
for _, upd := range replaces {
135-
log.Debugf("expanded replace:\n%s", prototext.Format(upd))
136-
}
137-
for _, upd := range updates {
138-
log.Debugf("expanded update:\n%s", prototext.Format(upd))
139-
}
140-
}
141-
142-
// validate individual deletes
143-
dels := make([]*sdcpb.Path, 0, len(req.GetDelete()))
144-
for _, del := range req.GetDelete() {
145-
// err = d.validatePath(ctx, del)
146-
rsp, err := d.schemaClient.ExpandPath(ctx, &sdcpb.ExpandPathRequest{
147-
Path: del,
148-
Schema: d.Schema().GetSchema(),
149-
DataType: sdcpb.DataType_CONFIG,
150-
})
151-
if err != nil {
152-
return nil, status.Errorf(codes.InvalidArgument, "delete path: %q validation failed: %v", del, err)
153-
}
154-
dels = append(dels, rsp.GetPath()...)
155-
}
156-
157-
for _, upd := range replaces {
158-
err = d.validateUpdate(ctx, upd)
159-
if err != nil {
160-
log.Debugf("replace %v validation failed: %v", upd, err)
161-
return nil, status.Errorf(codes.InvalidArgument, "replace: validation failed: %v", err)
162-
}
163-
}
164-
165-
for _, upd := range updates {
166-
err = d.validateUpdate(ctx, upd)
167-
if err != nil {
168-
log.Debugf("update %v validation failed: %v", upd, err)
169-
return nil, status.Errorf(codes.InvalidArgument, "update: validation failed: %v", err)
170-
}
171-
}
172-
173-
// insert/delete
174-
// the order of operations is delete, replace, update
175-
rsp := &sdcpb.SetDataResponse{
176-
Response: make([]*sdcpb.UpdateResult, 0,
177-
len(dels)+len(replaces)+len(updates)),
178-
}
179-
180-
name := fmt.Sprintf("%s/%s", req.GetName(), req.GetDatastore().GetName())
181-
cdels := make([][]string, 0, len(dels))
182-
upds := make([]*cache.Update, 0, len(replaces)+len(updates))
183-
// deletes start
184-
for _, del := range dels {
185-
cdels = append(cdels, utils.ToStrings(del, false, false))
186-
}
187-
for _, changes := range [][]*sdcpb.Update{replaces, updates} {
188-
for _, upd := range changes {
189-
cUpd, err := d.cacheClient.NewUpdate(upd)
190-
if err != nil {
191-
return nil, err
192-
}
193-
upds = append(upds, cUpd)
194-
}
195-
}
196-
err = d.cacheClient.Modify(ctx, name, &cache.Opts{
197-
Store: cachepb.Store_CONFIG,
198-
}, cdels, upds)
199-
if err != nil {
200-
return nil, err
201-
}
202-
203-
// deletes start
204-
for _, del := range req.GetDelete() {
205-
rsp.Response = append(rsp.Response, &sdcpb.UpdateResult{
206-
Path: del,
207-
Op: sdcpb.UpdateResult_DELETE,
208-
})
209-
}
210-
// deletes end
211-
// replaces start
212-
for _, rep := range req.GetReplace() {
213-
rsp.Response = append(rsp.Response, &sdcpb.UpdateResult{
214-
Path: rep.GetPath(),
215-
Op: sdcpb.UpdateResult_REPLACE,
216-
})
217-
}
218-
// replaces end
219-
// updates start
220-
for _, upd := range req.GetUpdate() {
221-
rsp.Response = append(rsp.Response, &sdcpb.UpdateResult{
222-
Path: upd.GetPath(),
223-
Op: sdcpb.UpdateResult_UPDATE,
224-
})
225-
}
226-
// updates end
227-
return rsp, nil
103+
return d.setCandidate(ctx, req, true)
228104
default:
229105
return nil, status.Errorf(codes.InvalidArgument, "unknown datastore %v", req.GetDatastore().GetType())
230106
}
@@ -451,16 +327,16 @@ func (d *Datastore) validateUpdate(ctx context.Context, upd *sdcpb.Update) error
451327
return nil
452328
}
453329

454-
func (d *Datastore) validateMustStatement(ctx context.Context, candidateName string, p *sdcpb.Path) (bool, error) {
455-
// normalizedPaths will contain the provided path. If the last path.elems contins one or more keys, these will be
330+
func (d *Datastore) validateMustStatement(ctx context.Context, candidateName string, p *sdcpb.Path, normalizePaths bool) (bool, error) {
331+
// normalizedPaths will contain the provided path. If the last path.elems contains one or more keys, these will be
456332
// taken and appended to the path. The must statements have to be checked for all of the key elements.
457-
var normalizedPaths []*sdcpb.Path
333+
var normalizedPaths = make([]*sdcpb.Path, 0, 1)
458334

459335
// this is to massage for instance
460336
// /bfd/subinterface[id=ethernet-1.25] -> /bfd/subinterface[id=ethernet-1.25]/id
461337
// because we need to resolve down to id, to retrieve the relevant must statements
462338
// further there can be more then just a single key.
463-
if len(p.Elem[len(p.Elem)-1].Key) > 0 {
339+
if len(p.Elem[len(p.Elem)-1].Key) > 0 && normalizePaths {
464340
for k := range p.GetElem()[len(p.Elem)-1].GetKey() {
465341
// clone p as new path
466342
newPath := proto.Clone(p).(*sdcpb.Path)
@@ -1253,3 +1129,137 @@ func (d *Datastore) subscribeResponseFromCacheUpdate(ctx context.Context, upd *c
12531129
},
12541130
}, nil
12551131
}
1132+
1133+
func (d *Datastore) setCandidate(ctx context.Context, req *sdcpb.SetDataRequest, expandDeletes bool) (*sdcpb.SetDataResponse, error) {
1134+
ok, err := d.cacheClient.HasCandidate(ctx, req.GetName(), req.GetDatastore().GetName())
1135+
if err != nil {
1136+
return nil, err
1137+
}
1138+
if !ok {
1139+
return nil, status.Errorf(codes.InvalidArgument, "unknown candidate %s", req.GetDatastore().GetName())
1140+
}
1141+
1142+
replaces := make([]*sdcpb.Update, 0, len(req.GetReplace()))
1143+
updates := make([]*sdcpb.Update, 0, len(req.GetUpdate()))
1144+
1145+
// expand json/json_ietf values
1146+
for _, upd := range req.GetReplace() {
1147+
rs, err := d.expandUpdate(ctx, upd, false)
1148+
if err != nil {
1149+
return nil, status.Errorf(codes.InvalidArgument, "failed expand replace: %v", err)
1150+
}
1151+
replaces = append(replaces, rs...)
1152+
}
1153+
1154+
for _, upd := range req.GetUpdate() {
1155+
rs, err := d.expandUpdate(ctx, upd, false)
1156+
if err != nil {
1157+
return nil, status.Errorf(codes.InvalidArgument, "failed expand update: %v", err)
1158+
}
1159+
updates = append(updates, rs...)
1160+
}
1161+
1162+
// debugging
1163+
if log.GetLevel() >= log.DebugLevel {
1164+
for _, upd := range replaces {
1165+
log.Debugf("expanded replace:\n%s", prototext.Format(upd))
1166+
}
1167+
for _, upd := range updates {
1168+
log.Debugf("expanded update:\n%s", prototext.Format(upd))
1169+
}
1170+
}
1171+
1172+
// validate individual deletes
1173+
dels := make([]*sdcpb.Path, 0, len(req.GetDelete()))
1174+
for _, del := range req.GetDelete() {
1175+
if expandDeletes {
1176+
rsp, err := d.schemaClient.ExpandPath(ctx, &sdcpb.ExpandPathRequest{
1177+
Path: del,
1178+
Schema: d.Schema().GetSchema(),
1179+
DataType: sdcpb.DataType_CONFIG,
1180+
})
1181+
if err != nil {
1182+
return nil, status.Errorf(codes.InvalidArgument, "delete path: %q validation failed: %v", del, err)
1183+
}
1184+
dels = append(dels, rsp.GetPath()...)
1185+
continue
1186+
}
1187+
err = d.validatePath(ctx, del)
1188+
if err != nil {
1189+
return nil, err
1190+
}
1191+
dels = append(dels, del)
1192+
}
1193+
1194+
for _, upd := range replaces {
1195+
err = d.validateUpdate(ctx, upd)
1196+
if err != nil {
1197+
log.Debugf("replace %v validation failed: %v", upd, err)
1198+
return nil, status.Errorf(codes.InvalidArgument, "replace: validation failed: %v", err)
1199+
}
1200+
}
1201+
1202+
for _, upd := range updates {
1203+
err = d.validateUpdate(ctx, upd)
1204+
if err != nil {
1205+
log.Debugf("update %v validation failed: %v", upd, err)
1206+
return nil, status.Errorf(codes.InvalidArgument, "update: validation failed: %v", err)
1207+
}
1208+
}
1209+
1210+
// insert/delete
1211+
// the order of operations is delete, replace, update
1212+
rsp := &sdcpb.SetDataResponse{
1213+
Response: make([]*sdcpb.UpdateResult, 0,
1214+
len(dels)+len(replaces)+len(updates)),
1215+
}
1216+
1217+
name := fmt.Sprintf("%s/%s", req.GetName(), req.GetDatastore().GetName())
1218+
cdels := make([][]string, 0, len(dels))
1219+
upds := make([]*cache.Update, 0, len(replaces)+len(updates))
1220+
// deletes start
1221+
for _, del := range dels {
1222+
cdels = append(cdels, utils.ToStrings(del, false, false))
1223+
}
1224+
for _, changes := range [][]*sdcpb.Update{replaces, updates} {
1225+
for _, upd := range changes {
1226+
cUpd, err := d.cacheClient.NewUpdate(upd)
1227+
if err != nil {
1228+
return nil, err
1229+
}
1230+
upds = append(upds, cUpd)
1231+
}
1232+
}
1233+
err = d.cacheClient.Modify(ctx, name, &cache.Opts{
1234+
Store: cachepb.Store_CONFIG,
1235+
}, cdels, upds)
1236+
if err != nil {
1237+
return nil, err
1238+
}
1239+
1240+
// deletes start
1241+
for _, del := range req.GetDelete() {
1242+
rsp.Response = append(rsp.Response, &sdcpb.UpdateResult{
1243+
Path: del,
1244+
Op: sdcpb.UpdateResult_DELETE,
1245+
})
1246+
}
1247+
// deletes end
1248+
// replaces start
1249+
for _, rep := range req.GetReplace() {
1250+
rsp.Response = append(rsp.Response, &sdcpb.UpdateResult{
1251+
Path: rep.GetPath(),
1252+
Op: sdcpb.UpdateResult_REPLACE,
1253+
})
1254+
}
1255+
// replaces end
1256+
// updates start
1257+
for _, upd := range req.GetUpdate() {
1258+
rsp.Response = append(rsp.Response, &sdcpb.UpdateResult{
1259+
Path: upd.GetPath(),
1260+
Op: sdcpb.UpdateResult_UPDATE,
1261+
})
1262+
}
1263+
// updates end
1264+
return rsp, nil
1265+
}

pkg/datastore/datastore_rpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (d *Datastore) Commit(ctx context.Context, req *sdcpb.CommitRequest) error
190190
// validate MUST statements
191191
for _, upd := range notification.GetUpdate() {
192192
log.Debugf("%s:%s validating must statement on path: %v", d.Name(), name, upd.GetPath())
193-
_, err = d.validateMustStatement(ctx, req.GetDatastore().GetName(), upd.GetPath())
193+
_, err = d.validateMustStatement(ctx, req.GetDatastore().GetName(), upd.GetPath(), true)
194194
if err != nil {
195195
return err
196196
}

pkg/datastore/intent_rpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (d *Datastore) applyIntent(ctx context.Context, candidateName string, prior
168168
continue
169169
}
170170
log.Debugf("%s: %s validating must statement on path: %v", d.Name(), candidateName, upd.GetPath())
171-
_, err = d.validateMustStatement(ctx, candidateName, upd.GetPath())
171+
_, err = d.validateMustStatement(ctx, candidateName, upd.GetPath(), false)
172172
if err != nil {
173173
return err
174174
}

0 commit comments

Comments
 (0)