projects
/
folly.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Ensure LogWriter::writeMessage(std::string&&) is in the overload set of ImmediateFile...
[folly.git]
/
folly
/
experimental
/
io
/
AsyncIO.cpp
diff --git
a/folly/experimental/io/AsyncIO.cpp
b/folly/experimental/io/AsyncIO.cpp
index 491db24f055937b9a94f7782592b5cf165e1a1fa..0b51128aed74d846f80f3026baf0a7fc894776fc 100644
(file)
--- a/
folly/experimental/io/AsyncIO.cpp
+++ b/
folly/experimental/io/AsyncIO.cpp
@@
-1,5
+1,5
@@
/*
/*
- * Copyright 201
5
Facebook, Inc.
+ * Copyright 201
7
Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@
-17,7
+17,6
@@
#include <folly/experimental/io/AsyncIO.h>
#include <sys/eventfd.h>
#include <folly/experimental/io/AsyncIO.h>
#include <sys/eventfd.h>
-#include <unistd.h>
#include <cerrno>
#include <ostream>
#include <stdexcept>
#include <cerrno>
#include <ostream>
#include <stdexcept>
@@
-30,6
+29,7
@@
#include <folly/Format.h>
#include <folly/Likely.h>
#include <folly/String.h>
#include <folly/Format.h>
#include <folly/Likely.h>
#include <folly/String.h>
+#include <folly/portability/Unistd.h>
namespace folly {
namespace folly {
@@
-66,6
+66,11
@@
void AsyncIOOp::complete(ssize_t result) {
}
}
}
}
+void AsyncIOOp::cancel() {
+ DCHECK_EQ(state_, State::PENDING);
+ state_ = State::CANCELED;
+}
+
ssize_t AsyncIOOp::result() const {
CHECK_EQ(state_, State::COMPLETED);
return result_;
ssize_t AsyncIOOp::result() const {
CHECK_EQ(state_, State::COMPLETED);
return result_;
@@
-104,13
+109,7
@@
void AsyncIOOp::init() {
state_ = State::INITIALIZED;
}
state_ = State::INITIALIZED;
}
-AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
- : ctx_(0),
- ctxSet_(false),
- pending_(0),
- submitted_(0),
- capacity_(capacity),
- pollFd_(-1) {
+AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
CHECK_GT(capacity_, 0);
completed_.reserve(capacity_);
if (pollMode == POLLABLE) {
CHECK_GT(capacity_, 0);
completed_.reserve(capacity_);
if (pollMode == POLLABLE) {
@@
-194,7
+193,13
@@
Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
auto p = pending_.load(std::memory_order_acquire);
CHECK_LE(minRequests, p);
CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
auto p = pending_.load(std::memory_order_acquire);
CHECK_LE(minRequests, p);
- return doWait(minRequests, p);
+ return doWait(WaitType::COMPLETE, minRequests, p, completed_);
+}
+
+Range<AsyncIO::Op**> AsyncIO::cancel() {
+ CHECK(ctx_);
+ auto p = pending_.load(std::memory_order_acquire);
+ return doWait(WaitType::CANCEL, p, p, canceled_);
}
Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
}
Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
@@
-217,12
+222,18
@@
Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
DCHECK_LE(numEvents, pending_);
// Don't reap more than numEvents, as we've just reset the counter to 0.
DCHECK_LE(numEvents, pending_);
// Don't reap more than numEvents, as we've just reset the counter to 0.
- return doWait(
numEvents, numEvents
);
+ return doWait(
WaitType::COMPLETE, numEvents, numEvents, completed_
);
}
}
-Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
+Range<AsyncIO::Op**> AsyncIO::doWait(
+ WaitType type,
+ size_t minRequests,
+ size_t maxRequests,
+ std::vector<Op*>& result) {
io_event events[maxRequests];
io_event events[maxRequests];
+ // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
+ // WaitType::CANCEL we have to wait for IO completion.
size_t count = 0;
do {
int ret;
size_t count = 0;
do {
int ret;
@@
-237,27
+248,30
@@
Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
/* timeout */ nullptr); // wait forever
} while (ret == -EINTR);
// Check as may not be able to recover without leaking events.
/* timeout */ nullptr); // wait forever
} while (ret == -EINTR);
// Check as may not be able to recover without leaking events.
- CHECK_GE(ret, 0)
-
<< "AsyncIO: io_getevents failed with error "
<< errnoStr(-ret);
+ CHECK_GE(ret, 0)
<< "AsyncIO: io_getevents failed with error "
+
<< errnoStr(-ret);
count += ret;
} while (count < minRequests);
DCHECK_LE(count, maxRequests);
count += ret;
} while (count < minRequests);
DCHECK_LE(count, maxRequests);
- completed_.clear();
- if (count == 0) {
- return folly::Range<Op**>();
- }
-
+ result.clear();
for (size_t i = 0; i < count; ++i) {
DCHECK(events[i].obj);
Op* op = boost::intrusive::get_parent_from_member(
events[i].obj, &AsyncIOOp::iocb_);
decrementPending();
for (size_t i = 0; i < count; ++i) {
DCHECK(events[i].obj);
Op* op = boost::intrusive::get_parent_from_member(
events[i].obj, &AsyncIOOp::iocb_);
decrementPending();
- op->complete(events[i].res);
- completed_.push_back(op);
+ switch (type) {
+ case WaitType::COMPLETE:
+ op->complete(events[i].res);
+ break;
+ case WaitType::CANCEL:
+ op->cancel();
+ break;
+ }
+ result.push_back(op);
}
}
- return
folly::Range<Op**>(&completed_.front(), coun
t);
+ return
range(resul
t);
}
AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
}
AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
@@
-277,9
+291,7
@@
void AsyncIOQueue::submit(OpFactory op) {
maybeDequeue();
}
maybeDequeue();
}
-void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
- maybeDequeue();
-}
+void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) { maybeDequeue(); }
void AsyncIOQueue::maybeDequeue() {
while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
void AsyncIOQueue::maybeDequeue() {
while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
@@
-289,9
+301,9
@@
void AsyncIOQueue::maybeDequeue() {
// Interpose our completion callback
auto& nextCb = op->notificationCallback();
// Interpose our completion callback
auto& nextCb = op->notificationCallback();
- op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
- this->onCompleted(op);
- if (nextCb) nextCb(op);
+ op->setNotificationCallback([this, nextCb](AsyncIOOp* op
2
) {
+ this->onCompleted(op
2
);
+ if (nextCb) nextCb(op
2
);
});
asyncIO_->submit(op);
});
asyncIO_->submit(op);
@@
-310,6
+322,7
@@
const char* asyncIoOpStateToString(AsyncIOOp::State state) {
X(AsyncIOOp::State::INITIALIZED);
X(AsyncIOOp::State::PENDING);
X(AsyncIOOp::State::COMPLETED);
X(AsyncIOOp::State::INITIALIZED);
X(AsyncIOOp::State::PENDING);
X(AsyncIOOp::State::COMPLETED);
+ X(AsyncIOOp::State::CANCELED);
}
return "<INVALID AsyncIOOp::State>";
}
}
return "<INVALID AsyncIOOp::State>";
}
@@
-348,11
+361,13
@@
std::ostream& operator<<(std::ostream& os, const iocb& cb) {
switch (cb.aio_lio_opcode) {
case IO_CMD_PREAD:
case IO_CMD_PWRITE:
switch (cb.aio_lio_opcode) {
case IO_CMD_PREAD:
case IO_CMD_PWRITE:
- os << folly::format("buf={}, off={}, size={}, ",
- cb.u.c.buf, cb.u.c.nbytes, cb.u.c.offset);
+ os << folly::format("buf={}, offset={}, nbytes={}, ",
+ cb.u.c.buf, cb.u.c.offset, cb.u.c.nbytes);
+ break;
default:
os << "[TODO: write debug string for "
<< iocbCmdToString(cb.aio_lio_opcode) << "] ";
default:
os << "[TODO: write debug string for "
<< iocbCmdToString(cb.aio_lio_opcode) << "] ";
+ break;
}
return os;
}
return os;