136 lines
5.4 KiB
C#
136 lines
5.4 KiB
C#
using System.Text;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Newtonsoft.Json;
|
|
using NodaTime;
|
|
using NodaTime.Serialization.JsonNet;
|
|
using RabbitMQ.Client;
|
|
using RabbitMQ.Client.Events;
|
|
|
|
namespace FictionArchive.Service.Shared.Services.EventBus.Implementations;
|
|
|
|
/// <summary>
/// <see cref="IEventBus"/> implementation backed by RabbitMQ. It is also an
/// <see cref="IHostedService"/>: on start it declares the shared exchange and this
/// client's queue, attaches a consumer, and binds the queue to every routing key
/// registered in the <see cref="SubscriptionManager"/>.
/// </summary>
public class RabbitMQEventBus : IEventBus, IHostedService
{
    private readonly IServiceScopeFactory _serviceScopeFactory;
    private readonly RabbitMQConnectionProvider _connectionProvider;
    private readonly RabbitMQOptions _options;
    private readonly SubscriptionManager _subscriptionManager;
    private readonly ILogger<RabbitMQEventBus> _logger;

    // Shared by Publish and OnReceivedEvent so both ends agree on how NodaTime
    // values are represented on the wire.
    private readonly JsonSerializerSettings _jsonSerializerSettings;

    private const string ExchangeName = "fiction-archive-event-bus";
    private const string CreatedAtHeader = "X-Created-At";
    private const string EventIdHeader = "X-Event-Id";

    /// <summary>
    /// Creates the event bus and prepares JSON settings configured for NodaTime (TZDB provider).
    /// </summary>
    /// <param name="serviceScopeFactory">Used to create one DI scope per received message.</param>
    /// <param name="connectionProvider">Supplies the channel used for publishing and consuming.</param>
    /// <param name="options">RabbitMQ options; <c>ClientIdentifier</c> is used as the queue name.</param>
    /// <param name="subscriptionManager">Maps routing keys (event names) to event CLR types.</param>
    /// <param name="logger">Logger for publish/consume diagnostics.</param>
    public RabbitMQEventBus(IServiceScopeFactory serviceScopeFactory, RabbitMQConnectionProvider connectionProvider, IOptions<RabbitMQOptions> options, SubscriptionManager subscriptionManager, ILogger<RabbitMQEventBus> logger)
    {
        _serviceScopeFactory = serviceScopeFactory;
        _connectionProvider = connectionProvider;
        _subscriptionManager = subscriptionManager;
        _logger = logger;
        _options = options.Value;
        _jsonSerializerSettings = new JsonSerializerSettings().ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
    }

    /// <summary>
    /// Publishes an event using the simple name of <typeparamref name="TEvent"/> as the routing key.
    /// </summary>
    /// <typeparam name="TEvent">Compile-time type of the event; its <c>Name</c> becomes the routing key.</typeparam>
    /// <param name="integrationEvent">The event to serialize and publish.</param>
    public async Task Publish<TEvent>(TEvent integrationEvent) where TEvent : IIntegrationEvent
    {
        var routingKey = typeof(TEvent).Name;

        await Publish(integrationEvent, routingKey);
    }

    /// <summary>
    /// Serializes <paramref name="integrationEvent"/> to UTF-8 JSON and publishes it to the
    /// event-bus exchange under the routing key <paramref name="eventType"/>.
    /// </summary>
    /// <param name="integrationEvent">The payload to serialize.</param>
    /// <param name="eventType">Routing key; consumers look this up in their subscription map.</param>
    public async Task Publish(object integrationEvent, string eventType)
    {
        var channel = await _connectionProvider.GetDefaultChannelAsync();

        // Serialize with the same settings the consumer deserializes with; otherwise
        // NodaTime members are written in a shape the NodaTime converters cannot read back.
        var json = JsonConvert.SerializeObject(integrationEvent, _jsonSerializerSettings);
        var body = Encoding.UTF8.GetBytes(json);

        // Headers: creation timestamp and a fresh unique id for every published message.
        var props = new BasicProperties
        {
            Headers = new Dictionary<string, object?>
            {
                { CreatedAtHeader, Instant.FromDateTimeUtc(DateTime.UtcNow).ToString() },
                { EventIdHeader, Guid.NewGuid().ToString() }
            }
        };

        await channel.BasicPublishAsync(ExchangeName, eventType, true, props, body);
        _logger.LogInformation("Published event {EventName}", eventType);
    }

    /// <summary>
    /// Kicks off broker setup (exchange, queue, consumer, bindings) in the background and
    /// returns immediately. Setup failures are logged, not propagated to the caller.
    /// </summary>
    /// <param name="cancellationToken">Passed to the background task and to each broker call.</param>
    /// <returns>An already-completed task; the setup itself is fire-and-forget.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Task.Run understands async delegates (Task.Factory.StartNew would produce a
        // Task<Task> whose inner task is never observed). The result is intentionally discarded.
        _ = Task.Run(async () =>
        {
            try
            {
                var channel = await _connectionProvider.GetDefaultChannelAsync();
                await channel.ExchangeDeclareAsync(ExchangeName, ExchangeType.Direct,
                    cancellationToken: cancellationToken);

                // The queue is named after this client's identifier.
                await channel.QueueDeclareAsync(_options.ClientIdentifier, true, false, false,
                    cancellationToken: cancellationToken);

                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.ReceivedAsync += (sender, @event) => OnReceivedEvent(sender, @event, channel);

                // The `false` argument is presumably autoAck (per the RabbitMQ.Client signature);
                // OnReceivedEvent acknowledges each delivery itself.
                await channel.BasicConsumeAsync(_options.ClientIdentifier, false, consumer, cancellationToken: cancellationToken);

                // One binding per subscribed event name (routing key).
                foreach (var subscription in _subscriptionManager.Subscriptions)
                {
                    await channel.QueueBindAsync(_options.ClientIdentifier, ExchangeName, subscription.Key,
                        cancellationToken: cancellationToken);
                    _logger.LogInformation("Subscribed to {SubscriptionKey}", subscription.Key);
                }

                _logger.LogInformation("RabbitMQ EventBus started.");
            }
            catch (Exception e)
            {
                _logger.LogError(e, "An error occurred while starting the RabbitMQ EventBus");
            }
        }, cancellationToken);

        return Task.CompletedTask;
    }

    /// <summary>
    /// No-op: this class performs no teardown of its own.
    /// </summary>
    /// <param name="cancellationToken">Unused.</param>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Handles one delivery: resolves the event type from the routing key, deserializes the
    /// body, and invokes every keyed <see cref="IIntegrationEventHandler"/> registered under
    /// that routing key inside a fresh DI scope.
    /// </summary>
    /// <remarks>
    /// The delivery is acknowledged in all cases (unknown event, undeserializable body,
    /// handler exception), so a failed message is not redelivered by this code.
    /// </remarks>
    /// <param name="sender">Event source supplied by the consumer; not used.</param>
    /// <param name="event">The delivery, including routing key, body and delivery tag.</param>
    /// <param name="channel">Channel on which the delivery is acknowledged.</param>
    private async Task OnReceivedEvent(object sender, BasicDeliverEventArgs @event, IChannel channel)
    {
        var eventName = @event.RoutingKey;
        _logger.LogInformation("Received event {EventName}", eventName);
        try
        {
            // Single lookup instead of ContainsKey followed by the indexer.
            if (!_subscriptionManager.Subscriptions.TryGetValue(eventName, out var eventType))
            {
                _logger.LogWarning("Received event without subscription entry. Event name: {EventName}", eventName);
                return;
            }

            var eventBody = Encoding.UTF8.GetString(@event.Body.Span);

            // A JSON "null" body, or a subscribed type that does not implement
            // IIntegrationEvent, must not reach the handlers as a null reference.
            if (JsonConvert.DeserializeObject(eventBody, eventType, _jsonSerializerSettings) is not IIntegrationEvent eventObject)
            {
                _logger.LogError("Event {EventName} could not be deserialized into an integration event; handlers were not invoked.", eventName);
                return;
            }

            using var scope = _serviceScopeFactory.CreateScope();

            foreach (var service in scope.ServiceProvider.GetKeyedServices<IIntegrationEventHandler>(eventName))
            {
                await service.Handle(eventObject);
            }

            _logger.LogInformation("Finished handling event with name {EventName}", eventName);
        }
        catch (Exception e)
        {
            _logger.LogError(e, "An error occurred while handling an event.");
        }
        finally
        {
            // Runs on every exit path above, including the early returns.
            await channel.BasicAckAsync(@event.DeliveryTag, false);
        }
    }
}