File indexing completed on 2024-04-28 16:59:42

0001 /*
0002    Copyright (C) 2013 Andreas Hartmetz <ahartmetz@gmail.com>
0003 
0004    This library is free software; you can redistribute it and/or
0005    modify it under the terms of the GNU Library General Public
0006    License as published by the Free Software Foundation; either
0007    version 2 of the License, or (at your option) any later version.
0008 
0009    This library is distributed in the hope that it will be useful,
0010    but WITHOUT ANY WARRANTY; without even the implied warranty of
0011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0012    Library General Public License for more details.
0013 
0014    You should have received a copy of the GNU Library General Public License
0015    along with this library; see the file COPYING.LGPL.  If not, write to
0016    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
0017    Boston, MA 02110-1301, USA.
0018 
0019    Alternatively, this file is available under the Mozilla Public License
0020    Version 1.1.  You may obtain a copy of the License at
0021    http://www.mozilla.org/MPL/
0022 */
0023 
0024 #include "connection.h"
0025 #include "connection_p.h"
0026 
0027 #include "arguments.h"
0028 #include "authclient.h"
0029 #include "event.h"
0030 #include "eventdispatcher_p.h"
0031 #include "icompletionlistener.h"
0032 #include "iconnectionstatelistener.h"
0033 #include "imessagereceiver.h"
0034 #include "iserver.h"
0035 #include "localsocket.h"
0036 #include "message.h"
0037 #include "message_p.h"
0038 #include "pendingreply.h"
0039 #include "pendingreply_p.h"
0040 #include "stringtools.h"
0041 
0042 #include <algorithm>
0043 #include <cassert>
0044 
0045 class HelloReceiver : public IMessageReceiver
0046 {
0047 public:
0048     void handlePendingReplyFinished(PendingReply *pr, Connection *connection) override
0049     {
0050         assert(pr == &m_helloReply);
0051         (void) pr;
0052         ConnectionPrivate::get(connection)->handleHelloReply();
0053     }
0054 
0055     PendingReply m_helloReply; // keep it here so it conveniently goes away when it's done
0056 };
0057 
0058 class ClientConnectedHandler : public ICompletionListener
0059 {
0060 public:
0061     ~ClientConnectedHandler() override
0062     {
0063         delete m_server;
0064     }
0065 
0066     void handleCompletion(void *) override
0067     {
0068         m_parent->handleClientConnected();
0069     }
0070 
0071     IServer *m_server;
0072     ConnectionPrivate *m_parent;
0073 };
0074 
0075 static Connection::State userState(ConnectionPrivate::State ps)
0076 {
0077     switch (ps) {
0078     case ConnectionPrivate::Unconnected:
0079         return Connection::Unconnected;
0080     case ConnectionPrivate::ServerWaitingForClient:
0081     case ConnectionPrivate::Authenticating:
0082     case ConnectionPrivate::AwaitingUniqueName:
0083         return Connection::Connecting;
0084     case ConnectionPrivate::Connected:
0085         return Connection::Connected;
0086     }
0087     assert(false);
0088     return Connection::Unconnected;
0089 }
0090 
0091 ConnectionStateChanger::ConnectionStateChanger(ConnectionPrivate *cp)
0092    : m_connPrivate(cp)
0093 {
0094 }
0095 
0096 ConnectionStateChanger::ConnectionStateChanger(ConnectionPrivate *cp, ConnectionPrivate::State newState)
0097    : m_connPrivate(cp),
0098      m_oldState(cp->m_state)
0099 {
0100     cp->m_state = newState;
0101 }
0102 
0103 ConnectionStateChanger::~ConnectionStateChanger()
0104 {
0105     if (m_oldState < 0) {
0106         return;
0107     }
0108     const Connection::State oldUserState = userState(static_cast<ConnectionPrivate::State>(m_oldState));
0109     const Connection::State newUserState = userState(m_connPrivate->m_state);
0110     if (oldUserState != newUserState) {
0111         m_connPrivate->notifyStateChange(oldUserState, newUserState);
0112     }
0113 }
0114 
0115 void ConnectionStateChanger::setNewState(ConnectionPrivate::State newState)
0116 {
0117     // Ensure that, in the destructor, the new state is always compared to the original old state
0118     if (m_oldState < 0) {
0119         m_oldState = m_connPrivate->m_state;
0120     }
0121     m_connPrivate->m_state = newState;
0122 }
0123 
0124 void ConnectionStateChanger::disable()
0125 {
0126     m_oldState = -1;
0127 }
0128 
0129 ConnectionPrivate::ConnectionPrivate(Connection *connection, EventDispatcher *dispatcher)
0130    : IIoEventForwarder(EventDispatcherPrivate::get(dispatcher)),
0131      m_connection(connection),
0132      m_eventDispatcher(dispatcher)
0133 {
0134 }
0135 
0136 IO::Status ConnectionPrivate::handleIoReady(IO::RW rw)
0137 {
0138     IO::Status status;
0139     IIoEventListener *const downstream = downstreamListener();
0140     if (m_state == ServerWaitingForClient) {
0141         assert(downstream == m_clientConnectedHandler->m_server);
0142     } else {
0143         assert(downstream == m_transport);
0144     }
0145     if (downstream) {
0146         status = downstream->handleIoReady(rw);
0147     } else {
0148         status = IO::Status::InternalError;
0149     }
0150 
0151     if (status != IO::Status::OK) {
0152         if (status != IO::Status::PayloadError) {
0153             close(Error::RemoteDisconnect);
0154         } else {
0155             assert(!m_sendQueue.empty());
0156             const Message &msg = m_sendQueue.front();
0157             uint32 failedSerial = msg.serial();
0158             Error error = msg.error();
0159             m_sendQueue.pop_front();
0160             // If the following fails, there is no "spontaneously failed to send" notification mechanism.
0161             // It is not a mistake in this case that it fails silently.
0162             maybeDispatchToPendingReply(failedSerial, error);
0163         }
0164     }
0165     return status;
0166 }
0167 
0168 Connection::Connection(EventDispatcher *dispatcher, const ConnectAddress &ca)
0169    : d(new ConnectionPrivate(this, dispatcher))
0170 {
0171     d->m_connectAddress = ca;
0172     assert(d->m_eventDispatcher);
0173     EventDispatcherPrivate::get(d->m_eventDispatcher)->m_connectionToNotify = d;
0174 
0175     if (ca.type() == ConnectAddress::Type::None || ca.role() == ConnectAddress::Role::None) {
0176         return;
0177     }
0178 
0179     ConnectionStateChanger stateChanger(d);
0180 
0181     if (ca.role() == ConnectAddress::Role::PeerServer) {
0182         // this sets up a server that will be destroyed after accepting exactly one connection
0183         d->m_clientConnectedHandler = new ClientConnectedHandler;
0184         ConnectAddress dummyClientAddress;
0185         IServer *const is = IServer::create(ca, &dummyClientAddress);
0186         if (is && is->isListening()) {
0187             d->addIoListener(is);
0188             is->setNewConnectionListener(d->m_clientConnectedHandler);
0189             d->m_clientConnectedHandler->m_server = is;
0190             d->m_clientConnectedHandler->m_parent = d;
0191 
0192             stateChanger.setNewState(ConnectionPrivate::ServerWaitingForClient);
0193         } else {
0194             delete is;
0195         }
0196     } else {
0197         d->m_transport = ITransport::create(ca);
0198         if (d->m_transport && d->m_transport->isOpen()) {
0199             d->addIoListener(d->m_transport);
0200             if (ca.role() == ConnectAddress::Role::BusClient) {
0201                 d->startAuthentication();
0202                 stateChanger.setNewState(ConnectionPrivate::Authenticating);
0203             } else {
0204                 assert(ca.role() == ConnectAddress::Role::PeerClient);
0205                 // get ready to receive messages right away
0206                 d->receiveNextMessage();
0207                 stateChanger.setNewState(ConnectionPrivate::Connected);
0208             }
0209         } else {
0210             delete d->m_transport;
0211             d->m_transport = nullptr;
0212         }
0213     }
0214 }
0215 
0216 Connection::Connection(EventDispatcher *dispatcher, CommRef mainConnectionRef)
0217    : d(new ConnectionPrivate(this, dispatcher))
0218 {
0219     EventDispatcherPrivate::get(d->m_eventDispatcher)->m_connectionToNotify = d;
0220 
0221     // This must be destroyed after all the Lockers so we notify with no locks held!
0222     ConnectionStateChanger stateChanger(d);
0223 
0224     d->m_mainThreadLink = std::move(mainConnectionRef.commutex);
0225     CommutexLocker locker(&d->m_mainThreadLink);
0226     assert(locker.hasLock());
0227     Commutex *const id = d->m_mainThreadLink.id();
0228     if (!id) {
0229         assert(false);
0230         return; // stay in Unconnected state
0231     }
0232 
0233     d->m_mainThreadConnection = mainConnectionRef.connection;
0234     ConnectionPrivate *mainD = d->m_mainThreadConnection;
0235 
0236     // get the current values - if we got them from e.g. the CommRef they could be outdated
0237     // and we don't want to wait for more event ping-pong
0238     SpinLocker mainLocker(&mainD->m_lock);
0239     d->m_connectAddress = mainD->m_connectAddress;
0240 
0241     // register with the main Connection
0242     SecondaryConnectionConnectEvent *evt = new SecondaryConnectionConnectEvent();
0243     evt->connection = d;
0244     evt->id = id;
0245     EventDispatcherPrivate::get(mainD->m_eventDispatcher)
0246                                 ->queueEvent(std::unique_ptr<Event>(evt));
0247     stateChanger.setNewState(ConnectionPrivate::AwaitingUniqueName);
0248 }
0249 
0250 Connection::Connection(ITransport *transport, EventDispatcher *ed, const ConnectAddress &address)
0251    : d(new ConnectionPrivate(this, ed))
0252 {
0253     // TODO FULLY validate address, also in the other constructors and in ITransport::create()
0254     //      and in IServer::create()!
0255     assert(address.role() == ConnectAddress::Role::PeerServer);
0256     assert(d->m_eventDispatcher);
0257     d->m_transport = transport;
0258     d->addIoListener(d->m_transport);
0259     d->m_connectAddress = address;
0260     EventDispatcherPrivate::get(d->m_eventDispatcher)->m_connectionToNotify = d;
0261 
0262 #if 0
0263     // TODO make the client authenticate itself, roughly along these lines
0264     //      (not yet investigated whether peer auth is out of spec, optional or mandatory)
0265     // this sets up a server that will be destroyed after accepting exactly one connection
0266     d->m_clientConnectedHandler = new ClientConnectedHandler;
0267     d->m_clientConnectedHandler->m_server = IServer::create(ca);
0268     d->m_clientConnectedHandler->m_server->setEventDispatcher(dispatcher);
0269     d->m_clientConnectedHandler->m_server->setNewConnectionListener(d->m_clientConnectedHandler);
0270     d->m_clientConnectedHandler->m_parent = d;
0271 #endif
0272     d->receiveNextMessage();
0273     ConnectionStateChanger stateChanger(d, ConnectionPrivate::Connected);
0274 }
0275 
0276 Connection::Connection(Connection &&other)
0277 {
0278     d = other.d;
0279     other.d = nullptr;
0280     if (d) {
0281         d->m_connection = this;
0282     }
0283 }
0284 
0285 Connection &Connection::operator=(Connection &&other)
0286 {
0287     this->~Connection();
0288     d = other.d;
0289     other.d = nullptr;
0290     if (d) {
0291         d->m_connection = this;
0292     }
0293     return *this;
0294 }
0295 
0296 Connection::~Connection()
0297 {
0298     if (!d) {
0299         return;
0300     }
0301     d->close(Error::LocalDisconnect);
0302 
0303     delete d->m_transport;
0304     delete d->m_authClient;
0305     delete d->m_helloReceiver;
0306     delete d->m_receivingMessage;
0307 
0308     delete d;
0309     d = nullptr;
0310 }
0311 
0312 Connection::State Connection::state() const
0313 {
0314     return userState(d->m_state);
0315 }
0316 
0317 void Connection::close()
0318 {
0319     d->close(Error::LocalDisconnect);
0320 }
0321 
0322 void ConnectionPrivate::close(Error withError)
0323 {
0324     // Can't be main and secondary at the main time - it could be made to work, but what for?
0325     assert(m_secondaryThreadLinks.empty() || !m_mainThreadConnection);
0326     if (m_closing)
0327     {
0328         // Closing, especially cancelling queued messages, may cause further calls to close() from
0329         // callbacks. The easiest way is to say we're already closing and ignore the additional calls.
0330         return;
0331     }
0332     m_closing = true;
0333 
0334     if (m_mainThreadConnection) {
0335         CommutexUnlinker unlinker(&m_mainThreadLink);
0336         if (unlinker.hasLock()) {
0337             SecondaryConnectionDisconnectEvent *evt = new SecondaryConnectionDisconnectEvent();
0338             evt->connection = this;
0339             EventDispatcherPrivate::get(m_mainThreadConnection->m_eventDispatcher)
0340                 ->queueEvent(std::unique_ptr<Event>(evt));
0341         }
0342     }
0343 
0344     // Destroy whatever is suitable and available at a given time, in order to avoid things like
0345     // one secondary thread blocking another indefinitely and smaller dependency-related slowdowns.
0346     while (!m_secondaryThreadLinks.empty()) {
0347         for (auto it = m_secondaryThreadLinks.begin(); it != m_secondaryThreadLinks.end(); ) {
0348 
0349             CommutexUnlinker unlinker(&it->second, false);
0350             if (unlinker.willSucceed()) {
0351                 if (unlinker.hasLock()) {
0352                     MainConnectionDisconnectEvent *evt = new MainConnectionDisconnectEvent();
0353                     evt->error = withError;
0354                     EventDispatcherPrivate::get(it->first->m_eventDispatcher)
0355                         ->queueEvent(std::unique_ptr<Event>(evt));
0356                 }
0357                 unlinker.unlinkNow(); // don't access the element after erasing it, finish it now
0358                 it = m_secondaryThreadLinks.erase(it);
0359             } else {
0360                 ++it; // don't block, try again next iteration
0361             }
0362         }
0363     }
0364 
0365     cancelAllPendingReplies(withError);
0366 
0367     EventDispatcherPrivate::get(m_eventDispatcher)->m_connectionToNotify = nullptr;
0368     if (m_transport) {
0369         m_transport->close();
0370     }
0371     ConnectionStateChanger stateChanger(this, Unconnected);
0372 }
0373 
0374 void ConnectionPrivate::startAuthentication()
0375 {
0376     // Reserve serial 1 for the "hello" message - technically not necessary, there is no required ordering
0377     // of serials.
0378     takeNextSerial();
0379     m_authClient = new AuthClient(m_transport);
0380     m_authClient->setCompletionListener(this);
0381 }
0382 
0383 void ConnectionPrivate::handleHelloReply()
0384 {
0385     if (!m_helloReceiver->m_helloReply.hasNonErrorReply()) {
0386         delete m_helloReceiver;
0387         m_helloReceiver = nullptr;
0388         close(Error::RemoteDisconnect);
0389         return;
0390     }
0391     Message msg = m_helloReceiver->m_helloReply.takeReply();
0392     const Arguments &argList = msg.arguments();
0393     delete m_helloReceiver;
0394     m_helloReceiver = nullptr;
0395 
0396     Arguments::Reader reader(argList);
0397     cstring busName = reader.readString();
0398     if (reader.state() != Arguments::Finished) {
0399         handleHelloFailed();
0400         // busName contains garbage here, don't access it!
0401         return;
0402     }
0403     m_uniqueName = toStdString(busName);
0404 
0405     // tell current secondaries
0406     UniqueNameReceivedEvent evt;
0407     evt.uniqueName = m_uniqueName;
0408     for (auto &it : m_secondaryThreadLinks) {
0409         CommutexLocker otherLocker(&it.second);
0410         if (otherLocker.hasLock()) {
0411             EventDispatcherPrivate::get(it.first->m_eventDispatcher)
0412                 ->queueEvent(std::unique_ptr<Event>(new UniqueNameReceivedEvent(evt)));
0413         }
0414     }
0415 
0416     ConnectionStateChanger stateChanger(this, Connected);
0417 }
0418 
0419 void ConnectionPrivate::handleHelloFailed()
0420 {
0421     assert(m_state == AwaitingUniqueName);
0422     close(Error::RemoteDisconnect);
0423 }
0424 
0425 void ConnectionPrivate::notifyStateChange(Connection::State oldUserState, Connection::State newUserState)
0426 {
0427     if (m_connectionStateListener) {
0428         m_connectionStateListener->handleConnectionChanged(m_connection, oldUserState, newUserState);
0429     }
0430 }
0431 
0432 void ConnectionPrivate::handleClientConnected()
0433 {
0434     m_transport = m_clientConnectedHandler->m_server->takeNextClient();
0435     delete m_clientConnectedHandler;
0436     m_clientConnectedHandler = nullptr;
0437 
0438     assert(m_transport);
0439     addIoListener(m_transport);
0440     receiveNextMessage();
0441 
0442     ConnectionStateChanger stateChanger(this, Connected);
0443 }
0444 
0445 void Connection::setDefaultReplyTimeout(int msecs)
0446 {
0447     d->m_defaultTimeout = msecs;
0448 }
0449 
0450 int Connection::defaultReplyTimeout() const
0451 {
0452     return d->m_defaultTimeout;
0453 }
0454 
0455 uint32 ConnectionPrivate::takeNextSerial()
0456 {
0457     uint32 ret;
0458     do {
0459         ret = m_sendSerial.fetch_add(1, std::memory_order_relaxed);
0460     } while (unlikely(ret == 0));
0461     return ret;
0462 }
0463 
0464 Error ConnectionPrivate::prepareSend(Message *msg)
0465 {
0466     if (msg->serial() == 0) {
0467         if (!m_mainThreadConnection) {
0468             msg->setSerial(takeNextSerial());
0469         } else {
0470             // we take a serial from the other Connection and then serialize locally in order to keep the CPU
0471             // expense of serialization local, even though it's more complicated than doing everything in the
0472             // other thread / Connection.
0473             CommutexLocker locker(&m_mainThreadLink);
0474             if (locker.hasLock()) {
0475                 msg->setSerial(m_mainThreadConnection->takeNextSerial());
0476             } else {
0477                 return Error::LocalDisconnect;
0478             }
0479         }
0480     }
0481 
0482     MessagePrivate *const mpriv = MessagePrivate::get(msg); // this is unchanged by move()ing the owning Message.
0483     if (!mpriv->serialize()) {
0484         return mpriv->m_error;
0485     }
0486     return Error::NoError;
0487 }
0488 
0489 void ConnectionPrivate::sendPreparedMessage(Message msg)
0490 {
0491     MessagePrivate *const mpriv = MessagePrivate::get(&msg);
0492     mpriv->setCompletionListener(this);
0493     m_sendQueue.push_back(std::move(msg));
0494     if (m_state == ConnectionPrivate::Connected && m_sendQueue.size() == 1) {
0495         // first in queue, don't wait for some other event to trigger sending
0496         mpriv->send(m_transport);
0497     }
0498 }
0499 
0500 PendingReply Connection::send(Message m, int timeoutMsecs)
0501 {
0502     if (timeoutMsecs == DefaultTimeout) {
0503         timeoutMsecs = d->m_defaultTimeout;
0504     }
0505 
0506     Error error = d->prepareSend(&m);
0507 
0508     PendingReplyPrivate *pendingPriv = new PendingReplyPrivate(d->m_eventDispatcher, timeoutMsecs);
0509     pendingPriv->m_connectionOrReply.connection = d;
0510     pendingPriv->m_receiver = nullptr;
0511     pendingPriv->m_serial = m.serial();
0512 
0513     // even if we're handing off I/O to a main Connection, keep a record because that simplifies
0514     // aborting all pending replies when we disconnect from the main Connection, no matter which
0515     // side initiated the disconnection.
0516     d->m_pendingReplies.emplace(m.serial(), pendingPriv);
0517 
0518     if (error.isError() || d->m_state == ConnectionPrivate::Unconnected) {
0519         // Signal the error asynchronously, in order to get the same delayed completion callback as in
0520         // the non-error case. This should make the behavior more predictable and client code harder to
0521         // accidentally get wrong. To detect errors immediately, PendingReply::error() can be used.
0522 
0523         // An intentionally locally disconnected connection is not in an error state, but trying to send
0524         // a message over it is an error.
0525         pendingPriv->m_error = error.isError() ? error : Error::LocalDisconnect;
0526         pendingPriv->m_replyTimeout.start(0);
0527     } else {
0528         if (!d->m_mainThreadConnection) {
0529             d->sendPreparedMessage(std::move(m));
0530         } else {
0531             CommutexLocker locker(&d->m_mainThreadLink);
0532             if (locker.hasLock()) {
0533                 std::unique_ptr<SendMessageWithPendingReplyEvent> evt(new SendMessageWithPendingReplyEvent);
0534                 evt->message = std::move(m);
0535                 evt->connection = d;
0536                 EventDispatcherPrivate::get(d->m_mainThreadConnection->m_eventDispatcher)
0537                     ->queueEvent(std::move(evt));
0538             } else {
0539                 pendingPriv->m_error = Error::LocalDisconnect;
0540             }
0541         }
0542     }
0543 
0544     return PendingReply(pendingPriv);
0545 }
0546 
0547 Error Connection::sendNoReply(Message m)
0548 {
0549     // ### (when not called from send()) warn if sending a message without the noreply flag set?
0550     //     doing that is wasteful, but might be common. needs investigation.
0551     Error error = d->prepareSend(&m);
0552     if (error.isError() || d->m_state == ConnectionPrivate::Unconnected) {
0553         return error.isError() ? error : Error::LocalDisconnect;
0554     }
0555 
0556     // pass ownership to the send queue now because if the IO system decided to send the message without
0557     // going through an event loop iteration, handleCompletion would be called and expects the message to
0558     // be in the queue
0559 
0560     if (!d->m_mainThreadConnection) {
0561         d->sendPreparedMessage(std::move(m));
0562     } else {
0563         CommutexLocker locker(&d->m_mainThreadLink);
0564         if (locker.hasLock()) {
0565             std::unique_ptr<SendMessageEvent> evt(new SendMessageEvent);
0566             evt->message = std::move(m);
0567             EventDispatcherPrivate::get(d->m_mainThreadConnection->m_eventDispatcher)
0568                 ->queueEvent(std::move(evt));
0569         } else {
0570             return Error::LocalDisconnect;
0571         }
0572     }
0573     return Error::NoError;
0574 }
0575 
0576 size_t Connection::sendQueueLength() const
0577 {
0578     return d->m_sendQueue.size();
0579 }
0580 
0581 void Connection::waitForConnectionEstablished()
0582 {
0583     if (d->m_state != ConnectionPrivate::Authenticating) {
0584         return;
0585     }
0586     while (d->m_state == ConnectionPrivate::Authenticating) {
0587         d->m_authClient->handleTransportCanRead();
0588     }
0589     if (d->m_state != ConnectionPrivate::AwaitingUniqueName) {
0590         return;
0591     }
0592     // Send the hello message
0593     assert(!d->m_sendQueue.empty()); // the hello message should be in the queue
0594     MessagePrivate *helloPriv = MessagePrivate::get(&d->m_sendQueue.front());
0595     helloPriv->handleTransportCanWrite();
0596 
0597     // Receive the hello reply
0598     while (d->m_state == ConnectionPrivate::AwaitingUniqueName) {
0599         MessagePrivate::get(d->m_receivingMessage)->handleTransportCanRead();
0600     }
0601 }
0602 
0603 ConnectAddress Connection::connectAddress() const
0604 {
0605     return d->m_connectAddress;
0606 }
0607 
0608 std::string Connection::uniqueName() const
0609 {
0610     return d->m_uniqueName;
0611 }
0612 
0613 bool Connection::isConnected() const
0614 {
0615     return d->m_transport && d->m_transport->isOpen();
0616 }
0617 
0618 EventDispatcher *Connection::eventDispatcher() const
0619 {
0620     return d->m_eventDispatcher;
0621 }
0622 
0623 IMessageReceiver *Connection::spontaneousMessageReceiver() const
0624 {
0625     return d->m_client;
0626 }
0627 
0628 void Connection::setSpontaneousMessageReceiver(IMessageReceiver *receiver)
0629 {
0630     d->m_client = receiver;
0631 }
0632 
0633 IConnectionStateListener *Connection::connectionStateListener() const
0634 {
0635     return d->m_connectionStateListener;
0636 }
0637 
0638 void Connection::setConnectionStateListener(IConnectionStateListener *listener)
0639 {
0640     d->m_connectionStateListener = listener;
0641 }
0642 
0643 void ConnectionPrivate::handleCompletion(void *task)
0644 {
0645     switch (m_state) {
0646     case Authenticating: {
0647         assert(task == m_authClient);
0648         assert(m_authClient->isFinished());
0649         if (!m_authClient->isAuthenticated()) {
0650             close(Error::AuthenticationFailed);
0651         }
0652         m_unixFdPassingEnabled = m_authClient->isUnixFdPassingEnabled();
0653         delete m_authClient;
0654         m_authClient = nullptr;
0655         if (m_state == Unconnected) {
0656             break;
0657         }
0658 
0659         ConnectionStateChanger stateChanger(this, AwaitingUniqueName);
0660 
0661         // Announce our presence to the bus and have it send some introductory information of its own
0662         Message hello = Message::createCall("/org/freedesktop/DBus", "org.freedesktop.DBus", "Hello");
0663         hello.setSerial(1);
0664         hello.setExpectsReply(false);
0665         hello.setDestination(std::string("org.freedesktop.DBus"));
0666         MessagePrivate *const helloPriv = MessagePrivate::get(&hello);
0667 
0668         m_helloReceiver = new HelloReceiver;
0669         m_helloReceiver->m_helloReply = m_connection->send(std::move(hello));
0670         // Small hack: Connection::send() refuses to really start sending if the connection isn't in
0671         // Connected state. So force the sending here to actually get to Connected state.
0672         helloPriv->send(m_transport);
0673         // Also ensure that the hello message is sent before any other messages that may have been
0674         // already enqueued by an API client
0675         if (m_sendQueue.size() > 1) {
0676             hello = std::move(m_sendQueue.back());
0677             m_sendQueue.pop_back();
0678             m_sendQueue.push_front(std::move(hello));
0679         }
0680         m_helloReceiver->m_helloReply.setReceiver(m_helloReceiver);
0681         // get ready to receive the first message, the hello reply
0682         receiveNextMessage();
0683 
0684         break;
0685     }
0686     case AwaitingUniqueName: // the code paths for these two states only diverge in the PendingReply handler
0687     case Connected: {
0688         assert(!m_authClient);
0689         if (!m_sendQueue.empty() && task == &m_sendQueue.front()) {
0690             Message *msg = static_cast<Message* >(task);
0691             if (msg->error().isError()) {
0692                 if (m_state == AwaitingUniqueName) {
0693                     handleHelloFailed();
0694                 }
0695                 // TODO else also close the connection? (maybe depending on which error it is)
0696             }
0697             m_sendQueue.pop_front();
0698             if (!m_sendQueue.empty()) {
0699                 MessagePrivate::get(&m_sendQueue.front())->send(m_transport);
0700             }
0701         } else {
0702             assert(task == m_receivingMessage);
0703             Message *const receivedMessage = m_receivingMessage;
0704 
0705             receiveNextMessage();
0706 
0707             if (receivedMessage->type() == Message::InvalidMessage) {
0708                 if (m_state == AwaitingUniqueName) {
0709                     handleHelloFailed();
0710                 }
0711                 delete receivedMessage;
0712             } else if (!maybeDispatchToPendingReply(receivedMessage)) {
0713                 if (m_client) {
0714                     m_client->handleSpontaneousMessageReceived(Message(std::move(*receivedMessage)),
0715                                                                m_connection);
0716                 }
0717                 // dispatch to other threads listening to spontaneous messages, if any
0718                 for (auto it = m_secondaryThreadLinks.begin(); it != m_secondaryThreadLinks.end(); ) {
0719                     SpontaneousMessageReceivedEvent *evt = new SpontaneousMessageReceivedEvent();
0720                     if (std::next(it) != m_secondaryThreadLinks.end()) {
0721                         evt->message = *receivedMessage;
0722                     } else {
0723                         evt->message = std::move(*receivedMessage);
0724                     }
0725 
0726                     CommutexLocker otherLocker(&it->second);
0727                     if (otherLocker.hasLock()) {
0728                         EventDispatcherPrivate::get(it->first->m_eventDispatcher)
0729                             ->queueEvent(std::unique_ptr<Event>(evt));
0730                         ++it;
0731                     } else {
0732                         ConnectionPrivate *connection = it->first;
0733                         it = m_secondaryThreadLinks.erase(it);
0734                         discardPendingRepliesForSecondaryThread(connection);
0735                         delete evt;
0736                     }
0737                 }
0738                 delete receivedMessage;
0739             }
0740         }
0741         break;
0742     }
0743     default:
0744         // ### decide what to do here
0745         break;
0746     };
0747 }
0748 
0749 bool ConnectionPrivate::maybeDispatchToPendingReply(Message *receivedMessage)
0750 {
0751     if (receivedMessage->type() != Message::MethodReturnMessage &&
0752         receivedMessage->type() != Message::ErrorMessage) {
0753         return false;
0754     }
0755 
0756     auto it = m_pendingReplies.find(receivedMessage->replySerial());
0757     if (it == m_pendingReplies.end()) {
0758         return false;
0759     }
0760 
0761     if (PendingReplyPrivate *pr = it->second.asPendingReply()) {
0762         m_pendingReplies.erase(it);
0763         assert(!pr->m_isFinished);
0764         pr->handleReceived(receivedMessage);
0765     } else {
0766         // forward to other thread's Connection
0767         ConnectionPrivate *connection = it->second.asConnection();
0768         m_pendingReplies.erase(it);
0769         assert(connection);
0770         PendingReplySuccessEvent *evt = new PendingReplySuccessEvent;
0771         evt->reply = std::move(*receivedMessage);
0772         delete receivedMessage;
0773         EventDispatcherPrivate::get(connection->m_eventDispatcher)->queueEvent(std::unique_ptr<Event>(evt));
0774     }
0775     return true;
0776 }
0777 
0778 bool ConnectionPrivate::maybeDispatchToPendingReply(uint32 serial, Error error)
0779 {
0780     assert(error.isError());
0781     auto it = m_pendingReplies.find(serial);
0782     if (it == m_pendingReplies.end()) {
0783         return false;
0784     }
0785 
0786     if (PendingReplyPrivate *pr = it->second.asPendingReply()) {
0787         m_pendingReplies.erase(it);
0788         assert(!pr->m_isFinished);
0789         pr->handleError(error);
0790     } else {
0791         // forward to other thread's Connection
0792         ConnectionPrivate *connection = it->second.asConnection();
0793         m_pendingReplies.erase(it);
0794         assert(connection);
0795         PendingReplyFailureEvent *evt = new PendingReplyFailureEvent;
0796         evt->m_serial = serial;
0797         evt->m_error = error;
0798         EventDispatcherPrivate::get(connection->m_eventDispatcher)->queueEvent(std::unique_ptr<Event>(evt));
0799     }
0800     return true;
0801 }
0802 
0803 void ConnectionPrivate::receiveNextMessage()
0804 {
0805     m_receivingMessage = new Message;
0806     MessagePrivate *const mpriv = MessagePrivate::get(m_receivingMessage);
0807     mpriv->setCompletionListener(this);
0808     mpriv->receive(m_transport);
0809 }
0810 
0811 void ConnectionPrivate::unregisterPendingReply(PendingReplyPrivate *p)
0812 {
0813     if (m_mainThreadConnection) {
0814         CommutexLocker otherLocker(&m_mainThreadLink);
0815         if (otherLocker.hasLock()) {
0816             PendingReplyCancelEvent *evt = new PendingReplyCancelEvent;
0817             evt->serial = p->m_serial;
0818             EventDispatcherPrivate::get(m_mainThreadConnection->m_eventDispatcher)
0819                 ->queueEvent(std::unique_ptr<Event>(evt));
0820         }
0821     }
0822 #ifndef NDEBUG
0823     auto it = m_pendingReplies.find(p->m_serial);
0824     assert(it != m_pendingReplies.end());
0825     if (!m_mainThreadConnection) {
0826         assert(it->second.asPendingReply());
0827         assert(it->second.asPendingReply() == p);
0828     }
0829 #endif
0830     m_pendingReplies.erase(p->m_serial);
0831 }
0832 
0833 void ConnectionPrivate::cancelAllPendingReplies(Error withError)
0834 {
0835     // No locking because we should have no connections to other threads anymore at this point.
0836     // No const iteration followed by container clear because that has different semantics - many
0837     // things can happen in a callback...
0838     // In case we have pending replies for secondary threads, and we cancel all pending replies,
0839     // that is because we're shutting down, which we told the secondary thread, and it will deal
0840     // with bulk cancellation of replies. We just throw away our records about them.
0841     for (auto it = m_pendingReplies.begin() ; it != m_pendingReplies.end(); ) {
0842         PendingReplyPrivate *pendingPriv = it->second.asPendingReply();
0843         it = m_pendingReplies.erase(it);
0844         if (pendingPriv) { // if from this thread
0845             pendingPriv->handleError(withError);
0846         }
0847     }
0848     m_sendQueue.clear();
0849 }
0850 
0851 void ConnectionPrivate::discardPendingRepliesForSecondaryThread(ConnectionPrivate *connection)
0852 {
0853     for (auto it = m_pendingReplies.begin() ; it != m_pendingReplies.end(); ) {
0854         if (it->second.asConnection() == connection) {
0855             it = m_pendingReplies.erase(it);
0856             // notification and deletion are handled on the event's source thread
0857         } else {
0858             ++it;
0859         }
0860     }
0861 }
0862 
0863 void ConnectionPrivate::processEvent(Event *evt)
0864 {
0865     // std::cerr << "ConnectionPrivate::processEvent() with event type " << evt->type << std::endl;
0866 
0867     switch (evt->type) {
0868     case Event::SendMessage:
0869         sendPreparedMessage(std::move(static_cast<SendMessageEvent *>(evt)->message));
0870         break;
0871 
0872     case Event::SendMessageWithPendingReply: {
0873         SendMessageWithPendingReplyEvent *pre = static_cast<SendMessageWithPendingReplyEvent *>(evt);
0874         m_pendingReplies.emplace(pre->message.serial(), pre->connection);
0875         sendPreparedMessage(std::move(pre->message));
0876         break;
0877     }
0878     case Event::SpontaneousMessageReceived:
0879         if (m_client) {
0880             SpontaneousMessageReceivedEvent *smre = static_cast<SpontaneousMessageReceivedEvent *>(evt);
0881             m_client->handleSpontaneousMessageReceived(Message(std::move(smre->message)), m_connection);
0882         }
0883         break;
0884 
0885     case Event::PendingReplySuccess:
0886         maybeDispatchToPendingReply(&static_cast<PendingReplySuccessEvent *>(evt)->reply);
0887         break;
0888 
0889     case Event::PendingReplyFailure: {
0890         PendingReplyFailureEvent *prfe = static_cast<PendingReplyFailureEvent *>(evt);
0891         const auto it = m_pendingReplies.find(prfe->m_serial);
0892         if (it == m_pendingReplies.end()) {
0893             // not a disaster, but when it happens in debug mode I want to check it out
0894             assert(false);
0895             break;
0896         }
0897         PendingReplyPrivate *pendingPriv = it->second.asPendingReply();
0898         m_pendingReplies.erase(it);
0899         pendingPriv->handleError(prfe->m_error);
0900         break;
0901     }
0902 
0903     case Event::PendingReplyCancel:
0904         // This comes from a secondary thread, which handles PendingReply notification itself.
0905         m_pendingReplies.erase(static_cast<PendingReplyCancelEvent *>(evt)->serial);
0906         break;
0907 
0908     case Event::SecondaryConnectionConnect: {
0909         SecondaryConnectionConnectEvent *sce = static_cast<SecondaryConnectionConnectEvent *>(evt);
0910 
0911         const auto it = find_if(m_unredeemedCommRefs.begin(), m_unredeemedCommRefs.end(),
0912                             [sce](const CommutexPeer &item) { return item.id() == sce->id; } );
0913         assert(it != m_unredeemedCommRefs.end());
0914         const auto emplaced = m_secondaryThreadLinks.emplace(sce->connection, std::move(*it)).first;
0915         m_unredeemedCommRefs.erase(it);
0916 
0917         // "welcome package" - it's done (only) as an event to avoid locking order issues
0918         CommutexLocker locker(&emplaced->second);
0919         if (locker.hasLock()) {
0920             UniqueNameReceivedEvent *evt = new UniqueNameReceivedEvent;
0921             evt->uniqueName = m_uniqueName;
0922             EventDispatcherPrivate::get(sce->connection->m_eventDispatcher)
0923                 ->queueEvent(std::unique_ptr<Event>(evt));
0924         }
0925 
0926         break;
0927     }
0928 
0929     case Event::SecondaryConnectionDisconnect: {
0930         SecondaryConnectionDisconnectEvent *sde = static_cast<SecondaryConnectionDisconnectEvent *>(evt);
0931         // delete our records to make sure we don't call into it in the future!
0932         const auto found = m_secondaryThreadLinks.find(sde->connection);
0933         if (found == m_secondaryThreadLinks.end()) {
0934             // looks like we've noticed the disappearance of the other thread earlier
0935             return;
0936         }
0937         m_secondaryThreadLinks.erase(found);
0938         discardPendingRepliesForSecondaryThread(sde->connection);
0939         break;
0940     }
0941     case Event::MainConnectionDisconnect: {
0942         // since the main thread *sent* us the event, it already knows to drop all our PendingReplies
0943         m_mainThreadConnection = nullptr;
0944         MainConnectionDisconnectEvent *mcde = static_cast<MainConnectionDisconnectEvent *>(evt);
0945         cancelAllPendingReplies(mcde->error);
0946         break;
0947     }
0948     case Event::UniqueNameReceived:
0949         // We get this when the unique name became available after we were linked up with the main thread
0950         m_uniqueName = static_cast<UniqueNameReceivedEvent *>(evt)->uniqueName;
0951         if (m_state == AwaitingUniqueName) {
0952             ConnectionStateChanger stateChanger(this, Connected);
0953         }
0954         break;
0955     }
0956 }
0957 
0958 Connection::CommRef Connection::createCommRef()
0959 {
0960     // TODO this is a good time to clean up "dead" CommRefs, where the counterpart was destroyed.
0961     CommRef ret;
0962     ret.connection = d;
0963     std::pair<CommutexPeer, CommutexPeer> link = CommutexPeer::createLink();
0964     {
0965         SpinLocker mainLocker(&d->m_lock);
0966         d->m_unredeemedCommRefs.emplace_back(std::move(link.first));
0967     }
0968     ret.commutex = std::move(link.second);
0969     return ret;
0970 }
0971 
0972 uint32 Connection::supportedFileDescriptorsPerMessage() const
0973 {
0974     return (d->m_transport && d->m_unixFdPassingEnabled) ?
0975                 d->m_transport->supportedPassingUnixFdsCount() : 0;
0976 }