Python Client  0.0.0.0
Python wrapper for Corelink DLL
corelink_client.py
Go to the documentation of this file.
1 """
2  @file corelink_client.py
3  Contains client to server commands.
4 """
5 import ctypes
6 import corelink_const
7 import corelink_exception
8 import corelink_send_stream
9 import corelink_stream_data
10 import corelink_recv_stream
11 
12 # pylint: disable=too-many-arguments
13 # Reasonable in this case.
14 
15 class Client:
16  """
17  Client class
18  """
19  @staticmethod
20  def is_active():
21  """
22  check whether it is active
23  """
24  corelink_const.LIB.getClientState.restype = ctypes.c_bool
25  ans = corelink_const.LIB.getClientState()
26  return ans
27 
28  @staticmethod
29  def connect(server_ip,port):
30  """
31  Connect
32  """
33  error_id_c = ctypes.c_int()
34  server_ip_c = ctypes.create_string_buffer(server_ip.encode('utf-8'))
35  port_c = ctypes.c_int(port)
36  corelink_const.LIB.corelinkConnect.argtypes = [ctypes.c_char_p,
37  ctypes.c_int,
38  ctypes.POINTER(ctypes.c_int)]
39  corelink_const.LIB.corelinkConnect(server_ip_c,port_c,ctypes.byref(error_id_c))
41 
42  @staticmethod
44  """
45  get_comm_response_data_helper
46  """
47  data_parsed = []
48  total_len = int.from_bytes(data[0],"little")
49  int_size = ctypes.sizeof(ctypes.c_int)
50  cur_p = int_size
51  cur_len = int.from_bytes(data[cur_p],"little")
52  cur_msg_p = (total_len+1)*int_size
53  for _ in range(total_len):
54  data_parsed.append(data[cur_msg_p:cur_msg_p+cur_len])
55  cur_msg_p = cur_msg_p + cur_len
56  cur_p = cur_p + int_size
57  cur_len = int.from_bytes(data[cur_p],"little")
58  return data_parsed
59 
60  @staticmethod
61  def cleanup():
62  """
63  Clean up
64  """
65  if not corelink_const.LIB.getClientState():
66  return
68  for i in streams:
71  corelink_const.LIB.corelinkCleanup()
72 
73  @staticmethod
74  def get_comm_response_data(comm_id_c):
75  """
76  get comm response data
77  """
78  response_len_c = ctypes.c_int()
79  corelink_const.LIB.getCommData.argtypes = [ctypes.POINTER(ctypes.c_int),
80  ctypes.POINTER(ctypes.c_int)]
81  corelink_const.LIB.getCommData.restype = ctypes.POINTER(ctypes.c_char)
82  response_msg = corelink_const.LIB.getCommData(ctypes.byref(comm_id_c),
83  ctypes.byref(response_len_c))
84  return Client.get_comm_response_data_helper(response_msg)
85 
86  @staticmethod
87  def get_data_stream_info(comm_id_c):
88  """
89  get data stream info
90  """
91  data_out = Client.get_comm_response_data(comm_id_c)
92  stream_id = int.from_bytes(data_out[0],"little")
93  state = int.from_bytes(data_out[1],"little")
94  mtu = int.from_bytes(data_out[2],"little")
95  stream_id_c = ctypes.c_int(stream_id)
96  corelink_const.LIB.getStreamRef.argtypes = [ctypes.POINTER(ctypes.c_int)]
97  corelink_const.LIB.getStreamRef.restype = ctypes.c_int
98  stream_ref = corelink_const.LIB.getStreamRef(ctypes.byref(stream_id_c))
99  return corelink_stream_data.StreamData(stream_id,
100  state,mtu,
101  data_out[3],
102  data_out[4],
103  data_out[5],
104  stream_ref,
105  data_out[6])
106 
107  @staticmethod
108  def create_sender(workspace,types,meta,echo,alert,protocol):
109  """
110  create sender
111  """
112  comm_id_c = ctypes.c_int()
113  error_id_c = ctypes.c_int()
114  workspace_c = ctypes.create_string_buffer(workspace.encode('utf-8'))
115  type_c = ctypes.create_string_buffer(types.encode('utf-8'))
116  meta_c = ctypes.create_string_buffer(meta.encode('utf-8'))
117  echo_c = ctypes.c_bool(False)
118  alert_c = ctypes.c_bool(False)
119  if echo:
120  echo_c = ctypes.c_bool(True)
121  if alert:
122  alert_c = ctypes.c_bool(True)
123  protocol_c = ctypes.c_int(protocol)
124 
125  corelink_const.LIB.commAddSender.argtypes = [ctypes.c_char_p,
126  ctypes.c_char_p,
127  ctypes.c_char_p,
128  ctypes.c_bool,
129  ctypes.c_bool,
130  ctypes.c_int,
131  ctypes.POINTER(ctypes.c_int),
132  ctypes.POINTER(ctypes.c_int)]
133  corelink_const.LIB.commAddSender(workspace_c,
134  type_c,
135  meta_c,
136  echo_c,
137  alert_c,
138  protocol_c,
139  ctypes.byref(comm_id_c),
140  ctypes.byref(error_id_c))
142  return corelink_send_stream.SendStream(Client.get_data_stream_info(comm_id_c))
143 
144  @staticmethod
145  def subscribe(receiver_id,sender_id):
146  """
147  Subscribe
148  """
149  receiver_id = ctypes.c_int(receiver_id)
150  sender_id = ctypes.c_int(sender_id)
151  corelink_const.LIB.commSubscribe.argtypes = [ctypes.POINTER(ctypes.c_int),
152  ctypes.POINTER(ctypes.c_int)]
153  corelink_const.LIB.commSubscribe.restype = ctypes.c_bool
154  output = corelink_const.LIB.commSubscribe(receiver_id,sender_id)
155  return output
156 
157  @staticmethod
158  def un_subscribe(receiver_id,sender_id):
159  """
160  Un subscribe
161  """
162  receiver_id = ctypes.c_int(receiver_id)
163  sender_id = ctypes.c_int(sender_id)
164  corelink_const.LIB.commUnSubscribe.argtypes = [ctypes.POINTER(ctypes.c_int),
165  ctypes.POINTER(ctypes.c_int)]
166  corelink_const.LIB.commUnSubscribe.restype = ctypes.c_bool
167  output = corelink_const.LIB.commUnSubscribe(receiver_id,sender_id)
168  return output
169 
170  @staticmethod
171  def rm_stream(sender_id):
172  """
173  remove stream
174  """
175  sender_id = ctypes.c_int(sender_id)
176  corelink_const.LIB.commDisconnect.argtypes = [ctypes.POINTER(ctypes.c_int)]
177  corelink_const.LIB.commDisconnect.restype = ctypes.c_bool
178  output = corelink_const.LIB.commDisconnect(sender_id)
179  return output
180 
181  @staticmethod
183  """
184  vec_string_to_char_array
185  """
186  lp_c_char = ctypes.POINTER(ctypes.c_char)
187  lp_lp_c_char = ctypes.POINTER(lp_c_char)
188  types_c = (ctypes.c_char_p * len(types))()
189  for (index,val) in enumerate(types):
190  types_c[index] = val.encode("UTF-8")
191  ans = ctypes.cast(types_c, lp_lp_c_char)
192  return ans
193 
194  @staticmethod
195  def create_receiver(workspace,types,meta,echo,alert,protocol):
196  """
197  create receiver
198  """
199  comm_id_c = ctypes.c_int()
200  error_id_c = ctypes.c_int()
201  workspace_c = ctypes.create_string_buffer(workspace.encode('utf-8'))
202  types_c = Client.vec_string_to_char_array(types)
203  meta_c = ctypes.create_string_buffer(meta.encode('utf-8'))
204  echo_c = ctypes.c_bool(False)
205  alert_c = ctypes.c_bool(False)
206  if echo:
207  echo_c = ctypes.c_bool(True)
208  if alert:
209  alert_c = ctypes.c_bool(True)
210  protocol_c = ctypes.c_int(protocol)
211  types_size_c = ctypes.c_int(len(types))
212  corelink_const.LIB.commAddReceiver.argtypes = [ctypes.c_char_p,
213  ctypes.POINTER(ctypes.POINTER(ctypes.c_char)),
214  ctypes.c_int,
215  ctypes.c_char_p,
216  ctypes.c_bool,
217  ctypes.c_bool,
218  ctypes.c_int,
219  ctypes.POINTER(ctypes.c_int),
220  ctypes.POINTER(ctypes.c_int)]
221  corelink_const.LIB.commAddReceiver(workspace_c,
222  types_c,
223  types_size_c,
224  meta_c,
225  echo_c,
226  alert_c,
227  protocol_c,
228  ctypes.byref(comm_id_c),
229  ctypes.byref(error_id_c))
231  return corelink_recv_stream.RecvStream(Client.get_data_stream_info(comm_id_c))
232 
233  @staticmethod
235  """
236  connect plugin
237  """
238  error_id_c = ctypes.c_int()
239  corelink_const.LIB.corelinkConnectPlugin.argtypes = [ctypes.POINTER(ctypes.c_int)]
240  corelink_const.LIB.corelinkConnectPlugin(ctypes.byref(error_id_c))
242 
243  @staticmethod
244  def get_token():
245  """
246  get token
247  """
248  corelink_const.LIB.getTokenStr.argtypes = [ctypes.POINTER(ctypes.c_char)]
249  token = ctypes.create_string_buffer(corelink_const.LIB.getTokenLen())
250  corelink_const.LIB.getTokenStr(token)
251  return token.value.decode("utf-8")
252 
253  @staticmethod
255  """
256  list server functions
257  """
258  comm_id_c = ctypes.c_int()
259  error_id_c = ctypes.c_int()
260  corelink_const.LIB.commListFunctions.argtypes = [ctypes.POINTER(ctypes.c_int),
261  ctypes.POINTER(ctypes.c_int)]
262  corelink_const.LIB.commListFunctions(ctypes.byref(comm_id_c), ctypes.byref(error_id_c))
264  temp = Client.get_comm_response_data(comm_id_c)
265  ans = [i.decode("utf-8") for i in temp]
266  return ans
267 
268  @staticmethod
270  """
271  describe server function
272  """
273  comm_id_c = ctypes.c_int()
274  error_id_c = ctypes.c_int()
275  function = str(function)
276  corelink_const.LIB.commGetFunctionInfo.argtypes = [ctypes.POINTER(ctypes.c_char),
277  ctypes.POINTER(ctypes.c_int),
278  ctypes.POINTER(ctypes.c_int)]
279  corelink_const.LIB.commGetFunctionInfo(function.encode("utf-8"),
280  ctypes.byref(comm_id_c),
281  ctypes.byref(error_id_c))
283  temp = Client.get_comm_response_data(comm_id_c)
284  ans = [i.decode("utf-8") for i in temp]
285  return ans
286 
287  @staticmethod
288  def add_workspace(workspace):
289  """
290  add workspace
291  """
292  workspace = str(workspace)
293  corelink_const.LIB.commAddWorkspace.argtypes = [ctypes.POINTER(ctypes.c_char)]
294  corelink_const.LIB.commAddWorkspace(workspace.encode("utf-8"))
295 
296  @staticmethod
297  def rm_workspace(workspace):
298  """
299  remove workspace
300  """
301  workspace = str(workspace)
302  corelink_const.LIB.commRmWorkspace.argtypes = [ctypes.POINTER(ctypes.c_char)]
303  corelink_const.LIB.commRmWorkspace(workspace.encode("utf-8"))
304 
305  @staticmethod
307  """
308  list workspace
309  """
310  comm_id_c = ctypes.c_int()
311  error_id_c = ctypes.c_int()
312  corelink_const.LIB.commListWorkspaces.argtypes = [ctypes.POINTER(ctypes.c_int),
313  ctypes.POINTER(ctypes.c_int)]
314  corelink_const.LIB.commListWorkspaces(ctypes.byref(comm_id_c), ctypes.byref(error_id_c))
316  temp = Client.get_comm_response_data(comm_id_c)
317  ans = [i.decode("utf-8") for i in temp]
318  return ans
319 
320  @staticmethod
321  def stream_info(stream_id):
322  """
323  stream info
324  """
325  stream_id_c = ctypes.c_int(stream_id)
326  comm_id_c = ctypes.c_int()
327  error_id_c = ctypes.c_int()
328  corelink_const.LIB.commGetStreamInfo.argtypes = [ctypes.POINTER(ctypes.c_int),
329  ctypes.POINTER(ctypes.c_int),
330  ctypes.POINTER(ctypes.c_int)]
331  corelink_const.LIB.commGetStreamInfo(ctypes.byref(stream_id_c),
332  ctypes.byref(comm_id_c),
333  ctypes.byref(error_id_c))
335  temp = Client.get_data_stream_info(comm_id_c)
336  return temp