switch (msg.type) {
case MessageType::MSG_NEW_CONN:
{
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onConnectionDequeuedByAcceptCallback(
+ msg.fd, msg.address);
+ }
callback_->connectionAccepted(msg.fd, msg.address);
break;
}
// callback more efficiently without having to use a notification queue.
RemoteAcceptor* acceptor = nullptr;
try {
- acceptor = new RemoteAcceptor(callback);
+ acceptor = new RemoteAcceptor(callback, connectionEventCallback_);
acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_);
} catch (...) {
callbacks_.pop_back();
address.setFromSockaddr(saddr, addrLen);
+ if (clientSocket >= 0 && connectionEventCallback_) {
+ connectionEventCallback_->onConnectionAccepted(clientSocket, address);
+ }
+
std::chrono::time_point<std::chrono::steady_clock> nowMs =
std::chrono::steady_clock::now();
auto timeSinceLastAccept = std::max<int64_t>(
++numDroppedConnections_;
if (clientSocket >= 0) {
closeNoInt(clientSocket);
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onConnectionDropped(clientSocket,
+ address);
+ }
}
continue;
}
} else {
dispatchError("accept() failed", errno);
}
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onConnectionAcceptError(errno);
+ }
return;
}
closeNoInt(clientSocket);
dispatchError("failed to set accepted socket to non-blocking mode",
errno);
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onConnectionDropped(clientSocket, address);
+ }
return;
}
#endif
return;
}
+ const SocketAddress addr(address);
// Create a message to send over the notification queue
QueueMessage msg;
msg.type = MessageType::MSG_NEW_CONN;
// Loop until we find a free queue to write to
while (true) {
if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onConnectionEnqueuedForAcceptCallback(socket,
+ addr);
+ }
// Success! return.
return;
- }
+ }
// We couldn't add to queue. Fall through to below
LOG(ERROR) << "failed to dispatch newly accepted socket:"
<< " all accept callback queues are full";
closeNoInt(socket);
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onConnectionDropped(socket, addr);
+ }
return;
}
// since we won't be able to re-enable ourselves later.
LOG(ERROR) << "failed to allocate AsyncServerSocket backoff"
<< " timer; unable to temporarly pause accepting";
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onBackoffError();
+ }
return;
}
}
if (!backoffTimeout_->scheduleTimeout(timeoutMS)) {
LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;"
<< "unable to temporarly pause accepting";
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onBackoffError();
+ }
return;
}
for (auto& handler : sockets_) {
handler.unregisterHandler();
}
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onBackoffStarted();
+ }
}
void AsyncServerSocket::backoffTimeoutExpired() {
// If all of the callbacks were removed, we shouldn't re-enable accepts
if (callbacks_.empty()) {
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onBackoffEnded();
+ }
return;
}
abort();
}
}
+ if (connectionEventCallback_) {
+ connectionEventCallback_->onBackoffEnded();
+ }
}