blob: 7be91e3ae1ecc2f5a432933badc78f784c2c6268 [file] [log] [blame]
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_THREAD_POOL_SEQUENCE_H_
#define BASE_TASK_THREAD_POOL_SEQUENCE_H_
#include <stddef.h>
#include "base/base_export.h"
#include "base/containers/intrusive_heap.h"
#include "base/containers/queue.h"
#include "base/sequence_token.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/pooled_parallel_task_runner.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_source_sort_key.h"
#include "base/thread_annotations.h"
#include "base/threading/sequence_local_storage_map.h"
namespace base {
namespace internal {
// A Sequence is intended to hold delayed tasks and immediate tasks.
// Delayed tasks are held in a prority_queue until they are ripe and
// immediate tasks in a simple fifo queue.
// When Sequence::TakeTask is called, we select the next appropriate task
// from both queues and return it.
// Each queue holds slots each containing up to a single Task that must be
// executed in posting/runtime order.
//
// In comments below, an "empty Sequence" is a Sequence with no slot.
//
// Note: there is a known refcounted-ownership cycle in the Scheduler
// architecture: Sequence -> Task -> TaskRunner -> Sequence -> ...
// This is okay so long as the other owners of Sequence (PriorityQueue and
// WorkerThread in alternation and
// ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork()
// temporarily) keep running it (and taking Tasks from it as a result). A
// dangling reference cycle would only occur should they release their reference
// to it while it's not empty. In other words, it is only correct for them to
// release it after PopTask() returns false to indicate it was made empty by
// that call (in which case the next PushImmediateTask() will return true to
// indicate to the caller that the Sequence should be re-enqueued for
// execution). This class is thread-safe.
class BASE_EXPORT Sequence : public TaskSource {
public:
// A Transaction can perform multiple operations atomically on a
// Sequence. While a Transaction is alive, it is guaranteed that nothing
// else will access the Sequence; the Sequence's lock is held for the
// lifetime of the Transaction.
class BASE_EXPORT Transaction : public TaskSource::Transaction {
public:
Transaction(Transaction&& other);
Transaction(const Transaction&) = delete;
Transaction& operator=(const Transaction&) = delete;
~Transaction();
// Returns true if the sequence must be added to the immediate queue after
// receiving a new immediate Task in order to be scheduled. If the caller
// doesn't want the sequence to be scheduled, it may not add the sequence to
// the immediate queue even if this returns true.
bool WillPushImmediateTask();
// Adds immediate |task| to the end of this sequence.
void PushImmediateTask(Task task);
// Adds a delayed |task| in this sequence, and returns true if the sequence
// needs to be re-enqueued in the delayed queue as a result of this
// sequence's delayed sort key changing.
bool PushDelayedTask(Task task);
Sequence* sequence() const { return static_cast<Sequence*>(task_source()); }
private:
friend class Sequence;
explicit Transaction(Sequence* sequence);
};
// |traits| is metadata that applies to all Tasks in the Sequence.
// |task_runner| is a reference to the TaskRunner feeding this TaskSource.
// |task_runner| can be nullptr only for tasks with no TaskRunner, in which
// case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
// execution mode of |task_runner|.
Sequence(const TaskTraits& traits,
TaskRunner* task_runner,
TaskSourceExecutionMode execution_mode);
Sequence(const Sequence&) = delete;
Sequence& operator=(const Sequence&) = delete;
// Begins a Transaction. This method cannot be called on a thread which has an
// active Sequence::Transaction.
[[nodiscard]] Transaction BeginTransaction();
// TaskSource:
ExecutionEnvironment GetExecutionEnvironment() override;
size_t GetRemainingConcurrency() const override;
TaskSourceSortKey GetSortKey() const override;
TimeTicks GetDelayedSortKey() const override;
// Returns a token that uniquely identifies this Sequence.
const SequenceToken& token() const { return token_; }
SequenceLocalStorageMap* sequence_local_storage() {
return &sequence_local_storage_;
}
bool OnBecomeReady() override;
bool has_worker_for_testing() const NO_THREAD_SAFETY_ANALYSIS {
return has_worker_;
}
bool is_immediate_for_testing() const { return is_immediate_; }
bool IsEmptyForTesting() const NO_THREAD_SAFETY_ANALYSIS { return IsEmpty(); }
private:
~Sequence() override;
struct DelayedTaskGreater {
bool operator()(const Task& lhs, const Task& rhs) const;
};
// TaskSource:
RunStatus WillRunTask() override;
Task TakeTask(TaskSource::Transaction* transaction) override;
absl::optional<Task> Clear(TaskSource::Transaction* transaction) override;
bool DidProcessTask(TaskSource::Transaction* transaction) override;
bool WillReEnqueue(TimeTicks now,
TaskSource::Transaction* transaction) override;
// Returns true if the delayed task to be posted will cause the delayed sort
// key to change.
bool DelayedSortKeyWillChange(const Task& delayed_task) const
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Selects the earliest task to run, either from immediate or
// delayed queue and return it.
// Expects this sequence to have at least one task that can run
// immediately.
Task TakeEarliestTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Get and return next task from immediate queue
Task TakeNextImmediateTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Update the next earliest/latest ready time.
void UpdateReadyTimes() EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Returns true if there are immediate tasks
bool HasImmediateTasks() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Returns true if tasks ready to be executed
bool HasReadyTasks(TimeTicks now) const override;
bool IsEmpty() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Releases reference to TaskRunner.
void ReleaseTaskRunner();
const SequenceToken token_ = SequenceToken::Create();
// Queues of tasks to execute.
base::queue<Task> queue_ GUARDED_BY(lock_);
base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue_
GUARDED_BY(lock_);
// Caches the latest/earliest ready time for atomic access. Writes are
// protected by |lock_|, but allows atomic reads outside of |lock_|. If this
// sequence is empty, these are in an unknown state and shouldn't be read.
// Minimum of latest_delayed_run_time() of next delayed task if any, and
// |queue_time| of next immediate task if any.
std::atomic<TimeTicks> latest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
// is_null() if there is an immediate task, or earliest_delayed_run_time() of
// next delayed task otherwise.
std::atomic<TimeTicks> earliest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
// True if a worker is currently associated with a Task from this Sequence.
bool has_worker_ = false;
// True if the sequence has ready tasks and requested to be queued as such
// through WillPushImmediateTask() or OnBecomeReady(). Reset to false once all
// ready tasks are done being processed and either DidProcessTask() or
// WillReEnqueue() returned false. Normally, |is_immediate_| is protected by
// |lock_|, except in OnBecomeReady() hence the use of atomics.
std::atomic_bool is_immediate_{false};
// Holds data stored through the SequenceLocalStorageSlot API.
SequenceLocalStorageMap sequence_local_storage_;
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_THREAD_POOL_SEQUENCE_H_