File indexing completed on 2024-05-19 05:37:25
0001 /* 0002 * SPDX-FileCopyrightText: 2016 Ivan Cukic <ivan.cukic(at)kde.org> 0003 * 0004 * SPDX-License-Identifier: LGPL-2.1-only OR LGPL-3.0-only OR LicenseRef-KDE-Accepted-LGPL 0005 */ 0006 0007 // 0008 // W A R N I N G 0009 // ------------- 0010 // 0011 // This file is not part of the AsynQt API. It exists purely as an 0012 // implementation detail. This header file may change from version to 0013 // version without notice, or even be removed. 0014 // 0015 // We mean it. 0016 // 0017 0018 #include <type_traits> 0019 0020 #include "../utils_p.h" 0021 0022 namespace AsynQt 0023 { 0024 namespace detail 0025 { 0026 template<typename _Result> 0027 class FlattenFutureInterface : public QObject, public QFutureInterface<_Result> 0028 { 0029 public: 0030 FlattenFutureInterface(QFuture<QFuture<_Result>> future) 0031 : m_outerFuture(future) 0032 { 0033 } 0034 0035 inline void setFutureResultAt(int, std::true_type /* _Result is void */) 0036 { 0037 // nothing to do 0038 } 0039 0040 inline void setFutureResultAt(int index, std::false_type /* _Result is not void */) 0041 { 0042 this->reportResult(m_currentInnerFuture.resultAt(index)); 0043 } 0044 0045 void processNextInnerFuture() 0046 { 0047 // Already processing something 0048 if (m_innerFutureWatcher) 0049 return; 0050 0051 m_innerFutureWatcher.reset(new QFutureWatcher<_Result>()); 0052 m_currentInnerFuture = m_innerFutures.head(); 0053 0054 onFinished(m_innerFutureWatcher, [this]() { 0055 dequeueInnerFuture(); 0056 }); 0057 0058 onCanceled(m_innerFutureWatcher, [this]() { 0059 this->reportCanceled(); 0060 }); 0061 0062 onResultReadyAt(m_innerFutureWatcher, [this](int index) { 0063 setFutureResultAt(index, typename std::is_void<_Result>::type()); 0064 }); 0065 0066 m_innerFutureWatcher->setFuture(m_currentInnerFuture); 0067 0068 this->reportStarted(); 0069 } 0070 0071 void dequeueInnerFuture() 0072 { 0073 m_innerFutureWatcher.reset(); 0074 m_innerFutures.dequeue(); 0075 0076 if (m_innerFutures.size() == 0) { 0077 if (m_outerFuture.isCanceled()) { 0078 this->reportCanceled(); 0079 } 0080 0081 if (m_outerFuture.isFinished()) { 0082 this->reportFinished(); 0083 } 0084 0085 } else { 0086 processNextInnerFuture(); 0087 } 0088 } 0089 0090 QFuture<_Result> start() 0091 { 0092 m_outerFutureWatcher.reset(new QFutureWatcher<QFuture<_Result>>()); 0093 0094 onFinished(m_outerFutureWatcher, [this]() { 0095 if (m_innerFutures.isEmpty()) { 0096 this->reportFinished(); 0097 } 0098 }); 0099 0100 onCanceled(m_outerFutureWatcher, [this]() { 0101 if (m_innerFutures.isEmpty()) { 0102 this->reportCanceled(); 0103 } 0104 }); 0105 0106 onResultReadyAt(m_outerFutureWatcher, [this](int index) { 0107 m_innerFutures.enqueue(m_outerFuture.resultAt(index)); 0108 processNextInnerFuture(); 0109 }); 0110 0111 m_outerFutureWatcher->setFuture(m_outerFuture); 0112 0113 this->reportStarted(); 0114 0115 return this->future(); 0116 } 0117 0118 private: 0119 QFuture<QFuture<_Result>> m_outerFuture; 0120 std::unique_ptr<QFutureWatcher<QFuture<_Result>>> m_outerFutureWatcher; 0121 0122 QFuture<_Result> m_currentInnerFuture; 0123 QQueue<QFuture<_Result>> m_innerFutures; 0124 std::unique_ptr<QFutureWatcher<_Result>> m_innerFutureWatcher; 0125 }; 0126 0127 template<typename _Result> 0128 QFuture<_Result> flatten_impl(const QFuture<QFuture<_Result>> &future) 0129 { 0130 return (new FlattenFutureInterface<_Result>(future))->start(); 0131 } 0132 0133 namespace operators 0134 { 0135 struct FlattenModifier { 0136 }; 0137 0138 template<typename _Result> 0139 QFuture<_Result> operator|(const QFuture<QFuture<_Result>> &future, FlattenModifier) 0140 { 0141 return flatten_impl(future); 0142 } 0143 0144 } // namespace operators 0145 0146 } // namespace detail 0147 } // namespace AsynQt