// Header includes for this file, followed by the Task constructor. The
// constructor has an empty body and only forwards |isolate| to the
// CancelableTask base class.
// NOTE(review): the leading integers (5, 7, 13) look like line numbers left
// over from extraction rather than code, and the two #include directives
// share one physical line with the constructor -- confirm against the real
// file.
5 #include "src/heap/item-parallel-job.h" 7 #include "src/base/platform/semaphore.h" 13 ItemParallelJob::Task::Task(Isolate* isolate) : CancelableTask(isolate) {}
// Destructor. If gc_parallel_task_latency_histogram_ still holds a value when
// the task is destroyed, RecordAbandon() is called on it. RunInternal() calls
// RecordDone() and then reset() on the same member, so a histogram that is
// still present here has not gone through that path.
// NOTE(review): the embedded numbering jumps from 15 to 18 and from 19 to 22;
// the closing brace of this destructor is not visible in this view.
15 ItemParallelJob::Task::~Task() {
18 if (gc_parallel_task_latency_histogram_)
19 gc_parallel_task_latency_histogram_->RecordAbandon();
// Configures a task before it is run.
//
// Parameters:
//   on_finish    - semaphore pointer stored in on_finish_.
//   items        - pointer to the item vector; its size is compared against
//                  |start_index|.
//   start_index  - index stored in cur_index_ when it is within bounds of
//                  |items|.
//   gc_parallel_task_latency_histogram
//                - optional histogram, moved into the member of the same
//                  name.
//
// NOTE(review): items_ is dereferenced below, but no assignment to it is
// visible in this block. The embedded numbering skips 26-27, so the
// assignment may have been dropped by extraction -- confirm.
// NOTE(review): the numbering also skips 30 and 32-33, so it cannot be told
// from this view whether items_considered_ is written in the same branch as
// cur_index_ or in a separate (else) branch -- confirm against the real file.
22 void ItemParallelJob::Task::SetupInternal(
23 base::Semaphore* on_finish, std::vector<Item*>* items,
size_t start_index,
24 base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram) {
25 on_finish_ = on_finish;
// Only adopt |start_index| when it addresses an existing item.
28 if (start_index < items->size()) {
29 cur_index_ = start_index;
// Marks every item as already considered (items_considered_ == item count).
31 items_considered_ = items_->size();
// Take over the by-value optional without copying it.
34 gc_parallel_task_latency_histogram_ =
35 std::move(gc_parallel_task_latency_histogram);
// Runs the task. The visible part handles the latency histogram: when one is
// present, RecordDone() is called on it and the optional is then cleared, so
// the destructor's RecordAbandon() branch is not taken afterwards.
// NOTE(review): the embedded numbering jumps from 41 to 48; the remainder of
// this function (including its closing braces) is not visible in this view.
38 void ItemParallelJob::Task::RunInternal() {
39 if (gc_parallel_task_latency_histogram_) {
40 gc_parallel_task_latency_histogram_->RecordDone();
// Clear the optional so the sample is not recorded a second time.
41 gc_parallel_task_latency_histogram_.reset();
// Constructs a job. Both pointers are stored as-is in
// cancelable_task_manager_ and pending_tasks_; the constructor body is empty
// and performs no other work.
//
// Parameters:
//   cancelable_task_manager - stored; Run() later calls TryAbort() on it.
//   pending_tasks           - stored; Run() later calls Wait() on it and
//                             passes it to each task's SetupInternal().
48 ItemParallelJob::ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
49 base::Semaphore* pending_tasks)
50 : cancelable_task_manager_(cancelable_task_manager),
51 pending_tasks_(pending_tasks) {}
// Destructor. Walks items_ by index and CHECKs that every item reports
// IsFinished().
// NOTE(review): the embedded numbering jumps from 56 to 61; whatever follows
// the CHECK inside the loop, and the closing braces, are not visible in this
// view.
53 ItemParallelJob::~ItemParallelJob() {
54 for (
size_t i = 0;
i < items_.size();
i++) {
55 Item* item = items_[
i];
// Every item must report itself finished by the time the job is destroyed.
56 CHECK(item->IsFinished());
61 void ItemParallelJob::Run(
const std::shared_ptr<Counters>& async_counters) {
62 DCHECK_GT(tasks_.size(), 0);
63 const size_t num_items = items_.size();
64 const size_t num_tasks = tasks_.size();
66 TRACE_EVENT_INSTANT2(TRACE_DISABLED_BY_DEFAULT(
"v8.gc"),
67 "ItemParallelJob::Run", TRACE_EVENT_SCOPE_THREAD,
68 "num_tasks", static_cast<int>(num_tasks),
"num_items",
69 static_cast<int>(num_items));
71 AsyncTimedHistogram gc_parallel_task_latency_histogram(
72 async_counters->gc_parallel_task_latency(), async_counters);
79 const size_t num_tasks_processing_items = Min(num_items, tasks_.size());
83 const size_t items_remainder = num_tasks_processing_items > 0
84 ? num_items % num_tasks_processing_items
88 const size_t items_per_task = num_tasks_processing_items > 0
89 ? num_items / num_tasks_processing_items
91 CancelableTaskManager::Id* task_ids =
92 new CancelableTaskManager::Id[num_tasks];
93 std::unique_ptr<Task> main_task;
94 for (
size_t i = 0, start_index = 0;
i < num_tasks;
95 i++, start_index += items_per_task + (
i < items_remainder ? 1 : 0)) {
96 auto task = std::move(tasks_[
i]);
102 DCHECK_IMPLIES(start_index >= num_items,
i >= num_tasks_processing_items);
104 task->SetupInternal(pending_tasks_, &items_, start_index,
105 i > 0 ? gc_parallel_task_latency_histogram
106 : base::Optional<AsyncTimedHistogram>());
107 task_ids[
i] = task->id();
109 V8::GetCurrentPlatform()->CallBlockingTaskOnWorkerThread(std::move(task));
111 main_task = std::move(task);
120 for (
size_t i = 0;
i < num_tasks;
i++) {
121 if (cancelable_task_manager_->TryAbort(task_ids[
i]) !=
122 TryAbortResult::kTaskAborted) {
123 pending_tasks_->Wait();