Files
FictionArchive/FictionArchive.Service.ReportingService/Services/Consumers/JobStateChangedEventConsumer.cs

75 lines
2.2 KiB
C#

using System.Text.Json;
using FictionArchive.Service.ReportingService.Models.Database;
using FictionArchive.Service.Shared.MassTransit.Contracts;
using MassTransit;
namespace FictionArchive.Service.ReportingService.Services.Consumers;
/// <summary>
/// Consumes <see cref="JobStateChangedEvent"/> messages and records them in the
/// reporting database: the matching <see cref="Job"/> row is created or updated,
/// and a <see cref="JobHistoryEntry"/> is appended for every event received.
/// </summary>
public class JobStateChangedEventConsumer : IConsumer<JobStateChangedEvent>
{
    private readonly ReportingServiceDbContext _dbContext;
    private readonly ILogger<JobStateChangedEventConsumer> _logger;

    /// <summary>
    /// Creates the consumer.
    /// </summary>
    /// <param name="dbContext">Database context used to read and persist jobs and their history.</param>
    /// <param name="logger">Logger used to trace recorded state changes.</param>
    public JobStateChangedEventConsumer(
        ReportingServiceDbContext dbContext,
        ILogger<JobStateChangedEventConsumer> logger)
    {
        _dbContext = dbContext;
        _logger = logger;
    }

    /// <summary>
    /// Applies a single job state change to the stored job and appends a history entry.
    /// </summary>
    /// <param name="context">Consume context carrying the event and the cancellation token.</param>
    /// <returns>A task that completes once the changes have been saved.</returns>
    public async Task Consume(ConsumeContext<JobStateChangedEvent> context)
    {
        var @event = context.Message;
        var cancellationToken = context.CancellationToken;

        // Flow the consume cancellation token so database work stops when the consume is cancelled.
        var job = await _dbContext.Jobs.FindAsync(new object[] { @event.JobId }, cancellationToken);

        if (job is null)
        {
            // First event seen for this job: create the row from the event.
            // Metadata is only captured here; later events do not overwrite it.
            job = new Job
            {
                Id = @event.JobId,
                JobType = @event.JobType,
                Status = @event.ToState,
                CreatedTime = @event.Timestamp,
                UpdatedTime = @event.Timestamp,
                Metadata = @event.Metadata is not null
                    ? JsonSerializer.SerializeToDocument(@event.Metadata)
                    : null
            };
            _dbContext.Jobs.Add(job);
        }
        else
        {
            job.Status = @event.ToState;
            job.UpdatedTime = @event.Timestamp;
        }

        // Applied to new and existing jobs alike: the first event observed for a job
        // may already be a failure or a terminal transition, and that information
        // must not be dropped just because the row did not exist yet.
        if (@event.Error is not null)
        {
            job.ErrorMessage = @event.Error;
        }

        if (@event.ToState is "Completed" or "Failed")
        {
            job.CompletedTime = @event.Timestamp;
        }

        // Every event is appended to the history, regardless of whether the job was new.
        var historyEntry = new JobHistoryEntry
        {
            JobId = @event.JobId,
            FromState = @event.FromState,
            ToState = @event.ToState,
            Message = @event.Message,
            Error = @event.Error,
            Timestamp = @event.Timestamp
        };
        _dbContext.JobHistoryEntries.Add(historyEntry);

        await _dbContext.SaveChangesAsync(cancellationToken);

        _logger.LogDebug("Recorded job state change: {JobId} {FromState} -> {ToState}",
            @event.JobId, @event.FromState, @event.ToState);
    }
}