Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions actexecutor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ android {
defaultConfig {
minSdkVersion 22
targetSdkVersion 29
versionCode 1
versionName "0.0.2"
versionCode 2
versionName "0.0.3"

testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
consumerProguardFiles 'consumer-rules.pro'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.velmie.actexecutor.executor

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO_PARALLELISM_PROPERTY_NAME

/**
* DEFAULT - The default [CoroutineDispatcher] that is used by all standard builders like
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
* if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used
* by this dispatcher is equal to the number of CPU cores, but is at least two.
* Level of parallelism X guarantees that no more than X tasks can be executed in this dispatcher in parallel.
*
*
* MAIN - A coroutine dispatcher that is confined to the Main thread operating with UI objects.
* This dispatcher can be used either directly or via [MainScope] factory.
* Usually such dispatcher is single-threaded.
*
* Access to this property may throw [IllegalStateException] if no main thread dispatchers are present in the classpath.
*
* Depending on platform and classpath it can be mapped to different dispatchers:
* - On JS and Native it is equivalent of [Default] dispatcher.
* - On JVM it is either Android main thread dispatcher, JavaFx or Swing EDT dispatcher. It is chosen by
* [`ServiceLoader`](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html).
*
* In order to work with `Main` dispatcher, the following artifacts should be added to project runtime dependencies:
* - `kotlinx-coroutines-android` for Android Main thread dispatcher
* - `kotlinx-coroutines-javafx` for JavaFx Application thread dispatcher
* - `kotlinx-coroutines-swing` for Swing EDT dispatcher
*
* In order to set a custom `Main` dispatcher for testing purposes, add the `kotlinx-coroutines-test` artifact to
* project test dependencies.
*
* Implementation note: [MainCoroutineDispatcher.immediate] is not supported on Native and JS platforms.
*
*
* UNCONFIRMED - A coroutine dispatcher that is not confined to any specific thread.
* It executes initial continuation of the coroutine in the current call-frame
* and lets the coroutine resume in whatever thread that is used by the corresponding suspending function, without
* mandating any specific threading policy. Nested coroutines launched in this dispatcher form an event-loop to avoid
* stack overflows.
*
* ### Event loop
* Event loop semantics is a purely internal concept and have no guarantees on the order of execution
* except that all queued coroutines will be executed on the current thread in the lexical scope of the outermost
* unconfined coroutine.
*
* For example, the following code:
* ```
* withContext(Dispatchers.Unconfined) {
* println(1)
* withContext(Dispatchers.Unconfined) { // Nested unconfined
* println(2)
* }
* println(3)
* }
* println("Done")
* ```
* Can print both "1 2 3" and "1 3 2", this is an implementation detail that can be changed.
* But it is guaranteed that "Done" will be printed only when both `withContext` are completed.
*
*
* Note that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption,
* but still want to execute it in the current call-frame until its first suspension, then you can use
* an optional [CoroutineStart] parameter in coroutine builders like
* [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to the
* the value of [CoroutineStart.UNDISPATCHED].
*
*
* IO - The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
*
* Additional threads in this pool are created and are shutdown on demand.
* The number of threads used by this dispatcher is limited by the value of
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
*
* This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using
* `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread —
* typically execution continues in the same thread.
*/

enum class ActDispatchers(val dispatcher: CoroutineDispatcher) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

зачем нам Dispatchers? что это нам даст?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Для переключения диспатчеров при создании объекта ActExecutor для разного типа задач. Если задача выполняются в ui потоке, то нужно использовать main dispatcher(тестил на примере тостов при использовании стратегии очереди и после выполнения второй задачи в рантайме ошибка доступа к ui потоку вылетала).

Copy link
Contributor Author

@dmitry-kuzhelko-velmie dmitry-kuzhelko-velmie Feb 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Поразбираюсь ещё в чем может быть дело, ведь сама задача(Act) вне кода корутины выполняется

DEFAULT(Dispatchers.Default),
MAIN(Dispatchers.Main),
UNCONFIRMED(Dispatchers.Unconfined),
IO(Dispatchers.IO)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,71 +3,119 @@ package com.velmie.actexecutor.executor
import androidx.lifecycle.Observer
import com.velmie.actexecutor.BuildConfig
import com.velmie.actexecutor.act.Act
import com.velmie.actexecutor.act.Id
import com.velmie.actexecutor.act.SimpleAct
import com.velmie.actexecutor.act.DelayAct
import com.velmie.actexecutor.act.LiveDataAct
import com.velmie.actexecutor.act.SimpleAct
import com.velmie.actexecutor.store.ActMap
import com.velmie.networkutils.core.Resource
import com.velmie.networkutils.core.Status
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import timber.log.Timber
import kotlin.system.measureTimeMillis

class ActExecutor(private val actMap: ActMap) : ActExecutorInterface {
class ActExecutor(
private val actMap: ActMap,
private val strategyType: ActStrategy = ActStrategy.DEFAULT,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

стратегия должна быть на act, а не на весь инструмент, иначе теряется ее смысл и можно дергать обычный queue

actDispatcher: ActDispatchers = ActDispatchers.DEFAULT
) : ActExecutorInterface {

private val scope: CoroutineScope = CoroutineScope(actDispatcher.dispatcher)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

зачем нам глобальная корутина для инструмента?

private var job: Job? = null

init {
if (BuildConfig.DEBUG) {
Timber.plant(Timber.DebugTree())
}
}

private val scope = CoroutineScope(Dispatchers.Default)

@Synchronized
override fun execute(act: Act) {
return when {
actMap.contains(act.id) -> {
Timber.d("id: ${act.id} - Act duplicate")
}
else -> startExecution(act)
return when (strategyType) {
ActStrategy.DEFAULT -> startAct(act)
ActStrategy.REPLACE -> startReplaceAct(act)
ActStrategy.QUEUE -> startQueueAct(act = act)
}
}

private fun startExecution(act: Act) {
actMap.add(act.id, act)
val removeFromMap = { actMap.remove(act.id) }
when (act) {
is SimpleAct -> {
act.actFunction()
removeFromMap()
}
is DelayAct -> {
val invokeTime = measureTimeMillis { act.actFunction() }
scope.launch {
delay(act.delay - invokeTime)
removeFromMap()
}
private fun startAct(act: Act) {
when {
actMap.contains(act.id) -> Timber.d("id: ${act.id} - Act duplicate")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

много дублирования

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • мне кажется что тут будет неожиданное поведение

actMap.size() == 1 -> Timber.d("Only one event can be processed at the same time")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

это в орне не верно, у тебя может быть сколько угодно событий в map

else -> executeAct(act.id, act)
}
}

private fun startReplaceAct(act: Act) {
when {
actMap.contains(act.id) -> actMap.replace(act.id, act)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ты подменяешь act и на этом все. никакой логики по replace нет

actMap.size() == 1 -> Timber.d("Only one event can be processed at the same time")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

это в орне не верно, у тебя может быть сколько угодно событий в map

else -> executeAct(act.id, act)
}
}

private fun startQueueAct(id: Id = (Math.random()).toString(), act: Act) {
executeAct(id, act)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

по этой логике у тебя и так бы работал старый код

}

private fun executeAct(id: Id, act: Act) {
try {
if (!actMap.contains(id)) {
actMap.add(id, act)
}
is LiveDataAct<*> -> {
act.liveData.observe(act.lifecycleOwner, Observer {
when (it) {
is Resource<*> -> {
act.afterAct(it)
if (it.status == Status.ERROR || it.status == Status.SUCCESS) {
removeFromMap()
}
}
else -> {

val removeFromMap = { actMap.remove(id) }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

можешь объяснить это действие? я не что-то не совсем понимаю

if ((job == null) || (job!!.isCancelled) || (job!!.isCompleted)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

зачем нам воообще тут корутины?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

для реализации задержек для DelayAct

when (act) {
is SimpleAct -> {
act.actFunction()
removeFromMap()
executeNextActIfExist()
}
is DelayAct -> {
val invokeTime = measureTimeMillis { act.actFunction() }
job = scope.launch {
delay(act.delay - invokeTime)
removeFromMap()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

job?.cancel() не нужен, у тебя корутина закончит свое выполнение как только будет вызван removeFromMap()

throw IllegalArgumentException("Type T in LiveData<T> unregistered")
job?.cancel()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а вот для replace - уже будет нужен

executeNextActIfExist()
}
}
})
is LiveDataAct<*> -> {
act.liveData.observe(act.lifecycleOwner, Observer {
when (it) {
is Resource<*> -> {
act.afterAct(it)
if (it.status == Status.ERROR || it.status == Status.SUCCESS) {
removeFromMap()
executeNextActIfExist()
}
}
else -> {
removeFromMap()
throw IllegalArgumentException("Type T in LiveData<T> unregistered")
}
}
})
}
else -> throw IllegalArgumentException("Type Act unregistered")
}
}
else -> throw IllegalArgumentException("Type Act unregistered")
} catch (exception: IllegalArgumentException) {
throw exception
} catch (exception: java.lang.RuntimeException) {
throw RuntimeException("You cannot perform UI tasks outside the UI thread. For perform UI tasks switch to dispatcher.MAIN in the constructor ActExecutor")
}
}

private fun executeNextActIfExist() {
if (actMap.isNotEmpty()) {
val id = actMap.getFirstKey()
val value = actMap.getFirstValue()
startQueueAct(id, value)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.velmie.actexecutor.executor

/**
* Strategies that can be used for incoming events.
*
* DEFAULT - Only one event can be stored at the same time.
* Save and execute only one event and ignore all new events.
* After event is executed, it becomes possible to add new event.
*
* REPLACE - Only one event can be stored at the same time. Replace already added event and ignore all new events.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ты тут не игнорируешь ничего

*
* QUEUE - Several events can be stored at the same time. Add already added and new events to queue.
*/

enum class ActStrategy() {
DEFAULT, REPLACE, QUEUE
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

давай пока откажемся от QUEUE - так будет проще. потом следующей задачей его внесем

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@ interface ActMap {
fun add(id: Id, act: Act)
fun remove(id: Id)
fun removeAll()
fun replace(id: Id, act: Act)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

сможешь пояснить? не понимаю зачем нам .
fun getFirstKey(): Id
fun getFirstValue(): Act

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

реализация стратегии очередь подразумевает выполнение задач с одинаковыми id, для этих целей не подойдут id из объекта Act, т.к могут повторяться и перезаписываться. Поэтому при реализации этой стратегии используются рандомные уникальные id (Math.random()).toString() который вместе с value вытягиваются с помощью этих методов (весь entry set из LinkedHashMap не получается вытянуть, поэтому пришлось тянуть по отдельности)

fun getFirstKey(): Id
fun getFirstValue(): Act
fun isNotEmpty(): Boolean
fun size(): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package com.velmie.actexecutor.store

import com.velmie.actexecutor.act.Act
import com.velmie.actexecutor.act.Id
import java.util.concurrent.ConcurrentHashMap
import java.util.Collections
import kotlin.collections.LinkedHashMap

class ConcurrentActMap : ActMap {

private val map = ConcurrentHashMap<Id, Act>()
private val map: MutableMap<Id, Act> = Collections.synchronizedMap(LinkedHashMap<Id, Act>())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

какие видишь преимущества у предложенной коллекции для нашей реализации?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

для реализации стратегии очереди нужно чтобы задачи сортировались в порядке добавления, а это реализует коллекция LinkedHashMap, для потокобезопасности её пришлось синхрошизированной сделать


override fun contains(id: Id): Boolean {
return map.containsKey(id)
Expand All @@ -25,4 +26,24 @@ class ConcurrentActMap : ActMap {
override fun removeAll() {
map.clear()
}

override fun replace(id: Id, act: Act) {
map[id] = act
}

override fun getFirstKey(): Id {
return map.keys.first()
}

override fun getFirstValue(): Act {
return map.values.first()
}

override fun isNotEmpty(): Boolean {
return map.isNotEmpty()
}

override fun size(): Int {
return map.size
}
}