Skip to content

Commit f5f77a9

Browse files
committed
Do not GET claim in CD unprepare (use local checkpoint)
Signed-off-by: Dr. Jan-Philip Gehrcke <[email protected]>
1 parent bf444c4 commit f5f77a9

File tree

12 files changed

+118
-83
lines changed

12 files changed

+118
-83
lines changed

cmd/compute-domain-controller/computedomain.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (m *ComputeDomainManager) RemoveFinalizer(ctx context.Context, uid string)
191191
return nil
192192
}
193193

194-
// AssertWorkloadsCompletes ensures that all workloads asssociated with a ComputeDomain have completed.
194+
// AssertWorkloadsCompletes ensures that all workloads associated with a ComputeDomain have completed.
195195
//
196196
// TODO: We should probably also check to ensure that all ResourceClaims
197197
// generated from our ResourceClaimTemplate for workloads are gone. Doing

cmd/compute-domain-kubelet-plugin/checkpoint.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"encoding/json"
55

6+
resourceapi "k8s.io/api/resource/v1beta1"
67
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
78
)
89

@@ -12,14 +13,22 @@ type Checkpoint struct {
1213
}
1314

1415
type CheckpointV1 struct {
15-
PreparedClaims PreparedClaims `json:"preparedClaims,omitempty"`
16+
PreparedClaimsByUID `json:"preparedClaims,omitempty"`
17+
}
18+
19+
// key: stringified claim UUID
20+
type PreparedClaimsByUID map[string]PreparedClaim
21+
22+
type PreparedClaim struct {
23+
Status resourceapi.ResourceClaimStatus
24+
PreparedDevices PreparedDevices
1625
}
1726

1827
func newCheckpoint() *Checkpoint {
1928
pc := &Checkpoint{
2029
Checksum: 0,
2130
V1: &CheckpointV1{
22-
PreparedClaims: make(PreparedClaims),
31+
PreparedClaimsByUID: make(PreparedClaimsByUID),
2332
},
2433
}
2534
return pc

cmd/compute-domain-kubelet-plugin/computedomain.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ func (m *ComputeDomainManager) periodicCleanup(ctx context.Context) {
437437
continue
438438
}
439439
if err != nil {
440-
klog.Errorf("error checking for existenc of directory '%s': %v", m.configFilesRoot, err)
440+
klog.Errorf("error checking for existence of directory '%s': %v", m.configFilesRoot, err)
441441
continue
442442
}
443443

cmd/compute-domain-kubelet-plugin/device_state.go

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,13 @@ func NewDeviceState(ctx context.Context, config *Config) (*DeviceState, error) {
115115
}
116116

117117
for _, c := range checkpoints {
118-
if c == DriverPluginCheckpointFile {
118+
if c == DriverPluginCheckpointFileBasename {
119119
return state, nil
120120
}
121121
}
122122

123123
checkpoint := newCheckpoint()
124-
if err := state.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
124+
if err := state.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
125125
return nil, fmt.Errorf("unable to sync to checkpoint: %v", err)
126126
}
127127

@@ -135,13 +135,17 @@ func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceCl
135135
claimUID := string(claim.UID)
136136

137137
checkpoint := newCheckpoint()
138-
if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
139-
return nil, fmt.Errorf("unable to sync from checkpoint: %v", err)
138+
if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
139+
return nil, fmt.Errorf("unable to get checkpoint: %w", err)
140140
}
141-
preparedClaims := checkpoint.V1.PreparedClaims
142141

143-
if preparedClaims[claimUID] != nil {
144-
return preparedClaims[claimUID].GetDevices(), nil
142+
preparedClaim, exists := checkpoint.V1.PreparedClaimsByUID[claimUID]
143+
if exists {
144+
// Make this a noop. Associated device(s) has/have been prepared by us.
145+
// Prepare() must be idempotent, as it may be invoked more than once per
146+
// claim (and actual device preparation must happen at most once).
147+
klog.V(6).Infof("skip prepare: claim %v found in checkpoint", claimUID)
148+
return preparedClaim.PreparedDevices.GetDevices(), nil
145149
}
146150

147151
preparedDevices, err := s.prepareDevices(ctx, claim)
@@ -153,50 +157,65 @@ func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceCl
153157
return nil, fmt.Errorf("unable to create CDI spec file for claim: %w", err)
154158
}
155159

156-
preparedClaims[claimUID] = preparedDevices
157-
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
158-
return nil, fmt.Errorf("unable to sync to checkpoint: %v", err)
160+
// Add ResourceClaimStatus API object to node-local checkpoint: the
161+
// 'unprepare' code path must use local state exclusively (ResourceClaim
162+
// object might have been deleted from the API server).
163+
checkpoint.V1.PreparedClaimsByUID[claimUID] = PreparedClaim{
164+
Status: claim.Status,
165+
PreparedDevices: preparedDevices,
166+
}
167+
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
168+
return nil, fmt.Errorf("unable to create checkpoint: %w", err)
159169
}
170+
klog.V(6).Infof("checkpoint written for claim %v", claimUID)
160171

161-
return preparedClaims[claimUID].GetDevices(), nil
172+
return preparedDevices.GetDevices(), nil
162173
}
163174

164-
func (s *DeviceState) Unprepare(ctx context.Context, claim *resourceapi.ResourceClaim) error {
175+
func (s *DeviceState) Unprepare(ctx context.Context, claimRef kubeletplugin.NamespacedObject) error {
165176
s.Lock()
166177
defer s.Unlock()
167178

168-
claimUID := string(claim.UID)
169-
170-
if err := s.unprepareDevices(ctx, claim); err != nil {
171-
return fmt.Errorf("unprepare devices failed: %w", err)
172-
}
179+
claimUID := string(claimRef.UID)
173180

181+
// Rely on local checkpoint state for ability to clean up.
174182
checkpoint := newCheckpoint()
175-
if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
176-
return fmt.Errorf("unable to sync from checkpoint: %v", err)
183+
if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
184+
return fmt.Errorf("unable to get checkpoint: %w", err)
177185
}
178-
preparedClaims := checkpoint.V1.PreparedClaims
179186

180-
if preparedClaims[claimUID] == nil {
187+
pc, exists := checkpoint.V1.PreparedClaimsByUID[claimUID]
188+
if !exists {
189+
// Not an error: if this claim UID is not in the checkpoint then this
190+
// device was never prepared or has already been unprepared (assume that
191+
// Prepare+Checkpoint are done transactionally). Note that
192+
// claimRef.String() contains namespace, name, UID.
193+
klog.Infof("unprepare noop: claim not found in checkpoint data: %v", claimRef.String())
181194
return nil
182195
}
183196

197+
if err := s.unprepareDevices(ctx, &pc.Status); err != nil {
198+
return fmt.Errorf("unprepare devices failed: %w", err)
199+
}
200+
184201
err := s.cdi.DeleteClaimSpecFile(claimUID)
185202
if err != nil {
186203
return fmt.Errorf("unable to delete CDI spec file for claim: %w", err)
187204
}
188205

189-
delete(preparedClaims, claimUID)
190-
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
191-
return fmt.Errorf("unable to sync to checkpoint: %v", err)
206+
// Write new checkpoint reflecting that all devices for this claim have been
207+
// unprepared (by virtue of removing its UID from all mappings).
208+
delete(checkpoint.V1.PreparedClaimsByUID, claimUID)
209+
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
210+
return fmt.Errorf("create checkpoint failed: %w", err)
192211
}
193212

194213
return nil
195214
}
196215

197216
func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.ResourceClaim) (PreparedDevices, error) {
198217
// Generate a mapping of each OpaqueDeviceConfigs to the Device.Results it applies to
199-
configResultsMap, err := s.getConfigResultsMap(claim)
218+
configResultsMap, err := s.getConfigResultsMap(&claim.Status)
200219
if err != nil {
201220
return nil, fmt.Errorf("error generating configResultsMap: %w", err)
202221
}
@@ -283,9 +302,9 @@ func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.Res
283302
return preparedDevices, nil
284303
}
285304

286-
func (s *DeviceState) unprepareDevices(ctx context.Context, claim *resourceapi.ResourceClaim) error {
305+
func (s *DeviceState) unprepareDevices(ctx context.Context, cs *resourceapi.ResourceClaimStatus) error {
287306
// Generate a mapping of each OpaqueDeviceConfigs to the Device.Results it applies to
288-
configResultsMap, err := s.getConfigResultsMap(claim)
307+
configResultsMap, err := s.getConfigResultsMap(cs)
289308
if err != nil {
290309
return fmt.Errorf("error generating configResultsMap: %w", err)
291310
}
@@ -407,12 +426,12 @@ func (s *DeviceState) applyComputeDomainDaemonConfig(ctx context.Context, config
407426
return &configState, nil
408427
}
409428

410-
func (s *DeviceState) getConfigResultsMap(claim *resourceapi.ResourceClaim) (map[runtime.Object][]*resourceapi.DeviceRequestAllocationResult, error) {
429+
func (s *DeviceState) getConfigResultsMap(rcs *resourceapi.ResourceClaimStatus) (map[runtime.Object][]*resourceapi.DeviceRequestAllocationResult, error) {
411430
// Retrieve the full set of device configs for the driver.
412431
configs, err := GetOpaqueDeviceConfigs(
413432
configapi.Decoder,
414433
DriverName,
415-
claim.Status.Allocation.Devices.Config,
434+
rcs.Allocation.Devices.Config,
416435
)
417436
if err != nil {
418437
return nil, fmt.Errorf("error getting opaque device configs: %v", err)
@@ -433,7 +452,7 @@ func (s *DeviceState) getConfigResultsMap(claim *resourceapi.ResourceClaim) (map
433452
// Look through the configs and figure out which one will be applied to
434453
// each device allocation result based on their order of precedence and type.
435454
configResultsMap := make(map[runtime.Object][]*resourceapi.DeviceRequestAllocationResult)
436-
for _, result := range claim.Status.Allocation.Devices.Results {
455+
for _, result := range rcs.Allocation.Devices.Results {
437456
device, exists := s.allocatable[result.Device]
438457
if !exists {
439458
return nil, fmt.Errorf("requested device is not allocatable: %v", result.Device)

cmd/compute-domain-kubelet-plugin/driver.go

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"time"
2525

2626
resourceapi "k8s.io/api/resource/v1beta1"
27-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2827
"k8s.io/apimachinery/pkg/types"
2928
coreclientset "k8s.io/client-go/kubernetes"
3029
"k8s.io/dynamic-resource-allocation/kubeletplugin"
@@ -117,7 +116,7 @@ func (d *driver) Shutdown() error {
117116
}
118117

119118
func (d *driver) PrepareResourceClaims(ctx context.Context, claims []*resourceapi.ResourceClaim) (map[types.UID]kubeletplugin.PrepareResult, error) {
120-
klog.Infof("PrepareResourceClaims called with %d claim(s)", len(claims))
119+
klog.V(6).Infof("PrepareResourceClaims called with %d claim(s)", len(claims))
121120

122121
var wg sync.WaitGroup
123122
ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout)
@@ -147,7 +146,7 @@ func (d *driver) PrepareResourceClaims(ctx context.Context, claims []*resourceap
147146
}
148147

149148
func (d *driver) UnprepareResourceClaims(ctx context.Context, claimRefs []kubeletplugin.NamespacedObject) (map[types.UID]error, error) {
150-
klog.Infof("UnprepareResourceClaims called with %d claim(s)", len(claimRefs))
149+
klog.V(6).Infof("UnprepareResourceClaims called with %d claim(s)", len(claimRefs))
151150

152151
var wg sync.WaitGroup
153152
ctx, cancel := context.WithTimeout(ctx, ErrorRetryMaxTimeout)
@@ -204,27 +203,11 @@ func (d *driver) nodeUnprepareResource(ctx context.Context, claimRef kubeletplug
204203
d.Lock()
205204
defer d.Unlock()
206205

207-
// Fetching the resource claim should not be needed (and not be done) in the
208-
// unprepare code path. Any state required during unprepare can be stored
209-
// via checkpointing.
210-
claim, err := d.client.ResourceV1beta1().ResourceClaims(claimRef.Namespace).Get(
211-
ctx,
212-
claimRef.Name,
213-
metav1.GetOptions{})
214-
215-
if err != nil {
216-
return isPermanentError(err), fmt.Errorf("failed to fetch ResourceClaim %s in namespace %s: %w", claimRef.Name, claimRef.Namespace, err)
217-
}
218-
219-
if claim.Status.Allocation == nil {
220-
return true, fmt.Errorf("no allocation set in ResourceClaim %s in namespace %s", claim.Name, claim.Namespace)
221-
}
222-
223-
if err := d.state.Unprepare(ctx, claim); err != nil {
224-
return isPermanentError(err), fmt.Errorf("error unpreparing devices for claim '%v': %w", claim.UID, err)
206+
if err := d.state.Unprepare(ctx, claimRef); err != nil {
207+
return isPermanentError(err), fmt.Errorf("error unpreparing devices for claim '%v': %w", claimRef.String(), err)
225208
}
226209

227-
klog.Infof("unprepared devices for claim '%v'", claim.UID)
210+
klog.Infof("unprepared devices for claim '%v'", claimRef.String())
228211
return true, nil
229212
}
230213

cmd/compute-domain-kubelet-plugin/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ import (
3232
)
3333

3434
const (
35-
DriverName = "compute-domain.nvidia.com"
36-
DriverPluginPath = "/var/lib/kubelet/plugins/" + DriverName
37-
DriverPluginCheckpointFile = "checkpoint.json"
35+
DriverName = "compute-domain.nvidia.com"
36+
DriverPluginPath = "/var/lib/kubelet/plugins/" + DriverName
37+
DriverPluginCheckpointFileBasename = "checkpoint.json"
3838
)
3939

4040
type Flags struct {

cmd/compute-domain-kubelet-plugin/prepared.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222

2323
type PreparedDeviceList []PreparedDevice
2424
type PreparedDevices []*PreparedDeviceGroup
25-
type PreparedClaims map[string]PreparedDevices
2625

2726
type PreparedDevice struct {
2827
Channel *PreparedComputeDomainChannel `json:"channel"`

cmd/gpu-kubelet-plugin/checkpoint.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"encoding/json"
55

6+
resourceapi "k8s.io/api/resource/v1beta1"
67
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
78
)
89

@@ -12,14 +13,22 @@ type Checkpoint struct {
1213
}
1314

1415
type CheckpointV1 struct {
15-
PreparedClaims PreparedClaims `json:"preparedClaims,omitempty"`
16+
PreparedClaimsByUID `json:"preparedClaims,omitempty"`
17+
}
18+
19+
// key: stringified claim UUID
20+
type PreparedClaimsByUID map[string]PreparedClaim
21+
22+
type PreparedClaim struct {
23+
Status resourceapi.ResourceClaimStatus
24+
PreparedDevices PreparedDevices
1625
}
1726

1827
func newCheckpoint() *Checkpoint {
1928
pc := &Checkpoint{
2029
Checksum: 0,
2130
V1: &CheckpointV1{
22-
PreparedClaims: make(PreparedClaims),
31+
PreparedClaimsByUID: make(PreparedClaimsByUID),
2332
},
2433
}
2534
return pc

0 commit comments

Comments
 (0)