[FA-misc] Initial MassTransit implementation seems to work

This commit is contained in:
gamer147
2026-01-26 17:08:13 -05:00
parent e7435435c1
commit 579e05b853
96 changed files with 845 additions and 1229 deletions

View File

@@ -0,0 +1,11 @@
namespace FictionArchive.Service.Shared.Contracts.Events;
public interface IChapterCreated
{
uint ChapterId { get; }
uint NovelId { get; }
uint VolumeId { get; }
uint VolumeOrder { get; }
uint ChapterOrder { get; }
string ChapterTitle { get; }
}

View File

@@ -0,0 +1,8 @@
namespace FictionArchive.Service.Shared.Contracts.Events;
public interface IChapterPullRequested
{
uint NovelId { get; }
uint VolumeId { get; }
uint ChapterOrder { get; }
}

View File

@@ -0,0 +1,8 @@
namespace FictionArchive.Service.Shared.Contracts.Events;
public interface IFileUploadRequestCreated
{
Guid RequestId { get; }
string FilePath { get; }
byte[] FileData { get; }
}

View File

@@ -0,0 +1,11 @@
using FictionArchive.Common.Enums;
namespace FictionArchive.Service.Shared.Contracts.Events;
public interface IFileUploadRequestStatusUpdate
{
Guid RequestId { get; }
RequestStatus Status { get; }
string? FileAccessUrl { get; }
string? ErrorMessage { get; }
}

View File

@@ -0,0 +1,12 @@
using FictionArchive.Common.Enums;
namespace FictionArchive.Service.Shared.Contracts.Events;
public interface INovelCreated
{
uint NovelId { get; }
string Title { get; }
Language OriginalLanguage { get; }
string Source { get; }
string AuthorName { get; }
}

View File

@@ -0,0 +1,6 @@
namespace FictionArchive.Service.Shared.Contracts.Events;
public interface INovelUpdateRequested
{
string NovelUrl { get; }
}

View File

@@ -0,0 +1,7 @@
namespace FictionArchive.Service.Shared.Contracts.Events;
public interface ITranslationRequestCompleted
{
Guid? TranslationRequestId { get; }
string? TranslatedText { get; }
}

View File

@@ -0,0 +1,12 @@
using FictionArchive.Common.Enums;
namespace FictionArchive.Service.Shared.Contracts.Events;
public interface ITranslationRequestCreated
{
Guid TranslationRequestId { get; }
Language From { get; }
Language To { get; }
string Body { get; }
string TranslationEngineKey { get; }
}

View File

@@ -0,0 +1,12 @@
namespace FictionArchive.Service.Shared.Contracts.Events;
public interface IUserInvited
{
string InvitedUserId { get; }
string InvitedUsername { get; }
string InvitedEmail { get; }
string InvitedOAuthProviderId { get; }
string InviterId { get; }
string InviterUsername { get; }
string InviterOAuthProviderId { get; }
}

View File

@@ -0,0 +1,118 @@
using System.Text.RegularExpressions;
using FictionArchive.Service.Shared.Services.Filters;
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace FictionArchive.Service.Shared.Extensions;
public static class MassTransitExtensions
{
public static IServiceCollection AddFictionArchiveMassTransit(
this IServiceCollection services,
IConfiguration configuration,
Action<IBusRegistrationConfigurator>? configureConsumers = null)
{
services.AddMassTransit(x =>
{
configureConsumers?.Invoke(x);
x.UsingRabbitMq((context, cfg) =>
{
var (host, username, password) = ParseRabbitMqConfiguration(configuration);
cfg.Host(host, h =>
{
h.Username(username);
h.Password(password);
});
cfg.UseMessageRetry(r => r.Exponential(
retryLimit: 5,
minInterval: TimeSpan.FromSeconds(1),
maxInterval: TimeSpan.FromMinutes(1),
intervalDelta: TimeSpan.FromSeconds(2)));
cfg.UseConsumeFilter(typeof(LoggingConsumeFilter<>), context);
// Process one message at a time per consumer (matches old EventBus behavior)
cfg.PrefetchCount = 1;
cfg.ConfigureEndpoints(context);
});
});
return services;
}
/// <summary>
/// Parses RabbitMQ configuration from either ConnectionString format or separate Host/Username/Password keys.
/// ConnectionString format: amqp://[username:password@]host[:port]
/// </summary>
private static (string Host, string Username, string Password) ParseRabbitMqConfiguration(IConfiguration configuration)
{
var connectionString = configuration["RabbitMQ:ConnectionString"];
if (!string.IsNullOrEmpty(connectionString))
{
return ParseConnectionString(connectionString);
}
// Fallback to separate configuration keys
var host = configuration["RabbitMQ:Host"] ?? "localhost";
var username = configuration["RabbitMQ:Username"] ?? "guest";
var password = configuration["RabbitMQ:Password"] ?? "guest";
return (host, username, password);
}
/// <summary>
/// Parses an AMQP connection string into host, username, and password components.
/// Supports formats:
/// - amqp://host
/// - amqp://host:port
/// - amqp://username:password@host
/// - amqp://username:password@host:port
/// </summary>
private static (string Host, string Username, string Password) ParseConnectionString(string connectionString)
{
var username = "guest";
var password = "guest";
var host = "localhost";
// Try to parse as URI first
if (Uri.TryCreate(connectionString, UriKind.Absolute, out var uri))
{
host = uri.Host;
if (!string.IsNullOrEmpty(uri.UserInfo))
{
var userInfoParts = uri.UserInfo.Split(':', 2);
username = Uri.UnescapeDataString(userInfoParts[0]);
if (userInfoParts.Length > 1)
{
password = Uri.UnescapeDataString(userInfoParts[1]);
}
}
}
else
{
// Fallback regex parsing for edge cases
var match = Regex.Match(connectionString, @"amqp://(?:([^:]+):([^@]+)@)?([^:/]+)");
if (match.Success)
{
if (match.Groups[1].Success && match.Groups[2].Success)
{
username = match.Groups[1].Value;
password = match.Groups[2].Value;
}
if (match.Groups[3].Success)
{
host = match.Groups[3].Value;
}
}
}
return (host, username, password);
}
}

View File

@@ -31,7 +31,7 @@
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.4" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL.NodaTime" Version="9.0.4" />
<PackageReference Include="Polly" Version="8.6.5" />
<PackageReference Include="RabbitMQ.Client" Version="7.2.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.*" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="8.0.11" />
</ItemGroup>

View File

@@ -1,25 +0,0 @@
using Microsoft.Extensions.DependencyInjection;
namespace FictionArchive.Service.Shared.Services.EventBus;
public class EventBusBuilder<TEventBus> where TEventBus : class, IEventBus
{
private readonly IServiceCollection _services;
private readonly SubscriptionManager _subscriptionManager;
public EventBusBuilder(IServiceCollection services)
{
_services = services;
_services.AddSingleton<IEventBus, TEventBus>();
_subscriptionManager = new SubscriptionManager();
_services.AddSingleton<SubscriptionManager>(_subscriptionManager);
}
public EventBusBuilder<TEventBus> Subscribe<TEvent, TEventHandler>() where TEvent : IIntegrationEvent where TEventHandler : class, IIntegrationEventHandler<TEvent>
{
_services.AddKeyedTransient<IIntegrationEventHandler, TEventHandler>(typeof(TEvent).Name);
_subscriptionManager.RegisterSubscription<TEvent>();
return this;
}
}

View File

@@ -1,12 +0,0 @@
using Microsoft.Extensions.DependencyInjection;
namespace FictionArchive.Service.Shared.Services.EventBus;
public static class EventBusExtensions
{
public static EventBusBuilder<TEventBus> AddEventBus<TEventBus>(this IServiceCollection services)
where TEventBus : class, IEventBus
{
return new EventBusBuilder<TEventBus>(services);
}
}

View File

@@ -1,7 +0,0 @@
namespace FictionArchive.Service.Shared.Services.EventBus;
public interface IEventBus
{
Task Publish<TEvent>(TEvent integrationEvent) where TEvent : IIntegrationEvent;
Task Publish(object integrationEvent, string eventType);
}

View File

@@ -1,7 +0,0 @@
using NodaTime;
namespace FictionArchive.Service.Shared.Services.EventBus;
public interface IIntegrationEvent
{
}

View File

@@ -1,12 +0,0 @@
namespace FictionArchive.Service.Shared.Services.EventBus;
public interface IIntegrationEventHandler<in TEvent> : IIntegrationEventHandler where TEvent : IIntegrationEvent
{
Task Handle(TEvent @event);
Task IIntegrationEventHandler.Handle(IIntegrationEvent @event) => Handle((TEvent)@event);
}
public interface IIntegrationEventHandler
{
Task Handle(IIntegrationEvent @event);
}

View File

@@ -1,35 +0,0 @@
using RabbitMQ.Client;
namespace FictionArchive.Service.Shared.Services.EventBus.Implementations;
public class RabbitMQConnectionProvider
{
private readonly IConnectionFactory _connectionFactory;
private IConnection Connection { get; set; }
private IChannel DefaultChannel { get; set; }
public RabbitMQConnectionProvider(IConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
}
public async Task<IConnection> GetConnectionAsync()
{
if (Connection == null)
{
Connection = await _connectionFactory.CreateConnectionAsync();
}
return Connection;
}
public async Task<IChannel> GetDefaultChannelAsync()
{
if (DefaultChannel == null)
{
DefaultChannel = await (await GetConnectionAsync()).CreateChannelAsync();
}
return DefaultChannel;
}
}

View File

@@ -1,137 +0,0 @@
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using NodaTime;
using NodaTime.Serialization.JsonNet;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace FictionArchive.Service.Shared.Services.EventBus.Implementations;
public class RabbitMQEventBus : IEventBus, IHostedService
{
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly RabbitMQConnectionProvider _connectionProvider;
private readonly RabbitMQOptions _options;
private readonly SubscriptionManager _subscriptionManager;
private readonly ILogger<RabbitMQEventBus> _logger;
private readonly JsonSerializerSettings _jsonSerializerSettings;
private const string ExchangeName = "fiction-archive-event-bus";
private const string CreatedAtHeader = "X-Created-At";
private const string EventIdHeader = "X-Event-Id";
public RabbitMQEventBus(IServiceScopeFactory serviceScopeFactory, RabbitMQConnectionProvider connectionProvider, IOptions<RabbitMQOptions> options, SubscriptionManager subscriptionManager, ILogger<RabbitMQEventBus> logger)
{
_serviceScopeFactory = serviceScopeFactory;
_connectionProvider = connectionProvider;
_subscriptionManager = subscriptionManager;
_logger = logger;
_options = options.Value;
_jsonSerializerSettings = new JsonSerializerSettings().ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
}
public async Task Publish<TEvent>(TEvent integrationEvent) where TEvent : IIntegrationEvent
{
var routingKey = typeof(TEvent).Name;
await Publish(integrationEvent, routingKey);
}
public async Task Publish(object integrationEvent, string eventType)
{
var channel = await _connectionProvider.GetDefaultChannelAsync();
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(integrationEvent));
// headers
var props = new BasicProperties();
props.Headers = new Dictionary<string, object?>()
{
{ CreatedAtHeader, Instant.FromDateTimeUtc(DateTime.UtcNow).ToString() },
{ EventIdHeader, Guid.NewGuid().ToString() }
};
await channel.BasicPublishAsync(ExchangeName, eventType, true, props, body);
_logger.LogInformation("Published event {EventName}", eventType);
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_ = Task.Factory.StartNew(async () =>
{
try
{
var channel = await _connectionProvider.GetDefaultChannelAsync();
await channel.ExchangeDeclareAsync(ExchangeName, ExchangeType.Direct,
cancellationToken: cancellationToken);
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false, cancellationToken: cancellationToken);
await channel.QueueDeclareAsync(_options.ClientIdentifier, true, false, false,
cancellationToken: cancellationToken);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (sender, @event) =>
{
return OnReceivedEvent(sender, @event, channel);
};
await channel.BasicConsumeAsync(_options.ClientIdentifier, false, consumer, cancellationToken: cancellationToken);
foreach (var subscription in _subscriptionManager.Subscriptions)
{
await channel.QueueBindAsync(_options.ClientIdentifier, ExchangeName, subscription.Key,
cancellationToken: cancellationToken);
_logger.LogInformation("Subscribed to {SubscriptionKey}", subscription.Key);
}
_logger.LogInformation("RabbitMQ EventBus started.");
}
catch (Exception e)
{
_logger.LogError(e, "An error occurred while starting the RabbitMQ EventBus");
}
}, cancellationToken);
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
private async Task OnReceivedEvent(object sender, BasicDeliverEventArgs @event, IChannel channel)
{
var eventName = @event.RoutingKey;
_logger.LogInformation("Received event {EventName}", eventName);
try
{
if (!_subscriptionManager.Subscriptions.ContainsKey(eventName))
{
_logger.LogWarning("Received event without subscription entry.");
return;
}
var eventBody = Encoding.UTF8.GetString(@event.Body.Span);
var eventObject = JsonConvert.DeserializeObject(eventBody, _subscriptionManager.Subscriptions[eventName], _jsonSerializerSettings) as IIntegrationEvent;
using var scope = _serviceScopeFactory.CreateScope();
foreach (var service in scope.ServiceProvider.GetKeyedServices<IIntegrationEventHandler>(eventName))
{
await service.Handle(eventObject);
}
_logger.LogInformation("Finished handling event with name {EventName}", eventName);
}
catch (Exception e)
{
_logger.LogError(e, "An error occurred while handling an event.");
}
finally
{
await channel.BasicAckAsync(@event.DeliveryTag, false);
}
}
}

View File

@@ -1,24 +0,0 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace FictionArchive.Service.Shared.Services.EventBus.Implementations;
public static class RabbitMQExtensions
{
public static EventBusBuilder<RabbitMQEventBus> AddRabbitMQ(this IServiceCollection services, Action<RabbitMQOptions> configure)
{
services.Configure(configure);
services.AddSingleton<IConnectionFactory, ConnectionFactory>(provider =>
{
var options = provider.GetService<IOptions<RabbitMQOptions>>();
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri(options.Value.ConnectionString);
return factory;
});
services.AddSingleton<RabbitMQConnectionProvider>();
services.AddHostedService<RabbitMQEventBus>();
return services.AddEventBus<RabbitMQEventBus>();
}
}

View File

@@ -1,7 +0,0 @@
namespace FictionArchive.Service.Shared.Services.EventBus.Implementations;
public class RabbitMQOptions
{
public string ConnectionString { get; set; }
public string ClientIdentifier { get; set; }
}

View File

@@ -1,11 +0,0 @@
namespace FictionArchive.Service.Shared.Services.EventBus;
public class SubscriptionManager
{
public Dictionary<string, Type> Subscriptions { get; } = new Dictionary<string, Type>();
public void RegisterSubscription<TEvent>()
{
Subscriptions.Add(typeof(TEvent).Name, typeof(TEvent));
}
}

View File

@@ -0,0 +1,33 @@
using MassTransit;
using Microsoft.Extensions.Logging;
namespace FictionArchive.Service.Shared.Services.Filters;
public class LoggingConsumeFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
private readonly ILogger<LoggingConsumeFilter<T>> _logger;
public LoggingConsumeFilter(ILogger<LoggingConsumeFilter<T>> logger)
{
_logger = logger;
}
public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
{
try
{
await next.Send(context);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Message {MessageType} failed after all retries. MessageId: {MessageId}, ConversationId: {ConversationId}",
typeof(T).Name,
context.MessageId,
context.ConversationId);
throw;
}
}
public void Probe(ProbeContext context) => context.CreateFilterScope("logging");
}