#include <limits>
#include <string.h>
#include <type_traits>
-#include <unistd.h>
#include <folly/Traits.h>
#include <folly/detail/CacheLocality.h>
#include <folly/detail/TurnSequencer.h>
+#include <folly/portability/Unistd.h>
namespace folly {
}
}
+ template <class Clock, typename... Args>
+ bool tryWriteUntil(const std::chrono::time_point<Clock>& when,
+ Args&&... args) noexcept {
+ uint64_t ticket;
+ if (tryObtainPromisedPushTicketUntil(ticket, when)) {
+ // we have pre-validated that the ticket won't block, or rather that
+ // it won't block longer than it takes another thread to dequeue an
+ // element from the slot it identifies.
+ enqueueWithTicket(ticket, std::forward<Args>(args)...);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
/// If the queue is not full, enqueues and returns true, otherwise
/// returns false. Unlike write this method can be blocked by another
/// thread, specifically a read that has linearized (been assigned
}
}
+ /// Tries until when to obtain a push ticket for which
+ /// SingleElementQueue::enqueue won't block. Returns true on success, false
+ /// on failure.
+ /// ticket is filled on success AND failure.
+ template <class Clock>
+ bool tryObtainPromisedPushTicketUntil(
+ uint64_t& ticket, const std::chrono::time_point<Clock>& when) noexcept {
+ bool deadlineReached = false;
+ while (!deadlineReached) {
+ if (tryObtainPromisedPushTicket(ticket)) {
+ return true;
+ }
+ // ticket is a blocking ticket until the preceding ticket has been
+ // processed: wait until this ticket's turn arrives. We have not reserved
+ // this ticket so we will have to re-attempt to get a non-blocking ticket
+ // if we wake up before we time-out.
+ deadlineReached = !slots_[idx(ticket)].tryWaitForEnqueueTurnUntil(
+ turn(ticket), pushSpinCutoff_, (ticket % kAdaptationFreq) == 0, when);
+ }
+ return false;
+ }
+
/// Tries to obtain a push ticket which can be satisfied if all
/// in-progress pops complete. This function does not block, but
/// blocking may be required when using the returned ticket if some
auto numPops = popTicket_.load(std::memory_order_acquire); // B
// n will be negative if pops are pending
int64_t n = numPushes - numPops;
+ rv = numPushes;
if (n >= static_cast<ssize_t>(capacity_)) {
// Full, linearize at B. We don't need to recheck the read we
// performed at A, because if numPushes was stale at B then the
return false;
}
if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
- rv = numPushes;
return true;
}
}
template <typename = typename std::enable_if<
(folly::IsRelocatable<T>::value &&
boost::has_nothrow_constructor<T>::value) ||
- std::is_nothrow_constructible<T,T&&>::value>::type>
+ std::is_nothrow_constructible<T, T&&>::value>::type>
void enqueue(const uint32_t turn,
Atom<uint32_t>& spinCutoff,
const bool updateSpinCutoff,
ImplByMove, ImplByRelocation>::type());
}
+ /// Waits until either:
+ /// 1: the dequeue turn preceding the given enqueue turn has arrived
+ /// 2: the given deadline has arrived
+ /// Case 1 returns true, case 2 returns false.
+ template <class Clock>
+ bool tryWaitForEnqueueTurnUntil(
+ const uint32_t turn,
+ Atom<uint32_t>& spinCutoff,
+ const bool updateSpinCutoff,
+ const std::chrono::time_point<Clock>& when) noexcept {
+ return sequencer_.tryWaitForTurn(
+ turn * 2, spinCutoff, updateSpinCutoff, &when);
+ }
+
bool mayEnqueue(const uint32_t turn) const noexcept {
return sequencer_.isTurn(turn * 2);
}