Ensure LogWriter::writeMessage(std::string&&) is in the overload set of ImmediateFile...
[folly.git] / folly / experimental / io / AsyncIO.cpp
index 491db24f055937b9a94f7782592b5cf165e1a1fa..0b51128aed74d846f80f3026baf0a7fc894776fc 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
 #include <folly/experimental/io/AsyncIO.h>
 
 #include <sys/eventfd.h>
-#include <unistd.h>
 #include <cerrno>
 #include <ostream>
 #include <stdexcept>
@@ -30,6 +29,7 @@
 #include <folly/Format.h>
 #include <folly/Likely.h>
 #include <folly/String.h>
+#include <folly/portability/Unistd.h>
 
 namespace folly {
 
@@ -66,6 +66,11 @@ void AsyncIOOp::complete(ssize_t result) {
   }
 }
 
+void AsyncIOOp::cancel() {
+  DCHECK_EQ(state_, State::PENDING);
+  state_ = State::CANCELED;
+}
+
 ssize_t AsyncIOOp::result() const {
   CHECK_EQ(state_, State::COMPLETED);
   return result_;
@@ -104,13 +109,7 @@ void AsyncIOOp::init() {
   state_ = State::INITIALIZED;
 }
 
-AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
-  : ctx_(0),
-    ctxSet_(false),
-    pending_(0),
-    submitted_(0),
-    capacity_(capacity),
-    pollFd_(-1) {
+AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
   CHECK_GT(capacity_, 0);
   completed_.reserve(capacity_);
   if (pollMode == POLLABLE) {
@@ -194,7 +193,13 @@ Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
   auto p = pending_.load(std::memory_order_acquire);
   CHECK_LE(minRequests, p);
-  return doWait(minRequests, p);
+  return doWait(WaitType::COMPLETE, minRequests, p, completed_);
+}
+
+Range<AsyncIO::Op**> AsyncIO::cancel() {
+  CHECK(ctx_);
+  auto p = pending_.load(std::memory_order_acquire);
+  return doWait(WaitType::CANCEL, p, p, canceled_);
 }
 
 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
@@ -217,12 +222,18 @@ Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
   DCHECK_LE(numEvents, pending_);
 
   // Don't reap more than numEvents, as we've just reset the counter to 0.
-  return doWait(numEvents, numEvents);
+  return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
 }
 
-Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
+Range<AsyncIO::Op**> AsyncIO::doWait(
+    WaitType type,
+    size_t minRequests,
+    size_t maxRequests,
+    std::vector<Op*>& result) {
   io_event events[maxRequests];
 
+  // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
+  // WaitType::CANCEL we have to wait for IO completion.
   size_t count = 0;
   do {
     int ret;
@@ -237,27 +248,30 @@ Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
                          /* timeout */ nullptr);  // wait forever
     } while (ret == -EINTR);
     // Check as may not be able to recover without leaking events.
-    CHECK_GE(ret, 0)
-      << "AsyncIO: io_getevents failed with error " << errnoStr(-ret);
+    CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
+                     << errnoStr(-ret);
     count += ret;
   } while (count < minRequests);
   DCHECK_LE(count, maxRequests);
 
-  completed_.clear();
-  if (count == 0) {
-    return folly::Range<Op**>();
-  }
-
+  result.clear();
   for (size_t i = 0; i < count; ++i) {
     DCHECK(events[i].obj);
     Op* op = boost::intrusive::get_parent_from_member(
         events[i].obj, &AsyncIOOp::iocb_);
     decrementPending();
-    op->complete(events[i].res);
-    completed_.push_back(op);
+    switch (type) {
+      case WaitType::COMPLETE:
+        op->complete(events[i].res);
+        break;
+      case WaitType::CANCEL:
+        op->cancel();
+        break;
+    }
+    result.push_back(op);
   }
 
-  return folly::Range<Op**>(&completed_.front(), count);
+  return range(result);
 }
 
 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
@@ -277,9 +291,7 @@ void AsyncIOQueue::submit(OpFactory op) {
   maybeDequeue();
 }
 
-void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
-  maybeDequeue();
-}
+void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) { maybeDequeue(); }
 
 void AsyncIOQueue::maybeDequeue() {
   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
@@ -289,9 +301,9 @@ void AsyncIOQueue::maybeDequeue() {
 
     // Interpose our completion callback
     auto& nextCb = op->notificationCallback();
-    op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
-      this->onCompleted(op);
-      if (nextCb) nextCb(op);
+    op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
+      this->onCompleted(op2);
+      if (nextCb) nextCb(op2);
     });
 
     asyncIO_->submit(op);
@@ -310,6 +322,7 @@ const char* asyncIoOpStateToString(AsyncIOOp::State state) {
     X(AsyncIOOp::State::INITIALIZED);
     X(AsyncIOOp::State::PENDING);
     X(AsyncIOOp::State::COMPLETED);
+    X(AsyncIOOp::State::CANCELED);
   }
   return "<INVALID AsyncIOOp::State>";
 }
@@ -348,11 +361,13 @@ std::ostream& operator<<(std::ostream& os, const iocb& cb) {
   switch (cb.aio_lio_opcode) {
     case IO_CMD_PREAD:
     case IO_CMD_PWRITE:
-      os << folly::format("buf={}, off={}, size={}, ",
-                          cb.u.c.buf, cb.u.c.nbytes, cb.u.c.offset);
+      os << folly::format("buf={}, offset={}, nbytes={}, ",
+                          cb.u.c.buf, cb.u.c.offset, cb.u.c.nbytes);
+      break;
     default:
       os << "[TODO: write debug string for "
          << iocbCmdToString(cb.aio_lio_opcode) << "] ";
+      break;
   }
 
   return os;