Angular relies heavily on the RxJs library for reactive patterns, and using functional programming paradigms from an event stream.  As Angular routes compose different components, events within or outside of the UI push data causing components to render the changes.

If this sounds difficult, hold on, I'll explain this pattern and its benefits.  Even I suffered from my misunderstandings when using effects in NgRx.

If you have familiarity with callbacks, the same idea applies with the Observer.  The difference lies where a callback notifies a single caller and the Observer notifies zero or multiple listeners.

The advantage for using the Observer pattern in Angular boils down to the usage of components.  Components need to share data between them and react to certain events, for example a button click responds by performing a backend call.  That backend call pushes to an event stream.  Once complete, multiple components display parts of the data returned from the backend response.

Another use case deals with real time events such as web socket connections or SSE.  These could aid the creation of a real time chat.

Free Your Developers

Nx Monorepo Starter alleviates developers from re-inventing deployments on popular Google Cloud Services, standardizes on libraries, and saves time for scaffolding projects.

Download

Goals

  1. Understand Subscribing / Unsubscribing
  2. Multicast vs. Unicast
  3. Inner / Outer Observables
  4. Common Inner Observable Operators
  5. Common Operators

Subscribing

The confusion I've encountered with developers new to the Observer pattern, and is necessary to start listening to the Observable, consumers must subscribe.  There exist several techniques in Angular to accomplish this.

If explicitly subscribing to an Observable, the subscribe method takes in a listener for the next value, or an object with methods of next, error, and complete.

Understand, that when error invokes, an Observable throws an error and stops emitting values.

When the Observable finishes, the complete method invokes and will also stop emitting values.

As long as the subscription has not completed or unsubscribed, values pushed invoke the next method.

Unsubscribing

Unsubscribing stops emitting values to listeners in the subscription.  This hinders those new to RxJs as implementations of completing or unsubscribing differ.  And this matters as components subscribe to Hot Observables keep pushing values, and if not properly stopping the subscription this can cause memory leaks.  I'll cover more below the difference between hot and cold observables below.

For more information in preventing these memory leaks, find three different patterns to prevent memory leaks.

3 Patterns to Prevent Memory Leaks in Angular Components for Observables
Learn Three Patterns to Prevent Memory Leaks in Angular Components which Consume Observables

Unicast and Multicast

Developer confusion arise if the observable differs in the execution context.  This type of difference could be obscured by an operator, which in the implementation of that operator utilizes a Subject.

Subjects create multicast event streams that share the same execution context.  Subjects essentially keep a list of observers to push values into.

Multicast
const poorManSubject = () => {
  // An array of observers for that watch for values
  const observers = [];
  
  return {
    // Adds an observer to the list of obsevers
    addObserver: (observer) => observers.push(observer),
    // When a value is pushed from the subject, emit value to each observer
    next: (value) => observers.forEach(observer => observer(value))
  };
}

In the context of a web page, this came be exemplified with a shopping cart.  As with most ecommerce applications, the number of shopping cart items places in the header near navigation links.  The shopping cart component emits events that update state and both the cart component line items update along side the number of items in the header.  Both components subscribe to a shared context.

In unicast observables, a new execution context is created.  So there's one observer listening to the values being pushed from the stream.

unicast

Inner Observables

If you've ever dealt with dependent promises where the previous promises output is required by the next Promise, inner observables should make sense.  Otherwise simply, it's passing values emitted from an outer stream and passing to another stream for handling.  With certain operators that subscribe to inner observables, there are key differences.

@Component({
  selector: 'a-fake-selector'
  // rest left out for brevity
})
export class aFakeComponent {
  protected subject$ = new BehaviorSubject<string>(null);
  
  @Input()
  public searchText set(value: string) {
    this.subject$.next(value);
  }
  
  // Outer Observable, this will push values into the inner observable
  public searchValues$: Observable<{ status: 'OK' | 'Error', results: string[] }> = this.subject$.pipe(
    // using switch map here to cancel previous observable, say the user is 
    // typing real fast, and we can to cancel the previous in flight
    // http request
    switchMap(searchValue => {
      // Inner Observable, the outer stream will emit values from the httpClient
      return this.httpClient.post('https://afakeendpoint.com', { searchValue }).pipe(
        map(searchResults => ({status: 'OK', results: searchResults })),
        catchError(err => {
          return of({
            status: 'Error',
            results: ['An unexpected error']
          })
        })
      )
    })
  );
  
  constructor(protected httpClient: HttpClient) { }
}

Common Inner Observable Operators

The following operators subscribe to the inner observable.  Below I will explain when to use each inner observable operator.

mergeMap

Say you've entered a chatroom with multiple friends.  As each friend enters the chatroom, you want to listen to what each friend says.  Each of them typing messages and the chat room fills with the messages.

When a friend leaves, messages cease displaying in the chat for that friend.

mergeMap merges all of the inner observables into one stream.  It's useful for parallel HTTP calls as one does not have to wait for the other.

mergeMap WebSocket

In the mergeMap WebSocket diagram, the Outer Stream box represents users entering a chat room.  There's U1 (user 1), U2, and U3 all entering the room at different times.  The outer stream waits for other users to enter the chat room over time and as they do, the inner stream connects new web sockets for values to be emitted.

// outerstream for the chatroom waiting for new users
const chatRoomMessages$ = chatRoomOuterStream$.pipe(
  mergeMap(({ userId, chatroomId }) => {
    // inner stream emits all values to the outer stream
    return webSocket(`ws://localhost:8081?roomId=${chatroomId}&userId=${userId}`);
  })
)

concatMap

Imagine waiting in a line at the grocery store.  Each customer waits to perform a transaction with the store.  The cashier executes (subscribing) scans each customer's items and performs their transaction, completing the customer in queue.  The following customer needed to wait before the cashier could attend to scan and perform their transaction.

This cashier and customer flow describes a concatMap inner observable flow.  Each value emitted by the outer stream enters a queue in the order received.  The inner stream executes and the next value won't be executed until the previous completes.

The concatMap operator could be used in a trivia game where users compete by sending their answers in first.  Contestants choosing the correct answer first and validating answer in an ordered sequence matters as requirements on order of correct answers.

concatMap - requests execute in order they are received

switchMap

Now envision it's the beginning of the year, and join a gym.  It offers different subscriptions all with different amenities included.  First, your subscription allows for a fourteen day trial, and turns into a $30/month charge.  After the fourteen days you start getting charged $30/month.

switchMap provides similar functionality.  First your outer stream is time passing day by day.  After a certain threshold, the free trial expires and new $30/month in the inner observable.  The free trial is canceled and $30/month subscription begins.

Another use case for this would be a type ahead.  As a user types into a textbox, a search waits in the background to resolve.  While waiting, if the user changes the input, the previous request will stop, or be canceled, and a new request created.

const inputChange$ = fromEvent(formInput, 'change');

const values$ = inputChange$.pipe(
  // cancel previous observable if it was in flight
  // and switch to the observable inside of the switchMap
  switchMap(value => {
    return this.httpClient.post('http://yoururlgoeshere.com', {
      searchTerm: value
    });
  })
);
switchMap - previous observable cancelled

Common Operators

map

This operator transform the values emitted by a stream to another value.  Often developers confuse mergeMap and map.  Know that mergeMap returns another observable, while map returns a new value to be emitted in the stream.

const subject$ = new BehaviorSubject<number>(5);

const multiplyByFive$: Observable<number> = subject$.pipe(
  map(value => value * 5)
);

multiplyByFive$.subscribe(value => {
  console.log(value);
});

subject.next(4);

// output on line 8
// 25
// 20

catchError

When errors occur in observables, the emitted value can be changed.  This often comes in handy when using NgRx to emit an error action, as NgRx stream expects a type of Action.  Do understand that when errors within the stream go unhandled, it completes the stream and values stop emitted.

of(1,2,3,4,5).pipe(
  map(value => {
    if (value > 5) {
      throw 'Value must be less than 5';
    }

    return value;
  }),
  catchError(() => of(0)) // complete stream and return 0
);

Summary

Invoking an observable requires developers to subscribe and this will start the emission of values.  Unsubscribing from an observable tells an observable to remove the listener.

There a multicast and unicast observables.  Multicast emit the same values to their subscribers, while unicast emits different values of the observable.

Inner observables trip developers up quite a bit.  Without knowing what the mergeMap, switchMap, and concatMap operators unexpected values can be emitted or even canceled.

mergeMap - parallel execution of inner observables

switchMap - cancellation of previous inner observable

concatMap - inner observables execute in the order the outter observable pushed values to it.

Common operators exist to transform values and help catch errors.  Many more operators exist and do many other things under the hood, but these two have frequent use.