2 using System.Collections.Concurrent;
5 using System.Net.WebSockets;
6 using System.Security.Cryptography.X509Certificates;
9 using System.Threading.Tasks;
24 private ClientWebSocket ws;
25 private UTF8Encoding encoder;
26 private const UInt64 MAXREADSIZE = 1 * 1024 * 1024;
28 private Uri serverUri;
32 public BlockingCollection<ArraySegment<byte>>
sendQueue {
get; }
35 private Thread receiveThread {
get;
set; }
36 private Thread sendThread {
get;
set; }
44 encoder =
new UTF8Encoding();
45 ws =
new ClientWebSocket();
51 serverUri =
new Uri(
"wss://" + serverURL +
":20012");
54 receiveThread =
new Thread(runReceive);
55 receiveThread.Start();
57 sendQueue =
new BlockingCollection<ArraySegment<byte>>();
58 sendThread =
new Thread(runSend);
74 ws.ConnectAsync(serverUri, CancellationToken.None);
78 Task.Delay(100).Wait();
99 ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);
100 while (!(ws.State == WebSocketState.CloseReceived))
103 Task.Delay(50).Wait();
115 return ws.State == WebSocketState.Connecting;
124 return ws.State == WebSocketState.Open;
137 byte[] buffer = encoder.GetBytes(message);
139 var sendBuf =
new ArraySegment<byte>(buffer);
157 throw new Exception(
"Timeout error");
159 cqueue.TryDequeue(out msg);
166 private async
void runSend()
169 ArraySegment<byte> msg;
176 await ws.SendAsync(msg, WebSocketMessageType.Text,
true , CancellationToken.None);
190 private async Task<string> receive(UInt64 maxSize = MAXREADSIZE)
193 byte[] buf =
new byte[4 * 1024];
194 var ms =
new MemoryStream();
195 ArraySegment<byte> arrayBuf =
new ArraySegment<byte>(buf);
196 WebSocketReceiveResult chunkResult = null;
202 chunkResult = await ws.ReceiveAsync(arrayBuf, CancellationToken.None);
203 ms.Write(arrayBuf.Array, arrayBuf.Offset, chunkResult.Count);
205 if ((UInt64)(chunkResult.Count) > MAXREADSIZE)
207 Console.Error.WriteLine(
"Warning: Message is bigger than expected!");
209 }
while (!chunkResult.EndOfMessage);
210 ms.Seek(0, SeekOrigin.Begin);
213 if (chunkResult.MessageType == WebSocketMessageType.Text)
224 private async
void runReceive()
231 result = await receive();
232 if (result != null && result.Length > 0)
234 JSONNode response = JSON.Parse(result);
235 if (response[
"function"])
237 string function = response[
"function"].Value;
238 if (
function.Equals(
"subscriber"))
242 else if (
function.Equals(
"update"))
246 else if (
function.Equals(
"stale"))
250 else if (
function.Equals(
"dropped"))
256 Control.
Print(
"Unknown callback. Maybe this library is outdated?");
267 Task.Delay(50).Wait();
275 public static string streamToString(MemoryStream ms, Encoding encoding)
283 string readString =
"";
284 if (encoding == Encoding.UTF8)
286 using (var reader =
new StreamReader(ms, encoding))
288 readString = reader.ReadToEnd();
override void sendMessage(string message)
Method used to send a message to the server.
virtual void onUpdate(string msg)
virtual void onDropped(string msg)
virtual void onStale(string msg)
bool isConnecting()
Return if is connecting to the server.
static int timeoutIterations
In some callbacks, the code is waiting on a response from the server. This specifies the number of 10...
override void stopConnection()
CorelinkWebSocket(string serverURL)
Initializes a new instance of the T:WsClient class.
async Task connect()
Method which connects client to the server.
BlockingCollection< ArraySegment< byte > > sendQueue
bool isConnectionOpen()
Return if connection with server is open.
ConcurrentQueue< String > receiveQueue
static void Print(string message)
Abstract class for a control stream that contains the function prototypes for the server callback fun...
Singleton class to communicate with Corelink.
virtual void onSubscriber(string msg)
static string streamToString(MemoryStream ms, Encoding encoding)
Converts memory stream into string.