using System.Net.WebSockets; using System.Reflection; using System.Text; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using NUnit.Framework; using SVSim.BattleNode.Bridge; using SVSim.BattleNode.Protocol; using SVSim.BattleNode.Sessions; using SVSim.BattleNode.Wire; using SVSim.UnitTests.BattleNode.Infrastructure; namespace SVSim.UnitTests.BattleNode.Sessions; [TestFixture] public class BattleSessionPumpTests { // ---- T1a: structural contract ---- [Test] public void DispatchSocketIo_ReturnsTask_NotAsyncVoid() { // Regression for the M3 fix: async void hides exceptions and lets two dispatches run // concurrently. The fix is to return Task and await it in the read loop. This test // locks the structural contract; if anyone reverts to async void, this fails before // the behavioral tests do. var method = typeof(BattleSession).GetMethod( "DispatchSocketIo", BindingFlags.NonPublic | BindingFlags.Instance); Assert.That(method, Is.Not.Null, "DispatchSocketIo method must exist"); Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task)), "DispatchSocketIo must return Task — async void breaks ordering + exception flow."); } // ---- T1b: in-order dispatch ---- [Test] [Timeout(10000)] public async Task RunAsync_ProcessesTwoMessages_SendsResponsesInOrder() { var ws = new TestWebSocket(); var session = new BattleSession( ws: ws, battleId: "bid-pump", viewerId: 906243102, context: FixtureCtx(), log: NullLogger.Instance); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)); var runTask = session.RunAsync(cts.Token); // Eat the EIO3 open handshake (first text send from the session). await WaitForSendCountAsync(ws, atLeast: 1, cts.Token); // Inbound 1: InitNetwork (expect ack + InitNetwork synchronize push). await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitNetwork, pubSeq: 1, ackId: 1, cat: EmitCategory.General); // Inbound 2: InitBattle (expect ack + Matched synchronize push). await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitBattle, pubSeq: 2, ackId: 2, cat: EmitCategory.Matching); ws.CompleteIncoming(); await runTask; // Each inbound msg produces a SIO ack (text frame). With serial dispatch the // two acks must come out in the same order as the inbound frames; concurrent // dispatch could reorder them. This is best-effort smoke — it can pass even // under concurrent dispatch if races happen to favor msg-1 — but it catches // common reorderings. T1a (reflection on DispatchSocketIo's return type) is // the structural lock. var sends = ws.Sends.ToList(); var ackTextIndices = sends .Select((s, i) => (s, i)) .Where(t => t.s.Type == WebSocketMessageType.Text && IsSioAckText(t.s.Payload)) .Select(t => t.i) .ToList(); Assert.That(ackTextIndices.Count, Is.EqualTo(2), "Expected exactly two SIO acks."); Assert.That(ackTextIndices[0], Is.LessThan(ackTextIndices[1]), "Ack for msg-1 must precede ack for msg-2 — proves dispatches don't reorder under serial await."); } // ---- T2: cancellation through send ---- [Test] [Timeout(10000)] public async Task EncodeAndSendAsync_CtCancelledDuringBlockedSend_PropagatesOperationCanceled() { var ws = new TestWebSocket(); var session = new BattleSession( ws: ws, battleId: "bid-cancel", viewerId: 906243102, context: FixtureCtx(), log: NullLogger.Instance); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)); var runTask = session.RunAsync(cts.Token); await WaitForSendCountAsync(ws, atLeast: 1, cts.Token); // EIO open handshake // Block the next send so the session is suspended mid-write. ws.BlockNextSend(); // InitNetwork triggers an ack via SendSioAckAsync — that's the send that will block. await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitNetwork, pubSeq: 1, ackId: 1, cat: EmitCategory.General); // Give the pump time to reach the blocked send. await Task.Delay(100, CancellationToken.None); // Cancel the session CT. The blocked send's await on the gate has registered for the // CT and will throw OperationCanceledException. cts.Cancel(); // RunAsync awaits the dispatch, which awaits the gated send. Cancelling the gate // surfaces as OCE inside HandleMsgEventAsync's try/catch (it logs error and returns), // then the next ReadCompleteMessageAsync sees the cancellation and exits. await runTask; // No assertion on exception type — the inner try/catch swallows it. The point of the // test is that RunAsync TERMINATES rather than hanging indefinitely on the blocked send. // If _sessionCt weren't threaded, the send would still be blocked on the gate and this // test would timeout. Assert.That(runTask.IsCompletedSuccessfully, Is.True, "RunAsync must terminate after _sessionCt is cancelled, not hang on the blocked send."); } // ---- T3: full-pump Md5 regression ---- [Test] [Timeout(10000)] public async Task PubSeqExceedingIntMax_ClipsAndDoesNotKillSession() { var ws = new TestWebSocket(); var logCapture = new CapturingLogger(); var session = new BattleSession( ws: ws, battleId: "bid-clip", viewerId: 906243102, context: FixtureCtx(), log: logCapture); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)); var runTask = session.RunAsync(cts.Token); await WaitForSendCountAsync(ws, atLeast: 1, cts.Token); await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitNetwork, pubSeq: (long)int.MaxValue + 1L, ackId: 1, cat: EmitCategory.General); ws.CompleteIncoming(); await runTask; // Expect: at least one warning logged about clipping, AND the ack send happened. Assert.That(logCapture.WarningCount, Is.GreaterThanOrEqualTo(1), "Clip path must log a warning."); Assert.That(logCapture.Warnings.Any(m => m.Contains("clipping")), Is.True, $"Expected a 'clipping' warning. Got: {string.Join(" | ", logCapture.Warnings)}"); Assert.That(ws.Sends.Any(f => f.Type == WebSocketMessageType.Text && IsSioAckText(f.Payload)), Is.True, "Ack must still be sent after clipping."); } // ---- helpers ---- private static async Task WaitForSendCountAsync(TestWebSocket ws, int atLeast, CancellationToken ct) { var sw = System.Diagnostics.Stopwatch.StartNew(); while (ws.Sends.Count < atLeast) { if (sw.ElapsedMilliseconds > 5000) throw new TimeoutException($"WS did not produce {atLeast} sends within 5s."); await Task.Delay(10, ct); } } private static bool IsSioAckText(byte[] payload) { // SIO ack text frame starts with EIO Message digit '4' + SIO Ack digit '3'. if (payload.Length < 2) return false; return payload[0] == (byte)'4' && payload[1] == (byte)'3'; } /// /// Encode and enqueue a synthetic msg event frame for the pump to receive. /// Pump shape: EIO text "4{sio-text}", then a single binary attachment containing the /// msgpack-encoded encrypted payload. /// private static async Task EnqueueMsgFrameAsync( TestWebSocket ws, NetworkBattleUri uri, long pubSeq, int ackId, EmitCategory cat) { var key = MakeKey(); var env = new MsgEnvelope( uri, ViewerId: 906243102, Uuid: "udid-test", Bid: null, Try: 0, Cat: cat, PubSeq: pubSeq, PlaySeq: null, Body: new RawBody(new Dictionary())); var encryptedBytes = MsgPayloadCodec.Encode(env, key); // SIO BinaryEvent with one attachment, name "msg", with the ack id. var sio = SocketIoFrame.BinaryEventWithAttachments(eventName: "msg", attachments: new[] { encryptedBytes }); var (sioTextOriginal, bins) = sio.Encode(); // sioTextOriginal looks like "51-[\"msg\",{\"_placeholder\":true,\"num\":0}]". // Splice ackId before the '['. var bracketIdx = sioTextOriginal.IndexOf('['); var sioTextWithAck = sioTextOriginal.Substring(0, bracketIdx) + ackId + sioTextOriginal.Substring(bracketIdx); var eioText = $"{(int)EngineIoPacketType.Message}{sioTextWithAck}"; ws.EnqueueIncoming(Encoding.UTF8.GetBytes(eioText), WebSocketMessageType.Text); // EIO3 prefixes binary frames with the Message packet-type byte. var prefixed = new byte[bins[0].Length + 1]; prefixed[0] = (byte)EngineIoPacketType.Message; Buffer.BlockCopy(bins[0], 0, prefixed, 1, bins[0].Length); ws.EnqueueIncoming(prefixed, WebSocketMessageType.Binary); await Task.Yield(); } private static string MakeKey() { var seq = 0; return NodeCrypto.GenerateKey(() => (seq++ * 7) % 16); } private static MatchContext FixtureCtx() => new( SelfDeckCardIds: Enumerable.Range(1, 30).Select(i => 100_011_010L).ToList(), ClassId: "1", CharaId: "1", CardMasterName: "card_master_node_10015", CountryCode: "KOR", UserName: "Player", SleeveId: "3000011", EmblemId: "701441011", DegreeId: "300003", FieldId: 43, IsOfficial: 0, BattleType: 11); private sealed class CapturingLogger : ILogger { public List Warnings { get; } = new(); public int WarningCount => Warnings.Count; public IDisposable BeginScope(TState state) where TState : notnull => NullDisposable.Instance; public bool IsEnabled(LogLevel logLevel) => true; public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) { if (logLevel == LogLevel.Warning) Warnings.Add(formatter(state, exception)); } } private sealed class NullDisposable : IDisposable { public static readonly NullDisposable Instance = new(); public void Dispose() { } } }