#include <sys/eventfd.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <mutex>
#include <stdexcept>
#include <string>

#include <boost/intrusive/parent_from_member.hpp>
// Constructor: member-initializes the bookkeeping state.  The kernel AIO
// context (ctx_) is NOT created here; it is created lazily by
// initializeContext() on first submit().
// NOTE(review): this is a diff hunk -- '+' marks the newly added ctxSet_
// flag; the constructor body between '{' and '}' (presumably the pollFd_
// setup for pollable mode) is elided by the diff, and the trailing '}'
// belongs to an elided sibling definition.  Confirm against full file.
AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
: ctx_(0),
+ ctxSet_(false),
pending_(0),
capacity_(capacity),
pollFd_(-1) {
}
}
+void AsyncIO::decrementPending() {
+ ssize_t p = pending_.fetch_add(-1, std::memory_order_acq_rel);
+ DCHECK_GE(p, 1);
+}
+
void AsyncIO::initializeContext() {
- if (!ctx_) {
- int rc = io_queue_init(capacity_, &ctx_);
- // returns negative errno
- checkKernelError(rc, "AsyncIO: io_queue_init failed");
- DCHECK(ctx_);
+ if (!ctxSet_.load(std::memory_order_acquire)) {
+ std::lock_guard<std::mutex> lock(initMutex_);
+ if (!ctxSet_.load(std::memory_order_relaxed)) {
+ int rc = io_queue_init(capacity_, &ctx_);
+ // returns negative errno
+ checkKernelError(rc, "AsyncIO: io_queue_init failed");
+ DCHECK(ctx_);
+ ctxSet_.store(true, std::memory_order_release);
+ }
}
}
// Submit one operation for asynchronous execution.  Safe to call from
// multiple threads concurrently.  Throws std::range_error if the number
// of pending requests would exceed capacity_, and a system error if the
// kernel rejects the submission.
void AsyncIO::submit(Op* op) {
  CHECK_EQ(op->state(), Op::State::INITIALIZED);
  initializeContext();  // on demand

  // Optimistically increment; we may transiently go past capacity, but
  // we clean up after ourselves before throwing, which keeps the check
  // race-free without holding a lock.
  ssize_t p = pending_.fetch_add(1, std::memory_order_acq_rel);
  if (p >= capacity_) {
    decrementPending();
    throw std::range_error("AsyncIO: too many pending requests");
  }

  iocb* cb = &op->iocb_;
  cb->data = nullptr;  // unused
  if (pollFd_ != -1) {
    // Pollable mode: have the kernel signal completion via the eventfd.
    io_set_eventfd(cb, pollFd_);
  }
  int rc = io_submit(ctx_, 1, &cb);
  if (rc < 0) {
    // The op was never queued; roll back the optimistic increment
    // before reporting the error.  io_submit returns a negative errno.
    decrementPending();
    throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
  }
  DCHECK_EQ(rc, 1);
  op->start();
}
// Block until at least minRequests pending operations complete, and
// return the completed ops.  Only valid on non-pollable objects, and
// only one thread may be inside wait()/pollCompleted() at a time (see
// the class documentation).
Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
  CHECK(ctx_);
  CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
  // Snapshot pending_ once: other threads may keep submitting while we
  // wait, but we only collect up to the ops pending at this instant.
  ssize_t p = pending_.load(std::memory_order_acquire);
  CHECK_LE(minRequests, p);
  return doWait(minRequests, p);
}
// Collect completions for a pollable AsyncIO object (pollFd_ mode).
// NOTE(review): the body is elided by this diff view -- presumably
// unchanged by the patch; confirm against the full file.
Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
}
// Wait (via io_getevents) for between minRequests and maxRequests
// completions and record them in completed_.  maxRequests is a snapshot
// of pending_ taken by the caller, so concurrent submitters cannot
// change the buffer size under us.
// NOTE(review): diff hunk -- the io_getevents() call inside the do/while
// and the `for (... i ...)` loop header over events[i] are elided by
// the patch context; confirm against the full file.
Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
- io_event events[pending_];
+ io_event events[maxRequests];
int count;
do {
// Wait forever
} while (count == -EINTR);
checkKernelError(count, "AsyncIO: io_getevents failed");
DCHECK_GE(count, minRequests); // the man page says so
- DCHECK_LE(count, pending_);
+ DCHECK_LE(count, maxRequests);
completed_.clear();
if (count == 0) {
DCHECK(events[i].obj);
// Recover the owning Op from the pointer to its embedded iocb member.
Op* op = boost::intrusive::get_parent_from_member(
events[i].obj, &AsyncIOOp::iocb_);
// Atomic decrement replaces the old, non-thread-safe --pending_.
- --pending_;
+ decrementPending();
op->complete(events[i].res);
completed_.push_back(op);
}
#include <sys/uio.h>
#include <libaio.h>
+#include <atomic>
#include <cstdint>
#include <deque>
#include <functional>
+#include <mutex>
#include <ostream>
#include <utility>
#include <vector>
* any IOs on this AsyncIO have completed. If you do this, you must use
* pollCompleted() instead of wait() -- do not read from the pollFd()
* file descriptor directly.
+ *
+ * You may use the same AsyncIO object from multiple threads, as long as
+ * there is only one concurrent caller of wait() / pollCompleted() (perhaps
+ * by always calling it from the same thread, or by providing appropriate
+ * mutual exclusion). In this case, pending() returns a snapshot
+ * of the current number of pending requests.
*/
explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
~AsyncIO();
void submit(Op* op);
private:
+ // Atomically decrement pending_ (on completion or rolled-back submit).
+ void decrementPending();
void initializeContext();
+
Range<Op**> doWait(size_t minRequests, size_t maxRequests);
io_context_t ctx_;
- size_t pending_;
- const size_t capacity_;
+ // ctxSet_/initMutex_ guard the lazy creation of ctx_ in
+ // initializeContext() (double-checked locking).
+ std::atomic<bool> ctxSet_;
+ std::mutex initMutex_;
+
+ // Signed: submit() may transiently increment past capacity_ before
+ // rolling back, and decrementPending() DCHECKs against underflow.
+ std::atomic<ssize_t> pending_;
+ const ssize_t capacity_;
int pollFd_;
std::vector<Op*> completed_;
};
#include <cstdio>
#include <memory>
#include <random>
+#include <thread>
#include <vector>
#include <glog/logging.h>
}
// Submit one pread per spec and verify completion; when 'multithreaded'
// is true, each submit() runs on its own thread, exercising AsyncIO's
// thread-safe submission path.
// NOTE(review): diff hunk -- the verification tail of this function
// (waiting on completions and checking 'pending') is elided below.
void testReadsParallel(const std::vector<TestSpec>& specs,
- AsyncIO::PollMode pollMode) {
+ AsyncIO::PollMode pollMode,
+ bool multithreaded) {
AsyncIO aioReader(specs.size(), pollMode);
std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
std::vector<ManagedBuffer> bufs;
+ // Reserve up front to avoid reallocation while populating below.
+ bufs.reserve(specs.size());
int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
PCHECK(fd != -1);
SCOPE_EXIT {
::close(fd);
};
+
+ std::vector<std::thread> threads;
+ if (multithreaded) {
+ threads.reserve(specs.size());
+ }
// All buffers are allocated before any submission starts.
for (int i = 0; i < specs.size(); i++) {
bufs.push_back(allocateAligned(specs[i].size));
+ }
+ // Shared submit routine; capturing locals by reference is safe because
+ // every worker thread is joined before this scope exits.
+ auto submit = [&] (int i) {
ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
aioReader.submit(&ops[i]);
+ };
+ for (int i = 0; i < specs.size(); i++) {
+ if (multithreaded) {
+ threads.emplace_back([&submit, i] { submit(i); });
+ } else {
+ submit(i);
+ }
+ }
+ for (auto& t : threads) {
+ t.join();
}
std::vector<bool> pending(specs.size(), true);
// Run the read tests for the given specs in every mode: serially, in
// parallel with single-threaded submission, in parallel with
// multi-threaded submission, and queued.
void testReads(const std::vector<TestSpec>& specs,
               AsyncIO::PollMode pollMode) {
  testReadsSerially(specs, pollMode);
  testReadsParallel(specs, pollMode, false);  // one submitting thread
  testReadsParallel(specs, pollMode, true);   // one thread per submit
  testReadsQueued(specs, pollMode);
}