Adding IO stats in AsyncIO.
authorShao-Chuan Wang <shaochuan@fb.com>
Tue, 16 Sep 2014 05:56:18 +0000 (22:56 -0700)
committerDave Watson <davejwatson@fb.com>
Wed, 17 Sep 2014 18:23:42 +0000 (11:23 -0700)
Summary:
It would be great if we have IO stats tracking built within AsyncIO.
It would enable upper layer application to better track the number of I/O
that was submitted and completed

Test Plan:
$ fbmake runtests
Test Results Summary:
Passed: 1734
100% successful

Reviewed By: philipp@fb.com

Subscribers: njormrod, schen, stanislav

FB internal diff: D1557472

Tasks: 5008299

folly/experimental/io/AsyncIO.cpp
folly/experimental/io/AsyncIO.h
folly/experimental/io/test/AsyncIOTest.cpp

index 7e7be271514921387f7c58ae5cd2edde1ea1c198..766da907c4118a3a8f2b5d8be9215f9753df22f2 100644 (file)
@@ -108,6 +108,7 @@ AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
   : ctx_(0),
     ctxSet_(false),
     pending_(0),
+    submitted_(0),
     capacity_(capacity),
     pollFd_(-1) {
   CHECK_GT(capacity_, 0);
@@ -130,7 +131,7 @@ AsyncIO::~AsyncIO() {
 }
 
 void AsyncIO::decrementPending() {
-  ssize_t p = pending_.fetch_add(-1, std::memory_order_acq_rel);
+  auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
   DCHECK_GE(p, 1);
 }
 
@@ -168,7 +169,7 @@ void AsyncIO::submit(Op* op) {
   initializeContext();  // on demand
 
   // We can increment past capacity, but we'll clean up after ourselves.
-  ssize_t p = pending_.fetch_add(1, std::memory_order_acq_rel);
+  auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
   if (p >= capacity_) {
     decrementPending();
     throw std::range_error("AsyncIO: too many pending requests");
@@ -183,6 +184,7 @@ void AsyncIO::submit(Op* op) {
     decrementPending();
     throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
   }
+  submitted_++;
   DCHECK_EQ(rc, 1);
   op->start();
 }
@@ -190,7 +192,7 @@ void AsyncIO::submit(Op* op) {
 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
   CHECK(ctx_);
   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
-  ssize_t p = pending_.load(std::memory_order_acquire);
+  auto p = pending_.load(std::memory_order_acquire);
   CHECK_LE(minRequests, p);
   return doWait(minRequests, p);
 }
index 6b6293261b95e261f7f7743c09124f71a8a41c0c..ae1f4531fc41f08a637d23dd1ba079e58cbc4caa 100644 (file)
@@ -168,6 +168,12 @@ class AsyncIO : private boost::noncopyable {
    */
   size_t capacity() const { return capacity_; }
 
+  /**
+   * Return the accumulative number of submitted I/O, since this object
+   * has been created.
+   */
+  size_t totalSubmits() const { return submitted_; }
+
   /**
    * If POLLABLE, return a file descriptor that can be passed to poll / epoll
    * and will become readable when any async IO operations have completed.
@@ -197,8 +203,9 @@ class AsyncIO : private boost::noncopyable {
   std::atomic<bool> ctxSet_;
   std::mutex initMutex_;
 
-  std::atomic<ssize_t> pending_;
-  const ssize_t capacity_;
+  std::atomic<size_t> pending_;
+  std::atomic<size_t> submitted_;
+  const size_t capacity_;
   int pollFd_;
   std::vector<Op*> completed_;
 };
index 7025eff5218119da823d6c5d85101f52d43762d4..dabb250a24586fd44730a011c685434a25c61267 100644 (file)
@@ -140,6 +140,7 @@ void testReadsSerially(const std::vector<TestSpec>& specs,
     auto buf = allocateAligned(specs[i].size);
     op.pread(fd, buf.get(), specs[i].size, specs[i].start);
     aioReader.submit(&op);
+    EXPECT_EQ((i + 1), aioReader.totalSubmits());
     EXPECT_EQ(aioReader.pending(), 1);
     auto ops = readerWait(&aioReader);
     EXPECT_EQ(1, ops.size());
@@ -208,6 +209,8 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
       EXPECT_EQ(specs[id].size, res);
     }
   }
+  EXPECT_EQ(specs.size(), aioReader.totalSubmits());
+
   EXPECT_EQ(aioReader.pending(), 0);
   for (int i = 0; i < pending.size(); i++) {
     EXPECT_FALSE(pending[i]);
@@ -259,6 +262,7 @@ void testReadsQueued(const std::vector<TestSpec>& specs,
       EXPECT_EQ(specs[id].size, res);
     }
   }
+  EXPECT_EQ(specs.size(), aioReader.totalSubmits());
   EXPECT_EQ(aioReader.pending(), 0);
   EXPECT_EQ(aioQueue.queued(), 0);
   for (int i = 0; i < pending.size(); i++) {