rxjava meetup presentation

Post on 09-Jan-2017

TRANSCRIPT

Guillaume Valverde Software Developer guillaume.valverde@futurice.com @drevlav

A year with RxJava

September 2016

Kick Application

Observable / Observer Pattern

Observable: emits events.

Observer: listens to events and acts accordingly.

Observable: emits a sequence of data called a stream

Emits 0, 1, x or an infinite number of items

Complete

Send an error

OBSERVABLE CONTRACT

0 to N events

nothing | error | complete
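
The contract above can be modelled in a few lines of plain Java. This is a minimal sketch, not RxJava itself: `ContractSketch`, `SimpleObserver` and `SafeEmitter` are hypothetical names, and the callback names only mirror the RxJava 1.x `Observer` interface. The wrapper forwards any number of `onNext` events and drops everything that arrives after the first terminal event (error or complete).

```java
import java.util.ArrayList;
import java.util.List;

public class ContractSketch {

    /** Hypothetical observer with the same three callbacks as RxJava 1.x. */
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Wraps an observer and enforces: 0..N items, then at most one terminal event. */
    static final class SafeEmitter<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> downstream;
        private boolean terminated = false;

        SafeEmitter(SimpleObserver<T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onNext(T item) {
            if (!terminated) downstream.onNext(item);
        }

        @Override public void onError(Throwable e) {
            if (terminated) return;
            terminated = true;
            downstream.onError(e);
        }

        @Override public void onCompleted() {
            if (terminated) return;
            terminated = true;
            downstream.onCompleted();
        }
    }

    /** Emits three items, completes, then misbehaves; returns what the observer saw. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SafeEmitter<Integer> emitter = new SafeEmitter<>(new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add("next " + item); }
            @Override public void onError(Throwable e) { log.add("error"); }
            @Override public void onCompleted() { log.add("complete"); }
        });
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onCompleted();
        emitter.onNext(4);                       // ignored: the stream already terminated
        emitter.onError(new RuntimeException()); // ignored as well
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next 1, next 2, next 3, complete]
    }
}
```
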

Observable examples:
• Mouse clicks
• Button clicks
• Network request
• Network state
• Content Provider update
• List, array
• UI state

Everything is an event

Observable.just("rxjava")

"rxjava"

Observable.just("rxjava")
    .subscribe();

Observable.from(…)

1 2 3

Integer[] ids = {1, 2, 3};
Observable.from(ids)
    .subscribe();

Using Retrofit

Network Request

Response

RxView.clicks(View view)

First click

Second click

Third click

RxView.clicks(sendButton)
    .subscribe(click -> { /* do something… */ });

Who is listening? The subscriber

Integer[] ids = {1, 2, 3};
Observable.from(ids)
    .subscribe();

Subscriber: Observer & Subscription

public abstract class Subscriber<T> implements Observer<T>, Subscription {
}

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

Observable / Subscriber (Observer & Subscription)

String[] ids = {"1", "2", "3"};

Subscription subscription = Observable.from(ids)
    .subscribe(id -> Log.v(TAG, id),
               error -> Log.e(TAG, "error: " + error.getMessage()),
               () -> Log.v(TAG, "complete"));

Streams on steroids: operators

Composition

Map

RXJava is Awesome

.map(word -> word.length())

6 2 7

String[] ids = {"RXJava", "is", "Awesome"};
Observable.from(ids)
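
For readers who know `java.util.stream`, the same transformation on a plain array looks like the sketch below. It is only an analogy built on the JDK, not RxJava: a stream is pulled once and synchronously, whereas an Observable pushes items to its subscriber. The class name `MapSketch` is made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapSketch {

    /** Maps each word to its length, like .map(word -> word.length()). */
    static List<Integer> lengths(String[] words) {
        return Arrays.stream(words)
                .map(String::length)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String[] ids = {"RXJava", "is", "Awesome"};
        System.out.println(lengths(ids)); // prints [6, 2, 7]
    }
}
```
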

FlatMap

1 2 3

.flatMap(x → x x)

1 1
2 2
3 3

.flatten

1 2 2 1 3 3

FlatMap

id1 id2 id3

.flatMap(id -> fetchArticle())

Article1 Article2 Article3

List ids …
Observable.from(ids)

Compose with Observables

Retrieve a list of repositories

• more than 3 letters
• cancel the previous search when a new one starts
• make a search when the user is done typing

RxTextView.textChanges(editText)
    .observeOn(Schedulers.io())
    .debounce(500, TimeUnit.MILLISECONDS)
    .filter(charSequence -> charSequence.length() > 3)
    .map(CharSequence::toString)
    .switchMap(searchString -> networkManager.search(searchString))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(list -> searchRecyclerAdapter.refreshListRepo(list),
               error -> Logger.logOnNextError(TAG));

Type RxJava

RxTextView.textChanges(editText)
R Rx RxJ RxJa RxJav RxJava

.observeOn(Schedulers.io())
R Rx RxJ RxJa RxJav RxJava

.debounce(500, TimeUnit.MILLISECONDS)
R RxJ RxJava

.filter(charSequence -> charSequence.length() >= 3)
RxJ RxJava

.map(CharSequence::toString)
RxJ RxJava

.switchMap(searchString -> networkManager.search(searchString))
List<Repo>

.observeOn(AndroidSchedulers.mainThread())
List<Repo>

.subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
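
To see why only `R`, `RxJ` and `RxJava` survive the debounce step, the rule can be simulated on timestamped text changes in plain Java. This is a sketch of the idea, not RxJava's implementation: `DebounceSketch`, `Keystroke` and all the timestamps are invented for illustration (the slides give no timings). An item is kept only if no newer item arrives within the quiet window after it; the last item is always kept, since nothing follows it.

```java
import java.util.ArrayList;
import java.util.List;

public class DebounceSketch {

    /** Hypothetical timestamped text change. */
    static final class Keystroke {
        final long timeMs;
        final String text;

        Keystroke(long timeMs, String text) {
            this.timeMs = timeMs;
            this.text = text;
        }
    }

    /** Keeps an item only if the next one arrives at least windowMs later. */
    static List<String> debounce(List<Keystroke> events, long windowMs) {
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            boolean isLast = (i == events.size() - 1);
            if (isLast || events.get(i + 1).timeMs - events.get(i).timeMs >= windowMs) {
                kept.add(events.get(i).text);
            }
        }
        return kept;
    }

    /** Assumed typing rhythm: a pause after "R", a pause after "RxJ", then the rest. */
    static List<String> demo() {
        List<Keystroke> typed = new ArrayList<>();
        typed.add(new Keystroke(0, "R"));
        typed.add(new Keystroke(700, "Rx"));
        typed.add(new Keystroke(800, "RxJ"));
        typed.add(new Keystroke(1500, "RxJa"));
        typed.add(new Keystroke(1600, "RxJav"));
        typed.add(new Keystroke(1700, "RxJava"));
        return debounce(typed, 500);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [R, RxJ, RxJava]
    }
}
```
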

RxJava is synchronous by default

for (String word : sentence) {
    Log.v(TAG, word);
}

Same as

Observable.from(sentence)
    .subscribe(word -> Log.v(TAG, word));

THREADING

Observable.from(ids)
    .flatMap(id -> fetchArticle())
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(article -> actOnArticle(article));

ObserveOn / SubscribeOn

Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…)

• .subscribe(…) is called on the current thread.
• .subscribeOn(… newThread) changes thread: the source is subscribed to on the new thread.
• Observable.from(ids) emits the ids.
• .flatMap(id -> fetchArticle()) emits an Article.
• .observeOn(… mainThread) passes the Article on to the main thread.
• .subscribe(…) acts on the Article.

THREADING SubscribeOn / ObserveOn

• Only one subscribeOn takes effect: the one closest to the source.

• Several observeOn calls can be chained; each one switches the thread for what follows it.
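
The same split (do the work on a background thread, deliver the result on one designated thread) can be sketched with the JDK's `CompletableFuture`. This is an analogy, not RxJava: `worker` stands in for the scheduler given to `subscribeOn`, `ui` for the main-thread scheduler given to `observeOn`, and `ThreadingSketch`, the thread names and the fake article are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadingSketch {

    /** Returns {name of the thread that produced the value, name of the thread that consumed it}. */
    static String[] run() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        String[] threads = new String[2];
        try {
            CompletableFuture
                    // like subscribeOn: the source work runs on the worker thread
                    .supplyAsync(() -> {
                        threads[0] = Thread.currentThread().getName();
                        return "article";
                    }, worker)
                    // like observeOn: the result is consumed on the ui thread
                    .thenAcceptAsync(article ->
                            threads[1] = Thread.currentThread().getName(), ui)
                    .join(); // block until done so the example is deterministic
        } finally {
            worker.shutdown();
            ui.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] t = run();
        System.out.println("produced on " + t[0] + ", consumed on " + t[1]);
    }
}
```

Unlike this one-shot future, an Observable can push many items through the same two hops.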

Organise your code

Architecture / View Model / Separation of concerns

Input: source Observable

Business logic: returns an Observable

.subscribe()

Unit testing

Integration testing

Acceptance / smoke testing

Architecture

View: displays the UI according to the state of the data

Binding

update / trigger

View Model: holds the business logic for the View

Data Layer: holds the logic for data, emits POJOs

Store

Store

RxTextView.textChanges(editText)
    .observeOn(Schedulers.io())
    .debounce(500, TimeUnit.MILLISECONDS)
    .filter(charSequence -> charSequence.length() > 3)
    .map(CharSequence::toString)
    .switchMap(word -> dataLayer.fetchListRepo(word))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(list -> searchRecyclerAdapter.refreshListRepo(list),
               error -> Logger.logOnNextError(TAG));

Input: source Observable

ViewModel

.subscribe(…)

dependencies: dataLayer, UI View

import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

public class ViewModel {

    private Observable<CharSequence> charSequenceObservable;
    private DataLayer dataLayer;

    ViewModel(Observable<CharSequence> charSequenceObservable, DataLayer dataLayer) {
        this.charSequenceObservable = charSequenceObservable;
        this.dataLayer = dataLayer;
    }

    // Business logic only: the caller supplies the input and calls subscribe().
    Observable<List<GitHubRepository>> getListObservable() {
        return charSequenceObservable
                .observeOn(Schedulers.io())
                .debounce(500, TimeUnit.MILLISECONDS)
                .filter(charSequence -> charSequence.length() > 3)
                .map(CharSequence::toString)
                .switchMap(word -> dataLayer.search(word));
    }
}

Thank you
