1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
|
from __future__ import print_function
import re
import select
import threading
import traceback
import codecs
from six.moves import queue
def _handle_output_packet_string(packet_contents):
    """Return the text carried by a gdb-remote $O (output) packet payload.

    Args:
        packet_contents: the packet payload as text, i.e. what sits between
            the leading '$' and the '#' that introduces the checksum.

    Returns:
        The hex-decoded output text when the payload has the form "O<hex>".
        None when the payload is empty/None, does not start with "O", or is
        exactly "OK" (a reply that starts with "O" but is not output).
        A bare "O" payload yields the empty string.

    Raises:
        TypeError (Python 2) or binascii.Error (Python 3) when the text
        following "O" is not valid hex.
    """
    # Function-scope import keeps this block self-contained.
    import binascii

    if not packet_contents:
        return None
    if packet_contents[0] != "O" or packet_contents == "OK":
        return None

    # str.decode("hex") only exists on Python 2; binascii.unhexlify works on
    # both major versions.
    decoded = binascii.unhexlify(packet_contents[1:])
    if not isinstance(decoded, str):
        # Python 3 hands back bytes.  latin-1 maps each byte to the code
        # point of the same value, so this is a lossless bitcast to text.
        decoded = decoded.decode("latin-1")
    return decoded
def _dump_queue(the_queue):
    """Drain the_queue, printing every entry with special characters escaped.

    Each entry is printed on its own line in escaped form (e.g. a newline
    inside the entry is shown as the two characters backslash-n), followed
    by a print of "\\n".

    Args:
        the_queue: a queue object providing empty() and get(); it is emptied
            as a side effect.
    """
    while not the_queue.empty():
        entry = the_queue.get(True)

        # On Python 3 an entry may be bytes; bitcast it to text so the
        # escaping below works.  (On Python 2 bytes is str, so this is
        # skipped.)
        if isinstance(entry, bytes) and not isinstance(entry, str):
            entry = entry.decode("latin-1")

        try:
            # Python 2 codec; it was removed in Python 3.
            escaped = codecs.encode(entry, "string_escape")
        except LookupError:
            # Python 3: unicode_escape yields ASCII-only bytes; turn them
            # back into text so print() does not show a b'...' repr.
            escaped = codecs.encode(entry, "unicode_escape").decode("ascii")

        print(escaped)
        print("\n")
class SocketPacketPump(object):
    """A threaded packet reader that partitions packets into two streams.

    All incoming $O packet content is accumulated with the current accumulation
    state put into the OutputQueue.

    All other incoming packets are placed in the packet queue.

    A select thread can be started and stopped, and runs to place packet
    content into the two queues.

    Data read from the socket is held internally as text (str).  Bytes
    received on Python 3 are converted with latin-1, which maps every byte
    to the code point of the same value.
    """

    # Matches one complete "$<payload>#<two hex digits>" packet at the start
    # of the receive buffer; group(1) is the payload.
    _GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}')

    # Upper bound on how long one select() call blocks.  A non-zero value
    # keeps the pump loop from spinning while the socket is idle; it is also
    # the longest the loop takes to notice a stop request.
    _SELECT_TIMEOUT_SECONDS = 0.05

    def __init__(self, pump_socket, logger=None):
        """Create a pump that reads from pump_socket.

        Args:
            pump_socket: the object to read from; it is handed to
                select.select() and its recv() method is called.
            logger: optional object with debug()/info() methods used for
                diagnostics; no logging happens when it is None.

        Raises:
            Exception: if pump_socket is falsy.
        """
        if not pump_socket:
            raise Exception("pump_socket cannot be None")

        self._output_queue = queue.Queue()
        self._packet_queue = queue.Queue()
        self._thread = None
        self._stop_thread = False
        self._socket = pump_socket
        self._logger = logger
        self._receive_buffer = ""
        self._accumulated_output = ""

    def __enter__(self):
        """Support the python 'with' statement.

        Start the pump thread."""
        self.start_pump_thread()
        return self

    def __exit__(self, exit_type, value, the_traceback):
        """Support the python 'with' statement.

        Shut down the pump thread."""
        self.stop_pump_thread()

        # Warn if there is any content left in any of the queues.
        # That would represent unmatched packets.
        if not self.output_queue().empty():
            print("warning: output queue entries still exist:")
            _dump_queue(self.output_queue())
            print("from here:")
            traceback.print_stack()

        if not self.packet_queue().empty():
            print("warning: packet queue entries still exist:")
            _dump_queue(self.packet_queue())
            print("from here:")
            traceback.print_stack()

    def start_pump_thread(self):
        """Start the background thread that reads from the socket.

        Raises:
            Exception: if a pump thread has been started and not yet
                stopped with stop_pump_thread().
        """
        if self._thread:
            raise Exception("pump thread is already running")
        self._stop_thread = False
        self._thread = threading.Thread(target=self._run_method)
        self._thread.start()

    def stop_pump_thread(self):
        """Ask the pump thread to stop and wait for it to finish.

        Does nothing beyond setting the stop flag when no thread is active.
        """
        self._stop_thread = True
        if self._thread:
            self._thread.join()
            # Forget the finished thread so start_pump_thread() can be
            # called again.
            self._thread = None

    def output_queue(self):
        """Return the queue that receives accumulated $O output text."""
        return self._output_queue

    def packet_queue(self):
        """Return the queue that receives '+' acks and non-$O packets."""
        return self._packet_queue

    def _process_new_bytes(self, new_bytes):
        """Append new_bytes to the receive buffer and parse what is complete.

        Every leading '+' is queued on the packet queue as "+".  Every
        complete "$...#xx" packet at the front of the buffer is removed and
        routed: $O packets with content extend the accumulated output, whose
        new value is put on the output queue; any other packet is put on the
        packet queue in full.  Parsing stops at the first position that is
        neither a '+' nor a complete packet, leaving the rest buffered.

        Args:
            new_bytes: data just read from the socket (bytes or str); None
                or empty input is ignored.
        """
        if not new_bytes:
            return

        # recv() returns bytes on Python 3 while the buffer is text; bitcast
        # so concatenation and regex matching work.  On Python 2 the data is
        # already str and is left alone.
        if not isinstance(new_bytes, str):
            new_bytes = new_bytes.decode("latin-1")

        # Add new bytes to our accumulated unprocessed packet bytes.
        self._receive_buffer += new_bytes

        # Parse fully-formed packets into individual packets.
        while self._receive_buffer:
            # handle '+' ack
            if self._receive_buffer[0] == "+":
                self._packet_queue.put("+")
                self._receive_buffer = self._receive_buffer[1:]
                if self._logger:
                    self._logger.debug(
                        "parsed packet from stub: +\n" +
                        "new receive_buffer: {}".format(
                            self._receive_buffer))
                continue

            packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
                self._receive_buffer)
            if not packet_match:
                # We don't have enough in the receive buffer to make a full
                # packet.  Stop trying until we read more.
                break

            # Our receive buffer matches a packet at the start of the
            # receive buffer.
            whole_packet = packet_match.group(0)
            new_output_content = _handle_output_packet_string(
                packet_match.group(1))
            if new_output_content:
                # This was an $O packet with new content.
                self._accumulated_output += new_output_content
                self._output_queue.put(self._accumulated_output)
            else:
                # Any packet other than $O.
                self._packet_queue.put(whole_packet)

            # Remove the parsed packet from the receive buffer.
            self._receive_buffer = self._receive_buffer[len(whole_packet):]

            if self._logger:
                self._logger.debug(
                    "parsed packet from stub: " + whole_packet)
                self._logger.debug(
                    "new receive_buffer: " + self._receive_buffer)

    def _run_method(self):
        """Thread body: read from the socket until stopped or it fails.

        Resets the receive buffer and accumulated output, then repeatedly
        waits for the socket to become readable and feeds what it reads to
        _process_new_bytes().  The loop ends when a stop is requested, when
        select()/recv() raises, or when recv() returns no data.
        """
        self._receive_buffer = ""
        self._accumulated_output = ""

        if self._logger:
            self._logger.info("socket pump starting")

        # Keep looping around until we're asked to stop the thread.
        while not self._stop_thread:
            try:
                can_read, _, _ = select.select(
                    [self._socket], [], [], self._SELECT_TIMEOUT_SECONDS)
                if self._socket not in can_read:
                    continue
                new_bytes = self._socket.recv(4096)
            except Exception:
                # Likely a closed socket.  Done with the pump thread.
                if self._logger:
                    self._logger.debug(
                        "socket read failed, stopping pump read thread")
                break

            if not new_bytes:
                # A readable socket that yields no data will not produce
                # more; without this the loop would spin until stopped.
                if self._logger:
                    self._logger.debug(
                        "socket returned no data, stopping pump read thread")
                break

            if self._logger:
                self._logger.debug(
                    "pump received bytes: {}".format(new_bytes))
            self._process_new_bytes(new_bytes)

        if self._logger:
            self._logger.info("socket pump exiting")

    def get_accumulated_output(self):
        """Return all $O output text accumulated so far."""
        return self._accumulated_output

    def get_receive_buffer(self):
        """Return the received text that has not yet been parsed."""
        return self._receive_buffer
|