using System.Text; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Newtonsoft.Json; using NodaTime; using NodaTime.Serialization.JsonNet; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace FictionArchive.Service.Shared.Services.EventBus.Implementations; public class RabbitMQEventBus : IEventBus, IHostedService { private readonly IServiceScopeFactory _serviceScopeFactory; private readonly RabbitMQConnectionProvider _connectionProvider; private readonly RabbitMQOptions _options; private readonly SubscriptionManager _subscriptionManager; private readonly ILogger _logger; private readonly JsonSerializerSettings _jsonSerializerSettings; private const string ExchangeName = "fiction-archive-event-bus"; public RabbitMQEventBus(IServiceScopeFactory serviceScopeFactory, RabbitMQConnectionProvider connectionProvider, IOptions options, SubscriptionManager subscriptionManager, ILogger logger) { _serviceScopeFactory = serviceScopeFactory; _connectionProvider = connectionProvider; _subscriptionManager = subscriptionManager; _logger = logger; _options = options.Value; _jsonSerializerSettings = new JsonSerializerSettings().ConfigureForNodaTime(DateTimeZoneProviders.Tzdb); } public async Task Publish(TEvent integrationEvent) where TEvent : IntegrationEvent { var routingKey = typeof(TEvent).Name; var channel = await _connectionProvider.GetDefaultChannelAsync(); // Set integration event values integrationEvent.CreatedAt = Instant.FromDateTimeUtc(DateTime.UtcNow); integrationEvent.EventId = Guid.NewGuid(); var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(integrationEvent)); await channel.BasicPublishAsync(ExchangeName, routingKey, true, body); _logger.LogInformation("Published event {EventName}", routingKey); } public async Task StartAsync(CancellationToken cancellationToken) { _ = Task.Factory.StartNew(async () => { try { var channel = await _connectionProvider.GetDefaultChannelAsync(); await channel.ExchangeDeclareAsync(ExchangeName, ExchangeType.Direct, cancellationToken: cancellationToken); await channel.QueueDeclareAsync(_options.ClientIdentifier, true, false, false, cancellationToken: cancellationToken); var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += (sender, @event) => { return OnReceivedEvent(sender, @event, channel); }; foreach (var subscription in _subscriptionManager.Subscriptions) { await channel.QueueBindAsync(_options.ClientIdentifier, ExchangeName, subscription.Key, cancellationToken: cancellationToken); _logger.LogInformation("Subscribed to {SubscriptionKey}", subscription.Key); } await channel.BasicConsumeAsync(_options.ClientIdentifier, false, consumer, cancellationToken: cancellationToken); _logger.LogInformation("RabbitMQ EventBus started."); } catch (Exception e) { _logger.LogError(e, "An error occurred while starting the RabbitMQ EventBus"); } }, cancellationToken); } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } private async Task OnReceivedEvent(object sender, BasicDeliverEventArgs @event, IChannel channel) { var eventName = @event.RoutingKey; _logger.LogInformation("Received event {EventName}", eventName); try { if (!_subscriptionManager.Subscriptions.ContainsKey(eventName)) { _logger.LogWarning("Received event without subscription entry."); return; } var eventBody = Encoding.UTF8.GetString(@event.Body.Span); var eventObject = JsonConvert.DeserializeObject(eventBody, _subscriptionManager.Subscriptions[eventName], _jsonSerializerSettings) as IntegrationEvent; using var scope = _serviceScopeFactory.CreateScope(); foreach (var service in scope.ServiceProvider.GetKeyedServices(eventName)) { await service.Handle(eventObject); } _logger.LogInformation("Finished handling event with name {EventName}", eventName); } catch (Exception e) { _logger.LogError(e, "An error occurred while handling an event."); } finally { await channel.BasicAckAsync(@event.DeliveryTag, false); } } }