feat(battle-node): RealParticipant session-finished signal + Pvp cascade
RealParticipant gains _sessionFinished TCS + MarkSessionFinished / AwaitSessionFinishedAsync. PvP first-arriver's handler awaits the signal instead of calling self.RunAsync (which the session does internally on the same instance — double-call would race the WS read). BattleSession.RunAsync branches on Type: Pvp uses WhenAny + synthesize BattleFinish(Win) to survivor + WhenAll(drain); Scripted/Bot keep Phase 1's WhenAll-everything semantics. Disconnect cascade now drives end-of-battle when a WS drops without a graceful Retire.
This commit is contained in:
@@ -43,18 +43,53 @@ public sealed class BattleSession
|
||||
|
||||
public async Task RunAsync(CancellationToken cancellation)
|
||||
{
|
||||
// Run both participants' inbound loops in parallel and wait for them all to
|
||||
// complete. NoOp/Scripted bots return immediately; Real returns when the WS
|
||||
// closes. Using WhenAny here would have killed the session as soon as the
|
||||
// scripted bot's no-op RunAsync resolved. Phase 2's Pvp/Bot cases will need
|
||||
// disconnect propagation; that's wired in their own task.
|
||||
var aTask = A.RunAsync(cancellation);
|
||||
var bTask = B.RunAsync(cancellation);
|
||||
try { await Task.WhenAll(aTask, bTask); } catch { /* swallow cancellation */ }
|
||||
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellation);
|
||||
var aTask = A.RunAsync(cts.Token);
|
||||
var bTask = B.RunAsync(cts.Token);
|
||||
|
||||
if (Type == BattleType.Pvp)
|
||||
{
|
||||
// WhenAny: first WS drop / first graceful close triggers cascade.
|
||||
// ScriptedBotParticipant.RunAsync also returns immediately; that's not used
|
||||
// here (Pvp has two RealParticipants), but we'd still want a synthesized
|
||||
// BattleFinish for the survivor if either side terminates first.
|
||||
var first = await Task.WhenAny(aTask, bTask).ConfigureAwait(false);
|
||||
var survivor = first == aTask ? B : A;
|
||||
|
||||
if (Phase != BattleSessionPhase.Terminal)
|
||||
{
|
||||
// Involuntary drop (no graceful Retire): synthesize BattleFinish(Win) to survivor.
|
||||
try
|
||||
{
|
||||
await survivor.PushAsync(
|
||||
BuildBattleFinish(BattleResult.Win), noStock: true, cancellation)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.LogWarning(ex,
|
||||
"BattleSession {Bid}: failed to push BattleFinish to survivor (their WS may also be closed)",
|
||||
BattleId);
|
||||
}
|
||||
Phase = BattleSessionPhase.Terminal;
|
||||
}
|
||||
|
||||
cts.Cancel(); // unblock the survivor's RunAsync read loop
|
||||
try { await Task.WhenAll(aTask, bTask).ConfigureAwait(false); }
|
||||
catch { /* swallow cancellation / WS exceptions */ }
|
||||
}
|
||||
else
|
||||
{
|
||||
// Phase 1 semantics for Scripted/Bot: wait for ALL participants. The bot's
|
||||
// RunAsync returns immediately; the session keeps running for the real one.
|
||||
try { await Task.WhenAll(aTask, bTask).ConfigureAwait(false); }
|
||||
catch { /* swallow */ }
|
||||
}
|
||||
|
||||
await Task.WhenAll(
|
||||
A.TerminateAsync(BattleFinishReason.NormalFinish),
|
||||
B.TerminateAsync(BattleFinishReason.NormalFinish));
|
||||
B.TerminateAsync(BattleFinishReason.NormalFinish))
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private Task OnFrameFromA(MsgEnvelope env, CancellationToken ct) => HandleFrameAsync(A, env, ct);
|
||||
|
||||
@@ -57,6 +57,33 @@ public sealed class RealParticipant : IBattleParticipant, IHasHandshakePhase
|
||||
|
||||
public event Func<MsgEnvelope, CancellationToken, Task>? FrameEmitted;
|
||||
|
||||
private readonly TaskCompletionSource<bool> _sessionFinished
|
||||
= new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
/// <summary>Called by the second arriver's handler (in a finally block) after
|
||||
/// session.RunAsync completes. Signals the first arriver's handler that it can
|
||||
/// return and let the HTTP request complete (which closes the WS).</summary>
|
||||
internal void MarkSessionFinished() => _sessionFinished.TrySetResult(true);
|
||||
|
||||
/// <summary>Awaited by the first arriver's handler instead of calling RunAsync
|
||||
/// (the session already calls RunAsync on this instance from the second arriver's
|
||||
/// handler context — calling it twice would race the WS read loop). Returns when
|
||||
/// either MarkSessionFinished fires or the passed CT cancels.</summary>
|
||||
internal Task AwaitSessionFinishedAsync(CancellationToken ct)
|
||||
{
|
||||
if (_sessionFinished.Task.IsCompleted) return _sessionFinished.Task;
|
||||
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var reg = ct.Register(() => tcs.TrySetCanceled(ct));
|
||||
_sessionFinished.Task.ContinueWith(t =>
|
||||
{
|
||||
reg.Dispose();
|
||||
if (t.IsCompletedSuccessfully) tcs.TrySetResult(true);
|
||||
else if (t.IsFaulted) tcs.TrySetException(t.Exception!.InnerExceptions);
|
||||
else tcs.TrySetCanceled();
|
||||
}, TaskContinuationOptions.ExecuteSynchronously);
|
||||
return tcs.Task;
|
||||
}
|
||||
|
||||
public RealParticipant(WebSocket ws, long viewerId, MatchContext context,
|
||||
ILogger<RealParticipant> log)
|
||||
{
|
||||
|
||||
@@ -127,6 +127,48 @@ public class RealParticipantTests
|
||||
"B's Phase must not change when A's Phase is set.");
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task AwaitSessionFinishedAsync_returns_when_MarkSessionFinished_fires()
|
||||
{
|
||||
var ws = new TestWebSocket();
|
||||
var p = new RealParticipant(ws, viewerId: 1, FixtureCtx(),
|
||||
NullLogger<RealParticipant>.Instance);
|
||||
|
||||
var awaiter = p.AwaitSessionFinishedAsync(CancellationToken.None);
|
||||
p.MarkSessionFinished();
|
||||
|
||||
await awaiter; // should complete promptly
|
||||
Assert.Pass();
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void AwaitSessionFinishedAsync_cancels_on_token()
|
||||
{
|
||||
var ws = new TestWebSocket();
|
||||
var p = new RealParticipant(ws, viewerId: 1, FixtureCtx(),
|
||||
NullLogger<RealParticipant>.Instance);
|
||||
|
||||
using var cts = new CancellationTokenSource();
|
||||
var awaiter = p.AwaitSessionFinishedAsync(cts.Token);
|
||||
cts.Cancel();
|
||||
|
||||
Assert.That(async () => await awaiter, Throws.InstanceOf<OperationCanceledException>());
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task MarkSessionFinished_is_idempotent()
|
||||
{
|
||||
var ws = new TestWebSocket();
|
||||
var p = new RealParticipant(ws, viewerId: 1, FixtureCtx(),
|
||||
NullLogger<RealParticipant>.Instance);
|
||||
|
||||
p.MarkSessionFinished();
|
||||
p.MarkSessionFinished(); // should not throw
|
||||
|
||||
await p.AwaitSessionFinishedAsync(CancellationToken.None);
|
||||
Assert.Pass();
|
||||
}
|
||||
|
||||
private static MatchContext FixtureCtx() => new(
|
||||
SelfDeckCardIds: Enumerable.Range(1, 30).Select(_ => 100_011_010L).ToList(),
|
||||
ClassId: "1", CharaId: "1", CardMasterName: "card_master_node_10015",
|
||||
|
||||
Reference in New Issue
Block a user