V8 API Reference, 7.2.502.16 (for Deno 0.2.4)
item-parallel-job.h
1 // Copyright 2017 the V8 project authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef V8_HEAP_ITEM_PARALLEL_JOB_H_
6 #define V8_HEAP_ITEM_PARALLEL_JOB_H_
7 
8 #include <memory>
9 #include <vector>
10 
11 #include "src/base/atomic-utils.h"
12 #include "src/base/logging.h"
13 #include "src/base/macros.h"
14 #include "src/base/optional.h"
15 #include "src/cancelable-task.h"
16 #include "src/counters.h"
17 #include "src/globals.h"
18 
19 namespace v8 {
20 
21 namespace base {
22 class Semaphore;
23 }
24 
25 namespace internal {
26 
27 class Counters;
28 class Isolate;
29 
30 // This class manages background tasks that process a set of items in parallel.
31 // The first task added is executed on the same thread as |job.Run()| is called.
32 // All other tasks are scheduled in the background.
33 //
34 // - Items need to inherit from ItemParallelJob::Item.
35 // - Tasks need to inherit from ItemParallelJob::Task.
36 //
37 // Items need to be marked as finished after processing them. Task and Item
38 // ownership is transferred to the job.
39 //
40 // Each parallel (non-main thread) task will report the time between the job
41 // being created and it being scheduled to |gc_parallel_task_latency_histogram|.
42 class V8_EXPORT_PRIVATE ItemParallelJob {
43  public:
44  class Task;
45 
46  class V8_EXPORT_PRIVATE Item {
47  public:
48  Item() = default;
49  virtual ~Item() = default;
50 
51  // Marks an item as being finished.
52  void MarkFinished() { CHECK_EQ(kProcessing, state_.exchange(kFinished)); }
53 
54  private:
55  enum ProcessingState : uintptr_t { kAvailable, kProcessing, kFinished };
56 
57  bool TryMarkingAsProcessing() {
58  ProcessingState available = kAvailable;
59  return state_.compare_exchange_strong(available, kProcessing);
60  }
61  bool IsFinished() { return state_ == kFinished; }
62 
63  std::atomic<ProcessingState> state_{kAvailable};
64 
65  friend class ItemParallelJob;
66  friend class ItemParallelJob::Task;
67 
68  DISALLOW_COPY_AND_ASSIGN(Item);
69  };
70 
71  class V8_EXPORT_PRIVATE Task : public CancelableTask {
72  public:
73  explicit Task(Isolate* isolate);
74  ~Task() override;
75 
76  virtual void RunInParallel() = 0;
77 
78  protected:
79  // Retrieves a new item that needs to be processed. Returns |nullptr| if
80  // all items are processed. Upon returning an item, the task is required
81  // to process the item and mark the item as finished after doing so.
82  template <class ItemType>
83  ItemType* GetItem() {
84  while (items_considered_++ != items_->size()) {
85  // Wrap around.
86  if (cur_index_ == items_->size()) {
87  cur_index_ = 0;
88  }
89  Item* item = (*items_)[cur_index_++];
90  if (item->TryMarkingAsProcessing()) {
91  return static_cast<ItemType*>(item);
92  }
93  }
94  return nullptr;
95  }
96 
97  private:
98  friend class ItemParallelJob;
99  friend class Item;
100 
101  // Sets up state required before invoking Run(). If
102  // |start_index is >= items_.size()|, this task will not process work items
103  // (some jobs have more tasks than work items in order to parallelize post-
104  // processing, e.g. scavenging). If |gc_parallel_task_latency_histogram| is
105  // provided, it will be used to report histograms on the latency between
106  // posting the task and it being scheduled.
107  void SetupInternal(
108  base::Semaphore* on_finish, std::vector<Item*>* items,
109  size_t start_index,
110  base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram);
111 
112  // We don't allow overriding this method any further.
113  void RunInternal() final;
114 
115  std::vector<Item*>* items_ = nullptr;
116  size_t cur_index_ = 0;
117  size_t items_considered_ = 0;
118  base::Semaphore* on_finish_ = nullptr;
119  base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram_;
120 
121  DISALLOW_COPY_AND_ASSIGN(Task);
122  };
123 
124  ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
125  base::Semaphore* pending_tasks);
126 
127  ~ItemParallelJob();
128 
129  // Adds a task to the job. Transfers ownership to the job.
130  void AddTask(Task* task) { tasks_.push_back(std::unique_ptr<Task>(task)); }
131 
132  // Adds an item to the job. Transfers ownership to the job.
133  void AddItem(Item* item) { items_.push_back(item); }
134 
135  int NumberOfItems() const { return static_cast<int>(items_.size()); }
136  int NumberOfTasks() const { return static_cast<int>(tasks_.size()); }
137 
138  // Runs this job. Reporting metrics in a thread-safe manner to
139  // |async_counters|.
140  void Run(const std::shared_ptr<Counters>& async_counters);
141 
142  private:
143  std::vector<Item*> items_;
144  std::vector<std::unique_ptr<Task>> tasks_;
145  CancelableTaskManager* cancelable_task_manager_;
146  base::Semaphore* pending_tasks_;
147  DISALLOW_COPY_AND_ASSIGN(ItemParallelJob);
148 };
149 
150 } // namespace internal
151 } // namespace v8
152 
153 #endif // V8_HEAP_ITEM_PARALLEL_JOB_H_
Definition: libplatform.h:13