first#
The first()
operator is used to get only the first value emitted by a source Observable that meets an optional condition. After emitting that single value, it immediately completes the stream.
It can be used in a few ways:
first()
(no arguments): Emits the very first value from the source, then completes. If the source completes without emitting any values,first()
will emit anEmptyError
.first(predicateFn)
: Emits the first value from the source for which the providedpredicateFn
function returnstrue
, then completes. If the source completes before any value satisfies the predicate,first()
will emit anEmptyError
.first(predicateFn, defaultValue)
: Same as above, but if the source completes before any value satisfies the predicate, it emits the provideddefaultValue
instead of erroring, and then completes.
Key Characteristics#
- Selects First Value: Emits only one value – the first that qualifies based on the arguments.
- Completes Stream: Immediately completes after emitting the single value (or default value/error).
- Unsubscribes from Source: Cleans up the source subscription upon completion or error.
- Potential Error: Can throw
EmptyError
if no suitable value is found before the source completes (unless adefaultValue
is supplied).
Real-World Example Scenario#
Imagine your Angular application needs to load some initial configuration data when it starts. This data might be available via an Observable (perhaps from a service using ReplaySubject(1)
or BehaviorSubject
). You only care about getting that first available configuration value to initialize your component, even if the source Observable might emit updates later. first()
is ideal for grabbing that initial emission and then completing the stream cleanly.
Scenario: Let's simulate a stream emitting user status updates ('pending', 'active', 'inactive'). We only want to know the first time the user becomes 'active'.
Code Snippet#
import { Component, OnInit } from "@angular/core";
import {
of,
first,
catchError,
EMPTY,
throwError,
timer,
map,
concat,
} from "rxjs";
import { EmptyError } from "rxjs";
@Component({
selector: "app-first-demo",
standalone: true,
imports: [],
template: `
<h4>First Operator Demo</h4>
<p>Looking for the first 'active' status. Check console.</p>
<p>Result: {{ resultStatus }}</p>
`,
})
export class FirstDemoComponent implements OnInit {
resultStatus = "Waiting...";
ngOnInit(): void {
const statusUpdates$ = concat(
timer(500).pipe(map(() => "pending")),
timer(1000).pipe(map(() => "pending")),
timer(1500).pipe(map(() => "active")), // First 'active' here
timer(2000).pipe(map(() => "pending")),
timer(2500).pipe(map(() => "inactive"))
);
console.log(
`[${new Date().toLocaleTimeString()}] Subscribing to find first 'active' status...`
);
statusUpdates$
.pipe(
first((status) => status === "active"),
catchError((error) => {
if (error instanceof EmptyError) {
console.warn(
`[${new Date().toLocaleTimeString()}] No 'active' status found before stream completed.`
);
this.resultStatus = "No active status found.";
return EMPTY;
} else {
console.error(
`[${new Date().toLocaleTimeString()}] Stream error:`,
error
);
this.resultStatus = `Error: ${error.message}`;
return throwError(() => error);
}
})
)
.subscribe({
next: (activeStatus) => {
console.log(
`[${new Date().toLocaleTimeString()}] Found first active status: ${activeStatus}`
);
this.resultStatus = `First active status: ${activeStatus}`;
},
complete: () => {
console.log(
`[${new Date().toLocaleTimeString()}] Stream completed by first().`
);
// Note: resultStatus might already be set by next or catchError
if (this.resultStatus === "Waiting...") {
this.resultStatus =
"Stream completed (likely handled by catchError/default)";
}
},
});
}
}
Explanation:
statusUpdates$
: We simulate a stream emitting different status strings over time usingconcat
andtimer
.first(status => status === 'active')
: This operator listens tostatusUpdates$
.- It ignores 'pending'.
- When 'active' arrives,
first()
emits 'active'. - Immediately after emitting 'active', it sends the
complete
signal and unsubscribes fromstatusUpdates$
. The subsequent 'pending' and 'inactive' emissions are never processed by this subscription.
catchError(...)
: This handles theEmptyError
thatfirst()
would throw if thestatusUpdates$
completed before emitting 'active'. In this example, 'active' is found, so this specific error path isn't taken.subscribe({...})
:- The
next
handler receives the single value 'active'. - The
complete
handler is called right afternext
, confirming the stream finished.
- The
Summary#
first()
is used when you need exactly one value from the beginning of a stream (optionally matching a condition) and want the stream to complete immediately afterward. It's concise for getting initial values or the first occurrence of a specific event. Remember its potential to throw EmptyError
if no qualifying value is emitted before the source completes.