File indexing completed on 2024-05-12 17:15:20

0001 /*
0002    Copyright (C) 2013 Andreas Hartmetz <ahartmetz@gmail.com>
0003 
0004    This library is free software; you can redistribute it and/or
0005    modify it under the terms of the GNU Library General Public
0006    License as published by the Free Software Foundation; either
0007    version 2 of the License, or (at your option) any later version.
0008 
0009    This library is distributed in the hope that it will be useful,
0010    but WITHOUT ANY WARRANTY; without even the implied warranty of
0011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0012    Library General Public License for more details.
0013 
0014    You should have received a copy of the GNU Library General Public License
0015    along with this library; see the file COPYING.LGPL.  If not, write to
0016    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
0017    Boston, MA 02110-1301, USA.
0018 
0019    Alternatively, this file is available under the Mozilla Public License
0020    Version 1.1.  You may obtain a copy of the License at
0021    http://www.mozilla.org/MPL/
0022 */
0023 
0024 #include "arguments.h"
0025 #include "connectaddress.h"
0026 #include "connection.h"
0027 #include "eventdispatcher.h"
0028 #include "iconnectionstatelistener.h"
0029 #include "imessagereceiver.h"
0030 #include "inewconnectionlistener.h"
0031 #include "message.h"
0032 #include "pendingreply.h"
0033 #include "server.h"
0034 
0035 #include "../testutil.h"
0036 
0037 #include <algorithm>
0038 #include <iostream>
0039 #include <string>
0040 #include <thread>
0041 #include <vector>
0042 
0043 /*
0044 
0045 Sequence diagram of successful test runs. There are three runs with three connections each, where what
0046 happens in the second connection changes from test run to test run.
0047 
0048 Events are assumed to be asynchronous between threads, unless indicated as in the following example:
0049 Accept connection       <-          Connect to server
0050                                     Talk to rubber duckie
0051 In plain words: The server must accept after the client starts connecting, not earlier.
0052 In this case, the ordering is enforced naturally, in other cases, auxiliary synchronization is needed.
0053 Note that it is WELL POSSIBLE that "talk to rubber duckie" has already happened when the connection is
0054 accepted by the server - the only guarantee is that accept happens after connect.
0055 
0056 
0057 TODO:
0058 - how to test connections?
0059 - review, add further details
0060 
0061 
0062 Server thread           .           Client thread
0063 
0064 ###### First connection - always succeeds
0065 
0066 Set up server           ->          Create thread
0067 Accept connection       <-          Connect
0068 Receive TestMsg         <-          Send TestMsg
0069 Send TestReply          ->          Receive TestReply
0070 
0071 
0072 ###### Second connection - succeeds (test run 1) or fails due to closing by client (test run 2)
0073 ###### or fails due to closing by server (test run 3)
0074 
0075 Signal next connection  -> ...
0076 
0077 ### test run 1 - connection 2 succeeds
0078 
0079 Accept                  <-          Connect
0080 Receive TestMsg         <-          Send TestMsg
0081 Send TestReply          ->          Receive TestReply (test checks this)
0082 
0083 ### test run 2 - connection 2 failed by client
0084 
0085 Accept                  <-          Connect
0086 Receive connection      <-          Close
0087   closed error (check)
0088 
0089 ### test run 3 - connection 2 failed by server
0090 
0091 Accept                  <-          Connect
0092                                     Send TestMsg
0093 Close                   ->          Receive failed PendingReply (check)
0094 
0095 
0096 ###### Third connection - always succeeds
0097 
0098 Accept                  <-          Connect
0099 Receive TestMsg         <-          Send TestMsg
0100 Send TestReply          ->          Receive TestReply (check)
0101 
0102 */
0103 
0104 
// Shared knobs of this test. The enumerators are used both unqualified and as
// TestConstants::X by the client and server code below.
enum TestConstants
{
    // Layout of a single test run
    ConnectionsPerTestRun = 3,
    BrokenConnectionIndex = 1, // zero-based index of the connection that a test run may break

    // One test run per scenario; the value is also the index passed to testAcceptMultiple()
    NoFailTestRun = 0,
    ClientCloseTestRun = NoFailTestRun + 1,
    ServerCloseTestRun = ClientCloseTestRun + 1,
    TestRunCount = ServerCloseTestRun + 1,

    // Timeout handed to Connection::send() for the ping message
    ReplyTimeoutMsecs = 25000 // TODO back to 250
};
0117 
0118 //////////////////////// client thread (a secondary thread) /////////////////////
0119 
0120 class ClientSideHandlers : public IConnectionStateListener, public IMessageReceiver
0121 {
0122 public:
0123     ~ClientSideHandlers() {}
0124 
0125     // IConnectionStateListener
0126     void handleConnectionChanged(Connection *, Connection::State, Connection::State newState) override
0127     {
0128         if (newState == Connection::Unconnected && m_testRunIndex == ServerCloseTestRun) {
0129             std::cerr << "Client thread: handling disconnect" << std::endl;
0130             m_serverClosedConnections++;
0131         }
0132     }
0133 
0134     // IMessageReceiver
0135     void handlePendingReplyFinished(PendingReply *pr, Connection *connection) override
0136     {
0137         std::cerr << "Client thread: received pong" << " " << pr->hasNonErrorReply() << std::endl;
0138         if (pr->hasNonErrorReply()) {
0139             m_receivedSuccessReplies++;
0140         } else {
0141             m_receivedErrorReplies++;
0142         }
0143         connection->eventDispatcher()->interrupt();
0144     }
0145 
0146     int m_testRunIndex = 0;
0147     int m_serverClosedConnections = 0;
0148     int m_receivedSuccessReplies = 0;
0149     int m_receivedErrorReplies = 0;
0150 };
0151 
0152 
0153 static void clientThreadRun(ConnectAddress address, int testRunIndex)
0154 {
0155     EventDispatcher eventDispatcher;
0156     ClientSideHandlers clientHandlers;
0157     clientHandlers.m_testRunIndex = testRunIndex;
0158 
0159     // Client-side connections - these call listeners in ClientSideHandlers when closing during
0160     // destruction. The easiest way to ensure a non-crashing destruction order is to put them here.
0161     std::vector<Connection> connections;
0162 
0163     for (int i = 0; i < TestConstants::ConnectionsPerTestRun; i++) {
0164         std::cerr << "Client thread: test run " << testRunIndex << " / connection " << i << std::endl;
0165         connections.push_back(Connection(&eventDispatcher, address));
0166         connections.back().setConnectionStateListener(&clientHandlers);
0167 
0168 #if 1
0169         if (i == TestConstants::BrokenConnectionIndex && testRunIndex == ClientCloseTestRun) {
0170             std::cerr << "Client thread: closing connection" << std::endl;
0171             connections.back().close();
0172             std::cerr << "Client thread: closed connection" << std::endl;
0173             continue;
0174         }
0175 #endif
0176         Message ping = Message::createCall("/foo", "org.bar.interface", "serverTest");
0177         PendingReply pendingReply = connections.back().send(std::move(ping), ReplyTimeoutMsecs);
0178         std::cerr << "Client thread: sent ping" << std::endl;
0179 #if 0
0180         if (i == TestConstants::BrokenConnectionIndex && testRunIndex == ClientCloseTestRun) {
0181             std::cerr << "Client thread: closing connection" << std::endl;
0182             while (connections.back().sendQueueLength()) {
0183                 eventDispatcher.poll();
0184             }
0185             connections.back().close();
0186             std::cerr << "Client thread: closed connection" << std::endl;
0187             continue;
0188         } else
0189 #endif
0190         pendingReply.setReceiver(&clientHandlers);
0191 
0192         while (eventDispatcher.poll()) {
0193         }
0194 
0195         if (i == TestConstants::BrokenConnectionIndex && testRunIndex == ServerCloseTestRun) {
0196             TEST(pendingReply.error().code() == Error::RemoteDisconnect);
0197         } else {
0198             TEST(!pendingReply.error().isError());
0199         }
0200     }
0201 
0202     if (testRunIndex == NoFailTestRun) {
0203         TEST(clientHandlers.m_serverClosedConnections == 0);
0204         TEST(clientHandlers.m_receivedSuccessReplies == 3);
0205         TEST(clientHandlers.m_receivedErrorReplies == 0);
0206     } else if (testRunIndex == ClientCloseTestRun) {
0207         TEST(clientHandlers.m_serverClosedConnections == 0);
0208         TEST(clientHandlers.m_receivedSuccessReplies == 2);
0209         TEST(clientHandlers.m_receivedErrorReplies == 0);
0210     } else {
0211         TEST(clientHandlers.m_serverClosedConnections == 1);
0212         TEST(clientHandlers.m_receivedSuccessReplies == 2);
0213         TEST(clientHandlers.m_receivedErrorReplies == 1);
0214     }
0215 }
0216 
0217 //////////////////////// server thread (the main thread) /////////////////////
0218 
0219 class ServerSideHandlers : public INewConnectionListener, public IConnectionStateListener,
0220                            public IMessageReceiver
0221 {
0222 public:
0223     // INewConnectionListener
0224     void handleNewConnection(Server *server) override
0225     {
0226         std::unique_ptr<Connection> conn(server->takeNextClient());
0227         TEST(conn); // for now this is simply not allowed... we could try to check why this
0228                     // happened, if it ever happens
0229         conn->setSpontaneousMessageReceiver(this);
0230         conn->setConnectionStateListener(this);
0231         const size_t connectionIndex = m_connections.size();
0232         m_connections.push_back(std::move(*conn));
0233 
0234         if (connectionIndex == TestConstants::BrokenConnectionIndex &&
0235             m_testRunIndex == ServerCloseTestRun) {
0236             m_connections.back().close();
0237             stopListeningToConnection(&m_connections.back(), "we closed");
0238         }
0239     }
0240 
0241     // IConnectionStateListener
0242     void handleConnectionChanged(Connection *conn, Connection::State oldState,
0243                                  Connection::State newState) override
0244     {
0245         const auto it = std::find_if(m_connections.begin(), m_connections.end(),
0246                                      [conn] (Connection &c) { return conn == &c; });
0247         const int connIndex = std::distance(m_connections.begin(), it);
0248 
0249         std::cerr << "Server thread: handling state change @ index " << connIndex
0250                   << " from " << oldState << " to " << newState << std::endl;
0251         if (newState != Connection::Unconnected) {
0252             return;
0253         }
0254 
0255         std::cerr << "Server thread: handling disconnect @ index " << connIndex << std::endl;
0256         if (connIndex == TestConstants::BrokenConnectionIndex) {
0257             if (m_testRunIndex == ClientCloseTestRun) {
0258                 std::cerr << "  *** HURZ ***" << std::endl;
0259                 m_clientClosedConnectionAtTheRightPoint++;
0260                 stopListeningToConnection(conn, "disconnected");
0261             }
0262         }
0263     }
0264 
0265     // IMessageReceiver
0266     void handleSpontaneousMessageReceived(Message message, Connection *conn) override
0267     {
0268         std::cerr << "Server thread: received ping" << std::endl;
0269         conn->sendNoReply(Message::createReplyTo(message));
0270         stopListeningToConnection(conn, "ping received");
0271     }
0272 
0273     int m_testRunIndex = 0;
0274     std::vector<Connection> m_connections;  // server-side connections
0275     int m_connectionsFullyHandled = 0;
0276     int m_clientClosedConnectionAtTheRightPoint = 0;
0277 
0278 private:
0279     void stopListeningToConnection(Connection *conn, const char *reason)
0280     {
0281         std::cerr << "Server thread: start ignoring connection because " << reason << std::endl;
0282         conn->setSpontaneousMessageReceiver(nullptr);
0283         conn->setConnectionStateListener(nullptr);
0284         m_connectionsFullyHandled++;
0285     }
0286 };
0287 
0288 static void testAcceptMultiple(int testRunIndex)
0289 {
0290     // Accept multiple connections and run a ping-pong message test on each. If withFail is true,
0291     // abort one connection from the client side and check that the rest still works.
0292     EventDispatcher eventDispatcher;
0293 
0294     ConnectAddress addr;
0295     addr.setRole(ConnectAddress::Role::PeerServer);
0296 #ifdef __unix__
0297     addr.setType(ConnectAddress::Type::TmpDir);
0298     addr.setPath("/tmp");
0299 #else
0300     addr.setType(ConnectAddress::Type::Tcp);
0301     addr.setPort(36816 /* randomly selected ;) */);
0302 #endif
0303 
0304     Server server(&eventDispatcher, addr);
0305     ServerSideHandlers serverHandler;
0306     serverHandler.m_testRunIndex = testRunIndex;
0307     server.setNewConnectionListener(&serverHandler);
0308 
0309     ConnectAddress clientAddr = server.concreteAddress();
0310     clientAddr.setRole(ConnectAddress::Role::PeerClient);
0311     std::thread clientThread(clientThreadRun, clientAddr, testRunIndex);
0312 
0313     while (serverHandler.m_connectionsFullyHandled < ConnectionsPerTestRun ||
0314            (serverHandler.m_connections.back().state() != Connection::Unconnected &&
0315             serverHandler.m_connections.back().sendQueueLength())) {
0316         eventDispatcher.poll();
0317     }
0318 
0319     clientThread.join();
0320 
0321     TEST(serverHandler.m_connectionsFullyHandled == ConnectionsPerTestRun);
0322     if (testRunIndex == ClientCloseTestRun) {
0323         TEST(serverHandler.m_clientClosedConnectionAtTheRightPoint == 1);
0324     } else {
0325         TEST(serverHandler.m_clientClosedConnectionAtTheRightPoint == 0);
0326     }
0327 }
0328 
0329 
0330 int main(int, char *[])
0331 {
0332     for (int i = 0; i < TestRunCount; i++) {
0333         testAcceptMultiple(i);
0334     }
0335     std::cout << "Passed!\n";
0336 }