Kotlin Coroutines vs RxJava: ネットワーキングとバックグラウンド処理
Kotlin Coroutines と RxJava を比較し、キャンセル、エラー処理、テスト手法を実際の Android アプリのネットワーキングやバックグラウンド処理の観点から解説します。

本番向けネットワーキングでこの選択が重要な理由
実際の Android アプリでのネットワーキングやバックグラウンド処理は、単一の API 呼び出し以上のものです。ログインやトークンリフレッシュ、読み込み中に画面が回転するケース、ユーザーが画面を離れた後の同期、写真アップロード、バッテリーを枯渇させない定期作業などが含まれます。
最も問題になるバグは通常、文法的な問題ではありません。非同期処理が UI より長く生き続けてしまう(リーク)、キャンセルが UI 側を止めるが実際のリクエストは止めない(無駄な通信やハマったスピナー)、リトライがリクエストを増やしてしまう(レート制限やブロック)、異なるレイヤーがエラーを別々に扱って誰もユーザーが見るものを予測できない、などの形で現れます。
Kotlin Coroutines と RxJava の選択は、日常的な信頼性に影響します。
- 作業のモデル化方法(ワンショット呼び出し vs ストリーム)
- キャンセルの伝播方法
- エラーの表現と UI への伝え方
- ネットワーク、ディスク、UI のスレッド制御方法
- タイミング、リトライ、エッジケースのテストしやすさ
以下のパターンは、負荷時や遅いネットワークで壊れやすい箇所、つまりキャンセル、エラー処理、リトライとタイムアウト、回帰を防ぐテスト習慣に焦点を当てています。例は短く実用的に保っています。
コアのメンタルモデル: suspend 呼び出し、ストリーム、Flow
Kotlin Coroutines と RxJava の主な違いは、扱う作業の形です。
suspend 関数はワンショット操作を表します。1つの値を返すか、1つの失敗を投げます。これは大抵のネットワーキング呼び出し(プロフィール取得、設定更新、写真アップロード)に合います。呼び出し側のコードは上から下へ読むので、ログ、キャッシュ、分岐を追加しても読みやすさが保たれます。
RxJava はまず「1つの値か、時間を通して多数の値か」を考えます。Single はワンショットの結果(成功かエラー)を表します。Observable(またはFlowable)は多数の値を発行し、完了するか失敗するストリームです。これはテキスト変更イベント、WebSocket メッセージ、ポーリングのようなイベント的な機能に合います。
Flow はコルーチンと親和性の高いストリーム表現です。構造化されたキャンセルとサスペンド API との直接的な適合を持つ“ストリーム版”のコルーチンと考えてください。
簡単な目安:
- 1 回のリクエストと 1 回のレスポンスには
suspendを使う。 - 値が時間とともに変化する場合は
Flowを使う。 - 既にオペレーターや複雑なストリーム合成を多用しているアプリなら RxJava を使うことを検討する。
機能が増えると、ワンショットの呼び出しに無理やりストリームモデルを当てはめたり、継続的なイベントを単一の戻り値のように扱おうとすると、可読性が真っ先に壊れます。まず抽象を現実に合わせ、次にそれに基づいた慣習を作ってください。
実務でのキャンセル(短いコード例付き)
キャンセルは、非同期コードが安全に感じられるか、ランダムなクラッシュや無駄な呼び出しになるかを決めます。目標は単純:ユーザーが画面を離れたら、その画面のために開始された作業は止まるべきです。
Kotlin Coroutines では、キャンセルはモデルに組み込まれています。Job は作業を表し、構造化された並行処理では通常 Job をあちこち渡しません。ViewModel スコープのようなスコープ内で作業を開始すると、そのスコープがキャンセルされたときに内部のすべてがキャンセルされます。
class ProfileViewModel(
private val api: Api
) : ViewModel() {
fun loadProfile() = viewModelScope.launch {
// If the ViewModel is cleared, this coroutine is cancelled,
// and so is the in-flight network call (if the client supports it).
val profile = api.getProfile() // suspend
// update UI state here
}
}
本番で重要な点が二つあります:
- キャンセル可能なクライアント経由で suspend ネットワーキングを呼ぶこと。そうしないと、コルーチンは止まっても HTTP 呼び出しは動き続けることがあります。
- ハングしてはいけないリクエストには
withTimeout(あるいはwithTimeoutOrNull)を使うこと。
RxJava は明示的な破棄(disposal)を使います。各購読について Disposable を保持するか、CompositeDisposable に集めます。画面が消えるときに dispose すればチェーンは止まるはずです。
class ProfilePresenter(private val api: ApiRx) {
private val bag = CompositeDisposable()
fun attach() {
bag += api.getProfile()
.subscribe(
{ profile -> /* render */ },
{ error -> /* show error */ }
)
}
fun detach() {
bag.clear() // cancels in-flight work if upstream supports cancellation
}
}
実務的なルール:キャンセルがどこで行われているか(スコープキャンセルや dispose())を指し示せないなら、その作業は実行を続けると仮定してリリース前に直してください。
理解しやすいエラー処理
Kotlin Coroutines と RxJava の大きな違いは、エラーの伝わり方です。コルーチンでは失敗が通常のコードのように見えます:サスペンド呼び出しが例外を投げ、呼び出し元がどうするかを決めます。Rx は失敗をストリーム上に流すので強力ですが、不注意だと問題を隠してしまいやすいです。
予期しない失敗(タイムアウト、500系、パースバグなど)には例外を使ってください。UI が特定の応答(パスワード間違い、"メールは既に使用されています" など)を必要とする場合、そのエラーをドメインモデルの一部としてデータで表現してください。
単純なコルーチンパターンはスタックトレースを保ち、読みやすさを保ちます:
suspend fun loadProfile(): Profile = try {
api.getProfile() // may throw
} catch (e: IOException) {
throw NetworkException("No connection", e)
}
runCatching と Result は、例外を投げずに成功か失敗かを返したいときに便利です。
suspend fun loadProfileResult(): Result<Profile> =
runCatching { api.getProfile() }
失敗を扱わずに getOrNull() を多用すると、本当のバグを「空の状態」画面に静かに変えてしまうので注意してください。
RxJava ではエラーパスを明示的に保つことが重要です。安全なフォールバックにのみ onErrorReturn を使い、キャッシュデータに切り替えるときは onErrorResumeNext を使ってください。リトライには retryWhen を使い、例えば「パスワード間違い」ではリトライしないようにするなどルールを狭く保ちます。
エラーを飲み込ませないための習慣:
- コンテキストの近くで一度だけログまたはレポートする。
- ラップする際は元の例外を
causeとして保持する。 - すべてのエラーをデフォルト値に変えるようなキャッチオールのフォールバックは避ける。
- ユーザー向けエラーは文字列ではなく型付きモデルにする。
スレッドの基本: Dispatchers と Schedulers
多くの非同期バグはスレッドに起因します:重い処理をメインスレッドで行ったり、バックグラウンドスレッドから UI を触ったり。Kotlin Coroutines と RxJava の違いは、スレッド切り替えの表現方法にあります。
コルーチンでは、UI 作業をメインで始めて、重い部分をバックグラウンドのディスパッチャに移すことが多いです。一般的な選択は:
Dispatchers.Mainは UI 更新用Dispatchers.IOはネットワークやディスクのブロッキング I/ODispatchers.Defaultは JSON パースやソート、暗号化などの CPU 作業
単純なパターンは、データを取得し、メイン外でパースし、描画することです。
viewModelScope.launch(Dispatchers.Main) {
val json = withContext(Dispatchers.IO) { api.fetchProfileJson() }
val profile = withContext(Dispatchers.Default) { parseProfile(json) }
_uiState.value = UiState.Content(profile)
}
RxJava は subscribeOn で「どこで作業するか」を、observeOn で「どこで結果を観測するか」を表します。よくある落とし穴は observeOn が上流の作業に影響すると思うことです。しません。subscribeOn はソースとそれより上のオペレーターのスレッドを設定し、各 observeOn はその時点以降のスレッドを切り替えます。
api.fetchProfileJson()
.subscribeOn(Schedulers.io())
.map { json -> parseProfile(json) } // still on io unless you change it
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ profile -> render(profile) },
{ error -> showError(error) }
)
驚きを避けるルール:UI 作業は一箇所にまとめる。コルーチンでは Dispatchers.Main で状態を割り当てたり収集したりします。RxJava では描画直前に一度 observeOn(main) を置き、余分な observeOn は不要な限り散らさないでください。
画面がカクつくなら、まずパースやマッピングをメイン外に移してください。その変更だけで実際の問題の多くが解決します。
タイムアウト、リトライ、並列処理
ハッピーパス以外が問題になることが多いです。ハングする呼び出し、リトライで状況が悪化するケース、実は並列でない「並列」処理などが原因です。これらのパターンはチームが Kotlin Coroutines と RxJava のどちらを選ぶかに影響します。
早く失敗させるタイムアウト
コルーチンでは任意の suspend 呼び出しの周りにハードキャップを置けます。タイムアウトは呼び出し箇所の近くに置いて、適切な UI メッセージを出せるようにします。
val user = withTimeout(5_000) {
api.getUser() // suspend
}
RxJava ではストリームに timeout 演算子を付けます。タイムアウト動作を共通のパイプラインの一部にしたいときに便利です。
ダメージを与えないリトライ
リトライは安全な場合だけ行ってください。簡単なルール:冪等なリクエスト(GET など)はリトライしやすく、サイドエフェクトがあるリクエスト(注文作成など)は控えます。回数を制限し、遅延やジッターを入れてください。
良いデフォルトのガードレール:
- ネットワークのタイムアウトや一時的なサーバーエラーでリトライする
- バリデーションエラー(400系)や認証失敗ではリトライしない
- リトライ回数を制限(通常 2〜3 回)し、最終失敗をログに残す
- サーバーを叩きすぎないようバックオフとジッターを使う
RxJava の retryWhen は「どのエラーで、どの遅延でリトライするか」を表現できます。コルーチンでは Flow に retry や retryWhen があり、plain な suspend 関数では小さなループと delay を使うことが多いです。
複雑にならない並列呼び出し
コルーチンは並列作業が直接的です:2つのリクエストを開始して両方を待ちます。
coroutineScope {
val profile = async { api.getProfile() }
val feed = async { api.getFeed() }
profile.await() to feed.await()
}
RxJava は複数ソースの結合がチェーンの主目的である場合に強みを発揮します。zip は両方を待つのに使われ、merge は到着した順に結果を受け取りたいときに便利です。
大きなストリームや高速なストリームではバックプレッシャーが重要です。RxJava の Flowable は成熟したバックプレッシャーツールを持っています。コルーチンの Flow も多くのケースを十分に扱えますが、イベントが UI やデータベース書き込みの速度を超える場合はバッファリングやドロップポリシーが必要になることがあります。
相互運用とマイグレーション(混在コードベース)
ほとんどのチームは一夜にして切り替えません。実用的な Kotlin Coroutines と RxJava のマイグレーションは、モジュール単位で移行しつつアプリを安定させます。
Rx API を suspend 関数にラップする
既存の Single<T> や Completable がある場合、キャンセル対応のラッパーを作って、キャンセルされたコルーチンが Rx の購読を破棄するようにします。
suspend fun <T : Any> Single<T>.awaitCancellable(): T =
suspendCancellableCoroutine { cont ->
val d = subscribe(
{ value -> cont.resume(value) {} },
{ error -> cont.resumeWithException(error) }
)
cont.invokeOnCancellation { d.dispose() }
}
これによりよくある失敗モードを避けられます:ユーザーが画面を離れてコルーチンがキャンセルされても、ネットワーク呼び出しが動き続けて共有状態を後で更新してしまう、という問題です。
コルーチンコードを Rx 呼び出し側に公開する
マイグレーション中は一部のレイヤーがまだ Rx 型を期待することがあります。Single.fromCallable でサスペンド処理をラップし、バックグラウンドスレッドでだけブロックするようにします。
fun loadProfileRx(api: Api): Single<Profile> =
Single.fromCallable {
runBlocking { api.loadProfile() } // ensure subscribeOn(Schedulers.io())
}
この境界は小さく、ドキュメント化しておいてください。新しいコードでは、コルーチンスコープから直接 suspend API を呼ぶ方が望ましいです。
Flow が適する場所、そうでない場所
Flow は多くの Observable のユースケースを置き換えられます:UI 状態、データベース更新、ページングに似たストリームなど。ただし、ホットストリーム、サブジェクト、大規模なカスタム演算子群や高度なバックプレッシャー調整に強く依存している場合は直接的ではないことがあります。
混乱を避けるマイグレーション戦略:
- 葉のモジュール(ネットワーク、ストレージ)を先に suspend API に変換する。
- モジュール境界で小さなアダプタ(Rx→suspend、suspend→Rx)を追加する。
- 消費側もコントロールできる場合に限り、Rx ストリームを Flow に置き換える。
- 機能領域ごとに一つの非同期スタイルにまとめる。
- 最後の呼び手が移行したらアダプタを削除する。
実際に使うテストパターン
タイミングとキャンセルの問題は非同期バグの隠れ場所です。良い非同期テストは時間を決定論的にし、結果のアサーションを簡単にします。ここも Kotlin Coroutines と RxJava の違いが感じられる領域ですが、どちらもきちんとテストできます。
コルーチン: runTest、TestDispatcher、時間制御
コルーチンコードには runTest とテストディスパッチャを使い、テストが実際のスレッドや遅延に依存しないようにします。仮想時間でタイムアウト、リトライ、デバウンスを実行できます。
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `emits Loading then Success`() = runTest {
val dispatcher = StandardTestDispatcher(testScheduler)
val repo = Repo(api = fakeApi, io = dispatcher)
val states = mutableListOf<UiState>()
val job = launch(dispatcher) { repo.loadProfile().toList(states) }
testScheduler.runCurrent() // run queued work
assert(states.first() is UiState.Loading)
testScheduler.advanceTimeBy(1_000) // trigger delay/retry windows
testScheduler.runCurrent()
assert(states.last() is UiState.Success)
job.cancel()
}
キャンセルをテストするには、収集している Job(または親スコープ)をキャンセルして、フェイク API が止まるか、これ以上状態が発行されないことを検証します。
RxJava: TestScheduler、TestObserver、決定論的な時間
Rx のテストは通常 TestScheduler と TestObserver を組み合わせます。
@Test
fun `disposes on cancel and stops emissions`() {
val scheduler = TestScheduler()
val observer = TestObserver<UiState>()
val d = repo.loadProfileRx(scheduler)
.subscribeWith(observer)
scheduler.triggerActions()
observer.assertValueAt(0) { it is UiState.Loading }
d.dispose()
scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
observer.assertValueCount(1) // no more events after dispose
}
どちらのスタイルでもエラーパスのテストでは、例外型そのものよりもマッピング結果に注目してください。401、タイムアウト、破損したレスポンスの後に期待する UI 状態をアサートします。
少ないチェックでほとんどの回帰をカバーできます:
- ローディングと最終状態(Success、Empty、Error)
- キャンセルのクリーンアップ(Job がキャンセルされている、Disposable が破棄されている)
- エラーマッピング(サーバーコードからユーザーメッセージへ)
- リトライ後の重複発行がないこと
- 時間に依存するロジックは仮想時間でテストする
本番バグの原因となるよくある間違い
ほとんどの本番問題は、Kotlin Coroutines と RxJava の選択そのものではなく、作業が想定より長く動く、二重に動く、あるいは UI を間違ったタイミングで触る、といった習慣に起因します。
よくあるリークは間違ったスコープで作業を立ち上げることです。画面より長く生きるスコープからネットワーク呼び出しを開始したり、自分でスコープを作ってキャンセルし忘れると、リクエストがユーザー離脱後に完了して状態を更新してしまいます。コルーチンでは長寿命スコープをデフォルトで使うこと、RxJava では破棄し忘れが典型です。
もう一つのクラシックは「fire-and-forget」です。グローバルスコープや忘れられた Disposable は最初は問題なく見えますが、作業が積み重なるとネットワークやメモリを使い果たしてしまいます。あるチャット画面が resume ごとに更新を開始すると、数回のナビゲーションで複数の更新ジョブが並列に走り、メモリを保持しネットワークを競合させます。
リトライも間違いやすいです。無制限のリトライや遅延のないリトライはバックエンドをスパムしバッテリーを消耗します。特にログアウト後に 401 が返るような恒久的な失敗では危険です。リトライ条件を限定し、バックオフを入れ、回復不可能なエラーでは止めるようにしてください。
スレッドの間違いは再現が難しいクラッシュを引き起こします。JSON をメインスレッドでパースしたり、ディスパッチャやスケジューラの置き方次第でバックグラウンドから UI を更新してしまったりします。
簡単なチェックでこれらの多くを捕まえられます:
- 作業をライフサイクル所有者に縛り、その所有者が終わるとキャンセルする。
- クリーンアップを明示的に:Job はキャンセルし、Disposable は一箇所でクリアする。
- リトライに厳しい制限を課す(回数、遅延、対象エラー)。
- コードレビューで UI 更新はメインスレッドだけ、というルールを徹底する。
- バックグラウンド同期は単なる関数呼び出しではなく、制約のあるシステムとして扱う。
生成された Kotlin コード(例えば AppMaster を使って生成した場合)から Android アプリを出荷する場合でも、同じ落とし穴が適用されます。スコープ、キャンセル、リトライ制限、スレッドルールに関する明確な慣習が必要です。
Coroutines、RxJava、または両方を選ぶための簡単なチェックリスト
まず作業の形を見てください。ほとんどのネットワーキング呼び出しはワンショットですが、接続状態や認証状態、ライブアップデートのような継続的なシグナルもアプリにはあります。早い段階で間違った抽象を選ぶと、後でキャンセルやエラー経路が汚くなります。
チームに説明しやすい簡単な決め方:
- ワンタイムリクエスト(ログイン、プロフィール取得):
suspend関数を優先。 - 継続するストリーム(イベント、データベース更新):
Flowまたは Rx のObservableを優先。 - UI ライフサイクルでのキャンセル:
viewModelScopeやlifecycleScopeのコルーチンは手動で Disposable を管理するより簡単なことが多い。 - 高度なストリーム演算子やバックプレッシャーに強く依存する古いコードベース:RxJava の方が適している場合がある。
- 複雑なリトライやエラーマッピング:チームが読みやすく保てる方を選ぶ。
実用的なルール:1 画面が 1 リクエストをして 1 結果を描画するなら、コルーチンは通常、通常の関数呼び出しに近い形でコードを保てます。多くのイベントのパイプライン(入力、デバウンス、前のリクエストのキャンセル、フィルタ結合)を作るなら、RxJava や Flow の方が自然に感じることが多いです。
一貫性は完璧さより強い。機能ごとに 5 個の「ベスト」パターンがばらばらに使われるより、2 つの良いパターンが全体で使われている方が保守しやすいです。
例: ログイン、プロフィール取得、バックグラウンド同期
一般的な本番フローは:ユーザーがログインをタップし、認証エンドポイントを呼んでホーム画面用のプロフィールを取得し、最後にバックグラウンド同期を開始する、という流れです。ここで Kotlin Coroutines と RxJava の違いが日々の保守で感じられます。
コルーチン版(順次 + キャンセル可能)
コルーチンでは「これをやって、それからあれをやる」という形が自然です。ユーザーが画面を閉じるとスコープのキャンセルで進行中の作業が止まります。
suspend fun loginAndLoadProfile(): Result<Profile> = runCatching {
val token = api.login(email, password) // suspend
val profile = api.profile("Bearer $token")
syncManager.startSyncInBackground(token) // fire-and-forget
profile
}.recoverCatching { e ->
throw when (e) {
is HttpException -> when (e.code()) {
401 -> AuthExpiredException()
in 500..599 -> ServerDownException()
else -> e
}
is IOException -> NoNetworkException()
else -> e
}
}
// UI layer
val job = viewModelScope.launch { loginAndLoadProfile() }
override fun onCleared() { job.cancel() }
RxJava 版(チェーン + 破棄)
RxJava では同じフローがチェーンになります。キャンセルは通常 CompositeDisposable に追加して dispose() で行います。
val d = api.login(email, password)
.flatMap { token -> api.profile("Bearer $token").map { it to token } }
.doOnSuccess { (_, token) -> syncManager.startSyncInBackground(token) }
.onErrorResumeNext { e: Throwable ->
Single.error(
when (e) {
is HttpException -> if (e.code() == 401) AuthExpiredException() else e
is IOException -> NoNetworkException()
else -> e
}
)
}
.subscribe({ (profile, _) -> show(profile) }, { showError(it) })
compositeDisposable.add(d)
override fun onCleared() { compositeDisposable.clear() }
ここでの最小限のテストスイートは、成功、マッピングされた失敗(401、500 系、ネットワーク無し)、キャンセル/破棄の 3 つの結果をカバーすべきです。
次のステップ: 慣習を決めて一貫性を保つ
パターンが機能ごとにバラバラだとチームは混乱します。短い決定ノート(1 ページでも良い)を作ればレビュー時間を節約し、動作を予測しやすくなります。
まずワンショット(単発のネットワーク呼び出し)とストリーム(時間を通じて変化する更新)を明確に分け、デフォルトと例外を決めてください。
次に、ネットワークが不調になったときに全機能が同じように振る舞うような共有ヘルパーを用意します:
- エラー(HTTP コード、タイムアウト、オフライン)を UI が理解できるアプリレベルの失敗にマップする一箇所
- ネットワーク呼び出しのデフォルトタイムアウト値と、長い操作用のオーバーライド方法
- 安全にリトライできるもの(例:GET)を定義したリトライポリシー
- キャンセルルール:ユーザーが画面を離れたら何を止め、何を続けるか
- 機密データを漏らさないログルール
テストの慣習も同じくらい重要です。テストが実際の時間やスレッドに依存しないよう合意してください。コルーチンでは通常テストディスパッチャと構造化スコープを用い、RxJava では TestScheduler と明示的な破棄を使います。どちらでも高速で決定論的なテストを目指してください。
全体を早く進めたいなら、AppMaster (appmaster.io) のようにバックエンド API と Kotlin ベースのモバイルコードを生成して手間を減らす選択肢もあります。生成コードを使う場合でも、キャンセル、エラー、リトライ、テストに関する本番向けの慣習は同じように必要です。
よくある質問
デフォルトで、ログインやプロフィール取得のように一度だけ返るリクエストにはsuspendを使います。値が時間とともに変わる(WebSocketのメッセージ、接続状態、データベースの更新など)はFlow(あるいはRxのストリーム)を使ってください。
はい。ただし、HTTPクライアントがキャンセルに対応している場合に限ります。コルーチン自体はスコープがキャンセルされると停止しますが、基底のHTTP呼び出しがキャンセルに対応していないとリクエストはバックグラウンドで続く可能性があります。
viewModelScopeのようなライフサイクルに紐づくスコープに仕事を縛ることで、画面のロジックが終わると自動的にキャンセルされます。グローバルなスコープや無名のスコープで起動しないようにしてください。
コルーチンでは例外がthrowされ、try/catchで扱います。ストリームではエラーはチェーンを通して流れるので、エラーパスを明示的に保ち、失敗をデフォルト値に変えてしまう演算子は避けてください。
タイムアウトや500系、パースエラーなど予期しない障害は例外で扱います。UIが特定の応答(パスワード間違い、メール重複など)を必要とする場合は、文字列に頼らず型付けされたエラーとしてデータで表現してください。
呼び出し箇所の近くで適切なUIメッセージを出せるようにタイムアウトを設定します。コルーチンではsuspend呼び出しの近くでwithTimeoutを使い、RxJavaではストリーム上のtimeout演算子を使います。
リトライは安全な場合にだけ行ってください。通常はGETのような冪等リクエストに対しては緩やかに、サイドエフェクトのあるPOSTなどはリトライしない方が良いです。再試行回数を小さく(2〜3回)、遅延やジッターを入れてサーバーを叩きすぎないようにします。
コルーチンはDispatchersでスレッドを切り替えます。UIはDispatchers.Main、ネットワークやファイルI/OはDispatchers.IO、CPU処理はDispatchers.Defaultを使うのが基本です。RxJavaではsubscribeOnが上流の実行場所、observeOnが以降の観測場所を決めるので、画面描画直前に一度だけメインに切り替えるのが分かりやすいです。
できますが、境界は小さくしてキャンセルに注意してください。Rxをsuspendにラップするときは、コルーチンのキャンセルでRxの購読が破棄されるように実装します。逆にsuspendをRxに公開する場合は、限定的でドキュメント化されたブリッジにとどめてください。
仮想時間を使って、テストがスリープや実際のスレッドに依存しないようにします。コルーチンではrunTestとテスト用ディスパッチャで遅延やキャンセルを制御し、RxJavaではTestSchedulerとTestObserverを使ってdispose()後に発行が止まることを検証します。


