From 971e3362e1682d7b4ccddb0771c0038f627955ca Mon Sep 17 00:00:00 2001 From: dmitrijmihajluk Date: Fri, 21 Feb 2020 07:05:00 +0300 Subject: [PATCH 1/2] build 0.1.0 --- actexecutor/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/actexecutor/build.gradle b/actexecutor/build.gradle index c239a43..58f5950 100644 --- a/actexecutor/build.gradle +++ b/actexecutor/build.gradle @@ -8,8 +8,8 @@ android { defaultConfig { minSdkVersion 22 targetSdkVersion 29 - versionCode 1 - versionName "0.0.2" + versionCode 2 + versionName "0.0.3" testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner" consumerProguardFiles 'consumer-rules.pro' From 40760da947863fac332775741108d6eb1b1e46a1 Mon Sep 17 00:00:00 2001 From: Dmitry Kuzhelko Date: Mon, 24 Feb 2020 17:31:16 +0300 Subject: [PATCH 2/2] feat(act_executor): Added strategies for incoming event --- .../actexecutor/executor/ActDispatchers.kt | 89 +++++++++++++ .../actexecutor/executor/ActExecutor.kt | 122 ++++++++++++------ .../actexecutor/executor/ActStrategy.kt | 17 +++ .../com/velmie/actexecutor/store/ActMap.kt | 5 + .../actexecutor/store/ConcurrentActMap.kt | 25 +++- 5 files changed, 219 insertions(+), 39 deletions(-) create mode 100644 actexecutor/src/main/java/com/velmie/actexecutor/executor/ActDispatchers.kt create mode 100644 actexecutor/src/main/java/com/velmie/actexecutor/executor/ActStrategy.kt diff --git a/actexecutor/src/main/java/com/velmie/actexecutor/executor/ActDispatchers.kt b/actexecutor/src/main/java/com/velmie/actexecutor/executor/ActDispatchers.kt new file mode 100644 index 0000000..242b70e --- /dev/null +++ b/actexecutor/src/main/java/com/velmie/actexecutor/executor/ActDispatchers.kt @@ -0,0 +1,89 @@ +package com.velmie.actexecutor.executor + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO_PARALLELISM_PROPERTY_NAME + +/** + * DEFAULT - The default [CoroutineDispatcher] that is used by all standard builders like + * [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc + * if no dispatcher nor any other [ContinuationInterceptor] is specified in their context. + * + * It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used + * by this dispatcher is equal to the number of CPU cores, but is at least two. + * Level of parallelism X guarantees that no more than X tasks can be executed in this dispatcher in parallel. + * + * + * MAIN - A coroutine dispatcher that is confined to the Main thread operating with UI objects. + * This dispatcher can be used either directly or via [MainScope] factory. + * Usually such dispatcher is single-threaded. + * + * Access to this property may throw [IllegalStateException] if no main thread dispatchers are present in the classpath. + * + * Depending on platform and classpath it can be mapped to different dispatchers: + * - On JS and Native it is equivalent of [Default] dispatcher. + * - On JVM it is either Android main thread dispatcher, JavaFx or Swing EDT dispatcher. It is chosen by + * [`ServiceLoader`](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html). + * + * In order to work with `Main` dispatcher, the following artifacts should be added to project runtime dependencies: + * - `kotlinx-coroutines-android` for Android Main thread dispatcher + * - `kotlinx-coroutines-javafx` for JavaFx Application thread dispatcher + * - `kotlinx-coroutines-swing` for Swing EDT dispatcher + * + * In order to set a custom `Main` dispatcher for testing purposes, add the `kotlinx-coroutines-test` artifact to + * project test dependencies. + * + * Implementation note: [MainCoroutineDispatcher.immediate] is not supported on Native and JS platforms. + * + * + * UNCONFIRMED - A coroutine dispatcher that is not confined to any specific thread. + * It executes initial continuation of the coroutine in the current call-frame + * and lets the coroutine resume in whatever thread that is used by the corresponding suspending function, without + * mandating any specific threading policy. Nested coroutines launched in this dispatcher form an event-loop to avoid + * stack overflows. + * + * ### Event loop + * Event loop semantics is a purely internal concept and have no guarantees on the order of execution + * except that all queued coroutines will be executed on the current thread in the lexical scope of the outermost + * unconfined coroutine. + * + * For example, the following code: + * ``` + * withContext(Dispatchers.Unconfined) { + * println(1) + * withContext(Dispatchers.Unconfined) { // Nested unconfined + * println(2) + * } + * println(3) + * } + * println("Done") + * ``` + * Can print both "1 2 3" and "1 3 2", this is an implementation detail that can be changed. + * But it is guaranteed that "Done" will be printed only when both `withContext` are completed. + * + * + * Note that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption, + * but still want to execute it in the current call-frame until its first suspension, then you can use + * an optional [CoroutineStart] parameter in coroutine builders like + * [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to the + * the value of [CoroutineStart.UNDISPATCHED]. + * + * + * IO - The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads. + * + * Additional threads in this pool are created and are shutdown on demand. + * The number of threads used by this dispatcher is limited by the value of + * "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property. + * It defaults to the limit of 64 threads or the number of cores (whichever is larger). + * + * This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using + * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread — + * typically execution continues in the same thread. + */ + +enum class ActDispatchers(val dispatcher: CoroutineDispatcher) { + DEFAULT(Dispatchers.Default), + MAIN(Dispatchers.Main), + UNCONFIRMED(Dispatchers.Unconfined), + IO(Dispatchers.IO) +} \ No newline at end of file diff --git a/actexecutor/src/main/java/com/velmie/actexecutor/executor/ActExecutor.kt b/actexecutor/src/main/java/com/velmie/actexecutor/executor/ActExecutor.kt index 140cbc5..2cfb873 100644 --- a/actexecutor/src/main/java/com/velmie/actexecutor/executor/ActExecutor.kt +++ b/actexecutor/src/main/java/com/velmie/actexecutor/executor/ActExecutor.kt @@ -3,20 +3,28 @@ package com.velmie.actexecutor.executor import androidx.lifecycle.Observer import com.velmie.actexecutor.BuildConfig import com.velmie.actexecutor.act.Act +import com.velmie.actexecutor.act.Id +import com.velmie.actexecutor.act.SimpleAct import com.velmie.actexecutor.act.DelayAct import com.velmie.actexecutor.act.LiveDataAct -import com.velmie.actexecutor.act.SimpleAct import com.velmie.actexecutor.store.ActMap import com.velmie.networkutils.core.Resource import com.velmie.networkutils.core.Status import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.launch import timber.log.Timber import kotlin.system.measureTimeMillis -class ActExecutor(private val actMap: ActMap) : ActExecutorInterface { +class ActExecutor( + private val actMap: ActMap, + private val strategyType: ActStrategy = ActStrategy.DEFAULT, + actDispatcher: ActDispatchers = ActDispatchers.DEFAULT +) : ActExecutorInterface { + + private val scope: CoroutineScope = CoroutineScope(actDispatcher.dispatcher) + private var job: Job? = null init { if (BuildConfig.DEBUG) { @@ -24,50 +32,90 @@ class ActExecutor(private val actMap: ActMap) : ActExecutorInterface { } } - private val scope = CoroutineScope(Dispatchers.Default) - @Synchronized override fun execute(act: Act) { - return when { - actMap.contains(act.id) -> { - Timber.d("id: ${act.id} - Act duplicate") - } - else -> startExecution(act) + return when (strategyType) { + ActStrategy.DEFAULT -> startAct(act) + ActStrategy.REPLACE -> startReplaceAct(act) + ActStrategy.QUEUE -> startQueueAct(act = act) } } - private fun startExecution(act: Act) { - actMap.add(act.id, act) - val removeFromMap = { actMap.remove(act.id) } - when (act) { - is SimpleAct -> { - act.actFunction() - removeFromMap() - } - is DelayAct -> { - val invokeTime = measureTimeMillis { act.actFunction() } - scope.launch { - delay(act.delay - invokeTime) - removeFromMap() - } + private fun startAct(act: Act) { + when { + actMap.contains(act.id) -> Timber.d("id: ${act.id} - Act duplicate") + actMap.size() == 1 -> Timber.d("Only one event can be processed at the same time") + else -> executeAct(act.id, act) + } + } + + private fun startReplaceAct(act: Act) { + when { + actMap.contains(act.id) -> actMap.replace(act.id, act) + actMap.size() == 1 -> Timber.d("Only one event can be processed at the same time") + else -> executeAct(act.id, act) + } + } + + private fun startQueueAct(id: Id = (Math.random()).toString(), act: Act) { + executeAct(id, act) + } + + private fun executeAct(id: Id, act: Act) { + try { + if (!actMap.contains(id)) { + actMap.add(id, act) } - is LiveDataAct<*> -> { - act.liveData.observe(act.lifecycleOwner, Observer { - when (it) { - is Resource<*> -> { - act.afterAct(it) - if (it.status == Status.ERROR || it.status == Status.SUCCESS) { - removeFromMap() - } - } - else -> { + + val removeFromMap = { actMap.remove(id) } + if ((job == null) || (job!!.isCancelled) || (job!!.isCompleted)) { + when (act) { + is SimpleAct -> { + act.actFunction() + removeFromMap() + executeNextActIfExist() + } + is DelayAct -> { + val invokeTime = measureTimeMillis { act.actFunction() } + job = scope.launch { + delay(act.delay - invokeTime) removeFromMap() - throw IllegalArgumentException("Type T in LiveData unregistered") + job?.cancel() + executeNextActIfExist() } } - }) + is LiveDataAct<*> -> { + act.liveData.observe(act.lifecycleOwner, Observer { + when (it) { + is Resource<*> -> { + act.afterAct(it) + if (it.status == Status.ERROR || it.status == Status.SUCCESS) { + removeFromMap() + executeNextActIfExist() + } + } + else -> { + removeFromMap() + throw IllegalArgumentException("Type T in LiveData unregistered") + } + } + }) + } + else -> throw IllegalArgumentException("Type Act unregistered") + } } - else -> throw IllegalArgumentException("Type Act unregistered") + } catch (exception: IllegalArgumentException) { + throw exception + } catch (exception: java.lang.RuntimeException) { + throw RuntimeException("You cannot perform UI tasks outside the UI thread. For perform UI tasks switch to dispatcher.MAIN in the constructor ActExecutor") + } + } + + private fun executeNextActIfExist() { + if (actMap.isNotEmpty()) { + val id = actMap.getFirstKey() + val value = actMap.getFirstValue() + startQueueAct(id, value) } } diff --git a/actexecutor/src/main/java/com/velmie/actexecutor/executor/ActStrategy.kt b/actexecutor/src/main/java/com/velmie/actexecutor/executor/ActStrategy.kt new file mode 100644 index 0000000..027e291 --- /dev/null +++ b/actexecutor/src/main/java/com/velmie/actexecutor/executor/ActStrategy.kt @@ -0,0 +1,17 @@ +package com.velmie.actexecutor.executor + +/** + * Strategies that can be used for incoming events. + * + * DEFAULT - Only one event can be stored at the same time. + * Save and execute only one event and ignore all new events. + * After event is executed, it becomes possible to add new event. + * + * REPLACE - Only one event can be stored at the same time. Replace already added event and ignore all new events. + * + * QUEUE - Several events can be stored at the same time. Add already added and new events to queue. + */ + +enum class ActStrategy() { + DEFAULT, REPLACE, QUEUE +} \ No newline at end of file diff --git a/actexecutor/src/main/java/com/velmie/actexecutor/store/ActMap.kt b/actexecutor/src/main/java/com/velmie/actexecutor/store/ActMap.kt index 7346df9..d0e0105 100644 --- a/actexecutor/src/main/java/com/velmie/actexecutor/store/ActMap.kt +++ b/actexecutor/src/main/java/com/velmie/actexecutor/store/ActMap.kt @@ -8,4 +8,9 @@ interface ActMap { fun add(id: Id, act: Act) fun remove(id: Id) fun removeAll() + fun replace(id: Id, act: Act) + fun getFirstKey(): Id + fun getFirstValue(): Act + fun isNotEmpty(): Boolean + fun size(): Int } diff --git a/actexecutor/src/main/java/com/velmie/actexecutor/store/ConcurrentActMap.kt b/actexecutor/src/main/java/com/velmie/actexecutor/store/ConcurrentActMap.kt index a125e06..fc11db4 100644 --- a/actexecutor/src/main/java/com/velmie/actexecutor/store/ConcurrentActMap.kt +++ b/actexecutor/src/main/java/com/velmie/actexecutor/store/ConcurrentActMap.kt @@ -2,11 +2,12 @@ package com.velmie.actexecutor.store import com.velmie.actexecutor.act.Act import com.velmie.actexecutor.act.Id -import java.util.concurrent.ConcurrentHashMap +import java.util.Collections +import kotlin.collections.LinkedHashMap class ConcurrentActMap : ActMap { - private val map = ConcurrentHashMap() + private val map: MutableMap = Collections.synchronizedMap(LinkedHashMap()) override fun contains(id: Id): Boolean { return map.containsKey(id) @@ -25,4 +26,24 @@ class ConcurrentActMap : ActMap { override fun removeAll() { map.clear() } + + override fun replace(id: Id, act: Act) { + map[id] = act + } + + override fun getFirstKey(): Id { + return map.keys.first() + } + + override fun getFirstValue(): Act { + return map.values.first() + } + + override fun isNotEmpty(): Boolean { + return map.isNotEmpty() + } + + override fun size(): Int { + return map.size + } }