Einführung in RxJS

Artikel als PDF herunterladen:
Download Einführung in RxJS

Den ersten Kontakt mit den sogenannten Reactive Extensions for JavaScript (kurz RxJS) habe ich bei dem Umstieg von Angular.js auf Angular 2 gemacht. Wurden in Angular.js asynchrone Prozesse noch mit Promises abgebildet, so hat man sich in Angular 2 für Observables von RxJS entschieden. Es war also an der Zeit, mal einen genaueren Blick auf dieses Framework zu werfen. Ziel dieses Artikels ist es, dass der Leser die grundlegenden Konzepte von RxJS kennen lernt und eine solide Wissensbasis erlangt, um seine ersten Schritte mit RxJS zu wagen.

Observables vs. Promises

Promises und Observables sind eine Art Platzhalter für noch nicht bekannte Ergebnisse. Ein einfaches Beispiel stellt die Http-Schnittstelle dar. Sendet der Client eine Http-Anfrage, so ist das Ergebnis dieser Anfrage erst nach der Antwort des Servers bekannt. Der Aufrufer erhält von der Schnittstelle einen solchen Platzhalter und kann auf dessen Ergebnis lauschen und unterdessen beispielsweise eine Ladeanimation anzeigen.

Was aber ist an Observables anders als an Promises? Der entscheidende Unterschied besteht darin, dass Observables über die Möglichkeit verfügen, abgebrochen werden zu können und mehr als nur ein Ergebnis zu verarbeiten. Dies kann beispielsweise bei der Eventbehandlung in einem Client-Framework wie Angular sehr von Vorteil sein.

Einrichtung

Bevor man mit RxJS loslegen kann, muss es zunächst einmal eingerichtet werden. In diesem Artikel verwende ich npm, um die Abhängigkeiten zu verwalten. Also wird auf der Kommandozeile der folgende Befehl ausgeführt, um RxJS in der neusten Version in die package.json aufzunehmen.

npm install rxjs --save

Da ich in diesem Artikel TypeScript einsetze, benötigen wir außerdem noch den TypeScript-Compiler. Diesen installieren wir mit dem folgenden Aufruf global auf unserer Entwicklungsmaschine.

npm install typescript -g

Mit Hilfe des folgenden Code-Beispiels finden wir heraus, ob die Installation erfolgreich war:

import {Subject} from 'rxjs/Rx';

const subject = new Subject<string>(); 
subject.subscribe((value) => console.log(`hello ${value}`));

subject.next('RxJs');

Um diesen TypeScript-Code ausführen zu können, muss er zuerst mithilfe des TypeScript-Compilers in JavaScript kompiliert werden.

tsc hello-rxjs.ts

Nach Ausführung des obigen Befehls sollte im Dateisystem eine Datei hello-rxjs.js entstehen. Diese Datei kann nun mithilfe von Node.js ausgeführt werden. Es sollte eine Ausgabe „hello RxJS“ auf der Kommandozeile erscheinen.

node hello-rxjs.js

In diesem kurzen Beispiel ist bereits ein erster Fallstrick versteckt. Es gibt zwei Varianten wie RxJS importiert werden kann. Innerhalb dieses Artikels verwende ich ausschließlich die einfachere Variante: das „Komplettpaket“.

import {Subject, Observable} from 'rxjs/Rx';

Die Alternative bindet nur die verwendeten Komponenten in die Anwendung ein. Dies empfiehlt sich vor allem dann, wenn es auf die Auslieferungsgröße ankommt. In dieser Variante müssen auch alle verwendeten Operatoren (dazu später mehr) einzeln importiert werden.

import {Subject} from 'rxjs/Subject'; 
import {Observable} from 'rxjs/Observable';

Observables

Das Herz von RxJS bilden die Observables. Ein Observable ist die Repräsentation einer beliebigen Menge von Werten, die über eine beliebige Zeitdauer verteilt sein könnnen. Mit einer Subscription kann man diese Werte beobachten. „Werte“ sind oftmals „Ereignisse“ (z. B. ein Maus-Klick oder eine Navigation), können aber auch Werte (1, 2, 3, „A“, „B“,“C“) im engeren Sinn sein.

Im einführenden Beispiel wurde bereits ein solches Observable gezeigt. Die dort verwendete Klasse Subject implementiert das Interface Observable und erweitert dieses um zusätzliche Methoden (z. B. next) zur Auslösung des Observables. Aufgrund dieser Möglichkeit eignet sich Subject zur einfachen und nachvollziehbaren Demonstration der Funktionsweise von RxJS.

Die Klasse Observable hat die Methode subscribe, mit deren Hilfe man eine Subscription erhält. Observables haben immer einen Zustand. Solange wie keine Subscription für ein Observable aktiv ist, hat es den Zustand „cold“. In diesem Zustand interessiert sich niemand für die Werte des Observables, also werden diese auch gar nicht verarbeitet. Mit dem Aufruf von subscribe und der damit verbundenen Erzeugung einer Subscription, geht das Observable in den Zustand „hot“ über. Die Werte des Observables werden nun überwacht und verarbeitet. Werte, die vor der Subscription in das Observable gereicht wurden, wird der neue Subscriber nie erfahren. Das folgende Beispiel würde also lediglich die Werte 2 und 3 ausgeben.

import {Subject} from 'rxjs/Rx';

const subject = new Subject<string>(); 
subject.next('1'); 
subject.subscribe((value) => console.log(value)); 
subject.next('2'); 
subject.next('3');

Da zum Zeitpunkt der Subscription der Wert 1 bereits gesendet wurde, wird die Subscription lediglich über die nachfolgenden Werte benachrichtigt. Das Beispiel zeigt außerdem, dass die an subscribe übergebene Arrow-Function jeweils einmal pro Wert des Subjects ausgeführt wird. Doch die Methode subscribe verfügt noch über zwei weitere Parameter, um auf Ereignisse des Observables zu reagieren. Das zweite Argument ist ein Callback für Fehler und das dritte Argument wird ausgeführt, wenn das Observable completed wurde. Hier ist ein Beispiel, welches diese beiden Callbacks demonstriert.

import {Subject} from 'rxjs/Rx';

const subject = new Subject<string>(); 
subject.subscribe(
	(value) => console.log(value), 
	(error) => console.error(error), 
	() => console.log('finished'));

subject.next('1'); 
subject.next('2'); 
subject.error(new Error('something went wrong')); 
subject.complete();

Die Ausgabe dieses Code-Schnipsels würde die Werte 1, 2 und einen Fehler liefern, nicht aber die „finished“-Meldung. Das liegt daran, dass ein Observable nach einem Fehler nicht mehr beendet werden kann – es endet also entweder in dem Zustand „Error“ oder „Completed“. Würde man also die Zeile weglassen, die den Error auslöst, so würde neben 1 und 2 auch die Ausgabe „finished“ erscheinen.

An dieser Stelle ist eine weitere Besonderheit zu erwähnen: Fehler, die im Value-Callback (erstes Argument der subscribe-Methode) auftreten, werden nicht vom Error-Callback behandelt.
Hat man erst einmal eine Subscription in der Hand, so ist man auch in der Verantwortung diese wieder aufzulösen. Ansonsten riskiert man ein Speicherleck. Es ist also wichtig, sicherzustellen, dass die Methode unsubscribe aufgerufen wird, wenn die Werte nicht mehr überwacht werden sollen. In Angular wird hierfür gerne die Lifecycle-Methode ngOnDestroy verwendet. Eine Ausnahme bilden Observables, bei denen eine Beendigung sichergestellt werden kann. Eine Http-Anfrage gehört beispielsweise zu dieser Klasse von Observables, da sie nach der Antwort durch den Server keine weiteren Ergebnisse liefern können und sie somit abgeschlossen sind.

Operatoren

Bis jetzt sind Observables noch nicht sonderlich spektakulär – warum also einen ganzen Artikel dazu schreiben? Erst mit der Einführung der Operatoren werden Observables wirklich interessant. Denn neben dem Unterschied zu Promises, dass mehr als nur ein Wert verarbeitet werden kann, wird dem Entwickler mit den Operatoren eine große Werkzeugkiste in die Hand gegeben. Sie ermöglichen es, die Werte eines Observables zu verändern, sie mit anderen Observables zu vereinen, zu filtern, zu limitieren und noch vieles mehr.

Operatoren werden in aller Regel auf Observables ausgeführt und geben ein neues Observable zurück. Dadurch können mehrere dieser Aufrufe aneinander gekettet werden. Es entsteht eine Observable-Sequenz, wodurch eine Notation ähnlich wie beim Builder-Pattern entsteht. Nachfolgend stelle ich die verschiedenen Kategorien, in die sich Operatoren aufteilen lassen, mit ihren wichtigsten Vertretern vor.

Marble-Diagramme

Es hat sich etabliert, die Funktionsweise von Observables durch Marble-Diagramme zu veranschaulichen. Ein Marble-Diagramm zeigt für ein einzelnes Observable immer einen Zeitstrahl, auf denen sich bunte „Murmeln“ befinden, welche die Werte symbolisieren.

Hier ist ein einfaches Beispiel für ein Marble-Diagramm, welches eine Observable-Sequenz darstellt, in der die eingehenden Werte 1:1 in einen anderen Wert konvertiert werden:

Die folgende Tabelle erklärt die Symbolik von Marble-Diagrammen.

Aus Platzgründen wird in diesem Artikel nicht für jeden hier aufgeführten Operator zusätzlich ein Marble-Diagramm aufgeführt. Eine ausführliche Liste mit Marble-Diagrammen findet sich auf der Seite http://rxmarbles.com/. Ein Besuch lohnt sich, denn die interaktiven Diagramme erleichtern den Einstieg in RxJS ungemein.

Erstellung

Folgende Operatoren erzeugen neue Observables. Sie können in den meisten Fällen statisch auf der Klasse Observable aufgerufen werden.

from

Observable.from([1, 2, 3]) 
   .subscribe((value) => console.log(value));

Erzeugt ein Observable, das jeden Wert eines Arrays einzeln in die Verarbeitungskette reicht. Im obigen Beispiel werden also die Werte 1, 2 und anschließend 3 ausgegeben. Nachdem alle Werte verarbeitet wurden, wird das Observable automatisch „completed“.

timer / interval

Observable.timer(500, 100) 
   .subscribe((value) => console.log(value));

Erzeugt ein Observable, das nach einer Verzögerung (im Beispiel 500 ms) den Wert 0 in die Sequenz reicht. Wird wie im Beispiel auch der zweite Parameter angegeben, so wird zusätzlich nach dieser Anzahl an Millisekunden je ein weiterer inkrementierter Wert zurückgegeben. Dieses Observable wird unendlich lang laufen. Es ist also wichtig, dass die Beendigung dieser Subscription durch ein unsubscribe sichergestellt wird. Das obige Beispiel wird die Werte 0, 1, 2, 3 und so weiter ausgeben, bis die Anwendung beendet wird. Ist lediglich die Funktionalität des zweiten Parameters gewünscht, so kann man dies mit dem Operator interval erreichen, welcher sich abgesehen von dem anfänglichen Delay identisch verhält.

Transformation

Es kommt sehr häufig vor, dass die Werte eines Observables in gänzlich neue Objekte umgewandelt werden müssen. Transformations-Operatoren übernehmen diese Aufgabe und erzeugen basierend auf den Eingangswerten ein oder mehrere neue Objekte für die weitere Sequenz.

map

Observable.from([1, 2, 3])
   .map((x) => x * 10) 
   .subscribe((value) => console.log(value));

Transformiert den eingehenden Wert in einen beliebigen anderen Wert. Im obigen Beispiel wird der eingehende Wert mit 10 multipliziert. Die Konsole wird also die Werte 10, 20 und 30 anzeigen. Das Marble-Diagramm zu diesem Operator habe ich übrigens als Beispiel zur Erklärung von Marble-Diagrammen verwendet.

flatMap

Observable.from([1, 2, 3])
   .flatMap((value) => Observable.from('A', 'B'))
   .subscribe((value) => console.log(value));

Transformiert einen einfachen Eingangswert in ein Observable, dessen Sequenzwerte anschließend jeweils einzeln in der Sequenz weiterverarbeitet werden. Im Beispiel wird ein simples Array mit den Werten 1, 2 und 3 mit flatMap aufgerufen. Für jeden dieser Werte wird im Flatmap-Callback ein neues Observable zurückgegeben, welches die Werte „A“ und „B“ beinhaltet. Die Ausgabe lautet demnach „A“, „B“, „A“, „B“, „A“ und „B“.

Filter

Filter-Operatoren verringern die Menge an verarbeiteten Werten. Sie können einzelne Werte aus der Sequenz entfernen oder die Sequenz gänzlich beenden.

filter

Observable.timer(500, 100)
   .filter((value) => value % 2 === 0)
   .take(5) 
   .subscribe((value) => console.log(value));

Der wohl naheliegendste Operator der Filter-Kategorie. filter begrenzt die Elemente einer Verarbeitungskette anhand einer Funktion, welche für jeden Wert einen Wahrheitswert zurückgibt. Ist dieser wahr, wird das Element in der Sequenz weitergereicht, ist er falsch, wird es gefiltert.

Im obigen Beispiel werden alle ungeraden Werte gefiltert, so dass am Ende nur die geraden Zahlen 0, 2, 4, 6 und 8 ausgegeben werden.

take / takeUntil

Observable.interval(100)
   .take(5) 
   .subscribe((value) => console.log(value));

Begrenzt die verarbeiteten Werte auf eine übergebene Anzahl. Im Beispiel wird das unendliche Intervall, welches alle 100 ms einen numerischen Wert liefert nach fünf Ausführungen beendet. Die Ausgabe ist also auf 0, 1, 2, 3 und 4 begrenzt. Durch die Begrenzung der Ausführungsanzahl wird die eigentlich unendliche Subscription endlich und kann so automatisch beendet werden. Der Aufruf von unsubscribe ist demnach nicht notwendig, außer es kann nicht sichergestellt werden, dass mindestens fünf Werte durch das Observable erzeugt werden.

const delayed = Observable.timer(1000);

Observable.interval(500)
   .takeUntil(delayed) 
   .subscribe((value) => console.log(value));

Eine andere Variante von take stellt takeUntil dar, welches so lange Werte verarbeitet, bis ein anderes Observable einen Wert liefert. Im Beispiel werden also die Werte 0 und 1 nach je 500 ms ausgegeben. Da zu diesem Zeitpunkt eine Sekunde vergangen ist, liefert das zweite Observable einen Wert 0 und beendet somit die Sequenz. Beide Subscriptions sind dann automatisch beendet.

Kombination

Hat man RxJS erst einmal im Einsatz, wird auch schnell die Anforderung entstehen, die Werte zweier Observables zu mischen und in einer gemeinsamen Sequenz zu verarbeiten. Die folgenden beiden Operatoren ermöglichen ein solches Zusammenführen zweier Observables.

zip

const nameObservable = Observable.from(['Jim', 'Johnny', 'Jack']);
const surnameObservable = Observable.from(['Beam', 'Walker', 'Daniels', 'Jameson']);

Observable.zip(nameObservable, surnameObservable)
   .map((values) => `${values[0]} ${values[1]}`)
   .subscribe((value) => console.log(value));

Möchte man die Werte von zwei Observables 1:1 miteinander kombinieren, so greift man zum zip-Operator. zip kann beliebig viele Observables entgegennehmen. Es wartet solange, bis von allen ein Wert eingetroffen ist und reicht diese dann in einem Array weiter.

Im Beispiel wird zip verwendet um ein Observable von Vornamen mit einem Observable von Nachnamen zu kombinieren. Die Ausgaben sind demnach „Jim Beam“, „Johnny Walker“ und „Jack Daniels“. „Jameson“ wird nicht ausgegeben, da hier der Vorname fehlt.

merge

Observable.interval(150)
   .merge(Observable.interval(200)) 
   .take(6) 
   .subscribe((value) => console.log(value));

Erweitert die Werte eines Observables um die Werte eines anderen Observables. Im Gegensatz zu zip wartet merge nicht auf die Werte des anderen Observables, die Werte werden sofort weitergereicht, egal von welchem Observable diese stammen.

Utility

Wie der Name dieser Kategorie bereits verrät, finden sich hier praktische Hilfs-Operatoren, die keiner der anderen Kategorien genau zugeordnet werden konnten.

do

Observable.from(['Jim', 'Johnny', 'Jack'])
   .do((name) => console.log(name)) 
   .subscribe();

Ein sehr praktischer Operator für Seiteneffekte. Dieser führt lediglich eine Funktion aus, ohne die Werte der Sequenz in irgendeiner Art zu verändern. Im Beispiel wurde die Konsolen-Ausgabe von der subscribe-Methode in das do verlagert, was keinen spürbaren Unterschied macht.

delay

Observable.interval(100)
   .delay(300) 
   .take(5) 
   .subscribe((value) => console.log(value));

Verzögert die Weitergabe eines Wertes um die übergebene Anzahl an Millisekunden und reicht anschließend alle bis dahin aufgetretenen Werte weiter. Nach Ablauf des Delays werden alle folgenden Werte sofort weitergereicht. Das Beispiel zeigt nach 300 ms die Werte 0, 1, 2, 3 und nach weiteren 100 ms den Wert 4 an.

Fehlerbehandlung

Neben der Observable-weiten Fehlerbehandlung der subscribe-Methode existieren auch mehrere Operatoren, die auf vorhergehende Errors in der Observable-Sequenz reagieren. Operatoren dieser Kategorie ermöglichen eine Wiederaufnahme der Sequenz. Der Error-Handler der subscribe-Methode wird also unter Umständen nicht benachrichtigt.

catch

Eine Fehlerbehandlung durch den Error-Handler der subscribe-Methode (2. Parameter) hat nicht die Möglichkeit, die Subscription weiter aufrecht zu erhalten. Nach der Ausführung der Fehlerbehandlung ist die Subscription zwangsweise beendet, obwohl das Observable möglicherweise noch weitere Werte liefern könnte.

Anders verhält sich der catch-Operator. Dieser ruft die übergebene Funktion auf, sofern irgendwo vor ihm in der Sequenz ein Fehler auftreten ist. Der Rückgabewert der Funktion wird anschließend in der weiteren Sequenz verarbeitet.

Observable.interval(100)
   .do(() => {throw Error('something went wrong')})
   .catch((error) => 'fixed it!')
   .subscribe((value) => console.log(value));

Somit gibt das Beispiel den Wert „fixed it!“ mehrfach zurück, obwohl der Wert ursprünglich ein numerischer Zähler des Operators interval war.

retry

Observable.from(['Jim', 'Johnny', 'Jack'])
   .do((name) => console.log(name)) 
   .do(() => {throw Error('something went wrong')})
   .retry(2) 
   .subscribe((value) => console.log(value));

Vor allem bei Netzwerkanfragen kann es vorkommen, dass fehlgeschlagene Anfragen wiederholt werden müssen. Für diese Aufgabe eignet sich der retry-Operator. Dieser wiederholt im Fehlerfall die vorhergehende Sequenz bis diese erfolgreich ist oder die übergebene Anzahl an Versuchen erreicht wurde. Das Beispiel gibt also dreimal „Jim“ und anschließend einen Stacktrace des Fehlers aus. „Johnny“ und „Jack“ kommen gar nicht erst zum Zug.

Fazit

RxJS ist eine hochinteressante Bibliothek, welche, wenn sie gut in der Anwendung integriert wurde, ein mächtiges Werkzeug darstellt. Angular hat hier bereits gute Arbeit geleistet. Die Http-Kommunikation, das Databinding und die Eventbehandlung setzen auf Observables.

Hat man dessen Funktionsweise erst einmal verstanden, gelingt einem das Umdenken von Promises auf Observables recht schnell. Ich hoffe, dieser Artikel erleichtert die Überwindung der ersten Hürden, so dass sich schon bald die ersten Erfolge mit RxJS einstellen.

Weiterführende Links

http://reactivex.io/rxjs/
https://www.learnrxjs.io/
http://rxmarbles.com/
https://channel9.msdn.com/blogs/j.van.gogh/reactive-extensions-api-in-depth-marble-diagrams-select–where

Zum Autor

Michael Ruttka ist Consultant bei der buschmais GbR. Seine fachlichen Schwerpunkte liegen in der Entwicklung und Konzeption von Webanwendungen.

Weitere Artikel von Michael

Kommentare sind abgeschaltet.