using System.Text.Json;
using FictionArchive.Service.ReportingService.Models.Database;
using FictionArchive.Service.Shared.MassTransit.Contracts;
using MassTransit;

namespace FictionArchive.Service.ReportingService.Services.Consumers;

/// <summary>
/// Persists job state-change events: creates or updates the matching <c>Job</c> row
/// and appends a <c>JobHistoryEntry</c> describing the transition.
/// </summary>
// NOTE(review): the consumer interface, logger and consume context are declared without
// generic type arguments here, yet Consume reads context.Message — confirm the intended
// message / logger type parameters against the original file.
public class JobStateChangedEventConsumer : IConsumer
{
    private readonly ReportingServiceDbContext _dbContext;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the consumer.
    /// </summary>
    /// <param name="dbContext">Database context used to read and write jobs and their history.</param>
    /// <param name="logger">Logger used to record each processed state change.</param>
    public JobStateChangedEventConsumer(
        ReportingServiceDbContext dbContext,
        ILogger logger)
    {
        _dbContext = dbContext;
        _logger = logger;
    }

    /// <summary>
    /// Handles a single state-change message: upserts the job, records the transition
    /// in the history table and saves both in one <c>SaveChangesAsync</c> call.
    /// </summary>
    /// <param name="context">Consume context carrying the state-change message.</param>
    /// <returns>A task that completes once the changes have been saved.</returns>
    public async Task Consume(ConsumeContext context)
    {
        var @event = context.Message;
        var cancellationToken = context.CancellationToken;

        // The array overload of FindAsync is required to pass a cancellation token.
        var job = await _dbContext.Jobs.FindAsync(new object[] { @event.JobId }, cancellationToken);

        if (job == null)
        {
            // First event seen for this job: create the row from the event payload.
            job = new Job
            {
                Id = @event.JobId,
                JobType = @event.JobType,
                Status = @event.ToState,
                CreatedTime = @event.Timestamp,
                UpdatedTime = @event.Timestamp,
                Metadata = @event.Metadata != null
                    ? JsonSerializer.SerializeToDocument(@event.Metadata)
                    : null
            };

            _dbContext.Jobs.Add(job);
        }
        else
        {
            job.Status = @event.ToState;
            job.UpdatedTime = @event.Timestamp;
        }

        // Applied to new and existing jobs alike, so a job whose first observed event
        // already carries an error or a finished state is not stored without them.
        if (@event.Error != null)
        {
            job.ErrorMessage = @event.Error;
        }

        if (@event.ToState is "Completed" or "Failed")
        {
            job.CompletedTime = @event.Timestamp;
        }

        var historyEntry = new JobHistoryEntry
        {
            JobId = @event.JobId,
            FromState = @event.FromState,
            ToState = @event.ToState,
            Message = @event.Message,
            Error = @event.Error,
            Timestamp = @event.Timestamp
        };

        _dbContext.JobHistoryEntries.Add(historyEntry);

        await _dbContext.SaveChangesAsync(cancellationToken);

        _logger.LogDebug(
            "Recorded job state change: {JobId} {FromState} -> {ToState}",
            @event.JobId,
            @event.FromState,
            @event.ToState);
    }
}