// MassTransit + RabbitMQ registration helpers (FictionArchive.Service.Shared).
using System.Text.RegularExpressions;
|
|
using FictionArchive.Service.Shared.Services.Filters;
|
|
using MassTransit;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
|
|
namespace FictionArchive.Service.Shared.Extensions;
|
|
|
|
/// <summary>
/// Registration helpers that wire MassTransit to a RabbitMQ broker using values
/// read from application configuration.
/// </summary>
public static class MassTransitExtensions
{
    private const string DefaultHost = "localhost";
    private const string DefaultUsername = "guest";
    private const string DefaultPassword = "guest";

    // Virtual host supplied when an explicit port forces the (host, port, virtualHost) overload.
    // The connection string's path component is not parsed, so no other virtual host is ever selected here.
    private const string DefaultVirtualHost = "/";

    // Lenient pattern used only when System.Uri rejects the connection string
    // (for example, a password containing an unescaped '/').
    // Groups: 1 = username, 2 = password, 3 = host, 4 = port (optional).
    private static readonly Regex LooseAmqpPattern =
        new(@"amqp://(?:([^:]+):([^@]+)@)?([^:/]+)(?::(\d+))?");

    /// <summary>
    /// Registers MassTransit with the RabbitMQ transport, applying the shared retry policy,
    /// the logging consume filter, and a prefetch count of one.
    /// </summary>
    /// <param name="services">The service collection to add MassTransit to.</param>
    /// <param name="configuration">
    /// Configuration providing either <c>RabbitMQ:ConnectionString</c> or the separate
    /// <c>RabbitMQ:Host</c>, <c>RabbitMQ:Username</c> and <c>RabbitMQ:Password</c> keys.
    /// </param>
    /// <param name="configureConsumers">
    /// Optional callback invoked before the transport is configured, typically used to register consumers.
    /// </param>
    /// <returns>The same <paramref name="services"/> instance, to allow call chaining.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="services"/> or <paramref name="configuration"/> is <see langword="null"/>.
    /// </exception>
    public static IServiceCollection AddFictionArchiveMassTransit(
        this IServiceCollection services,
        IConfiguration configuration,
        Action<IBusRegistrationConfigurator>? configureConsumers = null)
    {
        ArgumentNullException.ThrowIfNull(services);
        ArgumentNullException.ThrowIfNull(configuration);

        services.AddMassTransit(x =>
        {
            configureConsumers?.Invoke(x);

            x.UsingRabbitMq((context, cfg) =>
            {
                var (host, port, username, password) = ParseRabbitMqConfiguration(configuration);

                if (port is { } explicitPort)
                {
                    // An explicit port from the connection string must reach the transport;
                    // the host-only overload would silently fall back to the transport default.
                    cfg.Host(host, explicitPort, DefaultVirtualHost, h =>
                    {
                        h.Username(username);
                        h.Password(password);
                    });
                }
                else
                {
                    cfg.Host(host, h =>
                    {
                        h.Username(username);
                        h.Password(password);
                    });
                }

                // Up to 5 retries, backing off from 1 second towards a 1 minute ceiling.
                cfg.UseMessageRetry(r => r.Exponential(
                    retryLimit: 5,
                    minInterval: TimeSpan.FromSeconds(1),
                    maxInterval: TimeSpan.FromMinutes(1),
                    intervalDelta: TimeSpan.FromSeconds(2)));

                cfg.UseConsumeFilter(typeof(LoggingConsumeFilter<>), context);

                // Process one message at a time per consumer (matches old EventBus behavior)
                cfg.PrefetchCount = 1;

                cfg.ConfigureEndpoints(context);
            });
        });

        return services;
    }

    /// <summary>
    /// Reads RabbitMQ settings from either <c>RabbitMQ:ConnectionString</c> or the separate
    /// <c>RabbitMQ:Host</c> / <c>RabbitMQ:Username</c> / <c>RabbitMQ:Password</c> keys.
    /// ConnectionString format: amqp://[username:password@]host[:port]
    /// </summary>
    /// <param name="configuration">The configuration to read from.</param>
    /// <returns>
    /// The host, optional explicit port, username and password. The port is only ever set
    /// from a connection string; the separate keys never produce one.
    /// </returns>
    private static (string Host, ushort? Port, string Username, string Password) ParseRabbitMqConfiguration(
        IConfiguration configuration)
    {
        var connectionString = configuration["RabbitMQ:ConnectionString"];
        if (!string.IsNullOrWhiteSpace(connectionString))
        {
            return ParseConnectionString(connectionString);
        }

        // Fallback to separate configuration keys.
        // A blank host (e.g. an empty environment override) is treated the same as a missing one.
        var configuredHost = configuration["RabbitMQ:Host"];
        var host = string.IsNullOrWhiteSpace(configuredHost) ? DefaultHost : configuredHost;
        var username = configuration["RabbitMQ:Username"] ?? DefaultUsername;
        var password = configuration["RabbitMQ:Password"] ?? DefaultPassword;

        return (host, null, username, password);
    }

    /// <summary>
    /// Parses an AMQP connection string into host, port, username, and password components.
    /// Supports formats:
    /// - amqp://host
    /// - amqp://host:port
    /// - amqp://username:password@host
    /// - amqp://username:password@host:port
    /// The same forms without the <c>amqp://</c> prefix are also accepted.
    /// </summary>
    /// <param name="connectionString">The raw connection string; must not be null.</param>
    /// <returns>
    /// The parsed components. Missing credentials default to guest/guest, a missing port is
    /// reported as <see langword="null"/>, and a string that cannot be parsed at all yields
    /// localhost with the default credentials.
    /// </returns>
    private static (string Host, ushort? Port, string Username, string Password) ParseConnectionString(
        string connectionString)
    {
        var candidate = connectionString.Trim();

        // Without a scheme, System.Uri reads "host:5672" as scheme "host" with an empty Host.
        if (!candidate.Contains("://", StringComparison.Ordinal))
        {
            candidate = "amqp://" + candidate;
        }

        // Try to parse as URI first; a URI without an authority gives no usable host.
        if (Uri.TryCreate(candidate, UriKind.Absolute, out var uri) && !string.IsNullOrEmpty(uri.Host))
        {
            var uriUsername = DefaultUsername;
            var uriPassword = DefaultPassword;

            if (!string.IsNullOrEmpty(uri.UserInfo))
            {
                // Split once so that a ':' inside the password is preserved.
                var userInfoParts = uri.UserInfo.Split(':', 2);
                uriUsername = Uri.UnescapeDataString(userInfoParts[0]);
                if (userInfoParts.Length > 1)
                {
                    uriPassword = Uri.UnescapeDataString(userInfoParts[1]);
                }
            }

            // IsDefaultPort is true when the string carried no port of its own.
            var uriPort = uri.IsDefaultPort ? null : (ushort?)uri.Port;

            return (uri.Host, uriPort, uriUsername, uriPassword);
        }

        // Fallback regex parsing for edge cases
        var host = DefaultHost;
        ushort? port = null;
        var username = DefaultUsername;
        var password = DefaultPassword;

        var match = LooseAmqpPattern.Match(candidate);
        if (match.Success)
        {
            if (match.Groups[1].Success && match.Groups[2].Success)
            {
                username = match.Groups[1].Value;
                password = match.Groups[2].Value;
            }

            if (match.Groups[3].Success)
            {
                host = match.Groups[3].Value;
            }

            if (match.Groups[4].Success
                && ushort.TryParse(
                    match.Groups[4].Value,
                    System.Globalization.NumberStyles.None,
                    System.Globalization.CultureInfo.InvariantCulture,
                    out var parsedPort)
                && parsedPort > 0)
            {
                port = parsedPort;
            }
        }

        return (host, port, username, password);
    }
}
|