presentation ghadir rxjava jfs 20150709...

24
17.06.2015 1 Reactive Extensions mit RXJava Phillip Ghadir, innoQ 17.06.2015 © Copyright innoQ 2014 - 2015 Agenda Reactive Programming Reactive Extensions Higher-Order Functions am Beispiel Über Subjects Arten von Observables Back-Pressure 17.06.2015 © Copyright innoQ 2014 - 2015

Upload: others

Post on 21-May-2020

13 views

Category:

Documents


0 download

TRANSCRIPT

Page 1: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

1

Reactive Extensions mit RXJava

Phillip Ghadir, innoQ

17.06.2015 © Copyright innoQ 2014 ­ 2015

Agenda

• Reactive Programming

• Reactive Extensions

• Higher­Order Functions am Beispiel

• Über Subjects

• Arten von Observables

• Back­Pressure

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 2: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

2

REACTIVE PROGRAMMING

… ist ein Datenfluss­zentrisches Programmiermodell

17.06.2015 © Copyright innoQ 2014 ­ 2015

http://mediaserver.boonty.com/gamesimages/1244_de_sc3.jpg

3D Rendering

&

Audio

FRAN http://conal.net/papers/icfp97/

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 3: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

3

http://www.microsoft.com/hardware/en­ie/p/wheel­mouse­optical

17.06.2015 © Copyright innoQ 2014 ­ 2015

Über die Zeit veränderliche Werte

http://www.microsoft.com/hardware/en­ie/p/wheel­mouse­optical

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 4: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

4

REACTIVE EXTENSIONS

… objektorientiertes Reactive Programming

17.06.2015 © Copyright innoQ 2014 ­ 2015

Reactive Extensions (RX) sind Bibliotheken zur und

Komposition von Systemen durch beobachtbare Sequenzen und

frei nach rx.codeplex.com

.NET

CPP

Java

JavaScript

Python

Ruby

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 5: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

5

17.06.2015 © Copyright innoQ 2014 ­ 2015

Iterator<> Observable<>

Verarbeitung next() onNext()

Fehlerbehandlung throw onError()

Abschluss return onCompleted()

Dualität von Pull und Push

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 6: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

6

17.06.2015 © Copyright innoQ 2014 ­ 2015

mit RXJava

mit meiner Tabellenkalkulation

BehaviorSubject<Integer> a1 = BehaviorSubject.create(0);

BehaviorSubject<Integer> a2 = BehaviorSubject.create(0);

Observable a3 = Observable.combineLatest( a1, a2, (a,b) -> a+b );

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 7: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

7

Signatur von RX.Observer

17.06.2015 © Copyright innoQ 2014 ­ 2015

Observer registrieren

anObservable.subscribe( )

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 8: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

8

anObservable.subscribe(                      );

RX API kennt auch Abkürzungen!

ns::process

17.06.2015 © Copyright innoQ 2014 ­ 2015

RX.Observer Aufruf­Kontrakt

(onNext)* (OnCompleted | OnError)?

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 9: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

9

Darstellung eines Event­Streams

17.06.2015 © Copyright innoQ 2014 ­ 2015

Geht von 

sequentiellen 

Aufrufen aus!

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 10: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

10

unsubscribe() unterbricht die Aufruf­Kette

17.06.2015 © Copyright innoQ 2014 ­ 2015

INTS

… ein kleines Beispiel

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 11: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

11

Observables als Monade

Observable<Integer> ints = Observable

.range(1, 1000)

.filter(x -> x > 10)

.buffer(2)

.map( Bumm2::calcSum )

.map( Bumm2::idOrExcModulo31 )

.onErrorReturn(t -> 6480);

ints.subscribe(System.out::println);

17.06.2015 © Copyright innoQ 2014 ­ 2015

filter

Observable<Integer> ints = Observable

.range(1, 1000)

.filter(x -> x > 10)

.buffer(2)

.map( Bumm2::calcSum )

.onErrorReturn(t -> 6480);

ints.subscribe(System.out::println);

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 12: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

12

buffer

Observable<Integer> ints = Observable

.range(1, 1000)

.filter(x -> x > 10)

.buffer(2)

.map( Bumm2::calcSum )

.map( Bumm2::idOrExcModulo31 )

.onErrorReturn(t -> 6480);

17.06.2015 © Copyright innoQ 2014 ­ 2015

Observable<Integer> ints = Observable

.range(1, 1000)

.filter(x -> x > 10)

.buffer(2)

.map( Bumm2::calcSum )

.onErrorReturn(t -> 6480);

map

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 13: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

13

Observable<Integer> ints = Observable

.range(1, 1000)

.filter(x -> x > 10)

.buffer(2)

.map( Bumm2::calcSum )

.onErrorReturn(t -> 6480);

onErrorReturn

17.06.2015 © Copyright innoQ 2014 ­ 2015

ints liefert...

23 27 6480

Observable<Integer> ints = Observable

.range(1, 1000)

.filter(x -> x > 10)

.buffer(2)

.map( Bumm2::calcSum )

.map( Bumm2::idOrExcModulo31 )

.onErrorReturn(t -> 6480);

ints.subscribe(System.out::println);

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 14: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

14

SUBJECT

Observer und Observable in einem

Die folgenden Marble­Diagramme stammen aus der RXJava ­ JavaDoc

17.06.2015 © Copyright innoQ 2014 ­ 2015

4 Arten Subject

• AsyncSubject

• BehaviorSubject

• PublishSubject

• ReplaySubject

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 15: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

15

AsyncSubject

17.06.2015 © Copyright innoQ 2014 ­ 2015

AsyncSubject

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 16: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

16

BehaviorSubject

17.06.2015 © Copyright innoQ 2014 ­ 2015

BehaviorSubject

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 17: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

17

PublishSubject

17.06.2015 © Copyright innoQ 2014 ­ 2015

ReplaySubject

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 18: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

18

ARTEN VON OBSERVABLES

„cold“, „hot“ und „multicasted“

17.06.2015 © Copyright innoQ 2014 ­ 2015

Cold & Hot Observables

Cold Observables

• „Die Sequenz der Events ist

konsistent „gleich“,

unabhängig vom Zeitpunkt

zu dem sich ein Observer

subscribed.“

• Event­Stream vorhersehbar

Hot Observables

• Events des Observable

kommen unabhängig davon,

ob Observer registriert sind.

• „Observer erhalten

tendenziell nur einen Teil

der Events.“

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 19: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

19

Mögliche Charakterisierung

Cold

• Zähler

• File­Reader­Observables

• Wrapper um Ressourcen mit Pull­Zugriff ???

Hot

Wrapper um andere Push­Event­Quellen wie

• Mouse­Events

• DB­Trigger

• File­System­Änderungen

17.06.2015 © Copyright innoQ 2014 ­ 2015

ZU VIEL DRUCK?

Die folgenden Diagramme stammen von https://github.com/ReactiveX/RxJava/wiki/Backpressure17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 20: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

20

Strategien gegen zu schnelle

Observables

• Throttling

• Puffern

• Bei Aufruf im selben Thread ... den Aufruf blockieren

• Subscriber pullt unter der Haube das Observable

17.06.2015 © Copyright innoQ 2014 ­ 2015

Throttling (first, last)

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 21: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

21

Throttling (mit Timeout)

17.06.2015 © Copyright innoQ 2014 ­ 2015

Puffern

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 22: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

22

ZUSAMMENFASSUNG

Und etwas zum Mitnehmen

17.06.2015 © Copyright innoQ 2014 ­ 2015

System.out.println( "$");

BehaviorSubject<Integer> a1 = BehaviorSubject.create(0);

BehaviorSubject<Integer> a2 = BehaviorSubject.create(0);

Observable a3 =

Observable.combineLatest( a1, a2, (a,b) -> a+b );

a3.subscribe(x -> System.out.println( "a3 hat den Wert: " + x));

System.out.println( "$ setze a1 auf 3" );

a1.onNext( 3 );

System.out.println( "$ setze a2 auf 4" );

a2.onNext( 4 ); $

a3 hat den Wert: 0

$ setze a1 auf 3

a3 hat den Wert: 3

$ setze a2 auf 4

a3 hat den Wert: 7

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 23: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

23

Zum Mitnehmen

• Iterativ und Reaktiv sind dual

• „Behandle alles wie asynchrone Event­Streams“

• Essenzieller RX­Kontrakt:

(onNext)* (OnCompleted | OnError)?

• Push macht manches einfacher, aber nicht alles!

17.06.2015 © Copyright innoQ 2014 ­ 2015

Vielen Dank!

E [email protected]

W http://innoq.com

T +49 2173 33 66 0

F +49 2173 33 66 222

Noch Fragen

17.06.2015 © Copyright innoQ 2014 ­ 2015

Page 24: presentation ghadir rxjava jfs 20150709 2pages2015.java-forum-stuttgart.de/_data/F7_Ghadir.pdf17.06.2015 11 Observables als Monade Observable ints = Observable.range(1,

17.06.2015

24

Referenzen

• RX Homehttp://rx.codeplex.com

• RX Design Guidelines http://blogs.msdn.com/b/rxteam/archive/2010/10/28/rx­design­guidelines.aspx

• RX­Beispielehttp://rxwiki.wikidot.com/101samples 

• RXJavahttps://github.com/ReactiveX/RxJava

17.06.2015 © Copyright innoQ 2014 ­ 2015