淺談Python響應式類庫RxPy_第1頁
淺談Python響應式類庫RxPy_第2頁
淺談Python響應式類庫RxPy_第3頁
淺談Python響應式類庫RxPy_第4頁
淺談Python響應式類庫RxPy_第5頁
已閱讀5頁,還剩3頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

第淺談Python響應式類庫RxPy目錄一、基本概念1.1、Observable和Observer(可觀察對象和觀察者)1.2、Operator(操作符)1.3、Single(單例)1.4、Subject(主體)1.5、Scheduler(調(diào)度器)1.6、Observer和Observable1.7、操作符1.8、創(chuàng)建型操作符1.9、過濾型操作符1.10、轉換型操作符1.11、算術操作符1.12、Subject1.13、ReplaySubject1.14、BehaviorSubject1.15、AsyncSubject1.16、Scheduler二、應用場景2.1、防止重復發(fā)送2.2、操作數(shù)據(jù)流

一、基本概念

ReactiveX中有幾個核心的概念,先來簡單介紹一下。

1.1、Observable和Observer(可觀察對象和觀察者)

首先是Observable和Observer,它們分別是可觀察對象和觀察者。Observable可以理解為一個異步的數(shù)據(jù)源,會發(fā)送一系列的值。Observer則類似于消費者,需要先訂閱Observable,然后才可以接收到其發(fā)射的值??梢哉f這組概念是設計模式中的觀察者模式和生產(chǎn)者-消費者模式的綜合體。

1.2、Operator(操作符)

另外一個非常重要的概念就是操作符了。操作符作用于Observable的數(shù)據(jù)流上,可以對其施加各種各樣的操作。更重要的是,操作符還可以鏈式組合起來。這樣的鏈式函數(shù)調(diào)用不僅將數(shù)據(jù)和操作分隔開來,而且代碼更加清晰可讀。一旦熟練掌握之后,你就會愛上這種感覺的。

1.3、Single(單例)

在RxJava和其變體中,還有一個比較特殊的概念叫做Single,它是一種只會發(fā)射同一個值的Observable,說白了就是單例。當然如果你對Java等語言比較熟悉,那么單例想必也很熟悉。

1.4、Subject(主體)

主體這個概念非常特殊,它既是Observable又是Observer。正是因為這個特點,所以Subject可以訂閱其他Observable,也可以將發(fā)射對象給其他Observer。在某些場景中,Subject會有很大的作用。

1.5、Scheduler(調(diào)度器)

默認情況下ReactiveX只運行在當前線程下,但是如果有需要的話,也可以用調(diào)度器來讓ReactiveX運行在多線程環(huán)境下。有很多調(diào)度器和對應的操作符,可以處理多線程場景下的各種要求。

1.6、Observer和Observable

先來看看一個最簡單的例子,運行的結果會依次打印這些數(shù)字。這里的of是一個操作符,可以根據(jù)給定的參數(shù)創(chuàng)建一個新的Observable。創(chuàng)建之后,就可以訂閱Observable,三個回調(diào)方法在對應的時機執(zhí)行。一旦Observer訂閱了Observable,就會接收到后續(xù)Observable發(fā)射的各項值。

fromrximportof

ob=of(1,2,34,5,6,7,7)

ob.subscribe(

on_next=lambdai:print(f'Received:{i}'),

on_error=lambdae:print(f'Error:{e}'),

on_completed=lambda:print('Completed')

)

這個例子看起來好像很簡單,并且看起來沒什么用。但是當你了解了Rx的一些核心概念,就會理解到這是一個多么強大的工具。更重要的是,Observable生成數(shù)據(jù)和訂閱的過程是異步的,如果你熟悉的話,就可以利用這個特性做很多事情。

1.7、操作符

在RxPy中另一個非常重要的概念就是操作符了,甚至可以說操作符就是最重要的一個概念了。幾乎所有的功能都可以通過組合各個操作符來實現(xiàn)。熟練掌握操作符就是學好RxPy的關鍵了。操作符之間也可以用pipe函數(shù)連接起來,構成復雜的操作鏈。

fromrximportof,operatorsasop

importrx

ob=of(1,2,34,5,6,7,7)

ob.pipe(

op.map(lambdai:i**2),

op.filter(lambdai:i=10)

).subscribe(lambdai:print(f'Received:{i}'))

在RxPy中有大量操作符,可以完成各種各樣的功能。我們來簡單看看其中一些常用的操作符。如果你熟悉Java8的流類庫或者其他函數(shù)式編程類庫的話,應該對這些操作符感到非常親切。

1.8、創(chuàng)建型操作符

首先是創(chuàng)建Observable的操作符,列舉了一些比較常用的創(chuàng)建型操作符。

1.9、過濾型操作符

過濾型操作符的主要作用是對Observable進行篩選和過濾。

1.10、轉換型操作符

1.11、算術操作符

1.12、Subject

Subject是一種特殊的對象,它既是Observer又是Observable。不過這個對象一般不太常用,但是假如某些用途還是很有用的。所以還是要介紹一下。下面的代碼,因為訂閱的時候第一個值已經(jīng)發(fā)射出去了,所以只會打印訂閱之后才發(fā)射的值。

fromrx.subjectimportSubject,AsyncSubject,BehaviorSubject,ReplaySubject

#Subject同時是Observer和Observable

print('--------Subject---------')

subject=Subject()

subject.on_next(1)

subject.subscribe(lambdai:print(i))

subject.on_next(2)

subject.on_next(3)

subject.on_next(4)

subject.on_completed()

#234

另外還有幾個特殊的Subject,下面來介紹一下。

1.13、ReplaySubject

ReplaySubject是一個特殊的Subject,它會記錄所有發(fā)射過的值,不論什么時候訂閱的。所以它可以用來當做緩存來使用。ReplaySubject還可以接受一個bufferSize參數(shù),指定可以緩存的最近數(shù)據(jù)數(shù),默認情況下是全部。

下面的代碼和上面的代碼幾乎完全一樣,但是因為使用了ReplaySubject,所以所有的值都會被打印。當然大家也可以試試把訂閱語句放到其他位置,看看輸出是否會產(chǎn)生變化。

#ReplaySubject會緩存所有值,如果指定參數(shù)的話只會緩存最近的幾個值

print('--------ReplaySubject---------')

subject=ReplaySubject()

subject.on_next(1)

subject.subscribe(lambdai:print(i))

subject.on_next(2)

subject.on_next(3)

subject.on_next(4)

subject.on_completed()

#1234

1.14、BehaviorSubject

BehaviorSubject是一個特殊的Subject,它只會記錄最近一次發(fā)射的值。而且在創(chuàng)建它的時候,必須指定一個初始值,所有訂閱它的對象都可以接收到這個初始值。當然如果訂閱的晚了,這個初始值同樣會被后面發(fā)射的值覆蓋,這一點要注意。

#BehaviorSubject會緩存上次發(fā)射的值,除非Observable已經(jīng)關閉

print('--------BehaviorSubject---------')

subject=BehaviorSubject(0)

subject.on_next(1)

subject.on_next(2)

subject.subscribe(lambdai:print(i))

subject.on_next(3)

subject.on_next(4)

subject.on_completed()

#234

1.15、AsyncSubject

AsyncSubject是一個特殊的Subject,顧名思義它是一個異步的Subject,它只會在Observer完成的時候發(fā)射數(shù)據(jù),而且只會發(fā)射最后一個數(shù)據(jù)。因此下面的代碼僅僅會輸出4.假如注釋掉最后一行co_completed調(diào)用,那么什么也不會輸出。

#AsyncSubject會緩存上次發(fā)射的值,而且僅會在Observable關閉后開始發(fā)射

print('--------AsyncSubject---------')

subject=AsyncSubject()

subject.on_next(1)

subject.on_next(2)

subject.subscribe(lambdai:print(i))

subject.on_next(3)

subject.on_next(4)

subject.on_completed()

#4

1.16、Scheduler

雖然RxPy算是異步的框架,但是其實它默認還是運行在單個線程之上的,因此如果使用了某些會阻礙線程運行的操作,那么程序就會卡死。當然針對這些情況,我們就可以使用其他的Scheduler來調(diào)度任務,保證程序能夠高效運行。

下面的例子創(chuàng)建了一個ThreadPoolScheduler,它是

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論