Snap for 10453563 from 8dccc7f036db4f36e4b47c62b2c79c07403d8a1d to mainline-ipsec-release

Change-Id: Ie3db261a0a1fdfa2eccab54834aa2a7da5fe00c2
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 991d118..b788662 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -8,7 +8,7 @@
     strategy:
       matrix:
         os: [ubuntu-latest, macos-latest, windows-latest]
-        python-version: [3.6, 3.7, 3.8]
+        python-version: [3.7, 3.8]
     steps:
     - name: Checkout repo
       uses: actions/checkout@v2
diff --git a/CHANGELOG.md b/CHANGELOG.md
index aaf6c90..a94d23c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,53 @@
 # Mobly Release History
 
 
+## Mobly Release 1.12.1: Minor Improvements and Fixes
+
+### New
+* A logger adapter that makes it easier for modules to add their own log line prefixes
+
+### Fixes
+* `is_emulator` property now works for Cuttlefish image
+* Handle SIGTERM properly
+* Fixed missing result fields and output directories
+
+[Full list of changes](https://github.com/google/mobly/milestone/28?closed=1)
+
+
+## Mobly Release 1.12: New Snippet Base Client and a New `pre_run` Stage
+
+This release introduces the new generic Mobly snippet base client and the new
+Android snippet client built on top. The new snippet base enables us to better
+scale Mobly snippets across various platforms.
+
+The old Android snippet client is now considered deprecated, and will be
+removed in the following release. Please update your code accordingly:
+* `snippet_client` -> `snippet_client_v2`
+* `snippet_event` -> `mobly.snippet.callback_event`
+* `callback_handler` -> `callback_handler_v2`
+
+The `generate_setup_tests` stage is renamed to `pre_run` to better reflect its
+true role: steps that happen before all the test case methods are finalized.
+This is a pure rename with no functional changes. Please migrate your code as
+the `generate_setup_tests` stage will stop working completely in the next
+release.
+
+### New
+* Added the new Mobly snippet base client.
+* Added the new Android snippet client v2 based on the new base client.
+* Support changing Mobly's logger level to `DEBUG` via cli arg.
+* Termination signal type is now included in result records.
+
+### Breaking Changes
+* The old Android snippet client is deprecated.
+* The `generate_setup_tests` stage is now `pre_run`.
+
+### Fixes
+* Various issues in the Android snippet client.
+
+[Full list of changes](https://github.com/google/mobly/milestone/27?closed=1)
+
+
 ## Mobly Release 1.11.1: Support Test Case `repeat` and `retry`.
 
 ### New
diff --git a/README.md b/README.md
index 60638e8..290a5b1 100644
--- a/README.md
+++ b/README.md
@@ -35,7 +35,7 @@
 
 ## System dependencies
   - adb (1.0.40+ recommended)
-  - python3.6+
+  - python3.7+
   - python-setuptools
 
 *To use Python3, use `pip3` and `python3` (or python3.x) accordingly.*
diff --git a/docs/instrumentation_tutorial.md b/docs/instrumentation_tutorial.md
index 46036ea..e88b96c 100644
--- a/docs/instrumentation_tutorial.md
+++ b/docs/instrumentation_tutorial.md
@@ -124,7 +124,7 @@
 
 If you have a custom runner that you use for instrumentation tests, then you can
 specify it in the *run_instrumentation_test* method call. Replace
-`com.example.package.test.CustomRunner` with the fully quailied package name of
+`com.example.package.test.CustomRunner` with the fully qualified package name of
 your real instrumentation runner.
 
 ```python
diff --git a/docs/tutorial.md b/docs/tutorial.md
index 7d73275..f53b948 100644
--- a/docs/tutorial.md
+++ b/docs/tutorial.md
@@ -8,8 +8,8 @@
 
 *   A computer with at least 2 USB ports.
 *   Mobly package and its system dependencies installed on the computer.
-*   One or two Android devices with the [Mobly Bundled Snippets]
-    (https://github.com/google/mobly-bundled-snippets) (MBS) installed. We will
+*   One or two Android devices with the [Mobly Bundled Snippets](
+    https://github.com/google/mobly-bundled-snippets) (MBS) installed. We will
     use MBS to trigger actions on the Android devices.
 *   A working adb setup. To check, connect one Android device to the computer
     and make sure it has "USB debugging" enabled. Make sure the device shows up
@@ -291,13 +291,13 @@
 There's potentially a lot more we could do in this test, e.g. check
 the hardware address, see whether we can pair devices, transfer files, etc.
 
-To learn more about the features included in MBS, go to [MBS repo]
-(https://github.com/google/mobly-bundled-snippets) to see how to check its help
+To learn more about the features included in MBS, go to [MBS repo](
+https://github.com/google/mobly-bundled-snippets) to see how to check its help
 menu.
 
 To learn more about Mobly Snippet Lib, including features like Espresso support
-and asynchronous calls, see the [snippet lib examples]
-(https://github.com/google/mobly-snippet-lib/tree/master/examples).
+and asynchronous calls, see the [snippet lib examples](
+https://github.com/google/mobly-snippet-lib/tree/master/examples).
 
 
 ## Example 6: Generated Tests
diff --git a/mobly/Android.bp b/mobly/Android.bp
index c6bc3f4..a922a11 100644
--- a/mobly/Android.bp
+++ b/mobly/Android.bp
@@ -21,14 +21,6 @@
     srcs: [
         "**/*.py",
     ],
-    version: {
-        py2: {
-            enabled: false,
-        },
-        py3: {
-            enabled: true,
-        },
-    },
     libs: [
         "py-portpicker",
         "py-timeout-decorator",
diff --git a/mobly/asserts.py b/mobly/asserts.py
index e3fa007..70e1256 100644
--- a/mobly/asserts.py
+++ b/mobly/asserts.py
@@ -181,9 +181,9 @@
 
 
 def assert_count_equal(first, second, msg=None, extras=None):
-  """Asserts that two iterables have the same element count.
+  """Asserts that two iterables have the same elements, the same number of
+  times, without regard to order.
 
-  Element order does not matter.
   Similar to assert_equal(Counter(list(first)), Counter(list(second))).
 
   Args:
diff --git a/mobly/base_suite.py b/mobly/base_suite.py
new file mode 100644
index 0000000..6febcd8
--- /dev/null
+++ b/mobly/base_suite.py
@@ -0,0 +1,73 @@
+# Copyright 2022 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+
+class BaseSuite(abc.ABC):
+  """Class used to define a Mobly suite.
+
+  To create a suite, inherit from this class and implement setup_suite.
+
+  Use `BaseSuite.add_test_class` to specify which classes to run with which
+  configs and test selectors.
+
+  After defining the sub class, the suite can be executed using
+  suite_runner.run_suite_class.
+
+  Users can use this class if they need to define their own setup and teardown
+  steps on the suite level. Otherwise, just use suite_runner.run_suite on the
+  list of test classes.
+  """
+
+  def __init__(self, runner, config):
+    self._runner = runner
+    self._config = config.copy()
+
+  @property
+  def user_params(self):
+    return self._config.user_params
+
+  def add_test_class(self, clazz, config=None, tests=None, name_suffix=None):
+    """Adds a test class to the suite.
+
+    Args:
+      clazz: class, a Mobly test class.
+      config: config_parser.TestRunConfig, the config to run the class with. If
+        not specified, the default config passed from google3 infra is used.
+      tests: list of strings, names of the tests to run in this test class, in
+        the execution order. If not specified, all tests in the class are
+        executed.
+      name_suffix: string, suffix to append to the class name for reporting.
+        This is used for differentiating the same class executed with different
+        parameters in a suite.
+    """
+    if not config:
+      config = self._config
+    self._runner.add_test_class(config, clazz, tests, name_suffix)
+
+  @abc.abstractmethod
+  def setup_suite(self, config):
+    """Function used to add test classes, has to be implemented by child class.
+
+    Args:
+      config: config_parser.TestRunConfig, the config provided by google3 infra.
+
+    Raises:
+      Error: when setup_suite is not implemented by child class.
+    """
+    pass
+
+  def teardown_suite(self):
+    """Function used to add post tests cleanup tasks (optional)."""
+    pass
diff --git a/mobly/base_test.py b/mobly/base_test.py
index c559b44..e7da22c 100644
--- a/mobly/base_test.py
+++ b/mobly/base_test.py
@@ -36,6 +36,8 @@
 TEST_STAGE_END_LOG_TEMPLATE = '[{parent_token}]#{child_token} <<< END <<<'
 
 # Names of execution stages, in the order they happen during test runs.
+STAGE_NAME_PRE_RUN = 'pre_run'
+# Deprecated, use `STAGE_NAME_PRE_RUN` instead.
 STAGE_NAME_SETUP_GENERATED_TESTS = 'setup_generated_tests'
 STAGE_NAME_SETUP_CLASS = 'setup_class'
 STAGE_NAME_SETUP_TEST = 'setup_test'
@@ -339,22 +341,26 @@
       self.summary_writer.dump(record.to_dict(),
                                records.TestSummaryEntryType.CONTROLLER_INFO)
 
-  def _setup_generated_tests(self):
-    """Proxy function to guarantee the base implementation of
-    setup_generated_tests is called.
+  def _pre_run(self):
+    """Proxy function to guarantee the base implementation of `pre_run` is
+    called.
 
     Returns:
       True if setup is successful, False otherwise.
     """
-    stage_name = STAGE_NAME_SETUP_GENERATED_TESTS
+    stage_name = STAGE_NAME_PRE_RUN
     record = records.TestResultRecord(stage_name, self.TAG)
     record.test_begin()
     self.current_test_info = runtime_test_info.RuntimeTestInfo(
         stage_name, self.log_path, record)
     try:
       with self._log_test_stage(stage_name):
+        self.pre_run()
+      # TODO(angli): Remove this context block after the full deprecation of
+      # `setup_generated_tests`.
+      with self._log_test_stage(stage_name):
         self.setup_generated_tests()
-        return True
+      return True
     except Exception as e:
       logging.exception('%s failed for %s.', stage_name, self.TAG)
       record.test_error(e)
@@ -363,7 +369,7 @@
                                records.TestSummaryEntryType.RECORD)
       return False
 
-  def setup_generated_tests(self):
+  def pre_run(self):
     """Preprocesses that need to be done before setup_class.
 
     This phase is used to do pre-test processes like generating tests.
@@ -374,6 +380,19 @@
     requested is unknown at this point.
     """
 
+  def setup_generated_tests(self):
+    """[DEPRECATED] Use `pre_run` instead.
+
+    Preprocesses that need to be done before setup_class.
+
+    This phase is used to do pre-test processes like generating tests.
+    This is the only place `self.generate_tests` should be called.
+
+    If this function throws an error, the test class will be marked failure
+    and the "Requested" field will be 0 because the number of tests
+    requested is unknown at this point.
+    """
+
   def _setup_class(self):
     """Proxy function to guarantee the base implementation of setup_class
     is called.
@@ -452,6 +471,7 @@
                                records.TestSummaryEntryType.RECORD)
     else:
       if expects.recorder.has_error:
+        record.test_error()
         record.update_record()
         self.results.add_class_error(record)
         self.summary_writer.dump(record.to_dict(),
@@ -641,6 +661,7 @@
       content: dict, the data to add to summary file.
     """
     if 'timestamp' not in content:
+      content = content.copy()
       content['timestamp'] = utils.get_current_epoch_time()
     self.summary_writer.dump(content, records.TestSummaryEntryType.USER_DATA)
 
@@ -754,7 +775,7 @@
           _, _, traceback = sys.exc_info()
           raise signals.TestError(e.details, e.extras).with_traceback(traceback)
         test_method()
-      except (signals.TestPass, signals.TestAbortSignal):
+      except (signals.TestPass, signals.TestAbortSignal, signals.TestSkip):
         raise
       except Exception:
         logging.exception('Exception occurred in %s.',
@@ -819,21 +840,22 @@
         self.current_test_info = None
     return tr_record
 
-  def _assert_function_name_in_stack(self, expected_func_name):
-    """Asserts that the current stack contains the given function name."""
+  def _assert_function_names_in_stack(self, expected_func_names):
+    """Asserts that the current stack contains any of the given function names.
+    """
     current_frame = inspect.currentframe()
     caller_frames = inspect.getouterframes(current_frame, 2)
     for caller_frame in caller_frames[2:]:
-      if caller_frame[3] == expected_func_name:
+      if caller_frame[3] in expected_func_names:
         return
-    raise Error('"%s" cannot be called outside of %s' %
-                (caller_frames[1][3], expected_func_name))
+    raise Error(f"'{caller_frames[1][3]}' cannot be called outside of the "
+                f"following functions: {expected_func_names}.")
 
   def generate_tests(self, test_logic, name_func, arg_sets, uid_func=None):
     """Generates tests in the test class.
 
-    This function has to be called inside a test class's
-    `self.setup_generated_tests` function.
+    This function has to be called inside a test class's `self.pre_run` or
+    `self.setup_generated_tests`.
 
     Generated tests are not written down as methods, but as a list of
     parameter sets. This way we reduce code repetition and improve test
@@ -854,7 +876,8 @@
         arguments as the test logic function and returns a string that
         is the corresponding UID.
     """
-    self._assert_function_name_in_stack(STAGE_NAME_SETUP_GENERATED_TESTS)
+    self._assert_function_names_in_stack(
+        [STAGE_NAME_PRE_RUN, STAGE_NAME_SETUP_GENERATED_TESTS])
     root_msg = 'During test generation of "%s":' % test_logic.__name__
     for args in arg_sets:
       test_name = name_func(*args)
@@ -866,8 +889,8 @@
       # decorators, copy the attributes added by the decorators to the
       # generated test methods as well, so the generated test methods
       # also have the retry/repeat behavior.
-      for attr_name in (
-        ATTR_MAX_RETRY_CNT, ATTR_MAX_CONSEC_ERROR, ATTR_REPEAT_CNT):
+      for attr_name in (ATTR_MAX_RETRY_CNT, ATTR_MAX_CONSEC_ERROR,
+                        ATTR_REPEAT_CNT):
         attr = getattr(test_logic, attr_name, None)
         if attr is not None:
           setattr(test_func, attr_name, attr)
@@ -907,7 +930,7 @@
     'test_*'.
 
     Note this only gets the names of tests that already exist. If
-    `setup_generated_test` has not happened when this was called, the
+    `generate_tests` has not happened when this was called, the
     generated tests won't be listed.
 
     Returns:
@@ -987,7 +1010,7 @@
     """
     logging.log_path = self.log_path
     # Executes pre-setup procedures, like generating test methods.
-    if not self._setup_generated_tests():
+    if not self._pre_run():
       return self.results
     logging.info('==========> %s <==========', self.TAG)
     # Devise the actual test methods to run in the test class.
diff --git a/mobly/config_parser.py b/mobly/config_parser.py
index 0f31cdc..2f2da91 100644
--- a/mobly/config_parser.py
+++ b/mobly/config_parser.py
@@ -176,8 +176,7 @@
   """
 
   def __init__(self):
-    # Init value is an empty string to avoid string joining errors.
-    self.log_path = ''
+    self.log_path = _DEFAULT_LOG_PATH
     # Deprecated, use 'testbed_name'
     self.test_bed_name = None
     self.testbed_name = None
diff --git a/mobly/controllers/android_device.py b/mobly/controllers/android_device.py
index 40b47a4..54166a5 100644
--- a/mobly/controllers/android_device.py
+++ b/mobly/controllers/android_device.py
@@ -183,7 +183,7 @@
                          'services failed to start.')
 
 
-def parse_device_list(device_list_str, key):
+def parse_device_list(device_list_str, key=None):
   """Parses a byte string representing a list of devices.
 
   The string is generated by calling either adb or fastboot. The tokens in
@@ -191,7 +191,9 @@
 
   Args:
     device_list_str: Output of adb or fastboot.
-    key: The token that signifies a device in device_list_str.
+    key: The token that signifies a device in device_list_str. Only devices
+      with the specified key in device_list_str are parsed, such as 'device' or
+      'fastbootd'. If not specified, all devices listed are parsed.
 
   Returns:
     A list of android device serial numbers.
@@ -204,7 +206,7 @@
   results = []
   for line in clean_lines:
     tokens = line.strip().split('\t')
-    if len(tokens) == 2 and tokens[1] == key:
+    if len(tokens) == 2 and (key is None or tokens[1] == key):
       results.append(tokens[0])
   return results
 
@@ -249,7 +251,7 @@
     A list of android device serials. Empty if there's none.
   """
   out = fastboot.FastbootProxy().devices()
-  return parse_device_list(out, 'fastboot')
+  return parse_device_list(out)
 
 
 def get_instances(serials):
@@ -491,8 +493,8 @@
   def __init__(self, serial=''):
     self._serial = str(serial)
     # logging.log_path only exists when this is used in an Mobly test run.
-    self._log_path_base = getattr(logging, 'log_path', '/tmp/logs')
-    self._log_path = os.path.join(self._log_path_base,
+    _log_path_base = utils.abs_path(getattr(logging, 'log_path', '/tmp/logs'))
+    self._log_path = os.path.join(_log_path_base,
                                   'AndroidDevice%s' % self._normalized_serial)
     self._debug_tag = self._serial
     self.log = AndroidDeviceLoggerAdapter(logging.getLogger(),
@@ -621,6 +623,8 @@
   def log_path(self):
     """A string that is the path for all logs collected from this device.
     """
+    if not os.path.exists(self._log_path):
+      utils.create_dir(self._log_path)
     return self._log_path
 
   @log_path.setter
@@ -861,10 +865,11 @@
       # as emulators in addition to other things, so only return True on
       # an exact match.
       return True
-    elif self.build_info['hardware'] in ['ranchu', 'goldfish']:
+    elif self.build_info['hardware'] in ['ranchu', 'goldfish', 'cutf_cvm']:
       # Ranchu and Goldfish are the hardware properties that the AOSP
       # emulators report, so if the device says it's an AOSP emulator, it
-      # probably is one.
+      # probably is one. Cuttlefish emulators report 'cutf_cvm` as the
+      # hardware property.
       return True
     else:
       return False
diff --git a/mobly/controllers/android_device_lib/adb.py b/mobly/controllers/android_device_lib/adb.py
index 84051cd..8b55b65 100644
--- a/mobly/controllers/android_device_lib/adb.py
+++ b/mobly/controllers/android_device_lib/adb.py
@@ -29,14 +29,15 @@
 
 # Number of attempts to execute "adb root", and seconds for interval time of
 # this commands.
-ADB_ROOT_RETRY_ATTMEPTS = 3
+ADB_ROOT_RETRY_ATTEMPTS = 3
 ADB_ROOT_RETRY_ATTEMPT_INTERVAL_SEC = 10
 
 # Qualified class name of the default instrumentation test runner.
 DEFAULT_INSTRUMENTATION_RUNNER = 'com.android.common.support.test.runner.AndroidJUnitRunner'
 
-# Adb getprop call should never take too long.
-DEFAULT_GETPROP_TIMEOUT_SEC = 5
+# `adb shell getprop` can take surprisingly long, when the device is a
+# networked virtual device.
+DEFAULT_GETPROP_TIMEOUT_SEC = 10
 DEFAULT_GETPROPS_ATTEMPTS = 3
 DEFAULT_GETPROPS_RETRY_SLEEP_SEC = 1
 
@@ -290,7 +291,8 @@
     out = self._exec_cmd(adb_cmd, shell=shell, timeout=timeout, stderr=stderr)
     return out
 
-  def _execute_adb_and_process_stdout(self, name, args, shell, handler) -> bytes:
+  def _execute_adb_and_process_stdout(self, name, args, shell,
+                                      handler) -> bytes:
     adb_cmd = self._construct_adb_cmd(name, args, shell=shell)
     err = self._execute_and_process_stdout(adb_cmd,
                                            shell=shell,
@@ -367,21 +369,22 @@
                      ret_code=0)
     return stdout
 
-  def getprop(self, prop_name):
+  def getprop(self, prop_name, timeout=DEFAULT_GETPROP_TIMEOUT_SEC):
     """Get a property of the device.
 
     This is a convenience wrapper for `adb shell getprop xxx`.
 
     Args:
       prop_name: A string that is the name of the property to get.
+      timeout: float, the number of seconds to wait before timing out.
+          If not specified, the DEFAULT_GETPROP_TIMEOUT_SEC is used.
 
     Returns:
       A string that is the value of the property, or None if the property
       doesn't exist.
     """
-    return self.shell(
-        ['getprop', prop_name],
-        timeout=DEFAULT_GETPROP_TIMEOUT_SEC).decode('utf-8').strip()
+    return self.shell(['getprop', prop_name],
+                      timeout=timeout).decode('utf-8').strip()
 
   def getprops(self, prop_names):
     """Get multiple properties of the device.
@@ -439,7 +442,11 @@
                                 timeout=None,
                                 stderr=None)
 
-  def instrument(self, package, options=None, runner=None, handler=None) -> bytes:
+  def instrument(self,
+                 package,
+                 options=None,
+                 runner=None,
+                 handler=None) -> bytes:
     """Runs an instrumentation command on the device.
 
     This is a convenience wrapper to avoid parameter formatting.
@@ -510,7 +517,8 @@
     Raises:
       AdbError: If the command exit code is not 0.
     """
-    for attempt in range(ADB_ROOT_RETRY_ATTMEPTS):
+    retry_interval = ADB_ROOT_RETRY_ATTEMPT_INTERVAL_SEC
+    for attempt in range(ADB_ROOT_RETRY_ATTEMPTS):
       try:
         return self._exec_adb_cmd('root',
                                   args=None,
@@ -518,12 +526,13 @@
                                   timeout=None,
                                   stderr=None)
       except AdbError as e:
-        if attempt + 1 < ADB_ROOT_RETRY_ATTMEPTS:
+        if attempt + 1 < ADB_ROOT_RETRY_ATTEMPTS:
           logging.debug('Retry the command "%s" since Error "%s" occurred.' %
                         (utils.cli_cmd_to_string(
                             e.cmd), e.stderr.decode('utf-8').strip()))
           # Buffer between "adb root" commands.
-          time.sleep(ADB_ROOT_RETRY_ATTEMPT_INTERVAL_SEC)
+          time.sleep(retry_interval)
+          retry_interval *= 2
         else:
           raise e
 
diff --git a/mobly/controllers/android_device_lib/callback_handler.py b/mobly/controllers/android_device_lib/callback_handler.py
index 6625cd6..c783b71 100644
--- a/mobly/controllers/android_device_lib/callback_handler.py
+++ b/mobly/controllers/android_device_lib/callback_handler.py
@@ -12,10 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import time
 
-from mobly.controllers.android_device_lib import errors
 from mobly.controllers.android_device_lib import snippet_event
+from mobly.snippet import errors
+
+logging.warning(
+    'The module mobly.controllers.android_device_lib.callback_handler is '
+    'deprecated and will be removed in a future version. Use module '
+    'mobly.controllers.android_device_lib.callback_handler_v2 instead.')
 
 # The max timeout cannot be larger than the max time the socket waits for a
 # response message. Otherwise, the socket would timeout before the Rpc call
@@ -23,18 +29,18 @@
 MAX_TIMEOUT = 60 * 10
 DEFAULT_TIMEOUT = 120  # two minutes
 
-
-class Error(errors.DeviceError):
-  pass
-
-
-class TimeoutError(Error):
-  pass
+# Aliases of error types for backward compatibility.
+Error = errors.CallbackHandlerBaseError
+TimeoutError = errors.CallbackHandlerTimeoutError
 
 
 class CallbackHandler:
   """The class used to handle a specific group of callback events.
 
+  DEPRECATED: Use
+  mobly.controllers.android_device_lib.callback_handler_v2.CallbackHandlerV2
+  instead.
+
   All the events handled by a CallbackHandler are originally triggered by one
   async Rpc call. All the events are tagged with a callback_id specific to a
   call to an AsyncRpc method defined on the server side.
diff --git a/mobly/controllers/android_device_lib/callback_handler_v2.py b/mobly/controllers/android_device_lib/callback_handler_v2.py
new file mode 100644
index 0000000..5675f7a
--- /dev/null
+++ b/mobly/controllers/android_device_lib/callback_handler_v2.py
@@ -0,0 +1,67 @@
+# Copyright 2022 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""The callback handler V2 module for Android Mobly Snippet Lib."""
+
+from mobly.snippet import callback_handler_base
+from mobly.snippet import errors
+
+# The timeout error meesage when pulling events from the server
+TIMEOUT_ERROR_MESSAGE = 'EventSnippetException: timeout.'
+
+
+class CallbackHandlerV2(callback_handler_base.CallbackHandlerBase):
+  """The callback handler V2 class for Android Mobly Snippet Lib."""
+
+  def callEventWaitAndGetRpc(self, callback_id, event_name, timeout_sec):
+    """Waits and returns an existing CallbackEvent for the specified identifier.
+
+    This function calls snippet lib's eventWaitAndGet RPC.
+
+    Args:
+      callback_id: str, the callback identifier.
+      event_name: str, the callback name.
+      timeout_sec: float, the number of seconds to wait for the event.
+
+    Returns:
+      The event dictionary.
+
+    Raises:
+      errors.CallbackHandlerTimeoutError: The expected event does not occur
+        within the time limit.
+    """
+    timeout_ms = int(timeout_sec * 1000)
+    try:
+      return self._event_client.eventWaitAndGet(callback_id, event_name,
+                                                timeout_ms)
+    except Exception as e:
+      if TIMEOUT_ERROR_MESSAGE in str(e):
+        raise errors.CallbackHandlerTimeoutError(
+            self._device, (f'Timed out after waiting {timeout_sec}s for event '
+                           f'"{event_name}" triggered by {self._method_name} '
+                           f'({self.callback_id}).')) from e
+      raise
+
+  def callEventGetAllRpc(self, callback_id, event_name):
+    """Gets all existing events for the specified identifier without waiting.
+
+    This function calls snippet lib's eventGetAll RPC.
+
+    Args:
+      callback_id: str, the callback identifier.
+      event_name: str, the callback name.
+
+    Returns:
+      A list of event dictionaries.
+    """
+    return self._event_client.eventGetAll(callback_id, event_name)
diff --git a/mobly/controllers/android_device_lib/fastboot.py b/mobly/controllers/android_device_lib/fastboot.py
index 754a039..1ef4969 100644
--- a/mobly/controllers/android_device_lib/fastboot.py
+++ b/mobly/controllers/android_device_lib/fastboot.py
@@ -12,8 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 from subprocess import Popen, PIPE
 
+from mobly import utils
+
 
 def exe_cmd(*cmds):
   """Executes commands in a new shell. Directing stderr to PIPE.
@@ -33,6 +36,9 @@
   cmd = ' '.join(cmds)
   proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
   (out, err) = proc.communicate()
+  ret = proc.returncode
+  logging.debug('cmd: %s, stdout: %s, stderr: %s, ret: %s',
+                utils.cli_cmd_to_string(cmds), out, err, ret)
   if not err:
     return out
   return err
diff --git a/mobly/controllers/android_device_lib/jsonrpc_client_base.py b/mobly/controllers/android_device_lib/jsonrpc_client_base.py
index 39266dc..7f27fa5 100644
--- a/mobly/controllers/android_device_lib/jsonrpc_client_base.py
+++ b/mobly/controllers/android_device_lib/jsonrpc_client_base.py
@@ -57,7 +57,7 @@
 import threading
 
 from mobly.controllers.android_device_lib import callback_handler
-from mobly.controllers.android_device_lib import errors
+from mobly.snippet import errors
 
 # UID of the 'unknown' jsonrpc session. Will cause creation of a new session.
 UNKNOWN_UID = -1
@@ -72,29 +72,12 @@
 # off.
 _MAX_RPC_RESP_LOGGING_LENGTH = 1024
 
-
-class Error(errors.DeviceError):
-  pass
-
-
-class AppStartError(Error):
-  """Raised when the app is not able to be started."""
-
-
-class AppRestoreConnectionError(Error):
-  """Raised when failed to restore app from disconnection."""
-
-
-class ApiError(Error):
-  """Raised when remote API reports an error."""
-
-
-class ProtocolError(Error):
-  """Raised when there is some error in exchanging data with server."""
-  NO_RESPONSE_FROM_HANDSHAKE = 'No response from handshake.'
-  NO_RESPONSE_FROM_SERVER = ('No response from server. '
-                             'Check the device logcat for crashes.')
-  MISMATCHED_API_ID = 'RPC request-response ID mismatch.'
+# Aliases of error types for backward compatibility.
+Error = errors.Error
+AppStartError = errors.ServerStartError
+AppRestoreConnectionError = errors.ServerRestoreConnectionError
+ApiError = errors.ApiError
+ProtocolError = errors.ProtocolError
 
 
 class JsonRpcCommand:
@@ -253,13 +236,17 @@
     `SnippetClient.restore_app_connection`.
     """
     try:
-      if self._conn:
-        self._conn.close()
-        self._conn = None
+      self.close_socket_connection()
     finally:
       # Always clear the host port as part of the disconnect step.
       self.clear_host_port()
 
+  def close_socket_connection(self):
+    """Closes the socket connection to the server."""
+    if self._conn:
+      self._conn.close()
+      self._conn = None
+
   def clear_host_port(self):
     """Stops the adb port forwarding of the host port used by this client.
     """
diff --git a/mobly/controllers/android_device_lib/services/snippet_management_service.py b/mobly/controllers/android_device_lib/services/snippet_management_service.py
index 1cd0587..fae60e2 100644
--- a/mobly/controllers/android_device_lib/services/snippet_management_service.py
+++ b/mobly/controllers/android_device_lib/services/snippet_management_service.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 """Module for the snippet management service."""
 from mobly.controllers.android_device_lib import errors
-from mobly.controllers.android_device_lib import snippet_client
+from mobly.controllers.android_device_lib import snippet_client_v2
 from mobly.controllers.android_device_lib.services import base_service
 
 MISSING_SNIPPET_CLIENT_MSG = 'No snippet client is registered with name "%s".'
@@ -78,8 +78,9 @@
         raise Error(
             self, 'Snippet package "%s" has already been loaded under name'
             ' "%s".' % (package, snippet_name))
-    client = snippet_client.SnippetClient(package=package, ad=self._device)
-    client.start_app_and_connect()
+
+    client = snippet_client_v2.SnippetClientV2(package=package, ad=self._device)
+    client.initialize()
     self._snippet_clients[name] = client
 
   def remove_snippet_client(self, name):
@@ -94,14 +95,14 @@
     if name not in self._snippet_clients:
       raise Error(self._device, MISSING_SNIPPET_CLIENT_MSG % name)
     client = self._snippet_clients.pop(name)
-    client.stop_app()
+    client.stop()
 
   def start(self):
     """Starts all the snippet clients under management."""
     for client in self._snippet_clients.values():
       if not client.is_alive:
         self._device.log.debug('Starting SnippetClient<%s>.', client.package)
-        client.start_app_and_connect()
+        client.initialize()
       else:
         self._device.log.debug(
             'Not startng SnippetClient<%s> because it is already alive.',
@@ -112,7 +113,7 @@
     for client in self._snippet_clients.values():
       if client.is_alive:
         self._device.log.debug('Stopping SnippetClient<%s>.', client.package)
-        client.stop_app()
+        client.stop()
       else:
         self._device.log.debug(
             'Not stopping SnippetClient<%s> because it is not alive.',
@@ -126,14 +127,14 @@
     """
     for client in self._snippet_clients.values():
       self._device.log.debug('Pausing SnippetClient<%s>.', client.package)
-      client.disconnect()
+      client.close_connection()
 
   def resume(self):
     """Resumes all paused snippet clients."""
     for client in self._snippet_clients.values():
       if not client.is_alive:
         self._device.log.debug('Resuming SnippetClient<%s>.', client.package)
-        client.restore_app_connection()
+        client.restore_server_connection()
       else:
         self._device.log.debug('Not resuming SnippetClient<%s>.',
                                client.package)
diff --git a/mobly/controllers/android_device_lib/snippet_client.py b/mobly/controllers/android_device_lib/snippet_client.py
index a15b52c..42ae122 100644
--- a/mobly/controllers/android_device_lib/snippet_client.py
+++ b/mobly/controllers/android_device_lib/snippet_client.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 """JSON RPC interface to Mobly Snippet Lib."""
 
+import logging
 import re
 import time
 
@@ -20,6 +21,12 @@
 from mobly.controllers.android_device_lib import adb
 from mobly.controllers.android_device_lib import errors
 from mobly.controllers.android_device_lib import jsonrpc_client_base
+from mobly.snippet import errors as snippet_errors
+
+logging.warning('The module mobly.controllers.android_device_lib.snippet_client'
+                ' is deprecated and will be removed in a future version. Use'
+                ' module mobly.controllers.android_device_lib.snippet_client_v2'
+                ' instead.')
 
 _INSTRUMENTATION_RUNNER_PACKAGE = (
     'com.google.android.mobly.snippet.SnippetRunner')
@@ -56,18 +63,18 @@
 
 _NOHUP_COMMAND = 'nohup'
 
-
-class AppStartPreCheckError(jsonrpc_client_base.Error):
-  """Raised when pre checks for the snippet failed."""
-
-
-class ProtocolVersionError(jsonrpc_client_base.AppStartError):
-  """Raised when the protocol reported by the snippet is unknown."""
+# Aliases of error types for backward compatibility.
+AppStartPreCheckError = snippet_errors.ServerStartPreCheckError
+ProtocolVersionError = snippet_errors.ServerStartProtocolError
 
 
 class SnippetClient(jsonrpc_client_base.JsonRpcClientBase):
   """A client for interacting with snippet APKs using Mobly Snippet Lib.
 
+  DEPRECATED: Use
+  mobly.controllers.android_device_lib.snippet_client_v2.SnippetClientV2
+  instead.
+
   See superclass documentation for a list of public attributes.
 
   For a description of the launch protocols, see the documentation in
@@ -251,6 +258,8 @@
       raise errors.DeviceError(
           self._ad, 'Failed to stop existing apk. Unexpected output: %s' % out)
 
+    self._stop_event_client()
+
   def _start_event_client(self):
     """Overrides superclass."""
     event_client = SnippetClient(package=self.package, ad=self._ad)
@@ -259,6 +268,17 @@
     event_client.connect(self.uid, jsonrpc_client_base.JsonRpcCommand.CONTINUE)
     return event_client
 
+  def _stop_event_client(self):
+    """Releases all the resources acquired in `_start_event_client`."""
+    if self._event_client:
+      self._event_client.close_socket_connection()
+      # Without cleaning host_port of event_client, the event client will try to
+      # stop the port forwarding when deconstructed, which should only be
+      # stopped by the corresponding snippet client.
+      self._event_client.host_port = None
+      self._event_client.device_port = None
+      self._event_client = None
+
   def _restore_event_client(self):
     """Restores previously created event client."""
     if not self._event_client:
diff --git a/mobly/controllers/android_device_lib/snippet_client_v2.py b/mobly/controllers/android_device_lib/snippet_client_v2.py
index be3b98a..3adfde5 100644
--- a/mobly/controllers/android_device_lib/snippet_client_v2.py
+++ b/mobly/controllers/android_device_lib/snippet_client_v2.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 """Snippet Client V2 for Interacting with Snippet Server on Android Device."""
 
+import enum
+import json
 import re
+import socket
 
 from mobly import utils
 from mobly.controllers.android_device_lib import adb
+from mobly.controllers.android_device_lib import callback_handler_v2
 from mobly.controllers.android_device_lib import errors as android_device_lib_errors
 from mobly.snippet import client_base
 from mobly.snippet import errors
@@ -58,15 +62,51 @@
 
 _NOHUP_COMMAND = 'nohup'
 
+# UID of the 'unknown' JSON RPC session. Will cause creation of a new session
+# in the snippet server.
+UNKNOWN_UID = -1
+
+# Maximum time to wait for the socket to open on the device.
+_SOCKET_CONNECTION_TIMEOUT = 60
+
+# Maximum time to wait for a response message on the socket.
+_SOCKET_READ_TIMEOUT = 60 * 10
+
+# The default timeout for callback handlers returned by this client
+_CALLBACK_DEFAULT_TIMEOUT_SEC = 60 * 2
+
+
+class ConnectionHandshakeCommand(enum.Enum):
+  """Commands to send to the server when sending the handshake request.
+
+  After creating the socket connection, the client must send a handshake request
+  to the server. When receiving the handshake request, the server will prepare
+  to communicate with the client. According to the command in the request,
+  the server will create a new session or reuse the current session.
+
+  INIT: Initiates a new session and makes a connection with this session.
+  CONTINUE: Makes a connection with the current session.
+  """
+  INIT = 'initiate'
+  CONTINUE = 'continue'
+
 
 class SnippetClientV2(client_base.ClientBase):
   """Snippet client V2 for interacting with snippet server on Android Device.
 
-  See base class documentation for a list of public attributes and communication
-  protocols.
-
   For a description of the launch protocols, see the documentation in
   mobly-snippet-lib, SnippetRunner.java.
+
+  We only list the public attributes introduced in this class. See base class
+  documentation for other public attributes and communication protocols.
+
+  Attributes:
+    host_port: int, the host port used for communicating with the snippet
+      server.
+    device_port: int, the device port listened by the snippet server.
+    uid: int, the uid of the server session with which this client communicates.
+      Default is `UNKNOWN_UID` and it will be set to a positive number after
+      the connection to the server is made successfully.
   """
 
   def __init__(self, package, ad):
@@ -77,9 +117,15 @@
       ad: AndroidDevice, the android device object associated with this client.
     """
     super().__init__(package=package, device=ad)
+    self.host_port = None
+    self.device_port = None
+    self.uid = UNKNOWN_UID
     self._adb = ad.adb
     self._user_id = None
     self._proc = None
+    self._client = None  # keep it to prevent close errors on connect failure
+    self._conn = None
+    self._event_client = None
 
   @property
   def user_id(self):
@@ -103,6 +149,11 @@
       self._user_id = self._adb.current_user_id
     return self._user_id
 
+  @property
+  def is_alive(self):
+    """Does the client have an active connection to the snippet server."""
+    return self._conn is not None
+
   def before_starting_server(self):
     """Performs the preparation steps before starting the remote server.
 
@@ -269,24 +320,282 @@
 
       self.log.debug('Discarded line from instrumentation output: "%s"', line)
 
+  def make_connection(self):
+    """Makes a connection to the snippet server on the remote device.
+
+    This function makes a persistent connection to the server. This connection
+    will be used for all the RPCs, and must be closed when deconstructing.
+
+    To connect to the Android device, it first forwards the device port to a
+    host port. Then, it creates a socket connection to the server on the device.
+    Finally, it sends a handshake request to the server, which requests the
+    server to prepare for the communication with the client.
+
+    This function uses self.host_port for communicating with the server. If
+    self.host_port is 0 or None, this function finds an available host port to
+    make the connection and set self.host_port to the found port.
+    """
+    self._forward_device_port()
+    self.create_socket_connection()
+    self.send_handshake_request()
+
+  def _forward_device_port(self):
+    """Forwards the device port to a host port."""
+    if not self.host_port:
+      self.host_port = utils.get_available_host_port()
+    self._adb.forward([f'tcp:{self.host_port}', f'tcp:{self.device_port}'])
+
+  def create_socket_connection(self):
+    """Creates a socket connection to the server.
+
+    After creating the connection successfully, it sets two attributes:
+    * `self._conn`: the created socket object, which will be used when it needs
+      to close the connection.
+    * `self._client`: the socket file, which will be used to send and receive
+      messages.
+
+    This function only creates a socket connection without sending any message
+    to the server.
+    """
+    try:
+      self.log.debug(
+          'Snippet client is creating socket connection to the snippet server '
+          'of %s through host port %d.', self.package, self.host_port)
+      self._conn = socket.create_connection(('localhost', self.host_port),
+                                            _SOCKET_CONNECTION_TIMEOUT)
+    except ConnectionRefusedError as err:
+      # Retry using '127.0.0.1' for IPv4 enabled machines that only resolve
+      # 'localhost' to '[::1]'.
+      self.log.debug('Failed to connect to localhost, trying 127.0.0.1: %s',
+                     str(err))
+      self._conn = socket.create_connection(('127.0.0.1', self.host_port),
+                                            _SOCKET_CONNECTION_TIMEOUT)
+
+    self._conn.settimeout(_SOCKET_READ_TIMEOUT)
+    self._client = self._conn.makefile(mode='brw')
+
+  def send_handshake_request(self,
+                             uid=UNKNOWN_UID,
+                             cmd=ConnectionHandshakeCommand.INIT):
+    """Sends a handshake request to the server to prepare for the communication.
+
+    Through the handshake response, this function checks whether the server
+    is ready for the communication. If ready, it sets `self.uid` to the
+    server session id. Otherwise, it sets `self.uid` to `UNKNOWN_UID`.
+
+    Args:
+      uid: int, the uid of the server session to continue. It will be ignored
+        if the `cmd` requires the server to create a new session.
+      cmd: ConnectionHandshakeCommand, the handshake command Enum for the
+        server, which requires the server to create a new session or use the
+        current session.
+
+    Raises:
+      errors.ProtocolError: something went wrong when sending the handshake
+        request.
+    """
+    request = json.dumps({'cmd': cmd.value, 'uid': uid})
+    self.log.debug('Sending handshake request %s.', request)
+    self._client_send(request)
+    response = self._client_receive()
+
+    if not response:
+      raise errors.ProtocolError(
+          self._device, errors.ProtocolError.NO_RESPONSE_FROM_HANDSHAKE)
+
+    response = self._decode_socket_response_bytes(response)
+
+    result = json.loads(response)
+    if result['status']:
+      self.uid = result['uid']
+    else:
+      self.uid = UNKNOWN_UID
+
+  def check_server_proc_running(self):
+    """See base class.
+
+    This client does nothing at this stage.
+    """
+
+  def send_rpc_request(self, request):
+    """Sends an RPC request to the server and receives a response.
+
+    Args:
+      request: str, the request to send the server.
+
+    Returns:
+      The string of the RPC response.
+
+    Raises:
+      errors.Error: if failed to send the request or receive a response.
+      errors.ProtocolError: if received an empty response from the server.
+      UnicodeError: if failed to decode the received response.
+    """
+    self._client_send(request)
+    response = self._client_receive()
+    if not response:
+      raise errors.ProtocolError(self._device,
+                                 errors.ProtocolError.NO_RESPONSE_FROM_SERVER)
+    return self._decode_socket_response_bytes(response)
+
+  def _client_send(self, message):
+    """Sends an RPC message through the connection.
+
+    Args:
+      message: str, the message to send.
+
+    Raises:
+      errors.Error: if a socket error occurred during the send.
+    """
+    try:
+      self._client.write(f'{message}\n'.encode('utf8'))
+      self._client.flush()
+    except socket.error as e:
+      raise errors.Error(
+          self._device,
+          f'Encountered socket error "{e}" sending RPC message "{message}"'
+      ) from e
+
+  def _client_receive(self):
+    """Receives the server's response of an RPC message.
+
+    Returns:
+      Raw bytes of the response.
+
+    Raises:
+      errors.Error: if a socket error occurred during the read.
+    """
+    try:
+      return self._client.readline()
+    except socket.error as e:
+      raise errors.Error(
+          self._device,
+          f'Encountered socket error "{e}" reading RPC response') from e
+
+  def _decode_socket_response_bytes(self, response):
+    """Returns a string decoded from the socket response bytes.
+
+    Args:
+      response: bytes, the response to be decoded.
+
+    Returns:
+      The string decoded from the given bytes.
+
+    Raises:
+      UnicodeError: if failed to decode the given bytes using encoding utf8.
+    """
+    try:
+      return str(response, encoding='utf8')
+    except UnicodeError:
+      self.log.error(
+          'Failed to decode socket response bytes using encoding '
+          'utf8: %s', response)
+      raise
+
+  def handle_callback(self, callback_id, ret_value, rpc_func_name):
+    """Creates the callback handler object.
+
+    If the client doesn't have an event client, it will start an event client
+    before creating a callback handler.
+
+    Args:
+      callback_id: see base class.
+      ret_value: see base class.
+      rpc_func_name: see base class.
+
+    Returns:
+      The callback handler object.
+    """
+    if self._event_client is None:
+      self._create_event_client()
+    return callback_handler_v2.CallbackHandlerV2(
+        callback_id=callback_id,
+        event_client=self._event_client,
+        ret_value=ret_value,
+        method_name=rpc_func_name,
+        device=self._device,
+        rpc_max_timeout_sec=_SOCKET_READ_TIMEOUT,
+        default_timeout_sec=_CALLBACK_DEFAULT_TIMEOUT_SEC)
+
+  def _create_event_client(self):
+    """Creates a separate client to the same session for propagating events.
+
+    As the server is already started by the snippet server on which this
+    function is called, the created event client connects to the same session
+    as the snippet server. It also reuses the same host port and device port.
+    """
+    self._event_client = SnippetClientV2(package=self.package, ad=self._device)
+    self._event_client.make_connection_with_forwarded_port(
+        self.host_port, self.device_port, self.uid,
+        ConnectionHandshakeCommand.CONTINUE)
+
+  def make_connection_with_forwarded_port(self,
+                                          host_port,
+                                          device_port,
+                                          uid=UNKNOWN_UID,
+                                          cmd=ConnectionHandshakeCommand.INIT):
+    """Makes a connection to the server with the given forwarded port.
+
+    This process assumes that a device port has already been forwarded to a
+    host port, and it only makes a connection to the snippet server based on
+    the forwarded port. This is typically used by clients that share the same
+    snippet server, e.g. the snippet client and its event client.
+
+    Args:
+      host_port: int, the host port which has already been forwarded.
+      device_port: int, the device port listened by the snippet server.
+      uid: int, the uid of the server session to continue. It will be ignored
+        if the `cmd` requires the server to create a new session.
+      cmd: ConnectionHandshakeCommand, the handshake command Enum for the
+        server, which requires the server to create a new session or use the
+        current session.
+    """
+    self.host_port = host_port
+    self.device_port = device_port
+    self._counter = self._id_counter()
+    self.create_socket_connection()
+    self.send_handshake_request(uid, cmd)
+
   def stop(self):
     """Releases all the resources acquired in `initialize`.
 
     This function releases following resources:
+    * Close the socket connection.
+    * Stop forwarding the device port to host.
     * Stop the standing server subprocess running on the host side.
     * Stop the snippet server running on the device side.
+    * Stop the event client and set `self._event_client` to None.
 
     Raises:
       android_device_lib_errors.DeviceError: if the server exited with errors on
         the device side.
     """
-    # TODO(mhaoli): This function is only partially implemented because we
-    # have not implemented the functionality of making connections in this
-    # class.
     self.log.debug('Stopping snippet package %s.', self.package)
+    self.close_connection()
     self._stop_server()
+    self._destroy_event_client()
     self.log.debug('Snippet package %s stopped.', self.package)
 
+  def close_connection(self):
+    """Closes the connection to the snippet server on the device.
+
+    This function closes the socket connection and stops forwarding the device
+    port to host.
+    """
+    try:
+      if self._conn:
+        self._conn.close()
+        self._conn = None
+    finally:
+      # Always clear the host port as part of the close step
+      self._stop_port_forwarding()
+
+  def _stop_port_forwarding(self):
+    """Stops the adb port forwarding used by this client."""
+    if self.host_port:
+      self._device.adb.forward(['--remove', f'tcp:{self.host_port}'])
+      self.host_port = None
+
   def _stop_server(self):
     """Releases all the resources acquired in `start_server`.
 
@@ -315,27 +624,82 @@
           self._device,
           f'Failed to stop existing apk. Unexpected output: {out}.')
 
-  # TODO(mhaoli): Temporally override these abstract methods so that we can
-  # initialize the instances in unit tests. We are implementing these functions
-  # in the next PR as soon as possible.
-  def make_connection(self):
-    raise NotImplementedError('To be implemented.')
-
-  def close_connection(self):
-    raise NotImplementedError('To be implemented.')
-
-  def __del__(self):
-    # Override the destructor to not call close_connection for now.
-    pass
-
-  def send_rpc_request(self, request):
-    raise NotImplementedError('To be implemented.')
-
-  def check_server_proc_running(self):
-    raise NotImplementedError('To be implemented.')
-
-  def handle_callback(self, callback_id, ret_value, rpc_func_name):
-    raise NotImplementedError('To be implemented.')
+  def _destroy_event_client(self):
+    """Releases all the resources acquired in `_create_event_client`."""
+    if self._event_client:
+      # Without cleaning host_port of event_client first, the close_connection
+      # will try to stop the port forwarding, which should only be stopped by
+      # the corresponding snippet client.
+      self._event_client.host_port = None
+      self._event_client.device_port = None
+      self._event_client.close_connection()
+      self._event_client = None
 
   def restore_server_connection(self, port=None):
-    raise NotImplementedError('To be implemented.')
+    """Restores the server after the device got reconnected.
+
+    Instead of creating a new instance of the client:
+      - Uses the given port (or find a new available host port if none is
+      given).
+      - Tries to connect to the remote server with the selected port.
+
+    Args:
+      port: int, if given, this is the host port from which to connect to the
+        remote device port. If not provided, find a new available port as host
+        port.
+
+    Raises:
+      errors.ServerRestoreConnectionError: when failed to restore the connection
+        to the snippet server.
+    """
+    try:
+      # If self.host_port is None, self._make_connection finds a new available
+      # port.
+      self.host_port = port
+      self._make_connection()
+    except Exception as e:
+      # Log the original error and raise ServerRestoreConnectionError.
+      self.log.error('Failed to re-connect to the server.')
+      raise errors.ServerRestoreConnectionError(
+          self._device,
+          (f'Failed to restore server connection for {self.package} at '
+           f'host port {self.host_port}, device port {self.device_port}.'
+          )) from e
+
+    # Because the previous connection was lost, update self._proc
+    self._proc = None
+    self._restore_event_client()
+
+  def _restore_event_client(self):
+    """Restores the previously created event client or creates a new one.
+
+    This function restores the connection of the previously created event
+    client, or creates a new client and makes a connection if it didn't
+    exist before.
+
+    The event client to restore reuses the same host port and device port
+    with the client on which function is called.
+    """
+    if self._event_client:
+      self._event_client.make_connection_with_forwarded_port(
+          self.host_port, self.device_port)
+
+  def help(self, print_output=True):
+    """Calls the help RPC, which returns the list of RPC calls available.
+
+    This RPC should normally be used in an interactive console environment
+    where the output should be printed instead of returned. Otherwise,
+    newlines will be escaped, which will make the output difficult to read.
+
+    Args:
+      print_output: bool, for whether the output should be printed.
+
+    Returns:
+      A string containing the help output otherwise None if `print_output`
+        wasn't set.
+    """
+    help_text = self._rpc('help')
+    if print_output:
+      print(help_text)
+    else:
+      return help_text
diff --git a/mobly/controllers/android_device_lib/snippet_event.py b/mobly/controllers/android_device_lib/snippet_event.py
index 88f8ca1..7ef75c1 100644
--- a/mobly/controllers/android_device_lib/snippet_event.py
+++ b/mobly/controllers/android_device_lib/snippet_event.py
@@ -11,11 +11,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
+logging.warning('The module mobly.controllers.android_device_lib.snippet_event '
+                'is deprecated and will be removed in a future version. Use '
+                'module mobly.snippet.callback_event instead.')
 
 
 def from_dict(event_dict):
   """Create a SnippetEvent object from a dictionary.
 
+  DEPRECATED: Use mobly.snippet.callback_event.from_dict instead.
+
   Args:
     event_dict: a dictionary representing an event.
 
@@ -31,6 +38,8 @@
 class SnippetEvent:
   """The class that represents callback events for mobly snippet library.
 
+  DEPRECATED: Use mobly.snippet.callback_event.CallbackEvent instead.
+
   Attributes:
     callback_id: string, the callback ID associated with the event.
     name: string, the name of the event.
diff --git a/mobly/logger.py b/mobly/logger.py
index 200e299..ea1de12 100644
--- a/mobly/logger.py
+++ b/mobly/logger.py
@@ -17,6 +17,7 @@
 import os
 import re
 import sys
+from typing import Any, MutableMapping, Tuple
 
 from mobly import records
 from mobly import utils
@@ -168,7 +169,7 @@
   return _get_timestamp('%m-%d-%Y_%H-%M-%S-%f', delta)
 
 
-def _setup_test_logger(log_path, prefix=None):
+def _setup_test_logger(log_path, console_level, prefix=None):
   """Customizes the root logger for a test run.
 
   The logger object has a stream handler and a file handler. The stream
@@ -177,6 +178,9 @@
 
   Args:
     log_path: Location of the log file.
+    console_level: Log level threshold used for log messages printed
+      to the console. Logs with a level less severe than
+      console_level will not be printed to the console.
     prefix: A prefix for each log line in terminal.
     filename: Name of the log file. The default is the time the logger
       is requested.
@@ -192,7 +196,7 @@
   c_formatter = logging.Formatter(terminal_format, log_line_time_format)
   ch = logging.StreamHandler(sys.stdout)
   ch.setFormatter(c_formatter)
-  ch.setLevel(logging.INFO)
+  ch.setLevel(console_level)
   # Log everything to file
   f_formatter = logging.Formatter(log_line_format, log_line_time_format)
   # Write logger output to files
@@ -237,7 +241,10 @@
   utils.create_alias(actual_path, alias_path)
 
 
-def setup_test_logger(log_path, prefix=None, alias='latest'):
+def setup_test_logger(log_path,
+                      prefix=None,
+                      alias='latest',
+                      console_level=logging.INFO):
   """Customizes the root logger for a test run.
 
   In addition to configuring the Mobly logging handlers, this also sets two
@@ -256,9 +263,12 @@
       will not be created, which is useful to save storage space when the
       storage system (e.g. ZIP files) does not properly support
       shortcut/symlinks.
+    console_level: optional logging level, log level threshold used for log
+      messages printed to the console. Logs with a level less severe than
+      console_level will not be printed to the console.
   """
   utils.create_dir(log_path)
-  _setup_test_logger(log_path, prefix)
+  _setup_test_logger(log_path, console_level, prefix)
   logging.debug('Test output folder: "%s"', log_path)
   if alias:
     create_latest_log_alias(log_path, alias=alias)
@@ -364,3 +374,58 @@
     special characters.
   """
   return sanitize_filename(log_line_timestamp)
+
+
+class PrefixLoggerAdapter(logging.LoggerAdapter):
+  """A wrapper that adds a prefix to each log line.
+
+  This logger adapter class is like a decorator to Logger. It takes one
+  Logger-like object and returns a new Logger-like object. The new Logger-like
+  object will print logs with a custom prefix added. Creating new Logger-like
+  objects doesn't modify the behavior of the old Logger-like object.
+
+  Chaining multiple logger adapters is also supported. The multiple adapters
+  will take effect in the order in which they are chained, i.e. the log will be
+  '<prefix1> <prefix2> <prefix3> <message>' if we chain 3 PrefixLoggerAdapters.
+
+  Example Usage:
+
+  .. code-block:: python
+
+    logger = PrefixLoggerAdapter(logging.getLogger(), {
+      'log_prefix': <custom prefix>
+    })
+
+  Then each log line added by the logger will have a prefix:
+  '<custom prefix> <message>'.
+  """
+
+  _KWARGS_TYPE = MutableMapping[str, Any]
+  _PROCESS_RETURN_TYPE = Tuple[str, _KWARGS_TYPE]
+
+  # The key of log_preifx item in the dict self.extra
+  EXTRA_KEY_LOG_PREFIX: str = 'log_prefix'
+
+  extra: _KWARGS_TYPE
+
+  def process(self, msg: str, kwargs: _KWARGS_TYPE) -> _PROCESS_RETURN_TYPE:
+    """Processes the logging call to insert contextual information.
+
+    Args:
+      msg: The logging message.
+      kwargs: Keyword arguments passed in to a logging call.
+
+    Returns:
+      The message and kwargs modified.
+    """
+    new_msg = f'{self.extra[PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX]} {msg}'
+    return (new_msg, kwargs)
+
+  def set_log_prefix(self, prefix: str) -> None:
+    """Sets the log prefix to the given string.
+
+    Args:
+      prefix: The new log prefix.
+    """
+    self.debug('Setting the log prefix to "%s".', prefix)
+    self.extra[PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX] = prefix
diff --git a/mobly/records.py b/mobly/records.py
index f4ec4d5..b77817c 100644
--- a/mobly/records.py
+++ b/mobly/records.py
@@ -156,6 +156,7 @@
         yaml.safe_dump(new_content,
                        f,
                        explicit_start=True,
+                       explicit_end=True,
                        allow_unicode=True,
                        indent=4)
 
@@ -176,6 +177,7 @@
   RECORD_EXTRAS = 'Extras'
   RECORD_EXTRA_ERRORS = 'Extra Errors'
   RECORD_DETAILS = 'Details'
+  RECORD_TERMINATION_SIGNAL_TYPE = 'Termination Signal Type'
   RECORD_STACKTRACE = 'Stacktrace'
   RECORD_SIGNATURE = 'Signature'
   RECORD_RETRY_PARENT = 'Retry Parent'
@@ -217,6 +219,7 @@
 
   Attributes:
     exception: Exception object, the original Exception.
+    type: string, type name of the exception object.
     stacktrace: string, stacktrace of the Exception.
     extras: optional serializable, this corresponds to the
       `TestSignal.extras` field.
@@ -226,6 +229,7 @@
 
   def __init__(self, e, position=None):
     self.exception = e
+    self.type = type(e).__name__
     self.stacktrace = None
     self.extras = None
     self.position = position
@@ -274,7 +278,7 @@
     """
     try:
       exception = copy.deepcopy(self.exception)
-    except TypeError:
+    except (TypeError, RecursionError):
       # If the exception object cannot be copied, use the original
       # exception object.
       exception = self.exception
@@ -343,6 +347,16 @@
       return self.termination_signal.details
 
   @property
+  def termination_signal_type(self):
+    """Type name of the signal that caused the test's termination.
+
+    Note a passed test can have this as well due to the explicit pass
+    signal. If the test passed implicitly, this field would be None.
+    """
+    if self.termination_signal:
+      return self.termination_signal.type
+
+  @property
   def stacktrace(self):
     """The stacktrace string for the exception that terminated the test.
     """
@@ -490,6 +504,8 @@
       RECORD_RETRY_PARENT] = self.retry_parent.signature if self.retry_parent else None
     d[TestResultEnums.RECORD_EXTRAS] = self.extras
     d[TestResultEnums.RECORD_DETAILS] = self.details
+    d[TestResultEnums.
+      RECORD_TERMINATION_SIGNAL_TYPE] = self.termination_signal_type
     d[TestResultEnums.RECORD_EXTRA_ERRORS] = {
         key: value.to_dict() for (key, value) in self.extra_errors.items()
     }
diff --git a/mobly/snippet/callback_event.py b/mobly/snippet/callback_event.py
new file mode 100644
index 0000000..55471c7
--- /dev/null
+++ b/mobly/snippet/callback_event.py
@@ -0,0 +1,52 @@
+# Copyright 2022 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""The class that represents callback events for Mobly Snippet Lib."""
+
+
+def from_dict(event_dict):
+  """Creates a CallbackEvent object from a dictionary.
+
+  Args:
+    event_dict: dict, a dictionary representing an event.
+
+  Returns:
+    A CallbackEvent object.
+  """
+  return CallbackEvent(callback_id=event_dict['callbackId'],
+                       name=event_dict['name'],
+                       creation_time=event_dict['time'],
+                       data=event_dict['data'])
+
+
+class CallbackEvent:
+  """The class that represents callback events for Mobly Snippet Library.
+
+  Attributes:
+    callback_id: str, the callback ID associated with the event.
+    name: str, the name of the event.
+    creation_time: int, the epoch time when the event is created on the
+      RPC server side.
+    data: dict, the data held by the event. Can be None.
+  """
+
+  def __init__(self, callback_id, name, creation_time, data):
+    self.callback_id = callback_id
+    self.name = name
+    self.creation_time = creation_time
+    self.data = data
+
+  def __repr__(self):
+    return (
+        f'CallbackEvent(callback_id: {self.callback_id}, name: {self.name}, '
+        f'creation_time: {self.creation_time}, data: {self.data})')
diff --git a/mobly/snippet/callback_handler_base.py b/mobly/snippet/callback_handler_base.py
new file mode 100644
index 0000000..50465d1
--- /dev/null
+++ b/mobly/snippet/callback_handler_base.py
@@ -0,0 +1,240 @@
+# Copyright 2022 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Module for the base class to handle Mobly Snippet Lib's callback events."""
+import abc
+import time
+
+from mobly.snippet import callback_event
+from mobly.snippet import errors
+
+
+class CallbackHandlerBase(abc.ABC):
+  """Base class for handling Mobly Snippet Lib's callback events.
+
+  All the events handled by a callback handler are originally triggered by one
+  async RPC call. All the events are tagged with a callback_id specific to a
+  call to an async RPC method defined on the server side.
+
+  The raw message representing an event looks like:
+
+  .. code-block:: python
+
+    {
+      'callbackId': <string, callbackId>,
+      'name': <string, name of the event>,
+      'time': <long, epoch time of when the event was created on the
+        server side>,
+      'data': <dict, extra data from the callback on the server side>
+    }
+
+  Each message is then used to create a CallbackEvent object on the client
+  side.
+
+  Attributes:
+    ret_value: any, the direct return value of the async RPC call.
+  """
+
+  def __init__(self,
+               callback_id,
+               event_client,
+               ret_value,
+               method_name,
+               device,
+               rpc_max_timeout_sec,
+               default_timeout_sec=120):
+    """Initializes a callback handler base object.
+
+    Args:
+      callback_id: str, the callback ID which associates with a group of
+        callback events.
+      event_client: SnippetClientV2, the client object used to send RPC to the
+        server and receive response.
+      ret_value: any, the direct return value of the async RPC call.
+      method_name: str, the name of the executed Async snippet function.
+      device: DeviceController, the device object associated with this handler.
+      rpc_max_timeout_sec: float, maximum time for sending a single RPC call.
+      default_timeout_sec: float, the default timeout for this handler. It
+        must be no longer than rpc_max_timeout_sec.
+    """
+    self._id = callback_id
+    self.ret_value = ret_value
+    self._device = device
+    self._event_client = event_client
+    self._method_name = method_name
+
+    if rpc_max_timeout_sec < default_timeout_sec:
+      raise ValueError('The max timeout of a single RPC must be no smaller '
+                       'than the default timeout of the callback handler. '
+                       f'Got rpc_max_timeout_sec={rpc_max_timeout_sec}, '
+                       f'default_timeout_sec={default_timeout_sec}.')
+    self._rpc_max_timeout_sec = rpc_max_timeout_sec
+    self._default_timeout_sec = default_timeout_sec
+
+  @property
+  def rpc_max_timeout_sec(self):
+    """Maximum time for sending a single RPC call."""
+    return self._rpc_max_timeout_sec
+
+  @property
+  def default_timeout_sec(self):
+    """Default timeout used by this callback handler."""
+    return self._default_timeout_sec
+
+  @property
+  def callback_id(self):
+    """The callback ID which associates a group of callback events."""
+    return self._id
+
+  @abc.abstractmethod
+  def callEventWaitAndGetRpc(self, callback_id, event_name, timeout_sec):
+    """Calls snippet lib's RPC to wait for a callback event.
+
+    Override this method to use this class with various snippet lib
+    implementations.
+
+    This function waits and gets a CallbackEvent with the specified identifier
+    from the server. It will raise a timeout error if the expected event does
+    not occur within the time limit.
+
+    Args:
+      callback_id: str, the callback identifier.
+      event_name: str, the callback name.
+      timeout_sec: float, the number of seconds to wait for the event. It is
+        already checked that this argument is no longer than the max timeout
+        of a single RPC.
+
+    Returns:
+      The event dictionary.
+
+    Raises:
+      errors.CallbackHandlerTimeoutError: Raised if the expected event does not
+        occur within the time limit.
+    """
+
+  @abc.abstractmethod
+  def callEventGetAllRpc(self, callback_id, event_name):
+    """Calls snippet lib's RPC to get all existing snippet events.
+
+    Override this method to use this class with various snippet lib
+    implementations.
+
+    This function gets all existing events in the server with the specified
+    identifier without waiting.
+
+    Args:
+      callback_id: str, the callback identifier.
+      event_name: str, the callback name.
+
+    Returns:
+      A list of event dictionaries.
+    """
+
+  def waitAndGet(self, event_name, timeout=None):
+    """Waits and gets a CallbackEvent with the specified identifier.
+
+    It will raise a timeout error if the expected event does not occur within
+    the time limit.
+
+    Args:
+      event_name: str, the name of the event to get.
+      timeout: float, the number of seconds to wait before giving up. If None,
+        it will be set to self.default_timeout_sec.
+
+    Returns:
+      CallbackEvent, the oldest entry of the specified event.
+
+    Raises:
+      errors.CallbackHandlerBaseError: If the specified timeout is longer than
+        the max timeout supported.
+      errors.CallbackHandlerTimeoutError: The expected event does not occur
+        within the time limit.
+    """
+    if timeout is None:
+      timeout = self.default_timeout_sec
+
+    if timeout:
+      if timeout > self.rpc_max_timeout_sec:
+        raise errors.CallbackHandlerBaseError(
+            self._device,
+            f'Specified timeout {timeout} is longer than max timeout '
+            f'{self.rpc_max_timeout_sec}.')
+
+    raw_event = self.callEventWaitAndGetRpc(self._id, event_name, timeout)
+    return callback_event.from_dict(raw_event)
+
+  def waitForEvent(self, event_name, predicate, timeout=None):
+    """Waits for an event of the specific name that satisfies the predicate.
+
+    This call will block until the expected event has been received or time
+    out.
+
+    The predicate function defines the condition the event is expected to
+    satisfy. It takes an event and returns True if the condition is
+    satisfied, False otherwise.
+
+    Note all events of the same name that are received but don't satisfy
+    the predicate will be discarded and not be available for further
+    consumption.
+
+    Args:
+      event_name: str, the name of the event to wait for.
+      predicate: function, a function that takes an event (dictionary) and
+        returns a bool.
+      timeout: float, the number of seconds to wait before giving up. If None,
+        it will be set to self.default_timeout_sec.
+
+    Returns:
+      dictionary, the event that satisfies the predicate if received.
+
+    Raises:
+      errors.CallbackHandlerTimeoutError: raised if no event that satisfies the
+        predicate is received after timeout seconds.
+    """
+    if timeout is None:
+      timeout = self.default_timeout_sec
+
+    deadline = time.perf_counter() + timeout
+    while time.perf_counter() <= deadline:
+      single_rpc_timeout = deadline - time.perf_counter()
+      if single_rpc_timeout < 0:
+        break
+
+      single_rpc_timeout = min(single_rpc_timeout, self.rpc_max_timeout_sec)
+      try:
+        event = self.waitAndGet(event_name, single_rpc_timeout)
+      except errors.CallbackHandlerTimeoutError:
+        # Ignoring errors.CallbackHandlerTimeoutError since we need to throw
+        # one with a more specific message.
+        break
+      if predicate(event):
+        return event
+
+    raise errors.CallbackHandlerTimeoutError(
+        self._device,
+        f'Timed out after {timeout}s waiting for an "{event_name}" event that '
+        f'satisfies the predicate "{predicate.__name__}".')
+
+  def getAll(self, event_name):
+    """Gets all existing events in the server with the specified identifier.
+
+    This is a non-blocking call.
+
+    Args:
+      event_name: str, the name of the event to get.
+
+    Returns:
+      A list of CallbackEvent, each representing an event from the Server side.
+    """
+    raw_events = self.callEventGetAllRpc(self._id, event_name)
+    return [callback_event.from_dict(msg) for msg in raw_events]
diff --git a/mobly/snippet/client_base.py b/mobly/snippet/client_base.py
index 0009f20..3e046df 100644
--- a/mobly/snippet/client_base.py
+++ b/mobly/snippet/client_base.py
@@ -68,8 +68,6 @@
   Attributes:
     package: str, the user-visible name of the snippet library being
       communicated with.
-    host_port: int, the host port of this RPC client.
-    device_port: int, the device port of this RPC client.
     log: Logger, the logger of the corresponding device controller.
     verbose_logging: bool, if True, prints more detailed log
       information. Default is True.
@@ -85,8 +83,6 @@
     """
 
     self.package = package
-    self.host_port = None
-    self.device_port = None
     self.log = device.log
     self.verbose_logging = True
     self._device = device
@@ -101,12 +97,13 @@
     """Initializes the snippet client to interact with the remote device.
 
     This function contains following stages:
-      1. preparing to start the snippet server.
-      2. starting the snippet server on the remote device.
-      3. making a connection to the snippet server.
+      1. before starting server: preparing to start the snippet server.
+      2. start server: starting the snippet server on the remote device.
+      3. make connection: making a connection to the snippet server.
 
-    After this, the self.host_port and self.device_port attributes must be
-    set.
+    An error occurring at any stage will abort the initialization. Only errors
+    at the `start_server` and `make_connection` stages will trigger `stop` to
+    clean up.
 
     Raises:
       errors.ProtocolError: something went wrong when exchanging data with the
@@ -148,10 +145,8 @@
 
       raise
 
-    self.log.debug(
-        'Snippet package %s initialized after %.1fs on host port %d.',
-        self.package,
-        time.perf_counter() - start_time, self.host_port)
+    self.log.debug('Snippet package %s initialized after %.1fs.', self.package,
+                   time.perf_counter() - start_time)
 
   @abc.abstractmethod
   def before_starting_server(self):
@@ -160,6 +155,10 @@
     For example, subclass can check or modify the device settings at this
     stage.
 
+    NOTE: Any error at this stage will abort the initialization without cleanup.
+    So do not acquire resources in this function, or this function should
+    release the acquired resources if an error occurs.
+
     Raises:
       errors.ServerStartPreCheckError: when prechecks for starting the server
         failed.
@@ -194,10 +193,6 @@
       In this case, the client should implement `close_connection` to close
       the connection.
 
-    This function uses self.host_port for communicating with the server. If
-    self.host_port is 0 or None, this function finds an available host port to
-    make the connection and set self.host_port to the found port.
-
     Raises:
       errors.ProtocolError: something went wrong when exchanging data with the
         server.
@@ -240,7 +235,7 @@
     """Reconnects to the server after the device was disconnected.
 
     Instead of creating a new instance of the client:
-      - Uses the given port (or finds a new available host_port if 0 or None is
+      - Uses the given port (or finds a new available host port if 0 or None is
       given).
       - Tries to connect to the remote server with the selected port.
 
diff --git a/mobly/snippet/errors.py b/mobly/snippet/errors.py
index 764aea4..4d41adb 100644
--- a/mobly/snippet/errors.py
+++ b/mobly/snippet/errors.py
@@ -60,3 +60,12 @@
 
 class ServerDiedError(Error):
   """Raised if the snippet server died before all tests finish."""
+
+
+# Error types for callback handlers
+class CallbackHandlerBaseError(errors.DeviceError):
+  """Base error type for snippet clients."""
+
+
+class CallbackHandlerTimeoutError(Error):
+  """Raised if the expected event does not occur within the time limit."""
diff --git a/mobly/suite_runner.py b/mobly/suite_runner.py
index d921199..b2b2579 100644
--- a/mobly/suite_runner.py
+++ b/mobly/suite_runner.py
@@ -13,8 +13,14 @@
 # limitations under the License.
 """Runner for Mobly test suites.
 
-To create a test suite, call suite_runner.run_suite() with one or more
-individual test classes. For example:
+These is just example code to help users run a collection of Mobly test
+classes. Users can use it as is or customize it based on their requirements.
+
+There are two ways to use this runner.
+
+1. Call suite_runner.run_suite() with one or more individual test classes. This
+is for users who just need to execute a collection of test classes without any
+additional steps.
 
 .. code-block:: python
 
@@ -25,14 +31,47 @@
   ...
   if __name__ == '__main__':
     suite_runner.run_suite(foo_test.FooTest, bar_test.BarTest)
-"""
 
+2. Create a subclass of base_suite.BaseSuite and add the individual test
+classes. Using the BaseSuite class allows users to define their own setup
+and teardown steps on the suite level as well as custom config for each test
+class.
+
+.. code-block:: python
+
+  from mobly import base_suite
+  from mobly import suite_runner
+
+  from my.path import MyFooTest
+  from my.path import MyBarTest
+
+
+  class MySuite(base_suite.BaseSuite):
+
+    def setup_suite(self, config):
+      # Add a class with default config.
+      self.add_test_class(MyFooTest)
+      # Add a class with test selection.
+      self.add_test_class(MyBarTest,
+                          tests=['test_a', 'test_b'])
+      # Add the same class again with a custom config and suffix.
+      my_config = some_config_logic(config)
+      self.add_test_class(MyBarTest,
+                          config=my_config,
+                          name_suffix='WithCustomConfig')
+
+
+  if __name__ == '__main__':
+    suite_runner.run_suite_class()
+"""
 import argparse
 import collections
+import inspect
 import logging
 import sys
 
 from mobly import base_test
+from mobly import base_suite
 from mobly import config_parser
 from mobly import signals
 from mobly import test_runner
@@ -42,6 +81,137 @@
   pass
 
 
+def _parse_cli_args(argv):
+  """Parses cli args that are consumed by Mobly.
+
+  Args:
+    argv: A list that is then parsed as cli args. If None, defaults to cli
+      input.
+
+  Returns:
+    Namespace containing the parsed args.
+  """
+  parser = argparse.ArgumentParser(description='Mobly Suite Executable.')
+  group = parser.add_mutually_exclusive_group(required=True)
+  group.add_argument('-c',
+                     '--config',
+                     type=str,
+                     metavar='<PATH>',
+                     help='Path to the test configuration file.')
+  group.add_argument(
+      '-l',
+      '--list_tests',
+      action='store_true',
+      help='Print the names of the tests defined in a script without '
+      'executing them.')
+  parser.add_argument('--tests',
+                      '--test_case',
+                      nargs='+',
+                      type=str,
+                      metavar='[ClassA[.test_a] ClassB[.test_b] ...]',
+                      help='A list of test classes and optional tests to execute.')
+  parser.add_argument('-tb',
+                      '--test_bed',
+                      nargs='+',
+                      type=str,
+                      metavar='[<TEST BED NAME1> <TEST BED NAME2> ...]',
+                      help='Specify which test beds to run tests on.')
+
+  parser.add_argument('-v',
+                      '--verbose',
+                      action='store_true',
+                      help='Set console logger level to DEBUG')
+  if not argv:
+    argv = sys.argv[1:]
+  return parser.parse_known_args(argv)[0]
+
+
+def _find_suite_class():
+  """Finds the test suite class in the current module.
+
+  Walk through module members and find the subclass of BaseSuite. Only
+  one subclass is allowed in a module.
+
+  Returns:
+      The test suite class in the test module.
+  """
+  test_suites = []
+  main_module_members = sys.modules['__main__']
+  for _, module_member in main_module_members.__dict__.items():
+    if inspect.isclass(module_member):
+      if issubclass(module_member, base_suite.BaseSuite):
+        test_suites.append(module_member)
+  if len(test_suites) != 1:
+    logging.error('Expected 1 test class per file, found %s.',
+                  [t.__name__ for t in test_suites])
+    sys.exit(1)
+  return test_suites[0]
+
+
+def _print_test_names(test_classes):
+  """Prints the names of all the tests in all test classes.
+  Args:
+    test_classes: classes, the test classes to print names from.
+  """
+  for test_class in test_classes:
+    cls = test_class(config_parser.TestRunConfig())
+    test_names = []
+    try:
+      # Executes pre-setup procedures, this is required since it might
+      # generate test methods that we want to return as well.
+      cls._pre_run()
+      if cls.tests:
+        # Specified by run list in class.
+        test_names = list(cls.tests)
+      else:
+        # No test method specified by user, list all in test class.
+        test_names = cls.get_existing_test_names()
+    except Exception:
+      logging.exception('Failed to retrieve generated tests.')
+    finally:
+      cls._clean_up()
+    print('==========> %s <==========' % cls.TAG)
+    for name in test_names:
+      print(f"{cls.TAG}.{name}")
+
+
+def run_suite_class(argv=None):
+  """Executes tests in the test suite.
+
+  Args:
+    argv: A list that is then parsed as CLI args. If None, defaults to sys.argv.
+  """
+  cli_args = _parse_cli_args(argv)
+  suite_class = _find_suite_class()
+  if cli_args.list_tests:
+    _print_test_names([suite_class])
+    sys.exit(0)
+  test_configs = config_parser.load_test_config_file(cli_args.config,
+                                                     cli_args.test_bed)
+  config_count = len(test_configs)
+  if config_count != 1:
+    logging.error('Expect exactly one test config, found %d', config_count)
+  config = test_configs[0]
+  runner = test_runner.TestRunner(
+      log_dir=config.log_path, testbed_name=config.testbed_name)
+  suite = suite_class(runner, config)
+  console_level = logging.DEBUG if cli_args.verbose else logging.INFO
+  ok = False
+  with runner.mobly_logger(console_level=console_level):
+    try:
+      suite.setup_suite(config.copy())
+      try:
+        runner.run()
+        ok = runner.results.is_all_pass
+        print(ok)
+      except signals.TestAbortAll:
+        pass
+    finally:
+      suite.teardown_suite()
+  if not ok:
+    sys.exit(1)
+
+
 def run_suite(test_classes, argv=None):
   """Executes multiple test classes as a suite.
 
@@ -53,26 +223,7 @@
     argv: A list that is then parsed as cli args. If None, defaults to cli
       input.
   """
-  # Parse cli args.
-  parser = argparse.ArgumentParser(description='Mobly Suite Executable.')
-  parser.add_argument('-c',
-                      '--config',
-                      type=str,
-                      required=True,
-                      metavar='<PATH>',
-                      help='Path to the test configuration file.')
-  parser.add_argument(
-      '--tests',
-      '--test_case',
-      nargs='+',
-      type=str,
-      metavar='[ClassA[.test_a] ClassB[.test_b] ...]',
-      help='A list of test classes and optional tests to execute.')
-  if not argv:
-    argv = sys.argv[1:]
-  args = parser.parse_args(argv)
-  # Load test config file.
-  test_configs = config_parser.load_test_config_file(args.config)
+  args = _parse_cli_args(argv)
 
   # Check the classes that were passed in
   for test_class in test_classes:
@@ -82,14 +233,22 @@
           'mobly.base_test.BaseTestClass', test_class)
       sys.exit(1)
 
+  if args.list_tests:
+    _print_test_names(test_classes)
+    sys.exit(0)
+
+  # Load test config file.
+  test_configs = config_parser.load_test_config_file(args.config,
+                                                     args.test_bed)
   # Find the full list of tests to execute
   selected_tests = compute_selected_tests(test_classes, args.tests)
 
+  console_level = logging.DEBUG if args.verbose else logging.INFO
   # Execute the suite
   ok = True
   for config in test_configs:
     runner = test_runner.TestRunner(config.log_path, config.testbed_name)
-    with runner.mobly_logger():
+    with runner.mobly_logger(console_level=console_level):
       for (test_class, tests) in selected_tests.items():
         runner.add_test_class(config, test_class, tests)
       try:
@@ -155,7 +314,7 @@
   test_class_name_to_tests = collections.OrderedDict()
   for test_name in selected_tests:
     if '.' in test_name:  # Has a test method
-      (test_class_name, test_name) = test_name.split('.')
+      (test_class_name, test_name) = test_name.split('.', maxsplit=1)
       if test_class_name not in test_class_name_to_tests:
         # Never seen this class before
         test_class_name_to_tests[test_class_name] = [test_name]
diff --git a/mobly/test_runner.py b/mobly/test_runner.py
index 5324bf3..624f056 100644
--- a/mobly/test_runner.py
+++ b/mobly/test_runner.py
@@ -16,6 +16,7 @@
 import contextlib
 import logging
 import os
+import signal
 import sys
 import time
 
@@ -65,12 +66,13 @@
   tests = None
   if args.tests:
     tests = args.tests
+  console_level = logging.DEBUG if args.verbose else logging.INFO
   # Execute the test class with configs.
   ok = True
   for config in test_configs:
     runner = TestRunner(log_dir=config.log_path,
                         testbed_name=config.testbed_name)
-    with runner.mobly_logger():
+    with runner.mobly_logger(console_level=console_level):
       runner.add_test_class(config, test_class, tests)
       try:
         runner.run()
@@ -125,6 +127,11 @@
                       type=str,
                       metavar='[<TEST BED NAME1> <TEST BED NAME2> ...]',
                       help='Specify which test beds to run tests on.')
+
+  parser.add_argument('-v',
+                      '--verbose',
+                      action='store_true',
+                      help='Set console logger level to DEBUG')
   if not argv:
     argv = sys.argv[1:]
   return parser.parse_known_args(argv)[0]
@@ -245,6 +252,10 @@
                                            self._logger_start_time)
       return self.root_output_path
 
+    @property
+    def summary_file_path(self):
+      return os.path.join(self.root_output_path, records.OUTPUT_FILE_SUMMARY)
+
     def set_start_point(self):
       """Sets the start point of a test run.
 
@@ -289,20 +300,26 @@
     self._test_run_metadata = TestRunner._TestRunMetaData(log_dir, testbed_name)
 
   @contextlib.contextmanager
-  def mobly_logger(self, alias='latest'):
+  def mobly_logger(self, alias='latest', console_level=logging.INFO):
     """Starts and stops a logging context for a Mobly test run.
 
     Args:
       alias: optional string, the name of the latest log alias directory to
         create. If a falsy value is specified, then the directory will not
         be created.
+      console_level: optional logging level, log level threshold used for log
+        messages printed to the console. Logs with a level less severe than
+        console_level will not be printed to the console.
 
     Yields:
       The host file path where the logs for the test run are stored.
     """
     # Refresh the log path at the beginning of the logger context.
     root_output_path = self._test_run_metadata.generate_test_run_log_path()
-    logger.setup_test_logger(root_output_path, self._testbed_name, alias=alias)
+    logger.setup_test_logger(root_output_path,
+                             self._testbed_name,
+                             alias=alias,
+                             console_level=console_level)
     try:
       yield self._test_run_metadata.root_output_path
     finally:
@@ -388,8 +405,20 @@
     utils.create_dir(self._test_run_metadata.root_output_path)
 
     summary_writer = records.TestSummaryWriter(
-        os.path.join(self._test_run_metadata.root_output_path,
-                     records.OUTPUT_FILE_SUMMARY))
+        self._test_run_metadata.summary_file_path)
+
+    # When a SIGTERM is received during the execution of a test, the Mobly test
+    # immediately terminates without executing any of the finally blocks. This
+    # handler converts the SIGTERM into a TestAbortAll signal so that the
+    # finally blocks will execute. We use TestAbortAll because other exceptions
+    # will be caught in the base test class and it will continue executing
+    # remaining tests.
+    def sigterm_handler(*args):
+      logging.warning('Test received a SIGTERM. Aborting all tests.')
+      raise signals.TestAbortAll('Test received a SIGTERM.')
+
+    signal.signal(signal.SIGTERM, sigterm_handler)
+
     try:
       for test_run_info in self._test_run_infos:
         # Set up the test-specific config
@@ -413,6 +442,7 @@
           f'Summary for test run {self._test_run_metadata.run_id}:',
           f'Total time elapsed {self._test_run_metadata.time_elapsed_sec}s',
           f'Artifacts are saved in "{self._test_run_metadata.root_output_path}"',
+          f'Test summary saved in "{self._test_run_metadata.summary_file_path}"',
           f'Test results: {self.results.summary_str()}'
       ]
       logging.info('\n'.join(summary_lines))
diff --git a/mobly/utils.py b/mobly/utils.py
index 6015617..7d26496 100644
--- a/mobly/utils.py
+++ b/mobly/utils.py
@@ -33,8 +33,13 @@
 from typing import Tuple, overload
 
 import portpicker
-# TODO(ericth): Use Literal from typing if we only run on Python 3.8 or later.
-from typing_extensions import Literal
+
+# TODO(#851): Remove this try/except statement and typing_extensions from
+# install_requires when Python 3.8 is the minimum version we support.
+try:
+    from typing import Literal
+except ImportError:
+    from typing_extensions import Literal
 
 # File name length is limited to 255 chars on some OS, so we need to make sure
 # the file names we output fits within the limit.
@@ -477,7 +482,7 @@
   if timer is not None:
     timer.cancel()
   if timer_triggered.is_set():
-    raise subprocess.TimeoutExpired(cmd=cwd,
+    raise subprocess.TimeoutExpired(cmd=cmd,
                                     timeout=timeout,
                                     output=out,
                                     stderr=err)
diff --git a/setup.py b/setup.py
index 30f3ce2..a278b26 100755
--- a/setup.py
+++ b/setup.py
@@ -18,7 +18,9 @@
 import sys
 
 install_requires = [
-    'portpicker', 'pyserial', 'pyyaml', 'timeout_decorator', 'typing_extensions>=4.1.1'
+    'portpicker',
+    'pyyaml',
+    'typing_extensions>=4.1.1; python_version<"3.8"',
 ]
 
 if platform.system() == 'Windows':
@@ -44,13 +46,13 @@
 def main():
   setuptools.setup(
       name='mobly',
-      version='1.11.1',
+      version='1.12.1',
       maintainer='Ang Li',
       maintainer_email='mobly-github@googlegroups.com',
       description='Automation framework for special end-to-end test cases',
       license='Apache2.0',
       url='https://github.com/google/mobly',
-      download_url='https://github.com/google/mobly/tarball/1.11.1',
+      download_url='https://github.com/google/mobly/tarball/1.12.1',
       packages=setuptools.find_packages(exclude=['tests']),
       include_package_data=False,
       scripts=['tools/sl4a_shell.py', 'tools/snippet_shell.py'],
diff --git a/tests/lib/terminated_test.py b/tests/lib/terminated_test.py
new file mode 100644
index 0000000..b9dfdf4
--- /dev/null
+++ b/tests/lib/terminated_test.py
@@ -0,0 +1,38 @@
+# Copyright 2022 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import platform
+import signal
+
+from mobly import base_test
+from mobly import signals
+from mobly import test_runner
+
+
+class TerminatedTest(base_test.BaseTestClass):
+
+  def test_terminated(self):
+    # SIGTERM handler does not work on Windows. So just simulate the behaviour
+    # for the purpose of this test.
+    if platform.system() == 'Windows':
+      logging.warning('Test received a SIGTERM. Aborting all tests.')
+      raise signals.TestAbortAll('Test received a SIGTERM.')
+    else:
+      os.kill(os.getpid(), signal.SIGTERM)
+
+
+if __name__ == '__main__':
+  test_runner.main()
diff --git a/tests/mobly/base_test_test.py b/tests/mobly/base_test_test.py
index ebac0e0..be67437 100755
--- a/tests/mobly/base_test_test.py
+++ b/tests/mobly/base_test_test.py
@@ -1728,6 +1728,7 @@
     self.assertEqual(actual_record.test_name, 'teardown_class')
     self.assertEqual(actual_record.details, MSG_EXPECTED_EXCEPTION)
     self.assertEqual(actual_record.extras, MOCK_EXTRA)
+    self.assertIsNotNone(actual_record.end_time)
 
   def test_expect_in_setup_test(self):
     must_call = mock.Mock()
@@ -1960,6 +1961,62 @@
     bc.unpack_userparams(arg1="haha")
     self.assertEqual(bc.arg1, "haha")
 
+  def test_pre_run_failure(self):
+    """Test code path for `pre_run` failure.
+
+    When `pre_run` fails, pre-execution calculation is incomplete and the
+    number of tests requested is unknown. This is a
+    fatal issue that blocks any test execution in a class.
+
+    A class level error record is generated.
+    Unlike `setup_class` failure, no test is considered "skipped" in this
+    case as execution stage never started.
+    """
+
+    class MockBaseTest(base_test.BaseTestClass):
+
+      def pre_run(self):
+        raise Exception(MSG_EXPECTED_EXCEPTION)
+
+      def logic(self, a, b):
+        pass
+
+      def test_foo(self):
+        pass
+
+    bt_cls = MockBaseTest(self.mock_test_cls_configs)
+    bt_cls.run()
+    self.assertEqual(len(bt_cls.results.requested), 0)
+    class_record = bt_cls.results.error[0]
+    self.assertEqual(class_record.test_name, 'pre_run')
+    self.assertEqual(bt_cls.results.skipped, [])
+
+  # TODO(angli): remove after the full deprecation of `setup_generated_tests`.
+  def test_setup_generated_tests(self):
+
+    class MockBaseTest(base_test.BaseTestClass):
+
+      def setup_generated_tests(self):
+        self.generate_tests(test_logic=self.logic,
+                            name_func=self.name_gen,
+                            arg_sets=[(1, 2), (3, 4)])
+
+      def name_gen(self, a, b):
+        return 'test_%s_%s' % (a, b)
+
+      def logic(self, a, b):
+        pass
+
+    bt_cls = MockBaseTest(self.mock_test_cls_configs)
+    bt_cls.run()
+    self.assertEqual(len(bt_cls.results.requested), 2)
+    self.assertEqual(len(bt_cls.results.passed), 2)
+    self.assertIsNone(bt_cls.results.passed[0].uid)
+    self.assertIsNone(bt_cls.results.passed[1].uid)
+    self.assertEqual(bt_cls.results.passed[0].test_name, 'test_1_2')
+    self.assertEqual(bt_cls.results.passed[1].test_name, 'test_3_4')
+
+  # TODO(angli): remove after the full deprecation of `setup_generated_tests`.
   def test_setup_generated_tests_failure(self):
     """Test code path for setup_generated_tests failure.
 
@@ -1987,14 +2044,14 @@
     bt_cls.run()
     self.assertEqual(len(bt_cls.results.requested), 0)
     class_record = bt_cls.results.error[0]
-    self.assertEqual(class_record.test_name, 'setup_generated_tests')
+    self.assertEqual(class_record.test_name, 'pre_run')
     self.assertEqual(bt_cls.results.skipped, [])
 
   def test_generate_tests_run(self):
 
     class MockBaseTest(base_test.BaseTestClass):
 
-      def setup_generated_tests(self):
+      def pre_run(self):
         self.generate_tests(test_logic=self.logic,
                             name_func=self.name_gen,
                             arg_sets=[(1, 2), (3, 4)])
@@ -2018,7 +2075,7 @@
 
     class MockBaseTest(base_test.BaseTestClass):
 
-      def setup_generated_tests(self):
+      def pre_run(self):
         self.generate_tests(test_logic=self.logic,
                             name_func=self.name_gen,
                             uid_func=self.uid_logic,
@@ -2042,7 +2099,7 @@
 
     class MockBaseTest(base_test.BaseTestClass):
 
-      def setup_generated_tests(self):
+      def pre_run(self):
         self.generate_tests(test_logic=self.logic,
                             name_func=self.name_gen,
                             uid_func=self.uid_logic,
@@ -2068,7 +2125,7 @@
 
     class MockBaseTest(base_test.BaseTestClass):
 
-      def setup_generated_tests(self):
+      def pre_run(self):
         self.generate_tests(test_logic=self.logic,
                             name_func=self.name_gen,
                             arg_sets=[(1, 2), (3, 4)])
@@ -2085,7 +2142,7 @@
     self.assertEqual(len(bt_cls.results.passed), 1)
     self.assertEqual(bt_cls.results.passed[0].test_name, 'test_3_4')
 
-  def test_generate_tests_call_outside_of_setup_generated_tests(self):
+  def test_generate_tests_call_outside_of_pre_run(self):
 
     class MockBaseTest(base_test.BaseTestClass):
 
@@ -2107,7 +2164,8 @@
     self.assertEqual(actual_record.test_name, "test_ha")
     self.assertEqual(
         actual_record.details,
-        '"generate_tests" cannot be called outside of setup_generated_tests')
+        "'generate_tests' cannot be called outside of the followin"
+        "g functions: ['pre_run', 'setup_generated_tests'].")
     expected_summary = ("Error 1, Executed 1, Failed 0, Passed 0, "
                         "Requested 1, Skipped 0")
     self.assertEqual(bt_cls.results.summary_str(), expected_summary)
@@ -2116,7 +2174,7 @@
 
     class MockBaseTest(base_test.BaseTestClass):
 
-      def setup_generated_tests(self):
+      def pre_run(self):
         self.generate_tests(test_logic=self.logic,
                             name_func=self.name_gen,
                             arg_sets=[(1, 2), (3, 4)])
@@ -2130,7 +2188,7 @@
     bt_cls = MockBaseTest(self.mock_test_cls_configs)
     bt_cls.run()
     actual_record = bt_cls.results.error[0]
-    self.assertEqual(actual_record.test_name, "setup_generated_tests")
+    self.assertEqual(actual_record.test_name, "pre_run")
     self.assertEqual(
         actual_record.details,
         'During test generation of "logic": Test name "ha" already exists'
@@ -2141,6 +2199,7 @@
 
   def test_write_user_data(self):
     content = {'a': 1}
+    original_content = content.copy()
 
     class MockBaseTest(base_test.BaseTestClass):
 
@@ -2158,7 +2217,9 @@
           continue
         hit = True
         self.assertEqual(c['a'], content['a'])
+        self.assertIn('timestamp', c)
         self.assertIsNotNone(c['timestamp'])
+        self.assertEqual(content, original_content, 'Content arg was mutated.')
     self.assertTrue(hit)
 
   def test_record_controller_info(self):
@@ -2300,11 +2361,10 @@
       def _run_test_logic(self, arg):
         pass
 
-      def setup_generated_tests(self):
-        self.generate_tests(
-          self._run_test_logic,
-          name_func=lambda arg: f'test_generated_{arg}',
-          arg_sets=[(1,)])
+      def pre_run(self):
+        self.generate_tests(self._run_test_logic,
+                            name_func=lambda arg: f'test_generated_{arg}',
+                            arg_sets=[(1,)])
 
     bt_cls = MockBaseTest(self.mock_test_cls_configs)
     bt_cls.run()
@@ -2480,7 +2540,8 @@
   def test_retry_generated_test_last_pass(self):
     max_count = 3
     mock_action = mock.MagicMock(
-      side_effect = [Exception('Fail 1'), Exception('Fail 2'), None])
+        side_effect=[Exception('Fail 1'),
+                     Exception('Fail 2'), None])
 
     class MockBaseTest(base_test.BaseTestClass):
 
@@ -2488,11 +2549,10 @@
       def _run_test_logic(self, arg):
         mock_action()
 
-      def setup_generated_tests(self):
-        self.generate_tests(
-          self._run_test_logic,
-          name_func=lambda arg: f'test_generated_{arg}',
-          arg_sets=[(1,)])
+      def pre_run(self):
+        self.generate_tests(self._run_test_logic,
+                            name_func=lambda arg: f'test_generated_{arg}',
+                            arg_sets=[(1,)])
 
     bt_cls = MockBaseTest(self.mock_test_cls_configs)
     bt_cls.run()
diff --git a/tests/mobly/controllers/android_device_lib/adb_test.py b/tests/mobly/controllers/android_device_lib/adb_test.py
index 94e3975..a013959 100755
--- a/tests/mobly/controllers/android_device_lib/adb_test.py
+++ b/tests/mobly/controllers/android_device_lib/adb_test.py
@@ -505,6 +505,17 @@
           stderr=None,
           timeout=adb.DEFAULT_GETPROP_TIMEOUT_SEC)
 
+  def test_getprop_custom_timeout(self):
+    timeout_s = adb.DEFAULT_GETPROP_TIMEOUT_SEC * 2
+    with mock.patch.object(adb.AdbProxy, '_exec_cmd') as mock_exec_cmd:
+      mock_exec_cmd.return_value = b'blah'
+      self.assertEqual(adb.AdbProxy().getprop('haha', timeout=timeout_s),
+                       'blah')
+      mock_exec_cmd.assert_called_once_with(['adb', 'shell', 'getprop', 'haha'],
+                                            shell=False,
+                                            stderr=None,
+                                            timeout=timeout_s)
+
   def test__parse_getprop_output_special_values(self):
     mock_adb_output = (
         b'[selinux.restorecon_recursive]: [/data/misc_ce/10]\n'
@@ -758,8 +769,8 @@
                                        shell=False,
                                        timeout=None,
                                        stderr=None)
-    self.assertEqual(mock_sleep.call_count, 2)
-    mock_sleep.assert_called_with(10)
+    self.assertEqual(mock_sleep.call_count, adb.ADB_ROOT_RETRY_ATTEMPTS - 1)
+    mock_sleep.assert_has_calls([mock.call(10), mock.call(20)])
 
   def test_has_shell_command_called_correctly(self):
     with mock.patch.object(adb.AdbProxy, '_exec_cmd') as mock_exec_cmd:
diff --git a/tests/mobly/controllers/android_device_lib/callback_handler_test.py b/tests/mobly/controllers/android_device_lib/callback_handler_test.py
index 27f27b2..a30408f 100755
--- a/tests/mobly/controllers/android_device_lib/callback_handler_test.py
+++ b/tests/mobly/controllers/android_device_lib/callback_handler_test.py
@@ -47,7 +47,7 @@
                                                method_name=None,
                                                ad=mock.Mock())
     self.assertEqual(handler.callback_id, MOCK_CALLBACK_ID)
-    with self.assertRaisesRegex(AttributeError, "can't set attribute"):
+    with self.assertRaises(AttributeError):
       handler.callback_id = 'ha'
 
   def test_event_dict_to_snippet_event(self):
diff --git a/tests/mobly/controllers/android_device_lib/callback_handler_v2_test.py b/tests/mobly/controllers/android_device_lib/callback_handler_v2_test.py
new file mode 100644
index 0000000..b598cae
--- /dev/null
+++ b/tests/mobly/controllers/android_device_lib/callback_handler_v2_test.py
@@ -0,0 +1,152 @@
+# Copyright 2022 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unit tests for callback_handler_v2.CallbackHandlerV2."""
+
+import unittest
+from unittest import mock
+
+from mobly.controllers.android_device_lib import callback_handler_v2
+from mobly.snippet import callback_event
+from mobly.snippet import errors
+
+MOCK_CALLBACK_ID = '2-1'
+MOCK_RAW_EVENT = {
+    'callbackId': '2-1',
+    'name': 'AsyncTaskResult',
+    'time': 20460228696,
+    'data': {
+        'exampleData': "Here's a simple event.",
+        'successful': True,
+        'secretNumber': 12
+    }
+}
+
+
+class CallbackHandlerV2Test(unittest.TestCase):
+  """Unit tests for callback_handler_v2.CallbackHandlerV2."""
+
+  def _make_callback_handler(self,
+                             callback_id=None,
+                             event_client=None,
+                             ret_value=None,
+                             method_name=None,
+                             device=None,
+                             rpc_max_timeout_sec=600,
+                             default_timeout_sec=120):
+    return callback_handler_v2.CallbackHandlerV2(
+        callback_id=callback_id,
+        event_client=event_client,
+        ret_value=ret_value,
+        method_name=method_name,
+        device=device,
+        rpc_max_timeout_sec=rpc_max_timeout_sec,
+        default_timeout_sec=default_timeout_sec)
+
+  def assert_event_correct(self, actual_event, expected_raw_event_dict):
+    expected_event = callback_event.from_dict(expected_raw_event_dict)
+    self.assertEqual(str(actual_event), str(expected_event))
+
+  def test_wait_and_get(self):
+    mock_event_client = mock.Mock()
+    mock_event_client.eventWaitAndGet = mock.Mock(return_value=MOCK_RAW_EVENT)
+    handler = self._make_callback_handler(callback_id=MOCK_CALLBACK_ID,
+                                          event_client=mock_event_client)
+    event = handler.waitAndGet('ha')
+    self.assert_event_correct(event, MOCK_RAW_EVENT)
+    mock_event_client.eventWaitAndGet.assert_called_once_with(
+        MOCK_CALLBACK_ID, 'ha', mock.ANY)
+
+  def test_wait_and_get_timeout_arg_transform(self):
+    mock_event_client = mock.Mock()
+    mock_event_client.eventWaitAndGet = mock.Mock(return_value=MOCK_RAW_EVENT)
+    handler = self._make_callback_handler(event_client=mock_event_client)
+
+    wait_and_get_timeout_sec = 10
+    expected_rpc_timeout_ms = 10000
+    _ = handler.waitAndGet('ha', timeout=wait_and_get_timeout_sec)
+    mock_event_client.eventWaitAndGet.assert_called_once_with(
+        mock.ANY, mock.ANY, expected_rpc_timeout_ms)
+
+  def test_wait_for_event(self):
+    mock_event_client = mock.Mock()
+    handler = self._make_callback_handler(callback_id=MOCK_CALLBACK_ID,
+                                          event_client=mock_event_client)
+
+    event_should_ignore = {
+        'callbackId': '2-1',
+        'name': 'AsyncTaskResult',
+        'time': 20460228696,
+        'data': {
+            'successful': False,
+        }
+    }
+    mock_event_client.eventWaitAndGet.side_effect = [
+        event_should_ignore, MOCK_RAW_EVENT
+    ]
+
+    def some_condition(event):
+      return event.data['successful']
+
+    event = handler.waitForEvent('AsyncTaskResult', some_condition, 0.01)
+    self.assert_event_correct(event, MOCK_RAW_EVENT)
+    mock_event_client.eventWaitAndGet.assert_has_calls([
+        mock.call(MOCK_CALLBACK_ID, 'AsyncTaskResult', mock.ANY),
+        mock.call(MOCK_CALLBACK_ID, 'AsyncTaskResult', mock.ANY),
+    ])
+
+  def test_get_all(self):
+    mock_event_client = mock.Mock()
+    handler = self._make_callback_handler(callback_id=MOCK_CALLBACK_ID,
+                                          event_client=mock_event_client)
+
+    mock_event_client.eventGetAll = mock.Mock(
+        return_value=[MOCK_RAW_EVENT, MOCK_RAW_EVENT])
+
+    all_events = handler.getAll('ha')
+    self.assertEqual(len(all_events), 2)
+    for event in all_events:
+      self.assert_event_correct(event, MOCK_RAW_EVENT)
+
+    mock_event_client.eventGetAll.assert_called_once_with(
+        MOCK_CALLBACK_ID, 'ha')
+
+  def test_wait_and_get_timeout_message_pattern_matches(self):
+    mock_event_client = mock.Mock()
+    android_snippet_timeout_msg = (
+        'com.google.android.mobly.snippet.event.EventSnippet$'
+        'EventSnippetException: timeout.')
+    mock_event_client.eventWaitAndGet = mock.Mock(
+        side_effect=errors.ApiError(mock.Mock(), android_snippet_timeout_msg))
+    handler = self._make_callback_handler(event_client=mock_event_client,
+                                          method_name='test_method')
+
+    expected_msg = ('Timed out after waiting .*s for event "ha" triggered by '
+                    'test_method .*')
+    with self.assertRaisesRegex(errors.CallbackHandlerTimeoutError,
+                                expected_msg):
+      handler.waitAndGet('ha')
+
+  def test_wait_and_get_reraise_if_pattern_not_match(self):
+    mock_event_client = mock.Mock()
+    snippet_timeout_msg = 'Snippet executed with error.'
+    mock_event_client.eventWaitAndGet = mock.Mock(
+        side_effect=errors.ApiError(mock.Mock(), snippet_timeout_msg))
+    handler = self._make_callback_handler(event_client=mock_event_client)
+
+    with self.assertRaisesRegex(errors.ApiError, snippet_timeout_msg):
+      handler.waitAndGet('ha')
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/tests/mobly/controllers/android_device_lib/fastboot_test.py b/tests/mobly/controllers/android_device_lib/fastboot_test.py
new file mode 100644
index 0000000..6e59dea
--- /dev/null
+++ b/tests/mobly/controllers/android_device_lib/fastboot_test.py
@@ -0,0 +1,43 @@
+# Copyright 2022 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from unittest import mock
+
+from mobly.controllers.android_device_lib import fastboot
+
+
+class FastbootTest(unittest.TestCase):
+  """Unit tests for mobly.controllers.android_device_lib.adb."""
+
+  @mock.patch('mobly.controllers.android_device_lib.fastboot.Popen')
+  @mock.patch('logging.debug')
+  def test_fastboot_commands_and_results_are_logged_to_debug_log(
+      self, mock_debug_logger, mock_popen):
+    expected_stdout = 'stdout'
+    expected_stderr = b'stderr'
+    mock_popen.return_value.communicate = mock.Mock(
+        return_value=(expected_stdout, expected_stderr))
+    mock_popen.return_value.returncode = 123
+
+    fastboot.FastbootProxy().fake_command('extra', 'flags')
+
+    mock_debug_logger.assert_called_with(
+        'cmd: %s, stdout: %s, stderr: %s, ret: %s',
+        '\'fastboot fake-command extra flags\'', expected_stdout,
+        expected_stderr, 123)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/tests/mobly/controllers/android_device_lib/jsonrpc_client_base_test.py b/tests/mobly/controllers/android_device_lib/jsonrpc_client_base_test.py
index 96b52cb..4cbeb35 100755
--- a/tests/mobly/controllers/android_device_lib/jsonrpc_client_base_test.py
+++ b/tests/mobly/controllers/android_device_lib/jsonrpc_client_base_test.py
@@ -337,6 +337,22 @@
         testing_rpc_response[:jsonrpc_client_base._MAX_RPC_RESP_LOGGING_LENGTH],
         resp_len - jsonrpc_client_base._MAX_RPC_RESP_LOGGING_LENGTH)
 
+  def test_close_scoket_connection(self):
+    client = FakeRpcClient()
+    mock_conn = mock.Mock()
+    client._conn = mock_conn
+
+    client.close_socket_connection()
+    mock_conn.close.assert_called_once()
+    self.assertIsNone(client._conn)
+
+  def test_close_scoket_connection_without_connection(self):
+    client = FakeRpcClient()
+    client._conn = None
+
+    client.close_socket_connection()
+    self.assertIsNone(client._conn)
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/tests/mobly/controllers/android_device_lib/services/snippet_management_service_test.py b/tests/mobly/controllers/android_device_lib/services/snippet_management_service_test.py
index 469ce4e..162847b 100755
--- a/tests/mobly/controllers/android_device_lib/services/snippet_management_service_test.py
+++ b/tests/mobly/controllers/android_device_lib/services/snippet_management_service_test.py
@@ -18,7 +18,7 @@
 from mobly.controllers.android_device_lib.services import snippet_management_service
 
 MOCK_PACKAGE = 'com.mock.package'
-SNIPPET_CLIENT_CLASS_PATH = 'mobly.controllers.android_device_lib.snippet_client.SnippetClient'
+SNIPPET_CLIENT_V2_CLASS_PATH = 'mobly.controllers.android_device_lib.snippet_client_v2.SnippetClientV2'
 
 
 class SnippetManagementServiceTest(unittest.TestCase):
@@ -33,7 +33,7 @@
     manager.stop()
     self.assertFalse(manager.is_alive)
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_get_snippet_client(self, mock_class):
     mock_client = mock_class.return_value
     manager = snippet_management_service.SnippetManagementService(
@@ -41,28 +41,28 @@
     manager.add_snippet_client('foo', MOCK_PACKAGE)
     self.assertEqual(manager.get_snippet_client('foo'), mock_client)
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_get_snippet_client_fail(self, _):
     manager = snippet_management_service.SnippetManagementService(
         mock.MagicMock())
     self.assertIsNone(manager.get_snippet_client('foo'))
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_stop_with_live_client(self, mock_class):
     mock_client = mock_class.return_value
     manager = snippet_management_service.SnippetManagementService(
         mock.MagicMock())
     manager.add_snippet_client('foo', MOCK_PACKAGE)
-    mock_client.start_app_and_connect.assert_called_once_with()
+    mock_client.initialize.assert_called_once_with()
     manager.stop()
-    mock_client.stop_app.assert_called_once_with()
-    mock_client.stop_app.reset_mock()
+    mock_client.stop.assert_called_once_with()
+    mock_client.stop.reset_mock()
     mock_client.is_alive = False
     self.assertFalse(manager.is_alive)
     manager.stop()
-    mock_client.stop_app.assert_not_called()
+    mock_client.stop.assert_not_called()
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_add_snippet_client_dup_name(self, _):
     manager = snippet_management_service.SnippetManagementService(
         mock.MagicMock())
@@ -72,7 +72,7 @@
     with self.assertRaisesRegex(snippet_management_service.Error, msg):
       manager.add_snippet_client('foo', MOCK_PACKAGE + 'ha')
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_add_snippet_client_dup_package(self, mock_class):
     mock_client = mock_class.return_value
     mock_client.package = MOCK_PACKAGE
@@ -84,7 +84,7 @@
     with self.assertRaisesRegex(snippet_management_service.Error, msg):
       manager.add_snippet_client('bar', MOCK_PACKAGE)
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_remove_snippet_client(self, mock_class):
     mock_client = mock.MagicMock()
     mock_class.return_value = mock_client
@@ -96,7 +96,7 @@
     with self.assertRaisesRegex(snippet_management_service.Error, msg):
       manager.foo.do_something()
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_remove_snippet_client(self, mock_class):
     mock_client = mock.MagicMock()
     mock_class.return_value = mock_client
@@ -107,31 +107,31 @@
         'No snippet client is registered with name "foo".'):
       manager.remove_snippet_client('foo')
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_start_with_live_service(self, mock_class):
     mock_client = mock_class.return_value
     manager = snippet_management_service.SnippetManagementService(
         mock.MagicMock())
     manager.add_snippet_client('foo', MOCK_PACKAGE)
-    mock_client.start_app_and_connect.reset_mock()
+    mock_client.initialize.reset_mock()
     mock_client.is_alive = True
     manager.start()
-    mock_client.start_app_and_connect.assert_not_called()
+    mock_client.initialize.assert_not_called()
     self.assertTrue(manager.is_alive)
     mock_client.is_alive = False
     manager.start()
-    mock_client.start_app_and_connect.assert_called_once_with()
+    mock_client.initialize.assert_called_once_with()
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_pause(self, mock_class):
     mock_client = mock_class.return_value
     manager = snippet_management_service.SnippetManagementService(
         mock.MagicMock())
     manager.add_snippet_client('foo', MOCK_PACKAGE)
     manager.pause()
-    mock_client.disconnect.assert_called_once_with()
+    mock_client.close_connection.assert_called_once_with()
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_resume_positive_case(self, mock_class):
     mock_client = mock_class.return_value
     manager = snippet_management_service.SnippetManagementService(
@@ -139,9 +139,9 @@
     manager.add_snippet_client('foo', MOCK_PACKAGE)
     mock_client.is_alive = False
     manager.resume()
-    mock_client.restore_app_connection.assert_called_once_with()
+    mock_client.restore_server_connection.assert_called_once_with()
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_resume_negative_case(self, mock_class):
     mock_client = mock_class.return_value
     manager = snippet_management_service.SnippetManagementService(
@@ -149,9 +149,9 @@
     manager.add_snippet_client('foo', MOCK_PACKAGE)
     mock_client.is_alive = True
     manager.resume()
-    mock_client.restore_app_connection.assert_not_called()
+    mock_client.restore_server_connection.assert_not_called()
 
-  @mock.patch(SNIPPET_CLIENT_CLASS_PATH)
+  @mock.patch(SNIPPET_CLIENT_V2_CLASS_PATH)
   def test_attribute_access(self, mock_class):
     mock_client = mock.MagicMock()
     mock_class.return_value = mock_client
diff --git a/tests/mobly/controllers/android_device_lib/snippet_client_test.py b/tests/mobly/controllers/android_device_lib/snippet_client_test.py
index 6bb2f87..53da1ae 100755
--- a/tests/mobly/controllers/android_device_lib/snippet_client_test.py
+++ b/tests/mobly/controllers/android_device_lib/snippet_client_test.py
@@ -178,6 +178,81 @@
     adb_proxy.forward.assert_called_once_with(['--remove', 'tcp:1'])
 
   @mock.patch('socket.create_connection')
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  def test_snippet_stop_app_stops_event_client(self,
+                                               mock_stop_standing_subprocess,
+                                               mock_create_connection):
+    adb_proxy = mock.MagicMock()
+    adb_proxy.shell.return_value = b'OK (0 tests)'
+    client = self._make_client(adb_proxy)
+    event_client = snippet_client.SnippetClient(
+        package=MOCK_PACKAGE_NAME, ad=client._ad)
+    client._event_client = event_client
+    event_client_conn = mock.Mock()
+    event_client._conn = event_client_conn
+
+    client.stop_app()
+    self.assertFalse(client.is_alive)
+    event_client_conn.close.assert_called_once()
+    self.assertIsNone(client._event_client)
+    self.assertIsNone(event_client._conn)
+
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  def test_snippet_stop_app_stops_event_client_without_connection(
+      self, mock_stop_standing_subprocess, mock_create_connection):
+    adb_proxy = mock.MagicMock()
+    adb_proxy.shell.return_value = b'OK (0 tests)'
+    client = self._make_client(adb_proxy)
+    event_client = snippet_client.SnippetClient(
+        package=MOCK_PACKAGE_NAME, ad=client._ad)
+    client._event_client = event_client
+    event_client._conn = None
+
+    client.stop_app()
+    self.assertFalse(client.is_alive)
+    self.assertIsNone(client._event_client)
+    self.assertIsNone(event_client._conn)
+
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  def test_snippet_stop_app_without_event_client(
+      self, mock_stop_standing_subprocess, mock_create_connection):
+    adb_proxy = mock.MagicMock()
+    adb_proxy.shell.return_value = b'OK (0 tests)'
+    client = self._make_client(adb_proxy)
+    client._event_client = None
+
+    client.stop_app()
+    self.assertFalse(client.is_alive)
+    self.assertIsNone(client._event_client)
+
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch.object(snippet_client.SnippetClient, 'connect')
+  def test_event_client_does_not_stop_port_forwarding(
+      self, mock_stop_standing_subprocess, mock_create_connection,
+      mock_connect):
+    adb_proxy = mock.MagicMock()
+    adb_proxy.shell.return_value = b'OK (0 tests)'
+    client = self._make_client(adb_proxy)
+    client.host_port = 12345
+    client.device_port = 67890
+
+    event_client = client._start_event_client()
+    # Mock adb proxy of event client to validate forward call
+    event_client._ad = mock.MagicMock()
+    event_client._adb = event_client._ad.adb
+    client._event_client = event_client
+
+    # Verify that neither the stop process nor the deconstructor is trying to
+    # stop the port forwarding
+    client.stop_app()
+    event_client.__del__()
+
+    event_client._adb.forward.assert_not_called()
+
+  @mock.patch('socket.create_connection')
   @mock.patch('mobly.controllers.android_device_lib.snippet_client.'
               'utils.start_standing_subprocess')
   @mock.patch('mobly.controllers.android_device_lib.snippet_client.'
diff --git a/tests/mobly/controllers/android_device_lib/snippet_client_v2_test.py b/tests/mobly/controllers/android_device_lib/snippet_client_v2_test.py
index b97774b..1943abb 100644
--- a/tests/mobly/controllers/android_device_lib/snippet_client_v2_test.py
+++ b/tests/mobly/controllers/android_device_lib/snippet_client_v2_test.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 """Unit tests for mobly.controllers.android_device_lib.snippet_client_v2."""
 
+import socket
 import unittest
 from unittest import mock
 
@@ -25,19 +26,79 @@
 MOCK_PACKAGE_NAME = 'some.package.name'
 MOCK_SERVER_PATH = f'{MOCK_PACKAGE_NAME}/{snippet_client_v2._INSTRUMENTATION_RUNNER_PACKAGE}'
 MOCK_USER_ID = 0
+MOCK_DEVICE_PORT = 1234
+
+
+class _MockAdbProxy(mock_android_device.MockAdbProxy):
+  """Mock class of adb proxy which covers all the calls used by snippet clients.
+
+  To enable testing snippet clients, this class extends the functionality of
+  base class from the following aspects:
+  * Records the arguments of all the calls to the shell method and forward
+    method.
+  * Handles the adb calls to stop the snippet server in the shell function
+    properly.
+
+
+  Attributes:
+    mock_shell_func: mock.Mock, used for recording the calls to the shell
+      method.
+    mock_forward_func: mock.Mock, used for recording the calls to the forward
+      method.
+  """
+
+  def __init__(self, *args, **kwargs):
+    """Initializes the instance of _MockAdbProxy."""
+    super().__init__(*args, **kwargs)
+    self.mock_shell_func = mock.Mock()
+    self.mock_forward_func = mock.Mock()
+
+  def shell(self, *args, **kwargs):
+    """Mock `shell` of mobly.controllers.android_device_lib.adb.AdbProxy."""
+    # Record all the call args
+    self.mock_shell_func(*args, **kwargs)
+
+    # Handle the server stop command properly
+    if f'am instrument --user 0 -w -e action stop {MOCK_SERVER_PATH}' in args:
+      return b'OK (0 tests)'
+
+    # For other commands, hand it over to the base class.
+    return super().shell(*args, **kwargs)
+
+  def forward(self, *args, **kwargs):
+    """Mock `forward` of mobly.controllers.android_device_lib.adb.AdbProxy."""
+    self.mock_forward_func(*args, **kwargs)
+
+
+def _setup_mock_socket_file(mock_socket_create_conn, resp):
+  """Sets up a mock socket file from the mock connection.
+
+  Args:
+    mock_socket_create_conn: The mock method for creating a socket connection.
+    resp: iterable, the side effect of the `readline` function of the mock
+      socket file.
+
+  Returns:
+    The mock socket file that will be injected into the code.
+  """
+  fake_file = mock.Mock()
+  fake_file.readline.side_effect = resp
+  fake_conn = mock.Mock()
+  fake_conn.makefile.return_value = fake_file
+  mock_socket_create_conn.return_value = fake_conn
+  return fake_file
 
 
 class SnippetClientV2Test(unittest.TestCase):
   """Unit tests for SnippetClientV2."""
 
   def _make_client(self, adb_proxy=None, mock_properties=None):
-    adb_proxy = adb_proxy or mock_android_device.MockAdbProxy(
-        instrumented_packages=[
-            (MOCK_PACKAGE_NAME,
-             snippet_client_v2._INSTRUMENTATION_RUNNER_PACKAGE,
-             MOCK_PACKAGE_NAME)
-        ],
-        mock_properties=mock_properties)
+    adb_proxy = adb_proxy or _MockAdbProxy(instrumented_packages=[
+        (MOCK_PACKAGE_NAME, snippet_client_v2._INSTRUMENTATION_RUNNER_PACKAGE,
+         MOCK_PACKAGE_NAME)
+    ],
+                                           mock_properties=mock_properties)
+    self.adb = adb_proxy
 
     device = mock.Mock()
     device.adb = adb_proxy
@@ -48,6 +109,7 @@
         'build_version_sdk':
             adb_proxy.getprop('ro.build.version.sdk'),
     }
+    self.device = device
 
     self.client = snippet_client_v2.SnippetClientV2(MOCK_PACKAGE_NAME, device)
 
@@ -56,11 +118,223 @@
     mock_properties.update(extra_properties)
     self._make_client(mock_properties=mock_properties)
 
-  def _mock_server_process_starting_response(self, mock_start_subprocess,
-                                             resp_lines):
+  def _mock_server_process_starting_response(self,
+                                             mock_start_subprocess,
+                                             resp_lines=None):
+    resp_lines = resp_lines or [
+        b'SNIPPET START, PROTOCOL 1 0', b'SNIPPET SERVING, PORT 1234'
+    ]
     mock_proc = mock_start_subprocess.return_value
     mock_proc.stdout.readline.side_effect = resp_lines
 
+  def _make_client_and_mock_socket_conn(self,
+                                        mock_socket_create_conn,
+                                        socket_resp=None,
+                                        device_port=MOCK_DEVICE_PORT,
+                                        adb_proxy=None,
+                                        mock_properties=None,
+                                        set_counter=True):
+    """Makes the snippet client and mocks the socket connection."""
+    self._make_client(adb_proxy, mock_properties)
+
+    if socket_resp is None:
+      socket_resp = [b'{"status": true, "uid": 1}']
+    self.mock_socket_file = _setup_mock_socket_file(mock_socket_create_conn,
+                                                    socket_resp)
+    self.client.device_port = device_port
+    self.socket_conn = mock_socket_create_conn.return_value
+    if set_counter:
+      self.client._counter = self.client._id_counter()
+
+  def _assert_client_resources_released(self, mock_start_subprocess,
+                                        mock_stop_standing_subprocess,
+                                        mock_get_port):
+    """Asserts the resources had been released before the client stopped."""
+    self.assertIs(self.client._proc, None)
+    self.adb.mock_shell_func.assert_any_call(
+        f'am instrument --user {MOCK_USER_ID} -w -e action stop '
+        f'{MOCK_SERVER_PATH}')
+    mock_stop_standing_subprocess.assert_called_once_with(
+        mock_start_subprocess.return_value)
+    self.assertFalse(self.client.is_alive)
+    self.assertIs(self.client._conn, None)
+    self.socket_conn.close.assert_called()
+    self.assertIs(self.client.host_port, None)
+    self.adb.mock_forward_func.assert_any_call(
+        ['--remove', f'tcp:{mock_get_port.return_value}'])
+    self.assertIsNone(self.client._event_client)
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_the_whole_lifecycle_with_a_sync_rpc(self, mock_start_subprocess,
+                                               mock_stop_standing_subprocess,
+                                               mock_socket_create_conn,
+                                               mock_get_port):
+    """Tests the whole lifecycle of the client with sending a sync RPC."""
+    socket_resp = [
+        b'{"status": true, "uid": 1}',
+        b'{"id": 0, "result": 123, "error": null, "callback": null}',
+    ]
+    expected_socket_writes = [
+        mock.call(b'{"cmd": "initiate", "uid": -1}\n'),
+        mock.call(b'{"id": 0, "method": "some_sync_rpc", '
+                  b'"params": [1, 2, "hello"]}\n'),
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn,
+                                           socket_resp,
+                                           set_counter=False)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    self.client.initialize()
+    rpc_result = self.client.some_sync_rpc(1, 2, 'hello')
+    self.client.stop()
+
+    self._assert_client_resources_released(mock_start_subprocess,
+                                           mock_stop_standing_subprocess,
+                                           mock_get_port)
+
+    self.assertListEqual(self.mock_socket_file.write.call_args_list,
+                         expected_socket_writes)
+    self.assertEqual(rpc_result, 123)
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.callback_handler_v2.'
+              'CallbackHandlerV2')
+  def test_the_whole_lifecycle_with_an_async_rpc(self, mock_callback_class,
+                                                 mock_start_subprocess,
+                                                 mock_stop_standing_subprocess,
+                                                 mock_socket_create_conn,
+                                                 mock_get_port):
+    """Tests the whole lifecycle of the client with sending an async RPC."""
+    mock_socket_resp = [
+        b'{"status": true, "uid": 1}',
+        b'{"id": 0, "result": 123, "error": null, "callback": "1-0"}',
+        b'{"status": true, "uid": 1}',
+    ]
+    expected_socket_writes = [
+        mock.call(b'{"cmd": "initiate", "uid": -1}\n'),
+        mock.call(b'{"id": 0, "method": "some_async_rpc", '
+                  b'"params": [1, 2, "async"]}\n'),
+        mock.call(b'{"cmd": "continue", "uid": 1}\n'),
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn,
+                                           mock_socket_resp,
+                                           set_counter=False)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    self.client.initialize()
+    rpc_result = self.client.some_async_rpc(1, 2, 'async')
+    event_client = self.client._event_client
+    self.client.stop()
+
+    self._assert_client_resources_released(mock_start_subprocess,
+                                           mock_stop_standing_subprocess,
+                                           mock_get_port)
+
+    self.assertListEqual(self.mock_socket_file.write.call_args_list,
+                         expected_socket_writes)
+    mock_callback_class.assert_called_with(
+        callback_id='1-0',
+        event_client=event_client,
+        ret_value=123,
+        method_name='some_async_rpc',
+        device=self.device,
+        rpc_max_timeout_sec=snippet_client_v2._SOCKET_READ_TIMEOUT,
+        default_timeout_sec=snippet_client_v2._CALLBACK_DEFAULT_TIMEOUT_SEC)
+    self.assertIs(rpc_result, mock_callback_class.return_value)
+    self.assertIsNone(event_client.host_port, None)
+    self.assertIsNone(event_client.device_port, None)
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.callback_handler_v2.'
+              'CallbackHandlerV2')
+  def test_the_whole_lifecycle_with_multiple_rpcs(self, mock_callback_class,
+                                                  mock_start_subprocess,
+                                                  mock_stop_standing_subprocess,
+                                                  mock_socket_create_conn,
+                                                  mock_get_port):
+    """Tests the whole lifecycle of the client with sending multiple RPCs."""
+    # Prepare the test
+    mock_socket_resp = [
+        b'{"status": true, "uid": 1}',
+        b'{"id": 0, "result": 123, "error": null, "callback": null}',
+        b'{"id": 1, "result": 456, "error": null, "callback": "1-0"}',
+        # Response for starting the event client
+        b'{"status": true, "uid": 1}',
+        b'{"id": 2, "result": 789, "error": null, "callback": null}',
+        b'{"id": 3, "result": 321, "error": null, "callback": "2-0"}',
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn,
+                                           mock_socket_resp,
+                                           set_counter=False)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    rpc_results_expected = [
+        123,
+        mock.Mock(),
+        789,
+        mock.Mock(),
+    ]
+    # Extract the two mock objects to use as return values of callback handler
+    # class
+    mock_callback_class.side_effect = [
+        rpc_results_expected[1], rpc_results_expected[3]
+    ]
+
+    # Run tests
+    rpc_results = []
+    self.client.initialize()
+    rpc_results.append(self.client.some_sync_rpc(1, 2, 'hello'))
+    rpc_results.append(self.client.some_async_rpc(3, 4, 'async'))
+    rpc_results.append(self.client.some_sync_rpc(5, 'hello'))
+    rpc_results.append(self.client.some_async_rpc(6, 'async'))
+    event_client = self.client._event_client
+    self.client.stop()
+
+    # Assertions
+    mock_callback_class_calls_expected = [
+        mock.call(callback_id='1-0',
+                  event_client=event_client,
+                  ret_value=456,
+                  method_name='some_async_rpc',
+                  device=self.device,
+                  rpc_max_timeout_sec=snippet_client_v2._SOCKET_READ_TIMEOUT,
+                  default_timeout_sec=(
+                      snippet_client_v2._CALLBACK_DEFAULT_TIMEOUT_SEC)),
+        mock.call(
+            callback_id='2-0',
+            event_client=event_client,
+            ret_value=321,
+            method_name='some_async_rpc',
+            device=self.device,
+            rpc_max_timeout_sec=snippet_client_v2._SOCKET_READ_TIMEOUT,
+            default_timeout_sec=snippet_client_v2._CALLBACK_DEFAULT_TIMEOUT_SEC)
+    ]
+    self.assertListEqual(rpc_results, rpc_results_expected)
+    mock_callback_class.assert_has_calls(mock_callback_class_calls_expected)
+    self._assert_client_resources_released(mock_start_subprocess,
+                                           mock_stop_standing_subprocess,
+                                           mock_get_port)
+    self.assertIsNone(event_client.host_port, None)
+    self.assertIsNone(event_client.device_port, None)
+
   def test_check_app_installed_normally(self):
     """Tests that app checker runs normally when app installed correctly."""
     self._make_client()
@@ -68,16 +342,14 @@
 
   def test_check_app_installed_fail_app_not_installed(self):
     """Tests that app checker fails without installing app."""
-    self._make_client(mock_android_device.MockAdbProxy())
+    self._make_client(_MockAdbProxy())
     expected_msg = f'.* {MOCK_PACKAGE_NAME} is not installed.'
     with self.assertRaisesRegex(errors.ServerStartPreCheckError, expected_msg):
       self.client._validate_snippet_app_on_device()
 
   def test_check_app_installed_fail_not_instrumented(self):
     """Tests that app checker fails without instrumenting app."""
-    self._make_client(
-        mock_android_device.MockAdbProxy(
-            installed_packages=[MOCK_PACKAGE_NAME]))
+    self._make_client(_MockAdbProxy(installed_packages=[MOCK_PACKAGE_NAME]))
     expected_msg = (
         f'.* {MOCK_PACKAGE_NAME} is installed, but it is not instrumented.')
     with self.assertRaisesRegex(errors.ServerStartPreCheckError, expected_msg):
@@ -86,7 +358,7 @@
   def test_check_app_installed_fail_instrumentation_not_installed(self):
     """Tests that app checker fails without installing instrumentation."""
     self._make_client(
-        mock_android_device.MockAdbProxy(instrumented_packages=[(
+        _MockAdbProxy(instrumented_packages=[(
             MOCK_PACKAGE_NAME,
             snippet_client_v2._INSTRUMENTATION_RUNNER_PACKAGE,
             'not.installed')]))
@@ -94,53 +366,44 @@
     with self.assertRaisesRegex(errors.ServerStartPreCheckError, expected_msg):
       self.client._validate_snippet_app_on_device()
 
-  @mock.patch.object(mock_android_device.MockAdbProxy, 'shell')
-  def test_disable_hidden_api_normally(self, mock_shell_func):
+  def test_disable_hidden_api_normally(self):
     """Tests the disabling hidden api process works normally."""
     self._make_client_with_extra_adb_properties({
         'ro.build.version.codename': 'S',
         'ro.build.version.sdk': '31',
     })
-    self.client._device.is_rootable = True
+    self.device.is_rootable = True
     self.client._disable_hidden_api_blocklist()
-    mock_shell_func.assert_called_with(
+    self.adb.mock_shell_func.assert_called_with(
         'settings put global hidden_api_blacklist_exemptions "*"')
 
-  @mock.patch.object(mock_android_device.MockAdbProxy, 'shell')
-  def test_disable_hidden_api_low_sdk(self, mock_shell_func):
+  def test_disable_hidden_api_low_sdk(self):
     """Tests it doesn't disable hidden api with low SDK."""
     self._make_client_with_extra_adb_properties({
         'ro.build.version.codename': 'O',
         'ro.build.version.sdk': '26',
     })
-    self.client._device.is_rootable = True
+    self.device.is_rootable = True
     self.client._disable_hidden_api_blocklist()
-    mock_shell_func.assert_not_called()
+    self.adb.mock_shell_func.assert_not_called()
 
-  @mock.patch.object(mock_android_device.MockAdbProxy, 'shell')
-  def test_disable_hidden_api_non_rootable(self, mock_shell_func):
+  def test_disable_hidden_api_non_rootable(self):
     """Tests it doesn't disable hidden api with non-rootable device."""
     self._make_client_with_extra_adb_properties({
         'ro.build.version.codename': 'S',
         'ro.build.version.sdk': '31',
     })
-    self.client._device.is_rootable = False
+    self.device.is_rootable = False
     self.client._disable_hidden_api_blocklist()
-    mock_shell_func.assert_not_called()
+    self.adb.mock_shell_func.assert_not_called()
 
   @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
               'utils.start_standing_subprocess')
-  @mock.patch.object(mock_android_device.MockAdbProxy,
-                     'shell',
-                     return_value=b'setsid')
+  @mock.patch.object(_MockAdbProxy, 'shell', return_value=b'setsid')
   def test_start_server_with_user_id(self, mock_adb, mock_start_subprocess):
     """Tests that `--user` is added to starting command with SDK >= 24."""
     self._make_client_with_extra_adb_properties({'ro.build.version.sdk': '30'})
-    self._mock_server_process_starting_response(
-        mock_start_subprocess,
-        resp_lines=[
-            b'SNIPPET START, PROTOCOL 1 234', b'SNIPPET SERVING, PORT 1234'
-        ])
+    self._mock_server_process_starting_response(mock_start_subprocess)
 
     self.client.start_server()
     start_cmd_list = [
@@ -155,17 +418,11 @@
 
   @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
               'utils.start_standing_subprocess')
-  @mock.patch.object(mock_android_device.MockAdbProxy,
-                     'shell',
-                     return_value=b'setsid')
+  @mock.patch.object(_MockAdbProxy, 'shell', return_value=b'setsid')
   def test_start_server_without_user_id(self, mock_adb, mock_start_subprocess):
     """Tests that `--user` is not added to starting command on SDK < 24."""
     self._make_client_with_extra_adb_properties({'ro.build.version.sdk': '21'})
-    self._mock_server_process_starting_response(
-        mock_start_subprocess,
-        resp_lines=[
-            b'SNIPPET START, PROTOCOL 1 234', b'SNIPPET SERVING, PORT 1234'
-        ])
+    self._mock_server_process_starting_response(mock_start_subprocess)
 
     self.client.start_server()
     start_cmd_list = [
@@ -179,7 +436,7 @@
 
   @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
               'utils.start_standing_subprocess')
-  @mock.patch.object(mock_android_device.MockAdbProxy,
+  @mock.patch.object(_MockAdbProxy,
                      'shell',
                      side_effect=adb.AdbError('cmd', 'stdout', 'stderr',
                                               'ret_code'))
@@ -187,11 +444,7 @@
                                                     mock_start_subprocess):
     """Checks the starting server command without persisting commands."""
     self._make_client()
-    self._mock_server_process_starting_response(
-        mock_start_subprocess,
-        resp_lines=[
-            b'SNIPPET START, PROTOCOL 1 234', b'SNIPPET SERVING, PORT 1234'
-        ])
+    self._mock_server_process_starting_response(mock_start_subprocess)
 
     self.client.start_server()
     start_cmd_list = [
@@ -211,11 +464,7 @@
   def test_start_server_with_nohup(self, mock_start_subprocess):
     """Checks the starting server command with nohup."""
     self._make_client()
-    self._mock_server_process_starting_response(
-        mock_start_subprocess,
-        resp_lines=[
-            b'SNIPPET START, PROTOCOL 1 234', b'SNIPPET SERVING, PORT 1234'
-        ])
+    self._mock_server_process_starting_response(mock_start_subprocess)
 
     def _mocked_shell(arg):
       if 'nohup' in arg:
@@ -239,11 +488,7 @@
   def test_start_server_with_setsid(self, mock_start_subprocess):
     """Checks the starting server command with setsid."""
     self._make_client()
-    self._mock_server_process_starting_response(
-        mock_start_subprocess,
-        resp_lines=[
-            b'SNIPPET START, PROTOCOL 1 234', b'SNIPPET SERVING, PORT 1234'
-        ])
+    self._mock_server_process_starting_response(mock_start_subprocess)
 
     def _mocked_shell(arg):
       if 'setsid' in arg:
@@ -337,57 +582,804 @@
       self.client.start_server()
 
   @mock.patch('mobly.utils.stop_standing_subprocess')
-  @mock.patch.object(mock_android_device.MockAdbProxy,
-                     'shell',
-                     return_value=b'OK (0 tests)')
-  def test_stop_server_normally(self, mock_android_device_shell,
-                                mock_stop_standing_subprocess):
+  def test_stop_normally(self, mock_stop_standing_subprocess):
     """Tests that stopping server process works normally."""
     self._make_client()
     mock_proc = mock.Mock()
     self.client._proc = mock_proc
+    mock_conn = mock.Mock()
+    self.client._conn = mock_conn
+    self.client.host_port = 12345
+
     self.client.stop()
+
     self.assertIs(self.client._proc, None)
-    mock_android_device_shell.assert_called_once_with(
+    self.adb.mock_shell_func.assert_called_once_with(
         f'am instrument --user {MOCK_USER_ID} -w -e action stop '
         f'{MOCK_SERVER_PATH}')
     mock_stop_standing_subprocess.assert_called_once_with(mock_proc)
+    self.assertFalse(self.client.is_alive)
+    self.assertIs(self.client._conn, None)
+    mock_conn.close.assert_called_once_with()
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+    self.assertIsNone(self.client._event_client)
 
   @mock.patch('mobly.utils.stop_standing_subprocess')
-  @mock.patch.object(mock_android_device.MockAdbProxy,
-                     'shell',
-                     return_value=b'OK (0 tests)')
-  def test_stop_server_server_already_cleaned(self, mock_android_device_shell,
-                                              mock_stop_standing_subprocess):
-    """Tests stopping server process when subprocess is already cleaned."""
+  def test_stop_when_server_is_already_cleaned(self,
+                                               mock_stop_standing_subprocess):
+    """Tests that stop server process when subprocess is already cleaned."""
     self._make_client()
     self.client._proc = None
+    mock_conn = mock.Mock()
+    self.client._conn = mock_conn
+    self.client.host_port = 12345
+
     self.client.stop()
+
     self.assertIs(self.client._proc, None)
     mock_stop_standing_subprocess.assert_not_called()
-    mock_android_device_shell.assert_called_once_with(
+    self.adb.assert_called_once_with(
         f'am instrument --user {MOCK_USER_ID} -w -e action stop '
         f'{MOCK_SERVER_PATH}')
+    self.assertFalse(self.client.is_alive)
+    self.assertIs(self.client._conn, None)
+    mock_conn.close.assert_called_once_with()
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
 
   @mock.patch('mobly.utils.stop_standing_subprocess')
-  @mock.patch.object(mock_android_device.MockAdbProxy,
-                     'shell',
-                     return_value=b'Closed with error.')
-  def test_stop_server_stop_with_error(self, mock_android_device_shell,
-                                       mock_stop_standing_subprocess):
-    """Tests all resources are cleaned even if stopping server has error."""
+  def test_stop_when_conn_is_already_cleaned(self,
+                                             mock_stop_standing_subprocess):
+    """Tests that stop server process when the connection is already closed."""
     self._make_client()
     mock_proc = mock.Mock()
     self.client._proc = mock_proc
+    self.client._conn = None
+    self.client.host_port = 12345
+
+    self.client.stop()
+
+    self.assertIs(self.client._proc, None)
+    mock_stop_standing_subprocess.assert_called_once_with(mock_proc)
+    self.adb.assert_called_once_with(
+        f'am instrument --user {MOCK_USER_ID} -w -e action stop '
+        f'{MOCK_SERVER_PATH}')
+    self.assertFalse(self.client.is_alive)
+    self.assertIs(self.client._conn, None)
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch.object(_MockAdbProxy, 'shell', return_value=b'Closed with error.')
+  def test_stop_with_device_side_error(self, mock_adb_shell,
+                                       mock_stop_standing_subprocess):
+    """Tests all resources will be cleaned when server stop throws an error."""
+    self._make_client()
+    mock_proc = mock.Mock()
+    self.client._proc = mock_proc
+    mock_conn = mock.Mock()
+    self.client._conn = mock_conn
+    self.client.host_port = 12345
     with self.assertRaisesRegex(android_device_lib_errors.DeviceError,
                                 'Closed with error'):
       self.client.stop()
 
     self.assertIs(self.client._proc, None)
     mock_stop_standing_subprocess.assert_called_once_with(mock_proc)
-    mock_android_device_shell.assert_called_once_with(
+    mock_adb_shell.assert_called_once_with(
         f'am instrument --user {MOCK_USER_ID} -w -e action stop '
         f'{MOCK_SERVER_PATH}')
+    self.assertFalse(self.client.is_alive)
+    self.assertIs(self.client._conn, None)
+    mock_conn.close.assert_called_once_with()
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  def test_stop_with_conn_close_error(self, mock_stop_standing_subprocess):
+    """Tests port resource will be cleaned when socket close throws an error."""
+    del mock_stop_standing_subprocess
+    self._make_client()
+    mock_proc = mock.Mock()
+    self.client._proc = mock_proc
+    mock_conn = mock.Mock()
+    # The deconstructor will call this mock function again after tests, so
+    # only throw this error when it is called the first time.
+    mock_conn.close.side_effect = (OSError('Closed with error'), None)
+    self.client._conn = mock_conn
+    self.client.host_port = 12345
+    with self.assertRaisesRegex(OSError, 'Closed with error'):
+      self.client.stop()
+
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'create_socket_connection')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'send_handshake_request')
+  def test_stop_with_event_client(self, mock_send_handshake_func,
+                                  mock_create_socket_conn_func,
+                                  mock_stop_standing_subprocess):
+    """Tests that stopping with an event client works normally."""
+    del mock_send_handshake_func
+    del mock_create_socket_conn_func
+    del mock_stop_standing_subprocess
+    self._make_client()
+    self.client.host_port = 12345
+    self.client.device_port = 45678
+    snippet_client_conn = mock.Mock()
+    self.client._conn = snippet_client_conn
+    self.client._create_event_client()
+    event_client = self.client._event_client
+    event_client_conn = mock.Mock()
+    event_client._conn = event_client_conn
+
+    self.client.stop()
+
+    # The snippet client called close method once
+    snippet_client_conn.close.assert_called_once_with()
+    # The event client called close method once
+    event_client_conn.close.assert_called_once_with()
+    self.assertIsNone(event_client._conn)
+    self.assertIsNone(event_client.host_port)
+    self.assertIsNone(event_client.device_port)
+    self.assertIsNone(self.client._event_client)
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'create_socket_connection')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'send_handshake_request')
+  def test_stop_with_event_client_stops_port_forwarding_once(
+      self, mock_send_handshake_func, mock_create_socket_conn_func,
+      mock_stop_standing_subprocess):
+    """Tests that client with an event client stops port forwarding once."""
+    del mock_send_handshake_func
+    del mock_create_socket_conn_func
+    del mock_stop_standing_subprocess
+    self._make_client()
+    self.client.host_port = 12345
+    self.client.device_port = 45678
+    self.client._create_event_client()
+    event_client = self.client._event_client
+
+    self.client.stop()
+    event_client.__del__()
+    self.client.__del__()
+
+    self.assertIsNone(self.client._event_client)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+
+  def test_close_connection_normally(self):
+    """Tests that closing connection works normally."""
+    self._make_client()
+    mock_conn = mock.Mock()
+    self.client._conn = mock_conn
+    self.client.host_port = 123
+
+    self.client.close_connection()
+
+    self.assertIs(self.client._conn, None)
+    self.assertIs(self.client.host_port, None)
+    mock_conn.close.assert_called_once_with()
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:123'])
+
+  def test_close_connection_when_host_port_has_been_released(self):
+    """Tests that close connection when the host port has been released."""
+    self._make_client()
+    mock_conn = mock.Mock()
+    self.client._conn = mock_conn
+    self.client.host_port = None
+
+    self.client.close_connection()
+
+    self.assertIs(self.client._conn, None)
+    self.assertIs(self.client.host_port, None)
+    mock_conn.close.assert_called_once_with()
+    self.device.adb.mock_forward_func.assert_not_called()
+
+  def test_close_connection_when_conn_have_been_closed(self):
+    """Tests that close connection when the connection has been closed."""
+    self._make_client()
+    self.client._conn = None
+    self.client.host_port = 123
+
+    self.client.close_connection()
+
+    self.assertIs(self.client._conn, None)
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:123'])
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_send_sync_rpc_normally(self, mock_start_subprocess,
+                                  mock_socket_create_conn, mock_get_port):
+    """Tests that sending a sync RPC works normally."""
+    del mock_get_port
+    socket_resp = [
+        b'{"status": true, "uid": 1}',
+        b'{"id": 0, "result": 123, "error": null, "callback": null}',
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    self.client.make_connection()
+    rpc_result = self.client.some_rpc(1, 2, 'hello')
+
+    self.assertEqual(rpc_result, 123)
+    self.mock_socket_file.write.assert_called_with(
+        b'{"id": 0, "method": "some_rpc", "params": [1, 2, "hello"]}\n')
+
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.callback_handler_v2.'
+              'CallbackHandlerV2')
+  def test_async_rpc_start_event_client(self, mock_callback_class,
+                                        mock_start_subprocess,
+                                        mock_socket_create_conn):
+    """Tests that sending an async RPC starts the event client."""
+    socket_resp = [
+        b'{"status": true, "uid": 1}',
+        b'{"id": 0, "result": 123, "error": null, "callback": "1-0"}',
+        b'{"status": true, "uid": 1}',
+        b'{"id":1,"result":"async-rpc-event","callback":null,"error":null}',
+    ]
+    socket_write_expected = [
+        mock.call(b'{"cmd": "initiate", "uid": -1}\n'),
+        mock.call(b'{"id": 0, "method": "some_async_rpc", '
+                  b'"params": [1, 2, "hello"]}\n'),
+        mock.call(b'{"cmd": "continue", "uid": 1}\n'),
+        mock.call(b'{"id": 1, "method": "eventGetAll", '
+                  b'"params": ["1-0", "eventName"]}\n'),
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn,
+                                           socket_resp,
+                                           set_counter=True)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    self.client.host_port = 12345
+    self.client.make_connection()
+    rpc_result = self.client.some_async_rpc(1, 2, 'hello')
+
+    mock_callback_class.assert_called_with(
+        callback_id='1-0',
+        event_client=self.client._event_client,
+        ret_value=123,
+        method_name='some_async_rpc',
+        device=self.device,
+        rpc_max_timeout_sec=snippet_client_v2._SOCKET_READ_TIMEOUT,
+        default_timeout_sec=snippet_client_v2._CALLBACK_DEFAULT_TIMEOUT_SEC)
+    self.assertIs(rpc_result, mock_callback_class.return_value)
+
+    # Ensure the event client is alive
+    self.assertTrue(self.client._event_client.is_alive)
+
+    # Ensure the event client shared the same ports and uid with main client
+    self.assertEqual(self.client._event_client.host_port, 12345)
+    self.assertEqual(self.client._event_client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(self.client._event_client.uid, self.client.uid)
+
+    # Ensure the event client has reset its own RPC id counter
+    self.assertEqual(next(self.client._counter), 1)
+    self.assertEqual(next(self.client._event_client._counter), 0)
+
+    # Ensure that event client can send RPCs
+    event_string = self.client._event_client.eventGetAll('1-0', 'eventName')
+    self.assertEqual(event_string, 'async-rpc-event')
+    self.assertListEqual(
+        self.mock_socket_file.write.call_args_list,
+        socket_write_expected,
+    )
+
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port')
+  def test_initialize_client_normally(self, mock_get_port,
+                                      mock_start_subprocess,
+                                      mock_socket_create_conn):
+    """Tests that initializing the client works normally."""
+    mock_get_port.return_value = 12345
+    socket_resp = [b'{"status": true, "uid": 1}']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn,
+                                           socket_resp,
+                                           set_counter=True)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    self.client.initialize()
+    self.assertTrue(self.client.is_alive)
+    self.assertEqual(self.client.uid, 1)
+    self.assertEqual(self.client.host_port, 12345)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(next(self.client._counter), 0)
+
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port')
+  def test_restore_event_client(self, mock_get_port, mock_start_subprocess,
+                                mock_socket_create_conn):
+    """Tests restoring the event client."""
+    mock_get_port.return_value = 12345
+    socket_resp = [
+        # response of handshake when initializing the client
+        b'{"status": true, "uid": 1}',
+        # response of an async RPC
+        b'{"id": 0, "result": 123, "error": null, "callback": "1-0"}',
+        # response of starting event client
+        b'{"status": true, "uid": 1}',
+        # response of restoring server connection
+        b'{"status": true, "uid": 2}',
+        # response of restoring event client
+        b'{"status": true, "uid": 3}',
+        # response of restoring server connection
+        b'{"status": true, "uid": 4}',
+        # response of restoring event client
+        b'{"status": true, "uid": 5}',
+    ]
+    socket_write_expected = [
+        # request of handshake when initializing the client
+        mock.call(b'{"cmd": "initiate", "uid": -1}\n'),
+        # request of an async RPC
+        mock.call(b'{"id": 0, "method": "some_async_rpc", "params": []}\n'),
+        # request of starting event client
+        mock.call(b'{"cmd": "continue", "uid": 1}\n'),
+        # request of restoring server connection
+        mock.call(b'{"cmd": "initiate", "uid": -1}\n'),
+        # request of restoring event client
+        mock.call(b'{"cmd": "initiate", "uid": -1}\n'),
+        # request of restoring server connection
+        mock.call(b'{"cmd": "initiate", "uid": -1}\n'),
+        # request of restoring event client
+        mock.call(b'{"cmd": "initiate", "uid": -1}\n'),
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    self.client.make_connection()
+    callback = self.client.some_async_rpc()
+
+    # before reconnect, clients use previously selected ports
+    self.assertEqual(self.client.host_port, 12345)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(callback._event_client.host_port, 12345)
+    self.assertEqual(callback._event_client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(next(self.client._event_client._counter), 0)
+
+    # after reconnect, if host port specified, clients use specified port
+    self.client.restore_server_connection(port=54321)
+    self.assertEqual(self.client.host_port, 54321)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(callback._event_client.host_port, 54321)
+    self.assertEqual(callback._event_client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(next(self.client._event_client._counter), 0)
+
+    # after reconnect, if host port not specified, clients use selected
+    # available port
+    mock_get_port.return_value = 56789
+    self.client.restore_server_connection()
+    self.assertEqual(self.client.host_port, 56789)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(callback._event_client.host_port, 56789)
+    self.assertEqual(callback._event_client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(next(self.client._event_client._counter), 0)
+
+    # if unable to reconnect for any reason, a
+    # errors.ServerRestoreConnectionError is raised.
+    mock_socket_create_conn.side_effect = IOError('socket timed out')
+    with self.assertRaisesRegex(
+        errors.ServerRestoreConnectionError,
+        (f'Failed to restore server connection for {MOCK_PACKAGE_NAME} at '
+         f'host port 56789, device port {MOCK_DEVICE_PORT}')):
+      self.client.restore_server_connection()
+
+    self.assertListEqual(self.mock_socket_file.write.call_args_list,
+                         socket_write_expected)
+
+  @mock.patch.object(snippet_client_v2.SnippetClientV2, '_make_connection')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'send_handshake_request')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'create_socket_connection')
+  def test_restore_server_connection_with_event_client(
+      self, mock_create_socket_conn_func, mock_send_handshake_func,
+      mock_make_connection):
+    """Tests restoring server connection when the event client is not None."""
+    self._make_client()
+    event_client = snippet_client_v2.SnippetClientV2('mock-package',
+                                                     mock.Mock())
+    self.client._event_client = event_client
+    self.client.device_port = 54321
+    self.client.uid = 5
+
+    self.client.restore_server_connection(port=12345)
+
+    mock_make_connection.assert_called_once_with()
+    self.assertEqual(event_client.host_port, 12345)
+    self.assertEqual(event_client.device_port, 54321)
+    self.assertEqual(next(event_client._counter), 0)
+    mock_create_socket_conn_func.assert_called_once_with()
+    mock_send_handshake_func.assert_called_once_with(
+        -1, snippet_client_v2.ConnectionHandshakeCommand.INIT)
+
+  @mock.patch('builtins.print')
+  def test_help_rpc_when_printing_by_default(self, mock_print):
+    """Tests the `help` method when it prints the output by default."""
+    self._make_client()
+    mock_rpc = mock.MagicMock()
+    self.client._rpc = mock_rpc
+
+    result = self.client.help()
+    mock_rpc.assert_called_once_with('help')
+    self.assertIsNone(result)
+    mock_print.assert_called_once_with(mock_rpc.return_value)
+
+  @mock.patch('builtins.print')
+  def test_help_rpc_when_not_printing(self, mock_print):
+    """Tests the `help` method when it was set not to print the output."""
+    self._make_client()
+    mock_rpc = mock.MagicMock()
+    self.client._rpc = mock_rpc
+
+    result = self.client.help(print_output=False)
+    mock_rpc.assert_called_once_with('help')
+    self.assertEqual(mock_rpc.return_value, result)
+    mock_print.assert_not_called()
+
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  def test_make_connection_normally(self, mock_get_port, mock_start_subprocess,
+                                    mock_socket_create_conn):
+    """Tests that making a connection works normally."""
+    del mock_get_port
+    socket_resp = [b'{"status": true, "uid": 1}']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    self.client.make_connection()
+    self.assertEqual(self.client.uid, 1)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.adb.mock_forward_func.assert_called_once_with(
+        ['tcp:12345', f'tcp:{MOCK_DEVICE_PORT}'])
+    mock_socket_create_conn.assert_called_once_with(
+        ('localhost', 12345), snippet_client_v2._SOCKET_CONNECTION_TIMEOUT)
+    self.socket_conn.settimeout.assert_called_once_with(
+        snippet_client_v2._SOCKET_READ_TIMEOUT)
+
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  def test_make_connection_with_preset_host_port(self, mock_get_port,
+                                                 mock_start_subprocess,
+                                                 mock_socket_create_conn):
+    """Tests that make a connection with the preset host port."""
+    del mock_get_port
+    socket_resp = [b'{"status": true, "uid": 1}']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    self.client.host_port = 23456
+    self.client.make_connection()
+    self.assertEqual(self.client.uid, 1)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    # Test that the host port for forwarding is 23456 instead of 12345
+    self.adb.mock_forward_func.assert_called_once_with(
+        ['tcp:23456', f'tcp:{MOCK_DEVICE_PORT}'])
+    mock_socket_create_conn.assert_called_once_with(
+        ('localhost', 23456), snippet_client_v2._SOCKET_CONNECTION_TIMEOUT)
+    self.socket_conn.settimeout.assert_called_once_with(
+        snippet_client_v2._SOCKET_READ_TIMEOUT)
+
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  def test_make_connection_with_ip(self, mock_get_port, mock_start_subprocess,
+                                   mock_socket_create_conn):
+    """Tests that make a connection with 127.0.0.1 instead of localhost."""
+    del mock_get_port
+    socket_resp = [b'{"status": true, "uid": 1}']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    mock_conn = mock_socket_create_conn.return_value
+
+    # Refuse creating socket connection with 'localhost', only accept
+    # '127.0.0.1' as address
+    def _mock_create_conn_side_effect(address, *args, **kwargs):
+      del args, kwargs
+      if address[0] == '127.0.0.1':
+        return mock_conn
+      raise ConnectionRefusedError(f'Refusing connection to {address[0]}.')
+
+    mock_socket_create_conn.side_effect = _mock_create_conn_side_effect
+
+    self.client.make_connection()
+    self.assertEqual(self.client.uid, 1)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.adb.mock_forward_func.assert_called_once_with(
+        ['tcp:12345', f'tcp:{MOCK_DEVICE_PORT}'])
+    mock_socket_create_conn.assert_any_call(
+        ('127.0.0.1', 12345), snippet_client_v2._SOCKET_CONNECTION_TIMEOUT)
+    self.socket_conn.settimeout.assert_called_once_with(
+        snippet_client_v2._SOCKET_READ_TIMEOUT)
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  def test_make_connection_io_error(self, mock_socket_create_conn,
+                                    mock_get_port):
+    """Tests IOError occurred trying to create a socket connection."""
+    del mock_get_port
+    mock_socket_create_conn.side_effect = IOError()
+    with self.assertRaises(IOError):
+      self._make_client()
+      self.client.device_port = 123
+      self.client.make_connection()
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  def test_make_connection_timeout(self, mock_socket_create_conn,
+                                   mock_get_port):
+    """Tests timeout occurred trying to create a socket connection."""
+    del mock_get_port
+    mock_socket_create_conn.side_effect = socket.timeout
+    with self.assertRaises(socket.timeout):
+      self._make_client()
+      self.client.device_port = 123
+      self.client.make_connection()
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_receives_none_handshake_response(
+      self, mock_start_subprocess, mock_socket_create_conn, mock_get_port):
+    """Tests make_connection receives None as the handshake response."""
+    del mock_get_port
+    socket_resp = [None]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    with self.assertRaisesRegex(
+        errors.ProtocolError, errors.ProtocolError.NO_RESPONSE_FROM_HANDSHAKE):
+      self.client.make_connection()
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_receives_empty_handshake_response(
+      self, mock_start_subprocess, mock_socket_create_conn, mock_get_port):
+    """Tests make_connection receives an empty handshake response."""
+    del mock_get_port
+    socket_resp = [b'']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    with self.assertRaisesRegex(
+        errors.ProtocolError, errors.ProtocolError.NO_RESPONSE_FROM_HANDSHAKE):
+      self.client.make_connection()
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_receives_invalid_handshake_response(
+      self, mock_start_subprocess, mock_socket_create_conn, mock_get_port):
+    """Tests make_connection receives an invalid handshake response."""
+    del mock_get_port
+    socket_resp = [b'{"status": false, "uid": 1}']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+
+    self.client.make_connection()
+    self.assertEqual(self.client.uid, -1)
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_send_handshake_request_error(self,
+                                                        mock_start_subprocess,
+                                                        mock_socket_create_conn,
+                                                        mock_get_port):
+    """Tests that an error occurred trying to send a handshake request."""
+    del mock_get_port
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.mock_socket_file.write.side_effect = socket.error('Socket write error')
+
+    with self.assertRaisesRegex(errors.Error, 'Socket write error'):
+      self.client.make_connection()
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_receive_handshake_response_error(
+      self, mock_start_subprocess, mock_socket_create_conn, mock_get_port):
+    """Tests that an error occurred trying to receive a handshake response."""
+    del mock_get_port
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.mock_socket_file.readline.side_effect = socket.error(
+        'Socket read error')
+
+    with self.assertRaisesRegex(errors.Error, 'Socket read error'):
+      self.client.make_connection()
+
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_decode_handshake_response_bytes_error(
+      self, mock_start_subprocess, mock_socket_create_conn, mock_get_port):
+    """Tests that an error occurred trying to decode a handshake response."""
+    del mock_get_port
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.log = mock.Mock()
+    socket_response = bytes('{"status": false, "uid": 1}', encoding='cp037')
+    self.mock_socket_file.readline.side_effect = [socket_response]
+
+    with self.assertRaises(UnicodeError):
+      self.client.make_connection()
+
+    self.client.log.error.assert_has_calls([
+        mock.call(
+            'Failed to decode socket response bytes using encoding utf8: %s',
+            socket_response)
+    ])
+
+  def test_rpc_sending_and_receiving(self):
+    """Test RPC sending and receiving.
+
+    Tests that when sending and receiving an RPC the correct data is used.
+    """
+    self._make_client()
+    rpc_request = '{"id": 0, "method": "some_rpc", "params": []}'
+    rpc_response_expected = ('{"id": 0, "result": 123, "error": null, '
+                             '"callback": null}')
+
+    socket_write_expected = [
+        mock.call(b'{"id": 0, "method": "some_rpc", "params": []}\n')
+    ]
+    socket_response = (b'{"id": 0, "result": 123, "error": null, '
+                       b'"callback": null}')
+
+    mock_socket_file = mock.Mock()
+    mock_socket_file.readline.return_value = socket_response
+    self.client._client = mock_socket_file
+
+    rpc_response = self.client.send_rpc_request(rpc_request)
+
+    self.assertEqual(rpc_response, rpc_response_expected)
+    self.assertEqual(mock_socket_file.write.call_args_list,
+                     socket_write_expected)
+
+  def test_rpc_send_socket_write_error(self):
+    """Tests that an error occurred trying to write the socket file."""
+    self._make_client()
+    self.client._client = mock.Mock()
+    self.client._client.write.side_effect = socket.error('Socket write error')
+
+    rpc_request = '{"id": 0, "method": "some_rpc", "params": []}'
+    with self.assertRaisesRegex(errors.Error, 'Socket write error'):
+      self.client.send_rpc_request(rpc_request)
+
+  def test_rpc_send_socket_read_error(self):
+    """Tests that an error occurred trying to read the socket file."""
+    self._make_client()
+    self.client._client = mock.Mock()
+    self.client._client.readline.side_effect = socket.error('Socket read error')
+
+    rpc_request = '{"id": 0, "method": "some_rpc", "params": []}'
+    with self.assertRaisesRegex(errors.Error, 'Socket read error'):
+      self.client.send_rpc_request(rpc_request)
+
+  def test_rpc_send_decode_socket_response_bytes_error(self):
+    """Tests that an error occurred trying to decode the socket response."""
+    self._make_client()
+    self.client.log = mock.Mock()
+    self.client._client = mock.Mock()
+    socket_response = bytes(
+        '{"id": 0, "result": 123, "error": null, "callback": null}',
+        encoding='cp037')
+    self.client._client.readline.return_value = socket_response
+
+    rpc_request = '{"id": 0, "method": "some_rpc", "params": []}'
+    with self.assertRaises(UnicodeError):
+      self.client.send_rpc_request(rpc_request)
+
+    self.client.log.error.assert_has_calls([
+        mock.call(
+            'Failed to decode socket response bytes using encoding utf8: %s',
+            socket_response)
+    ])
+
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'send_handshake_request')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'create_socket_connection')
+  def test_make_conn_with_forwarded_port_init(self,
+                                              mock_create_socket_conn_func,
+                                              mock_send_handshake_func):
+    """Tests make_connection_with_forwarded_port initiates a new session."""
+    self._make_client()
+    self.client._counter = None
+    self.client.make_connection_with_forwarded_port(12345, 54321)
+
+    self.assertEqual(self.client.host_port, 12345)
+    self.assertEqual(self.client.device_port, 54321)
+    self.assertEqual(next(self.client._counter), 0)
+    mock_create_socket_conn_func.assert_called_once_with()
+    mock_send_handshake_func.assert_called_once_with(
+        -1, snippet_client_v2.ConnectionHandshakeCommand.INIT)
+
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'send_handshake_request')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'create_socket_connection')
+  def test_make_conn_with_forwarded_port_continue(self,
+                                                  mock_create_socket_conn_func,
+                                                  mock_send_handshake_func):
+    """Tests make_connection_with_forwarded_port continues current session."""
+    self._make_client()
+    self.client._counter = None
+    self.client.make_connection_with_forwarded_port(
+        12345, 54321, 3, snippet_client_v2.ConnectionHandshakeCommand.CONTINUE)
+
+    self.assertEqual(self.client.host_port, 12345)
+    self.assertEqual(self.client.device_port, 54321)
+    self.assertEqual(next(self.client._counter), 0)
+    mock_create_socket_conn_func.assert_called_once_with()
+    mock_send_handshake_func.assert_called_once_with(
+        3, snippet_client_v2.ConnectionHandshakeCommand.CONTINUE)
 
 
 if __name__ == '__main__':
diff --git a/tests/mobly/controllers/android_device_test.py b/tests/mobly/controllers/android_device_test.py
index 6adb8f5..4aa5304 100755
--- a/tests/mobly/controllers/android_device_test.py
+++ b/tests/mobly/controllers/android_device_test.py
@@ -601,6 +601,7 @@
       self, sanitize_filename_mock, get_log_file_timestamp_mock, MockFastboot,
       MockAdbProxy):
     mock_serial = 1
+    sanitize_filename_mock.return_value = '1'
     ad = android_device.AndroidDevice(serial=mock_serial)
     get_log_file_timestamp_mock.return_value = '07-22-2019_17-53-34-450'
     filename = ad.generate_filename('MagicLog')
@@ -1004,7 +1005,7 @@
   @mock.patch('mobly.controllers.android_device_lib.fastboot.FastbootProxy',
               return_value=mock_android_device.MockFastbootProxy('1'))
   @mock.patch(
-      'mobly.controllers.android_device_lib.snippet_client.SnippetClient')
+      'mobly.controllers.android_device_lib.snippet_client_v2.SnippetClientV2')
   @mock.patch('mobly.utils.get_available_host_port')
   def test_AndroidDevice_load_snippet(self, MockGetPort, MockSnippetClient,
                                       MockFastboot, MockAdbProxy):
@@ -1017,7 +1018,7 @@
   @mock.patch('mobly.controllers.android_device_lib.fastboot.FastbootProxy',
               return_value=mock_android_device.MockFastbootProxy('1'))
   @mock.patch(
-      'mobly.controllers.android_device_lib.snippet_client.SnippetClient')
+      'mobly.controllers.android_device_lib.snippet_client_v2.SnippetClientV2')
   @mock.patch('mobly.utils.get_available_host_port')
   def test_AndroidDevice_getattr(self, MockGetPort, MockSnippetClient,
                                  MockFastboot, MockAdbProxy):
@@ -1032,7 +1033,7 @@
   @mock.patch('mobly.controllers.android_device_lib.fastboot.FastbootProxy',
               return_value=mock_android_device.MockFastbootProxy('1'))
   @mock.patch(
-      'mobly.controllers.android_device_lib.snippet_client.SnippetClient',
+      'mobly.controllers.android_device_lib.snippet_client_v2.SnippetClientV2',
       return_value=MockSnippetClient)
   @mock.patch('mobly.utils.get_available_host_port')
   def test_AndroidDevice_load_snippet_dup_package(self, MockGetPort,
@@ -1050,7 +1051,7 @@
   @mock.patch('mobly.controllers.android_device_lib.fastboot.FastbootProxy',
               return_value=mock_android_device.MockFastbootProxy('1'))
   @mock.patch(
-      'mobly.controllers.android_device_lib.snippet_client.SnippetClient',
+      'mobly.controllers.android_device_lib.snippet_client_v2.SnippetClientV2',
       return_value=MockSnippetClient)
   @mock.patch('mobly.utils.get_available_host_port')
   def test_AndroidDevice_load_snippet_dup_snippet_name(self, MockGetPort,
@@ -1069,7 +1070,7 @@
   @mock.patch('mobly.controllers.android_device_lib.fastboot.FastbootProxy',
               return_value=mock_android_device.MockFastbootProxy('1'))
   @mock.patch(
-      'mobly.controllers.android_device_lib.snippet_client.SnippetClient')
+      'mobly.controllers.android_device_lib.snippet_client_v2.SnippetClientV2')
   @mock.patch('mobly.utils.get_available_host_port')
   def test_AndroidDevice_load_snippet_dup_attribute_name(
       self, MockGetPort, MockSnippetClient, MockFastboot, MockAdbProxy):
@@ -1084,7 +1085,7 @@
   @mock.patch('mobly.controllers.android_device_lib.fastboot.FastbootProxy',
               return_value=mock_android_device.MockFastbootProxy('1'))
   @mock.patch(
-      'mobly.controllers.android_device_lib.snippet_client.SnippetClient')
+      'mobly.controllers.android_device_lib.snippet_client_v2.SnippetClientV2')
   @mock.patch('mobly.utils.get_available_host_port')
   def test_AndroidDevice_load_snippet_start_app_fails(self, MockGetPort,
                                                       MockSnippetClient,
@@ -1092,14 +1093,13 @@
                                                       MockAdbProxy):
     """Verifies that the correct exception is raised if start app failed.
 
-    It's possible that the `stop_app` call as part of the start app failure
+    It's possible that the `stop` call as part of the start app failure
     teardown also fails. So we want the exception from the start app
     failure.
     """
     expected_e = Exception('start failed.')
-    MockSnippetClient.start_app_and_connect = mock.Mock(side_effect=expected_e)
-    MockSnippetClient.stop_app = mock.Mock(
-        side_effect=Exception('stop failed.'))
+    MockSnippetClient.initialize = mock.Mock(side_effect=expected_e)
+    MockSnippetClient.stop = mock.Mock(side_effect=Exception('stop failed.'))
     ad = android_device.AndroidDevice(serial='1')
     try:
       ad.load_snippet('snippet', MOCK_SNIPPET_PACKAGE_NAME)
@@ -1111,7 +1111,7 @@
   @mock.patch('mobly.controllers.android_device_lib.fastboot.FastbootProxy',
               return_value=mock_android_device.MockFastbootProxy('1'))
   @mock.patch(
-      'mobly.controllers.android_device_lib.snippet_client.SnippetClient')
+      'mobly.controllers.android_device_lib.snippet_client_v2.SnippetClientV2')
   @mock.patch('mobly.utils.get_available_host_port')
   def test_AndroidDevice_unload_snippet(self, MockGetPort, MockSnippetClient,
                                         MockFastboot, MockAdbProxy):
@@ -1132,7 +1132,7 @@
   @mock.patch('mobly.controllers.android_device_lib.fastboot.FastbootProxy',
               return_value=mock_android_device.MockFastbootProxy('1'))
   @mock.patch(
-      'mobly.controllers.android_device_lib.snippet_client.SnippetClient')
+      'mobly.controllers.android_device_lib.snippet_client_v2.SnippetClientV2')
   @mock.patch('mobly.utils.get_available_host_port')
   @mock.patch.object(logcat.Logcat, '_open_logcat_file')
   def test_AndroidDevice_snippet_cleanup(self, open_logcat_mock, MockGetPort,
diff --git a/tests/mobly/logger_test.py b/tests/mobly/logger_test.py
index fa514e7..e0ac14d 100755
--- a/tests/mobly/logger_test.py
+++ b/tests/mobly/logger_test.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import os
 import shutil
 import tempfile
@@ -56,7 +57,8 @@
                                                mock_create_latest_log_alias,
                                                mock__setup_test_logger):
     logger.setup_test_logger(self.log_dir)
-    mock__setup_test_logger.assert_called_once_with(self.log_dir, None)
+    mock__setup_test_logger.assert_called_once_with(self.log_dir, logging.INFO,
+                                                    None)
     mock_create_latest_log_alias.assert_called_once_with(self.log_dir,
                                                          alias='latest')
 
@@ -208,6 +210,33 @@
     expected_filename = 'logcat.txt_'
     self.assertEqual(logger.sanitize_filename(fake_filename), expected_filename)
 
+  def test_prefix_logger_adapter_prefix_log_lines(self):
+    extra = {
+        logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: '[MOCK_PREFIX]',
+    }
+    adapted_logger = logger.PrefixLoggerAdapter(mock.Mock(), extra)
+
+    kwargs = mock.Mock()
+    processed_log, processed_kwargs = adapted_logger.process('mock log line',
+                                                             kwargs=kwargs)
+
+    self.assertEqual(processed_log, '[MOCK_PREFIX] mock log line')
+    self.assertIs(processed_kwargs, kwargs)
+
+  def test_prefix_logger_adapter_modify_prefix(self):
+    extra = {
+        logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: 'MOCK_PREFIX',
+    }
+    adapted_logger = logger.PrefixLoggerAdapter(mock.Mock(), extra)
+    adapted_logger.set_log_prefix('[NEW]')
+
+    kwargs = mock.Mock()
+    processed_log, processed_kwargs = adapted_logger.process('mock log line',
+                                                             kwargs=kwargs)
+
+    self.assertEqual(processed_log, '[NEW] mock log line')
+    self.assertIs(processed_kwargs, kwargs)
+
 
 if __name__ == "__main__":
   unittest.main()
diff --git a/tests/mobly/records_test.py b/tests/mobly/records_test.py
index f3758f5..ec6254d 100755
--- a/tests/mobly/records_test.py
+++ b/tests/mobly/records_test.py
@@ -36,6 +36,19 @@
     self._something = something
 
 
+class RecordTestRecursiveError(Exception):
+  """Error class with self recursion.
+
+  Used for ExceptionRecord tests.
+  """
+
+  def __init__(self):
+    super().__init__(self)  # create a self recursion here.
+
+  def __str__(self):
+    return 'Oh ha!'
+
+
 class RecordsTest(unittest.TestCase):
   """This test class tests the implementation of classes in mobly.records.
   """
@@ -50,12 +63,19 @@
   def tearDown(self):
     shutil.rmtree(self.tmp_path)
 
-  def verify_record(self, record, result, details, extras, stacktrace=None):
+  def verify_record(self,
+                    record,
+                    result,
+                    details,
+                    extras,
+                    termination_signal_type=None,
+                    stacktrace=None):
     record.update_record()
     # Verify each field.
     self.assertEqual(record.test_name, self.tn)
     self.assertEqual(record.result, result)
     self.assertEqual(record.details, details)
+    self.assertEqual(record.termination_signal_type, termination_signal_type)
     self.assertEqual(record.extras, extras)
     self.assertTrue(record.begin_time, 'begin time should not be empty.')
     self.assertTrue(record.end_time, 'end time should not be empty.')
@@ -66,6 +86,8 @@
     d[records.TestResultEnums.RECORD_NAME] = self.tn
     d[records.TestResultEnums.RECORD_RESULT] = result
     d[records.TestResultEnums.RECORD_DETAILS] = details
+    d[records.TestResultEnums.
+      RECORD_TERMINATION_SIGNAL_TYPE] = termination_signal_type
     d[records.TestResultEnums.RECORD_EXTRAS] = extras
     d[records.TestResultEnums.RECORD_BEGIN_TIME] = record.begin_time
     d[records.TestResultEnums.RECORD_END_TIME] = record.end_time
@@ -88,7 +110,7 @@
     self.assertTrue(str(record), 'str of the record should not be empty.')
     self.assertTrue(repr(record), "the record's repr shouldn't be empty.")
 
-  def test_result_record_pass_none(self):
+  def test_result_record_implicit_pass(self):
     record = records.TestResultRecord(self.tn)
     record.test_begin()
     record.test_pass()
@@ -97,7 +119,7 @@
                        details=None,
                        extras=None)
 
-  def test_result_record_pass_with_float_extra(self):
+  def test_result_record_explicit_pass_with_float_extra(self):
     record = records.TestResultRecord(self.tn)
     record.test_begin()
     s = signals.TestPass(self.details, self.float_extra)
@@ -105,9 +127,10 @@
     self.verify_record(record=record,
                        result=records.TestResultEnums.TEST_RESULT_PASS,
                        details=self.details,
+                       termination_signal_type='TestPass',
                        extras=self.float_extra)
 
-  def test_result_record_pass_with_json_extra(self):
+  def test_result_record_explicit_pass_with_json_extra(self):
     record = records.TestResultRecord(self.tn)
     record.test_begin()
     s = signals.TestPass(self.details, self.json_extra)
@@ -115,9 +138,11 @@
     self.verify_record(record=record,
                        result=records.TestResultEnums.TEST_RESULT_PASS,
                        details=self.details,
+                       termination_signal_type='TestPass',
                        extras=self.json_extra)
 
   def test_result_record_fail_none(self):
+    """Verifies that `test_fail` can be called without an error object."""
     record = records.TestResultRecord(self.tn)
     record.test_begin()
     record.test_fail()
@@ -139,6 +164,7 @@
     self.verify_record(record=record,
                        result=records.TestResultEnums.TEST_RESULT_FAIL,
                        details='Something failed.',
+                       termination_signal_type='Exception',
                        extras=None,
                        stacktrace='in test_result_record_fail_stacktrace\n    '
                        'raise Exception(\'Something failed.\')\nException: '
@@ -152,6 +178,7 @@
     self.verify_record(record=record,
                        result=records.TestResultEnums.TEST_RESULT_FAIL,
                        details=self.details,
+                       termination_signal_type='TestFailure',
                        extras=self.float_extra)
 
   def test_result_record_fail_with_unicode_test_signal(self):
@@ -163,6 +190,7 @@
     self.verify_record(record=record,
                        result=records.TestResultEnums.TEST_RESULT_FAIL,
                        details=details,
+                       termination_signal_type='TestFailure',
                        extras=self.float_extra)
 
   def test_result_record_fail_with_unicode_exception(self):
@@ -174,6 +202,7 @@
     self.verify_record(record=record,
                        result=records.TestResultEnums.TEST_RESULT_FAIL,
                        details=details,
+                       termination_signal_type='Exception',
                        extras=None)
 
   def test_result_record_fail_with_json_extra(self):
@@ -184,6 +213,7 @@
     self.verify_record(record=record,
                        result=records.TestResultEnums.TEST_RESULT_FAIL,
                        details=self.details,
+                       termination_signal_type='TestFailure',
                        extras=self.json_extra)
 
   def test_result_record_skip_none(self):
@@ -203,6 +233,7 @@
     self.verify_record(record=record,
                        result=records.TestResultEnums.TEST_RESULT_SKIP,
                        details=self.details,
+                       termination_signal_type='TestSkip',
                        extras=self.float_extra)
 
   def test_result_record_skip_with_json_extra(self):
@@ -213,6 +244,7 @@
     self.verify_record(record=record,
                        result=records.TestResultEnums.TEST_RESULT_SKIP,
                        details=self.details,
+                       termination_signal_type='TestSkip',
                        extras=self.json_extra)
 
   def test_result_add_operator_success(self):
@@ -406,6 +438,20 @@
     new_er = copy.deepcopy(er)
     self.assertIsNot(er, new_er)
     self.assertDictEqual(er.to_dict(), new_er.to_dict())
+    self.assertEqual(er.type, 'RecordTestError')
+
+  def test_recursive_exception_record_deepcopy(self):
+    """Makes sure ExceptionRecord wrapper handles deep copy properly in case of
+    recursive exception.
+    """
+    try:
+      raise RecordTestRecursiveError()
+    except RecordTestRecursiveError as e:
+      er = records.ExceptionRecord(e)
+    new_er = copy.deepcopy(er)
+    self.assertIsNot(er, new_er)
+    self.assertDictEqual(er.to_dict(), new_er.to_dict())
+    self.assertEqual(er.type, 'RecordTestRecursiveError')
 
   def test_add_controller_info_record(self):
     tr = records.TestResult()
diff --git a/tests/mobly/snippet/callback_event_test.py b/tests/mobly/snippet/callback_event_test.py
new file mode 100755
index 0000000..2593cc3
--- /dev/null
+++ b/tests/mobly/snippet/callback_event_test.py
@@ -0,0 +1,40 @@
+# Copyright 2022 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unit tests for mobly.snippet.callback_event.CallbackEvent."""
+
+import unittest
+
+from mobly.snippet import callback_event
+
+MOCK_CALLBACK_ID = 'myCallbackId'
+MOCK_EVENT_NAME = 'onXyzEvent'
+MOCK_CREATION_TIME = '12345678'
+MOCK_DATA = {'foo': 'bar'}
+
+
+class CallbackEventTest(unittest.TestCase):
+  """Unit tests for mobly.snippet.callback_event.CallbackEvent."""
+
+  def test_basic(self):
+    """Verifies that an event object can be created and logged properly."""
+    event = callback_event.CallbackEvent(MOCK_CALLBACK_ID, MOCK_EVENT_NAME,
+                                         MOCK_CREATION_TIME, MOCK_DATA)
+    self.assertEqual(
+        repr(event),
+        "CallbackEvent(callback_id: myCallbackId, name: onXyzEvent, "
+        "creation_time: 12345678, data: {'foo': 'bar'})")
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/tests/mobly/snippet/callback_handler_base_test.py b/tests/mobly/snippet/callback_handler_base_test.py
new file mode 100644
index 0000000..0891fd5
--- /dev/null
+++ b/tests/mobly/snippet/callback_handler_base_test.py
@@ -0,0 +1,183 @@
+# Copyright 2022 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unit tests for mobly.snippet.callback_handler_base.CallbackHandlerBase."""
+
+import unittest
+from unittest import mock
+
+from mobly.snippet import callback_event
+from mobly.snippet import callback_handler_base
+from mobly.snippet import errors
+
+MOCK_CALLBACK_ID = '2-1'
+MOCK_RAW_EVENT = {
+    'callbackId': '2-1',
+    'name': 'AsyncTaskResult',
+    'time': 20460228696,
+    'data': {
+        'exampleData': "Here's a simple event.",
+        'successful': True,
+        'secretNumber': 12
+    }
+}
+
+
+class FakeCallbackHandler(callback_handler_base.CallbackHandlerBase):
+  """Fake client class for unit tests."""
+
+  def __init__(self,
+               callback_id=None,
+               event_client=None,
+               ret_value=None,
+               method_name=None,
+               device=None,
+               rpc_max_timeout_sec=120,
+               default_timeout_sec=120):
+    """Initializes a fake callback handler object used for unit tests."""
+    super().__init__(callback_id, event_client, ret_value, method_name, device,
+                     rpc_max_timeout_sec, default_timeout_sec)
+    self.mock_rpc_func = mock.Mock()
+
+  def callEventWaitAndGetRpc(self, *args, **kwargs):
+    """See base class."""
+    return self.mock_rpc_func.callEventWaitAndGetRpc(*args, **kwargs)
+
+  def callEventGetAllRpc(self, *args, **kwargs):
+    """See base class."""
+    return self.mock_rpc_func.callEventGetAllRpc(*args, **kwargs)
+
+
+class CallbackHandlerBaseTest(unittest.TestCase):
+  """Unit tests for mobly.snippet.callback_handler_base.CallbackHandlerBase."""
+
+  def assert_event_correct(self, actual_event, expected_raw_event_dict):
+    expected_event = callback_event.from_dict(expected_raw_event_dict)
+    self.assertEqual(str(actual_event), str(expected_event))
+
+  def test_default_timeout_too_large(self):
+    err_msg = ('The max timeout of a single RPC must be no smaller than '
+               'the default timeout of the callback handler. '
+               'Got rpc_max_timeout_sec=10, default_timeout_sec=20.')
+    with self.assertRaisesRegex(ValueError, err_msg):
+      _ = FakeCallbackHandler(rpc_max_timeout_sec=10, default_timeout_sec=20)
+
+  def test_timeout_property(self):
+    handler = FakeCallbackHandler(rpc_max_timeout_sec=20,
+                                  default_timeout_sec=10)
+    self.assertEqual(handler.rpc_max_timeout_sec, 20)
+    self.assertEqual(handler.default_timeout_sec, 10)
+    with self.assertRaises(AttributeError):
+      handler.rpc_max_timeout_sec = 5
+
+    with self.assertRaises(AttributeError):
+      handler.default_timeout_sec = 5
+
+  def test_callback_id_property(self):
+    handler = FakeCallbackHandler(callback_id=MOCK_CALLBACK_ID)
+    self.assertEqual(handler.callback_id, MOCK_CALLBACK_ID)
+    with self.assertRaises(AttributeError):
+      handler.callback_id = 'ha'
+
+  def test_event_dict_to_snippet_event(self):
+    handler = FakeCallbackHandler(callback_id=MOCK_CALLBACK_ID)
+    handler.mock_rpc_func.callEventWaitAndGetRpc = mock.Mock(
+        return_value=MOCK_RAW_EVENT)
+
+    event = handler.waitAndGet('ha', timeout=10)
+    self.assert_event_correct(event, MOCK_RAW_EVENT)
+    handler.mock_rpc_func.callEventWaitAndGetRpc.assert_called_once_with(
+        MOCK_CALLBACK_ID, 'ha', 10)
+
+  def test_wait_and_get_timeout_default(self):
+    handler = FakeCallbackHandler(rpc_max_timeout_sec=20, default_timeout_sec=5)
+    handler.mock_rpc_func.callEventWaitAndGetRpc = mock.Mock(
+        return_value=MOCK_RAW_EVENT)
+    _ = handler.waitAndGet('ha')
+    handler.mock_rpc_func.callEventWaitAndGetRpc.assert_called_once_with(
+        mock.ANY, mock.ANY, 5)
+
+  def test_wait_and_get_timeout_ecxeed_threshold(self):
+    rpc_max_timeout_sec = 5
+    big_timeout_sec = 10
+    handler = FakeCallbackHandler(rpc_max_timeout_sec=rpc_max_timeout_sec,
+                                  default_timeout_sec=rpc_max_timeout_sec)
+    handler.mock_rpc_func.callEventWaitAndGetRpc = mock.Mock(
+        return_value=MOCK_RAW_EVENT)
+
+    expected_msg = (
+        f'Specified timeout {big_timeout_sec} is longer than max timeout '
+        f'{rpc_max_timeout_sec}.')
+    with self.assertRaisesRegex(errors.CallbackHandlerBaseError, expected_msg):
+      handler.waitAndGet('ha', big_timeout_sec)
+
+  def test_wait_for_event(self):
+    handler = FakeCallbackHandler()
+    handler.mock_rpc_func.callEventWaitAndGetRpc = mock.Mock(
+        return_value=MOCK_RAW_EVENT)
+
+    def some_condition(event):
+      return event.data['successful']
+
+    event = handler.waitForEvent('AsyncTaskResult', some_condition, 0.01)
+    self.assert_event_correct(event, MOCK_RAW_EVENT)
+
+  def test_wait_for_event_negative(self):
+    handler = FakeCallbackHandler()
+    handler.mock_rpc_func.callEventWaitAndGetRpc = mock.Mock(
+        return_value=MOCK_RAW_EVENT)
+
+    expected_msg = (
+        'Timed out after 0.01s waiting for an "AsyncTaskResult" event that'
+        ' satisfies the predicate "some_condition".')
+
+    def some_condition(_):
+      return False
+
+    with self.assertRaisesRegex(errors.CallbackHandlerTimeoutError,
+                                expected_msg):
+      handler.waitForEvent('AsyncTaskResult', some_condition, 0.01)
+
+  def test_wait_for_event_max_timeout(self):
+    """waitForEvent should not raise the timeout exceed threshold error."""
+    rpc_max_timeout_sec = 5
+    big_timeout_sec = 10
+    handler = FakeCallbackHandler(rpc_max_timeout_sec=rpc_max_timeout_sec,
+                                  default_timeout_sec=rpc_max_timeout_sec)
+    handler.mock_rpc_func.callEventWaitAndGetRpc = mock.Mock(
+        return_value=MOCK_RAW_EVENT)
+
+    def some_condition(event):
+      return event.data['successful']
+
+    # This line should not raise.
+    event = handler.waitForEvent('AsyncTaskResult',
+                                 some_condition,
+                                 timeout=big_timeout_sec)
+    self.assert_event_correct(event, MOCK_RAW_EVENT)
+
+  def test_get_all(self):
+    handler = FakeCallbackHandler(callback_id=MOCK_CALLBACK_ID)
+    handler.mock_rpc_func.callEventGetAllRpc = mock.Mock(
+        return_value=[MOCK_RAW_EVENT, MOCK_RAW_EVENT])
+
+    all_events = handler.getAll('ha')
+    for event in all_events:
+      self.assert_event_correct(event, MOCK_RAW_EVENT)
+
+    handler.mock_rpc_func.callEventGetAllRpc.assert_called_once_with(
+        MOCK_CALLBACK_ID, 'ha')
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/tests/mobly/suite_runner_test.py b/tests/mobly/suite_runner_test.py
index 7297236..dabf74f 100755
--- a/tests/mobly/suite_runner_test.py
+++ b/tests/mobly/suite_runner_test.py
@@ -12,18 +12,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import inspect
 import io
 import os
 import shutil
+import sys
 import tempfile
 import unittest
 from unittest import mock
 
+from mobly import base_suite
+from mobly import base_test
+from mobly import config_parser
+from mobly import test_runner
 from mobly import suite_runner
 from tests.lib import integration2_test
 from tests.lib import integration_test
 
 
+class FakeTest1(base_test.BaseTestClass):
+  pass
+
+
 class SuiteRunnerTest(unittest.TestCase):
 
   def setUp(self):
@@ -108,6 +118,61 @@
                            argv=['-c', tmp_file_path])
     mock_exit.assert_called_once_with(1)
 
+  @mock.patch('sys.exit')
+  @mock.patch.object(suite_runner, '_find_suite_class', autospec=True)
+  def test_run_suite_class(self, mock_find_suite_class, mock_exit):
+    mock_called = mock.MagicMock()
+
+    class FakeTestSuite(base_suite.BaseSuite):
+
+      def setup_suite(self, config):
+        mock_called.setup_suite()
+        super().setup_suite(config)
+        self.add_test_class(FakeTest1)
+
+      def teardown_suite(self):
+        mock_called.teardown_suite()
+        super().teardown_suite()
+
+    mock_find_suite_class.return_value = FakeTestSuite
+
+    tmp_file_path = os.path.join(self.tmp_dir, 'config.yml')
+    with io.open(tmp_file_path, 'w', encoding='utf-8') as f:
+      f.write(u"""
+        TestBeds:
+          # A test bed where adb will find Android devices.
+          - Name: SampleTestBed
+            Controllers:
+              MagicDevice: '*'
+      """)
+
+    mock_cli_args = ['test_binary', f'--config={tmp_file_path}']
+
+    with mock.patch.object(sys, 'argv', new=mock_cli_args):
+      suite_runner.run_suite_class()
+
+    mock_find_suite_class.assert_called_once()
+    mock_called.setup_suite.assert_called_once_with()
+    mock_called.teardown_suite.assert_called_once_with()
+    mock_exit.assert_not_called()
+
+  def test_print_test_names(self):
+    mock_test_class = mock.MagicMock()
+    mock_cls_instance = mock.MagicMock()
+    mock_test_class.return_value = mock_cls_instance
+    suite_runner._print_test_names([mock_test_class])
+    mock_cls_instance._pre_run.assert_called_once()
+    mock_cls_instance._clean_up.assert_called_once()
+
+  def test_print_test_names_with_exception(self):
+    mock_test_class = mock.MagicMock()
+    mock_cls_instance = mock.MagicMock()
+    mock_test_class.return_value = mock_cls_instance
+    suite_runner._print_test_names([mock_test_class])
+    mock_cls_instance._pre_run.side_effect = Exception(
+        'Something went wrong.')
+    mock_cls_instance._clean_up.assert_called_once()
+
 
 if __name__ == "__main__":
   unittest.main()
diff --git a/tests/mobly/test_runner_test.py b/tests/mobly/test_runner_test.py
index 0339c35..efddc4c 100755
--- a/tests/mobly/test_runner_test.py
+++ b/tests/mobly/test_runner_test.py
@@ -17,6 +17,7 @@
 import os
 import re
 import shutil
+import sys
 import tempfile
 import unittest
 from unittest import mock
@@ -25,12 +26,14 @@
 from mobly import records
 from mobly import signals
 from mobly import test_runner
+from mobly import utils
 from tests.lib import mock_android_device
 from tests.lib import mock_controller
 from tests.lib import integration_test
 from tests.lib import integration2_test
 from tests.lib import integration3_test
 from tests.lib import multiple_subclasses_module
+from tests.lib import terminated_test
 import yaml
 
 
@@ -265,6 +268,23 @@
     self.assertEqual(results['Passed'], 0)
     self.assertEqual(results['Failed'], 0)
 
+  def test_run_when_terminated(self):
+    mock_test_config = self.base_mock_test_config.copy()
+    tr = test_runner.TestRunner(self.log_dir, self.testbed_name)
+    tr.add_test_class(mock_test_config, terminated_test.TerminatedTest)
+
+    with self.assertRaises(signals.TestAbortAll):
+      with self.assertLogs(level=logging.WARNING) as log_output:
+        # Set handler log level due to bug in assertLogs.
+        # https://github.com/python/cpython/issues/86109
+        logging.getLogger().handlers[0].setLevel(logging.WARNING)
+        tr.run()
+
+    self.assertIn('Test received a SIGTERM. Aborting all tests.',
+                  log_output.output[0])
+    self.assertIn('Abort all subsequent test classes', log_output.output[1])
+    self.assertIn('Test received a SIGTERM.', log_output.output[1])
+
   def test_add_test_class_mismatched_log_path(self):
     tr = test_runner.TestRunner('/different/log/dir', self.testbed_name)
     with self.assertRaisesRegex(
diff --git a/tests/mobly/utils_test.py b/tests/mobly/utils_test.py
index 16204c9..7e95718 100755
--- a/tests/mobly/utils_test.py
+++ b/tests/mobly/utils_test.py
@@ -223,7 +223,7 @@
     self.assertEqual(ret, 0)
 
   def test_run_command_with_timeout_expired(self):
-    with self.assertRaises(subprocess.TimeoutExpired):
+    with self.assertRaisesRegex(subprocess.TimeoutExpired, 'sleep'):
       _ = utils.run_command(self.sleep_cmd(4), timeout=0.01)
 
   @mock.patch('threading.Timer')