File indexing completed on 2024-05-12 17:15:20

0001 /*
0002    Copyright (C) 2014 Andreas Hartmetz <ahartmetz@gmail.com>
0003 
0004    This library is free software; you can redistribute it and/or
0005    modify it under the terms of the GNU Library General Public
0006    License as published by the Free Software Foundation; either
0007    version 2 of the License, or (at your option) any later version.
0008 
0009    This library is distributed in the hope that it will be useful,
0010    but WITHOUT ANY WARRANTY; without even the implied warranty of
0011    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0012    Library General Public License for more details.
0013 
0014    You should have received a copy of the GNU Library General Public License
0015    along with this library; see the file COPYING.LGPL.  If not, write to
0016    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
0017    Boston, MA 02110-1301, USA.
0018 
0019    Alternatively, this file is available under the Mozilla Public License
0020    Version 1.1.  You may obtain a copy of the License at
0021    http://www.mozilla.org/MPL/
0022 */
0023 
0024 #include "arguments.h"
0025 #include "connectaddress.h"
0026 #include "eventdispatcher.h"
0027 #include "imessagereceiver.h"
0028 #include "message.h"
0029 #include "pendingreply.h"
0030 #include "stringtools.h"
0031 #include "connection.h"
0032 
0033 #include "../testutil.h"
0034 
0035 #include <atomic>
0036 #include <iostream>
0037 #include <thread>
0038 
0039 static const char *echoPath = "/echo";
0040 // make the name "fairly unique" because the interface name is our only protection against replying
0041 // to the wrong message
0042 static const char *echoInterface = "org.example_fb39a8dbd0aa66d2.echo";
0043 static const char *echoMethod = "echo";
0044 
0045 //////////////// Multi-thread ping-pong test ////////////////
0046 
0047 static const char *pingPayload = "-> J. Random PING";
0048 static const char *pongPayload = "<- J. Random Pong";
0049 
0050 class PongSender : public IMessageReceiver
0051 {
0052 public:
0053     void handleSpontaneousMessageReceived(Message ping, Connection *connection) override
0054     {
0055         if (ping.interface() != echoInterface) {
0056             // This is not the ping... it is probably still something from connection setup.
0057             // We can possibly receive many things here that we were not expecting.
0058             return;
0059         }
0060         {
0061             const Arguments args = ping.arguments();
0062             Arguments::Reader reader(args);
0063             cstring payload = reader.readString();
0064             TEST(!reader.error().isError());
0065             TEST(reader.isFinished());
0066             std::cout << "we have ping with payload: " << payload.ptr << std::endl;
0067         }
0068 
0069         {
0070             Message pong = Message::createReplyTo(ping);
0071             Arguments::Writer writer;
0072             writer.writeString(pongPayload);
0073             pong.setArguments(writer.finish());
0074 
0075             std::cout << "\n\nSending pong!\n\n";
0076             Error replyError = connection->sendNoReply(std::move(pong));
0077             TEST(!replyError.isError());
0078 
0079             connection->eventDispatcher()->interrupt();
0080         }
0081     }
0082 };
0083 
0084 static void pongThreadRun(Connection::CommRef mainConnectionRef, std::atomic<bool> *pongThreadReady)
0085 {
0086     std::cout << " Pong thread starting!\n";
0087     EventDispatcher eventDispatcher;
0088     Connection conn(&eventDispatcher, std::move(mainConnectionRef));
0089 
0090     PongSender pongSender;
0091     conn.setSpontaneousMessageReceiver(&pongSender);
0092 
0093     while (eventDispatcher.poll()) {
0094         std::cout << " Pong thread waking up!\n";
0095         if (conn.uniqueName().length()) {
0096             pongThreadReady->store(true);
0097             // HACK: we do this only to wake up the main thread's event loop
0098             std::cout << "\n\nSending WAKEUP package!!\n\n";
0099             Message wakey = Message::createCall(echoPath, "org.notexample.foo", echoMethod);
0100             wakey.setDestination(conn.uniqueName());
0101             conn.sendNoReply(std::move(wakey));
0102         } else {
0103             std::cout << " Pong thread: NO NAME YET!\n";
0104         }
0105         // receive ping message
0106         // send pong message
0107     }
0108     std::cout << " Pong thread almost finished!\n";
0109 }
0110 
0111 class PongReceiver : public IMessageReceiver
0112 {
0113 public:
0114     void handlePendingReplyFinished(PendingReply *pongReply, Connection *) override
0115     {
0116         TEST(!pongReply->error().isError());
0117         Message pong = pongReply->takeReply();
0118 
0119         Arguments args = pong.arguments();
0120         Arguments::Reader reader(args);
0121         std::string strPayload = toStdString(reader.readString());
0122         TEST(!reader.error().isError());
0123         TEST(reader.isFinished());
0124         TEST(strPayload == pongPayload);
0125     }
0126 };
0127 
0128 static void testPingPong()
0129 {
0130     EventDispatcher eventDispatcher;
0131     Connection conn(&eventDispatcher, ConnectAddress::StandardBus::Session);
0132 
0133     std::atomic<bool> pongThreadReady(false);
0134     std::thread pongThread(pongThreadRun, conn.createCommRef(), &pongThreadReady);
0135 
0136     // finish creating the connection
0137     while (conn.uniqueName().empty()) {
0138         std::cout << ".";
0139         eventDispatcher.poll();
0140     }
0141 
0142     std::cout << "we have connection! " << conn.uniqueName() << "\n";
0143 
0144     // send ping message to other thread
0145     Message ping = Message::createCall(echoPath, echoInterface, echoMethod);
0146     Arguments::Writer writer;
0147     writer.writeString(pingPayload);
0148     ping.setArguments(writer.finish());
0149     ping.setDestination(conn.uniqueName());
0150 
0151     PongReceiver pongReceiver;
0152     PendingReply pongReply;
0153 
0154     bool sentPing = false;
0155     while (!sentPing || !pongReply.isFinished()) {
0156         eventDispatcher.poll();
0157         if (pongThreadReady.load() && !sentPing) {
0158             std::cout << "\n\nSending ping!!\n\n";
0159             pongReply = conn.send(std::move(ping));
0160             pongReply.setReceiver(&pongReceiver);
0161             sentPing = true;
0162         }
0163     }
0164     TEST(pongReply.hasNonErrorReply());
0165 
0166     std::cout << "we have pong!\n";
0167 
0168     pongThread.join();
0169 }
0170 
0171 //////////////// Multi-threaded timeout test ////////////////
0172 
0173 class TimeoutReceiver : public IMessageReceiver
0174 {
0175 public:
0176     void handlePendingReplyFinished(PendingReply *reply, Connection *) override
0177     {
0178         TEST(reply->isFinished());
0179         TEST(!reply->hasNonErrorReply());
0180         TEST(reply->error().code() == Error::Timeout);
0181         std::cout << "We HAVE timed out.\n";
0182     }
0183 };
0184 
0185 static void timeoutThreadRun(Connection::CommRef mainConnectionRef, std::atomic<bool> *done)
0186 {
0187     // TODO v turn this into proper documentation in Connection
0188     // Open a Connection "slaved" to the other Connection - it runs its own event loop in this thread
0189     // and has message I/O handled by the Connection in the "master" thread through message passing.
0190     // The main purpose of that is to use just one DBus connection per application( module), which is often
0191     // more convenient for client programmers and brings some limited ordering guarantees.
0192     std::cout << " Other thread starting!\n";
0193     EventDispatcher eventDispatcher;
0194     Connection conn(&eventDispatcher, std::move(mainConnectionRef));
0195     while (!conn.uniqueName().length()) {
0196         eventDispatcher.poll();
0197     }
0198 
0199     Message notRepliedTo = Message::createCall(echoPath, echoInterface, echoMethod);
0200     notRepliedTo.setDestination(conn.uniqueName());
0201 
0202     PendingReply deadReply = conn.send(std::move(notRepliedTo), 50);
0203     TimeoutReceiver timeoutReceiver;
0204     deadReply.setReceiver(&timeoutReceiver);
0205 
0206     while (!deadReply.isFinished()) {
0207         eventDispatcher.poll();
0208     }
0209     *done = true;
0210 }
0211 
0212 static void testThreadedTimeout()
0213 {
0214     EventDispatcher eventDispatcher;
0215     Connection conn(&eventDispatcher, ConnectAddress::StandardBus::Session);
0216 
0217     std::atomic<bool> done(false);
0218     std::thread timeoutThread(timeoutThreadRun, conn.createCommRef(), &done);
0219 
0220     while (!done) {
0221         eventDispatcher.poll();
0222     }
0223 
0224     timeoutThread.join();
0225 }
0226 
0227 
0228 // more things to test:
0229 // - (do we want to do this, and if so here??) blocking on a reply through other thread's connection
0230 // - ping-pong with several messages queued - every message should arrive exactly once and messages
0231 //   should arrive in sending order (can use serials for that as simplificitaion)
0232 
0233 int main(int, char *[])
0234 {
0235     testPingPong();
0236     testThreadedTimeout();
0237     std::cout << "Passed!\n";
0238 }