Fix copyright lines
[folly.git] / folly / experimental / io / AsyncIO.cpp
index e5674bff9e1915fd2eaf01126fb10646e4cff786..55883ff91a75da76518005b1578084f4d4569477 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2013-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -34,9 +34,7 @@
 namespace folly {
 
 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
-  : cb_(std::move(cb)),
-    state_(State::UNINITIALIZED),
-    result_(-EINVAL) {
+    : cb_(std::move(cb)), state_(State::UNINITIALIZED), result_(-EINVAL) {
   memset(&iocb_, 0, sizeof(iocb_));
 }
 
@@ -142,13 +140,13 @@ void AsyncIO::initializeContext() {
       // returns negative errno
       if (rc == -EAGAIN) {
         long aio_nr, aio_max;
-        std::unique_ptr<FILE, int(*)(FILE*)>
-          fp(fopen("/proc/sys/fs/aio-nr", "r"), fclose);
+        std::unique_ptr<FILE, int (*)(FILE*)> fp(
+            fopen("/proc/sys/fs/aio-nr", "r"), fclose);
         PCHECK(fp);
         CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
 
-        std::unique_ptr<FILE, int(*)(FILE*)>
-          aio_max_fp(fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
+        std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
+            fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
         PCHECK(aio_max_fp);
         CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
 
@@ -165,7 +163,7 @@ void AsyncIO::initializeContext() {
 
 void AsyncIO::submit(Op* op) {
   CHECK_EQ(op->state(), Op::State::INITIALIZED);
-  initializeContext();  // on demand
+  initializeContext(); // on demand
 
   // We can increment past capacity, but we'll clean up after ourselves.
   auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
@@ -174,7 +172,7 @@ void AsyncIO::submit(Op* op) {
     throw std::range_error("AsyncIO: too many pending requests");
   }
   iocb* cb = &op->iocb_;
-  cb->data = nullptr;  // unused
+  cb->data = nullptr; // unused
   if (pollFd_ != -1) {
     io_set_eventfd(cb, pollFd_);
   }
@@ -193,15 +191,13 @@ Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
   auto p = pending_.load(std::memory_order_acquire);
   CHECK_LE(minRequests, p);
-  doWait(WaitType::COMPLETE, minRequests, p, &completed_);
-  return Range<Op**>(completed_.data(), completed_.size());
+  return doWait(WaitType::COMPLETE, minRequests, p, completed_);
 }
 
-size_t AsyncIO::cancel() {
+Range<AsyncIO::Op**> AsyncIO::cancel() {
   CHECK(ctx_);
   auto p = pending_.load(std::memory_order_acquire);
-  doWait(WaitType::CANCEL, p, p, nullptr);
-  return p;
+  return doWait(WaitType::CANCEL, p, p, canceled_);
 }
 
 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
@@ -215,7 +211,7 @@ Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
     rc = ::read(pollFd_, &numEvents, 8);
   } while (rc == -1 && errno == EINTR);
   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
-    return Range<Op**>();  // nothing completed
+    return Range<Op**>(); // nothing completed
   }
   checkUnixError(rc, "AsyncIO: read from event fd failed");
   DCHECK_EQ(rc, 8);
@@ -224,15 +220,14 @@ Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
   DCHECK_LE(numEvents, pending_);
 
   // Don't reap more than numEvents, as we've just reset the counter to 0.
-  doWait(WaitType::COMPLETE, numEvents, numEvents, &completed_);
-  return Range<Op**>(completed_.data(), completed_.size());
+  return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
 }
 
-void AsyncIO::doWait(
+Range<AsyncIO::Op**> AsyncIO::doWait(
     WaitType type,
     size_t minRequests,
     size_t maxRequests,
-    std::vector<Op*>* result) {
+    std::vector<Op*>& result) {
   io_event events[maxRequests];
 
   // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
@@ -244,11 +239,12 @@ void AsyncIO::doWait(
       // GOTCHA: io_getevents() may returns less than min_nr results if
       // interrupted after some events have been read (if before, -EINTR
       // is returned).
-      ret = io_getevents(ctx_,
-                         minRequests - count,
-                         maxRequests - count,
-                         events + count,
-                         /* timeout */ nullptr);  // wait forever
+      ret = io_getevents(
+          ctx_,
+          minRequests - count,
+          maxRequests - count,
+          events + count,
+          /* timeout */ nullptr); // wait forever
     } while (ret == -EINTR);
     // Check as may not be able to recover without leaking events.
     CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
@@ -257,9 +253,7 @@ void AsyncIO::doWait(
   } while (count < minRequests);
   DCHECK_LE(count, maxRequests);
 
-  if (result != nullptr) {
-    result->clear();
-  }
+  result.clear();
   for (size_t i = 0; i < count; ++i) {
     DCHECK(events[i].obj);
     Op* op = boost::intrusive::get_parent_from_member(
@@ -273,16 +267,14 @@ void AsyncIO::doWait(
         op->cancel();
         break;
     }
-    if (result != nullptr) {
-      result->push_back(op);
-    }
+    result.push_back(op);
   }
-}
 
-AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
-  : asyncIO_(asyncIO) {
+  return range(result);
 }
 
+AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) : asyncIO_(asyncIO) {}
+
 AsyncIOQueue::~AsyncIOQueue() {
   CHECK_EQ(asyncIO_->pending(), 0);
 }
@@ -296,7 +288,9 @@ void AsyncIOQueue::submit(OpFactory op) {
   maybeDequeue();
 }
 
-void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) { maybeDequeue(); }
+void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) {
+  maybeDequeue();
+}
 
 void AsyncIOQueue::maybeDequeue() {
   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
@@ -308,7 +302,9 @@ void AsyncIOQueue::maybeDequeue() {
     auto& nextCb = op->notificationCallback();
     op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
       this->onCompleted(op2);
-      if (nextCb) nextCb(op2);
+      if (nextCb) {
+        nextCb(op2);
+      }
     });
 
     asyncIO_->submit(op);
@@ -319,7 +315,9 @@ void AsyncIOQueue::maybeDequeue() {
 
 namespace {
 
-#define X(c) case c: return #c
+#define X(c) \
+  case c:    \
+    return #c
 
 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
   switch (state) {
@@ -353,21 +351,28 @@ std::string fd2name(int fd) {
   std::string path = folly::to<std::string>("/proc/self/fd/", fd);
   char link[PATH_MAX];
   const ssize_t length =
-    std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
+      std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
   return path.assign(link, length);
 }
 
 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
   os << folly::format(
-    "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
-    cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
-    cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
+      "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
+      cb.data,
+      cb.key,
+      iocbCmdToString(cb.aio_lio_opcode),
+      cb.aio_reqprio,
+      cb.aio_fildes,
+      fd2name(cb.aio_fildes));
 
   switch (cb.aio_lio_opcode) {
     case IO_CMD_PREAD:
     case IO_CMD_PWRITE:
-      os << folly::format("buf={}, offset={}, nbytes={}, ",
-                          cb.u.c.buf, cb.u.c.offset, cb.u.c.nbytes);
+      os << folly::format(
+          "buf={}, offset={}, nbytes={}, ",
+          cb.u.c.buf,
+          cb.u.c.offset,
+          cb.u.c.nbytes);
       break;
     default:
       os << "[TODO: write debug string for "
@@ -378,7 +383,7 @@ std::ostream& operator<<(std::ostream& os, const iocb& cb) {
   return os;
 }
 
-}  // anonymous namespace
+} // namespace
 
 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
   os << "{" << op.state_ << ", ";
@@ -402,4 +407,4 @@ std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
   return os << asyncIoOpStateToString(state);
 }
 
-}  // namespace folly
+} // namespace folly