// This repository was archived on 2024-07-22. You can view files and clone it, but you cannot push or open issues or pull requests.
// File: gameservicewarden/GameServiceWarden/GameServiceWarden.Core/UI/IPCMediator.cs
//
// 284 lines
// 12 KiB
// C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Net.Sockets;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using GameServiceWarden.InteractionAPI;
using GameServiceWarden.InteractionAPI.Communicable.Requests;
using GameServiceWarden.InteractionAPI.Communicable.Responses;
using SimpleLogger;
namespace GameServiceWarden.Core.UI
{
public class IPCMediator
{
// Timeout in milliseconds used for handshake reads/writes and for waiting on the accept task.
private const int TIMEOUT = 1000;

/// <summary>Name of the named pipe: the mediator name followed by ".pipe".</summary>
public string PipeName => name + ".pipe";

// Mediator name supplied to the constructor.
private readonly string name;

// Connected interfaces keyed by identifier; each value is the pipe and the task listening on it.
private readonly ConcurrentDictionary<string, (NamedPipeServerStream, Task)> pipes;

/// <summary>
/// Completed requests received from interfaces, as (identifier, type, body) tuples.
/// </summary>
public BlockingCollection<(string, CommunicableType, byte[])> RequestQueue { get; private set; }

// True between Open() and Close().
private volatile bool active;

/// <summary>Whether the mediator is currently open.</summary>
public bool IsRunning => active;

// Task returned by AcceptConnections() for the current session.
private Task connectionTask;

// Cancels the pending wait for a new connection when closing.
private CancellationTokenSource stopAcceptingToken;

// The server stream most recently created to wait for an incoming connection.
private volatile NamedPipeServerStream connectingPipe;
/// <summary>
/// Creates a mediator that is not yet accepting connections; call <see cref="Open"/> to start.
/// </summary>
/// <param name="name">Base name of the pipe (see <see cref="PipeName"/>).</param>
public IPCMediator(string name)
{
    // FIFO backing store for the blocking request queue.
    var backingQueue = new ConcurrentQueue<(string, CommunicableType, byte[])>();
    RequestQueue = new BlockingCollection<(string, CommunicableType, byte[])>(backingQueue);
    pipes = new ConcurrentDictionary<string, (NamedPipeServerStream, Task)>();
    this.name = name;
}
/// <summary>
/// Starts asynchronously accepting interface connections on <see cref="PipeName"/>.
/// </summary>
/// <exception cref="InvalidOperationException">The mediator is already open.</exception>
public void Open()
{
    if (active)
    {
        throw new InvalidOperationException("IPC already opened.");
    }

    // Close() marks the queue as complete for adding. A completed BlockingCollection
    // rejects every further Add, so a reopened mediator needs a fresh queue.
    if (RequestQueue.IsAddingCompleted)
    {
        RequestQueue = new BlockingCollection<(string, CommunicableType, byte[])>(new ConcurrentQueue<(string, CommunicableType, byte[])>());
    }

    Logger.Log($"IPCMediator with name \"{name}\" opened.");
    // Both must be set before AcceptConnections() runs: it checks `active` and uses the token.
    active = true;
    stopAcceptingToken = new CancellationTokenSource();
    connectionTask = AcceptConnections();
    Logger.Log($"IPCMediator \"{name}\" has begun asynchronously accepting interfaces.", LogLevel.Debug);
}
/// <summary>
/// Stops accepting connections, disconnects every connected interface and marks
/// <see cref="RequestQueue"/> as complete for adding.
/// </summary>
/// <exception cref="InvalidOperationException">The mediator is not open.</exception>
/// <exception cref="TimeoutException">The accepting task did not stop within the timeout.</exception>
public void Close()
{
    if (!active)
    {
        throw new InvalidOperationException("IPC not open.");
    }

    Logger.Log("Closing IPC mediator.");
    active = false;
    try
    {
        stopAcceptingToken.Cancel();
        // May be null if the accept loop failed before creating its first pipe.
        connectingPipe?.Dispose();
        InitiateDisconnectAll("Closing GameServiceWarden.").Wait();
        try
        {
            if (!connectionTask.Wait(TIMEOUT))
            {
                throw new TimeoutException($"IPCMediator \"{name}\" was unable to stop accepting task within {TIMEOUT}ms.");
            }
        }
        catch (AggregateException e)
        {
            // Cancelling the pending wait-for-connection is expected here. The socket error
            // code is checked as well as the message text, which is kept for compatibility.
            e.Handle((exception) =>
                exception is OperationCanceledException
                || (exception is SocketException socketException
                    && (socketException.SocketErrorCode == SocketError.OperationAborted
                        || socketException.Message.Equals("Operation canceled", StringComparison.Ordinal))));
        }
    }
    finally
    {
        // Always release resources, even when disconnecting or waiting failed.
        RequestQueue.CompleteAdding();
        stopAcceptingToken.Dispose();
    }
}
/// <summary>
/// Sends the same message to every currently connected interface.
/// </summary>
/// <param name="type">Type written into the response header.</param>
/// <param name="data">Message body.</param>
/// <returns>A task that completes once every reply has finished.</returns>
public async Task ReplyAll(CommunicableType type, byte[] data)
{
    List<Task> replyTasks = new List<Task>();
    foreach (string identifier in pipes.Keys)
    {
        replyTasks.Add(Reply(identifier, type, data));
    }

    // WhenAll observes every reply, so one failure does not leave the others unobserved.
    await Task.WhenAll(replyTasks);
}
/// <summary>
/// Sends a response header followed by the body to a single interface.
/// Both writes share one <see cref="TIMEOUT"/> millisecond cancellation window.
/// </summary>
/// <param name="identifier">Identifier of the connected interface.</param>
/// <param name="type">Type written into the response header.</param>
/// <param name="data">Message body; its length is written into the header.</param>
/// <exception cref="KeyNotFoundException">No interface is registered under <paramref name="identifier"/>.</exception>
public async Task Reply(string identifier, CommunicableType type, byte[] data)
{
    byte[] header = ResponseHeader.Encode(type, (uint)data.Length);
    // Resolve the pipe once so header and body go to the same stream even if the
    // dictionary entry is removed between the two writes.
    NamedPipeServerStream pipe = pipes[identifier].Item1;
    using (CancellationTokenSource cancel = new CancellationTokenSource(TIMEOUT))
    {
        await pipe.WriteAsync(header, 0, header.Length, cancel.Token);
        await pipe.WriteAsync(data, cancel.Token);
    }
}
/// <summary>
/// Sends a disconnect notice to an interface, closes its pipe and waits for its
/// listening task to finish.
/// </summary>
/// <param name="identifier">Identifier of the connected interface.</param>
/// <param name="reason">Reason included in the disconnect notice.</param>
/// <exception cref="KeyNotFoundException">No interface is registered under <paramref name="identifier"/>.</exception>
public async Task InitiateDisconnect(string identifier, string reason)
{
    Logger.Log($"Disconnecting \"{identifier}\". Reason: \"{reason}\"");
    // Capture the pipe and listener before replying: the listener removes the entry when
    // the pipe closes, so a lookup after the reply could fail.
    (NamedPipeServerStream pipe, Task listener) = pipes[identifier];
    DisconnectResponse response;
    response.reason = reason;
    try
    {
        await Reply(identifier, CommunicableType.Disconnect, JsonSerializer.SerializeToUtf8Bytes(response));
    }
    finally
    {
        // Close the pipe even if the notice could not be delivered.
        pipe.Close();
    }
    await listener;
    Logger.Log($"Successfully disconnected \"{identifier}\".");
}
/// <summary>
/// Disconnects every registered interface, one after another, giving each the same reason.
/// </summary>
/// <param name="reason">Reason passed to <see cref="InitiateDisconnect"/> for each interface.</param>
public async Task InitiateDisconnectAll(string reason)
{
    Logger.Log($"Disconnecting all of {pipes.Count} interfaces.");
    ICollection<string> identifiers = pipes.Keys;
    foreach (string identifier in identifiers)
    {
        await InitiateDisconnect(identifier, reason);
    }
}
/// <summary>
/// Accept loop: while the mediator is active, creates a server pipe, waits for a client
/// and starts its handshake via <see cref="OnConnection"/>.
/// Cancellation of the pending wait propagates to the caller of this task.
/// </summary>
private async Task AcceptConnections()
{
    List<Task> handshakes = new List<Task>();
    Logger.Log("Accepting Interface connections.");
    while (active)
    {
        NamedPipeServerStream pipe = new NamedPipeServerStream(PipeName, PipeDirection.InOut, NamedPipeServerStream.MaxAllowedServerInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
        // Published so Close() can dispose the pipe that is still waiting for a client.
        connectingPipe = pipe;
        Logger.Log("Waiting for connection.", LogLevel.Debug);
        await pipe.WaitForConnectionAsync(stopAcceptingToken.Token);
        handshakes.Add(OnConnection(pipe));

        // Iterate backwards so that removing an element never skips its neighbour.
        for (int i = handshakes.Count - 1; i >= 0; i--)
        {
            if (handshakes[i].IsCompleted)
            {
                Task finished = handshakes[i];
                handshakes.RemoveAt(i);
                await ObserveHandshake(finished);
            }
        }
    }

    Logger.Log("Waiting for connection tasks.", LogLevel.Debug);
    foreach (Task handshake in handshakes)
    {
        await ObserveHandshake(handshake);
    }
    Logger.Log("Stopped accepting pipe connections.");
}

// Awaits a handshake task and logs its failure, so a single misbehaving client
// cannot terminate the accept loop.
private static async Task ObserveHandshake(Task handshake)
{
    try
    {
        await handshake;
    }
    catch (Exception e)
    {
        Logger.Log($"Interface handshake failed: {e.Message}");
    }
}
/// <summary>
/// Performs the connect handshake with a newly connected interface: reads a request header
/// and a JSON <c>ConnectRequest</c> body, validates the requested identifier, writes a JSON
/// <c>ConnectResponse</c> and, when accepted, registers the pipe and starts listening on it.
/// The pipe is disposed here whenever the interface does not end up registered.
/// </summary>
/// <param name="pipe">The connected server stream for this interface.</param>
private async Task OnConnection(NamedPipeServerStream pipe)
{
    Logger.Log("Interface attempting to connect.", LogLevel.Debug);
    bool registered = false;
    try
    {
        byte[] headerBuffer = new byte[sizeof(uint) * 2];
        if (!await TryFillBuffer(pipe, headerBuffer, $"Interface did not send header data within {TIMEOUT}ms."))
        {
            Logger.Log($"Interface failed to send header data.", LogLevel.Debug);
            return;
        }

        CommunicableType comType;
        uint bodyLength;
        RequestHeader.Decode(headerBuffer, out comType, out bodyLength); //TODO do exception check.
        // NOTE(review): comType is not validated and bodyLength comes straight from the client;
        // consider rejecting non-connect types and capping the body size.
        byte[] bodyBuffer = new byte[bodyLength];
        if (!await TryFillBuffer(pipe, bodyBuffer, $"Interface failed to send body data within {TIMEOUT}."))
        {
            Logger.Log($"Interface failed to send body data.", LogLevel.Debug);
            return;
        }

        ConnectRequest request;
        try
        {
            request = JsonSerializer.Deserialize<ConnectRequest>(bodyBuffer);
        }
        catch (JsonException e)
        {
            Logger.Log($"Interface sent a malformed connect request: {e.Message}", LogLevel.Debug);
            return;
        }

        ConnectResponse response = new ConnectResponse();
        bool requestAccepted = false;
        if (string.IsNullOrWhiteSpace(request.requestedIdentifier))
        {
            response.invalidName = true;
            response.errorMsg = $"The requested identifier \"{request.requestedIdentifier}\" is null or whitespace.";
            Logger.Log(response.errorMsg, LogLevel.Debug);
        }
        else if (pipes.ContainsKey(request.requestedIdentifier))
        {
            response.invalidName = true;
            response.nameTaken = true;
            response.errorMsg = $"Interface requested identifier \"{request.requestedIdentifier}\" is taken.";
            Logger.Log(response.errorMsg, LogLevel.Debug);
        }
        else
        {
            requestAccepted = true;
            response.identifier = request.requestedIdentifier;
        }

        using (CancellationTokenSource cancelResponse = new CancellationTokenSource(TIMEOUT))
        {
            try
            {
                await pipe.WriteAsync(JsonSerializer.SerializeToUtf8Bytes(response), cancelResponse.Token);
            }
            catch (OperationCanceledException)
            {
                // await surfaces cancellation directly, never wrapped in AggregateException.
                Logger.Log($"Interface did not receive response within {TIMEOUT}ms.", LogLevel.Debug);
            }
        }

        if (!requestAccepted)
        {
            Logger.Log($"Interface failed to connect.");
            return;
        }

        Logger.Log($"Interface \"{response.identifier}\" connected.");
        pipes[request.requestedIdentifier] = (pipe, Listen(response.identifier, pipe));
        // From here on the listening task owns the pipe and disposes it.
        registered = true;
    }
    finally
    {
        if (!registered)
        {
            await pipe.DisposeAsync();
        }
    }
}

// Reads from the pipe until the buffer is full, the client closes the pipe, or TIMEOUT
// milliseconds elapse (logging timeoutMessage). Returns true only if the buffer was filled.
private static async Task<bool> TryFillBuffer(NamedPipeServerStream pipe, byte[] buffer, string timeoutMessage)
{
    int filled = 0;
    using (CancellationTokenSource timeout = new CancellationTokenSource(TIMEOUT))
    {
        try
        {
            while (filled < buffer.Length)
            {
                int readLength = await pipe.ReadAsync(buffer, filled, buffer.Length - filled, timeout.Token);
                if (readLength == 0)
                {
                    // End of stream: the client closed the pipe early.
                    break;
                }
                filled += readLength;
            }
        }
        catch (OperationCanceledException)
        {
            Logger.Log(timeoutMessage, LogLevel.Debug);
        }
    }
    return filled == buffer.Length;
}
/// <summary>
/// Reads framed requests from a connected interface until its pipe closes. Each frame is a
/// fixed-size header (decoded by <c>RequestHeader.Decode</c>) followed by a body of the
/// length given in the header. Completed requests are added to <see cref="RequestQueue"/>.
/// When reading ends, the interface is unregistered and its pipe disposed.
/// </summary>
/// <param name="identifier">Identifier the interface is registered under.</param>
/// <param name="pipe">The interface's connected pipe.</param>
private async Task Listen(string identifier, NamedPipeServerStream pipe)
{
    Logger.Log($"Started listening to interface \"{identifier}\".", LogLevel.Debug);
    byte[] buffer = new byte[1024];
    byte[] headerBuffer = new byte[sizeof(uint) * 2];
    byte[] bodyBuffer = null; // null while a header is being assembled
    int bodyFill = 0;
    int headerFill = 0;
    int readLength = 0;
    CommunicableType? comType = null;
    try
    {
        while ((readLength = await pipe.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            for (int i = 0; i < readLength; i++)
            {
                if (bodyBuffer == null)
                {
                    headerBuffer[headerFill] = buffer[i];
                    headerFill++;
                    if (headerFill == headerBuffer.Length)
                    {
                        uint length;
                        CommunicableType type;
                        RequestHeader.Decode(headerBuffer, out type, out length);
                        headerFill = 0;
                        if (length == 0)
                        {
                            // An empty body is already complete. Queue it now; otherwise the
                            // next byte would be written past the end of a zero-length buffer.
                            RequestQueue.Add((identifier, type, Array.Empty<byte>()));
                        }
                        else
                        {
                            bodyBuffer = new byte[length];
                            comType = type;
                        }
                    }
                }
                else
                {
                    bodyBuffer[bodyFill] = buffer[i];
                    bodyFill++;
                    if (bodyFill == bodyBuffer.Length)
                    {
                        RequestQueue.Add((identifier, comType.Value, bodyBuffer));
                        bodyFill = 0;
                        bodyBuffer = null;
                    }
                }
            }
        }
        Logger.Log($"Pipe for interface \"{identifier}\" has closed.", LogLevel.Debug);
    }
    finally
    {
        // Unregister and dispose even when reading throws. The pipe parameter is disposed
        // directly because TryRemove yields nothing if the entry is already gone.
        pipes.TryRemove(identifier, out _);
        await pipe.DisposeAsync();
        Logger.Log($"Stopped listening to interface \"{identifier}\".", LogLevel.Debug);
    }
}
}
}