Observables are the foundation of RxJS. Everything to do with RxJS revolves around Observables. In this article, we will look at the many different methods of creating Observables provided to us by RxJS.
There are two main methods to create Observables in RxJS. Subjects and Operators. We will take a look at both of these!
What is an Observable?
But first, what is an Observable?
Observables are like functions with zero arguments that push multiple values to their Observers, either synchronously or asynchronously.
This can be kind of confusing, so let's take a very basic example of an Observable that pushes 4 values to any of its Observers.
const obs$ = Observable.create((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => observer.next(4), 1000);
});
console.log("before subscribe");
const observer = obs$.subscribe((v) => console.log("received: ", v));
console.log("after subscribe");
In the example above, we create the Observable and tell it to send 1, 2 and 3 to it's Observer immediately when it subscribes to the Observable. These are the synchronous calls.
However, 4 doesn't get sent until 1 second later, occurring after we've logged after subscribe
, making this an async operation.
You can see this in the output:
before subscribe
received: 1
received: 2
received: 3
after subscribe
received: 4
The Observer will keep receiving values until the Observable notifies it that it has completed pushing values. If we modify the example above, we can see this in action.
const obs$ = Observable.create((observer) => {
observer.next(1);
observer.next(2);
observer.complete();
observer.next(3);
setTimeout(() => observer.next(4), 1000);
});
console.log("before subscribe");
obs$.subscribe((v) => console.log("received: ", v));
console.log("after subscribe");
We've added a call to observer.complete();
after observer.next(2)
which will notify the Observer that the Observer has finished pushing values.
Take a look at the new output:
before subscribe
received: 1
received: 2
after subscribe
We can see that even though we try to push the values 3 and 4 to the Observer, the Observer does not receive them.
A method of creating an Observable using the static create
method is illustrated above. Now, we will take a look at creating Observables with Subjects and Operators.
Creating Observables with Subjects
A Subject can be thought of as a combination of EventEmitters and Observables. They act like both. An Observer can subscribe
to a Subject to receive the values it pushes, while you can use the Subject directly to push new values to each Observer, or to tell each Observer that the Subject has completed pushing values.
There are 4 types of Subjects that RxJS exposes to us. We'll take a look at each in turn.
Subject
Subject
is the most basic Subject that we can use to create Observables. It's very simple to use, and we can use it to push values to all Observers that are subscribed to it. Each Observer will only receive values that are pushed by the Subject after the Observer has subscribed.
Let's see this in action.
const subject$ = new Subject();
const observerA = subject$.subscribe((v) => console.log("Observer A: ", v));
const observerB = subject$.subscribe((v) => console.log("Observer B: ", v));
subject$.next(1);
const observerC = subject$.subscribe((v) => console.log("Observer C: ", v))
subject$.next(2);
We start by creating the subject, then create two Observers that will log each value they receive from the Subject (Observable).
We tell the Subject to push the value 1
.
We then create ObserverC
which also logs each value it receives from the Subject.
Finally, we tell the Subject to push the value 2
.
Now, take a look at the output of this:
Observer A: 1
Observer B: 1
Observer A: 2
Observer B: 2
Observer C: 2
We can see that ObserverA
and ObserverB
both received 1
but ObserverC
only received 2
, highlighting that Observers of the basic Subject
will only receive values that are pushed after they have subscribed!
BehaviorSubject
Another type of Subject we can use is BehaviorSubject
. It works exactly the same as the basic Subject
with one key difference. It has a sense of a current value. When the Subject pushes a new value, it stores this value internally. When any new Observer subscribes to the BehaviorSubject
, it will immediately send them the last value that it pushed to its Observers.
If we take the example we used for Subject
and change it to use a BehaviorSubject
we can see this functionality in action:
const behaviorSubject$ = new BehaviorSubject();
const observerA = behaviorSubject$.subscribe((v) => console.log("Observer A: ", v));
const observerB = behaviorSubject$.subscribe((v) => console.log("Observer B: ", v));
behaviorSubject$.next(1);
const observerC = behaviorSubject$.subscribe((v) => console.log("Observer C: ", v))
behaviorSubject$.next(2);
Let's see the output to see the difference:
Observer A: 1
Observer B: 1
Observer C: 1
Observer A: 2
Observer B: 2
Observer C: 2
We can see that ObserverC
was sent the value 1
even though it subscribed to the BehaviorSubject after the 1
was pushed.
ReplaySubject
The ReplaySubject
is very similar to the BehaviorSubject
in that it can remember the values it has pushed and immediately send them to new Observers that have subscribed. However, it allows you to specify how many values it should remember and will send all these values to each new Observer that subscribes.
If we modify the example above slightly, we can see this functionality in action:
const replaySubject$ = new ReplaySubject(2); // 2 - number of values to store
const observerA = replaySubject$.subscribe((v) => console.log("Observer A: ", v));
replaySubject$.next(1);
replaySubject$.next(2);
replaySubject$.next(3);
const observerB = replaySubject$.subscribe((v) => console.log("Observer B: ", v))
replaySubject$.next(4);
This time, we are going to have the ReplaySubject
push 4 values to its Observers. We also tell it that it should always store the two latest values it emitted.
Let's take a look at the output:
Observer A: 1
Observer A: 2
Observer A: 3
Observer B: 2
Observer B: 3
Observer A: 4
Observer B: 4
We see that ObserverA
receives the first 3 values perfectly fine. Then ObserverB
subscribes to the ReplaySubject
and it is immediately sent the values 2
and 3
, which were the last two values the Subject had pushed. Then both Observers receive the next value of 4
correctly.
AsyncSubject
The AsyncSubject
exposes all the same methods as Subject
, however it works differently. It only ever sends the last value it has been told to push to its Observers, and it will only do this when the Subject is completed (by calling complete()
). Therefore, Observers only receive values when the Subject completes and any Observers that subscribe after will immediately receive the value it pushed when it completed.
We can see this in action:
const asyncSubject$ = new AsyncSubject(2);
const observerA = asyncSubject$.subscribe((v) =>
console.log("Observer A: ", v)
);
asyncSubject$.next(1);
asyncSubject$.next(2);
const observerB = asyncSubject$.subscribe((v) =>
console.log("Observer B: ", v)
);
asyncSubject$.next(3);
asyncSubject$.complete();
const observerC = asyncSubject$.subscribe((v) =>
console.log("Observer C: ", v)
);
The output of this is:
Observer A: 3
Observer B: 3
Observer C: 3
We can see that although ObserverA
had subscribed before any values were pushed, it only received 3
, the last one. We can also see that ObserverC
also immediately received the value 3
even though it subscribed after the AsyncSubject had completed.
Creating Observables with Operators
An alternative method of creating Observables comes from the operators that RxJS exposes. These operators can be categorized based on their intention. In this article, we are going to look at the Creation Operators, so named as they create Observables.
You can see a list of these operators here: http://reactivex.io/rxjs/manual/overview.html#creation-operators
ajax
ajax
is an operator that creates an Observable to handle AJAX Requests. It takes either a request object with URL, Headers etc or a string for a URL. Once the request completes, the Observable completes. This allows us to make AJAX requests and handle them reactively.
const obs$ = ajax("https://api.github.com/users?per_page=2");
obs$.subscribe((v) => console.log("received: ", v.response));
The output of this will be:
received: (2) [Object, Object]
bindCallback
bindCallback
allows you to take any function that usually uses a callback approach and transform it into an Observable. This can be quite difficult to wrap your head around, so we'll break it down with an example:
// Let's say we have a function that takes two numbers, multiplies them
// and passes the result to a callback function we manually provide to it
function multiplyNumbersThenCallback(x, y, callback) {
callback(x * y);
}
// We would normally use this function as shown below
multiplyNumbersThenCallback(3, 4, (value) =>
console.log("Value given to callback: ", value)
);
// However, with bindCallback, we can turn this function into
// a new function that takes the same arguments as the original
// function, but without the callback function
const multiplyNumbers = bindCallback(multiplyNumbersThenCallback);
// We call this function with the numbers we want to multiply
// and it returns to us an Observable that will only push
// the result of the multiplication when we subscribe to it
multiplyNumbers(3, 4).subscribe((value) =>
console.log("Value pushed by Observable: ", value)
);
By using bindCallback
, we can take functions that use a Callback API and transform them into reactive functions that create Observables that we can subscribe to.
defer
defer
allows you to create an Observable only when the Observer subscribes to it. It will create a new Observable for each Observer, meaning they do not share the same Observable even if it appears that they do.
const defferedObs$ = defer(() => of([1, 2, 3]));
const observerA = defferedObs$.subscribe((v) => console.log("Observer A: ", v));
const observerB = defferedObs$.subscribe((v) => console.log("Observer B: ", v));
This outputs:
Observer A: (3) [1, 2, 3]
Observer B: (3) [1, 2, 3]
Both Observers received an Observable with the same values pushed from it. These are actually different Observables even though they pushed the same values. We can illustrate that defer
creates different Observables for each Observer by modifying the example:
let numOfObservers = 0;
const defferedObs$ = defer(() => {
if(numOfObservers === 0) {
numOfObservers++;
return of([1, 2, 3]);
}
return of([4,5,6])
});
const observerA = defferedObs$.subscribe((v) => console.log("Observer A: ", v));
const observerB = defferedObs$.subscribe((v) => console.log("Observer B: ", v));
We've changed the defer
object to give the first Observer an Observable of [1, 2, 3]
and any other Observers [4, 5, 6]
. Which we can then see in the output:
Observer A: (3) [1, 2, 3]
Observer B: (3) [4, 5, 6]
empty
The empty
operator creates an Observable that pushes no values and immediately completes when subscribed to:
const obs$ = empty();
obs$.subscribe((v) => console.log("received: ", v));
This produces NO output as it never pushes a value.
from
from
is a powerful operator. It can convert almost anything into an Observable, and pushes the values from these sources in an intelligent manner, based on the source itself.
We'll take two examples- an array and an iterable from a generator:
const obs$ = from([1,2,3]);
obs$.subscribe((v) => console.log("received: ", v));
With an array, from
will take each element in the array and push them separately:
received: 1
received: 2
received: 3
Similarly, with the iterable from the generator, we will get each value separately:
function* countToTen() {
let i = 0;
while(i < 11) {
yield i;
i++;
}
}
const obs$ = from(countToTen());
obs$.subscribe((v) => console.log("received: ", v));
If we create a generator that counts to 10, then from
will push each number from 0-10:
received: 0
received: 1
received: 2
received: 3
received: 4
received: 5
received: 6
received: 7
received: 8
received: 9
received: 10
fromEvent
The fromEvent
operator will create an Observable that pushes a every event of a specified type that has occurred on a specified event target, such as every click on a webpage.
We can set this up very easily:
const obs$ = fromEvent(document, "click");
obs$.subscribe(() => console.log("received click!"));
Every time you click on the page it logs "received click!":
received click!
received click!
fromEventPattern
The fromEventPattern
is similar to the fromEvent
operator in that it works with events that have occurred. However, it takes two arguments. An addHandler
function argument and a removeHandler
function argument.
The addHandler
function is called when the Observable is subscribed to, and the Observer that has subscribed will receive every event that is set up in the addHandler
function.
The removeHandler
function is called when the Observer unsubscribes
from the Observable.
This sounds more confusing than it actually is. Let's use the example above where we want to get all clicks that occur on the page:
function addHandler(handler) {
document.addEventListener('click', handler)
}
function removeHandler(handler) {
document.removeEventListener('click', handler)
}
const obs$ = fromEventPattern(addHandler, removeHandler);
obs$.subscribe(() => console.log("received click!"));
Every time you click on the page it logs "received click!":
received click!
received click!
generate
This operator allows us to set up an Observable that will create values to push based on the arguments we pass to it, with a condition to tell it when to stop.
We can take our earlier example of counting to 10 and implement it with this operator:
const obs$ = generate(
1,
(x) => x < 11,
(x) => x++
)
obs$.subscribe((v) => console.log("received: ", v));
This outputs:
received: 0
received: 1
received: 2
received: 3
received: 4
received: 5
received: 6
received: 7
received: 8
received: 9
received: 10
interval
The interval
operator creates an Observable that pushes a new value at a set interval of time. The example below shows how we can create an Observable that pushes a new value every second:
const obs$ = interval(1000);
obs$.subscribe((v) => console.log("received: ", v));
Which will log a new value every second:
received: 0
received: 1
received: 2
never
The never
operator creates an Observable that never pushes a new value, never errors, and never completes. It can be useful for testing or composing with other Observables.
const obs$ = never();
// This never logs anything as it never receives a value
obs$.subscribe((v) => console.log("received: ", v));
of
The of
operator creates an Observable that pushes values you supply as arguments in the same order you supply them, and then completes.
Unlike the from
operator, it will NOT take every element from an array and push each. It will, instead, push the full array as one value:
const obs$ = of(1000, [1,2,4]);
obs$.subscribe((v) => console.log("received: ", v));
The output of which is:
received: 1000
received: (3) [1, 2, 4]
range
The range
operator creates an Observable that pushes values in sequence between two specified values. We'll take our count to 10 example again, and show how it can be created using the range
operator:
const obs$ = range(0, 10);
obs$.subscribe((v) => console.log("received: ", v));
The output of this is:
received: 0
received: 1
received: 2
received: 3
received: 4
received: 5
received: 6
received: 7
received: 8
received: 9
received: 10
throwError
The throwError
operator creates an Observable that pushes no values but immediately pushes an error notification. We can handle errors thrown by Observables gracefully when an Observer subscribes to the Observable:
const obs$ = throwError(new Error("I've fallen over"));
obs$.subscribe(
(v) => console.log("received: ", v),
(e) => console.error(e)
);
The output of this is:
Error: I've fallen over
timer
timer
creates an Observable that does not push any value until after a specified delay. You can also tell it an interval time, wherein after the initial delay, it will push increasing values at each interval.
const obs$ = timer(3000, 1000);
obs$.subscribe((v) => console.log("received: ", v));
The output starts occurring after 3 seconds and each log is 1 second apart
received: 0
received: 1
received: 2
received: 3
Hopefully, you have been introduced to new methods of creating Observables that will help you when working with RxJS in the future! There are some Creation Operators that can come in super handy for nuanced use-cases, such as bindCallback
and fromEvent
.