Skip to content

Commit 055b61f

Browse files
authored
IGNITE-22539 .NET: Add JobTarget (#3993)
Similar to IGNITE-22435 in Java, add `JobTarget` to .NET compute API to make it more readable. Additionally, make `JobDescriptor` generic so that the caller does not have to specify generic arguments. * Before: `compute.SubmitAsync<int, string>(target, new JobDescriptor(...))` * After: `compute.SubmitAsync(target, new JobDescriptor<string>(...))` This is also a step towards strongly-typed single-argument jobs.
1 parent 6e7f25d commit 055b61f

File tree

15 files changed

+376
-232
lines changed

15 files changed

+376
-232
lines changed

modules/platforms/dotnet/Apache.Ignite.Tests/BasicAuthenticatorTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Tests;
1919

2020
using System;
2121
using System.Threading.Tasks;
22+
using Ignite.Compute;
2223
using NUnit.Framework;
2324
using Security.Exception;
2425

@@ -93,11 +94,11 @@ private async Task EnableAuthn(bool enable)
9394
}
9495

9596
using var client = await IgniteClient.StartAsync(GetConfig(_authnEnabled));
96-
var nodes = await client.GetClusterNodesAsync();
97+
var nodes = JobTarget.AnyNode(await client.GetClusterNodesAsync());
9798

9899
try
99100
{
100-
await client.Compute.SubmitAsync<object>(nodes, new(EnableAuthnJob), enable ? 1 : 0);
101+
await client.Compute.SubmitAsync(nodes, new JobDescriptor<object>(EnableAuthnJob), enable ? 1 : 0);
101102
}
102103
catch (IgniteClientConnectionException)
103104
{

modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@ public async Task TestClientSendsComputeJobToTargetNodeWhenDirectConnectionExist
4444
using var client = await IgniteClient.StartAsync(clientCfg);
4545
client.WaitForConnections(3);
4646

47-
IJobExecution<string> exec2 = await client.Compute.SubmitAsync<string>(
48-
new[] { server2.Node }, new(string.Empty));
47+
var job = new JobDescriptor<string>(string.Empty);
4948

50-
IJobExecution<string> exec3 = await client.Compute.SubmitAsync<string>(
51-
new[] { server3.Node }, new(string.Empty));
49+
IJobExecution<string> exec2 = await client.Compute.SubmitAsync(JobTarget.Node(server2.Node), job);
50+
IJobExecution<string> exec3 = await client.Compute.SubmitAsync(JobTarget.Node(server3.Node), job);
5251

5352
Assert.AreEqual("s2", await exec2.GetResultAsync());
5453
Assert.AreEqual("s3", await exec3.GetResultAsync());
@@ -67,12 +66,10 @@ public async Task TestClientSendsComputeJobToDefaultNodeWhenDirectConnectionToTa
6766
using var server3 = new FakeServer(nodeName: "s3");
6867

6968
using var client = await server1.ConnectClientAsync();
69+
var job = new JobDescriptor<string>(string.Empty);
7070

71-
IJobExecution<string> exec2 = await client.Compute.SubmitAsync<string>(
72-
new[] { server2.Node }, new(string.Empty));
73-
74-
IJobExecution<string> exec3 = await client.Compute.SubmitAsync<string>(
75-
new[] { server3.Node }, new(string.Empty));
71+
IJobExecution<string> exec2 = await client.Compute.SubmitAsync(JobTarget.Node(server2.Node), job);
72+
IJobExecution<string> exec3 = await client.Compute.SubmitAsync(JobTarget.Node(server3.Node), job);
7673

7774
Assert.AreEqual("s1", await exec2.GetResultAsync());
7875
Assert.AreEqual("s1", await exec3.GetResultAsync());
@@ -100,13 +97,13 @@ public async Task TestClientRetriesComputeJobOnPrimaryAndDefaultNodes()
10097
client.WaitForConnections(2);
10198

10299
var nodeNames = new HashSet<string>();
100+
var job = new JobDescriptor<string>(string.Empty);
103101

104102
for (int i = 0; i < 100; i++)
105103
{
106104
var node = i % 2 == 0 ? server1.Node : server2.Node;
107105

108-
IJobExecution<string> jobExecution = await client.Compute.SubmitAsync<string>(
109-
new[] { node }, new(string.Empty));
106+
IJobExecution<string> jobExecution = await client.Compute.SubmitAsync(JobTarget.Node(node), job);
110107

111108
string res = await jobExecution.GetResultAsync();
112109

modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs

Lines changed: 125 additions & 81 deletions
Large diffs are not rendered by default.

modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Tests;
2020
using System;
2121
using System.Threading.Tasks;
2222
using Compute;
23+
using Ignite.Compute;
2324
using Ignite.Table;
2425
using Internal.Proto;
2526
using NUnit.Framework;
@@ -50,10 +51,9 @@ public async Task TestPutRoutesRequestToPrimaryNode([Values(true, false)] bool w
5051
{
5152
var keyTuple = new IgniteTuple { ["KEY"] = key };
5253

53-
var primaryNodeNameExec = await client.Compute.SubmitColocatedAsync<string>(
54-
TableName,
55-
keyTuple,
56-
new(ComputeTests.NodeNameJob));
54+
var primaryNodeNameExec = await client.Compute.SubmitAsync(
55+
JobTarget.Colocated(TableName, keyTuple),
56+
ComputeTests.NodeNameJob);
5757

5858
var primaryNodeName = await primaryNodeNameExec.GetResultAsync();
5959

modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -360,10 +360,13 @@ public async Task TestExecuteColocatedTupleKeyRoutesRequestToPrimaryNode(int key
360360
var key = new IgniteTuple { ["ID"] = keyId };
361361

362362
// Warm up.
363-
await client.Compute.SubmitColocatedAsync<object?>(FakeServer.ExistingTableName, key, new("job"));
363+
var jobTarget = JobTarget.Colocated(FakeServer.ExistingTableName, key);
364+
var jobDescriptor = new JobDescriptor<object?>("job");
365+
366+
await client.Compute.SubmitAsync(jobTarget, jobDescriptor);
364367

365368
await AssertOpOnNode(
366-
_ => client.Compute.SubmitColocatedAsync<object?>(FakeServer.ExistingTableName, key, new("job")),
369+
_ => client.Compute.SubmitAsync(jobTarget, jobDescriptor),
367370
ClientOp.ComputeExecuteColocated,
368371
expectedNode);
369372
}
@@ -376,13 +379,14 @@ public async Task TestExecuteColocatedObjectKeyRoutesRequestToPrimaryNode(int ke
376379
var expectedNode = node == 1 ? _server1 : _server2;
377380
var key = new SimpleKey(keyId);
378381

382+
var jobTarget = JobTarget.Colocated(FakeServer.ExistingTableName, key);
383+
var jobDescriptor = new JobDescriptor<object?>("job");
384+
379385
// Warm up.
380-
await client.Compute.SubmitColocatedAsync<object?, SimpleKey>(
381-
FakeServer.ExistingTableName, key, new("job"));
386+
await client.Compute.SubmitAsync(jobTarget, jobDescriptor);
382387

383388
await AssertOpOnNode(
384-
_ => client.Compute.SubmitColocatedAsync<object?, SimpleKey>(
385-
FakeServer.ExistingTableName, key, new("job")),
389+
_ => client.Compute.SubmitAsync(jobTarget, jobDescriptor),
386390
ClientOp.ComputeExecuteColocated,
387391
expectedNode);
388392
}

modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,8 @@ public async Task TestCustomColocationColumnOrder([Values(true, false)] bool rev
186186
var schemas = table.GetFieldValue<IDictionary<int, Task<Schema>>>("_schemas");
187187
var schema = schemas[1].GetAwaiter().GetResult();
188188
var clusterNodes = await Client.GetClusterNodesAsync();
189+
var jobTarget = JobTarget.AnyNode(clusterNodes);
190+
var job = new JobDescriptor<int>(TableRowColocationHashJob);
189191

190192
for (int i = 0; i < 100; i++)
191193
{
@@ -194,12 +196,7 @@ public async Task TestCustomColocationColumnOrder([Values(true, false)] bool rev
194196
using var writer = ProtoCommon.GetMessageWriter();
195197
var (clientColocationHash, _) = ser.Write(writer, null, schema, key);
196198

197-
var serverColocationHashExec = await Client.Compute.SubmitAsync<int>(
198-
clusterNodes,
199-
new(TableRowColocationHashJob),
200-
tableName,
201-
i);
202-
199+
var serverColocationHashExec = await Client.Compute.SubmitAsync(jobTarget, job, tableName, i);
203200
var serverColocationHash = await serverColocationHashExec.GetResultAsync();
204201

205202
Assert.AreEqual(serverColocationHash, clientColocationHash, key.ToString());
@@ -334,11 +331,11 @@ private async Task AssertClientAndServerHashesAreEqual(int timePrecision = 9, in
334331

335332
private async Task<int> GetServerHash(byte[] bytes, int count, int timePrecision, int timestampPrecision)
336333
{
337-
var nodes = await Client.GetClusterNodesAsync();
334+
var target = JobTarget.AnyNode(await Client.GetClusterNodesAsync());
338335

339-
IJobExecution<int> jobExecution = await Client.Compute.SubmitAsync<int>(
340-
nodes,
341-
new(ColocationHashJob),
336+
IJobExecution<int> jobExecution = await Client.Compute.SubmitAsync(
337+
target,
338+
new JobDescriptor<int>(ColocationHashJob),
342339
count,
343340
bytes,
344341
timePrecision,

modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Tests.Table;
2323
using System.Linq;
2424
using System.Threading.Tasks;
2525
using Compute;
26+
using Ignite.Compute;
2627
using Ignite.Table;
2728
using NUnit.Framework;
2829

@@ -95,8 +96,9 @@ public async Task TestClientUsesLatestSchemaOnWriteDropColumn([ValueSource(nameo
9596
break;
9697

9798
case TestMode.Compute:
98-
await Client.Compute.SubmitColocatedAsync<string>(
99-
table.Name, rec2, new(ComputeTests.NodeNameJob));
99+
await Client.Compute.SubmitAsync(
100+
JobTarget.Colocated(table.Name, rec2),
101+
ComputeTests.NodeNameJob);
100102
break;
101103

102104
default:
@@ -152,8 +154,9 @@ public async Task TestClientUsesLatestSchemaOnWriteAddColumn([ValueSource(nameof
152154

153155
case TestMode.Compute:
154156
// ExecuteColocated requires key part only.
155-
await Client.Compute.SubmitColocatedAsync<string>(
156-
table.Name, rec, new(ComputeTests.NodeNameJob));
157+
await Client.Compute.SubmitAsync(
158+
JobTarget.Colocated(table.Name, rec),
159+
ComputeTests.NodeNameJob);
157160
break;
158161

159162
default:
@@ -292,8 +295,9 @@ public async Task TestClientUsesLatestSchemaOnWritePoco([ValueSource(nameof(Test
292295
break;
293296

294297
case TestMode.Compute:
295-
var jobExecution = await Client.Compute.SubmitColocatedAsync<string, Poco>(
296-
table.Name, new Poco(1, "foo"), new(ComputeTests.NodeNameJob));
298+
var jobExecution = await Client.Compute.SubmitAsync(
299+
JobTarget.Colocated(table.Name, new Poco(1, "foo")),
300+
ComputeTests.NodeNameJob);
297301

298302
await jobExecution.GetResultAsync();
299303

modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ public static async Task WaitForConditionAsync(
8181
public static void SetFieldValue(this object obj, string fieldName, object? value) =>
8282
GetNonPublicField(obj, fieldName).SetValue(obj, value);
8383

84+
public static bool IsRecordClass(this Type type) =>
85+
type.GetMethods().Any(m => m.Name == "<Clone>$" && m.ReturnType == type);
86+
8487
public static ILoggerFactory GetConsoleLoggerFactory(LogLevel minLevel) => new ConsoleLogger(minLevel);
8588

8689
public static void CheckByteArrayPoolLeak(int timeoutMs = 1000)

modules/platforms/dotnet/Apache.Ignite.Tests/ToStringTests.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ public void TestAllPublicFacingTypesHaveConsistentToString()
5858
continue;
5959
}
6060

61-
if (code.Contains("record " + GetCleanTypeName(type)) ||
62-
code.Contains("record struct " + GetCleanTypeName(type)))
61+
if (code.Contains("record struct " + GetCleanTypeName(type)))
6362
{
6463
// records provide property-based ToString() in the same format we use.
6564
continue;
@@ -120,7 +119,7 @@ private static IEnumerable<Type> GetPublicFacingTypes()
120119
continue;
121120
}
122121

123-
if (type.IsInterface || type.IsAbstract || type.IsEnum)
122+
if (type.IsInterface || type.IsAbstract || type.IsEnum || type.IsRecordClass())
124123
{
125124
continue;
126125
}

modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public enum ClientOperationType
117117
TupleContainsKey,
118118

119119
/// <summary>
120-
/// Compute (<see cref="ICompute.SubmitAsync{T}"/>, <see cref="ICompute.SubmitBroadcast{T}"/>).
120+
/// Compute (<see cref="ICompute.SubmitAsync{TTarget,TResult}"/>, <see cref="ICompute.SubmitBroadcast{T}"/>).
121121
/// </summary>
122122
ComputeExecute,
123123

0 commit comments

Comments
 (0)