SVSimServer/SVSim.BattleNode/Sessions/Participants/RealParticipant.cs
gamer147 9fc1d055d8 fix(battle-node): ack 'hand' SIO events to unblock client emit queue
Scripted-bot softlock root cause: client-stocked SELECT_SKILL_URI /
SLIDE_OBJECT_URI hand emits (e.g. target selection on unit play / leader
attack) arrive as SIO BinaryEvent("hand", ...) with an ack-id. Our
DispatchSocketIo only had cases for "msg" and "alive" — "hand" fell to
the default Debug-drop with no SIO ack going back. Client's
stockEmitMessageMgr (RealTimeNetworkAgent.cs:1463) blocks subsequent
emits until the previous one is acked, so all follow-up PlayActions /
TurnEndActions / TurnEnd frames were stocked but never transmitted. The
loader hooks EmitMsg (the emit intent), not the socket layer, which is why
battle-traffic.ndjson shows the frames as sent even though the server never
received them. ~10s later the client gives up and aborts the WS.
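A minimal model of the blocking behavior described above, assuming a strict
one-frame-in-flight queue. Emit, Ack and Pump are hypothetical names used
for illustration only; they are not the client's stockEmitMessageMgr API,
whose internals are not shown here.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical model of a stocked emit queue: at most one frame is in flight,
// and every later emit waits until that frame's SIO ack arrives.
var stocked = new Queue<string>();
var transmitted = new List<string>();
string? inFlight = null;

// Queue a frame and try to send it.
void Emit(string frame)
{
    stocked.Enqueue(frame);
    Pump();
}

// Server ack for the in-flight frame: free the slot and send the next one.
void Ack()
{
    inFlight = null;
    Pump();
}

// Transmit the head of the queue only when nothing is awaiting an ack.
void Pump()
{
    if (inFlight is null && stocked.Count > 0)
    {
        inFlight = stocked.Dequeue();
        transmitted.Add(inFlight);
    }
}

Emit("hand");            // stocked SELECT_SKILL_URI frame carrying an ack-id
Emit("PlayActions");
Emit("TurnEndActions");
Emit("TurnEnd");

// With no server ack for "hand", nothing queued behind it reaches the wire.
Console.WriteLine(string.Join(", ", transmitted)); // prints "hand"
```

One missing ack is enough to strand every later frame, which matches the
battle-traffic.ndjson vs server-log mismatch.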

Wire-shape proof from data_dumps/captures/logs/websocket_output.txt:
  line 619: [sio-in] uri=TurnStart pubSeq=17 ackId=16 ... (T3 start)
  line 689: [ws-rx-text] preview=451-26["hand", {...}] ← unhandled
  line 691: [ws-rx-bin]  binLen=58 pendingFrame=hand
  (no further [sio-in] entries — server received nothing else)
  line 709: [ws-recv-exit] reason=OperationCanceled wsState=Aborted
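The line-689 text frame can be decoded using the public Socket.IO packet
layout: "4" is the Engine.IO message type, "5" is Socket.IO BINARY_EVENT,
"1-" announces one binary attachment (the 58-byte frame at line 691), and
"26" is the ack-id the client then waits on. The parser below is a
hypothetical, simplified sketch (not SocketIoFrame.Parse) that assumes
well-formed input; the {"_placeholder":true,"num":0} payload is the standard
Socket.IO attachment placeholder, assumed here because the log elides it.

```csharp
#nullable enable
using System;

// Sketch of splitting a Socket.IO text frame carried in an Engine.IO message:
//   <eio type><sio type>[<attachments>-][<nsp>,][<ackId>]<json payload>
// Simplified: assumes well-formed input; namespaces other than "/" start with '/'.
(char Eio, char Sio, int Attachments, string Nsp, int? AckId, string Payload) ParseHeader(string frame)
{
    int i = 2; // frame[0] = EIO packet type, frame[1] = SIO packet type
    int attachments = 0;
    if (frame[1] is '5' or '6') // BINARY_EVENT / BINARY_ACK carry "<n>-"
    {
        int dash = frame.IndexOf('-', i);
        attachments = int.Parse(frame.AsSpan(i, dash - i));
        i = dash + 1;
    }
    string nsp = "/";
    if (i < frame.Length && frame[i] == '/')
    {
        int comma = frame.IndexOf(',', i);
        nsp = frame.Substring(i, comma - i);
        i = comma + 1;
    }
    // Any digits before the JSON payload are the ack-id.
    int start = i;
    while (i < frame.Length && char.IsDigit(frame[i])) i++;
    int? ackId = i > start ? int.Parse(frame.AsSpan(start, i - start)) : null;
    return (frame[0], frame[1], attachments, nsp, ackId, frame.Substring(i));
}

var hand = ParseHeader("451-26[\"hand\",{\"_placeholder\":true,\"num\":0}]");
Console.WriteLine($"eio={hand.Eio} sio={hand.Sio} attachments={hand.Attachments} ackId={hand.AckId}");
// prints "eio=4 sio=5 attachments=1 ackId=26"
```

A non-null AckId is what obliges the server to answer with a "43<ackId>[...]"
frame; that answer is what was missing for "hand".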

New HandleHandEventAsync (RealParticipant.cs):
- Fire-and-forget hand frames (no ack-id; TOUCH_URI / SELECT_OBJECT_URI /
  TURN_END_READY_URI) are silently swallowed — no queue-blocking risk
- Stocked hand frames decode the binary attachment via the same
  msgpack-string + NodeCrypto decryption pipeline as HandleMsgEventAsync
  (NodeCrypto.DecryptForNode), parse the JSON, extract the top-level
  "pubSeq", and call SendSioAckAsync with that pubSeq as the ack arg (the
  value stockEmitMessageMgr.GetSelectData expects to look up)
- Body shape is {"StockHandData":[uri_int, viewerId, udid, ...params,
  pubSeq], "try":0, "pubSeq":N} — NOT a MsgEnvelope (no top-level "uri"),
  so we can't reuse HandleMsgEventAsync as-is
- Missing-pubSeq fallback acks with arg=0 (rare path, logged at Warning),
  so a body that parses but lacks a numeric pubSeq cannot softlock the
  queue. A body that fails to decode is logged at Warning and not acked.
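The three outcomes above reduce to a small decision function. This is a
sketch, not the actual HandleHandEventAsync: it starts from already-decrypted
JSON (the msgpack + NodeCrypto step is omitted), HandAckArg is a hypothetical
name, and the StockHandData values in the usage lines are made up.

```csharp
using System;
using System.Text.Json;

// Sketch of the ack decision for a 'hand' event once its body is plain JSON.
// Returns null when no ack should be sent at all. Invalid JSON makes
// JsonDocument.Parse throw; the real handler catches that and skips the ack.
long? HandAckArg(int? ackId, string json)
{
    // No ack-id: fire-and-forget (TOUCH_URI etc.), so skip decoding entirely.
    if (ackId is null) return null;

    using var doc = JsonDocument.Parse(json);
    if (doc.RootElement.TryGetProperty("pubSeq", out var pubSeq)
        && pubSeq.ValueKind == JsonValueKind.Number)
    {
        return pubSeq.GetInt64(); // the client looks its stocked frame up by this
    }
    return 0; // parsed but no numeric pubSeq: ack with 0 so the queue still drains
}

const string body = "{\"StockHandData\":[7,1001,\"udid\",17],\"try\":0,\"pubSeq\":17}";
Console.WriteLine(HandAckArg(26, body));             // prints "17"
Console.WriteLine(HandAckArg(null, body) is null);   // prints "True"
```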

WireConstants gets the HandEvent = "hand" constant for the dispatch case.

In scripted/Bot mode the ack-only handler is correct (no opponent to
forward touches to). PvP-side forwarding semantics are unverified — see
docs/audits/battle-node-sio-events-2026-06-02.md (outer repo) for the
full event inventory and remaining gaps.

Tests:
- RealParticipantHandEventTests covers the three paths: stocked-with-ack,
  fire-and-forget (no ack expected), missing-pubSeq fallback (arg=0). Each
  drives a real hand frame through RunAsync via TestWebSocket and asserts
  the SIO ack frame shape (43<ackId>[<arg>]) in outbound sends.
- 175 battle-node tests passing (was 172; +3 new). Full suite green.
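The asserted ack shape can be reproduced in a few lines. This sketch assumes
SocketIoFrame.AckResponse encodes as 3<ackId>[<arg>] (the shape the tests
assert, with the "4" Engine.IO prefix added by SendSioAckAsync). EncodeAck
is a hypothetical helper that clamps like ClipAckArg but skips its warning.

```csharp
using System;

// Sketch of the outbound ack text frame:
// "4" (EIO message) + "3" (SIO ACK) + ackId + JSON array holding one arg.
// The long pubSeq is clamped into int range, as ClipAckArg does.
string EncodeAck(int ackId, long arg)
{
    int clipped = (int)Math.Clamp(arg, int.MinValue, int.MaxValue);
    return FormattableString.Invariant($"43{ackId}[{clipped}]");
}

Console.WriteLine(EncodeAck(26, 17)); // prints "4326[17]"
```

The missing-pubSeq test case corresponds to EncodeAck(ackId, 0).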

Diagnostic logs ([sio-in] / [sio-out] / [ws-rx-text] / [ws-rx-bin] /
[ws-recv-exit] / [ws-loop-exit]) are left in place for one verification
cycle. After a live re-run confirms the fix, they should be stripped per
the audit doc's recommended-order step 2.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-02 16:41:40 -04:00


using System.Net.WebSockets;
using System.Security.Cryptography;
using System.Text;
using Microsoft.Extensions.Logging;
using SVSim.BattleNode.Bridge;
using SVSim.BattleNode.Protocol;
using SVSim.BattleNode.Protocol.Bodies;
using SVSim.BattleNode.Reliability;
using SVSim.BattleNode.Wire;
namespace SVSim.BattleNode.Sessions.Participants;
/// <summary>
/// Marker interface implemented by participants that own a handshake-phase cursor.
/// <see cref="BattleSession.ComputeFrames"/> reads the sender's <see cref="Phase"/>
/// when gating the handshake-phase arms (InitNetwork / InitBattle / Loaded / Swap)
/// and the TurnEnd-AfterReady forwarder. Bots don't implement this — they never
/// send the gating URIs.
/// </summary>
internal interface IHasHandshakePhase
{
BattleSessionPhase Phase { get; set; }
}
/// <summary>
/// WS-backed participant. Owns the WS read loop, SIO encoding/decoding, per-WS
/// <see cref="OutboundSequencer"/> + <see cref="InboundTracker"/>. Fires
/// <see cref="FrameEmitted"/> on each deduplicated inbound <see cref="MsgEnvelope"/>.
/// PushAsync encodes + sends; ordered pushes get a playSeq from the sequencer,
/// no-stock control pushes bypass it.
/// </summary>
public sealed class RealParticipant : IBattleParticipant, IHasHandshakePhase
{
private readonly WebSocket _ws;
private readonly ILogger<RealParticipant> _log;
private CancellationToken _sessionCt;
public long ViewerId { get; }
public MatchContext Context { get; }
public InboundTracker Inbound { get; } = new();
public OutboundSequencer Outbound { get; } = new();
/// <summary>Per-side handshake progression. Session reads this when gating
/// handshake-phase synthesis (Matched / BattleStart / Deal / Swap response /
/// Ready). Session transitions via the setter after dispatch. Defaults to
/// AwaitingInitNetwork; only RealParticipant tracks this — bots have no phase
/// because they never send the gating URIs. Also satisfies
/// <see cref="IHasHandshakePhase"/> (the interface BattleSession uses to gate
/// handshake dispatch without depending on the concrete RealParticipant type).</summary>
internal BattleSessionPhase Phase { get; set; } = BattleSessionPhase.AwaitingInitNetwork;
BattleSessionPhase IHasHandshakePhase.Phase
{
get => Phase;
set => Phase = value;
}
public event Func<MsgEnvelope, CancellationToken, Task>? FrameEmitted;
private readonly TaskCompletionSource<bool> _sessionFinished
= new(TaskCreationOptions.RunContinuationsAsynchronously);
/// <summary>Called by the second arriver's handler (in a finally block) after
/// session.RunAsync completes. Signals the first arriver's handler that it can
/// return and let the HTTP request complete (which closes the WS).</summary>
internal void MarkSessionFinished() => _sessionFinished.TrySetResult(true);
/// <summary>Awaited by the first arriver's handler instead of calling RunAsync
/// (the session already calls RunAsync on this instance from the second arriver's
/// handler context — calling it twice would race the WS read loop). Returns when
/// either MarkSessionFinished fires or the passed CT cancels.</summary>
internal Task AwaitSessionFinishedAsync(CancellationToken ct)
{
if (_sessionFinished.Task.IsCompleted) return _sessionFinished.Task;
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var reg = ct.Register(() => tcs.TrySetCanceled(ct));
_sessionFinished.Task.ContinueWith(t =>
{
reg.Dispose();
if (t.IsCompletedSuccessfully) tcs.TrySetResult(true);
else if (t.IsFaulted) tcs.TrySetException(t.Exception!.InnerExceptions);
else tcs.TrySetCanceled();
}, TaskContinuationOptions.ExecuteSynchronously);
return tcs.Task;
}
public RealParticipant(WebSocket ws, long viewerId, MatchContext context,
ILogger<RealParticipant> log)
{
_ws = ws;
_log = log;
ViewerId = viewerId;
Context = context;
}
public async Task RunAsync(CancellationToken cancellation)
{
_sessionCt = cancellation;
await SendEioOpenAsync(cancellation);
var buffer = new byte[8192];
var pendingAttachments = new List<byte[]>();
SocketIoFrame? pendingFrame = null;
string exitReason = "loop-condition-false";
try
{
while (_ws.State == WebSocketState.Open && !cancellation.IsCancellationRequested)
{
var msg = await ReadCompleteMessageAsync(buffer, cancellation);
if (msg is null) { exitReason = "read-returned-null"; break; }
if (msg.Value.IsText)
{
var text = Encoding.UTF8.GetString(msg.Value.Bytes);
if (text.Length == 0) continue;
var eio = EngineIoFrame.Parse(text);
// Diagnostic: log every EIO frame type so we can see Ping/Close packets too.
_log.LogInformation(
"[ws-rx-text] viewer={Vid} eioType={Eio} len={Len} preview={Preview}",
ViewerId, eio.Type, text.Length,
text.Length > 60 ? text.Substring(0, 60) + "..." : text);
if (eio.Type == EngineIoPacketType.Ping)
{
await SendTextAsync("3", cancellation);
continue;
}
if (eio.Type != EngineIoPacketType.Message) continue;
var sio = SocketIoFrame.Parse(eio.Payload);
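if (sio.AttachmentCount > 0)
{
// Binary event: park the header until AttachmentCount binary WS frames have
// arrived; the binary branch below reassembles and dispatches it.
pendingFrame = sio;
pendingAttachments.Clear();
continue;
}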
await DispatchSocketIo(sio);
}
else
{
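var bin = msg.Value.Bytes;
// Engine.IO v3 prefixes binary WS frames with a raw packet-type byte
// (4 = Message); strip it so only the SIO attachment payload remains.
if (bin.Length > 0 && bin[0] == (byte)EngineIoPacketType.Message)
{
bin = bin.AsSpan(1).ToArray();
}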
pendingAttachments.Add(bin);
_log.LogInformation(
"[ws-rx-bin] viewer={Vid} binLen={Len} pendingFrame={Pending} attachCount={AttachCount}",
ViewerId, bin.Length, pendingFrame?.EventName ?? "(null)", pendingAttachments.Count);
if (pendingFrame is not null && pendingAttachments.Count == pendingFrame.AttachmentCount)
{
var assembled = pendingFrame.WithAttachments(pendingAttachments.ToArray());
pendingFrame = null;
await DispatchSocketIo(assembled);
}
}
}
}
catch (Exception ex)
{
exitReason = $"throw:{ex.GetType().Name}:{ex.Message}";
throw;
}
finally
{
_log.LogWarning(
"[ws-loop-exit] viewer={Vid} reason={Reason} wsState={State} cancelled={Cancelled}",
ViewerId, exitReason, _ws.State, cancellation.IsCancellationRequested);
}
}
public async Task PushAsync(MsgEnvelope envelope, bool noStock, CancellationToken ct)
{
var stamped = noStock ? Outbound.WrapNoStock(envelope) : Outbound.AssignAndArchive(envelope);
// Temporary diagnostic log to pair with the [sio-in] log — gives full inbound/outbound
// sequence numbering for the scripted-attack TurnEnd-never-fires investigation.
_log.LogInformation(
"[sio-out] viewer={Vid} uri={Uri} pubSeq={Pseq} playSeq={Plseq} noStock={NoStock}",
ViewerId, stamped.Uri, stamped.PubSeq, stamped.PlaySeq, noStock);
await EncodeAndSendAsync(stamped, WireConstants.SynchronizeEvent, ct);
}
public Task TerminateAsync(BattleFinishReason reason)
{
// WS will close via the read loop exiting; nothing to do here.
return Task.CompletedTask;
}
public ValueTask DisposeAsync()
{
if (_ws.State == WebSocketState.Open || _ws.State == WebSocketState.CloseReceived)
{
try { _ws.Abort(); } catch { /* best effort */ }
}
_ws.Dispose();
return ValueTask.CompletedTask;
}
private async Task DispatchSocketIo(SocketIoFrame frame)
{
if (frame.Type is SocketIoPacketType.Event or SocketIoPacketType.BinaryEvent)
{
switch (frame.EventName)
{
case WireConstants.MsgEvent when frame.BinaryAttachments.Count == 1:
await HandleMsgEventAsync(frame);
return;
case WireConstants.AliveEvent when frame.BinaryAttachments.Count == 1:
await HandleAliveEventAsync(frame);
return;
case WireConstants.HandEvent when frame.BinaryAttachments.Count == 1:
await HandleHandEventAsync(frame);
return;
}
}
_log.LogDebug("RealParticipant viewer={Vid}: dropping SIO event={Event}", ViewerId, frame.EventName);
}
private async Task HandleMsgEventAsync(SocketIoFrame frame)
{
try
{
MsgEnvelope env;
try { env = MsgPayloadCodec.Decode(frame.BinaryAttachments[0]); }
catch (Exception ex)
{
_log.LogWarning(ex, "RealParticipant viewer={Vid}: failed to decode msg envelope", ViewerId);
return;
}
bool shouldDispatch = true;
bool ackSent = false;
long? ackArg = null;
if (env.PubSeq.HasValue)
{
shouldDispatch = Inbound.Observe(env.PubSeq.Value);
if (frame.AckId.HasValue)
{
await SendSioAckAsync(frame.AckId.Value, env.PubSeq.Value);
ackSent = true;
ackArg = env.PubSeq.Value;
}
}
// Temporary diagnostic log to chase a scripted-bot bug where the client
// sends TurnEndActions after a PlayActions(type=10) attack but never sends the
// follow-up TurnEnd. We need to confirm whether the server actually sent the
// SIO ack for the offending TurnEndActions (and what arg). Log shape:
// uri / pubSeq / ackId / dispatch / ackSent / ackArg / watermark
_log.LogInformation(
"[sio-in] viewer={Vid} uri={Uri} pubSeq={Pseq} ackId={AckId} dispatch={Dispatch} ackSent={AckSent} ackArg={AckArg} highWaterMark={Hwm}",
ViewerId,
env.Uri,
env.PubSeq,
frame.AckId,
shouldDispatch,
ackSent,
ackArg,
Inbound.HighWaterMark);
if (!shouldDispatch) return;
if (FrameEmitted is not null)
{
await FrameEmitted.Invoke(env, _sessionCt);
}
}
catch (Exception ex)
{
_log.LogError(ex, "RealParticipant viewer={Vid}: unhandled in HandleMsgEventAsync", ViewerId);
}
}
/// <summary>
/// Ack <c>hand</c> events from the client so the client's <c>stockEmitMessageMgr</c>
/// drains and subsequent emits transmit. Wire shape differs from <c>msg</c>: the body
/// is <c>{"StockHandData":[uri_int, viewerId, udid, ...params, pubSeq], "try":0, "pubSeq":N}</c>
/// (no top-level <c>uri</c>), so we can't reuse <see cref="HandleMsgEventAsync"/> — its
/// <see cref="MsgEnvelope.FromJson"/> path requires a top-level <c>uri</c>.
/// <para>
/// In scripted/Bot mode the server has no opponent to forward touches to; ack-only is
/// correct. PvP-side forwarding (so the other player's client can render opponent
/// touch/cursor UI) is unverified — see <c>docs/audits/battle-node-sio-events-2026-06-02.md</c>.
/// </para>
/// <para>
/// Fire-and-forget hand frames (TOUCH_URI / SELECT_OBJECT_URI / TURN_END_READY_URI) arrive
/// with no ack-id; we swallow without decoding. Stocked variants (SELECT_SKILL_URI /
/// SLIDE_OBJECT_URI) arrive with an ack-id and must be acked with the body's <c>pubSeq</c>
/// or the client's emit queue softlocks behind them.
/// </para>
/// </summary>
private async Task HandleHandEventAsync(SocketIoFrame frame)
{
if (!frame.AckId.HasValue)
{
// Fire-and-forget; no queue-blocking risk. Swallow without decoding.
return;
}
try
{
var encryptedString = MessagePack.MessagePackSerializer.Deserialize<string>(frame.BinaryAttachments[0]);
var json = NodeCrypto.DecryptForNode(encryptedString);
using var doc = System.Text.Json.JsonDocument.Parse(json);
if (!doc.RootElement.TryGetProperty("pubSeq", out var psEl)
|| psEl.ValueKind != System.Text.Json.JsonValueKind.Number)
{
_log.LogWarning(
"RealParticipant viewer={Vid}: 'hand' event ackId={AckId} body missing numeric pubSeq; " +
"acking with 0 as a fallback (client's stockEmitMessageMgr lookup will see null selectData).",
ViewerId, frame.AckId);
await SendSioAckAsync(frame.AckId.Value, 0);
return;
}
await SendSioAckAsync(frame.AckId.Value, psEl.GetInt64());
}
catch (Exception ex)
{
_log.LogWarning(ex,
"RealParticipant viewer={Vid}: failed to decode 'hand' event body; not acking. ackId={AckId}",
ViewerId, frame.AckId);
}
}
private async Task HandleAliveEventAsync(SocketIoFrame frame)
{
try
{
if (frame.AckId.HasValue)
{
await SendSioAckAsync(frame.AckId.Value, 0);
}
var aliveEnv = new MsgEnvelope(
Uri: NetworkBattleUri.Gungnir,
ViewerId: SVSim.BattleNode.Lifecycle.ScriptedLifecycle.FakeOpponentViewerId,
Uuid: WireConstants.ServerUuid,
Bid: null,
Try: 0,
Cat: EmitCategory.General,
PubSeq: null,
PlaySeq: null,
Body: new AlivePushBody(Scs: WireConstants.OnlineStatus, Ocs: WireConstants.OnlineStatus));
var stamped = Outbound.WrapNoStock(aliveEnv);
await EncodeAndSendAsync(stamped, WireConstants.AliveEvent, _sessionCt);
}
catch (Exception ex)
{
_log.LogError(ex, "RealParticipant viewer={Vid}: unhandled in HandleAliveEventAsync", ViewerId);
}
}
private async Task EncodeAndSendAsync(MsgEnvelope env, string eventName, CancellationToken ct)
{
var key = NodeCrypto.GenerateKey(() => RandomNumberGenerator.GetInt32(0, 16));
var bytes = MsgPayloadCodec.Encode(env, key);
var sio = SocketIoFrame.BinaryEventWithAttachments(eventName, new[] { bytes });
var (text, bins) = sio.Encode();
var eioText = $"{(int)EngineIoPacketType.Message}{text}";
await SendTextAsync(eioText, ct);
foreach (var bin in bins)
{
var prefixed = new byte[bin.Length + 1];
prefixed[0] = (byte)EngineIoPacketType.Message;
Buffer.BlockCopy(bin, 0, prefixed, 1, bin.Length);
await _ws.SendAsync(prefixed, WebSocketMessageType.Binary, endOfMessage: true, ct);
}
}
internal static int ClipAckArg(long arg, ILogger log, long viewerId)
{
if (arg > int.MaxValue)
{
log.LogWarning("RealParticipant viewer={Vid}: pubSeq {Seq} exceeds int.MaxValue; clipping.", viewerId, arg);
return int.MaxValue;
}
if (arg < int.MinValue)
{
log.LogWarning("RealParticipant viewer={Vid}: pubSeq {Seq} below int.MinValue; clipping.", viewerId, arg);
return int.MinValue;
}
return (int)arg;
}
private async Task SendSioAckAsync(int ackId, long arg)
{
var ack = SocketIoFrame.AckResponse(ackId, ClipAckArg(arg, _log, ViewerId));
var (text, _) = ack.Encode();
var eioText = $"{(int)EngineIoPacketType.Message}{text}";
await SendTextAsync(eioText, _sessionCt);
}
private async Task SendEioOpenAsync(CancellationToken ct)
{
var sid = Guid.NewGuid().ToString("N").Substring(0, 16);
var handshake = new EngineIoHandshake(sid, Array.Empty<string>(), 25000, 60000).ToJson();
await SendTextAsync($"0{handshake}", ct);
}
private Task SendTextAsync(string text, CancellationToken ct)
{
var bytes = Encoding.UTF8.GetBytes(text);
return _ws.SendAsync(bytes, WebSocketMessageType.Text, endOfMessage: true, ct);
}
private async Task<(byte[] Bytes, bool IsText)?> ReadCompleteMessageAsync(byte[] buffer, CancellationToken ct)
{
using var ms = new MemoryStream();
WebSocketReceiveResult result;
do
{
try { result = await _ws.ReceiveAsync(buffer, ct); }
catch (OperationCanceledException)
{
_log.LogWarning("[ws-recv-exit] viewer={Vid} reason=OperationCanceled wsState={State}", ViewerId, _ws.State);
return null;
}
catch (WebSocketException wsex)
{
_log.LogWarning(wsex, "[ws-recv-exit] viewer={Vid} reason=WebSocketException wsState={State} errCode={ErrCode}",
ViewerId, _ws.State, wsex.WebSocketErrorCode);
return null;
}
if (result.MessageType == WebSocketMessageType.Close)
{
_log.LogWarning("[ws-recv-exit] viewer={Vid} reason=ClientCloseFrame wsState={State} closeStatus={Status} desc={Desc}",
ViewerId, _ws.State, result.CloseStatus, result.CloseStatusDescription);
return null;
}
ms.Write(buffer, 0, result.Count);
} while (!result.EndOfMessage);
return (ms.ToArray(), result.MessageType == WebSocketMessageType.Text);
}
}