Skip to content

Commit 4164f86

Browse files
committed
consolidating implementations
1 parent d617346 commit 4164f86

File tree

3 files changed

+87
-125
lines changed

3 files changed

+87
-125
lines changed

src/Cassandra/Orleans.Clustering.Cassandra/CassandraClusteringTable.cs

Lines changed: 3 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -49,82 +49,10 @@ async Task IMembershipTable.InitializeMembershipTable(bool tryInitTableVersion)
4949

5050
_queries = await OrleansQueries.CreateInstance(_session);
5151

52-
if (!await DoesTableAlreadyExistAsync())
53-
{
54-
try
55-
{
56-
await MakeTableAsync();
57-
}
58-
catch (WriteTimeoutException) // If there's contention on table creation, backoff a bit and try once more
59-
{
60-
// Randomize the delay to avoid contention, preferring that more instances will wait longer
61-
var nextSingle = Random.Shared.NextSingle();
62-
await Task.Delay(TimeSpan.FromSeconds(20) * Math.Sqrt(nextSingle));
63-
64-
if (!await DoesTableAlreadyExistAsync())
65-
{
66-
await MakeTableAsync();
67-
}
68-
}
69-
}
52+
await _queries.EnsureTableExistsAsync(_ttlSeconds);
7053

71-
72-
if (tryInitTableVersion && !await DoesClusterVersionAlreadyExistAsync(_identifier))
73-
{
74-
try
75-
{
76-
await Session.ExecuteAsync(await Queries.InsertMembershipVersion(_identifier));
77-
}
78-
catch (WriteTimeoutException) // If there's contention on table creation, backoff a bit and try once more
79-
{
80-
// Randomize the delay to avoid contention, preferring that more instances will wait longer
81-
var nextSingle = Random.Shared.NextSingle();
82-
await Task.Delay(TimeSpan.FromSeconds(20) * Math.Sqrt(nextSingle));
83-
84-
if (!await DoesClusterVersionAlreadyExistAsync(_identifier))
85-
{
86-
await Session.ExecuteAsync(await Queries.InsertMembershipVersion(_identifier));
87-
}
88-
}
89-
}
90-
}
91-
92-
private async Task MakeTableAsync()
93-
{
94-
await Session.ExecuteAsync(Queries.EnsureTableExists(_ttlSeconds));
95-
await Session.ExecuteAsync(Queries.EnsureIndexExists);
96-
}
97-
98-
private async Task<bool> DoesTableAlreadyExistAsync()
99-
{
100-
try
101-
{
102-
var resultSet = await Session.ExecuteAsync(Queries.CheckIfTableExists(Session.Keyspace, ConsistencyLevel.LocalOne));
103-
return resultSet.Any();
104-
}
105-
catch (UnavailableException)
106-
{
107-
var resultSet = await Session.ExecuteAsync(Queries.CheckIfTableExists(Session.Keyspace, ConsistencyLevel.One));
108-
return resultSet.Any();
109-
}
110-
catch (UnauthorizedException)
111-
{
112-
return false;
113-
}
114-
}
115-
116-
private async Task<bool> DoesClusterVersionAlreadyExistAsync(string clusterIdentifier)
117-
{
118-
try
119-
{
120-
var resultSet = await Session.ExecuteAsync(Queries.CheckIfClusterVersionExists(clusterIdentifier, ConsistencyLevel.LocalOne));
121-
return resultSet.Any();
122-
}
123-
catch (UnavailableException)
124-
{
125-
var resultSet = await Session.ExecuteAsync(Queries.CheckIfClusterVersionExists(clusterIdentifier, ConsistencyLevel.One));
126-
return resultSet.Any();
127-
}
54+
if (tryInitTableVersion)
55+
await _queries.EnsureClusterVersionExistsAsync(_identifier);
12856
}
12957

13058
async Task IMembershipTable.DeleteMembershipTableEntries(string clusterId)

src/Cassandra/Orleans.Clustering.Cassandra/CassandraGatewayListProvider.cs

Lines changed: 2 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System;
22
using System.Collections.Generic;
3-
using System.Linq;
43
using System.Net;
54
using System.Threading.Tasks;
65
using Cassandra;
@@ -58,56 +57,9 @@ async Task IGatewayListProvider.InitializeGatewayListProvider()
5857

5958
_queries = await OrleansQueries.CreateInstance(_session);
6059

61-
if (await DoesTableAlreadyExistAsync())
62-
{
63-
return;
64-
}
65-
66-
try
67-
{
68-
await MakeTableAsync();
69-
}
70-
catch (WriteTimeoutException) // If there's contention on table creation, backoff a bit and try once more
71-
{
72-
// Randomize the delay to avoid contention, preferring that more instances will wait longer
73-
var nextSingle = Random.Shared.NextSingle();
74-
await Task.Delay(TimeSpan.FromSeconds(10) * Math.Sqrt(nextSingle));
75-
76-
if (await DoesTableAlreadyExistAsync())
77-
{
78-
return;
79-
}
80-
81-
await MakeTableAsync();
82-
}
83-
}
84-
85-
86-
87-
private async Task MakeTableAsync()
88-
{
89-
await Session.ExecuteAsync(Queries.EnsureTableExists(_ttlSeconds));
90-
await Session.ExecuteAsync(Queries.EnsureIndexExists);
91-
}
92-
93-
private async Task<bool> DoesTableAlreadyExistAsync()
94-
{
95-
try
96-
{
97-
var resultSet = await Session.ExecuteAsync(Queries.CheckIfTableExists(Session.Keyspace, ConsistencyLevel.LocalOne));
98-
return resultSet.Any();
99-
}
100-
catch (UnavailableException)
101-
{
102-
var resultSet = await Session.ExecuteAsync(Queries.CheckIfTableExists(Session.Keyspace, ConsistencyLevel.One));
103-
return resultSet.Any();
104-
}
105-
catch (UnauthorizedException)
106-
{
107-
return false;
108-
}
60+
await _queries.EnsureTableExistsAsync(_ttlSeconds);
10961
}
110-
62+
11163
async Task<IList<Uri>> IGatewayListProvider.GetGateways()
11264
{
11365
if (_cachedResult is not null && _cacheUntil > DateTime.UtcNow)

src/Cassandra/Orleans.Clustering.Cassandra/OrleansQueries.cs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,88 @@ private OrleansQueries(ISession session)
4040
Session = session;
4141
}
4242

43+
internal async Task EnsureTableExistsAsync(int? ttl)
44+
{
45+
if (!await DoesTableAlreadyExistAsync())
46+
{
47+
try
48+
{
49+
await MakeTableAsync(ttl);
50+
}
51+
catch (WriteTimeoutException) // If there's contention on table creation, backoff a bit and try once more
52+
{
53+
// Randomize the delay to avoid contention, preferring that more instances will wait longer
54+
var nextSingle = Random.Shared.NextSingle();
55+
await Task.Delay(TimeSpan.FromSeconds(20) * Math.Sqrt(nextSingle));
56+
57+
if (!await DoesTableAlreadyExistAsync())
58+
{
59+
await MakeTableAsync(ttl);
60+
}
61+
}
62+
}
63+
}
64+
65+
internal async Task EnsureClusterVersionExistsAsync(string clusterIdentifier)
66+
{
67+
if (!await DoesClusterVersionAlreadyExistAsync(clusterIdentifier))
68+
{
69+
try
70+
{
71+
await Session.ExecuteAsync(await InsertMembershipVersion(clusterIdentifier));
72+
}
73+
catch (WriteTimeoutException) // If there's contention on table creation, backoff a bit and try once more
74+
{
75+
// Randomize the delay to avoid contention, preferring that more instances will wait longer
76+
var nextSingle = Random.Shared.NextSingle();
77+
await Task.Delay(TimeSpan.FromSeconds(20) * Math.Sqrt(nextSingle));
78+
79+
if (!await DoesClusterVersionAlreadyExistAsync(clusterIdentifier))
80+
{
81+
await Session.ExecuteAsync(await InsertMembershipVersion(clusterIdentifier));
82+
}
83+
}
84+
}
85+
}
86+
87+
private async Task<bool> DoesClusterVersionAlreadyExistAsync(string clusterIdentifier)
88+
{
89+
try
90+
{
91+
var resultSet = await Session.ExecuteAsync(CheckIfClusterVersionExists(clusterIdentifier, ConsistencyLevel.LocalOne));
92+
return resultSet.Any();
93+
}
94+
catch (UnavailableException)
95+
{
96+
var resultSet = await Session.ExecuteAsync(CheckIfClusterVersionExists(clusterIdentifier, ConsistencyLevel.One));
97+
return resultSet.Any();
98+
}
99+
}
100+
101+
private async Task<bool> DoesTableAlreadyExistAsync()
102+
{
103+
try
104+
{
105+
var resultSet = await Session.ExecuteAsync(CheckIfTableExists(Session.Keyspace, ConsistencyLevel.LocalOne));
106+
return resultSet.Any();
107+
}
108+
catch (UnavailableException)
109+
{
110+
var resultSet = await Session.ExecuteAsync(CheckIfTableExists(Session.Keyspace, ConsistencyLevel.One));
111+
return resultSet.Any();
112+
}
113+
catch (UnauthorizedException)
114+
{
115+
return false;
116+
}
117+
}
118+
119+
private async Task MakeTableAsync(int? ttlSeconds)
120+
{
121+
await Session.ExecuteAsync(EnsureTableExists(ttlSeconds));
122+
await Session.ExecuteAsync(EnsureIndexExists);
123+
}
124+
43125
public ConsistencyLevel MembershipWriteConsistencyLevel { get; set; }
44126

45127
public ConsistencyLevel MembershipReadConsistencyLevel { get; set; }

0 commit comments

Comments
 (0)