From acd0997cfbef76750e5e620fb980d5285fb5246e Mon Sep 17 00:00:00 2001 From: gamer147 Date: Mon, 1 Jun 2026 19:57:45 -0400 Subject: [PATCH] feat(battle-node): add RealParticipant wrapping WS + sequencers Lifts the WS read loop, SIO encode/decode, per-WS OutboundSequencer + InboundTracker, and SIO ack out of BattleSession into a participant. PushAsync(noStock=false) assigns playSeq via the sequencer; noStock=true bypasses it. FrameEmitted fires on each deduplicated inbound envelope. The existing BattleSession keeps its own copy of the WS code for now; Task 9 cuts the handler over to use BattleSessionV2 + RealParticipant and Task 10 deletes the old BattleSession + duplicate code. --- .../Sessions/Participants/RealParticipant.cs | 262 ++++++++++++++++++ .../Participants/RealParticipantTests.cs | 69 +++++ 2 files changed, 331 insertions(+) create mode 100644 SVSim.BattleNode/Sessions/Participants/RealParticipant.cs create mode 100644 SVSim.UnitTests/BattleNode/Sessions/Participants/RealParticipantTests.cs diff --git a/SVSim.BattleNode/Sessions/Participants/RealParticipant.cs b/SVSim.BattleNode/Sessions/Participants/RealParticipant.cs new file mode 100644 index 0000000..d2ec4d9 --- /dev/null +++ b/SVSim.BattleNode/Sessions/Participants/RealParticipant.cs @@ -0,0 +1,262 @@ +using System.Net.WebSockets; +using System.Security.Cryptography; +using System.Text; +using Microsoft.Extensions.Logging; +using SVSim.BattleNode.Bridge; +using SVSim.BattleNode.Protocol; +using SVSim.BattleNode.Protocol.Bodies; +using SVSim.BattleNode.Reliability; +using SVSim.BattleNode.Wire; + +namespace SVSim.BattleNode.Sessions.Participants; + +/// +/// WS-backed participant. Owns the WS read loop, SIO encoding/decoding, per-WS +/// + . Fires +/// on each deduplicated inbound . +/// PushAsync encodes + sends; ordered pushes get a playSeq from the sequencer, +/// no-stock control pushes bypass it. +/// +public sealed class RealParticipant : IBattleParticipant +{ + private readonly WebSocket _ws; + private readonly ILogger _log; + private CancellationToken _sessionCt; + + public long ViewerId { get; } + public MatchContext Context { get; } + public InboundTracker Inbound { get; } = new(); + public OutboundSequencer Outbound { get; } = new(); + + public event Func? FrameEmitted; + + public RealParticipant(WebSocket ws, long viewerId, MatchContext context, + ILogger log) + { + _ws = ws; + _log = log; + ViewerId = viewerId; + Context = context; + } + + public async Task RunAsync(CancellationToken cancellation) + { + _sessionCt = cancellation; + await SendEioOpenAsync(cancellation); + + var buffer = new byte[8192]; + var pendingAttachments = new List(); + SocketIoFrame? pendingFrame = null; + + while (_ws.State == WebSocketState.Open && !cancellation.IsCancellationRequested) + { + var msg = await ReadCompleteMessageAsync(buffer, cancellation); + if (msg is null) break; + + if (msg.Value.IsText) + { + var text = Encoding.UTF8.GetString(msg.Value.Bytes); + if (text.Length == 0) continue; + var eio = EngineIoFrame.Parse(text); + if (eio.Type == EngineIoPacketType.Ping) + { + await SendTextAsync("3", cancellation); + continue; + } + if (eio.Type != EngineIoPacketType.Message) continue; + + var sio = SocketIoFrame.Parse(eio.Payload); + if (sio.AttachmentCount > 0) + { + pendingFrame = sio; + pendingAttachments.Clear(); + continue; + } + await DispatchSocketIo(sio); + } + else + { + var bin = msg.Value.Bytes; + if (bin.Length > 0 && bin[0] == (byte)EngineIoPacketType.Message) + { + bin = bin.AsSpan(1).ToArray(); + } + pendingAttachments.Add(bin); + if (pendingFrame is not null && pendingAttachments.Count == pendingFrame.AttachmentCount) + { + var assembled = pendingFrame.WithAttachments(pendingAttachments.ToArray()); + pendingFrame = null; + await DispatchSocketIo(assembled); + } + } + } + } + + public async Task PushAsync(MsgEnvelope envelope, bool noStock, CancellationToken ct) + { + var stamped = noStock ? Outbound.WrapNoStock(envelope) : Outbound.AssignAndArchive(envelope); + await EncodeAndSendAsync(stamped, WireConstants.SynchronizeEvent, ct); + } + + public Task TerminateAsync(BattleFinishReason reason) + { + // WS will close via the read loop exiting; nothing to do here. + return Task.CompletedTask; + } + + public ValueTask DisposeAsync() + { + if (_ws.State == WebSocketState.Open || _ws.State == WebSocketState.CloseReceived) + { + try { _ws.Abort(); } catch { /* best effort */ } + } + _ws.Dispose(); + return ValueTask.CompletedTask; + } + + private async Task DispatchSocketIo(SocketIoFrame frame) + { + if (frame.Type is SocketIoPacketType.Event or SocketIoPacketType.BinaryEvent) + { + switch (frame.EventName) + { + case WireConstants.MsgEvent when frame.BinaryAttachments.Count == 1: + await HandleMsgEventAsync(frame); + return; + case WireConstants.AliveEvent when frame.BinaryAttachments.Count == 1: + await HandleAliveEventAsync(frame); + return; + } + } + _log.LogDebug("RealParticipant viewer={Vid}: dropping SIO event={Event}", ViewerId, frame.EventName); + } + + private async Task HandleMsgEventAsync(SocketIoFrame frame) + { + try + { + MsgEnvelope env; + try { env = MsgPayloadCodec.Decode(frame.BinaryAttachments[0]); } + catch (Exception ex) + { + _log.LogWarning(ex, "RealParticipant viewer={Vid}: failed to decode msg envelope", ViewerId); + return; + } + + bool shouldDispatch = true; + if (env.PubSeq.HasValue) + { + shouldDispatch = Inbound.Observe(env.PubSeq.Value); + if (frame.AckId.HasValue) + { + await SendSioAckAsync(frame.AckId.Value, env.PubSeq.Value); + } + } + if (!shouldDispatch) return; + + if (FrameEmitted is not null) + { + await FrameEmitted.Invoke(env, _sessionCt); + } + } + catch (Exception ex) + { + _log.LogError(ex, "RealParticipant viewer={Vid}: unhandled in HandleMsgEventAsync", ViewerId); + } + } + + private async Task HandleAliveEventAsync(SocketIoFrame frame) + { + try + { + if (frame.AckId.HasValue) + { + await SendSioAckAsync(frame.AckId.Value, 0); + } + var aliveEnv = new MsgEnvelope( + Uri: NetworkBattleUri.Gungnir, + ViewerId: SVSim.BattleNode.Lifecycle.ScriptedLifecycle.FakeOpponentViewerId, + Uuid: WireConstants.ServerUuid, + Bid: null, + Try: 0, + Cat: EmitCategory.General, + PubSeq: null, + PlaySeq: null, + Body: new AlivePushBody(Scs: WireConstants.OnlineStatus, Ocs: WireConstants.OnlineStatus)); + var stamped = Outbound.WrapNoStock(aliveEnv); + await EncodeAndSendAsync(stamped, WireConstants.AliveEvent, _sessionCt); + } + catch (Exception ex) + { + _log.LogError(ex, "RealParticipant viewer={Vid}: unhandled in HandleAliveEventAsync", ViewerId); + } + } + + private async Task EncodeAndSendAsync(MsgEnvelope env, string eventName, CancellationToken ct) + { + var key = NodeCrypto.GenerateKey(() => RandomNumberGenerator.GetInt32(0, 16)); + var bytes = MsgPayloadCodec.Encode(env, key); + var sio = SocketIoFrame.BinaryEventWithAttachments(eventName, new[] { bytes }); + var (text, bins) = sio.Encode(); + var eioText = $"{(int)EngineIoPacketType.Message}{text}"; + await SendTextAsync(eioText, ct); + foreach (var bin in bins) + { + var prefixed = new byte[bin.Length + 1]; + prefixed[0] = (byte)EngineIoPacketType.Message; + Buffer.BlockCopy(bin, 0, prefixed, 1, bin.Length); + await _ws.SendAsync(prefixed, WebSocketMessageType.Binary, endOfMessage: true, ct); + } + } + + internal static int ClipAckArg(long arg, ILogger log, long viewerId) + { + if (arg > int.MaxValue) + { + log.LogWarning("RealParticipant viewer={Vid}: pubSeq {Seq} exceeds int.MaxValue; clipping.", viewerId, arg); + return int.MaxValue; + } + if (arg < int.MinValue) + { + log.LogWarning("RealParticipant viewer={Vid}: pubSeq {Seq} below int.MinValue; clipping.", viewerId, arg); + return int.MinValue; + } + return (int)arg; + } + + private async Task SendSioAckAsync(int ackId, long arg) + { + var ack = SocketIoFrame.AckResponse(ackId, ClipAckArg(arg, _log, ViewerId)); + var (text, _) = ack.Encode(); + var eioText = $"{(int)EngineIoPacketType.Message}{text}"; + await SendTextAsync(eioText, _sessionCt); + } + + private async Task SendEioOpenAsync(CancellationToken ct) + { + var sid = Guid.NewGuid().ToString("N").Substring(0, 16); + var handshake = new EngineIoHandshake(sid, Array.Empty(), 25000, 60000).ToJson(); + await SendTextAsync($"0{handshake}", ct); + } + + private Task SendTextAsync(string text, CancellationToken ct) + { + var bytes = Encoding.UTF8.GetBytes(text); + return _ws.SendAsync(bytes, WebSocketMessageType.Text, endOfMessage: true, ct); + } + + private async Task<(byte[] Bytes, bool IsText)?> ReadCompleteMessageAsync(byte[] buffer, CancellationToken ct) + { + using var ms = new MemoryStream(); + WebSocketReceiveResult result; + do + { + try { result = await _ws.ReceiveAsync(buffer, ct); } + catch (OperationCanceledException) { return null; } + catch (WebSocketException) { return null; } + if (result.MessageType == WebSocketMessageType.Close) return null; + ms.Write(buffer, 0, result.Count); + } while (!result.EndOfMessage); + return (ms.ToArray(), result.MessageType == WebSocketMessageType.Text); + } +} diff --git a/SVSim.UnitTests/BattleNode/Sessions/Participants/RealParticipantTests.cs b/SVSim.UnitTests/BattleNode/Sessions/Participants/RealParticipantTests.cs new file mode 100644 index 0000000..cd39373 --- /dev/null +++ b/SVSim.UnitTests/BattleNode/Sessions/Participants/RealParticipantTests.cs @@ -0,0 +1,69 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NUnit.Framework; +using SVSim.BattleNode.Bridge; +using SVSim.BattleNode.Protocol; +using SVSim.BattleNode.Protocol.Bodies; +using SVSim.BattleNode.Sessions; +using SVSim.BattleNode.Sessions.Participants; +using SVSim.UnitTests.BattleNode.Infrastructure; + +namespace SVSim.UnitTests.BattleNode.Sessions.Participants; + +[TestFixture] +public class RealParticipantTests +{ + [Test] + public void PushAsync_ordered_assigns_playSeq_via_OutboundSequencer() + { + var ws = new TestWebSocket(); + var p = new RealParticipant(ws, viewerId: 1, FixtureCtx(), + NullLogger.Instance); + + // First ordered push gets playSeq = 1; second = 2; etc. + // Inspect the participant's outbound sequencer state via its public Archive. + var env = NewEnvelope(NetworkBattleUri.Matched); + p.PushAsync(env, noStock: false, CancellationToken.None).Wait(); + p.PushAsync(env, noStock: false, CancellationToken.None).Wait(); + + Assert.That(p.Outbound.Archive.Count, Is.EqualTo(2)); + Assert.That(p.Outbound.Archive[1].PlaySeq, Is.EqualTo(1)); + Assert.That(p.Outbound.Archive[2].PlaySeq, Is.EqualTo(2)); + } + + [Test] + public void PushAsync_noStock_omits_playSeq() + { + var ws = new TestWebSocket(); + var p = new RealParticipant(ws, viewerId: 1, FixtureCtx(), + NullLogger.Instance); + + p.PushAsync(NewEnvelope(NetworkBattleUri.BattleFinish), noStock: true, CancellationToken.None).Wait(); + + // No playSeq archive entry for no-stock pushes. + Assert.That(p.Outbound.Archive.Count, Is.EqualTo(0)); + } + + [Test] + public void ViewerId_and_Context_are_exposed() + { + var ws = new TestWebSocket(); + var ctx = FixtureCtx(); + var p = new RealParticipant(ws, viewerId: 906243102L, ctx, + NullLogger.Instance); + + Assert.That(p.ViewerId, Is.EqualTo(906243102L)); + Assert.That(p.Context, Is.SameAs(ctx)); + } + + private static MatchContext FixtureCtx() => new( + SelfDeckCardIds: Enumerable.Range(1, 30).Select(_ => 100_011_010L).ToList(), + ClassId: "1", CharaId: "1", CardMasterName: "card_master_node_10015", + CountryCode: "KOR", UserName: "Player", SleeveId: "3000011", + EmblemId: "701441011", DegreeId: "300003", FieldId: 43, IsOfficial: 0, + BattleType: 11); + + private static MsgEnvelope NewEnvelope(NetworkBattleUri uri) => + new(uri, ViewerId: 1, Uuid: "u", Bid: null, Try: 0, + Cat: EmitCategory.Battle, PubSeq: null, PlaySeq: null, + Body: new ResultCodeOnlyBody()); +}