feature/FA-misc_ReportingService #64
@@ -3,6 +3,7 @@ using Amazon.S3.Model;
|
|||||||
using FictionArchive.Common.Enums;
|
using FictionArchive.Common.Enums;
|
||||||
using FictionArchive.Service.FileService.Models;
|
using FictionArchive.Service.FileService.Models;
|
||||||
using FictionArchive.Service.Shared.Contracts.Events;
|
using FictionArchive.Service.Shared.Contracts.Events;
|
||||||
|
using FictionArchive.Service.Shared.Extensions;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
@@ -35,6 +36,10 @@ public class FileUploadRequestCreatedConsumer : IConsumer<IFileUploadRequestCrea
|
|||||||
{
|
{
|
||||||
var message = context.Message;
|
var message = context.Message;
|
||||||
|
|
||||||
|
await _publishEndpoint.ReportJobStatus(
|
||||||
|
message.RequestId, "FileUpload", $"Upload {message.FilePath}",
|
||||||
|
JobStatus.InProgress, parentJobId: message.ImportId);
|
||||||
|
|
||||||
var putObjectRequest = new PutObjectRequest
|
var putObjectRequest = new PutObjectRequest
|
||||||
{
|
{
|
||||||
BucketName = _s3Configuration.Bucket,
|
BucketName = _s3Configuration.Bucket,
|
||||||
@@ -58,6 +63,11 @@ public class FileUploadRequestCreatedConsumer : IConsumer<IFileUploadRequestCrea
|
|||||||
Status: RequestStatus.Failed,
|
Status: RequestStatus.Failed,
|
||||||
FileAccessUrl: null,
|
FileAccessUrl: null,
|
||||||
ErrorMessage: "An error occurred while uploading file to S3."));
|
ErrorMessage: "An error occurred while uploading file to S3."));
|
||||||
|
|
||||||
|
await _publishEndpoint.ReportJobStatus(
|
||||||
|
message.RequestId, "FileUpload", $"Upload {message.FilePath}",
|
||||||
|
JobStatus.Failed, parentJobId: message.ImportId,
|
||||||
|
errorMessage: "An error occurred while uploading file to S3.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,5 +82,10 @@ public class FileUploadRequestCreatedConsumer : IConsumer<IFileUploadRequestCrea
|
|||||||
Status: RequestStatus.Success,
|
Status: RequestStatus.Success,
|
||||||
FileAccessUrl: fileAccessUrl,
|
FileAccessUrl: fileAccessUrl,
|
||||||
ErrorMessage: null));
|
ErrorMessage: null));
|
||||||
|
|
||||||
|
await _publishEndpoint.ReportJobStatus(
|
||||||
|
message.RequestId, "FileUpload", $"Upload {message.FilePath}",
|
||||||
|
JobStatus.Completed, parentJobId: message.ImportId,
|
||||||
|
metadata: new Dictionary<string, string> { ["FileAccessUrl"] = fileAccessUrl });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,6 +27,11 @@ public class NovelImportSagaTests
|
|||||||
|
|
||||||
var sagaHarness = harness.GetSagaStateMachineHarness<NovelImportSaga, NovelImportSagaState>();
|
var sagaHarness = harness.GetSagaStateMachineHarness<NovelImportSaga, NovelImportSagaState>();
|
||||||
(await sagaHarness.Exists(importId, x => x.Importing)).HasValue.Should().BeTrue();
|
(await sagaHarness.Exists(importId, x => x.Importing)).HasValue.Should().BeTrue();
|
||||||
|
|
||||||
|
(await harness.Published.Any<IJobStatusUpdate>(x =>
|
||||||
|
x.Context.Message.JobId == importId &&
|
||||||
|
x.Context.Message.Status == JobStatus.InProgress &&
|
||||||
|
x.Context.Message.JobType == "NovelImport")).Should().BeTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -45,6 +50,11 @@ public class NovelImportSagaTests
|
|||||||
|
|
||||||
(await harness.Published.Any<INovelImportCompleted>(x =>
|
(await harness.Published.Any<INovelImportCompleted>(x =>
|
||||||
x.Context.Message.ImportId == importId && x.Context.Message.Success)).Should().BeTrue();
|
x.Context.Message.ImportId == importId && x.Context.Message.Success)).Should().BeTrue();
|
||||||
|
|
||||||
|
(await harness.Published.Any<IJobStatusUpdate>(x =>
|
||||||
|
x.Context.Message.JobId == importId &&
|
||||||
|
x.Context.Message.Status == JobStatus.Completed &&
|
||||||
|
x.Context.Message.JobType == "NovelImport")).Should().BeTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -79,6 +89,11 @@ public class NovelImportSagaTests
|
|||||||
|
|
||||||
var sagaHarness = harness.GetSagaStateMachineHarness<NovelImportSaga, NovelImportSagaState>();
|
var sagaHarness = harness.GetSagaStateMachineHarness<NovelImportSaga, NovelImportSagaState>();
|
||||||
(await sagaHarness.Exists(importId, x => x.Completed)).HasValue.Should().BeTrue();
|
(await sagaHarness.Exists(importId, x => x.Completed)).HasValue.Should().BeTrue();
|
||||||
|
|
||||||
|
(await harness.Published.Any<IJobStatusUpdate>(x =>
|
||||||
|
x.Context.Message.JobId == importId &&
|
||||||
|
x.Context.Message.Status == JobStatus.Completed &&
|
||||||
|
x.Context.Message.JobType == "NovelImport")).Should().BeTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -121,6 +136,48 @@ public class NovelImportSagaTests
|
|||||||
|
|
||||||
(await harness.Published.Any<INovelImportCompleted>(x =>
|
(await harness.Published.Any<INovelImportCompleted>(x =>
|
||||||
x.Context.Message.ImportId == importId && x.Context.Message.Success)).Should().BeTrue();
|
x.Context.Message.ImportId == importId && x.Context.Message.Success)).Should().BeTrue();
|
||||||
|
|
||||||
|
(await harness.Published.Any<IJobStatusUpdate>(x =>
|
||||||
|
x.Context.Message.JobId == importId &&
|
||||||
|
x.Context.Message.Status == JobStatus.Completed &&
|
||||||
|
x.Context.Message.JobType == "NovelImport")).Should().BeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Should_publish_failed_job_status_on_chapter_pull_fault()
|
||||||
|
{
|
||||||
|
await using var provider = CreateTestProvider();
|
||||||
|
var harness = provider.GetRequiredService<ITestHarness>();
|
||||||
|
await harness.Start();
|
||||||
|
|
||||||
|
var importId = Guid.NewGuid();
|
||||||
|
await harness.Bus.Publish<INovelImportRequested>(new NovelImportRequested(importId, "https://example.com/novel"));
|
||||||
|
await harness.Bus.Publish<INovelMetadataImported>(new NovelMetadataImported(importId, 1, 1, false));
|
||||||
|
|
||||||
|
var sagaHarness = harness.GetSagaStateMachineHarness<NovelImportSaga, NovelImportSagaState>();
|
||||||
|
(await sagaHarness.Exists(importId, x => x.Processing)).HasValue.Should().BeTrue();
|
||||||
|
|
||||||
|
await harness.Bus.Publish<Fault<IChapterPullRequested>>(new
|
||||||
|
{
|
||||||
|
Message = new ChapterPullRequested(importId, 1, 1, 1),
|
||||||
|
Exceptions = new[]
|
||||||
|
{
|
||||||
|
new
|
||||||
|
{
|
||||||
|
ExceptionType = typeof(Exception).FullName!,
|
||||||
|
Message = "Chapter pull failed",
|
||||||
|
StackTrace = "stack trace",
|
||||||
|
InnerException = (object?)null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
(await sagaHarness.Exists(importId, x => x.Failed)).HasValue.Should().BeTrue();
|
||||||
|
|
||||||
|
(await harness.Published.Any<IJobStatusUpdate>(x =>
|
||||||
|
x.Context.Message.JobId == importId &&
|
||||||
|
x.Context.Message.Status == JobStatus.Failed &&
|
||||||
|
x.Context.Message.JobType == "NovelImport")).Should().BeTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
private ServiceProvider CreateTestProvider()
|
private ServiceProvider CreateTestProvider()
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
|
using FictionArchive.Common.Enums;
|
||||||
using FictionArchive.Service.NovelService.Services;
|
using FictionArchive.Service.NovelService.Services;
|
||||||
using FictionArchive.Service.Shared.Contracts.Events;
|
using FictionArchive.Service.Shared.Contracts.Events;
|
||||||
|
using FictionArchive.Service.Shared.Extensions;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
@@ -21,6 +23,11 @@ public class ChapterPullRequestedConsumer : IConsumer<IChapterPullRequested>
|
|||||||
public async Task Consume(ConsumeContext<IChapterPullRequested> context)
|
public async Task Consume(ConsumeContext<IChapterPullRequested> context)
|
||||||
{
|
{
|
||||||
var message = context.Message;
|
var message = context.Message;
|
||||||
|
var chapterJobId = Guid.NewGuid();
|
||||||
|
|
||||||
|
await context.ReportJobStatus(
|
||||||
|
chapterJobId, "ChapterPull", $"Pull chapter {message.ChapterOrder}",
|
||||||
|
JobStatus.InProgress, parentJobId: message.ImportId);
|
||||||
|
|
||||||
var (chapter, imageCount) = await _novelUpdateService.PullChapterContents(
|
var (chapter, imageCount) = await _novelUpdateService.PullChapterContents(
|
||||||
message.ImportId,
|
message.ImportId,
|
||||||
@@ -33,5 +40,10 @@ public class ChapterPullRequestedConsumer : IConsumer<IChapterPullRequested>
|
|||||||
chapter.Id,
|
chapter.Id,
|
||||||
imageCount
|
imageCount
|
||||||
));
|
));
|
||||||
|
|
||||||
|
await context.ReportJobStatus(
|
||||||
|
chapterJobId, "ChapterPull", $"Pull chapter {message.ChapterOrder}",
|
||||||
|
JobStatus.Completed, parentJobId: message.ImportId,
|
||||||
|
metadata: new Dictionary<string, string> { ["ChapterId"] = chapter.Id.ToString() });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using FictionArchive.Common.Enums;
|
||||||
using FictionArchive.Service.Shared.Contracts.Events;
|
using FictionArchive.Service.Shared.Contracts.Events;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using NodaTime;
|
using NodaTime;
|
||||||
@@ -49,6 +50,10 @@ public class NovelImportSaga : MassTransitStateMachine<NovelImportSagaState>
|
|||||||
ctx.Saga.StartedAt = _clock.GetCurrentInstant();
|
ctx.Saga.StartedAt = _clock.GetCurrentInstant();
|
||||||
})
|
})
|
||||||
.TransitionTo(Importing)
|
.TransitionTo(Importing)
|
||||||
|
.PublishAsync(ctx => ctx.Init<IJobStatusUpdate>(new JobStatusUpdate(
|
||||||
|
ctx.Saga.CorrelationId, null, "NovelImport",
|
||||||
|
$"Import {ctx.Saga.NovelUrl}", JobStatus.InProgress,
|
||||||
|
null, new Dictionary<string, string> { ["NovelUrl"] = ctx.Saga.NovelUrl })))
|
||||||
);
|
);
|
||||||
|
|
||||||
During(Importing,
|
During(Importing,
|
||||||
@@ -68,7 +73,11 @@ public class NovelImportSaga : MassTransitStateMachine<NovelImportSagaState>
|
|||||||
ctx.Saga.CorrelationId,
|
ctx.Saga.CorrelationId,
|
||||||
ctx.Saga.NovelId,
|
ctx.Saga.NovelId,
|
||||||
true,
|
true,
|
||||||
null))),
|
null)))
|
||||||
|
.PublishAsync(ctx => ctx.Init<IJobStatusUpdate>(new JobStatusUpdate(
|
||||||
|
ctx.Saga.CorrelationId, null, "NovelImport",
|
||||||
|
$"Import {ctx.Saga.NovelUrl}", JobStatus.Completed,
|
||||||
|
null, new Dictionary<string, string> { ["NovelId"] = ctx.Saga.NovelId.ToString() }))),
|
||||||
elseBinder => elseBinder.TransitionTo(Processing)
|
elseBinder => elseBinder.TransitionTo(Processing)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -87,7 +96,11 @@ public class NovelImportSaga : MassTransitStateMachine<NovelImportSagaState>
|
|||||||
c.Saga.CorrelationId,
|
c.Saga.CorrelationId,
|
||||||
c.Saga.NovelId,
|
c.Saga.NovelId,
|
||||||
true,
|
true,
|
||||||
null)))),
|
null)))
|
||||||
|
.PublishAsync(c => c.Init<IJobStatusUpdate>(new JobStatusUpdate(
|
||||||
|
c.Saga.CorrelationId, null, "NovelImport",
|
||||||
|
$"Import {c.Saga.NovelUrl}", JobStatus.Completed,
|
||||||
|
null, new Dictionary<string, string> { ["NovelId"] = c.Saga.NovelId.ToString() })))),
|
||||||
|
|
||||||
When(FileUploadStatusUpdate)
|
When(FileUploadStatusUpdate)
|
||||||
.Then(ctx => ctx.Saga.CompletedImages++)
|
.Then(ctx => ctx.Saga.CompletedImages++)
|
||||||
@@ -98,7 +111,11 @@ public class NovelImportSaga : MassTransitStateMachine<NovelImportSagaState>
|
|||||||
c.Saga.CorrelationId,
|
c.Saga.CorrelationId,
|
||||||
c.Saga.NovelId,
|
c.Saga.NovelId,
|
||||||
true,
|
true,
|
||||||
null)))),
|
null)))
|
||||||
|
.PublishAsync(c => c.Init<IJobStatusUpdate>(new JobStatusUpdate(
|
||||||
|
c.Saga.CorrelationId, null, "NovelImport",
|
||||||
|
$"Import {c.Saga.NovelUrl}", JobStatus.Completed,
|
||||||
|
null, new Dictionary<string, string> { ["NovelId"] = c.Saga.NovelId.ToString() })))),
|
||||||
|
|
||||||
When(ChapterPullFaulted)
|
When(ChapterPullFaulted)
|
||||||
.Then(ctx =>
|
.Then(ctx =>
|
||||||
@@ -111,7 +128,11 @@ public class NovelImportSaga : MassTransitStateMachine<NovelImportSagaState>
|
|||||||
ctx.Saga.CorrelationId,
|
ctx.Saga.CorrelationId,
|
||||||
ctx.Saga.NovelId,
|
ctx.Saga.NovelId,
|
||||||
false,
|
false,
|
||||||
ctx.Saga.ErrorMessage))),
|
ctx.Saga.ErrorMessage)))
|
||||||
|
.PublishAsync(ctx => ctx.Init<IJobStatusUpdate>(new JobStatusUpdate(
|
||||||
|
ctx.Saga.CorrelationId, null, "NovelImport",
|
||||||
|
$"Import {ctx.Saga.NovelUrl}", JobStatus.Failed,
|
||||||
|
ctx.Saga.ErrorMessage, null))),
|
||||||
|
|
||||||
When(FileUploadFaulted)
|
When(FileUploadFaulted)
|
||||||
.Then(ctx =>
|
.Then(ctx =>
|
||||||
@@ -125,6 +146,10 @@ public class NovelImportSaga : MassTransitStateMachine<NovelImportSagaState>
|
|||||||
ctx.Saga.NovelId,
|
ctx.Saga.NovelId,
|
||||||
false,
|
false,
|
||||||
ctx.Saga.ErrorMessage)))
|
ctx.Saga.ErrorMessage)))
|
||||||
|
.PublishAsync(ctx => ctx.Init<IJobStatusUpdate>(new JobStatusUpdate(
|
||||||
|
ctx.Saga.CorrelationId, null, "NovelImport",
|
||||||
|
$"Import {ctx.Saga.NovelUrl}", JobStatus.Failed,
|
||||||
|
ctx.Saga.ErrorMessage, null)))
|
||||||
);
|
);
|
||||||
|
|
||||||
SetCompletedWhenFinalized();
|
SetCompletedWhenFinalized();
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
using FictionArchive.Service.Shared.Services.Database;
|
using FictionArchive.Service.Shared.Services.Database;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Npgsql;
|
||||||
|
|
||||||
namespace FictionArchive.Service.Shared.Extensions;
|
namespace FictionArchive.Service.Shared.Extensions;
|
||||||
|
|
||||||
@@ -21,9 +22,14 @@ public static class DatabaseExtensions
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
var dataSourceBuilder = new Npgsql.NpgsqlDataSourceBuilder(connectionString);
|
||||||
|
dataSourceBuilder.UseNodaTime();
|
||||||
|
dataSourceBuilder.UseJsonNet();
|
||||||
|
var dataSource = dataSourceBuilder.Build();
|
||||||
|
|
||||||
services.AddDbContext<TContext>(options =>
|
services.AddDbContext<TContext>(options =>
|
||||||
{
|
{
|
||||||
options.UseNpgsql(connectionString, o =>
|
options.UseNpgsql(dataSource, o =>
|
||||||
{
|
{
|
||||||
o.UseNodaTime();
|
o.UseNodaTime();
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -30,6 +30,7 @@
|
|||||||
<PackageReference Include="NodaTime.Serialization.JsonNet" Version="3.2.0" />
|
<PackageReference Include="NodaTime.Serialization.JsonNet" Version="3.2.0" />
|
||||||
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.4" />
|
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.4" />
|
||||||
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL.NodaTime" Version="9.0.4" />
|
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL.NodaTime" Version="9.0.4" />
|
||||||
|
<PackageReference Include="Npgsql.Json.NET" Version="9.*" />
|
||||||
<PackageReference Include="Polly" Version="8.6.5" />
|
<PackageReference Include="Polly" Version="8.6.5" />
|
||||||
<PackageReference Include="MassTransit.RabbitMQ" Version="8.*" />
|
<PackageReference Include="MassTransit.RabbitMQ" Version="8.*" />
|
||||||
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="8.0.11" />
|
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="8.0.11" />
|
||||||
|
|||||||
Reference in New Issue
Block a user