File indexing completed on 2024-05-12 17:15:20
0001 /* 0002 Copyright (C) 2014 Andreas Hartmetz <ahartmetz@gmail.com> 0003 0004 This library is free software; you can redistribute it and/or 0005 modify it under the terms of the GNU Library General Public 0006 License as published by the Free Software Foundation; either 0007 version 2 of the License, or (at your option) any later version. 0008 0009 This library is distributed in the hope that it will be useful, 0010 but WITHOUT ANY WARRANTY; without even the implied warranty of 0011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 0012 Library General Public License for more details. 0013 0014 You should have received a copy of the GNU Library General Public License 0015 along with this library; see the file COPYING.LGPL. If not, write to 0016 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 0017 Boston, MA 02110-1301, USA. 0018 0019 Alternatively, this file is available under the Mozilla Public License 0020 Version 1.1. You may obtain a copy of the License at 0021 http://www.mozilla.org/MPL/ 0022 */ 0023 0024 #include "arguments.h" 0025 #include "connectaddress.h" 0026 #include "eventdispatcher.h" 0027 #include "imessagereceiver.h" 0028 #include "message.h" 0029 #include "pendingreply.h" 0030 #include "stringtools.h" 0031 #include "connection.h" 0032 0033 #include "../testutil.h" 0034 0035 #include <atomic> 0036 #include <iostream> 0037 #include <thread> 0038 0039 static const char *echoPath = "/echo"; 0040 // make the name "fairly unique" because the interface name is our only protection against replying 0041 // to the wrong message 0042 static const char *echoInterface = "org.example_fb39a8dbd0aa66d2.echo"; 0043 static const char *echoMethod = "echo"; 0044 0045 //////////////// Multi-thread ping-pong test //////////////// 0046 0047 static const char *pingPayload = "-> J. Random PING"; 0048 static const char *pongPayload = "<- J. Random Pong"; 0049 0050 class PongSender : public IMessageReceiver 0051 { 0052 public: 0053 void handleSpontaneousMessageReceived(Message ping, Connection *connection) override 0054 { 0055 if (ping.interface() != echoInterface) { 0056 // This is not the ping... it is probably still something from connection setup. 0057 // We can possibly receive many things here that we were not expecting. 0058 return; 0059 } 0060 { 0061 const Arguments args = ping.arguments(); 0062 Arguments::Reader reader(args); 0063 cstring payload = reader.readString(); 0064 TEST(!reader.error().isError()); 0065 TEST(reader.isFinished()); 0066 std::cout << "we have ping with payload: " << payload.ptr << std::endl; 0067 } 0068 0069 { 0070 Message pong = Message::createReplyTo(ping); 0071 Arguments::Writer writer; 0072 writer.writeString(pongPayload); 0073 pong.setArguments(writer.finish()); 0074 0075 std::cout << "\n\nSending pong!\n\n"; 0076 Error replyError = connection->sendNoReply(std::move(pong)); 0077 TEST(!replyError.isError()); 0078 0079 connection->eventDispatcher()->interrupt(); 0080 } 0081 } 0082 }; 0083 0084 static void pongThreadRun(Connection::CommRef mainConnectionRef, std::atomic<bool> *pongThreadReady) 0085 { 0086 std::cout << " Pong thread starting!\n"; 0087 EventDispatcher eventDispatcher; 0088 Connection conn(&eventDispatcher, std::move(mainConnectionRef)); 0089 0090 PongSender pongSender; 0091 conn.setSpontaneousMessageReceiver(&pongSender); 0092 0093 while (eventDispatcher.poll()) { 0094 std::cout << " Pong thread waking up!\n"; 0095 if (conn.uniqueName().length()) { 0096 pongThreadReady->store(true); 0097 // HACK: we do this only to wake up the main thread's event loop 0098 std::cout << "\n\nSending WAKEUP package!!\n\n"; 0099 Message wakey = Message::createCall(echoPath, "org.notexample.foo", echoMethod); 0100 wakey.setDestination(conn.uniqueName()); 0101 conn.sendNoReply(std::move(wakey)); 0102 } else { 0103 std::cout << " Pong thread: NO NAME YET!\n"; 0104 } 0105 // receive ping message 0106 // send pong message 0107 } 0108 std::cout << " Pong thread almost finished!\n"; 0109 } 0110 0111 class PongReceiver : public IMessageReceiver 0112 { 0113 public: 0114 void handlePendingReplyFinished(PendingReply *pongReply, Connection *) override 0115 { 0116 TEST(!pongReply->error().isError()); 0117 Message pong = pongReply->takeReply(); 0118 0119 Arguments args = pong.arguments(); 0120 Arguments::Reader reader(args); 0121 std::string strPayload = toStdString(reader.readString()); 0122 TEST(!reader.error().isError()); 0123 TEST(reader.isFinished()); 0124 TEST(strPayload == pongPayload); 0125 } 0126 }; 0127 0128 static void testPingPong() 0129 { 0130 EventDispatcher eventDispatcher; 0131 Connection conn(&eventDispatcher, ConnectAddress::StandardBus::Session); 0132 0133 std::atomic<bool> pongThreadReady(false); 0134 std::thread pongThread(pongThreadRun, conn.createCommRef(), &pongThreadReady); 0135 0136 // finish creating the connection 0137 while (conn.uniqueName().empty()) { 0138 std::cout << "."; 0139 eventDispatcher.poll(); 0140 } 0141 0142 std::cout << "we have connection! " << conn.uniqueName() << "\n"; 0143 0144 // send ping message to other thread 0145 Message ping = Message::createCall(echoPath, echoInterface, echoMethod); 0146 Arguments::Writer writer; 0147 writer.writeString(pingPayload); 0148 ping.setArguments(writer.finish()); 0149 ping.setDestination(conn.uniqueName()); 0150 0151 PongReceiver pongReceiver; 0152 PendingReply pongReply; 0153 0154 bool sentPing = false; 0155 while (!sentPing || !pongReply.isFinished()) { 0156 eventDispatcher.poll(); 0157 if (pongThreadReady.load() && !sentPing) { 0158 std::cout << "\n\nSending ping!!\n\n"; 0159 pongReply = conn.send(std::move(ping)); 0160 pongReply.setReceiver(&pongReceiver); 0161 sentPing = true; 0162 } 0163 } 0164 TEST(pongReply.hasNonErrorReply()); 0165 0166 std::cout << "we have pong!\n"; 0167 0168 pongThread.join(); 0169 } 0170 0171 //////////////// Multi-threaded timeout test //////////////// 0172 0173 class TimeoutReceiver : public IMessageReceiver 0174 { 0175 public: 0176 void handlePendingReplyFinished(PendingReply *reply, Connection *) override 0177 { 0178 TEST(reply->isFinished()); 0179 TEST(!reply->hasNonErrorReply()); 0180 TEST(reply->error().code() == Error::Timeout); 0181 std::cout << "We HAVE timed out.\n"; 0182 } 0183 }; 0184 0185 static void timeoutThreadRun(Connection::CommRef mainConnectionRef, std::atomic<bool> *done) 0186 { 0187 // TODO v turn this into proper documentation in Connection 0188 // Open a Connection "slaved" to the other Connection - it runs its own event loop in this thread 0189 // and has message I/O handled by the Connection in the "master" thread through message passing. 0190 // The main purpose of that is to use just one DBus connection per application( module), which is often 0191 // more convenient for client programmers and brings some limited ordering guarantees. 0192 std::cout << " Other thread starting!\n"; 0193 EventDispatcher eventDispatcher; 0194 Connection conn(&eventDispatcher, std::move(mainConnectionRef)); 0195 while (!conn.uniqueName().length()) { 0196 eventDispatcher.poll(); 0197 } 0198 0199 Message notRepliedTo = Message::createCall(echoPath, echoInterface, echoMethod); 0200 notRepliedTo.setDestination(conn.uniqueName()); 0201 0202 PendingReply deadReply = conn.send(std::move(notRepliedTo), 50); 0203 TimeoutReceiver timeoutReceiver; 0204 deadReply.setReceiver(&timeoutReceiver); 0205 0206 while (!deadReply.isFinished()) { 0207 eventDispatcher.poll(); 0208 } 0209 *done = true; 0210 } 0211 0212 static void testThreadedTimeout() 0213 { 0214 EventDispatcher eventDispatcher; 0215 Connection conn(&eventDispatcher, ConnectAddress::StandardBus::Session); 0216 0217 std::atomic<bool> done(false); 0218 std::thread timeoutThread(timeoutThreadRun, conn.createCommRef(), &done); 0219 0220 while (!done) { 0221 eventDispatcher.poll(); 0222 } 0223 0224 timeoutThread.join(); 0225 } 0226 0227 0228 // more things to test: 0229 // - (do we want to do this, and if so here??) blocking on a reply through other thread's connection 0230 // - ping-pong with several messages queued - every message should arrive exactly once and messages 0231 // should arrive in sending order (can use serials for that as simplificitaion) 0232 0233 int main(int, char *[]) 0234 { 0235 testPingPong(); 0236 testThreadedTimeout(); 0237 std::cout << "Passed!\n"; 0238 }