File indexing completed on 2024-04-28 16:59:42
0001 /* 0002 Copyright (C) 2013 Andreas Hartmetz <ahartmetz@gmail.com> 0003 0004 This library is free software; you can redistribute it and/or 0005 modify it under the terms of the GNU Library General Public 0006 License as published by the Free Software Foundation; either 0007 version 2 of the License, or (at your option) any later version. 0008 0009 This library is distributed in the hope that it will be useful, 0010 but WITHOUT ANY WARRANTY; without even the implied warranty of 0011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0012 Library General Public License for more details. 0013 0014 You should have received a copy of the GNU Library General Public License 0015 along with this library; see the file COPYING.LGPL. If not, write to 0016 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 0017 Boston, MA 02110-1301, USA. 0018 0019 Alternatively, this file is available under the Mozilla Public License 0020 Version 1.1. You may obtain a copy of the License at 0021 http://www.mozilla.org/MPL/ 0022 */ 0023 0024 #include "connection.h" 0025 #include "connection_p.h" 0026 0027 #include "arguments.h" 0028 #include "authclient.h" 0029 #include "event.h" 0030 #include "eventdispatcher_p.h" 0031 #include "icompletionlistener.h" 0032 #include "iconnectionstatelistener.h" 0033 #include "imessagereceiver.h" 0034 #include "iserver.h" 0035 #include "localsocket.h" 0036 #include "message.h" 0037 #include "message_p.h" 0038 #include "pendingreply.h" 0039 #include "pendingreply_p.h" 0040 #include "stringtools.h" 0041 0042 #include <algorithm> 0043 #include <cassert> 0044 0045 class HelloReceiver : public IMessageReceiver 0046 { 0047 public: 0048 void handlePendingReplyFinished(PendingReply *pr, Connection *connection) override 0049 { 0050 assert(pr == &m_helloReply); 0051 (void) pr; 0052 ConnectionPrivate::get(connection)->handleHelloReply(); 0053 } 0054 0055 PendingReply m_helloReply; // keep it here so it conveniently goes away when it's done 0056 }; 0057 0058 class ClientConnectedHandler : public ICompletionListener 0059 { 0060 public: 0061 ~ClientConnectedHandler() override 0062 { 0063 delete m_server; 0064 } 0065 0066 void handleCompletion(void *) override 0067 { 0068 m_parent->handleClientConnected(); 0069 } 0070 0071 IServer *m_server; 0072 ConnectionPrivate *m_parent; 0073 }; 0074 0075 static Connection::State userState(ConnectionPrivate::State ps) 0076 { 0077 switch (ps) { 0078 case ConnectionPrivate::Unconnected: 0079 return Connection::Unconnected; 0080 case ConnectionPrivate::ServerWaitingForClient: 0081 case ConnectionPrivate::Authenticating: 0082 case ConnectionPrivate::AwaitingUniqueName: 0083 return Connection::Connecting; 0084 case ConnectionPrivate::Connected: 0085 return Connection::Connected; 0086 } 0087 assert(false); 0088 return Connection::Unconnected; 0089 } 0090 0091 ConnectionStateChanger::ConnectionStateChanger(ConnectionPrivate *cp) 0092 : m_connPrivate(cp) 0093 { 0094 } 0095 0096 ConnectionStateChanger::ConnectionStateChanger(ConnectionPrivate *cp, ConnectionPrivate::State newState) 0097 : m_connPrivate(cp), 0098 m_oldState(cp->m_state) 0099 { 0100 cp->m_state = newState; 0101 } 0102 0103 ConnectionStateChanger::~ConnectionStateChanger() 0104 { 0105 if (m_oldState < 0) { 0106 return; 0107 } 0108 const Connection::State oldUserState = userState(static_cast<ConnectionPrivate::State>(m_oldState)); 0109 const Connection::State newUserState = userState(m_connPrivate->m_state); 0110 if (oldUserState != newUserState) { 0111 m_connPrivate->notifyStateChange(oldUserState, newUserState); 0112 } 0113 } 0114 0115 void ConnectionStateChanger::setNewState(ConnectionPrivate::State newState) 0116 { 0117 // Ensure that, in the destructor, the new state is always compared to the original old state 0118 if (m_oldState < 0) { 0119 m_oldState = m_connPrivate->m_state; 0120 } 0121 m_connPrivate->m_state = newState; 0122 } 0123 0124 void ConnectionStateChanger::disable() 0125 { 0126 m_oldState = -1; 0127 } 0128 0129 ConnectionPrivate::ConnectionPrivate(Connection *connection, EventDispatcher *dispatcher) 0130 : IIoEventForwarder(EventDispatcherPrivate::get(dispatcher)), 0131 m_connection(connection), 0132 m_eventDispatcher(dispatcher) 0133 { 0134 } 0135 0136 IO::Status ConnectionPrivate::handleIoReady(IO::RW rw) 0137 { 0138 IO::Status status; 0139 IIoEventListener *const downstream = downstreamListener(); 0140 if (m_state == ServerWaitingForClient) { 0141 assert(downstream == m_clientConnectedHandler->m_server); 0142 } else { 0143 assert(downstream == m_transport); 0144 } 0145 if (downstream) { 0146 status = downstream->handleIoReady(rw); 0147 } else { 0148 status = IO::Status::InternalError; 0149 } 0150 0151 if (status != IO::Status::OK) { 0152 if (status != IO::Status::PayloadError) { 0153 close(Error::RemoteDisconnect); 0154 } else { 0155 assert(!m_sendQueue.empty()); 0156 const Message &msg = m_sendQueue.front(); 0157 uint32 failedSerial = msg.serial(); 0158 Error error = msg.error(); 0159 m_sendQueue.pop_front(); 0160 // If the following fails, there is no "spontaneously failed to send" notification mechanism. 0161 // It is not a mistake in this case that it fails silently. 0162 maybeDispatchToPendingReply(failedSerial, error); 0163 } 0164 } 0165 return status; 0166 } 0167 0168 Connection::Connection(EventDispatcher *dispatcher, const ConnectAddress &ca) 0169 : d(new ConnectionPrivate(this, dispatcher)) 0170 { 0171 d->m_connectAddress = ca; 0172 assert(d->m_eventDispatcher); 0173 EventDispatcherPrivate::get(d->m_eventDispatcher)->m_connectionToNotify = d; 0174 0175 if (ca.type() == ConnectAddress::Type::None || ca.role() == ConnectAddress::Role::None) { 0176 return; 0177 } 0178 0179 ConnectionStateChanger stateChanger(d); 0180 0181 if (ca.role() == ConnectAddress::Role::PeerServer) { 0182 // this sets up a server that will be destroyed after accepting exactly one connection 0183 d->m_clientConnectedHandler = new ClientConnectedHandler; 0184 ConnectAddress dummyClientAddress; 0185 IServer *const is = IServer::create(ca, &dummyClientAddress); 0186 if (is && is->isListening()) { 0187 d->addIoListener(is); 0188 is->setNewConnectionListener(d->m_clientConnectedHandler); 0189 d->m_clientConnectedHandler->m_server = is; 0190 d->m_clientConnectedHandler->m_parent = d; 0191 0192 stateChanger.setNewState(ConnectionPrivate::ServerWaitingForClient); 0193 } else { 0194 delete is; 0195 } 0196 } else { 0197 d->m_transport = ITransport::create(ca); 0198 if (d->m_transport && d->m_transport->isOpen()) { 0199 d->addIoListener(d->m_transport); 0200 if (ca.role() == ConnectAddress::Role::BusClient) { 0201 d->startAuthentication(); 0202 stateChanger.setNewState(ConnectionPrivate::Authenticating); 0203 } else { 0204 assert(ca.role() == ConnectAddress::Role::PeerClient); 0205 // get ready to receive messages right away 0206 d->receiveNextMessage(); 0207 stateChanger.setNewState(ConnectionPrivate::Connected); 0208 } 0209 } else { 0210 delete d->m_transport; 0211 d->m_transport = nullptr; 0212 } 0213 } 0214 } 0215 0216 Connection::Connection(EventDispatcher *dispatcher, CommRef mainConnectionRef) 0217 : d(new ConnectionPrivate(this, dispatcher)) 0218 { 0219 EventDispatcherPrivate::get(d->m_eventDispatcher)->m_connectionToNotify = d; 0220 0221 // This must be destroyed after all the Lockers so we notify with no locks held! 0222 ConnectionStateChanger stateChanger(d); 0223 0224 d->m_mainThreadLink = std::move(mainConnectionRef.commutex); 0225 CommutexLocker locker(&d->m_mainThreadLink); 0226 assert(locker.hasLock()); 0227 Commutex *const id = d->m_mainThreadLink.id(); 0228 if (!id) { 0229 assert(false); 0230 return; // stay in Unconnected state 0231 } 0232 0233 d->m_mainThreadConnection = mainConnectionRef.connection; 0234 ConnectionPrivate *mainD = d->m_mainThreadConnection; 0235 0236 // get the current values - if we got them from e.g. the CommRef they could be outdated 0237 // and we don't want to wait for more event ping-pong 0238 SpinLocker mainLocker(&mainD->m_lock); 0239 d->m_connectAddress = mainD->m_connectAddress; 0240 0241 // register with the main Connection 0242 SecondaryConnectionConnectEvent *evt = new SecondaryConnectionConnectEvent(); 0243 evt->connection = d; 0244 evt->id = id; 0245 EventDispatcherPrivate::get(mainD->m_eventDispatcher) 0246 ->queueEvent(std::unique_ptr<Event>(evt)); 0247 stateChanger.setNewState(ConnectionPrivate::AwaitingUniqueName); 0248 } 0249 0250 Connection::Connection(ITransport *transport, EventDispatcher *ed, const ConnectAddress &address) 0251 : d(new ConnectionPrivate(this, ed)) 0252 { 0253 // TODO FULLY validate address, also in the other constructors and in ITransport::create() 0254 // and in IServer::create()! 0255 assert(address.role() == ConnectAddress::Role::PeerServer); 0256 assert(d->m_eventDispatcher); 0257 d->m_transport = transport; 0258 d->addIoListener(d->m_transport); 0259 d->m_connectAddress = address; 0260 EventDispatcherPrivate::get(d->m_eventDispatcher)->m_connectionToNotify = d; 0261 0262 #if 0 0263 // TODO make the client authenticate itself, roughly along these lines 0264 // (not yet investigated whether peer auth is out of spec, optional or mandatory) 0265 // this sets up a server that will be destroyed after accepting exactly one connection 0266 d->m_clientConnectedHandler = new ClientConnectedHandler; 0267 d->m_clientConnectedHandler->m_server = IServer::create(ca); 0268 d->m_clientConnectedHandler->m_server->setEventDispatcher(dispatcher); 0269 d->m_clientConnectedHandler->m_server->setNewConnectionListener(d->m_clientConnectedHandler); 0270 d->m_clientConnectedHandler->m_parent = d; 0271 #endif 0272 d->receiveNextMessage(); 0273 ConnectionStateChanger stateChanger(d, ConnectionPrivate::Connected); 0274 } 0275 0276 Connection::Connection(Connection &&other) 0277 { 0278 d = other.d; 0279 other.d = nullptr; 0280 if (d) { 0281 d->m_connection = this; 0282 } 0283 } 0284 0285 Connection &Connection::operator=(Connection &&other) 0286 { 0287 this->~Connection(); 0288 d = other.d; 0289 other.d = nullptr; 0290 if (d) { 0291 d->m_connection = this; 0292 } 0293 return *this; 0294 } 0295 0296 Connection::~Connection() 0297 { 0298 if (!d) { 0299 return; 0300 } 0301 d->close(Error::LocalDisconnect); 0302 0303 delete d->m_transport; 0304 delete d->m_authClient; 0305 delete d->m_helloReceiver; 0306 delete d->m_receivingMessage; 0307 0308 delete d; 0309 d = nullptr; 0310 } 0311 0312 Connection::State Connection::state() const 0313 { 0314 return userState(d->m_state); 0315 } 0316 0317 void Connection::close() 0318 { 0319 d->close(Error::LocalDisconnect); 0320 } 0321 0322 void ConnectionPrivate::close(Error withError) 0323 { 0324 // Can't be main and secondary at the main time - it could be made to work, but what for? 0325 assert(m_secondaryThreadLinks.empty() || !m_mainThreadConnection); 0326 if (m_closing) 0327 { 0328 // Closing, especially cancelling queued messages, may cause further calls to close() from 0329 // callbacks. The easiest way is to say we're already closing and ignore the additional calls. 0330 return; 0331 } 0332 m_closing = true; 0333 0334 if (m_mainThreadConnection) { 0335 CommutexUnlinker unlinker(&m_mainThreadLink); 0336 if (unlinker.hasLock()) { 0337 SecondaryConnectionDisconnectEvent *evt = new SecondaryConnectionDisconnectEvent(); 0338 evt->connection = this; 0339 EventDispatcherPrivate::get(m_mainThreadConnection->m_eventDispatcher) 0340 ->queueEvent(std::unique_ptr<Event>(evt)); 0341 } 0342 } 0343 0344 // Destroy whatever is suitable and available at a given time, in order to avoid things like 0345 // one secondary thread blocking another indefinitely and smaller dependency-related slowdowns. 0346 while (!m_secondaryThreadLinks.empty()) { 0347 for (auto it = m_secondaryThreadLinks.begin(); it != m_secondaryThreadLinks.end(); ) { 0348 0349 CommutexUnlinker unlinker(&it->second, false); 0350 if (unlinker.willSucceed()) { 0351 if (unlinker.hasLock()) { 0352 MainConnectionDisconnectEvent *evt = new MainConnectionDisconnectEvent(); 0353 evt->error = withError; 0354 EventDispatcherPrivate::get(it->first->m_eventDispatcher) 0355 ->queueEvent(std::unique_ptr<Event>(evt)); 0356 } 0357 unlinker.unlinkNow(); // don't access the element after erasing it, finish it now 0358 it = m_secondaryThreadLinks.erase(it); 0359 } else { 0360 ++it; // don't block, try again next iteration 0361 } 0362 } 0363 } 0364 0365 cancelAllPendingReplies(withError); 0366 0367 EventDispatcherPrivate::get(m_eventDispatcher)->m_connectionToNotify = nullptr; 0368 if (m_transport) { 0369 m_transport->close(); 0370 } 0371 ConnectionStateChanger stateChanger(this, Unconnected); 0372 } 0373 0374 void ConnectionPrivate::startAuthentication() 0375 { 0376 // Reserve serial 1 for the "hello" message - technically not necessary, there is no required ordering 0377 // of serials. 0378 takeNextSerial(); 0379 m_authClient = new AuthClient(m_transport); 0380 m_authClient->setCompletionListener(this); 0381 } 0382 0383 void ConnectionPrivate::handleHelloReply() 0384 { 0385 if (!m_helloReceiver->m_helloReply.hasNonErrorReply()) { 0386 delete m_helloReceiver; 0387 m_helloReceiver = nullptr; 0388 close(Error::RemoteDisconnect); 0389 return; 0390 } 0391 Message msg = m_helloReceiver->m_helloReply.takeReply(); 0392 const Arguments &argList = msg.arguments(); 0393 delete m_helloReceiver; 0394 m_helloReceiver = nullptr; 0395 0396 Arguments::Reader reader(argList); 0397 cstring busName = reader.readString(); 0398 if (reader.state() != Arguments::Finished) { 0399 handleHelloFailed(); 0400 // busName contains garbage here, don't access it! 0401 return; 0402 } 0403 m_uniqueName = toStdString(busName); 0404 0405 // tell current secondaries 0406 UniqueNameReceivedEvent evt; 0407 evt.uniqueName = m_uniqueName; 0408 for (auto &it : m_secondaryThreadLinks) { 0409 CommutexLocker otherLocker(&it.second); 0410 if (otherLocker.hasLock()) { 0411 EventDispatcherPrivate::get(it.first->m_eventDispatcher) 0412 ->queueEvent(std::unique_ptr<Event>(new UniqueNameReceivedEvent(evt))); 0413 } 0414 } 0415 0416 ConnectionStateChanger stateChanger(this, Connected); 0417 } 0418 0419 void ConnectionPrivate::handleHelloFailed() 0420 { 0421 assert(m_state == AwaitingUniqueName); 0422 close(Error::RemoteDisconnect); 0423 } 0424 0425 void ConnectionPrivate::notifyStateChange(Connection::State oldUserState, Connection::State newUserState) 0426 { 0427 if (m_connectionStateListener) { 0428 m_connectionStateListener->handleConnectionChanged(m_connection, oldUserState, newUserState); 0429 } 0430 } 0431 0432 void ConnectionPrivate::handleClientConnected() 0433 { 0434 m_transport = m_clientConnectedHandler->m_server->takeNextClient(); 0435 delete m_clientConnectedHandler; 0436 m_clientConnectedHandler = nullptr; 0437 0438 assert(m_transport); 0439 addIoListener(m_transport); 0440 receiveNextMessage(); 0441 0442 ConnectionStateChanger stateChanger(this, Connected); 0443 } 0444 0445 void Connection::setDefaultReplyTimeout(int msecs) 0446 { 0447 d->m_defaultTimeout = msecs; 0448 } 0449 0450 int Connection::defaultReplyTimeout() const 0451 { 0452 return d->m_defaultTimeout; 0453 } 0454 0455 uint32 ConnectionPrivate::takeNextSerial() 0456 { 0457 uint32 ret; 0458 do { 0459 ret = m_sendSerial.fetch_add(1, std::memory_order_relaxed); 0460 } while (unlikely(ret == 0)); 0461 return ret; 0462 } 0463 0464 Error ConnectionPrivate::prepareSend(Message *msg) 0465 { 0466 if (msg->serial() == 0) { 0467 if (!m_mainThreadConnection) { 0468 msg->setSerial(takeNextSerial()); 0469 } else { 0470 // we take a serial from the other Connection and then serialize locally in order to keep the CPU 0471 // expense of serialization local, even though it's more complicated than doing everything in the 0472 // other thread / Connection. 0473 CommutexLocker locker(&m_mainThreadLink); 0474 if (locker.hasLock()) { 0475 msg->setSerial(m_mainThreadConnection->takeNextSerial()); 0476 } else { 0477 return Error::LocalDisconnect; 0478 } 0479 } 0480 } 0481 0482 MessagePrivate *const mpriv = MessagePrivate::get(msg); // this is unchanged by move()ing the owning Message. 0483 if (!mpriv->serialize()) { 0484 return mpriv->m_error; 0485 } 0486 return Error::NoError; 0487 } 0488 0489 void ConnectionPrivate::sendPreparedMessage(Message msg) 0490 { 0491 MessagePrivate *const mpriv = MessagePrivate::get(&msg); 0492 mpriv->setCompletionListener(this); 0493 m_sendQueue.push_back(std::move(msg)); 0494 if (m_state == ConnectionPrivate::Connected && m_sendQueue.size() == 1) { 0495 // first in queue, don't wait for some other event to trigger sending 0496 mpriv->send(m_transport); 0497 } 0498 } 0499 0500 PendingReply Connection::send(Message m, int timeoutMsecs) 0501 { 0502 if (timeoutMsecs == DefaultTimeout) { 0503 timeoutMsecs = d->m_defaultTimeout; 0504 } 0505 0506 Error error = d->prepareSend(&m); 0507 0508 PendingReplyPrivate *pendingPriv = new PendingReplyPrivate(d->m_eventDispatcher, timeoutMsecs); 0509 pendingPriv->m_connectionOrReply.connection = d; 0510 pendingPriv->m_receiver = nullptr; 0511 pendingPriv->m_serial = m.serial(); 0512 0513 // even if we're handing off I/O to a main Connection, keep a record because that simplifies 0514 // aborting all pending replies when we disconnect from the main Connection, no matter which 0515 // side initiated the disconnection. 0516 d->m_pendingReplies.emplace(m.serial(), pendingPriv); 0517 0518 if (error.isError() || d->m_state == ConnectionPrivate::Unconnected) { 0519 // Signal the error asynchronously, in order to get the same delayed completion callback as in 0520 // the non-error case. This should make the behavior more predictable and client code harder to 0521 // accidentally get wrong. To detect errors immediately, PendingReply::error() can be used. 0522 0523 // An intentionally locally disconnected connection is not in an error state, but trying to send 0524 // a message over it is an error. 0525 pendingPriv->m_error = error.isError() ? error : Error::LocalDisconnect; 0526 pendingPriv->m_replyTimeout.start(0); 0527 } else { 0528 if (!d->m_mainThreadConnection) { 0529 d->sendPreparedMessage(std::move(m)); 0530 } else { 0531 CommutexLocker locker(&d->m_mainThreadLink); 0532 if (locker.hasLock()) { 0533 std::unique_ptr<SendMessageWithPendingReplyEvent> evt(new SendMessageWithPendingReplyEvent); 0534 evt->message = std::move(m); 0535 evt->connection = d; 0536 EventDispatcherPrivate::get(d->m_mainThreadConnection->m_eventDispatcher) 0537 ->queueEvent(std::move(evt)); 0538 } else { 0539 pendingPriv->m_error = Error::LocalDisconnect; 0540 } 0541 } 0542 } 0543 0544 return PendingReply(pendingPriv); 0545 } 0546 0547 Error Connection::sendNoReply(Message m) 0548 { 0549 // ### (when not called from send()) warn if sending a message without the noreply flag set? 0550 // doing that is wasteful, but might be common. needs investigation. 0551 Error error = d->prepareSend(&m); 0552 if (error.isError() || d->m_state == ConnectionPrivate::Unconnected) { 0553 return error.isError() ? error : Error::LocalDisconnect; 0554 } 0555 0556 // pass ownership to the send queue now because if the IO system decided to send the message without 0557 // going through an event loop iteration, handleCompletion would be called and expects the message to 0558 // be in the queue 0559 0560 if (!d->m_mainThreadConnection) { 0561 d->sendPreparedMessage(std::move(m)); 0562 } else { 0563 CommutexLocker locker(&d->m_mainThreadLink); 0564 if (locker.hasLock()) { 0565 std::unique_ptr<SendMessageEvent> evt(new SendMessageEvent); 0566 evt->message = std::move(m); 0567 EventDispatcherPrivate::get(d->m_mainThreadConnection->m_eventDispatcher) 0568 ->queueEvent(std::move(evt)); 0569 } else { 0570 return Error::LocalDisconnect; 0571 } 0572 } 0573 return Error::NoError; 0574 } 0575 0576 size_t Connection::sendQueueLength() const 0577 { 0578 return d->m_sendQueue.size(); 0579 } 0580 0581 void Connection::waitForConnectionEstablished() 0582 { 0583 if (d->m_state != ConnectionPrivate::Authenticating) { 0584 return; 0585 } 0586 while (d->m_state == ConnectionPrivate::Authenticating) { 0587 d->m_authClient->handleTransportCanRead(); 0588 } 0589 if (d->m_state != ConnectionPrivate::AwaitingUniqueName) { 0590 return; 0591 } 0592 // Send the hello message 0593 assert(!d->m_sendQueue.empty()); // the hello message should be in the queue 0594 MessagePrivate *helloPriv = MessagePrivate::get(&d->m_sendQueue.front()); 0595 helloPriv->handleTransportCanWrite(); 0596 0597 // Receive the hello reply 0598 while (d->m_state == ConnectionPrivate::AwaitingUniqueName) { 0599 MessagePrivate::get(d->m_receivingMessage)->handleTransportCanRead(); 0600 } 0601 } 0602 0603 ConnectAddress Connection::connectAddress() const 0604 { 0605 return d->m_connectAddress; 0606 } 0607 0608 std::string Connection::uniqueName() const 0609 { 0610 return d->m_uniqueName; 0611 } 0612 0613 bool Connection::isConnected() const 0614 { 0615 return d->m_transport && d->m_transport->isOpen(); 0616 } 0617 0618 EventDispatcher *Connection::eventDispatcher() const 0619 { 0620 return d->m_eventDispatcher; 0621 } 0622 0623 IMessageReceiver *Connection::spontaneousMessageReceiver() const 0624 { 0625 return d->m_client; 0626 } 0627 0628 void Connection::setSpontaneousMessageReceiver(IMessageReceiver *receiver) 0629 { 0630 d->m_client = receiver; 0631 } 0632 0633 IConnectionStateListener *Connection::connectionStateListener() const 0634 { 0635 return d->m_connectionStateListener; 0636 } 0637 0638 void Connection::setConnectionStateListener(IConnectionStateListener *listener) 0639 { 0640 d->m_connectionStateListener = listener; 0641 } 0642 0643 void ConnectionPrivate::handleCompletion(void *task) 0644 { 0645 switch (m_state) { 0646 case Authenticating: { 0647 assert(task == m_authClient); 0648 assert(m_authClient->isFinished()); 0649 if (!m_authClient->isAuthenticated()) { 0650 close(Error::AuthenticationFailed); 0651 } 0652 m_unixFdPassingEnabled = m_authClient->isUnixFdPassingEnabled(); 0653 delete m_authClient; 0654 m_authClient = nullptr; 0655 if (m_state == Unconnected) { 0656 break; 0657 } 0658 0659 ConnectionStateChanger stateChanger(this, AwaitingUniqueName); 0660 0661 // Announce our presence to the bus and have it send some introductory information of its own 0662 Message hello = Message::createCall("/org/freedesktop/DBus", "org.freedesktop.DBus", "Hello"); 0663 hello.setSerial(1); 0664 hello.setExpectsReply(false); 0665 hello.setDestination(std::string("org.freedesktop.DBus")); 0666 MessagePrivate *const helloPriv = MessagePrivate::get(&hello); 0667 0668 m_helloReceiver = new HelloReceiver; 0669 m_helloReceiver->m_helloReply = m_connection->send(std::move(hello)); 0670 // Small hack: Connection::send() refuses to really start sending if the connection isn't in 0671 // Connected state. So force the sending here to actually get to Connected state. 0672 helloPriv->send(m_transport); 0673 // Also ensure that the hello message is sent before any other messages that may have been 0674 // already enqueued by an API client 0675 if (m_sendQueue.size() > 1) { 0676 hello = std::move(m_sendQueue.back()); 0677 m_sendQueue.pop_back(); 0678 m_sendQueue.push_front(std::move(hello)); 0679 } 0680 m_helloReceiver->m_helloReply.setReceiver(m_helloReceiver); 0681 // get ready to receive the first message, the hello reply 0682 receiveNextMessage(); 0683 0684 break; 0685 } 0686 case AwaitingUniqueName: // the code paths for these two states only diverge in the PendingReply handler 0687 case Connected: { 0688 assert(!m_authClient); 0689 if (!m_sendQueue.empty() && task == &m_sendQueue.front()) { 0690 Message *msg = static_cast<Message* >(task); 0691 if (msg->error().isError()) { 0692 if (m_state == AwaitingUniqueName) { 0693 handleHelloFailed(); 0694 } 0695 // TODO else also close the connection? (maybe depending on which error it is) 0696 } 0697 m_sendQueue.pop_front(); 0698 if (!m_sendQueue.empty()) { 0699 MessagePrivate::get(&m_sendQueue.front())->send(m_transport); 0700 } 0701 } else { 0702 assert(task == m_receivingMessage); 0703 Message *const receivedMessage = m_receivingMessage; 0704 0705 receiveNextMessage(); 0706 0707 if (receivedMessage->type() == Message::InvalidMessage) { 0708 if (m_state == AwaitingUniqueName) { 0709 handleHelloFailed(); 0710 } 0711 delete receivedMessage; 0712 } else if (!maybeDispatchToPendingReply(receivedMessage)) { 0713 if (m_client) { 0714 m_client->handleSpontaneousMessageReceived(Message(std::move(*receivedMessage)), 0715 m_connection); 0716 } 0717 // dispatch to other threads listening to spontaneous messages, if any 0718 for (auto it = m_secondaryThreadLinks.begin(); it != m_secondaryThreadLinks.end(); ) { 0719 SpontaneousMessageReceivedEvent *evt = new SpontaneousMessageReceivedEvent(); 0720 if (std::next(it) != m_secondaryThreadLinks.end()) { 0721 evt->message = *receivedMessage; 0722 } else { 0723 evt->message = std::move(*receivedMessage); 0724 } 0725 0726 CommutexLocker otherLocker(&it->second); 0727 if (otherLocker.hasLock()) { 0728 EventDispatcherPrivate::get(it->first->m_eventDispatcher) 0729 ->queueEvent(std::unique_ptr<Event>(evt)); 0730 ++it; 0731 } else { 0732 ConnectionPrivate *connection = it->first; 0733 it = m_secondaryThreadLinks.erase(it); 0734 discardPendingRepliesForSecondaryThread(connection); 0735 delete evt; 0736 } 0737 } 0738 delete receivedMessage; 0739 } 0740 } 0741 break; 0742 } 0743 default: 0744 // ### decide what to do here 0745 break; 0746 }; 0747 } 0748 0749 bool ConnectionPrivate::maybeDispatchToPendingReply(Message *receivedMessage) 0750 { 0751 if (receivedMessage->type() != Message::MethodReturnMessage && 0752 receivedMessage->type() != Message::ErrorMessage) { 0753 return false; 0754 } 0755 0756 auto it = m_pendingReplies.find(receivedMessage->replySerial()); 0757 if (it == m_pendingReplies.end()) { 0758 return false; 0759 } 0760 0761 if (PendingReplyPrivate *pr = it->second.asPendingReply()) { 0762 m_pendingReplies.erase(it); 0763 assert(!pr->m_isFinished); 0764 pr->handleReceived(receivedMessage); 0765 } else { 0766 // forward to other thread's Connection 0767 ConnectionPrivate *connection = it->second.asConnection(); 0768 m_pendingReplies.erase(it); 0769 assert(connection); 0770 PendingReplySuccessEvent *evt = new PendingReplySuccessEvent; 0771 evt->reply = std::move(*receivedMessage); 0772 delete receivedMessage; 0773 EventDispatcherPrivate::get(connection->m_eventDispatcher)->queueEvent(std::unique_ptr<Event>(evt)); 0774 } 0775 return true; 0776 } 0777 0778 bool ConnectionPrivate::maybeDispatchToPendingReply(uint32 serial, Error error) 0779 { 0780 assert(error.isError()); 0781 auto it = m_pendingReplies.find(serial); 0782 if (it == m_pendingReplies.end()) { 0783 return false; 0784 } 0785 0786 if (PendingReplyPrivate *pr = it->second.asPendingReply()) { 0787 m_pendingReplies.erase(it); 0788 assert(!pr->m_isFinished); 0789 pr->handleError(error); 0790 } else { 0791 // forward to other thread's Connection 0792 ConnectionPrivate *connection = it->second.asConnection(); 0793 m_pendingReplies.erase(it); 0794 assert(connection); 0795 PendingReplyFailureEvent *evt = new PendingReplyFailureEvent; 0796 evt->m_serial = serial; 0797 evt->m_error = error; 0798 EventDispatcherPrivate::get(connection->m_eventDispatcher)->queueEvent(std::unique_ptr<Event>(evt)); 0799 } 0800 return true; 0801 } 0802 0803 void ConnectionPrivate::receiveNextMessage() 0804 { 0805 m_receivingMessage = new Message; 0806 MessagePrivate *const mpriv = MessagePrivate::get(m_receivingMessage); 0807 mpriv->setCompletionListener(this); 0808 mpriv->receive(m_transport); 0809 } 0810 0811 void ConnectionPrivate::unregisterPendingReply(PendingReplyPrivate *p) 0812 { 0813 if (m_mainThreadConnection) { 0814 CommutexLocker otherLocker(&m_mainThreadLink); 0815 if (otherLocker.hasLock()) { 0816 PendingReplyCancelEvent *evt = new PendingReplyCancelEvent; 0817 evt->serial = p->m_serial; 0818 EventDispatcherPrivate::get(m_mainThreadConnection->m_eventDispatcher) 0819 ->queueEvent(std::unique_ptr<Event>(evt)); 0820 } 0821 } 0822 #ifndef NDEBUG 0823 auto it = m_pendingReplies.find(p->m_serial); 0824 assert(it != m_pendingReplies.end()); 0825 if (!m_mainThreadConnection) { 0826 assert(it->second.asPendingReply()); 0827 assert(it->second.asPendingReply() == p); 0828 } 0829 #endif 0830 m_pendingReplies.erase(p->m_serial); 0831 } 0832 0833 void ConnectionPrivate::cancelAllPendingReplies(Error withError) 0834 { 0835 // No locking because we should have no connections to other threads anymore at this point. 0836 // No const iteration followed by container clear because that has different semantics - many 0837 // things can happen in a callback... 0838 // In case we have pending replies for secondary threads, and we cancel all pending replies, 0839 // that is because we're shutting down, which we told the secondary thread, and it will deal 0840 // with bulk cancellation of replies. We just throw away our records about them. 0841 for (auto it = m_pendingReplies.begin() ; it != m_pendingReplies.end(); ) { 0842 PendingReplyPrivate *pendingPriv = it->second.asPendingReply(); 0843 it = m_pendingReplies.erase(it); 0844 if (pendingPriv) { // if from this thread 0845 pendingPriv->handleError(withError); 0846 } 0847 } 0848 m_sendQueue.clear(); 0849 } 0850 0851 void ConnectionPrivate::discardPendingRepliesForSecondaryThread(ConnectionPrivate *connection) 0852 { 0853 for (auto it = m_pendingReplies.begin() ; it != m_pendingReplies.end(); ) { 0854 if (it->second.asConnection() == connection) { 0855 it = m_pendingReplies.erase(it); 0856 // notification and deletion are handled on the event's source thread 0857 } else { 0858 ++it; 0859 } 0860 } 0861 } 0862 0863 void ConnectionPrivate::processEvent(Event *evt) 0864 { 0865 // std::cerr << "ConnectionPrivate::processEvent() with event type " << evt->type << std::endl; 0866 0867 switch (evt->type) { 0868 case Event::SendMessage: 0869 sendPreparedMessage(std::move(static_cast<SendMessageEvent *>(evt)->message)); 0870 break; 0871 0872 case Event::SendMessageWithPendingReply: { 0873 SendMessageWithPendingReplyEvent *pre = static_cast<SendMessageWithPendingReplyEvent *>(evt); 0874 m_pendingReplies.emplace(pre->message.serial(), pre->connection); 0875 sendPreparedMessage(std::move(pre->message)); 0876 break; 0877 } 0878 case Event::SpontaneousMessageReceived: 0879 if (m_client) { 0880 SpontaneousMessageReceivedEvent *smre = static_cast<SpontaneousMessageReceivedEvent *>(evt); 0881 m_client->handleSpontaneousMessageReceived(Message(std::move(smre->message)), m_connection); 0882 } 0883 break; 0884 0885 case Event::PendingReplySuccess: 0886 maybeDispatchToPendingReply(&static_cast<PendingReplySuccessEvent *>(evt)->reply); 0887 break; 0888 0889 case Event::PendingReplyFailure: { 0890 PendingReplyFailureEvent *prfe = static_cast<PendingReplyFailureEvent *>(evt); 0891 const auto it = m_pendingReplies.find(prfe->m_serial); 0892 if (it == m_pendingReplies.end()) { 0893 // not a disaster, but when it happens in debug mode I want to check it out 0894 assert(false); 0895 break; 0896 } 0897 PendingReplyPrivate *pendingPriv = it->second.asPendingReply(); 0898 m_pendingReplies.erase(it); 0899 pendingPriv->handleError(prfe->m_error); 0900 break; 0901 } 0902 0903 case Event::PendingReplyCancel: 0904 // This comes from a secondary thread, which handles PendingReply notification itself. 0905 m_pendingReplies.erase(static_cast<PendingReplyCancelEvent *>(evt)->serial); 0906 break; 0907 0908 case Event::SecondaryConnectionConnect: { 0909 SecondaryConnectionConnectEvent *sce = static_cast<SecondaryConnectionConnectEvent *>(evt); 0910 0911 const auto it = find_if(m_unredeemedCommRefs.begin(), m_unredeemedCommRefs.end(), 0912 [sce](const CommutexPeer &item) { return item.id() == sce->id; } ); 0913 assert(it != m_unredeemedCommRefs.end()); 0914 const auto emplaced = m_secondaryThreadLinks.emplace(sce->connection, std::move(*it)).first; 0915 m_unredeemedCommRefs.erase(it); 0916 0917 // "welcome package" - it's done (only) as an event to avoid locking order issues 0918 CommutexLocker locker(&emplaced->second); 0919 if (locker.hasLock()) { 0920 UniqueNameReceivedEvent *evt = new UniqueNameReceivedEvent; 0921 evt->uniqueName = m_uniqueName; 0922 EventDispatcherPrivate::get(sce->connection->m_eventDispatcher) 0923 ->queueEvent(std::unique_ptr<Event>(evt)); 0924 } 0925 0926 break; 0927 } 0928 0929 case Event::SecondaryConnectionDisconnect: { 0930 SecondaryConnectionDisconnectEvent *sde = static_cast<SecondaryConnectionDisconnectEvent *>(evt); 0931 // delete our records to make sure we don't call into it in the future! 0932 const auto found = m_secondaryThreadLinks.find(sde->connection); 0933 if (found == m_secondaryThreadLinks.end()) { 0934 // looks like we've noticed the disappearance of the other thread earlier 0935 return; 0936 } 0937 m_secondaryThreadLinks.erase(found); 0938 discardPendingRepliesForSecondaryThread(sde->connection); 0939 break; 0940 } 0941 case Event::MainConnectionDisconnect: { 0942 // since the main thread *sent* us the event, it already knows to drop all our PendingReplies 0943 m_mainThreadConnection = nullptr; 0944 MainConnectionDisconnectEvent *mcde = static_cast<MainConnectionDisconnectEvent *>(evt); 0945 cancelAllPendingReplies(mcde->error); 0946 break; 0947 } 0948 case Event::UniqueNameReceived: 0949 // We get this when the unique name became available after we were linked up with the main thread 0950 m_uniqueName = static_cast<UniqueNameReceivedEvent *>(evt)->uniqueName; 0951 if (m_state == AwaitingUniqueName) { 0952 ConnectionStateChanger stateChanger(this, Connected); 0953 } 0954 break; 0955 } 0956 } 0957 0958 Connection::CommRef Connection::createCommRef() 0959 { 0960 // TODO this is a good time to clean up "dead" CommRefs, where the counterpart was destroyed. 0961 CommRef ret; 0962 ret.connection = d; 0963 std::pair<CommutexPeer, CommutexPeer> link = CommutexPeer::createLink(); 0964 { 0965 SpinLocker mainLocker(&d->m_lock); 0966 d->m_unredeemedCommRefs.emplace_back(std::move(link.first)); 0967 } 0968 ret.commutex = std::move(link.second); 0969 return ret; 0970 } 0971 0972 uint32 Connection::supportedFileDescriptorsPerMessage() const 0973 { 0974 return (d->m_transport && d->m_unixFdPassingEnabled) ? 0975 d->m_transport->supportedPassingUnixFdsCount() : 0; 0976 }