diff --git a/SVSim.UnitTests/BattleNode/Infrastructure/TestWebSocket.cs b/SVSim.UnitTests/BattleNode/Infrastructure/TestWebSocket.cs new file mode 100644 index 0000000..3a5975e --- /dev/null +++ b/SVSim.UnitTests/BattleNode/Infrastructure/TestWebSocket.cs @@ -0,0 +1,108 @@ +using System.Collections.Concurrent; +using System.Net.WebSockets; +using System.Threading.Channels; + +namespace SVSim.UnitTests.BattleNode.Infrastructure; + +/// +/// In-memory for driving +/// without booting the EmulatedEntrypoint host. Enqueue inbound frames with +/// ; outbound sends land in . Call +/// to signal end-of-stream — the next +/// returns a Close result so +/// exits its loop cleanly. +/// +public sealed class TestWebSocket : WebSocket +{ + private readonly Channel _incoming = Channel.CreateUnbounded(); + private readonly ConcurrentQueue _sends = new(); + private WebSocketState _state = WebSocketState.Open; + private TaskCompletionSource? _sendGate; + + public IReadOnlyCollection Sends => _sends.ToArray(); + + public override WebSocketCloseStatus? CloseStatus { get; } = null; + public override string? CloseStatusDescription { get; } = null; + public override WebSocketState State => _state; + public override string? SubProtocol { get; } = null; + + public void EnqueueIncoming(byte[] payload, WebSocketMessageType type) + { + _incoming.Writer.TryWrite(new TestWebSocketFrame(payload, type)); + } + + public void CompleteIncoming() + { + _incoming.Writer.TryComplete(); + } + + /// + /// Install a gate that blocks every subsequent call until + /// . Used by the cancellation regression test to suspend + /// a write while the caller cancels the session CT. + /// + public void BlockNextSend() + { + _sendGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + public void ReleaseSends() + { + _sendGate?.TrySetResult(true); + _sendGate = null; + } + + public override async Task ReceiveAsync( + ArraySegment buffer, CancellationToken cancellationToken) + { + try + { + var frame = await _incoming.Reader.ReadAsync(cancellationToken); + var count = Math.Min(buffer.Count, frame.Payload.Length); + Array.Copy(frame.Payload, 0, buffer.Array!, buffer.Offset, count); + // Single-fragment frames only — the test never enqueues partial frames. + return new WebSocketReceiveResult(count, frame.Type, endOfMessage: true); + } + catch (ChannelClosedException) + { + _state = WebSocketState.Closed; + return new WebSocketReceiveResult(0, WebSocketMessageType.Close, endOfMessage: true); + } + } + + public override async Task SendAsync( + ArraySegment buffer, WebSocketMessageType messageType, + bool endOfMessage, CancellationToken cancellationToken) + { + if (_sendGate is not null) + { + // Race-aware: stash the gate locally because ReleaseSends nulls the field. + var gate = _sendGate; + using var reg = cancellationToken.Register(() => gate.TrySetCanceled(cancellationToken)); + await gate.Task; + } + var copy = new byte[buffer.Count]; + Array.Copy(buffer.Array!, buffer.Offset, copy, 0, buffer.Count); + _sends.Enqueue(new TestWebSocketFrame(copy, messageType)); + } + + public override Task CloseAsync( + WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken) + { + _state = WebSocketState.Closed; + return Task.CompletedTask; + } + + public override Task CloseOutputAsync( + WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken) + { + _state = WebSocketState.CloseSent; + return Task.CompletedTask; + } + + public override void Abort() => _state = WebSocketState.Aborted; + + public override void Dispose() { /* no-op */ } +} + +public sealed record TestWebSocketFrame(byte[] Payload, WebSocketMessageType Type);