feat(battle-node): add RealParticipant wrapping WS + sequencers
Lifts the WS read loop, SIO encode/decode, per-WS OutboundSequencer + InboundTracker, and SIO ack out of BattleSession into a participant. PushAsync(noStock=false) assigns playSeq via the sequencer; noStock=true bypasses it. FrameEmitted fires on each deduplicated inbound envelope. The existing BattleSession keeps its own copy of the WS code for now; Task 9 cuts the handler over to use BattleSessionV2 + RealParticipant and Task 10 deletes the old BattleSession + duplicate code.
This commit is contained in:
262
SVSim.BattleNode/Sessions/Participants/RealParticipant.cs
Normal file
262
SVSim.BattleNode/Sessions/Participants/RealParticipant.cs
Normal file
@@ -0,0 +1,262 @@
|
|||||||
|
using System.Net.WebSockets;
|
||||||
|
using System.Security.Cryptography;
|
||||||
|
using System.Text;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using SVSim.BattleNode.Bridge;
|
||||||
|
using SVSim.BattleNode.Protocol;
|
||||||
|
using SVSim.BattleNode.Protocol.Bodies;
|
||||||
|
using SVSim.BattleNode.Reliability;
|
||||||
|
using SVSim.BattleNode.Wire;
|
||||||
|
|
||||||
|
namespace SVSim.BattleNode.Sessions.Participants;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// WS-backed participant. Owns the WS read loop, SIO encoding/decoding, per-WS
|
||||||
|
/// <see cref="OutboundSequencer"/> + <see cref="InboundTracker"/>. Fires
|
||||||
|
/// <see cref="FrameEmitted"/> on each deduplicated inbound <see cref="MsgEnvelope"/>.
|
||||||
|
/// PushAsync encodes + sends; ordered pushes get a playSeq from the sequencer,
|
||||||
|
/// no-stock control pushes bypass it.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class RealParticipant : IBattleParticipant
|
||||||
|
{
|
||||||
|
private readonly WebSocket _ws;
|
||||||
|
private readonly ILogger<RealParticipant> _log;
|
||||||
|
private CancellationToken _sessionCt;
|
||||||
|
|
||||||
|
public long ViewerId { get; }
|
||||||
|
public MatchContext Context { get; }
|
||||||
|
public InboundTracker Inbound { get; } = new();
|
||||||
|
public OutboundSequencer Outbound { get; } = new();
|
||||||
|
|
||||||
|
public event Func<MsgEnvelope, CancellationToken, Task>? FrameEmitted;
|
||||||
|
|
||||||
|
public RealParticipant(WebSocket ws, long viewerId, MatchContext context,
|
||||||
|
ILogger<RealParticipant> log)
|
||||||
|
{
|
||||||
|
_ws = ws;
|
||||||
|
_log = log;
|
||||||
|
ViewerId = viewerId;
|
||||||
|
Context = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task RunAsync(CancellationToken cancellation)
|
||||||
|
{
|
||||||
|
_sessionCt = cancellation;
|
||||||
|
await SendEioOpenAsync(cancellation);
|
||||||
|
|
||||||
|
var buffer = new byte[8192];
|
||||||
|
var pendingAttachments = new List<byte[]>();
|
||||||
|
SocketIoFrame? pendingFrame = null;
|
||||||
|
|
||||||
|
while (_ws.State == WebSocketState.Open && !cancellation.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
var msg = await ReadCompleteMessageAsync(buffer, cancellation);
|
||||||
|
if (msg is null) break;
|
||||||
|
|
||||||
|
if (msg.Value.IsText)
|
||||||
|
{
|
||||||
|
var text = Encoding.UTF8.GetString(msg.Value.Bytes);
|
||||||
|
if (text.Length == 0) continue;
|
||||||
|
var eio = EngineIoFrame.Parse(text);
|
||||||
|
if (eio.Type == EngineIoPacketType.Ping)
|
||||||
|
{
|
||||||
|
await SendTextAsync("3", cancellation);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (eio.Type != EngineIoPacketType.Message) continue;
|
||||||
|
|
||||||
|
var sio = SocketIoFrame.Parse(eio.Payload);
|
||||||
|
if (sio.AttachmentCount > 0)
|
||||||
|
{
|
||||||
|
pendingFrame = sio;
|
||||||
|
pendingAttachments.Clear();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
await DispatchSocketIo(sio);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var bin = msg.Value.Bytes;
|
||||||
|
if (bin.Length > 0 && bin[0] == (byte)EngineIoPacketType.Message)
|
||||||
|
{
|
||||||
|
bin = bin.AsSpan(1).ToArray();
|
||||||
|
}
|
||||||
|
pendingAttachments.Add(bin);
|
||||||
|
if (pendingFrame is not null && pendingAttachments.Count == pendingFrame.AttachmentCount)
|
||||||
|
{
|
||||||
|
var assembled = pendingFrame.WithAttachments(pendingAttachments.ToArray());
|
||||||
|
pendingFrame = null;
|
||||||
|
await DispatchSocketIo(assembled);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task PushAsync(MsgEnvelope envelope, bool noStock, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var stamped = noStock ? Outbound.WrapNoStock(envelope) : Outbound.AssignAndArchive(envelope);
|
||||||
|
await EncodeAndSendAsync(stamped, WireConstants.SynchronizeEvent, ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task TerminateAsync(BattleFinishReason reason)
|
||||||
|
{
|
||||||
|
// WS will close via the read loop exiting; nothing to do here.
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
if (_ws.State == WebSocketState.Open || _ws.State == WebSocketState.CloseReceived)
|
||||||
|
{
|
||||||
|
try { _ws.Abort(); } catch { /* best effort */ }
|
||||||
|
}
|
||||||
|
_ws.Dispose();
|
||||||
|
return ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task DispatchSocketIo(SocketIoFrame frame)
|
||||||
|
{
|
||||||
|
if (frame.Type is SocketIoPacketType.Event or SocketIoPacketType.BinaryEvent)
|
||||||
|
{
|
||||||
|
switch (frame.EventName)
|
||||||
|
{
|
||||||
|
case WireConstants.MsgEvent when frame.BinaryAttachments.Count == 1:
|
||||||
|
await HandleMsgEventAsync(frame);
|
||||||
|
return;
|
||||||
|
case WireConstants.AliveEvent when frame.BinaryAttachments.Count == 1:
|
||||||
|
await HandleAliveEventAsync(frame);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_log.LogDebug("RealParticipant viewer={Vid}: dropping SIO event={Event}", ViewerId, frame.EventName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task HandleMsgEventAsync(SocketIoFrame frame)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
MsgEnvelope env;
|
||||||
|
try { env = MsgPayloadCodec.Decode(frame.BinaryAttachments[0]); }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_log.LogWarning(ex, "RealParticipant viewer={Vid}: failed to decode msg envelope", ViewerId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool shouldDispatch = true;
|
||||||
|
if (env.PubSeq.HasValue)
|
||||||
|
{
|
||||||
|
shouldDispatch = Inbound.Observe(env.PubSeq.Value);
|
||||||
|
if (frame.AckId.HasValue)
|
||||||
|
{
|
||||||
|
await SendSioAckAsync(frame.AckId.Value, env.PubSeq.Value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!shouldDispatch) return;
|
||||||
|
|
||||||
|
if (FrameEmitted is not null)
|
||||||
|
{
|
||||||
|
await FrameEmitted.Invoke(env, _sessionCt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_log.LogError(ex, "RealParticipant viewer={Vid}: unhandled in HandleMsgEventAsync", ViewerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task HandleAliveEventAsync(SocketIoFrame frame)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (frame.AckId.HasValue)
|
||||||
|
{
|
||||||
|
await SendSioAckAsync(frame.AckId.Value, 0);
|
||||||
|
}
|
||||||
|
var aliveEnv = new MsgEnvelope(
|
||||||
|
Uri: NetworkBattleUri.Gungnir,
|
||||||
|
ViewerId: SVSim.BattleNode.Lifecycle.ScriptedLifecycle.FakeOpponentViewerId,
|
||||||
|
Uuid: WireConstants.ServerUuid,
|
||||||
|
Bid: null,
|
||||||
|
Try: 0,
|
||||||
|
Cat: EmitCategory.General,
|
||||||
|
PubSeq: null,
|
||||||
|
PlaySeq: null,
|
||||||
|
Body: new AlivePushBody(Scs: WireConstants.OnlineStatus, Ocs: WireConstants.OnlineStatus));
|
||||||
|
var stamped = Outbound.WrapNoStock(aliveEnv);
|
||||||
|
await EncodeAndSendAsync(stamped, WireConstants.AliveEvent, _sessionCt);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_log.LogError(ex, "RealParticipant viewer={Vid}: unhandled in HandleAliveEventAsync", ViewerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task EncodeAndSendAsync(MsgEnvelope env, string eventName, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var key = NodeCrypto.GenerateKey(() => RandomNumberGenerator.GetInt32(0, 16));
|
||||||
|
var bytes = MsgPayloadCodec.Encode(env, key);
|
||||||
|
var sio = SocketIoFrame.BinaryEventWithAttachments(eventName, new[] { bytes });
|
||||||
|
var (text, bins) = sio.Encode();
|
||||||
|
var eioText = $"{(int)EngineIoPacketType.Message}{text}";
|
||||||
|
await SendTextAsync(eioText, ct);
|
||||||
|
foreach (var bin in bins)
|
||||||
|
{
|
||||||
|
var prefixed = new byte[bin.Length + 1];
|
||||||
|
prefixed[0] = (byte)EngineIoPacketType.Message;
|
||||||
|
Buffer.BlockCopy(bin, 0, prefixed, 1, bin.Length);
|
||||||
|
await _ws.SendAsync(prefixed, WebSocketMessageType.Binary, endOfMessage: true, ct);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal static int ClipAckArg(long arg, ILogger log, long viewerId)
|
||||||
|
{
|
||||||
|
if (arg > int.MaxValue)
|
||||||
|
{
|
||||||
|
log.LogWarning("RealParticipant viewer={Vid}: pubSeq {Seq} exceeds int.MaxValue; clipping.", viewerId, arg);
|
||||||
|
return int.MaxValue;
|
||||||
|
}
|
||||||
|
if (arg < int.MinValue)
|
||||||
|
{
|
||||||
|
log.LogWarning("RealParticipant viewer={Vid}: pubSeq {Seq} below int.MinValue; clipping.", viewerId, arg);
|
||||||
|
return int.MinValue;
|
||||||
|
}
|
||||||
|
return (int)arg;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task SendSioAckAsync(int ackId, long arg)
|
||||||
|
{
|
||||||
|
var ack = SocketIoFrame.AckResponse(ackId, ClipAckArg(arg, _log, ViewerId));
|
||||||
|
var (text, _) = ack.Encode();
|
||||||
|
var eioText = $"{(int)EngineIoPacketType.Message}{text}";
|
||||||
|
await SendTextAsync(eioText, _sessionCt);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task SendEioOpenAsync(CancellationToken ct)
|
||||||
|
{
|
||||||
|
var sid = Guid.NewGuid().ToString("N").Substring(0, 16);
|
||||||
|
var handshake = new EngineIoHandshake(sid, Array.Empty<string>(), 25000, 60000).ToJson();
|
||||||
|
await SendTextAsync($"0{handshake}", ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task SendTextAsync(string text, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var bytes = Encoding.UTF8.GetBytes(text);
|
||||||
|
return _ws.SendAsync(bytes, WebSocketMessageType.Text, endOfMessage: true, ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<(byte[] Bytes, bool IsText)?> ReadCompleteMessageAsync(byte[] buffer, CancellationToken ct)
|
||||||
|
{
|
||||||
|
using var ms = new MemoryStream();
|
||||||
|
WebSocketReceiveResult result;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
try { result = await _ws.ReceiveAsync(buffer, ct); }
|
||||||
|
catch (OperationCanceledException) { return null; }
|
||||||
|
catch (WebSocketException) { return null; }
|
||||||
|
if (result.MessageType == WebSocketMessageType.Close) return null;
|
||||||
|
ms.Write(buffer, 0, result.Count);
|
||||||
|
} while (!result.EndOfMessage);
|
||||||
|
return (ms.ToArray(), result.MessageType == WebSocketMessageType.Text);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,69 @@
|
|||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NUnit.Framework;
|
||||||
|
using SVSim.BattleNode.Bridge;
|
||||||
|
using SVSim.BattleNode.Protocol;
|
||||||
|
using SVSim.BattleNode.Protocol.Bodies;
|
||||||
|
using SVSim.BattleNode.Sessions;
|
||||||
|
using SVSim.BattleNode.Sessions.Participants;
|
||||||
|
using SVSim.UnitTests.BattleNode.Infrastructure;
|
||||||
|
|
||||||
|
namespace SVSim.UnitTests.BattleNode.Sessions.Participants;
|
||||||
|
|
||||||
|
[TestFixture]
|
||||||
|
public class RealParticipantTests
|
||||||
|
{
|
||||||
|
[Test]
|
||||||
|
public void PushAsync_ordered_assigns_playSeq_via_OutboundSequencer()
|
||||||
|
{
|
||||||
|
var ws = new TestWebSocket();
|
||||||
|
var p = new RealParticipant(ws, viewerId: 1, FixtureCtx(),
|
||||||
|
NullLogger<RealParticipant>.Instance);
|
||||||
|
|
||||||
|
// First ordered push gets playSeq = 1; second = 2; etc.
|
||||||
|
// Inspect the participant's outbound sequencer state via its public Archive.
|
||||||
|
var env = NewEnvelope(NetworkBattleUri.Matched);
|
||||||
|
p.PushAsync(env, noStock: false, CancellationToken.None).Wait();
|
||||||
|
p.PushAsync(env, noStock: false, CancellationToken.None).Wait();
|
||||||
|
|
||||||
|
Assert.That(p.Outbound.Archive.Count, Is.EqualTo(2));
|
||||||
|
Assert.That(p.Outbound.Archive[1].PlaySeq, Is.EqualTo(1));
|
||||||
|
Assert.That(p.Outbound.Archive[2].PlaySeq, Is.EqualTo(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void PushAsync_noStock_omits_playSeq()
|
||||||
|
{
|
||||||
|
var ws = new TestWebSocket();
|
||||||
|
var p = new RealParticipant(ws, viewerId: 1, FixtureCtx(),
|
||||||
|
NullLogger<RealParticipant>.Instance);
|
||||||
|
|
||||||
|
p.PushAsync(NewEnvelope(NetworkBattleUri.BattleFinish), noStock: true, CancellationToken.None).Wait();
|
||||||
|
|
||||||
|
// No playSeq archive entry for no-stock pushes.
|
||||||
|
Assert.That(p.Outbound.Archive.Count, Is.EqualTo(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Test]
|
||||||
|
public void ViewerId_and_Context_are_exposed()
|
||||||
|
{
|
||||||
|
var ws = new TestWebSocket();
|
||||||
|
var ctx = FixtureCtx();
|
||||||
|
var p = new RealParticipant(ws, viewerId: 906243102L, ctx,
|
||||||
|
NullLogger<RealParticipant>.Instance);
|
||||||
|
|
||||||
|
Assert.That(p.ViewerId, Is.EqualTo(906243102L));
|
||||||
|
Assert.That(p.Context, Is.SameAs(ctx));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MatchContext FixtureCtx() => new(
|
||||||
|
SelfDeckCardIds: Enumerable.Range(1, 30).Select(_ => 100_011_010L).ToList(),
|
||||||
|
ClassId: "1", CharaId: "1", CardMasterName: "card_master_node_10015",
|
||||||
|
CountryCode: "KOR", UserName: "Player", SleeveId: "3000011",
|
||||||
|
EmblemId: "701441011", DegreeId: "300003", FieldId: 43, IsOfficial: 0,
|
||||||
|
BattleType: 11);
|
||||||
|
|
||||||
|
private static MsgEnvelope NewEnvelope(NetworkBattleUri uri) =>
|
||||||
|
new(uri, ViewerId: 1, Uuid: "u", Bid: null, Try: 0,
|
||||||
|
Cat: EmitCategory.Battle, PubSeq: null, PlaySeq: null,
|
||||||
|
Body: new ResultCodeOnlyBody());
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user