using System.Net.WebSockets;
using System.Security.Cryptography;
using System.Text;
using Microsoft.Extensions.Logging;
using SVSim.BattleNode.Bridge;
using SVSim.BattleNode.Protocol;
using SVSim.BattleNode.Protocol.Bodies;
using SVSim.BattleNode.Reliability;
using SVSim.BattleNode.Wire;

namespace SVSim.BattleNode.Sessions.Participants;

/// <summary>
/// Marker interface implemented by participants that own a handshake-phase cursor.
/// The session (BattleSession) reads the sender's <see cref="Phase"/>
/// when gating the handshake-phase arms (InitNetwork / InitBattle / Loaded / Swap)
/// and the TurnEnd-AfterReady forwarder. Bots don't implement this — they never
/// send the gating URIs.
/// </summary>
internal interface IHasHandshakePhase
{
    /// <summary>Current handshake phase of this participant.</summary>
    BattleSessionPhase Phase { get; set; }
}

/// <summary>
/// WS-backed participant. Owns the WS read loop, SIO encoding/decoding, per-WS
/// <see cref="InboundTracker"/> + <see cref="OutboundSequencer"/>. Fires
/// <see cref="FrameEmitted"/> on each deduplicated inbound <see cref="MsgEnvelope"/>.
/// PushAsync encodes + sends; ordered pushes get a playSeq from the sequencer,
/// no-stock control pushes bypass it.
/// </summary>
/// <remarks>
/// All writes to the socket go through a single send gate: <see cref="PushAsync"/>
/// is invoked from outside the read loop while the read loop itself replies with
/// pongs / acks / alive pushes, and a <see cref="WebSocket"/> supports only one
/// outstanding send at a time.
/// </remarks>
public sealed class RealParticipant : IBattleParticipant, IHasHandshakePhase
{
    /// <summary>Size of the scratch buffer handed to each WS receive call.</summary>
    private const int ReceiveBufferSize = 8192;

    /// <summary>Text sent in reply to an Engine.IO ping packet.</summary>
    private const string EioPongPacket = "3";

    private readonly WebSocket _ws;
    private readonly ILogger _log;

    /// <summary>
    /// Serializes every send on <see cref="_ws"/>. Held across an SIO binary event's
    /// text header and all of its attachments so another sender cannot interleave.
    /// Intentionally not disposed: a late in-flight send must still be able to
    /// release it, and no wait handle is ever materialized.
    /// </summary>
    private readonly SemaphoreSlim _sendGate = new(1, 1);

    private readonly TaskCompletionSource<bool> _sessionFinished =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    /// <summary>Token captured by <see cref="RunAsync"/>; used by replies sent from the read loop.</summary>
    private CancellationToken _sessionCt;

    public long ViewerId { get; }

    public MatchContext Context { get; }

    public InboundTracker Inbound { get; } = new();

    public OutboundSequencer Outbound { get; } = new();

    /// <summary>
    /// Per-side handshake progression. Session reads this when gating
    /// handshake-phase synthesis (Matched / BattleStart / Deal / Swap response /
    /// Ready). Session transitions via the setter after dispatch. Defaults to
    /// AwaitingInitNetwork; only RealParticipant tracks this — bots have no phase
    /// because they never send the gating URIs. Also satisfies
    /// <see cref="IHasHandshakePhase"/> (the interface BattleSession uses to gate
    /// handshake dispatch without depending on the concrete RealParticipant type).
    /// </summary>
    internal BattleSessionPhase Phase { get; set; } = BattleSessionPhase.AwaitingInitNetwork;

    BattleSessionPhase IHasHandshakePhase.Phase
    {
        get => Phase;
        set => Phase = value;
    }

    /// <summary>
    /// Raised from the read loop for every decoded inbound envelope that passed the
    /// <see cref="Inbound"/> check. Each subscriber is awaited in subscription order.
    /// </summary>
    public event Func<MsgEnvelope, CancellationToken, Task>? FrameEmitted;

    public RealParticipant(WebSocket ws, long viewerId, MatchContext context, ILogger log)
    {
        _ws = ws;
        _log = log;
        ViewerId = viewerId;
        Context = context;
    }

    /// <summary>
    /// Called by the second arriver's handler (in a finally block) after
    /// session.RunAsync completes. Signals the first arriver's handler that it can
    /// return and let the HTTP request complete (which closes the WS).
    /// </summary>
    internal void MarkSessionFinished() => _sessionFinished.TrySetResult(true);

    /// <summary>
    /// Awaited by the first arriver's handler instead of calling RunAsync
    /// (the session already calls RunAsync on this instance from the second arriver's
    /// handler context — calling it twice would race the WS read loop). Returns when
    /// either MarkSessionFinished fires or the passed CT cancels.
    /// </summary>
    /// <param name="ct">Cancels the wait only; it does not affect the session itself.</param>
    /// <returns>
    /// A task that completes when the session is marked finished, or is canceled
    /// when <paramref name="ct"/> fires first.
    /// </returns>
    internal Task AwaitSessionFinishedAsync(CancellationToken ct)
        // WaitAsync short-circuits when the task is already complete and drops its
        // cancellation registration as soon as either side wins.
        => _sessionFinished.Task.WaitAsync(ct);

    /// <summary>
    /// Sends the Engine.IO open packet, then runs the WS read loop until the socket
    /// leaves the Open state, a read reports close/cancel/failure, or
    /// <paramref name="cancellation"/> is signaled.
    /// </summary>
    /// <param name="cancellation">Session token; also stored for replies sent from the loop.</param>
    public async Task RunAsync(CancellationToken cancellation)
    {
        _sessionCt = cancellation;
        await SendEioOpenAsync(cancellation);

        var buffer = new byte[ReceiveBufferSize];
        var pendingAttachments = new List<byte[]>();
        SocketIoFrame? pendingFrame = null;

        while (_ws.State == WebSocketState.Open && !cancellation.IsCancellationRequested)
        {
            var msg = await ReadCompleteMessageAsync(buffer, cancellation);
            if (msg is null)
            {
                break;
            }

            var (payload, isText) = msg.Value;

            if (isText)
            {
                var text = Encoding.UTF8.GetString(payload);
                if (text.Length == 0)
                {
                    continue;
                }

                var eio = EngineIoFrame.Parse(text);
                if (eio.Type == EngineIoPacketType.Ping)
                {
                    await SendTextAsync(EioPongPacket, cancellation);
                    continue;
                }

                if (eio.Type != EngineIoPacketType.Message)
                {
                    continue;
                }

                var sio = SocketIoFrame.Parse(eio.Payload);
                if (sio.AttachmentCount > 0)
                {
                    // Header of a binary event: park it until its attachments arrive.
                    pendingFrame = sio;
                    pendingAttachments.Clear();
                    continue;
                }

                await DispatchSocketIo(sio);
                continue;
            }

            if (pendingFrame is null)
            {
                // A binary message with no header waiting for it can never be
                // assembled; drop it instead of buffering it indefinitely.
                _log.LogDebug(
                    "RealParticipant viewer={Vid}: dropping binary message with no pending SIO header",
                    ViewerId);
                continue;
            }

            // Strip the one-byte Engine.IO "message" type prefix when present.
            var bin = payload;
            if (bin.Length > 0 && bin[0] == (byte)EngineIoPacketType.Message)
            {
                bin = bin[1..];
            }

            pendingAttachments.Add(bin);
            if (pendingAttachments.Count == pendingFrame.AttachmentCount)
            {
                var assembled = pendingFrame.WithAttachments(pendingAttachments.ToArray());
                pendingFrame = null;
                pendingAttachments.Clear();
                await DispatchSocketIo(assembled);
            }
        }
    }

    /// <summary>
    /// Stamps <paramref name="envelope"/> via the outbound sequencer and sends it as a
    /// synchronize event.
    /// </summary>
    /// <param name="envelope">Envelope to push to this participant.</param>
    /// <param name="noStock">
    /// When true the envelope goes through <c>WrapNoStock</c>; otherwise through
    /// <c>AssignAndArchive</c>.
    /// </param>
    /// <param name="ct">Cancels the send.</param>
    public async Task PushAsync(MsgEnvelope envelope, bool noStock, CancellationToken ct)
    {
        var stamped = noStock
            ? Outbound.WrapNoStock(envelope)
            : Outbound.AssignAndArchive(envelope);

        await EncodeAndSendAsync(stamped, WireConstants.SynchronizeEvent, ct);
    }

    /// <summary>No-op: the WS closes when the read loop exits.</summary>
    public Task TerminateAsync(BattleFinishReason reason)
    {
        // WS will close via the read loop exiting; nothing to do here.
        return Task.CompletedTask;
    }

    /// <summary>Aborts the socket if it is still Open or CloseReceived, then disposes it.</summary>
    public ValueTask DisposeAsync()
    {
        if (_ws.State is WebSocketState.Open or WebSocketState.CloseReceived)
        {
            try
            {
                _ws.Abort();
            }
            catch
            {
                /* best effort */
            }
        }

        _ws.Dispose();
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Routes an assembled SIO frame: msg and alive events carrying exactly one binary
    /// attachment are handled; everything else is logged at debug level and dropped.
    /// </summary>
    private async Task DispatchSocketIo(SocketIoFrame frame)
    {
        if (frame.Type is SocketIoPacketType.Event or SocketIoPacketType.BinaryEvent)
        {
            switch (frame.EventName)
            {
                case WireConstants.MsgEvent when frame.BinaryAttachments.Count == 1:
                    await HandleMsgEventAsync(frame);
                    return;

                case WireConstants.AliveEvent when frame.BinaryAttachments.Count == 1:
                    await HandleAliveEventAsync(frame);
                    return;
            }
        }

        _log.LogDebug("RealParticipant viewer={Vid}: dropping SIO event={Event}", ViewerId, frame.EventName);
    }

    /// <summary>
    /// Decodes the frame's single attachment into a <see cref="MsgEnvelope"/>. When the
    /// envelope carries a PubSeq it is observed by <see cref="Inbound"/> and, if the
    /// frame has an ack id, acknowledged; the envelope is then raised through
    /// <see cref="FrameEmitted"/> unless <c>Inbound.Observe</c> returned false.
    /// Exceptions are logged and never escape into the read loop.
    /// </summary>
    private async Task HandleMsgEventAsync(SocketIoFrame frame)
    {
        try
        {
            MsgEnvelope env;
            try
            {
                env = MsgPayloadCodec.Decode(frame.BinaryAttachments[0]);
            }
            catch (Exception ex)
            {
                _log.LogWarning(ex, "RealParticipant viewer={Vid}: failed to decode msg envelope", ViewerId);
                return;
            }

            bool shouldDispatch = true;
            if (env.PubSeq.HasValue)
            {
                shouldDispatch = Inbound.Observe(env.PubSeq.Value);

                // The ack goes out even when the frame will not be dispatched.
                if (frame.AckId.HasValue)
                {
                    await SendSioAckAsync(frame.AckId.Value, env.PubSeq.Value);
                }
            }

            if (!shouldDispatch)
            {
                return;
            }

            // Snapshot so an unsubscribe between the null check and the call cannot
            // null the delegate out from under us.
            var subscribers = FrameEmitted;
            if (subscribers is null)
            {
                return;
            }

            // Invoking a multicast Func<..., Task> returns only the last handler's
            // task; walk the invocation list so every handler is awaited.
            foreach (Func<MsgEnvelope, CancellationToken, Task> handler in subscribers.GetInvocationList())
            {
                await handler(env, _sessionCt);
            }
        }
        catch (Exception ex)
        {
            _log.LogError(ex, "RealParticipant viewer={Vid}: unhandled in HandleMsgEventAsync", ViewerId);
        }
    }

    /// <summary>
    /// Acks an alive event (when the frame carries an ack id) and replies with an
    /// alive push built through <c>Outbound.WrapNoStock</c>.
    /// Exceptions are logged and never escape into the read loop.
    /// </summary>
    private async Task HandleAliveEventAsync(SocketIoFrame frame)
    {
        try
        {
            if (frame.AckId.HasValue)
            {
                await SendSioAckAsync(frame.AckId.Value, 0);
            }

            var aliveEnv = new MsgEnvelope(
                Uri: NetworkBattleUri.Gungnir,
                ViewerId: SVSim.BattleNode.Lifecycle.ScriptedLifecycle.FakeOpponentViewerId,
                Uuid: WireConstants.ServerUuid,
                Bid: null,
                Try: 0,
                Cat: EmitCategory.General,
                PubSeq: null,
                PlaySeq: null,
                Body: new AlivePushBody(Scs: WireConstants.OnlineStatus, Ocs: WireConstants.OnlineStatus));

            var stamped = Outbound.WrapNoStock(aliveEnv);
            await EncodeAndSendAsync(stamped, WireConstants.AliveEvent, _sessionCt);
        }
        catch (Exception ex)
        {
            _log.LogError(ex, "RealParticipant viewer={Vid}: unhandled in HandleAliveEventAsync", ViewerId);
        }
    }

    /// <summary>
    /// Encodes <paramref name="env"/> with a freshly generated key, wraps it in an SIO
    /// binary event named <paramref name="eventName"/>, and writes the text header
    /// followed by each attachment (each prefixed with the EIO message type byte).
    /// </summary>
    private async Task EncodeAndSendAsync(MsgEnvelope env, string eventName, CancellationToken ct)
    {
        var key = NodeCrypto.GenerateKey(() => RandomNumberGenerator.GetInt32(0, 16));
        var bytes = MsgPayloadCodec.Encode(env, key);
        var sio = SocketIoFrame.BinaryEventWithAttachments(eventName, new[] { bytes });
        var (text, bins) = sio.Encode();
        var header = Encoding.UTF8.GetBytes($"{(int)EngineIoPacketType.Message}{text}");

        // Hold the gate for the header AND its attachments: the receiver pairs the
        // binary messages with the header that precedes them, so nothing else may be
        // written in between.
        await _sendGate.WaitAsync(ct);
        try
        {
            await _ws.SendAsync(header, WebSocketMessageType.Text, endOfMessage: true, ct);

            foreach (var bin in bins)
            {
                var prefixed = new byte[bin.Length + 1];
                prefixed[0] = (byte)EngineIoPacketType.Message;
                Buffer.BlockCopy(bin, 0, prefixed, 1, bin.Length);
                await _ws.SendAsync(prefixed, WebSocketMessageType.Binary, endOfMessage: true, ct);
            }
        }
        finally
        {
            _sendGate.Release();
        }
    }

    /// <summary>
    /// Narrows <paramref name="arg"/> to <see cref="int"/>, saturating at
    /// <see cref="int.MaxValue"/> / <see cref="int.MinValue"/> and logging a warning
    /// whenever the value had to be clipped.
    /// </summary>
    /// <param name="arg">Value to narrow.</param>
    /// <param name="log">Logger that receives the clipping warning.</param>
    /// <param name="viewerId">Viewer id included in the warning.</param>
    /// <returns><paramref name="arg"/> clamped to the <see cref="int"/> range.</returns>
    internal static int ClipAckArg(long arg, ILogger log, long viewerId)
    {
        if (arg > int.MaxValue)
        {
            log.LogWarning("RealParticipant viewer={Vid}: pubSeq {Seq} exceeds int.MaxValue; clipping.", viewerId, arg);
            return int.MaxValue;
        }

        if (arg < int.MinValue)
        {
            log.LogWarning("RealParticipant viewer={Vid}: pubSeq {Seq} below int.MinValue; clipping.", viewerId, arg);
            return int.MinValue;
        }

        return (int)arg;
    }

    /// <summary>Sends an SIO ack for <paramref name="ackId"/> carrying the clipped <paramref name="arg"/>.</summary>
    private async Task SendSioAckAsync(int ackId, long arg)
    {
        var ack = SocketIoFrame.AckResponse(ackId, ClipAckArg(arg, _log, ViewerId));
        var (text, _) = ack.Encode();
        var eioText = $"{(int)EngineIoPacketType.Message}{text}";
        await SendTextAsync(eioText, _sessionCt);
    }

    /// <summary>Sends the Engine.IO open packet ("0" + handshake JSON) with a fresh 16-hex-char sid.</summary>
    private async Task SendEioOpenAsync(CancellationToken ct)
    {
        var sid = Guid.NewGuid().ToString("N")[..16];

        // NOTE(review): 25000 / 60000 are presumably pingInterval / pingTimeout in
        // milliseconds — confirm against EngineIoHandshake's parameter names.
        var handshake = new EngineIoHandshake(sid, Array.Empty<string>(), 25000, 60000).ToJson();
        await SendTextAsync($"0{handshake}", ct);
    }

    /// <summary>Sends <paramref name="text"/> as one UTF-8 text message, serialized through the send gate.</summary>
    private async Task SendTextAsync(string text, CancellationToken ct)
    {
        var bytes = Encoding.UTF8.GetBytes(text);

        await _sendGate.WaitAsync(ct);
        try
        {
            await _ws.SendAsync(bytes, WebSocketMessageType.Text, endOfMessage: true, ct);
        }
        finally
        {
            _sendGate.Release();
        }
    }

    /// <summary>
    /// Reads WS fragments until end-of-message and returns the concatenated payload
    /// together with whether it was a text message.
    /// </summary>
    /// <returns>
    /// The full message, or <c>null</c> when the read was canceled, the socket threw a
    /// <see cref="WebSocketException"/>, or a Close message was received.
    /// </returns>
    private async Task<(byte[] Bytes, bool IsText)?> ReadCompleteMessageAsync(byte[] buffer, CancellationToken ct)
    {
        using var ms = new MemoryStream();
        WebSocketReceiveResult result;

        do
        {
            try
            {
                result = await _ws.ReceiveAsync(buffer, ct);
            }
            catch (OperationCanceledException)
            {
                return null;
            }
            catch (WebSocketException)
            {
                return null;
            }

            if (result.MessageType == WebSocketMessageType.Close)
            {
                return null;
            }

            ms.Write(buffer, 0, result.Count);
        }
        while (!result.EndOfMessage);

        return (ms.ToArray(), result.MessageType == WebSocketMessageType.Text);
    }
}