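Learning Kotlin: Coroutines
Coroutines resemble lightweight threads and let you write non-blocking code that reads like synchronous code. They are scheduled in user space on top of ordinary threads. Instead of blocking, a coroutine suspends its current context, so the thread that would otherwise sit idle during a delay can be reused for other work, which greatly reduces wasted thread resources.
Basic usage: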
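import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

fun runAsync() = runBlocking {
    val time = measureTimeMillis { // standard-library function that measures elapsed time
        val one = async { doSomethingUsefulOne() } // start asynchronously; returns a Deferred
        val two = async { doSomethingUsefulTwo() }
        // await() suspends the coroutine (it does not block the thread) until the result is ready
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

// A function that calls suspending functions such as delay must itself be marked suspend
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}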
There are no asynchronous callbacks here. The code is written just like synchronous code, which keeps it concise.
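To make the point about suspension freeing threads more concrete, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The helper name `runMany` is hypothetical and not from the original article. It starts many coroutines on the single thread owned by runBlocking; each one suspends in delay rather than blocking that thread.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical helper: starts `count` coroutines that each suspend for one second
// and returns how many of them completed.
fun runMany(count: Int): Int = runBlocking {
    val results = List(count) {
        async {
            delay(1000L) // suspends this coroutine; the thread is free to run the others
            1
        }
    }
    results.awaitAll().sum()
}

fun main() {
    val time = measureTimeMillis {
        println("Finished ${runMany(10_000)} coroutines")
    }
    println("Completed in $time ms")
}
```

Because the coroutines suspend concurrently, the total time should be far closer to one second than to 10,000 seconds. The same experiment with one blocked thread per task would need thousands of threads.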
launch returns a Job, which can be used to cancel the task
import kotlinx.coroutines.*

fun cancelCoroutine() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // cancellable computation loop
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}
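The loop above checks isActive by hand because it never suspends. When the body calls a suspending function such as delay, cancellation is observed at that call instead, and a finally block can release resources. The following is a minimal sketch of that variant; the helper name `cancelWithCleanup` and the event log are illustrative assumptions, not part of the original article.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs a worker for `runMillis`, cancels it, and returns what happened.
fun cancelWithCleanup(runMillis: Long): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        try {
            repeat(1000) { i ->
                events += "tick $i"
                delay(500L) // cancellable: throws CancellationException once the job is cancelled
            }
        } finally {
            events += "cleanup" // still runs when the coroutine is cancelled
        }
    }
    delay(runMillis)
    job.cancelAndJoin() // cancel, then wait until the finally block has finished
    events += "done"
    events
}

fun main() {
    cancelWithCleanup(1300L).forEach(::println)
}
```

The worker runs inside runBlocking on a single thread, so the shared list needs no synchronization in this sketch.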
Switching between threads
import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun jumpCor() { // create two single-threaded coroutine contexts
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
}
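withContext also returns the value of its block, which makes it a convenient way to move a single computation to another dispatcher and bring the result back. Here is a minimal sketch, with the hypothetical helper name `sumOnDefault` chosen for illustration:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: computes the sum on Dispatchers.Default and returns it to the caller.
suspend fun sumOnDefault(numbers: List<Int>): Int = withContext(Dispatchers.Default) {
    numbers.sum() // runs on a background worker thread
}

fun main() = runBlocking<Unit> {
    val total = sumOnDefault(listOf(1, 2, 3, 4))
    println("total = $total") // prints "total = 10"
}
```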
Creating a scope inside a function
suspend fun showSomeData() = coroutineScope {
    val data = async(Dispatchers.IO) { // I/O task: runs on an IO thread
        // ... load some UI data for the Main thread ...
    }
    withContext(Dispatchers.Main) { // UI task: update the UI
        val result = data.await()
        // display(result)
    }
}
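coroutineScope only returns once all coroutines started inside it have finished, so it is a natural fit for splitting one suspending function into concurrent parts. Here is a minimal sketch without the Android Main dispatcher; the function name `loadBoth` and the values are illustrative assumptions.

```kotlin
import kotlinx.coroutines.*

// Hypothetical example: two concurrent children whose results are combined.
suspend fun loadBoth(): Int = coroutineScope {
    val first = async { delay(100L); 13 }
    val second = async { delay(100L); 29 }
    first.await() + second.await() // coroutineScope returns after both children complete
}

fun main() = runBlocking<Unit> {
    println(loadBoth()) // prints 42
}
```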
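Coroutine context: CoroutineScope and CoroutineContext
Coroutine builders such as launch and async are extensions on CoroutineScope, so every coroutine has to be started inside a scope. Suspending functions such as delay can only be called from a coroutine or from another suspending function. runBlocking creates a CoroutineScope internally, and the library also provides GlobalScope and helpers such as MainScope that create a scope for you.
You can also build your own CoroutineScope from a CoroutineContext and a Job.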
import kotlinx.coroutines.*

fun sampleCreateCorountine() {
    // Create a custom CoroutineScope
    val coroutineContext = Dispatchers.Default
    val job = Job()
    val coroutineScope = CoroutineScope(coroutineContext + job)
    // Use the scope
    coroutineScope.launch {
    }
    // Launch in the global scope
    GlobalScope.launch(Dispatchers.Default + CoroutineName("global background thread")) {
    }
    // Create a scope that dispatches to the main thread
    MainScope().launch {
    }
}
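The main reason to own a scope is that cancelling it cancels every coroutine started in it. Here is a minimal sketch of that behaviour, with the hypothetical helper name `cancelScopeDemo`:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: starts a long-running child in a custom scope, cancels the scope,
// and reports whether the child ended up cancelled.
fun cancelScopeDemo(): Boolean = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default + Job())
    val child = scope.launch {
        delay(10_000L) // would run for ten seconds if left alone
    }
    scope.cancel() // cancels the scope's Job and, through it, all children
    child.join()   // returns promptly because the child was cancelled
    child.isCancelled
}

fun main() {
    println("child cancelled: ${cancelScopeDemo()}") // prints "child cancelled: true"
}
```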
Defining coroutines inside a class
1. Implement CoroutineScope directly
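import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

class SomethingWithLifecycle : CoroutineScope {
    // This job manages all child coroutines of SomethingWithLifecycle
    private val job = Job()

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job

    fun destory() { // cancel all children on exit
        job.cancel()
    }
}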
2. Delegate to an existing scope
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*

class CorMyActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        showSomeData()
    }

    /**
     * Note how coroutine builders are scoped: if the activity is destroyed or any of the
     * coroutines launched in this method throws an exception, then all nested coroutines
     * are cancelled.
     */
    fun showSomeData() = launch {
        // <- extension on the current activity, launched in the main thread
        // ... here we can use suspending functions or coroutine builders with other dispatchers
        // draw(data) // draw in the main thread
    }

    override fun onDestroy() {
        super.onDestroy()
        cancel()
    }
}
Dispatchers (coroutine dispatchers):
import kotlinx.coroutines.*

fun dispatchTask() = runBlocking<Unit> {
    // Without arguments, launch inherits the context (and thus the dispatcher)
    // of the CoroutineScope it is launched from.
    launch { // context of the parent, the main runBlocking coroutine
        println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
    }
    // Unconfined starts the coroutine in the caller's thread, but only until the first
    // suspension point. After that, the coroutine resumes on whichever thread the suspending
    // function resumes it on (for delay, typically a kotlinx.coroutines.DefaultExecutor thread).
    launch(context = Dispatchers.Unconfined) { // not confined: starts in the main thread
        println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
    }
    // Dispatchers.Default is the dispatcher GlobalScope.launch uses; it runs on a shared
    // background thread pool. It is meant for CPU-intensive work: its thread count equals
    // the number of CPU cores, with a minimum of 2.
    // Dispatchers.IO is meant for blocking I/O such as socket-heavy work: its threads are
    // created on demand, up to a default limit of 64 (or the number of cores, if larger).
    launch(context = Dispatchers.Default) { // will get dispatched to DefaultDispatcher
        println("Default : I'm working in thread ${Thread.currentThread().name}")
    }
    // Creates a dedicated thread for the coroutine to run on
    launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}
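The Unconfined behaviour described in the comments can be observed directly by recording the thread name before and after a suspension point. This is a minimal sketch; the helper name `unconfinedThreads` is hypothetical, and the thread that resumes the coroutine after delay is an implementation detail that may differ between kotlinx.coroutines versions, so the example does not state it.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns the thread names seen before and after the first suspension.
fun unconfinedThreads(): Pair<String, String> = runBlocking {
    var before = ""
    var after = ""
    launch(Dispatchers.Unconfined) {
        before = Thread.currentThread().name // still the caller's thread
        delay(100L)                          // first suspension point
        after = Thread.currentThread().name  // whichever thread resumed the coroutine
    }.join()
    before to after
}

fun main() {
    val (before, after) = unconfinedThreads()
    println("before suspension: $before")
    println("after suspension : $after")
}
```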
Coroutines with the Architecture Components ViewModel
import androidx.lifecycle.MutableLiveData
import androidx.lifecycle.ViewModel
import kotlinx.coroutines.*

class NewsViewModel : ViewModel() {
    private val mApi: WebServer

    init {
        mApi = WebServer()
    }

    val dataNews: MutableLiveData<DataResource<NewsDataRsp>> by lazy {
        // MutableLiveData<DataResource<NewsDataRsp>>().also {
        //     loadNewsData(minId = null)
        // }
        MutableLiveData<DataResource<NewsDataRsp>>()
    }

    // Variant 1: go through the shared runCoroutine helper, which posts LOADING first
    fun loadNewsData(pageIndex: Int = 1, countItem: Int = 20, minId: String? = null) {
        runCoroutine(dataNews) {
            val mp = mutableMapOf("encode" to "ywjh", "source" to "app", "sys" to "android", "banner" to "banner",
                "limit" to countItem.toString(), "version" to "7002000")
            if (pageIndex > 1 && !minId.isNullOrEmpty()) {
                mp["min_id"] = minId
            }
            val response = monDataSourceApi.getNewsData(mp).execute()
            return@runCoroutine response.body()!!
        }
    }

    // Variant 2: create a CoroutineScope on Dispatchers.IO and launch in it
    fun fetchNews(pageIndex: Int = 1, countItem: Int = 20, minId: String) {
        val mp = mutableMapOf("encode" to "ywjh", "source" to "app", "sys" to "android", "banner" to "banner",
            "limit" to countItem.toString(), "version" to "7002000")
        if (pageIndex > 1 && minId.isNotEmpty()) {
            mp["min_id"] = minId
        }
        val cor = CoroutineScope(Dispatchers.IO)
        cor.launch {
            try {
                val response = monDataSourceApi.getNewsData(mp).execute()
                dataNews.postValue(DataResource(DataResource.Status.COMPLETED, response.body(), null))
            } catch (exception: Exception) {
                dataNews.postValue(DataResource(DataResource.Status.COMPLETED, null, exception))
            }
        }
    }

    // Variant 3: a suspending function that switches to Dispatchers.IO with withContext
    suspend fun simpleGetData(pageIndex: Int = 1, countItem: Int = 20, minId: String) = withContext(Dispatchers.IO) {
        val mp = mutableMapOf("encode" to "ywjh", "source" to "app", "sys" to "android", "banner" to "banner",
            "limit" to countItem.toString(), "version" to "7002000")
        if (pageIndex > 1 && minId.isNotEmpty()) {
            mp["min_id"] = minId
        }
        try {
            val response = monDataSourceApi.getNewsData(mp).execute()
            dataNews.postValue(DataResource(DataResource.Status.COMPLETED, response.body(), null))
        } catch (exception: Exception) {
            dataNews.postValue(DataResource(DataResource.Status.COMPLETED, null, exception))
        }
    }

    // Posts LOADING, runs the block on Dispatchers.IO, then posts the result or the exception
    private fun <T> runCoroutine(correspondenceLiveData: MutableLiveData<DataResource<T>>, block: suspend () -> T) {
        correspondenceLiveData.value = DataResource(DataResource.Status.LOADING, null, null)
        GlobalScope.launch(Dispatchers.IO) {
            try {
                val result = block()
                correspondenceLiveData.postValue(DataResource(DataResource.Status.COMPLETED, result, null))
            } catch (exception: Exception) {
                // val error = ErrorConverter.convertError(exception)
                correspondenceLiveData.postValue(DataResource(DataResource.Status.COMPLETED, null, exception))
            }
        }
    }
}
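The "run a block on Dispatchers.IO and wrap the outcome" pattern used by runCoroutine can also be tried outside Android. The sketch below is a plain-Kotlin approximation under stated assumptions: `Resource` is a hypothetical stand-in for the article's DataResource, and `loadResource` is a hypothetical name. Unlike the original, it rethrows CancellationException so that cancellation is not reported as an ordinary error.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for the article's DataResource type.
data class Resource<T>(val data: T?, val error: Throwable?)

// Hypothetical helper mirroring runCoroutine: runs `block` on Dispatchers.IO
// and wraps either the result or the exception.
suspend fun <T> loadResource(block: suspend () -> T): Resource<T> = withContext(Dispatchers.IO) {
    try {
        Resource(block(), null)
    } catch (e: CancellationException) {
        throw e // let cancellation propagate instead of treating it as a failure
    } catch (e: Exception) {
        Resource<T>(null, e)
    }
}

fun main() = runBlocking<Unit> {
    println(loadResource { 42 }.data)                          // prints 42
    println(loadResource<Int> { error("boom") }.error?.message) // prints boom
}
```

Returning the wrapped value from a suspending function leaves the choice of scope to the caller, which is one possible way to avoid tying the work to GlobalScope.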