blob: 7cf330764329b1c611321612f9d3e146e67edbef [file] [edit]
// <copyright file="EventDispatcher.cs" company="Selenium Committers">
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// </copyright>
using System.Collections.Concurrent;
using System.Text.Json;
using System.Text.Json.Serialization.Metadata;
using OpenQA.Selenium.Internal.Logging;
namespace OpenQA.Selenium.BiDi;
internal sealed class EventDispatcher : IAsyncDisposable
{
private static readonly ILogger _logger = Internal.Logging.Log.GetLogger<EventDispatcher>();
private readonly Func<ImmutableArray<string>, Session.SubscribeOptions?, CancellationToken, Task<Session.SubscribeResult>> _wireSubscribe;
private readonly Func<ImmutableArray<Session.Subscription>, Session.UnsubscribeByIdOptions?, CancellationToken, Task<Session.UnsubscribeResult>> _wireUnsubscribe;
private readonly IBiDi _bidi;
private readonly ConcurrentDictionary<string, EventSlot> _events = new();
public EventDispatcher(
Func<ImmutableArray<string>, Session.SubscribeOptions?, CancellationToken, Task<Session.SubscribeResult>> wireSubscribe,
Func<ImmutableArray<Session.Subscription>, Session.UnsubscribeByIdOptions?, CancellationToken, Task<Session.UnsubscribeResult>> wireUnsubscribe,
IBiDi bidi)
{
_wireSubscribe = wireSubscribe;
_wireUnsubscribe = wireUnsubscribe;
_bidi = bidi;
}
public Task<ISubscription> SubscribeAsync<TEventArgs>(
EventDescriptor<TEventArgs> descriptor,
Func<TEventArgs, ValueTask> handler,
ImmutableArray<BrowsingContext.BrowsingContext>? contexts = null,
Func<TEventArgs, bool>? filter = null,
CancellationToken cancellationToken = default)
where TEventArgs : EventArgs
{
return SubscribeAsync<TEventArgs>([descriptor], handler, contexts, filter, cancellationToken);
}
public async Task<ISubscription> SubscribeAsync<TEventArgs>(
IEnumerable<EventDescriptor> descriptors,
Func<TEventArgs, ValueTask> handler,
ImmutableArray<BrowsingContext.BrowsingContext>? contexts = null,
Func<TEventArgs, bool>? filter = null,
CancellationToken cancellationToken = default)
where TEventArgs : EventArgs
{
var (subscribeResult, slots) = await SubscribeCoreAsync(descriptors, contexts, null, cancellationToken).ConfigureAwait(false);
ISubscriptionSink subscription = null!;
subscription = new Subscription<TEventArgs>(
ct => UnsubscribeAsync(subscribeResult, slots, subscription, ct),
handler,
filter);
foreach (var slot in slots)
{
slot.Add(subscription);
}
return (ISubscription)subscription;
}
public Task<EventStream<TEventArgs>> SubscribeReaderAsync<TEventArgs>(
EventDescriptor<TEventArgs> descriptor,
ImmutableArray<BrowsingContext.BrowsingContext>? contexts = null,
Func<TEventArgs, bool>? filter = null,
CancellationToken cancellationToken = default)
where TEventArgs : EventArgs
{
return SubscribeReaderAsync<TEventArgs>([descriptor], contexts, filter, cancellationToken);
}
public async Task<EventStream<TEventArgs>> SubscribeReaderAsync<TEventArgs>(
IEnumerable<EventDescriptor> descriptors,
ImmutableArray<BrowsingContext.BrowsingContext>? contexts = null,
Func<TEventArgs, bool>? filter = null,
CancellationToken cancellationToken = default)
where TEventArgs : EventArgs
{
var (subscribeResult, slots) = await SubscribeCoreAsync(descriptors, contexts, null, cancellationToken).ConfigureAwait(false);
ISubscriptionSink subscription = null!;
subscription = new EventStream<TEventArgs>(
ct => UnsubscribeAsync(subscribeResult, slots, subscription, ct),
filter,
cancellationToken);
foreach (var slot in slots)
{
slot.Add(subscription);
}
return (EventStream<TEventArgs>)subscription;
}
public bool TryDeserializeAndDispatch(string method, ref Utf8JsonReader paramsReader)
{
if (!_events.TryGetValue(method, out var slot))
{
return false;
}
var eventParams = JsonSerializer.Deserialize(ref paramsReader, slot.JsonTypeInfo)
?? throw new BiDiException("Remote end returned null event args in the 'params' property.");
var eventArgs = slot.ArgsFactory(eventParams);
foreach (var subscription in slot.GetSnapshot())
{
try
{
subscription.Deliver(eventArgs);
}
catch (Exception ex)
{
_logger.Error($"Failed to deliver '{method}' event to subscription: {ex.Message}");
subscription.Complete(ex);
}
}
return true;
}
public async Task CompleteAllAsync(Exception? error)
{
foreach (var slot in _events.Values)
{
foreach (var subscription in slot.GetSnapshot())
{
subscription.Complete(error);
}
}
foreach (var slot in _events.Values)
{
foreach (var subscription in slot.GetSnapshot())
{
try
{
await subscription.DisposeAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.Warn($"Subscription disposal failed during shutdown: {ex.Message}");
}
}
}
}
public async ValueTask DisposeAsync()
{
await CompleteAllAsync(null).ConfigureAwait(false);
}
private async Task<(Session.Subscription SubscribeResult, EventSlot[] Slots)> SubscribeCoreAsync(
IEnumerable<EventDescriptor> descriptors,
ImmutableArray<BrowsingContext.BrowsingContext>? contexts,
ImmutableArray<Browser.UserContext>? userContexts,
CancellationToken cancellationToken)
{
var uniqueNames = new HashSet<string>();
var names = new List<string>();
var slots = new List<EventSlot>();
foreach (var descriptor in descriptors)
{
if (uniqueNames.Add(descriptor.Name))
{
names.Add(descriptor.Name);
slots.Add(GetOrCreateSlot(descriptor));
}
}
if (names.Count == 0)
{
throw new ArgumentException("At least one event descriptor must be provided.", nameof(descriptors));
}
var subscribeResult = await _wireSubscribe([.. names], new() { Contexts = contexts, UserContexts = userContexts }, cancellationToken)
.ConfigureAwait(false);
return (subscribeResult.Subscription, slots.ToArray());
}
private EventSlot GetOrCreateSlot(EventDescriptor descriptor)
{
return _events.GetOrAdd(descriptor.Name, _ =>
{
if (descriptor.JsonTypeInfo is null || descriptor.ArgsFactory is null)
{
throw new InvalidOperationException($"Event '{descriptor.Name}' does not have registration metadata.");
}
var bidi = _bidi;
var argsFactory = descriptor.ArgsFactory;
return new EventSlot(descriptor.JsonTypeInfo, ep => argsFactory(bidi, ep));
});
}
private async ValueTask UnsubscribeAsync(Session.Subscription subscriptionId, EventSlot[] slots, ISubscriptionSink subscription, CancellationToken cancellationToken)
{
try
{
await _wireUnsubscribe([subscriptionId], null, cancellationToken).ConfigureAwait(false);
}
finally
{
foreach (var slot in slots)
{
slot.Remove(subscription);
}
}
}
private sealed class EventSlot
{
public JsonTypeInfo JsonTypeInfo { get; }
public Func<object, EventArgs> ArgsFactory { get; }
private readonly object _lock = new();
private volatile ISubscriptionSink[] _subscriptions = [];
public EventSlot(JsonTypeInfo jsonTypeInfo, Func<object, EventArgs> argsFactory)
{
JsonTypeInfo = jsonTypeInfo;
ArgsFactory = argsFactory;
}
public ISubscriptionSink[] GetSnapshot() => _subscriptions;
public void Add(ISubscriptionSink subscription)
{
lock (_lock) _subscriptions = [.. _subscriptions, subscription];
}
public void Remove(ISubscriptionSink subscription)
{
lock (_lock) _subscriptions = Array.FindAll(_subscriptions, s => s != subscription);
}
}
}