From d10b650899b3caac08364fb888740bcb762dc7f7 Mon Sep 17 00:00:00 2001 From: Tudor Bosman Date: Wed, 8 May 2013 14:40:10 -0700 Subject: [PATCH] Make folly::AsyncIO thread safe Summary: You can now submit to the same AsyncIO object from different threads, but you must still reap from only one thread at a time. Test Plan: async_io_test, added MT test Reviewed By: philipp@fb.com FB internal diff: D804914 --- folly/experimental/io/AsyncIO.cpp | 46 ++++++++++++++++------ folly/experimental/io/AsyncIO.h | 17 +++++++- folly/experimental/io/test/AsyncIOTest.cpp | 25 +++++++++++- 3 files changed, 71 insertions(+), 17 deletions(-) diff --git a/folly/experimental/io/AsyncIO.cpp b/folly/experimental/io/AsyncIO.cpp index 83a55286..83558e83 100644 --- a/folly/experimental/io/AsyncIO.cpp +++ b/folly/experimental/io/AsyncIO.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -104,6 +105,7 @@ void AsyncIOOp::init() { AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : ctx_(0), + ctxSet_(false), pending_(0), capacity_(capacity), pollFd_(-1) { @@ -126,36 +128,54 @@ AsyncIO::~AsyncIO() { } } +void AsyncIO::decrementPending() { + ssize_t p = pending_.fetch_add(-1, std::memory_order_acq_rel); + DCHECK_GE(p, 1); +} + void AsyncIO::initializeContext() { - if (!ctx_) { - int rc = io_queue_init(capacity_, &ctx_); - // returns negative errno - checkKernelError(rc, "AsyncIO: io_queue_init failed"); - DCHECK(ctx_); + if (!ctxSet_.load(std::memory_order_acquire)) { + std::lock_guard lock(initMutex_); + if (!ctxSet_.load(std::memory_order_relaxed)) { + int rc = io_queue_init(capacity_, &ctx_); + // returns negative errno + checkKernelError(rc, "AsyncIO: io_queue_init failed"); + DCHECK(ctx_); + ctxSet_.store(true, std::memory_order_release); + } } } void AsyncIO::submit(Op* op) { CHECK_EQ(op->state(), Op::State::INITIALIZED); - CHECK_LT(pending_, capacity_) << "too many pending requests"; initializeContext(); // on demand + + // We can increment past capacity, but we'll clean up after ourselves. + ssize_t p = pending_.fetch_add(1, std::memory_order_acq_rel); + if (p >= capacity_) { + decrementPending(); + throw std::range_error("AsyncIO: too many pending requests"); + } iocb* cb = &op->iocb_; cb->data = nullptr; // unused if (pollFd_ != -1) { io_set_eventfd(cb, pollFd_); } int rc = io_submit(ctx_, 1, &cb); - checkKernelError(rc, "AsyncIO: io_submit failed"); + if (rc < 0) { + decrementPending(); + throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed"); + } DCHECK_EQ(rc, 1); op->start(); - ++pending_; } Range AsyncIO::wait(size_t minRequests) { CHECK(ctx_); CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object"; - CHECK_LE(minRequests, pending_); - return doWait(minRequests, pending_); + ssize_t p = pending_.load(std::memory_order_acquire); + CHECK_LE(minRequests, p); + return doWait(minRequests, p); } Range AsyncIO::pollCompleted() { @@ -182,7 +202,7 @@ Range AsyncIO::pollCompleted() { } Range AsyncIO::doWait(size_t minRequests, size_t maxRequests) { - io_event events[pending_]; + io_event events[maxRequests]; int count; do { // Wait forever @@ -190,7 +210,7 @@ Range AsyncIO::doWait(size_t minRequests, size_t maxRequests) { } while (count == -EINTR); checkKernelError(count, "AsyncIO: io_getevents failed"); DCHECK_GE(count, minRequests); // the man page says so - DCHECK_LE(count, pending_); + DCHECK_LE(count, maxRequests); completed_.clear(); if (count == 0) { @@ -201,7 +221,7 @@ Range AsyncIO::doWait(size_t minRequests, size_t maxRequests) { DCHECK(events[i].obj); Op* op = boost::intrusive::get_parent_from_member( events[i].obj, &AsyncIOOp::iocb_); - --pending_; + decrementPending(); op->complete(events[i].res); completed_.push_back(op); } diff --git a/folly/experimental/io/AsyncIO.h b/folly/experimental/io/AsyncIO.h index 04217969..83c37f16 100644 --- a/folly/experimental/io/AsyncIO.h +++ b/folly/experimental/io/AsyncIO.h @@ -21,9 +21,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -138,6 +140,12 @@ class AsyncIO : private boost::noncopyable { * any IOs on this AsyncIO have completed. If you do this, you must use * pollCompleted() instead of wait() -- do not read from the pollFd() * file descriptor directly. + * + * You may use the same AsyncIO object from multiple threads, as long as + * there is only one concurrent caller of wait() / pollCompleted() (perhaps + * by always calling it from the same thread, or by providing appropriate + * mutual exclusion) In this case, pending() returns a snapshot + * of the current number of pending requests. */ explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE); ~AsyncIO(); @@ -180,12 +188,17 @@ class AsyncIO : private boost::noncopyable { void submit(Op* op); private: + void decrementPending(); void initializeContext(); + Range doWait(size_t minRequests, size_t maxRequests); io_context_t ctx_; - size_t pending_; - const size_t capacity_; + std::atomic ctxSet_; + std::mutex initMutex_; + + std::atomic pending_; + const ssize_t capacity_; int pollFd_; std::vector completed_; }; diff --git a/folly/experimental/io/test/AsyncIOTest.cpp b/folly/experimental/io/test/AsyncIOTest.cpp index cdd9d283..f4ee54b6 100644 --- a/folly/experimental/io/test/AsyncIOTest.cpp +++ b/folly/experimental/io/test/AsyncIOTest.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -152,20 +153,39 @@ void testReadsSerially(const std::vector& specs, } void testReadsParallel(const std::vector& specs, - AsyncIO::PollMode pollMode) { + AsyncIO::PollMode pollMode, + bool multithreaded) { AsyncIO aioReader(specs.size(), pollMode); std::unique_ptr ops(new AsyncIO::Op[specs.size()]); std::vector bufs; + bufs.reserve(specs.size()); int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); PCHECK(fd != -1); SCOPE_EXIT { ::close(fd); }; + + std::vector threads; + if (multithreaded) { + threads.reserve(specs.size()); + } for (int i = 0; i < specs.size(); i++) { bufs.push_back(allocateAligned(specs[i].size)); + } + auto submit = [&] (int i) { ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start); aioReader.submit(&ops[i]); + }; + for (int i = 0; i < specs.size(); i++) { + if (multithreaded) { + threads.emplace_back([&submit, i] { submit(i); }); + } else { + submit(i); + } + } + for (auto& t : threads) { + t.join(); } std::vector pending(specs.size(), true); @@ -249,7 +269,8 @@ void testReadsQueued(const std::vector& specs, void testReads(const std::vector& specs, AsyncIO::PollMode pollMode) { testReadsSerially(specs, pollMode); - testReadsParallel(specs, pollMode); + testReadsParallel(specs, pollMode, false); + testReadsParallel(specs, pollMode, true); testReadsQueued(specs, pollMode); } -- 2.34.1