OpenCores
URL https://opencores.org/ocsvn/riscv_vhdl/riscv_vhdl/trunk

Subversion Repositories riscv_vhdl

[/] [riscv_vhdl/] [trunk/] [debugger/] [scripts/] [rpc/] [client.py] - Rev 5

Compare with Previous | Blame | View Log

"""
 @copyright  Copyright 2017 GNSS Sensor Ltd. All right reserved.
 @author     Sergey Khabarov - sergeykhbr@gmail.com
 @brief      Transport TCP layer of the Python API.
"""
 
import threading
import socket
import time
from safe import safe_print
 
TCP_IP = '127.0.0.1'
TCP_PORT = 8687
BUFFER_SIZE = 1024
 
TCP_DEBUG = 1
 
class TcpClient(threading.Thread):
    def __init__(self, name, eventDone):
        threading.Thread.__init__(self)
        self.name = name
        self.skt = None
        self.eventDone = eventDone
        self.messageid = 0
        self.enabled = True
        self.eventTx = threading.Event()
        self.response = ""
        self.console_listeners = []
 
    def run(self):
        safe_print("Connecting to {0}:{1}\n".format(TCP_IP, TCP_PORT))
        self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.skt.connect((TCP_IP, TCP_PORT))
        self.eventDone.set()
 
        buffer = ""
        while self.enabled:
             rxstr = self.skt.recv(BUFFER_SIZE)
             if rxstr == '':
                 self.enabled = False
                 continue
 
             buffer += rxstr
             if '\0' in buffer:
                 strlist = buffer.split('\0')
                 if buffer.endswith('\0'):
                     buffer = ""
                 else:
                     buffer = strlist[len(strlist) - 1]
                     strlist = strlist[:-1]
 
                 for s in strlist:
                     if len(s) == 0:
                         continue
                     json = s.translate(None,'\n\r\0')
                     json = eval(json)
                     if TCP_DEBUG == 1:
                         safe_print("i<= {0}".format(json))
                     if json[0] == self.messageid:
                          self.messageid += 1
                          self.response = None
                          if len(json) > 1:
                              self.response = json[1]
                          self.eventTx.set()
                     elif json[0] == "Console":
                          for l in self.console_listeners:
                              l.callback(json[1])
                     else:
                          raise ValueError(
                            'Unexpected simulation response: {0}'.format(json))
             else:
                 pass
 
        # Ending of the thread:
        safe_print("Thread {0} stopped".format(self.name))
 
    def stop(self):
        self.enabled = False
        self.skt.shutdown(socket.SHUT_WR)
 
    def send(self, data):
        data.insert(0, self.messageid)
        tx = bytearray(str(data))
        tx.append(0)
        if TCP_DEBUG == 1:
            safe_print("o=> {0}".format(tx))
 
        self.eventTx.clear()
        self.response = ""
        self.skt.send(tx)
        self.eventTx.wait()
        return self.response
 
    def registerConsoleListener(self, listener):
        self.console_listeners.append(listener)
 
    def unregisterConsoleListener(self, listener):
        if listener in self.console_listeners:
            self.console_listeners.remove(listener)
 

Compare with Previous | Blame | View Log

powered by: WebSVN 2.1.0

© copyright 1999-2024 OpenCores.org, equivalent to Oliscience, all rights reserved. OpenCores®, registered trademark.