File indexing completed on 2025-02-02 04:47:48
/*
 * SPDX-FileCopyrightText: 2018 Erik Duisters <e.duisters1@gmail.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
 */

package org.kde.kdeconnect.async;

import android.os.Handler;
import android.os.Looper;
import android.util.Log;

import androidx.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Scheduler for {@link BackgroundJob} objects.
 * <p>
 * We use an internal {@link ThreadPoolExecutor} to catch Exceptions and
 * pass them along to {@link #handleUncaughtException(Future, Throwable)}.
 * </p>
 * <p>
 * Every submitted job is tracked in an internal map (job -&gt; {@link Future}) until it is
 * cancelled via {@link #cancelJob(BackgroundJob)} or removed via
 * {@link #onFinished(BackgroundJob)}. All access to that map is guarded by a single lock.
 * </p>
 */
public class BackgroundJobHandler {
    private static final String TAG = BackgroundJobHandler.class.getSimpleName();

    /** Jobs that have been submitted and not yet cancelled/finished. Guarded by {@link #jobMapLock}. */
    private final Map<BackgroundJob, Future<?>> jobMap = new HashMap<>();
    private final Object jobMapLock = new Object();

    /**
     * Executor that inspects every completed task and forwards a failure, if there was one,
     * to {@link BackgroundJobHandler#handleUncaughtException(Future, Throwable)}.
     */
    private class MyThreadPoolExecutor extends ThreadPoolExecutor {
        MyThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
        }

        /**
         * Called by the executor after a task has run.
         *
         * @param r the task that was executed; ignored unless it is a {@link Future}
         * @param t the throwable that terminated the task, or {@code null}
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);

            if (!(r instanceof Future)) {
                return;
            }

            Future<?> future = (Future<?>) r;
            Throwable failure = t;

            if (failure == null) {
                // No throwable was handed to us directly; ask the Future whether the task failed.
                try {
                    future.get();
                } catch (CancellationException ce) {
                    Log.d(TAG,"afterExecute got a CancellationException");
                } catch (ExecutionException ee) {
                    // Report the exception the job actually threw rather than the
                    // ExecutionException wrapper, so callbacks can inspect its real type.
                    Throwable cause = ee.getCause();
                    failure = (cause != null) ? cause : ee;
                } catch (InterruptedException ie) {
                    Log.d(TAG, "afterExecute got an InterruptedException");
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }

            if (failure != null) {
                BackgroundJobHandler.this.handleUncaughtException(future, failure);
            }
        }
    }

    private final ThreadPoolExecutor threadPoolExecutor;
    /** Handler bound to the main looper; used by {@link #runOnUiThread(Runnable)}. */
    private final Handler handler;

    private BackgroundJobHandler(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this.handler = new Handler(Looper.getMainLooper());
        this.threadPoolExecutor = new MyThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Submits a job to the thread pool and starts tracking it.
     * <p>
     * If the executor rejects the job, the rejection is passed to the job's
     * {@code reportError} and the job is not tracked.
     * </p>
     *
     * @param bgJob the job to run; this handler is set on it before it is submitted
     */
    public void runJob(BackgroundJob bgJob) {
        bgJob.setBackgroundJobHandler(this);

        try {
            // Submit and register under one lock: every other accessor of jobMap takes the
            // same lock, so none of them can look for the job before it has been registered.
            synchronized (jobMapLock) {
                Future<?> submitted = threadPoolExecutor.submit(bgJob);
                jobMap.put(bgJob, submitted);
            }
        } catch (RejectedExecutionException e) {
            Log.d(TAG,"threadPoolExecutor.submit rejected a background job: " + e.getMessage());

            bgJob.reportError(e);
        }
    }

    /**
     * Tells whether a job with the given id is currently tracked by this handler, i.e. it
     * has been submitted and has not yet been cancelled or reported as finished.
     *
     * @param jobId the id to look for
     * @return {@code true} if a tracked job has this id
     */
    public boolean isRunning(long jobId) {
        return getJob(jobId) != null;
    }

    /**
     * Looks up a tracked job by its id.
     *
     * @param jobId the id to look for
     * @return the tracked job with that id, or {@code null} if there is none
     */
    @Nullable
    public BackgroundJob getJob(long jobId) {
        synchronized (jobMapLock) {
            for (BackgroundJob job : jobMap.keySet()) {
                if (job.getId() == jobId) {
                    return job;
                }
            }
        }

        return null;
    }

    /**
     * Cancels a tracked job (interrupting it if it is running) and stops tracking it.
     * Does nothing if the job is not tracked.
     *
     * @param job the job to cancel
     */
    void cancelJob(BackgroundJob job) {
        synchronized (jobMapLock) {
            Future<?> tracked = jobMap.remove(job);

            if (tracked != null && tracked.cancel(true)) {
                // Drop the cancelled task from the executor's work queue.
                threadPoolExecutor.purge();
            }
        }
    }

    /**
     * Reports a failure to the job that owns the given future, if that job is still tracked.
     *
     * @param ft the future of the task that failed
     * @param t  the failure to report
     */
    private void handleUncaughtException(Future<?> ft, Throwable t) {
        BackgroundJob failedJob = null;

        synchronized (jobMapLock) {
            for (Map.Entry<BackgroundJob, Future<?>> entry : jobMap.entrySet()) {
                if (entry.getValue() == ft) {
                    failedJob = entry.getKey();
                    break;
                }
            }
        }

        // Report outside the lock so job code never runs while jobMapLock is held.
        if (failedJob != null) {
            failedJob.reportError(t);
        }
    }

    /**
     * Stops tracking a job.
     *
     * @param job the job to remove from the tracking map
     */
    void onFinished(BackgroundJob job) {
        synchronized (jobMapLock) {
            jobMap.remove(job);
        }
    }

    /**
     * Posts a runnable to the main looper.
     *
     * @param runnable the work to post
     */
    void runOnUiThread(Runnable runnable) {
        handler.post(runnable);
    }

    /**
     * Creates a handler backed by a pool with {@code numThreads} core and maximum threads
     * and a {@link LinkedBlockingQueue} as its work queue.
     * <p>
     * Note: the method name is misspelled ("Hander"); it is kept as is for existing callers.
     * </p>
     *
     * @param numThreads the core and maximum pool size
     * @return a new handler
     */
    public static BackgroundJobHandler newFixedThreadPoolBackgroundJobHander(int numThreads) {
        return new BackgroundJobHandler(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }
}