You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
108 lines
2.7 KiB
Python
108 lines
2.7 KiB
Python
from multiprocessing import Process,Semaphore
|
|
import multiprocessing as mp
|
|
import socket
|
|
import cmsisdsp.cg.nodes.host.message as msg
|
|
|
|
HOST = '127.0.0.1' # The remote host
|
|
PORT = 50007
|
|
|
|
class ModelicaConnectionLost(Exception):
|
|
pass
|
|
|
|
def connectToServer(inputMode,theid):
|
|
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.connect((HOST, PORT))
|
|
# Identify as vht input
|
|
if inputMode:
|
|
print("Connecting as INPUT")
|
|
theBytes=msg.list_to_bytes(msg.clientID(msg.VSIINPUT,theid))
|
|
else:
|
|
print("Connecting as OUTPUT")
|
|
theBytes=msg.list_to_bytes(msg.clientID(msg.VSIOUTPUT,theid))
|
|
#print("vs0: %d %d" % (int(theBytes[0]),int(theBytes[1])))
|
|
msg.sendBytes(s,theBytes)
|
|
return(s)
|
|
|
|
def source(theid,size,queue,started):
|
|
s=connectToServer(True,theid)
|
|
started.release()
|
|
try:
|
|
while True:
|
|
received=msg.receiveBytes(s,size)
|
|
queue.put(received)
|
|
except Exception as inst:
|
|
print(inst)
|
|
finally:
|
|
queue.close()
|
|
|
|
|
|
def sink(theid,size,queue,started):
|
|
s=connectToServer(False,theid)
|
|
data= bytes(size)
|
|
msg.sendBytes(s,data)
|
|
started.release()
|
|
try:
|
|
while True:
|
|
tosend=queue.get(True,2)
|
|
msg.sendBytes(s,tosend)
|
|
except Exception as inst:
|
|
print(inst)
|
|
finally:
|
|
queue.close()
|
|
|
|
class Source:
|
|
def __init__(self,theid,bufferSize):
|
|
self._bufferSize_ = bufferSize
|
|
self._srcQueue_ = mp.Queue()
|
|
self._started_ = Semaphore()
|
|
# Q15 data is sent so a *2 factor for bufferSize since
|
|
# source function is working with bytes
|
|
self._src_ = Process(target=source, args=(theid,2*bufferSize,self._srcQueue_,self._started_))
|
|
self._src_.start()
|
|
|
|
@property
|
|
def queue(self):
|
|
return(self._srcQueue_)
|
|
|
|
def get(self):
|
|
if self._src_.exitcode is None:
|
|
return(msg.bytes_to_list(self.queue.get(True,2)))
|
|
else:
|
|
raise ModelicaConnectionLost
|
|
|
|
def end(self):
|
|
self._src_.terminate()
|
|
|
|
def wait(self):
|
|
self._started_.acquire()
|
|
|
|
|
|
|
|
class Sink:
|
|
def __init__(self,theid,bufferSize):
|
|
self._bufferSize_ = bufferSize
|
|
self._sinkQueue_ = mp.Queue()
|
|
self._started_ = Semaphore()
|
|
# Q15 data is sent so a *2 factor for bufferSize since
|
|
# sink function is working with bytes
|
|
self._sink_ = Process(target=sink, args=(theid,2*bufferSize,self._sinkQueue_,self._started_))
|
|
self._sink_.start()
|
|
|
|
@property
|
|
def queue(self):
|
|
return(self._sinkQueue_)
|
|
|
|
def put(self,data):
|
|
if self._sink_.exitcode is None:
|
|
q15list=[int(x) for x in data]
|
|
self.queue.put(msg.list_to_bytes(q15list),True,1)
|
|
else:
|
|
raise ModelicaConnectionLost
|
|
|
|
def end(self):
|
|
self._sink_.terminate()
|
|
|
|
def wait(self):
|
|
self._started_.acquire()
|
|
|