-
Notifications
You must be signed in to change notification settings - Fork 0
/
TCPSmartServer.cs
229 lines (192 loc) · 7.86 KB
/
TCPSmartServer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
using CommsLIB.Communications;
using CommsLIB.Communications.FrameWrappers;
using CommsLIB.Logging;
using Microsoft.Extensions.Logging;
using ProtoBuf;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace CommsLIB.Communications
{
/// <summary>
/// TCP server that accepts any number of clients. Each accepted connection gets its own
/// frame wrapper (<typeparamref name="T"/>) and its own <c>TCPNETCommunicator</c>, stored
/// under an "ip:port" identifier. Frames, raw data and connection changes reported by the
/// per-client objects are re-raised through the events of this class.
/// </summary>
/// <typeparam name="T">Frame wrapper type created for every accepted client.</typeparam>
/// <typeparam name="U">Message type carried by the frame wrapper and the communicators.</typeparam>
public class TCPSmartServer<T , U> : IDisposable where T : FrameWrapperBase<U>, new()
{
    #region logger
    private readonly ILogger<TCPSmartServer<T, U>> logger = null;
    #endregion

    #region consts
    private const int RCV_BUFFER_SIZE = 8192;
    #endregion

    #region members
    // Listener of the current Start()/Stop() cycle; null while the server is stopped.
    private TcpListener server = null;
    private readonly int ListeningPort;
    private readonly string ListeningIP;

    // Connected clients keyed by "ip:port". Every access must hold lockerClientList.
    private readonly Dictionary<string, CommunicatorBase<U>> ClientList = new Dictionary<string, CommunicatorBase<U>>();
    private readonly object lockerClientList = new object();

    /// <summary>Re-raises the raw data notification of any client communicator.</summary>
    public event DataReadyEventHandler DataReadyEvent;
    public delegate void DataReadyEventHandler(string ip, int port, long time, byte[] bytes, int offset, int length, string ID, ushort[] ipChunks);

    public delegate void FrameReadyDelegate(U message, string ID);
    /// <summary>Raised when the frame wrapper of a client reports an available frame.</summary>
    public event FrameReadyDelegate FrameReadyEvent;

    public delegate void ConnectionStateDelegate(string SourceID, bool connected);
    /// <summary>Raised when a client communicator reports a connection state change.</summary>
    public event ConnectionStateDelegate ConnectionStateEvent;

    private Task listenTask;
    private CancellationTokenSource cancelListenSource;
    private CancellationToken cancelListenToken;

    // Declared but not used anywhere in this class.
    private Task senderTask;
    private CancellationTokenSource cancelSenderSource;
    private CancellationToken cancelSenderToken;
    #endregion

    #region fields
    private readonly bool UseCircularBuffers = false;
    private readonly Func<T> instantiateFrameWrapperFunc;
    #endregion

    /// <summary>
    /// Stores the listening configuration. Nothing is opened until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="_port">TCP port to listen on.</param>
    /// <param name="_ip">Local address to bind to; null or empty binds to <see cref="IPAddress.Any"/>.</param>
    /// <param name="_useCircularBuffers">Forwarded to every client communicator; required by <see cref="Send2AllAsync"/>.</param>
    /// <param name="_instantiateFrameWrapperFunc">Optional factory for the per-client frame wrapper; when null, <c>new T()</c> is used.</param>
    public TCPSmartServer(int _port, string _ip = null, bool _useCircularBuffers=false, Func<T> _instantiateFrameWrapperFunc = null)
    {
        ListeningPort = _port;
        ListeningIP = _ip;
        UseCircularBuffers = _useCircularBuffers;
        instantiateFrameWrapperFunc = _instantiateFrameWrapperFunc;

        logger = this.GetLogger();
    }

    /// <summary>
    /// Starts listening and launches the background task that accepts clients.
    /// A listener that is already running is stopped first, so the method can be used to restart.
    /// </summary>
    public void Start()
    {
        Stop();

        IPAddress address = string.IsNullOrEmpty(ListeningIP) ? IPAddress.Any : IPAddress.Parse(ListeningIP);
        TcpListener listener = new TcpListener(address, ListeningPort);
        listener.Start();

        CancellationTokenSource source = new CancellationTokenSource();
        CancellationToken token = source.Token;

        // Publish the fields only once the listener is really listening, so a failed
        // Start() leaves the instance in the stopped state.
        server = listener;
        cancelListenSource = source;
        cancelListenToken = token;

        // The task captures the locals, not the fields: a later Stop() nulls the fields
        // and must not affect what an already created task works on.
        listenTask = new Task(() => DoListenForClients(listener, token), token, TaskCreationOptions.LongRunning);
        listenTask.Start();
    }

    /// <summary>
    /// Stops accepting new clients and waits for the accept task to finish.
    /// Safe to call when the server is not running, and safe to call repeatedly.
    /// Already connected clients are left untouched; <see cref="Dispose()"/> releases them.
    /// </summary>
    public void Stop()
    {
        CancellationTokenSource source = cancelListenSource;
        TcpListener listener = server;
        Task task = listenTask;

        // Reset the state first so that a second Stop() finds nothing left to do.
        cancelListenSource = null;
        cancelListenToken = CancellationToken.None;
        server = null;
        listenTask = null;

        // Cancel before closing the socket: the accept loop uses the token to tell an
        // intentional shutdown from a real listener failure.
        source?.Cancel();

        try
        {
            // Closing the listener is what unblocks the pending AcceptTcpClient call.
            listener?.Stop();
        }
        catch (SocketException e)
        {
            logger?.LogWarning(e, "Error while closing the listening socket");
        }

        if (task != null)
        {
            try
            {
                task.Wait();
            }
            catch (AggregateException e)
            {
                // A task cancelled before it ever ran reports OperationCanceledException,
                // which is the expected outcome of stopping. Anything else is logged.
                foreach (Exception inner in e.Flatten().InnerExceptions)
                {
                    if (!(inner is OperationCanceledException))
                    {
                        logger?.LogWarning(inner, "Listen task ended with an error");
                    }
                }
            }
        }

        source?.Dispose();
    }

    /// <summary>
    /// Builds the client identifier, "address:port", from the remote end point of a socket.
    /// </summary>
    private static string GetIDFromSocket(Socket s)
    {
        IPEndPoint remote = (IPEndPoint)s.RemoteEndPoint;
        return remote.Address.ToString() + ":" + remote.Port.ToString();
    }

    /// <summary>
    /// Body of the listen task: accepts clients until <paramref name="token"/> is cancelled
    /// or the listener fails.
    /// </summary>
    /// <param name="state">The <see cref="TcpListener"/> to accept from.</param>
    /// <param name="token">Cancelled by <see cref="Stop"/> to end the loop.</param>
    private void DoListenForClients(object state, CancellationToken token)
    {
        TcpListener listener = state as TcpListener;
        if (listener == null)
        {
            return;
        }

        while (!token.IsCancellationRequested)
        {
            logger?.LogInformation("Waiting for a connection... ");

            TcpClient tcpClient;
            try
            {
                // Perform a blocking call to accept requests.
                tcpClient = listener.AcceptTcpClient();
            }
            catch (Exception e) when (e is SocketException || e is InvalidOperationException)
            {
                // Stop() closes the listener to unblock the call above, which surfaces here
                // as an exception. Only an unrequested failure is worth reporting.
                if (!token.IsCancellationRequested)
                {
                    logger?.LogError(e, "Listener failed while waiting for a connection");
                }
                break;
            }

            AttachClient(tcpClient);
        }
    }

    /// <summary>
    /// Creates, registers and starts the frame wrapper and communicator of an accepted client.
    /// A failure is logged and only drops that client; the accept loop keeps running.
    /// </summary>
    private void AttachClient(TcpClient tcpClient)
    {
        string id = null;
        CommunicatorBase<U> communicator = null;
        try
        {
            // Get ID
            id = GetIDFromSocket(tcpClient.Client);

            // Create Framewrapper
            T framewrapper = instantiateFrameWrapperFunc != null ? instantiateFrameWrapperFunc() : new T();

            // Create TCPNetCommunicator
            communicator = new TCPNETCommunicator<U>(tcpClient, framewrapper, UseCircularBuffers);

            // Add to dict. An entry that still exists under the same id belongs to a dead
            // connection whose removal never happened; it is replaced instead of letting
            // Dictionary.Add throw.
            CommunicatorBase<U> stale;
            lock (lockerClientList)
            {
                ClientList.TryGetValue(id, out stale);
                ClientList[id] = communicator;
            }

            if (stale != null)
            {
                // Detach first: a disconnect notification raised while disposing the stale
                // communicator carries the same id and would otherwise remove the new one.
                stale.ConnectionStateEvent -= OnCommunicatorConnection;
                stale.DataReadyEvent -= OnCommunicatorData;
                DisposeClient(stale);
            }

            // Subscribe to events
            communicator.ConnectionStateEvent += OnCommunicatorConnection;
            communicator.DataReadyEvent += OnCommunicatorData;
            framewrapper.FrameAvailableEvent += OnFrameReady;

            communicator.Init(null, false, id, 0);
            framewrapper.Start();
            communicator.Start();
        }
        catch (Exception e)
        {
            logger?.LogError(e, "Could not set up client {ClientId}", id);

            if (communicator != null)
            {
                lock (lockerClientList)
                {
                    // Remove the entry only if it is still the one registered above.
                    CommunicatorBase<U> current;
                    if (ClientList.TryGetValue(id, out current) && ReferenceEquals(current, communicator))
                    {
                        ClientList.Remove(id);
                    }
                }
                DisposeClient(communicator);
            }

            tcpClient.Close();
        }
    }

    /// <summary>
    /// Disposes a communicator, logging instead of propagating any failure so that cleanup
    /// of the remaining clients can continue.
    /// </summary>
    private void DisposeClient(CommunicatorBase<U> communicator)
    {
        try
        {
            communicator.Dispose();
        }
        catch (Exception e)
        {
            logger?.LogWarning(e, "Error while disposing a client communicator");
        }
    }

    /// <summary>Forwards a frame reported by a client frame wrapper to <see cref="FrameReadyEvent"/>.</summary>
    private void OnFrameReady(string ID, U payload)
    {
        // Raise
        FrameReadyEvent?.Invoke(payload, ID);
    }

    /// <summary>Forwards raw data reported by a client communicator to <see cref="DataReadyEvent"/>.</summary>
    private void OnCommunicatorData(string ip, int port, long time, byte[] bytes, int offset, int length, string ID, ushort[] ipChunks)
    {
        // Not used normally
        DataReadyEvent?.Invoke(ip, port, time, bytes, offset, length, ID, ipChunks);
    }

    /// <summary>
    /// Forwards a connection state change to <see cref="ConnectionStateEvent"/> and, on
    /// disconnection, unregisters and disposes the communicator of that client.
    /// </summary>
    private void OnCommunicatorConnection(string ID, CommsLIB.Base.ConnUri uri, bool connected)
    {
        // Raise
        ConnectionStateEvent?.Invoke(ID, connected);

        if (connected)
        {
            return;
        }

        // Remove if disconnected. The entry is taken out under the lock but disposed after
        // releasing it, so no foreign code runs while the client list is locked and a
        // notification raised during Dispose cannot find the same entry again.
        CommunicatorBase<U> communicator;
        lock (lockerClientList)
        {
            if (!ClientList.TryGetValue(ID, out communicator))
            {
                return;
            }
            ClientList.Remove(ID);
        }

        communicator.Dispose();
    }

    /// <summary>
    /// Sends <paramref name="data"/> to every connected client with <c>SendSync</c>.
    /// </summary>
    /// <param name="data">Message to send.</param>
    public void Send2All(U data)
    {
        lock (lockerClientList)
        {
            foreach (KeyValuePair<string, CommunicatorBase<U>> kv in ClientList)
            {
                kv.Value.SendSync(data);
            }
        }
    }

    /// <summary>
    /// Sends <paramref name="data"/> to every connected client with <c>SendASync</c>.
    /// Use only with Circular Buffer
    /// </summary>
    /// <param name="data">Message to send.</param>
    /// <exception cref="InvalidOperationException">The server was created without circular buffers.</exception>
    public void Send2AllAsync(U data)
    {
        if (!UseCircularBuffers)
        {
            throw new InvalidOperationException("Cant use Send2AllAsync in this mode. Please use Circular Buffer");
        }

        lock (lockerClientList)
        {
            foreach (KeyValuePair<string, CommunicatorBase<U>> kv in ClientList)
            {
                kv.Value.SendASync(data);
            }
        }
    }

    /// <summary>
    /// Hands a raw buffer to every connected client through <c>SendASync</c>.
    /// </summary>
    /// <param name="buff">Buffer holding the bytes to send.</param>
    /// <param name="size">Passed to <c>SendASync</c> as its second argument.</param>
    /// <remarks>
    /// NOTE(review): unlike <see cref="Send2AllAsync"/>, this overload does not check the
    /// circular buffer mode before calling <c>SendASync</c> - confirm that this is intended.
    /// </remarks>
    public void Send2All(byte[] buff, int size)
    {
        lock (lockerClientList)
        {
            foreach (KeyValuePair<string, CommunicatorBase<U>> kv in ClientList)
            {
                kv.Value.SendASync(buff, size);
            }
        }
    }

    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls

    /// <summary>
    /// Releases the listener, the listen task and every client communicator.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposedValue)
        {
            return;
        }
        disposedValue = true;

        if (!disposing)
        {
            // The class holds no unmanaged resources of its own.
            return;
        }

        Stop();

        // Empty the list under the lock and dispose afterwards, so that disconnect
        // notifications raised during disposal find nothing left to remove.
        List<CommunicatorBase<U>> clients;
        lock (lockerClientList)
        {
            clients = new List<CommunicatorBase<U>>(ClientList.Values);
            ClientList.Clear();
        }

        foreach (CommunicatorBase<U> client in clients)
        {
            DisposeClient(client);
        }
    }

    /// <summary>
    /// Stops the server and disposes all connected clients.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    #endregion
}
}