diff options
Diffstat (limited to 'packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py')
-rw-r--r-- | packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py | 843 |
1 files changed, 843 insertions, 0 deletions
diff --git a/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py b/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py new file mode 100644 index 0000000..c0ea841 --- /dev/null +++ b/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py @@ -0,0 +1,843 @@ +"""Module for supporting unit testing of the lldb-server debug monitor exe. +""" + +from __future__ import print_function + + + +import os +import os.path +import platform +import re +import six +import socket_packet_pump +import subprocess +import time +from lldbsuite.test.lldbtest import * + +from six.moves import queue + +def _get_debug_monitor_from_lldb(lldb_exe, debug_monitor_basename): + """Return the debug monitor exe path given the lldb exe path. + + This method attempts to construct a valid debug monitor exe name + from a given lldb exe name. It will return None if the synthesized + debug monitor name is not found to exist. + + The debug monitor exe path is synthesized by taking the directory + of the lldb exe, and replacing the portion of the base name that + matches "lldb" (case insensitive) and replacing with the value of + debug_monitor_basename. + + Args: + lldb_exe: the path to an lldb executable. + + debug_monitor_basename: the base name portion of the debug monitor + that will replace 'lldb'. + + Returns: + A path to the debug monitor exe if it is found to exist; otherwise, + returns None. + + """ + if not lldb_exe: + return None + + exe_dir = os.path.dirname(lldb_exe) + exe_base = os.path.basename(lldb_exe) + + # we'll rebuild the filename by replacing lldb with + # the debug monitor basename, keeping any prefix or suffix in place. + regex = re.compile(r"lldb", re.IGNORECASE) + new_base = regex.sub(debug_monitor_basename, exe_base) + + debug_monitor_exe = os.path.join(exe_dir, new_base) + if os.path.exists(debug_monitor_exe): + return debug_monitor_exe + + new_base = regex.sub( 'LLDB.framework/Versions/A/Resources/' + debug_monitor_basename, exe_base) + debug_monitor_exe = os.path.join(exe_dir, new_base) + if os.path.exists(debug_monitor_exe): + return debug_monitor_exe + + return None + + +def get_lldb_server_exe(): + """Return the lldb-server exe path. + + Returns: + A path to the lldb-server exe if it is found to exist; otherwise, + returns None. + """ + if "LLDB_DEBUGSERVER_PATH" in os.environ: + return os.environ["LLDB_DEBUGSERVER_PATH"] + + return _get_debug_monitor_from_lldb(lldbtest_config.lldbExec, "lldb-server") + +def get_debugserver_exe(): + """Return the debugserver exe path. + + Returns: + A path to the debugserver exe if it is found to exist; otherwise, + returns None. + """ + if "LLDB_DEBUGSERVER_PATH" in os.environ: + return os.environ["LLDB_DEBUGSERVER_PATH"] + + return _get_debug_monitor_from_lldb(lldbtest_config.lldbExec, "debugserver") + +_LOG_LINE_REGEX = re.compile(r'^(lldb-server|debugserver)\s+<\s*(\d+)>' + + '\s+(read|send)\s+packet:\s+(.+)$') + + +def _is_packet_lldb_gdbserver_input(packet_type, llgs_input_is_read): + """Return whether a given packet is input for lldb-gdbserver. + + Args: + packet_type: a string indicating 'send' or 'receive', from a + gdbremote packet protocol log. + + llgs_input_is_read: true if lldb-gdbserver input (content sent to + lldb-gdbserver) is listed as 'read' or 'send' in the packet + log entry. + + Returns: + True if the packet should be considered input for lldb-gdbserver; False + otherwise. + """ + if packet_type == 'read': + # when llgs is the read side, then a read packet is meant for + # input to llgs (when captured from the llgs/debugserver exe). + return llgs_input_is_read + elif packet_type == 'send': + # when llgs is the send side, then a send packet is meant to + # be input to llgs (when captured from the lldb exe). + return not llgs_input_is_read + else: + # don't understand what type of packet this is + raise "Unknown packet type: {}".format(packet_type) + + +def handle_O_packet(context, packet_contents, logger): + """Handle O packets.""" + if (not packet_contents) or (len(packet_contents) < 1): + return False + elif packet_contents[0] != "O": + return False + elif packet_contents == "OK": + return False + + new_text = gdbremote_hex_decode_string(packet_contents[1:]) + context["O_content"] += new_text + context["O_count"] += 1 + + if logger: + logger.debug("text: new \"{}\", cumulative: \"{}\"".format(new_text, context["O_content"])) + + return True + +_STRIP_CHECKSUM_REGEX = re.compile(r'#[0-9a-fA-F]{2}$') +_STRIP_COMMAND_PREFIX_REGEX = re.compile(r"^\$") +_STRIP_COMMAND_PREFIX_M_REGEX = re.compile(r"^\$m") + + +def assert_packets_equal(asserter, actual_packet, expected_packet): + # strip off the checksum digits of the packet. When we're in + # no-ack mode, the # checksum is ignored, and should not be cause + # for a mismatched packet. + actual_stripped = _STRIP_CHECKSUM_REGEX.sub('', actual_packet) + expected_stripped = _STRIP_CHECKSUM_REGEX.sub('', expected_packet) + asserter.assertEqual(actual_stripped, expected_stripped) + +def expect_lldb_gdbserver_replay( + asserter, + sock, + test_sequence, + timeout_seconds, + logger=None): + """Replay socket communication with lldb-gdbserver and verify responses. + + Args: + asserter: the object providing assertEqual(first, second, msg=None), e.g. TestCase instance. + + sock: the TCP socket connected to the lldb-gdbserver exe. + + test_sequence: a GdbRemoteTestSequence instance that describes + the messages sent to the gdb remote and the responses + expected from it. + + timeout_seconds: any response taking more than this number of + seconds will cause an exception to be raised. + + logger: a Python logger instance. + + Returns: + The context dictionary from running the given gdbremote + protocol sequence. This will contain any of the capture + elements specified to any GdbRemoteEntry instances in + test_sequence. + + The context will also contain an entry, context["O_content"] + which contains the text from the inferior received via $O + packets. $O packets should not attempt to be matched + directly since they are not entirely deterministic as to + how many arrive and how much text is in each one. + + context["O_count"] will contain an integer of the number of + O packets received. + """ + + # Ensure we have some work to do. + if len(test_sequence.entries) < 1: + return {} + + context = {"O_count":0, "O_content":""} + with socket_packet_pump.SocketPacketPump(sock, logger) as pump: + # Grab the first sequence entry. + sequence_entry = test_sequence.entries.pop(0) + + # While we have an active sequence entry, send messages + # destined for the stub and collect/match/process responses + # expected from the stub. + while sequence_entry: + if sequence_entry.is_send_to_remote(): + # This is an entry to send to the remote debug monitor. + send_packet = sequence_entry.get_send_packet() + if logger: + if len(send_packet) == 1 and send_packet[0] == chr(3): + packet_desc = "^C" + else: + packet_desc = send_packet + logger.info("sending packet to remote: {}".format(packet_desc)) + sock.sendall(send_packet) + else: + # This is an entry expecting to receive content from the remote debug monitor. + + # We'll pull from (and wait on) the queue appropriate for the type of matcher. + # We keep separate queues for process output (coming from non-deterministic + # $O packet division) and for all other packets. + if sequence_entry.is_output_matcher(): + try: + # Grab next entry from the output queue. + content = pump.output_queue().get(True, timeout_seconds) + except queue.Empty: + if logger: + logger.warning("timeout waiting for stub output (accumulated output:{})".format(pump.get_accumulated_output())) + raise Exception("timed out while waiting for output match (accumulated output: {})".format(pump.get_accumulated_output())) + else: + try: + content = pump.packet_queue().get(True, timeout_seconds) + except queue.Empty: + if logger: + logger.warning("timeout waiting for packet match (receive buffer: {})".format(pump.get_receive_buffer())) + raise Exception("timed out while waiting for packet match (receive buffer: {})".format(pump.get_receive_buffer())) + + # Give the sequence entry the opportunity to match the content. + # Output matchers might match or pass after more output accumulates. + # Other packet types generally must match. + asserter.assertIsNotNone(content) + context = sequence_entry.assert_match(asserter, content, context=context) + + # Move on to next sequence entry as needed. Some sequence entries support executing multiple + # times in different states (for looping over query/response packets). + if sequence_entry.is_consumed(): + if len(test_sequence.entries) > 0: + sequence_entry = test_sequence.entries.pop(0) + else: + sequence_entry = None + + # Fill in the O_content entries. + context["O_count"] = 1 + context["O_content"] = pump.get_accumulated_output() + + return context + +def gdbremote_hex_encode_string(str): + output = '' + for c in str: + output += '{0:02x}'.format(ord(c)) + return output + +def gdbremote_hex_decode_string(str): + return str.decode("hex") + +def gdbremote_packet_encode_string(str): + checksum = 0 + for c in str: + checksum += ord(c) + return '$' + str + '#{0:02x}'.format(checksum % 256) + +def build_gdbremote_A_packet(args_list): + """Given a list of args, create a properly-formed $A packet containing each arg. + """ + payload = "A" + + # build the arg content + arg_index = 0 + for arg in args_list: + # Comma-separate the args. + if arg_index > 0: + payload += ',' + + # Hex-encode the arg. + hex_arg = gdbremote_hex_encode_string(arg) + + # Build the A entry. + payload += "{},{},{}".format(len(hex_arg), arg_index, hex_arg) + + # Next arg index, please. + arg_index += 1 + + # return the packetized payload + return gdbremote_packet_encode_string(payload) + + +def parse_reg_info_response(response_packet): + if not response_packet: + raise Exception("response_packet cannot be None") + + # Strip off prefix $ and suffix #xx if present. + response_packet = _STRIP_COMMAND_PREFIX_REGEX.sub("", response_packet) + response_packet = _STRIP_CHECKSUM_REGEX.sub("", response_packet) + + # Build keyval pairs + values = {} + for kv in response_packet.split(";"): + if len(kv) < 1: + continue + (key, val) = kv.split(':') + values[key] = val + + return values + + +def parse_threadinfo_response(response_packet): + if not response_packet: + raise Exception("response_packet cannot be None") + + # Strip off prefix $ and suffix #xx if present. + response_packet = _STRIP_COMMAND_PREFIX_M_REGEX.sub("", response_packet) + response_packet = _STRIP_CHECKSUM_REGEX.sub("", response_packet) + + # Return list of thread ids + return [int(thread_id_hex,16) for thread_id_hex in response_packet.split(",") if len(thread_id_hex) > 0] + +def unpack_endian_binary_string(endian, value_string): + """Unpack a gdb-remote binary (post-unescaped, i.e. not escaped) response to an unsigned int given endianness of the inferior.""" + if not endian: + raise Exception("endian cannot be None") + if not value_string or len(value_string) < 1: + raise Exception("value_string cannot be None or empty") + + if endian == 'little': + value = 0 + i = 0 + while len(value_string) > 0: + value += (ord(value_string[0]) << i) + value_string = value_string[1:] + i += 8 + return value + elif endian == 'big': + value = 0 + while len(value_string) > 0: + value = (value << 8) + ord(value_string[0]) + value_string = value_string[1:] + return value + else: + # pdp is valid but need to add parse code once needed. + raise Exception("unsupported endian:{}".format(endian)) + +def unpack_register_hex_unsigned(endian, value_string): + """Unpack a gdb-remote $p-style response to an unsigned int given endianness of inferior.""" + if not endian: + raise Exception("endian cannot be None") + if not value_string or len(value_string) < 1: + raise Exception("value_string cannot be None or empty") + + if endian == 'little': + value = 0 + i = 0 + while len(value_string) > 0: + value += (int(value_string[0:2], 16) << i) + value_string = value_string[2:] + i += 8 + return value + elif endian == 'big': + return int(value_string, 16) + else: + # pdp is valid but need to add parse code once needed. + raise Exception("unsupported endian:{}".format(endian)) + +def pack_register_hex(endian, value, byte_size=None): + """Unpack a gdb-remote $p-style response to an unsigned int given endianness of inferior.""" + if not endian: + raise Exception("endian cannot be None") + + if endian == 'little': + # Create the litt-endian return value. + retval = "" + while value != 0: + retval = retval + "{:02x}".format(value & 0xff) + value = value >> 8 + if byte_size: + # Add zero-fill to the right/end (MSB side) of the value. + retval += "00" * (byte_size - len(retval)/2) + return retval + + elif endian == 'big': + retval = value.encode("hex") + if byte_size: + # Add zero-fill to the left/front (MSB side) of the value. + retval = ("00" * (byte_size - len(retval)/2)) + retval + return retval + + else: + # pdp is valid but need to add parse code once needed. + raise Exception("unsupported endian:{}".format(endian)) + +class GdbRemoteEntryBase(object): + def is_output_matcher(self): + return False + +class GdbRemoteEntry(GdbRemoteEntryBase): + + def __init__(self, is_send_to_remote=True, exact_payload=None, regex=None, capture=None, expect_captures=None): + """Create an entry representing one piece of the I/O to/from a gdb remote debug monitor. + + Args: + + is_send_to_remote: True if this entry is a message to be + sent to the gdbremote debug monitor; False if this + entry represents text to be matched against the reply + from the gdbremote debug monitor. + + exact_payload: if not None, then this packet is an exact + send (when sending to the remote) or an exact match of + the response from the gdbremote. The checksums are + ignored on exact match requests since negotiation of + no-ack makes the checksum content essentially + undefined. + + regex: currently only valid for receives from gdbremote. + When specified (and only if exact_payload is None), + indicates the gdbremote response must match the given + regex. Match groups in the regex can be used for two + different purposes: saving the match (see capture + arg), or validating that a match group matches a + previously established value (see expect_captures). It + is perfectly valid to have just a regex arg and to + specify neither capture or expect_captures args. This + arg only makes sense if exact_payload is not + specified. + + capture: if specified, is a dictionary of regex match + group indices (should start with 1) to variable names + that will store the capture group indicated by the + index. For example, {1:"thread_id"} will store capture + group 1's content in the context dictionary where + "thread_id" is the key and the match group value is + the value. The value stored off can be used later in a + expect_captures expression. This arg only makes sense + when regex is specified. + + expect_captures: if specified, is a dictionary of regex + match group indices (should start with 1) to variable + names, where the match group should match the value + existing in the context at the given variable name. + For example, {2:"thread_id"} indicates that the second + match group must match the value stored under the + context's previously stored "thread_id" key. This arg + only makes sense when regex is specified. + """ + self._is_send_to_remote = is_send_to_remote + self.exact_payload = exact_payload + self.regex = regex + self.capture = capture + self.expect_captures = expect_captures + + def is_send_to_remote(self): + return self._is_send_to_remote + + def is_consumed(self): + # For now, all packets are consumed after first use. + return True + + def get_send_packet(self): + if not self.is_send_to_remote(): + raise Exception("get_send_packet() called on GdbRemoteEntry that is not a send-to-remote packet") + if not self.exact_payload: + raise Exception("get_send_packet() called on GdbRemoteEntry but it doesn't have an exact payload") + return self.exact_payload + + def _assert_exact_payload_match(self, asserter, actual_packet): + assert_packets_equal(asserter, actual_packet, self.exact_payload) + return None + + def _assert_regex_match(self, asserter, actual_packet, context): + # Ensure the actual packet matches from the start of the actual packet. + match = self.regex.match(actual_packet) + if not match: + asserter.fail("regex '{}' failed to match against content '{}'".format(self.regex.pattern, actual_packet)) + + if self.capture: + # Handle captures. + for group_index, var_name in list(self.capture.items()): + capture_text = match.group(group_index) + # It is okay for capture text to be None - which it will be if it is a group that can match nothing. + # The user must be okay with it since the regex itself matched above. + context[var_name] = capture_text + + if self.expect_captures: + # Handle comparing matched groups to context dictionary entries. + for group_index, var_name in list(self.expect_captures.items()): + capture_text = match.group(group_index) + if not capture_text: + raise Exception("No content to expect for group index {}".format(group_index)) + asserter.assertEqual(capture_text, context[var_name]) + + return context + + def assert_match(self, asserter, actual_packet, context=None): + # This only makes sense for matching lines coming from the + # remote debug monitor. + if self.is_send_to_remote(): + raise Exception("Attempted to match a packet being sent to the remote debug monitor, doesn't make sense.") + + # Create a new context if needed. + if not context: + context = {} + + # If this is an exact payload, ensure they match exactly, + # ignoring the packet checksum which is optional for no-ack + # mode. + if self.exact_payload: + self._assert_exact_payload_match(asserter, actual_packet) + return context + elif self.regex: + return self._assert_regex_match(asserter, actual_packet, context) + else: + raise Exception("Don't know how to match a remote-sent packet when exact_payload isn't specified.") + +class MultiResponseGdbRemoteEntry(GdbRemoteEntryBase): + """Represents a query/response style packet. + + Assumes the first item is sent to the gdb remote. + An end sequence regex indicates the end of the query/response + packet sequence. All responses up through (but not including) the + end response are stored in a context variable. + + Settings accepted from params: + + next_query or query: required. The typical query packet without the $ prefix or #xx suffix. + If there is a special first packet to start the iteration query, see the + first_query key. + + first_query: optional. If the first query requires a special query command, specify + it with this key. Do not specify the $ prefix or #xx suffix. + + append_iteration_suffix: defaults to False. Specify True if the 0-based iteration + index should be appended as a suffix to the command. e.g. qRegisterInfo with + this key set true will generate query packets of qRegisterInfo0, qRegisterInfo1, + etc. + + end_regex: required. Specifies a compiled regex object that will match the full text + of any response that signals an end to the iteration. It must include the + initial $ and ending #xx and must match the whole packet. + + save_key: required. Specifies the key within the context where an array will be stored. + Each packet received from the gdb remote that does not match the end_regex will get + appended to the array stored within the context at that key. + + runaway_response_count: optional. Defaults to 10000. If this many responses are retrieved, + assume there is something wrong with either the response collection or the ending + detection regex and throw an exception. + """ + def __init__(self, params): + self._next_query = params.get("next_query", params.get("query")) + if not self._next_query: + raise "either next_query or query key must be specified for MultiResponseGdbRemoteEntry" + + self._first_query = params.get("first_query", self._next_query) + self._append_iteration_suffix = params.get("append_iteration_suffix", False) + self._iteration = 0 + self._end_regex = params["end_regex"] + self._save_key = params["save_key"] + self._runaway_response_count = params.get("runaway_response_count", 10000) + self._is_send_to_remote = True + self._end_matched = False + + def is_send_to_remote(self): + return self._is_send_to_remote + + def get_send_packet(self): + if not self.is_send_to_remote(): + raise Exception("get_send_packet() called on MultiResponseGdbRemoteEntry that is not in the send state") + if self._end_matched: + raise Exception("get_send_packet() called on MultiResponseGdbRemoteEntry but end of query/response sequence has already been seen.") + + # Choose the first or next query for the base payload. + if self._iteration == 0 and self._first_query: + payload = self._first_query + else: + payload = self._next_query + + # Append the suffix as needed. + if self._append_iteration_suffix: + payload += "%x" % self._iteration + + # Keep track of the iteration. + self._iteration += 1 + + # Now that we've given the query packet, flip the mode to receive/match. + self._is_send_to_remote = False + + # Return the result, converted to packet form. + return gdbremote_packet_encode_string(payload) + + def is_consumed(self): + return self._end_matched + + def assert_match(self, asserter, actual_packet, context=None): + # This only makes sense for matching lines coming from the remote debug monitor. + if self.is_send_to_remote(): + raise Exception("assert_match() called on MultiResponseGdbRemoteEntry but state is set to send a query packet.") + + if self._end_matched: + raise Exception("assert_match() called on MultiResponseGdbRemoteEntry but end of query/response sequence has already been seen.") + + # Set up a context as needed. + if not context: + context = {} + + # Check if the packet matches the end condition. + match = self._end_regex.match(actual_packet) + if match: + # We're done iterating. + self._end_matched = True + return context + + # Not done iterating - save the packet. + context[self._save_key] = context.get(self._save_key, []) + context[self._save_key].append(actual_packet) + + # Check for a runaway response cycle. + if len(context[self._save_key]) >= self._runaway_response_count: + raise Exception("runaway query/response cycle detected: %d responses captured so far. Last response: %s" % + (len(context[self._save_key]), context[self._save_key][-1])) + + # Flip the mode to send for generating the query. + self._is_send_to_remote = True + return context + +class MatchRemoteOutputEntry(GdbRemoteEntryBase): + """Waits for output from the debug monitor to match a regex or time out. + + This entry type tries to match each time new gdb remote output is accumulated + using a provided regex. If the output does not match the regex within the + given timeframe, the command fails the playback session. If the regex does + match, any capture fields are recorded in the context. + + Settings accepted from params: + + regex: required. Specifies a compiled regex object that must either succeed + with re.match or re.search (see regex_mode below) within the given timeout + (see timeout_seconds below) or cause the playback to fail. + + regex_mode: optional. Available values: "match" or "search". If "match", the entire + stub output as collected so far must match the regex. If search, then the regex + must match starting somewhere within the output text accumulated thus far. + Default: "match" (i.e. the regex must match the entirety of the accumulated output + buffer, so unexpected text will generally fail the match). + + capture: optional. If specified, is a dictionary of regex match group indices (should start + with 1) to variable names that will store the capture group indicated by the + index. For example, {1:"thread_id"} will store capture group 1's content in the + context dictionary where "thread_id" is the key and the match group value is + the value. The value stored off can be used later in a expect_captures expression. + This arg only makes sense when regex is specified. + """ + def __init__(self, regex=None, regex_mode="match", capture=None): + self._regex = regex + self._regex_mode = regex_mode + self._capture = capture + self._matched = False + + if not self._regex: + raise Exception("regex cannot be None") + + if not self._regex_mode in ["match", "search"]: + raise Exception("unsupported regex mode \"{}\": must be \"match\" or \"search\"".format(self._regex_mode)) + + def is_output_matcher(self): + return True + + def is_send_to_remote(self): + # This is always a "wait for remote" command. + return False + + def is_consumed(self): + return self._matched + + def assert_match(self, asserter, accumulated_output, context): + # Validate args. + if not accumulated_output: + raise Exception("accumulated_output cannot be none") + if not context: + raise Exception("context cannot be none") + + # Validate that we haven't already matched. + if self._matched: + raise Exception("invalid state - already matched, attempting to match again") + + # If we don't have any content yet, we don't match. + if len(accumulated_output) < 1: + return context + + # Check if we match + if self._regex_mode == "match": + match = self._regex.match(accumulated_output) + elif self._regex_mode == "search": + match = self._regex.search(accumulated_output) + else: + raise Exception("Unexpected regex mode: {}".format(self._regex_mode)) + + # If we don't match, wait to try again after next $O content, or time out. + if not match: + # print("re pattern \"{}\" did not match against \"{}\"".format(self._regex.pattern, accumulated_output)) + return context + + # We do match. + self._matched = True + # print("re pattern \"{}\" matched against \"{}\"".format(self._regex.pattern, accumulated_output)) + + # Collect up any captures into the context. + if self._capture: + # Handle captures. + for group_index, var_name in list(self._capture.items()): + capture_text = match.group(group_index) + if not capture_text: + raise Exception("No content for group index {}".format(group_index)) + context[var_name] = capture_text + + return context + + +class GdbRemoteTestSequence(object): + + _LOG_LINE_REGEX = re.compile(r'^.*(read|send)\s+packet:\s+(.+)$') + + def __init__(self, logger): + self.entries = [] + self.logger = logger + + def add_log_lines(self, log_lines, remote_input_is_read): + for line in log_lines: + if type(line) == str: + # Handle log line import + # if self.logger: + # self.logger.debug("processing log line: {}".format(line)) + match = self._LOG_LINE_REGEX.match(line) + if match: + playback_packet = match.group(2) + direction = match.group(1) + if _is_packet_lldb_gdbserver_input(direction, remote_input_is_read): + # Handle as something to send to the remote debug monitor. + # if self.logger: + # self.logger.info("processed packet to send to remote: {}".format(playback_packet)) + self.entries.append(GdbRemoteEntry(is_send_to_remote=True, exact_payload=playback_packet)) + else: + # Log line represents content to be expected from the remote debug monitor. + # if self.logger: + # self.logger.info("receiving packet from llgs, should match: {}".format(playback_packet)) + self.entries.append(GdbRemoteEntry(is_send_to_remote=False,exact_payload=playback_packet)) + else: + raise Exception("failed to interpret log line: {}".format(line)) + elif type(line) == dict: + entry_type = line.get("type", "regex_capture") + if entry_type == "regex_capture": + # Handle more explicit control over details via dictionary. + direction = line.get("direction", None) + regex = line.get("regex", None) + capture = line.get("capture", None) + expect_captures = line.get("expect_captures", None) + + # Compile the regex. + if regex and (type(regex) == str): + regex = re.compile(regex) + + if _is_packet_lldb_gdbserver_input(direction, remote_input_is_read): + # Handle as something to send to the remote debug monitor. + # if self.logger: + # self.logger.info("processed dict sequence to send to remote") + self.entries.append(GdbRemoteEntry(is_send_to_remote=True, regex=regex, capture=capture, expect_captures=expect_captures)) + else: + # Log line represents content to be expected from the remote debug monitor. + # if self.logger: + # self.logger.info("processed dict sequence to match receiving from remote") + self.entries.append(GdbRemoteEntry(is_send_to_remote=False, regex=regex, capture=capture, expect_captures=expect_captures)) + elif entry_type == "multi_response": + self.entries.append(MultiResponseGdbRemoteEntry(line)) + elif entry_type == "output_match": + + regex = line.get("regex", None) + # Compile the regex. + if regex and (type(regex) == str): + regex = re.compile(regex) + + regex_mode = line.get("regex_mode", "match") + capture = line.get("capture", None) + self.entries.append(MatchRemoteOutputEntry(regex=regex, regex_mode=regex_mode, capture=capture)) + else: + raise Exception("unknown entry type \"%s\"" % entry_type) + +def process_is_running(pid, unknown_value=True): + """If possible, validate that the given pid represents a running process on the local system. + + Args: + + pid: an OS-specific representation of a process id. Should be an integral value. + + unknown_value: value used when we cannot determine how to check running local + processes on the OS. + + Returns: + + If we can figure out how to check running process ids on the given OS: + return True if the process is running, or False otherwise. + + If we don't know how to check running process ids on the given OS: + return the value provided by the unknown_value arg. + """ + if not isinstance(pid, six.integer_types): + raise Exception("pid must be an integral type (actual type: %s)" % str(type(pid))) + + process_ids = [] + + if lldb.remote_platform: + # Don't know how to get list of running process IDs on a remote + # platform + return unknown_value + elif platform.system() in ['Darwin', 'Linux', 'FreeBSD', 'NetBSD']: + # Build the list of running process ids + output = subprocess.check_output("ps ax | awk '{ print $1; }'", shell=True) + text_process_ids = output.split('\n')[1:] + # Convert text pids to ints + process_ids = [int(text_pid) for text_pid in text_process_ids if text_pid != ''] + # elif {your_platform_here}: + # fill in process_ids as a list of int type process IDs running on + # the local system. + else: + # Don't know how to get list of running process IDs on this + # OS, so return the "don't know" value. + return unknown_value + + # Check if the pid is in the process_ids + return pid in process_ids + +if __name__ == '__main__': + EXE_PATH = get_lldb_server_exe() + if EXE_PATH: + print("lldb-server path detected: {}".format(EXE_PATH)) + else: + print("lldb-server could not be found") |