Snap for 11031605 from 078fa319688234bae7a2a3c9e93a8dc3313a368c to mainline-os-statsd-release

Change-Id: Ibce7a4467a017513dd0fea51dd485242115bde25
diff --git a/Android.bp b/Android.bp
index d52d055..a49ddf9 100644
--- a/Android.bp
+++ b/Android.bp
@@ -62,7 +62,7 @@
     ],
 
     libs: [
-        "ub-uiautomator",
+        "androidx.test.uiautomator_uiautomator",
         "androidx.test.ext.truth",
         "androidx.test.rules",
         "androidx.annotation_annotation",
diff --git a/BUILD b/BUILD
new file mode 100644
index 0000000..9d0ae86
--- /dev/null
+++ b/BUILD
@@ -0,0 +1,34 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Description:
+#   Standalone Android library for downloading and managing files on device.
+
+load("//tools/build_defs/license:license.bzl", "license")
+
+package(
+    default_applicable_licenses = ["//:license"],
+    default_visibility = [
+        "//visibility:public",
+    ],
+    licenses = ["notice"],
+)
+
+license(
+    name = "license",
+    package_name = "mobiledatadownload",
+)
+
+licenses(["notice"])
+
+exports_files(["LICENSE"])
diff --git a/java/com/google/android/libraries/mobiledatadownload/BUILD b/java/com/google/android/libraries/mobiledatadownload/BUILD
index ca39a4e..a462e75 100644
--- a/java/com/google/android/libraries/mobiledatadownload/BUILD
+++ b/java/com/google/android/libraries/mobiledatadownload/BUILD
@@ -36,6 +36,7 @@
         "MobileDataDownload.java",
         "MobileDataDownloadImpl.java",
         "ReadDataFileGroupRequest.java",
+        "ReadDataFileGroupsByFilterRequest.java",
         "RemoveFileGroupRequest.java",
         "RemoveFileGroupsByFilterRequest.java",
         "RemoveFileGroupsByFilterResponse.java",
diff --git a/java/com/google/android/libraries/mobiledatadownload/MobileDataDownload.java b/java/com/google/android/libraries/mobiledatadownload/MobileDataDownload.java
index 1894e86..0db3be8 100644
--- a/java/com/google/android/libraries/mobiledatadownload/MobileDataDownload.java
+++ b/java/com/google/android/libraries/mobiledatadownload/MobileDataDownload.java
@@ -94,6 +94,25 @@
   }
 
   /**
+   * Gets DataFileGroup definitions that were added to MDD by filter. This API cannot be used to
+   * access files.
+   *
+   * <p>Only present fields in {@link ReadDataFileGroupsByFilterRequest} will be used to perform the
+   * filtering. For example, if no account is specified in the filter, file groups won't be filtered
+   * based on account.
+   *
+   * @param readDataFileGroupsByFilterRequest The request to get multiple data file groups after
+   *     filtering.
+   * @return The ListenableFuture that will resolve to a list of the requested data file groups.
+   *     This ListenableFuture will resolve to all data file groups when {@code
+   *     readDataFileGroupsByFilterRequest.includeAllGroups} is true.
+   */
+  default ListenableFuture<ImmutableList<DataFileGroup>> readDataFileGroupsByFilter(
+      ReadDataFileGroupsByFilterRequest readDataFileGroupsByFilterRequest) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
    * Returns the latest downloaded data that we have for the given group name.
    *
    * <p>This api takes an instance of {@link GetFileGroupRequest} that contains group name, and it
diff --git a/java/com/google/android/libraries/mobiledatadownload/MobileDataDownloadImpl.java b/java/com/google/android/libraries/mobiledatadownload/MobileDataDownloadImpl.java
index 04abda1..c45ef1f 100644
--- a/java/com/google/android/libraries/mobiledatadownload/MobileDataDownloadImpl.java
+++ b/java/com/google/android/libraries/mobiledatadownload/MobileDataDownloadImpl.java
@@ -28,8 +28,10 @@
 import android.content.Context;
 import android.net.Uri;
 import android.text.TextUtils;
+
 import androidx.core.app.NotificationCompat;
 import androidx.core.app.NotificationManagerCompat;
+
 import com.google.android.libraries.mobiledatadownload.DownloadException.DownloadResultCode;
 import com.google.android.libraries.mobiledatadownload.TaskScheduler.ConstraintOverrides;
 import com.google.android.libraries.mobiledatadownload.TaskScheduler.NetworkState;
@@ -66,14 +68,18 @@
 import com.google.mobiledatadownload.ClientConfigProto.ClientFileGroup;
 import com.google.mobiledatadownload.DownloadConfigProto;
 import com.google.mobiledatadownload.DownloadConfigProto.DataFileGroup;
+import com.google.mobiledatadownload.LogEnumsProto.MddLibApiName;
+import com.google.mobiledatadownload.LogEnumsProto.MddLibApiResult;
+import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
+import com.google.mobiledatadownload.LogProto.MddLibApiResultLog;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupInternal;
 import com.google.mobiledatadownload.internal.MetadataProto.DownloadConditions;
 import com.google.mobiledatadownload.internal.MetadataProto.DownloadConditions.DeviceNetworkPolicy;
 import com.google.mobiledatadownload.internal.MetadataProto.GroupKey;
-import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
 import com.google.protobuf.Any;
 import com.google.protobuf.InvalidProtocolBufferException;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -84,6 +90,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
 import javax.annotation.Nullable;
 
 /**
@@ -92,1711 +99,1933 @@
  */
 class MobileDataDownloadImpl implements MobileDataDownload {
 
-  private static final String TAG = "MobileDataDownload";
-  private static final long DUMP_DEBUG_INFO_TIMEOUT = 3;
+    private static final String TAG = "MobileDataDownload";
+    private static final long DUMP_DEBUG_INFO_TIMEOUT = 3;
 
-  private final Context context;
-  private final EventLogger eventLogger;
-  private final List<FileGroupPopulator> fileGroupPopulatorList;
-  private final Optional<TaskScheduler> taskSchedulerOptional;
-  private final MobileDataDownloadManager mobileDataDownloadManager;
-  private final SynchronousFileStorage fileStorage;
-  private final Flags flags;
-  private final Downloader singleFileDownloader;
+    private final Context context;
+    private final EventLogger eventLogger;
+    private final List<FileGroupPopulator> fileGroupPopulatorList;
+    private final Optional<TaskScheduler> taskSchedulerOptional;
+    private final MobileDataDownloadManager mobileDataDownloadManager;
+    private final SynchronousFileStorage fileStorage;
+    private final Flags flags;
+    private final Downloader singleFileDownloader;
 
-  // Track all the on-going foreground downloads. This map is keyed by ForegroundDownloadKey.
-  private final DownloadFutureMap<ClientFileGroup> foregroundDownloadFutureMap;
+    // Track all the on-going foreground downloads. This map is keyed by ForegroundDownloadKey.
+    private final DownloadFutureMap<ClientFileGroup> foregroundDownloadFutureMap;
 
-  // Track all on-going background download requests started by downloadFileGroup. This map is keyed
-  // by ForegroundDownloadKey so request can be kept in sync with foregroundDownloadFutureMap.
-  private final DownloadFutureMap<ClientFileGroup> downloadFutureMap;
+    // Track all on-going background download requests started by downloadFileGroup. This map is
+    // keyed
+    // by ForegroundDownloadKey so request can be kept in sync with foregroundDownloadFutureMap.
+    private final DownloadFutureMap<ClientFileGroup> downloadFutureMap;
 
-  // This executor will execute tasks sequentially.
-  private final Executor sequentialControlExecutor;
-  // ExecutionSequencer will execute a ListenableFuture and its Futures.transforms before taking the
-  // next task (<internal>). Most of MDD API should use
-  // ExecutionSequencer to guarantee Metadata synchronization. Currently only downloadFileGroup and
-  // handleTask APIs do not use ExecutionSequencer since their execution could take long time and
-  // using ExecutionSequencer would block other APIs.
-  private final PropagatedExecutionSequencer futureSerializer =
-      PropagatedExecutionSequencer.create();
-  private final Optional<DownloadProgressMonitor> downloadMonitorOptional;
-  private final Optional<Class<?>> foregroundDownloadServiceClassOptional;
-  private final AsyncFunction<DataFileGroupInternal, Boolean> customFileGroupValidator;
-  private final TimeSource timeSource;
+    // This executor will execute tasks sequentially.
+    private final Executor sequentialControlExecutor;
+    // ExecutionSequencer will execute a ListenableFuture and its Futures.transforms before
+    // taking the
+    // next task (<internal>). Most of MDD API should use
+    // ExecutionSequencer to guarantee Metadata synchronization. Currently only downloadFileGroup
+    // and
+    // handleTask APIs do not use ExecutionSequencer since their execution could take long time and
+    // using ExecutionSequencer would block other APIs.
+    private final PropagatedExecutionSequencer futureSerializer =
+            PropagatedExecutionSequencer.create();
+    private final Optional<DownloadProgressMonitor> downloadMonitorOptional;
+    private final Optional<Class<?>> foregroundDownloadServiceClassOptional;
+    private final AsyncFunction<DataFileGroupInternal, Boolean> customFileGroupValidator;
+    private final TimeSource timeSource;
 
-  MobileDataDownloadImpl(
-      Context context,
-      EventLogger eventLogger,
-      MobileDataDownloadManager mobileDataDownloadManager,
-      Executor sequentialControlExecutor,
-      List<FileGroupPopulator> fileGroupPopulatorList,
-      Optional<TaskScheduler> taskSchedulerOptional,
-      SynchronousFileStorage fileStorage,
-      Optional<DownloadProgressMonitor> downloadMonitorOptional,
-      Optional<Class<?>> foregroundDownloadServiceClassOptional,
-      Flags flags,
-      Downloader singleFileDownloader,
-      Optional<CustomFileGroupValidator> customValidatorOptional,
-      TimeSource timeSource) {
-    this.context = context;
-    this.eventLogger = eventLogger;
-    this.fileGroupPopulatorList = fileGroupPopulatorList;
-    this.taskSchedulerOptional = taskSchedulerOptional;
-    this.sequentialControlExecutor = sequentialControlExecutor;
-    this.mobileDataDownloadManager = mobileDataDownloadManager;
-    this.fileStorage = fileStorage;
-    this.downloadMonitorOptional = downloadMonitorOptional;
-    this.foregroundDownloadServiceClassOptional = foregroundDownloadServiceClassOptional;
-    this.flags = flags;
-    this.singleFileDownloader = singleFileDownloader;
-    this.customFileGroupValidator =
-        createCustomFileGroupValidator(
-            customValidatorOptional,
-            mobileDataDownloadManager,
-            sequentialControlExecutor,
-            fileStorage);
-    this.downloadFutureMap = DownloadFutureMap.create(sequentialControlExecutor);
-    this.foregroundDownloadFutureMap =
-        DownloadFutureMap.create(
-            sequentialControlExecutor,
-            createCallbacksForForegroundService(context, foregroundDownloadServiceClassOptional));
-    this.timeSource = timeSource;
-  }
-
-  // Wraps the custom validator because the validation at a lower level of the stack where
-  // the ClientFileGroup is not available, yet ClientFileGroup is the client-facing API we'd
-  // like to expose.
-  private static AsyncFunction<DataFileGroupInternal, Boolean> createCustomFileGroupValidator(
-      Optional<CustomFileGroupValidator> validatorOptional,
-      MobileDataDownloadManager mobileDataDownloadManager,
-      Executor executor,
-      SynchronousFileStorage fileStorage) {
-    if (!validatorOptional.isPresent()) {
-      return unused -> immediateFuture(true);
+    MobileDataDownloadImpl(
+            Context context,
+            EventLogger eventLogger,
+            MobileDataDownloadManager mobileDataDownloadManager,
+            Executor sequentialControlExecutor,
+            List<FileGroupPopulator> fileGroupPopulatorList,
+            Optional<TaskScheduler> taskSchedulerOptional,
+            SynchronousFileStorage fileStorage,
+            Optional<DownloadProgressMonitor> downloadMonitorOptional,
+            Optional<Class<?>> foregroundDownloadServiceClassOptional,
+            Flags flags,
+            Downloader singleFileDownloader,
+            Optional<CustomFileGroupValidator> customValidatorOptional,
+            TimeSource timeSource) {
+        this.context = context;
+        this.eventLogger = eventLogger;
+        this.fileGroupPopulatorList = fileGroupPopulatorList;
+        this.taskSchedulerOptional = taskSchedulerOptional;
+        this.sequentialControlExecutor = sequentialControlExecutor;
+        this.mobileDataDownloadManager = mobileDataDownloadManager;
+        this.fileStorage = fileStorage;
+        this.downloadMonitorOptional = downloadMonitorOptional;
+        this.foregroundDownloadServiceClassOptional = foregroundDownloadServiceClassOptional;
+        this.flags = flags;
+        this.singleFileDownloader = singleFileDownloader;
+        this.customFileGroupValidator =
+                createCustomFileGroupValidator(
+                        customValidatorOptional,
+                        mobileDataDownloadManager,
+                        sequentialControlExecutor,
+                        fileStorage);
+        this.downloadFutureMap = DownloadFutureMap.create(sequentialControlExecutor);
+        this.foregroundDownloadFutureMap =
+                DownloadFutureMap.create(
+                        sequentialControlExecutor,
+                        createCallbacksForForegroundService(context,
+                                foregroundDownloadServiceClassOptional));
+        this.timeSource = timeSource;
     }
 
-    return internalFileGroup ->
-        PropagatedFutures.transformAsync(
-            createClientFileGroup(
-                internalFileGroup,
-                /* account= */ null,
-                ClientFileGroup.Status.PENDING_CUSTOM_VALIDATION,
-                /* preserveZipDirectories= */ false,
-                /* verifyIsolatedStructure= */ true,
-                mobileDataDownloadManager,
-                executor,
-                fileStorage),
-            propagateAsyncFunction(
-                clientFileGroup -> validatorOptional.get().validateFileGroup(clientFileGroup)),
-            executor);
-  }
-
-  /**
-   * Functional interface used as callback for logging file group stats. Used to create file group
-   * stats from the result of the future.
-   *
-   * @see attachMddApiLogging
-   */
-  private interface StatsFromApiResultCreator<T> {
-    DataDownloadFileGroupStats create(T result);
-  }
-
-  /**
-   * Functional interface used as callback when logging API result. Used to get the API result code
-   * from the result of the API future if it succeeds.
-   *
-   * <p>Note: The need for this is due to {@link addFileGroup} returning false instead of an
-   * exception if it fails. For other APIs with proper exception handling, it should suffice to
-   * immediately return the success code.
-   *
-   * <p>TODO(b/143572409): Remove once addGroupForDownload is updated to return void.
-   *
-   * @see attachMddApiLogging
-   */
-  private interface ResultCodeFromApiResultGetter<T> {
-    int get(T result);
-  }
-
-  /**
-   * Helper function used to log mdd api stats. Adds FutureCallback to the {@code resultFuture}
-   * which is the result of mdd api call and logs in onSuccess and onFailure functions of callback.
-   *
-   * @param apiName Code of the api being logged.
-   * @param resultFuture Future result of the api call.
-   * @param startTimeNs start time in ns.
-   * @param defaultFileGroupStats Initial file group stats.
-   * @param statsCreator This functional interface is invoked from the onSuccess of FutureCallback
-   *     with the result of the future. File group stats returned here is merged with the initial
-   *     stats and logged.
-   */
-  private <T> void attachMddApiLogging(
-      int apiName,
-      ListenableFuture<T> resultFuture,
-      long startTimeNs,
-      DataDownloadFileGroupStats defaultFileGroupStats,
-      StatsFromApiResultCreator<T> statsCreator,
-      ResultCodeFromApiResultGetter<T> resultCodeGetter) {
-    // Using listener instead of transform since we need to log even if the future fails.
-    // Note: Listener is being registered on directexecutor for accurate latency measurement.
-    resultFuture.addListener(
-        propagateRunnable(
-            () -> {
-              long latencyNs = timeSource.elapsedRealtimeNanos() - startTimeNs;
-              // Log the stats asynchronously.
-              // Note: To avoid adding latency to mdd api calls, log asynchronously.
-              var unused =
-                  PropagatedFutures.submit(
-                      () -> {
-                        int resultCode;
-                        T result = null;
-                        DataDownloadFileGroupStats fileGroupStats = defaultFileGroupStats;
-                        try {
-                          result = Futures.getDone(resultFuture);
-                          resultCode = resultCodeGetter.get(result);
-                        } catch (Throwable t) {
-                          resultCode = ExceptionToMddResultMapper.map(t);
-                        }
-
-                        // Merge stats created from result of api with the default stats.
-                        if (result != null) {
-                          fileGroupStats =
-                              fileGroupStats.toBuilder()
-                                  .mergeFrom(statsCreator.create(result))
-                                  .build();
-                        }
-
-                        Void resultLog = null;
-
-                        eventLogger.logMddLibApiResultLog(resultLog);
-                      },
-                      sequentialControlExecutor);
-            }),
-        directExecutor());
-  }
-
-  @Override
-  public ListenableFuture<Boolean> addFileGroup(AddFileGroupRequest addFileGroupRequest) {
-    long startTimeNs = timeSource.elapsedRealtimeNanos();
-
-    ListenableFuture<Boolean> resultFuture =
-        futureSerializer.submitAsync(
-            () -> addFileGroupHelper(addFileGroupRequest), sequentialControlExecutor);
-
-    DataDownloadFileGroupStats defaultFileGroupStats =
-        DataDownloadFileGroupStats.newBuilder()
-            .setFileGroupName(addFileGroupRequest.dataFileGroup().getGroupName())
-            .setBuildId(addFileGroupRequest.dataFileGroup().getBuildId())
-            .setVariantId(addFileGroupRequest.dataFileGroup().getVariantId())
-            .setHasAccount(addFileGroupRequest.accountOptional().isPresent())
-            .setFileGroupVersionNumber(
-                addFileGroupRequest.dataFileGroup().getFileGroupVersionNumber())
-            .setOwnerPackage(addFileGroupRequest.dataFileGroup().getOwnerPackage())
-            .setFileCount(addFileGroupRequest.dataFileGroup().getFileCount())
-            .build();
-    attachMddApiLogging(
-        0,
-        resultFuture,
-        startTimeNs,
-        defaultFileGroupStats,
-        /* statsCreator= */ unused -> defaultFileGroupStats,
-        /* resultCodeGetter= */ succeeded -> succeeded ? 0 : 0);
-
-    return resultFuture;
-  }
-
-  private ListenableFuture<Boolean> addFileGroupHelper(AddFileGroupRequest addFileGroupRequest) {
-    LogUtil.d(
-        "%s: Adding for download group = '%s', variant = '%s', buildId = '%d' and"
-            + " associating it with account = '%s', variant = '%s'",
-        TAG,
-        addFileGroupRequest.dataFileGroup().getGroupName(),
-        addFileGroupRequest.dataFileGroup().getVariantId(),
-        addFileGroupRequest.dataFileGroup().getBuildId(),
-        String.valueOf(addFileGroupRequest.accountOptional().orNull()),
-        String.valueOf(addFileGroupRequest.variantIdOptional().orNull()));
-
-    DataFileGroup dataFileGroup = addFileGroupRequest.dataFileGroup();
-
-    // Ensure that the owner package is always set as the host app.
-    if (!dataFileGroup.hasOwnerPackage()) {
-      dataFileGroup = dataFileGroup.toBuilder().setOwnerPackage(context.getPackageName()).build();
-    } else if (!context.getPackageName().equals(dataFileGroup.getOwnerPackage())) {
-      LogUtil.e(
-          "%s: Added group = '%s' with wrong owner package: '%s' v.s. '%s' ",
-          TAG,
-          dataFileGroup.getGroupName(),
-          context.getPackageName(),
-          dataFileGroup.getOwnerPackage());
-      return immediateFuture(false);
-    }
-
-    GroupKey.Builder groupKeyBuilder =
-        GroupKey.newBuilder()
-            .setGroupName(dataFileGroup.getGroupName())
-            .setOwnerPackage(dataFileGroup.getOwnerPackage());
-
-    if (addFileGroupRequest.accountOptional().isPresent()) {
-      groupKeyBuilder.setAccount(
-          AccountUtil.serialize(addFileGroupRequest.accountOptional().get()));
-    }
-
-    if (addFileGroupRequest.variantIdOptional().isPresent()) {
-      groupKeyBuilder.setVariantId(addFileGroupRequest.variantIdOptional().get());
-    }
-
-    try {
-      DataFileGroupInternal dataFileGroupInternal = ProtoConversionUtil.convert(dataFileGroup);
-      return mobileDataDownloadManager.addGroupForDownloadInternal(
-          groupKeyBuilder.build(), dataFileGroupInternal, customFileGroupValidator);
-    } catch (InvalidProtocolBufferException e) {
-      // TODO(b/118137672): Consider rethrow exception instead of returning false.
-      LogUtil.e(e, "%s: Unable to convert from DataFileGroup to DataFileGroupInternal.", TAG);
-      return immediateFuture(false);
-    }
-  }
-
-  // TODO: Change to return ListenableFuture<Void>.
-  @Override
-  public ListenableFuture<Boolean> removeFileGroup(RemoveFileGroupRequest removeFileGroupRequest) {
-    return futureSerializer.submitAsync(
-        () -> {
-          GroupKey.Builder groupKeyBuilder =
-              GroupKey.newBuilder()
-                  .setGroupName(removeFileGroupRequest.groupName())
-                  .setOwnerPackage(context.getPackageName());
-          if (removeFileGroupRequest.accountOptional().isPresent()) {
-            groupKeyBuilder.setAccount(
-                AccountUtil.serialize(removeFileGroupRequest.accountOptional().get()));
-          }
-          if (removeFileGroupRequest.variantIdOptional().isPresent()) {
-            groupKeyBuilder.setVariantId(removeFileGroupRequest.variantIdOptional().get());
-          }
-
-          GroupKey groupKey = groupKeyBuilder.build();
-          return PropagatedFutures.transform(
-              mobileDataDownloadManager.removeFileGroup(
-                  groupKey, removeFileGroupRequest.pendingOnly()),
-              voidArg -> true,
-              sequentialControlExecutor);
-        },
-        sequentialControlExecutor);
-  }
-
-  @Override
-  public ListenableFuture<RemoveFileGroupsByFilterResponse> removeFileGroupsByFilter(
-      RemoveFileGroupsByFilterRequest removeFileGroupsByFilterRequest) {
-    return futureSerializer.submitAsync(
-        () ->
-            PropagatedFluentFuture.from(mobileDataDownloadManager.getAllFreshGroups())
-                .transformAsync(
-                    allFreshGroupKeyAndGroups -> {
-                      ImmutableSet.Builder<GroupKey> groupKeysToRemoveBuilder =
-                          ImmutableSet.builder();
-                      for (GroupKeyAndGroup groupKeyAndGroup : allFreshGroupKeyAndGroups) {
-                        if (applyRemoveFileGroupsFilter(
-                            removeFileGroupsByFilterRequest, groupKeyAndGroup)) {
-                          // Remove downloaded status so pending/downloaded versions of the same
-                          // group are treated as one.
-                          groupKeysToRemoveBuilder.add(
-                              groupKeyAndGroup.groupKey().toBuilder().clearDownloaded().build());
-                        }
-                      }
-                      ImmutableSet<GroupKey> groupKeysToRemove = groupKeysToRemoveBuilder.build();
-                      if (groupKeysToRemove.isEmpty()) {
-                        return immediateFuture(
-                            RemoveFileGroupsByFilterResponse.newBuilder()
-                                .setRemovedFileGroupsCount(0)
-                                .build());
-                      }
-                      return PropagatedFutures.transform(
-                          mobileDataDownloadManager.removeFileGroups(groupKeysToRemove.asList()),
-                          unused ->
-                              RemoveFileGroupsByFilterResponse.newBuilder()
-                                  .setRemovedFileGroupsCount(groupKeysToRemove.size())
-                                  .build(),
-                          sequentialControlExecutor);
-                    },
-                    sequentialControlExecutor),
-        sequentialControlExecutor);
-  }
-
-  // Perform filtering using options from RemoveFileGroupsByFilterRequest
-  private static boolean applyRemoveFileGroupsFilter(
-      RemoveFileGroupsByFilterRequest removeFileGroupsByFilterRequest,
-      GroupKeyAndGroup groupKeyAndGroup) {
-    // If request filters by account, ensure account is present and is equal
-    Optional<Account> accountOptional = removeFileGroupsByFilterRequest.accountOptional();
-    if (!accountOptional.isPresent() && groupKeyAndGroup.groupKey().hasAccount()) {
-      // Account must explicitly be provided in order to remove account associated file groups.
-      return false;
-    }
-    if (accountOptional.isPresent()
-        && !AccountUtil.serialize(accountOptional.get())
-            .equals(groupKeyAndGroup.groupKey().getAccount())) {
-      return false;
-    }
-
-    return true;
-  }
-
-  /**
-   * Helper function to create {@link DataDownloadFileGroupStats} object from {@link
-   * GetFileGroupRequest} for getFileGroup() logging.
-   *
-   * <p>Used when the matching file group is not found or a failure occurred.
-   * file_group_version_number and build_id are set to -1 by default.
-   */
-  private DataDownloadFileGroupStats createFileGroupStatsFromGetFileGroupRequest(
-      GetFileGroupRequest getFileGroupRequest) {
-    DataDownloadFileGroupStats.Builder fileGroupStatsBuilder =
-        DataDownloadFileGroupStats.newBuilder();
-    fileGroupStatsBuilder.setFileGroupName(getFileGroupRequest.groupName());
-    if (getFileGroupRequest.variantIdOptional().isPresent()) {
-      fileGroupStatsBuilder.setVariantId(getFileGroupRequest.variantIdOptional().get());
-    }
-    if (getFileGroupRequest.accountOptional().isPresent()) {
-      fileGroupStatsBuilder.setHasAccount(true);
-    } else {
-      fileGroupStatsBuilder.setHasAccount(false);
-    }
-
-    fileGroupStatsBuilder.setFileGroupVersionNumber(
-        MddConstants.FILE_GROUP_NOT_FOUND_FILE_GROUP_VERSION_NUMBER);
-    fileGroupStatsBuilder.setBuildId(MddConstants.FILE_GROUP_NOT_FOUND_BUILD_ID);
-
-    return fileGroupStatsBuilder.build();
-  }
-
-  // TODO: Futures.immediateFuture(null) uses a different annotation for Nullable.
-  @SuppressWarnings("nullness")
-  @Override
-  public ListenableFuture<ClientFileGroup> getFileGroup(GetFileGroupRequest getFileGroupRequest) {
-    long startTimeNs = timeSource.elapsedRealtimeNanos();
-
-    ListenableFuture<ClientFileGroup> resultFuture =
-        futureSerializer.submitAsync(
-            () -> {
-              GroupKey groupKey =
-                  createGroupKey(
-                      getFileGroupRequest.groupName(),
-                      getFileGroupRequest.accountOptional(),
-                      getFileGroupRequest.variantIdOptional());
-              return PropagatedFutures.transformAsync(
-                  mobileDataDownloadManager.getFileGroup(groupKey, /* downloaded= */ true),
-                  dataFileGroup ->
-                      createClientFileGroupAndLogQueryStats(
-                          groupKey,
-                          dataFileGroup,
-                          /* downloaded= */ true,
-                          getFileGroupRequest.preserveZipDirectories(),
-                          getFileGroupRequest.verifyIsolatedStructure()),
-                  sequentialControlExecutor);
-            },
-            sequentialControlExecutor);
-
-    attachMddApiLogging(
-        0,
-        resultFuture,
-        startTimeNs,
-        createFileGroupStatsFromGetFileGroupRequest(getFileGroupRequest),
-        /* statsCreator= */ result -> createFileGroupDetails(result),
-        /* resultCodeGetter= */ unused -> 0);
-    return resultFuture;
-  }
-
-  @SuppressWarnings("nullness")
-  @Override
-  public ListenableFuture<DataFileGroup> readDataFileGroup(
-      ReadDataFileGroupRequest readDataFileGroupRequest) {
-    return futureSerializer.submitAsync(
-        () -> {
-          GroupKey groupKey =
-              createGroupKey(
-                  readDataFileGroupRequest.groupName(),
-                  readDataFileGroupRequest.accountOptional(),
-                  readDataFileGroupRequest.variantIdOptional());
-          return PropagatedFutures.transformAsync(
-              mobileDataDownloadManager.getFileGroup(groupKey, /* downloaded= */ true),
-              internalFileGroup -> immediateFuture(ProtoConversionUtil.reverse(internalFileGroup)),
-              sequentialControlExecutor);
-        },
-        sequentialControlExecutor);
-  }
-
-  private GroupKey createGroupKey(
-      String groupName, Optional<Account> accountOptional, Optional<String> variantOptional) {
-    GroupKey.Builder groupKeyBuilder =
-        GroupKey.newBuilder().setGroupName(groupName).setOwnerPackage(context.getPackageName());
-
-    if (accountOptional.isPresent()) {
-      groupKeyBuilder.setAccount(AccountUtil.serialize(accountOptional.get()));
-    }
-
-    if (variantOptional.isPresent()) {
-      groupKeyBuilder.setVariantId(variantOptional.get());
-    }
-
-    return groupKeyBuilder.build();
-  }
-
-  private ListenableFuture<ClientFileGroup> createClientFileGroupAndLogQueryStats(
-      GroupKey groupKey,
-      @Nullable DataFileGroupInternal dataFileGroup,
-      boolean downloaded,
-      boolean preserveZipDirectories,
-      boolean verifyIsolatedStructure) {
-    return PropagatedFutures.transform(
-        createClientFileGroup(
-            dataFileGroup,
-            groupKey.hasAccount() ? groupKey.getAccount() : null,
-            downloaded ? ClientFileGroup.Status.DOWNLOADED : ClientFileGroup.Status.PENDING,
-            preserveZipDirectories,
-            verifyIsolatedStructure,
-            mobileDataDownloadManager,
-            sequentialControlExecutor,
-            fileStorage),
-        clientFileGroup -> {
-          if (clientFileGroup != null) {
-            eventLogger.logMddQueryStats(createFileGroupDetails(clientFileGroup));
-          }
-          return clientFileGroup;
-        },
-        sequentialControlExecutor);
-  }
-
-  @SuppressWarnings("nullness")
-  private static ListenableFuture<ClientFileGroup> createClientFileGroup(
-      @Nullable DataFileGroupInternal dataFileGroup,
-      @Nullable String account,
-      ClientFileGroup.Status status,
-      boolean preserveZipDirectories,
-      boolean verifyIsolatedStructure,
-      MobileDataDownloadManager manager,
-      Executor executor,
-      SynchronousFileStorage fileStorage) {
-    if (dataFileGroup == null) {
-      return immediateFuture(null);
-    }
-    ClientFileGroup.Builder clientFileGroupBuilder =
-        ClientFileGroup.newBuilder()
-            .setGroupName(dataFileGroup.getGroupName())
-            .setOwnerPackage(dataFileGroup.getOwnerPackage())
-            .setVersionNumber(dataFileGroup.getFileGroupVersionNumber())
-//            .setCustomProperty(dataFileGroup.getCustomProperty())
-            .setBuildId(dataFileGroup.getBuildId())
-            .setVariantId(dataFileGroup.getVariantId())
-            .setStatus(status)
-            .addAllLocale(dataFileGroup.getLocaleList());
-
-    if (account != null) {
-      clientFileGroupBuilder.setAccount(account);
-    }
-
-    if (dataFileGroup.hasCustomMetadata()) {
-      clientFileGroupBuilder.setCustomMetadata(dataFileGroup.getCustomMetadata());
-    }
-
-    List<DataFile> dataFiles = dataFileGroup.getFileList();
-    ListenableFuture<Void> addOnDeviceUrisFuture = immediateVoidFuture();
-    if (status == ClientFileGroup.Status.DOWNLOADED
-        || status == ClientFileGroup.Status.PENDING_CUSTOM_VALIDATION) {
-      addOnDeviceUrisFuture =
-          PropagatedFluentFuture.from(
-                  manager.getDataFileUris(dataFileGroup, verifyIsolatedStructure))
-              .transformAsync(
-                  dataFileUriMap -> {
-                    for (DataFile dataFile : dataFiles) {
-                      if (!dataFileUriMap.containsKey(dataFile)) {
-                        return immediateFailedFuture(
-                            DownloadException.builder()
-                                .setDownloadResultCode(
-                                    DownloadResultCode.DOWNLOADED_FILE_NOT_FOUND_ERROR)
-                                .setMessage("getDataFileUris() resolved to null")
-                                .build());
-                      }
-                      Uri uri = dataFileUriMap.get(dataFile);
-
-                      try {
-                        if (!preserveZipDirectories && fileStorage.isDirectory(uri)) {
-                          String rootPath = uri.getPath();
-                          if (rootPath != null) {
-                            clientFileGroupBuilder.addAllFile(
-                                listAllClientFilesOfDirectory(fileStorage, uri, rootPath));
-                          }
-                        } else {
-                          clientFileGroupBuilder.addFile(
-                              createClientFile(
-                                  dataFile.getFileId(),
-                                  dataFile.getByteSize(),
-                                  dataFile.getDownloadedFileByteSize(),
-                                  uri.toString(),
-                                  dataFile.hasCustomMetadata()
-                                      ? dataFile.getCustomMetadata()
-                                      : null));
-                        }
-                      } catch (IOException e) {
-                        LogUtil.e(e, "Failed to list files under directory:" + uri);
-                      }
-                    }
-                    return immediateVoidFuture();
-                  },
-                  executor);
-    } else {
-      for (DataFile dataFile : dataFiles) {
-        clientFileGroupBuilder.addFile(
-            createClientFile(
-                dataFile.getFileId(),
-                dataFile.getByteSize(),
-                dataFile.getDownloadedFileByteSize(),
-                /* uri= */ null,
-                dataFile.hasCustomMetadata() ? dataFile.getCustomMetadata() : null));
-      }
-    }
-
-    return PropagatedFluentFuture.from(addOnDeviceUrisFuture)
-        .transform(unused -> clientFileGroupBuilder.build(), executor)
-        .catching(DownloadException.class, exn -> null, executor);
-  }
-
-  private static ClientFile createClientFile(
-      String fileId,
-      int byteSize,
-      int downloadByteSize,
-      @Nullable String uri,
-      @Nullable Any customMetadata) {
-    ClientFile.Builder clientFileBuilder =
-        ClientFile.newBuilder().setFileId(fileId).setFullSizeInBytes(byteSize);
-    if (downloadByteSize > 0) {
-      // Files with downloaded transforms like compress and zip could have different downloaded
-      // file size than the final file size on disk. Return the downloaded file size for client to
-      // track and calculate the download progress.
-      clientFileBuilder.setDownloadSizeInBytes(downloadByteSize);
-    }
-    if (uri != null) {
-      clientFileBuilder.setFileUri(uri);
-    }
-    if (customMetadata != null) {
-      clientFileBuilder.setCustomMetadata(customMetadata);
-    }
-    return clientFileBuilder.build();
-  }
-
-  private static List<ClientFile> listAllClientFilesOfDirectory(
-      SynchronousFileStorage fileStorage, Uri dirUri, String rootDir) throws IOException {
-    List<ClientFile> clientFileList = new ArrayList<>();
-    for (Uri childUri : fileStorage.children(dirUri)) {
-      if (fileStorage.isDirectory(childUri)) {
-        clientFileList.addAll(listAllClientFilesOfDirectory(fileStorage, childUri, rootDir));
-      } else {
-        String childPath = childUri.getPath();
-        if (childPath != null) {
-          ClientFile clientFile =
-              ClientFile.newBuilder()
-                  .setFileId(childPath.replaceFirst(rootDir, ""))
-                  .setFullSizeInBytes((int) fileStorage.fileSize(childUri))
-                  .setFileUri(childUri.toString())
-                  .build();
-          clientFileList.add(clientFile);
+    // Wraps the custom validator because the validation at a lower level of the stack where
+    // the ClientFileGroup is not available, yet ClientFileGroup is the client-facing API we'd
+    // like to expose.
+    private static AsyncFunction<DataFileGroupInternal, Boolean> createCustomFileGroupValidator(
+            Optional<CustomFileGroupValidator> validatorOptional,
+            MobileDataDownloadManager mobileDataDownloadManager,
+            Executor executor,
+            SynchronousFileStorage fileStorage) {
+        if (!validatorOptional.isPresent()) {
+            return unused -> immediateFuture(true);
         }
-      }
-    }
-    return clientFileList;
-  }
 
-  @Override
-  public ListenableFuture<ImmutableList<ClientFileGroup>> getFileGroupsByFilter(
-      GetFileGroupsByFilterRequest getFileGroupsByFilterRequest) {
-    return futureSerializer.submitAsync(
-        () ->
-            PropagatedFutures.transformAsync(
-                mobileDataDownloadManager.getAllFreshGroups(),
-                allFreshGroupKeyAndGroups -> {
-                  ListenableFuture<ImmutableList.Builder<ClientFileGroup>>
-                      clientFileGroupsBuilderFuture =
-                          immediateFuture(ImmutableList.<ClientFileGroup>builder());
-                  for (GroupKeyAndGroup groupKeyAndGroup : allFreshGroupKeyAndGroups) {
-                    clientFileGroupsBuilderFuture =
-                        PropagatedFutures.transformAsync(
-                            clientFileGroupsBuilderFuture,
-                            clientFileGroupsBuilder -> {
-                              GroupKey groupKey = groupKeyAndGroup.groupKey();
-                              DataFileGroupInternal dataFileGroup =
-                                  groupKeyAndGroup.dataFileGroup();
-                              if (applyFilter(
-                                  getFileGroupsByFilterRequest, groupKey, dataFileGroup)) {
-                                return PropagatedFutures.transform(
-                                    createClientFileGroupAndLogQueryStats(
-                                        groupKey,
-                                        dataFileGroup,
-                                        groupKey.getDownloaded(),
-                                        getFileGroupsByFilterRequest.preserveZipDirectories(),
-                                        getFileGroupsByFilterRequest.verifyIsolatedStructure()),
-                                    clientFileGroup -> {
-                                      if (clientFileGroup != null) {
-                                        clientFileGroupsBuilder.add(clientFileGroup);
-                                      }
-                                      return clientFileGroupsBuilder;
-                                    },
-                                    sequentialControlExecutor);
-                              }
-                              return immediateFuture(clientFileGroupsBuilder);
-                            },
+        return internalFileGroup ->
+                PropagatedFutures.transformAsync(
+                        createClientFileGroup(
+                                internalFileGroup,
+                                /* account= */ null,
+                                ClientFileGroup.Status.PENDING_CUSTOM_VALIDATION,
+                                /* preserveZipDirectories= */ false,
+                                /* verifyIsolatedStructure= */ true,
+                                mobileDataDownloadManager,
+                                executor,
+                                fileStorage),
+                        propagateAsyncFunction(
+                                clientFileGroup -> validatorOptional.get().validateFileGroup(
+                                        clientFileGroup)),
+                        executor);
+    }
+
+    /**
+     * Functional interface used as callback for logging file group stats. Used to create file group
+     * stats from the result of the future.
+     *
+     * @see attachMddApiLogging
+     */
+    private interface StatsFromApiResultCreator<T> {
+        DataDownloadFileGroupStats create(T result);
+    }
+
+    /**
+     * Functional interface used as callback when logging API result. Used to get the API result
+     * code
+     * from the result of the API future if it succeeds.
+     *
+     * <p>Note: The need for this is due to {@link addFileGroup} returning false instead of an
+     * exception if it fails. For other APIs with proper exception handling, it should suffice to
+     * immediately return the success code.
+     *
+     * <p>TODO(b/143572409): Remove once addGroupForDownload is updated to return void.
+     *
+     * @see attachMddApiLogging
+     */
+    private interface ResultCodeFromApiResultGetter<T> {
+        MddLibApiResult.Code get(T result);
+    }
+
+    /**
+     * Helper function used to log mdd api stats. Adds FutureCallback to the {@code resultFuture}
+     * which is the result of mdd api call and logs in onSuccess and onFailure functions of
+     * callback.
+     *
+     * @param apiName               Code of the api being logged.
+     * @param resultFuture          Future result of the api call.
+     * @param startTimeNs           start time in ns.
+     * @param defaultFileGroupStats Initial file group stats.
+     * @param statsCreator          This functional interface is invoked from the onSuccess of
+     *                              FutureCallback
+     *                              with the result of the future. File group stats returned here is
+     *                              merged with the initial
+     *                              stats and logged.
+     */
+    private <T> void attachMddApiLogging(
+            MddLibApiName.Code apiName,
+            ListenableFuture<T> resultFuture,
+            long startTimeNs,
+            DataDownloadFileGroupStats defaultFileGroupStats,
+            StatsFromApiResultCreator<T> statsCreator,
+            ResultCodeFromApiResultGetter<T> resultCodeGetter) {
+        // Using listener instead of transform since we need to log even if the future fails.
+        // Note: Listener is being registered on directexecutor for accurate latency measurement.
+        resultFuture.addListener(
+                propagateRunnable(
+                        () -> {
+                            long latencyNs = timeSource.elapsedRealtimeNanos() - startTimeNs;
+                            // Log the stats asynchronously.
+                            // Note: To avoid adding latency to mdd api calls, log asynchronously.
+                            var unused =
+                                    PropagatedFutures.submit(
+                                            () -> {
+                                                MddLibApiResult.Code resultCode;
+                                                T result = null;
+                                                DataDownloadFileGroupStats fileGroupStats =
+                                                        defaultFileGroupStats;
+                                                try {
+                                                    result = Futures.getDone(resultFuture);
+                                                    resultCode = resultCodeGetter.get(result);
+                                                } catch (Throwable t) {
+                                                    resultCode = ExceptionToMddResultMapper.map(t);
+                                                }
+
+                                                // Merge stats created from result of api with
+                                                // the default stats.
+                                                if (result != null) {
+                                                    fileGroupStats =
+                                                            fileGroupStats.toBuilder()
+                                                                    .mergeFrom(statsCreator.create(
+                                                                            result))
+                                                                    .build();
+                                                }
+
+                                                MddLibApiResultLog resultLog =
+                                                        MddLibApiResultLog.newBuilder()
+                                                                .setApiUsed(apiName)
+                                                                .setResult(resultCode)
+                                                                .setLatencyNs(latencyNs)
+                                                                .addDataDownloadFileGroupStats(
+                                                                        fileGroupStats)
+                                                                .build();
+
+                                                eventLogger.logMddLibApiResultLog(resultLog);
+                                            },
+                                            sequentialControlExecutor);
+                        }),
+                directExecutor());
+    }
+
+    @Override
+    public ListenableFuture<Boolean> addFileGroup(AddFileGroupRequest addFileGroupRequest) {
+        long startTimeNs = timeSource.elapsedRealtimeNanos();
+
+        ListenableFuture<Boolean> resultFuture =
+                futureSerializer.submitAsync(
+                        () -> addFileGroupHelper(addFileGroupRequest), sequentialControlExecutor);
+
+        DataDownloadFileGroupStats defaultFileGroupStats =
+                DataDownloadFileGroupStats.newBuilder()
+                        .setFileGroupName(addFileGroupRequest.dataFileGroup().getGroupName())
+                        .setBuildId(addFileGroupRequest.dataFileGroup().getBuildId())
+                        .setVariantId(addFileGroupRequest.dataFileGroup().getVariantId())
+                        .setHasAccount(addFileGroupRequest.accountOptional().isPresent())
+                        .setFileGroupVersionNumber(
+                                addFileGroupRequest.dataFileGroup().getFileGroupVersionNumber())
+                        .setOwnerPackage(addFileGroupRequest.dataFileGroup().getOwnerPackage())
+                        .setFileCount(addFileGroupRequest.dataFileGroup().getFileCount())
+                        .build();
+        attachMddApiLogging(
+                MddLibApiName.Code.ADD_FILE_GROUP,
+                resultFuture,
+                startTimeNs,
+                defaultFileGroupStats,
+                /* statsCreator= */ unused -> defaultFileGroupStats,
+                /* resultCodeGetter= */ succeeded ->
+                        succeeded ? MddLibApiResult.Code.RESULT_SUCCESS
+                                : MddLibApiResult.Code.RESULT_FAILURE);
+
+        return resultFuture;
+    }
+
+    private ListenableFuture<Boolean> addFileGroupHelper(AddFileGroupRequest addFileGroupRequest) {
+        LogUtil.d(
+                "%s: Adding for download group = '%s', variant = '%s', buildId = '%d' and"
+                        + " associating it with account = '%s', variant = '%s'",
+                TAG,
+                addFileGroupRequest.dataFileGroup().getGroupName(),
+                addFileGroupRequest.dataFileGroup().getVariantId(),
+                addFileGroupRequest.dataFileGroup().getBuildId(),
+                String.valueOf(addFileGroupRequest.accountOptional().orNull()),
+                String.valueOf(addFileGroupRequest.variantIdOptional().orNull()));
+
+        DataFileGroup dataFileGroup = addFileGroupRequest.dataFileGroup();
+
+        // Ensure that the owner package is always set as the host app.
+        if (!dataFileGroup.hasOwnerPackage()) {
+            dataFileGroup = dataFileGroup.toBuilder().setOwnerPackage(
+                    context.getPackageName()).build();
+        } else if (!context.getPackageName().equals(dataFileGroup.getOwnerPackage())) {
+            LogUtil.e(
+                    "%s: Added group = '%s' with wrong owner package: '%s' v.s. '%s' ",
+                    TAG,
+                    dataFileGroup.getGroupName(),
+                    context.getPackageName(),
+                    dataFileGroup.getOwnerPackage());
+            return immediateFuture(false);
+        }
+
+        GroupKey.Builder groupKeyBuilder =
+                GroupKey.newBuilder()
+                        .setGroupName(dataFileGroup.getGroupName())
+                        .setOwnerPackage(dataFileGroup.getOwnerPackage());
+
+        if (addFileGroupRequest.accountOptional().isPresent()) {
+            groupKeyBuilder.setAccount(
+                    AccountUtil.serialize(addFileGroupRequest.accountOptional().get()));
+        }
+
+        if (addFileGroupRequest.variantIdOptional().isPresent()) {
+            groupKeyBuilder.setVariantId(addFileGroupRequest.variantIdOptional().get());
+        }
+
+        try {
+            DataFileGroupInternal dataFileGroupInternal = ProtoConversionUtil.convert(
+                    dataFileGroup);
+            return mobileDataDownloadManager.addGroupForDownloadInternal(
+                    groupKeyBuilder.build(), dataFileGroupInternal, customFileGroupValidator);
+        } catch (InvalidProtocolBufferException e) {
+            // TODO(b/118137672): Consider rethrow exception instead of returning false.
+            LogUtil.e(e, "%s: Unable to convert from DataFileGroup to DataFileGroupInternal.", TAG);
+            return immediateFuture(false);
+        }
+    }
+
+    // TODO: Change to return ListenableFuture<Void>.
+    @Override
+    public ListenableFuture<Boolean> removeFileGroup(
+            RemoveFileGroupRequest removeFileGroupRequest) {
+        return futureSerializer.submitAsync(
+                () -> {
+                    GroupKey.Builder groupKeyBuilder =
+                            GroupKey.newBuilder()
+                                    .setGroupName(removeFileGroupRequest.groupName())
+                                    .setOwnerPackage(context.getPackageName());
+                    if (removeFileGroupRequest.accountOptional().isPresent()) {
+                        groupKeyBuilder.setAccount(
+                                AccountUtil.serialize(
+                                        removeFileGroupRequest.accountOptional().get()));
+                    }
+                    if (removeFileGroupRequest.variantIdOptional().isPresent()) {
+                        groupKeyBuilder.setVariantId(
+                                removeFileGroupRequest.variantIdOptional().get());
+                    }
+
+                    GroupKey groupKey = groupKeyBuilder.build();
+                    return PropagatedFutures.transform(
+                            mobileDataDownloadManager.removeFileGroup(
+                                    groupKey, removeFileGroupRequest.pendingOnly()),
+                            voidArg -> true,
                             sequentialControlExecutor);
-                  }
-
-                  return PropagatedFutures.transform(
-                      clientFileGroupsBuilderFuture,
-                      ImmutableList.Builder::build,
-                      sequentialControlExecutor);
-                },
-                sequentialControlExecutor),
-        sequentialControlExecutor);
-  }
-
-  // Perform filtering using options from GetFileGroupsByFilterRequest
-  private static boolean applyFilter(
-      GetFileGroupsByFilterRequest getFileGroupsByFilterRequest,
-      GroupKey groupKey,
-      DataFileGroupInternal fileGroup) {
-    if (getFileGroupsByFilterRequest.includeAllGroups()) {
-      return true;
-    }
-
-    // If request filters by group name, ensure name is equal
-    Optional<String> groupNameOptional = getFileGroupsByFilterRequest.groupNameOptional();
-    if (groupNameOptional.isPresent()
-        && !TextUtils.equals(groupNameOptional.get(), groupKey.getGroupName())) {
-      return false;
-    }
-
-    // When the caller requests account independent groups only.
-    if (getFileGroupsByFilterRequest.groupWithNoAccountOnly()) {
-      return !groupKey.hasAccount();
-    }
-
-    // When the caller requests account dependent groups as well.
-    if (getFileGroupsByFilterRequest.accountOptional().isPresent()
-        && !AccountUtil.serialize(getFileGroupsByFilterRequest.accountOptional().get())
-            .equals(groupKey.getAccount())) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Creates {@link DataDownloadFileGroupStats} from {@link ClientFileGroup} for remote logging
-   * purposes.
-   */
-  private static DataDownloadFileGroupStats createFileGroupDetails(
-      ClientFileGroup clientFileGroup) {
-    return DataDownloadFileGroupStats.newBuilder()
-        .setFileGroupName(clientFileGroup.getGroupName())
-        .setOwnerPackage(clientFileGroup.getOwnerPackage())
-        .setFileGroupVersionNumber(clientFileGroup.getVersionNumber())
-        .setFileCount(clientFileGroup.getFileCount())
-        .setVariantId(clientFileGroup.getVariantId())
-        .setBuildId(clientFileGroup.getBuildId())
-        .build();
-  }
-
-  @Override
-  public ListenableFuture<Void> importFiles(ImportFilesRequest importFilesRequest) {
-    GroupKey.Builder groupKeyBuilder =
-        GroupKey.newBuilder()
-            .setGroupName(importFilesRequest.groupName())
-            .setOwnerPackage(context.getPackageName());
-
-    if (importFilesRequest.accountOptional().isPresent()) {
-      groupKeyBuilder.setAccount(AccountUtil.serialize(importFilesRequest.accountOptional().get()));
-    }
-
-    GroupKey groupKey = groupKeyBuilder.build();
-
-    ImmutableList.Builder<DataFile> updatedDataFileListBuilder =
-        ImmutableList.builderWithExpectedSize(importFilesRequest.updatedDataFileList().size());
-    for (DownloadConfigProto.DataFile dataFile : importFilesRequest.updatedDataFileList()) {
-      updatedDataFileListBuilder.add(ProtoConversionUtil.convertDataFile(dataFile));
-    }
-
-    return futureSerializer.submitAsync(
-        () ->
-            mobileDataDownloadManager.importFiles(
-                groupKey,
-                importFilesRequest.buildId(),
-                importFilesRequest.variantId(),
-                updatedDataFileListBuilder.build(),
-                importFilesRequest.inlineFileMap(),
-                importFilesRequest.customPropertyOptional(),
-                customFileGroupValidator),
-        sequentialControlExecutor);
-  }
-
-  @Override
-  public ListenableFuture<Void> downloadFile(SingleFileDownloadRequest singleFileDownloadRequest) {
-    return singleFileDownloader.download(
-        MddLiteConversionUtil.convertToDownloadRequest(singleFileDownloadRequest));
-  }
-
-  @Override
-  public ListenableFuture<ClientFileGroup> downloadFileGroup(
-      DownloadFileGroupRequest downloadFileGroupRequest) {
-    // Submit the call to sequentialControlExecutor, but don't use futureSerializer. This will
-    // ensure that multiple calls are enqueued to the executor in a FIFO order, but these calls
-    // won't block each other when the download is in progress.
-    return PropagatedFutures.submitAsync(
-        () ->
-            PropagatedFutures.transformAsync(
-                // Check if requested file group has already been downloaded
-                getDownloadGroupState(downloadFileGroupRequest),
-                downloadGroupState -> {
-                  switch (downloadGroupState.getKind()) {
-                    case IN_PROGRESS_FUTURE:
-                      // If the file group download is in progress, return that future immediately
-                      return downloadGroupState.inProgressFuture();
-                    case DOWNLOADED_GROUP:
-                      // If the file group is already downloaded, return that immediately.
-                      return immediateFuture(downloadGroupState.downloadedGroup());
-                    case PENDING_GROUP:
-                      return downloadPendingFileGroup(downloadFileGroupRequest);
-                  }
-                  throw new AssertionError(
-                      String.format(
-                          "received unsupported DownloadGroupState kind %s",
-                          downloadGroupState.getKind()));
-                },
-                sequentialControlExecutor),
-        sequentialControlExecutor);
-  }
-
-  /** Helper method to download a group after it's determined to be pending. */
-  private ListenableFuture<ClientFileGroup> downloadPendingFileGroup(
-      DownloadFileGroupRequest downloadFileGroupRequest) {
-    String groupName = downloadFileGroupRequest.groupName();
-    GroupKey.Builder groupKeyBuilder =
-        GroupKey.newBuilder().setGroupName(groupName).setOwnerPackage(context.getPackageName());
-
-    if (downloadFileGroupRequest.accountOptional().isPresent()) {
-      groupKeyBuilder.setAccount(
-          AccountUtil.serialize(downloadFileGroupRequest.accountOptional().get()));
-    }
-    if (downloadFileGroupRequest.variantIdOptional().isPresent()) {
-      groupKeyBuilder.setVariantId(downloadFileGroupRequest.variantIdOptional().get());
-    }
-
-    GroupKey groupKey = groupKeyBuilder.build();
-
-    if (downloadFileGroupRequest.listenerOptional().isPresent()) {
-      if (downloadMonitorOptional.isPresent()) {
-        downloadMonitorOptional
-            .get()
-            .addDownloadListener(groupName, downloadFileGroupRequest.listenerOptional().get());
-      } else {
-        return immediateFailedFuture(
-            DownloadException.builder()
-                .setDownloadResultCode(DownloadResultCode.DOWNLOAD_MONITOR_NOT_PROVIDED_ERROR)
-                .setMessage(
-                    "downloadFileGroup: DownloadListener is present but Download Monitor"
-                        + " is not provided!")
-                .build());
-      }
-    }
-
-    Optional<DownloadConditions> downloadConditions;
-    try {
-      downloadConditions =
-          downloadFileGroupRequest.downloadConditionsOptional().isPresent()
-              ? Optional.of(
-                  ProtoConversionUtil.convert(
-                      downloadFileGroupRequest.downloadConditionsOptional().get()))
-              : Optional.absent();
-    } catch (InvalidProtocolBufferException e) {
-      return immediateFailedFuture(e);
-    }
-
-    // Get the key used for the download future map
-    ForegroundDownloadKey downloadKey =
-        ForegroundDownloadKey.ofFileGroup(
-            downloadFileGroupRequest.groupName(),
-            downloadFileGroupRequest.accountOptional(),
-            downloadFileGroupRequest.variantIdOptional());
-
-    // Create a ListenableFutureTask to delay starting the downloadFuture until we can add the
-    // future to our map.
-    ListenableFutureTask<Void> startTask = ListenableFutureTask.create(() -> null);
-    ListenableFuture<ClientFileGroup> downloadFuture =
-        PropagatedFluentFuture.from(startTask)
-            .transformAsync(
-                unused ->
-                    mobileDataDownloadManager.downloadFileGroup(
-                        groupKey, downloadConditions, customFileGroupValidator),
-                sequentialControlExecutor)
-            .transformAsync(
-                dataFileGroup ->
-                    createClientFileGroup(
-                        dataFileGroup,
-                        downloadFileGroupRequest.accountOptional().isPresent()
-                            ? AccountUtil.serialize(
-                                downloadFileGroupRequest.accountOptional().get())
-                            : null,
-                        ClientFileGroup.Status.DOWNLOADED,
-                        downloadFileGroupRequest.preserveZipDirectories(),
-                        downloadFileGroupRequest.verifyIsolatedStructure(),
-                        mobileDataDownloadManager,
-                        sequentialControlExecutor,
-                        fileStorage),
-                sequentialControlExecutor)
-            .transform(Preconditions::checkNotNull, sequentialControlExecutor);
-
-    // Get a handle on the download task so we can get the CFG during transforms
-    PropagatedFluentFuture<ClientFileGroup> downloadTaskFuture =
-        PropagatedFluentFuture.from(downloadFutureMap.add(downloadKey.toString(), downloadFuture))
-            .transformAsync(
-                unused -> {
-                  // Now that the download future is added, start the task and return the future
-                  startTask.run();
-                  return downloadFuture;
                 },
                 sequentialControlExecutor);
+    }
 
-    ListenableFuture<ClientFileGroup> transformFuture =
-        downloadTaskFuture
-            .transformAsync(
-                unused -> downloadFutureMap.remove(downloadKey.toString()),
-                sequentialControlExecutor)
-            .transformAsync(
-                unused -> {
-                  ClientFileGroup clientFileGroup = getDone(downloadTaskFuture);
-
-                  if (downloadFileGroupRequest.listenerOptional().isPresent()) {
-                    try {
-                      downloadFileGroupRequest.listenerOptional().get().onComplete(clientFileGroup);
-                    } catch (Exception e) {
-                      LogUtil.w(
-                          e,
-                          "%s: Listener onComplete failed for group %s",
-                          TAG,
-                          clientFileGroup.getGroupName());
-                    }
-                    if (downloadMonitorOptional.isPresent()) {
-                      downloadMonitorOptional.get().removeDownloadListener(groupName);
-                    }
-                  }
-                  return immediateFuture(clientFileGroup);
-                },
+    @Override
+    public ListenableFuture<RemoveFileGroupsByFilterResponse> removeFileGroupsByFilter(
+            RemoveFileGroupsByFilterRequest removeFileGroupsByFilterRequest) {
+        return futureSerializer.submitAsync(
+                () ->
+                        PropagatedFluentFuture.from(mobileDataDownloadManager.getAllFreshGroups())
+                                .transformAsync(
+                                        allFreshGroupKeyAndGroups -> {
+                                            ImmutableSet.Builder<GroupKey>
+                                                    groupKeysToRemoveBuilder =
+                                                    ImmutableSet.builder();
+                                            for (GroupKeyAndGroup groupKeyAndGroup :
+                                                    allFreshGroupKeyAndGroups) {
+                                                if (applyRemoveFileGroupsFilter(
+                                                        removeFileGroupsByFilterRequest,
+                                                        groupKeyAndGroup)) {
+                                                    // Remove downloaded status so
+                                                    // pending/downloaded versions of the same
+                                                    // group are treated as one.
+                                                    groupKeysToRemoveBuilder.add(
+                                                            groupKeyAndGroup.groupKey().toBuilder().clearDownloaded().build());
+                                                }
+                                            }
+                                            ImmutableSet<GroupKey> groupKeysToRemove =
+                                                    groupKeysToRemoveBuilder.build();
+                                            if (groupKeysToRemove.isEmpty()) {
+                                                return immediateFuture(
+                                                        RemoveFileGroupsByFilterResponse.newBuilder()
+                                                                .setRemovedFileGroupsCount(0)
+                                                                .build());
+                                            }
+                                            return PropagatedFutures.transform(
+                                                    mobileDataDownloadManager.removeFileGroups(
+                                                            groupKeysToRemove.asList()),
+                                                    unused ->
+                                                            RemoveFileGroupsByFilterResponse.newBuilder()
+                                                                    .setRemovedFileGroupsCount(
+                                                                            groupKeysToRemove.size())
+                                                                    .build(),
+                                                    sequentialControlExecutor);
+                                        },
+                                        sequentialControlExecutor),
                 sequentialControlExecutor);
-
-    PropagatedFutures.addCallback(
-        transformFuture,
-        new FutureCallback<ClientFileGroup>() {
-          @Override
-          public void onSuccess(ClientFileGroup result) {}
-
-          @Override
-          public void onFailure(Throwable t) {
-            if (downloadFileGroupRequest.listenerOptional().isPresent()) {
-              downloadFileGroupRequest.listenerOptional().get().onFailure(t);
-
-              if (downloadMonitorOptional.isPresent()) {
-                downloadMonitorOptional.get().removeDownloadListener(groupName);
-              }
-            }
-
-            // Remove future from map
-            ListenableFuture<Void> unused = downloadFutureMap.remove(downloadKey.toString());
-          }
-        },
-        sequentialControlExecutor);
-
-    return transformFuture;
-  }
-
-  @Override
-  public ListenableFuture<Void> downloadFileWithForegroundService(
-      SingleFileDownloadRequest singleFileDownloadRequest) {
-    return singleFileDownloader.downloadWithForegroundService(
-        MddLiteConversionUtil.convertToDownloadRequest(singleFileDownloadRequest));
-  }
-
-  @Override
-  public ListenableFuture<ClientFileGroup> downloadFileGroupWithForegroundService(
-      DownloadFileGroupRequest downloadFileGroupRequest) {
-    LogUtil.d("%s: downloadFileGroupWithForegroundService start.", TAG);
-    if (!foregroundDownloadServiceClassOptional.isPresent()) {
-      return immediateFailedFuture(
-          new IllegalArgumentException(
-              "downloadFileGroupWithForegroundService: ForegroundDownloadService is not"
-                  + " provided!"));
     }
 
-    if (!downloadMonitorOptional.isPresent()) {
-      return immediateFailedFuture(
-          DownloadException.builder()
-              .setDownloadResultCode(DownloadResultCode.DOWNLOAD_MONITOR_NOT_PROVIDED_ERROR)
-              .setMessage(
-                  "downloadFileGroupWithForegroundService: Download Monitor is not provided!")
-              .build());
+    // Perform filtering using options from RemoveFileGroupsByFilterRequest
+    private static boolean applyRemoveFileGroupsFilter(
+            RemoveFileGroupsByFilterRequest removeFileGroupsByFilterRequest,
+            GroupKeyAndGroup groupKeyAndGroup) {
+        // If request filters by account, ensure account is present and is equal
+        Optional<Account> accountOptional = removeFileGroupsByFilterRequest.accountOptional();
+        if (!accountOptional.isPresent() && groupKeyAndGroup.groupKey().hasAccount()) {
+            // Account must explicitly be provided in order to remove account associated file
+            // groups.
+            return false;
+        }
+        if (accountOptional.isPresent()
+                && !AccountUtil.serialize(accountOptional.get())
+                .equals(groupKeyAndGroup.groupKey().getAccount())) {
+            return false;
+        }
+
+        return true;
     }
 
-    // Submit the call to sequentialControlExecutor, but don't use futureSerializer. This will
-    // ensure that multiple calls are enqueued to the executor in a FIFO order, but these calls
-    // won't block each other when the download is in progress.
-    return PropagatedFutures.submitAsync(
-        () ->
-            PropagatedFutures.transformAsync(
-                // Check if requested file group has already been downloaded
-                getDownloadGroupState(downloadFileGroupRequest),
-                downloadGroupState -> {
-                  switch (downloadGroupState.getKind()) {
-                    case IN_PROGRESS_FUTURE:
-                      // If the file group download is in progress, return that future immediately
-                      return downloadGroupState.inProgressFuture();
-                    case DOWNLOADED_GROUP:
-                      // If the file group is already downloaded, return that immediately
-                      return immediateFuture(downloadGroupState.downloadedGroup());
-                    case PENDING_GROUP:
-                      return downloadPendingFileGroupWithForegroundService(
-                          downloadFileGroupRequest, downloadGroupState.pendingGroup());
-                  }
-                  throw new AssertionError(
-                      String.format(
-                          "received unsupported DownloadGroupState kind %s",
-                          downloadGroupState.getKind()));
-                },
-                sequentialControlExecutor),
-        sequentialControlExecutor);
-  }
+    /**
+     * Helper function to create {@link DataDownloadFileGroupStats} object from {@link
+     * GetFileGroupRequest} for getFileGroup() logging.
+     *
+     * <p>Used when the matching file group is not found or a failure occurred.
+     * file_group_version_number and build_id are set to -1 by default.
+     */
+    private DataDownloadFileGroupStats createFileGroupStatsFromGetFileGroupRequest(
+            GetFileGroupRequest getFileGroupRequest) {
+        DataDownloadFileGroupStats.Builder fileGroupStatsBuilder =
+                DataDownloadFileGroupStats.newBuilder();
+        fileGroupStatsBuilder.setFileGroupName(getFileGroupRequest.groupName());
+        if (getFileGroupRequest.variantIdOptional().isPresent()) {
+            fileGroupStatsBuilder.setVariantId(getFileGroupRequest.variantIdOptional().get());
+        }
+        if (getFileGroupRequest.accountOptional().isPresent()) {
+            fileGroupStatsBuilder.setHasAccount(true);
+        } else {
+            fileGroupStatsBuilder.setHasAccount(false);
+        }
 
-  /**
-   * Helper method to download a file group in the foreground after it has been confirmed to be
-   * pending.
-   */
-  private ListenableFuture<ClientFileGroup> downloadPendingFileGroupWithForegroundService(
-      DownloadFileGroupRequest downloadFileGroupRequest, DataFileGroupInternal pendingGroup) {
-    // It's OK to recreate the NotificationChannel since it can also be used to restore a
-    // deleted channel and to update an existing channel's name, description, group, and/or
-    // importance.
-    NotificationUtil.createNotificationChannel(context);
+        fileGroupStatsBuilder.setFileGroupVersionNumber(
+                MddConstants.FILE_GROUP_NOT_FOUND_FILE_GROUP_VERSION_NUMBER);
+        fileGroupStatsBuilder.setBuildId(MddConstants.FILE_GROUP_NOT_FOUND_BUILD_ID);
 
-    String groupName = downloadFileGroupRequest.groupName();
-    GroupKey.Builder groupKeyBuilder =
-        GroupKey.newBuilder().setGroupName(groupName).setOwnerPackage(context.getPackageName());
-
-    if (downloadFileGroupRequest.accountOptional().isPresent()) {
-      groupKeyBuilder.setAccount(
-          AccountUtil.serialize(downloadFileGroupRequest.accountOptional().get()));
-    }
-    if (downloadFileGroupRequest.variantIdOptional().isPresent()) {
-      groupKeyBuilder.setVariantId(downloadFileGroupRequest.variantIdOptional().get());
+        return fileGroupStatsBuilder.build();
     }
 
-    GroupKey groupKey = groupKeyBuilder.build();
-    ForegroundDownloadKey foregroundDownloadKey =
-        ForegroundDownloadKey.ofFileGroup(
-            groupName,
-            downloadFileGroupRequest.accountOptional(),
-            downloadFileGroupRequest.variantIdOptional());
+    // TODO: Futures.immediateFuture(null) uses a different annotation for Nullable.
+    @SuppressWarnings("nullness")
+    @Override
+    public ListenableFuture<ClientFileGroup> getFileGroup(GetFileGroupRequest getFileGroupRequest) {
+        long startTimeNs = timeSource.elapsedRealtimeNanos();
 
-    DownloadListener downloadListenerWithNotification =
-        createDownloadListenerWithNotification(downloadFileGroupRequest, pendingGroup);
-    // The downloadMonitor will trigger the DownloadListener.
-    downloadMonitorOptional
-        .get()
-        .addDownloadListener(
-            downloadFileGroupRequest.groupName(), downloadListenerWithNotification);
-
-    Optional<DownloadConditions> downloadConditions;
-    try {
-      downloadConditions =
-          downloadFileGroupRequest.downloadConditionsOptional().isPresent()
-              ? Optional.of(
-                  ProtoConversionUtil.convert(
-                      downloadFileGroupRequest.downloadConditionsOptional().get()))
-              : Optional.absent();
-    } catch (InvalidProtocolBufferException e) {
-      return immediateFailedFuture(e);
-    }
-
-    // Create a ListenableFutureTask to delay starting the downloadFuture until we can add the
-    // future to our map.
-    ListenableFutureTask<Void> startTask = ListenableFutureTask.create(() -> null);
-    PropagatedFluentFuture<ClientFileGroup> downloadFileGroupFuture =
-        PropagatedFluentFuture.from(startTask)
-            .transformAsync(
-                unused ->
-                    mobileDataDownloadManager.downloadFileGroup(
-                        groupKey, downloadConditions, customFileGroupValidator),
-                sequentialControlExecutor)
-            .transformAsync(
-                dataFileGroup ->
-                    createClientFileGroup(
-                        dataFileGroup,
-                        downloadFileGroupRequest.accountOptional().isPresent()
-                            ? AccountUtil.serialize(
-                                downloadFileGroupRequest.accountOptional().get())
-                            : null,
-                        ClientFileGroup.Status.DOWNLOADED,
-                        downloadFileGroupRequest.preserveZipDirectories(),
-                        downloadFileGroupRequest.verifyIsolatedStructure(),
-                        mobileDataDownloadManager,
-                        sequentialControlExecutor,
-                        fileStorage),
-                sequentialControlExecutor)
-            .transform(Preconditions::checkNotNull, sequentialControlExecutor);
-
-    ListenableFuture<ClientFileGroup> transformFuture =
-        PropagatedFutures.transformAsync(
-            foregroundDownloadFutureMap.add(
-                foregroundDownloadKey.toString(), downloadFileGroupFuture),
-            unused -> {
-              // Now that the download future is added, start the task and return the future
-              startTask.run();
-              return downloadFileGroupFuture;
-            },
-            sequentialControlExecutor);
-
-    PropagatedFutures.addCallback(
-        transformFuture,
-        new FutureCallback<ClientFileGroup>() {
-          @Override
-          public void onSuccess(ClientFileGroup clientFileGroup) {
-            // Currently the MobStore monitor does not support onSuccess so we have to add
-            // callback to the download future here.
-            try {
-              downloadListenerWithNotification.onComplete(clientFileGroup);
-            } catch (Exception e) {
-              LogUtil.w(
-                  e,
-                  "%s: Listener onComplete failed for group %s",
-                  TAG,
-                  clientFileGroup.getGroupName());
-            }
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            // Currently the MobStore monitor does not support onFailure so we have to add
-            // callback to the download future here.
-            downloadListenerWithNotification.onFailure(t);
-          }
-        },
-        sequentialControlExecutor);
-
-    return transformFuture;
-  }
-
-  /** Helper method to return a {@link DownloadGroupState} for the given request. */
-  private ListenableFuture<DownloadGroupState> getDownloadGroupState(
-      DownloadFileGroupRequest downloadFileGroupRequest) {
-    ForegroundDownloadKey foregroundDownloadKey =
-        ForegroundDownloadKey.ofFileGroup(
-            downloadFileGroupRequest.groupName(),
-            downloadFileGroupRequest.accountOptional(),
-            downloadFileGroupRequest.variantIdOptional());
-
-    String groupName = downloadFileGroupRequest.groupName();
-    GroupKey.Builder groupKeyBuilder =
-        GroupKey.newBuilder().setGroupName(groupName).setOwnerPackage(context.getPackageName());
-
-    if (downloadFileGroupRequest.accountOptional().isPresent()) {
-      groupKeyBuilder.setAccount(
-          AccountUtil.serialize(downloadFileGroupRequest.accountOptional().get()));
-    }
-
-    if (downloadFileGroupRequest.variantIdOptional().isPresent()) {
-      groupKeyBuilder.setVariantId(downloadFileGroupRequest.variantIdOptional().get());
-    }
-
-    boolean isDownloadListenerPresent = downloadFileGroupRequest.listenerOptional().isPresent();
-    GroupKey groupKey = groupKeyBuilder.build();
-
-    return futureSerializer.submitAsync(
-        () -> {
-          ListenableFuture<Optional<ListenableFuture<ClientFileGroup>>>
-              foregroundDownloadFutureOptional =
-                  foregroundDownloadFutureMap.get(foregroundDownloadKey.toString());
-          ListenableFuture<Optional<ListenableFuture<ClientFileGroup>>>
-              backgroundDownloadFutureOptional =
-                  downloadFutureMap.get(foregroundDownloadKey.toString());
-
-          return PropagatedFutures.whenAllSucceed(
-                  foregroundDownloadFutureOptional, backgroundDownloadFutureOptional)
-              .callAsync(
-                  () -> {
-                    if (getDone(foregroundDownloadFutureOptional).isPresent()) {
-                      return immediateFuture(
-                          DownloadGroupState.ofInProgressFuture(
-                              getDone(foregroundDownloadFutureOptional).get()));
-                    } else if (getDone(backgroundDownloadFutureOptional).isPresent()) {
-                      return immediateFuture(
-                          DownloadGroupState.ofInProgressFuture(
-                              getDone(backgroundDownloadFutureOptional).get()));
-                    }
-
-                    // Get pending and downloaded versions to tell if we should return downloaded
-                    // version early
-                    ListenableFuture<GroupPair> fileGroupVersionsFuture =
-                        PropagatedFutures.transformAsync(
-                            mobileDataDownloadManager.getFileGroup(
-                                groupKey, /* downloaded= */ false),
-                            pendingDataFileGroup ->
-                                PropagatedFutures.transform(
+        ListenableFuture<ClientFileGroup> resultFuture =
+                futureSerializer.submitAsync(
+                        () -> {
+                            GroupKey groupKey =
+                                    createGroupKey(
+                                            getFileGroupRequest.groupName(),
+                                            getFileGroupRequest.accountOptional(),
+                                            getFileGroupRequest.variantIdOptional());
+                            return PropagatedFutures.transformAsync(
                                     mobileDataDownloadManager.getFileGroup(
-                                        groupKey, /* downloaded= */ true),
-                                    downloadedDataFileGroup ->
-                                        GroupPair.create(
-                                            pendingDataFileGroup, downloadedDataFileGroup),
-                                    sequentialControlExecutor),
-                            sequentialControlExecutor);
-
-                    return PropagatedFutures.transformAsync(
-                        fileGroupVersionsFuture,
-                        fileGroupVersionsPair -> {
-                          // if pending version is not null, return pending version
-                          if (fileGroupVersionsPair.pendingGroup() != null) {
-                            return immediateFuture(
-                                DownloadGroupState.ofPendingGroup(
-                                    checkNotNull(fileGroupVersionsPair.pendingGroup())));
-                          }
-                          // If both groups are null, return group not found failure
-                          if (fileGroupVersionsPair.downloadedGroup() == null) {
-                            // TODO(b/174808410): Add Logging
-                            // file group is not pending nor downloaded -- return failure.
-                            DownloadException failure =
-                                DownloadException.builder()
-                                    .setDownloadResultCode(DownloadResultCode.GROUP_NOT_FOUND_ERROR)
-                                    .setMessage(
-                                        "Nothing to download for file group: "
-                                            + groupKey.getGroupName())
-                                    .build();
-                            if (isDownloadListenerPresent) {
-                              downloadFileGroupRequest.listenerOptional().get().onFailure(failure);
-                            }
-                            return immediateFailedFuture(failure);
-                          }
-
-                          DataFileGroupInternal downloadedDataFileGroup =
-                              checkNotNull(fileGroupVersionsPair.downloadedGroup());
-
-                          // Notify download listener (if present) that file group has been
-                          // downloaded.
-                          if (isDownloadListenerPresent) {
-                            downloadMonitorOptional
-                                .get()
-                                .addDownloadListener(
-                                    downloadFileGroupRequest.groupName(),
-                                    downloadFileGroupRequest.listenerOptional().get());
-                          }
-                          PropagatedFluentFuture<ClientFileGroup> transformFuture =
-                              PropagatedFluentFuture.from(
-                                      createClientFileGroup(
-                                          downloadedDataFileGroup,
-                                          downloadFileGroupRequest.accountOptional().isPresent()
-                                              ? AccountUtil.serialize(
-                                                  downloadFileGroupRequest.accountOptional().get())
-                                              : null,
-                                          ClientFileGroup.Status.DOWNLOADED,
-                                          downloadFileGroupRequest.preserveZipDirectories(),
-                                          downloadFileGroupRequest.verifyIsolatedStructure(),
-                                          mobileDataDownloadManager,
-                                          sequentialControlExecutor,
-                                          fileStorage))
-                                  .transform(Preconditions::checkNotNull, sequentialControlExecutor)
-                                  .transform(
-                                      clientFileGroup -> {
-                                        if (isDownloadListenerPresent) {
-                                          try {
-                                            downloadFileGroupRequest
-                                                .listenerOptional()
-                                                .get()
-                                                .onComplete(clientFileGroup);
-                                          } catch (Exception e) {
-                                            LogUtil.w(
-                                                e,
-                                                "%s: Listener onComplete failed for group %s",
-                                                TAG,
-                                                clientFileGroup.getGroupName());
-                                          }
-                                          downloadMonitorOptional
-                                              .get()
-                                              .removeDownloadListener(groupName);
-                                        }
-                                        return clientFileGroup;
-                                      },
-                                      sequentialControlExecutor);
-                          transformFuture.addCallback(
-                              new FutureCallback<ClientFileGroup>() {
-                                @Override
-                                public void onSuccess(ClientFileGroup result) {}
-
-                                @Override
-                                public void onFailure(Throwable t) {
-                                  if (isDownloadListenerPresent) {
-                                    downloadMonitorOptional.get().removeDownloadListener(groupName);
-                                  }
-                                }
-                              },
-                              sequentialControlExecutor);
-
-                          // Use directExecutor here since we are performing a trivial operation.
-                          return transformFuture.transform(
-                              DownloadGroupState::ofDownloadedGroup, directExecutor());
+                                            groupKey, /* downloaded= */ true),
+                                    dataFileGroup ->
+                                            createClientFileGroupAndLogQueryStats(
+                                                    groupKey,
+                                                    dataFileGroup,
+                                                    /* downloaded= */ true,
+                                                    getFileGroupRequest.preserveZipDirectories(),
+                                                    getFileGroupRequest.verifyIsolatedStructure()),
+                                    sequentialControlExecutor);
                         },
                         sequentialControlExecutor);
-                  },
-                  sequentialControlExecutor);
-        },
-        sequentialControlExecutor);
-  }
 
-  private DownloadListener createDownloadListenerWithNotification(
-      DownloadFileGroupRequest downloadRequest, DataFileGroupInternal fileGroup) {
-
-    String networkPausedMessage = getNetworkPausedMessage(downloadRequest, fileGroup);
-
-    NotificationManagerCompat notificationManager = NotificationManagerCompat.from(context);
-    ForegroundDownloadKey foregroundDownloadKey =
-        ForegroundDownloadKey.ofFileGroup(
-            downloadRequest.groupName(),
-            downloadRequest.accountOptional(),
-            downloadRequest.variantIdOptional());
-
-    NotificationCompat.Builder notification =
-        NotificationUtil.createNotificationBuilder(
-            context,
-            downloadRequest.groupSizeBytes(),
-            downloadRequest.contentTitleOptional().or(downloadRequest.groupName()),
-            downloadRequest.contentTextOptional().or(downloadRequest.groupName()));
-    int notificationKey = NotificationUtil.notificationKeyForKey(downloadRequest.groupName());
-
-    if (downloadRequest.showNotifications() == DownloadFileGroupRequest.ShowNotifications.ALL) {
-      NotificationUtil.createCancelAction(
-          context,
-          foregroundDownloadServiceClassOptional.get(),
-          foregroundDownloadKey.toString(),
-          notification,
-          notificationKey);
-
-      notificationManager.notify(notificationKey, notification.build());
+        attachMddApiLogging(
+                MddLibApiName.Code.GET_FILE_GROUP,
+                resultFuture,
+                startTimeNs,
+                createFileGroupStatsFromGetFileGroupRequest(getFileGroupRequest),
+                /* statsCreator= */ result -> createFileGroupDetails(result),
+                /* resultCodeGetter= */ unused -> MddLibApiResult.Code.RESULT_SUCCESS);
+        return resultFuture;
     }
 
-    return new DownloadListener() {
-      @Override
-      public void onProgress(long currentSize) {
-        // TODO(b/229123693): return this future once DownloadListener has an async api.
-        // There can be a race condition, where onProgress can be called
-        // after onComplete or onFailure which removes the future and the notification.
-        // Check foregroundDownloadFutureMap first before updating notification.
-        ListenableFuture<?> unused =
-            PropagatedFutures.transformAsync(
-                foregroundDownloadFutureMap.containsKey(foregroundDownloadKey.toString()),
-                futureInProgress -> {
-                  if (futureInProgress
-                      && downloadRequest.showNotifications()
-                          == DownloadFileGroupRequest.ShowNotifications.ALL) {
-                    notification
-                        .setCategory(NotificationCompat.CATEGORY_PROGRESS)
-                        .setSmallIcon(android.R.drawable.stat_sys_download)
-                        .setProgress(
-                            downloadRequest.groupSizeBytes(),
-                            (int) currentSize,
-                            /* indeterminate= */ downloadRequest.groupSizeBytes() <= 0);
-                    notificationManager.notify(notificationKey, notification.build());
-                  }
-                  if (downloadRequest.listenerOptional().isPresent()) {
-                    downloadRequest.listenerOptional().get().onProgress(currentSize);
-                  }
-                  return immediateVoidFuture();
-                },
-                sequentialControlExecutor);
-      }
-
-      @Override
-      public void pausedForConnectivity() {
-        // TODO(b/229123693): return this future once DownloadListener has an async api.
-        // There can be a race condition, where pausedForConnectivity can be called
-        // after onComplete or onFailure which removes the future and the notification.
-        // Check foregroundDownloadFutureMap first before updating notification.
-        ListenableFuture<?> unused =
-            PropagatedFutures.transformAsync(
-                foregroundDownloadFutureMap.containsKey(foregroundDownloadKey.toString()),
-                futureInProgress -> {
-                  if (futureInProgress
-                      && downloadRequest.showNotifications()
-                          == DownloadFileGroupRequest.ShowNotifications.ALL) {
-                    notification
-                        .setCategory(NotificationCompat.CATEGORY_STATUS)
-                        .setContentText(networkPausedMessage)
-                        .setSmallIcon(android.R.drawable.stat_sys_download)
-                        .setOngoing(true)
-                        // hide progress bar.
-                        .setProgress(0, 0, false);
-                    notificationManager.notify(notificationKey, notification.build());
-                  }
-                  if (downloadRequest.listenerOptional().isPresent()) {
-                    downloadRequest.listenerOptional().get().pausedForConnectivity();
-                  }
-                  return immediateVoidFuture();
-                },
-                sequentialControlExecutor);
-      }
-
-      @Override
-      public void onComplete(ClientFileGroup clientFileGroup) {
-        // TODO(b/229123693): return this future once DownloadListener has an async api.
-        ListenableFuture<?> unused =
-            PropagatedFutures.submitAsync(
-                () -> {
-                  boolean onCompleteFailed = false;
-                  if (downloadRequest.listenerOptional().isPresent()) {
-                    try {
-                      downloadRequest.listenerOptional().get().onComplete(clientFileGroup);
-                    } catch (Exception e) {
-                      LogUtil.w(
-                          e,
-                          "%s: Delegate onComplete failed for group %s, showing failure"
-                              + " notification.",
-                          TAG,
-                          clientFileGroup.getGroupName());
-                      onCompleteFailed = true;
-                    }
-                  }
-
-                  // Clear the notification action.
-                  if (downloadRequest.showNotifications()
-                      == DownloadFileGroupRequest.ShowNotifications.ALL) {
-                    notification.mActions.clear();
-
-                    if (onCompleteFailed) {
-                      // Show download failed in notification.
-                      notification
-                          .setCategory(NotificationCompat.CATEGORY_STATUS)
-                          .setContentText(NotificationUtil.getDownloadFailedMessage(context))
-                          .setOngoing(false)
-                          .setSmallIcon(android.R.drawable.stat_sys_warning)
-                          // hide progress bar.
-                          .setProgress(0, 0, false);
-
-                      notificationManager.notify(notificationKey, notification.build());
-                    } else {
-                      NotificationUtil.cancelNotificationForKey(
-                          context, downloadRequest.groupName());
-                    }
-                  }
-
-                  downloadMonitorOptional.get().removeDownloadListener(downloadRequest.groupName());
-
-                  return foregroundDownloadFutureMap.remove(foregroundDownloadKey.toString());
-                },
-                sequentialControlExecutor);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // TODO(b/229123693): return this future once DownloadListener has an async api.
-        ListenableFuture<?> unused =
-            PropagatedFutures.submitAsync(
-                () -> {
-                  if (downloadRequest.showNotifications()
-                      == DownloadFileGroupRequest.ShowNotifications.ALL) {
-                    // Clear the notification action.
-                    notification.mActions.clear();
-
-                    // Show download failed in notification.
-                    notification
-                        .setCategory(NotificationCompat.CATEGORY_STATUS)
-                        .setContentText(NotificationUtil.getDownloadFailedMessage(context))
-                        .setOngoing(false)
-                        .setSmallIcon(android.R.drawable.stat_sys_warning)
-                        // hide progress bar.
-                        .setProgress(0, 0, false);
-
-                    notificationManager.notify(notificationKey, notification.build());
-                  }
-
-                  if (downloadRequest.listenerOptional().isPresent()) {
-                    downloadRequest.listenerOptional().get().onFailure(t);
-                  }
-                  downloadMonitorOptional.get().removeDownloadListener(downloadRequest.groupName());
-
-                  return foregroundDownloadFutureMap.remove(foregroundDownloadKey.toString());
-                },
-                sequentialControlExecutor);
-      }
-    };
-  }
-
-  // Helper method to get the correct network paused message
-  private String getNetworkPausedMessage(
-      DownloadFileGroupRequest downloadRequest, DataFileGroupInternal fileGroup) {
-    DeviceNetworkPolicy networkPolicyForDownload =
-        fileGroup.getDownloadConditions().getDeviceNetworkPolicy();
-    if (downloadRequest.downloadConditionsOptional().isPresent()) {
-      try {
-        networkPolicyForDownload =
-            ProtoConversionUtil.convert(downloadRequest.downloadConditionsOptional().get())
-                .getDeviceNetworkPolicy();
-      } catch (InvalidProtocolBufferException unused) {
-        // Do nothing -- we will rely on the file group's network policy.
-      }
-    }
-
-    switch (networkPolicyForDownload) {
-      case DOWNLOAD_FIRST_ON_WIFI_THEN_ON_ANY_NETWORK: // fallthrough
-      case DOWNLOAD_ONLY_ON_WIFI:
-        return NotificationUtil.getDownloadPausedWifiMessage(context);
-      default:
-        return NotificationUtil.getDownloadPausedMessage(context);
-    }
-  }
-
-  @Override
-  public void cancelForegroundDownload(String downloadKey) {
-    LogUtil.d("%s: CancelForegroundDownload for key = %s", TAG, downloadKey);
-    ListenableFuture<?> unused =
-        PropagatedFutures.transformAsync(
-            foregroundDownloadFutureMap.get(downloadKey),
-            downloadFuture -> {
-              if (downloadFuture.isPresent()) {
-                LogUtil.v(
-                    "%s: CancelForegroundDownload future found for key = %s, cancelling...",
-                    TAG, downloadKey);
-                downloadFuture.get().cancel(false);
-              }
-              return immediateVoidFuture();
-            },
-            sequentialControlExecutor);
-    // Attempt cancel with internal MDD Lite instance in case it's a single file uri (cancel call is
-    // a noop if internal MDD Lite doesn't know about it).
-    singleFileDownloader.cancelForegroundDownload(downloadKey);
-  }
-
-  @Override
-  public void schedulePeriodicTasks() {
-    schedulePeriodicTasksInternal(Optional.absent());
-  }
-
-  @Override
-  public ListenableFuture<Void> schedulePeriodicBackgroundTasks() {
-    return futureSerializer.submit(
-        () -> {
-          schedulePeriodicTasksInternal(/* constraintOverridesMap= */ Optional.absent());
-          return null;
-        },
-        sequentialControlExecutor);
-  }
-
-  @Override
-  public ListenableFuture<Void> schedulePeriodicBackgroundTasks(
-      Optional<Map<String, ConstraintOverrides>> constraintOverridesMap) {
-    return futureSerializer.submit(
-        () -> {
-          schedulePeriodicTasksInternal(constraintOverridesMap);
-          return null;
-        },
-        sequentialControlExecutor);
-  }
-
-  private void schedulePeriodicTasksInternal(
-      Optional<Map<String, ConstraintOverrides>> constraintOverridesMap) {
-    if (!taskSchedulerOptional.isPresent()) {
-      LogUtil.e(
-          "%s: Called schedulePeriodicTasksInternal when taskScheduler is not provided.", TAG);
-      return;
-    }
-
-    TaskScheduler taskScheduler = taskSchedulerOptional.get();
-
-    // Schedule task that runs on charging without any network, every 6 hours.
-    taskScheduler.schedulePeriodicTask(
-        TaskScheduler.CHARGING_PERIODIC_TASK,
-        flags.chargingGcmTaskPeriod(),
-        NetworkState.NETWORK_STATE_ANY,
-        getConstraintOverrides(constraintOverridesMap, TaskScheduler.CHARGING_PERIODIC_TASK));
-
-    // Schedule maintenance task that runs on charging, once every day.
-    // This task should run even if mdd is disabled, to handle cleanup.
-    taskScheduler.schedulePeriodicTask(
-        TaskScheduler.MAINTENANCE_PERIODIC_TASK,
-        flags.maintenanceGcmTaskPeriod(),
-        NetworkState.NETWORK_STATE_ANY,
-        getConstraintOverrides(constraintOverridesMap, TaskScheduler.MAINTENANCE_PERIODIC_TASK));
-
-    // Schedule task that runs on cellular+charging, every 6 hours.
-    taskScheduler.schedulePeriodicTask(
-        TaskScheduler.CELLULAR_CHARGING_PERIODIC_TASK,
-        flags.cellularChargingGcmTaskPeriod(),
-        NetworkState.NETWORK_STATE_CONNECTED,
-        getConstraintOverrides(
-            constraintOverridesMap, TaskScheduler.CELLULAR_CHARGING_PERIODIC_TASK));
-
-    // Schedule task that runs on wifi+charging, every 6 hours.
-    taskScheduler.schedulePeriodicTask(
-        TaskScheduler.WIFI_CHARGING_PERIODIC_TASK,
-        flags.wifiChargingGcmTaskPeriod(),
-        NetworkState.NETWORK_STATE_UNMETERED,
-        getConstraintOverrides(constraintOverridesMap, TaskScheduler.WIFI_CHARGING_PERIODIC_TASK));
-  }
-
-  private static Optional<ConstraintOverrides> getConstraintOverrides(
-      Optional<Map<String, ConstraintOverrides>> constraintOverridesMap,
-      String maintenancePeriodicTask) {
-    return constraintOverridesMap.isPresent()
-        ? Optional.fromNullable(constraintOverridesMap.get().get(maintenancePeriodicTask))
-        : Optional.absent();
-  }
-
-  @Override
-  public ListenableFuture<Void> cancelPeriodicBackgroundTasks() {
-    return futureSerializer.submit(
-        () -> {
-          cancelPeriodicTasksInternal();
-          return null;
-        },
-        sequentialControlExecutor);
-  }
-
-  private void cancelPeriodicTasksInternal() {
-    if (!taskSchedulerOptional.isPresent()) {
-      LogUtil.w("%s: Called cancelPeriodicTasksInternal when taskScheduler is not provided.", TAG);
-      return;
-    }
-
-    TaskScheduler taskScheduler = taskSchedulerOptional.get();
-
-    taskScheduler.cancelPeriodicTask(TaskScheduler.CHARGING_PERIODIC_TASK);
-    taskScheduler.cancelPeriodicTask(TaskScheduler.MAINTENANCE_PERIODIC_TASK);
-    taskScheduler.cancelPeriodicTask(TaskScheduler.CELLULAR_CHARGING_PERIODIC_TASK);
-    taskScheduler.cancelPeriodicTask(TaskScheduler.WIFI_CHARGING_PERIODIC_TASK);
-  }
-
-  @Override
-  public ListenableFuture<Void> handleTask(String tag) {
-    // All work done here that touches metadata (MobileDataDownloadManager) should be serialized
-    // through sequentialControlExecutor.
-    switch (tag) {
-      case TaskScheduler.MAINTENANCE_PERIODIC_TASK:
+    @SuppressWarnings("nullness")
+    @Override
+    public ListenableFuture<DataFileGroup> readDataFileGroup(
+            ReadDataFileGroupRequest readDataFileGroupRequest) {
         return futureSerializer.submitAsync(
-            mobileDataDownloadManager::maintenance, sequentialControlExecutor);
-
-      case TaskScheduler.CHARGING_PERIODIC_TASK:
-        ListenableFuture<Void> refreshFileGroupsFuture = refreshFileGroups();
-        return PropagatedFutures.transformAsync(
-            refreshFileGroupsFuture,
-            propagateAsyncFunction(
-                v -> mobileDataDownloadManager.verifyAllPendingGroups(customFileGroupValidator)),
-            sequentialControlExecutor);
-
-      case TaskScheduler.CELLULAR_CHARGING_PERIODIC_TASK:
-        return refreshAndDownload(false /*onWifi*/);
-
-      case TaskScheduler.WIFI_CHARGING_PERIODIC_TASK:
-        return refreshAndDownload(true /*onWifi*/);
-
-      default:
-        LogUtil.d("%s: gcm task doesn't belong to MDD", TAG);
-        return immediateFailedFuture(
-            new IllegalArgumentException("Unknown task tag sent to MDD.handleTask() " + tag));
-    }
-  }
-
-  private ListenableFuture<Void> refreshAndDownload(boolean onWifi) {
-    // We will do 2 passes to support 2-step downloads. In each step, we will refresh and then
-    // download.
-    return PropagatedFluentFuture.from(refreshFileGroups())
-        .transformAsync(
-            v ->
-                mobileDataDownloadManager.downloadAllPendingGroups(
-                    onWifi, customFileGroupValidator),
-            sequentialControlExecutor)
-        .transformAsync(v -> refreshFileGroups(), sequentialControlExecutor)
-        .transformAsync(
-            v ->
-                mobileDataDownloadManager.downloadAllPendingGroups(
-                    onWifi, customFileGroupValidator),
-            sequentialControlExecutor);
-  }
-
-  private ListenableFuture<Void> refreshFileGroups() {
-    List<ListenableFuture<Void>> refreshFutures = new ArrayList<>();
-    for (FileGroupPopulator fileGroupPopulator : fileGroupPopulatorList) {
-      refreshFutures.add(fileGroupPopulator.refreshFileGroups(this));
+                () -> {
+                    GroupKey groupKey =
+                            createGroupKey(
+                                    readDataFileGroupRequest.groupName(),
+                                    readDataFileGroupRequest.accountOptional(),
+                                    readDataFileGroupRequest.variantIdOptional());
+                    return PropagatedFutures.transformAsync(
+                            mobileDataDownloadManager.getFileGroup(groupKey, /* downloaded= */
+                                    true),
+                            internalFileGroup -> {
+                                if (internalFileGroup == null) {
+                                    return immediateFailedFuture(
+                                            DownloadException.builder()
+                                                    .setDownloadResultCode(
+                                                            DownloadResultCode.GROUP_NOT_FOUND_ERROR)
+                                                    .setMessage("Requested group not found.")
+                                                    .build());
+                                }
+                                return immediateFuture(
+                                        ProtoConversionUtil.reverse(internalFileGroup));
+                            },
+                            sequentialControlExecutor);
+                },
+                sequentialControlExecutor);
     }
 
-    return PropagatedFutures.whenAllComplete(refreshFutures)
-        .call(() -> null, sequentialControlExecutor);
-  }
-
-  @Override
-  public ListenableFuture<Void> maintenance() {
-    return handleTask(TaskScheduler.MAINTENANCE_PERIODIC_TASK);
-  }
-
-  @Override
-  public ListenableFuture<Void> collectGarbage() {
-    return futureSerializer.submitAsync(
-        mobileDataDownloadManager::removeExpiredGroupsAndFiles, sequentialControlExecutor);
-  }
-
-  @Override
-  public ListenableFuture<Void> clear() {
-    return futureSerializer.submitAsync(
-        mobileDataDownloadManager::clear, sequentialControlExecutor);
-  }
-
-  // incompatible argument for parameter msg of e.
-  // incompatible types in return.
-  @Override
-  public String getDebugInfoAsString() {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    PrintWriter writer = new PrintWriter(out);
-    try {
-      // Okay to block here because this method is for debugging only.
-      mobileDataDownloadManager.dump(writer).get(DUMP_DEBUG_INFO_TIMEOUT, TimeUnit.SECONDS);
-      writer.println("==== MOBSTORE_DEBUG_INFO ====");
-      writer.print(fileStorage.getDebugInfo());
-    } catch (ExecutionException | TimeoutException e) {
-      String errString = String.format("%s: Couldn't get debug info: %s", TAG, e);
-      LogUtil.e(errString);
-      return errString;
-    } catch (InterruptedException e) {
-      // see <internal>
-      Thread.currentThread().interrupt();
-      String errString = String.format("%s: Couldn't get debug info: %s", TAG, e);
-      LogUtil.e(errString);
-      return errString;
+    @Override
+    public ListenableFuture<ImmutableList<DataFileGroup>> readDataFileGroupsByFilter(
+            ReadDataFileGroupsByFilterRequest request) {
+        return futureSerializer.submitAsync(
+                () ->
+                        PropagatedFutures.transformAsync(
+                                mobileDataDownloadManager.getAllFreshGroups(),
+                                freshGroups -> {
+                                    ImmutableList<GroupKeyAndGroup> filteredGroups =
+                                            filterGroups(
+                                                    request.includeAllGroups(),
+                                                    request.groupNameOptional(),
+                                                    request.groupWithNoAccountOnly(),
+                                                    request.accountOptional(),
+                                                    request.downloadedOptional(),
+                                                    freshGroups);
+                                    ImmutableList.Builder<DataFileGroup> dataFileGroupsBuilder =
+                                            ImmutableList.<DataFileGroup>builder();
+                                    for (GroupKeyAndGroup keyAndGroup : filteredGroups) {
+                                        try {
+                                            dataFileGroupsBuilder.add(
+                                                    ProtoConversionUtil.reverse(
+                                                            keyAndGroup.dataFileGroup()));
+                                        } catch (InvalidProtocolBufferException e) {
+                                            return immediateFailedFuture(e);
+                                        }
+                                    }
+                                    return immediateFuture(dataFileGroupsBuilder.build());
+                                },
+                                sequentialControlExecutor),
+                sequentialControlExecutor);
     }
-    writer.flush();
-    return out.toString();
-  }
 
-  @Override
-  public ListenableFuture<Void> reportUsage(UsageEvent usageEvent) {
-    eventLogger.logMddUsageEvent(createFileGroupDetails(usageEvent.clientFileGroup()), null);
+    private GroupKey createGroupKey(
+            String groupName, Optional<Account> accountOptional, Optional<String> variantOptional) {
+        GroupKey.Builder groupKeyBuilder =
+                GroupKey.newBuilder().setGroupName(groupName).setOwnerPackage(
+                        context.getPackageName());
 
-    return immediateVoidFuture();
-  }
-
-  private static DownloadFutureMap.StateChangeCallbacks createCallbacksForForegroundService(
-      Context context, Optional<Class<?>> foregroundDownloadServiceClassOptional) {
-    return new DownloadFutureMap.StateChangeCallbacks() {
-      @Override
-      public void onAdd(String key, int newSize) {
-        // Only start foreground service if this is the first future we are adding.
-        if (newSize == 1 && foregroundDownloadServiceClassOptional.isPresent()) {
-          NotificationUtil.startForegroundDownloadService(
-              context, foregroundDownloadServiceClassOptional.get(), key);
+        if (accountOptional.isPresent()) {
+            groupKeyBuilder.setAccount(AccountUtil.serialize(accountOptional.get()));
         }
-      }
 
-      @Override
-      public void onRemove(String key, int newSize) {
-        // Only stop foreground service if there are no more futures remaining.
-        if (newSize == 0 && foregroundDownloadServiceClassOptional.isPresent()) {
-          NotificationUtil.stopForegroundDownloadService(
-              context, foregroundDownloadServiceClassOptional.get(), key);
+        if (variantOptional.isPresent()) {
+            groupKeyBuilder.setVariantId(variantOptional.get());
         }
-      }
-    };
-  }
+
+        return groupKeyBuilder.build();
+    }
+
+    private ListenableFuture<ClientFileGroup> createClientFileGroupAndLogQueryStats(
+            GroupKey groupKey,
+            @Nullable DataFileGroupInternal dataFileGroup,
+            boolean downloaded,
+            boolean preserveZipDirectories,
+            boolean verifyIsolatedStructure) {
+        return PropagatedFutures.transform(
+                createClientFileGroup(
+                        dataFileGroup,
+                        groupKey.hasAccount() ? groupKey.getAccount() : null,
+                        downloaded ? ClientFileGroup.Status.DOWNLOADED
+                                : ClientFileGroup.Status.PENDING,
+                        preserveZipDirectories,
+                        verifyIsolatedStructure,
+                        mobileDataDownloadManager,
+                        sequentialControlExecutor,
+                        fileStorage),
+                clientFileGroup -> {
+                    if (clientFileGroup != null) {
+                        eventLogger.logMddQueryStats(createFileGroupDetails(clientFileGroup));
+                    }
+                    return clientFileGroup;
+                },
+                sequentialControlExecutor);
+    }
+
+    @SuppressWarnings("nullness")
+    private static ListenableFuture<ClientFileGroup> createClientFileGroup(
+            @Nullable DataFileGroupInternal dataFileGroup,
+            @Nullable String account,
+            ClientFileGroup.Status status,
+            boolean preserveZipDirectories,
+            boolean verifyIsolatedStructure,
+            MobileDataDownloadManager manager,
+            Executor executor,
+            SynchronousFileStorage fileStorage) {
+        if (dataFileGroup == null) {
+            return immediateFuture(null);
+        }
+        ClientFileGroup.Builder clientFileGroupBuilder =
+                ClientFileGroup.newBuilder()
+                        .setGroupName(dataFileGroup.getGroupName())
+                        .setOwnerPackage(dataFileGroup.getOwnerPackage())
+                        .setVersionNumber(dataFileGroup.getFileGroupVersionNumber())
+                        .setBuildId(dataFileGroup.getBuildId())
+                        .setVariantId(dataFileGroup.getVariantId())
+                        .setStatus(status)
+                        .addAllLocale(dataFileGroup.getLocaleList());
+
+        if (account != null) {
+            clientFileGroupBuilder.setAccount(account);
+        }
+
+        if (dataFileGroup.hasCustomMetadata()) {
+            clientFileGroupBuilder.setCustomMetadata(dataFileGroup.getCustomMetadata());
+        }
+
+        List<DataFile> dataFiles = dataFileGroup.getFileList();
+        ListenableFuture<Void> addOnDeviceUrisFuture = immediateVoidFuture();
+        if (status == ClientFileGroup.Status.DOWNLOADED
+                || status == ClientFileGroup.Status.PENDING_CUSTOM_VALIDATION) {
+            addOnDeviceUrisFuture =
+                    PropagatedFluentFuture.from(
+                                    manager.getDataFileUris(dataFileGroup, verifyIsolatedStructure))
+                            .transformAsync(
+                                    dataFileUriMap -> {
+                                        for (DataFile dataFile : dataFiles) {
+                                            if (!dataFileUriMap.containsKey(dataFile)) {
+                                                return immediateFailedFuture(
+                                                        DownloadException.builder()
+                                                                .setDownloadResultCode(
+                                                                        DownloadResultCode.DOWNLOADED_FILE_NOT_FOUND_ERROR)
+                                                                .setMessage(
+                                                                        "getDataFileUris() "
+                                                                                + "resolved to null")
+                                                                .build());
+                                            }
+                                            Uri uri = dataFileUriMap.get(dataFile);
+
+                                            try {
+                                                if (!preserveZipDirectories
+                                                        && fileStorage.isDirectory(uri)) {
+                                                    String rootPath = uri.getPath();
+                                                    if (rootPath != null) {
+                                                        clientFileGroupBuilder.addAllFile(
+                                                                listAllClientFilesOfDirectory(
+                                                                        fileStorage, uri,
+                                                                        rootPath));
+                                                    }
+                                                } else {
+                                                    clientFileGroupBuilder.addFile(
+                                                            createClientFile(
+                                                                    dataFile.getFileId(),
+                                                                    dataFile.getByteSize(),
+                                                                    dataFile.getDownloadedFileByteSize(),
+                                                                    uri.toString(),
+                                                                    dataFile.hasCustomMetadata()
+                                                                            ?
+                                                                            dataFile.getCustomMetadata()
+                                                                            : null));
+                                                }
+                                            } catch (IOException e) {
+                                                LogUtil.e(e, "Failed to list files under directory:"
+                                                        + uri);
+                                            }
+                                        }
+                                        return immediateVoidFuture();
+                                    },
+                                    executor);
+        } else {
+            for (DataFile dataFile : dataFiles) {
+                clientFileGroupBuilder.addFile(
+                        createClientFile(
+                                dataFile.getFileId(),
+                                dataFile.getByteSize(),
+                                dataFile.getDownloadedFileByteSize(),
+                                /* uri= */ null,
+                                dataFile.hasCustomMetadata() ? dataFile.getCustomMetadata()
+                                        : null));
+            }
+        }
+
+        return PropagatedFluentFuture.from(addOnDeviceUrisFuture)
+                .transform(unused -> clientFileGroupBuilder.build(), executor)
+                .catching(DownloadException.class, exn -> null, executor);
+    }
+
+    private static ClientFile createClientFile(
+            String fileId,
+            int byteSize,
+            int downloadByteSize,
+            @Nullable String uri,
+            @Nullable Any customMetadata) {
+        ClientFile.Builder clientFileBuilder =
+                ClientFile.newBuilder().setFileId(fileId).setFullSizeInBytes(byteSize);
+        if (downloadByteSize > 0) {
+            // Files with downloaded transforms like compress and zip could have different
+            // downloaded
+            // file size than the final file size on disk. Return the downloaded file size for
+            // client to
+            // track and calculate the download progress.
+            clientFileBuilder.setDownloadSizeInBytes(downloadByteSize);
+        }
+        if (uri != null) {
+            clientFileBuilder.setFileUri(uri);
+        }
+        if (customMetadata != null) {
+            clientFileBuilder.setCustomMetadata(customMetadata);
+        }
+        return clientFileBuilder.build();
+    }
+
+    private static List<ClientFile> listAllClientFilesOfDirectory(
+            SynchronousFileStorage fileStorage, Uri dirUri, String rootDir) throws IOException {
+        List<ClientFile> clientFileList = new ArrayList<>();
+        for (Uri childUri : fileStorage.children(dirUri)) {
+            if (fileStorage.isDirectory(childUri)) {
+                clientFileList.addAll(
+                        listAllClientFilesOfDirectory(fileStorage, childUri, rootDir));
+            } else {
+                String childPath = childUri.getPath();
+                if (childPath != null) {
+                    ClientFile clientFile =
+                            ClientFile.newBuilder()
+                                    .setFileId(childPath.replaceFirst(rootDir, ""))
+                                    .setFullSizeInBytes((int) fileStorage.fileSize(childUri))
+                                    .setFileUri(childUri.toString())
+                                    .build();
+                    clientFileList.add(clientFile);
+                }
+            }
+        }
+        return clientFileList;
+    }
+
+    @Override
+    public ListenableFuture<ImmutableList<ClientFileGroup>> getFileGroupsByFilter(
+            GetFileGroupsByFilterRequest getFileGroupsByFilterRequest) {
+        return futureSerializer.submitAsync(
+                () ->
+                        PropagatedFutures.transformAsync(
+                                PropagatedFutures.transform(
+                                        mobileDataDownloadManager.getAllFreshGroups(),
+                                        allFreshGroups ->
+                                                filterGroups(
+                                                        getFileGroupsByFilterRequest.includeAllGroups(),
+                                                        getFileGroupsByFilterRequest.groupNameOptional(),
+                                                        getFileGroupsByFilterRequest.groupWithNoAccountOnly(),
+                                                        getFileGroupsByFilterRequest.accountOptional(),
+                                                        Optional.absent(),
+                                                        allFreshGroups),
+                                        sequentialControlExecutor),
+                                filteredGroupKeyAndGroups -> {
+                                    ListenableFuture<ImmutableList.Builder<ClientFileGroup>>
+                                            clientFileGroupsBuilderFuture =
+                                            immediateFuture(
+                                                    ImmutableList.<ClientFileGroup>builder());
+                                    for (GroupKeyAndGroup groupKeyAndGroup :
+                                     filteredGroupKeyAndGroups) {
+                                        clientFileGroupsBuilderFuture =
+                                                PropagatedFutures.transformAsync(
+                                                        clientFileGroupsBuilderFuture,
+                                                        clientFileGroupsBuilder -> {
+                                                            GroupKey groupKey =
+                                                                    groupKeyAndGroup.groupKey();
+                                                            DataFileGroupInternal dataFileGroup =
+                                                                    groupKeyAndGroup.dataFileGroup();
+                                                            return PropagatedFutures.transform(
+                                                                    createClientFileGroupAndLogQueryStats(
+                                                                            groupKey,
+                                                                            dataFileGroup,
+                                                                            groupKey.getDownloaded(),
+                                                                            getFileGroupsByFilterRequest.preserveZipDirectories(),
+                                                                            getFileGroupsByFilterRequest.verifyIsolatedStructure()),
+                                                                    clientFileGroup -> {
+                                                                        if (clientFileGroup
+                                                                                != null) {
+                                                                            clientFileGroupsBuilder.add(
+                                                                                    clientFileGroup);
+                                                                        }
+                                                                        return clientFileGroupsBuilder;
+                                                                    },
+                                                                    sequentialControlExecutor);
+                                                        },
+                                                        sequentialControlExecutor);
+                                    }
+                                    return PropagatedFutures.transform(
+                                            clientFileGroupsBuilderFuture,
+                                            ImmutableList.Builder::build,
+                                            sequentialControlExecutor);
+                                },
+                                sequentialControlExecutor),
+                sequentialControlExecutor);
+    }
+
+    private static ImmutableList<GroupKeyAndGroup> filterGroups(
+            boolean includeAllGroups,
+            Optional<String> groupNameOptional,
+            boolean groupWithNoAccountOnly,
+            Optional<Account> accountOptional,
+            Optional<Boolean> downloadedOptional,
+            List<GroupKeyAndGroup> allGroupKeyAndGroups) {
+        var builder = ImmutableList.<GroupKeyAndGroup>builder();
+        if (includeAllGroups) {
+            builder.addAll(allGroupKeyAndGroups);
+            return builder.build();
+        }
+
+        for (GroupKeyAndGroup groupKeyAndGroup : allGroupKeyAndGroups) {
+            GroupKey groupKey = groupKeyAndGroup.groupKey();
+            DataFileGroupInternal dataFileGroup = groupKeyAndGroup.dataFileGroup();
+            if (applyFilter(
+                    groupNameOptional,
+                    groupWithNoAccountOnly,
+                    accountOptional,
+                    downloadedOptional,
+                    groupKey,
+                    dataFileGroup)) {
+                builder.add(groupKeyAndGroup);
+            }
+        }
+        return builder.build();
+    }
+
+    /** Check if given data matches with {@code groupKey} and {@code fileGroup}. */
+    private static boolean applyFilter(
+            Optional<String> groupNameOptional,
+            boolean groupWithNoAccountOnly,
+            Optional<Account> accountOptional,
+            Optional<Boolean> downloadedOptional,
+            GroupKey groupKey,
+            DataFileGroupInternal fileGroup) {
+        // If request filters by group name, ensure name is equal
+        if (groupNameOptional.isPresent()
+                && !TextUtils.equals(groupNameOptional.get(), groupKey.getGroupName())) {
+            return false;
+        }
+
+        // When the caller requests account independent groups only.
+        if (groupWithNoAccountOnly) {
+            return !groupKey.hasAccount();
+        }
+
+        // When the caller requests account dependent groups as well.
+        if (accountOptional.isPresent()
+                && !AccountUtil.serialize(accountOptional.get()).equals(groupKey.getAccount())) {
+            return false;
+        }
+
+        if (downloadedOptional.isPresent()
+                && !downloadedOptional.get().equals(groupKey.getDownloaded())) {
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Creates {@link DataDownloadFileGroupStats} from {@link ClientFileGroup} for remote logging
+     * purposes.
+     */
+    private static DataDownloadFileGroupStats createFileGroupDetails(
+            ClientFileGroup clientFileGroup) {
+        return DataDownloadFileGroupStats.newBuilder()
+                .setFileGroupName(clientFileGroup.getGroupName())
+                .setOwnerPackage(clientFileGroup.getOwnerPackage())
+                .setFileGroupVersionNumber(clientFileGroup.getVersionNumber())
+                .setFileCount(clientFileGroup.getFileCount())
+                .setVariantId(clientFileGroup.getVariantId())
+                .setBuildId(clientFileGroup.getBuildId())
+                .build();
+    }
+
+    @Override
+    public ListenableFuture<Void> importFiles(ImportFilesRequest importFilesRequest) {
+        GroupKey.Builder groupKeyBuilder =
+                GroupKey.newBuilder()
+                        .setGroupName(importFilesRequest.groupName())
+                        .setOwnerPackage(context.getPackageName());
+
+        if (importFilesRequest.accountOptional().isPresent()) {
+            groupKeyBuilder.setAccount(
+                    AccountUtil.serialize(importFilesRequest.accountOptional().get()));
+        }
+
+        GroupKey groupKey = groupKeyBuilder.build();
+
+        ImmutableList.Builder<DataFile> updatedDataFileListBuilder =
+                ImmutableList.builderWithExpectedSize(
+                        importFilesRequest.updatedDataFileList().size());
+        for (DownloadConfigProto.DataFile dataFile : importFilesRequest.updatedDataFileList()) {
+            updatedDataFileListBuilder.add(ProtoConversionUtil.convertDataFile(dataFile));
+        }
+
+        return futureSerializer.submitAsync(
+                () ->
+                        mobileDataDownloadManager.importFiles(
+                                groupKey,
+                                importFilesRequest.buildId(),
+                                importFilesRequest.variantId(),
+                                updatedDataFileListBuilder.build(),
+                                importFilesRequest.inlineFileMap(),
+                                importFilesRequest.customPropertyOptional(),
+                                customFileGroupValidator),
+                sequentialControlExecutor);
+    }
+
+    @Override
+    public ListenableFuture<Void> downloadFile(
+            SingleFileDownloadRequest singleFileDownloadRequest) {
+        return singleFileDownloader.download(
+                MddLiteConversionUtil.convertToDownloadRequest(singleFileDownloadRequest));
+    }
+
+    @Override
+    public ListenableFuture<ClientFileGroup> downloadFileGroup(
+            DownloadFileGroupRequest downloadFileGroupRequest) {
+        // Submit the call to sequentialControlExecutor, but don't use futureSerializer. This will
+        // ensure that multiple calls are enqueued to the executor in a FIFO order, but these calls
+        // won't block each other when the download is in progress.
+        return PropagatedFutures.submitAsync(
+                () ->
+                        PropagatedFutures.transformAsync(
+                                // Check if requested file group has already been downloaded
+                                getDownloadGroupState(downloadFileGroupRequest),
+                                downloadGroupState -> {
+                                    switch (downloadGroupState.getKind()) {
+                                        case IN_PROGRESS_FUTURE:
+                                            // If the file group download is in progress, return
+                                            // that future immediately
+                                            return downloadGroupState.inProgressFuture();
+                                        case DOWNLOADED_GROUP:
+                                            // If the file group is already downloaded, return
+                                            // that immediately.
+                                            return immediateFuture(
+                                                    downloadGroupState.downloadedGroup());
+                                        case PENDING_GROUP:
+                                            return downloadPendingFileGroup(
+                                                    downloadFileGroupRequest);
+                                    }
+                                    throw new AssertionError(
+                                            String.format(
+                                                    "received unsupported DownloadGroupState kind"
+                                                            + " %s",
+                                                    downloadGroupState.getKind()));
+                                },
+                                sequentialControlExecutor),
+                sequentialControlExecutor);
+    }
+
+    /** Helper method to download a group after it's determined to be pending. */
+    private ListenableFuture<ClientFileGroup> downloadPendingFileGroup(
+            DownloadFileGroupRequest downloadFileGroupRequest) {
+        String groupName = downloadFileGroupRequest.groupName();
+        GroupKey.Builder groupKeyBuilder =
+                GroupKey.newBuilder().setGroupName(groupName).setOwnerPackage(
+                        context.getPackageName());
+
+        if (downloadFileGroupRequest.accountOptional().isPresent()) {
+            groupKeyBuilder.setAccount(
+                    AccountUtil.serialize(downloadFileGroupRequest.accountOptional().get()));
+        }
+        if (downloadFileGroupRequest.variantIdOptional().isPresent()) {
+            groupKeyBuilder.setVariantId(downloadFileGroupRequest.variantIdOptional().get());
+        }
+
+        GroupKey groupKey = groupKeyBuilder.build();
+
+        if (downloadFileGroupRequest.listenerOptional().isPresent()) {
+            if (downloadMonitorOptional.isPresent()) {
+                downloadMonitorOptional
+                        .get()
+                        .addDownloadListener(groupName,
+                                downloadFileGroupRequest.listenerOptional().get());
+            } else {
+                return immediateFailedFuture(
+                        DownloadException.builder()
+                                .setDownloadResultCode(
+                                        DownloadResultCode.DOWNLOAD_MONITOR_NOT_PROVIDED_ERROR)
+                                .setMessage(
+                                        "downloadFileGroup: DownloadListener is present but "
+                                        + "Download Monitor"
+                                                + " is not provided!")
+                                .build());
+            }
+        }
+
+        Optional<DownloadConditions> downloadConditions;
+        try {
+            downloadConditions =
+                    downloadFileGroupRequest.downloadConditionsOptional().isPresent()
+                            ? Optional.of(
+                            ProtoConversionUtil.convert(
+                                    downloadFileGroupRequest.downloadConditionsOptional().get()))
+                            : Optional.absent();
+        } catch (InvalidProtocolBufferException e) {
+            return immediateFailedFuture(e);
+        }
+
+        // Get the key used for the download future map
+        ForegroundDownloadKey downloadKey =
+                ForegroundDownloadKey.ofFileGroup(
+                        downloadFileGroupRequest.groupName(),
+                        downloadFileGroupRequest.accountOptional(),
+                        downloadFileGroupRequest.variantIdOptional());
+
+        // Create a ListenableFutureTask to delay starting the downloadFuture until we can add the
+        // future to our map.
+        ListenableFutureTask<Void> startTask = ListenableFutureTask.create(() -> null);
+        ListenableFuture<ClientFileGroup> downloadFuture =
+                PropagatedFluentFuture.from(startTask)
+                        .transformAsync(
+                                unused ->
+                                        mobileDataDownloadManager.downloadFileGroup(
+                                                groupKey, downloadConditions,
+                                                customFileGroupValidator),
+                                sequentialControlExecutor)
+                        .transformAsync(
+                                dataFileGroup ->
+                                        createClientFileGroup(
+                                                dataFileGroup,
+                                                downloadFileGroupRequest.accountOptional().isPresent()
+                                                        ? AccountUtil.serialize(
+                                                        downloadFileGroupRequest.accountOptional().get())
+                                                        : null,
+                                                ClientFileGroup.Status.DOWNLOADED,
+                                                downloadFileGroupRequest.preserveZipDirectories(),
+                                                downloadFileGroupRequest.verifyIsolatedStructure(),
+                                                mobileDataDownloadManager,
+                                                sequentialControlExecutor,
+                                                fileStorage),
+                                sequentialControlExecutor)
+                        .transform(Preconditions::checkNotNull, sequentialControlExecutor);
+
+        // Get a handle on the download task so we can get the CFG during transforms
+        PropagatedFluentFuture<ClientFileGroup> downloadTaskFuture =
+                PropagatedFluentFuture.from(
+                                downloadFutureMap.add(downloadKey.toString(), downloadFuture))
+                        .transformAsync(
+                                unused -> {
+                                    // Now that the download future is added, start the task and
+                                    // return the future
+                                    startTask.run();
+                                    return downloadFuture;
+                                },
+                                sequentialControlExecutor);
+
+        ListenableFuture<ClientFileGroup> transformFuture =
+                downloadTaskFuture
+                        .transformAsync(
+                                unused -> downloadFutureMap.remove(downloadKey.toString()),
+                                sequentialControlExecutor)
+                        .transformAsync(
+                                unused -> {
+                                    ClientFileGroup clientFileGroup = getDone(downloadTaskFuture);
+
+                                    if (downloadFileGroupRequest.listenerOptional().isPresent()) {
+                                        try {
+                                            downloadFileGroupRequest.listenerOptional().get().onComplete(
+                                                    clientFileGroup);
+                                        } catch (Exception e) {
+                                            LogUtil.w(
+                                                    e,
+                                                    "%s: Listener onComplete failed for group %s",
+                                                    TAG,
+                                                    clientFileGroup.getGroupName());
+                                        }
+                                        if (downloadMonitorOptional.isPresent()) {
+                                            downloadMonitorOptional.get().removeDownloadListener(
+                                                    groupName);
+                                        }
+                                    }
+                                    return immediateFuture(clientFileGroup);
+                                },
+                                sequentialControlExecutor);
+
+        PropagatedFutures.addCallback(
+                transformFuture,
+                new FutureCallback<ClientFileGroup>() {
+                    @Override
+                    public void onSuccess(ClientFileGroup result) {
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        if (downloadFileGroupRequest.listenerOptional().isPresent()) {
+                            downloadFileGroupRequest.listenerOptional().get().onFailure(t);
+
+                            if (downloadMonitorOptional.isPresent()) {
+                                downloadMonitorOptional.get().removeDownloadListener(groupName);
+                            }
+                        }
+
+                        // Remove future from map
+                        ListenableFuture<Void> unused = downloadFutureMap.remove(
+                                downloadKey.toString());
+                    }
+                },
+                sequentialControlExecutor);
+
+        return transformFuture;
+    }
+
+    @Override
+    public ListenableFuture<Void> downloadFileWithForegroundService(
+            SingleFileDownloadRequest singleFileDownloadRequest) {
+        return singleFileDownloader.downloadWithForegroundService(
+                MddLiteConversionUtil.convertToDownloadRequest(singleFileDownloadRequest));
+    }
+
+    @Override
+    public ListenableFuture<ClientFileGroup> downloadFileGroupWithForegroundService(
+            DownloadFileGroupRequest downloadFileGroupRequest) {
+        LogUtil.d("%s: downloadFileGroupWithForegroundService start.", TAG);
+        if (!foregroundDownloadServiceClassOptional.isPresent()) {
+            return immediateFailedFuture(
+                    new IllegalArgumentException(
+                            "downloadFileGroupWithForegroundService: ForegroundDownloadService is"
+                                    + " not"
+                                    + " provided!"));
+        }
+
+        if (!downloadMonitorOptional.isPresent()) {
+            return immediateFailedFuture(
+                    DownloadException.builder()
+                            .setDownloadResultCode(
+                                    DownloadResultCode.DOWNLOAD_MONITOR_NOT_PROVIDED_ERROR)
+                            .setMessage(
+                                    "downloadFileGroupWithForegroundService: Download Monitor is "
+                                    + "not provided!")
+                            .build());
+        }
+
+        // Submit the call to sequentialControlExecutor, but don't use futureSerializer. This will
+        // ensure that multiple calls are enqueued to the executor in a FIFO order, but these calls
+        // won't block each other when the download is in progress.
+        return PropagatedFutures.submitAsync(
+                () ->
+                        PropagatedFutures.transformAsync(
+                                // Check if requested file group has already been downloaded
+                                getDownloadGroupState(downloadFileGroupRequest),
+                                downloadGroupState -> {
+                                    switch (downloadGroupState.getKind()) {
+                                        case IN_PROGRESS_FUTURE:
+                                            // If the file group download is in progress, return
+                                            // that future immediately
+                                            return downloadGroupState.inProgressFuture();
+                                        case DOWNLOADED_GROUP:
+                                            // If the file group is already downloaded, return
+                                            // that immediately
+                                            return immediateFuture(
+                                                    downloadGroupState.downloadedGroup());
+                                        case PENDING_GROUP:
+                                            return downloadPendingFileGroupWithForegroundService(
+                                                    downloadFileGroupRequest,
+                                                    downloadGroupState.pendingGroup());
+                                    }
+                                    throw new AssertionError(
+                                            String.format(
+                                                    "received unsupported DownloadGroupState kind"
+                                                    + " %s",
+                                                    downloadGroupState.getKind()));
+                                },
+                                sequentialControlExecutor),
+                sequentialControlExecutor);
+    }
+
+    /**
+     * Helper method to download a file group in the foreground after it has been confirmed to be
+     * pending.
+     */
+    private ListenableFuture<ClientFileGroup> downloadPendingFileGroupWithForegroundService(
+            DownloadFileGroupRequest downloadFileGroupRequest, DataFileGroupInternal pendingGroup) {
+        // It's OK to recreate the NotificationChannel since it can also be used to restore a
+        // deleted channel and to update an existing channel's name, description, group, and/or
+        // importance.
+        NotificationUtil.createNotificationChannel(context);
+
+        String groupName = downloadFileGroupRequest.groupName();
+        GroupKey.Builder groupKeyBuilder =
+                GroupKey.newBuilder().setGroupName(groupName).setOwnerPackage(
+                        context.getPackageName());
+
+        if (downloadFileGroupRequest.accountOptional().isPresent()) {
+            groupKeyBuilder.setAccount(
+                    AccountUtil.serialize(downloadFileGroupRequest.accountOptional().get()));
+        }
+        if (downloadFileGroupRequest.variantIdOptional().isPresent()) {
+            groupKeyBuilder.setVariantId(downloadFileGroupRequest.variantIdOptional().get());
+        }
+
+        GroupKey groupKey = groupKeyBuilder.build();
+        ForegroundDownloadKey foregroundDownloadKey =
+                ForegroundDownloadKey.ofFileGroup(
+                        groupName,
+                        downloadFileGroupRequest.accountOptional(),
+                        downloadFileGroupRequest.variantIdOptional());
+
+        DownloadListener downloadListenerWithNotification =
+                createDownloadListenerWithNotification(downloadFileGroupRequest, pendingGroup);
+        // The downloadMonitor will trigger the DownloadListener.
+        downloadMonitorOptional
+                .get()
+                .addDownloadListener(
+                        downloadFileGroupRequest.groupName(), downloadListenerWithNotification);
+
+        Optional<DownloadConditions> downloadConditions;
+        try {
+            downloadConditions =
+                    downloadFileGroupRequest.downloadConditionsOptional().isPresent()
+                            ? Optional.of(
+                            ProtoConversionUtil.convert(
+                                    downloadFileGroupRequest.downloadConditionsOptional().get()))
+                            : Optional.absent();
+        } catch (InvalidProtocolBufferException e) {
+            return immediateFailedFuture(e);
+        }
+
+        // Create a ListenableFutureTask to delay starting the downloadFuture until we can add the
+        // future to our map.
+        ListenableFutureTask<Void> startTask = ListenableFutureTask.create(() -> null);
+        PropagatedFluentFuture<ClientFileGroup> downloadFileGroupFuture =
+                PropagatedFluentFuture.from(startTask)
+                        .transformAsync(
+                                unused ->
+                                        mobileDataDownloadManager.downloadFileGroup(
+                                                groupKey, downloadConditions,
+                                                customFileGroupValidator),
+                                sequentialControlExecutor)
+                        .transformAsync(
+                                dataFileGroup ->
+                                        createClientFileGroup(
+                                                dataFileGroup,
+                                                downloadFileGroupRequest.accountOptional().isPresent()
+                                                        ? AccountUtil.serialize(
+                                                        downloadFileGroupRequest.accountOptional().get())
+                                                        : null,
+                                                ClientFileGroup.Status.DOWNLOADED,
+                                                downloadFileGroupRequest.preserveZipDirectories(),
+                                                downloadFileGroupRequest.verifyIsolatedStructure(),
+                                                mobileDataDownloadManager,
+                                                sequentialControlExecutor,
+                                                fileStorage),
+                                sequentialControlExecutor)
+                        .transform(Preconditions::checkNotNull, sequentialControlExecutor);
+
+        ListenableFuture<ClientFileGroup> transformFuture =
+                PropagatedFutures.transformAsync(
+                        foregroundDownloadFutureMap.add(
+                                foregroundDownloadKey.toString(), downloadFileGroupFuture),
+                        unused -> {
+                            // Now that the download future is added, start the task and return
+                            // the future
+                            startTask.run();
+                            return downloadFileGroupFuture;
+                        },
+                        sequentialControlExecutor);
+
+        PropagatedFutures.addCallback(
+                transformFuture,
+                new FutureCallback<ClientFileGroup>() {
+                    @Override
+                    public void onSuccess(ClientFileGroup clientFileGroup) {
+                        // Currently the MobStore monitor does not support onSuccess so we have
+                        // to add
+                        // callback to the download future here.
+                        try {
+                            downloadListenerWithNotification.onComplete(clientFileGroup);
+                        } catch (Exception e) {
+                            LogUtil.w(
+                                    e,
+                                    "%s: Listener onComplete failed for group %s",
+                                    TAG,
+                                    clientFileGroup.getGroupName());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        // Currently the MobStore monitor does not support onFailure so we have
+                        // to add
+                        // callback to the download future here.
+                        downloadListenerWithNotification.onFailure(t);
+                    }
+                },
+                sequentialControlExecutor);
+
+        return transformFuture;
+    }
+
+    /** Helper method to return a {@link DownloadGroupState} for the given request. */
+    private ListenableFuture<DownloadGroupState> getDownloadGroupState(
+            DownloadFileGroupRequest downloadFileGroupRequest) {
+        ForegroundDownloadKey foregroundDownloadKey =
+                ForegroundDownloadKey.ofFileGroup(
+                        downloadFileGroupRequest.groupName(),
+                        downloadFileGroupRequest.accountOptional(),
+                        downloadFileGroupRequest.variantIdOptional());
+
+        String groupName = downloadFileGroupRequest.groupName();
+        GroupKey.Builder groupKeyBuilder =
+                GroupKey.newBuilder().setGroupName(groupName).setOwnerPackage(
+                        context.getPackageName());
+
+        if (downloadFileGroupRequest.accountOptional().isPresent()) {
+            groupKeyBuilder.setAccount(
+                    AccountUtil.serialize(downloadFileGroupRequest.accountOptional().get()));
+        }
+
+        if (downloadFileGroupRequest.variantIdOptional().isPresent()) {
+            groupKeyBuilder.setVariantId(downloadFileGroupRequest.variantIdOptional().get());
+        }
+
+        boolean isDownloadListenerPresent = downloadFileGroupRequest.listenerOptional().isPresent();
+        GroupKey groupKey = groupKeyBuilder.build();
+
+        return futureSerializer.submitAsync(
+                () -> {
+                    ListenableFuture<Optional<ListenableFuture<ClientFileGroup>>>
+                            foregroundDownloadFutureOptional =
+                            foregroundDownloadFutureMap.get(foregroundDownloadKey.toString());
+                    ListenableFuture<Optional<ListenableFuture<ClientFileGroup>>>
+                            backgroundDownloadFutureOptional =
+                            downloadFutureMap.get(foregroundDownloadKey.toString());
+
+                    return PropagatedFutures.whenAllSucceed(
+                                    foregroundDownloadFutureOptional,
+                                     backgroundDownloadFutureOptional)
+                            .callAsync(
+                                    () -> {
+                                        if (getDone(foregroundDownloadFutureOptional).isPresent()) {
+                                            return immediateFuture(
+                                                    DownloadGroupState.ofInProgressFuture(
+                                                            getDone(foregroundDownloadFutureOptional).get()));
+                                        } else if (getDone(
+                                                backgroundDownloadFutureOptional).isPresent()) {
+                                            return immediateFuture(
+                                                    DownloadGroupState.ofInProgressFuture(
+                                                            getDone(backgroundDownloadFutureOptional).get()));
+                                        }
+
+                                        // Get pending and downloaded versions to tell if we
+                                        // should return downloaded
+                                        // version early
+                                        ListenableFuture<GroupPair> fileGroupVersionsFuture =
+                                                PropagatedFutures.transformAsync(
+                                                        mobileDataDownloadManager.getFileGroup(
+                                                                groupKey, /* downloaded= */ false),
+                                                        pendingDataFileGroup ->
+                                                                PropagatedFutures.transform(
+                                                                        mobileDataDownloadManager.getFileGroup(
+                                                                                groupKey, /*
+                                                                                downloaded= */
+                                                                                true),
+                                                                        downloadedDataFileGroup ->
+                                                                                GroupPair.create(
+                                                                                        pendingDataFileGroup,
+                                                                                        downloadedDataFileGroup),
+                                                                        sequentialControlExecutor),
+                                                        sequentialControlExecutor);
+
+                                        return PropagatedFutures.transformAsync(
+                                                fileGroupVersionsFuture,
+                                                fileGroupVersionsPair -> {
+                                                    // if pending version is not null, return
+                                                    // pending version
+                                                    if (fileGroupVersionsPair.pendingGroup()
+                                                            != null) {
+                                                        return immediateFuture(
+                                                                DownloadGroupState.ofPendingGroup(
+                                                                        checkNotNull(
+                                                                                fileGroupVersionsPair.pendingGroup())));
+                                                    }
+                                                    // If both groups are null, return group not
+                                                    // found failure
+                                                    if (fileGroupVersionsPair.downloadedGroup()
+                                                            == null) {
+                                                        // TODO(b/174808410): Add Logging
+                                                        // file group is not pending nor
+                                                        // downloaded -- return failure.
+                                                        DownloadException failure =
+                                                                DownloadException.builder()
+                                                                        .setDownloadResultCode(
+                                                                                DownloadResultCode.GROUP_NOT_FOUND_ERROR)
+                                                                        .setMessage(
+                                                                                "Nothing to "
+                                                                                + "download for "
+                                                                                        + "file group: "
+                                                                                        + groupKey.getGroupName())
+                                                                        .build();
+                                                        if (isDownloadListenerPresent) {
+                                                            downloadFileGroupRequest.listenerOptional().get().onFailure(
+                                                                    failure);
+                                                        }
+                                                        return immediateFailedFuture(failure);
+                                                    }
+
+                                                    DataFileGroupInternal downloadedDataFileGroup =
+                                                            checkNotNull(
+                                                                    fileGroupVersionsPair.downloadedGroup());
+
+                                                    // Notify download listener (if present) that
+                                                    // file group has been
+                                                    // downloaded.
+                                                    if (isDownloadListenerPresent) {
+                                                        downloadMonitorOptional
+                                                                .get()
+                                                                .addDownloadListener(
+                                                                        downloadFileGroupRequest.groupName(),
+                                                                        downloadFileGroupRequest.listenerOptional().get());
+                                                    }
+                                                    PropagatedFluentFuture<ClientFileGroup>
+                                                            transformFuture =
+                                                            PropagatedFluentFuture.from(
+                                                                            createClientFileGroup(
+                                                                                    downloadedDataFileGroup,
+                                                                                    downloadFileGroupRequest.accountOptional().isPresent()
+                                                                                            ?
+                                                                                            AccountUtil.serialize(
+                                                                                            downloadFileGroupRequest.accountOptional().get())
+                                                                                            : null,
+                                                                                    ClientFileGroup.Status.DOWNLOADED,
+                                                                                    downloadFileGroupRequest.preserveZipDirectories(),
+                                                                                    downloadFileGroupRequest.verifyIsolatedStructure(),
+                                                                                    mobileDataDownloadManager,
+                                                                                    sequentialControlExecutor,
+                                                                                    fileStorage))
+                                                                    .transform(
+                                                                            Preconditions::checkNotNull,
+                                                                            sequentialControlExecutor)
+                                                                    .transform(
+                                                                            clientFileGroup -> {
+                                                                                if (isDownloadListenerPresent) {
+                                                                                    try {
+                                                                                        downloadFileGroupRequest
+                                                                                                .listenerOptional()
+                                                                                                .get()
+                                                                                                .onComplete(
+                                                                                                        clientFileGroup);
+                                                                                    } catch (
+                                                                                            Exception e) {
+                                                                                        LogUtil.w(
+                                                                                                e,
+                                                                                                "%s: Listener onComplete failed for group %s",
+                                                                                                TAG,
+                                                                                                clientFileGroup.getGroupName());
+                                                                                    }
+                                                                                    downloadMonitorOptional
+                                                                                            .get()
+                                                                                            .removeDownloadListener(
+                                                                                                    groupName);
+                                                                                }
+                                                                                return clientFileGroup;
+                                                                            },
+                                                                            sequentialControlExecutor);
+                                                    transformFuture.addCallback(
+                                                            new FutureCallback<ClientFileGroup>() {
+                                                                @Override
+                                                                public void onSuccess(
+                                                                        ClientFileGroup result) {
+                                                                }
+
+                                                                @Override
+                                                                public void onFailure(Throwable t) {
+                                                                    if (isDownloadListenerPresent) {
+                                                                        downloadMonitorOptional.get().removeDownloadListener(
+                                                                                groupName);
+                                                                    }
+                                                                }
+                                                            },
+                                                            sequentialControlExecutor);
+
+                                                    // Use directExecutor here since we are performing a trivial operation.
+                                                    return transformFuture.transform(
+                                                            DownloadGroupState::ofDownloadedGroup,
+                                                            directExecutor());
+                                                },
+                                                sequentialControlExecutor);
+                                    },
+                                    sequentialControlExecutor);
+                },
+                sequentialControlExecutor);
+    }
+
+    private DownloadListener createDownloadListenerWithNotification(
+            DownloadFileGroupRequest downloadRequest, DataFileGroupInternal fileGroup) {
+
+        String networkPausedMessage = getNetworkPausedMessage(downloadRequest, fileGroup);
+
+        NotificationManagerCompat notificationManager = NotificationManagerCompat.from(context);
+        ForegroundDownloadKey foregroundDownloadKey =
+                ForegroundDownloadKey.ofFileGroup(
+                        downloadRequest.groupName(),
+                        downloadRequest.accountOptional(),
+                        downloadRequest.variantIdOptional());
+
+        NotificationCompat.Builder notification =
+                NotificationUtil.createNotificationBuilder(
+                        context,
+                        downloadRequest.groupSizeBytes(),
+                        downloadRequest.contentTitleOptional().or(downloadRequest.groupName()),
+                        downloadRequest.contentTextOptional().or(downloadRequest.groupName()));
+        int notificationKey = NotificationUtil.notificationKeyForKey(downloadRequest.groupName());
+
+        if (downloadRequest.showNotifications() == DownloadFileGroupRequest.ShowNotifications.ALL) {
+            NotificationUtil.createCancelAction(
+                    context,
+                    foregroundDownloadServiceClassOptional.get(),
+                    foregroundDownloadKey.toString(),
+                    notification,
+                    notificationKey);
+
+            notificationManager.notify(notificationKey, notification.build());
+        }
+
+        return new DownloadListener() {
+            @Override
+            public void onProgress(long currentSize) {
+                // TODO(b/229123693): return this future once DownloadListener has an async api.
+                // There can be a race condition, where onProgress can be called
+                // after onComplete or onFailure which removes the future and the notification.
+                // Check foregroundDownloadFutureMap first before updating notification.
+                ListenableFuture<?> unused =
+                        PropagatedFutures.transformAsync(
+                                foregroundDownloadFutureMap.containsKey(
+                                        foregroundDownloadKey.toString()),
+                                futureInProgress -> {
+                                    if (futureInProgress
+                                            && downloadRequest.showNotifications()
+                                            == DownloadFileGroupRequest.ShowNotifications.ALL) {
+                                        notification
+                                                .setCategory(NotificationCompat.CATEGORY_PROGRESS)
+                                                .setSmallIcon(android.R.drawable.stat_sys_download)
+                                                .setProgress(
+                                                        downloadRequest.groupSizeBytes(),
+                                                        (int) currentSize,
+                                                        /* indeterminate= */
+                                                        downloadRequest.groupSizeBytes() <= 0);
+                                        notificationManager.notify(notificationKey,
+                                                notification.build());
+                                    }
+                                    if (downloadRequest.listenerOptional().isPresent()) {
+                                        downloadRequest.listenerOptional().get().onProgress(
+                                                currentSize);
+                                    }
+                                    return immediateVoidFuture();
+                                },
+                                sequentialControlExecutor);
+            }
+
+            @Override
+            public void pausedForConnectivity() {
+                // TODO(b/229123693): return this future once DownloadListener has an async api.
+                // There can be a race condition, where pausedForConnectivity can be called
+                // after onComplete or onFailure which removes the future and the notification.
+                // Check foregroundDownloadFutureMap first before updating notification.
+                ListenableFuture<?> unused =
+                        PropagatedFutures.transformAsync(
+                                foregroundDownloadFutureMap.containsKey(
+                                        foregroundDownloadKey.toString()),
+                                futureInProgress -> {
+                                    if (futureInProgress
+                                            && downloadRequest.showNotifications()
+                                            == DownloadFileGroupRequest.ShowNotifications.ALL) {
+                                        notification
+                                                .setCategory(NotificationCompat.CATEGORY_STATUS)
+                                                .setContentText(networkPausedMessage)
+                                                .setSmallIcon(android.R.drawable.stat_sys_download)
+                                                .setOngoing(true)
+                                                // hide progress bar.
+                                                .setProgress(0, 0, false);
+                                        notificationManager.notify(notificationKey,
+                                                notification.build());
+                                    }
+                                    if (downloadRequest.listenerOptional().isPresent()) {
+                                        downloadRequest.listenerOptional().get().pausedForConnectivity();
+                                    }
+                                    return immediateVoidFuture();
+                                },
+                                sequentialControlExecutor);
+            }
+
+            @Override
+            public void onComplete(ClientFileGroup clientFileGroup) {
+                // TODO(b/229123693): return this future once DownloadListener has an async api.
+                ListenableFuture<?> unused =
+                        PropagatedFutures.submitAsync(
+                                () -> {
+                                    boolean onCompleteFailed = false;
+                                    if (downloadRequest.listenerOptional().isPresent()) {
+                                        try {
+                                            downloadRequest.listenerOptional().get().onComplete(
+                                                    clientFileGroup);
+                                        } catch (Exception e) {
+                                            LogUtil.w(
+                                                    e,
+                                                    "%s: Delegate onComplete failed for group %s, showing failure"
+                                                            + " notification.",
+                                                    TAG,
+                                                    clientFileGroup.getGroupName());
+                                            onCompleteFailed = true;
+                                        }
+                                    }
+
+                                    // Clear the notification action.
+                                    if (downloadRequest.showNotifications()
+                                            == DownloadFileGroupRequest.ShowNotifications.ALL) {
+                                        notification.mActions.clear();
+
+                                        if (onCompleteFailed) {
+                                            // Show download failed in notification.
+                                            notification
+                                                    .setCategory(NotificationCompat.CATEGORY_STATUS)
+                                                    .setContentText(
+                                                            NotificationUtil.getDownloadFailedMessage(
+                                                                    context))
+                                                    .setOngoing(false)
+                                                    .setSmallIcon(
+                                                            android.R.drawable.stat_sys_warning)
+                                                    // hide progress bar.
+                                                    .setProgress(0, 0, false);
+
+                                            notificationManager.notify(notificationKey,
+                                                    notification.build());
+                                        } else {
+                                            NotificationUtil.cancelNotificationForKey(
+                                                    context, downloadRequest.groupName());
+                                        }
+                                    }
+
+                                    downloadMonitorOptional.get().removeDownloadListener(
+                                            downloadRequest.groupName());
+
+                                    return foregroundDownloadFutureMap.remove(
+                                            foregroundDownloadKey.toString());
+                                },
+                                sequentialControlExecutor);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                // TODO(b/229123693): return this future once DownloadListener has an async api.
+                ListenableFuture<?> unused =
+                        PropagatedFutures.submitAsync(
+                                () -> {
+                                    if (downloadRequest.showNotifications()
+                                            == DownloadFileGroupRequest.ShowNotifications.ALL) {
+                                        // Clear the notification action.
+                                        notification.mActions.clear();
+
+                                        // Show download failed in notification.
+                                        notification
+                                                .setCategory(NotificationCompat.CATEGORY_STATUS)
+                                                .setContentText(
+                                                        NotificationUtil.getDownloadFailedMessage(
+                                                                context))
+                                                .setOngoing(false)
+                                                .setSmallIcon(android.R.drawable.stat_sys_warning)
+                                                // hide progress bar.
+                                                .setProgress(0, 0, false);
+
+                                        notificationManager.notify(notificationKey,
+                                                notification.build());
+                                    }
+
+                                    if (downloadRequest.listenerOptional().isPresent()) {
+                                        downloadRequest.listenerOptional().get().onFailure(t);
+                                    }
+                                    downloadMonitorOptional.get().removeDownloadListener(
+                                            downloadRequest.groupName());
+
+                                    return foregroundDownloadFutureMap.remove(
+                                            foregroundDownloadKey.toString());
+                                },
+                                sequentialControlExecutor);
+            }
+        };
+    }
+
+    // Helper method to get the correct network paused message
+    private String getNetworkPausedMessage(
+            DownloadFileGroupRequest downloadRequest, DataFileGroupInternal fileGroup) {
+        DeviceNetworkPolicy networkPolicyForDownload =
+                fileGroup.getDownloadConditions().getDeviceNetworkPolicy();
+        if (downloadRequest.downloadConditionsOptional().isPresent()) {
+            try {
+                networkPolicyForDownload =
+                        ProtoConversionUtil.convert(
+                                        downloadRequest.downloadConditionsOptional().get())
+                                .getDeviceNetworkPolicy();
+            } catch (InvalidProtocolBufferException unused) {
+                // Do nothing -- we will rely on the file group's network policy.
+            }
+        }
+
+        switch (networkPolicyForDownload) {
+            case DOWNLOAD_FIRST_ON_WIFI_THEN_ON_ANY_NETWORK: // fallthrough
+            case DOWNLOAD_ONLY_ON_WIFI:
+                return NotificationUtil.getDownloadPausedWifiMessage(context);
+            default:
+                return NotificationUtil.getDownloadPausedMessage(context);
+        }
+    }
+
+    @Override
+    public void cancelForegroundDownload(String downloadKey) {
+        LogUtil.d("%s: CancelForegroundDownload for key = %s", TAG, downloadKey);
+        ListenableFuture<?> unused =
+                PropagatedFutures.transformAsync(
+                        foregroundDownloadFutureMap.get(downloadKey),
+                        downloadFuture -> {
+                            if (downloadFuture.isPresent()) {
+                                LogUtil.v(
+                                        "%s: CancelForegroundDownload future found for key = %s, cancelling...",
+                                        TAG, downloadKey);
+                                downloadFuture.get().cancel(false);
+                            }
+                            return immediateVoidFuture();
+                        },
+                        sequentialControlExecutor);
+        // Attempt cancel with internal MDD Lite instance in case it's a single file uri (cancel call is
+        // a noop if internal MDD Lite doesn't know about it).
+        singleFileDownloader.cancelForegroundDownload(downloadKey);
+    }
+
+    @Override
+    public void schedulePeriodicTasks() {
+        schedulePeriodicTasksInternal(Optional.absent());
+    }
+
+    @Override
+    public ListenableFuture<Void> schedulePeriodicBackgroundTasks() {
+        return futureSerializer.submit(
+                () -> {
+                    schedulePeriodicTasksInternal(/* constraintOverridesMap= */ Optional.absent());
+                    return null;
+                },
+                sequentialControlExecutor);
+    }
+
+    @Override
+    public ListenableFuture<Void> schedulePeriodicBackgroundTasks(
+            Optional<Map<String, ConstraintOverrides>> constraintOverridesMap) {
+        return futureSerializer.submit(
+                () -> {
+                    schedulePeriodicTasksInternal(constraintOverridesMap);
+                    return null;
+                },
+                sequentialControlExecutor);
+    }
+
+    private void schedulePeriodicTasksInternal(
+            Optional<Map<String, ConstraintOverrides>> constraintOverridesMap) {
+        if (!taskSchedulerOptional.isPresent()) {
+            LogUtil.e(
+                    "%s: Called schedulePeriodicTasksInternal when taskScheduler is not provided.",
+                    TAG);
+            return;
+        }
+
+        TaskScheduler taskScheduler = taskSchedulerOptional.get();
+
+        // Schedule task that runs on charging without any network, every 6 hours.
+        taskScheduler.schedulePeriodicTask(
+                TaskScheduler.CHARGING_PERIODIC_TASK,
+                flags.chargingGcmTaskPeriod(),
+                NetworkState.NETWORK_STATE_ANY,
+                getConstraintOverrides(constraintOverridesMap,
+                        TaskScheduler.CHARGING_PERIODIC_TASK));
+
+        // Schedule maintenance task that runs on charging, once every day.
+        // This task should run even if mdd is disabled, to handle cleanup.
+        taskScheduler.schedulePeriodicTask(
+                TaskScheduler.MAINTENANCE_PERIODIC_TASK,
+                flags.maintenanceGcmTaskPeriod(),
+                NetworkState.NETWORK_STATE_ANY,
+                getConstraintOverrides(constraintOverridesMap,
+                        TaskScheduler.MAINTENANCE_PERIODIC_TASK));
+
+        // Schedule task that runs on cellular+charging, every 6 hours.
+        taskScheduler.schedulePeriodicTask(
+                TaskScheduler.CELLULAR_CHARGING_PERIODIC_TASK,
+                flags.cellularChargingGcmTaskPeriod(),
+                NetworkState.NETWORK_STATE_CONNECTED,
+                getConstraintOverrides(
+                        constraintOverridesMap, TaskScheduler.CELLULAR_CHARGING_PERIODIC_TASK));
+
+        // Schedule task that runs on wifi+charging, every 6 hours.
+        taskScheduler.schedulePeriodicTask(
+                TaskScheduler.WIFI_CHARGING_PERIODIC_TASK,
+                flags.wifiChargingGcmTaskPeriod(),
+                NetworkState.NETWORK_STATE_UNMETERED,
+                getConstraintOverrides(constraintOverridesMap,
+                        TaskScheduler.WIFI_CHARGING_PERIODIC_TASK));
+    }
+
+    private static Optional<ConstraintOverrides> getConstraintOverrides(
+            Optional<Map<String, ConstraintOverrides>> constraintOverridesMap,
+            String maintenancePeriodicTask) {
+        return constraintOverridesMap.isPresent()
+                ? Optional.fromNullable(constraintOverridesMap.get().get(maintenancePeriodicTask))
+                : Optional.absent();
+    }
+
+    @Override
+    public ListenableFuture<Void> cancelPeriodicBackgroundTasks() {
+        return futureSerializer.submit(
+                () -> {
+                    cancelPeriodicTasksInternal();
+                    return null;
+                },
+                sequentialControlExecutor);
+    }
+
+    private void cancelPeriodicTasksInternal() {
+        if (!taskSchedulerOptional.isPresent()) {
+            LogUtil.w("%s: Called cancelPeriodicTasksInternal when taskScheduler is not provided.",
+                    TAG);
+            return;
+        }
+
+        TaskScheduler taskScheduler = taskSchedulerOptional.get();
+
+        taskScheduler.cancelPeriodicTask(TaskScheduler.CHARGING_PERIODIC_TASK);
+        taskScheduler.cancelPeriodicTask(TaskScheduler.MAINTENANCE_PERIODIC_TASK);
+        taskScheduler.cancelPeriodicTask(TaskScheduler.CELLULAR_CHARGING_PERIODIC_TASK);
+        taskScheduler.cancelPeriodicTask(TaskScheduler.WIFI_CHARGING_PERIODIC_TASK);
+    }
+
+    @Override
+    public ListenableFuture<Void> handleTask(String tag) {
+        // All work done here that touches metadata (MobileDataDownloadManager) should be serialized
+        // through sequentialControlExecutor.
+        switch (tag) {
+            case TaskScheduler.MAINTENANCE_PERIODIC_TASK:
+                return futureSerializer.submitAsync(
+                        mobileDataDownloadManager::maintenance, sequentialControlExecutor);
+
+            case TaskScheduler.CHARGING_PERIODIC_TASK:
+                ListenableFuture<Void> refreshFileGroupsFuture = refreshFileGroups();
+                return PropagatedFutures.transformAsync(
+                        refreshFileGroupsFuture,
+                        propagateAsyncFunction(
+                                v -> mobileDataDownloadManager.verifyAllPendingGroups(
+                                        customFileGroupValidator)),
+                        sequentialControlExecutor);
+
+            case TaskScheduler.CELLULAR_CHARGING_PERIODIC_TASK:
+                return refreshAndDownload(false /*onWifi*/);
+
+            case TaskScheduler.WIFI_CHARGING_PERIODIC_TASK:
+                return refreshAndDownload(true /*onWifi*/);
+
+            default:
+                LogUtil.d("%s: gcm task doesn't belong to MDD", TAG);
+                return immediateFailedFuture(
+                        new IllegalArgumentException(
+                                "Unknown task tag sent to MDD.handleTask() " + tag));
+        }
+    }
+
+    private ListenableFuture<Void> refreshAndDownload(boolean onWifi) {
+        // We will do 2 passes to support 2-step downloads. In each step, we will refresh and then
+        // download.
+        return PropagatedFluentFuture.from(refreshFileGroups())
+                .transformAsync(
+                        v ->
+                                mobileDataDownloadManager.downloadAllPendingGroups(
+                                        onWifi, customFileGroupValidator),
+                        sequentialControlExecutor)
+                .transformAsync(v -> refreshFileGroups(), sequentialControlExecutor)
+                .transformAsync(
+                        v ->
+                                mobileDataDownloadManager.downloadAllPendingGroups(
+                                        onWifi, customFileGroupValidator),
+                        sequentialControlExecutor);
+    }
+
+    private ListenableFuture<Void> refreshFileGroups() {
+        List<ListenableFuture<Void>> refreshFutures = new ArrayList<>();
+        for (FileGroupPopulator fileGroupPopulator : fileGroupPopulatorList) {
+            refreshFutures.add(fileGroupPopulator.refreshFileGroups(this));
+        }
+
+        return PropagatedFutures.whenAllComplete(refreshFutures)
+                .call(() -> null, sequentialControlExecutor);
+    }
+
+    @Override
+    public ListenableFuture<Void> maintenance() {
+        return handleTask(TaskScheduler.MAINTENANCE_PERIODIC_TASK);
+    }
+
+    @Override
+    public ListenableFuture<Void> collectGarbage() {
+        return futureSerializer.submitAsync(
+                mobileDataDownloadManager::removeExpiredGroupsAndFiles, sequentialControlExecutor);
+    }
+
+    @Override
+    public ListenableFuture<Void> clear() {
+        return futureSerializer.submitAsync(
+                mobileDataDownloadManager::clear, sequentialControlExecutor);
+    }
+
+    // incompatible argument for parameter msg of e.
+    // incompatible types in return.
+    @Override
+    public String getDebugInfoAsString() {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        PrintWriter writer = new PrintWriter(out);
+        try {
+            // Okay to block here because this method is for debugging only.
+            mobileDataDownloadManager.dump(writer).get(DUMP_DEBUG_INFO_TIMEOUT, TimeUnit.SECONDS);
+            writer.println("==== MOBSTORE_DEBUG_INFO ====");
+            writer.print(fileStorage.getDebugInfo());
+        } catch (ExecutionException | TimeoutException e) {
+            String errString = String.format("%s: Couldn't get debug info: %s", TAG, e);
+            LogUtil.e(errString);
+            return errString;
+        } catch (InterruptedException e) {
+            // see <internal>
+            Thread.currentThread().interrupt();
+            String errString = String.format("%s: Couldn't get debug info: %s", TAG, e);
+            LogUtil.e(errString);
+            return errString;
+        }
+        writer.flush();
+        return out.toString();
+    }
+
+    @Override
+    public ListenableFuture<Void> reportUsage(UsageEvent usageEvent) {
+        eventLogger.logMddUsageEvent(createFileGroupDetails(usageEvent.clientFileGroup()), null);
+
+        return immediateVoidFuture();
+    }
+
+    private static DownloadFutureMap.StateChangeCallbacks createCallbacksForForegroundService(
+            Context context, Optional<Class<?>> foregroundDownloadServiceClassOptional) {
+        return new DownloadFutureMap.StateChangeCallbacks() {
+            @Override
+            public void onAdd(String key, int newSize) {
+                // Only start foreground service if this is the first future we are adding.
+                if (newSize == 1 && foregroundDownloadServiceClassOptional.isPresent()) {
+                    NotificationUtil.startForegroundDownloadService(
+                            context, foregroundDownloadServiceClassOptional.get(), key);
+                }
+            }
+
+            @Override
+            public void onRemove(String key, int newSize) {
+                // Only stop foreground service if there are no more futures remaining.
+                if (newSize == 0 && foregroundDownloadServiceClassOptional.isPresent()) {
+                    NotificationUtil.stopForegroundDownloadService(
+                            context, foregroundDownloadServiceClassOptional.get(), key);
+                }
+            }
+        };
+    }
 }
diff --git a/java/com/google/android/libraries/mobiledatadownload/ReadDataFileGroupsByFilterRequest.java b/java/com/google/android/libraries/mobiledatadownload/ReadDataFileGroupsByFilterRequest.java
new file mode 100644
index 0000000..c73be16
--- /dev/null
+++ b/java/com/google/android/libraries/mobiledatadownload/ReadDataFileGroupsByFilterRequest.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.android.libraries.mobiledatadownload;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import android.accounts.Account;
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Optional;
+import com.google.errorprone.annotations.Immutable;
+
+/** Request to get multiple data file groups after filtering. */
+@AutoValue
+@AutoValue.CopyAnnotations
+@Immutable
+@SuppressWarnings("Immutable")
+public abstract class ReadDataFileGroupsByFilterRequest {
+  ReadDataFileGroupsByFilterRequest() {}
+
+  // If this value is set to true, groupName should not be set.
+  public abstract boolean includeAllGroups();
+
+  // If this value is set to true, only groups without account will be returned, and accountOptional
+  // should be absent. The default value is false.
+  public abstract boolean groupWithNoAccountOnly();
+
+  public abstract Optional<String> groupNameOptional();
+
+  public abstract Optional<Account> accountOptional();
+
+  public abstract Optional<Boolean> downloadedOptional();
+
+  public static Builder newBuilder() {
+    return new AutoValue_ReadDataFileGroupsByFilterRequest.Builder()
+        .setIncludeAllGroups(false)
+        .setGroupWithNoAccountOnly(false);
+  }
+
+  /** Builder for {@link ReadDataFileGroupsByFilterRequest}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    Builder() {}
+
+    /** Sets the flag whether all groups are included. */
+    public abstract Builder setIncludeAllGroups(boolean includeAllGroups);
+
+    /** Sets the flag whether to only return account independent groups. */
+    public abstract Builder setGroupWithNoAccountOnly(boolean groupWithNoAccountOnly);
+
+    /**
+     * Sets the name of the file group, which is optional. When groupNameOptional is absent, caller
+     * must set the includeAllGroups.
+     */
+    public abstract Builder setGroupNameOptional(Optional<String> groupNameOptional);
+
+    /** Sets the account that is associated with the file group, which is optional. */
+    public abstract Builder setAccountOptional(Optional<Account> accountOptional);
+
+    /**
+     * Use setDownloaded instead.
+     *
+     * @see setDownloaded.
+     */
+    abstract Builder setDownloadedOptional(Optional<Boolean> downloadedOptional);
+
+    /**
+     * Sets whether to include only downloaded or pending file groups.
+     *
+     * @param downloaded if set to true, only downloaded file groups returned. If set to false, only
+     *     pending groups are included. If not set, all groups are returned.
+     */
+    public Builder setDownloaded(Boolean downloaded) {
+      return setDownloadedOptional(Optional.of(downloaded));
+    }
+
+    abstract ReadDataFileGroupsByFilterRequest autoBuild();
+
+    public final ReadDataFileGroupsByFilterRequest build() {
+      ReadDataFileGroupsByFilterRequest readDataFileGroupsByFilterRequest = autoBuild();
+
+      if (readDataFileGroupsByFilterRequest.includeAllGroups()) {
+        checkArgument(!readDataFileGroupsByFilterRequest.groupNameOptional().isPresent());
+        checkArgument(!readDataFileGroupsByFilterRequest.accountOptional().isPresent());
+        checkArgument(!readDataFileGroupsByFilterRequest.groupWithNoAccountOnly());
+        checkArgument(!readDataFileGroupsByFilterRequest.downloadedOptional().isPresent());
+      } else {
+        checkArgument(
+            readDataFileGroupsByFilterRequest.groupNameOptional().isPresent(),
+            "Request must provide a group name or source to filter by");
+      }
+
+      if (readDataFileGroupsByFilterRequest.groupWithNoAccountOnly()) {
+        checkArgument(!readDataFileGroupsByFilterRequest.accountOptional().isPresent());
+      }
+
+      return readDataFileGroupsByFilterRequest;
+    }
+  }
+}
diff --git a/java/com/google/android/libraries/mobiledatadownload/downloader/MultiSchemeFileDownloader.java b/java/com/google/android/libraries/mobiledatadownload/downloader/MultiSchemeFileDownloader.java
index 7dfc5b4..cd769df 100644
--- a/java/com/google/android/libraries/mobiledatadownload/downloader/MultiSchemeFileDownloader.java
+++ b/java/com/google/android/libraries/mobiledatadownload/downloader/MultiSchemeFileDownloader.java
@@ -61,6 +61,20 @@
     return new Builder();
   }
 
+  /** Returns a Builder containing all registered FileDownloaders. */
+  public Builder toBuilder() {
+    final Builder builder = new Builder();
+    for (Map.Entry<String, FileDownloader> entry : schemeToDownloader.entrySet()) {
+      builder.addScheme(entry.getKey(), entry.getValue());
+    }
+    return builder;
+  }
+
+  /** Returns true if a FileDownloader is registered for the given scheme. */
+  public boolean supportsScheme(String scheme) {
+    return schemeToDownloader.containsKey(scheme);
+  }
+
   private MultiSchemeFileDownloader(Builder builder) {
     this.schemeToDownloader = ImmutableMap.copyOf(builder.schemeToDownloader);
   }
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/DataFileGroupValidator.java b/java/com/google/android/libraries/mobiledatadownload/internal/DataFileGroupValidator.java
index f05e831..3d49157 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/DataFileGroupValidator.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/DataFileGroupValidator.java
@@ -20,6 +20,7 @@
 import com.google.android.libraries.mobiledatadownload.file.transforms.TransformProtos;
 import com.google.android.libraries.mobiledatadownload.internal.logging.LogUtil;
 import com.google.android.libraries.mobiledatadownload.internal.util.FileGroupUtil;
+import com.google.mobiledatadownload.TransformProto.Transforms;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile.ChecksumType;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupInternal;
@@ -27,7 +28,6 @@
 import com.google.mobiledatadownload.internal.MetadataProto.DeltaFile;
 import com.google.mobiledatadownload.internal.MetadataProto.DeltaFile.DiffDecoder;
 import com.google.mobiledatadownload.internal.MetadataProto.DownloadConditions.DeviceNetworkPolicy;
-import com.google.mobiledatadownload.TransformProto.Transforms;
 
 /** DataFileGroupValidator - validates the passed in DataFileGroup */
 public class DataFileGroupValidator {
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/ExceptionToMddResultMapper.java b/java/com/google/android/libraries/mobiledatadownload/internal/ExceptionToMddResultMapper.java
index f5fa536..44db96c 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/ExceptionToMddResultMapper.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/ExceptionToMddResultMapper.java
@@ -16,6 +16,8 @@
 package com.google.android.libraries.mobiledatadownload.internal;
 
 import com.google.android.libraries.mobiledatadownload.DownloadException;
+import com.google.mobiledatadownload.LogEnumsProto.MddLibApiResult;
+
 import java.io.IOException;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -27,39 +29,40 @@
  */
 public final class ExceptionToMddResultMapper {
 
-  private ExceptionToMddResultMapper() {}
-
-  /**
-   * Maps Exception to appropriate int for logging.
-   *
-   * <p>If t is an ExecutionException, then the cause (t.getCause()) is mapped.
-   */
-  public static int map(Throwable t) {
-
-    Throwable cause;
-    if (t instanceof ExecutionException) {
-      cause = t.getCause();
-    } else {
-      cause = t;
+    private ExceptionToMddResultMapper() {
     }
 
-    if (cause instanceof CancellationException) {
-      return 0;
-    } else if (cause instanceof InterruptedException) {
-      return 0;
-    } else if (cause instanceof IOException) {
-      return 0;
-    } else if (cause instanceof IllegalStateException) {
-      return 0;
-    } else if (cause instanceof IllegalArgumentException) {
-      return 0;
-    } else if (cause instanceof UnsupportedOperationException) {
-      return 0;
-    } else if (cause instanceof DownloadException) {
-      return 0;
-    }
+    /**
+     * Maps Exception to appropriate MddLibApiResult.Code for logging.
+     *
+     * <p>If t is an ExecutionException, then the cause (t.getCause()) is mapped.
+     */
+    public static MddLibApiResult.Code map(Throwable t) {
 
-    // Capturing all other errors occurred during execution as unknown errors.
-    return 0;
-  }
+        Throwable cause;
+        if (t instanceof ExecutionException) {
+            cause = t.getCause();
+        } else {
+            cause = t;
+        }
+
+        if (cause instanceof CancellationException) {
+            return MddLibApiResult.Code.RESULT_CANCELLED;
+        } else if (cause instanceof InterruptedException) {
+            return MddLibApiResult.Code.RESULT_INTERRUPTED;
+        } else if (cause instanceof IOException) {
+            return MddLibApiResult.Code.RESULT_IO_ERROR;
+        } else if (cause instanceof IllegalStateException) {
+            return MddLibApiResult.Code.RESULT_ILLEGAL_STATE;
+        } else if (cause instanceof IllegalArgumentException) {
+            return MddLibApiResult.Code.RESULT_ILLEGAL_ARGUMENT;
+        } else if (cause instanceof UnsupportedOperationException) {
+            return MddLibApiResult.Code.RESULT_UNSUPPORTED_OPERATION;
+        } else if (cause instanceof DownloadException) {
+            return MddLibApiResult.Code.RESULT_DOWNLOAD_ERROR;
+        }
+
+        // Capturing all other errors occurred during execution as unknown errors.
+        return MddLibApiResult.Code.RESULT_FAILURE;
+    }
 }
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/ExpirationHandler.java b/java/com/google/android/libraries/mobiledatadownload/internal/ExpirationHandler.java
index 7c0f698..adf0997 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/ExpirationHandler.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/ExpirationHandler.java
@@ -35,11 +35,11 @@
 import com.google.android.libraries.mobiledatadownload.tracing.PropagatedFutures;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupInternal;
 import com.google.mobiledatadownload.internal.MetadataProto.GroupKey;
 import com.google.mobiledatadownload.internal.MetadataProto.NewFileKey;
-import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/FileGroupManager.java b/java/com/google/android/libraries/mobiledatadownload/internal/FileGroupManager.java
index 7cf3eb6..068523d 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/FileGroupManager.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/FileGroupManager.java
@@ -73,6 +73,9 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.errorprone.annotations.CheckReturnValue;
+import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
+import com.google.mobiledatadownload.LogEnumsProto.MddDownloadResult;
+import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
 import com.google.mobiledatadownload.internal.MetadataProto;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile.AndroidSharingType;
@@ -86,9 +89,6 @@
 import com.google.mobiledatadownload.internal.MetadataProto.GroupKeyProperties;
 import com.google.mobiledatadownload.internal.MetadataProto.NewFileKey;
 import com.google.mobiledatadownload.internal.MetadataProto.SharedFile;
-import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
-import com.google.mobiledatadownload.LogEnumsProto.MddDownloadResult;
-import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
 import com.google.protobuf.Any;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -2344,7 +2344,7 @@
       }
     }
     ImmutableMap<DataFile, NewFileKey> nonSideloadedKeyMap =
-        nonSideloadedKeyMapBuilder.build();
+        nonSideloadedKeyMapBuilder.buildKeepingLast();
 
     return PropagatedFluentFuture.from(
             sharedFileManager.getOnDeviceUris(ImmutableSet.copyOf(nonSideloadedKeyMap.values())))
@@ -2358,7 +2358,7 @@
                   onDeviceUriMap.put(keyMapEntry.getKey(), nonSideloadedUriMap.get(newFileKey));
                 }
               }
-              return onDeviceUriMap.build();
+              return onDeviceUriMap.buildKeepingLast();
             },
             sequentialControlExecutor);
   }
@@ -2378,7 +2378,7 @@
       isolatedFileUrisBuilder.put(
           dataFile, FileGroupUtil.appendIsolatedFileUri(isolatedRootUri, dataFile));
     }
-    return isolatedFileUrisBuilder.build();
+    return isolatedFileUrisBuilder.buildKeepingLast();
   }
 
   /**
@@ -2426,7 +2426,7 @@
             TAG, isolatedUri, onDeviceUri);
       }
     }
-    return verifiedUriMapBuilder.build();
+    return verifiedUriMapBuilder.buildKeepingLast();
   }
 
   /**
@@ -2845,9 +2845,6 @@
     if (!prevGroup.getAllowedReadersEnum().equals(newGroup.getAllowedReadersEnum())) {
       return Optional.of(0);
     }
-//    if (!prevGroup.getExperimentInfo().equals(newGroup.getExperimentInfo())) {
-//      return Optional.of(0);
-//    }
     return Optional.absent();
   }
 
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/MobileDataDownloadManager.java b/java/com/google/android/libraries/mobiledatadownload/internal/MobileDataDownloadManager.java
index b9496e2..002b896 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/MobileDataDownloadManager.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/MobileDataDownloadManager.java
@@ -53,13 +53,13 @@
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.errorprone.annotations.CheckReturnValue;
+import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
+import com.google.mobiledatadownload.TransformProto.Transforms;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile.ChecksumType;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupInternal;
 import com.google.mobiledatadownload.internal.MetadataProto.DownloadConditions;
 import com.google.mobiledatadownload.internal.MetadataProto.GroupKey;
-import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
-import com.google.mobiledatadownload.TransformProto.Transforms;
 import com.google.protobuf.Any;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -337,7 +337,7 @@
                 // downloaded, pendingGroup must be non-null.
                 DataFileGroupInternal group = checkNotNull(getDone(pendingGroupFuture));
                 eventLogger.logEventSampled(
-                    MddClientEvent.Code.EVENT_CODE_UNSPECIFIED,
+                    MddClientEvent.Code.DATA_DOWNLOAD_COMPLETE_IMMEDIATE,
                     group.getGroupName(),
                     group.getFileGroupVersionNumber(),
                     group.getBuildId(),
@@ -439,7 +439,7 @@
     if (useIsolatedStructure) {
       isolatedUriMapBuilder.putAll(fileGroupManager.getIsolatedFileUris(dataFileGroup));
     }
-    ImmutableMap<DataFile, Uri> isolatedUriMap = isolatedUriMapBuilder.build();
+    ImmutableMap<DataFile, Uri> isolatedUriMap = isolatedUriMapBuilder.buildKeepingLast();
 
     return PropagatedFluentFuture.from(init())
         .transformAsync(
@@ -491,7 +491,7 @@
                   finalUriMapBuilder.put(entry);
                 }
               }
-              return finalUriMapBuilder.build();
+              return finalUriMapBuilder.buildKeepingLast();
             },
             sequentialControlExecutor);
   }
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/SharedFileManager.java b/java/com/google/android/libraries/mobiledatadownload/internal/SharedFileManager.java
index 2c1775d..6ba466f 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/SharedFileManager.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/SharedFileManager.java
@@ -57,6 +57,7 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.errorprone.annotations.CheckReturnValue;
+import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupInternal;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupInternal.AllowedReaders;
@@ -68,7 +69,6 @@
 import com.google.mobiledatadownload.internal.MetadataProto.GroupKey;
 import com.google.mobiledatadownload.internal.MetadataProto.NewFileKey;
 import com.google.mobiledatadownload.internal.MetadataProto.SharedFile;
-import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -963,7 +963,7 @@
                   uriMapBuilder.put(newFileKey, onDeviceUri);
                 }
               }
-              return immediateFuture(uriMapBuilder.build());
+              return immediateFuture(uriMapBuilder.buildKeepingLast());
             },
             sequentialControlExecutor);
   }
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/downloader/DeltaFileDownloaderCallbackImpl.java b/java/com/google/android/libraries/mobiledatadownload/internal/downloader/DeltaFileDownloaderCallbackImpl.java
index 345616d..2583905 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/downloader/DeltaFileDownloaderCallbackImpl.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/downloader/DeltaFileDownloaderCallbackImpl.java
@@ -38,13 +38,13 @@
 import com.google.common.base.Ascii;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupInternal.AllowedReaders;
 import com.google.mobiledatadownload.internal.MetadataProto.DeltaFile;
 import com.google.mobiledatadownload.internal.MetadataProto.FileStatus;
 import com.google.mobiledatadownload.internal.MetadataProto.GroupKey;
 import com.google.mobiledatadownload.internal.MetadataProto.NewFileKey;
-import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/logging/DownloadStateLogger.java b/java/com/google/android/libraries/mobiledatadownload/internal/logging/DownloadStateLogger.java
index 85b13a9..b10692b 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/logging/DownloadStateLogger.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/logging/DownloadStateLogger.java
@@ -18,6 +18,7 @@
 import com.google.errorprone.annotations.CheckReturnValue;
 import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
 import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
+import com.google.mobiledatadownload.LogProto.MddDownloadLatency;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupBookkeeping;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupInternal;
 
@@ -56,7 +57,7 @@
   public void logStarted(DataFileGroupInternal fileGroup) {
     switch (operation) {
       case DOWNLOAD:
-        logEventWithDataFileGroup(MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, fileGroup);
+        logEventWithDataFileGroup(MddClientEvent.Code.DATA_DOWNLOAD_STARTED, fileGroup);
         break;
       case IMPORT:
         logEventWithDataFileGroup(MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, fileGroup);
@@ -89,7 +90,7 @@
   public void logComplete(DataFileGroupInternal fileGroup) {
     switch (operation) {
       case DOWNLOAD:
-        logEventWithDataFileGroup(MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, fileGroup);
+        logEventWithDataFileGroup(MddClientEvent.Code.DATA_DOWNLOAD_COMPLETE, fileGroup);
         logDownloadLatency(fileGroup);
         break;
       case IMPORT:
@@ -120,7 +121,12 @@
     long downloadStartedTimestamp = bookkeeping.getGroupDownloadStartedTimestampInMillis();
     long downloadCompleteTimestamp = bookkeeping.getGroupDownloadedTimestampInMillis();
 
-    Void downloadLatency = null;
+    MddDownloadLatency downloadLatency =
+        MddDownloadLatency.newBuilder()
+            .setDownloadAttemptCount(bookkeeping.getDownloadStartedCount())
+            .setDownloadLatencyMs(downloadCompleteTimestamp - downloadStartedTimestamp)
+            .setTotalLatencyMs(downloadCompleteTimestamp - newFilesReceivedTimestamp)
+            .build();
 
     eventLogger.logMddDownloadLatency(fileGroupDetails, downloadLatency);
   }
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/logging/EventLogger.java b/java/com/google/android/libraries/mobiledatadownload/internal/logging/EventLogger.java
index 83e8311..03d42d3 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/logging/EventLogger.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/logging/EventLogger.java
@@ -21,110 +21,119 @@
 import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
 import com.google.mobiledatadownload.LogEnumsProto.MddDownloadResult;
 import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
+import com.google.mobiledatadownload.LogProto.MddDownloadLatency;
 import com.google.mobiledatadownload.LogProto.MddFileGroupStatus;
+import com.google.mobiledatadownload.LogProto.MddLibApiResultLog;
+import com.google.mobiledatadownload.LogProto.MddNetworkStats;
 import com.google.mobiledatadownload.LogProto.MddStorageStats;
+
 import java.util.List;
 
 /** Interface for remote logging. */
 public interface EventLogger {
 
-  /** Log an mdd event */
-  void logEventSampled(MddClientEvent.Code eventCode);
+    /** Log an mdd event */
+    void logEventSampled(MddClientEvent.Code eventCode);
 
-  /** Log an mdd event with an associated file group. */
-  void logEventSampled(
-      MddClientEvent.Code eventCode,
-      String fileGroupName,
-      int fileGroupVersionNumber,
-      long buildId,
-      String variantId);
+    /** Log an mdd event with an associated file group. */
+    void logEventSampled(
+            MddClientEvent.Code eventCode,
+            String fileGroupName,
+            int fileGroupVersionNumber,
+            long buildId,
+            String variantId);
 
-  /**
-   * Log an mdd event. This not sampled. Caller should make sure this method is called after
-   * sampling at the passed in value of sample interval.
-   */
-  void logEventAfterSample(MddClientEvent.Code eventCode, int sampleInterval);
+    /**
+     * Log an mdd event. This not sampled. Caller should make sure this method is called after
+     * sampling at the passed in value of sample interval.
+     */
+    void logEventAfterSample(MddClientEvent.Code eventCode, int sampleInterval);
 
-  /**
-   * Log mdd file group stats. The buildFileGroupStats callable is only called if the event is going
-   * to be logged.
-   *
-   * @param buildFileGroupStats callable which builds a List of FileGroupStatusWithDetails. Each
-   *     file group status will be logged individually.
-   * @return a future that completes when the logging work is done. The future will complete with a
-   *     failure if the callable fails or if there is an error when logging.
-   */
-  ListenableFuture<Void> logMddFileGroupStats(
-      AsyncCallable<List<FileGroupStatusWithDetails>> buildFileGroupStats);
+    /**
+     * Log mdd file group stats. The buildFileGroupStats callable is only called if the event is
+     * going
+     * to be logged.
+     *
+     * @param buildFileGroupStats callable which builds a List of FileGroupStatusWithDetails. Each
+     *                            file group status will be logged individually.
+     * @return a future that completes when the logging work is done. The future will complete
+     * with a
+     * failure if the callable fails or if there is an error when logging.
+     */
+    ListenableFuture<Void> logMddFileGroupStats(
+            AsyncCallable<List<FileGroupStatusWithDetails>> buildFileGroupStats);
 
-  /** Simple wrapper class for MDD file group stats and details. */
-  @AutoValue
-  abstract class FileGroupStatusWithDetails {
-    abstract MddFileGroupStatus fileGroupStatus();
+    /** Simple wrapper class for MDD file group stats and details. */
+    @AutoValue
+    abstract class FileGroupStatusWithDetails {
+        abstract MddFileGroupStatus fileGroupStatus();
 
-    abstract DataDownloadFileGroupStats fileGroupDetails();
+        abstract DataDownloadFileGroupStats fileGroupDetails();
 
-    static FileGroupStatusWithDetails create(
-        MddFileGroupStatus fileGroupStatus, DataDownloadFileGroupStats fileGroupDetails) {
-      return new AutoValue_EventLogger_FileGroupStatusWithDetails(
-          fileGroupStatus, fileGroupDetails);
+        static FileGroupStatusWithDetails create(
+                MddFileGroupStatus fileGroupStatus, DataDownloadFileGroupStats fileGroupDetails) {
+            return new AutoValue_EventLogger_FileGroupStatusWithDetails(
+                    fileGroupStatus, fileGroupDetails);
+        }
     }
-  }
 
-  /** Log mdd api call stats. */
-  void logMddApiCallStats(DataDownloadFileGroupStats fileGroupDetails, Void apiCallStats);
+    /** Log mdd api call stats. */
+    void logMddApiCallStats(DataDownloadFileGroupStats fileGroupDetails, Void apiCallStats);
 
-  void logMddLibApiResultLog(Void mddLibApiResultLog);
+    void logMddLibApiResultLog(MddLibApiResultLog mddLibApiResultLog);
 
-  /**
-   * Log mdd storage stats. The buildMddStorageStats callable is only called if the event is going
-   * to be logged.
-   *
-   * @param buildMddStorageStats callable which builds the MddStorageStats to log.
-   * @return a future that completes when the logging work is done. The future will complete with a
-   *     failure if the callable fails or if there is an error when logging.
-   */
-  ListenableFuture<Void> logMddStorageStats(AsyncCallable<MddStorageStats> buildMddStorageStats);
+    /**
+     * Log mdd storage stats. The buildMddStorageStats callable is only called if the event is going
+     * to be logged.
+     *
+     * @param buildMddStorageStats callable which builds the MddStorageStats to log.
+     * @return a future that completes when the logging work is done. The future will complete
+     * with a
+     * failure if the callable fails or if there is an error when logging.
+     */
+    ListenableFuture<Void> logMddStorageStats(AsyncCallable<MddStorageStats> buildMddStorageStats);
 
-  /**
-   * Log mdd network stats. The buildMddNetworkStats callable is only called if the event is going
-   * to be logged.
-   *
-   * @param buildMddNetworkStats callable which builds the Void to log.
-   * @return a future that completes when the logging work is done. The future will complete with a
-   *     failure if the callable fails or if there is an error when logging.
-   */
-  ListenableFuture<Void> logMddNetworkStats(AsyncCallable<Void> buildMddNetworkStats);
+    /**
+     * Log mdd network stats. The buildMddNetworkStats callable is only called if the event is going
+     * to be logged.
+     *
+     * @param buildMddNetworkStats callable which builds the MddNetworkStats to log.
+     * @return a future that completes when the logging work is done. The future will complete
+     * with a
+     * failure if the callable fails or if there is an error when logging.
+     */
+    ListenableFuture<Void> logMddNetworkStats(AsyncCallable<MddNetworkStats> buildMddNetworkStats);
 
-  /** Log the number of unaccounted files/metadata deleted during maintenance */
-  void logMddDataDownloadFileExpirationEvent(int eventCode, int count);
+    /** Log the number of unaccounted files/metadata deleted during maintenance */
+    void logMddDataDownloadFileExpirationEvent(int eventCode, int count);
 
-  /** Log the network savings of MDD download features */
-  void logMddNetworkSavings(
-      DataDownloadFileGroupStats fileGroupDetails,
-      int code,
-      long fullFileSize,
-      long downloadedFileSize,
-      String fileId,
-      int deltaIndex);
+    /** Log the network savings of MDD download features */
+    void logMddNetworkSavings(
+            DataDownloadFileGroupStats fileGroupDetails,
+            int code,
+            long fullFileSize,
+            long downloadedFileSize,
+            String fileId,
+            int deltaIndex);
 
-  /** Log mdd download result events. */
-  void logMddDownloadResult(
-      MddDownloadResult.Code code, DataDownloadFileGroupStats fileGroupDetails);
+    /** Log mdd download result events. */
+    void logMddDownloadResult(
+            MddDownloadResult.Code code, DataDownloadFileGroupStats fileGroupDetails);
 
-  /** Log stats of mdd {@code getFileGroup} and {@code getFileGroupByFilter} calls. */
-  void logMddQueryStats(DataDownloadFileGroupStats fileGroupDetails);
+    /** Log stats of mdd {@code getFileGroup} and {@code getFileGroupByFilter} calls. */
+    void logMddQueryStats(DataDownloadFileGroupStats fileGroupDetails);
 
-  /** Log mdd stats on android sharing events. */
-  void logMddAndroidSharingLog(Void event);
+    /** Log mdd stats on android sharing events. */
+    void logMddAndroidSharingLog(Void event);
 
-  /** Log mdd download latency. */
-  void logMddDownloadLatency(DataDownloadFileGroupStats fileGroupStats, Void downloadLatency);
+    /** Log mdd download latency. */
+    void logMddDownloadLatency(
+            DataDownloadFileGroupStats fileGroupStats, MddDownloadLatency downloadLatency);
 
-  /** Log mdd usage event. */
-  void logMddUsageEvent(DataDownloadFileGroupStats fileGroupDetails, Void usageEventLog);
+    /** Log mdd usage event. */
+    void logMddUsageEvent(DataDownloadFileGroupStats fileGroupDetails, Void usageEventLog);
 
-  /** Log new config received event. */
-  void logNewConfigReceived(
-      DataDownloadFileGroupStats fileGroupDetails, Void newConfigReceivedInfo);
+    /** Log new config received event. */
+    void logNewConfigReceived(
+            DataDownloadFileGroupStats fileGroupDetails, Void newConfigReceivedInfo);
 }
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/logging/LogUtil.java b/java/com/google/android/libraries/mobiledatadownload/internal/logging/LogUtil.java
index bc4375e..149d7f9 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/logging/LogUtil.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/logging/LogUtil.java
@@ -17,7 +17,6 @@
 
 import android.annotation.SuppressLint;
 import android.util.Log;
-import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 import java.util.Locale;
@@ -30,7 +29,6 @@
 
   private static final Random random = new Random();
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   public static int getLogPriority() {
     int level = Log.ASSERT;
     while (level > Log.VERBOSE) {
@@ -42,7 +40,6 @@
     return level;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   public static int v(String msg) {
     if (Log.isLoggable(TAG, Log.VERBOSE)) {
       return Log.v(TAG, msg);
@@ -50,7 +47,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int v(@FormatString String format, Object obj0) {
     if (Log.isLoggable(TAG, Log.VERBOSE)) {
@@ -60,7 +56,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int v(@FormatString String format, Object obj0, Object obj1) {
     if (Log.isLoggable(TAG, Log.VERBOSE)) {
@@ -70,7 +65,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int v(@FormatString String format, Object... params) {
     if (Log.isLoggable(TAG, Log.VERBOSE)) {
@@ -80,7 +74,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   public static int d(String msg) {
     if (Log.isLoggable(TAG, Log.DEBUG)) {
       return Log.d(TAG, msg);
@@ -88,7 +81,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int d(@FormatString String format, Object obj0) {
     if (Log.isLoggable(TAG, Log.DEBUG)) {
@@ -98,7 +90,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int d(@FormatString String format, Object obj0, Object obj1) {
     if (Log.isLoggable(TAG, Log.DEBUG)) {
@@ -108,7 +99,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int d(@FormatString String format, Object... params) {
     if (Log.isLoggable(TAG, Log.DEBUG)) {
@@ -118,7 +108,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int d(@Nullable Throwable tr, @FormatString String format, Object... params) {
     if (Log.isLoggable(TAG, Log.DEBUG)) {
@@ -128,7 +117,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   public static int i(String msg) {
     if (Log.isLoggable(TAG, Log.INFO)) {
       return Log.i(TAG, msg);
@@ -136,7 +124,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int i(@FormatString String format, Object obj0) {
     if (Log.isLoggable(TAG, Log.INFO)) {
@@ -146,7 +133,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int i(@FormatString String format, Object obj0, Object obj1) {
     if (Log.isLoggable(TAG, Log.INFO)) {
@@ -156,7 +142,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int i(@FormatString String format, Object... params) {
     if (Log.isLoggable(TAG, Log.INFO)) {
@@ -166,7 +151,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   public static int e(String msg) {
     if (Log.isLoggable(TAG, Log.ERROR)) {
       return Log.e(TAG, msg);
@@ -174,7 +158,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int e(@FormatString String format, Object obj0) {
     if (Log.isLoggable(TAG, Log.ERROR)) {
@@ -184,7 +167,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int e(@FormatString String format, Object obj0, Object obj1) {
     if (Log.isLoggable(TAG, Log.ERROR)) {
@@ -194,7 +176,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int e(@FormatString String format, Object... params) {
     if (Log.isLoggable(TAG, Log.ERROR)) {
@@ -204,7 +185,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @SuppressLint("LogTagMismatch")
   public static int e(@Nullable Throwable tr, String msg) {
     if (Log.isLoggable(TAG, Log.ERROR)) {
@@ -219,13 +199,11 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int e(@Nullable Throwable tr, @FormatString String format, Object... params) {
     return Log.isLoggable(TAG, Log.ERROR) ? e(tr, format(format, params)) : 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   public static int w(String msg) {
     if (Log.isLoggable(TAG, Log.WARN)) {
       return Log.w(TAG, msg);
@@ -233,7 +211,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int w(@FormatString String format, Object obj0) {
     if (Log.isLoggable(TAG, Log.WARN)) {
@@ -243,7 +220,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int w(@FormatString String format, Object obj0, Object obj1) {
     if (Log.isLoggable(TAG, Log.WARN)) {
@@ -253,7 +229,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   public static int w(@FormatString String format, Object... params) {
     if (Log.isLoggable(TAG, Log.WARN)) {
@@ -263,7 +238,6 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @SuppressLint("LogTagMismatch")
   @FormatMethod
   public static int w(@Nullable Throwable tr, @FormatString String format, Object... params) {
@@ -280,13 +254,11 @@
     return 0;
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   @FormatMethod
   private static String format(@FormatString String format, Object... args) {
     return String.format(Locale.US, format, args);
   }
 
-  @CanIgnoreReturnValue // pushed down from class to method; see <internal>
   public static boolean shouldSampleInterval(long sampleInterval) {
     if (sampleInterval <= 0L) {
       if (sampleInterval < 0L) {
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/logging/MddEventLogger.java b/java/com/google/android/libraries/mobiledatadownload/internal/logging/MddEventLogger.java
index 620421c..9be820d 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/logging/MddEventLogger.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/logging/MddEventLogger.java
@@ -21,6 +21,7 @@
 import android.content.Context;
 import android.content.Intent;
 import android.content.IntentFilter;
+
 import com.google.android.libraries.mobiledatadownload.Flags;
 import com.google.android.libraries.mobiledatadownload.Logger;
 import com.google.android.libraries.mobiledatadownload.tracing.PropagatedFluentFuture;
@@ -34,10 +35,14 @@
 import com.google.mobiledatadownload.LogProto.AndroidClientInfo;
 import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
 import com.google.mobiledatadownload.LogProto.MddDeviceInfo;
+import com.google.mobiledatadownload.LogProto.MddDownloadLatency;
 import com.google.mobiledatadownload.LogProto.MddDownloadResultLog;
+import com.google.mobiledatadownload.LogProto.MddLibApiResultLog;
 import com.google.mobiledatadownload.LogProto.MddLogData;
+import com.google.mobiledatadownload.LogProto.MddNetworkStats;
 import com.google.mobiledatadownload.LogProto.MddStorageStats;
 import com.google.mobiledatadownload.LogProto.StableSamplingInfo;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -45,327 +50,354 @@
 /** Assembles data and logs them with underlying {@link Logger}. */
 public final class MddEventLogger implements EventLogger {
 
-  private static final String TAG = "MddEventLogger";
+    private static final String TAG = "MddEventLogger";
 
-  private final Context context;
-  private final Logger logger;
-  // A process that has mdi download module loaded will get restarted if a new module version is
-  // installed.
-  private final int moduleVersion;
-  private final String hostPackageName;
-  private final Flags flags;
-  private final LogSampler logSampler;
+    private final Context context;
+    private final Logger logger;
+    // A process that has mdi download module loaded will get restarted if a new module version is
+    // installed.
+    private final int moduleVersion;
+    private final String hostPackageName;
+    private final Flags flags;
+    private final LogSampler logSampler;
 
-  private Optional<LoggingStateStore> loggingStateStore = Optional.absent();
+    private Optional<LoggingStateStore> loggingStateStore = Optional.absent();
 
-  public MddEventLogger(
-      Context context, Logger logger, int moduleVersion, LogSampler logSampler, Flags flags) {
-    this.context = context;
-    this.logger = logger;
-    this.moduleVersion = moduleVersion;
-    this.hostPackageName = context.getPackageName();
-    this.logSampler = logSampler;
-    this.flags = flags;
-  }
-
-  /**
-   * This should be called before MddEventLogger is used. If it is not called before MddEventLogger
-   * is used, stable sampling will not be used.
-   *
-   * <p>Note(rohitsat): this is required because LoggingStateStore is constructed with a PDS in the
-   * MainMddLibModule. MddEventLogger is required to construct the MainMddLibModule.
-   *
-   * @param loggingStateStore the LoggingStateStore that contains the persisted random number for
-   *     stable sampling.
-   */
-  public void setLoggingStateStore(LoggingStateStore loggingStateStore) {
-    this.loggingStateStore = Optional.of(loggingStateStore);
-  }
-
-  @Override
-  public void logEventSampled(MddClientEvent.Code eventCode) {
-    sampleAndSendLogEvent(eventCode, MddLogData.newBuilder(), flags.mddDefaultSampleInterval());
-  }
-
-  @Override
-  public void logEventSampled(
-      MddClientEvent.Code eventCode,
-      String fileGroupName,
-      int fileGroupVersionNumber,
-      long buildId,
-      String variantId) {
-
-    DataDownloadFileGroupStats dataDownloadFileGroupStats =
-        DataDownloadFileGroupStats.newBuilder()
-            .setFileGroupName(fileGroupName)
-            .setFileGroupVersionNumber(fileGroupVersionNumber)
-            .setBuildId(buildId)
-            .setVariantId(variantId)
-            .build();
-
-    sampleAndSendLogEvent(
-        eventCode,
-        MddLogData.newBuilder().setDataDownloadFileGroupStats(dataDownloadFileGroupStats),
-        flags.mddDefaultSampleInterval());
-  }
-
-  @Override
-  public void logEventAfterSample(MddClientEvent.Code eventCode, int sampleInterval) {
-    // TODO(b/138392640): delete this method once the pds migration is complete. If it's necessary
-    // for other use cases, we can establish a pattern where this class is still responsible for
-    // sampling.
-    MddLogData.Builder logData = MddLogData.newBuilder();
-    processAndSendEventWithoutStableSampling(eventCode, logData, sampleInterval);
-  }
-
-  @Override
-  public void logMddApiCallStats(DataDownloadFileGroupStats fileGroupDetails, Void apiCallStats) {
-    // TODO(b/144684763): update this to use stable sampling. Leaving it as is for now since it is
-    // fairly high volume.
-    long sampleInterval = flags.apiLoggingSampleInterval();
-    if (!LogUtil.shouldSampleInterval(sampleInterval)) {
-      return;
+    public MddEventLogger(
+            Context context, Logger logger, int moduleVersion, LogSampler logSampler, Flags flags) {
+        this.context = context;
+        this.logger = logger;
+        this.moduleVersion = moduleVersion;
+        this.hostPackageName = context.getPackageName();
+        this.logSampler = logSampler;
+        this.flags = flags;
     }
-    MddLogData.Builder logData =
-        MddLogData.newBuilder().setDataDownloadFileGroupStats(fileGroupDetails);
-    processAndSendEventWithoutStableSampling(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, sampleInterval);
-  }
 
-  @Override
-  public void logMddLibApiResultLog(Void mddLibApiResultLog) {
-    MddLogData.Builder logData = MddLogData.newBuilder();
-
-    sampleAndSendLogEvent(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, flags.apiLoggingSampleInterval());
-  }
-
-  @Override
-  public ListenableFuture<Void> logMddFileGroupStats(
-      AsyncCallable<List<EventLogger.FileGroupStatusWithDetails>> buildFileGroupStats) {
-    return lazySampleAndSendLogEvent(
-        MddClientEvent.Code.DATA_DOWNLOAD_FILE_GROUP_STATUS,
-        () ->
-            PropagatedFutures.transform(
-                buildFileGroupStats.call(),
-                fileGroupStatusAndDetailsList -> {
-                  List<MddLogData> allMddLogData = new ArrayList<>();
-
-                  for (FileGroupStatusWithDetails fileGroupStatusAndDetails :
-                      fileGroupStatusAndDetailsList) {
-                    allMddLogData.add(
-                        MddLogData.newBuilder()
-                            .setMddFileGroupStatus(fileGroupStatusAndDetails.fileGroupStatus())
-                            .setDataDownloadFileGroupStats(
-                                fileGroupStatusAndDetails.fileGroupDetails())
-                            .build());
-                  }
-                  return allMddLogData;
-                },
-                directExecutor()),
-        flags.groupStatsLoggingSampleInterval());
-  }
-
-  @Override
-  public ListenableFuture<Void> logMddStorageStats(
-      AsyncCallable<MddStorageStats> buildStorageStats) {
-    return lazySampleAndSendLogEvent(
-        MddClientEvent.Code.DATA_DOWNLOAD_STORAGE_STATS,
-        () ->
-            PropagatedFutures.transform(
-                buildStorageStats.call(),
-                storageStats ->
-                    Arrays.asList(MddLogData.newBuilder().setMddStorageStats(storageStats).build()),
-                directExecutor()),
-        flags.storageStatsLoggingSampleInterval());
-  }
-
-  @Override
-  public ListenableFuture<Void> logMddNetworkStats(AsyncCallable<Void> buildNetworkStats) {
-    return lazySampleAndSendLogEvent(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED,
-        () ->
-            PropagatedFutures.transform(
-                buildNetworkStats.call(), networkStats -> Arrays.asList(), directExecutor()),
-        flags.networkStatsLoggingSampleInterval());
-  }
-
-  @Override
-  public void logMddDataDownloadFileExpirationEvent(int eventCode, int count) {
-    MddLogData.Builder logData = MddLogData.newBuilder();
-    sampleAndSendLogEvent(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, flags.mddDefaultSampleInterval());
-  }
-
-  @Override
-  public void logMddNetworkSavings(
-      DataDownloadFileGroupStats fileGroupDetails,
-      int code,
-      long fullFileSize,
-      long downloadedFileSize,
-      String fileId,
-      int deltaIndex) {
-    MddLogData.Builder logData = MddLogData.newBuilder();
-
-    sampleAndSendLogEvent(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, flags.mddDefaultSampleInterval());
-  }
-
-  @Override
-  public void logMddQueryStats(DataDownloadFileGroupStats fileGroupDetails) {
-    MddLogData.Builder logData = MddLogData.newBuilder();
-
-    sampleAndSendLogEvent(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, flags.mddDefaultSampleInterval());
-  }
-
-  @Override
-  public void logMddDownloadLatency(
-      DataDownloadFileGroupStats fileGroupDetails, Void downloadLatency) {
-    MddLogData.Builder logData =
-        MddLogData.newBuilder().setDataDownloadFileGroupStats(fileGroupDetails);
-
-    sampleAndSendLogEvent(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, flags.mddDefaultSampleInterval());
-  }
-
-  @Override
-  public void logMddDownloadResult(
-      MddDownloadResult.Code code, DataDownloadFileGroupStats fileGroupDetails) {
-    MddLogData.Builder logData =
-        MddLogData.newBuilder()
-            .setMddDownloadResultLog(
-                MddDownloadResultLog.newBuilder()
-                    .setResult(code)
-                    .setDataDownloadFileGroupStats(fileGroupDetails));
-
-    sampleAndSendLogEvent(
-        MddClientEvent.Code.DATA_DOWNLOAD_RESULT_LOG, logData, flags.mddDefaultSampleInterval());
-  }
-
-  @Override
-  public void logMddAndroidSharingLog(Void event) {
-    // TODO(b/144684763): consider moving this to stable sampling depending on frequency of events.
-    long sampleInterval = flags.mddAndroidSharingSampleInterval();
-    if (!LogUtil.shouldSampleInterval(sampleInterval)) {
-      return;
+    /**
+     * This should be called before MddEventLogger is used. If it is not called before
+     * MddEventLogger
+     * is used, stable sampling will not be used.
+     *
+     * <p>Note(rohitsat): this is required because LoggingStateStore is constructed with a PDS in
+     * the
+     * MainMddLibModule. MddEventLogger is required to construct the MainMddLibModule.
+     *
+     * @param loggingStateStore the LoggingStateStore that contains the persisted random number for
+     *                          stable sampling.
+     */
+    public void setLoggingStateStore(LoggingStateStore loggingStateStore) {
+        this.loggingStateStore = Optional.of(loggingStateStore);
     }
-    MddLogData.Builder logData = MddLogData.newBuilder();
-    processAndSendEventWithoutStableSampling(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, sampleInterval);
-  }
 
-  @Override
-  public void logMddUsageEvent(DataDownloadFileGroupStats fileGroupDetails, Void usageEventLog) {
-    MddLogData.Builder logData =
-        MddLogData.newBuilder().setDataDownloadFileGroupStats(fileGroupDetails);
+    @Override
+    public void logEventSampled(MddClientEvent.Code eventCode) {
+        sampleAndSendLogEvent(eventCode, MddLogData.newBuilder(), flags.mddDefaultSampleInterval());
+    }
 
-    sampleAndSendLogEvent(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, flags.mddDefaultSampleInterval());
-  }
+    @Override
+    public void logEventSampled(
+            MddClientEvent.Code eventCode,
+            String fileGroupName,
+            int fileGroupVersionNumber,
+            long buildId,
+            String variantId) {
 
-  @Override
-  public void logNewConfigReceived(
-      DataDownloadFileGroupStats fileGroupDetails, Void newConfigReceivedInfo) {
-    MddLogData.Builder logData =
-        MddLogData.newBuilder().setDataDownloadFileGroupStats(fileGroupDetails);
-    sampleAndSendLogEvent(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, flags.mddDefaultSampleInterval());
-  }
+        DataDownloadFileGroupStats dataDownloadFileGroupStats =
+                DataDownloadFileGroupStats.newBuilder()
+                        .setFileGroupName(fileGroupName)
+                        .setFileGroupVersionNumber(fileGroupVersionNumber)
+                        .setBuildId(buildId)
+                        .setVariantId(variantId)
+                        .build();
 
-  /**
-   * Determines whether the log event will be a part of the sample, and if so calls {@code
-   * buildStats} to construct the log event. This is like {@link sampleAndSendLogEvent} but
-   * constructs the log event lazy. This is useful if constructing the log event is expensive.
-   */
-  private ListenableFuture<Void> lazySampleAndSendLogEvent(
-      MddClientEvent.Code eventCode,
-      AsyncCallable<List<MddLogData>> buildStats,
-      int sampleInterval) {
-    return PropagatedFutures.transformAsync(
-        logSampler.shouldLog(sampleInterval, loggingStateStore),
-        samplingInfoOptional -> {
-          if (!samplingInfoOptional.isPresent()) {
-            return immediateVoidFuture();
-          }
+        sampleAndSendLogEvent(
+                eventCode,
+                MddLogData.newBuilder().setDataDownloadFileGroupStats(dataDownloadFileGroupStats),
+                flags.mddDefaultSampleInterval());
+    }
 
-          return PropagatedFluentFuture.from(buildStats.call())
-              .transform(
-                  icingLogDataList -> {
-                    if (icingLogDataList != null) {
-                      for (MddLogData icingLogData : icingLogDataList) {
-                        processAndSendEvent(
-                            eventCode,
-                            icingLogData.toBuilder(),
-                            sampleInterval,
-                            samplingInfoOptional.get());
-                      }
+    @Override
+    public void logEventAfterSample(MddClientEvent.Code eventCode, int sampleInterval) {
+        // TODO(b/138392640): delete this method once the pds migration is complete. If it's
+        //  necessary
+        // for other use cases, we can establish a pattern where this class is still responsible for
+        // sampling.
+        MddLogData.Builder logData = MddLogData.newBuilder();
+        processAndSendEventWithoutStableSampling(eventCode, logData, sampleInterval);
+    }
+
+    @Override
+    public void logMddApiCallStats(DataDownloadFileGroupStats fileGroupDetails, Void apiCallStats) {
+        // TODO(b/144684763): update this to use stable sampling. Leaving it as is for now since
+        //  it is
+        // fairly high volume.
+        long sampleInterval = flags.apiLoggingSampleInterval();
+        if (!LogUtil.shouldSampleInterval(sampleInterval)) {
+            return;
+        }
+        MddLogData.Builder logData =
+                MddLogData.newBuilder().setDataDownloadFileGroupStats(fileGroupDetails);
+        processAndSendEventWithoutStableSampling(
+                MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, sampleInterval);
+    }
+
+    @Override
+    public void logMddLibApiResultLog(MddLibApiResultLog mddLibApiResultLog) {
+        MddLogData.Builder logData = MddLogData.newBuilder().setMddLibApiResultLog(
+                mddLibApiResultLog);
+
+        sampleAndSendLogEvent(
+                MddClientEvent.Code.DATA_DOWNLOAD_LIB_API_RESULT,
+                logData,
+                flags.apiLoggingSampleInterval());
+    }
+
+    @Override
+    public ListenableFuture<Void> logMddFileGroupStats(
+            AsyncCallable<List<EventLogger.FileGroupStatusWithDetails>> buildFileGroupStats) {
+        return lazySampleAndSendLogEvent(
+                MddClientEvent.Code.DATA_DOWNLOAD_FILE_GROUP_STATUS,
+                () ->
+                        PropagatedFutures.transform(
+                                buildFileGroupStats.call(),
+                                fileGroupStatusAndDetailsList -> {
+                                    List<MddLogData> allMddLogData = new ArrayList<>();
+
+                                    for (FileGroupStatusWithDetails fileGroupStatusAndDetails :
+                                            fileGroupStatusAndDetailsList) {
+                                        allMddLogData.add(
+                                                MddLogData.newBuilder()
+                                                        .setMddFileGroupStatus(
+                                                                fileGroupStatusAndDetails.fileGroupStatus())
+                                                        .setDataDownloadFileGroupStats(
+                                                                fileGroupStatusAndDetails.fileGroupDetails())
+                                                        .build());
+                                    }
+                                    return allMddLogData;
+                                },
+                                directExecutor()),
+                flags.groupStatsLoggingSampleInterval());
+    }
+
+    @Override
+    public ListenableFuture<Void> logMddStorageStats(
+            AsyncCallable<MddStorageStats> buildStorageStats) {
+        return lazySampleAndSendLogEvent(
+                MddClientEvent.Code.DATA_DOWNLOAD_STORAGE_STATS,
+                () ->
+                        PropagatedFutures.transform(
+                                buildStorageStats.call(),
+                                storageStats ->
+                                        Arrays.asList(MddLogData.newBuilder().setMddStorageStats(
+                                                storageStats).build()),
+                                directExecutor()),
+                flags.storageStatsLoggingSampleInterval());
+    }
+
+    @Override
+    public ListenableFuture<Void> logMddNetworkStats(
+            AsyncCallable<MddNetworkStats> buildNetworkStats) {
+        return lazySampleAndSendLogEvent(
+                MddClientEvent.Code.DATA_DOWNLOAD_NETWORK_STATS,
+                () ->
+                        PropagatedFutures.transform(
+                                buildNetworkStats.call(),
+                                networkStats ->
+                                        Arrays.asList(MddLogData.newBuilder().setMddNetworkStats(
+                                                networkStats).build()),
+                                directExecutor()),
+                flags.networkStatsLoggingSampleInterval());
+    }
+
+    @Override
+    public void logMddDataDownloadFileExpirationEvent(int eventCode, int count) {
+        MddLogData.Builder logData = MddLogData.newBuilder();
+        sampleAndSendLogEvent(
+                MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData,
+                flags.mddDefaultSampleInterval());
+    }
+
+    @Override
+    public void logMddNetworkSavings(
+            DataDownloadFileGroupStats fileGroupDetails,
+            int code,
+            long fullFileSize,
+            long downloadedFileSize,
+            String fileId,
+            int deltaIndex) {
+        MddLogData.Builder logData = MddLogData.newBuilder();
+
+        sampleAndSendLogEvent(
+                MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData,
+                flags.mddDefaultSampleInterval());
+    }
+
+    @Override
+    public void logMddQueryStats(DataDownloadFileGroupStats fileGroupDetails) {
+        MddLogData.Builder logData = MddLogData.newBuilder();
+
+        sampleAndSendLogEvent(
+                MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData,
+                flags.mddDefaultSampleInterval());
+    }
+
+    @Override
+    public void logMddDownloadLatency(
+            DataDownloadFileGroupStats fileGroupDetails, MddDownloadLatency downloadLatency) {
+        MddLogData.Builder logData =
+                MddLogData.newBuilder()
+                        .setMddDownloadLatency(downloadLatency)
+                        .setDataDownloadFileGroupStats(fileGroupDetails);
+
+        sampleAndSendLogEvent(
+                MddClientEvent.Code.DATA_DOWNLOAD_LATENCY_LOG, logData,
+                flags.mddDefaultSampleInterval());
+    }
+
+    @Override
+    public void logMddDownloadResult(
+            MddDownloadResult.Code code, DataDownloadFileGroupStats fileGroupDetails) {
+        MddLogData.Builder logData =
+                MddLogData.newBuilder()
+                        .setMddDownloadResultLog(
+                                MddDownloadResultLog.newBuilder()
+                                        .setResult(code)
+                                        .setDataDownloadFileGroupStats(fileGroupDetails));
+
+        sampleAndSendLogEvent(
+                MddClientEvent.Code.DATA_DOWNLOAD_RESULT_LOG, logData,
+                flags.mddDefaultSampleInterval());
+    }
+
+    @Override
+    public void logMddAndroidSharingLog(Void event) {
+        // TODO(b/144684763): consider moving this to stable sampling depending on frequency of
+        //  events.
+        long sampleInterval = flags.mddAndroidSharingSampleInterval();
+        if (!LogUtil.shouldSampleInterval(sampleInterval)) {
+            return;
+        }
+        MddLogData.Builder logData = MddLogData.newBuilder();
+        processAndSendEventWithoutStableSampling(
+                MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData, sampleInterval);
+    }
+
+    @Override
+    public void logMddUsageEvent(DataDownloadFileGroupStats fileGroupDetails, Void usageEventLog) {
+        MddLogData.Builder logData =
+                MddLogData.newBuilder().setDataDownloadFileGroupStats(fileGroupDetails);
+
+        sampleAndSendLogEvent(
+                MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData,
+                flags.mddDefaultSampleInterval());
+    }
+
+    @Override
+    public void logNewConfigReceived(
+            DataDownloadFileGroupStats fileGroupDetails, Void newConfigReceivedInfo) {
+        MddLogData.Builder logData =
+                MddLogData.newBuilder().setDataDownloadFileGroupStats(fileGroupDetails);
+        sampleAndSendLogEvent(
+                MddClientEvent.Code.EVENT_CODE_UNSPECIFIED, logData,
+                flags.mddDefaultSampleInterval());
+    }
+
+    /**
+     * Determines whether the log event will be a part of the sample, and if so calls {@code
+     * buildStats} to construct the log event. This is like {@link sampleAndSendLogEvent} but
+     * constructs the log event lazy. This is useful if constructing the log event is expensive.
+     */
+    private ListenableFuture<Void> lazySampleAndSendLogEvent(
+            MddClientEvent.Code eventCode,
+            AsyncCallable<List<MddLogData>> buildStats,
+            int sampleInterval) {
+        return PropagatedFutures.transformAsync(
+                logSampler.shouldLog(sampleInterval, loggingStateStore),
+                samplingInfoOptional -> {
+                    if (!samplingInfoOptional.isPresent()) {
+                        return immediateVoidFuture();
                     }
-                    return null;
-                  },
-                  directExecutor());
-        },
-        directExecutor());
-  }
 
-  private void sampleAndSendLogEvent(
-      MddClientEvent.Code eventCode, MddLogData.Builder logData, long sampleInterval) {
-    // NOTE: When using a single-threaded executor, logging may be delayed since other
-    // work will come before the log sampler check.
-    PropagatedFutures.addCallback(
-        logSampler.shouldLog(sampleInterval, loggingStateStore),
-        new FutureCallback<Optional<StableSamplingInfo>>() {
-          @Override
-          public void onSuccess(Optional<StableSamplingInfo> stableSamplingInfo) {
-            if (stableSamplingInfo.isPresent()) {
-              processAndSendEvent(eventCode, logData, sampleInterval, stableSamplingInfo.get());
-            }
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            LogUtil.e(t, "%s: failure when sampling log!", TAG);
-          }
-        },
-        directExecutor());
-  }
-
-  /** Adds all transforms common to all logs and sends the event to Logger. */
-  private void processAndSendEventWithoutStableSampling(
-      MddClientEvent.Code eventCode, MddLogData.Builder logData, long sampleInterval) {
-    processAndSendEvent(
-        eventCode,
-        logData,
-        sampleInterval,
-        StableSamplingInfo.newBuilder().setStableSamplingUsed(false).build());
-  }
-
-  /** Adds all transforms common to all logs and sends the event to Logger. */
-  private void processAndSendEvent(
-      MddClientEvent.Code eventCode,
-      MddLogData.Builder logData,
-      long sampleInterval,
-      StableSamplingInfo stableSamplingInfo) {
-    if (eventCode.equals(MddClientEvent.Code.EVENT_CODE_UNSPECIFIED)) {
-      LogUtil.e("%s: unspecified code used, skipping event log", TAG);
-      // return early for unspecified codes.
-      return;
+                    return PropagatedFluentFuture.from(buildStats.call())
+                            .transform(
+                                    icingLogDataList -> {
+                                        if (icingLogDataList != null) {
+                                            for (MddLogData icingLogData : icingLogDataList) {
+                                                processAndSendEvent(
+                                                        eventCode,
+                                                        icingLogData.toBuilder(),
+                                                        sampleInterval,
+                                                        samplingInfoOptional.get());
+                                            }
+                                        }
+                                        return null;
+                                    },
+                                    directExecutor());
+                },
+                directExecutor());
     }
-    logData
-        .setSamplingInterval(sampleInterval)
-        .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(isDeviceStorageLow(context)))
-        .setAndroidClientInfo(
-            AndroidClientInfo.newBuilder()
-                .setHostPackageName(hostPackageName)
-                .setModuleVersion(moduleVersion))
-        .setStableSamplingInfo(stableSamplingInfo);
-    logger.log(logData.build(), eventCode.getNumber());
-  }
 
-  /** Returns whether the device is in low storage state. */
-  private static boolean isDeviceStorageLow(Context context) {
-    // Check if the system says storage is low, by reading the sticky intent.
-    return context.registerReceiver(null, new IntentFilter(Intent.ACTION_DEVICE_STORAGE_LOW))
-        != null;
-  }
+    private void sampleAndSendLogEvent(
+            MddClientEvent.Code eventCode, MddLogData.Builder logData, long sampleInterval) {
+        // NOTE: When using a single-threaded executor, logging may be delayed since other
+        // work will come before the log sampler check.
+        PropagatedFutures.addCallback(
+                logSampler.shouldLog(sampleInterval, loggingStateStore),
+                new FutureCallback<Optional<StableSamplingInfo>>() {
+                    @Override
+                    public void onSuccess(Optional<StableSamplingInfo> stableSamplingInfo) {
+                        if (stableSamplingInfo.isPresent()) {
+                            processAndSendEvent(eventCode, logData, sampleInterval,
+                                    stableSamplingInfo.get());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        LogUtil.e(t, "%s: failure when sampling log!", TAG);
+                    }
+                },
+                directExecutor());
+    }
+
+    /** Adds all transforms common to all logs and sends the event to Logger. */
+    private void processAndSendEventWithoutStableSampling(
+            MddClientEvent.Code eventCode, MddLogData.Builder logData, long sampleInterval) {
+        processAndSendEvent(
+                eventCode,
+                logData,
+                sampleInterval,
+                StableSamplingInfo.newBuilder().setStableSamplingUsed(false).build());
+    }
+
+    /** Adds all transforms common to all logs and sends the event to Logger. */
+    private void processAndSendEvent(
+            MddClientEvent.Code eventCode,
+            MddLogData.Builder logData,
+            long sampleInterval,
+            StableSamplingInfo stableSamplingInfo) {
+        if (eventCode.equals(MddClientEvent.Code.EVENT_CODE_UNSPECIFIED)) {
+            LogUtil.e("%s: unspecified code used, skipping event log", TAG);
+            // return early for unspecified codes.
+            return;
+        }
+
+        logData
+                .setSamplingInterval(sampleInterval)
+                .setDeviceInfo(
+                        MddDeviceInfo.newBuilder().setDeviceStorageLow(isDeviceStorageLow(context)))
+                .setAndroidClientInfo(
+                        AndroidClientInfo.newBuilder()
+                                .setHostPackageName(hostPackageName)
+                                .setModuleVersion(moduleVersion))
+                .setStableSamplingInfo(stableSamplingInfo);
+        logger.log(logData.build(), eventCode.getNumber());
+    }
+
+    /** Returns whether the device is in low storage state. */
+    private static boolean isDeviceStorageLow(Context context) {
+        // Check if the system says storage is low, by reading the sticky intent.
+        return context.registerReceiver(null, new IntentFilter(Intent.ACTION_DEVICE_STORAGE_LOW))
+                != null;
+    }
 }
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/logging/NetworkLogger.java b/java/com/google/android/libraries/mobiledatadownload/internal/logging/NetworkLogger.java
index 9f2ac1f..3109bf2 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/logging/NetworkLogger.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/logging/NetworkLogger.java
@@ -19,14 +19,19 @@
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 
 import android.content.Context;
+
 import com.google.android.libraries.mobiledatadownload.Flags;
 import com.google.android.libraries.mobiledatadownload.annotations.InstanceId;
 import com.google.android.libraries.mobiledatadownload.internal.ApplicationContext;
 import com.google.android.libraries.mobiledatadownload.tracing.PropagatedFutures;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
+import com.google.mobiledatadownload.LogProto.MddNetworkStats;
 import com.google.mobiledatadownload.internal.MetadataProto.FileGroupLoggingState;
+
 import java.util.List;
+
 import javax.inject.Inject;
 
 /**
@@ -67,11 +72,32 @@
                 allDataUsageFuture, this::buildNetworkStats, directExecutor()));
   }
 
-  private Void buildNetworkStats(List<FileGroupLoggingState> allDataUsage) {
+  private MddNetworkStats buildNetworkStats(List<FileGroupLoggingState> allDataUsage) {
     long totalMddWifiCount = 0;
     long totalMddCellularCount = 0;
-    Void networkStatsBuilder = null;
+    MddNetworkStats.Builder networkStatsBuilder = MddNetworkStats.newBuilder();
 
-    return networkStatsBuilder;
+    for (FileGroupLoggingState fileGroupLoggingState : allDataUsage) {
+      networkStatsBuilder.addGroupStats(
+          MddNetworkStats.GroupStats.newBuilder()
+              .setDataDownloadFileGroupStats(
+                  DataDownloadFileGroupStats.newBuilder()
+                      .setOwnerPackage(fileGroupLoggingState.getGroupKey().getOwnerPackage())
+                      .setFileGroupName(fileGroupLoggingState.getGroupKey().getGroupName())
+                      .setFileGroupVersionNumber(fileGroupLoggingState.getFileGroupVersionNumber())
+                      .setBuildId(fileGroupLoggingState.getBuildId())
+                      .setVariantId(fileGroupLoggingState.getVariantId())
+                      .build())
+              .setTotalWifiBytes(fileGroupLoggingState.getWifiUsage())
+              .setTotalCellularBytes(fileGroupLoggingState.getCellularUsage()));
+
+      totalMddWifiCount += fileGroupLoggingState.getWifiUsage();
+      totalMddCellularCount += fileGroupLoggingState.getCellularUsage();
+    }
+
+    networkStatsBuilder
+        .setTotalMddWifiBytes(totalMddWifiCount)
+        .setTotalMddCellularBytes(totalMddCellularCount);
+    return networkStatsBuilder.build();
   }
 }
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/logging/NoOpEventLogger.java b/java/com/google/android/libraries/mobiledatadownload/internal/logging/NoOpEventLogger.java
index 2f043f5..e28227a 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/logging/NoOpEventLogger.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/logging/NoOpEventLogger.java
@@ -22,79 +22,97 @@
 import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
 import com.google.mobiledatadownload.LogEnumsProto.MddDownloadResult;
 import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
+import com.google.mobiledatadownload.LogProto.MddDownloadLatency;
+import com.google.mobiledatadownload.LogProto.MddLibApiResultLog;
+import com.google.mobiledatadownload.LogProto.MddNetworkStats;
 import com.google.mobiledatadownload.LogProto.MddStorageStats;
+
 import java.util.List;
 
 /** No-Op EventLogger implementation. */
 public final class NoOpEventLogger implements EventLogger {
 
-  @Override
-  public void logEventSampled(MddClientEvent.Code eventCode) {}
+    @Override
+    public void logEventSampled(MddClientEvent.Code eventCode) {
+    }
 
-  @Override
-  public void logEventSampled(
-      MddClientEvent.Code eventCode,
-      String fileGroupName,
-      int fileGroupVersionNumber,
-      long buildId,
-      String variantId) {}
+    @Override
+    public void logEventSampled(
+            MddClientEvent.Code eventCode,
+            String fileGroupName,
+            int fileGroupVersionNumber,
+            long buildId,
+            String variantId) {
+    }
 
-  @Override
-  public void logEventAfterSample(MddClientEvent.Code eventCode, int sampleInterval) {}
+    @Override
+    public void logEventAfterSample(MddClientEvent.Code eventCode, int sampleInterval) {
+    }
 
-  @Override
-  public ListenableFuture<Void> logMddFileGroupStats(
-      AsyncCallable<List<EventLogger.FileGroupStatusWithDetails>> buildFileGroupStats) {
-    return immediateVoidFuture();
-  }
+    @Override
+    public ListenableFuture<Void> logMddFileGroupStats(
+            AsyncCallable<List<EventLogger.FileGroupStatusWithDetails>> buildFileGroupStats) {
+        return immediateVoidFuture();
+    }
 
-  @Override
-  public void logMddApiCallStats(DataDownloadFileGroupStats fileGroupDetails, Void apiCallStats) {}
+    @Override
+    public void logMddApiCallStats(DataDownloadFileGroupStats fileGroupDetails, Void apiCallStats) {
+    }
 
-  @Override
-  public void logMddLibApiResultLog(Void mddLibApiResultLog) {}
+    @Override
+    public void logMddLibApiResultLog(MddLibApiResultLog mddLibApiResultLog) {
+    }
 
-  @Override
-  public ListenableFuture<Void> logMddStorageStats(
-      AsyncCallable<MddStorageStats> buildMddStorageStats) {
-    return immediateVoidFuture();
-  }
+    @Override
+    public ListenableFuture<Void> logMddStorageStats(
+            AsyncCallable<MddStorageStats> buildMddStorageStats) {
+        return immediateVoidFuture();
+    }
 
-  @Override
-  public ListenableFuture<Void> logMddNetworkStats(AsyncCallable<Void> buildMddNetworkStats) {
-    return immediateVoidFuture();
-  }
+    @Override
+    public ListenableFuture<Void> logMddNetworkStats(
+            AsyncCallable<MddNetworkStats> buildMddNetworkStats) {
+        return immediateVoidFuture();
+    }
 
-  @Override
-  public void logMddDataDownloadFileExpirationEvent(int eventCode, int count) {}
+    @Override
+    public void logMddDataDownloadFileExpirationEvent(int eventCode, int count) {
+    }
 
-  @Override
-  public void logMddNetworkSavings(
-      DataDownloadFileGroupStats fileGroupDetails,
-      int code,
-      long fullFileSize,
-      long downloadedFileSize,
-      String fileId,
-      int deltaIndex) {}
+    @Override
+    public void logMddNetworkSavings(
+            DataDownloadFileGroupStats fileGroupDetails,
+            int code,
+            long fullFileSize,
+            long downloadedFileSize,
+            String fileId,
+            int deltaIndex) {
+    }
 
-  @Override
-  public void logMddDownloadResult(
-      MddDownloadResult.Code code, DataDownloadFileGroupStats fileGroupDetails) {}
+    @Override
+    public void logMddDownloadResult(
+            MddDownloadResult.Code code, DataDownloadFileGroupStats fileGroupDetails) {
+    }
 
-  @Override
-  public void logMddQueryStats(DataDownloadFileGroupStats fileGroupDetails) {}
+    @Override
+    public void logMddQueryStats(DataDownloadFileGroupStats fileGroupDetails) {
+    }
 
-  @Override
-  public void logMddAndroidSharingLog(Void event) {}
+    @Override
+    public void logMddAndroidSharingLog(Void event) {
+    }
 
-  @Override
-  public void logMddDownloadLatency(
-      DataDownloadFileGroupStats fileGroupStats, Void downloadLatency) {}
+    @Override
+    public void logMddDownloadLatency(
+            DataDownloadFileGroupStats fileGroupStats, MddDownloadLatency downloadLatency) {
+    }
 
-  @Override
-  public void logMddUsageEvent(DataDownloadFileGroupStats fileGroupDetails, Void usageEventLog) {}
+    @Override
+    public void logMddUsageEvent(DataDownloadFileGroupStats fileGroupDetails, Void usageEventLog) {
+    }
 
-  @Override
-  public void logNewConfigReceived(
-      DataDownloadFileGroupStats fileGroupDetails, Void newConfigReceivedInfo) {}
+    @Override
+    public void logNewConfigReceived(
+            DataDownloadFileGroupStats fileGroupDetails, Void newConfigReceivedInfo) {
+    }
 }
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/logging/SharedPreferencesLoggingState.java b/java/com/google/android/libraries/mobiledatadownload/internal/logging/SharedPreferencesLoggingState.java
index 7fd90ef..c41a3af 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/logging/SharedPreferencesLoggingState.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/logging/SharedPreferencesLoggingState.java
@@ -22,8 +22,6 @@
 import android.content.Context;
 import android.content.SharedPreferences;
 
-import androidx.annotation.VisibleForTesting;
-
 import com.google.android.libraries.mobiledatadownload.TimeSource;
 import com.google.android.libraries.mobiledatadownload.internal.util.FileGroupsMetadataUtil;
 import com.google.android.libraries.mobiledatadownload.internal.util.FileGroupsMetadataUtil.GroupKeyDeserializationException;
@@ -58,8 +56,7 @@
 
     private static final String LAST_MAINTENANCE_RUN_SECS_KEY = "last_maintenance_secs";
 
-    @VisibleForTesting
-    static final String SALT_KEY = "stable_log_sampling_salt";
+    private static final String SALT_KEY = "stable_log_sampling_salt";
     private static final String SALT_TIMESTAMP_MILLIS_KEY =
             "log_sampling_salt_set_timestamp_millis";
 
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/logging/testing/FakeEventLogger.java b/java/com/google/android/libraries/mobiledatadownload/internal/logging/testing/FakeEventLogger.java
index ea5134c..6fafad7 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/logging/testing/FakeEventLogger.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/logging/testing/FakeEventLogger.java
@@ -18,152 +18,160 @@
 import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
 
 import com.google.android.libraries.mobiledatadownload.internal.logging.EventLogger;
-import com.google.android.libraries.mobiledatadownload.internal.logging.EventLogger.FileGroupStatusWithDetails;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.util.concurrent.AsyncCallable;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.mobiledatadownload.LogEnumsProto.MddClientEvent;
 import com.google.mobiledatadownload.LogEnumsProto.MddDownloadResult;
 import com.google.mobiledatadownload.LogProto.DataDownloadFileGroupStats;
+import com.google.mobiledatadownload.LogProto.MddDownloadLatency;
+import com.google.mobiledatadownload.LogProto.MddLibApiResultLog;
+import com.google.mobiledatadownload.LogProto.MddNetworkStats;
 import com.google.mobiledatadownload.LogProto.MddStorageStats;
+
 import java.util.ArrayList;
 import java.util.List;
 
 /** Fake implementation of {@link EventLogger} for use in tests. */
 public final class FakeEventLogger implements EventLogger {
 
-  private final ArrayList<MddClientEvent.Code> loggedCodes = new ArrayList<>();
-  private final ArrayListMultimap<DataDownloadFileGroupStats, Void> loggedLatencies =
-      ArrayListMultimap.create();
-  private final ArrayListMultimap<DataDownloadFileGroupStats, Void> loggedNewConfigReceived =
-      ArrayListMultimap.create();
-  private final List<Void> loggedMddLibApiResultLog = new ArrayList<>();
-  private final ArrayList<DataDownloadFileGroupStats> loggedMddQueryStats = new ArrayList<>();
+    private final ArrayList<MddClientEvent.Code> loggedCodes = new ArrayList<>();
+    private final ArrayListMultimap<DataDownloadFileGroupStats, MddDownloadLatency>
+            loggedLatencies =
+            ArrayListMultimap.create();
+    private final ArrayListMultimap<DataDownloadFileGroupStats, Void> loggedNewConfigReceived =
+            ArrayListMultimap.create();
+    private final List<MddLibApiResultLog> loggedMddLibApiResultLog = new ArrayList<>();
+    private final ArrayList<DataDownloadFileGroupStats> loggedMddQueryStats = new ArrayList<>();
 
-  @Override
-  public void logEventSampled(MddClientEvent.Code eventCode) {
-    loggedCodes.add(eventCode);
-  }
+    @Override
+    public void logEventSampled(MddClientEvent.Code eventCode) {
+        loggedCodes.add(eventCode);
+    }
 
-  @Override
-  public void logEventSampled(
-      MddClientEvent.Code eventCode,
-      String fileGroupName,
-      int fileGroupVersionNumber,
-      long buildId,
-      String variantId) {
-    loggedCodes.add(eventCode);
-  }
+    @Override
+    public void logEventSampled(
+            MddClientEvent.Code eventCode,
+            String fileGroupName,
+            int fileGroupVersionNumber,
+            long buildId,
+            String variantId) {
+        loggedCodes.add(eventCode);
+    }
 
-  @Override
-  public void logEventAfterSample(MddClientEvent.Code eventCode, int sampleInterval) {
-    loggedCodes.add(eventCode);
-  }
+    @Override
+    public void logEventAfterSample(MddClientEvent.Code eventCode, int sampleInterval) {
+        loggedCodes.add(eventCode);
+    }
 
-  @Override
-  public ListenableFuture<Void> logMddFileGroupStats(
-      AsyncCallable<List<FileGroupStatusWithDetails>> buildFileGroupStats) {
-    return immediateFailedFuture(
-        new UnsupportedOperationException("This method is not implemented in the fake yet."));
-  }
+    @Override
+    public ListenableFuture<Void> logMddFileGroupStats(
+            AsyncCallable<List<FileGroupStatusWithDetails>> buildFileGroupStats) {
+        return immediateFailedFuture(
+                new UnsupportedOperationException(
+                        "This method is not implemented in the fake yet."));
+    }
 
-  @Override
-  public void logMddApiCallStats(DataDownloadFileGroupStats fileGroupDetails, Void apiCallStats) {
-    throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
-  }
+    @Override
+    public void logMddApiCallStats(DataDownloadFileGroupStats fileGroupDetails, Void apiCallStats) {
+        throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
+    }
 
-  @Override
-  public void logMddLibApiResultLog(Void mddLibApiResultLog) {
-    loggedMddLibApiResultLog.add(mddLibApiResultLog);
-  }
+    @Override
+    public void logMddLibApiResultLog(MddLibApiResultLog mddLibApiResultLog) {
+        loggedMddLibApiResultLog.add(mddLibApiResultLog);
+    }
 
-  public List<Void> getLoggedMddLibApiResultLogs() {
-    return loggedMddLibApiResultLog;
-  }
+    public List<MddLibApiResultLog> getLoggedMddLibApiResultLogs() {
+        return loggedMddLibApiResultLog;
+    }
 
-  @Override
-  public ListenableFuture<Void> logMddStorageStats(
-      AsyncCallable<MddStorageStats> buildMddStorageStats) {
-    return immediateFailedFuture(
-        new UnsupportedOperationException("This method is not implemented in the fake yet."));
-  }
+    @Override
+    public ListenableFuture<Void> logMddStorageStats(
+            AsyncCallable<MddStorageStats> buildMddStorageStats) {
+        return immediateFailedFuture(
+                new UnsupportedOperationException(
+                        "This method is not implemented in the fake yet."));
+    }
 
-  @Override
-  public ListenableFuture<Void> logMddNetworkStats(AsyncCallable<Void> buildMddNetworkStats) {
-    return immediateFailedFuture(
-        new UnsupportedOperationException("This method is not implemented in the fake yet."));
-  }
+    @Override
+    public ListenableFuture<Void> logMddNetworkStats(
+            AsyncCallable<MddNetworkStats> buildMddNetworkStats) {
+        return immediateFailedFuture(
+                new UnsupportedOperationException(
+                        "This method is not implemented in the fake yet."));
+    }
 
-  @Override
-  public void logMddDataDownloadFileExpirationEvent(int eventCode, int count) {
-    throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
-  }
+    @Override
+    public void logMddDataDownloadFileExpirationEvent(int eventCode, int count) {
+        throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
+    }
 
-  @Override
-  public void logMddNetworkSavings(
-      DataDownloadFileGroupStats fileGroupDetails,
-      int code,
-      long fullFileSize,
-      long downloadedFileSize,
-      String fileId,
-      int deltaIndex) {
-    throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
-  }
+    @Override
+    public void logMddNetworkSavings(
+            DataDownloadFileGroupStats fileGroupDetails,
+            int code,
+            long fullFileSize,
+            long downloadedFileSize,
+            String fileId,
+            int deltaIndex) {
+        throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
+    }
 
-  @Override
-  public void logMddDownloadResult(
-      MddDownloadResult.Code code, DataDownloadFileGroupStats fileGroupDetails) {
-    throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
-  }
+    @Override
+    public void logMddDownloadResult(
+            MddDownloadResult.Code code, DataDownloadFileGroupStats fileGroupDetails) {
+        throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
+    }
 
-  @Override
-  public void logMddQueryStats(DataDownloadFileGroupStats fileGroupDetails) {
-    loggedMddQueryStats.add(fileGroupDetails);
-  }
+    @Override
+    public void logMddQueryStats(DataDownloadFileGroupStats fileGroupDetails) {
+        loggedMddQueryStats.add(fileGroupDetails);
+    }
 
-  @Override
-  public void logMddAndroidSharingLog(Void event) {
-    throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
-  }
+    @Override
+    public void logMddAndroidSharingLog(Void event) {
+        throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
+    }
 
-  @Override
-  public void logMddDownloadLatency(
-      DataDownloadFileGroupStats fileGroupStats, Void downloadLatency) {
-    loggedLatencies.put(fileGroupStats, downloadLatency);
-  }
+    @Override
+    public void logMddDownloadLatency(
+            DataDownloadFileGroupStats fileGroupStats, MddDownloadLatency downloadLatency) {
+        loggedLatencies.put(fileGroupStats, downloadLatency);
+    }
 
-  @Override
-  public void logMddUsageEvent(DataDownloadFileGroupStats fileGroupDetails, Void usageEventLog) {
-    throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
-  }
+    @Override
+    public void logMddUsageEvent(DataDownloadFileGroupStats fileGroupDetails, Void usageEventLog) {
+        throw new UnsupportedOperationException("This method is not implemented in the fake yet.");
+    }
 
-  @Override
-  public void logNewConfigReceived(
-      DataDownloadFileGroupStats fileGroupDetails, Void newConfigReceivedInfo) {
-    loggedNewConfigReceived.put(fileGroupDetails, newConfigReceivedInfo);
-  }
+    @Override
+    public void logNewConfigReceived(
+            DataDownloadFileGroupStats fileGroupDetails, Void newConfigReceivedInfo) {
+        loggedNewConfigReceived.put(fileGroupDetails, newConfigReceivedInfo);
+    }
 
-  public void reset() {
-    loggedCodes.clear();
-    loggedLatencies.clear();
-    loggedMddQueryStats.clear();
-    loggedNewConfigReceived.clear();
-    loggedMddLibApiResultLog.clear();
-  }
+    public void reset() {
+        loggedCodes.clear();
+        loggedLatencies.clear();
+        loggedMddQueryStats.clear();
+        loggedNewConfigReceived.clear();
+        loggedMddLibApiResultLog.clear();
+    }
 
-  public ArrayListMultimap<DataDownloadFileGroupStats, Void> getLoggedNewConfigReceived() {
-    return loggedNewConfigReceived;
-  }
+    public ArrayListMultimap<DataDownloadFileGroupStats, Void> getLoggedNewConfigReceived() {
+        return loggedNewConfigReceived;
+    }
 
-  public List<MddClientEvent.Code> getLoggedCodes() {
-    return loggedCodes;
-  }
+    public List<MddClientEvent.Code> getLoggedCodes() {
+        return loggedCodes;
+    }
 
-  public ArrayListMultimap<DataDownloadFileGroupStats, Void> getLoggedLatencies() {
-    return loggedLatencies;
-  }
+    public ArrayListMultimap<DataDownloadFileGroupStats, MddDownloadLatency> getLoggedLatencies() {
+        return loggedLatencies;
+    }
 
-  public ArrayList<DataDownloadFileGroupStats> getLoggedMddQueryStats() {
-    return loggedMddQueryStats;
-  }
+    public ArrayList<DataDownloadFileGroupStats> getLoggedMddQueryStats() {
+        return loggedMddQueryStats;
+    }
 }
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/proto/BUILD b/java/com/google/android/libraries/mobiledatadownload/internal/proto/BUILD
index 6be1b57..28d6216 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/proto/BUILD
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/proto/BUILD
@@ -13,7 +13,9 @@
 # limitations under the License.
 package(
     default_applicable_licenses = ["//:license"],
-    default_visibility = ["//:__subpackages__"],
+    default_visibility = [
+        "//visibility:public",
+    ],
     licenses = ["notice"],
 )
 
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/proto/metadata.proto b/java/com/google/android/libraries/mobiledatadownload/internal/proto/metadata.proto
index 406133c..9fabf90 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/proto/metadata.proto
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/proto/metadata.proto
@@ -573,21 +573,21 @@
 }
 
 // Metadata used by
-// com.google.android.libraries.mdi.download.MobileDataDownloadManager
+// com.google.android.libraries.mobiledatadownload.MobileDataDownloadManager
 message MobileDataDownloadManagerMetadata {
   optional bool mdd_migrated_to_offroad = 1;
   optional int32 reset_trigger = 2;
 }
 
 // Metadata used by
-// com.google.android.libraries.mdi.download.SharedFileManager
+// com.google.android.libraries.mobiledatadownload.SharedFileManager
 message SharedFileManagerMetadata {
   optional bool migrated_to_new_file_key = 1;
   optional int64 next_file_name = 2;
 }
 
 // Collects all data used by
-// com.google.android.libraries.mdi.download.internal.Migrations
+// com.google.android.libraries.mobiledatadownload.internal.Migrations
 message MigrationsStore {
   enum FileKeyVersion {
     NEW_FILE_KEY = 0;
@@ -599,7 +599,7 @@
 }
 
 // Collects all data used by
-// com.google.android.libraries.mdi.download.internal.FileGroupsMetadata
+// com.google.android.libraries.mobiledatadownload.internal.FileGroupsMetadata
 message FileGroupsMetadataStore {
   // Key must be a serialized GroupKey.
   map<string, DataFileGroupInternal> data_file_groups = 1;
@@ -609,7 +609,7 @@
 }
 
 // Collects all data used by
-// com.google.android.libraries.mdi.download.internal.SharedFilesMetadata
+// com.google.android.libraries.mobiledatadownload.internal.SharedFilesMetadata
 message SharedFilesMetadataStore {
   // The key must be a serialized NewFileKey.
   map<string, SharedFile> shared_files = 1;
diff --git a/java/com/google/android/libraries/mobiledatadownload/internal/util/FileGroupsMetadataUtil.java b/java/com/google/android/libraries/mobiledatadownload/internal/util/FileGroupsMetadataUtil.java
index 2948df6..f64ac06 100644
--- a/java/com/google/android/libraries/mobiledatadownload/internal/util/FileGroupsMetadataUtil.java
+++ b/java/com/google/android/libraries/mobiledatadownload/internal/util/FileGroupsMetadataUtil.java
@@ -20,9 +20,9 @@
 import com.google.android.libraries.mobiledatadownload.internal.logging.LogUtil;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
-import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupInternal;
 import com.google.mobiledatadownload.internal.MetadataProto.GroupKey;
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
diff --git a/java/com/google/android/libraries/mobiledatadownload/lite/BUILD b/java/com/google/android/libraries/mobiledatadownload/lite/BUILD
index 4f8c1f5..9d978ac 100644
--- a/java/com/google/android/libraries/mobiledatadownload/lite/BUILD
+++ b/java/com/google/android/libraries/mobiledatadownload/lite/BUILD
@@ -44,6 +44,7 @@
         "//java/com/google/android/libraries/mobiledatadownload/internal/util:DownloadFutureMap",
         "//java/com/google/android/libraries/mobiledatadownload/tracing",
         "//java/com/google/android/libraries/mobiledatadownload/tracing:concurrent",
+        "@androidx_annotation_annotation",
         "@androidx_core_core",
         "@com_google_auto_value",
         "@com_google_errorprone_error_prone_annotations",
diff --git a/java/com/google/android/libraries/mobiledatadownload/lite/DownloaderImpl.java b/java/com/google/android/libraries/mobiledatadownload/lite/DownloaderImpl.java
index 8472667..75bafd3 100644
--- a/java/com/google/android/libraries/mobiledatadownload/lite/DownloaderImpl.java
+++ b/java/com/google/android/libraries/mobiledatadownload/lite/DownloaderImpl.java
@@ -438,7 +438,7 @@
                     notificationManager.notify(notificationKey, notification.build());
                   } else {
                     NotificationUtil.cancelNotificationForKey(
-                        context, downloadRequest.destinationFileUri().toString());
+                        context, foregroundDownloadKey.toString());
                   }
 
                   return immediateVoidFuture();
diff --git a/java/com/google/android/libraries/mobiledatadownload/populator/proto/BUILD b/java/com/google/android/libraries/mobiledatadownload/populator/proto/BUILD
index 637afee..240b864 100644
--- a/java/com/google/android/libraries/mobiledatadownload/populator/proto/BUILD
+++ b/java/com/google/android/libraries/mobiledatadownload/populator/proto/BUILD
@@ -14,7 +14,7 @@
 package(
     default_applicable_licenses = ["//:license"],
     default_visibility = [
-        "//:__subpackages__",
+        "//visibility:public",
     ],
     licenses = ["notice"],
 )
@@ -23,7 +23,6 @@
     name = "metadata_proto",
     srcs = ["metadata.proto"],
     cc_api_version = 2,
-    deps = [],
     alwayslink = 1,
 )
 
diff --git a/javatests/com/google/android/libraries/mobiledatadownload/file/backends/BlobStoreBackendTest.java b/javatests/com/google/android/libraries/mobiledatadownload/file/backends/BlobStoreBackendTest.java
index c172205..7de85c2 100644
--- a/javatests/com/google/android/libraries/mobiledatadownload/file/backends/BlobStoreBackendTest.java
+++ b/javatests/com/google/android/libraries/mobiledatadownload/file/backends/BlobStoreBackendTest.java
@@ -24,10 +24,10 @@
 import android.content.Context;
 import android.net.Uri;
 import android.os.ParcelFileDescriptor;
-import android.support.test.uiautomator.UiDevice;
 import android.util.Log;
 import android.util.Pair;
 import androidx.test.core.app.ApplicationProvider;
+import androidx.test.uiautomator.UiDevice;
 import com.google.android.libraries.mobiledatadownload.file.common.LimitExceededException;
 import com.google.common.io.BaseEncoding;
 import com.google.common.io.ByteStreams;
diff --git a/javatests/com/google/android/libraries/mobiledatadownload/internal/MddTestUtil.java b/javatests/com/google/android/libraries/mobiledatadownload/internal/MddTestUtil.java
index e3976c7..433a7aa 100644
--- a/javatests/com/google/android/libraries/mobiledatadownload/internal/MddTestUtil.java
+++ b/javatests/com/google/android/libraries/mobiledatadownload/internal/MddTestUtil.java
@@ -23,8 +23,8 @@
 
 import android.content.Context;
 import android.os.Build.VERSION;
-import android.support.test.uiautomator.UiDevice;
 import android.util.Log;
+import androidx.test.uiautomator.UiDevice;
 import com.google.mobiledatadownload.internal.MetadataProto.BaseFile;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFile;
 import com.google.mobiledatadownload.internal.MetadataProto.DataFileGroupInternal;
diff --git a/javatests/com/google/android/libraries/mobiledatadownload/internal/logging/MddEventLoggerTest.java b/javatests/com/google/android/libraries/mobiledatadownload/internal/logging/MddEventLoggerTest.java
index 468f7fe..0c94bb2 100644
--- a/javatests/com/google/android/libraries/mobiledatadownload/internal/logging/MddEventLoggerTest.java
+++ b/javatests/com/google/android/libraries/mobiledatadownload/internal/logging/MddEventLoggerTest.java
@@ -16,13 +16,16 @@
 package com.google.android.libraries.mobiledatadownload.internal.logging;
 
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 
 import android.content.Context;
+
 import androidx.test.core.app.ApplicationProvider;
+
 import com.google.android.libraries.mobiledatadownload.Logger;
 import com.google.android.libraries.mobiledatadownload.testing.FakeTimeSource;
 import com.google.android.libraries.mobiledatadownload.testing.MddTestDependencies;
@@ -36,8 +39,7 @@
 import com.google.mobiledatadownload.LogProto.MddDownloadResultLog;
 import com.google.mobiledatadownload.LogProto.MddLogData;
 import com.google.mobiledatadownload.LogProto.StableSamplingInfo;
-import java.security.SecureRandom;
-import java.util.Random;
+
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -47,226 +49,233 @@
 import org.mockito.junit.MockitoRule;
 import org.robolectric.RobolectricTestRunner;
 
+import java.security.SecureRandom;
+import java.util.Random;
+
 @RunWith(RobolectricTestRunner.class)
 public class MddEventLoggerTest {
 
-  @Rule public final MockitoRule mocks = MockitoJUnit.rule();
+    @Rule
+    public final MockitoRule mocks = MockitoJUnit.rule();
 
-  private static final int SOME_MODULE_VERSION = 42;
-  private static final int SAMPLING_ALWAYS = 1;
-  private static final int SAMPLING_NEVER = 0;
+    private static final int SOME_MODULE_VERSION = 42;
+    private static final int SAMPLING_ALWAYS = 1;
+    private static final int SAMPLING_NEVER = 0;
 
-  @Mock private Logger mockLogger;
-  private MddEventLogger mddEventLogger;
+    @Mock
+    private Logger mockLogger;
+    private MddEventLogger mddEventLogger;
 
-  private final Context context = ApplicationProvider.getApplicationContext();
-  private final TestFlags flags = new TestFlags();
+    private final Context context = ApplicationProvider.getApplicationContext();
+    private final TestFlags flags = new TestFlags();
 
-  @Before
-  public void setUp() throws Exception {
-    mddEventLogger =
-        new MddEventLogger(
-            context,
-            mockLogger,
-            SOME_MODULE_VERSION,
-            new LogSampler(flags, new SecureRandom()),
-            flags);
-    mddEventLogger.setLoggingStateStore(
-        MddTestDependencies.LoggingStateStoreImpl.SHARED_PREFERENCES.loggingStateStore(
-            context, Optional.absent(), new FakeTimeSource(), directExecutor(), new Random(0)));
-  }
-
-  private MddLogData.Builder newLogDataBuilderWithClientInfo() {
-    return MddLogData.newBuilder()
-        .setAndroidClientInfo(
-            AndroidClientInfo.newBuilder()
-                .setModuleVersion(SOME_MODULE_VERSION)
-                .setHostPackageName(context.getPackageName()));
-  }
-
-  @Test
-  public void testSampleInterval_zero_none() {
-    assertFalse(LogUtil.shouldSampleInterval(0));
-  }
-
-  @Test
-  public void testSampleInterval_negative_none() {
-    assertFalse(LogUtil.shouldSampleInterval(-1));
-  }
-
-  @Test
-  public void testSampleInterval_always() {
-    assertTrue(LogUtil.shouldSampleInterval(1));
-  }
-
-  @Test
-  public void testLogMddEvents_noLog() {
-    overrideDefaultSampleInterval(SAMPLING_NEVER);
-
-    mddEventLogger.logEventSampled(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED,
-        "fileGroup",
-        /* fileGroupVersionNumber= */ 0,
-        /* buildId= */ 0,
-        /* variantId= */ "");
-    verifyNoInteractions(mockLogger);
-  }
-
-  @Test
-  public void testLogMddEvents() {
-    overrideDefaultSampleInterval(SAMPLING_ALWAYS);
-    mddEventLogger.logEventSampled(
-        MddClientEvent.Code.EVENT_CODE_UNSPECIFIED,
-        "fileGroup",
-        /* fileGroupVersionNumber= */ 1,
-        /* buildId= */ 123,
-        /* variantId= */ "testVariant");
-
-    MddLogData expectedData =
-        newLogDataBuilderWithClientInfo()
-            .setSamplingInterval(SAMPLING_ALWAYS)
-            .setDataDownloadFileGroupStats(
-                DataDownloadFileGroupStats.newBuilder()
-                    .setFileGroupName("fileGroup")
-                    .setFileGroupVersionNumber(1)
-                    .setBuildId(123)
-                    .setVariantId("testVariant"))
-            .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
-            .setStableSamplingInfo(getStableSamplingInfo())
-            .build();
-
-    verify(mockLogger).log(expectedData, MddClientEvent.Code.EVENT_CODE_UNSPECIFIED_VALUE);
-  }
-
-  @Test
-  public void testLogExpirationHandlerRemoveUnaccountedFilesSampled() {
-    final int unaccountedFileCount = 5;
-    overrideDefaultSampleInterval(SAMPLING_ALWAYS);
-    mddEventLogger.logMddDataDownloadFileExpirationEvent(0, unaccountedFileCount);
-
-    MddLogData expectedData =
-        newLogDataBuilderWithClientInfo()
-            .setSamplingInterval(SAMPLING_ALWAYS)
-            .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
-            .setStableSamplingInfo(getStableSamplingInfo())
-            .build();
-
-    verify(mockLogger).log(expectedData, MddClientEvent.Code.EVENT_CODE_UNSPECIFIED_VALUE);
-  }
-
-  @Test
-  public void testLogMddNetworkSavingsSampled() {
-    overrideDefaultSampleInterval(SAMPLING_ALWAYS);
-    DataDownloadFileGroupStats icingDataDownloadFileGroupStats =
-        DataDownloadFileGroupStats.newBuilder()
-            .setFileGroupName("fileGroup")
-            .setFileGroupVersionNumber(1)
-            .build();
-    mddEventLogger.logMddNetworkSavings(
-        icingDataDownloadFileGroupStats, 0, 200L, 100L, "file-id", 1);
-    MddLogData expectedData =
-        newLogDataBuilderWithClientInfo()
-            .setSamplingInterval(SAMPLING_ALWAYS)
-            .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
-            .setStableSamplingInfo(getStableSamplingInfo())
-            .build();
-
-    verify(mockLogger).log(expectedData, MddClientEvent.Code.EVENT_CODE_UNSPECIFIED_VALUE);
-  }
-
-  @Test
-  public void testLogMddDownloadResult() {
-    overrideDefaultSampleInterval(SAMPLING_ALWAYS);
-    DataDownloadFileGroupStats icingDataDownloadFileGroupStats =
-        DataDownloadFileGroupStats.newBuilder()
-            .setFileGroupName("fileGroup")
-            .setFileGroupVersionNumber(1)
-            .build();
-    mddEventLogger.logMddDownloadResult(
-        MddDownloadResult.Code.LOW_DISK_ERROR, icingDataDownloadFileGroupStats);
-
-    MddLogData expectedData =
-        newLogDataBuilderWithClientInfo()
-            .setSamplingInterval(SAMPLING_ALWAYS)
-            .setMddDownloadResultLog(
-                MddDownloadResultLog.newBuilder()
-                    .setResult(MddDownloadResult.Code.LOW_DISK_ERROR)
-                    .setDataDownloadFileGroupStats(icingDataDownloadFileGroupStats))
-            .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
-            .setStableSamplingInfo(getStableSamplingInfo())
-            .build();
-
-    verify(mockLogger).log(expectedData, MddClientEvent.Code.DATA_DOWNLOAD_RESULT_LOG_VALUE);
-  }
-
-  @Test
-  public void testLogMddUsageEvent() {
-    overrideDefaultSampleInterval(SAMPLING_ALWAYS);
-
-    DataDownloadFileGroupStats icingDataDownloadFileGroupStats =
-        DataDownloadFileGroupStats.newBuilder()
-            .setFileGroupName("fileGroup")
-            .setFileGroupVersionNumber(1)
-            .setBuildId(123)
-            .setVariantId("variant-id")
-            .build();
-
-    Void usageEventLog = null;
-
-    mddEventLogger.logMddUsageEvent(icingDataDownloadFileGroupStats, usageEventLog);
-
-    MddLogData expectedData =
-        newLogDataBuilderWithClientInfo()
-            .setDataDownloadFileGroupStats(icingDataDownloadFileGroupStats)
-            .setSamplingInterval(SAMPLING_ALWAYS)
-            .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
-            .setStableSamplingInfo(getStableSamplingInfo())
-            .build();
-
-    verify(mockLogger).log(expectedData, MddClientEvent.Code.EVENT_CODE_UNSPECIFIED_VALUE);
-  }
-
-  @Test
-  public void testlogMddLibApiResultLog() {
-    overrideApiLoggingSampleInterval(SAMPLING_ALWAYS);
-
-    DataDownloadFileGroupStats icingDataDownloadFileGroupStats =
-        DataDownloadFileGroupStats.newBuilder()
-            .setFileGroupName("fileGroup")
-            .setFileGroupVersionNumber(1)
-            .build();
-
-    Void mddLibApiResultLog = null;
-    mddEventLogger.logMddLibApiResultLog(mddLibApiResultLog);
-
-    MddLogData expectedData =
-        newLogDataBuilderWithClientInfo()
-            .setSamplingInterval(SAMPLING_ALWAYS)
-            .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
-            .setStableSamplingInfo(getStableSamplingInfo())
-            .build();
-
-    verify(mockLogger).log(expectedData, MddClientEvent.Code.EVENT_CODE_UNSPECIFIED_VALUE);
-  }
-
-  private void overrideDefaultSampleInterval(int sampleInterval) {
-    flags.mddDefaultSampleInterval = Optional.of(sampleInterval);
-  }
-
-  private void overrideApiLoggingSampleInterval(int sampleInterval) {
-    flags.apiLoggingSampleInterval = Optional.of(sampleInterval);
-  }
-
-  private StableSamplingInfo getStableSamplingInfo() {
-    if (flags.enableRngBasedDeviceStableSampling()) {
-      return StableSamplingInfo.newBuilder()
-          .setStableSamplingUsed(true)
-          .setStableSamplingFirstEnabledTimestampMs(0)
-          .setPartOfAlwaysLoggingGroup(false)
-          .setInvalidSamplingRateUsed(false)
-          .build();
+    @Before
+    public void setUp() throws Exception {
+        mddEventLogger =
+                new MddEventLogger(
+                        context,
+                        mockLogger,
+                        SOME_MODULE_VERSION,
+                        new LogSampler(flags, new SecureRandom()),
+                        flags);
+        mddEventLogger.setLoggingStateStore(
+                MddTestDependencies.LoggingStateStoreImpl.SHARED_PREFERENCES.loggingStateStore(
+                        context, Optional.absent(), new FakeTimeSource(), directExecutor(),
+                        new Random(0)));
     }
 
-    return StableSamplingInfo.newBuilder().setStableSamplingUsed(false).build();
-  }
+    private MddLogData.Builder newLogDataBuilderWithClientInfo() {
+        return MddLogData.newBuilder()
+                .setAndroidClientInfo(
+                        AndroidClientInfo.newBuilder()
+                                .setModuleVersion(SOME_MODULE_VERSION)
+                                .setHostPackageName(context.getPackageName()));
+    }
+
+    @Test
+    public void testSampleInterval_zero_none() {
+        assertFalse(LogUtil.shouldSampleInterval(0));
+    }
+
+    @Test
+    public void testSampleInterval_negative_none() {
+        assertFalse(LogUtil.shouldSampleInterval(-1));
+    }
+
+    @Test
+    public void testSampleInterval_always() {
+        assertTrue(LogUtil.shouldSampleInterval(1));
+    }
+
+    @Test
+    public void testLogMddEvents_noLog() {
+        overrideDefaultSampleInterval(SAMPLING_NEVER);
+
+        mddEventLogger.logEventSampled(
+                MddClientEvent.Code.EVENT_CODE_UNSPECIFIED,
+                "fileGroup",
+                /* fileGroupVersionNumber= */ 0,
+                /* buildId= */ 0,
+                /* variantId= */ "");
+        verifyNoInteractions(mockLogger);
+    }
+
+    @Test
+    public void testLogMddEvents() {
+        overrideDefaultSampleInterval(SAMPLING_ALWAYS);
+        mddEventLogger.logEventSampled(
+                MddClientEvent.Code.EVENT_CODE_UNSPECIFIED,
+                "fileGroup",
+                /* fileGroupVersionNumber= */ 1,
+                /* buildId= */ 123,
+                /* variantId= */ "testVariant");
+
+        MddLogData expectedData =
+                newLogDataBuilderWithClientInfo()
+                        .setSamplingInterval(SAMPLING_ALWAYS)
+                        .setDataDownloadFileGroupStats(
+                                DataDownloadFileGroupStats.newBuilder()
+                                        .setFileGroupName("fileGroup")
+                                        .setFileGroupVersionNumber(1)
+                                        .setBuildId(123)
+                                        .setVariantId("testVariant"))
+                        .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
+                        .setStableSamplingInfo(getStableSamplingInfo())
+                        .build();
+
+        verify(mockLogger).log(expectedData, MddClientEvent.Code.EVENT_CODE_UNSPECIFIED_VALUE);
+    }
+
+    @Test
+    public void testLogExpirationHandlerRemoveUnaccountedFilesSampled() {
+        final int unaccountedFileCount = 5;
+        overrideDefaultSampleInterval(SAMPLING_ALWAYS);
+        mddEventLogger.logMddDataDownloadFileExpirationEvent(0, unaccountedFileCount);
+
+        MddLogData expectedData =
+                newLogDataBuilderWithClientInfo()
+                        .setSamplingInterval(SAMPLING_ALWAYS)
+                        .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
+                        .setStableSamplingInfo(getStableSamplingInfo())
+                        .build();
+
+        verify(mockLogger).log(expectedData, MddClientEvent.Code.EVENT_CODE_UNSPECIFIED_VALUE);
+    }
+
+    @Test
+    public void testLogMddNetworkSavingsSampled() {
+        overrideDefaultSampleInterval(SAMPLING_ALWAYS);
+        DataDownloadFileGroupStats icingDataDownloadFileGroupStats =
+                DataDownloadFileGroupStats.newBuilder()
+                        .setFileGroupName("fileGroup")
+                        .setFileGroupVersionNumber(1)
+                        .build();
+        mddEventLogger.logMddNetworkSavings(
+                icingDataDownloadFileGroupStats, 0, 200L, 100L, "file-id", 1);
+        MddLogData expectedData =
+                newLogDataBuilderWithClientInfo()
+                        .setSamplingInterval(SAMPLING_ALWAYS)
+                        .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
+                        .setStableSamplingInfo(getStableSamplingInfo())
+                        .build();
+
+        verify(mockLogger).log(expectedData, MddClientEvent.Code.EVENT_CODE_UNSPECIFIED_VALUE);
+    }
+
+    @Test
+    public void testLogMddDownloadResult() {
+        overrideDefaultSampleInterval(SAMPLING_ALWAYS);
+        DataDownloadFileGroupStats icingDataDownloadFileGroupStats =
+                DataDownloadFileGroupStats.newBuilder()
+                        .setFileGroupName("fileGroup")
+                        .setFileGroupVersionNumber(1)
+                        .build();
+        mddEventLogger.logMddDownloadResult(
+                MddDownloadResult.Code.LOW_DISK_ERROR, icingDataDownloadFileGroupStats);
+
+        MddLogData expectedData =
+                newLogDataBuilderWithClientInfo()
+                        .setSamplingInterval(SAMPLING_ALWAYS)
+                        .setMddDownloadResultLog(
+                                MddDownloadResultLog.newBuilder()
+                                        .setResult(MddDownloadResult.Code.LOW_DISK_ERROR)
+                                        .setDataDownloadFileGroupStats(
+                                                icingDataDownloadFileGroupStats))
+                        .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
+                        .setStableSamplingInfo(getStableSamplingInfo())
+                        .build();
+
+        verify(mockLogger).log(expectedData, MddClientEvent.Code.DATA_DOWNLOAD_RESULT_LOG_VALUE);
+    }
+
+    @Test
+    public void testLogMddUsageEvent() {
+        overrideDefaultSampleInterval(SAMPLING_ALWAYS);
+
+        DataDownloadFileGroupStats icingDataDownloadFileGroupStats =
+                DataDownloadFileGroupStats.newBuilder()
+                        .setFileGroupName("fileGroup")
+                        .setFileGroupVersionNumber(1)
+                        .setBuildId(123)
+                        .setVariantId("variant-id")
+                        .build();
+
+        Void usageEventLog = null;
+
+        mddEventLogger.logMddUsageEvent(icingDataDownloadFileGroupStats, usageEventLog);
+
+        MddLogData expectedData =
+                newLogDataBuilderWithClientInfo()
+                        .setDataDownloadFileGroupStats(icingDataDownloadFileGroupStats)
+                        .setSamplingInterval(SAMPLING_ALWAYS)
+                        .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
+                        .setStableSamplingInfo(getStableSamplingInfo())
+                        .build();
+
+        verify(mockLogger).log(expectedData, MddClientEvent.Code.EVENT_CODE_UNSPECIFIED_VALUE);
+    }
+
+    @Test
+    public void testlogMddLibApiResultLog() {
+        overrideApiLoggingSampleInterval(SAMPLING_ALWAYS);
+
+        DataDownloadFileGroupStats icingDataDownloadFileGroupStats =
+                DataDownloadFileGroupStats.newBuilder()
+                        .setFileGroupName("fileGroup")
+                        .setFileGroupVersionNumber(1)
+                        .build();
+
+        Void mddLibApiResultLog = null;
+        mddEventLogger.logMddLibApiResultLog(mddLibApiResultLog);
+
+        MddLogData expectedData =
+                newLogDataBuilderWithClientInfo()
+                        .setSamplingInterval(SAMPLING_ALWAYS)
+                        .setDeviceInfo(MddDeviceInfo.newBuilder().setDeviceStorageLow(false))
+                        .setStableSamplingInfo(getStableSamplingInfo())
+                        .build();
+
+        verify(mockLogger).log(expectedData, MddClientEvent.Code.EVENT_CODE_UNSPECIFIED_VALUE);
+    }
+
+    private void overrideDefaultSampleInterval(int sampleInterval) {
+        flags.mddDefaultSampleInterval = Optional.of(sampleInterval);
+    }
+
+    private void overrideApiLoggingSampleInterval(int sampleInterval) {
+        flags.apiLoggingSampleInterval = Optional.of(sampleInterval);
+    }
+
+    private StableSamplingInfo getStableSamplingInfo() {
+        if (flags.enableRngBasedDeviceStableSampling()) {
+            return StableSamplingInfo.newBuilder()
+                    .setStableSamplingUsed(true)
+                    .setStableSamplingFirstEnabledTimestampMs(0)
+                    .setPartOfAlwaysLoggingGroup(false)
+                    .setInvalidSamplingRateUsed(false)
+                    .build();
+        }
+
+        return StableSamplingInfo.newBuilder().setStableSamplingUsed(false).build();
+    }
 }
diff --git a/proto/BUILD b/proto/BUILD
index 09df231..7b46203 100644
--- a/proto/BUILD
+++ b/proto/BUILD
@@ -2,7 +2,9 @@
 
 package(
     default_applicable_licenses = ["//:license"],
-    default_visibility = ["//visibility:public"],
+    default_visibility = [
+        "//visibility:public",
+    ],
     licenses = ["notice"],
 )
 
@@ -20,6 +22,11 @@
     deps = [":client_config_proto"],
 )
 
+java_proto_library(
+    name = "client_config_java_proto",
+    deps = [":client_config_proto"],
+)
+
 proto_library(
     name = "download_config_proto",
     srcs = ["download_config.proto"],
@@ -41,6 +48,11 @@
     deps = [":download_config_proto"],
 )
 
+java_proto_library(
+    name = "download_config_java_proto",
+    deps = [":download_config_proto"],
+)
+
 proto_library(
     name = "transform_proto",
     srcs = ["transform.proto"],
diff --git a/proto/client_config.proto b/proto/client_config.proto
index f6260c7..e5c0034 100644
--- a/proto/client_config.proto
+++ b/proto/client_config.proto
@@ -13,11 +13,10 @@
 // limitations under the License.
 syntax = "proto2";
 
-package com.google.android.libraries.mdi.download;
+package com.google.android.libraries.mobiledatadownload;
 
 import "google/protobuf/any.proto";
 
-//option jspb_use_correct_proto2_semantics = false;  // <internal>
 option java_package = "com.google.mobiledatadownload";
 option java_outer_classname = "ClientConfigProto";
 option objc_class_prefix = "ICN";
diff --git a/proto/download_config.proto b/proto/download_config.proto
index 1b88c23..ec4478a 100644
--- a/proto/download_config.proto
+++ b/proto/download_config.proto
@@ -21,7 +21,6 @@
 option java_package = "com.google.mobiledatadownload";
 option java_outer_classname = "DownloadConfigProto";
 option objc_class_prefix = "Icing";
-//option go_api_flag = "OPEN_TO_OPAQUE_HYBRID";  // See <internal>.
 
 // The top-level proto for Mobile Data Download (<internal>).
 message DownloadConfig {
@@ -530,8 +529,6 @@
         // prefix encoding, however, for the S2CellIds the high-order bits
         // encode the face-ID and as a result we often end up with large
         // numbers.
-//        optional fixed64 s2_cell_id = 1 [
-//          (datapol.semantic_type) = ST_LOCATION
         optional fixed64 s2_cell_id = 1;
       }
 
diff --git a/proto/log_enums.proto b/proto/log_enums.proto
index a86c611..2f8d93c 100644
--- a/proto/log_enums.proto
+++ b/proto/log_enums.proto
@@ -43,13 +43,42 @@
     // Logged with DataDownloadFileGroupStats, MddFileGroupStatus.
     DATA_DOWNLOAD_FILE_GROUP_STATUS = 1044;
 
-    // Log MddStorageStats in daily maintenance.
-    DATA_DOWNLOAD_STORAGE_STATS = 1055;
-
     // MDD download result log.
     DATA_DOWNLOAD_RESULT_LOG = 1068;
 
-    reserved 1000 to 1043, 1045 to 1054, 1056 to 1067, 1069 to 1113;
+    // Log MddStorageStats in daily maintenance.
+    DATA_DOWNLOAD_STORAGE_STATS = 1055;
+
+    // Log event for MDD Lib api result.
+    DATA_DOWNLOAD_LIB_API_RESULT = 1108;
+
+    // Log MddNetworkStats in daily maintenance.
+    DATA_DOWNLOAD_NETWORK_STATS = 1056;
+
+    // File group download started.
+    DATA_DOWNLOAD_STARTED = 1070;
+
+    // File group download complete.
+    DATA_DOWNLOAD_COMPLETE = 1007;
+
+    // The log event for MDD download latency.
+    DATA_DOWNLOAD_LATENCY_LOG = 1080;
+
+    // All files in the group were already available when the file group was
+    // added.
+    DATA_DOWNLOAD_COMPLETE_IMMEDIATE = 1032;
+
+    DATA_DOWNLOAD_PENDING_GROUP_REPLACED = 1115;
+
+    reserved 1000 to 1006;
+    reserved 1008 to 1031;
+    reserved 1033 to 1043;
+    reserved 1045 to 1054;
+    reserved 1057 to 1067;
+    reserved 1069;
+    reserved 1071 to 1079;
+    reserved 1081 to 1107;
+    reserved 1109 to 1114;
 
     reserved 2000 to 2999, 3000 to 3999, 4000 to 4099, 4100 to 4199,
         5000 to 5999, 6000 to 6999, 7000 to 7999, 8000 to 8999, 9000 to 9999,
@@ -171,3 +200,77 @@
     reserved 1000 to 3000;
   }
 }
+
+// Collection of MDD Lib's Public API methods used when logging the result of an
+// MDD Lib API call.
+message MddLibApiName {
+  enum Code {
+    UNKNOWN = 0;
+
+    // File Group metadata management APIs.
+    // NOTE: These APIs will include DataDownloadFileGroupStats in their
+    // logs.
+    ADD_FILE_GROUP = 1;
+    GET_FILE_GROUP = 2;
+    REMOVE_FILE_GROUP = 3;
+    REPORT_USAGE = 4;
+
+    // File Group data management APIs.
+    // NOTE: These APIs will include DataDownloadFileGroupStats in their
+    // logs.
+    CANCEL_FOREGROUND_DOWNLOAD = 5;
+    DOWNLOAD_FILE_GROUP = 6;
+    DOWNLOAD_FILE_GROUP_WITH_FOREGROUND_SERVICE = 7;
+    IMPORT_FILES = 8;
+
+    // File Group metadata bulk management APIs
+    // NOTE: These APIs will not include DataDownloadFileGroupStats in
+    // their logs.
+    CLEAR = 9;
+    GET_FILE_GROUPS_BY_FILTER = 10;
+    MAINTENANCE = 11;
+    REMOVE_FILE_GROUPS_BY_FILTER = 12;
+
+    // File data management APIs
+    // NOTE: These APIs will not include DataDownloadFileGroupStats in
+    // their logs.
+    DOWNLOAD_FILE = 13;
+    DOWNLOAD_FILE_WITH_FOREGROUND_SERVICE = 14;
+
+    // Task scheduling APIs.
+    // NOTE: These APIs will not include DataDownloadFileGroupStats in
+    // their logs.
+    HANDLE_TASK = 15;
+    SCHEDULE_PERIODIC_BACKGROUND_TASKS = 16;
+    SYNC = 17;
+
+    // Calls to phenotype external experiment id setting
+
+    // NOTE: this isn't actually an MDD API but the data is in the same format.
+    // DataDownloadFileGroupStats will be populated when available.
+    PHENOTYPE_CLEAR_EXPERIMENT_IDS = 18;
+    PHENOTYPE_UPDATE_EXPERIMENT_IDS = 19;
+    PHENOTYPE_CLEAR_ALL = 20;
+  }
+}
+
+// Result enum when logging the result of an MDD Lib API call.
+message MddLibApiResult {
+  enum Code {
+    RESULT_UNKNOWN = 0;
+    RESULT_SUCCESS = 1;
+
+    // Codes for failures
+    // Used for failures whose is reason is unknown.
+    RESULT_FAILURE = 2;
+    // Request cancelled
+    RESULT_CANCELLED = 3;
+    // Interrupted
+    RESULT_INTERRUPTED = 4;
+    RESULT_IO_ERROR = 5;
+    RESULT_ILLEGAL_STATE = 6;
+    RESULT_ILLEGAL_ARGUMENT = 7;
+    RESULT_UNSUPPORTED_OPERATION = 8;
+    RESULT_DOWNLOAD_ERROR = 9;
+  }
+}
diff --git a/proto/logs.proto b/proto/logs.proto
index b35aa07..e3d6582 100644
--- a/proto/logs.proto
+++ b/proto/logs.proto
@@ -19,7 +19,6 @@
 
 import "log_enums.proto";
 
-//option jspb_use_correct_proto2_semantics = false;  // <internal>
 option java_package = "com.google.mobiledatadownload";
 option java_outer_classname = "LogProto";
 
@@ -182,8 +181,17 @@
   // MDD download result log.
   optional MddDownloadResultLog mdd_download_result_log = 63;
 
-  reserved 1 to 9, 11 to 20, 22 to 31, 33 to 39, 41 to 45, 47 to 50, 52 to 62,
-      64 to 71, 73;
+  // MDD download latency log.
+  optional MddDownloadLatency mdd_download_latency = 67;
+
+  // MDD Api Result event
+  optional MddLibApiResultLog mdd_lib_api_result_log = 71;
+
+  // MDD File Group Network Stats. Additional info necessary for Network Stats.
+  optional MddNetworkStats mdd_network_stats = 49;
+
+  reserved 1 to 9, 11 to 20, 22 to 31, 33 to 39, 41 to 45, 47 to 48, 50,
+      52 to 62, 64 to 66, 68 to 70, 73;
 }
 
 // Info on sampling method used for log events. Stable sampling means if a
@@ -268,4 +276,74 @@
   //
   // See <internal> for more info.
   optional int32 days_since_last_log = 6;
+}
+
+// MDD download latency log.
+// Next tag: 4
+message MddDownloadLatency {
+  // The number of download attempts needed to fully download the file group.
+  optional int32 download_attempt_count = 1;
+  // The download latency in milliseconds, which is the time elapsed between
+  // download started and download complete.
+  optional int64 download_latency_ms = 2;
+  // The total MDD download latency in milliseconds, which is the time elapsed
+  // between new config received from P/H and download complete.
+  // True E2E download latency = PH propagation latency + MDD total download
+  // latency. Here we are talking about the later.
+  optional int64 total_latency_ms = 3;
+}
+
+// MDD Lib API result log.
+// This log will be generated for each MDD Lib API call.
+//
+// Next tag: 5
+message MddLibApiResultLog {
+  // The API which generated this result.
+  optional MddLibApiName.Code api_used = 1;
+
+  // The result of the API call.
+  optional MddLibApiResult.Code result = 2;
+
+  // Will be populated with relevant file group details depending on the api
+  // type. See MddLibApiName for more details.
+  repeated DataDownloadFileGroupStats data_download_file_group_stats = 3;
+
+  // The latency in nano seconds.
+  optional int64 latency_ns = 4;
+}
+
+// MDD File Group Network stats.
+message MddGroupNetworkStats {
+  optional DataDownloadFileGroupStats data_download_file_group_stats = 1;
+
+  // The total bytes downloaded through Wifi by the file group.
+  optional int64 total_wifi_bytes = 2;
+
+  // The total bytes downloaded through Cellular by the file group.
+  optional int64 total_cellular_bytes = 3;
+
+  // The total bytes downloaded through ways other than wifi or Cellular by the
+  // file group. E.g. import from local storage & etc.
+  optional int64 total_other_bytes = 4;
+}
+
+// MDD Network stats
+message MddNetworkStats {
+  message GroupStats {
+    optional DataDownloadFileGroupStats data_download_file_group_stats = 1;
+
+    // The total bytes downloaded through Wifi by the file group.
+    optional uint64 total_wifi_bytes = 2;
+
+    // The total bytes downloaded through Cellular by the file group.
+    optional uint64 total_cellular_bytes = 3;
+  }
+
+  repeated GroupStats group_stats = 1;
+
+  // Total bytes downloaded by all MDD file groups through Wifi.
+  optional uint64 total_mdd_wifi_bytes = 2;
+
+  // Total bytes downloaded by all MDD file groups through Cellular.
+  optional uint64 total_mdd_cellular_bytes = 3;
 }
\ No newline at end of file
diff --git a/proto/metadata.proto b/proto/metadata.proto
index 6e6d8dc..4082a40 100644
--- a/proto/metadata.proto
+++ b/proto/metadata.proto
@@ -573,21 +573,21 @@
 }
 
 // Metadata used by
-// com.google.android.libraries.mdi.download.MobileDataDownloadManager
+// com.google.android.libraries.mobiledatadownload.MobileDataDownloadManager
 message MobileDataDownloadManagerMetadata {
   optional bool mdd_migrated_to_offroad = 1;
   optional int32 reset_trigger = 2;
 }
 
 // Metadata used by
-// com.google.android.libraries.mdi.download.SharedFileManager
+// com.google.android.libraries.mobiledatadownload.SharedFileManager
 message SharedFileManagerMetadata {
   optional bool migrated_to_new_file_key = 1;
   optional int64 next_file_name = 2;
 }
 
 // Collects all data used by
-// com.google.android.libraries.mdi.download.internal.Migrations
+// com.google.android.libraries.mobiledatadownload.internal.Migrations
 message MigrationsStore {
   enum FileKeyVersion {
     NEW_FILE_KEY = 0;
@@ -599,7 +599,7 @@
 }
 
 // Collects all data used by
-// com.google.android.libraries.mdi.download.internal.FileGroupsMetadata
+// com.google.android.libraries.mobiledatadownload.internal.FileGroupsMetadata
 message FileGroupsMetadataStore {
   // Key must be a serialized GroupKey.
   map<string, DataFileGroupInternal> data_file_groups = 1;
@@ -609,7 +609,7 @@
 }
 
 // Collects all data used by
-// com.google.android.libraries.mdi.download.internal.SharedFilesMetadata
+// com.google.android.libraries.mobiledatadownload.internal.SharedFilesMetadata
 message SharedFilesMetadataStore {
   // The key must be a serialized NewFileKey.
   map<string, SharedFile> shared_files = 1;
@@ -641,7 +641,7 @@
   optional string checksum = 3;
   optional DataFileGroupInternal.AllowedReaders allowed_readers = 4;
   optional mobstore.proto.Transforms download_transforms = 5
-  [deprecated = true];
+      [deprecated = true];
 }
 
 // This proto is used to store state for logging. See details at