[FA-misc] Mass transit overhaul, needs testing and review

This commit is contained in:
gamer147
2026-01-21 23:16:31 -05:00
parent 055ef33666
commit f88f340d0a
97 changed files with 1150 additions and 858 deletions

View File

@@ -19,6 +19,7 @@
<ItemGroup>
<PackageReference Include="AppAny.Quartz.EntityFrameworkCore.Migrations.PostgreSQL" Version="0.5.1" />
<PackageReference Include="MassTransit" Version="8.4.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.11">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>

View File

@@ -1,4 +1,4 @@
using FictionArchive.Service.Shared.Services.EventBus;
using MassTransit;
using Newtonsoft.Json;
using Quartz;
@@ -6,30 +6,70 @@ namespace FictionArchive.Service.SchedulerService.Models.JobTemplates;
public class EventJobTemplate : IJob
{
private readonly IEventBus _eventBus;
private readonly IPublishEndpoint _publishEndpoint;
private readonly ISendEndpointProvider _sendEndpointProvider;
private readonly ILogger<EventJobTemplate> _logger;
public const string EventTypeParameter = "RoutingKey";
public const string EventTypeParameter = "MessageType";
public const string EventDataParameter = "MessageData";
public EventJobTemplate(IEventBus eventBus, ILogger<EventJobTemplate> logger)
public const string IsCommandParameter = "IsCommand";
public const string DestinationQueueParameter = "DestinationQueue";
public EventJobTemplate(
IPublishEndpoint publishEndpoint,
ISendEndpointProvider sendEndpointProvider,
ILogger<EventJobTemplate> logger)
{
_eventBus = eventBus;
_publishEndpoint = publishEndpoint;
_sendEndpointProvider = sendEndpointProvider;
_logger = logger;
}
public async Task Execute(IJobExecutionContext context)
{
try
{
var eventData = context.MergedJobDataMap.GetString(EventDataParameter);
var eventType = context.MergedJobDataMap.GetString(EventTypeParameter);
var eventObject = JsonConvert.DeserializeObject(eventData);
await _eventBus.Publish(eventObject, eventType);
var messageData = context.MergedJobDataMap.GetString(EventDataParameter);
var messageTypeName = context.MergedJobDataMap.GetString(EventTypeParameter);
var isCommand = context.MergedJobDataMap.GetBoolean(IsCommandParameter);
var messageType = Type.GetType(messageTypeName!);
if (messageType == null)
{
_logger.LogError("Could not resolve message type: {MessageType}", messageTypeName);
return;
}
var message = JsonConvert.DeserializeObject(messageData!, messageType);
if (message == null)
{
_logger.LogError("Could not deserialize message data for type: {MessageType}", messageTypeName);
return;
}
if (isCommand)
{
var destinationQueue = context.MergedJobDataMap.GetString(DestinationQueueParameter);
if (string.IsNullOrEmpty(destinationQueue))
{
_logger.LogError("Destination queue not specified for command message");
return;
}
var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{destinationQueue}"));
await endpoint.Send(message, messageType);
_logger.LogInformation("Sent command {MessageType} to queue {Queue}", messageTypeName, destinationQueue);
}
else
{
await _publishEndpoint.Publish(message, messageType);
_logger.LogInformation("Published event {MessageType}", messageTypeName);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "An error occurred while running an event job.");
throw; // Re-throw to let Quartz handle retries
}
}
}
}

View File

@@ -2,7 +2,7 @@ using FictionArchive.Service.SchedulerService.GraphQL;
using FictionArchive.Service.SchedulerService.Services;
using FictionArchive.Service.Shared;
using FictionArchive.Service.Shared.Extensions;
using FictionArchive.Service.Shared.Services.EventBus.Implementations;
using FictionArchive.Service.Shared.MassTransit;
using Quartz;
using Quartz.Impl.AdoJobStore;
@@ -34,14 +34,11 @@ public class Program
#endregion
#region Event Bus
#region MassTransit
if (!isSchemaExport)
{
builder.Services.AddRabbitMQ(opt =>
{
builder.Configuration.GetSection("RabbitMQ").Bind(opt);
});
builder.Services.AddFictionArchiveMassTransit(builder.Configuration);
}
#endregion

View File

@@ -1,7 +1,6 @@
using System.Data;
using FictionArchive.Service.SchedulerService.Models;
using FictionArchive.Service.SchedulerService.Models.JobTemplates;
using FictionArchive.Service.Shared.Services.EventBus;
using Quartz;
using Quartz.Impl.Matchers;

View File

@@ -6,8 +6,10 @@
}
},
"RabbitMQ": {
"ConnectionString": "amqp://localhost",
"ClientIdentifier": "SchedulerService"
"Host": "localhost",
"VirtualHost": "/",
"Username": "guest",
"Password": "guest"
},
"ConnectionStrings": {
"DefaultConnection": "Host=localhost;Database=FictionArchive_SchedulerService;Username=postgres;password=postgres"