All You Need To Know About RxJS To Get Started

I recently came across MkDocs-Material by Martin Donath, a fantastic open-source project with over 22k GitHub stars.

It’s an incredible contribution to the community, making documentation hosting effortless.

While exploring it, I got curious about how such a large project achieves reactiveness.

The stack is mostly HTML, SCSS, Preact, RxJS, and a few workers, and I saw this as the perfect opportunity to dive into RxJS—especially how it utilizes Observables and other advanced patterns.

So, let’s break down RxJS from the ground up and see what makes it tick!

Intro to RxJS

What is RxJS?

RxJS (Reactive Extensions for JavaScript) is a library for composing asynchronous and event-based programs using Observables.

Think of Observables as data streams you can listen to, transform, combine, and control with precision.

In simpler terms: RxJS helps you manage async data like a pro.

Why Should You Care?

Here’s why RxJS is worth your attention:

  • Declarative Approach: Focus on what to do with data, not how to manage it.
  • Powerful Operators: Transform, filter, and combine streams with ease.
  • Versatile: Perfect for handling user interactions, HTTP requests, WebSockets, and more.

Getting Started

First, install RxJS with npm:

npm install rxjs

Creating Observables in RxJS

From Scratch

You can create an observable using new Observable().

Inside it, you define what to send to subscribers using observer.next(). This can be literally anything—a string, an object, even your to-do list (though maybe don’t do that).

To get things rolling, just call subscribe(), which makes the observable start firing off those values.

Here’s a quick example:

import { Observable } from "rxjs";

const customObservable = new Observable((observer) => {
  observer.next("Hi");
  observer.next("Mom");
  observer.complete();
});

customObservable.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log(`I'm Done!`),
});

// Output:

// Hi
// Mom
// I'm Done!

Boom! Values are sent, received, and when there’s nothing left to say, complete() wraps it up like a polite email sign-off.

From DOM Events

Want to react to user clicks?

Just use fromEvent().

Pass in the DOM element and the event you care about—like click.

import { fromEvent } from "rxjs";

const button = document.getElementById("myButton");

const clicks$ = fromEvent(button, "click");

clicks$.subscribe(() => console.log("Button clicked!"));

Now, every time someone clicks the button, it logs “Button clicked!” Simple, right?

You could even hook this up to rage-click counters.

From Promises

Got a promise?

You can turn it into an observable with from().

This is super handy when dealing with promise-based libraries.

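import { from, lastValueFrom } from "rxjs";

const promise = new Promise((resolve) =>
  setTimeout(() => resolve("Resolved!"), 1000)
);

const observableFromPromise = from(promise);

observableFromPromise.subscribe((value) => console.log(value));

// Converting back to a promise
// (toPromise() still works in RxJS 7 but is deprecated in favor of lastValueFrom)
lastValueFrom(observableFromPromise).then(console.log);

// Output (after 1 second, once per consumer):
// Resolved!
// Resolved!

This simulates an API call (aka: fake waiting).

After one second, it logs “Resolved!” twice: once from subscribe() and once from the promise. Oh, and if you ever miss promises that much, you can flip an observable back with lastValueFrom(), which replaces the older toPromise().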

Static Values

Need to turn random data into an observable?

Use of(). It doesn’t care what you throw at it—numbers, strings, booleans, objects, your grocery list…

import { of } from "rxjs";

const staticValues$ = of(1, "RxJS", true, { key: "value" });

staticValues$.subscribe((value) => console.log(value));

// Output:
// 1
// RxJS
// true
// { key: 'value' }

Basically, anything can be part of a stream.

Yes, anything. Even your existential crisis.

Timers and Intervals

If you need to fire events after a delay or at regular intervals, RxJS has your back:

  • Timers trigger once after a set delay.
  • Intervals keep firing like an overenthusiastic alarm clock.

import { timer, interval } from "rxjs";

// Emits once after 2 seconds
timer(2000).subscribe(() => console.log("Timer fired!"));

// Emits every second
interval(1000).subscribe((count) => console.log(`Count: ${count}`));

timer() waits politely before saying anything, while interval() just can’t stop talking—perfect for regular updates (or if you’re trying to annoy your console).

Conclusion

RxJS might feel overwhelming at first, but once you get the hang of it, you’ll wonder how you ever managed async code without it.

Start small, experiment, and soon you’ll be chaining Observables like a boss.

LiveAPI: Super-Convenient API Docs Generation

With LiveAPI, you can quickly generate interactive API documentation that allows users to execute APIs directly from the browser.

Hot vs Cold Observable in RxJS

Understanding the difference between hot and cold observables is crucial when working with reactive programming, as it affects how data is produced and shared among subscribers. Now let's dive into both.

Cold Observables: Fresh Data for Every Subscriber

A cold observable is one where the underlying data is created inside the observable.

This means that each subscription starts a new execution of the observable, producing unique values for each subscriber.

Example of a Cold Observable

import { Observable } from "rxjs";

const coldObservable = new Observable((observer) => {
  const randomNum = Math.random(); // Generates a new number per subscription
  observer.next(randomNum);
  observer.complete();
});

coldObservable.subscribe((value) => console.log("Subscriber 1:", value));
coldObservable.subscribe((value) => console.log("Subscriber 2:", value));

// Output:
// Subscriber 1: 0.645732
// Subscriber 2: 0.927384

Each subscriber gets a different random number because the observable generates a new value each time someone subscribes.
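To see why each subscriber gets its own number, it can help to picture an observable as little more than a stored producer function that runs again on every subscribe() call. The sketch below is a simplified stand-in in plain JavaScript, not how RxJS is actually implemented; TinyObservable and producerRuns are hypothetical names used only for illustration.

```javascript
// Simplified model of a cold observable (hypothetical, not the RxJS source).
class TinyObservable {
  constructor(producer) {
    this.producer = producer; // stored, but not executed yet
  }

  subscribe(next) {
    // Every subscription re-runs the producer from scratch.
    this.producer({ next });
  }
}

let producerRuns = 0;

const cold = new TinyObservable((observer) => {
  producerRuns += 1;
  observer.next(Math.random()); // created inside the producer
});

const received = [];
cold.subscribe((value) => received.push(value));
cold.subscribe((value) => received.push(value));

console.log(producerRuns); // 2: one execution per subscriber
console.log(received.length); // 2 independently generated values
```

Nothing happens until subscribe() is called, and every call pays for its own run of the producer. That is the "cold" part.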

Hot Observables: Shared Data Among Subscribers

A hot observable is one where the data is generated outside the observable.

All subscribers receive the same data because they share one producer, which avoids redundant computations.

Example of a Hot Observable

import { Observable } from "rxjs";

const sharedRandomNum = Math.random();

const hotObservable = new Observable((observer) => {
  observer.next(sharedRandomNum);
  observer.complete();
});

hotObservable.subscribe((value) => console.log("Subscriber 1:", value));
hotObservable.subscribe((value) => console.log("Subscriber 2:", value));

// Output:
// Subscriber 1: 0.645732
// Subscriber 2: 0.645732

Since the random number is generated before the observable is created, all subscribers receive the same value.

Making a Cold Observable Hot with publish()

Instead of generating data externally, we can use the publish() operator to convert a cold observable into a hot one.

import { Observable } from "rxjs";
import { publish } from "rxjs/operators";

const coldObservable = new Observable((observer) => {
  const randomNum = Math.random();
  observer.next(randomNum);
  observer.complete();
});

const hotObservable = coldObservable.pipe(publish());

hotObservable.subscribe((value) => console.log("Subscriber 1:", value));
hotObservable.subscribe((value) => console.log("Subscriber 2:", value));

hotObservable.connect(); // Ensures the observable emits shared values

// Output:
// Subscriber 1: 0.845291
// Subscriber 2: 0.845291

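Calling connect() subscribes to the source exactly once and shares that single execution with every subscriber, which is what makes it behave like a hot observable. The subscribers are attached before connect() because this source emits synchronously; anyone subscribing afterwards would miss the value. Note that publish() is deprecated as of RxJS 7, where share() and connectable() are the suggested replacements.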
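The register-first, connect-later flow can be modeled roughly in plain JavaScript. This is only a sketch of the idea, not RxJS's implementation; publishLike, sourceRuns, and seen are hypothetical names.

```javascript
// Simplified model of publish() + connect() (hypothetical, not the RxJS source).
function publishLike(producer) {
  const subscribers = [];
  return {
    subscribe(next) {
      subscribers.push(next); // only registers; nothing runs yet
    },
    connect() {
      // Run the producer once and fan each value out to everyone.
      producer({ next: (value) => subscribers.forEach((fn) => fn(value)) });
    },
  };
}

let sourceRuns = 0;
const shared = publishLike((observer) => {
  sourceRuns += 1;
  observer.next(Math.random());
});

const seen = [];
shared.subscribe((value) => seen.push(value));
shared.subscribe((value) => seen.push(value));
shared.connect();

console.log(sourceRuns); // 1: a single shared execution
console.log(seen[0] === seen[1]); // true: both subscribers got the same value
```

In this model, a subscriber added after connect() would receive nothing, which mirrors why the order of the calls matters in the RxJS example.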

Completing Observables & Avoiding Memory Leaks

Observables should be properly completed to avoid potential memory leaks, especially when dealing with continuous streams.

Example: Completing an Observable Automatically

Using finalize(), we can run a callback when an observable completes, errors, or is unsubscribed:

import { timer } from "rxjs";
import { finalize } from "rxjs/operators";

timer(1000)
  .pipe(finalize(() => console.log("Observable completed")))
  .subscribe(() => console.log("Emitted value"));

Example: Manually Unsubscribing

For infinite observables like interval(), we must manually unsubscribe to prevent leaks:

import { interval } from "rxjs";

const subscription = interval(1000).subscribe((value) => console.log(value));

setTimeout(() => {
  subscription.unsubscribe();
  console.log("Unsubscribed from observable");
}, 3000);

After 3 seconds, the subscription stops receiving values, freeing resources.
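What "freeing resources" means can be sketched without timers: a source holds a reference to every listener, and unsubscribing removes that reference. The following is a plain JavaScript model under that assumption, not the RxJS source; createTicker, tick, and listenerCount are hypothetical names.

```javascript
// Simplified model of subscribe/unsubscribe (hypothetical, not the RxJS source).
function createTicker() {
  const listeners = new Set();
  return {
    subscribe(next) {
      listeners.add(next);
      // The returned object plays the role of an RxJS Subscription.
      return { unsubscribe: () => listeners.delete(next) };
    },
    tick(value) {
      listeners.forEach((fn) => fn(value));
    },
    listenerCount: () => listeners.size,
  };
}

const ticker = createTicker();
const ticks = [];
const tickerSubscription = ticker.subscribe((value) => ticks.push(value));

ticker.tick(0);
ticker.tick(1);
tickerSubscription.unsubscribe();
ticker.tick(2); // nobody is listening any more

console.log(ticks); // [0, 1]
console.log(ticker.listenerCount()); // 0: the source no longer holds the callback
```

If unsubscribe() were never called, the source would keep the callback (and everything it closes over) alive for as long as the source itself lives.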

Conclusion

  • Cold observables generate new data for each subscriber.
  • Hot observables share data among subscribers.
  • You can convert a cold observable into a hot one using publish() and connect().
  • Always handle completion and unsubscribing to prevent memory leaks.

Bonus: Using forkJoin for Combining Observables

If you need to wait for several observables to complete and then emit only the last value from each, forkJoin is your go-to.

Example:

import { forkJoin, of, delay } from "rxjs";

const obs1$ = of("rainbows").pipe(delay(2000));
const obs2$ = of("unicorns").pipe(delay(2000));

forkJoin([obs1$, obs2$]).subscribe((values) => console.log(values));

// Output (after 2s): ['rainbows', 'unicorns']

This ensures that the subscription only receives values after both observables have completed.

Handling Errors with catchError and retry

Error handling is essential in reactive programming. The catchError operator allows graceful handling of errors, while retry lets you reattempt a failed observable sequence.

Example:

import { of, throwError, catchError, retry } from "rxjs";

const faulty$ = throwError(() => new Error("Something went wrong!"));

faulty$
  .pipe(
    retry(2),
    catchError((err) => of(`Error caught: ${err.message}`))
  )
  .subscribe((value) => console.log(value));

// Output (after 2 retries): Error caught: Something went wrong!
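It is easy to miscount how often the source runs here: retry(2) means one initial attempt plus up to two more. A synchronous plain JavaScript model of that counting might look like this; retryThenCatch and attempts are hypothetical names, and the real operators work on subscriptions rather than function calls.

```javascript
// Simplified, synchronous model of retry(n) followed by catchError
// (hypothetical helper, not the RxJS source).
function retryThenCatch(task, retries, onError) {
  let lastError;
  // One initial attempt plus up to `retries` further attempts.
  for (let attempt = 0; attempt <= retries; attempt += 1) {
    try {
      return task();
    } catch (err) {
      lastError = err;
    }
  }
  return onError(lastError);
}

let attempts = 0;
const result = retryThenCatch(
  () => {
    attempts += 1;
    throw new Error("Something went wrong!");
  },
  2,
  (err) => `Error caught: ${err.message}`
);

console.log(attempts); // 3: the first try plus 2 retries
console.log(result); // Error caught: Something went wrong!
```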

Operators in RxJS

map

The map operator is your go-to when you need to transform data.

Think of it like JavaScript's Array.map, but for Observables.

Example:

import { of } from "rxjs";
import { map } from "rxjs/operators";

const jsonStr = '{ "greetType": "Hi", "familyMember": "Mom" }';

of(jsonStr)
  .pipe(map((json) => JSON.parse(json)))
  .subscribe((obj) => {
    console.log(obj.greetType);
    console.log(obj.familyMember);
  });

// Output:
// Hi
// Mom

Here, we're transforming a JSON string into a usable JavaScript object.

Perfect when handling API responses.

tap

tap lets you perform side effects without affecting the actual data stream.

It's great for debugging.

Example:

import { of } from "rxjs";
import { tap, map } from "rxjs/operators";

of("rxjs")
  .pipe(
    tap((value) => console.log(`Original: ${value}`)),
    map((value) => value.toUpperCase()),
    tap((value) => console.log(`Transformed: ${value}`))
  )
  .subscribe();

You’ll see both the original and transformed values in the console.

Super handy for peeking into the data flow.

filter

filter does exactly what you think: it filters data based on a condition.

Example:

import { from } from "rxjs";
import { filter } from "rxjs/operators";

from([1, 2, 3, 4, 5])
  .pipe(filter((num) => num % 2 === 0))
  .subscribe(console.log);

// Output:
// 2
// 4

Only even numbers make it through.

It's like the bouncer of your data stream.

debounceTime & throttleTime

Both operators control the rate of emitted values but behave differently:

  • debounceTime waits until the source has been quiet for the given time, then emits the most recent value.
  • throttleTime emits the first value, then ignores subsequent values for the set time.

Example:

import { fromEvent } from "rxjs";
import { debounceTime, throttleTime } from "rxjs/operators";

const input = document.getElementById("search");

fromEvent(input, "input")
  .pipe(debounceTime(500))
  .subscribe(() => console.log("Debounced input:", input.value));

fromEvent(input, "input")
  .pipe(throttleTime(1000))
  .subscribe(() => console.log("Throttled input:", input.value));

Try typing fast to see the difference.

debounceTime waits until you pause; throttleTime logs intermittently.
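If you don't have a browser handy, the difference can be approximated on a list of timestamped keystrokes. The sketch below is a plain JavaScript timeline model, not the RxJS operators themselves. It assumes throttleTime's default behavior (only the leading value passes), a source that never completes, and no two events landing exactly on a boundary. simulateDebounce, simulateThrottle, and the keystrokes data are made up for illustration.

```javascript
// Timeline model of debounceTime and throttleTime (hypothetical helpers,
// not the RxJS source). Each event is { t: milliseconds, v: value }.
function simulateDebounce(events, due) {
  // An event survives only if nothing else arrives within `due` ms after it.
  return events
    .filter((e, i) => {
      const next = events[i + 1];
      return !next || next.t - e.t >= due;
    })
    .map((e) => ({ t: e.t + due, v: e.v })); // emitted after the quiet period
}

function simulateThrottle(events, duration) {
  const out = [];
  let openAt = -Infinity; // time at which the gate opens again
  for (const e of events) {
    if (e.t >= openAt) {
      out.push(e); // leading value passes straight through
      openAt = e.t + duration; // then the gate closes for `duration` ms
    }
  }
  return out;
}

const keystrokes = [
  { t: 0, v: "r" },
  { t: 100, v: "rx" },
  { t: 200, v: "rxj" },
  { t: 1300, v: "rxjs" },
  { t: 1400, v: "rxjs!" },
];

const debounced = simulateDebounce(keystrokes, 500).map((e) => e.v);
const throttled = simulateThrottle(keystrokes, 1000).map((e) => e.v);

console.log(debounced); // ["rxj", "rxjs!"]
console.log(throttled); // ["r", "rxjs"]
```

Debouncing keeps the value from the end of each burst, while throttling keeps the value from the start of each window.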

scan

scan accumulates values over time, similar to reduce in JavaScript, except that it emits every intermediate result instead of only the final one.

Example:

import { fromEvent } from "rxjs";
import { map, scan } from "rxjs/operators";

fromEvent(document, "click")
  .pipe(
    map(() => 1),
    scan((acc, curr) => acc + curr, 0)
  )
  .subscribe((count) => console.log(`Total clicks: ${count}`));

Every click increments the total count.

Simple yet powerful for cumulative tasks.

switchMap

switchMap is ideal when you need to cancel previous requests and switch to a new one.

Example:

import { fromEvent, interval } from "rxjs";
import { switchMap } from "rxjs/operators";

fromEvent(document, "click")
  .pipe(switchMap(() => interval(1000)))
  .subscribe(console.log);

Clicking resets the interval.

Perfect for scenarios like search suggestions or live data feeds.
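For the search-suggestion case, the core idea is "latest request wins". Here is a rough plain JavaScript model of that rule. It is not how RxJS implements switchMap (the real operator unsubscribes from the previous inner observable, whereas this sketch merely ignores stale results), and createLatestOnly, startSearch, and the response strings are hypothetical.

```javascript
// Simplified model of switchMap's "latest wins" behavior
// (hypothetical helper, not the RxJS source).
function createLatestOnly(onResult) {
  let latestId = 0;
  // Call start() for each new request; it returns that request's callback.
  return function start() {
    latestId += 1;
    const myId = latestId;
    return (value) => {
      if (myId === latestId) onResult(value); // stale responses are dropped
    };
  };
}

const shown = [];
const startSearch = createLatestOnly((value) => shown.push(value));

const respondToRx = startSearch(); // user typed "rx"
const respondToRxjs = startSearch(); // user typed "rxjs" before "rx" returned

respondToRx("results for rx"); // arrives late and is ignored
respondToRxjs("results for rxjs");

console.log(shown); // ["results for rxjs"]
```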

takeUntil

takeUntil stops emissions when another Observable emits a value.

Example:

import { interval, timer } from "rxjs";
import { takeUntil } from "rxjs/operators";

const source$ = interval(500);
const stopper$ = timer(3000);

source$.pipe(takeUntil(stopper$)).subscribe({
  next: console.log,
  complete: () => console.log("Completed!"),
});

The interval runs until the timer fires after 3 seconds, then completes automatically.

takeWhile

takeWhile emits values as long as a condition is true.

Example:

import { from } from "rxjs";
import { takeWhile } from "rxjs/operators";

from(["Alice", "Bob", "Charlie", "Doug", "Eve"])
  .pipe(takeWhile((name) => name !== "Doug"))
  .subscribe(console.log);

It stops emitting once it hits "Doug".

Great for conditional data flows.
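It's worth contrasting this with filter: takeWhile stops for good at the first failing value, so "Eve" never arrives even though she passes the test. An array-based sketch of the difference, with takeWhileArray as a hypothetical helper rather than the RxJS operator:

```javascript
// Array-based model of takeWhile (hypothetical helper, not the RxJS source).
function takeWhileArray(values, predicate) {
  const out = [];
  for (const value of values) {
    if (!predicate(value)) break; // stop for good at the first failure
    out.push(value);
  }
  return out;
}

const names = ["Alice", "Bob", "Charlie", "Doug", "Eve"];
const notDoug = (name) => name !== "Doug";

console.log(takeWhileArray(names, notDoug)); // ["Alice", "Bob", "Charlie"]
console.log(names.filter(notDoug)); // ["Alice", "Bob", "Charlie", "Eve"]
```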

first

The first operator extracts only the very first value emitted by an observable and then completes.

Example:

import { of, first } from "rxjs";

const numbers$ = of(-3, -2, -1, 0, 1, 2, 3);
numbers$.pipe(first()).subscribe((value) => console.log(value));

// Output: -3

In this case, first() ensures that only -3, the first value emitted by numbers$, is logged.

Applying Conditions

You can also pass a predicate function to first(), which selects the first value that satisfies a condition:

numbers$
  .pipe(first((value) => value > 0))
  .subscribe((value) => console.log(value));

// Output: 1

last

The last operator extracts only the last emitted value before the observable completes.

Example:

import { last } from "rxjs";

numbers$.pipe(last()).subscribe((value) => console.log(value));

// Output: 3

Here, last() ensures that only 3, the last value from numbers$, is logged.

Applying Conditions

Just like first(), you can pass a condition to last():

numbers$
  .pipe(last((value) => value < 0))
  .subscribe((value) => console.log(value));

// Output: -1

How Does pipe Work?

So, you’ve seen pipe popping up everywhere in RxJS, right? It’s like the conveyor belt of an Observable. You take a stream of data, send it through a series of transformations, and out comes the final processed value.

Think of it like assembling a burger:

  1. Start with the base (Observable)
  2. Add layers (operators like map, filter, etc.)
  3. Get the final delicious result (processed data)

import { range } from "rxjs";
import { map, filter, scan } from "rxjs/operators";

const source$ = range(0, 10);

source$
  .pipe(
    filter((x) => x % 2 === 0), // Keep only even numbers
    map((x) => x * 2), // Double them
    scan((acc, x) => acc + x, 0) // Keep a running sum
  )
  .subscribe(console.log);

Step 1: Creating an Observable

const source$ = range(0, 10);

  • range(0, 10) creates an Observable that emits numbers from 0 to 9 (inclusive).
  • This is our source Observable (source$), which we will process using operators.

Step 2: Applying Pipeable Operators

We use the .pipe() method to transform the emitted values step by step.

1️⃣ filter(x => x % 2 === 0)

  • This filters out odd numbers, keeping only even ones.
  • It passes only numbers where x % 2 === 0 (i.e., numbers divisible by 2).
  • Emitted values after filtering: 0, 2, 4, 6, 8

2️⃣ map(x => x * 2)

  • This transforms each emitted value by doubling it.
  • Mapping result:
    • 0 → 0
    • 2 → 4
    • 4 → 8
    • 6 → 12
    • 8 → 16
  • Emitted values after mapping: 0, 4, 8, 12, 16

3️⃣ scan((acc, x) => acc + x, 0)

  • Works like reduce(), but emits the accumulated result at each step.
  • It maintains an accumulator (acc), starting from 0.
  • It adds each emitted value to the accumulator and emits the running total.
  • Step-by-step accumulation:
    • 0 + 0 = 0
    • 0 + 4 = 4
    • 4 + 8 = 12
    • 12 + 12 = 24
    • 24 + 16 = 40
  • Final emitted values: 0, 4, 12, 24, 40

Step 3: Subscribing to the Observable

.subscribe(console.log);

  • This listens to the Observable and logs each emitted value to the console.

Final Output in Console:

0
4
12
24
40

Without pipe, you’d have a messy chain of nested calls.
With pipe, everything stays clean, readable, and functional.
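To make the "nested calls" point concrete, here is a small array-based model of pipe as left-to-right function composition. It is a sketch of the idea only; RxJS's pipe works on Observables, and every name below (pipe, filter, map, scan, and the helper functions) is a hypothetical stand-in defined in the snippet itself.

```javascript
// Array-based model of pipe (hypothetical helpers, not the RxJS source).
const pipe = (...operators) => (input) =>
  operators.reduce((current, operator) => operator(current), input);

// Each "operator" takes an array and returns a new array.
const filter = (predicate) => (values) => values.filter(predicate);
const map = (project) => (values) => values.map(project);
const scan = (reducer, seed) => (values) => {
  let acc = seed;
  return values.map((value) => (acc = reducer(acc, value)));
};

const source = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
const isEven = (x) => x % 2 === 0;
const double = (x) => x * 2;
const sum = (acc, x) => acc + x;

// Without pipe: read inside-out.
const nested = scan(sum, 0)(map(double)(filter(isEven)(source)));

// With pipe: read top to bottom.
const piped = pipe(filter(isEven), map(double), scan(sum, 0))(source);

console.log(piped); // [0, 4, 12, 24, 40]
console.log(nested); // [0, 4, 12, 24, 40]
```

Both versions compute the same thing; pipe just lets the steps appear in the order they run.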

Conclusion

RxJS operators are the real magic behind reactive programming.

They help you transform, filter, combine, and control data streams effortlessly.

Subject and Multicast in RxJS

What Is a Subject?

A Subject in RxJS is essentially an Observable with extra features. It allows you to:

  • Emit new data to subscribers at any time using .next().
  • Act as both an Observable and an Observer (you can subscribe to it and push values into it).
  • Broadcast values to multiple subscribers, unlike a regular Observable that sends data individually per subscription.

Example: Creating and Using a Subject

import { Subject } from "rxjs";

const subject = new Subject();

// Subscribers
subject.subscribe((value) => console.log(`Subscriber 1: ${value}`));
subject.subscribe((value) => console.log(`Subscriber 2: ${value}`));

// Emit values
subject.next("Hello, RxJS!");
subject.next("Subjects are powerful!");

How It Works:

  1. Two subscribers listen to the same Subject.
  2. When we call .next(), each subscriber receives the value right away, in the order they subscribed.
  3. Unlike standard Observables, we don’t need a separate producer—we control emissions directly.

This makes Subjects great for real-time data streaming, such as WebSocket updates, user interactions, or shared state management.
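Under the hood, the broadcast idea is simple enough to sketch in a few lines of plain JavaScript: keep a list of observers and loop over it on every next(). This is a stripped-down model, not the real RxJS Subject (which also handles errors, completion, and unsubscription); TinySubject and log are hypothetical names.

```javascript
// Simplified model of a Subject (hypothetical, not the RxJS source).
class TinySubject {
  constructor() {
    this.observers = [];
  }

  subscribe(next) {
    this.observers.push(next);
  }

  next(value) {
    // Deliver the value to every current subscriber, in subscription order.
    this.observers.forEach((observer) => observer(value));
  }
}

const tinySubject = new TinySubject();
const log = [];

tinySubject.subscribe((value) => log.push(`A: ${value}`));
tinySubject.next("first");

tinySubject.subscribe((value) => log.push(`B: ${value}`)); // joins late
tinySubject.next("second");

console.log(log); // ["A: first", "A: second", "B: second"]
```

Notice that the late subscriber never sees "first". A plain Subject in RxJS behaves the same way: it only delivers values emitted after you subscribe.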

Multicasting: Avoiding Unnecessary Side Effects

By default, Observables create a new execution for each subscription, meaning if you have multiple subscribers, the data source runs multiple times.

Example: Problem with Multiple Subscribers

import { Observable } from "rxjs";

const observable = new Observable((subscriber) => {
  console.log("New subscriber - fetching data...");
  subscriber.next(Math.random()); // Simulating an API call
});

observable.subscribe((value) => console.log(`Subscriber 1: ${value}`));
observable.subscribe((value) => console.log(`Subscriber 2: ${value}`));

Output:

New subscriber - fetching data...
Subscriber 1: 0.12345
New subscriber - fetching data...
Subscriber 2: 0.67890

Each subscription triggers a new execution, leading to redundant API calls, computations, or event listeners. This is where multicasting comes in.

Using .multicast() to Share Data

What is .multicast()?

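multicast() is a pipeable operator that lets multiple subscribers share a single execution of an Observable, avoiding unnecessary repetitions. It is deprecated as of RxJS 7 in favor of share() and connectable(), but it still illustrates the idea well.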

Example: Multicasting Click Events

import { fromEvent, Subject } from "rxjs";
import { tap, multicast } from "rxjs/operators";

const clicks = fromEvent(document, "click").pipe(
  tap(() => console.log("Click event triggered")), // Side effect
  multicast(() => new Subject())
);

const subscription = clicks.connect();

clicks.subscribe(() => console.log("Subscriber 1 received click event"));
clicks.subscribe(() => console.log("Subscriber 2 received click event"));

Why This Works Better:

  • Instead of logging "Click event triggered" twice per click, it runs only once.
  • Both subscribers receive the same event data.
  • .connect() ensures the shared execution starts immediately.

Subjects vs. Multicasting: When to Use What?

| Feature                                  | Subjects | Multicasting                         |
| ---------------------------------------- | -------- | ------------------------------------ |
| Control over emissions                   | ✅ Yes   | ❌ No (depends on source Observable) |
| Broadcast values to multiple subscribers | ✅ Yes   | ✅ Yes                               |
| Avoid redundant execution                | ❌ No    | ✅ Yes                               |
| Ideal for state sharing                  | ✅ Yes   | ❌ No                                |

  • Use Subjects when you need a manually controlled data source (e.g., WebSocket messages, user actions).
  • Use Multicasting when you have a single source (like an API or event listener) but multiple subscribers.

Conclusion

Subjects and multicasting are powerful tools in RxJS that allow efficient data sharing.

Whether you need real-time data propagation with Subjects or execution optimization with Multicasting, these concepts make reactive programming more efficient.


While exploring the MkDocs-Material implementation, I've been learning how to adapt these techniques for LiveAPI, a product I've been passionately working on for quite a while.

With LiveAPI, you can quickly generate interactive API documentation that allows users to execute APIs directly from the browser.

If you’re tired of manually creating docs for your APIs, this tool might just make your life easier.