-
Notifications
You must be signed in to change notification settings - Fork 2
feat(act_executor): Added strategies for incoming event #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| package com.velmie.actexecutor.executor | ||
|
|
||
| import kotlinx.coroutines.CoroutineDispatcher | ||
| import kotlinx.coroutines.Dispatchers | ||
| import kotlinx.coroutines.IO_PARALLELISM_PROPERTY_NAME | ||
|
|
||
| /** | ||
| * DEFAULT - The default [CoroutineDispatcher] that is used by all standard builders like | ||
| * [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc | ||
| * if no dispatcher nor any other [ContinuationInterceptor] is specified in their context. | ||
| * | ||
| * It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used | ||
| * by this dispatcher is equal to the number of CPU cores, but is at least two. | ||
| * Level of parallelism X guarantees that no more than X tasks can be executed in this dispatcher in parallel. | ||
| * | ||
| * | ||
| * MAIN - A coroutine dispatcher that is confined to the Main thread operating with UI objects. | ||
| * This dispatcher can be used either directly or via [MainScope] factory. | ||
| * Usually such dispatcher is single-threaded. | ||
| * | ||
| * Access to this property may throw [IllegalStateException] if no main thread dispatchers are present in the classpath. | ||
| * | ||
| * Depending on platform and classpath it can be mapped to different dispatchers: | ||
| * - On JS and Native it is equivalent of [Default] dispatcher. | ||
| * - On JVM it is either Android main thread dispatcher, JavaFx or Swing EDT dispatcher. It is chosen by | ||
| * [`ServiceLoader`](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html). | ||
| * | ||
| * In order to work with `Main` dispatcher, the following artifacts should be added to project runtime dependencies: | ||
| * - `kotlinx-coroutines-android` for Android Main thread dispatcher | ||
| * - `kotlinx-coroutines-javafx` for JavaFx Application thread dispatcher | ||
| * - `kotlinx-coroutines-swing` for Swing EDT dispatcher | ||
| * | ||
| * In order to set a custom `Main` dispatcher for testing purposes, add the `kotlinx-coroutines-test` artifact to | ||
| * project test dependencies. | ||
| * | ||
| * Implementation note: [MainCoroutineDispatcher.immediate] is not supported on Native and JS platforms. | ||
| * | ||
| * | ||
| * UNCONFIRMED - A coroutine dispatcher that is not confined to any specific thread. | ||
| * It executes initial continuation of the coroutine in the current call-frame | ||
| * and lets the coroutine resume in whatever thread that is used by the corresponding suspending function, without | ||
| * mandating any specific threading policy. Nested coroutines launched in this dispatcher form an event-loop to avoid | ||
| * stack overflows. | ||
| * | ||
| * ### Event loop | ||
| * Event loop semantics is a purely internal concept and have no guarantees on the order of execution | ||
| * except that all queued coroutines will be executed on the current thread in the lexical scope of the outermost | ||
| * unconfined coroutine. | ||
| * | ||
| * For example, the following code: | ||
| * ``` | ||
| * withContext(Dispatchers.Unconfined) { | ||
| * println(1) | ||
| * withContext(Dispatchers.Unconfined) { // Nested unconfined | ||
| * println(2) | ||
| * } | ||
| * println(3) | ||
| * } | ||
| * println("Done") | ||
| * ``` | ||
| * Can print both "1 2 3" and "1 3 2", this is an implementation detail that can be changed. | ||
| * But it is guaranteed that "Done" will be printed only when both `withContext` are completed. | ||
| * | ||
| * | ||
| * Note that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption, | ||
| * but still want to execute it in the current call-frame until its first suspension, then you can use | ||
| * an optional [CoroutineStart] parameter in coroutine builders like | ||
| * [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to the | ||
| * the value of [CoroutineStart.UNDISPATCHED]. | ||
| * | ||
| * | ||
| * IO - The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads. | ||
| * | ||
| * Additional threads in this pool are created and are shutdown on demand. | ||
| * The number of threads used by this dispatcher is limited by the value of | ||
| * "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property. | ||
| * It defaults to the limit of 64 threads or the number of cores (whichever is larger). | ||
| * | ||
| * This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using | ||
| * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread — | ||
| * typically execution continues in the same thread. | ||
| */ | ||
|
|
||
| enum class ActDispatchers(val dispatcher: CoroutineDispatcher) { | ||
| DEFAULT(Dispatchers.Default), | ||
| MAIN(Dispatchers.Main), | ||
| UNCONFIRMED(Dispatchers.Unconfined), | ||
| IO(Dispatchers.IO) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,71 +3,119 @@ package com.velmie.actexecutor.executor | |
| import androidx.lifecycle.Observer | ||
| import com.velmie.actexecutor.BuildConfig | ||
| import com.velmie.actexecutor.act.Act | ||
| import com.velmie.actexecutor.act.Id | ||
| import com.velmie.actexecutor.act.SimpleAct | ||
| import com.velmie.actexecutor.act.DelayAct | ||
| import com.velmie.actexecutor.act.LiveDataAct | ||
| import com.velmie.actexecutor.act.SimpleAct | ||
| import com.velmie.actexecutor.store.ActMap | ||
| import com.velmie.networkutils.core.Resource | ||
| import com.velmie.networkutils.core.Status | ||
| import kotlinx.coroutines.CoroutineScope | ||
| import kotlinx.coroutines.Dispatchers | ||
| import kotlinx.coroutines.Job | ||
| import kotlinx.coroutines.delay | ||
| import kotlinx.coroutines.launch | ||
| import timber.log.Timber | ||
| import kotlin.system.measureTimeMillis | ||
|
|
||
| class ActExecutor(private val actMap: ActMap) : ActExecutorInterface { | ||
| class ActExecutor( | ||
| private val actMap: ActMap, | ||
| private val strategyType: ActStrategy = ActStrategy.DEFAULT, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. стратегия должна быть на act, а не на весь инструмент, иначе теряется ее смысл и можно дергать обычный queue |
||
| actDispatcher: ActDispatchers = ActDispatchers.DEFAULT | ||
| ) : ActExecutorInterface { | ||
|
|
||
| private val scope: CoroutineScope = CoroutineScope(actDispatcher.dispatcher) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. зачем нам глобальная корутина для инструмента? |
||
| private var job: Job? = null | ||
|
|
||
| init { | ||
| if (BuildConfig.DEBUG) { | ||
| Timber.plant(Timber.DebugTree()) | ||
| } | ||
| } | ||
|
|
||
| private val scope = CoroutineScope(Dispatchers.Default) | ||
|
|
||
| @Synchronized | ||
| override fun execute(act: Act) { | ||
| return when { | ||
| actMap.contains(act.id) -> { | ||
| Timber.d("id: ${act.id} - Act duplicate") | ||
| } | ||
| else -> startExecution(act) | ||
| return when (strategyType) { | ||
| ActStrategy.DEFAULT -> startAct(act) | ||
| ActStrategy.REPLACE -> startReplaceAct(act) | ||
| ActStrategy.QUEUE -> startQueueAct(act = act) | ||
| } | ||
| } | ||
|
|
||
| private fun startExecution(act: Act) { | ||
| actMap.add(act.id, act) | ||
| val removeFromMap = { actMap.remove(act.id) } | ||
| when (act) { | ||
| is SimpleAct -> { | ||
| act.actFunction() | ||
| removeFromMap() | ||
| } | ||
| is DelayAct -> { | ||
| val invokeTime = measureTimeMillis { act.actFunction() } | ||
| scope.launch { | ||
| delay(act.delay - invokeTime) | ||
| removeFromMap() | ||
| } | ||
| private fun startAct(act: Act) { | ||
| when { | ||
| actMap.contains(act.id) -> Timber.d("id: ${act.id} - Act duplicate") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. много дублирования
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| actMap.size() == 1 -> Timber.d("Only one event can be processed at the same time") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. это в орне не верно, у тебя может быть сколько угодно событий в map |
||
| else -> executeAct(act.id, act) | ||
| } | ||
| } | ||
|
|
||
| private fun startReplaceAct(act: Act) { | ||
| when { | ||
| actMap.contains(act.id) -> actMap.replace(act.id, act) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ты подменяешь act и на этом все. никакой логики по replace нет |
||
| actMap.size() == 1 -> Timber.d("Only one event can be processed at the same time") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. это в орне не верно, у тебя может быть сколько угодно событий в map |
||
| else -> executeAct(act.id, act) | ||
| } | ||
| } | ||
|
|
||
| private fun startQueueAct(id: Id = (Math.random()).toString(), act: Act) { | ||
| executeAct(id, act) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. по этой логике у тебя и так бы работал старый код |
||
| } | ||
|
|
||
| private fun executeAct(id: Id, act: Act) { | ||
| try { | ||
| if (!actMap.contains(id)) { | ||
| actMap.add(id, act) | ||
| } | ||
| is LiveDataAct<*> -> { | ||
| act.liveData.observe(act.lifecycleOwner, Observer { | ||
| when (it) { | ||
| is Resource<*> -> { | ||
| act.afterAct(it) | ||
| if (it.status == Status.ERROR || it.status == Status.SUCCESS) { | ||
| removeFromMap() | ||
| } | ||
| } | ||
| else -> { | ||
|
|
||
| val removeFromMap = { actMap.remove(id) } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. можешь объяснить это действие? я не что-то не совсем понимаю |
||
| if ((job == null) || (job!!.isCancelled) || (job!!.isCompleted)) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. зачем нам воообще тут корутины?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. для реализации задержек для DelayAct |
||
| when (act) { | ||
| is SimpleAct -> { | ||
| act.actFunction() | ||
| removeFromMap() | ||
| executeNextActIfExist() | ||
| } | ||
| is DelayAct -> { | ||
| val invokeTime = measureTimeMillis { act.actFunction() } | ||
| job = scope.launch { | ||
| delay(act.delay - invokeTime) | ||
| removeFromMap() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. job?.cancel() не нужен, у тебя корутина закончит свое выполнение как только будет вызван removeFromMap() |
||
| throw IllegalArgumentException("Type T in LiveData<T> unregistered") | ||
| job?.cancel() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. а вот для replace - уже будет нужен |
||
| executeNextActIfExist() | ||
| } | ||
| } | ||
| }) | ||
| is LiveDataAct<*> -> { | ||
| act.liveData.observe(act.lifecycleOwner, Observer { | ||
| when (it) { | ||
| is Resource<*> -> { | ||
| act.afterAct(it) | ||
| if (it.status == Status.ERROR || it.status == Status.SUCCESS) { | ||
| removeFromMap() | ||
| executeNextActIfExist() | ||
| } | ||
| } | ||
| else -> { | ||
| removeFromMap() | ||
| throw IllegalArgumentException("Type T in LiveData<T> unregistered") | ||
| } | ||
| } | ||
| }) | ||
| } | ||
| else -> throw IllegalArgumentException("Type Act unregistered") | ||
| } | ||
| } | ||
| else -> throw IllegalArgumentException("Type Act unregistered") | ||
| } catch (exception: IllegalArgumentException) { | ||
| throw exception | ||
| } catch (exception: java.lang.RuntimeException) { | ||
| throw RuntimeException("You cannot perform UI tasks outside the UI thread. For perform UI tasks switch to dispatcher.MAIN in the constructor ActExecutor") | ||
| } | ||
| } | ||
|
|
||
| private fun executeNextActIfExist() { | ||
| if (actMap.isNotEmpty()) { | ||
| val id = actMap.getFirstKey() | ||
| val value = actMap.getFirstValue() | ||
| startQueueAct(id, value) | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| package com.velmie.actexecutor.executor | ||
|
|
||
| /** | ||
| * Strategies that can be used for incoming events. | ||
| * | ||
| * DEFAULT - Only one event can be stored at the same time. | ||
| * Save and execute only one event and ignore all new events. | ||
| * After event is executed, it becomes possible to add new event. | ||
| * | ||
| * REPLACE - Only one event can be stored at the same time. Replace already added event and ignore all new events. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ты тут не игнорируешь ничего |
||
| * | ||
| * QUEUE - Several events can be stored at the same time. Add already added and new events to queue. | ||
| */ | ||
|
|
||
| enum class ActStrategy() { | ||
| DEFAULT, REPLACE, QUEUE | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. давай пока откажемся от QUEUE - так будет проще. потом следующей задачей его внесем |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,4 +8,9 @@ interface ActMap { | |
| fun add(id: Id, act: Act) | ||
| fun remove(id: Id) | ||
| fun removeAll() | ||
| fun replace(id: Id, act: Act) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. сможешь пояснить? не понимаю зачем нам .
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. реализация стратегии очередь подразумевает выполнение задач с одинаковыми id, для этих целей не подойдут id из объекта Act, т.к могут повторяться и перезаписываться. Поэтому при реализации этой стратегии используются рандомные уникальные id |
||
| fun getFirstKey(): Id | ||
| fun getFirstValue(): Act | ||
| fun isNotEmpty(): Boolean | ||
| fun size(): Int | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,11 +2,12 @@ package com.velmie.actexecutor.store | |
|
|
||
| import com.velmie.actexecutor.act.Act | ||
| import com.velmie.actexecutor.act.Id | ||
| import java.util.concurrent.ConcurrentHashMap | ||
| import java.util.Collections | ||
| import kotlin.collections.LinkedHashMap | ||
|
|
||
| class ConcurrentActMap : ActMap { | ||
|
|
||
| private val map = ConcurrentHashMap<Id, Act>() | ||
| private val map: MutableMap<Id, Act> = Collections.synchronizedMap(LinkedHashMap<Id, Act>()) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. какие видишь преимущества у предложенной коллекции для нашей реализации?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. для реализации стратегии очереди нужно чтобы задачи сортировались в порядке добавления, а это реализует коллекция LinkedHashMap, для потокобезопасности её пришлось синхрошизированной сделать |
||
|
|
||
| override fun contains(id: Id): Boolean { | ||
| return map.containsKey(id) | ||
|
|
@@ -25,4 +26,24 @@ class ConcurrentActMap : ActMap { | |
| override fun removeAll() { | ||
| map.clear() | ||
| } | ||
|
|
||
| override fun replace(id: Id, act: Act) { | ||
| map[id] = act | ||
| } | ||
|
|
||
| override fun getFirstKey(): Id { | ||
| return map.keys.first() | ||
| } | ||
|
|
||
| override fun getFirstValue(): Act { | ||
| return map.values.first() | ||
| } | ||
|
|
||
| override fun isNotEmpty(): Boolean { | ||
| return map.isNotEmpty() | ||
| } | ||
|
|
||
| override fun size(): Int { | ||
| return map.size | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
зачем нам Dispatchers? что это нам даст?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Для переключения диспатчеров при создании объекта ActExecutor для разного типа задач. Если задача выполняются в ui потоке, то нужно использовать main dispatcher(тестил на примере тостов при использовании стратегии очереди и после выполнения второй задачи в рантайме ошибка доступа к ui потоку вылетала).
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Поразбираюсь ещё в чем может быть дело, ведь сама задача(Act) вне кода корутины выполняется