diff options
Diffstat (limited to 'packages/Python/lldbsuite/test/python_api/event/TestEvents.py')
-rw-r--r-- | packages/Python/lldbsuite/test/python_api/event/TestEvents.py | 288 |
1 files changed, 288 insertions, 0 deletions
diff --git a/packages/Python/lldbsuite/test/python_api/event/TestEvents.py b/packages/Python/lldbsuite/test/python_api/event/TestEvents.py new file mode 100644 index 0000000..0a1a0df --- /dev/null +++ b/packages/Python/lldbsuite/test/python_api/event/TestEvents.py @@ -0,0 +1,288 @@ +""" +Test lldb Python event APIs. +""" + +from __future__ import print_function + + + +import os, time +import re +import lldb +import lldbsuite.test.lldbutil as lldbutil +from lldbsuite.test.lldbtest import * + +@skipIfDarwin # llvm.org/pr25924, sometimes generating SIGSEGV +class EventAPITestCase(TestBase): + + mydir = TestBase.compute_mydir(__file__) + + def setUp(self): + # Call super's setUp(). + TestBase.setUp(self) + # Find the line number to of function 'c'. + self.line = line_number('main.c', '// Find the line number of function "c" here.') + + @add_test_categories(['pyapi']) + @expectedFailureLinux("llvm.org/pr23730") # Flaky, fails ~1/10 cases + @skipIfLinux # skip to avoid crashes + def test_listen_for_and_print_event(self): + """Exercise SBEvent API.""" + self.build() + exe = os.path.join(os.getcwd(), "a.out") + + self.dbg.SetAsync(True) + + # Create a target by the debugger. + target = self.dbg.CreateTarget(exe) + self.assertTrue(target, VALID_TARGET) + + # Now create a breakpoint on main.c by name 'c'. + breakpoint = target.BreakpointCreateByName('c', 'a.out') + + listener = lldb.SBListener("my listener") + + # Now launch the process, and do not stop at the entry point. + error = lldb.SBError() + process = target.Launch (listener, + None, # argv + None, # envp + None, # stdin_path + None, # stdout_path + None, # stderr_path + None, # working directory + 0, # launch flags + False, # Stop at entry + error) # error + + self.assertTrue(process.GetState() == lldb.eStateStopped, PROCESS_STOPPED) + + # Create an empty event object. + event = lldb.SBEvent() + + traceOn = self.TraceOn() + if traceOn: + lldbutil.print_stacktraces(process) + + # Create MyListeningThread class to wait for any kind of event. + import threading + class MyListeningThread(threading.Thread): + def run(self): + count = 0 + # Let's only try at most 4 times to retrieve any kind of event. + # After that, the thread exits. + while not count > 3: + if traceOn: + print("Try wait for event...") + if listener.WaitForEvent(5, event): + if traceOn: + desc = lldbutil.get_description(event) + print("Event description:", desc) + print("Event data flavor:", event.GetDataFlavor()) + print("Process state:", lldbutil.state_type_to_str(process.GetState())) + print() + else: + if traceOn: + print("timeout occurred waiting for event...") + count = count + 1 + return + + # Let's start the listening thread to retrieve the events. + my_thread = MyListeningThread() + my_thread.start() + + # Use Python API to continue the process. The listening thread should be + # able to receive the state changed events. + process.Continue() + + # Use Python API to kill the process. The listening thread should be + # able to receive the state changed event, too. + process.Kill() + + # Wait until the 'MyListeningThread' terminates. + my_thread.join() + + @add_test_categories(['pyapi']) + @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases + def test_wait_for_event(self): + """Exercise SBListener.WaitForEvent() API.""" + self.build() + exe = os.path.join(os.getcwd(), "a.out") + + self.dbg.SetAsync(True) + + # Create a target by the debugger. + target = self.dbg.CreateTarget(exe) + self.assertTrue(target, VALID_TARGET) + + # Now create a breakpoint on main.c by name 'c'. + breakpoint = target.BreakpointCreateByName('c', 'a.out') + #print("breakpoint:", breakpoint) + self.assertTrue(breakpoint and + breakpoint.GetNumLocations() == 1, + VALID_BREAKPOINT) + + # Get the debugger listener. + listener = self.dbg.GetListener() + + # Now launch the process, and do not stop at entry point. + error = lldb.SBError() + process = target.Launch (listener, + None, # argv + None, # envp + None, # stdin_path + None, # stdout_path + None, # stderr_path + None, # working directory + 0, # launch flags + False, # Stop at entry + error) # error + self.assertTrue(error.Success() and process, PROCESS_IS_VALID) + + # Create an empty event object. + event = lldb.SBEvent() + self.assertFalse(event, "Event should not be valid initially") + + # Create MyListeningThread to wait for any kind of event. + import threading + class MyListeningThread(threading.Thread): + def run(self): + count = 0 + # Let's only try at most 3 times to retrieve any kind of event. + while not count > 3: + if listener.WaitForEvent(5, event): + #print("Got a valid event:", event) + #print("Event data flavor:", event.GetDataFlavor()) + #print("Event type:", lldbutil.state_type_to_str(event.GetType())) + return + count = count + 1 + print("Timeout: listener.WaitForEvent") + + return + + # Use Python API to kill the process. The listening thread should be + # able to receive a state changed event. + process.Kill() + + # Let's start the listening thread to retrieve the event. + my_thread = MyListeningThread() + my_thread.start() + + # Wait until the 'MyListeningThread' terminates. + my_thread.join() + + self.assertTrue(event, + "My listening thread successfully received an event") + + @skipIfFreeBSD # llvm.org/pr21325 + @add_test_categories(['pyapi']) + @expectedFlakeyLinux("llvm.org/pr23617") # Flaky, fails ~1/10 cases + @expectedFailureWindows("llvm.org/pr24778") + def test_add_listener_to_broadcaster(self): + """Exercise some SBBroadcaster APIs.""" + self.build() + exe = os.path.join(os.getcwd(), "a.out") + + self.dbg.SetAsync(True) + + # Create a target by the debugger. + target = self.dbg.CreateTarget(exe) + self.assertTrue(target, VALID_TARGET) + + # Now create a breakpoint on main.c by name 'c'. + breakpoint = target.BreakpointCreateByName('c', 'a.out') + #print("breakpoint:", breakpoint) + self.assertTrue(breakpoint and + breakpoint.GetNumLocations() == 1, + VALID_BREAKPOINT) + + listener = lldb.SBListener("my listener") + + # Now launch the process, and do not stop at the entry point. + error = lldb.SBError() + process = target.Launch (listener, + None, # argv + None, # envp + None, # stdin_path + None, # stdout_path + None, # stderr_path + None, # working directory + 0, # launch flags + False, # Stop at entry + error) # error + + # Create an empty event object. + event = lldb.SBEvent() + self.assertFalse(event, "Event should not be valid initially") + + + # The finite state machine for our custom listening thread, with an + # initial state of None, which means no event has been received. + # It changes to 'connected' after 'connected' event is received (for remote platforms) + # It changes to 'running' after 'running' event is received (should happen only if the + # currentstate is either 'None' or 'connected') + # It changes to 'stopped' if a 'stopped' event is received (should happen only if the + # current state is 'running'.) + self.state = None + + # Create MyListeningThread to wait for state changed events. + # By design, a "running" event is expected following by a "stopped" event. + import threading + class MyListeningThread(threading.Thread): + def run(self): + #print("Running MyListeningThread:", self) + + # Regular expression pattern for the event description. + pattern = re.compile("data = {.*, state = (.*)}$") + + # Let's only try at most 6 times to retrieve our events. + count = 0 + while True: + if listener.WaitForEvent(5, event): + desc = lldbutil.get_description(event) + #print("Event description:", desc) + match = pattern.search(desc) + if not match: + break; + if match.group(1) == 'connected': + # When debugging remote targets with lldb-server, we + # first get the 'connected' event. + self.context.assertTrue(self.context.state == None) + self.context.state = 'connected' + continue + elif match.group(1) == 'running': + self.context.assertTrue(self.context.state == None or self.context.state == 'connected') + self.context.state = 'running' + continue + elif match.group(1) == 'stopped': + self.context.assertTrue(self.context.state == 'running') + # Whoopee, both events have been received! + self.context.state = 'stopped' + break + else: + break + print("Timeout: listener.WaitForEvent") + count = count + 1 + if count > 6: + break + + return + + # Use Python API to continue the process. The listening thread should be + # able to receive the state changed events. + process.Continue() + + # Start the listening thread to receive the "running" followed by the + # "stopped" events. + my_thread = MyListeningThread() + # Supply the enclosing context so that our listening thread can access + # the 'state' variable. + my_thread.context = self + my_thread.start() + + # Wait until the 'MyListeningThread' terminates. + my_thread.join() + + # The final judgement. :-) + self.assertTrue(self.state == 'stopped', + "Both expected state changed events received") |