File indexing completed on 2025-02-02 04:47:48

0001 /*
0002  * SPDX-FileCopyrightText: 2018 Erik Duisters <e.duisters1@gmail.com>
0003  *
0004  * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
0005  */
0006 
0007 package org.kde.kdeconnect.async;
0008 
0009 import android.os.Handler;
0010 import android.os.Looper;
0011 import android.util.Log;
0012 
0013 import androidx.annotation.Nullable;
0014 
0015 import java.util.HashMap;
0016 import java.util.Map;
0017 import java.util.concurrent.BlockingQueue;
0018 import java.util.concurrent.CancellationException;
0019 import java.util.concurrent.ExecutionException;
0020 import java.util.concurrent.Future;
0021 import java.util.concurrent.LinkedBlockingQueue;
0022 import java.util.concurrent.RejectedExecutionException;
0023 import java.util.concurrent.ThreadPoolExecutor;
0024 import java.util.concurrent.TimeUnit;
0025 
0026 /**
0027  * Scheduler for {@link BackgroundJob} objects.
0028  * <p>
0029  *     We use an internal {@link ThreadPoolExecutor} to catch Exceptions and
0030  *     pass them along to {@link #handleUncaughtException(Future, Throwable)}.
0031  * </p>
0032  */
0033 public class BackgroundJobHandler {
0034     private static final String TAG = BackgroundJobHandler.class.getSimpleName();
0035 
0036     private final Map<BackgroundJob, Future<?>> jobMap = new HashMap<>();
0037     private final Object jobMapLock = new Object();
0038 
0039     private class MyThreadPoolExecutor extends ThreadPoolExecutor {
0040         MyThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
0041             super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
0042         }
0043 
0044         @Override
0045         protected void afterExecute(Runnable r, Throwable t) {
0046             super.afterExecute(r, t);
0047 
0048             if (!(r instanceof Future)) {
0049                 return;
0050             }
0051 
0052             Future<?> future = (Future<?>) r;
0053 
0054             if (t == null) {
0055                 try {
0056                     future.get();
0057                 } catch (CancellationException ce) {
0058                     Log.d(TAG,"afterExecute got a CancellationException");
0059                 } catch (ExecutionException ee) {
0060                     t = ee;
0061                 } catch (InterruptedException ie) {
0062                     Log.d(TAG, "afterExecute got an InterruptedException");
0063                     Thread.currentThread().interrupt();    // ignore/reset
0064                 }
0065             }
0066 
0067             if (t != null) {
0068                 BackgroundJobHandler.this.handleUncaughtException(future, t);
0069             }
0070         }
0071     }
0072 
0073     private final ThreadPoolExecutor threadPoolExecutor;
0074     private final Handler handler;
0075 
0076     private BackgroundJobHandler(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
0077         this.handler = new Handler(Looper.getMainLooper());
0078         this.threadPoolExecutor = new MyThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
0079     }
0080 
0081     public void runJob(BackgroundJob bgJob) {
0082         Future<?> f;
0083 
0084         bgJob.setBackgroundJobHandler(this);
0085 
0086         try {
0087             synchronized (jobMapLock) {
0088                 f = threadPoolExecutor.submit(bgJob);
0089                 jobMap.put(bgJob, f);
0090             }
0091         } catch (RejectedExecutionException e) {
0092             Log.d(TAG,"threadPoolExecutor.submit rejected a background job: " + e.getMessage());
0093 
0094             bgJob.reportError(e);
0095         }
0096     }
0097 
0098     public boolean isRunning(long jobId) {
0099         synchronized (jobMapLock) {
0100             for (BackgroundJob job : jobMap.keySet()) {
0101                 if (job.getId() == jobId) {
0102                     return true;
0103                 }
0104             }
0105         }
0106 
0107         return false;
0108     }
0109 
0110     @Nullable
0111     public BackgroundJob getJob(long jobId) {
0112         synchronized (jobMapLock) {
0113             for (BackgroundJob job : jobMap.keySet()) {
0114                 if (job.getId() == jobId) {
0115                     return job;
0116                 }
0117             }
0118         }
0119 
0120         return null;
0121     }
0122 
0123     void cancelJob(BackgroundJob job) {
0124         synchronized (jobMapLock) {
0125             if (jobMap.containsKey(job)) {
0126                 Future<?> f = jobMap.get(job);
0127 
0128                 if (f.cancel(true)) {
0129                     threadPoolExecutor.purge();
0130                 }
0131 
0132                 jobMap.remove(job);
0133             }
0134         }
0135     }
0136 
0137     private void handleUncaughtException(Future<?> ft, Throwable t) {
0138         synchronized (jobMapLock) {
0139             for (Map.Entry<BackgroundJob, Future<?>> pairs : jobMap.entrySet()) {
0140                 Future<?> future = pairs.getValue();
0141 
0142                 if (future == ft) {
0143                     pairs.getKey().reportError(t);
0144                     break;
0145                 }
0146             }
0147         }
0148     }
0149 
0150     void onFinished(BackgroundJob job) {
0151         synchronized (jobMapLock) {
0152             jobMap.remove(job);
0153         }
0154     }
0155 
0156     void runOnUiThread(Runnable runnable) {
0157         handler.post(runnable);
0158     }
0159 
0160     public static BackgroundJobHandler newFixedThreadPoolBackgroundJobHander(int numThreads) {
0161         return new BackgroundJobHandler(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
0162     }
0163 }