Files
FictionArchive/FictionArchive.Service.NovelService/Sagas/NovelImportSaga.cs
2026-02-01 10:19:52 -05:00

162 lines
7.6 KiB
C#

using FictionArchive.Common.Enums;
using FictionArchive.Service.Shared.Contracts.Events;
using MassTransit;
using NodaTime;
namespace FictionArchive.Service.NovelService.Sagas;
public class NovelImportSaga : MassTransitStateMachine<NovelImportSagaState>
{
public State Importing { get; private set; } = null!;
public State Processing { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Failed { get; private set; } = null!;
public Event<INovelImportRequested> NovelImportRequested { get; private set; } = null!;
public Event<INovelMetadataImported> NovelMetadataImported { get; private set; } = null!;
public Event<IChapterPullCompleted> ChapterPullCompleted { get; private set; } = null!;
public Event<IFileUploadRequestStatusUpdate> FileUploadStatusUpdate { get; private set; } = null!;
public Event<Fault<IChapterPullRequested>> ChapterPullFaulted { get; private set; } = null!;
public Event<Fault<IFileUploadRequestCreated>> FileUploadFaulted { get; private set; } = null!;
private readonly IClock _clock;
public NovelImportSaga(IClock clock)
{
_clock = clock;
InstanceState(x => x.CurrentState);
Event(() => NovelImportRequested, x => x.CorrelateById(ctx => ctx.Message.ImportId));
Event(() => NovelMetadataImported, x => x.CorrelateById(ctx => ctx.Message.ImportId));
Event(() => ChapterPullCompleted, x => x.CorrelateById(ctx => ctx.Message.ImportId));
Event(() => FileUploadStatusUpdate, x =>
{
x.CorrelateById(ctx => ctx.Message.ImportId ?? Guid.Empty);
x.OnMissingInstance(m => m.Discard());
});
Event(() => ChapterPullFaulted, x => x.CorrelateById(ctx => ctx.Message.Message.ImportId));
Event(() => FileUploadFaulted, x =>
{
x.CorrelateById(ctx => ctx.Message.Message.ImportId ?? Guid.Empty);
x.OnMissingInstance(m => m.Discard());
});
Initially(
When(NovelImportRequested)
.Then(ctx =>
{
ctx.Saga.NovelUrl = ctx.Message.NovelUrl;
ctx.Saga.StartedAt = _clock.GetCurrentInstant();
})
.TransitionTo(Importing)
.PublishAsync(ctx => ctx.Init<IJobStatusUpdate>(new JobStatusUpdate(
ctx.Saga.CorrelationId, null, "NovelImport",
$"Import {ctx.Saga.NovelUrl}", JobStatus.InProgress,
null, new Dictionary<string, string> { ["NovelUrl"] = ctx.Saga.NovelUrl })))
);
During(Importing,
When(NovelMetadataImported)
.Then(ctx =>
{
ctx.Saga.NovelId = ctx.Message.NovelId;
ctx.Saga.ExpectedChapters = ctx.Message.ChaptersPendingPull;
ctx.Saga.ExpectedImages += ctx.Message.CoverImageQueued ? 1 : 0;
})
.IfElse(
ctx => ctx.Saga.ExpectedChapters == 0 && !ctx.Message.CoverImageQueued,
thenBinder => thenBinder
.Then(ctx => ctx.Saga.CompletedAt = _clock.GetCurrentInstant())
.TransitionTo(Completed)
.PublishAsync(ctx => ctx.Init<INovelImportCompleted>(new NovelImportCompleted(
ctx.Saga.CorrelationId,
ctx.Saga.NovelId,
true,
null)))
.PublishAsync(ctx => ctx.Init<IJobStatusUpdate>(new JobStatusUpdate(
ctx.Saga.CorrelationId, null, "NovelImport",
$"Import {ctx.Saga.NovelUrl}", JobStatus.Completed,
null, new Dictionary<string, string> { ["NovelId"] = ctx.Saga.NovelId.ToString() }))),
elseBinder => elseBinder.TransitionTo(Processing)
)
);
During(Processing,
When(ChapterPullCompleted)
.Then(ctx =>
{
ctx.Saga.CompletedChapters++;
ctx.Saga.ExpectedImages += ctx.Message.ImagesQueued;
})
.If(ctx => IsComplete(ctx.Saga), ctx => ctx
.Then(c => c.Saga.CompletedAt = _clock.GetCurrentInstant())
.TransitionTo(Completed)
.PublishAsync(c => c.Init<INovelImportCompleted>(new NovelImportCompleted(
c.Saga.CorrelationId,
c.Saga.NovelId,
true,
null)))
.PublishAsync(c => c.Init<IJobStatusUpdate>(new JobStatusUpdate(
c.Saga.CorrelationId, null, "NovelImport",
$"Import {c.Saga.NovelUrl}", JobStatus.Completed,
null, new Dictionary<string, string> { ["NovelId"] = c.Saga.NovelId.ToString() })))),
When(FileUploadStatusUpdate)
.Then(ctx => ctx.Saga.CompletedImages++)
.If(ctx => IsComplete(ctx.Saga), ctx => ctx
.Then(c => c.Saga.CompletedAt = _clock.GetCurrentInstant())
.TransitionTo(Completed)
.PublishAsync(c => c.Init<INovelImportCompleted>(new NovelImportCompleted(
c.Saga.CorrelationId,
c.Saga.NovelId,
true,
null)))
.PublishAsync(c => c.Init<IJobStatusUpdate>(new JobStatusUpdate(
c.Saga.CorrelationId, null, "NovelImport",
$"Import {c.Saga.NovelUrl}", JobStatus.Completed,
null, new Dictionary<string, string> { ["NovelId"] = c.Saga.NovelId.ToString() })))),
When(ChapterPullFaulted)
.Then(ctx =>
{
ctx.Saga.ErrorMessage = ctx.Message.Exceptions.FirstOrDefault()?.Message;
ctx.Saga.CompletedAt = _clock.GetCurrentInstant();
})
.TransitionTo(Failed)
.PublishAsync(ctx => ctx.Init<INovelImportCompleted>(new NovelImportCompleted(
ctx.Saga.CorrelationId,
ctx.Saga.NovelId,
false,
ctx.Saga.ErrorMessage)))
.PublishAsync(ctx => ctx.Init<IJobStatusUpdate>(new JobStatusUpdate(
ctx.Saga.CorrelationId, null, "NovelImport",
$"Import {ctx.Saga.NovelUrl}", JobStatus.Failed,
ctx.Saga.ErrorMessage, null))),
When(FileUploadFaulted)
.Then(ctx =>
{
ctx.Saga.ErrorMessage = ctx.Message.Exceptions.FirstOrDefault()?.Message;
ctx.Saga.CompletedAt = _clock.GetCurrentInstant();
})
.TransitionTo(Failed)
.PublishAsync(ctx => ctx.Init<INovelImportCompleted>(new NovelImportCompleted(
ctx.Saga.CorrelationId,
ctx.Saga.NovelId,
false,
ctx.Saga.ErrorMessage)))
.PublishAsync(ctx => ctx.Init<IJobStatusUpdate>(new JobStatusUpdate(
ctx.Saga.CorrelationId, null, "NovelImport",
$"Import {ctx.Saga.NovelUrl}", JobStatus.Failed,
ctx.Saga.ErrorMessage, null)))
);
SetCompletedWhenFinalized();
}
private static bool IsComplete(NovelImportSagaState saga) =>
saga.CompletedChapters >= saga.ExpectedChapters &&
saga.CompletedImages >= saga.ExpectedImages;
}