| import random |
| |
| from lldbsuite.test.decorators import * |
| from lldbsuite.test.lldbtest import * |
| |
| from fork_testbase import GdbRemoteForkTestBase |
| |
| |
| class TestGdbRemoteFork(GdbRemoteForkTestBase): |
| def setUp(self): |
| GdbRemoteForkTestBase.setUp(self) |
| if self.getPlatform() == "linux" and self.getArchitecture() in [ |
| "arm", |
| "aarch64", |
| ]: |
| self.skipTest("Unsupported for Arm/AArch64 Linux") |
| |
| @add_test_categories(["fork"]) |
| def test_fork_multithreaded(self): |
| _, _, child_pid, _ = self.start_fork_test(["thread:new"] * 2 + ["fork"]) |
| |
| # detach the forked child |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $D;{}#00".format(child_pid), |
| "send packet: $OK#00", |
| "read packet: $k#00", |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_fork(self): |
| parent_pid, _ = self.fork_and_detach_test("fork") |
| |
| # resume the parent |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $c#00", |
| "send packet: $W00;process:{}#00".format(parent_pid), |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_vfork(self): |
| parent_pid, parent_tid = self.fork_and_detach_test("vfork") |
| |
| # resume the parent |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $c#00", |
| { |
| "direction": "send", |
| "regex": r"[$]T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*".format( |
| parent_pid, parent_tid |
| ), |
| }, |
| "read packet: $c#00", |
| "send packet: $W00;process:{}#00".format(parent_pid), |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_fork_follow(self): |
| self.fork_and_follow_test("fork") |
| |
| @add_test_categories(["fork"]) |
| def test_vfork_follow(self): |
| self.fork_and_follow_test("vfork") |
| |
| @add_test_categories(["fork"]) |
| def test_select_wrong_pid(self): |
| self.build() |
| self.prep_debug_monitor_and_inferior() |
| self.add_qSupported_packets(["multiprocess+"]) |
| ret = self.expect_gdbremote_sequence() |
| self.assertIn("multiprocess+", ret["qSupported_response"]) |
| self.reset_test_sequence() |
| |
| # get process pid |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $qC#00", |
| { |
| "direction": "send", |
| "regex": "[$]QCp([0-9a-f]+).([0-9a-f]+)#.*", |
| "capture": {1: "pid", 2: "tid"}, |
| }, |
| ], |
| True, |
| ) |
| ret = self.expect_gdbremote_sequence() |
| pid, tid = (int(ret[x], 16) for x in ("pid", "tid")) |
| self.reset_test_sequence() |
| |
| self.test_sequence.add_log_lines( |
| [ |
| # try switching to correct pid |
| "read packet: $Hgp{:x}.{:x}#00".format(pid, tid), |
| "send packet: $OK#00", |
| "read packet: $Hcp{:x}.{:x}#00".format(pid, tid), |
| "send packet: $OK#00", |
| # try switching to invalid tid |
| "read packet: $Hgp{:x}.{:x}#00".format(pid, tid + 1), |
| "send packet: $E15#00", |
| "read packet: $Hcp{:x}.{:x}#00".format(pid, tid + 1), |
| "send packet: $E15#00", |
| # try switching to invalid pid |
| "read packet: $Hgp{:x}.{:x}#00".format(pid + 1, tid), |
| "send packet: $Eff#00", |
| "read packet: $Hcp{:x}.{:x}#00".format(pid + 1, tid), |
| "send packet: $Eff#00", |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_detach_current(self): |
| self.build() |
| self.prep_debug_monitor_and_inferior() |
| self.add_qSupported_packets(["multiprocess+"]) |
| ret = self.expect_gdbremote_sequence() |
| self.assertIn("multiprocess+", ret["qSupported_response"]) |
| self.reset_test_sequence() |
| |
| # get process pid |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $qC#00", |
| { |
| "direction": "send", |
| "regex": "[$]QCp([0-9a-f]+).[0-9a-f]+#.*", |
| "capture": {1: "pid"}, |
| }, |
| ], |
| True, |
| ) |
| ret = self.expect_gdbremote_sequence() |
| pid = ret["pid"] |
| self.reset_test_sequence() |
| |
| # detach the process |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $D;{}#00".format(pid), |
| "send packet: $OK#00", |
| "read packet: $qC#00", |
| "send packet: $E44#00", |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_detach_all(self): |
| self.detach_all_test() |
| |
| @add_test_categories(["fork"]) |
| def test_kill_all(self): |
| parent_pid, _, child_pid, _ = self.start_fork_test(["fork"]) |
| |
| exit_regex = "[$]X09;process:([0-9a-f]+)#.*" |
| self.test_sequence.add_log_lines( |
| [ |
| # kill all processes |
| "read packet: $k#00", |
| {"direction": "send", "regex": exit_regex, "capture": {1: "pid1"}}, |
| {"direction": "send", "regex": exit_regex, "capture": {1: "pid2"}}, |
| ], |
| True, |
| ) |
| ret = self.expect_gdbremote_sequence() |
| self.assertEqual(set([ret["pid1"], ret["pid2"]]), set([parent_pid, child_pid])) |
| |
| @add_test_categories(["fork"]) |
| def test_vkill_child(self): |
| self.vkill_test(kill_child=True) |
| |
| @add_test_categories(["fork"]) |
| def test_vkill_parent(self): |
| self.vkill_test(kill_parent=True) |
| |
| @add_test_categories(["fork"]) |
| def test_vkill_both(self): |
| self.vkill_test(kill_parent=True, kill_child=True) |
| |
| @add_test_categories(["fork"]) |
| def test_c_parent(self): |
| self.resume_one_test(run_order=["parent", "parent"]) |
| |
| @add_test_categories(["fork"]) |
| def test_c_child(self): |
| self.resume_one_test(run_order=["child", "child"]) |
| |
| @add_test_categories(["fork"]) |
| def test_c_parent_then_child(self): |
| self.resume_one_test(run_order=["parent", "parent", "child", "child"]) |
| |
| @add_test_categories(["fork"]) |
| def test_c_child_then_parent(self): |
| self.resume_one_test(run_order=["child", "child", "parent", "parent"]) |
| |
| @add_test_categories(["fork"]) |
| def test_c_interspersed(self): |
| self.resume_one_test(run_order=["parent", "child", "parent", "child"]) |
| |
| @add_test_categories(["fork"]) |
| def test_vCont_parent(self): |
| self.resume_one_test(run_order=["parent", "parent"], use_vCont=True) |
| |
| @add_test_categories(["fork"]) |
| def test_vCont_child(self): |
| self.resume_one_test(run_order=["child", "child"], use_vCont=True) |
| |
| @add_test_categories(["fork"]) |
| def test_vCont_parent_then_child(self): |
| self.resume_one_test( |
| run_order=["parent", "parent", "child", "child"], use_vCont=True |
| ) |
| |
| @add_test_categories(["fork"]) |
| def test_vCont_child_then_parent(self): |
| self.resume_one_test( |
| run_order=["child", "child", "parent", "parent"], use_vCont=True |
| ) |
| |
| @add_test_categories(["fork"]) |
| def test_vCont_interspersed(self): |
| self.resume_one_test( |
| run_order=["parent", "child", "parent", "child"], use_vCont=True |
| ) |
| |
| @add_test_categories(["fork"]) |
| def test_vCont_two_processes(self): |
| parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( |
| ["fork", "stop"] |
| ) |
| |
| self.test_sequence.add_log_lines( |
| [ |
| # try to resume both processes |
| "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format( |
| parent_pid, parent_tid, child_pid, child_tid |
| ), |
| "send packet: $E03#00", |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_vCont_all_processes_explicit(self): |
| self.start_fork_test(["fork", "stop"]) |
| |
| self.test_sequence.add_log_lines( |
| [ |
| # try to resume all processes implicitly |
| "read packet: $vCont;c:p-1.-1#00", |
| "send packet: $E03#00", |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_vCont_all_processes_implicit(self): |
| self.start_fork_test(["fork", "stop"]) |
| |
| self.test_sequence.add_log_lines( |
| [ |
| # try to resume all processes implicitly |
| "read packet: $vCont;c#00", |
| "send packet: $E03#00", |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_threadinfo(self): |
| parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( |
| ["fork", "thread:new", "stop"] |
| ) |
| pidtids = [ |
| (parent_pid, parent_tid), |
| (child_pid, child_tid), |
| ] |
| |
| self.add_threadinfo_collection_packets() |
| ret = self.expect_gdbremote_sequence() |
| prev_pidtids = set(self.parse_threadinfo_packets(ret)) |
| self.assertEqual( |
| prev_pidtids, |
| frozenset((int(pid, 16), int(tid, 16)) for pid, tid in pidtids), |
| ) |
| self.reset_test_sequence() |
| |
| for pidtid in pidtids: |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hcp{}.{}#00".format(*pidtid), |
| "send packet: $OK#00", |
| "read packet: $c#00", |
| { |
| "direction": "send", |
| "regex": self.stop_regex.format(*pidtid), |
| }, |
| ], |
| True, |
| ) |
| self.add_threadinfo_collection_packets() |
| ret = self.expect_gdbremote_sequence() |
| self.reset_test_sequence() |
| new_pidtids = set(self.parse_threadinfo_packets(ret)) |
| added_pidtid = new_pidtids - prev_pidtids |
| prev_pidtids = new_pidtids |
| |
| # verify that we've got exactly one new thread, and that |
| # the PID matches |
| self.assertEqual(len(added_pidtid), 1) |
| self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16)) |
| |
| for pidtid in new_pidtids: |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), |
| "send packet: $OK#00", |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_memory_read_write(self): |
| self.build() |
| INITIAL_DATA = "Initial message" |
| self.prep_debug_monitor_and_inferior( |
| inferior_args=[ |
| "set-message:{}".format(INITIAL_DATA), |
| "get-data-address-hex:g_message", |
| "fork", |
| "print-message:", |
| "stop", |
| ] |
| ) |
| self.add_qSupported_packets(["multiprocess+", "fork-events+"]) |
| ret = self.expect_gdbremote_sequence() |
| self.assertIn("fork-events+", ret["qSupported_response"]) |
| self.reset_test_sequence() |
| |
| # continue and expect fork |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $c#00", |
| { |
| "type": "output_match", |
| "regex": self.maybe_strict_output_regex( |
| r"data address: 0x([0-9a-fA-F]+)\r\n" |
| ), |
| "capture": {1: "addr"}, |
| }, |
| { |
| "direction": "send", |
| "regex": self.fork_regex.format("fork"), |
| "capture": self.fork_capture, |
| }, |
| ], |
| True, |
| ) |
| ret = self.expect_gdbremote_sequence() |
| pidtids = { |
| "parent": (ret["parent_pid"], ret["parent_tid"]), |
| "child": (ret["child_pid"], ret["child_tid"]), |
| } |
| addr = ret["addr"] |
| self.reset_test_sequence() |
| |
| for name, pidtid in pidtids.items(): |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hgp{}.{}#00".format(*pidtid), |
| "send packet: $OK#00", |
| # read the current memory contents |
| "read packet: $m{},{:x}#00".format(addr, len(INITIAL_DATA) + 1), |
| { |
| "direction": "send", |
| "regex": r"^[$](.+)#.*$", |
| "capture": {1: "data"}, |
| }, |
| # write a new value |
| "read packet: $M{},{:x}:{}#00".format( |
| addr, len(name) + 1, seven.hexlify(name + "\0") |
| ), |
| "send packet: $OK#00", |
| # resume the process and wait for the trap |
| "read packet: $Hcp{}.{}#00".format(*pidtid), |
| "send packet: $OK#00", |
| "read packet: $c#00", |
| { |
| "type": "output_match", |
| "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"), |
| "capture": {1: "printed_message"}, |
| }, |
| { |
| "direction": "send", |
| "regex": self.stop_regex.format(*pidtid), |
| }, |
| ], |
| True, |
| ) |
| ret = self.expect_gdbremote_sequence() |
| data = seven.unhexlify(ret["data"]) |
| self.assertEqual(data, INITIAL_DATA + "\0") |
| self.assertEqual(ret["printed_message"], name) |
| self.reset_test_sequence() |
| |
| # we do the second round separately to make sure that initial data |
| # is correctly preserved while writing into the first process |
| |
| for name, pidtid in pidtids.items(): |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hgp{}.{}#00".format(*pidtid), |
| "send packet: $OK#00", |
| # read the current memory contents |
| "read packet: $m{},{:x}#00".format(addr, len(name) + 1), |
| { |
| "direction": "send", |
| "regex": r"^[$](.+)#.*$", |
| "capture": {1: "data"}, |
| }, |
| ], |
| True, |
| ) |
| ret = self.expect_gdbremote_sequence() |
| self.assertIsNotNone(ret.get("data")) |
| data = seven.unhexlify(ret.get("data")) |
| self.assertEqual(data, name + "\0") |
| self.reset_test_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_register_read_write(self): |
| parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( |
| ["fork", "thread:new", "stop"] |
| ) |
| pidtids = [ |
| (parent_pid, parent_tid), |
| (child_pid, child_tid), |
| ] |
| |
| for pidtid in pidtids: |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hcp{}.{}#00".format(*pidtid), |
| "send packet: $OK#00", |
| "read packet: $c#00", |
| { |
| "direction": "send", |
| "regex": self.stop_regex.format(*pidtid), |
| }, |
| ], |
| True, |
| ) |
| |
| self.add_threadinfo_collection_packets() |
| ret = self.expect_gdbremote_sequence() |
| self.reset_test_sequence() |
| |
| pidtids = set(self.parse_threadinfo_packets(ret)) |
| self.assertEqual(len(pidtids), 4) |
| # first, save register values from all the threads |
| thread_regs = {} |
| for pidtid in pidtids: |
| for regno in range(256): |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), |
| "send packet: $OK#00", |
| "read packet: $p{:x}#00".format(regno), |
| { |
| "direction": "send", |
| "regex": r"^[$](.+)#.*$", |
| "capture": {1: "data"}, |
| }, |
| ], |
| True, |
| ) |
| ret = self.expect_gdbremote_sequence() |
| data = ret.get("data") |
| self.assertIsNotNone(data) |
| # ignore registers shorter than 32 bits (this also catches |
| # "Exx" errors) |
| if len(data) >= 8: |
| break |
| else: |
| self.skipTest("no usable register found") |
| thread_regs[pidtid] = (regno, data) |
| |
| vals = set(x[1] for x in thread_regs.values()) |
| # NB: cheap hack to make the loop below easier |
| new_val = next(iter(vals)) |
| |
| # then, start altering them and verify that we don't unexpectedly |
| # change the value from another thread |
| for pidtid in pidtids: |
| old_val = thread_regs[pidtid] |
| regno = old_val[0] |
| old_val_length = len(old_val[1]) |
| # generate a unique new_val |
| while new_val in vals: |
| new_val = "{{:0{}x}}".format(old_val_length).format( |
| random.getrandbits(old_val_length * 4) |
| ) |
| vals.add(new_val) |
| |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), |
| "send packet: $OK#00", |
| "read packet: $p{:x}#00".format(regno), |
| { |
| "direction": "send", |
| "regex": r"^[$](.+)#.*$", |
| "capture": {1: "data"}, |
| }, |
| "read packet: $P{:x}={}#00".format(regno, new_val), |
| "send packet: $OK#00", |
| ], |
| True, |
| ) |
| ret = self.expect_gdbremote_sequence() |
| data = ret.get("data") |
| self.assertIsNotNone(data) |
| self.assertEqual(data, old_val[1]) |
| thread_regs[pidtid] = (regno, new_val) |
| |
| # finally, verify that new values took effect |
| for pidtid in pidtids: |
| old_val = thread_regs[pidtid] |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), |
| "send packet: $OK#00", |
| "read packet: $p{:x}#00".format(old_val[0]), |
| { |
| "direction": "send", |
| "regex": r"^[$](.+)#.*$", |
| "capture": {1: "data"}, |
| }, |
| ], |
| True, |
| ) |
| ret = self.expect_gdbremote_sequence() |
| data = ret.get("data") |
| self.assertIsNotNone(data) |
| self.assertEqual(data, old_val[1]) |
| |
| @add_test_categories(["fork"]) |
| def test_qC(self): |
| parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( |
| ["fork", "thread:new", "stop"] |
| ) |
| pidtids = [ |
| (parent_pid, parent_tid), |
| (child_pid, child_tid), |
| ] |
| |
| for pidtid in pidtids: |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hcp{}.{}#00".format(*pidtid), |
| "send packet: $OK#00", |
| "read packet: $c#00", |
| { |
| "direction": "send", |
| "regex": self.stop_regex.format(*pidtid), |
| }, |
| ], |
| True, |
| ) |
| |
| self.add_threadinfo_collection_packets() |
| ret = self.expect_gdbremote_sequence() |
| self.reset_test_sequence() |
| |
| pidtids = set(self.parse_threadinfo_packets(ret)) |
| self.assertEqual(len(pidtids), 4) |
| for pidtid in pidtids: |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), |
| "send packet: $OK#00", |
| "read packet: $qC#00", |
| "send packet: $QCp{:x}.{:x}#00".format(*pidtid), |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |
| |
| @add_test_categories(["fork"]) |
| def test_T(self): |
| parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( |
| ["fork", "thread:new", "stop"] |
| ) |
| pidtids = [ |
| (parent_pid, parent_tid), |
| (child_pid, child_tid), |
| ] |
| |
| for pidtid in pidtids: |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Hcp{}.{}#00".format(*pidtid), |
| "send packet: $OK#00", |
| "read packet: $c#00", |
| { |
| "direction": "send", |
| "regex": self.stop_regex.format(*pidtid), |
| }, |
| ], |
| True, |
| ) |
| |
| self.add_threadinfo_collection_packets() |
| ret = self.expect_gdbremote_sequence() |
| self.reset_test_sequence() |
| |
| pidtids = set(self.parse_threadinfo_packets(ret)) |
| self.assertEqual(len(pidtids), 4) |
| max_pid = max(pid for pid, tid in pidtids) |
| max_tid = max(tid for pid, tid in pidtids) |
| bad_pidtids = ( |
| (max_pid, max_tid + 1, "E02"), |
| (max_pid + 1, max_tid, "E01"), |
| (max_pid + 1, max_tid + 1, "E01"), |
| ) |
| |
| for pidtid in pidtids: |
| self.test_sequence.add_log_lines( |
| [ |
| # test explicit PID+TID |
| "read packet: $Tp{:x}.{:x}#00".format(*pidtid), |
| "send packet: $OK#00", |
| # test implicit PID via Hg |
| "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), |
| "send packet: $OK#00", |
| "read packet: $T{:x}#00".format(max_tid + 1), |
| "send packet: $E02#00", |
| "read packet: $T{:x}#00".format(pidtid[1]), |
| "send packet: $OK#00", |
| ], |
| True, |
| ) |
| for pid, tid, expected in bad_pidtids: |
| self.test_sequence.add_log_lines( |
| [ |
| "read packet: $Tp{:x}.{:x}#00".format(pid, tid), |
| "send packet: ${}#00".format(expected), |
| ], |
| True, |
| ) |
| self.expect_gdbremote_sequence() |