From 77c99cc2302771855681556cd8a4fb3379346e9d Mon Sep 17 00:00:00 2001 From: gamer147 Date: Thu, 4 Jun 2026 21:00:41 -0400 Subject: [PATCH] fix(battle-node): serialize per-session dispatch to stop cross-thread state race In PvP a BattleSession subscribes to both participants' FrameEmitted, and each RealParticipant raises it from its own WebSocket read loop -- two threads. The dispatch path (ComputeFrames + the relay PushAsync calls) mutates shared, non-thread-safe state: the BattleSessionState dictionaries (deck maps, post-swap hands, idx->cardId reveal map). Concurrent frames from both players could corrupt those dictionaries (InvalidOperationException / torn playSeq / wrong card identity). Add a per-session SemaphoreSlim _dispatchGate around the whole HandleFrameAsync so both read loops funnel through one critical section. ComputeFrames stays lock-free (the direct-call test seam is single-threaded). Analysis during the fix showed each OutboundSequencer is single-writer-per-instance in steady state (A's loop only writes B's Outbound and vice-versa), so the live race is the shared BattleSessionState, which the gate fully serializes. TDD: BattleSessionDispatchConcurrencyTests drives both participants to AfterReady, then fires TurnStart from both at once; the target PushAsync records peak in-flight dispatches. Red (MaxConcurrent=2) before the gate, green (1) after. Co-Authored-By: Claude Opus 4.8 --- SVSim.BattleNode/Sessions/BattleSession.cs | 12 ++ .../BattleSessionDispatchConcurrencyTests.cs | 124 ++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 SVSim.UnitTests/BattleNode/Sessions/BattleSessionDispatchConcurrencyTests.cs diff --git a/SVSim.BattleNode/Sessions/BattleSession.cs b/SVSim.BattleNode/Sessions/BattleSession.cs index 46bbfcf..e7ae2b0 100644 --- a/SVSim.BattleNode/Sessions/BattleSession.cs +++ b/SVSim.BattleNode/Sessions/BattleSession.cs @@ -22,6 +22,13 @@ public sealed class BattleSession private readonly BattleSessionState _state = new(); + /// Serializes dispatch. Both participants' read loops raise FrameEmitted on their own + /// threads, and a dispatch ( + the relay PushAsync calls) mutates + /// shared, non-thread-safe state — the dictionaries and each + /// participant's OutboundSequencer. This gate funnels both threads through one critical + /// section so concurrent frames can't corrupt that state. + private readonly SemaphoreSlim _dispatchGate = new(1, 1); + /// The per-battle master seed (see ). /// Exposed for logging + future replay persistence. public int MasterSeed => _state.MasterSeed; @@ -146,6 +153,7 @@ public sealed class BattleSession private async Task HandleFrameAsync(IBattleParticipant from, MsgEnvelope env, CancellationToken ct) { + await _dispatchGate.WaitAsync(ct).ConfigureAwait(false); try { var routes = ComputeFrames(from, env); @@ -158,6 +166,10 @@ public sealed class BattleSession { _log.LogError(ex, "BattleSession {Bid}: unhandled in HandleFrameAsync", BattleId); } + finally + { + _dispatchGate.Release(); + } } /// diff --git a/SVSim.UnitTests/BattleNode/Sessions/BattleSessionDispatchConcurrencyTests.cs b/SVSim.UnitTests/BattleNode/Sessions/BattleSessionDispatchConcurrencyTests.cs new file mode 100644 index 0000000..7333da5 --- /dev/null +++ b/SVSim.UnitTests/BattleNode/Sessions/BattleSessionDispatchConcurrencyTests.cs @@ -0,0 +1,124 @@ +using System.Linq; +using Microsoft.Extensions.Logging.Abstractions; +using NUnit.Framework; +using SVSim.BattleNode.Bridge; +using SVSim.BattleNode.Protocol; +using SVSim.BattleNode.Sessions; +using SVSim.BattleNode.Sessions.Participants; + +namespace SVSim.UnitTests.BattleNode.Sessions; + +/// +/// In PvP a subscribes to BOTH participants' FrameEmitted, and each +/// RealParticipant raises it from its own WebSocket read loop — i.e. two threads. The dispatch path +/// (ComputeFrames + the relay PushAsync calls) mutates shared, non-thread-safe session state, so it +/// must be serialized per session. This drives the two participants' dispatch concurrently and asserts +/// no two dispatches ever overlap. +/// +[TestFixture] +public class BattleSessionDispatchConcurrencyTests +{ + [Test] + public async Task Concurrent_dispatch_from_both_participants_is_serialized() + { + var detector = new ConcurrencyDetector(); + var a = new ProbeParticipant(1001, CtxA(), detector); + var b = new ProbeParticipant(2002, CtxB(), detector); + var s = new BattleSession("bid-conc", BattleType.Pvp, a, b, NullLogger.Instance); + + // Reach AfterReady single-threaded (ComputeFrames returns routes but never calls PushAsync, + // so the detector is untouched during setup). + DriveToAfterReady(s, a); + DriveToAfterReady(s, b); + + detector.Arm(); + + // Fire a gameplay frame from each side at the same instant. A's TurnStart routes to B.PushAsync + // and B's to A.PushAsync, so both dispatches run their PushAsync concurrently unless the session + // serializes them. + using var gate = new ManualResetEventSlim(false); + var ta = Task.Run(async () => { gate.Wait(); await a.RaiseAsync(Env(NetworkBattleUri.TurnStart)); }); + var tb = Task.Run(async () => { gate.Wait(); await b.RaiseAsync(Env(NetworkBattleUri.TurnStart)); }); + gate.Set(); + await Task.WhenAll(ta, tb); + + Assert.That(detector.MaxConcurrent, Is.EqualTo(1), + "Two read-loop threads dispatched into shared session state concurrently; " + + "HandleFrameAsync must serialize dispatch per session."); + } + + private static void DriveToAfterReady(BattleSession s, ProbeParticipant p) + { + s.ComputeFrames(p, Env(NetworkBattleUri.InitNetwork)); + s.ComputeFrames(p, Env(NetworkBattleUri.InitBattle)); + s.ComputeFrames(p, Env(NetworkBattleUri.Loaded)); + s.ComputeFrames(p, Env(NetworkBattleUri.Swap)); + } + + private static MsgEnvelope Env(NetworkBattleUri uri) => + new(uri, ViewerId: 1, Uuid: "u", Bid: null, Try: 0, + Cat: EmitCategory.Battle, PubSeq: null, PlaySeq: null, + Body: new RawBody(new Dictionary())); + + private static MatchContext CtxA() => new( + SelfDeckCardIds: Enumerable.Range(1, 30).Select(_ => 100_011_010L).ToList(), + ClassId: "3", CharaId: "3", CardMasterName: "card_master_node_10015", + CountryCode: "KOR", UserName: "PlayerA", SleeveId: "3000011", + EmblemId: "701441011", DegreeId: "300003", FieldId: 43, IsOfficial: 0, BattleType: 11); + + private static MatchContext CtxB() => new( + SelfDeckCardIds: Enumerable.Range(1, 30).Select(_ => 200_011_010L).ToList(), + ClassId: "5", CharaId: "5", CardMasterName: "card_master_node_10015", + CountryCode: "JPN", UserName: "PlayerB", SleeveId: "3000022", + EmblemId: "701441022", DegreeId: "300004", FieldId: 44, IsOfficial: 0, BattleType: 11); + + /// Tracks the peak number of dispatches in flight at once. Records the count under a + /// short lock, then holds (outside the lock) to widen the overlap window so a serialization bug + /// is observed deterministically rather than relied on to interleave by chance. + private sealed class ConcurrencyDetector + { + private const int WidenMs = 50; + private readonly object _lock = new(); + private int _current; + private volatile bool _armed; + public int MaxConcurrent { get; private set; } + + public void Arm() => _armed = true; + + public async Task EnterAsync() + { + if (!_armed) return; + lock (_lock) + { + _current++; + if (_current > MaxConcurrent) MaxConcurrent = _current; + } + await Task.Delay(WidenMs); + lock (_lock) { _current--; } + } + } + + private sealed class ProbeParticipant : IBattleParticipant, IHasHandshakePhase + { + private readonly ConcurrencyDetector _detector; + public long ViewerId { get; } + public MatchContext Context { get; } + public BattleSessionPhase Phase { get; set; } = BattleSessionPhase.AwaitingInitNetwork; + public event Func? FrameEmitted; + + public ProbeParticipant(long viewerId, MatchContext context, ConcurrencyDetector detector) + { + ViewerId = viewerId; + Context = context; + _detector = detector; + } + + public Task RaiseAsync(MsgEnvelope env) => + FrameEmitted?.Invoke(env, CancellationToken.None) ?? Task.CompletedTask; + + public Task PushAsync(MsgEnvelope env, bool noStock, CancellationToken ct) => _detector.EnterAsync(); + public Task RunAsync(CancellationToken ct) => Task.CompletedTask; + public Task TerminateAsync(BattleFinishReason reason) => Task.CompletedTask; + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } +}