RxJs Tutorial - Understanding the Basics
Angular relies heavily on the RxJs library for reactive patterns, applying functional programming paradigms to event streams. As Angular routes compose different components, events inside or outside the UI push data, causing components to render the changes.
If this sounds difficult, hold on, I'll explain this pattern and its benefits. Even I suffered from my misunderstandings when using effects in NgRx.
If you are familiar with callbacks, the same idea applies to the Observer. The difference is that a callback notifies a single caller, while the Observer notifies zero or more listeners.
The advantage of using the Observer pattern in Angular comes down to how components are used. Components need to share data with each other and react to certain events. For example, a button click triggers a backend call, and that backend call pushes to an event stream. Once it completes, multiple components display parts of the data returned in the backend response.
Another use case deals with real-time events such as web socket connections or server-sent events (SSE). These could aid the creation of a real-time chat.
Goals
- Understand Subscribing / Unsubscribing
- Multicast vs. Unicast
- Inner / Outer Observables
- Common Inner Observable Operators
- Common Operators
Subscribing
A point of confusion I've encountered with developers new to the Observer pattern: to start listening to an Observable, consumers must subscribe. Angular offers several techniques to accomplish this.
When subscribing explicitly to an Observable, the subscribe method takes either a listener for the next value or an object with next, error, and complete methods.
Understand that when error is invoked, the Observable has thrown an error and stops emitting values.
When the Observable finishes, the complete method is invoked and the Observable also stops emitting values.
As long as the subscription has not completed or been unsubscribed, each pushed value invokes the next method.
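As a minimal sketch of the observer object described above (the numbers$ source and the log array are illustrative names, not from any real application), the following shows next firing for each value and complete ending the stream:

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// A hypothetical source that pushes two values and then completes.
const numbers$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(3); // ignored: the stream has already completed
});

// Nothing runs until subscribe is called.
numbers$.subscribe({
  next: value => log.push(`next: ${value}`),
  error: err => log.push(`error: ${err}`),
  complete: () => log.push('complete'),
});

console.log(log); // ['next: 1', 'next: 2', 'complete']
```

The value pushed after complete never reaches the listener, which matches the rule that a completed stream stops emitting.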
Unsubscribing
Unsubscribing stops values from being emitted to the listeners in the subscription. This trips up those new to RxJs, as the ways of completing or unsubscribing differ. It matters because hot Observables keep pushing values to the components subscribed to them, and a subscription that is not properly stopped can cause memory leaks. I'll cover more on the difference between hot and cold observables below.
For more information on preventing these memory leaks, see three different patterns to prevent memory leaks.
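Two common ways of stopping a subscription can be sketched as follows. This is an illustration under assumptions, not a complete catalogue of patterns: messages$, destroy$, and received are hypothetical names, and in an Angular component the teardown would typically live in ngOnDestroy.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const received: string[] = [];

// A long-lived (hot) source, for example a stream exposed by a shared service.
const messages$ = new Subject<string>();

// Option 1: keep the Subscription and call unsubscribe() explicitly.
const subscription = messages$.subscribe(msg => received.push(`a:${msg}`));

// Option 2: end the subscription when a notifier emits.
const destroy$ = new Subject<void>();
messages$
  .pipe(takeUntil(destroy$))
  .subscribe(msg => received.push(`b:${msg}`));

messages$.next('first');

// Teardown: after this, neither listener is attached to messages$.
subscription.unsubscribe();
destroy$.next();
destroy$.complete();

messages$.next('second'); // no listener is left to receive this

console.log(received); // ['a:first', 'b:first']
```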
Unicast and Multicast
Confusion arises when observables differ in their execution context. This difference can be obscured by an operator whose implementation uses a Subject.
Subjects create multicast event streams that share the same execution context. Subjects essentially keep a list of observers to push values into.
const poorManSubject = () => {
  // An array of observers that watch for values
  const observers = [];
  return {
    // Adds an observer to the list of observers
    addObserver: (observer) => observers.push(observer),
    // When a value is pushed from the subject, emit the value to each observer
    next: (value) => observers.forEach(observer => observer(value))
  };
};
In the context of a web page, this can be exemplified with a shopping cart. As in most ecommerce applications, the number of shopping cart items is placed in the header near the navigation links. The shopping cart component emits events that update state, and the cart component's line items update alongside the item count in the header. Both components subscribe to a shared context.
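The shopping cart scenario can be sketched with a typed variant of the poorManSubject from earlier. The cart, header, and lineItems names are made up for illustration, and the arrays stand in for the two components:

```typescript
type Observer<T> = (value: T) => void;

// Typed variant of the poorManSubject sketch; not how RxJs implements Subject.
const poorManSubject = <T>() => {
  const observers: Observer<T>[] = [];
  return {
    addObserver: (observer: Observer<T>): void => { observers.push(observer); },
    next: (value: T): void => observers.forEach(observer => observer(value)),
  };
};

// One shared stream of item counts.
const cart = poorManSubject<number>();

// Stand-ins for the header badge and the cart line items.
const header: number[] = [];
const lineItems: number[] = [];
cart.addObserver(count => header.push(count));
cart.addObserver(count => lineItems.push(count));

cart.next(1);
cart.next(2);

console.log(header, lineItems); // both hold [1, 2]
```

Both observers see every value because they are registered on the same list, which is the shared execution context the text describes.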
In unicast observables, a new execution context is created for each subscriber. So there's one observer listening to the values pushed from each execution of the stream.
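To make the contrast concrete, here is a small sketch using a plain Observable (unicast) and a Subject (multicast). The executions counter is only there to show how many times the producer function runs:

```typescript
import { Observable, Subject } from 'rxjs';

let executions = 0;

// Unicast: the producer function runs once per subscriber.
const unicast$ = new Observable<number>(subscriber => {
  executions += 1;
  subscriber.next(executions);
  subscriber.complete();
});

const unicastValues: number[] = [];
unicast$.subscribe(v => unicastValues.push(v));
unicast$.subscribe(v => unicastValues.push(v));
console.log(unicastValues); // [1, 2]: each subscriber got its own execution

// Multicast: one execution, shared by every subscriber.
const multicast$ = new Subject<number>();
const multicastValues: number[] = [];
multicast$.subscribe(v => multicastValues.push(v));
multicast$.subscribe(v => multicastValues.push(v));
multicast$.next(42);
console.log(multicastValues); // [42, 42]: both subscribers saw the same push
```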
Inner Observables
If you've ever dealt with dependent promises, where the previous promise's output is required by the next promise, inner observables should make sense. Put simply, values emitted from an outer stream are passed to another stream for handling. The operators that subscribe to inner observables differ in key ways.
import { Component, Input } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { BehaviorSubject, Observable, of } from 'rxjs';
import { catchError, map, switchMap } from 'rxjs/operators';

@Component({
  selector: 'a-fake-selector'
  // rest left out for brevity
})
export class aFakeComponent {
  protected subject$ = new BehaviorSubject<string | null>(null);

  @Input()
  public set searchText(value: string) {
    this.subject$.next(value);
  }

  // Outer Observable, this will push values into the inner observable
  public searchValues$: Observable<{ status: 'OK' | 'Error', results: string[] }> = this.subject$.pipe(
    // using switchMap here to cancel the previous observable: say the user is
    // typing really fast, and we want to cancel the previous in-flight
    // http request
    switchMap(searchValue => {
      // Inner Observable, the outer stream will emit values from the httpClient
      return this.httpClient.post<string[]>('https://afakeendpoint.com', { searchValue }).pipe(
        map(searchResults => ({ status: 'OK' as const, results: searchResults })),
        // catching inside the inner observable keeps the outer stream alive
        catchError(() => {
          return of({
            status: 'Error' as const,
            results: ['An unexpected error']
          });
        })
      );
    })
  );

  constructor(protected httpClient: HttpClient) { }
}
Common Inner Observable Operators
The following operators subscribe to the inner observable. Below I will explain when to use each inner observable operator.
mergeMap
Say you've entered a chatroom with multiple friends. As each friend enters the chatroom, you want to listen to what each friend says. Each of them types messages, and the chat room fills with those messages.
When a friend leaves, messages cease displaying in the chat for that friend.
mergeMap merges all of the inner observables into one stream. It's useful for parallel HTTP calls, as one call does not have to wait for another.
In the mergeMap WebSocket diagram, the Outer Stream box represents users entering a chat room. U1 (user 1), U2, and U3 all enter the room at different times. The outer stream waits for other users to enter the chat room over time and, as they do, the inner stream connects new web sockets that emit values.
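import { mergeMap } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';

// outer stream for the chatroom waiting for new users;
// chatRoomOuterStream$ is assumed to emit { userId, chatroomId } objects
const chatRoomMessages$ = chatRoomOuterStream$.pipe(
  mergeMap(({ userId, chatroomId }) => {
    // each inner stream (one web socket per user) emits its values into the merged output
    return webSocket(`ws://localhost:8081?roomId=${chatroomId}&userId=${userId}`);
  })
);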
concatMap
Imagine waiting in line at the grocery store. Each customer waits to perform a transaction with the store. The cashier (the subscriber) scans each customer's items and performs their transaction, completing that customer's turn in the queue. The following customer has to wait until the cashier is free to scan their items and perform their transaction.
This cashier and customer flow describes a concatMap inner observable flow. Each value emitted by the outer stream enters a queue in the order received. The inner stream executes, and the next value won't be processed until the previous inner stream completes.
The concatMap operator could be used in a trivia game where users compete to send in their answers first. When the order of correct answers matters, the answers must be validated in the sequence they were received.
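The trivia idea can be sketched roughly as below. Everything here is hypothetical: answers$ is the outer stream of contestant names, and each entry in validations is a Subject standing in for an asynchronous validation call that the example completes by hand.

```typescript
import { Subject } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const log: string[] = [];

// Outer stream: contestants submitting answers, in arrival order.
const answers$ = new Subject<string>();

// One inner Subject per contestant stands in for an async validation call.
const validations: Record<string, Subject<string>> = {
  alice: new Subject<string>(),
  bob: new Subject<string>(),
};

answers$
  .pipe(
    concatMap(name => {
      log.push(`validating ${name}`);
      return validations[name];
    })
  )
  .subscribe(result => log.push(result));

answers$.next('alice');
answers$.next('bob'); // queued: alice is still being validated

validations.alice.next('alice: correct');
validations.alice.complete(); // only now does bob's validation start

validations.bob.next('bob: wrong');
validations.bob.complete();

console.log(log);
// ['validating alice', 'alice: correct', 'validating bob', 'bob: wrong']
```

Bob's answer arrived before Alice's validation finished, yet his validation did not start until hers completed, which is the queueing behavior of the cashier analogy.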
switchMap
Now envision it's the beginning of the year and you join a gym. It offers different subscriptions, each with different amenities included. Your subscription starts as a fourteen-day free trial; after those fourteen days, you are charged $30/month.
switchMap provides similar functionality. The outer stream is time passing day by day. After a certain threshold, the free trial expires and the $30/month plan becomes the new inner observable. The free trial is canceled and the $30/month subscription begins.
Another use case is a type-ahead. As a user types into a textbox, a search waits in the background to resolve. If the user changes the input while waiting, the previous request is canceled and a new request is created.
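// formInput is assumed to be an <input> element and httpClient an injected HttpClient;
// the 'input' event fires on every keystroke
const inputChange$ = fromEvent(formInput, 'input');
const values$ = inputChange$.pipe(
  // read the current text from the event target
  map(event => (event.target as HTMLInputElement).value),
  // cancel previous observable if it was in flight
  // and switch to the observable inside of the switchMap
  switchMap(value => {
    return this.httpClient.post('http://yoururlgoeshere.com', {
      searchTerm: value
    });
  })
);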
Common Operators
map
This operator transforms the values emitted by a stream into other values. Developers often confuse mergeMap and map. Know that the function passed to mergeMap returns another observable, while the function passed to map returns a new value to be emitted in the stream.
const subject$ = new BehaviorSubject<number>(5);
const multiplyByFive$: Observable<number> = subject$.pipe(
  map(value => value * 5)
);
multiplyByFive$.subscribe(value => {
  console.log(value);
});
subject$.next(4);
// console output:
// 25
// 20
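The difference between map and mergeMap shows up when the mapping function itself returns an observable. In this sketch, lookup is a hypothetical function standing in for something like an HTTP call:

```typescript
import { Observable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

const ids$ = of(1, 2);

// Hypothetical lookup that returns an observable per id.
const lookup = (id: number): Observable<string> => of(`user-${id}`);

// map passes the returned observables through as values.
const mapped: unknown[] = [];
ids$.pipe(map(id => lookup(id))).subscribe(v => mapped.push(v));

// mergeMap subscribes to each returned observable and emits its values.
const flattened: string[] = [];
ids$.pipe(mergeMap(id => lookup(id))).subscribe(v => flattened.push(v));

console.log(mapped.every(v => v instanceof Observable)); // true
console.log(flattened); // ['user-1', 'user-2']
```

With map, the subscriber receives observables it would still have to subscribe to; with mergeMap, it receives the strings directly.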
catchError
When errors occur in observables, the emitted value can be changed. This often comes in handy when using NgRx to emit an error action, as the NgRx stream expects a type of Action. Do understand that when an error within the stream goes unhandled, the stream terminates and stops emitting values.
of(1, 2, 3, 4, 5).pipe(
  map(value => {
    if (value >= 5) {
      throw new Error('Value must be less than 5');
    }
    return value;
  }),
  catchError(() => of(0)) // replace the failed stream with one that emits 0 and then completes
);
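The NgRx use case mentioned above can be approximated without NgRx itself. In this sketch the Action type, its type strings, and failingRequest$ are all assumptions made for illustration; a real effect would use the actions defined by the application.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

// Hypothetical action shapes; NgRx is not used in this sketch.
type Action =
  | { type: 'load success'; payload: string[] }
  | { type: 'load failure'; message: string };

// Stand-in for a backend call that fails.
const failingRequest$ = new Observable<string[]>(subscriber => {
  subscriber.error(new Error('backend unavailable'));
});

const actions: Action[] = [];
let completed = false;

failingRequest$
  .pipe(
    map((payload): Action => ({ type: 'load success', payload })),
    // Turn the error into a value of the type the stream is expected to emit.
    catchError((err: Error) => of<Action>({ type: 'load failure', message: err.message }))
  )
  .subscribe({
    next: action => actions.push(action),
    complete: () => { completed = true; },
  });

console.log(actions); // [{ type: 'load failure', message: 'backend unavailable' }]
console.log(completed); // true: the replacement stream emitted once and completed
```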
Summary
Observables require developers to subscribe, and subscribing starts the emission of values. Unsubscribing from an observable tells it to remove the listener.
There are multicast and unicast observables. Multicast observables emit the same values to all of their subscribers, while unicast observables run a separate execution for each subscriber.
Inner observables trip developers up quite a bit. Without knowing what the mergeMap, switchMap, and concatMap operators do, unexpected values can be emitted or even canceled.
- mergeMap - parallel execution of inner observables
- switchMap - cancellation of the previous inner observable
- concatMap - inner observables execute in the order the outer observable pushed values to it
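The three behaviors can be compared side by side by replaying the same scripted events through each operator. This is a sketch under assumptions: run is a hypothetical helper, and the inner observables are Subjects driven by hand so the ordering is deterministic.

```typescript
import { OperatorFunction, Subject } from 'rxjs';
import { concatMap, mergeMap, switchMap } from 'rxjs/operators';

type Project = (key: string) => Subject<string>;
type Flatten = (project: Project) => OperatorFunction<string, string>;

// Replays the same scripted events through the given flattening operator.
function run(flatten: Flatten): string[] {
  const outer$ = new Subject<string>();
  const inner: Record<string, Subject<string>> = {
    A: new Subject<string>(),
    B: new Subject<string>(),
  };
  const seen: string[] = [];
  outer$.pipe(flatten(key => inner[key])).subscribe(v => seen.push(v));

  outer$.next('A');
  outer$.next('B');
  inner.A.next('A1');
  inner.B.next('B1');
  inner.A.next('A2');
  inner.A.complete();
  inner.B.next('B2');
  inner.B.complete();
  return seen;
}

const merged = run(project => mergeMap(project));
const switched = run(project => switchMap(project));
const concatenated = run(project => concatMap(project));

console.log(merged);       // ['A1', 'B1', 'A2', 'B2']: both inner streams run in parallel
console.log(switched);     // ['B1', 'B2']: A was canceled when B arrived
console.log(concatenated); // ['A1', 'A2', 'B2']: B is subscribed only after A completes
```

In the concatMap run, B1 is missed only because a Subject is hot and nobody was subscribed to it yet. A cold inner observable, such as an HTTP call, would not start its work until concatMap subscribes to it.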
Common operators exist to transform values and help catch errors. Many more operators exist and do many other things under the hood, but these two see frequent use.