test(battle-node): pump-level tests for async-Task dispatch, CT, Md5 clip

This commit is contained in:
gamer147
2026-06-01 11:15:10 -04:00
parent 21b7ddf6ae
commit e4fbb155e4

View File

@@ -0,0 +1,230 @@
using System.Net.WebSockets;
using System.Reflection;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NUnit.Framework;
using SVSim.BattleNode.Protocol;
using SVSim.BattleNode.Sessions;
using SVSim.BattleNode.Wire;
using SVSim.UnitTests.BattleNode.Infrastructure;
namespace SVSim.UnitTests.BattleNode.Sessions;
[TestFixture]
public class BattleSessionPumpTests
{
// ---- T1a: structural contract ----
[Test]
public void DispatchSocketIo_ReturnsTask_NotAsyncVoid()
{
// Regression for the M3 fix: async void hides exceptions and lets two dispatches run
// concurrently. The fix is to return Task and await it in the read loop. This test
// locks the structural contract; if anyone reverts to async void, this fails before
// the behavioral tests do.
var method = typeof(BattleSession).GetMethod(
"DispatchSocketIo",
BindingFlags.NonPublic | BindingFlags.Instance);
Assert.That(method, Is.Not.Null, "DispatchSocketIo method must exist");
Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task)),
"DispatchSocketIo must return Task — async void breaks ordering + exception flow.");
}
// ---- T1b: in-order dispatch ----
[Test]
[Timeout(10000)]
public async Task RunAsync_ProcessesTwoMessages_SendsResponsesInOrder()
{
var ws = new TestWebSocket();
var session = new BattleSession(
ws: ws, battleId: "bid-pump", viewerId: 906243102,
log: NullLogger<BattleSession>.Instance);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8));
var runTask = session.RunAsync(cts.Token);
// Eat the EIO3 open handshake (first text send from the session).
await WaitForSendCountAsync(ws, atLeast: 1, cts.Token);
// Inbound 1: InitNetwork (expect ack + InitNetwork synchronize push).
await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitNetwork, pubSeq: 1, ackId: 1,
cat: EmitCategory.General);
// Inbound 2: InitBattle (expect ack + Matched synchronize push).
await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitBattle, pubSeq: 2, ackId: 2,
cat: EmitCategory.Matching);
ws.CompleteIncoming();
await runTask;
// Each inbound msg produces a SIO ack (text frame). With serial dispatch the
// two acks must come out in the same order as the inbound frames; concurrent
// dispatch could reorder them. This is best-effort smoke — it can pass even
// under concurrent dispatch if races happen to favor msg-1 — but it catches
// common reorderings. T1a (reflection on DispatchSocketIo's return type) is
// the structural lock.
var sends = ws.Sends.ToList();
var ackTextIndices = sends
.Select((s, i) => (s, i))
.Where(t => t.s.Type == WebSocketMessageType.Text && IsSioAckText(t.s.Payload))
.Select(t => t.i)
.ToList();
Assert.That(ackTextIndices.Count, Is.EqualTo(2), "Expected exactly two SIO acks.");
Assert.That(ackTextIndices[0], Is.LessThan(ackTextIndices[1]),
"Ack for msg-1 must precede ack for msg-2 — proves dispatches don't reorder under serial await.");
}
// ---- T2: cancellation through send ----
[Test]
[Timeout(10000)]
public async Task EncodeAndSendAsync_CtCancelledDuringBlockedSend_PropagatesOperationCanceled()
{
var ws = new TestWebSocket();
var session = new BattleSession(
ws: ws, battleId: "bid-cancel", viewerId: 906243102,
log: NullLogger<BattleSession>.Instance);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8));
var runTask = session.RunAsync(cts.Token);
await WaitForSendCountAsync(ws, atLeast: 1, cts.Token); // EIO open handshake
// Block the next send so the session is suspended mid-write.
ws.BlockNextSend();
// InitNetwork triggers an ack via SendSioAckAsync — that's the send that will block.
await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitNetwork, pubSeq: 1, ackId: 1,
cat: EmitCategory.General);
// Give the pump time to reach the blocked send.
await Task.Delay(100, CancellationToken.None);
// Cancel the session CT. The blocked send's await on the gate has registered for the
// CT and will throw OperationCanceledException.
cts.Cancel();
// RunAsync awaits the dispatch, which awaits the gated send. Cancelling the gate
// surfaces as OCE inside HandleMsgEventAsync's try/catch (it logs error and returns),
// then the next ReadCompleteMessageAsync sees the cancellation and exits.
await runTask;
// No assertion on exception type — the inner try/catch swallows it. The point of the
// test is that RunAsync TERMINATES rather than hanging indefinitely on the blocked send.
// If _sessionCt weren't threaded, the send would still be blocked on the gate and this
// test would timeout.
Assert.That(runTask.IsCompletedSuccessfully, Is.True,
"RunAsync must terminate after _sessionCt is cancelled, not hang on the blocked send.");
}
// ---- T3: full-pump Md5 regression ----
[Test]
[Timeout(10000)]
public async Task PubSeqExceedingIntMax_ClipsAndDoesNotKillSession()
{
var ws = new TestWebSocket();
var logCapture = new CapturingLogger();
var session = new BattleSession(
ws: ws, battleId: "bid-clip", viewerId: 906243102, log: logCapture);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8));
var runTask = session.RunAsync(cts.Token);
await WaitForSendCountAsync(ws, atLeast: 1, cts.Token);
await EnqueueMsgFrameAsync(ws, NetworkBattleUri.InitNetwork,
pubSeq: (long)int.MaxValue + 1L, ackId: 1, cat: EmitCategory.General);
ws.CompleteIncoming();
await runTask;
// Expect: at least one warning logged about clipping, AND the ack send happened.
Assert.That(logCapture.WarningCount, Is.GreaterThanOrEqualTo(1),
"Clip path must log a warning.");
Assert.That(logCapture.Warnings.Any(m => m.Contains("clipping")),
Is.True, $"Expected a 'clipping' warning. Got: {string.Join(" | ", logCapture.Warnings)}");
Assert.That(ws.Sends.Any(f => f.Type == WebSocketMessageType.Text && IsSioAckText(f.Payload)),
Is.True, "Ack must still be sent after clipping.");
}
// ---- helpers ----
private static async Task WaitForSendCountAsync(TestWebSocket ws, int atLeast, CancellationToken ct)
{
var sw = System.Diagnostics.Stopwatch.StartNew();
while (ws.Sends.Count < atLeast)
{
if (sw.ElapsedMilliseconds > 5000)
throw new TimeoutException($"WS did not produce {atLeast} sends within 5s.");
await Task.Delay(10, ct);
}
}
private static bool IsSioAckText(byte[] payload)
{
// SIO ack text frame starts with EIO Message digit '4' + SIO Ack digit '3'.
if (payload.Length < 2) return false;
return payload[0] == (byte)'4' && payload[1] == (byte)'3';
}
/// <summary>
/// Encode and enqueue a synthetic msg event frame for the pump to receive.
/// Pump shape: EIO text "4{sio-text}", then a single binary attachment containing the
/// msgpack-encoded encrypted payload.
/// </summary>
private static async Task EnqueueMsgFrameAsync(
TestWebSocket ws, NetworkBattleUri uri, long pubSeq, int ackId, EmitCategory cat)
{
var key = MakeKey();
var env = new MsgEnvelope(
uri, ViewerId: 906243102, Uuid: "udid-test", Bid: null, Try: 0, Cat: cat,
PubSeq: pubSeq, PlaySeq: null,
Body: new RawBody(new Dictionary<string, object?>()));
var encryptedBytes = MsgPayloadCodec.Encode(env, key);
// SIO BinaryEvent with one attachment, name "msg", with the ack id.
var sio = SocketIoFrame.BinaryEventWithAttachments(eventName: "msg", attachments: new[] { encryptedBytes });
var (sioTextOriginal, bins) = sio.Encode();
// sioTextOriginal looks like "51-[\"msg\",{\"_placeholder\":true,\"num\":0}]".
// Splice ackId before the '['.
var bracketIdx = sioTextOriginal.IndexOf('[');
var sioTextWithAck = sioTextOriginal.Substring(0, bracketIdx) + ackId + sioTextOriginal.Substring(bracketIdx);
var eioText = $"{(int)EngineIoPacketType.Message}{sioTextWithAck}";
ws.EnqueueIncoming(Encoding.UTF8.GetBytes(eioText), WebSocketMessageType.Text);
// EIO3 prefixes binary frames with the Message packet-type byte.
var prefixed = new byte[bins[0].Length + 1];
prefixed[0] = (byte)EngineIoPacketType.Message;
Buffer.BlockCopy(bins[0], 0, prefixed, 1, bins[0].Length);
ws.EnqueueIncoming(prefixed, WebSocketMessageType.Binary);
await Task.Yield();
}
private static string MakeKey()
{
var seq = 0;
return NodeCrypto.GenerateKey(() => (seq++ * 7) % 16);
}
private sealed class CapturingLogger : ILogger<BattleSession>
{
public List<string> Warnings { get; } = new();
public int WarningCount => Warnings.Count;
public IDisposable BeginScope<TState>(TState state) where TState : notnull => NullDisposable.Instance;
public bool IsEnabled(LogLevel logLevel) => true;
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception,
Func<TState, Exception?, string> formatter)
{
if (logLevel == LogLevel.Warning) Warnings.Add(formatter(state, exception));
}
}
private sealed class NullDisposable : IDisposable
{
public static readonly NullDisposable Instance = new();
public void Dispose() { }
}
}