136 lines
5.6 KiB
C#
136 lines
5.6 KiB
C#
using FictionArchive.Service.Shared.Contracts.Events;
|
|
using MassTransit;
|
|
using NodaTime;
|
|
|
|
namespace FictionArchive.Service.NovelService.Sagas;
|
|
|
|
/// <summary>
/// State machine that tracks a single novel import from the initial request,
/// through metadata import and per-chapter / per-image processing, until the
/// import either completes or fails. An <see cref="INovelImportCompleted"/>
/// message is published whenever the saga enters <see cref="Completed"/> or
/// <see cref="Failed"/>.
/// </summary>
public class NovelImportSaga : MassTransitStateMachine<NovelImportSagaState>
{
    /// <summary>Entered when the import is requested; waits for the metadata import result.</summary>
    public State Importing { get; private set; } = null!;

    /// <summary>Entered when metadata reports chapters pending pull; counts chapter and image completions.</summary>
    public State Processing { get; private set; } = null!;

    /// <summary>Entered once all expected chapters and images have been counted as completed.</summary>
    public State Completed { get; private set; } = null!;

    /// <summary>Entered when a chapter pull or file upload request faults while processing.</summary>
    public State Failed { get; private set; } = null!;

    public Event<INovelImportRequested> NovelImportRequested { get; private set; } = null!;
    public Event<INovelMetadataImported> NovelMetadataImported { get; private set; } = null!;
    public Event<IChapterPullCompleted> ChapterPullCompleted { get; private set; } = null!;
    public Event<IFileUploadRequestStatusUpdate> FileUploadStatusUpdate { get; private set; } = null!;
    public Event<Fault<IChapterPullRequested>> ChapterPullFaulted { get; private set; } = null!;
    public Event<Fault<IFileUploadRequestCreated>> FileUploadFaulted { get; private set; } = null!;

    private readonly IClock _clock;

    /// <summary>
    /// Configures event correlation and the state transitions of the import workflow.
    /// </summary>
    /// <param name="clock">Clock used to stamp <c>StartedAt</c> and <c>CompletedAt</c> on the saga.</param>
    public NovelImportSaga(IClock clock)
    {
        _clock = clock;

        InstanceState(x => x.CurrentState);

        // Every event is correlated to the saga instance through the import id.
        Event(() => NovelImportRequested, x => x.CorrelateById(ctx => ctx.Message.ImportId));
        Event(() => NovelMetadataImported, x => x.CorrelateById(ctx => ctx.Message.ImportId));
        Event(() => ChapterPullCompleted, x => x.CorrelateById(ctx => ctx.Message.ImportId));

        // File upload messages carry an optional import id. A missing id is mapped to
        // Guid.Empty and the message is discarded when no saga instance matches.
        // NOTE(review): this relies on no saga instance ever being created with Guid.Empty.
        Event(() => FileUploadStatusUpdate, x =>
        {
            x.CorrelateById(ctx => ctx.Message.ImportId ?? Guid.Empty);
            x.OnMissingInstance(m => m.Discard());
        });
        Event(() => ChapterPullFaulted, x => x.CorrelateById(ctx => ctx.Message.Message.ImportId));
        Event(() => FileUploadFaulted, x =>
        {
            x.CorrelateById(ctx => ctx.Message.Message.ImportId ?? Guid.Empty);
            x.OnMissingInstance(m => m.Discard());
        });

        Initially(
            When(NovelImportRequested)
                .Then(ctx =>
                {
                    ctx.Saga.NovelUrl = ctx.Message.NovelUrl;
                    ctx.Saga.StartedAt = _clock.GetCurrentInstant();
                })
                .TransitionTo(Importing)
        );

        During(Importing,
            When(NovelMetadataImported)
                .Then(ctx =>
                {
                    ctx.Saga.NovelId = ctx.Message.NovelId;
                    ctx.Saga.ExpectedChapters = ctx.Message.ChaptersPendingPull;
                })
                .IfElse(
                    // Nothing left to pull: the import is done as soon as metadata lands.
                    ctx => ctx.Saga.ExpectedChapters == 0,
                    thenBinder => thenBinder
                        .Then(ctx => ctx.Saga.CompletedAt = _clock.GetCurrentInstant())
                        .TransitionTo(Completed)
                        .PublishAsync(ctx => ctx.Init<INovelImportCompleted>(
                            CreateCompletionMessage(ctx.Saga, true, null))),
                    elseBinder => elseBinder.TransitionTo(Processing)
                )
        );

        During(Processing,
            When(ChapterPullCompleted)
                .Then(ctx =>
                {
                    ctx.Saga.CompletedChapters++;
                    // Each pulled chapter reports how many image uploads it queued.
                    ctx.Saga.ExpectedImages += ctx.Message.ImagesQueued;
                })
                .If(ctx => IsComplete(ctx.Saga), binder => binder
                    .Then(c => c.Saga.CompletedAt = _clock.GetCurrentInstant())
                    .TransitionTo(Completed)
                    .PublishAsync(c => c.Init<INovelImportCompleted>(
                        CreateCompletionMessage(c.Saga, true, null)))),

            When(FileUploadStatusUpdate)
                // NOTE(review): every status update is counted as a completed image,
                // whatever status it carries — confirm against the message contract.
                .Then(ctx => ctx.Saga.CompletedImages++)
                .If(ctx => IsComplete(ctx.Saga), binder => binder
                    .Then(c => c.Saga.CompletedAt = _clock.GetCurrentInstant())
                    .TransitionTo(Completed)
                    .PublishAsync(c => c.Init<INovelImportCompleted>(
                        CreateCompletionMessage(c.Saga, true, null)))),

            When(ChapterPullFaulted)
                .Then(ctx =>
                {
                    ctx.Saga.ErrorMessage = ctx.Message.Exceptions.FirstOrDefault()?.Message;
                    ctx.Saga.CompletedAt = _clock.GetCurrentInstant();
                })
                .TransitionTo(Failed)
                .PublishAsync(ctx => ctx.Init<INovelImportCompleted>(
                    CreateCompletionMessage(ctx.Saga, false, ctx.Saga.ErrorMessage))),

            When(FileUploadFaulted)
                .Then(ctx =>
                {
                    ctx.Saga.ErrorMessage = ctx.Message.Exceptions.FirstOrDefault()?.Message;
                    ctx.Saga.CompletedAt = _clock.GetCurrentInstant();
                })
                .TransitionTo(Failed)
                .PublishAsync(ctx => ctx.Init<INovelImportCompleted>(
                    CreateCompletionMessage(ctx.Saga, false, ctx.Saga.ErrorMessage)))
        );

        // Chapter/image results and faults can still arrive after the saga has left
        // Processing (other chapters in flight when one faults, redeliveries, ...).
        // Without an explicit Ignore these would be unhandled events in the terminal
        // states and fault instead of being dropped.
        During(Completed,
            Ignore(ChapterPullCompleted),
            Ignore(FileUploadStatusUpdate),
            Ignore(ChapterPullFaulted),
            Ignore(FileUploadFaulted)
        );

        During(Failed,
            Ignore(ChapterPullCompleted),
            Ignore(FileUploadStatusUpdate),
            Ignore(ChapterPullFaulted),
            Ignore(FileUploadFaulted)
        );

        // NOTE(review): nothing here transitions to the built-in final state, so this
        // call appears to have no effect and instances stay in Completed/Failed.
        // Confirm whether keeping them is intentional.
        SetCompletedWhenFinalized();
    }

    /// <summary>
    /// Returns <c>true</c> when the completed chapter and image counters have both
    /// reached (or passed) their expected totals.
    /// </summary>
    private static bool IsComplete(NovelImportSagaState saga) =>
        saga.CompletedChapters >= saga.ExpectedChapters &&
        saga.CompletedImages >= saga.ExpectedImages;

    /// <summary>
    /// Builds the payload published as <see cref="INovelImportCompleted"/> from the
    /// saga's correlation id and novel id.
    /// </summary>
    /// <param name="saga">Saga instance the result is reported for.</param>
    /// <param name="succeeded">Whether the import finished successfully.</param>
    /// <param name="errorMessage">Failure description, or <c>null</c> on success.</param>
    private static NovelImportCompleted CreateCompletionMessage(
        NovelImportSagaState saga,
        bool succeeded,
        string? errorMessage) =>
        new NovelImportCompleted(saga.CorrelationId, saga.NovelId, succeeded, errorMessage);
}
|