2025年4月13日·阅读约1分钟

Kotlin Coroutines vs RxJava:网络与后台工作的比较

Kotlin Coroutines vs RxJava:比较在真实 Android 应用中关于取消、错误处理和测试模式的差异,针对网络与后台工作提供实用建议。

Kotlin Coroutines vs RxJava:网络与后台工作的比较

为什么这个选择对生产环境网络很重要

在真实的 Android 应用中,网络和后台工作不只是一次 API 请求。它包括登录与令牌刷新、在加载过程中旋转屏幕、用户离开界面后的同步、照片上传,以及不能耗尽电池的定期任务。

最令人头疼的 bug 通常不是语法错误,而是在异步工作比 UI 存活时间更长(泄漏)、取消只停止了 UI 却没有停止实际请求(浪费流量与卡住的加载指示器)、重试导致请求倍增(触发限速或封禁),或者不同层对错误的处理方式不一致以至于没人能确定用户最终看到什么。

Kotlin Coroutines vs RxJava 的决策会影响日常的可靠性:

  • 你如何建模工作(一次性调用 vs 流)
  • 取消如何传播
  • 错误如何表示并呈现给 UI
  • 如何为网络、磁盘和 UI 控制线程
  • 如何测试时序、重试和边缘情况

下面的模式聚焦于在高负载或慢网络下容易出问题的点:取消、错误处理、重试与超时,以及能防止回归的测试习惯。示例保持简短且实用。

核心心智模型:suspend 调用、streams 与 Flow

Kotlin Coroutines vs RxJava 之间的主要差别在于你要建模的工作的形状。

一个 suspend 函数表示一次性操作。它要么返回一个值,要么抛出一次失败。这与大多数网络调用相匹配:获取个人资料、更新设置、上传照片。调用代码从上到下可读性好,即便后来加上日志、缓存和分支也容易理解。

RxJava 的出发点是先问你是在处理单个值还是随时间变化的多个值。Single 是一次性结果(成功或错误)。Observable(或 Flowable)是一个可以发出多个值、然后完成或失败的流。它适合真正事件化的功能:文本变化事件、websocket 消息或轮询。

Flow 是与协程友好的流表示方式。把它看作协程的“流版本”,具有结构化取消,与挂起 API 直接契合。

一个快速经验法则:

  • 对一次请求一次响应使用 suspend
  • 对随时间变化的值使用 Flow
  • 当你的应用已经大量依赖操作符和复杂流组合时,再考虑用 RxJava。

随着功能增长,可读性通常在你把流模型强行套到一次性调用上,或把持续事件当作单一返回值处理时率先崩坏。先匹配抽象与现实,再在其上建立约定。

取消在实践中(带简短代码示例)

取消决定异步代码是安全可控,还是会导致随机崩溃和多余请求。目标很简单:当用户离开界面时,为该界面启动的任何工作都应停止。

在 Kotlin 协程中,取消是模型的一部分。一个 Job 表示工作;有了结构化并发,你通常不需要到处传递 Job。你在某个 scope(比如 ViewModel scope)内部启动工作,当该 scope 被取消时,里面的所有内容都会被取消。

class ProfileViewModel(
    private val api: Api
) : ViewModel() {

    fun loadProfile() = viewModelScope.launch {
        // If the ViewModel is cleared, this coroutine is cancelled,
        // and so is the in-flight network call (if the client supports it).
        val profile = api.getProfile() // suspend
        // update UI state here
    }
}

两个生产级细节很重要:

  • 通过可取消的客户端调用挂起的网络方法。否则协程会停止但 HTTP 调用可能继续运行。
  • 对必须不能挂起的请求使用 withTimeout(或 withTimeoutOrNull)。

RxJava 使用显式的 dispose。你为每个订阅保留一个 Disposable,或者把它们收集到 CompositeDisposable。当屏幕关闭时,你 dispose,链条应该停止。

class ProfilePresenter(private val api: ApiRx) {
    private val bag = CompositeDisposable()

    fun attach() {
        bag += api.getProfile()
            .subscribe(
                { profile -> /* render */ },
                { error -> /* show error */ }
            )
    }

    fun detach() {
        bag.clear() // cancels in-flight work if upstream supports cancellation
    }
}

一个实用的屏幕退出规则:如果你无法指出取消发生在哪里(scope 取消或 dispose()),就假设工作会继续运行,并在发布前修复它。

易于理解的错误处理

Kotlin Coroutines vs RxJava 的一个大区别是错误如何传递。协程让失败看起来像普通代码:挂起调用会抛出异常,调用方决定如何处理。Rx 则通过流推进失败,这很强大,但如果不小心容易把问题隐藏起来。

当遇到意外故障(超时、500、解析错误)时使用异常。需要在域模型中表达并让 UI 特别处理的错误(比如密码错误、邮箱已被使用),应当把错误建模为数据。

一个简单的协程模式可以保留堆栈信息并保持可读:

suspend fun loadProfile(): Profile = try {
    api.getProfile() // may throw
} catch (e: IOException) {
    throw NetworkException("No connection", e)
}

当你确实想返回成功或失败而不抛出时,runCatchingResult 很有用:

suspend fun loadProfileResult(): Result<Profile> =
    runCatching { api.getProfile() }

如果你用 getOrNull(),要小心没有同时处理失败的情况,那会悄悄把真实的 bug 变成“空状态”界面。

在 RxJava 中,让错误路径显式化。仅在安全回退时使用 onErrorReturn。当你需要切换数据源(例如退回到缓存)时,优先使用 onErrorResumeNext。对于重试,使用 retryWhen 并把规则限定好,这样不会在“密码错误”这类不可重试错误上反复尝试。

防止错误被吞掉的一组习惯:

  • 在有上下文的地方记录或上报错误(只记录一次)。
  • 在包装异常时保留原始异常作为 cause
  • 避免把所有错误都变成通用回退值的 catch-all。
  • 把面向用户的错误做成有类型的模型,而不是字符串。

线程基础:Dispatchers vs Schedulers

Ship common features quickly
Add authentication and payments with built-in modules when you need them.
Explore AppMaster

很多异步 bug 最终和线程有关:在主线程做繁重工作,或在后台线程更新 UI。Kotlin Coroutines vs RxJava 的主要差别在于你如何表达线程切换。

在协程里,你通常在主线程开始 UI 工作,再跳到后台 dispatcher 去做昂贵操作。常用选择:

  • Dispatchers.Main 用于 UI 更新
  • Dispatchers.IO 用于阻塞 I/O(网络、磁盘)
  • Dispatchers.Default 用于 CPU 密集型任务(JSON 解析、排序、加密)

一个直接的模式是:获取数据,在主线程外解析,然后渲染。

viewModelScope.launch(Dispatchers.Main) {
    val json = withContext(Dispatchers.IO) { api.fetchProfileJson() }
    val profile = withContext(Dispatchers.Default) { parseProfile(json) }
    _uiState.value = UiState.Content(profile)
}

RxJava 用 subscribeOn 表示“哪里执行工作”,用 observeOn 表示“在哪里观察结果”。一个常见的惊讶是期望 observeOn 会影响上游工作,但它不会。subscribeOn 为源和它以上的操作符设置线程,而每个 observeOn 从该点起切换线程。

api.fetchProfileJson()
    .subscribeOn(Schedulers.io())
    .map { json -> parseProfile(json) } // still on io unless you change it
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { profile -> render(profile) },
        { error -> showError(error) }
    )

避免惊讶的一个规则:把 UI 工作集中到一个地方。在协程中,在 Dispatchers.Main 上分配或收集 UI 状态;在 RxJava 中,在渲染前放一个最终的 observeOn(main),不要随意散布多个 observeOn

如果界面卡顿,先把解析和映射移出主线程。单次改动就能修复很多真实世界的问题。

针对网络调用的重试、超时和并行工作

Avoid tech debt early
Get real source code you can review, extend, and maintain as requirements change.
Generate Code

顺利路径通常不是问题。问题来自挂起的调用、把事情越搞越糟的重试,或并行工作并没有真正并行。这些模式常常决定团队偏好 Kotlin Coroutines vs RxJava 的原因。

快速失败的超时

在协程里,你可以在任何挂起调用周围加一个硬性上限。把超时放在靠近调用处,这样能展示正确的 UI 提示。

val user = withTimeout(5_000) {
    api.getUser() // suspend
}

在 RxJava 中,你把 timeout 操作符附到流上。这在超时行为应该是共享管道一部分时很有用。

不会造成损害的重试

只在重试是安全的情况下重试。一个简单规则:对幂等请求(如 GET)可以更大方地重试;对会产生副作用的请求(如“创建订单”)则要谨慎。无论如何要限制次数并加入延迟或抖动。

合适的默认保护措施:

  • 在网络超时和临时服务错误上重试。
  • 不要在验证错误(400)或认证失败上重试。
  • 限制重试次数(通常 2–3 次)并记录最终失败。
  • 使用退避延迟,避免对服务器造成瞬时冲击。

在 RxJava 中,retryWhen 让你表达“只在这些错误上以这种延迟重试”。在协程里,Flow 有 retryretryWhen,而普通的 suspend 函数通常用一个带延迟的小循环来实现。

代码不纠结的并行调用

协程让并行工作更直接:同时启动两个请求,然后等待两者完成。

coroutineScope {
    val profile = async { api.getProfile() }
    val feed = async { api.getFeed() }
    profile.await() to feed.await()
}

当把多个源合并是链条主要目的时,RxJava 很擅长。zip 是等待两者的常用工具,merge 适合想尽快得到任一结果的场景。

对于大或快速的流,背压依然重要。RxJava 的 Flowable 在背压处理上更成熟。Coroutines 的 Flow 在很多场景下也能很好应对,但当事件产生速度超过 UI 或数据库写入速度时,你仍可能需要缓冲或丢弃策略。

互操作与迁移模式(混合代码库)

大多数团队不会一夜之间切换。实用的 Kotlin Coroutines vs RxJava 迁移会在逐模块迁移的同时保持应用稳定。

把 Rx API 包装成挂起函数

如果你有现成的 Single<T>Completable,把它包装成带取消支持的挂起函数,这样被取消的协程会 dispose 掉 Rx 订阅。

suspend fun <T : Any> Single<T>.awaitCancellable(): T =
  suspendCancellableCoroutine { cont ->
    val d = subscribe(
      { value -> cont.resume(value) {} },
      { error -> cont.resumeWithException(error) }
    )
    cont.invokeOnCancellation { d.dispose() }
  }

这能避免一个常见的失败模式:用户离开屏幕后协程被取消,但网络调用继续运行并在稍后更新共享状态。

向 Rx 调用方暴露协程代码

迁移期间,某些层级仍然期望 Rx 类型。把挂起工作包装到 Single.fromCallable,并只在后台线程阻塞。

fun loadProfileRx(api: Api): Single<Profile> =
  Single.fromCallable {
    runBlocking { api.loadProfile() } // ensure subscribeOn(Schedulers.io())
  }

把这个边界保持小且记录清楚。对新代码,优先在协程作用域中直接调用挂起 API。

Flow 的适用场景与局限

Flow 可以替代很多 Observable 的用例:UI 状态、数据库更新和类似分页的流。如果你依赖热流、subjects、复杂的背压调优或大量自定义操作符,Flow 可能不如 Rx 直接。

降低混淆的迁移策略:

  • 先把叶子模块(网络、存储)转换为挂起 API。
  • 在模块边界加小的适配器(Rx 到 suspend,suspend 到 Rx)。
  • 只有在你也控制消费者时,才把 Rx 流替换为 Flow。
  • 每个功能区保持一种异步风格。
  • 在最后一个调用者迁移后删除适配器。

你会真正用到的测试模式

Create your backend fast
Model data, add auth, and ship endpoints without hand-writing server boilerplate.
Start Building

时序和取消问题是异步 bug 的藏身之处。好的异步测试能让时间确定且结果易断言。这又是 Kotlin Coroutines vs RxJava 感觉不同的地方,尽管两者都能很好测试。

协程:runTest、TestDispatcher 与时间控制

对协程代码,优先使用 runTest 和测试调度器,这样测试不依赖真实线程或真实延迟。虚拟时间让你触发超时、重试和防抖窗口而不需要睡眠。

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `emits Loading then Success`() = runTest {
  val dispatcher = StandardTestDispatcher(testScheduler)
  val repo = Repo(api = fakeApi, io = dispatcher)

  val states = mutableListOf<UiState>()
  val job = launch(dispatcher) { repo.loadProfile().toList(states) }

  testScheduler.runCurrent() // run queued work
  assert(states.first() is UiState.Loading)

  testScheduler.advanceTimeBy(1_000) // trigger delay/retry windows
  testScheduler.runCurrent()
  assert(states.last() is UiState.Success)

  job.cancel()
}

要测试取消,取消收集的 Job(或父作用域)并断言你的 fake API 停止或不再发出更多状态。

RxJava:TestScheduler、TestObserver、确定性时间

Rx 测试通常结合 TestScheduler 控制时间和 TestObserver 做断言。

@Test
fun `disposes on cancel and stops emissions`() {
  val scheduler = TestScheduler()
  val observer = TestObserver<UiState>()

  val d = repo.loadProfileRx(scheduler)
    .subscribeWith(observer)

  scheduler.triggerActions()
  observer.assertValueAt(0) { it is UiState.Loading }

  d.dispose()
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
  observer.assertValueCount(1) // no more events after dispose
}

在任一风格测试错误路径时,关注映射结果而不是异常类型。断言在 401、超时或响应格式错误后你期待的 UI 状态。

一小组检查点能覆盖大多数回归:

  • 加载与最终状态(Success、Empty、Error)
  • 取消清理(job 被取消,disposable 被 dispose)
  • 错误映射(服务端代码到用户消息)
  • 重试后没有重复发射
  • 使用虚拟时间测试基于时间的逻辑,而不是实睡

导致生产问题的常见错误

大多数生产问题并不是因为选了 Kotlin Coroutines 还是 RxJava,而是一些让工作比你预期运行更久、运行两次或在错误线程更新 UI 的习惯。

一个常见的泄漏是把工作启动在错误的 scope 中。如果你从比屏幕寿命更长的 scope 启动网络调用(或者你创建自己的 scope 却从不取消),请求在用户离开后仍可能完成并尝试更新状态。在协程中,这通常表现为默认使用长生命周期的 scope;在 RxJava 中,通常是忘记 dispose。

另一个经典问题是“fire and forget”。全局 scope 和遗忘的 Disposables 在刚开始看起来没问题,直到工作堆积。一个在每次 resume 都刷新的聊天界面,经过几次导航后很容易积累多个刷新任务,占用内存并争抢网络。

重试也容易出错。无限重试或无延迟重试会轰炸后端并耗电。当失败是永久性的(比如登出后的 401)时尤其危险。让重试有条件、生效次数有限并加上退避。

线程错误会导致难以复现的崩溃。你可能在某处在主线程解析 JSON,或在后台线程更新 UI,这取决于你放置 dispatcher 或 scheduler 的位置。

快速检查能捕捉多数问题:

  • 把工作绑定到生命周期所有者,并在该所有者结束时取消。
  • 把清理放在显眼位置:在一个地方取消 Jobs 或清空 Disposables。
  • 对重试设严格限制(次数、延迟和哪些错误可重试)。
  • 在代码评审中坚持一条 UI 更新规则(只在主线程)。
  • 把后台同步视作有约束的系统,而不是随意的函数调用。

如果你从生成的 Kotlin 代码发布 Android 应用(例如来自 AppMaster),相同的陷阱仍然存在。你仍然需要关于 scope、取消、重试上限和线程规则的明确约定。

用来选择 Coroutines、RxJava 或两者的快速清单

Test async flows sooner
Prototype login, profile, and sync flows with visual logic instead of fragile glue code.
Try It

从工作的形状开始。大多数网络调用是一次性的,但应用也有持续信号,如连接性、认证状态或实时更新。早期选错抽象通常会在后期表现为难以处理的取消与复杂难读的错误路径。

一个简单的决策(以及向团队解释选择的方法):

  • 一次性请求(登录、获取资料):优先用 suspend 函数。
  • 持续流(事件、数据库更新):优先用 Flow 或 Rx 的 Observable
  • UI 生命周期取消:在 viewModelScopelifecycleScope 中使用协程通常比手动管理 disposables 简单。
  • 强烈依赖高级流操作符和背压:旧代码库中 RxJava 可能仍更适合。
  • 复杂的重试与错误映射:选团队能保持可读性的方案。

一个实用规则:如果一个屏幕发起一次请求并渲染一次结果,协程让代码更接近普通函数调用;如果你在构建很多事件的管道(输入、去抖、取消前一个请求、合并过滤器),RxJava 或 Flow 往往更自然。

一致性胜过完美。两个在各处广泛使用的良好模式,比五个不一致的“最佳”模式更易维护。

示例场景:登录、获取资料与后台同步

One stack for UI and API
Build web and mobile clients alongside your backend to keep error handling consistent.
Create App

一个常见的生产流程是:用户点击登录,你调用认证端点,然后获取首页的资料,最后启动后台同步。这里 Kotlin Coroutines vs RxJava 在日常维护中会有不同的体验。

协程版本(顺序 + 可取消)

在协程中,“先做这个,再做那个”的结构很自然。如果用户关闭屏幕,取消作用域会停止正在进行的工作。

suspend fun loginAndLoadProfile(): Result<Profile> = runCatching {
  val token = api.login(email, password) // suspend
  val profile = api.profile("Bearer $token")
  syncManager.startSyncInBackground(token) // fire-and-forget
  profile
}.recoverCatching { e ->
  throw when (e) {
    is HttpException -> when (e.code()) {
      401 -> AuthExpiredException()
      in 500..599 -> ServerDownException()
      else -> e
    }
    is IOException -> NoNetworkException()
    else -> e
  }
}

// UI layer
val job = viewModelScope.launch { loginAndLoadProfile() }
override fun onCleared() { job.cancel() }

RxJava 版本(链 + dispose)

在 RxJava 中,同样的流程通常写成一条链。取消意味着 dispose,通常使用 CompositeDisposable

val d = api.login(email, password)
  .flatMap { token -> api.profile("Bearer $token").map { it to token } }
  .doOnSuccess { (_, token) -> syncManager.startSyncInBackground(token) }
  .onErrorResumeNext { e: Throwable ->
    Single.error(
      when (e) {
        is HttpException -> if (e.code() == 401) AuthExpiredException() else e
        is IOException -> NoNetworkException()
        else -> e
      }
    )
  }
  .subscribe({ (profile, _) -> show(profile) }, { showError(it) })

compositeDisposable.add(d)
override fun onCleared() { compositeDisposable.clear() }

这里的最小测试套件应覆盖三种结果:成功、被映射的失败(401、500、无网络)以及取消/ dispose。

接下来的步骤:选定约定并保持一致

团队通常出问题是因为不同功能间的模式不一致,而不是 Kotlin Coroutines vs RxJava 本身错。一个简短的决策说明(哪怕一页)能节省审查时间并让行为可预测。

从一个明确的划分开始:一次性工作(单次网络调用) vs 流(随时间的更新,如 websocket、位置或数据库变化)。决定默认方案并定义允许例外的情况。

然后添加一组共享的辅助工具,让每个功能在网络不稳定时表现一致:

  • 一个把 HTTP 代码、超时、离线等映射成应用级失败的集中点
  • 网络调用的默认超时值,并能在需要长时间操作时覆盖
  • 一个说明哪些请求可安全重试的重试策略
  • 一个取消规则:用户离开屏幕会停止什么、允许继续什么
  • 有助于排查而不泄露敏感数据的日志规范

测试约定同样重要。统一方法能让测试不依赖真实时间或线程。对协程通常意味着测试调度器与结构化作用域;对 RxJava 通常意味着测试调度器与显式 dispose。无论哪种方式,都应追求快速、确定性的测试且不使用 sleep。

如果你想整体加速开发,AppMaster (appmaster.io) 是一个可选项,可以生成后端 API 和基于 Kotlin 的移动应用,减少大量手写代码。即便使用生成代码,关于取消、错误、重试和测试的生产级约定仍然是保证网络行为可预测的关键。

常见问题

When should I use a suspend function vs a stream for networking?

默认对一次性只返回一次的请求使用 suspend,例如登录或获取资料。对于随时间变化的值(如 websocket 消息、连接状态或数据库更新),使用 Flow(或 Rx 流)。

Does coroutine cancellation actually stop an in-flight network request?

是的,但前提是你的 HTTP 客户端支持可取消。协程会在作用域被取消时停止对应的 coroutine,但如果底层 HTTP 请求本身不能被取消,网络请求可能仍会在后台继续运行。

What’s the safest way to prevent leaks when the user leaves a screen?

把工作绑定到生命周期作用域(例如 viewModelScope),这样在屏幕逻辑结束时会自动取消。避免在长生命周期或全局作用域中随意启动任务,除非这个工作确实是全局需要的。

How should error handling differ between Coroutines and RxJava?

在协程中,失败通常会抛出异常,并用 try/catch 在靠近能映射 UI 状态的地方处理。RxJava 中错误会沿着流传播,因此要显式处理错误路径,避免使用把失败悄悄变成默认值的操作符。

Should I model errors as exceptions or as data?

对意外故障(超时、500、解析错误等)用异常表示。对 UI 需要明确响应的情况(例如“密码错误”或“邮箱已被使用”),用有类型的错误数据建模,而不是依赖字符串匹配。

What’s a simple way to add timeouts without making code messy?

把超时放在能展示正确 UI 信息的呼叫附近。在协程中对 suspend 调用使用 withTimeout 很直接;在 RxJava 中,用 timeout 操作符把超时变成数据流的一部分。

How do I implement retries without causing duplicate requests or bans?

仅在安全时重试,通常对幂等请求(如 GET)更宽松。限制重试次数(通常 2–3 次),不要对验证错误或认证失败重试,添加延迟或抖动,避免轰炸服务器或耗电。

What’s the main threading pitfall with Dispatchers vs Schedulers?

常见做法是:协程用 Dispatchers,通常在主线程开始 UI 工作,再切到 IODefault 做昂贵计算。RxJava 用 subscribeOn 指定上游在哪儿运行,用 observeOn 指定在哪儿消费结果;在渲染前放一个最终的 observeOn(main),避免意外的线程问题。

Can I mix RxJava and Coroutines during a migration?

可以,但要把边界保持小且支持取消。把 Rx 包装成可取消的 suspend(在协程取消时 dispose 掉 Rx 订阅),并且仅在少量、文档化的桥接点上向 Rx 暴露挂起函数。

How do I test cancellation, retries, and time-based logic reliably?

用虚拟时间让测试不依赖真实线程或睡眠。协程用 runTest 和测试调度器(TestDispatcher),可以精确控制延迟和取消;RxJava 用 TestScheduler,并断言在 dispose() 后没有更多事件发出。

容易上手
创造一些 惊人的东西

使用免费计划试用 AppMaster。
准备就绪后,您可以选择合适的订阅。

开始吧