subject
Think of a Subject
as a special kind of Observable that acts like both an Observable and an Observer:
- As an Observable: You can
subscribe
to it just like any other Observable to receive values it emits. - As an Observer: You can manually push values into it by calling its
next(value)
method. You can also make it emit an error witherror(err)
or signal completion withcomplete()
.
The key characteristic of a Subject is multicasting. This means that when the Subject emits a value (because next()
was called on it), it pushes that value to all of its current subscribers simultaneously. This is different from plain ("cold") Observables (like those from HttpClient
or interval
), which typically start a new, independent execution for each subscriber.
Analogy#
Imagine a live radio broadcast.
- The Radio Station Announcer is the code calling
subject.next("Breaking news!")
. - The Radio Station's Broadcast Tower is the
Subject
itself. - The Listeners tuning their radios are the
Subscribers
.
When the announcer speaks into the microphone (next()
), the tower (Subject
) broadcasts that message live, and all listeners (Subscribers
) who are currently tuned in hear the same message at the same time. Listeners who tune in later will only hear broadcasts from that point forward; they miss the earlier messages (this is specific to the basic Subject
type).
Why Use a Subject?#
- Event Bus: To create a simple way for different parts of your application (like unrelated components) to communicate through a shared service. One part calls
next()
on the Subject, and other parts listening to that Subject react. - Bridging: To take values or events from non-Observable sources (like imperative button clicks, WebSocket messages, etc.) and push them into an Observable stream for further processing with RxJS operators.
- Sharing Observable Executions: While there are operators like
shareReplay
often better suited for sharing the result of an Observable, a Subject can sometimes be used to manually control and share a single subscription's output.
Real-World Example: Cross-Component Communication#
Let's say you have a header component with a "Login" button and a user profile component elsewhere on the page. When the user successfully logs in (maybe via a popup triggered from the header), you want the user profile component to update immediately without a page refresh.
We can use a shared service with a Subject to announce the login status change.
Code Snippets:
1. Shared Authentication Service (auth.service.ts
)
import { Injectable } from "@angular/core";
import { Subject, Observable } from "rxjs";
export interface User {
id: string;
name: string;
}
@Injectable({
providedIn: "root",
})
export class AuthService {
// 1. Private Subject: Only the service can push values into it.
// We use 'User | null' to indicate either a logged-in user or logged-out state.
private userLoginStatusSubject = new Subject<User | null>();
// 2. Public Observable: Components subscribe to this to listen for changes.
// '.asObservable()' hides the .next(), .error(), .complete() methods from consumers.
userLoginStatus$: Observable<User | null> =
this.userLoginStatusSubject.asObservable();
// Simulate a login process
login(username: string): void {
console.log("AuthService: Attempting login...");
// In a real app, this would involve an HTTP call, password check etc.
setTimeout(() => {
const fakeUser: User = { id: "user123", name: username };
console.log("AuthService: Login successful, broadcasting user.");
// 3. Broadcasting the change: Push the new user data into the Subject.
this.userLoginStatusSubject.next(fakeUser);
}, 1000); // Simulate network delay
}
// Simulate a logout process
logout(): void {
console.log("AuthService: Logging out, broadcasting null user.");
// 4. Broadcasting the change: Push 'null' into the Subject.
this.userLoginStatusSubject.next(null);
}
}
2. Header Component - Triggers Login/Logout
import { Component, inject } from "@angular/core";
import { AuthService } from "./auth.service"; // Adjust path if needed
@Component({
selector: "app-header",
standalone: true,
template: `
<nav>
<span>My App</span>
<button (click)="loginUser()">Login</button>
<button (click)="logoutUser()">Logout</button>
</nav>
`,
styles: [
"nav { display: flex; justify-content: space-between; padding: 10px; background-color: #eee; }",
],
})
export class HeaderComponent {
private authService = inject(AuthService);
loginUser(): void {
// Prompt or fixed user for simplicity
const username = prompt("Enter username", "Alice");
if (username) {
this.authService.login(username);
}
}
logoutUser(): void {
this.authService.logout();
}
}
3. User Profile Component - Listens for Changes
import {
Component,
inject,
signal,
OnInit,
DestroyRef,
ChangeDetectionStrategy,
} from "@angular/core";
import { CommonModule } from "@angular/common"; // Needed for @if
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";
import { AuthService, User } from "./auth.service"; // Adjust path if needed
@Component({
selector: "app-user-profile",
standalone: true,
imports: [CommonModule], // Import CommonModule
template: `
<div class="profile-status">
<h4>User Status</h4>
@if (loggedInUser()) {
<p>Welcome, {{ loggedInUser()?.name }}! (ID: {{ loggedInUser()?.id }})</p>
} @else {
<p>You are currently logged out.</p>
}
</div>
`,
styles: [
`
.profile-status {
border: 1px solid lightblue;
padding: 10px;
margin-top: 10px;
}
`,
],
changeDetection: ChangeDetectionStrategy.OnPush, // Good practice with signals/observables
})
export class UserProfileComponent implements OnInit {
private authService = inject(AuthService);
private destroyRef = inject(DestroyRef);
// Use a signal to hold the user state for the template
loggedInUser = signal<User | null>(null);
ngOnInit(): void {
// Subscribe to the service's Observable
this.authService.userLoginStatus$
.pipe(
// Automatically unsubscribe when the component is destroyed
takeUntilDestroyed(this.destroyRef)
)
.subscribe((user) => {
// Update the signal when a new status is broadcast by the Subject
console.log("UserProfileComponent received user status:", user);
this.loggedInUser.set(user);
});
}
}
4. App Component (Hosting the others)
import { Component } from "@angular/core";
import { HeaderComponent } from "./header.component"; // Adjust path
import { UserProfileComponent } from "./user-profile.component"; // Adjust path
@Component({
selector: "app-root",
standalone: true,
imports: [HeaderComponent, UserProfileComponent], // Import components
template: `
<h1>RxJS Subject Demo</h1>
<app-header></app-header>
<app-user-profile></app-user-profile>
<!-- You could even add a second profile component instance -->
<!-- <app-user-profile></app-user-profile> -->
<!-- Both would update simultaneously! -->
`,
})
export class AppComponent {}
Explanation:
- The
AuthService
creates aSubject
(userLoginStatusSubject
) to manage the login state. - It exposes only an
Observable
(userLoginStatus$
) derived from the subject using.asObservable()
. This is good practice – it prevents components from accidentally callingnext()
on the service's subject. Only the service itself controls when broadcasts happen. - When
authService.login()
orauthService.logout()
is called (triggered byHeaderComponent
), the service callsthis.userLoginStatusSubject.next(...)
, pushing the new user data (ornull
) into the Subject. - The
UserProfileComponent
subscribes toauthService.userLoginStatus$
in itsngOnInit
. - Whenever the Subject broadcasts a new value, the subscription in
UserProfileComponent
receives it, and updates theloggedInUser
signal, causing the component's template to reactively display the current status. takeUntilDestroyed
ensures the subscription is cleaned up when theUserProfileComponent
is destroyed.
This demonstrates how a Subject acts as a central hub (multicasting) to notify multiple interested parties (subscribers) about events happening elsewhere in the application.