| // Copyright 2016 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "base/task/thread_pool/worker_thread_waitable_event.h" |
| |
| #include <stddef.h> |
| |
| #include <atomic> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "base/allocator/partition_allocator/src/partition_alloc/partition_alloc_buildflags.h" |
| #include "base/allocator/partition_allocator/src/partition_alloc/partition_alloc_config.h" |
| #include "base/allocator/partition_allocator/src/partition_alloc/shim/allocator_shim.h" |
| #include "base/allocator/partition_allocator/src/partition_alloc/shim/allocator_shim_default_dispatch_to_partition_alloc.h" |
| #include "base/compiler_specific.h" |
| #include "base/functional/bind.h" |
| #include "base/functional/callback_helpers.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/memory/raw_ptr.h" |
| #include "base/memory/ref_counted.h" |
| #include "base/message_loop/message_pump_type.h" |
| #include "base/synchronization/condition_variable.h" |
| #include "base/task/common/checked_lock.h" |
| #include "base/task/thread_pool/environment_config.h" |
| #include "base/task/thread_pool/sequence.h" |
| #include "base/task/thread_pool/task.h" |
| #include "base/task/thread_pool/task_tracker.h" |
| #include "base/task/thread_pool/test_utils.h" |
| #include "base/task/thread_pool/worker_thread_observer.h" |
| #include "base/test/test_timeouts.h" |
| #include "base/test/test_waitable_event.h" |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/simple_thread.h" |
| #include "base/threading/thread.h" |
| #include "base/time/time.h" |
| #include "build/build_config.h" |
| #include "testing/gmock/include/gmock/gmock.h" |
| #include "testing/gtest/include/gtest/gtest.h" |
| |
| #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \ |
| PA_CONFIG(THREAD_CACHE_SUPPORTED) |
| #include "base/allocator/partition_allocator/src/partition_alloc/extended_api.h" |
| #include "base/allocator/partition_allocator/src/partition_alloc/thread_cache.h" |
| #endif // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && |
| // PA_CONFIG(THREAD_CACHE_SUPPORTED) |
| |
| using testing::_; |
| using testing::Mock; |
| using testing::Ne; |
| using testing::StrictMock; |
| |
| namespace base { |
| namespace internal { |
| namespace { |
| |
| const size_t kNumSequencesPerTest = 150; |
| |
| class WorkerThreadDefaultDelegate : public WorkerThreadWaitableEvent::Delegate { |
| public: |
| WorkerThreadDefaultDelegate() = default; |
| WorkerThreadDefaultDelegate(const WorkerThreadDefaultDelegate&) = delete; |
| WorkerThreadDefaultDelegate& operator=(const WorkerThreadDefaultDelegate&) = |
| delete; |
| |
| // WorkerThreadWaitableEvent::Delegate: |
| WorkerThread::ThreadLabel GetThreadLabel() const override { |
| return WorkerThread::ThreadLabel::DEDICATED; |
| } |
| void OnMainEntry(WorkerThread* worker) override {} |
| RegisteredTaskSource GetWork(WorkerThread* worker) override { |
| return nullptr; |
| } |
| RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source, |
| WorkerThread* worker) override { |
| ADD_FAILURE() << "Unexpected call to SwapProcessedTask()"; |
| return nullptr; |
| } |
| TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); } |
| }; |
| |
| // The test parameter is the number of Tasks per Sequence returned by GetWork(). |
| class ThreadPoolWorkerTest : public testing::TestWithParam<int> { |
| public: |
| ThreadPoolWorkerTest(const ThreadPoolWorkerTest&) = delete; |
| ThreadPoolWorkerTest& operator=(const ThreadPoolWorkerTest&) = delete; |
| |
| protected: |
| ThreadPoolWorkerTest() : num_get_work_cv_(lock_.CreateConditionVariable()) { |
| Thread::Options service_thread_options; |
| service_thread_options.message_pump_type = MessagePumpType::IO; |
| service_thread_.StartWithOptions(std::move(service_thread_options)); |
| } |
| |
| void SetUp() override { |
| worker_ = MakeRefCounted<WorkerThreadWaitableEvent>( |
| ThreadType::kDefault, std::make_unique<TestWorkerThreadWaitableEventDelegate>(this), |
| task_tracker_.GetTrackedRef(), 0); |
| ASSERT_TRUE(worker_); |
| worker_->Start(service_thread_.task_runner()); |
| worker_set_.Signal(); |
| main_entry_called_.Wait(); |
| } |
| |
| void TearDown() override { |
| // |worker_| needs to be released before ~TaskTracker() as it holds a |
| // TrackedRef to it. |
| worker_->JoinForTesting(); |
| worker_ = nullptr; |
| } |
| |
| int TasksPerSequence() const { return GetParam(); } |
| |
| // Wait until GetWork() has been called |num_get_work| times. |
| void WaitForNumGetWork(size_t num_get_work) { |
| CheckedAutoLock auto_lock(lock_); |
| while (num_get_work_ < num_get_work) |
| num_get_work_cv_->Wait(); |
| } |
| |
| void SetMaxGetWork(size_t max_get_work) { |
| CheckedAutoLock auto_lock(lock_); |
| max_get_work_ = max_get_work; |
| } |
| |
| void SetNumSequencesToCreate(size_t num_sequences_to_create) { |
| CheckedAutoLock auto_lock(lock_); |
| EXPECT_EQ(0U, num_sequences_to_create_); |
| num_sequences_to_create_ = num_sequences_to_create; |
| } |
| |
| size_t NumRunTasks() { |
| CheckedAutoLock auto_lock(lock_); |
| return num_run_tasks_; |
| } |
| |
| std::vector<scoped_refptr<TaskSource>> CreatedTaskSources() { |
| CheckedAutoLock auto_lock(lock_); |
| return created_sequences_; |
| } |
| |
| std::vector<scoped_refptr<TaskSource>> DidProcessTaskSequences() { |
| CheckedAutoLock auto_lock(lock_); |
| return did_run_task_sources_; |
| } |
| |
| scoped_refptr<WorkerThreadWaitableEvent> worker_; |
| Thread service_thread_ = Thread("ServiceThread"); |
| |
| private: |
| class TestWorkerThreadWaitableEventDelegate : public WorkerThreadDefaultDelegate { |
| public: |
| explicit TestWorkerThreadWaitableEventDelegate(ThreadPoolWorkerTest* outer) |
| : outer_(outer) {} |
| TestWorkerThreadWaitableEventDelegate(const TestWorkerThreadWaitableEventDelegate&) = delete; |
| TestWorkerThreadWaitableEventDelegate& operator=(const TestWorkerThreadWaitableEventDelegate&) = |
| delete; |
| |
| ~TestWorkerThreadWaitableEventDelegate() override { |
| EXPECT_FALSE(IsCallToDidProcessTaskExpected()); |
| } |
| |
| // WorkerThread::Delegate: |
| void OnMainEntry(WorkerThread* worker) override { |
| outer_->worker_set_.Wait(); |
| EXPECT_EQ(outer_->worker_.get(), worker); |
| EXPECT_FALSE(IsCallToDidProcessTaskExpected()); |
| |
| // Without synchronization, OnMainEntry() could be called twice without |
| // generating an error. |
| CheckedAutoLock auto_lock(outer_->lock_); |
| EXPECT_FALSE(outer_->main_entry_called_.IsSignaled()); |
| outer_->main_entry_called_.Signal(); |
| } |
| |
| RegisteredTaskSource GetWork(WorkerThread* worker) override { |
| EXPECT_FALSE(IsCallToDidProcessTaskExpected()); |
| EXPECT_EQ(outer_->worker_.get(), worker); |
| |
| { |
| CheckedAutoLock auto_lock(outer_->lock_); |
| |
| // Increment the number of times that this method has been called. |
| ++outer_->num_get_work_; |
| outer_->num_get_work_cv_->Signal(); |
| |
| // Verify that this method isn't called more times than expected. |
| EXPECT_LE(outer_->num_get_work_, outer_->max_get_work_); |
| |
| // Check if a Sequence should be returned. |
| if (outer_->num_sequences_to_create_ == 0) |
| return nullptr; |
| --outer_->num_sequences_to_create_; |
| } |
| |
| // Create a Sequence with TasksPerSequence() Tasks. |
| scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>( |
| TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel); |
| Sequence::Transaction sequence_transaction(sequence->BeginTransaction()); |
| for (int i = 0; i < outer_->TasksPerSequence(); ++i) { |
| Task task(FROM_HERE, |
| BindOnce(&ThreadPoolWorkerTest::RunTaskCallback, |
| Unretained(outer_)), |
| TimeTicks::Now(), TimeDelta()); |
| sequence_transaction.WillPushImmediateTask(); |
| EXPECT_TRUE(outer_->task_tracker_.WillPostTask( |
| &task, sequence->shutdown_behavior())); |
| sequence_transaction.PushImmediateTask(std::move(task)); |
| } |
| auto registered_task_source = |
| outer_->task_tracker_.RegisterTaskSource(sequence); |
| EXPECT_TRUE(registered_task_source); |
| |
| ExpectCallToDidProcessTask(); |
| |
| { |
| // Add the Sequence to the vector of created Sequences. |
| CheckedAutoLock auto_lock(outer_->lock_); |
| outer_->created_sequences_.push_back(sequence); |
| } |
| auto run_status = registered_task_source.WillRunTask(); |
| EXPECT_NE(run_status, TaskSource::RunStatus::kDisallowed); |
| return registered_task_source; |
| } |
| |
| RegisteredTaskSource SwapProcessedTask( |
| RegisteredTaskSource registered_task_source, |
| WorkerThread* worker) override { |
| { |
| CheckedAutoLock auto_lock(expect_did_run_task_lock_); |
| EXPECT_TRUE(expect_did_run_task_); |
| expect_did_run_task_ = false; |
| } |
| |
| // If TasksPerSequence() is 1, |registered_task_source| should be nullptr. |
| // Otherwise, |registered_task_source| should contain TasksPerSequence() - |
| // 1 Tasks. |
| if (outer_->TasksPerSequence() == 1) { |
| EXPECT_FALSE(registered_task_source); |
| } else { |
| EXPECT_TRUE(registered_task_source); |
| EXPECT_TRUE(registered_task_source.WillReEnqueue(TimeTicks::Now())); |
| |
| // Verify the number of Tasks in |registered_task_source|. |
| for (int i = 0; i < outer_->TasksPerSequence() - 1; ++i) { |
| registered_task_source.WillRunTask(); |
| IgnoreResult(registered_task_source.TakeTask()); |
| if (i < outer_->TasksPerSequence() - 2) { |
| EXPECT_TRUE(registered_task_source.DidProcessTask()); |
| EXPECT_TRUE(registered_task_source.WillReEnqueue(TimeTicks::Now())); |
| } else { |
| EXPECT_FALSE(registered_task_source.DidProcessTask()); |
| } |
| } |
| scoped_refptr<TaskSource> task_source = |
| registered_task_source.Unregister(); |
| { |
| // Add |task_source| to |did_run_task_sources_|. |
| CheckedAutoLock auto_lock(outer_->lock_); |
| outer_->did_run_task_sources_.push_back(std::move(task_source)); |
| EXPECT_LE(outer_->did_run_task_sources_.size(), |
| outer_->created_sequences_.size()); |
| } |
| } |
| return GetWork(worker); |
| } |
| |
| private: |
| // Expect a call to DidProcessTask() before the next call to any other |
| // method of this delegate. |
| void ExpectCallToDidProcessTask() { |
| CheckedAutoLock auto_lock(expect_did_run_task_lock_); |
| expect_did_run_task_ = true; |
| } |
| |
| bool IsCallToDidProcessTaskExpected() const { |
| CheckedAutoLock auto_lock(expect_did_run_task_lock_); |
| return expect_did_run_task_; |
| } |
| |
| raw_ptr<ThreadPoolWorkerTest> outer_; |
| |
| // Synchronizes access to |expect_did_run_task_|. |
| mutable CheckedLock expect_did_run_task_lock_; |
| |
| // Whether the next method called on this delegate should be |
| // DidProcessTask(). |
| bool expect_did_run_task_ = false; |
| }; |
| |
| void RunTaskCallback() { |
| CheckedAutoLock auto_lock(lock_); |
| ++num_run_tasks_; |
| EXPECT_LE(num_run_tasks_, created_sequences_.size()); |
| } |
| |
| TaskTracker task_tracker_; |
| |
| // Synchronizes access to all members below. |
| mutable CheckedLock lock_; |
| |
| // Signaled once OnMainEntry() has been called. |
| TestWaitableEvent main_entry_called_; |
| |
| // Number of Sequences that should be created by GetWork(). When this |
| // is 0, GetWork() returns nullptr. |
| size_t num_sequences_to_create_ = 0; |
| |
| // Number of times that GetWork() has been called. |
| size_t num_get_work_ = 0; |
| |
| // Maximum number of times that GetWork() can be called. |
| size_t max_get_work_ = 0; |
| |
| // Condition variable signaled when |num_get_work_| is incremented. |
| std::unique_ptr<ConditionVariable> num_get_work_cv_; |
| |
| // Sequences created by GetWork(). |
| std::vector<scoped_refptr<TaskSource>> created_sequences_; |
| |
| // Sequences passed to DidProcessTask(). |
| std::vector<scoped_refptr<TaskSource>> did_run_task_sources_; |
| |
| // Number of times that RunTaskCallback() has been called. |
| size_t num_run_tasks_ = 0; |
| |
| // Signaled after |worker_| is set. |
| TestWaitableEvent worker_set_; |
| }; |
| |
| } // namespace |
| |
| // Verify that when GetWork() continuously returns Sequences, all Tasks in these |
| // Sequences run successfully. The test wakes up the WorkerThread once. |
| TEST_P(ThreadPoolWorkerTest, ContinuousWork) { |
| // Set GetWork() to return |kNumSequencesPerTest| Sequences before starting to |
| // return nullptr. |
| SetNumSequencesToCreate(kNumSequencesPerTest); |
| |
| // Expect |kNumSequencesPerTest| calls to GetWork() in which it returns a |
| // Sequence and one call in which its returns nullptr. |
| const size_t kExpectedNumGetWork = kNumSequencesPerTest + 1; |
| SetMaxGetWork(kExpectedNumGetWork); |
| |
| // Wake up |worker_| and wait until GetWork() has been invoked the |
| // expected amount of times. |
| worker_->WakeUp(); |
| WaitForNumGetWork(kExpectedNumGetWork); |
| |
| // All tasks should have run. |
| EXPECT_EQ(kNumSequencesPerTest, NumRunTasks()); |
| |
| // If Sequences returned by GetWork() contain more than one Task, they aren't |
| // empty after the worker pops Tasks from them and thus should be returned to |
| // DidProcessTask(). |
| if (TasksPerSequence() > 1) |
| EXPECT_EQ(CreatedTaskSources(), DidProcessTaskSequences()); |
| else |
| EXPECT_TRUE(DidProcessTaskSequences().empty()); |
| } |
| |
| // Verify that when GetWork() alternates between returning a Sequence and |
| // returning nullptr, all Tasks in the returned Sequences run successfully. The |
| // test wakes up the WorkerThread once for each Sequence. |
| TEST_P(ThreadPoolWorkerTest, IntermittentWork) { |
| for (size_t i = 0; i < kNumSequencesPerTest; ++i) { |
| // Set GetWork() to return 1 Sequence before starting to return |
| // nullptr. |
| SetNumSequencesToCreate(1); |
| |
| // Expect |i + 1| calls to GetWork() in which it returns a Sequence and |
| // |i + 1| calls in which it returns nullptr. |
| const size_t expected_num_get_work = 2 * (i + 1); |
| SetMaxGetWork(expected_num_get_work); |
| |
| // Wake up |worker_| and wait until GetWork() has been invoked |
| // the expected amount of times. |
| worker_->WakeUp(); |
| WaitForNumGetWork(expected_num_get_work); |
| |
| // The Task should have run |
| EXPECT_EQ(i + 1, NumRunTasks()); |
| |
| // If Sequences returned by GetWork() contain more than one Task, they |
| // aren't empty after the worker pops Tasks from them and thus should be |
| // returned to DidProcessTask(). |
| if (TasksPerSequence() > 1) |
| EXPECT_EQ(CreatedTaskSources(), DidProcessTaskSequences()); |
| else |
| EXPECT_TRUE(DidProcessTaskSequences().empty()); |
| } |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(OneTaskPerSequence, |
| ThreadPoolWorkerTest, |
| ::testing::Values(1)); |
| INSTANTIATE_TEST_SUITE_P(TwoTasksPerSequence, |
| ThreadPoolWorkerTest, |
| ::testing::Values(2)); |
| |
| namespace { |
| |
| class ControllableCleanupDelegate : public WorkerThreadDefaultDelegate { |
| public: |
| class Controls : public RefCountedThreadSafe<Controls> { |
| public: |
| Controls() = default; |
| Controls(const Controls&) = delete; |
| Controls& operator=(const Controls&) = delete; |
| |
| void HaveWorkBlock() { work_running_.Reset(); } |
| |
| void UnblockWork() { work_running_.Signal(); } |
| |
| void WaitForWorkToRun() { work_processed_.Wait(); } |
| |
| void WaitForCleanupRequest() { cleanup_requested_.Wait(); } |
| |
| void WaitForDelegateDestroy() { destroyed_.Wait(); } |
| |
| void WaitForMainExit() { exited_.Wait(); } |
| |
| void set_expect_get_work(bool expect_get_work) { |
| expect_get_work_ = expect_get_work; |
| } |
| |
| void ResetState() { |
| work_running_.Signal(); |
| work_processed_.Reset(); |
| cleanup_requested_.Reset(); |
| exited_.Reset(); |
| work_requested_ = false; |
| } |
| |
| void set_can_cleanup(bool can_cleanup) { can_cleanup_ = can_cleanup; } |
| |
| private: |
| friend class ControllableCleanupDelegate; |
| friend class RefCountedThreadSafe<Controls>; |
| ~Controls() = default; |
| |
| TestWaitableEvent work_running_{WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::SIGNALED}; |
| TestWaitableEvent work_processed_; |
| TestWaitableEvent cleanup_requested_; |
| TestWaitableEvent destroyed_; |
| TestWaitableEvent exited_; |
| |
| bool expect_get_work_ = true; |
| bool can_cleanup_ = false; |
| bool work_requested_ = false; |
| }; |
| |
| explicit ControllableCleanupDelegate(TaskTracker* task_tracker) |
| : task_tracker_(task_tracker), controls_(new Controls()) {} |
| |
| ControllableCleanupDelegate(const ControllableCleanupDelegate&) = delete; |
| ControllableCleanupDelegate& operator=(const ControllableCleanupDelegate&) = |
| delete; |
| ~ControllableCleanupDelegate() override { controls_->destroyed_.Signal(); } |
| |
| RegisteredTaskSource GetWork(WorkerThread* worker) override { |
| EXPECT_TRUE(controls_->expect_get_work_); |
| |
| // Sends one item of work to signal |work_processed_|. On subsequent calls, |
| // sends nullptr to indicate there's no more work to be done. |
| if (controls_->work_requested_) { |
| if (CanCleanup(worker)) { |
| OnCleanup(); |
| worker->Cleanup(); |
| controls_->set_expect_get_work(false); |
| } |
| return nullptr; |
| } |
| |
| controls_->work_requested_ = true; |
| scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>( |
| TaskTraits(WithBaseSyncPrimitives(), |
| TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN), |
| nullptr, TaskSourceExecutionMode::kParallel); |
| Task task(FROM_HERE, |
| BindOnce( |
| [](TestWaitableEvent* work_processed, |
| TestWaitableEvent* work_running) { |
| work_processed->Signal(); |
| work_running->Wait(); |
| }, |
| Unretained(&controls_->work_processed_), |
| Unretained(&controls_->work_running_)), |
| TimeTicks::Now(), TimeDelta()); |
| auto transaction = sequence->BeginTransaction(); |
| transaction.WillPushImmediateTask(); |
| EXPECT_TRUE( |
| task_tracker_->WillPostTask(&task, sequence->shutdown_behavior())); |
| transaction.PushImmediateTask(std::move(task)); |
| auto registered_task_source = |
| task_tracker_->RegisterTaskSource(std::move(sequence)); |
| EXPECT_TRUE(registered_task_source); |
| registered_task_source.WillRunTask(); |
| return registered_task_source; |
| } |
| |
| RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source, WorkerThread* worker) override { |
| return GetWork(worker); |
| } |
| |
| void OnMainExit(WorkerThread* worker) override { |
| controls_->exited_.Signal(); |
| } |
| |
| bool CanCleanup(WorkerThread* worker) { |
| // Saving |can_cleanup_| now so that callers waiting on |cleanup_requested_| |
| // have the thread go to sleep and then allow timing out. |
| bool can_cleanup = controls_->can_cleanup_; |
| controls_->cleanup_requested_.Signal(); |
| return can_cleanup; |
| } |
| |
| void OnCleanup() { |
| EXPECT_TRUE(controls_->can_cleanup_); |
| EXPECT_TRUE(controls_->cleanup_requested_.IsSignaled()); |
| } |
| |
| // ControllableCleanupDelegate: |
| scoped_refptr<Controls> controls() { return controls_; } |
| |
| private: |
| scoped_refptr<Sequence> work_sequence_; |
| const raw_ptr<TaskTracker> task_tracker_; |
| scoped_refptr<Controls> controls_; |
| }; |
| |
| class MockedControllableCleanupDelegate : public ControllableCleanupDelegate { |
| public: |
| explicit MockedControllableCleanupDelegate(TaskTracker* task_tracker) |
| : ControllableCleanupDelegate(task_tracker) {} |
| MockedControllableCleanupDelegate(const MockedControllableCleanupDelegate&) = |
| delete; |
| MockedControllableCleanupDelegate& operator=( |
| const MockedControllableCleanupDelegate&) = delete; |
| ~MockedControllableCleanupDelegate() override = default; |
| |
| // WorkerThread::Delegate: |
| MOCK_METHOD1(OnMainEntry, void(WorkerThread* worker)); |
| }; |
| |
| } // namespace |
| |
| // Verify that calling WorkerThread::Cleanup() from GetWork() causes |
| // the WorkerThread's thread to exit. |
| TEST(ThreadPoolWorkerTest, WorkerCleanupFromGetWork) { |
| Thread service_thread = Thread("ServiceThread"); |
| Thread::Options service_thread_options; |
| service_thread_options.message_pump_type = MessagePumpType::IO; |
| service_thread.StartWithOptions(std::move(service_thread_options)); |
| TaskTracker task_tracker; |
| // Will be owned by WorkerThread. |
| MockedControllableCleanupDelegate* delegate = |
| new StrictMock<MockedControllableCleanupDelegate>(&task_tracker); |
| scoped_refptr<ControllableCleanupDelegate::Controls> controls = |
| delegate->controls(); |
| controls->set_can_cleanup(true); |
| EXPECT_CALL(*delegate, OnMainEntry(_)); |
| auto worker = |
| MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, WrapUnique(delegate), |
| task_tracker.GetTrackedRef(), 0); |
| worker->Start(service_thread.task_runner()); |
| worker->WakeUp(); |
| controls->WaitForWorkToRun(); |
| Mock::VerifyAndClear(delegate); |
| controls->WaitForMainExit(); |
| // Join the worker to avoid leaks. |
| worker->JoinForTesting(); |
| } |
| |
| TEST(ThreadPoolWorkerTest, WorkerCleanupDuringWork) { |
| Thread service_thread = Thread("ServiceThread"); |
| Thread::Options service_thread_options; |
| service_thread_options.message_pump_type = MessagePumpType::IO; |
| service_thread.StartWithOptions(std::move(service_thread_options)); |
| TaskTracker task_tracker; |
| // Will be owned by WorkerThread. |
| // No mock here as that's reasonably covered by other tests and the delegate |
| // may destroy on a different thread. Mocks aren't designed with that in mind. |
| std::unique_ptr<ControllableCleanupDelegate> delegate = |
| std::make_unique<ControllableCleanupDelegate>(&task_tracker); |
| scoped_refptr<ControllableCleanupDelegate::Controls> controls = |
| delegate->controls(); |
| |
| controls->HaveWorkBlock(); |
| |
| auto worker = |
| MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, std::move(delegate), |
| task_tracker.GetTrackedRef(), 0); |
| worker->Start(service_thread.task_runner()); |
| worker->WakeUp(); |
| |
| controls->WaitForWorkToRun(); |
| worker->Cleanup(); |
| worker = nullptr; |
| controls->UnblockWork(); |
| controls->WaitForDelegateDestroy(); |
| } |
| |
| TEST(ThreadPoolWorkerTest, WorkerCleanupDuringWait) { |
| Thread service_thread = Thread("ServiceThread"); |
| Thread::Options service_thread_options; |
| service_thread_options.message_pump_type = MessagePumpType::IO; |
| service_thread.StartWithOptions(std::move(service_thread_options)); |
| TaskTracker task_tracker; |
| // Will be owned by WorkerThread. |
| // No mock here as that's reasonably covered by other tests and the delegate |
| // may destroy on a different thread. Mocks aren't designed with that in mind. |
| std::unique_ptr<ControllableCleanupDelegate> delegate = |
| std::make_unique<ControllableCleanupDelegate>(&task_tracker); |
| scoped_refptr<ControllableCleanupDelegate::Controls> controls = |
| delegate->controls(); |
| |
| auto worker = |
| MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, std::move(delegate), |
| task_tracker.GetTrackedRef(), 0); |
| worker->Start(service_thread.task_runner()); |
| worker->WakeUp(); |
| |
| controls->WaitForCleanupRequest(); |
| worker->Cleanup(); |
| worker = nullptr; |
| controls->WaitForDelegateDestroy(); |
| } |
| |
| TEST(ThreadPoolWorkerTest, WorkerCleanupDuringShutdown) { |
| TaskTracker task_tracker; |
| Thread service_thread = Thread("ServiceThread"); |
| Thread::Options service_thread_options; |
| service_thread_options.message_pump_type = MessagePumpType::IO; |
| service_thread.StartWithOptions(std::move(service_thread_options)); |
| // Will be owned by WorkerThread. |
| // No mock here as that's reasonably covered by other tests and the delegate |
| // may destroy on a different thread. Mocks aren't designed with that in mind. |
| std::unique_ptr<ControllableCleanupDelegate> delegate = |
| std::make_unique<ControllableCleanupDelegate>(&task_tracker); |
| scoped_refptr<ControllableCleanupDelegate::Controls> controls = |
| delegate->controls(); |
| |
| controls->HaveWorkBlock(); |
| |
| auto worker = |
| MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, std::move(delegate), |
| task_tracker.GetTrackedRef(), 0); |
| worker->Start(service_thread.task_runner()); |
| worker->WakeUp(); |
| |
| controls->WaitForWorkToRun(); |
| test::ShutdownTaskTracker(&task_tracker); |
| worker->Cleanup(); |
| worker = nullptr; |
| controls->UnblockWork(); |
| controls->WaitForDelegateDestroy(); |
| } |
| |
| // Verify that Start() is a no-op after Cleanup(). |
| TEST(ThreadPoolWorkerTest, CleanupBeforeStart) { |
| Thread service_thread = Thread("ServiceThread"); |
| Thread::Options service_thread_options; |
| service_thread_options.message_pump_type = MessagePumpType::IO; |
| service_thread.StartWithOptions(std::move(service_thread_options)); |
| TaskTracker task_tracker; |
| // Will be owned by WorkerThread. |
| // No mock here as that's reasonably covered by other tests and the delegate |
| // may destroy on a different thread. Mocks aren't designed with that in mind. |
| std::unique_ptr<ControllableCleanupDelegate> delegate = |
| std::make_unique<ControllableCleanupDelegate>(&task_tracker); |
| scoped_refptr<ControllableCleanupDelegate::Controls> controls = |
| delegate->controls(); |
| controls->set_expect_get_work(false); |
| |
| auto worker = |
| MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, std::move(delegate), |
| task_tracker.GetTrackedRef(), 0); |
| |
| worker->Cleanup(); |
| worker->Start(service_thread.task_runner()); |
| |
| EXPECT_FALSE(worker->ThreadAliveForTesting()); |
| } |
| |
| namespace { |
| |
| class CallJoinFromDifferentThread : public SimpleThread { |
| public: |
| explicit CallJoinFromDifferentThread(WorkerThreadWaitableEvent* worker_to_join) |
| : SimpleThread("WorkerThreadJoinThread"), |
| worker_to_join_(worker_to_join) {} |
| |
| CallJoinFromDifferentThread(const CallJoinFromDifferentThread&) = delete; |
| CallJoinFromDifferentThread& operator=(const CallJoinFromDifferentThread&) = |
| delete; |
| ~CallJoinFromDifferentThread() override = default; |
| |
| void Run() override { |
| run_started_event_.Signal(); |
| worker_to_join_.ExtractAsDangling()->JoinForTesting(); |
| } |
| |
| void WaitForRunToStart() { run_started_event_.Wait(); } |
| |
| private: |
| raw_ptr<WorkerThreadWaitableEvent> worker_to_join_; |
| TestWaitableEvent run_started_event_; |
| }; |
| |
| } // namespace |
| |
| TEST(ThreadPoolWorkerTest, WorkerCleanupDuringJoin) { |
| Thread service_thread = Thread("ServiceThread"); |
| Thread::Options service_thread_options; |
| service_thread_options.message_pump_type = MessagePumpType::IO; |
| service_thread.StartWithOptions(std::move(service_thread_options)); |
| TaskTracker task_tracker; |
| // Will be owned by WorkerThread. |
| // No mock here as that's reasonably covered by other tests and the |
| // delegate may destroy on a different thread. Mocks aren't designed with that |
| // in mind. |
| std::unique_ptr<ControllableCleanupDelegate> delegate = |
| std::make_unique<ControllableCleanupDelegate>(&task_tracker); |
| scoped_refptr<ControllableCleanupDelegate::Controls> controls = |
| delegate->controls(); |
| |
| controls->HaveWorkBlock(); |
| |
| auto worker = |
| MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, std::move(delegate), |
| task_tracker.GetTrackedRef(), 0); |
| worker->Start(service_thread.task_runner()); |
| worker->WakeUp(); |
| |
| controls->WaitForWorkToRun(); |
| CallJoinFromDifferentThread join_from_different_thread(worker.get()); |
| join_from_different_thread.Start(); |
| join_from_different_thread.WaitForRunToStart(); |
| // Sleep here to give the other thread a chance to call JoinForTesting(). |
| // Receiving a signal that Run() was called doesn't mean JoinForTesting() was |
| // necessarily called, and we can't signal after JoinForTesting() as |
| // JoinForTesting() blocks until we call UnblockWork(). |
| PlatformThread::Sleep(TestTimeouts::tiny_timeout()); |
| worker->Cleanup(); |
| worker = nullptr; |
| controls->UnblockWork(); |
| controls->WaitForDelegateDestroy(); |
| join_from_different_thread.Join(); |
| } |
| |
| namespace { |
| |
| class ExpectThreadTypeDelegate : public WorkerThreadDefaultDelegate { |
| public: |
| ExpectThreadTypeDelegate() |
| : thread_type_verified_in_get_work_event_( |
| WaitableEvent::ResetPolicy::AUTOMATIC) {} |
| ExpectThreadTypeDelegate(const ExpectThreadTypeDelegate&) = delete; |
| ExpectThreadTypeDelegate& operator=(const ExpectThreadTypeDelegate&) = delete; |
| |
| void SetExpectedThreadType(ThreadType expected_thread_type) { |
| expected_thread_type_ = expected_thread_type; |
| } |
| |
| void WaitForThreadTypeVerifiedInGetWork() { |
| thread_type_verified_in_get_work_event_.Wait(); |
| } |
| |
| // WorkerThread::Delegate: |
| void OnMainEntry(WorkerThread* worker) override { VerifyThreadType(); } |
| RegisteredTaskSource GetWork(WorkerThread* worker) override { |
| VerifyThreadType(); |
| thread_type_verified_in_get_work_event_.Signal(); |
| return nullptr; |
| } |
| |
| private: |
| void VerifyThreadType() { |
| CheckedAutoLock auto_lock(expected_thread_type_lock_); |
| EXPECT_EQ(expected_thread_type_, PlatformThread::GetCurrentThreadType()); |
| } |
| |
| // Signaled after GetWork() has verified the thread type of the worker thread. |
| TestWaitableEvent thread_type_verified_in_get_work_event_; |
| |
| // Synchronizes access to |expected_thread_type_|. |
| CheckedLock expected_thread_type_lock_; |
| |
| // Expected thread type for the next call to OnMainEntry() or GetWork(). |
| ThreadType expected_thread_type_ = ThreadType::kDefault; |
| }; |
| |
| } // namespace |
| |
| TEST(ThreadPoolWorkerTest, BumpThreadTypeOfAliveThreadDuringShutdown) { |
| if (!CanUseBackgroundThreadTypeForWorkerThread()) |
| return; |
| |
| TaskTracker task_tracker; |
| Thread service_thread = Thread("ServiceThread"); |
| Thread::Options service_thread_options; |
| service_thread_options.message_pump_type = MessagePumpType::IO; |
| service_thread.StartWithOptions(std::move(service_thread_options)); |
| |
| // Block shutdown to ensure that the worker doesn't exit when StartShutdown() |
| // is called. |
| scoped_refptr<Sequence> sequence = |
| MakeRefCounted<Sequence>(TaskTraits{TaskShutdownBehavior::BLOCK_SHUTDOWN}, |
| nullptr, TaskSourceExecutionMode::kParallel); |
| auto registered_task_source = |
| task_tracker.RegisterTaskSource(std::move(sequence)); |
| |
| std::unique_ptr<ExpectThreadTypeDelegate> delegate( |
| new ExpectThreadTypeDelegate); |
| ExpectThreadTypeDelegate* delegate_raw = delegate.get(); |
| delegate_raw->SetExpectedThreadType(ThreadType::kBackground); |
| auto worker = |
| MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kBackground, std::move(delegate), |
| task_tracker.GetTrackedRef(), 0); |
| worker->Start(service_thread.task_runner()); |
| |
| // Verify that the initial thread type is kBackground (or kNormal if thread |
| // type can't be increased). |
| worker->WakeUp(); |
| delegate_raw->WaitForThreadTypeVerifiedInGetWork(); |
| |
| // Verify that the thread type is bumped to kNormal during shutdown. |
| delegate_raw->SetExpectedThreadType(ThreadType::kDefault); |
| task_tracker.StartShutdown(); |
| worker->WakeUp(); |
| delegate_raw->WaitForThreadTypeVerifiedInGetWork(); |
| |
| worker->JoinForTesting(); |
| } |
| |
| namespace { |
| |
| class VerifyCallsToObserverDelegate : public WorkerThreadDefaultDelegate { |
| public: |
| explicit VerifyCallsToObserverDelegate( |
| test::MockWorkerThreadObserver* observer) |
| : observer_(observer) {} |
| VerifyCallsToObserverDelegate(const VerifyCallsToObserverDelegate&) = delete; |
| VerifyCallsToObserverDelegate& operator=( |
| const VerifyCallsToObserverDelegate&) = delete; |
| |
| // WorkerThread::Delegate: |
| void OnMainEntry(WorkerThread* worker) override { |
| Mock::VerifyAndClear(observer_); |
| } |
| |
| void OnMainExit(WorkerThread* worker) override { |
| observer_->AllowCallsOnMainExit(1); |
| } |
| |
| private: |
| const raw_ptr<test::MockWorkerThreadObserver> observer_; |
| }; |
| |
| } // namespace |
| |
| // Verify that the WorkerThreadObserver is notified when the worker enters |
| // and exits its main function. |
| TEST(ThreadPoolWorkerTest, WorkerThreadObserver) { |
| StrictMock<test::MockWorkerThreadObserver> observer; |
| TaskTracker task_tracker; |
| Thread service_thread = Thread("ServiceThread"); |
| Thread::Options service_thread_options; |
| service_thread_options.message_pump_type = MessagePumpType::IO; |
| service_thread.StartWithOptions(std::move(service_thread_options)); |
| auto delegate = std::make_unique<VerifyCallsToObserverDelegate>(&observer); |
| auto worker = |
| MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, std::move(delegate), |
| task_tracker.GetTrackedRef(), 0); |
| EXPECT_CALL(observer, OnWorkerThreadMainEntry()); |
| worker->Start(service_thread.task_runner(), &observer); |
| worker->Cleanup(); |
| // Join the worker to avoid leaks. |
| worker->JoinForTesting(); |
| Mock::VerifyAndClear(&observer); |
| } |
| |
| #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \ |
| PA_CONFIG(THREAD_CACHE_SUPPORTED) |
| namespace { |
| NOINLINE void FreeForTest(void* data) { |
| free(data); |
| } |
| } // namespace |
| |
| class WorkerThreadThreadCacheDelegate : public WorkerThreadDefaultDelegate { |
| public: |
| void WaitForWork() override { |
| // Fill several buckets before going to sleep. |
| for (size_t size = 8; |
| size < partition_alloc::ThreadCache::kDefaultSizeThreshold; size++) { |
| void* data = malloc(size); |
| // A simple malloc() / free() pair can be discarded by the compiler (and |
| // is), making the test fail. It is sufficient to make |FreeForTest()| a |
| // NOINLINE function for the call to not be eliminated, but it is |
| // required. |
| FreeForTest(data); |
| } |
| |
| size_t cached_memory_before = |
| partition_alloc::ThreadCache::Get()->CachedMemory(); |
| WorkerThreadDefaultDelegate::WaitForWork(); |
| size_t cached_memory_after = |
| partition_alloc::ThreadCache::Get()->CachedMemory(); |
| |
| if (!first_wakeup_done_) { |
| // First time we sleep is a short sleep, no cache purging. |
| // |
| // Here and below, cannot assert on exact thread cache size, since |
| // anything that allocates will make it fluctuate. |
| EXPECT_GT(cached_memory_after, cached_memory_before / 2); |
| first_wakeup_done_.store(true, std::memory_order_release); |
| } else { |
| // Second one is long, should purge. |
| EXPECT_LT(cached_memory_after, cached_memory_before / 2); |
| } |
| } |
| |
| std::atomic<bool> first_wakeup_done_{false}; |
| }; |
| |
| // TODO(crbug.com/1469364): Re-enable this test on Mac. |
| #if BUILDFLAG(IS_MAC) |
| #define MAYBE_Purge DISABLED_Purge |
| #else |
| #define MAYBE_Purge Purge |
| #endif |
| TEST(ThreadPoolWorkerThreadCachePurgeTest, MAYBE_Purge) { |
| // Make sure the thread cache is enabled in the main partition. |
| partition_alloc::internal::ThreadCacheProcessScopeForTesting scope( |
| allocator_shim::internal::PartitionAllocMalloc::Allocator()); |
| |
| Thread service_thread = Thread("ServiceThread"); |
| Thread::Options service_thread_options; |
| service_thread_options.message_pump_type = MessagePumpType::IO; |
| service_thread.StartWithOptions(std::move(service_thread_options)); |
| TaskTracker task_tracker; |
| auto delegate = std::make_unique<WorkerThreadThreadCacheDelegate>(); |
| auto* delegate_raw = delegate.get(); |
| auto worker = |
| MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, std::move(delegate), |
| task_tracker.GetTrackedRef(), 0); |
| // Wake up before the thread is started to make sure the first sleep is short. |
| worker->WakeUp(); |
| worker->Start(service_thread.task_runner(), nullptr); |
| |
| while (delegate_raw->first_wakeup_done_.load(std::memory_order_acquire)) { |
| } |
| |
| // Have to use real sleep unfortunately rather than virtual time, because |
| // WaitableEvent uses the non-overridable variant of TimeTicks. |
| PlatformThread::Sleep(1.1 * |
| WorkerThread::Delegate::kPurgeThreadCacheIdleDelay); |
| worker->JoinForTesting(); |
| } |
| |
| #endif // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && |
| // PA_CONFIG(THREAD_CACHE_SUPPORTED) |
| |
| } // namespace internal |
| } // namespace base |