Skip to content

Commit d7c4848

Browse files
committed
schema: fix proto schema cache corruption and key collision
- Implement atomic refresh for proto schema descriptors
- Replace incremental updates with complete state replacement
- Fix cache key collision in GetSchemaBySubjectAndVersion
- Remove unnecessary Go 1.21 loop variable capture pattern

Fixes schema registry degradation where 502 errors during partial fetches caused an inconsistent cache state that required a service restart.
1 parent 6c7064f commit d7c4848

File tree

1 file changed

+31
-87
lines changed

1 file changed

+31
-87
lines changed

backend/pkg/schema/service.go

Lines changed: 31 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -76,121 +76,65 @@ func (s *Service) CheckConnectivity(ctx context.Context) error {
7676

7777
// GetProtoDescriptors returns all file descriptors in a map where the key is the schema id.
7878
// The value is a set of file descriptors because each schema may reference other (imported) proto schemas.
79-
//
80-
//nolint:gocognit,cyclop // complicated refresh logic
8179
func (s *Service) GetProtoDescriptors(ctx context.Context) (map[int]*desc.FileDescriptor, error) {
8280
// Singleflight makes sure to not run the function body if there are concurrent requests. We use this to avoid
8381
// duplicate requests against the schema registry
8482
key := "get-proto-descriptors"
8583
_, err, _ := s.requestGroup.Do(key, func() (any, error) {
84+
// 1. Fetch a complete, fresh state from the registry.
85+
// If the fetch is incomplete (returns any errors), abort the entire refresh.
86+
// This prevents us from ever working with a partial, inconsistent view.
8687
schemasRes, errs := s.registryClient.GetSchemas(ctx, true)
8788
if len(errs) > 0 {
8889
for _, err := range errs {
89-
s.logger.Error("failed to get schema from registry", zap.Error(err))
90-
}
91-
92-
if len(schemasRes) == 0 {
93-
return nil, nil
90+
s.logger.Error("failed to get schema from registry during refresh", zap.Error(err))
9491
}
92+
return nil, fmt.Errorf("failed to fetch a complete list of schemas, aborting refresh. first error: %w", errs[0])
9593
}
9694

97-
s.srRefreshMutex.Lock()
98-
defer s.srRefreshMutex.Unlock()
99-
100-
if s.protoSchemasByID == nil {
101-
s.protoSchemasByID = make(map[int]*SchemaVersionedResponse, len(schemasRes))
102-
}
103-
104-
// collect existing schema IDs
105-
existingSchemaIDs := make(map[int]struct{}, len(s.protoSchemasByID))
106-
for id := range s.protoSchemasByID {
107-
existingSchemaIDs[id] = struct{}{}
108-
}
109-
110-
schemasToCompile := make([]*SchemaVersionedResponse, 0, len(schemasRes))
111-
112-
newSchemaIDs := make(map[int]struct{}, len(schemasRes))
113-
114-
// Index all returned schemas by their respective subject name and version as stored in the schema registry
115-
// Collect the new or updated schemas to compile
95+
// 2. Prepare for compilation in temporary, local structures.
96+
// This ensures we don't touch the service's active state until we are done.
97+
newProtoSchemasByID := make(map[int]*SchemaVersionedResponse)
11698
schemasBySubjectAndVersion := make(map[string]map[int]*SchemaVersionedResponse)
117-
for _, schema := range schemasRes {
118-
schema := schema
11999

120-
if schema.Type != TypeProtobuf {
100+
for _, sch := range schemasRes {
101+
if sch.Type != TypeProtobuf {
121102
continue
122103
}
123-
_, exists := schemasBySubjectAndVersion[schema.Subject]
124-
if !exists {
125-
schemasBySubjectAndVersion[schema.Subject] = make(map[int]*SchemaVersionedResponse)
126-
}
127-
schemasBySubjectAndVersion[schema.Subject][schema.Version] = schema
104+
newProtoSchemasByID[sch.SchemaID] = sch
128105

129-
if existing, ok := s.protoSchemasByID[schema.SchemaID]; !ok || !strings.EqualFold(existing.Schema, schema.Schema) {
130-
schemasToCompile = append(schemasToCompile, schema)
106+
// Build the lookup map for resolving references during compilation.
107+
if _, exists := schemasBySubjectAndVersion[sch.Subject]; !exists {
108+
schemasBySubjectAndVersion[sch.Subject] = make(map[int]*SchemaVersionedResponse)
131109
}
132-
133-
newSchemaIDs[schema.SchemaID] = struct{}{}
134-
135-
s.protoSchemasByID[schema.SchemaID] = schema
110+
schemasBySubjectAndVersion[sch.Subject][sch.Version] = sch
136111
}
137112

113+
// 3. Compile all fetched schemas into a new, temporary map.
114+
// A single compilation failure invalidates the entire refresh attempt.
115+
newProtoFDByID := make(map[int]*desc.FileDescriptor, len(newProtoSchemasByID))
138116
compileStart := time.Now()
139-
140-
// 2. Compile each subject with each of it's references into one or more file descriptors so that they can be
141-
// registered in their own proto registry.
142-
newFDBySchemaID := make(map[int]*desc.FileDescriptor, len(schemasToCompile))
143-
for _, schema := range schemasToCompile {
144-
schema := schema
145-
146-
if schema.Type != TypeProtobuf {
147-
continue
148-
}
149-
117+
for _, schema := range newProtoSchemasByID {
150118
fd, err := s.compileProtoSchemas(schema, schemasBySubjectAndVersion)
151119
if err != nil {
152-
s.logger.Warn("failed to compile proto schema",
120+
s.logger.Error("failed to compile proto schema, aborting refresh",
153121
zap.String("subject", schema.Subject),
154122
zap.Int("schema_id", schema.SchemaID),
155123
zap.Error(err))
156-
continue
124+
return nil, fmt.Errorf("failed to compile schema for subject %q (id %d): %w", schema.Subject, schema.SchemaID, err)
157125
}
158-
newFDBySchemaID[schema.SchemaID] = fd
126+
newProtoFDByID[schema.SchemaID] = fd
159127
}
160128

161-
compileDuration := time.Since(compileStart)
162-
163-
// merge
164-
if s.protoFDByID == nil {
165-
s.protoFDByID = make(map[int]*desc.FileDescriptor, len(newFDBySchemaID))
166-
}
167-
168-
maps.Copy(s.protoFDByID, newFDBySchemaID)
169-
170-
schemasDeleted := 0
171-
172-
// remove schemas only if no errors
173-
if len(errs) == 0 {
174-
schemasIDsToDelete := make([]int, 0, len(s.protoSchemasByID)/2)
175-
176-
for id := range existingSchemaIDs {
177-
if _, ok := newSchemaIDs[id]; !ok {
178-
schemasIDsToDelete = append(schemasIDsToDelete, id)
179-
}
180-
}
181-
182-
for _, id := range schemasIDsToDelete {
183-
delete(s.protoSchemasByID, id)
184-
delete(s.protoFDByID, id)
185-
}
186-
187-
schemasDeleted = len(schemasIDsToDelete)
188-
}
129+
// 4. Success! Atomically swap the service's state with the new, fully compiled state.
130+
s.srRefreshMutex.Lock()
131+
s.protoSchemasByID = newProtoSchemasByID
132+
s.protoFDByID = newProtoFDByID
133+
s.srRefreshMutex.Unlock()
189134

190-
s.logger.Info("compiled new schemas",
191-
zap.Int("updated_schemas", len(schemasToCompile)),
192-
zap.Int("deleted_schemas", schemasDeleted),
193-
zap.Duration("compile_duration", compileDuration))
135+
s.logger.Info("successfully refreshed and recompiled protobuf schemas",
136+
zap.Int("total_schemas", len(newProtoFDByID)),
137+
zap.Duration("compile_duration", time.Since(compileStart)))
194138

195139
return nil, nil
196140
})
@@ -505,7 +449,7 @@ func (s *Service) ValidateProtobufSchema(ctx context.Context, name string, sch S
505449
// GetSchemaBySubjectAndVersion retrieves a schema from the schema registry
506450
// by a given <subject, version> tuple.
507451
func (s *Service) GetSchemaBySubjectAndVersion(ctx context.Context, subject string, version string) (*SchemaVersionedResponse, error) {
508-
cacheKey := subject + "v" + version
452+
cacheKey := fmt.Sprintf("%s::v::%s", subject, version)
509453
cachedSchema, err, _ := s.schemaBySubjectVersion.Get(cacheKey, func() (*SchemaVersionedResponse, error) {
510454
schema, err := s.registryClient.GetSchemaBySubject(ctx, subject, version, true)
511455
if err != nil {

0 commit comments

Comments
 (0)