From: Philip Pronin Date: Thu, 11 May 2017 19:51:11 +0000 (-0700) Subject: AsyncIO::cancel X-Git-Tag: v2017.05.15.00~10 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=c8aadaad37770dc4d1caf2bff239c604b51a6132 AsyncIO::cancel Summary: It should be implemented with `io_cancel`, but it is not supported (D682836), so still have to drain events, ignoring only op callbacks. Reviewed By: luciang, ot Differential Revision: D5044020 fbshipit-source-id: 0bcd04c91a437fccaf2189ccf771a1cb61c68942 --- diff --git a/folly/experimental/io/AsyncIO.cpp b/folly/experimental/io/AsyncIO.cpp index 9128d8b3..e5674bff 100644 --- a/folly/experimental/io/AsyncIO.cpp +++ b/folly/experimental/io/AsyncIO.cpp @@ -66,6 +66,11 @@ void AsyncIOOp::complete(ssize_t result) { } } +void AsyncIOOp::cancel() { + DCHECK_EQ(state_, State::PENDING); + state_ = State::CANCELED; +} + ssize_t AsyncIOOp::result() const { CHECK_EQ(state_, State::COMPLETED); return result_; @@ -104,13 +109,7 @@ void AsyncIOOp::init() { state_ = State::INITIALIZED; } -AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) - : ctx_(0), - ctxSet_(false), - pending_(0), - submitted_(0), - capacity_(capacity), - pollFd_(-1) { +AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) { CHECK_GT(capacity_, 0); completed_.reserve(capacity_); if (pollMode == POLLABLE) { @@ -194,7 +193,15 @@ Range AsyncIO::wait(size_t minRequests) { CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object"; auto p = pending_.load(std::memory_order_acquire); CHECK_LE(minRequests, p); - return doWait(minRequests, p); + doWait(WaitType::COMPLETE, minRequests, p, &completed_); + return Range(completed_.data(), completed_.size()); +} + +size_t AsyncIO::cancel() { + CHECK(ctx_); + auto p = pending_.load(std::memory_order_acquire); + doWait(WaitType::CANCEL, p, p, nullptr); + return p; } Range AsyncIO::pollCompleted() { @@ -217,12 +224,19 @@ Range AsyncIO::pollCompleted() { DCHECK_LE(numEvents, pending_); // Don't reap more than numEvents, as we've just reset the counter to 0. - return doWait(numEvents, numEvents); + doWait(WaitType::COMPLETE, numEvents, numEvents, &completed_); + return Range(completed_.data(), completed_.size()); } -Range AsyncIO::doWait(size_t minRequests, size_t maxRequests) { +void AsyncIO::doWait( + WaitType type, + size_t minRequests, + size_t maxRequests, + std::vector* result) { io_event events[maxRequests]; + // Unfortunately, Linux AIO doesn't implement io_cancel, so even for + // WaitType::CANCEL we have to wait for IO completion. size_t count = 0; do { int ret; @@ -237,27 +251,32 @@ Range AsyncIO::doWait(size_t minRequests, size_t maxRequests) { /* timeout */ nullptr); // wait forever } while (ret == -EINTR); // Check as may not be able to recover without leaking events. - CHECK_GE(ret, 0) - << "AsyncIO: io_getevents failed with error " << errnoStr(-ret); + CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error " + << errnoStr(-ret); count += ret; } while (count < minRequests); DCHECK_LE(count, maxRequests); - completed_.clear(); - if (count == 0) { - return folly::Range(); + if (result != nullptr) { + result->clear(); } - for (size_t i = 0; i < count; ++i) { DCHECK(events[i].obj); Op* op = boost::intrusive::get_parent_from_member( events[i].obj, &AsyncIOOp::iocb_); decrementPending(); - op->complete(events[i].res); - completed_.push_back(op); + switch (type) { + case WaitType::COMPLETE: + op->complete(events[i].res); + break; + case WaitType::CANCEL: + op->cancel(); + break; + } + if (result != nullptr) { + result->push_back(op); + } } - - return folly::Range(&completed_.front(), count); } AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) @@ -308,6 +327,7 @@ const char* asyncIoOpStateToString(AsyncIOOp::State state) { X(AsyncIOOp::State::INITIALIZED); X(AsyncIOOp::State::PENDING); X(AsyncIOOp::State::COMPLETED); + X(AsyncIOOp::State::CANCELED); } return ""; } diff --git a/folly/experimental/io/AsyncIO.h b/folly/experimental/io/AsyncIO.h index faa5e27f..6702c53f 100644 --- a/folly/experimental/io/AsyncIO.h +++ b/folly/experimental/io/AsyncIO.h @@ -40,25 +40,24 @@ namespace folly { * An AsyncIOOp represents a pending operation. You may set a notification * callback or you may use this class's methods directly. * - * The op must remain allocated until completion. + * The op must remain allocated until it is completed or canceled. */ class AsyncIOOp : private boost::noncopyable { friend class AsyncIO; friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o); + public: typedef std::function NotificationCallback; explicit AsyncIOOp(NotificationCallback cb = NotificationCallback()); ~AsyncIOOp(); - // There would be a cancel() method here if Linux AIO actually implemented - // it. But let's not get your hopes up. - enum class State { UNINITIALIZED, INITIALIZED, PENDING, - COMPLETED + COMPLETED, + CANCELED, }; /** @@ -95,8 +94,7 @@ class AsyncIOOp : private boost::noncopyable { * conventions). Use checkKernelError (folly/Exception.h) on the result to * throw a std::system_error in case of error instead. * - * It is an error to call this if the Op hasn't yet started or is still - * pending. + * It is an error to call this if the Op hasn't completed. */ ssize_t result() const; @@ -104,6 +102,7 @@ class AsyncIOOp : private boost::noncopyable { void init(); void start(); void complete(ssize_t result); + void cancel(); NotificationCallback cb_; iocb iocb_; @@ -123,7 +122,7 @@ class AsyncIO : private boost::noncopyable { enum PollMode { NOT_POLLABLE, - POLLABLE + POLLABLE, }; /** @@ -141,12 +140,12 @@ class AsyncIO : private boost::noncopyable { * file descriptor directly. * * You may use the same AsyncIO object from multiple threads, as long as - * there is only one concurrent caller of wait() / pollCompleted() (perhaps - * by always calling it from the same thread, or by providing appropriate - * mutual exclusion) In this case, pending() returns a snapshot + * there is only one concurrent caller of wait() / pollCompleted() / cancel() + * (perhaps by always calling it from the same thread, or by providing + * appropriate mutual exclusion). In this case, pending() returns a snapshot * of the current number of pending requests. */ - explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE); + explicit AsyncIO(size_t capacity, PollMode pollMode = NOT_POLLABLE); ~AsyncIO(); /** @@ -156,6 +155,11 @@ class AsyncIO : private boost::noncopyable { */ Range wait(size_t minRequests); + /** + * Cancel all pending requests and return their number. + */ + size_t cancel(); + /** * Return the number of pending requests. */ @@ -196,16 +200,21 @@ class AsyncIO : private boost::noncopyable { void decrementPending(); void initializeContext(); - Range doWait(size_t minRequests, size_t maxRequests); + enum class WaitType { COMPLETE, CANCEL }; + void doWait( + WaitType type, + size_t minRequests, + size_t maxRequests, + std::vector* result); - io_context_t ctx_; - std::atomic ctxSet_; + io_context_t ctx_{nullptr}; + std::atomic ctxSet_{false}; std::mutex initMutex_; - std::atomic pending_; - std::atomic submitted_; + std::atomic pending_{0}; + std::atomic submitted_{0}; const size_t capacity_; - int pollFd_; + int pollFd_{-1}; std::vector completed_; }; diff --git a/folly/experimental/io/test/AsyncIOTest.cpp b/folly/experimental/io/test/AsyncIOTest.cpp index 39e51a1d..7c776ef0 100644 --- a/folly/experimental/io/test/AsyncIOTest.cpp +++ b/folly/experimental/io/test/AsyncIOTest.cpp @@ -36,7 +36,9 @@ #include namespace fs = folly::fs; + using folly::AsyncIO; +using folly::AsyncIOOp; using folly::AsyncIOQueue; namespace { @@ -85,7 +87,7 @@ class TemporaryFile { }; TemporaryFile::TemporaryFile(size_t size) - : path_(fs::temp_directory_path() / fs::unique_path()) { + : path_(fs::temp_directory_path() / fs::unique_path()) { CHECK_EQ(size % sizeof(uint32_t), 0); size /= sizeof(uint32_t); const uint32_t seed = 42; @@ -370,7 +372,7 @@ TEST(AsyncIO, NonBlockingWait) { SCOPE_EXIT { ::close(fd); }; - size_t size = 2*kAlign; + size_t size = 2 * kAlign; auto buf = allocateAligned(size); op.pread(fd, buf.get(), size, 0); aioReader.submit(&op); @@ -389,3 +391,50 @@ TEST(AsyncIO, NonBlockingWait) { EXPECT_EQ(size, res); EXPECT_EQ(aioReader.pending(), 0); } + +TEST(AsyncIO, Cancel) { + constexpr size_t kNumOps = 10; + + AsyncIO aioReader(kNumOps, AsyncIO::NOT_POLLABLE); + int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); + PCHECK(fd != -1); + SCOPE_EXIT { + ::close(fd); + }; + + std::vector ops(kNumOps); + std::vector bufs; + + size_t completed = 0; + for (auto& op : ops) { + const size_t size = 2 * kAlign; + bufs.push_back(allocateAligned(size)); + op.setNotificationCallback([&](AsyncIOOp*) { ++completed; }); + op.pread(fd, bufs.back().get(), size, 0); + aioReader.submit(&op); + } + + EXPECT_EQ(aioReader.pending(), kNumOps); + EXPECT_EQ(completed, 0); + + { + auto result = aioReader.wait(1); + EXPECT_EQ(result.size(), 1); + } + EXPECT_EQ(completed, 1); + EXPECT_EQ(aioReader.pending(), kNumOps - 1); + + EXPECT_EQ(aioReader.cancel(), kNumOps - 1); + EXPECT_EQ(aioReader.pending(), 0); + EXPECT_EQ(completed, 1); + + completed = 0; + for (auto& op : ops) { + if (op.state() == AsyncIOOp::State::COMPLETED) { + ++completed; + } else { + EXPECT_TRUE(op.state() == AsyncIOOp::State::CANCELED) << op; + } + } + EXPECT_EQ(completed, 1); +}