diff --git a/FictionArchive.Service.ReportingService/Consumers/JobStatusUpdateConsumer.cs b/FictionArchive.Service.ReportingService/Consumers/JobStatusUpdateConsumer.cs new file mode 100644 index 0000000..3267ef4 --- /dev/null +++ b/FictionArchive.Service.ReportingService/Consumers/JobStatusUpdateConsumer.cs @@ -0,0 +1,66 @@ +using FictionArchive.Service.ReportingService.Models; +using FictionArchive.Service.ReportingService.Services; +using FictionArchive.Service.Shared.Contracts.Events; +using MassTransit; +using Microsoft.EntityFrameworkCore; + +namespace FictionArchive.Service.ReportingService.Consumers; + +public class JobStatusUpdateConsumer : IConsumer +{ + private readonly ILogger _logger; + private readonly ReportingDbContext _dbContext; + + public JobStatusUpdateConsumer( + ILogger logger, + ReportingDbContext dbContext) + { + _logger = logger; + _dbContext = dbContext; + } + + public async Task Consume(ConsumeContext context) + { + var message = context.Message; + + var existingJob = await _dbContext.Jobs.FirstOrDefaultAsync(j => j.Id == message.JobId); + + if (existingJob == null) + { + var job = new Job + { + Id = message.JobId, + ParentJobId = message.ParentJobId, + JobType = message.JobType, + DisplayName = message.DisplayName, + Status = message.Status, + ErrorMessage = message.ErrorMessage, + Metadata = message.Metadata != null + ? new Dictionary(message.Metadata) + : null + }; + + _dbContext.Jobs.Add(job); + _logger.LogInformation("Created job {JobId} of type {JobType}", message.JobId, message.JobType); + } + else + { + existingJob.Status = message.Status; + existingJob.DisplayName = message.DisplayName; + existingJob.ErrorMessage = message.ErrorMessage; + + if (message.Metadata != null) + { + existingJob.Metadata ??= new Dictionary(); + foreach (var kvp in message.Metadata) + { + existingJob.Metadata[kvp.Key] = kvp.Value; + } + } + + _logger.LogInformation("Updated job {JobId} to status {Status}", message.JobId, message.Status); + } + + await _dbContext.SaveChangesAsync(); + } +}