Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Realtime/Binding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Supabase.Realtime.Interfaces;
using Supabase.Realtime.PostgresChanges;

namespace Supabase.Realtime;

public class Binding

Check warning on line 6 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding'
{
public PostgresChangesOptions? Options { get; set; }

Check warning on line 8 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.Options'

public IRealtimeChannel.PostgresChangesHandler? Handler { get; set; }

Check warning on line 10 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.Handler'

public PostgresChangesOptions.ListenType? ListenType { get; set; }

Check warning on line 12 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.ListenType'

public int? Id { get; set; }

Check warning on line 14 in Realtime/Binding.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'Binding.Id'
}
30 changes: 28 additions & 2 deletions Realtime/PostgresChanges/PostgresChangesOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Newtonsoft.Json;
using System;
using Newtonsoft.Json;
using Supabase.Core.Attributes;
using System.Collections.Generic;

Expand Down Expand Up @@ -78,7 +79,7 @@
/// </summary>
[JsonProperty("event")]
public string Event => Core.Helpers.GetMappedToAttr(_listenType).Mapping!;

private readonly ListenType _listenType;

/// <summary>
Expand All @@ -97,4 +98,29 @@
Filter = filter;
Parameters = parameters;
}

private bool Equals(PostgresChangesOptions other)
{
return _listenType == other._listenType && Schema == other.Schema && Table == other.Table && Filter == other.Filter;
}

public override bool Equals(object? obj)

Check warning on line 107 in Realtime/PostgresChanges/PostgresChangesOptions.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Missing XML comment for publicly visible type or member 'PostgresChangesOptions.Equals(object?)'
{
if (obj is null) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != GetType()) return false;
return Equals((PostgresChangesOptions)obj);
}

public override int GetHashCode()
{
unchecked
{
var hashCode = (int)_listenType;
hashCode = (hashCode * 397) ^ Schema.GetHashCode();
hashCode = (hashCode * 397) ^ (Table != null ? Table.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ (Filter != null ? Filter.GetHashCode() : 0);
return hashCode;
}
}
}
6 changes: 5 additions & 1 deletion Realtime/PostgresChanges/PostgresChangesResponse.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Newtonsoft.Json;
using System.Collections.Generic;
using Newtonsoft.Json;
using Supabase.Postgrest.Models;
using Supabase.Realtime.Socket;

Expand Down Expand Up @@ -73,4 +74,7 @@
/// </summary>
[JsonProperty("data")]
public SocketResponsePayload<T>? Data { get; set; }

[JsonProperty("ids")]
public List<int?> Ids { get; set; }

Check warning on line 79 in Realtime/PostgresChanges/PostgresChangesResponse.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Non-nullable property 'Ids' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.
}
128 changes: 114 additions & 14 deletions Realtime/RealtimeChannel.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using System.Timers;
Expand Down Expand Up @@ -162,6 +163,8 @@ public class RealtimeChannel : IRealtimeChannel
private readonly Timer _rejoinTimer;
private bool _isRejoining;

private List<Binding> _bindings = [];

/// <summary>
/// Initializes a Channel - must call `Subscribe()` to receive events.
/// </summary>
Expand Down Expand Up @@ -331,10 +334,12 @@ private void NotifyMessageReceived(SocketResponse message)
public void AddPostgresChangeHandler(ListenType listenType, PostgresChangesHandler postgresChangeHandler)
{
if (!_postgresChangesHandlers.ContainsKey(listenType))
_postgresChangesHandlers[listenType] = new List<PostgresChangesHandler>();
_postgresChangesHandlers[listenType] = [];

if (!_postgresChangesHandlers[listenType].Contains(postgresChangeHandler))
_postgresChangesHandlers[listenType].Add(postgresChangeHandler);
if (_postgresChangesHandlers[listenType].Contains(postgresChangeHandler)) return;

_postgresChangesHandlers[listenType].Add(postgresChangeHandler);
BindPostgresChangesHandler(listenType, postgresChangeHandler);
}

/// <summary>
Expand All @@ -346,14 +351,20 @@ public void RemovePostgresChangeHandler(ListenType listenType, PostgresChangesHa
{
if (_postgresChangesHandlers.ContainsKey(listenType) &&
_postgresChangesHandlers[listenType].Contains(postgresChangeHandler))
{
_postgresChangesHandlers[listenType].Remove(postgresChangeHandler);
RemovePostgresChangesFromBinding(listenType, postgresChangeHandler);
}
}

/// <summary>
/// Clears all postgres changes listeners.
/// </summary>
public void ClearPostgresChangeHandlers() =>
_postgresChangesHandlers.Clear();
public void ClearPostgresChangeHandlers()
{
_postgresChangesHandlers.Clear();
_bindings.Clear();
}

/// <summary>
/// Adds an error event handler.
Expand Down Expand Up @@ -407,15 +418,7 @@ private void NotifyPostgresChanges(EventType eventType, PostgresChangesResponse
_ => ListenType.All
};

// Invoke the wildcard listener (but only once)
if (listenType != ListenType.All &&
_postgresChangesHandlers.TryGetValue(ListenType.All, out var changesHandler))
foreach (var handler in changesHandler.ToArray())
handler.Invoke(this, response);

if (_postgresChangesHandlers.TryGetValue(listenType, out var postgresChangesHandler))
foreach (var handler in postgresChangesHandler.ToArray())
handler.Invoke(this, response);
InvokeProperlyHandlerFromBind(listenType, response);
}

/// <summary>
Expand All @@ -428,6 +431,8 @@ private void NotifyPostgresChanges(EventType eventType, PostgresChangesResponse
public IRealtimeChannel Register(PostgresChangesOptions postgresChangesOptions)
{
PostgresChangesOptions.Add(postgresChangesOptions);

BindPostgresChangesOptions(postgresChangesOptions);
return this;
}

Expand Down Expand Up @@ -673,6 +678,8 @@ private void HandleJoinResponse(IRealtimePush<RealtimeChannel, SocketResponse> s
Options.SerializerSettings);
if (obj?.Payload == null) return;

obj.Payload.Response?.change?.ForEach(BindIdPostgresChanges);

switch (obj.Payload.Status)
{
// A response was received from the channel
Expand Down Expand Up @@ -764,4 +771,97 @@ internal void HandleSocketMessage(SocketResponse message)
break;
}
}

/// <summary>
/// Create a Binding and add to a list
/// </summary>
/// <param name="options"></param>
private void BindPostgresChangesOptions(PostgresChangesOptions options)
{
var founded = _bindings.FirstOrDefault(b => options.Equals(b.Options));
if (founded != null) return;

_bindings.Add(
new Binding
{
Options = options,
}
);
}

/// <summary>
/// Try to bind a PostgresChangesHandler to a PostgresChangesOptions
/// </summary>
/// <param name="listenType"></param>
/// <param name="handler"></param>
private void BindPostgresChangesHandler(ListenType listenType, PostgresChangesHandler handler)
{
var founded = _bindings.FirstOrDefault(b =>
(b.Options?.Event == Core.Helpers.GetMappedToAttr(listenType).Mapping || b.Options?.Event == "*") &&
b.Handler == null
);
if (founded == null) return;

founded.Handler = handler;
founded.ListenType = listenType;
}

/// <summary>
/// Filter the binding list and try to add an id from socket to its binding
/// </summary>
/// <param name="joinResponse"></param>
private void BindIdPostgresChanges(PhoenixPostgresChangeResponse joinResponse)
{
var founded = _bindings.FirstOrDefault(b => b.Options != null &&
b.Options.Event == joinResponse.eventName &&
b.Options.Table == joinResponse.table &&
b.Options.Schema == joinResponse.schema &&
b.Options.Filter == joinResponse.filter);
if (founded == null) return;
founded.Id = joinResponse?.id;
}

/// <summary>
/// Try to invoke the handler properly based on event type and socket response
/// </summary>
/// <param name="eventType"></param>
/// <param name="response"></param>
private void InvokeProperlyHandlerFromBind(ListenType eventType, PostgresChangesResponse response)
{
var all = _bindings.FirstOrDefault(b =>
{
if (b.Options == null && response.Payload == null && b.Handler == null) return false;

return response.Payload != null && response.Payload.Ids.Contains(b.Id) && eventType != ListenType.All &&
b.ListenType == ListenType.All;
});

if (all != null)
{
all.Handler?.Invoke(this, response);
return;
}

// Invoke specific handler
var result = _bindings.FirstOrDefault(b =>
{
if (b.Options == null && response.Payload == null && b.Handler == null) return false;

return response.Payload != null && response.Payload.Ids.Contains(b.Id) && b.ListenType == eventType;
});

result?.Handler?.Invoke(this, response);
}

/// <summary>
/// Remove handler from binding
/// </summary>
/// <param name="eventType"></param>
/// <param name="handler"></param>
private void RemovePostgresChangesFromBinding(ListenType eventType, PostgresChangesHandler handler)
{
var binding = _bindings.FirstOrDefault(b => b.Handler == handler && b.ListenType == eventType);
if (binding == null) return;
_bindings.Remove(binding);
}
}
2 changes: 2 additions & 0 deletions Realtime/RealtimeSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
Expand Down Expand Up @@ -88,6 +89,7 @@ public RealtimeSocket(string endpoint, ClientOptions options)
_connection = new WebsocketClient(new Uri(EndpointUrl), () =>
{
var socket = new ClientWebSocket();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Create("BROWSER"))) return socket;

foreach (var header in Headers)
socket.Options.SetRequestHeader(header.Key, header.Value);
Expand Down
21 changes: 21 additions & 0 deletions Realtime/Socket/Responses/PhoenixPostgresChangeResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Newtonsoft.Json;

namespace Supabase.Realtime.Socket.Responses;

public class PhoenixPostgresChangeResponse
{
[JsonProperty("id")]
public int? id { get; set; }

[JsonProperty("event")]
public string? eventName { get; set; }

[JsonProperty("filter")]
public string? filter { get; set; }

[JsonProperty("schema")]
public string? schema { get; set; }

[JsonProperty("table")]
public string? table { get; set; }
}
2 changes: 1 addition & 1 deletion Realtime/Socket/Responses/PhoenixResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class PhoenixResponse
/// The response.
/// </summary>
[JsonProperty("response")]
public object? Response;
public PostgresChangeResponse? Response;

/// <summary>
/// The status.
Expand Down
10 changes: 10 additions & 0 deletions Realtime/Socket/Responses/PostgresChangeResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Collections.Generic;
using Newtonsoft.Json;

namespace Supabase.Realtime.Socket.Responses;

public class PostgresChangeResponse
{
[JsonProperty("postgres_changes")]
public List<PhoenixPostgresChangeResponse> change { get; set; }

Check warning on line 9 in Realtime/Socket/Responses/PostgresChangeResponse.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Non-nullable property 'change' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.
}
Loading
Loading