diff options
author | dim <dim@FreeBSD.org> | 2016-01-06 20:12:03 +0000 |
---|---|---|
committer | dim <dim@FreeBSD.org> | 2016-01-06 20:12:03 +0000 |
commit | 78b9749c0a4ea980a8b934645da6ae98fcc665e8 (patch) | |
tree | dd2a1ddf0476664c2b823409c36cbccd52662ca7 /packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py | |
parent | 60cb593f9d55fa5ca7a5372b731f2330345b4b9a (diff) | |
download | FreeBSD-src-78b9749c0a4ea980a8b934645da6ae98fcc665e8.zip FreeBSD-src-78b9749c0a4ea980a8b934645da6ae98fcc665e8.tar.gz |
Vendor import of lldb trunk r256945:
https://llvm.org/svn/llvm-project/lldb/trunk@256945
Diffstat (limited to 'packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py')
-rw-r--r-- | packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py b/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py new file mode 100644 index 0000000..795a8c6 --- /dev/null +++ b/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py @@ -0,0 +1,187 @@ + +from __future__ import print_function + + + +import re +import select +import threading +import traceback +import codecs + +from six.moves import queue + +def _handle_output_packet_string(packet_contents): + if (not packet_contents) or (len(packet_contents) < 1): + return None + elif packet_contents[0] != "O": + return None + elif packet_contents == "OK": + return None + else: + return packet_contents[1:].decode("hex") + +def _dump_queue(the_queue): + while not the_queue.empty(): + print(codecs.encode(the_queue.get(True), "string_escape")) + print("\n") + +class SocketPacketPump(object): + """A threaded packet reader that partitions packets into two streams. + + All incoming $O packet content is accumulated with the current accumulation + state put into the OutputQueue. + + All other incoming packets are placed in the packet queue. + + A select thread can be started and stopped, and runs to place packet + content into the two queues. + """ + + _GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}') + + def __init__(self, pump_socket, logger=None): + if not pump_socket: + raise Exception("pump_socket cannot be None") + + self._output_queue = queue.Queue() + self._packet_queue = queue.Queue() + self._thread = None + self._stop_thread = False + self._socket = pump_socket + self._logger = logger + self._receive_buffer = "" + self._accumulated_output = "" + + def __enter__(self): + """Support the python 'with' statement. + + Start the pump thread.""" + self.start_pump_thread() + return self + + def __exit__(self, exit_type, value, the_traceback): + """Support the python 'with' statement. + + Shut down the pump thread.""" + self.stop_pump_thread() + + # Warn if there is any content left in any of the queues. + # That would represent unmatched packets. + if not self.output_queue().empty(): + print("warning: output queue entries still exist:") + _dump_queue(self.output_queue()) + print("from here:") + traceback.print_stack() + + if not self.packet_queue().empty(): + print("warning: packet queue entries still exist:") + _dump_queue(self.packet_queue()) + print("from here:") + traceback.print_stack() + + def start_pump_thread(self): + if self._thread: + raise Exception("pump thread is already running") + self._stop_thread = False + self._thread = threading.Thread(target=self._run_method) + self._thread.start() + + def stop_pump_thread(self): + self._stop_thread = True + if self._thread: + self._thread.join() + + def output_queue(self): + return self._output_queue + + def packet_queue(self): + return self._packet_queue + + def _process_new_bytes(self, new_bytes): + if not new_bytes: + return + if len(new_bytes) < 1: + return + + # Add new bytes to our accumulated unprocessed packet bytes. + self._receive_buffer += new_bytes + + # Parse fully-formed packets into individual packets. + has_more = len(self._receive_buffer) > 0 + while has_more: + if len(self._receive_buffer) <= 0: + has_more = False + # handle '+' ack + elif self._receive_buffer[0] == "+": + self._packet_queue.put("+") + self._receive_buffer = self._receive_buffer[1:] + if self._logger: + self._logger.debug( + "parsed packet from stub: +\n" + + "new receive_buffer: {}".format( + self._receive_buffer)) + else: + packet_match = self._GDB_REMOTE_PACKET_REGEX.match( + self._receive_buffer) + if packet_match: + # Our receive buffer matches a packet at the + # start of the receive buffer. + new_output_content = _handle_output_packet_string( + packet_match.group(1)) + if new_output_content: + # This was an $O packet with new content. + self._accumulated_output += new_output_content + self._output_queue.put(self._accumulated_output) + else: + # Any packet other than $O. + self._packet_queue.put(packet_match.group(0)) + + # Remove the parsed packet from the receive + # buffer. + self._receive_buffer = self._receive_buffer[ + len(packet_match.group(0)):] + if self._logger: + self._logger.debug( + "parsed packet from stub: " + + packet_match.group(0)) + self._logger.debug( + "new receive_buffer: " + + self._receive_buffer) + else: + # We don't have enough in the receive bufferto make a full + # packet. Stop trying until we read more. + has_more = False + + def _run_method(self): + self._receive_buffer = "" + self._accumulated_output = "" + + if self._logger: + self._logger.info("socket pump starting") + + # Keep looping around until we're asked to stop the thread. + while not self._stop_thread: + can_read, _, _ = select.select([self._socket], [], [], 0) + if can_read and self._socket in can_read: + try: + new_bytes = self._socket.recv(4096) + if self._logger and new_bytes and len(new_bytes) > 0: + self._logger.debug( + "pump received bytes: {}".format(new_bytes)) + except: + # Likely a closed socket. Done with the pump thread. + if self._logger: + self._logger.debug( + "socket read failed, stopping pump read thread") + break + self._process_new_bytes(new_bytes) + + if self._logger: + self._logger.info("socket pump exiting") + + def get_accumulated_output(self): + return self._accumulated_output + + def get_receive_buffer(self): + return self._receive_buffer |