using System;
using System.Buffers;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using WsjtxUtils.WsjtxMessages;
using WsjtxUtils.WsjtxMessages.Messages;

namespace WsjtxUtils.WsjtxUdpServer
{
    /// <summary>
    /// A UDP server for WSJT-X clients
    /// </summary>
    public class WsjtxUdpServer : IDisposable
    {
        /// <summary>
        /// The value used for the Default Maximum Transmission Unit (MTU)
        /// </summary>
        public const int DefaultMtu = 1500;

        /// <summary>
        /// Size of the datagram buffers in bytes
        /// </summary>
        private readonly int _datagramBufferSize;

        /// <summary>
        /// The socket used for communications
        /// </summary>
        private readonly Socket _socket;

        /// <summary>
        /// The target handler for WSJT-X messages
        /// </summary>
        private readonly IWsjtxUdpMessageHandler _messageHandler;

        /// <summary>
        /// The target logger for messages
        /// </summary>
        private readonly ILogger _logger;

        /// <summary>
        /// Source for cancellation tokens
        /// </summary>
        private CancellationTokenSource? _cancellationTokenSource;

        /// <summary>
        /// True when <see cref="_cancellationTokenSource"/> was created by this server
        /// (and must therefore be disposed by it) rather than supplied by the caller.
        /// </summary>
        private bool _ownsCancellationTokenSource;

        /// <summary>
        /// The datagram handling task
        /// </summary>
        private Task? _handleDatagramsTask;

        /// <summary>
        /// Get whether this object has been disposed.
        /// </summary>
        public bool IsDisposed { get; private set; }

        /// <summary>
        /// Is the specified address multicast
        /// </summary>
        public bool IsMulticast { get; private set; }

        /// <summary>
        /// Is the server currently running
        /// </summary>
        public bool IsRunning { get; private set; }

        /// <summary>
        /// The endpoint the server is bound to
        /// </summary>
        public IPEndPoint LocalEndpoint { get; private set; }

        /// <summary>
        /// Creates a new WSJT-X UDP server.
        /// </summary>
        /// <param name="wsjtxUdpMessageHandler">The handler invoked for each decoded message.</param>
        /// <param name="address">The local address to bind (use a multicast address to auto-subscribe).</param>
        /// <param name="port">UDP port to listen on. Defaults to 2237.</param>
        /// <param name="datagramBufferSize">Size of the reception buffer in bytes.</param>
        /// <param name="logger">Optional logger; if null, a no-op logger is used.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="wsjtxUdpMessageHandler"/> or <paramref name="address"/> is null.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="datagramBufferSize"/> is not greater than zero.
        /// </exception>
        public WsjtxUdpServer(IWsjtxUdpMessageHandler wsjtxUdpMessageHandler, IPAddress address, int port = 2237, int datagramBufferSize = DefaultMtu, ILogger? logger = null)
        {
            if (wsjtxUdpMessageHandler is null)
                throw new ArgumentNullException(nameof(wsjtxUdpMessageHandler));

            if (address is null)
                throw new ArgumentNullException(nameof(address));

            if (datagramBufferSize <= 0)
                throw new ArgumentOutOfRangeException(nameof(datagramBufferSize), datagramBufferSize, "The datagram buffer size must be greater than zero.");

            // set the message handling object
            _messageHandler = wsjtxUdpMessageHandler;

            // size of the buffers to allocate for reading/writing
            _datagramBufferSize = datagramBufferSize;
            _logger = logger ?? NullLogger.Instance;

            // check if the address is multicast and setup accordingly; a multicast
            // listener binds to the wildcard address of the group's own family so
            // that traffic for an IPv6 group is not filtered out by an IPv4 binding
            IsMulticast = IsAddressMulticast(address);
            if (IsMulticast)
            {
                var wildcard = address.AddressFamily == AddressFamily.InterNetworkV6
                    ? IPAddress.IPv6Any
                    : IPAddress.Any;
                LocalEndpoint = new IPEndPoint(wildcard, port);
            }
            else
            {
                LocalEndpoint = new IPEndPoint(address, port);
            }

            // setup UDP socket allowing for shared addresses
            var socket = new Socket(SocketType.Dgram, ProtocolType.Udp);
            try
            {
                socket.ExclusiveAddressUse = false;
                socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
                socket.Bind(LocalEndpoint);

                // if multicast join the group
                if (IsMulticast)
                {
                    if (address.AddressFamily == AddressFamily.InterNetwork)
                        socket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, new MulticastOption(address, LocalEndpoint.Address));
                    else
                        socket.SetSocketOption(SocketOptionLevel.IPv6, SocketOptionName.AddMembership, new IPv6MulticastOption(address));
                }
            }
            catch
            {
                // do not leak the OS socket when configuration or binding fails
                socket.Dispose();
                throw;
            }

            _socket = socket;
        }

        /// <summary>
        /// Destructor
        /// </summary>
        ~WsjtxUdpServer()
        {
            Dispose(false);
        }

        /// <summary>
        /// Start the UDP server and process datagrams
        /// </summary>
        /// <param name="cancellationTokenSource">
        /// Optional source used to cancel datagram processing. When null a new source
        /// is created and owned by the server; a supplied source remains owned by the caller.
        /// </param>
        /// <exception cref="ObjectDisposedException">The server has been disposed.</exception>
        /// <exception cref="InvalidOperationException">The server is already running.</exception>
        public void Start(CancellationTokenSource? cancellationTokenSource = null)
        {
            if (IsDisposed)
                throw new ObjectDisposedException(nameof(WsjtxUdpServer));

            if (IsRunning)
                throw new InvalidOperationException("The server is already running.");

            _logger.LogInformation("Starting WSJT-X UDP server on {Endpoint} (multicast={IsMulticast})", LocalEndpoint, IsMulticast);

            // read the token before mutating any state: a disposed caller-supplied
            // source throws here and must not leave the server marked as running
            var source = cancellationTokenSource ?? new CancellationTokenSource();
            var token = source.Token;

            _ownsCancellationTokenSource = cancellationTokenSource is null;
            _cancellationTokenSource = source;
            IsRunning = true;
            _handleDatagramsTask = HandleDatagramLoopAsync(token);
        }

        /// <summary>
        /// Stop the UDP server and datagram processing
        /// </summary>
        /// <exception cref="InvalidOperationException">The server is not running.</exception>
        public void Stop()
        {
            if (!IsRunning)
                throw new InvalidOperationException("The server is not running.");

            _logger.LogInformation("Stopping WSJT-X UDP server");

            try
            {
                _cancellationTokenSource?.Cancel();

                // bounded wait so a receive that cannot be cancelled does not hang the caller
                _handleDatagramsTask?.Wait(1000);
            }
            catch (AggregateException aggregateException)
            {
                // cancellation is the expected outcome; anything else is rethrown
                aggregateException.Handle(ex => ex is OperationCanceledException);
            }
            finally
            {
                IsRunning = false;
            }
        }

        /// <summary>
        /// Send a WSJT-X message to the specified endpoint
        /// </summary>
        /// <typeparam name="T">The type of message to send</typeparam>
        /// <param name="remoteEndpoint">The endpoint the datagram is sent to</param>
        /// <param name="message">The message to serialize and send</param>
        /// <returns>The number of bytes sent</returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        /// <exception cref="ArgumentException">The message id is null or empty.</exception>
        public int SendMessageTo<T>(EndPoint remoteEndpoint, T message) where T : WsjtxMessage, IWsjtxDirectionIn
        {
            ValidateOutgoingMessage(message);

            var datagramBuffer = ArrayPool<byte>.Shared.Rent(_datagramBufferSize);
            try
            {
                var bytesWritten = message.WriteMessageTo(datagramBuffer);
                return _socket.SendTo(datagramBuffer, bytesWritten, SocketFlags.None, remoteEndpoint);
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(datagramBuffer);
            }
        }

        /// <summary>
        /// Send a WSJT-X message to the specified endpoint
        /// </summary>
        /// <typeparam name="T">The type of message to send</typeparam>
        /// <param name="remoteEndpoint">The endpoint the datagram is sent to</param>
        /// <param name="message">The message to serialize and send</param>
        /// <param name="cancellationToken">Token used to cancel the send</param>
        /// <returns>The number of bytes sent</returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        /// <exception cref="ArgumentException">The message id is null or empty.</exception>
        /// <exception cref="OperationCanceledException">The token was cancelled.</exception>
        public async ValueTask<int> SendMessageToAsync<T>(EndPoint remoteEndpoint, T message, CancellationToken cancellationToken = default) where T : WsjtxMessage, IWsjtxDirectionIn
        {
            ValidateOutgoingMessage(message);

            var datagramBuffer = ArrayPool<byte>.Shared.Rent(_datagramBufferSize);
            try
            {
                var bytesWritten = message.WriteMessageTo(datagramBuffer);

#if NETFRAMEWORK
                // no cancellable overload on this target; honour the token up front
                cancellationToken.ThrowIfCancellationRequested();
                return await _socket.SendToAsync(new ArraySegment<byte>(datagramBuffer, 0, bytesWritten), SocketFlags.None, remoteEndpoint).ConfigureAwait(false);
#else
                return await _socket.SendToAsync(new ReadOnlyMemory<byte>(datagramBuffer, 0, bytesWritten), SocketFlags.None, remoteEndpoint, cancellationToken).ConfigureAwait(false);
#endif
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(datagramBuffer);
            }
        }

        /// <summary>
        /// Stops the server if it is running and releases the socket. Does not throw.
        /// </summary>
        public void Dispose()
        {
            if (IsRunning)
            {
                try
                {
                    Stop();
                }
                catch (Exception exception) when (exception is ObjectDisposedException || exception is AggregateException)
                {
                    // Dispose must not throw, and the socket below still has to be released
                    _logger.LogWarning(exception, "Ignoring error raised while stopping the WSJT-X UDP server during dispose");
                }
            }

            Dispose(true);
            GC.SuppressFinalize(this);
        }

        #region Private Methods
        /// <summary>
        /// Releases the resources held by the server
        /// </summary>
        /// <param name="disposing">
        /// True when called from <see cref="Dispose()"/>; false when called from the finalizer,
        /// in which case the loop is only signalled and nothing is waited on or disposed.
        /// </param>
        private void Dispose(bool disposing)
        {
            if (IsDisposed)
                return;

            // flag first so the receive loop can tell a socket closed by us from a real fault
            IsDisposed = true;

            // cleanup datagram loop
            try
            {
                _cancellationTokenSource?.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // a caller-supplied source was already disposed by its owner
            }
            catch (AggregateException)
            {
                // a cancellation callback threw; nothing useful can be done during dispose
            }

            if (disposing)
            {
                try
                {
                    _handleDatagramsTask?.Wait(2500); // Give time to handle the packet
                }
                catch (AggregateException)
                {
                    // the loop ended cancelled or faulted; either way it is finished
                }

                // managed items; a caller-supplied source stays with the caller
                if (_ownsCancellationTokenSource)
                    _cancellationTokenSource?.Dispose();

                _socket.Dispose();
            }

            IsRunning = false;
        }

        /// <summary>
        /// Validates a message that is about to be sent to a WSJT-X client
        /// </summary>
        /// <typeparam name="T">The type of message being sent</typeparam>
        /// <param name="message">The message to validate</param>
        private static void ValidateOutgoingMessage<T>(T message) where T : WsjtxMessage, IWsjtxDirectionIn
        {
            if (message is null)
                throw new ArgumentNullException(nameof(message));

            if (string.IsNullOrEmpty(message.Id))
                throw new ArgumentException($"The client id can not be null or empty when sending {typeof(T).Name}.", nameof(message));
        }

        /// <summary>
        /// Process incoming datagrams until cancellation is requested or the socket is disposed
        /// </summary>
        /// <param name="cancellationToken">Token that ends the loop when cancelled</param>
        /// <returns>A task that completes when the loop exits</returns>
        private async Task HandleDatagramLoopAsync(CancellationToken cancellationToken)
        {
            // single buffer reused for every receive
            // NOTE(review): assumes DeserializeWsjtxMessage copies what it needs out of the
            // frame before the next receive overwrites the buffer - confirm
            var datagramBuffer = new byte[_datagramBufferSize];

            while (!cancellationToken.IsCancellationRequested)
            {
                // wait for and read the next datagram into the buffer
                try
                {
#if NETFRAMEWORK
                    var result = await _socket.ReceiveFromAsync(new ArraySegment<byte>(datagramBuffer), SocketFlags.None, LocalEndpoint);
#else
                    var result = await _socket.ReceiveFromAsync(datagramBuffer, SocketFlags.None, LocalEndpoint, cancellationToken);
#endif

                    // check that we actually read some data
                    if (result.ReceivedBytes <= 0)
                    {
                        _logger.LogWarning("No data was read from socket for endpoint {RemoteEndpoint}, skipping.", result.RemoteEndPoint);
                        continue;
                    }

                    // extract the framed packet based on the number of bytes that were read
                    var frame = datagramBuffer.AsMemory(0, result.ReceivedBytes);
                    var message = frame.DeserializeWsjtxMessage();
                    if (message is null)
                    {
                        _logger.LogWarning("Received invalid or null WSJT-X frame from {RemoteEndpoint}, skipping.", result.RemoteEndPoint);
                        continue;
                    }

                    // get the correct handler for the given message type
                    var messageType = message.MessageType;
                    var messageHandlingTask = messageType switch
                    {
                        MessageType.Clear => _messageHandler.HandleClearMessageAsync(this, (Clear)message, result.RemoteEndPoint, cancellationToken),
                        MessageType.Close => _messageHandler.HandleClosedMessageAsync(this, (Close)message, result.RemoteEndPoint, cancellationToken),
                        MessageType.Decode => _messageHandler.HandleDecodeMessageAsync(this, (Decode)message, result.RemoteEndPoint, cancellationToken),
                        MessageType.Heartbeat => _messageHandler.HandleHeartbeatMessageAsync(this, (Heartbeat)message, result.RemoteEndPoint, cancellationToken),
                        MessageType.LoggedADIF => _messageHandler.HandleLoggedAdifMessageAsync(this, (LoggedAdif)message, result.RemoteEndPoint, cancellationToken),
                        MessageType.QSOLogged => _messageHandler.HandleQsoLoggedMessageAsync(this, (QsoLogged)message, result.RemoteEndPoint, cancellationToken),
                        MessageType.Status => _messageHandler.HandleStatusMessageAsync(this, (Status)message, result.RemoteEndPoint, cancellationToken),
                        MessageType.WSPRDecode => _messageHandler.HandleWSPRDecodeMessageAsync(this, (WSPRDecode)message, result.RemoteEndPoint, cancellationToken),
                        _ => null // no handler for unsupported messages
                    };

                    // handlers are not awaited so a slow handler cannot stall reception;
                    // add logging to faulted tasks on an explicit scheduler
                    _ = messageHandlingTask?.ContinueWith(
                        t => _logger.LogError(t.Exception, "Handler for {MessageType} threw an exception", messageType),
                        CancellationToken.None,
                        TaskContinuationOptions.OnlyOnFaulted,
                        TaskScheduler.Default);
                }
                catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
                {
                    // normal shutdown
                    break;
                }
                catch (ObjectDisposedException) when (IsDisposed || cancellationToken.IsCancellationRequested)
                {
                    // the socket was closed underneath the pending receive; retrying would spin forever
                    break;
                }
                catch (SocketException) when (cancellationToken.IsCancellationRequested)
                {
                    // receive aborted as part of shutdown
                    break;
                }
                catch (SocketException socketException)
                {
                    _logger.LogWarning("A socketException occured with: {ExceptionMessage}", socketException.Message);
                }
                catch (Exception exception)
                {
                    _logger.LogError(exception, "Unexpected error in receive loop");
                }
            }
        }
        #endregion

        #region Static Methods
        /// <summary>
        /// Determine if the address is a multicast group
        /// </summary>
        /// <param name="address">The address to test</param>
        /// <returns>
        /// True for an IPv6 multicast address or an IPv4 address whose first octet is
        /// between 224 and 239 inclusive; otherwise false.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="address"/> is null.</exception>
        public static bool IsAddressMulticast(IPAddress address)
        {
            if (address is null)
                throw new ArgumentNullException(nameof(address));

            if (address.IsIPv6Multicast)
                return true;

            var addressBytes = address.GetAddressBytes();
            if (addressBytes.Length != 4)
                return false;

            return addressBytes[0] >= 224 && addressBytes[0] <= 239;
        }
        #endregion
    }
}