Skip to content

Commit 0774c89

Browse files
committed
fix(Operator): Fixed the WorkflowInstanceController not to claim instances of workflows explicitly assigned to other operators
Signed-off-by: Charles d'Avernas <[email protected]>
1 parent a1bc5c4 commit 0774c89

File tree

5 files changed

+45
-4
lines changed

5 files changed

+45
-4
lines changed

src/operator/Synapse.Operator/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262

6363
services.AddScoped<WorkflowController>();
6464
services.AddScoped<IResourceController<Workflow>>(provider => provider.GetRequiredService<WorkflowController>());
65+
services.AddScoped<IWorkflowController>(provider => provider.GetRequiredService<WorkflowController>());
6566

6667
services.AddScoped<WorkflowInstanceController>();
6768
services.AddScoped<IResourceController<WorkflowInstance>>(provider => provider.GetRequiredService<WorkflowInstanceController>());

src/operator/Synapse.Operator/Services/Interfaces/IOperatorController.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
namespace Synapse.Operator.Services;
1515

1616
/// <summary>
17-
/// Defines the fundamentals of the service used to access the current Synapse Operator
17+
/// Defines the fundamentals of a service used to access the current Synapse Operator
1818
/// </summary>
1919
public interface IOperatorController
2020
: IHostedService
@@ -25,4 +25,4 @@ public interface IOperatorController
2525
/// </summary>
2626
IResourceMonitor<Resources.Operator> Operator { get; }
2727

28-
}
28+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright © 2024-Present The Synapse Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
namespace Synapse.Operator.Services;
15+
16+
/// <summary>
17+
/// Defines the fundamentals of a service used to access all monitored workflows
18+
/// </summary>
19+
public interface IWorkflowController
20+
{
21+
22+
/// <summary>
23+
/// Gets a dictionary containing all monitored workflows
24+
/// </summary>
25+
IReadOnlyDictionary<string, Workflow> Workflows { get; }
26+
27+
}

src/operator/Synapse.Operator/Services/WorkflowController.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ namespace Synapse.Operator.Services;
2525
/// <param name="operatorOptions">The current <see cref="Configuration.OperatorOptions"/></param>
2626
/// <param name="operatorAccessor">The service used to access the current <see cref="Resources.Operator"/></param>
2727
public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<Workflow>> controllerOptions, IResourceRepository resources, IOptions<OperatorOptions> operatorOptions, IOperatorController operatorAccessor)
28-
: ResourceController<Workflow>(loggerFactory, controllerOptions, resources)
28+
: ResourceController<Workflow>(loggerFactory, controllerOptions, resources), IWorkflowController
2929
{
3030

3131
/// <summary>
@@ -48,6 +48,9 @@ public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory
4848
/// </summary>
4949
protected ConcurrentDictionary<string, WorkflowScheduler> Schedulers { get; } = [];
5050

51+
/// <inheritdoc/>
52+
public IReadOnlyDictionary<string, Workflow> Workflows => this.Resources.AsReadOnly();
53+
5154
/// <inheritdoc/>
5255
public override async Task StartAsync(CancellationToken cancellationToken)
5356
{

src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ namespace Synapse.Operator.Services;
2323
/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
2424
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
2525
/// <param name="operatorController">The service used to access the current <see cref="Resources.Operator"/></param>
26+
/// <param name="workflowController">The service used to access all monitored <see cref="Workflow"/>s</param>
2627
/// <param name="documents">The <see cref="IRepository"/> used to manage <see cref="Document"/>s</param>
27-
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IRepository<Document, string> documents)
28+
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IWorkflowController workflowController, IRepository<Document, string> documents)
2829
: ResourceController<WorkflowInstance>(loggerFactory, controllerOptions, repository)
2930
{
3031

@@ -38,6 +39,11 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge
3839
/// </summary>
3940
protected IResourceMonitor<Resources.Operator> Operator => operatorController.Operator;
4041

42+
/// <summary>
43+
/// Gets a dictionary containing all monitored <see cref="Workflow"/>s
44+
/// </summary>
45+
protected IReadOnlyDictionary<string, Workflow> Workflows => workflowController.Workflows;
46+
4147
/// <summary>
4248
/// Gets the <see cref="IRepository"/> used to manage <see cref="Document"/>s
4349
/// </summary>
@@ -88,6 +94,8 @@ protected virtual async Task<bool> TryClaimAsync(WorkflowInstance resource, Canc
8894
{
8995
ArgumentNullException.ThrowIfNull(resource);
9096
if (resource.Metadata.Labels != null && resource.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
97+
if (this.Workflows.TryGetValue(this.GetResourceCacheKey(resource.Spec.Definition.Name, resource.Spec.Definition.Namespace), out var workflow) && workflow != null
98+
&& workflow.Metadata.Labels != null && workflow.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
9199
try
92100
{
93101
var originalResource = resource.Clone();
@@ -113,6 +121,8 @@ protected virtual async Task<bool> TryReleaseAsync(WorkflowInstance resource, Ca
113121
{
114122
ArgumentNullException.ThrowIfNull(resource);
115123
if (resource.Metadata.Labels != null && resource.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
124+
if (this.Workflows.TryGetValue(this.GetResourceCacheKey(resource.Spec.Definition.Name, resource.Spec.Definition.Namespace), out var workflow) && workflow != null
125+
&& workflow.Metadata.Labels != null && workflow.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
116126
try
117127
{
118128
var originalResource = resource.Clone();

0 commit comments

Comments
 (0)