All You Need To Know About RxJS To Get Started

I recently came across MkDocs-Material by Martin Donath, a fantastic open-source project with over 22k GitHub stars.
It’s an incredible contribution to the community, making documentation hosting effortless.
While exploring it, I got curious about how such a large project achieves reactiveness.
The stack is mostly HTML, SCSS, Preact, RxJS, and a few workers, and I saw this as the perfect opportunity to dive into RxJS—especially how it utilizes Observables and other advanced patterns.
So, let’s break down RxJS from the ground up and see what makes it tick!
Intro to RxJS
What is RxJS?
RxJS (Reactive Extensions for JavaScript) is a library for composing asynchronous and event-based programs using Observables.
Think of Observables as data streams you can listen to, transform, combine, and control with precision.
In simpler terms: RxJS helps you manage async data like a pro.
Why Should You Care?
Here’s why RxJS is worth your attention:
- Declarative Approach: Focus on what to do with data, not how to manage it.
- Powerful Operators: Transform, filter, and combine streams with ease.
- Versatile: Perfect for handling user interactions, HTTP requests, WebSockets, and more.
Getting Started
First, install RxJS with npm:
npm install rxjs
Creating Observables in RxJS
From Scratch
You can create an observable using new Observable()
.
Inside it, you define what to send to subscribers using observer.next()
. This can be literally anything—a string, an object, even your to-do list (though maybe don’t do that).
To get things rolling, just call subscribe()
, which makes the observable start firing off those values.
Here’s a quick example:
import { Observable } from "rxjs";
const customObservable = new Observable((observer) => {
observer.next("Hi");
observer.next("Mom");
observer.complete();
});
customObservable.subscribe({
next: (value) => console.log(value),
complete: () => console.log(`I'm Done!`),
});
// Output:
// Hi
// Mom
// I'm Done!
Boom! Values are sent, received, and when there’s nothing left to say, complete()
wraps it up like a polite email sign-off.
From DOM Events
Want to react to user clicks?
Just use fromEvent()
.
Pass in the DOM element and the event you care about—like click
.
import { fromEvent } from "rxjs";
const button = document.getElementById("myButton");
const clicks$ = fromEvent(button, "click");
clicks$.subscribe(() => console.log("Button clicked!"));
Now, every time someone clicks the button, it logs “Button clicked!” Simple, right?
You could even hook this up to rage-click counters.
From Promises
Got a promise?
You can turn it into an observable with from()
.
This is super handy when dealing with promise-based libraries.
import { from } from "rxjs";
const promise = new Promise((resolve) =>
setTimeout(() => resolve("Resolved!"), 1000)
);
const observableFromPromise = from(promise);
observableFromPromise.subscribe((value) => console.log(value));
// Converting back to a promise
observableFromPromise.toPromise().then(console.log);
// Output
// Resolved.
This simulates an API call (aka: fake waiting).
After one second, it logs “Resolved!” Oh, and if you ever miss promises that much, you can flip it back with toPromise()
.
Static Values
Need to turn random data into an observable?
Use of()
. It doesn’t care what you throw at it—numbers, strings, booleans, objects, your grocery list…
import { of } from "rxjs";
const staticValues$ = of(1, "RxJS", true, { key: "value" });
staticValues$.subscribe((value) => console.log(value));
// Output:
// 1
// RxJS
// true
// { key: 'value' }
Basically, anything can be part of a stream.
Yes, anything. Even your existential crisis.
Timers and Intervals
If you need to fire events after a delay or at regular intervals, RxJS has your back:
- Timers trigger once after a set delay.
- Intervals keep firing like an overenthusiastic alarm clock.
import { timer, interval } from "rxjs";
// Emits once after 2 seconds
timer(2000).subscribe(() => console.log("Timer fired!"));
// Emits every second
interval(1000).subscribe((count) => console.log(`Count: ${count}`));
timer()
waits politely before saying anything, while interval()
just can’t stop talking—perfect for regular updates (or if you’re trying to annoy your console).
Conclusion
RxJS might feel overwhelming at first, but once you get the hang of it, you’ll wonder how you ever managed async code without it.
Start small, experiment, and soon you’ll be chaining Observables like a boss.
LiveAPI: Super-Convenient API Docs Generation
With LiveAPI, you can quickly generate interactive API documentation that allows users to execute APIs directly from the browser.
Hot vs Cold Observable in RxJS
Understanding the difference between them is crucial when working with reactive programming, as it affects how data is produced and shared among subscribers. Now lets dive into hot and cold observables.
Cold Observables: Fresh Data for Every Subscriber
A cold observable is one where the underlying data is created inside the observable.
This means that each subscription starts a new execution of the observable, producing unique values for each subscriber.
Example of a Cold Observable
import { Observable } from "rxjs";
const coldObservable = new Observable((observer) => {
const randomNum = Math.random(); // Generates a new number per subscription
observer.next(randomNum);
observer.complete();
});
coldObservable.subscribe((value) => console.log("Subscriber 1:", value));
coldObservable.subscribe((value) => console.log("Subscriber 2:", value));
// Output:
// Subscriber 1: 0.645732
// Subscriber 2: 0.927384
Each subscriber gets a different random number because the observable generates a new value each time someone subscribes.
Hot Observables: Shared Data Among Subscribers
A hot observable is one where the data is generated outside the observable.
All subscribers receive the same data and share execution, preventing redundant computations.
Example of a Hot Observable
import { Observable } from "rxjs";
const sharedRandomNum = Math.random();
const hotObservable = new Observable((observer) => {
observer.next(sharedRandomNum);
observer.complete();
});
hotObservable.subscribe((value) => console.log("Subscriber 1:", value));
hotObservable.subscribe((value) => console.log("Subscriber 2:", value));
// Output:
// Subscriber 1: 0.645732
// Subscriber 2: 0.645732
Since the random number is generated before the observable is created, all subscribers receive the same value.
Making a Cold Observable Hot with publish()
Instead of generating data externally, we can use the publish()
operator to convert a cold observable into a hot one.
import { Observable } from "rxjs";
import { publish } from "rxjs/operators";
const coldObservable = new Observable((observer) => {
const randomNum = Math.random();
observer.next(randomNum);
observer.complete();
});
const hotObservable = coldObservable.pipe(publish());
hotObservable.subscribe((value) => console.log("Subscriber 1:", value));
hotObservable.subscribe((value) => console.log("Subscriber 2:", value));
hotObservable.connect(); // Ensures the observable emits shared values
// Output:
// Subscriber 1: 0.845291
// Subscriber 2: 0.845291
Calling connect()
ensures that the observable emits data only once, making it behave like a hot observable.
Completing Observables & Avoiding Memory Leaks
Observables should be properly completed to avoid potential memory leaks, especially when dealing with continuous streams.
Example: Completing an Observable Automatically
Using finally()
, we can detect when an observable completes:
import { timer } from "rxjs";
import { finalize } from "rxjs/operators";
timer(1000)
.pipe(finalize(() => console.log("Observable completed")))
.subscribe(() => console.log("Emitted value"));
Example: Manually Unsubscribing
For infinite observables like interval()
, we must manually unsubscribe to prevent leaks:
import { interval } from "rxjs";
const subscription = interval(1000).subscribe((value) => console.log(value));
setTimeout(() => {
subscription.unsubscribe();
console.log("Unsubscribed from observable");
}, 3000);
After 3 seconds, the subscription stops receiving values, freeing resources.
Conclusion
- Cold observables generate new data for each subscriber.
- Hot observables share data among subscribers.
- You can convert a cold observable into a hot one using
publish()
andconnect()
. - Always handle completion and unsubscribing to prevent memory leaks.
Bonus: Using forkJoin
for Combining Observables
If you need to combine multiple observables and only emit the final values, forkJoin
is your go-to operator.
Example:
import { forkJoin, of, delay } from "rxjs";
const obs1$ = of("rainbows").pipe(delay(2000));
const obs2$ = of("unicorns").pipe(delay(2000));
forkJoin([obs1$, obs2$]).subscribe((values) => console.log(values));
// Output (after 2s): ['rainbows', 'unicorns']
This ensures that the subscription only receives values after both observables have completed.
Handling Errors with catchError
and retry
Error handling is essential in reactive programming. The catchError
operator allows graceful handling of errors, while retry
lets you reattempt a failed observable sequence.
Example:
import { throwError, catchError, retry } from "rxjs";
const faulty$ = throwError(() => new Error("Something went wrong!"));
faulty$
.pipe(
retry(2),
catchError((err) => of(`Error caught: ${err.message}`))
)
.subscribe((value) => console.log(value));
// Output (after 2 retries): Error caught: Something went wrong!
LiveAPI: Super-Convenient API Docs Generation
With LiveAPI, you can quickly generate interactive API documentation that allows users to execute APIs directly from the browser.
Operators in RxJS
map
The map
operator is your go-to when you need to transform data.
Think of it like JavaScript's Array.map
, but for Observables.
Example:
import { of } from "rxjs";
import { map } from "rxjs/operators";
const jsonStr = '{ "greetType": "Hi", "familyMember": "Mom" }';
of(jsonStr)
.pipe(map((json) => JSON.parse(json)))
.subscribe((obj) => {
console.log(obj.greetType);
console.log(obj.familyMember);
});
// Output:
// Hi
// Mom
Here, we're transforming a JSON string into a usable JavaScript object.
Perfect when handling API responses.
tap
tap
lets you perform side effects without affecting the actual data stream.
It's great for debugging.
Example:
import { of } from "rxjs";
import { tap, map } from "rxjs/operators";
of("rxjs")
.pipe(
tap((value) => console.log(`Original: ${value}`)),
map((value) => value.toUpperCase()),
tap((value) => console.log(`Transformed: ${value}`))
)
.subscribe();
You’ll see both the original and transformed values in the console.
Super handy for peeking into the data flow.
filter
filter
does exactly what you think: it filters data based on a condition.
Example:
import { from } from "rxjs";
import { filter } from "rxjs/operators";
from([1, 2, 3, 4, 5])
.pipe(filter((num) => num % 2 === 0))
.subscribe(console.log);
// Output:
// 2, 4
Only even numbers make it through.
It's like the bouncer of your data stream.
debounceTime
& throttleTime
Both operators control the rate of emitted values but behave differently:
debounceTime
emits the last value after a delay.throttleTime
emits the first value, then ignores subsequent values for the set time.
Example:
import { fromEvent } from "rxjs";
import { debounceTime, throttleTime } from "rxjs/operators";
const input = document.getElementById("search");
fromEvent(input, "input")
.pipe(debounceTime(500))
.subscribe(() => console.log("Debounced input:", input.value));
fromEvent(input, "input")
.pipe(throttleTime(1000))
.subscribe(() => console.log("Throttled input:", input.value));
Try typing fast to see the difference.
debounceTime
waits until you pause; throttleTime
logs intermittently.
scan
scan
accumulates values over time, similar to reduce
in JavaScript.
Example:
import { fromEvent } from "rxjs";
import { map, scan } from "rxjs/operators";
fromEvent(document, "click")
.pipe(
map(() => 1),
scan((acc, curr) => acc + curr, 0)
)
.subscribe((count) => console.log(`Total clicks: ${count}`));
Every click increments the total count.
Simple yet powerful for cumulative tasks.
switchMap
switchMap
is ideal when you need to cancel previous requests and switch to a new one.
Example:
import { fromEvent, interval } from "rxjs";
import { switchMap } from "rxjs/operators";
fromEvent(document, "click")
.pipe(switchMap(() => interval(1000)))
.subscribe(console.log);
Clicking resets the interval.
Perfect for scenarios like search suggestions or live data feeds.
takeUntil
takeUntil
stops emissions when another Observable emits a value.
Example:
import { interval, timer } from "rxjs";
import { takeUntil } from "rxjs/operators";
const source$ = interval(500);
const stopper$ = timer(3000);
source$.pipe(takeUntil(stopper$)).subscribe({
next: console.log,
complete: () => console.log("Completed!"),
});
The interval runs until the timer fires after 3 seconds, then completes automatically.
takeWhile
takeWhile
emits values as long as a condition is true.
Example:
import { from } from "rxjs";
import { takeWhile } from "rxjs/operators";
from(["Alice", "Bob", "Charlie", "Doug", "Eve"])
.pipe(takeWhile((name) => name !== "Doug"))
.subscribe(console.log);
It stops emitting once it hits "Doug".
Great for conditional data flows.
first
The first
operator extracts only the very first value emitted by an observable and then completes.
Example:
import { of, first } from "rxjs";
const numbers$ = of(-3, -2, -1, 0, 1, 2, 3);
numbers$.pipe(first()).subscribe((value) => console.log(value));
// Output: -3
In this case, first()
ensures that only -3
, the first value emitted by numbers$
, is logged.
Applying Conditions
You can also pass a predicate function to first()
, which selects the first value that satisfies a condition:
numbers$
.pipe(first((value) => value > 0))
.subscribe((value) => console.log(value));
// Output: 1
last
The last
operator extracts only the last emitted value before the observable completes.
Example:
import { last } from "rxjs";
numbers$.pipe(last()).subscribe((value) => console.log(value));
// Output: 3
Here, last()
ensures that only 3
, the last value from numbers$
, is logged.
Applying Conditions
Just like first()
, you can pass a condition to last()
:
numbers$
.pipe(last((value) => value < 0))
.subscribe((value) => console.log(value));
// Output: -1
Here’s a friendly explanation of the pipe
method that fits well with your blog’s casual tone:
How Does pipe
Works
So, you’ve seen pipe
popping up everywhere in RxJS, right? It’s like the conveyor belt of an Observable. You take a stream of data, send it through a series of transformations, and out comes the final processed value.
Think of it like assembling a burger :
- Start with the base (Observable)
- Add layers (operators like
map
,filter
, etc.) - Get the final delicious result (processed data)
import { range } from "rxjs";
import { map, filter, scan } from "rxjs/operators";
const source$ = range(0, 10);
source$
.pipe(
filter((x) => x % 2 === 0), // Keep only even numbers
map((x) => x * 2), // Double them
scan((acc, x) => acc + x, 0) // Keep a running sum
)
.subscribe(console.log);
Step 1: Creating an Observable
const source$ = range(0, 10);
range(0, 10)
creates an Observable that emits numbers from0
to9
(inclusive).- This is our source Observable (
source$
), which we will process using operators.
Step 2: Applying Pipeable Operators
We use the .pipe()
method to transform the emitted values step by step.
1️⃣ filter(x => x % 2 === 0)
- This filters out odd numbers, keeping only even ones.
- It passes only numbers where
x % 2 === 0
(i.e., numbers divisible by 2). - Emitted values after filtering:
0, 2, 4, 6, 8
2️⃣ map(x => x + x)
- This transforms each emitted value by doubling it.
- Mapping result:
0 → 0
2 → 4
4 → 8
6 → 12
8 → 16
- Emitted values after mapping:
0, 4, 8, 12, 16
3️⃣ scan((acc, x) => acc + x, 0)
-
Works like
reduce()
, but emits the accumulated result at each step. -
It maintains an accumulator (
acc
), starting from0
. -
It adds each emitted value to the accumulator and emits the running total.
Step-by-step accumulation:
0 + 0 = 0
0 + 4 = 4
4 + 8 = 12
12 + 12 = 24
24 + 16 = 40
-
Final emitted values:
0, 4, 12, 24, 40
Step 3: Subscribing to the Observable
.subscribe(x => console.log(x));
- This listens to the Observable and logs each emitted value to the console.
Final Output in Console:
0
4
12
24
40
Without pipe
, you’d have a messy chain of nested calls.
With pipe
, everything stays clean, readable, and functional.
Conclusion
RxJS operators are the real magic behind reactive programming.
They help you transform, filter, combine, and control data streams effortlessly.
Subject and Multicast in RxJS
What Is a Subject?
A Subject in RxJS is essentially an Observable with extra features. It allows you to:
- Emit new data to subscribers at any time using
.next()
. - Act as both an Observable and an Observer (you can subscribe to it and push values into it).
- Broadcast values to multiple subscribers, unlike a regular Observable that sends data individually per subscription.
Example: Creating and Using a Subject
import { Subject } from "rxjs";
const subject = new Subject();
// Subscribers
subject.subscribe((value) => console.log(`Subscriber 1: ${value}`));
subject.subscribe((value) => console.log(`Subscriber 2: ${value}`));
// Emit values
subject.next("Hello, RxJS!");
subject.next("Subjects are powerful!");
How It Works:
- Two subscribers listen to the same Subject.
- When we call
.next()
, both subscribers receive the value simultaneously. - Unlike standard Observables, we don’t need a separate producer—we control emissions directly.
This makes Subjects great for real-time data streaming, such as WebSocket updates, user interactions, or shared state management.
Multicasting: Avoiding Unnecessary Side Effects
By default, Observables create a new execution for each subscription, meaning if you have multiple subscribers, the data source runs multiple times.
Example: Problem with Multiple Subscribers
import { Observable } from "rxjs";
const observable = new Observable((subscriber) => {
console.log("New subscriber - fetching data...");
subscriber.next(Math.random()); // Simulating an API call
});
observable.subscribe((value) => console.log(`Subscriber 1: ${value}`));
observable.subscribe((value) => console.log(`Subscriber 2: ${value}`));
Output:
New subscriber - fetching data...
Subscriber 1: 0.12345
New subscriber - fetching data...
Subscriber 2: 0.67890
Each subscription triggers a new execution, leading to redundant API calls, computations, or event listeners. This is where multicasting comes in.
Using .multicast()
to Share Data
What is .multicast()
?
.multicast()
is an operator that allows multiple subscribers to share a single execution of an Observable, avoiding unnecessary repetitions.
Example: Multicasting Click Events
import { fromEvent, Subject } from "rxjs";
import { tap, multicast } from "rxjs/operators";
const clicks = fromEvent(document, "click").pipe(
tap(() => console.log("Click event triggered")), // Side effect
multicast(() => new Subject())
);
const subscription = clicks.connect();
clicks.subscribe(() => console.log("Subscriber 1 received click event"));
clicks.subscribe(() => console.log("Subscriber 2 received click event"));
Why This Works Better:
- Instead of logging
"Click event triggered"
twice per click, it runs only once. - Both subscribers receive the same event data.
.connect()
ensures the shared execution starts immediately.
Subjects vs. Multicasting: When to Use What?
Feature | Subjects | Multicasting |
---|---|---|
Control over emissions | ✅ Yes | ❌ No (depends on source Observable) |
Broadcast values to multiple subscribers | ✅ Yes | ✅ Yes |
Avoid redundant execution | ❌ No | ✅ Yes |
Ideal for state sharing | ✅ Yes | ❌ No |
- Use Subjects when you need a manually controlled data source (e.g., WebSocket messages, user actions).
- Use Multicasting when you have a single source (like an API or event listener) but multiple subscribers.
Conclusion
Subjects and multicasting are powerful tools in RxJS that allow efficient data sharing.
Whether you need real-time data propagation with Subjects or execution optimization with Multicasting, these concepts make reactive programming more efficient.
While exploring mkdocs-material implementation, I've been learning how to adapt these techniques for LiveAPI, a product I've been passionately working on for quite a while.
With LiveAPI, you can quickly generate interactive API documentation that allows users to execute APIs directly from the browser.
If you’re tired of manually creating docs for your APIs, this tool might just make your life easier.