replaySubject
Think of a ReplaySubject
as a Subject
that records a history of the values that have passed through it. When a new Observer subscribes, the ReplaySubject
immediately sends ("replays") a specified number of the most recent values from its recording to that new subscriber.
Key Features#
- Records Values: It keeps a buffer of the last
n
values that were emitted vianext()
. - Replays on Subscription: New subscribers immediately receive the buffered values (up to the specified buffer size) in the order they were originally emitted.
- Configurable Buffer: You specify the buffer size when creating it (e.g.,
new ReplaySubject<string>(3)
will store and replay the last 3 values). - Optional Time Window: You can also specify a
windowTime
(in milliseconds) along with the buffer size. This makes it replay values that were emitted within that time window and are within the buffer size limit. - No Initial Value Required: Unlike
BehaviorSubject
, it doesn't need a starting value. - Multicasting: Like all Subjects, it pushes new values (
next()
) to all current subscribers simultaneously after the initial replay.
Analogy#
Imagine a meeting recorder or a chat log:
- When someone speaks (
next()
is called), the recorder captures it. - It keeps a log of the recent conversation (the buffer).
- If someone joins the meeting late (subscribes), they can quickly catch up by listening to the recording of the last few minutes/points (the replay). They get the context they missed.
- After catching up, they hear the live conversation (
next()
emissions) along with everyone else.
Why Use a ReplaySubject
?#
- Caching Recent Events: When you need to ensure that consumers who subscribe later still get access to the most recent few events or data points, not just the single latest one (like
BehaviorSubject
) or none (likeSubject
). - Late Subscribers Needing Context: Useful in scenarios like:
- A log of recent user actions.
- A stream of notifications where seeing the last few is important.
- Data streams where events happen quickly, and a subscriber might miss some if they aren't connected constantly.
Real-World Example: Recent Activity Log Service#
Let's create a service that logs recent significant actions within the application (e.g., "Item Added", "Settings Saved"). We want components that display this log to show the last 5 actions, even if the component loads after those actions have occurred.
Code Snippets#
1. Activity Log Service (activity-log.service.ts
)
import { Injectable } from "@angular/core";
import { ReplaySubject, Observable } from "rxjs";
export interface LogEntry {
message: string;
timestamp: Date;
}
@Injectable({
providedIn: "root",
})
export class ActivityLogService {
// 1. Define the buffer size: Store the last 5 log entries.
private readonly logBufferSize = 5;
// 2. Create the ReplaySubject with the specified buffer size.
// Private to control who can add logs.
private logSubject = new ReplaySubject<LogEntry>(this.logBufferSize);
// 3. Expose the log entries as an Observable.
logEntries$: Observable<LogEntry> = this.logSubject.asObservable();
// Method for other parts of the app to add log entries
addLog(message: string): void {
const newEntry: LogEntry = {
message: message,
timestamp: new Date(),
};
console.log(`ActivityLogService: Adding log - "${message}"`);
// 4. Push the new log entry into the ReplaySubject.
// It gets stored in the buffer and sent to current subscribers.
this.logSubject.next(newEntry);
}
}
2. Action Simulator Component - Adds Logs
import { Component, inject } from "@angular/core";
import { ActivityLogService } from "./activity-log.service"; // Adjust path
@Component({
selector: "app-action-simulator",
standalone: true,
template: `
<div>
<h4>Simulate Actions</h4>
<button (click)="simulateAdd()">Add Item</button>
<button (click)="simulateSave()">Save Settings</button>
<button (click)="simulateDelete()">Delete User</button>
</div>
`,
styles: [
"div { margin-bottom: 15px; padding: 10px; border: 1px dashed green; }",
],
})
export class ActionSimulatorComponent {
private logService = inject(ActivityLogService);
private itemCounter = 0;
simulateAdd(): void {
this.itemCounter++;
this.logService.addLog(`Item #${this.itemCounter} added to cart.`);
}
simulateSave(): void {
this.logService.addLog("User preferences saved successfully.");
}
simulateDelete(): void {
this.logService.addLog("User account marked for deletion.");
}
}
3. Activity Log Display Component - Shows Logs
import {
Component,
inject,
signal,
OnInit,
DestroyRef,
ChangeDetectionStrategy,
} from "@angular/core";
import { CommonModule, DatePipe } from "@angular/common"; // Need DatePipe
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";
import { ActivityLogService, LogEntry } from "./activity-log.service"; // Adjust path
@Component({
selector: "app-activity-display",
standalone: true,
imports: [CommonModule, DatePipe], // Import CommonModule and DatePipe
template: `
<div class="log-display">
<h4>Recent Activity Log (Last {{ bufferSize }})</h4>
@if (logMessages().length > 0) {
<ul>
@for (entry of logMessages(); track entry.timestamp) {
<li>
[{{ entry.timestamp | date : "mediumTime" }}] {{ entry.message }}
</li>
}
</ul>
} @else {
<p>No activity logged yet.</p>
}
</div>
`,
styles: [
`
.log-display {
border: 1px solid orange;
padding: 10px;
margin-top: 10px;
min-height: 150px;
}
ul {
list-style: none;
padding-left: 0;
}
li {
margin-bottom: 5px;
font-size: 0.9em;
}
`,
],
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ActivityDisplayComponent implements OnInit {
private logService = inject(ActivityLogService);
private destroyRef = inject(DestroyRef);
// Expose buffer size to template if needed (optional)
bufferSize = (this.logService as any).logBufferSize; // Access private for demo - better way is getter in service
// Use a signal to hold the logs for the template
logMessages = signal<LogEntry[]>([]);
ngOnInit(): void {
// Subscribe to the log entries from the service
this.logService.logEntries$
.pipe(
// Automatically unsubscribe when the component is destroyed
takeUntilDestroyed(this.destroyRef)
)
.subscribe((entry) => {
// *** Key Point ***
// When this component subscribes, it might immediately receive
// up to 'logBufferSize' (5) entries if they were already added
// to the ReplaySubject BEFORE this component loaded/subscribed.
console.log(
`ActivityDisplayComponent received log: "${entry.message}"`
);
// Update the signal. Add new entry to the end.
// ReplaySubject emits buffered items one by one, then live ones.
// We just append each one as it arrives.
this.logMessages.update((currentLogs) => [...currentLogs, entry]);
// Optional: Trim the array in the component if you strictly want only 'bufferSize' items VISIBLE
// This might be needed if the ReplaySubject's buffer gets cleared/changed,
// though usually you'd just display what the ReplaySubject sends.
// this.logMessages.update(currentLogs => currentLogs.slice(-this.bufferSize));
});
}
}
4. App Component
import { Component, inject } from "@angular/core";
import { ActionSimulatorComponent } from "./action-simulator.component"; // Adjust path
import { ActivityDisplayComponent } from "./activity-display.component"; // Adjust path
import { ActivityLogService } from "./activity-log.service"; // Adjust path
@Component({
selector: "app-root",
standalone: true,
imports: [ActionSimulatorComponent, ActivityDisplayComponent], // Import components
template: `
<h1>RxJS ReplaySubject Demo</h1>
<button (click)="addLogDirectly()">Add Log (Directly)</button>
<hr />
<app-action-simulator></app-action-simulator>
<hr />
<app-activity-display></app-activity-display>
<!-- Add another display later to show replay -->
<!-- <app-activity-display></app-activity-display> -->
`,
})
export class AppComponent {
// Inject service just to add an initial log for testing replay
private logService = inject(ActivityLogService);
constructor() {
// Add a log entry *before* any components might fully initialize
this.logService.addLog("Application session started.");
}
addLogDirectly(): void {
this.logService.addLog("Direct Log Button Clicked.");
}
}
Explanation:
ActivityLogService
creates aReplaySubject
configured to buffer the last 5 (logBufferSize
)LogEntry
objects.- When
addLog
is called (byActionSimulatorComponent
or directly), the newLogEntry
is pushed into thelogSubject
. It's stored in the buffer (replacing the oldest if the buffer is full) and sent to any currently subscribed components. - When
ActivityDisplayComponent
initializes (ngOnInit
), it subscribes tologService.logEntries$
. - Crucially: The
ReplaySubject
immediately replays its buffered messages (up to 5 of the most recent ones, including the "Application session started." log added in theAppComponent
constructor) to the new subscription inActivityDisplayComponent
. The component doesn't start with an empty list; it gets the recent history right away. - As new logs are added via
addLog
, they are pushed live to theActivityDisplayComponent
's subscription and added to its display.
If you were to add a second instance of ActivityDisplayComponent
to the AppComponent
template later (e.g., after a few logs have already been added), that second instance would also immediately receive the same buffered history upon subscribing, demonstrating the replay functionality for late subscribers.