diff --git a/SVSim.UnitTests/BattleNode/Sessions/BattleSessionPumpTests.cs b/SVSim.UnitTests/BattleNode/Sessions/BattleSessionPumpTests.cs new file mode 100644 index 0000000..106accd --- /dev/null +++ b/SVSim.UnitTests/BattleNode/Sessions/BattleSessionPumpTests.cs @@ -0,0 +1,230 @@ +using System.Net.WebSockets; +using System.Reflection; +using System.Text; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using NUnit.Framework; +using SVSim.BattleNode.Protocol; +using SVSim.BattleNode.Sessions; +using SVSim.BattleNode.Wire; +using SVSim.UnitTests.BattleNode.Infrastructure; + +namespace SVSim.UnitTests.BattleNode.Sessions; + +[TestFixture] +public class BattleSessionPumpTests +{ + // ---- T1a: structural contract ---- + + [Test] + public void DispatchSocketIo_ReturnsTask_NotAsyncVoid() + { + // Regression for the M3 fix: async void hides exceptions and lets two dispatches run + // concurrently. The fix is to return Task and await it in the read loop. This test + // locks the structural contract; if anyone reverts to async void, this fails before + // the behavioral tests do. + var method = typeof(BattleSession).GetMethod( + "DispatchSocketIo", + BindingFlags.NonPublic | BindingFlags.Instance); + Assert.That(method, Is.Not.Null, "DispatchSocketIo method must exist"); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task)), + "DispatchSocketIo must return Task — async void breaks ordering + exception flow."); + } + + // ---- T1b: in-order dispatch ---- + + [Test] + [Timeout(10000)] + public async Task RunAsync_ProcessesTwoMessages_SendsResponsesInOrder() + { + var ws = new TestWebSocket(); + var session = new BattleSession( + ws: ws, battleId: "bid-pump", viewerId: 906243102, + log: NullLogger.Instance); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)); + var runTask = session.RunAsync(cts.Token); + + // Eat the EIO3 open handshake (first text send from the session). + await WaitForSendCountAsync(ws, atLeast: 1, cts.Token); + + // Inbound 1: InitNetwork (expect ack + InitNetwork synchronize push). + await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitNetwork, pubSeq: 1, ackId: 1, + cat: EmitCategory.General); + + // Inbound 2: InitBattle (expect ack + Matched synchronize push). + await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitBattle, pubSeq: 2, ackId: 2, + cat: EmitCategory.Matching); + + ws.CompleteIncoming(); + await runTask; + + // Each inbound msg produces a SIO ack (text frame). With serial dispatch the + // two acks must come out in the same order as the inbound frames; concurrent + // dispatch could reorder them. This is best-effort smoke — it can pass even + // under concurrent dispatch if races happen to favor msg-1 — but it catches + // common reorderings. T1a (reflection on DispatchSocketIo's return type) is + // the structural lock. + var sends = ws.Sends.ToList(); + var ackTextIndices = sends + .Select((s, i) => (s, i)) + .Where(t => t.s.Type == WebSocketMessageType.Text && IsSioAckText(t.s.Payload)) + .Select(t => t.i) + .ToList(); + Assert.That(ackTextIndices.Count, Is.EqualTo(2), "Expected exactly two SIO acks."); + Assert.That(ackTextIndices[0], Is.LessThan(ackTextIndices[1]), + "Ack for msg-1 must precede ack for msg-2 — proves dispatches don't reorder under serial await."); + } + + // ---- T2: cancellation through send ---- + + [Test] + [Timeout(10000)] + public async Task EncodeAndSendAsync_CtCancelledDuringBlockedSend_PropagatesOperationCanceled() + { + var ws = new TestWebSocket(); + var session = new BattleSession( + ws: ws, battleId: "bid-cancel", viewerId: 906243102, + log: NullLogger.Instance); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)); + var runTask = session.RunAsync(cts.Token); + + await WaitForSendCountAsync(ws, atLeast: 1, cts.Token); // EIO open handshake + + // Block the next send so the session is suspended mid-write. + ws.BlockNextSend(); + + // InitNetwork triggers an ack via SendSioAckAsync — that's the send that will block. + await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitNetwork, pubSeq: 1, ackId: 1, + cat: EmitCategory.General); + + // Give the pump time to reach the blocked send. + await Task.Delay(100, CancellationToken.None); + + // Cancel the session CT. The blocked send's await on the gate has registered for the + // CT and will throw OperationCanceledException. + cts.Cancel(); + + // RunAsync awaits the dispatch, which awaits the gated send. Cancelling the gate + // surfaces as OCE inside HandleMsgEventAsync's try/catch (it logs error and returns), + // then the next ReadCompleteMessageAsync sees the cancellation and exits. + await runTask; + // No assertion on exception type — the inner try/catch swallows it. The point of the + // test is that RunAsync TERMINATES rather than hanging indefinitely on the blocked send. + // If _sessionCt weren't threaded, the send would still be blocked on the gate and this + // test would timeout. + Assert.That(runTask.IsCompletedSuccessfully, Is.True, + "RunAsync must terminate after _sessionCt is cancelled, not hang on the blocked send."); + } + + // ---- T3: full-pump Md5 regression ---- + + [Test] + [Timeout(10000)] + public async Task PubSeqExceedingIntMax_ClipsAndDoesNotKillSession() + { + var ws = new TestWebSocket(); + var logCapture = new CapturingLogger(); + var session = new BattleSession( + ws: ws, battleId: "bid-clip", viewerId: 906243102, log: logCapture); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)); + var runTask = session.RunAsync(cts.Token); + + await WaitForSendCountAsync(ws, atLeast: 1, cts.Token); + + await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitNetwork, + pubSeq: (long)int.MaxValue + 1L, ackId: 1, cat: EmitCategory.General); + + ws.CompleteIncoming(); + await runTask; + + // Expect: at least one warning logged about clipping, AND the ack send happened. + Assert.That(logCapture.WarningCount, Is.GreaterThanOrEqualTo(1), + "Clip path must log a warning."); + Assert.That(logCapture.Warnings.Any(m => m.Contains("clipping")), + Is.True, $"Expected a 'clipping' warning. Got: {string.Join(" | ", logCapture.Warnings)}"); + Assert.That(ws.Sends.Any(f => f.Type == WebSocketMessageType.Text && IsSioAckText(f.Payload)), + Is.True, "Ack must still be sent after clipping."); + } + + // ---- helpers ---- + + private static async Task WaitForSendCountAsync(TestWebSocket ws, int atLeast, CancellationToken ct) + { + var sw = System.Diagnostics.Stopwatch.StartNew(); + while (ws.Sends.Count < atLeast) + { + if (sw.ElapsedMilliseconds > 5000) + throw new TimeoutException($"WS did not produce {atLeast} sends within 5s."); + await Task.Delay(10, ct); + } + } + + private static bool IsSioAckText(byte[] payload) + { + // SIO ack text frame starts with EIO Message digit '4' + SIO Ack digit '3'. + if (payload.Length < 2) return false; + return payload[0] == (byte)'4' && payload[1] == (byte)'3'; + } + + /// + /// Encode and enqueue a synthetic msg event frame for the pump to receive. + /// Pump shape: EIO text "4{sio-text}", then a single binary attachment containing the + /// msgpack-encoded encrypted payload. + /// + private static async Task EnqueueMsgFrameAsync( + TestWebSocket ws, NetworkBattleUri uri, long pubSeq, int ackId, EmitCategory cat) + { + var key = MakeKey(); + var env = new MsgEnvelope( + uri, ViewerId: 906243102, Uuid: "udid-test", Bid: null, Try: 0, Cat: cat, + PubSeq: pubSeq, PlaySeq: null, + Body: new RawBody(new Dictionary())); + var encryptedBytes = MsgPayloadCodec.Encode(env, key); + + // SIO BinaryEvent with one attachment, name "msg", with the ack id. + var sio = SocketIoFrame.BinaryEventWithAttachments(eventName: "msg", attachments: new[] { encryptedBytes }); + var (sioTextOriginal, bins) = sio.Encode(); + // sioTextOriginal looks like "51-[\"msg\",{\"_placeholder\":true,\"num\":0}]". + // Splice ackId before the '['. + var bracketIdx = sioTextOriginal.IndexOf('['); + var sioTextWithAck = sioTextOriginal.Substring(0, bracketIdx) + ackId + sioTextOriginal.Substring(bracketIdx); + + var eioText = $"{(int)EngineIoPacketType.Message}{sioTextWithAck}"; + ws.EnqueueIncoming(Encoding.UTF8.GetBytes(eioText), WebSocketMessageType.Text); + // EIO3 prefixes binary frames with the Message packet-type byte. + var prefixed = new byte[bins[0].Length + 1]; + prefixed[0] = (byte)EngineIoPacketType.Message; + Buffer.BlockCopy(bins[0], 0, prefixed, 1, bins[0].Length); + ws.EnqueueIncoming(prefixed, WebSocketMessageType.Binary); + await Task.Yield(); + } + + private static string MakeKey() + { + var seq = 0; + return NodeCrypto.GenerateKey(() => (seq++ * 7) % 16); + } + + private sealed class CapturingLogger : ILogger + { + public List Warnings { get; } = new(); + public int WarningCount => Warnings.Count; + + public IDisposable BeginScope(TState state) where TState : notnull => NullDisposable.Instance; + public bool IsEnabled(LogLevel logLevel) => true; + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, + Func formatter) + { + if (logLevel == LogLevel.Warning) Warnings.Add(formatter(state, exception)); + } + } + + private sealed class NullDisposable : IDisposable + { + public static readonly NullDisposable Instance = new(); + public void Dispose() { } + } +}