Merge "Migrate Test Targets to New Android Ownership Model" into main
diff --git a/Android.bp b/Android.bp
index 9aaf7f0..5525a0c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -37,3 +37,15 @@
         "mobly_device_flags",
     ],
 }
+
+// Tool to upload Mobly test results to Resultstore.
+// When releasing a new version "x.y", rename the generated zip file to
+// "results_uploader_x_y.zip".
+genrule {
+    name: "mobly_results_uploader",
+    tools: ["soong_zip"],
+    srcs: ["tools/results_uploader/**/*"],
+    cmd: "RESULTS_UPLOADER=tools/test/mobly_extensions/tools/results_uploader;" +
+        "$(location soong_zip) -o $(out) -C $${RESULTS_UPLOADER} -D $${RESULTS_UPLOADER};",
+    out: ["results_uploader.zip"],
+}
diff --git a/tools/device_flags.py b/tools/device_flags.py
index b6fbe80..22ba87d 100644
--- a/tools/device_flags.py
+++ b/tools/device_flags.py
@@ -18,7 +18,7 @@
 
 import os
 import tempfile
-from typing import Any, Dict, Optional
+from typing import Any
 
 from mobly.controllers import android_device
 from protos import aconfig_pb2
@@ -27,10 +27,14 @@
 _ACONFIG_PB_FILE = 'aconfig_flags.pb'
 
 _DEVICE_CONFIG_GET_CMD = 'device_config get'
+_DEVICE_CONFIG_PUT_CMD = 'device_config put'
 
 _READ_ONLY = aconfig_pb2.flag_permission.READ_ONLY
 _ENABLED = aconfig_pb2.flag_state.ENABLED
 
+_VAL_TRUE = 'true'
+_VAL_FALSE = 'false'
+
 
 class DeviceFlags:
     """Provides access to aconfig and device_config flag values of a device."""
@@ -39,10 +43,10 @@
         self._ad = ad
         self._aconfig_flags = None
 
-    def get_value(self, namespace: str, name: str) -> Optional[str]:
+    def get_value(self, namespace: str, key: str) -> str | None:
         """Gets the value of the requested flag.
 
-        Flags must be specified by both its namespace and name.
+        Flags must be specified by both its namespace and key.
 
         The method will first look for the flag from the device's
         aconfig_flags.pb files, and, if not found or the flag is READ_WRITE,
@@ -52,38 +56,39 @@
 
         Args:
             namespace: The namespace of the flag.
-            name: The full name of the flag.
-                For aconfig flags, it is equivalent to '{package}.{name}' from
-                    the aconfig proto.
-                For device_config flags, it is equivalent to '{KEY}' from the
-                    "device_config shell" command.
+            key: The full name of the flag. For aconfig flags, it is equivalent
+              to '{package}.{name}' from the aconfig proto. For device_config
+              flags, it is equivalent to '{KEY}' from the `device_config get`
+              command.
 
         Returns:
             The flag value as a string.
         """
         # Check aconfig
         aconfig_val = None
-        aconfig_flag = self._get_aconfig_flags().get(
-            '%s/%s' % (namespace, name))
+        aconfig_flag = self._get_aconfig_flags().get(f'{namespace}/{key}')
         if aconfig_flag is not None:
-            aconfig_val = 'true' if aconfig_flag.state == _ENABLED else 'false'
+            aconfig_val = (
+                _VAL_TRUE if aconfig_flag.state == _ENABLED else _VAL_FALSE)
             if aconfig_flag.permission == _READ_ONLY:
                 return aconfig_val
 
         # If missing or READ_WRITE, also check device_config
-        device_config_val = self._ad.adb.shell(
-            '%s %s %s' % (_DEVICE_CONFIG_GET_CMD, namespace, name)
-        ).decode('utf8').strip()
+        device_config_val = (
+            self._ad.adb.shell(f'{_DEVICE_CONFIG_GET_CMD} {namespace} {key}')
+            .decode('utf8')
+            .strip()
+        )
         return device_config_val if device_config_val != 'null' else aconfig_val
 
-    def get_bool(self, namespace: str, name: str) -> bool:
+    def get_bool(self, namespace: str, key: str) -> bool:
         """Gets the value of the requested flag as a boolean.
 
         See get_value() for details.
 
         Args:
             namespace: The namespace of the flag.
-            name: The key of the flag.
+            key: The key of the flag.
 
         Returns:
             The flag value as a boolean.
@@ -91,33 +96,84 @@
         Raises:
             ValueError if the flag value cannot be expressed as a boolean.
         """
-        val = self.get_value(namespace, name)
-        if val.lower() == 'true':
-            return True
-        if val.lower() == 'false':
-            return False
-        raise ValueError('Flag %s/%s is not a boolean (value: %s).'
-                         % (namespace, name, val))
+        val = self.get_value(namespace, key)
+        if val is not None:
+            if val.lower() == _VAL_TRUE:
+                return True
+            if val.lower() == _VAL_FALSE:
+                return False
+        raise ValueError(
+            f'Flag {namespace}/{key} is not a boolean (value: {val}).')
 
-    def _get_aconfig_flags(self) -> Dict[str, Any]:
+    def _get_aconfig_flags(self) -> dict[str, Any]:
         """Gets the aconfig flags as a dict. Loads from proto if necessary."""
         if self._aconfig_flags is None:
-            self._load_aconfig_flags()
+            self._aconfig_flags = self._load_aconfig_flags()
         return self._aconfig_flags
 
-    def _load_aconfig_flags(self) -> None:
+    def _load_aconfig_flags(self) -> dict[str, Any]:
         """Pull aconfig proto files from device, then load the flag info."""
-        self._aconfig_flags = {}
+        aconfig_flags = {}
         with tempfile.TemporaryDirectory() as tmp_dir:
             for partition in _ACONFIG_PARTITIONS:
                 device_path = os.path.join(
                     '/', partition, 'etc', _ACONFIG_PB_FILE)
                 host_path = os.path.join(
-                    tmp_dir, '%s_%s' % (partition, _ACONFIG_PB_FILE))
+                    tmp_dir, f'{partition}_{_ACONFIG_PB_FILE}')
                 self._ad.adb.pull([device_path, host_path])
                 with open(host_path, 'rb') as f:
                     parsed_flags = aconfig_pb2.parsed_flags.FromString(f.read())
                 for flag in parsed_flags.parsed_flag:
-                    full_name = '%s/%s.%s' % (
-                        flag.namespace, flag.package, flag.name)
-                    self._aconfig_flags[full_name] = flag
+                    full_name = f'{flag.namespace}/{flag.package}.{flag.name}'
+                    aconfig_flags[full_name] = flag
+        return aconfig_flags
+
+    def set_value(self, namespace: str, key: str, val: str) -> None:
+        """Sets the value of the requested flag.
+
+        This only supports flags that are set via `adb device_config`.
+
+        Args:
+            namespace: The namespace of the flag.
+            key: The key of the flag.
+            val: The desired value of the flag, in string format.
+        """
+        self._ad.adb.shell(f'{_DEVICE_CONFIG_PUT_CMD} {namespace} {key} {val}')
+
+    def enable(self, namespace: str, key: str) -> None:
+        """Enables the requested flag.
+
+        This only supports flags that are set via `adb device_config`.
+
+        Args:
+            namespace: The namespace of the flag.
+            key: The key of the flag.
+
+        Raises:
+            ValueError if the original flag value cannot be expressed as a
+              boolean.
+        """
+        # If the original value of the flag is not boolean, this will raise a
+        # ValueError.
+        _ = self.get_bool(namespace, key)
+
+        self.set_value(namespace, key, _VAL_TRUE)
+
+    def disable(self, namespace: str, key: str) -> None:
+        """Disables the requested flag.
+
+        This only supports flags that are set via `adb device_config`.
+
+        Args:
+            namespace: The namespace of the flag.
+            key: The key of the flag.
+
+        Raises:
+            ValueError if the original flag value cannot be expressed as a
+              boolean.
+        """
+        # If the original value of the flag is not boolean, this will raise a
+        # ValueError.
+        _ = self.get_bool(namespace, key)
+
+        self.set_value(namespace, key, _VAL_FALSE)
diff --git a/tools/device_flags_test.py b/tools/device_flags_test.py
index 40405a3..3efdf88 100644
--- a/tools/device_flags_test.py
+++ b/tools/device_flags_test.py
@@ -25,53 +25,96 @@
     """Unit tests for DeviceFlags."""
 
     def setUp(self) -> None:
+        super().setUp()
         self.ad = mock.MagicMock()
         self.device_flags = device_flags.DeviceFlags(self.ad)
         self.device_flags._aconfig_flags = {}
 
     def test_get_value_aconfig_flag_missing_use_device_config(self) -> None:
         self.ad.adb.shell.return_value = b'foo'
-        self.assertEqual(self.device_flags.get_value('sample', 'flag'), 'foo')
+
+        value = self.device_flags.get_value('sample', 'flag')
+
+        self.assertEqual(value, 'foo')
 
     def test_get_value_aconfig_flag_read_write_use_device_config(self) -> None:
         sample_flag = aconfig_pb2.parsed_flag()
         sample_flag.state = aconfig_pb2.flag_state.ENABLED
         sample_flag.permission = aconfig_pb2.flag_permission.READ_WRITE
         self.device_flags._aconfig_flags['sample/flag'] = sample_flag
-
         self.ad.adb.shell.return_value = b'false'
-        self.assertEqual(self.device_flags.get_value('sample', 'flag'), 'false')
+
+        value = self.device_flags.get_value('sample', 'flag')
+
+        self.assertEqual(value, 'false')
 
     def test_get_value_aconfig_flag_read_only_use_aconfig(self) -> None:
         sample_flag = aconfig_pb2.parsed_flag()
         sample_flag.state = aconfig_pb2.flag_state.ENABLED
         sample_flag.permission = aconfig_pb2.flag_permission.READ_ONLY
         self.device_flags._aconfig_flags['sample/flag'] = sample_flag
-
         self.ad.adb.shell.return_value = b'false'
-        self.assertEqual(self.device_flags.get_value('sample', 'flag'), 'true')
+
+        value = self.device_flags.get_value('sample', 'flag')
+
+        self.assertEqual(value, 'true')
 
     def test_get_value_device_config_null_use_aconfig(self) -> None:
         sample_flag = aconfig_pb2.parsed_flag()
         sample_flag.state = aconfig_pb2.flag_state.ENABLED
         sample_flag.permission = aconfig_pb2.flag_permission.READ_WRITE
         self.device_flags._aconfig_flags['sample/flag'] = sample_flag
-
         self.ad.adb.shell.return_value = b'null'
-        self.assertEqual(self.device_flags.get_value('sample', 'flag'), 'true')
 
-    def test_get_bool_with_valid_bool_value(self) -> None:
+        value = self.device_flags.get_value('sample', 'flag')
+
+        self.assertEqual(value, 'true')
+
+    def test_get_bool_with_valid_bool_value_true(self) -> None:
         self.ad.adb.shell.return_value = b'true'
-        self.assertTrue(self.device_flags.get_bool('sample', 'flag'))
 
+        value = self.device_flags.get_bool('sample', 'flag')
+
+        self.assertTrue(value)
+
+    def test_get_bool_with_valid_bool_value_false(self) -> None:
         self.ad.adb.shell.return_value = b'false'
-        self.assertFalse(self.device_flags.get_bool('sample', 'flag'))
+
+        value = self.device_flags.get_bool('sample', 'flag')
+
+        self.assertFalse(value)
 
     def test_get_bool_with_invalid_bool_value(self) -> None:
         self.ad.adb.shell.return_value = b'foo'
+
         with self.assertRaisesRegex(ValueError, 'not a boolean'):
             self.device_flags.get_bool('sample', 'flag')
 
+    def test_set_value_runs_correct_command(self) -> None:
+        self.device_flags.set_value('sample', 'flag', 'value')
+
+        self.ad.adb.shell.assert_called_with('device_config put sample flag value')
+
+    def test_enable_runs_correct_command(self) -> None:
+        self.ad.adb.shell.return_value = b'true'
+
+        self.device_flags.enable('sample', 'flag')
+
+        self.ad.adb.shell.assert_called_with('device_config put sample flag true')
+
+    def test_disable_runs_correct_command(self) -> None:
+        self.ad.adb.shell.return_value = b'true'
+
+        self.device_flags.disable('sample', 'flag')
+
+        self.ad.adb.shell.assert_called_with('device_config put sample flag false')
+
+    def test_disable_fails_with_non_boolean_original_value(self) -> None:
+        self.ad.adb.shell.return_value = b'foo'
+
+        with self.assertRaisesRegex(ValueError, 'not a boolean'):
+            self.device_flags.disable('sample', 'flag')
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/tools/results_uploader/CHANGELOG.md b/tools/results_uploader/CHANGELOG.md
new file mode 100644
index 0000000..1c9d2d0
--- /dev/null
+++ b/tools/results_uploader/CHANGELOG.md
@@ -0,0 +1,20 @@
+# Mobly Results Uploader release history
+
+## 0.2
+
+### Fixes
+* Properly URL-encode the target resource name.
+* Report targets with all skipped test cases as `skipped`.
+* Update Resultstore UI link from source.cloud to BTX.
+* Suppress warnings from imported modules.
+
+
+## 0.1
+
+### New
+
+* Add the `results_uploader` tool for uploading Mobly test results to the
+  Resultstore service.
+  * Uploads local test logs to a user-provided Google Cloud Storage location.
+  * Creates a new test invocation record via Resultstore API.
+  * Generates a web link to visualize results.
diff --git a/tools/results_uploader/README.md b/tools/results_uploader/README.md
new file mode 100644
index 0000000..f8061bf
--- /dev/null
+++ b/tools/results_uploader/README.md
@@ -0,0 +1,81 @@
+# Mobly Results Uploader
+
+The Results Uploader is a tool for generating shareable UI links for automated
+test results.
+
+It uploads test-generated files to Google Cloud Storage, and presents the
+results in an organized way on a dedicated web UI. The result URL can then be
+shared with anyone who is given access (including both Google and non-Google
+accounts), allowing for easy tracking and debugging.
+
+## First-time setup
+
+### Requirements
+* Python 3.11 or above
+
+### Instructions
+
+To start using the Results Uploader, you need to be able to access the shared
+Google Cloud Storage bucket:
+1. Confirm/request access to the shared GCP project with your Google contact.
+   The Googler will give you both a project name and storage bucket name to use.
+2. Install the gcloud CLI from https://cloud.google.com/sdk/docs/install
+    * If installation fails with the above method, try the alternative linked
+      [here](https://cloud.google.com/sdk/docs/downloads-versioned-archives#installation_instructions).
+3. Run the following commands in the terminal:
+    ```bash
+    gcloud auth login
+    gcloud auth application-default login
+    gcloud config set project <gcp_project>
+    gcloud auth application-default set-quota-project <gcp_project>
+    ```
+    * When prompted to log in through your browser, follow the instructions to
+      sign in to the Cloud SDK. Use the same account for which you requested
+      access in step 1.
+4. Download the provided `results_uploader.zip` and extract its files to a local
+   directory.
+
+## How to upload results
+1. Open a new terminal and run the following installation commands (first time
+   only).
+
+   **Note**: `<results_uploader_dir>` must contain path separators, so it's not
+   confused with a package from PyPI. For example, if the unzipped directory is
+   `results_uploader`, specify it as `results_uploader/` or
+   `./results_uploader`.
+
+    ```bash
+    # on Linux
+
+    python3 -m venv venv
+    source venv/bin/activate
+    python3 -m pip install <results_uploader_dir>
+    ```
+    ```cmd
+    :: on Windows
+
+    python -m venv venv
+    venv\Scripts\activate
+    python -m pip install <results_uploader_dir>
+    ```
+
+2. At the end of a completed test run, you'll see the final lines on the console
+   output as follows. Record the folder path in the line starting with
+   "Artifacts are saved in".
+
+    ```
+    Total time elapsed 961.7551812920001s
+    Artifacts are saved in "/tmp/logs/mobly/Local5GTestbed/10-23-2023_10-30-50-685"
+    Test summary saved in "/tmp/logs/mobly/Local5GTestbed/10-23-2023_10-30-50-685/test_summary.yaml"
+    Test results: Error 0, Executed 1, Failed 0, Passed 1, Requested 0, Skipped 0
+    ```
+
+3. Run the uploader command, setting `<artifacts_folder>` to the path recorded
+   in the previous step.
+    ```bash
+    results_uploader --mobly_dir=<artifacts_folder> --gcs_bucket=<cloud_storage_bucket>
+    ```
+
+4. If successful, at the end of the upload process you will get a link beginning
+   with http://btx.cloud.google.com. Simply share this link with others who
+   wish to view your test results.
diff --git a/tools/results_uploader/pyproject.toml b/tools/results_uploader/pyproject.toml
new file mode 100644
index 0000000..cb5e4a9
--- /dev/null
+++ b/tools/results_uploader/pyproject.toml
@@ -0,0 +1,23 @@
+[build-system]
+requires = ["setuptools"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "results_uploader"
+version = "0.2"
+description = "Tool for uploading Mobly test results to Resultstore web UI."
+readme = "README.md"
+requires-python = ">=3.11"
+dependencies = [
+  "google-api-python-client",
+  "google-auth",
+  "google-auth-httplib2",
+  "google-cloud",
+  "google-cloud-storage",
+  "httplib2",
+  "mobly",
+  "pyyaml",
+]
+
+[project.scripts]
+results_uploader = "results_uploader:main"
diff --git a/tools/results_uploader/src/__init__.py b/tools/results_uploader/src/__init__.py
new file mode 100644
index 0000000..eccc7ff
--- /dev/null
+++ b/tools/results_uploader/src/__init__.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python3
+
+#  Copyright (C) 2024 The Android Open Source Project
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
diff --git a/tools/results_uploader/src/mobly_result_converter.py b/tools/results_uploader/src/mobly_result_converter.py
new file mode 100644
index 0000000..9348add
--- /dev/null
+++ b/tools/results_uploader/src/mobly_result_converter.py
@@ -0,0 +1,796 @@
+#!/usr/bin/env python3
+
+#  Copyright (C) 2024 The Android Open Source Project
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""A converter for Mobly result schema to Resultstore schema.
+
+Each Mobly test class maps to a Resultstore testsuite and each Mobly test method
+maps to a Resultstore testcase. For example:
+
+  Mobly schema:
+
+  Test Class: HelloWorldTest
+  Test Name: test_hello
+  Type: Record
+  Result: PASS
+
+  Resultstore schema:
+
+  <testsuite name="HelloWorldTest" tests=1>
+    <testcase name="test_hello"/>
+  </testsuite>
+"""
+
+import dataclasses
+import datetime
+import enum
+import glob
+import logging
+import os
+import pathlib
+import re
+from typing import Any, Dict, Iterator, List, Mapping, Optional
+from xml.etree import ElementTree
+
+from mobly import records
+import yaml
+
+_MOBLY_RECORD_TYPE_KEY = 'Type'
+
+_MOBLY_TEST_SUITE_NAME = 'MoblyTest'
+
+_TEST_INTERRUPTED_MESSAGE = 'Details: Test was interrupted manually.'
+
+_ILLEGAL_XML_CHARS = re.compile(
+    '[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]'
+)
+
+_ILLEGAL_YAML_CHARS = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]')
+
+
+def as_posix_path(path):
+    """Returns a given path as a string in POSIX format."""
+    return str(pathlib.Path(path).as_posix())
+
+
+class MoblyResultstoreProperties(enum.Enum):
+    """Resultstore properties defined specifically for all Mobly tests.
+
+    All these properties apply to the testcase level. TEST_CLASS and
+    TEST_TYPE apply to both the testcase and testsuite level.
+    """
+
+    BEGIN_TIME = 'mobly_begin_time'
+    END_TIME = 'mobly_end_time'
+    TEST_CLASS = 'mobly_test_class'
+    TEST_TYPE = 'test_type'
+    UID = 'mobly_uid'
+    TEST_OUTPUT = 'test_output'
+    TEST_SIGNATURE = 'mobly_signature'
+    SKIP_REASON = 'skip_reason'
+    ERROR_MESSAGE = 'mobly_error_message'
+    ERROR_TYPE = 'mobly_error_type'
+    STACK_TRACE = 'mobly_stack_trace'
+
+
+_MOBLY_PROPERTY_VALUES = frozenset(e.value for e in MoblyResultstoreProperties)
+
+
+class ResultstoreTreeTags(enum.Enum):
+    """Common tags for Resultstore tree nodes."""
+
+    TESTSUITES = 'testsuites'
+    TESTSUITE = 'testsuite'
+    TESTCASE = 'testcase'
+    PROPERTIES = 'properties'
+    PROPERTY = 'property'
+    FAILURE = 'failure'
+    ERROR = 'error'
+
+
+class ResultstoreTreeAttributes(enum.Enum):
+    """Common attributes for Resultstore tree nodes."""
+
+    ERRORS = 'errors'
+    FAILURES = 'failures'
+    TESTS = 'tests'
+    CLASS_NAME = 'classname'
+    RESULT = 'result'
+    STATUS = 'status'
+    TIME = 'time'
+    TIMESTAMP = 'timestamp'
+    NAME = 'name'
+    VALUE = 'value'
+    MESSAGE = 'message'
+    RETRY_NUMBER = 'retrynumber'
+    REPEAT_NUMBER = 'repeatnumber'
+    ERROR_TYPE = 'type'
+    RERAN_TEST_NAME = 'rerantestname'
+
+
+@dataclasses.dataclass
+class TestSuiteSummary:
+    num_tests: int
+    num_errors: int
+    num_failures: int
+
+
+@dataclasses.dataclass
+class ReranNode:
+    reran_test_name: str
+    original_test_name: str
+    index: int
+    node_type: records.TestParentType
+
+
+def _find_all_elements(
+        mobly_root: ElementTree.Element,
+        class_name: Optional[str],
+        test_name: Optional[str],
+) -> Iterator[ElementTree.Element]:
+    """Finds all elements in the Resultstore tree with class name and/or
+    test_name.
+
+    If class name is absent, it will find all elements with the test name
+    across all test classes. If test name is absent it will find all elements
+    with class name. If both are absent, it will just return the Mobly root
+    tree.
+
+    Args:
+      mobly_root: Root element of the Mobly test Resultstore tree.
+      class_name: Mobly test class name to get the elements for.
+      test_name: Mobly test names to get the elements for.
+
+    Yields:
+      Iterator of elements satisfying the class_name and test_name search
+      criteria.
+    """
+    if class_name is None and test_name is None:
+        yield mobly_root
+        return
+
+    xpath = f'./{ResultstoreTreeTags.TESTSUITE.value}'
+    if class_name is not None:
+        xpath += f'[@{ResultstoreTreeAttributes.NAME.value}="{class_name}"]'
+    if test_name is not None:
+        xpath += (
+            f'/{ResultstoreTreeTags.TESTCASE.value}'
+            f'[@{ResultstoreTreeAttributes.NAME.value}="{test_name}"]'
+        )
+
+    yield from mobly_root.iterfind(xpath)
+
+
+def _create_or_return_properties_element(
+        element: ElementTree.Element,
+) -> ElementTree.Element:
+    properties_element = element.find(
+        f'./{ResultstoreTreeTags.PROPERTIES.value}')
+    if properties_element is not None:
+        return properties_element
+    return ElementTree.SubElement(element, ResultstoreTreeTags.PROPERTIES.value)
+
+
+def _add_or_update_property_element(
+        properties_element: ElementTree.Element, name: str, value: str
+):
+    """Adds a property element or update the property value."""
+    name = _ILLEGAL_XML_CHARS.sub('', name)
+    value = _ILLEGAL_XML_CHARS.sub('', value)
+    property_element = properties_element.find(
+        f'./{ResultstoreTreeTags.PROPERTY.value}'
+        f'[@{ResultstoreTreeAttributes.NAME.value}="{name}"]'
+    )
+    if property_element is None:
+        property_element = ElementTree.SubElement(
+            properties_element, ResultstoreTreeTags.PROPERTY.value
+        )
+        property_element.set(ResultstoreTreeAttributes.NAME.value, name)
+    property_element.set(ResultstoreTreeAttributes.VALUE.value, value)
+
+
+def _add_file_annotations(
+        entry: Mapping[str, Any],
+        properties_element: ElementTree.Element,
+        mobly_base_directory: Optional[str],
+        resultstore_root_directory: Optional[str],
+) -> None:
+    """Adds file annotations for a Mobly test case files.
+
+    The mobly_base_directory is used to find the files belonging to a test case.
+    The files under "mobly_base_directory/test_class/test_method" belong to the
+    test_class#test_method Resultstore node.
+
+    The resultstore_root_directory is used to determine the
+    relative path of the files to the Resultstore root directory for undeclared
+    outputs. The file annotation must be written for the relative path.
+
+    Args:
+      entry: Mobly summary entry for the test case.
+      properties_element: Test case properties element.
+      mobly_base_directory: Base directory of the Mobly test.
+      resultstore_root_directory: Root directory for Resultstore undeclared
+        outputs.
+    """
+    # If these directories are not provided, the converter will not add the
+    # annotations to associate the files with the test cases.
+    if (
+            mobly_base_directory is None
+            or resultstore_root_directory is None
+            or entry.get(records.TestResultEnums.RECORD_SIGNATURE, None) is None
+    ):
+        return
+
+    test_class = entry[records.TestResultEnums.RECORD_CLASS]
+    test_case_directory = os.path.join(
+        mobly_base_directory,
+        test_class,
+        entry[records.TestResultEnums.RECORD_SIGNATURE],
+    )
+
+    test_case_files = glob.glob(
+        os.path.join(test_case_directory, '**'), recursive=True
+    )
+    file_counter = 0
+    for file_path in test_case_files:
+        if not os.path.isfile(file_path):
+            continue
+        relative_path = os.path.relpath(file_path, resultstore_root_directory)
+        _add_or_update_property_element(
+            properties_element,
+            f'test_output{file_counter}',
+            as_posix_path(relative_path),
+        )
+        file_counter += 1
+
+
+def _create_mobly_root_element(
+        summary_record: Mapping[str, Any]
+) -> ElementTree.Element:
+    """Creates a Resultstore XML testsuite node for a Mobly test summary."""
+    full_summary = TestSuiteSummary(
+        num_tests=summary_record['Requested'],
+        num_errors=summary_record['Error'],
+        num_failures=summary_record['Failed'],
+    )
+    # Create the root Resultstore node to wrap the Mobly test.
+    main_wrapper = ElementTree.Element(ResultstoreTreeTags.TESTSUITES.value)
+    main_wrapper.set(ResultstoreTreeAttributes.NAME.value, '__main__')
+    main_wrapper.set(ResultstoreTreeAttributes.TIME.value, '0')
+    main_wrapper.set(
+        ResultstoreTreeAttributes.ERRORS.value, str(full_summary.num_errors)
+    )
+    main_wrapper.set(
+        ResultstoreTreeAttributes.FAILURES.value, str(full_summary.num_failures)
+    )
+    main_wrapper.set(
+        ResultstoreTreeAttributes.TESTS.value, str(full_summary.num_tests)
+    )
+
+    mobly_test_root = ElementTree.SubElement(
+        main_wrapper, ResultstoreTreeTags.TESTSUITE.value
+    )
+    mobly_test_root.set(
+        ResultstoreTreeAttributes.NAME.value, _MOBLY_TEST_SUITE_NAME
+    )
+    mobly_test_root.set(ResultstoreTreeAttributes.TIME.value, '0')
+    mobly_test_root.set(
+        ResultstoreTreeAttributes.ERRORS.value, str(full_summary.num_errors)
+    )
+    mobly_test_root.set(
+        ResultstoreTreeAttributes.FAILURES.value, str(full_summary.num_failures)
+    )
+    mobly_test_root.set(
+        ResultstoreTreeAttributes.TESTS.value, str(full_summary.num_tests)
+    )
+
+    return main_wrapper
+
+
+def _create_class_element(
+        class_name: str, class_summary: TestSuiteSummary
+) -> ElementTree.Element:
+    """Creates a Resultstore XML testsuite node for a Mobly test class summary.
+
+    Args:
+      class_name: Mobly test class name.
+      class_summary: Mobly test class summary.
+
+    Returns:
+      A Resultstore testsuite node representing one Mobly test class.
+    """
+    class_element = ElementTree.Element(ResultstoreTreeTags.TESTSUITE.value)
+    class_element.set(ResultstoreTreeAttributes.NAME.value, class_name)
+    class_element.set(ResultstoreTreeAttributes.TIME.value, '0')
+    class_element.set(
+        ResultstoreTreeAttributes.TESTS.value, str(class_summary.num_tests)
+    )
+    class_element.set(
+        ResultstoreTreeAttributes.ERRORS.value, str(class_summary.num_errors)
+    )
+    class_element.set(
+        ResultstoreTreeAttributes.FAILURES.value,
+        str(class_summary.num_failures)
+    )
+
+    properties_element = _create_or_return_properties_element(class_element)
+    _add_or_update_property_element(
+        properties_element,
+        MoblyResultstoreProperties.TEST_CLASS.value,
+        class_name,
+    )
+    _add_or_update_property_element(
+        properties_element,
+        MoblyResultstoreProperties.TEST_TYPE.value,
+        'mobly_class',
+    )
+
+    return class_element
+
+
+def _set_rerun_node(
+        signature: str,
+        child_parent_map: Mapping[str, str],
+        parent_type_map: Mapping[str, records.TestParentType],
+        signature_test_name_map: Mapping[str, str],
+        rerun_node_map: Dict[str, ReranNode],
+) -> None:
+    """Sets the rerun node in the rerun node map for the current test signature.
+
+    This function traverses the child parent map recursively until it finds the
+    root test run for the rerun chain. Then it uses the original test name from
+    there and builds the indices.
+
+    Args:
+      signature: Current test signature.
+      child_parent_map: Map of test signature to the parent test signature.
+      parent_type_map: Map of parent test signature to the parent type.
+      signature_test_name_map: Map of test signature to test name.
+      rerun_node_map: Map of test signature to rerun information.
+    """
+    if signature in rerun_node_map:
+        return
+
+    # If there is no parent, then this is the root test in the retry chain.
+    if signature not in child_parent_map:
+        if parent_type_map[signature] == records.TestParentType.REPEAT:
+            # If repeat, remove the '_#' suffix to get the original test name.
+            original_test_name = \
+              signature_test_name_map[signature].rsplit('_', 1)[0]
+        else:
+            original_test_name = signature_test_name_map[signature]
+        rerun_node_map[signature] = ReranNode(
+            signature_test_name_map[signature],
+            original_test_name,
+            0,
+            parent_type_map[signature],
+        )
+        return
+
+    parent_signature = child_parent_map[signature]
+    _set_rerun_node(
+        parent_signature,
+        child_parent_map,
+        parent_type_map,
+        signature_test_name_map,
+        rerun_node_map,
+    )
+
+    parent_node = rerun_node_map[parent_signature]
+    rerun_node_map[signature] = ReranNode(
+        signature_test_name_map[signature],
+        parent_node.original_test_name,
+        parent_node.index + 1,
+        parent_node.node_type,
+    )
+
+
+def _get_reran_nodes(
+        entries: List[Mapping[str, Any]]
+) -> Mapping[str, ReranNode]:
+    """Builds rerun nodes for all test records in a retry/repeat chain.
+
+    Records without a signature are ignored. A record belongs to a rerun
+    chain when it carries a parent entry naming the record it reran.
+
+    Args:
+      entries: Summary entries for the Mobly test runs.
+
+    Returns:
+      A map of test signature to node information. Only signatures that are
+      part of a rerun chain are present.
+    """
+    signature_key = records.TestResultEnums.RECORD_SIGNATURE
+    parents = {}
+    parent_types = {}
+    test_names = {}
+    for record in entries:
+        if signature_key not in record:
+            continue
+        signature = record[signature_key]
+        test_names[signature] = record[records.TestResultEnums.RECORD_NAME]
+        # The parent entry is a dictionary holding 'parent' and 'type'.
+        parent_info = record.get(records.TestResultEnums.RECORD_PARENT, None)
+        if parent_info is None:
+            continue
+        parent_signature = parent_info['parent']
+        if parent_info['type'] == 'retry':
+            rerun_type = records.TestParentType.RETRY
+        else:
+            rerun_type = records.TestParentType.REPEAT
+        parents[signature] = parent_signature
+        parent_types[parent_signature] = rerun_type
+
+    nodes = {}
+    for signature in parents:
+        # Each call records the node for this signature and, recursively,
+        # for every ancestor that has not been recorded yet.
+        _set_rerun_node(
+            signature, parents, parent_types, test_names, nodes
+        )
+
+    return nodes
+
+
+def _process_record(
+        entry: Mapping[str, Any],
+        reran_node: Optional[ReranNode],
+        mobly_base_directory: Optional[str],
+        resultstore_root_directory: Optional[str],
+) -> ElementTree.Element:
+    """Converts one Mobly test record into a Resultstore test case node.
+
+    Args:
+      entry: Summary of a single Mobly test case.
+      reran_node: Rerun info, or None when the test is not in a rerun chain.
+      mobly_base_directory: Base directory for the Mobly test. Artifacts from
+        the Mobly test will be saved here.
+      resultstore_root_directory: Root directory for Resultstore undeclared
+        outputs.
+
+    Returns:
+      A Resultstore XML node representing a single test case. Skipped records
+      are marked skipped/notrun; records with no result get an error child
+      carrying _TEST_INTERRUPTED_MESSAGE.
+    """
+    begin_time = entry[records.TestResultEnums.RECORD_BEGIN_TIME]
+    end_time = entry[records.TestResultEnums.RECORD_END_TIME]
+    testcase_element = ElementTree.Element(ResultstoreTreeTags.TESTCASE.value)
+    result = entry[records.TestResultEnums.RECORD_RESULT]
+    # Reruns report the original test name plus a retry/repeat index.
+    if reran_node is not None:
+        if reran_node.node_type == records.TestParentType.RETRY:
+            testcase_element.set(
+                ResultstoreTreeAttributes.RETRY_NUMBER.value,
+                str(reran_node.index)
+            )
+        elif reran_node.node_type == records.TestParentType.REPEAT:
+            testcase_element.set(
+                ResultstoreTreeAttributes.REPEAT_NUMBER.value,
+                str(reran_node.index)
+            )
+        testcase_element.set(
+            ResultstoreTreeAttributes.NAME.value, reran_node.original_test_name
+        )
+        testcase_element.set(
+            ResultstoreTreeAttributes.RERAN_TEST_NAME.value,
+            reran_node.reran_test_name,
+        )
+    else:
+        testcase_element.set(
+            ResultstoreTreeAttributes.NAME.value,
+            entry[records.TestResultEnums.RECORD_NAME],
+        )
+        testcase_element.set(
+            ResultstoreTreeAttributes.RERAN_TEST_NAME.value,
+            entry[records.TestResultEnums.RECORD_NAME],
+        )
+    testcase_element.set(
+        ResultstoreTreeAttributes.CLASS_NAME.value,
+        entry[records.TestResultEnums.RECORD_CLASS],
+    )
+    if result == records.TestResultEnums.TEST_RESULT_SKIP:
+        testcase_element.set(ResultstoreTreeAttributes.RESULT.value, 'skipped')
+        testcase_element.set(ResultstoreTreeAttributes.STATUS.value, 'notrun')
+        testcase_element.set(ResultstoreTreeAttributes.TIME.value, '0')
+    elif result is None:
+        testcase_element.set(ResultstoreTreeAttributes.RESULT.value,
+                             'completed')
+        testcase_element.set(ResultstoreTreeAttributes.STATUS.value, 'run')
+        testcase_element.set(ResultstoreTreeAttributes.TIME.value, '0')
+        testcase_element.set(
+            ResultstoreTreeAttributes.TIMESTAMP.value,
+            datetime.datetime.fromtimestamp(
+                begin_time / 1000, tz=datetime.timezone.utc
+            ).strftime('%Y-%m-%dT%H:%M:%SZ'),
+        )
+    else:
+        testcase_element.set(ResultstoreTreeAttributes.RESULT.value,
+                             'completed')
+        testcase_element.set(ResultstoreTreeAttributes.STATUS.value, 'run')
+        testcase_element.set(
+            ResultstoreTreeAttributes.TIME.value, str(end_time - begin_time)
+        )
+        testcase_element.set(
+            ResultstoreTreeAttributes.TIMESTAMP.value,
+            datetime.datetime.fromtimestamp(
+                begin_time / 1000, tz=datetime.timezone.utc
+            ).strftime('%Y-%m-%dT%H:%M:%SZ'),
+        )
+
+    # Add Mobly specific test case properties.
+    properties_element = _create_or_return_properties_element(testcase_element)
+    if result == records.TestResultEnums.TEST_RESULT_SKIP:
+        _add_or_update_property_element(
+            properties_element,
+            MoblyResultstoreProperties.SKIP_REASON.value,
+            f'Details: {entry[records.TestResultEnums.RECORD_DETAILS]}',
+        )
+    elif result is None:
+        _add_or_update_property_element(
+            properties_element,
+            MoblyResultstoreProperties.BEGIN_TIME.value,
+            str(begin_time),
+        )
+    else:
+        _add_or_update_property_element(
+            properties_element,
+            MoblyResultstoreProperties.BEGIN_TIME.value,
+            str(begin_time),
+        )
+        _add_or_update_property_element(
+            properties_element,
+            MoblyResultstoreProperties.END_TIME.value,
+            str(end_time),
+        )
+    _add_or_update_property_element(
+        properties_element,
+        MoblyResultstoreProperties.TEST_CLASS.value,
+        entry[records.TestResultEnums.RECORD_CLASS],
+    )
+    _add_or_update_property_element(
+        properties_element,
+        MoblyResultstoreProperties.TEST_TYPE.value,
+        'mobly_test',
+    )
+
+    if entry.get(records.TestResultEnums.RECORD_SIGNATURE, None) is not None:
+        _add_or_update_property_element(
+            properties_element,
+            MoblyResultstoreProperties.TEST_SIGNATURE.value,
+            entry[records.TestResultEnums.RECORD_SIGNATURE],
+        )
+
+    _add_file_annotations(
+        entry,
+        properties_element,
+        mobly_base_directory,
+        resultstore_root_directory,
+    )
+
+    if entry[records.TestResultEnums.RECORD_UID] is not None:
+        _add_or_update_property_element(
+            properties_element,
+            MoblyResultstoreProperties.UID.value,
+            entry[records.TestResultEnums.RECORD_UID],
+        )
+    # A record with no result is reported with the interrupted message.
+    if result is None:
+        error_element = ElementTree.SubElement(
+            testcase_element, ResultstoreTreeTags.ERROR.value
+        )
+        error_element.set(
+            ResultstoreTreeAttributes.MESSAGE.value, _TEST_INTERRUPTED_MESSAGE
+        )
+        error_element.text = _TEST_INTERRUPTED_MESSAGE
+    elif (
+            result == records.TestResultEnums.TEST_RESULT_FAIL
+            or result == records.TestResultEnums.TEST_RESULT_ERROR
+    ):
+        error_message = (
+            f'Details: {entry[records.TestResultEnums.RECORD_DETAILS]}')
+        tag = (
+            ResultstoreTreeTags.FAILURE.value
+            if result == records.TestResultEnums.TEST_RESULT_FAIL
+            else ResultstoreTreeTags.ERROR.value
+        )
+        failure_or_error_element = ElementTree.SubElement(testcase_element, tag)
+        failure_or_error_element.set(
+            ResultstoreTreeAttributes.MESSAGE.value, error_message
+        )
+        _add_or_update_property_element(
+            properties_element,
+            MoblyResultstoreProperties.ERROR_MESSAGE.value,
+            error_message,
+        )
+
+        # Add termination signal type and stack trace to the failure XML element
+        # and the test case properties.
+        termination_signal_type = entry[
+            records.TestResultEnums.RECORD_TERMINATION_SIGNAL_TYPE
+        ]
+        if termination_signal_type is None:
+            logging.warning(
+                'Test %s has %s result without a termination signal type.',
+                entry[records.TestResultEnums.RECORD_NAME],
+                result,
+            )
+        else:
+            failure_or_error_element.set(
+                ResultstoreTreeAttributes.ERROR_TYPE.value,
+                termination_signal_type
+            )
+            _add_or_update_property_element(
+                properties_element,
+                MoblyResultstoreProperties.ERROR_TYPE.value,
+                termination_signal_type,
+            )
+        stack_trace = entry[records.TestResultEnums.RECORD_STACKTRACE]
+        if stack_trace is None:
+            logging.warning(
+                'Test %s has %s result without a stack trace.',
+                entry[records.TestResultEnums.RECORD_NAME],
+                result,
+            )
+        else:
+            failure_or_error_element.text = stack_trace
+            _add_or_update_property_element(
+                properties_element,
+                MoblyResultstoreProperties.STACK_TRACE.value,
+                stack_trace,
+            )
+    return testcase_element
+
+
+def convert(
+        mobly_results_path: str,
+        mobly_base_directory: Optional[str] = None,
+        resultstore_root_directory: Optional[str] = None,
+) -> ElementTree.ElementTree:
+    """Converts a Mobly results summary file to Resultstore XML schema.
+
+    The mobly_base_directory and resultstore_root_directory will be used to
+    compute the file links for each Resultstore tree element. If these are
+    absent then the file links will be omitted.
+
+    Args:
+      mobly_results_path: Path to the Mobly summary YAML file.
+      mobly_base_directory: Base directory of the Mobly test.
+      resultstore_root_directory: Root directory for Resultstore undeclared
+        outputs.
+
+    Returns:
+      A Resultstore XML tree for the Mobly test.
+    """
+    logging.info('Generating Resultstore tree...')
+    # Strip characters matched by _ILLEGAL_YAML_CHARS before parsing.
+    with open(mobly_results_path, 'r', encoding='utf-8') as f:
+        summary_entries = list(
+            yaml.safe_load_all(_ILLEGAL_YAML_CHARS.sub('', f.read()))
+        )
+    # next() raises StopIteration if the file has no summary entry.
+    summary_record = next(
+        entry
+        for entry in summary_entries
+        if entry[_MOBLY_RECORD_TYPE_KEY]
+        == records.TestSummaryEntryType.SUMMARY.value
+    )
+
+    main_root = _create_mobly_root_element(summary_record)
+
+    mobly_test_root = main_root[0]
+    mobly_root_properties = _create_or_return_properties_element(
+        mobly_test_root)
+    # Add files under the Mobly root directory to the Mobly test suite node.
+    if (
+            mobly_base_directory is not None
+            and resultstore_root_directory is not None
+    ):
+        file_counter = 0
+        for filename in os.listdir(mobly_base_directory):
+            file_path = os.path.join(mobly_base_directory, filename)
+            if not os.path.isfile(file_path):
+                continue
+            relative_path = os.path.relpath(file_path,
+                                            resultstore_root_directory)
+            _add_or_update_property_element(
+                mobly_root_properties,
+                f'test_output{file_counter}',
+                as_posix_path(relative_path),
+            )
+            file_counter += 1
+
+    test_case_entries = [
+        entry
+        for entry in summary_entries
+        if (entry[_MOBLY_RECORD_TYPE_KEY]
+            == records.TestSummaryEntryType.RECORD.value)
+    ]
+    # Populate the class summaries.
+    class_summaries = {}
+    for entry in test_case_entries:
+        class_name = entry[records.TestResultEnums.RECORD_CLASS]
+
+        if class_name not in class_summaries:
+            class_summaries[class_name] = TestSuiteSummary(
+                num_tests=0, num_errors=0, num_failures=0
+            )
+
+        class_summaries[class_name].num_tests += 1
+        if (
+                entry[records.TestResultEnums.RECORD_RESULT]
+                == records.TestResultEnums.TEST_RESULT_ERROR
+        ):
+            class_summaries[class_name].num_errors += 1
+        elif (
+                entry[records.TestResultEnums.RECORD_RESULT]
+                == records.TestResultEnums.TEST_RESULT_FAIL
+        ):
+            class_summaries[class_name].num_failures += 1
+
+    # Create class nodes.
+    class_elements = {}
+    for class_name, summary in class_summaries.items():
+        class_elements[class_name] = _create_class_element(class_name, summary)
+        mobly_test_root.append(class_elements[class_name])
+
+    # Append test case nodes to test class nodes.
+    reran_nodes = _get_reran_nodes(test_case_entries)
+    for entry in test_case_entries:
+        class_name = entry[records.TestResultEnums.RECORD_CLASS]
+        if (
+                records.TestResultEnums.RECORD_SIGNATURE in entry
+                and
+                entry[records.TestResultEnums.RECORD_SIGNATURE] in reran_nodes
+        ):
+            reran_node = reran_nodes[
+                entry[records.TestResultEnums.RECORD_SIGNATURE]]
+        else:
+            reran_node = None
+        class_elements[class_name].append(
+            _process_record(
+                entry, reran_node, mobly_base_directory,
+                resultstore_root_directory
+            )
+        )
+    # Copy user data properties onto the elements found for each entry.
+    user_data_entries = [
+        entry
+        for entry in summary_entries
+        if (entry[_MOBLY_RECORD_TYPE_KEY]
+            == records.TestSummaryEntryType.USER_DATA.value)
+    ]
+
+    for user_data_entry in user_data_entries:
+        class_name = user_data_entry.get(records.TestResultEnums.RECORD_CLASS,
+                                         None)
+        test_name = user_data_entry.get(records.TestResultEnums.RECORD_NAME,
+                                        None)
+
+        properties = user_data_entry.get('properties', None)
+        if not isinstance(properties, dict):
+            continue
+        for element in _find_all_elements(mobly_test_root, class_name,
+                                          test_name):
+            properties_element = _create_or_return_properties_element(element)
+            for name, value in properties.items():
+                if name in _MOBLY_PROPERTY_VALUES:
+                    # Do not override Mobly properties.
+                    continue
+                _add_or_update_property_element(
+                    properties_element, str(name), str(value)
+                )
+
+    return ElementTree.ElementTree(main_root)
diff --git a/tools/results_uploader/src/results_uploader.py b/tools/results_uploader/src/results_uploader.py
new file mode 100644
index 0000000..7b8d3b2
--- /dev/null
+++ b/tools/results_uploader/src/results_uploader.py
@@ -0,0 +1,284 @@
+#!/usr/bin/env python3
+
+#  Copyright (C) 2024 The Android Open Source Project
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""CLI uploader for Mobly test results to Resultstore."""
+
+import argparse
+import dataclasses
+import datetime
+import logging
+import os
+import pathlib
+import platform
+import shutil
+import tempfile
+import warnings
+from xml.etree import ElementTree
+
+import google.auth
+from google.cloud import storage
+from googleapiclient import discovery
+
+import mobly_result_converter
+import resultstore_client
+
+with warnings.catch_warnings():
+    warnings.simplefilter('ignore')
+    from google.cloud.storage import transfer_manager
+
+logging.getLogger('googleapiclient').setLevel(logging.WARNING)
+
+
+_RESULTSTORE_SERVICE_NAME = 'resultstore'
+_API_VERSION = 'v2'
+_DISCOVERY_SERVICE_URL = (
+    'https://{api}.googleapis.com/$discovery/rest?version={apiVersion}'
+)
+_TEST_XML = 'test.xml'
+_TEST_LOGS = 'test.log'
+_UNDECLARED_OUTPUTS = 'undeclared_outputs/'
+# Files read from the Mobly results directory.
+_TEST_SUMMARY_YAML = 'test_summary.yaml'
+_TEST_LOG_INFO = 'test_log.INFO'
+# Suite property whose value is appended to the target ID.
+_RUN_IDENTIFIER = 'run_identifier'
+# Short aliases for names defined in the sibling modules.
+_ResultstoreTreeTags = mobly_result_converter.ResultstoreTreeTags
+_ResultstoreTreeAttributes = mobly_result_converter.ResultstoreTreeAttributes
+
+_Status = resultstore_client.Status
+
+
+@dataclasses.dataclass
+class _TestResultInfo:
+    """Values parsed from the test summary that the Resultstore upload needs."""
+
+    # Overall (aggregate) status across the whole test run.
+    status: _Status = _Status.UNKNOWN
+    # Resultstore target ID derived for the test; None until one is computed.
+    target_id: str | None = None
+
+
+def _convert_results(mobly_dir: str, dest_dir: str) -> _TestResultInfo:
+    """Converts Mobly test results into Resultstore artifacts.
+
+    Args:
+      mobly_dir: Directory holding the raw Mobly results.
+      dest_dir: Directory that receives the generated artifacts.
+
+    Returns:
+      Info parsed from test.xml; defaults if the summary YAML is missing.
+    """
+    result_info = _TestResultInfo()
+    logging.info('Converting raw Mobly logs into Resultstore artifacts...')
+    summary_yaml = os.path.join(mobly_dir, _TEST_SUMMARY_YAML)
+    if os.path.isfile(summary_yaml):
+        xml_tree = mobly_result_converter.convert(
+            summary_yaml, mobly_dir, mobly_dir)
+        ElementTree.indent(xml_tree)
+        xml_path = os.path.join(dest_dir, _TEST_XML)
+        xml_tree.write(xml_path, encoding='utf-8', xml_declaration=True)
+        result_info = _get_test_result_info_from_test_xml(xml_tree)
+
+    # Copy test_log.INFO to test.log
+    info_log = os.path.join(mobly_dir, _TEST_LOG_INFO)
+    if os.path.isfile(info_log):
+        shutil.copyfile(info_log, os.path.join(dest_dir, _TEST_LOGS))
+
+    # Mirror the whole Mobly directory under undeclared_outputs/.
+    outputs_dir = os.path.join(dest_dir, _UNDECLARED_OUTPUTS)
+    shutil.copytree(mobly_dir, outputs_dir, dirs_exist_ok=True)
+    return result_info
+
+
+def _get_test_result_info_from_test_xml(
+        test_xml: ElementTree.ElementTree,
+) -> _TestResultInfo:
+    """Derives the aggregate status and target ID from a converted test.xml.
+
+    The status is FAILED if the Mobly suite reports any failures or errors,
+    SKIPPED if every test case is marked skipped, and PASSED otherwise. The
+    target ID joins the test class names with '+', followed by the
+    run_identifier property value in parentheses when that property is set.
+
+    Raises TypeError if the suite lacks its failures or errors attribute.
+    """
+    tags = _ResultstoreTreeTags
+    attrs = _ResultstoreTreeAttributes
+    info = _TestResultInfo()
+    suite = test_xml.getroot().find(f'./{tags.TESTSUITE.value}')
+    if suite is None:
+        # No Mobly suite node: leave status UNKNOWN and target ID unset.
+        return info
+
+    class_elements = suite.findall(f'./{tags.TESTSUITE.value}')
+    failures = int(suite.get(attrs.FAILURES.value))
+    errors = int(suite.get(attrs.ERRORS.value))
+    if failures or errors:
+        info.status = _Status.FAILED
+    else:
+        results = (
+            case.get(attrs.RESULT.value)
+            for class_element in class_elements
+            for case in class_element.findall(f'./{tags.TESTCASE.value}')
+        )
+        if all(result == 'skipped' for result in results):
+            info.status = _Status.SKIPPED
+        else:
+            info.status = _Status.PASSED
+
+    # Target ID: class names joined by '+', plus the run identifier if any.
+    target_id = '+'.join(
+        element.get(attrs.NAME.value) for element in class_elements
+    )
+    properties = suite.find(f'./{tags.PROPERTIES.value}')
+    if properties is not None:
+        run_identifier = properties.find(
+            f'./{tags.PROPERTY.value}'
+            f'[@{attrs.NAME.value}="{_RUN_IDENTIFIER}"]'
+        )
+        if run_identifier is not None:
+            identifier_value = run_identifier.get(attrs.VALUE.value)
+            target_id = f'{target_id} ({identifier_value})'
+    info.target_id = target_id
+    return info
+
+
+def _upload_dir_to_gcs(
+        src_dir: str, gcs_bucket: str, gcs_dir: str
+) -> list[str]:
+    """Uploads every file under the given directory to a GCS bucket.
+
+    Args:
+      src_dir: Local directory whose files are uploaded recursively.
+      gcs_bucket: Name of the destination bucket.
+      gcs_dir: Destination directory inside the bucket; empty for the root.
+
+    Returns:
+      The paths (POSIX style, relative to src_dir) that uploaded successfully.
+    """
+    bucket_obj = storage.Client().bucket(gcs_bucket)
+    file_paths = []
+    for path in pathlib.Path(src_dir).expanduser().rglob('*'):
+        if path.is_file():
+            file_paths.append(str(path.relative_to(src_dir).as_posix()))
+
+    logging.info(
+        'Uploading %s files from %s to Cloud Storage bucket %s/%s...',
+        len(file_paths), src_dir, gcs_bucket, gcs_dir,
+    )
+    # Ensure that a non-empty destination directory has a trailing '/'.
+    if gcs_dir and not gcs_dir.endswith('/'):
+        blob_name_prefix = gcs_dir + '/'
+    else:
+        blob_name_prefix = gcs_dir
+
+    # If running on Windows, use threads instead of multiprocessing.
+    if platform.system() == 'Windows':
+        worker_type = transfer_manager.THREAD
+    else:
+        worker_type = transfer_manager.PROCESS
+    results = transfer_manager.upload_many_from_filenames(
+        bucket_obj, file_paths, source_directory=src_dir,
+        blob_name_prefix=blob_name_prefix, worker_type=worker_type,
+    )
+
+    success_paths = []
+    for file_name, result in zip(file_paths, results):
+        if not isinstance(result, Exception):
+            logging.debug('Uploaded %s.', file_name)
+            success_paths.append(file_name)
+        else:
+            logging.warning('Failed to upload %s. Error: %s', file_name, result)
+    return success_paths
+
+
+def _upload_to_resultstore(
+        gcs_bucket: str,
+        gcs_dir: str,
+        file_paths: list[str],
+        status: _Status,
+        target_id: str | None,
+) -> None:
+    """Publishes the uploaded test artifacts as a Resultstore invocation."""
+    logging.info('Generating Resultstore link...')
+    service = discovery.build(
+        _RESULTSTORE_SERVICE_NAME, _API_VERSION,
+        discoveryServiceUrl=_DISCOVERY_SERVICE_URL)
+    creds, project_id = google.auth.default()
+    client = resultstore_client.ResultstoreClient(service, creds, project_id)
+    # Create the resource hierarchy from the invocation down to the action.
+    client.create_invocation()
+    client.create_default_configuration()
+    client.create_target(target_id)
+    client.create_configured_target()
+    client.create_action(f'gs://{gcs_bucket}/{gcs_dir}', file_paths)
+    client.set_status(status)
+    # Merge and finalize bottom-up: configured target, target, invocation.
+    client.merge_configured_target()
+    client.finalize_configured_target()
+    client.merge_target()
+    client.finalize_target()
+    client.merge_invocation()
+    client.finalize_invocation()
+
+
+def main():
+    """Uploads Mobly results to GCS and registers them with Resultstore."""
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '-v', '--verbose', action='store_true', help='Enable debug logs.'
+    )
+    parser.add_argument(
+        '--mobly_dir', required=True,
+        help='Directory on host where Mobly results are stored.',
+    )
+    parser.add_argument(
+        '--gcs_bucket', required=True,
+        help='Bucket in GCS where test artifacts are uploaded.',
+    )
+    gcs_dir_help = (
+        'Directory to save test artifacts in GCS. Specify empty string to '
+        'store the files in the bucket root. If unspecified, use the '
+        'current timestamp as the GCS directory name.'
+    )
+    parser.add_argument('--gcs_dir', help=gcs_dir_help)
+    parser.add_argument('--target_id', help='Custom target ID.')
+    args = parser.parse_args()
+
+    log_level = logging.DEBUG if args.verbose else logging.INFO
+    logging.basicConfig(level=log_level)
+    # An explicit empty --gcs_dir means the bucket root, so test against None.
+    if args.gcs_dir is None:
+        gcs_dir_name = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
+    else:
+        gcs_dir_name = args.gcs_dir
+
+    # The converted artifacts only need to live until the GCS upload is done.
+    with tempfile.TemporaryDirectory() as tmp:
+        test_result_info = _convert_results(args.mobly_dir, tmp)
+        gcs_files = _upload_dir_to_gcs(tmp, args.gcs_bucket, gcs_dir_name)
+    _upload_to_resultstore(
+        args.gcs_bucket,
+        gcs_dir_name,
+        gcs_files,
+        test_result_info.status,
+        args.target_id or test_result_info.target_id,
+    )
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tools/results_uploader/src/resultstore_client.py b/tools/results_uploader/src/resultstore_client.py
new file mode 100644
index 0000000..1388359
--- /dev/null
+++ b/tools/results_uploader/src/resultstore_client.py
@@ -0,0 +1,371 @@
+#!/usr/bin/env python3
+
+#  Copyright (C) 2024 The Android Open Source Project
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""Resultstore client for Mobly tests."""
+
+import datetime
+import enum
+import logging
+import posixpath
+import urllib.parse
+import uuid
+
+from google.auth import credentials
+import google_auth_httplib2
+from googleapiclient import discovery
+import httplib2
+
+_DEFAULT_CONFIGURATION = 'default'  # ID of the one configuration created.
+_RESULTSTORE_BASE_LINK = 'https://btx.cloud.google.com/invocations'
+
+
+class Status(enum.Enum):
+    """Aggregate status of the Resultstore invocation and target."""
+    PASSED = 'PASSED'
+    FAILED = 'FAILED'
+    SKIPPED = 'SKIPPED'
+    UNKNOWN = 'UNKNOWN'  # Initial status of a ResultstoreClient.
+
+
+class ResultstoreClient:
+    """Resultstore client for Mobly tests."""
+
+    def __init__(
+            self,
+            service: discovery.Resource,
+            creds: credentials.Credentials,
+            project_id: str,
+    ):
+        """Initializes the client; no Resultstore resources are created yet.
+
+        Args:
+          service: discovery.Resource object for interacting with the API.
+          creds: credentials to add to HTTP request.
+          project_id: GCP project ID for Resultstore.
+        """
+        self._service = service
+        self._project_id = project_id
+        # All requests go through one authorized HTTP client (timeout=30).
+        base_http = httplib2.Http(timeout=30)
+        self._http = google_auth_httplib2.AuthorizedHttp(creds, http=base_http)
+
+        # IDs and token stay empty until the matching create_* call sets them.
+        self._request_id = ''
+        self._invocation_id = ''
+        self._authorization_token = ''
+        self._target_id = ''
+        self._encoded_target_id = ''
+        self._status = Status.UNKNOWN
+
+    @property
+    def _invocation_name(self):
+        """Resource name of the invocation, or '' if no ID is set."""
+        if self._invocation_id:
+            return f'invocations/{self._invocation_id}'
+        return ''
+
+    @property
+    def _target_name(self):
+        """Resource name of the target, or '' if any ID is missing."""
+        if not (self._invocation_name and self._encoded_target_id):
+            return ''  # Either part missing would give a malformed name.
+        return f'{self._invocation_name}/targets/{self._encoded_target_id}'
+
+    @property
+    def _configured_target_name(self):
+        """Resource name of the configured target, or '' without a target."""
+        if not self._target_name:
+            return ''  # Same empty-string convention as the sibling names.
+        return f'{self._target_name}/configuredTargets/{_DEFAULT_CONFIGURATION}'
+
+    def set_status(self, status: Status) -> None:
+        """Sets the overall test run status reported on merge."""
+        self._status = status
+
+    def create_invocation(self) -> str:
+        """Creates an invocation, unless this client already created one.
+
+        Returns:
+          The invocation ID (the existing one if creation was skipped).
+        """
+        logging.info('creating invocation...')
+        if self._invocation_id:
+            logging.warning(
+                'invocation %s already exists, skipping creation...',
+                self._invocation_id)
+            return self._invocation_id
+        # utcnow() is deprecated; build the same naive-UTC 'Z' timestamp.
+        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
+        invocation = {
+            'timing': {'startTime': now.isoformat() + 'Z'},
+            'invocationAttributes': {'projectId': self._project_id},
+        }
+        self._request_id = str(uuid.uuid4())
+        self._invocation_id = str(uuid.uuid4())
+        self._authorization_token = str(uuid.uuid4())
+        request = self._service.invocations().create(
+            body=invocation,
+            requestId=self._request_id,
+            invocationId=self._invocation_id,
+            authorizationToken=self._authorization_token,
+        )
+        res = request.execute(http=self._http)
+        logging.debug('invocations.create: %s', res)
+        return self._invocation_id
+
+    def create_default_configuration(self) -> None:
+        """Creates the configuration that configured targets refer to.
+
+        It is registered under the current invocation with the ID held in
+        _DEFAULT_CONFIGURATION.
+        """
+        logging.info('creating default configuration...')
+        config_key = {
+            'invocationId': self._invocation_id,
+            'configurationId': _DEFAULT_CONFIGURATION,
+        }
+        configs = self._service.invocations().configs()
+        # Reuse the token generated when the invocation was created.
+        request = configs.create(
+            body={'id': config_key},
+            parent=f'invocations/{self._invocation_id}',
+            configId=_DEFAULT_CONFIGURATION,
+            authorizationToken=self._authorization_token,
+        )
+        res = request.execute(http=self._http)
+        logging.debug('invocations.configs.create: %s', res)
+
+    def create_target(self, target_id: str | None = None) -> str:
+        """Creates a target, unless this client already created one.
+
+        Args:
+          target_id: Optional custom target ID; defaults to a random UUID.
+
+        Returns:
+          The target ID (the existing one if creation was skipped).
+        """
+        logging.info('creating target in %s...', self._invocation_name)
+        if self._target_id:
+            logging.warning(
+                'target %s already exists, skipping creation...',
+                self._target_id)
+            return self._target_id
+        self._target_id = target_id or str(uuid.uuid4())
+        # Percent-encode everything (including '/') for use in resource names.
+        self._encoded_target_id = urllib.parse.quote(self._target_id, safe='')
+        target = {
+            'id': {
+                'invocationId': self._invocation_id,
+                'targetId': self._target_id,
+            },
+            'targetAttributes': {'type': 'TEST', 'language': 'PY'},
+        }
+        request = (
+            self._service.invocations()
+            .targets()
+            .create(
+                body=target,
+                parent=self._invocation_name,
+                targetId=self._target_id,
+                authorizationToken=self._authorization_token,
+            )
+        )
+        res = request.execute(http=self._http)
+        logging.debug('invocations.targets.create: %s', res)
+        return self._target_id
+
+    def create_configured_target(self) -> None:
+        """Creates the configured target for the current target.
+
+        The configured target pairs the target with the default configuration.
+        """
+        logging.info('creating configured target in %s...', self._target_name)
+        body = {
+            'id': {
+                'invocationId': self._invocation_id,
+                'targetId': self._target_id,
+                'configurationId': _DEFAULT_CONFIGURATION,
+            },
+        }
+        resource = self._service.invocations().targets().configuredTargets()
+        # Reuse the token generated when the invocation was created.
+        request = resource.create(
+            body=body,
+            parent=self._target_name,
+            configId=_DEFAULT_CONFIGURATION,
+            authorizationToken=self._authorization_token,
+        )
+        res = request.execute(http=self._http)
+        logging.debug('invocations.targets.configuredTargets.create: %s', res)
+
+    def create_action(self, gcs_path: str, artifacts: list[str]) -> str:
+        """Creates a test action that references the uploaded artifacts.
+
+        Args:
+          gcs_path: The GCS directory that holds the artifacts.
+          artifacts: Artifact paths, each relative to gcs_path.
+
+        Returns:
+          The randomly generated ID of the created action.
+        """
+        logging.info('creating action in %s...', self._configured_target_name)
+        action_id = str(uuid.uuid4())
+        # Each artifact is keyed by its relative path and located by its
+        # full path under gcs_path.
+        files = []
+        for relative_path in artifacts:
+            uri = posixpath.join(gcs_path, relative_path)
+            files.append({'uid': relative_path, 'uri': uri})
+        resource_id = {
+            'invocationId': self._invocation_id,
+            'targetId': self._target_id,
+            'configurationId': _DEFAULT_CONFIGURATION,
+            'actionId': action_id,
+        }
+        body = {'id': resource_id, 'testAction': {}, 'files': files}
+        actions = (
+            self._service.invocations()
+            .targets()
+            .configuredTargets()
+            .actions()
+        )
+        request = actions.create(
+            body=body,
+            parent=self._configured_target_name,
+            actionId=action_id,
+            authorizationToken=self._authorization_token,
+        )
+        response = request.execute(http=self._http)
+        logging.debug(
+            'invocations.targets.configuredTargets.actions.create: %s',
+            response,
+        )
+        return action_id
+
+    def merge_configured_target(self):
+        """Merges the current status into the configured target."""
+        logging.info(
+            'merging configured target %s...', self._configured_target_name
+        )
+        status_attributes = {'status': self._status.value}
+        body = {
+            'configuredTarget': {'statusAttributes': status_attributes},
+            'authorizationToken': self._authorization_token,
+            # The mask names the one field populated above.
+            'updateMask': 'statusAttributes',
+        }
+        configured_targets = (
+            self._service.invocations().targets().configuredTargets()
+        )
+        request = configured_targets.merge(
+            body=body, name=self._configured_target_name
+        )
+        response = request.execute(http=self._http)
+        logging.debug(
+            'invocations.targets.configuredTargets.merge: %s', response
+        )
+
+    def finalize_configured_target(self):
+        """Finalizes the configured target."""
+        logging.info(
+            'finalizing configured target %s...', self._configured_target_name
+        )
+        # The request body carries only the authorization token.
+        body = {'authorizationToken': self._authorization_token}
+        configured_targets = (
+            self._service.invocations().targets().configuredTargets()
+        )
+        request = configured_targets.finalize(
+            body=body,
+            name=self._configured_target_name,
+        )
+        response = request.execute(http=self._http)
+        logging.debug(
+            'invocations.targets.configuredTargets.finalize: %s', response
+        )
+
+    def merge_target(self):
+        """Merges the current status into the target.
+
+        The status value is read from self._status.
+        """
+        logging.info('merging target %s...', self._target_name)
+        status_attributes = {'status': self._status.value}
+        body = {
+            'target': {'statusAttributes': status_attributes},
+            'authorizationToken': self._authorization_token,
+            # The mask names the one field populated above.
+            'updateMask': 'statusAttributes',
+        }
+        targets = self._service.invocations().targets()
+        request = targets.merge(
+            body=body,
+            name=self._target_name,
+        )
+        response = request.execute(http=self._http)
+        logging.debug('invocations.targets.merge: %s', response)
+
+    def finalize_target(self):
+        """Finalizes the target and clears the stored target IDs.
+
+        The finalize request carries only the authorization token.
+        """
+        logging.info('finalizing target %s...', self._target_name)
+        body = {'authorizationToken': self._authorization_token}
+        targets = self._service.invocations().targets()
+        request = targets.finalize(
+            body=body,
+            name=self._target_name,
+        )
+        response = request.execute(http=self._http)
+        logging.debug('invocations.targets.finalize: %s', response)
+        # Empty IDs are falsy, so create_target() no longer sees an
+        # existing target.
+        self._target_id = ''
+        self._encoded_target_id = ''
+
+    def merge_invocation(self):
+        """Merges the current status into the invocation."""
+        logging.info('merging invocation %s...', self._invocation_name)
+        body = {
+            'invocation': {'statusAttributes': {'status': self._status.value}},
+            'updateMask': 'statusAttributes',
+            'authorizationToken': self._authorization_token,
+        }
+        invocations = self._service.invocations()
+        request = invocations.merge(body=body, name=self._invocation_name)
+        response = request.execute(http=self._http)
+        logging.debug('invocations.merge: %s', response)
+
+    def finalize_invocation(self):
+        """Finalizes the invocation and logs its Resultstore link.
+
+        The request ID, invocation ID and authorization token are then reset.
+        """
+        logging.info('finalizing invocation %s...', self._invocation_name)
+        body = {'authorizationToken': self._authorization_token}
+        invocations = self._service.invocations()
+        request = invocations.finalize(body=body, name=self._invocation_name)
+        response = request.execute(http=self._http)
+        logging.debug('invocations.finalize: %s', response)
+        logging.info(
+            '----------\nresultstore link is %s/%s',
+            _RESULTSTORE_BASE_LINK,
+            self._invocation_id,
+        )
+        self._request_id = ''
+        self._invocation_id = ''
+        self._authorization_token = ''