2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Baton.h>
20 #include <folly/Function.h>
21 #include <folly/IndexedMemPool.h>
22 #include <folly/Portability.h>
23 #include <folly/detail/CacheLocality.h>
31 /// Flat combining (FC) was introduced in the SPAA 2010 paper Flat
32 /// Combining and the Synchronization-Parallelism Tradeoff, by Danny
33 /// Hendler, Itai Incze, Nir Shavit, and Moran Tzafrir.
34 /// http://mcg.cs.tau.ac.il/projects/projects/flat-combining
36 /// FC is an alternative to coarse-grained locking for making
37 /// sequential data structures thread-safe while minimizing the
38 /// synchronization overheads and cache coherence traffic associated
41 /// Under FC, when a thread finds the lock contended, it can
42 /// request (using a request record) that the lock holder execute its
43 /// operation on the shared data structure. There can be a designated
44 /// combiner thread or any thread can act as the combiner when it
47 /// Potential advantages of FC include:
48 /// - Reduced cache coherence traffic
49 /// - Reduced synchronization overheads, as the overheads of releasing
50 /// and acquiring the lock are eliminated from the critical path of
51 /// operating on the data structure.
52 /// - Opportunities for smart combining, where executing multiple
53 /// operations together may take less time than executing the
54 /// operations separately, e.g., K delete_min operations on a
55 /// priority queue may be combined to take O(K + log N) time instead
58 /// This implementation of flat combining supports:
60 /// - A simple interface that requires minimal extra code by the
61 /// user. To use this interface efficiently the user-provided
62 /// functions must be copyable to folly::Function without dynamic
63 /// allocation. If this is impossible or inconvenient, the user is
64 /// encouraged to use the custom interface described below.
65 /// - A custom interface that supports custom combining and custom
66 /// request structure, either for the sake of smart combining or for
67 /// efficiently supporting operations that are not copyable to
68 /// folly::Function without dynamic allocation.
69 /// - Both synchronous and asynchronous operations.
70 /// - Request records with and without thread-caching.
71 /// - Combining with and without a dedicated combiner thread.
73 /// This implementation differs from the algorithm in the SPAA 2010 paper:
74 /// - It does not require thread caching of request records
75 /// - It supports a dedicated combiner
76 /// - It supports asynchronous operations
78 /// The generic FC class template supports generic data structures and
79 /// utilities with arbitrary operations. The template supports static
80 /// polymorphism for the combining function to enable custom smart
83 /// A simple example of using the FC template:
84 /// class ConcurrentFoo : public FlatCombining<ConcurrentFoo> {
85 /// Foo foo_; // sequential data structure
87 /// T bar(V v) { // thread-safe execution of foo_.bar(v)
89 /// // Note: fn must be copyable to folly::Function without dynamic
90 /// // allocation. Otherwise, it is recommended to use the custom
91 /// // interface and manage the function arguments and results
92 /// // explicitly in a custom request structure.
93 /// auto fn = [&] { result = foo_.bar(v); };
94 /// this->requestFC(fn);
99 /// See test/FlatCombiningExamples.h for more examples. See the
100 /// comments for requestFC() below for a list of simple and custom
101 /// variants of that function.
104 typename T, // concurrent data structure using FC interface
105 typename Mutex = std::mutex,
106 template <typename> class Atom = std::atomic,
107 typename Req = /* default dummy type */ bool>
108 class FlatCombining {
109 using SavedFn = folly::Function<void()>;
112 /// Combining request record.
114 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
115 folly::Baton<Atom, true, false> valid_;
116 folly::Baton<Atom, true, false> done_;
117 folly::Baton<Atom, true, false> disconnected_;
138 bool isValid() const {
139 return valid_.try_wait();
150 bool isDone() const {
151 return done_.try_wait();
158 void setDisconnected() {
159 disconnected_.post();
162 void clearDisconnected() {
163 disconnected_.reset();
166 bool isDisconnected() const {
167 return disconnected_.try_wait();
170 void setIndex(const size_t index) {
174 size_t getIndex() const {
178 void setNext(const size_t next) {
182 size_t getNext() const {
186 void setLast(const uint64_t pass) {
190 uint64_t getLast() const {
198 template <typename Func>
199 void setFn(Func&& fn) {
201 std::is_nothrow_constructible<
202 folly::Function<void()>,
203 _t<std::decay<Func>>>::value,
204 "Try using a smaller function object that can fit in folly::Function "
205 "without allocation, or use the custom interface of requestFC() to "
206 "manage the requested function's arguments and results explicitly "
207 "in a custom request structure without allocation.");
208 fn_ = std::forward<Func>(fn);
228 using Pool = folly::IndexedMemPool<Rec, 32, 4, Atom, false, false>;
231 /// The constructor takes three optional arguments:
232 /// - Optional dedicated combiner thread (default true)
233 /// - Number of records (if 0, then kDefaultNumRecs)
234 /// - A hint for the max. number of combined operations per
235 /// combining session that is checked at the beginning of each pass
236 /// on the request records (if 0, then kDefaultMaxOps)
237 explicit FlatCombining(
238 const bool dedicated = true,
239 uint32_t numRecs = 0, // number of combining records
240 const uint32_t maxOps = 0 // hint of max ops per combining session
242 : numRecs_(numRecs == 0 ? kDefaultNumRecs : numRecs),
243 maxOps_(maxOps == 0 ? kDefaultMaxOps : maxOps),
245 dedicated_(dedicated),
246 recsPool_(numRecs_) {
248 // dedicated combiner thread
249 combiner_ = std::thread([this] { dedicatedCombining(); });
253 /// Destructor: If there is a dedicated combiner, the destructor
254 /// flags it to shutdown. Otherwise, the destructor waits for all
255 /// pending asynchronous requests to be completed.
265 // Wait for all pending operations to complete. Useful primarily
266 // when there are asynchronous operations without a dedicated
269 for (size_t i = getRecsHead(); i != NULL_INDEX; i = nextIndex(i)) {
270 Rec& rec = recsPool_[i];
275 // Give the caller exclusive access.
276 void acquireExclusive() {
280 // Give the caller exclusive access through a lock holder.
281 // No need for explicit release.
282 template <typename LockHolder>
283 void acquireExclusive(LockHolder& l) {
287 // Try to give the caller exclusive access. Returns true iff successful.
288 bool tryExclusive() {
289 return m_.try_lock();
292 // Release exclusive access. The caller must have exclusive access.
293 void releaseExclusive() {
297 // Execute an operation without combining
298 template <typename OpFunc>
299 void requestNoFC(OpFunc& opFn) {
300 std::lock_guard<Mutex> guard(m_);
304 // This function first tries to execute the operation without
305 // combining. If unsuccessful, it allocates a combining record if
306 // needed. If there are no available records, it waits for exclusive
307 // access and executes the operation. If a record is available and
308 // ready for use, it fills the record and indicates that the request
309 // is valid for combining. If the request is synchronous (by default
310 // or necessity), it waits for the operation to be completed by a
311 // combiner and optionally extracts the result, if any.
313 // This function can be called in several forms:
314 // Simple forms that do not require the user to define a Req structure
315 // or to override any request processing member functions:
317 // requestFC(opFn, rec) // provides its own pre-allocated record
318 // requestFC(opFn, rec, syncop) // asynchronous if syncop == false
319 // Custom forms that require the user to define a Req structure and to
320 // override some request processing member functions:
321 // requestFC(opFn, fillFn)
322 // requestFC(opFn, fillFn, rec)
323 // requestFC(opFn, fillFn, rec, syncop)
324 // requestFC(opFn, fillFn, resFn)
325 // requestFC(opFn, fillFn, resFn, rec)
326 template <typename OpFunc>
327 void requestFC(OpFunc&& opFn, Rec* rec = nullptr, bool syncop = true) {
328 auto dummy = [](Req&) {};
330 std::forward<OpFunc>(opFn),
337 template <typename OpFunc, typename FillFunc>
340 const FillFunc& fillFn,
342 bool syncop = true) {
343 auto dummy = [](Req&) {};
345 std::forward<OpFunc>(opFn),
352 template <typename OpFunc, typename FillFunc, typename ResFn>
355 const FillFunc& fillFn,
357 Rec* rec = nullptr) {
358 // must wait for result to execute resFn -- so it must be synchronous
360 std::forward<OpFunc>(opFn),
368 // Allocate a record.
370 auto idx = recsPool_.allocIndex();
371 if (idx == NULL_INDEX) {
372 outOfSpaceCount_.fetch_add(1);
375 Rec& rec = recsPool_[idx];
381 void freeRec(Rec* rec) {
382 if (rec == nullptr) {
385 auto idx = rec->getIndex();
386 recsPool_.recycleIndex(idx);
389 // Returns a count of the number of combined operations so far.
390 uint64_t getCombinedOpCount() {
391 std::lock_guard<Mutex> guard(m_);
395 // Returns a count of the number of combining passes so far.
396 uint64_t getCombiningPasses() {
397 std::lock_guard<Mutex> guard(m_);
401 uint64_t getOutOfSpaceCount() {
402 return outOfSpaceCount_.load();
// Sentinel record index. The record-list traversals stop when they reach
// it, and an allocIndex() result equal to it is treated as "no record
// available" (the out-of-space counter is bumped in that case).
406 const size_t NULL_INDEX = 0;
// Max-ops hint substituted when the constructor is given maxOps == 0.
407 const uint32_t kDefaultMaxOps = 100;
// Record count substituted when the constructor is given numRecs == 0.
408 const uint64_t kDefaultNumRecs = 64;
// Compared in combiningPass() against (passes_ - rec.getLast()) for a
// record that is not currently valid, as part of the test that leads to
// marking the record disconnected.
409 const uint64_t kIdleThreshold = 10;
411 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
414 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
415 folly::Baton<Atom, false, true> pending_;
416 Atom<bool> shutdown_{false};
418 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
423 std::thread combiner_;
// Statistics counters.
// combined_ accumulates the values returned by combiningSession().
426 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
427 uint64_t combined_ = 0;
// Pass counter: processReq() stamps its current value into a record via
// setLast(), and combiningPass() compares it with that stamp to measure
// how long a record has been idle.
428 uint64_t passes_ = 0;
// NOTE(review): presumably the number of combining sessions run so far;
// no update of this field is visible here -- confirm against the
// combining code.
429 uint64_t sessions_ = 0;
// Incremented each time record allocation yields NULL_INDEX; read back by
// getOutOfSpaceCount().
430 Atom<uint64_t> outOfSpaceCount_{0};
432 template <typename OpFunc, typename FillFunc, typename ResFn>
435 const FillFunc& fillFn,
440 std::unique_lock<Mutex> l(this->m_, std::defer_lock);
449 bool tc = (rec != nullptr);
451 // if an async op doesn't have a thread-cached record then turn
452 // it into a synchronous op.
456 if (rec == nullptr) {
457 // Can't use FC - Must acquire lock
464 // Wait if record is in use
469 // Fill the request (custom)
470 Req& req = rec->getReq();
474 rec->setFn(std::forward<OpFunc>(opFn));
476 // Indicate that record is valid
477 assert(!rec->isValid());
479 // end of combining critical path
481 // store-load order setValid before isDisconnected
482 std::atomic_thread_fence(std::memory_order_seq_cst);
483 if (rec->isDisconnected()) {
484 rec->clearDisconnected();
485 pushRec(rec->getIndex());
488 // If synchronous, wait for the request to be completed
492 Req& req = rec->getReq();
493 resFn(req); // Extract the result (custom)
496 freeRec(rec); // Free the temporary record.
501 void pushRec(size_t idx) {
502 Rec& rec = recsPool_[idx];
504 auto head = recs_.load(std::memory_order_acquire);
505 rec.setNext(head); // there shouldn't be a data race here
506 if (recs_.compare_exchange_weak(head, idx)) {
512 size_t getRecsHead() {
513 return recs_.load(std::memory_order_acquire);
516 size_t nextIndex(size_t idx) {
517 return recsPool_[idx].getNext();
520 void clearPending() {
528 bool isPending() const {
529 return pending_.try_wait();
532 void awaitPending() {
536 uint64_t combiningSession() {
537 uint64_t combined = 0;
539 uint64_t count = static_cast<T*>(this)->combiningPass();
545 } while (combined < this->maxOps_);
549 void tryCombining() {
551 while (isPending()) {
553 combined_ += combiningSession();
558 void dedicatedCombining() {
562 if (shutdown_.load()) {
569 std::lock_guard<Mutex> guard(m_);
570 count = combiningSession();
573 if (count < maxOps_) {
580 void awaitDone(Rec& rec) {
584 awaitDoneTryLock(rec);
588 /// Waits for the request to be done and occasionally tries to
589 /// acquire the lock and to do combining. Used only in the absence
590 /// of a dedicated combiner.
591 void awaitDoneTryLock(Rec& rec) {
594 while (!rec.isDone()) {
596 std::unique_lock<Mutex> l(m_, std::defer_lock);
602 folly::asm_volatile_pause();
603 if (++count == 1000) {
611 shutdown_.store(true);
615 /// The following member functions may be overridden for customization
617 void combinedOp(Req&) {
618 throw std::runtime_error(
619 "FlatCombining::combinedOp(Req&) must be overridden in the derived"
620 " class if called.");
623 void processReq(Rec& rec) {
624 SavedFn& opFn = rec.getFn();
630 Req& req = rec.getReq();
631 static_cast<T*>(this)->combinedOp(req); // defined in derived class
633 rec.setLast(passes_);
637 uint64_t combiningPass() {
639 auto idx = getRecsHead();
641 while (idx != NULL_INDEX) {
642 Rec& rec = recsPool_[idx];
643 auto next = rec.getNext();
644 bool valid = rec.isValid();
645 if (!valid && (passes_ - rec.getLast() > kIdleThreshold) &&
649 rec.setDisconnected();
650 // store-load order setDisconnected before isValid
651 std::atomic_thread_fence(std::memory_order_seq_cst);
652 valid = rec.isValid();
666 } // namespace folly {