| # Usage: |
| # ./bin/lldb $LLVM/lldb/test/API/functionalities/interactive_scripted_process/main \ |
| # -o "br set -p 'Break here'" \ |
| # -o "command script import $LLVM/lldb/test/API/functionalities/interactive_scripted_process/interactive_scripted_process.py" \ |
| # -o "create_mux" \ |
| # -o "create_sub" \ |
| # -o "br set -p 'also break here'" -o 'continue' |
| |
| import os, json, struct, signal, tempfile |
| |
| from threading import Thread |
| from typing import Any, Dict |
| |
| import lldb |
| from lldb.plugins.scripted_process import PassthroughScriptedProcess |
| from lldb.plugins.scripted_process import PassthroughScriptedThread |
| |
| |
class MultiplexedScriptedProcess(PassthroughScriptedProcess):
    """Child scripted process showing a parity-filtered view of a multiplexer.

    Thread listing, resuming (after the first launch) and breakpoint creation
    are forwarded to the object stored in `self.multiplexer`, which is attached
    after construction by the module-level `multiplex` helper.
    """

    def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
        """Read the `parity` launch argument and re-class the inherited threads.

        Args:
            exe_ctx: Execution context forwarded to the base class.
            args: Launch dictionary; its "parity" key is read as an unsigned
                integer.
        """
        super().__init__(exe_ctx, args)
        # No multiplexer is known at construction time; `multiplex` sets it.
        self.multiplexer = None
        if isinstance(self.driving_process, lldb.SBProcess) and self.driving_process:
            parity = args.GetValueForKey("parity")
            # TODO: Change to Walrus operator (:=) with oneline if assignment
            # Requires python 3.8
            val = parity.GetUnsignedIntegerValue()
            if val is not None:
                self.parity = val

            # Turn PassthroughScriptedThread into MultiplexedScriptedThread
            for thread in self.threads.values():
                thread.__class__ = MultiplexedScriptedThread

    def get_process_id(self) -> int:
        """Return the process id, derived from the parity (parity + 420)."""
        return self.parity + 420

    def launch(self, should_stop: bool = True) -> lldb.SBError:
        """Record that the next `resume` is the first one and report success."""
        self.first_launch = True
        return lldb.SBError()

    def resume(self, should_stop: bool = True) -> lldb.SBError:
        """Resume the process.

        The first resume after `launch` is handled by the base class; every
        later resume is forwarded to the multiplexer.

        Returns:
            The SBError produced by whichever object handled the resume, or an
            error stating that the multiplexer is not set.
        """
        if self.first_launch:
            self.first_launch = False
            return super().resume()

        if not self.multiplexer:
            return lldb.SBError("Multiplexer is not set.")
        return self.multiplexer.resume(should_stop)

    def get_threads_info(self) -> Dict[int, Any]:
        """Return the threads for this process.

        Without a multiplexer this is the base-class result; otherwise it is
        the multiplexer's thread map filtered by this process id.
        """
        if not self.multiplexer:
            return super().get_threads_info()
        filtered_threads = self.multiplexer.get_threads_info(pid=self.get_process_id())
        # Update the filtered thread class from PassthroughScriptedThread to
        # MultiplexedScriptedThread.
        # NOTE(review): the constructor is called with the existing thread
        # object as its only argument; confirm this matches the signature of
        # PassthroughScriptedThread.__init__.
        return {
            thread_id: MultiplexedScriptedThread(thread)
            for thread_id, thread in filtered_threads.items()
        }

    def create_breakpoint(self, addr, error, pid=None):
        """Forward a breakpoint request to the multiplexer.

        Args:
            addr: Address passed through unchanged to the multiplexer.
            error: SBError that receives a message on failure.
            pid: Unused here; this process' own id is always forwarded.

        Returns:
            False when no multiplexer is attached, otherwise whatever the
            multiplexer's `create_breakpoint` returns.
        """
        if not self.multiplexer:
            error.SetErrorString("Multiplexer is not set.")
            # Bail out: there is no multiplexer to forward the request to.
            return False
        return self.multiplexer.create_breakpoint(addr, error, self.get_process_id())

    def get_scripted_thread_plugin(self) -> str:
        """Return the fully qualified name of the thread class for this process."""
        return f"{MultiplexedScriptedThread.__module__}.{MultiplexedScriptedThread.__name__}"
| |
| |
class MultiplexedScriptedThread(PassthroughScriptedThread):
    """Passthrough thread whose name advertises the parity of its process."""

    def get_name(self) -> str:
        """Return "<Odd|Even>MultiplexedScriptedThread.thread-<idx>"."""
        if self.scripted_process.parity % 2:
            prefix = "Odd"
        else:
            prefix = "Even"
        class_name = MultiplexedScriptedThread.__name__
        return f"{prefix}{class_name}.thread-{self.idx}"
| |
| |
class MultiplexerScriptedProcess(PassthroughScriptedProcess):
    """Scripted process that fronts a real ("driving") process.

    It launches the driving process itself, listens for its stop events on a
    background thread, mirrors the driving threads into `self.threads`, and
    then forces a running/stopped state transition on its own process and on
    every registered multiplexed child process.
    """

    # Listener attached to the driving process at launch; created in __init__.
    listener = None
    # Child processes registered by `multiplex`, keyed by their process id.
    multiplexed_processes = None

    def wait_for_driving_process_to_stop(self):
        """Event loop run on the listener thread; never returns.

        Waits for state-changed events from the driving process and, on every
        stop, refreshes the thread list and notifies the scripted processes.
        """

        def handle_process_state_event():
            # Update multiplexer process
            log("Updating interactive scripted process threads")
            dbg = self.driving_target.GetDebugger()
            new_driving_thread_ids = set()
            for driving_thread in self.driving_process:
                new_driving_thread_ids.add(driving_thread.id)
                log(f"{len(self.threads)} New thread {hex(driving_thread.id)}")
                structured_data = lldb.SBStructuredData()
                structured_data.SetFromJSON(
                    json.dumps(
                        {
                            "driving_target_idx": dbg.GetIndexOfTarget(
                                self.driving_target
                            ),
                            "thread_idx": driving_thread.GetIndexID(),
                        }
                    )
                )

                self.threads[driving_thread.id] = PassthroughScriptedThread(
                    self, structured_data
                )

            # Iterate over a snapshot of the keys: deleting from a dict while
            # iterating it directly raises RuntimeError.
            for thread_id in list(self.threads):
                if thread_id not in new_driving_thread_ids:
                    log(f"Removing old thread {hex(thread_id)}")
                    del self.threads[thread_id]

            print(f"New thread count: {len(self.threads)}")

            # Toggle running -> stopped on the multiplexer process and on every
            # registered child process.
            mux_process = self.target.GetProcess()
            mux_process.ForceScriptedState(lldb.eStateRunning)
            mux_process.ForceScriptedState(lldb.eStateStopped)

            for child_process in self.multiplexed_processes.values():
                child_process.ForceScriptedState(lldb.eStateRunning)
                child_process.ForceScriptedState(lldb.eStateStopped)

        event = lldb.SBEvent()
        while True:
            # Nothing to listen to until `launch` has set the driving process.
            if not self.driving_process:
                continue
            if self.listener.WaitForEvent(1, event):
                event_mask = event.GetType()
                if event_mask & lldb.SBProcess.eBroadcastBitStateChanged:
                    state = lldb.SBProcess.GetStateFromEvent(event)
                    log(f"Received public process state event: {state}")
                    if state == lldb.eStateStopped:
                        # If it's a stop event, iterate over the driving process
                        # thread, looking for a breakpoint stop reason, if internal
                        # continue.
                        handle_process_state_event()

    def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
        """Set up the listener, mirror breakpoints and start the listener thread.

        Args:
            exe_ctx: Execution context forwarded to the base class.
            args: Launch dictionary forwarded to the base class.
        """
        super().__init__(exe_ctx, args, launched_driving_process=False)
        if isinstance(self.driving_target, lldb.SBTarget) and self.driving_target:
            self.listener = lldb.SBListener(
                "lldb.listener.multiplexer-scripted-process"
            )
            self.multiplexed_processes = {}

            # Copy breakpoints from real target to passthrough
            with tempfile.NamedTemporaryFile() as tf:
                bkpt_file = lldb.SBFileSpec(tf.name)
                error = self.driving_target.BreakpointsWriteToFile(bkpt_file)
                if error.Fail():
                    log(
                        "Failed to save breakpoints from driving target (%s)"
                        % error.GetCString()
                    )
                bkpts_list = lldb.SBBreakpointList(self.target)
                error = self.target.BreakpointsCreateFromFile(bkpt_file, bkpts_list)
                if error.Fail():
                    log(
                        "Failed create breakpoints from driving target "
                        "(bkpt file: %s)" % tf.name
                    )

            # Copy breakpoint from passthrough to real target
            if error.Success():
                self.driving_target.DeleteAllBreakpoints()
                for bkpt in self.target.breakpoints:
                    if bkpt.IsValid():
                        for bl in bkpt:
                            real_bpkt = self.driving_target.BreakpointCreateBySBAddress(
                                bl.GetAddress()
                            )
                            if not real_bpkt.IsValid():
                                log(
                                    "Failed to set breakpoint at address %s in "
                                    "driving target" % hex(bl.GetLoadAddress())
                                )

            self.listener_thread = Thread(
                target=self.wait_for_driving_process_to_stop, daemon=True
            )
            self.listener_thread.start()

    def launch(self, should_stop: bool = True) -> lldb.SBError:
        """Launch the driving process and record its loaded modules.

        Returns:
            A failing SBError when the driving target is invalid, when a
            driving process already exists, or when the launch itself fails;
            otherwise the (successful) launch error object.
        """
        if not self.driving_target:
            return lldb.SBError(
                f"{self.__class__.__name__}.launch: Invalid driving target."
            )

        # This method creates the driving process, so one must not exist yet.
        if self.driving_process:
            return lldb.SBError(
                f"{self.__class__.__name__}.launch: Driving process already exists."
            )

        error = lldb.SBError()
        launch_info = lldb.SBLaunchInfo(None)
        launch_info.SetListener(self.listener)
        driving_process = self.driving_target.Launch(launch_info, error)

        if not driving_process or error.Fail():
            return error

        self.driving_process = driving_process

        for module in self.driving_target.modules:
            path = module.file.fullpath
            load_addr = module.GetObjectFileHeaderAddress().GetLoadAddress(
                self.driving_target
            )
            self.loaded_images.append({"path": path, "load_addr": load_addr})

        self.first_resume = True
        return error

    def resume(self, should_stop: bool = True) -> lldb.SBError:
        """Continue the driving process, except on the first resume.

        Returns:
            An empty SBError on the first resume, a failing SBError when there
            is no driving process, otherwise the result of `Continue()`.
        """
        if self.first_resume:
            # When we resume the multiplexer process for the first time,
            # we shouldn't do anything because lldb's execution machinery
            # will resume the driving process by itself.

            # Also, no need to update the multiplexer scripted process state
            # here since it's listening for the real process stop events.
            # Once it receives the stop event from the driving process,
            # `wait_for_driving_process_to_stop` will update the multiplexer
            # state for us.

            self.first_resume = False
            return lldb.SBError()

        if not self.driving_process:
            return lldb.SBError(
                f"{self.__class__.__name__}.resume: Invalid driving process."
            )

        return self.driving_process.Continue()

    def get_threads_info(self, pid: int = None) -> Dict[int, Any]:
        """Return thread info, optionally filtered by the parity of `pid`.

        Args:
            pid: When falsy, the base-class result is returned. Otherwise only
                threads whose id has the same parity as `pid` are returned.
        """
        if not pid:
            return super().get_threads_info()
        parity = pid % 2
        return {
            thread_id: thread
            for thread_id, thread in self.threads.items()
            if thread_id % 2 == parity
        }

    def create_breakpoint(self, addr, error, pid=None):
        """Create a named breakpoint at `addr` in the driving target.

        When `pid` is given the request comes from a multiplexed child, and
        the breakpoint is first created in the multiplexer's own target too.

        Args:
            addr: Load address of the breakpoint.
            error: SBError that receives a message on failure.
            pid: Process id of the requesting child process, if any.

        Returns:
            True when every required breakpoint was created and named.
        """
        if not self.driving_target:
            error.SetErrorString("%s has no driving target." % self.__class__.__name__)
            return False

        def create_breakpoint_with_name(target, load_addr, name, error):
            # Create a breakpoint at `load_addr` in `target` and tag it with
            # `name`; failures are written into the caller's `error` object.
            addr = lldb.SBAddress(load_addr, target)
            if not addr.IsValid():
                error.SetErrorString("Invalid breakpoint address %s" % hex(load_addr))
                return False
            bkpt = target.BreakpointCreateBySBAddress(addr)
            if not bkpt.IsValid():
                error.SetErrorString(
                    "Failed to create breakpoint at address %s" % hex(load_addr)
                )
                return False
            # Keep the naming result in its own variable so that a failure is
            # copied into the caller's `error` instead of being dropped.
            name_error = bkpt.AddNameWithErrorHandling(name)
            if name_error.Fail():
                error.SetErrorString(
                    "Failed to add name %s to breakpoint: %s"
                    % (name, name_error.GetCString())
                )
                return False
            return True

        name = (
            "multiplexer_scripted_process"
            if not pid
            else f"multiplexed_scripted_process_{pid}"
        )

        if pid is not None:
            # This means that this method has been called from one of the
            # multiplexed scripted process. That also means that the multiplexer
            # target doesn't have this breakpoint created.
            mux_error = lldb.SBError()
            if not create_breakpoint_with_name(self.target, addr, name, mux_error):
                error.SetErrorString(
                    "Failed to create breakpoint in multiplexer target: %s"
                    % mux_error.GetCString()
                )
                return False
        return create_breakpoint_with_name(self.driving_target, addr, name, error)
| |
| |
def multiplex(mux_process, muxed_process):
    """Link a multiplexed process and its multiplexer in both directions.

    The multiplexed implementation gets a `multiplexer` reference, and the
    multiplexed process is registered in the multiplexer implementation's
    `multiplexed_processes` map under its process id.
    """
    mux_impl = mux_process.GetScriptedImplementation()
    muxed_impl = muxed_process.GetScriptedImplementation()

    muxed_impl.multiplexer = mux_impl
    children = mux_impl.multiplexed_processes
    children[muxed_process.GetProcessID()] = muxed_process
| |
| |
def launch_scripted_process(target, class_name, dictionary):
    """Launch a scripted process in `target` and return the resulting process.

    Args:
        target: Target in which the process is launched.
        class_name: Name of the scripted process class to instantiate.
        dictionary: JSON-serializable arguments handed to that class.
    """
    scripted_args = lldb.SBStructuredData()
    scripted_args.SetFromJSON(json.dumps(dictionary))

    info = lldb.SBLaunchInfo(None)
    info.SetProcessPluginName("ScriptedProcess")
    info.SetScriptedProcessClassName(class_name)
    info.SetScriptedProcessDictionary(scripted_args)

    # The launch error is not inspected here; callers test the returned process.
    launch_error = lldb.SBError()
    process = target.Launch(info, launch_error)
    return process
| |
| |
def duplicate_target(driving_target):
    """Create a new target with the same executable and triple.

    Args:
        driving_target: Target whose executable path and triple are reused.

    Returns:
        The target created by the debugger that owns `driving_target`.
    """
    exe = driving_target.executable.fullpath
    triple = driving_target.triple
    debugger = driving_target.GetDebugger()
    return debugger.CreateTargetWithFileAndTargetTriple(exe, triple)
| |
| |
def create_mux_process(debugger, command, exe_ctx, result, dict):
    """Command handler that launches the multiplexer scripted process.

    The selected target is used as the driving target; it is duplicated and
    the multiplexer scripted process is launched in the duplicate. Failures
    are reported through `result.SetError`.
    """
    if debugger.GetNumTargets() <= 0:
        return result.SetError(
            "Interactive scripted processes requires one non scripted process."
        )

    debugger.SetAsync(True)

    driving_target = debugger.GetSelectedTarget()
    if not driving_target:
        return result.SetError("Driving target is invalid")

    # The multiplexer scripted process lives in a second target.
    mux_target = duplicate_target(driving_target)
    if not mux_target:
        return result.SetError(
            "Couldn't duplicate driving target to launch multiplexer scripted process"
        )

    mux_class_name = f"{__name__}.{MultiplexerScriptedProcess.__name__}"
    launch_args = {"driving_target_idx": debugger.GetIndexOfTarget(driving_target)}
    if not launch_scripted_process(mux_target, mux_class_name, launch_args):
        return result.SetError("Couldn't launch multiplexer scripted process")
| |
| |
def create_child_processes(debugger, command, exe_ctx, result, dict):
    """Command handler that launches the even and odd multiplexed processes.

    The selected target must hold the multiplexer scripted process. Each child
    gets its own duplicate of the driving target and is registered with the
    multiplexer through `multiplex`. Failures are reported through
    `result.SetError`.
    """
    if debugger.GetNumTargets() < 2:
        return result.SetError("Scripted Multiplexer process not setup")

    debugger.SetAsync(True)

    # The multiplexer scripted process is taken from the selected target.
    mux_target = debugger.GetSelectedTarget()
    if not mux_target:
        return result.SetError("Couldn't get multiplexer scripted process target")

    mux_process = mux_target.GetProcess()
    if not mux_process:
        return result.SetError("Couldn't get multiplexer scripted process")

    mux_impl = mux_process.GetScriptedImplementation()
    driving_target = mux_impl.driving_target
    if not driving_target:
        return result.SetError("Driving target is invalid")

    # Even child: dedicated target, launched with parity 0.
    even_target = duplicate_target(driving_target)
    if not even_target:
        return result.SetError(
            "Couldn't duplicate driving target to launch multiplexed even scripted process"
        )

    child_class_name = f"{__name__}.{MultiplexedScriptedProcess.__name__}"
    launch_args = {
        "driving_target_idx": debugger.GetIndexOfTarget(mux_target),
        "parity": 0,
    }
    even_process = launch_scripted_process(even_target, child_class_name, launch_args)
    if not even_process:
        return result.SetError("Couldn't launch multiplexed even scripted process")
    multiplex(mux_process, even_process)

    # Odd child: dedicated target, same arguments except for parity 1.
    odd_target = duplicate_target(driving_target)
    if not odd_target:
        return result.SetError(
            "Couldn't duplicate driving target to launch multiplexed odd scripted process"
        )

    launch_args["parity"] = 1
    odd_process = launch_scripted_process(odd_target, child_class_name, launch_args)
    if not odd_process:
        return result.SetError("Couldn't launch multiplexed odd scripted process")
    multiplex(mux_process, odd_process)
| |
| |
def log(message):
    """Print `message` when logging is enabled; logging is currently disabled."""
    # FIXME: For now, we discard the log message until we can pass it to an lldb
    # logging channel.
    logging_enabled = False
    if not logging_enabled:
        return
    print(message)
| |
| |
def __lldb_init_module(dbg, dict):
    """Register the `create_mux` and `create_sub` commands with the debugger.

    The handlers are referenced through `__name__` so that the registration
    follows the name this module was actually imported under.
    """
    dbg.HandleCommand(
        f"command script add -o -f {__name__}.create_mux_process create_mux"
    )
    dbg.HandleCommand(
        f"command script add -o -f {__name__}.create_child_processes create_sub"
    )