blob: 958d6449b516217291416a96d1727435012b5ed3 [file] [log] [blame]
from __future__ import print_function
import re
import select
import threading
import traceback
import codecs
from six.moves import queue
from lldbsuite.support import seven
def _handle_output_packet_string(packet_contents):
if (not packet_contents) or (len(packet_contents) < 1):
return None
elif packet_contents[0] != "O":
return None
elif packet_contents == "OK":
return None
else:
return seven.unhexlify(packet_contents[1:])
def _dump_queue(the_queue):
while not the_queue.empty():
print(codecs.encode(the_queue.get(True), "string_escape"))
print("\n")
class PumpQueues(object):
def __init__(self):
self._output_queue = queue.Queue()
self._packet_queue = queue.Queue()
def output_queue(self):
return self._output_queue
def packet_queue(self):
return self._packet_queue
def verify_queues_empty(self):
# Warn if there is any content left in any of the queues.
# That would represent unmatched packets.
if not self.output_queue().empty():
print("warning: output queue entries still exist:")
_dump_queue(self.output_queue())
print("from here:")
traceback.print_stack()
if not self.packet_queue().empty():
print("warning: packet queue entries still exist:")
_dump_queue(self.packet_queue())
print("from here:")
traceback.print_stack()
class SocketPacketPump(object):
"""A threaded packet reader that partitions packets into two streams.
All incoming $O packet content is accumulated with the current accumulation
state put into the OutputQueue.
All other incoming packets are placed in the packet queue.
A select thread can be started and stopped, and runs to place packet
content into the two queues.
"""
_GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}')
def __init__(self, pump_socket, pump_queues, logger=None):
if not pump_socket:
raise Exception("pump_socket cannot be None")
self._thread = None
self._stop_thread = False
self._socket = pump_socket
self._logger = logger
self._receive_buffer = ""
self._accumulated_output = ""
self._pump_queues = pump_queues
def __enter__(self):
"""Support the python 'with' statement.
Start the pump thread."""
self.start_pump_thread()
return self
def __exit__(self, exit_type, value, the_traceback):
"""Support the python 'with' statement.
Shut down the pump thread."""
self.stop_pump_thread()
def start_pump_thread(self):
if self._thread:
raise Exception("pump thread is already running")
self._stop_thread = False
self._thread = threading.Thread(target=self._run_method)
self._thread.start()
def stop_pump_thread(self):
self._stop_thread = True
if self._thread:
self._thread.join()
def _process_new_bytes(self, new_bytes):
if not new_bytes:
return
if len(new_bytes) < 1:
return
# Add new bytes to our accumulated unprocessed packet bytes.
self._receive_buffer += new_bytes
# Parse fully-formed packets into individual packets.
has_more = len(self._receive_buffer) > 0
while has_more:
if len(self._receive_buffer) <= 0:
has_more = False
# handle '+' ack
elif self._receive_buffer[0] == "+":
self._pump_queues.packet_queue().put("+")
self._receive_buffer = self._receive_buffer[1:]
if self._logger:
self._logger.debug(
"parsed packet from stub: +\n" +
"new receive_buffer: {}".format(
self._receive_buffer))
else:
packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
self._receive_buffer)
if packet_match:
# Our receive buffer matches a packet at the
# start of the receive buffer.
new_output_content = _handle_output_packet_string(
packet_match.group(1))
if new_output_content:
# This was an $O packet with new content.
self._accumulated_output += new_output_content
self._pump_queues.output_queue().put(self._accumulated_output)
else:
# Any packet other than $O.
self._pump_queues.packet_queue().put(packet_match.group(0))
# Remove the parsed packet from the receive
# buffer.
self._receive_buffer = self._receive_buffer[
len(packet_match.group(0)):]
if self._logger:
self._logger.debug(
"parsed packet from stub: " +
packet_match.group(0))
self._logger.debug(
"new receive_buffer: " +
self._receive_buffer)
else:
# We don't have enough in the receive bufferto make a full
# packet. Stop trying until we read more.
has_more = False
def _run_method(self):
self._receive_buffer = ""
self._accumulated_output = ""
if self._logger:
self._logger.info("socket pump starting")
# Keep looping around until we're asked to stop the thread.
while not self._stop_thread:
can_read, _, _ = select.select([self._socket], [], [], 0)
if can_read and self._socket in can_read:
try:
new_bytes = seven.bitcast_to_string(self._socket.recv(4096))
if self._logger and new_bytes and len(new_bytes) > 0:
self._logger.debug(
"pump received bytes: {}".format(new_bytes))
except:
# Likely a closed socket. Done with the pump thread.
if self._logger:
self._logger.debug(
"socket read failed, stopping pump read thread\n" +
traceback.format_exc(3))
break
self._process_new_bytes(new_bytes)
if self._logger:
self._logger.info("socket pump exiting")
def get_accumulated_output(self):
return self._accumulated_output
def get_receive_buffer(self):
return self._receive_buffer