C# Client  0.0.0.7
C# Library to interface with Corelink
CorelinkWebSocket.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Concurrent;
3 using System.IO;
4 using System.Net.Security;
5 using System.Net.WebSockets;
6 using System.Security.Cryptography.X509Certificates;
7 using System.Text;
8 using System.Threading;
9 using System.Threading.Tasks;
10 using SimpleJSON;
11 using UnityEngine;
12 
13 namespace CoreLink
14 {
15  // Code from https://www.patrykgalach.com/2019/11/11/implementing-websocket-in-unity/
17  {
22 
23  // WebSocket
24  private ClientWebSocket ws;
25  private UTF8Encoding encoder; // For websocket text message encoding.
26  private const UInt64 MAXREADSIZE = 1 * 1024 * 1024;
27  // Server address
28  private Uri serverUri;
29 
30  // Queues
31  public ConcurrentQueue<String> receiveQueue { get; }
32  public BlockingCollection<ArraySegment<byte>> sendQueue { get; }
33 
34  // Threads
35  private Thread receiveThread { get; set; }
36  private Thread sendThread { get; set; }
37 
42  public CorelinkWebSocket(string serverURL)
43  {
44  encoder = new UTF8Encoding();
45  ws = new ClientWebSocket();
46  //X509CertificateCollection certs = ws.Options.ClientCertificates;
47  //certs.Add(new X509Certificate("D:\\School\\College\\GradSchool\\HolodeckWork\\CoreLink\\networktest\\server\\config\\ca-crt.pem"));
48  //certs.Add(new X509Certificate("D:\\School\\College\\GradSchool\\HolodeckWork\\CoreLink\\networktest\\server\\config\\server-crt.pem"));
49  //ws.Options.ClientCertificates = certs;
50 
51  serverUri = new Uri("wss://" + serverURL + ":20012");
52 
53  receiveQueue = new ConcurrentQueue<string>();
54  receiveThread = new Thread(runReceive);
55  receiveThread.Start();
56 
57  sendQueue = new BlockingCollection<ArraySegment<byte>>();
58  sendThread = new Thread(runSend);
59  sendThread.Start();
60  }
61 
66  public async Task connect()
67  {
68 
69  //ws.Options.RemoteCertificateValidationCallback = ValidateServerCertificate;
70  //ws.RemoteCertificateValidationCallback
71  //certs.Add()
72  Control.Print("Connecting to: " + serverUri);
73  // Removed the await keyword to end this gracefully if it can't connect instead of crashing unity
74  ws.ConnectAsync(serverUri, CancellationToken.None);
75  while (isConnecting())
76  {
77  Control.Print("Waiting to connect...");
78  Task.Delay(100).Wait();
79  }
80  Control.Print("Connect status: " + ws.State);
81  }
83  //public static bool ValidateServerCertificate(
84  // object sender,
85  // X509Certificate certificate,
86  // X509Chain chain,
87  // SslPolicyErrors sslPolicyErrors)
88  //{
89  // if (sslPolicyErrors == SslPolicyErrors.None)
90  // return true;
91 
92  // Console.WriteLine("Certificate error: {0}", sslPolicyErrors);
93 
94  // // Do not allow this client to communicate with unauthenticated servers.
95  // return false;
96  //}
97  override public void stopConnection()
98  {
99  ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);
100  while (!(ws.State == WebSocketState.CloseReceived))
101  {
102  Control.Print("Waiting to disconnect...");
103  Task.Delay(50).Wait();
104  }
105  Control.Print("Connect status: " + ws.State);
106  }
107  #region [Status]
108 
113  public bool isConnecting()
114  {
115  return ws.State == WebSocketState.Connecting;
116  }
117 
122  public bool isConnectionOpen()
123  {
124  return ws.State == WebSocketState.Open;
125  }
126 
127  #endregion
128 
129  #region [Send]
130 
135  override public void sendMessage(string message)
136  {
137  byte[] buffer = encoder.GetBytes(message);
138  //Control.Print("Message to queue for send: " + buffer.Length + ", message: " + message);
139  var sendBuf = new ArraySegment<byte>(buffer);
140 
141  sendQueue.Add(sendBuf);
142  }
143  public string getResponse()
144  {
145  // Check if server send new messages
146  var cqueue = receiveQueue;
147  string msg;
148  int iterations = 0;
149  while (!cqueue.TryPeek(out msg) && iterations < Control.timeoutIterations)
150  {
151  Thread.Sleep(100);
152  iterations++;
153 
154  }
155  if (iterations == Control.timeoutIterations)
156  {
157  throw new Exception("Timeout error");
158  }
159  cqueue.TryDequeue(out msg);
160 
161  return msg;
162  }
166  private async void runSend()
167  {
168  Control.Print("WebSocket Message Sender looping.");
169  ArraySegment<byte> msg;
170  while (true)
171  {
172  while (!sendQueue.IsCompleted)
173  {
174  msg = sendQueue.Take();
175  //Control.Print("Dequeued this message to send: " + msg);
176  await ws.SendAsync(msg, WebSocketMessageType.Text, true /* is last part of message */ , CancellationToken.None);
177  }
178  }
179  }
180 
181  #endregion
182 
183  #region [Receive]
184 
190  private async Task<string> receive(UInt64 maxSize = MAXREADSIZE)
191  {
192  // A read buffer, and a memory stream to stuff unknown number of chunks into:
193  byte[] buf = new byte[4 * 1024];
194  var ms = new MemoryStream();
195  ArraySegment<byte> arrayBuf = new ArraySegment<byte>(buf);
196  WebSocketReceiveResult chunkResult = null;
197 
198  if (isConnectionOpen())
199  {
200  do
201  {
202  chunkResult = await ws.ReceiveAsync(arrayBuf, CancellationToken.None);
203  ms.Write(arrayBuf.Array, arrayBuf.Offset, chunkResult.Count);
204  //Control.Print("Size of Chunk message: " + chunkResult.Count);
205  if ((UInt64)(chunkResult.Count) > MAXREADSIZE)
206  {
207  Console.Error.WriteLine("Warning: Message is bigger than expected!");
208  }
209  } while (!chunkResult.EndOfMessage);
210  ms.Seek(0, SeekOrigin.Begin);
211 
212  // Looking for UTF-8 JSON type messages.
213  if (chunkResult.MessageType == WebSocketMessageType.Text)
214  {
215  return streamToString(ms, Encoding.UTF8);
216  }
217  }
218  return "";
219  }
220 
224  private async void runReceive()
225  {
226  Control.Print("WebSocket Message Receiver looping.");
227  string result;
228  while (true)
229  {
230  //Control.Print("Awaiting Receive...");
231  result = await receive();
232  if (result != null && result.Length > 0)
233  {
234  JSONNode response = JSON.Parse(result);
235  if (response["function"])
236  {
237  string function = response["function"].Value;
238  if (function.Equals("subscriber"))
239  {
240  onSubscriber(result);
241  }
242  else if (function.Equals("update"))
243  {
244  onUpdate(result);
245  }
246  else if (function.Equals("stale"))
247  {
248  onStale(result);
249  }
250  else if (function.Equals("dropped"))
251  {
252  onDropped(result);
253  }
254  else
255  {
256  Control.Print("Unknown callback. Maybe this library is outdated?");
257  }
258  }
259  else
260  {
261  receiveQueue.Enqueue(result);
262  }
263 
264  }
265  else
266  {
267  Task.Delay(50).Wait();
268  }
269  }
270  }
271 
272  #endregion
273 
274  #region [Utility]
275  public static string streamToString(MemoryStream ms, Encoding encoding)
282  {
283  string readString = "";
284  if (encoding == Encoding.UTF8)
285  {
286  using (var reader = new StreamReader(ms, encoding))
287  {
288  readString = reader.ReadToEnd();
289  }
290  }
291  return readString;
292  }
293  #endregion
294  }
295 }