75 lines
2.2 KiB
C#
75 lines
2.2 KiB
C#
using System.Text.Json;
|
|
using FictionArchive.Service.ReportingService.Models.Database;
|
|
using FictionArchive.Service.Shared.MassTransit.Contracts;
|
|
using MassTransit;
|
|
|
|
namespace FictionArchive.Service.ReportingService.Services.Consumers;
|
|
|
|
/// <summary>
/// Consumes <see cref="JobStateChangedEvent"/> messages and persists them:
/// the matching <see cref="Job"/> row is created or updated, and a
/// <see cref="JobHistoryEntry"/> is appended for every event received.
/// </summary>
public class JobStateChangedEventConsumer : IConsumer<JobStateChangedEvent>
{
    private readonly ReportingServiceDbContext _dbContext;
    private readonly ILogger<JobStateChangedEventConsumer> _logger;

    /// <summary>
    /// Creates the consumer.
    /// </summary>
    /// <param name="dbContext">Database context the job and history rows are written to.</param>
    /// <param name="logger">Logger used to trace each recorded state change.</param>
    public JobStateChangedEventConsumer(
        ReportingServiceDbContext dbContext,
        ILogger<JobStateChangedEventConsumer> logger)
    {
        _dbContext = dbContext;
        _logger = logger;
    }

    /// <summary>
    /// Records a single job state transition.
    /// </summary>
    /// <param name="context">Consume context carrying the <see cref="JobStateChangedEvent"/>.</param>
    /// <returns>A task that completes once the changes have been saved.</returns>
    public async Task Consume(ConsumeContext<JobStateChangedEvent> context)
    {
        var @event = context.Message;

        // Flow the consume context's token so database work stops when the consume is cancelled.
        var cancellationToken = context.CancellationToken;

        var job = await _dbContext.Jobs.FindAsync(new object[] { @event.JobId }, cancellationToken);

        if (job is null)
        {
            // First event seen for this job: create the row from the event data.
            // Metadata is only captured here; later events do not overwrite it.
            job = new Job
            {
                Id = @event.JobId,
                JobType = @event.JobType,
                Status = @event.ToState,
                CreatedTime = @event.Timestamp,
                UpdatedTime = @event.Timestamp,
                Metadata = @event.Metadata is not null
                    ? JsonSerializer.SerializeToDocument(@event.Metadata)
                    : null
            };
            _dbContext.Jobs.Add(job);
        }
        else
        {
            job.Status = @event.ToState;
            job.UpdatedTime = @event.Timestamp;
        }

        // Applied to both new and existing jobs: the first event stored for a job
        // may already carry an error or a terminal state, and that information
        // must not be dropped just because the row did not exist yet.
        if (@event.Error is not null)
        {
            job.ErrorMessage = @event.Error;
        }

        if (@event.ToState is "Completed" or "Failed")
        {
            job.CompletedTime = @event.Timestamp;
        }

        // Every event is appended to the history, regardless of whether the job was new.
        var historyEntry = new JobHistoryEntry
        {
            JobId = @event.JobId,
            FromState = @event.FromState,
            ToState = @event.ToState,
            Message = @event.Message,
            Error = @event.Error,
            Timestamp = @event.Timestamp
        };
        _dbContext.JobHistoryEntries.Add(historyEntry);

        // The job row and the history entry are written by the same SaveChangesAsync call.
        await _dbContext.SaveChangesAsync(cancellationToken);

        _logger.LogDebug("Recorded job state change: {JobId} {FromState} -> {ToState}",
            @event.JobId, @event.FromState, @event.ToState);
    }
}
|