using System.Text; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Newtonsoft.Json; using NodaTime; using NodaTime.Serialization.JsonNet; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace FictionArchive.Service.Shared.Services.EventBus.Implementations; public class RabbitMQEventBus : IEventBus, IHostedService { private readonly IServiceScopeFactory _serviceScopeFactory; private readonly RabbitMQConnectionProvider _connectionProvider; private readonly RabbitMQOptions _options; private readonly SubscriptionManager _subscriptionManager; private readonly ILogger _logger; private readonly JsonSerializerSettings _jsonSerializerSettings; private const string ExchangeName = "fiction-archive-event-bus"; private const string CreatedAtHeader = "X-Created-At"; private const string EventIdHeader = "X-Event-Id"; public RabbitMQEventBus(IServiceScopeFactory serviceScopeFactory, RabbitMQConnectionProvider connectionProvider, IOptions options, SubscriptionManager subscriptionManager, ILogger logger) { _serviceScopeFactory = serviceScopeFactory; _connectionProvider = connectionProvider; _subscriptionManager = subscriptionManager; _logger = logger; _options = options.Value; _jsonSerializerSettings = new JsonSerializerSettings().ConfigureForNodaTime(DateTimeZoneProviders.Tzdb); } public async Task Publish(TEvent integrationEvent) where TEvent : IIntegrationEvent { var routingKey = typeof(TEvent).Name; await Publish(integrationEvent, routingKey); } public async Task Publish(object integrationEvent, string eventType) { var channel = await _connectionProvider.GetDefaultChannelAsync(); var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(integrationEvent)); // headers var props = new BasicProperties(); props.Headers = new Dictionary() { { CreatedAtHeader, Instant.FromDateTimeUtc(DateTime.UtcNow).ToString() }, { EventIdHeader, Guid.NewGuid().ToString() } }; await channel.BasicPublishAsync(ExchangeName, eventType, true, props, body); _logger.LogInformation("Published event {EventName}", eventType); } public async Task StartAsync(CancellationToken cancellationToken) { _ = Task.Factory.StartNew(async () => { try { var channel = await _connectionProvider.GetDefaultChannelAsync(); await channel.ExchangeDeclareAsync(ExchangeName, ExchangeType.Direct, cancellationToken: cancellationToken); await channel.QueueDeclareAsync(_options.ClientIdentifier, true, false, false, cancellationToken: cancellationToken); var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += (sender, @event) => { return OnReceivedEvent(sender, @event, channel); }; await channel.BasicConsumeAsync(_options.ClientIdentifier, false, consumer, cancellationToken: cancellationToken); foreach (var subscription in _subscriptionManager.Subscriptions) { await channel.QueueBindAsync(_options.ClientIdentifier, ExchangeName, subscription.Key, cancellationToken: cancellationToken); _logger.LogInformation("Subscribed to {SubscriptionKey}", subscription.Key); } _logger.LogInformation("RabbitMQ EventBus started."); } catch (Exception e) { _logger.LogError(e, "An error occurred while starting the RabbitMQ EventBus"); } }, cancellationToken); } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } private async Task OnReceivedEvent(object sender, BasicDeliverEventArgs @event, IChannel channel) { var eventName = @event.RoutingKey; _logger.LogInformation("Received event {EventName}", eventName); try { if (!_subscriptionManager.Subscriptions.ContainsKey(eventName)) { _logger.LogWarning("Received event without subscription entry."); return; } var eventBody = Encoding.UTF8.GetString(@event.Body.Span); var eventObject = JsonConvert.DeserializeObject(eventBody, _subscriptionManager.Subscriptions[eventName], _jsonSerializerSettings) as IIntegrationEvent; using var scope = _serviceScopeFactory.CreateScope(); foreach (var service in scope.ServiceProvider.GetKeyedServices(eventName)) { await service.Handle(eventObject); } _logger.LogInformation("Finished handling event with name {EventName}", eventName); } catch (Exception e) { _logger.LogError(e, "An error occurred while handling an event."); } finally { await channel.BasicAckAsync(@event.DeliveryTag, false); } } }