concatMap
concatMap()
is a higher-order mapping operator that maps each value from a source (outer) Observable to an inner Observable, subscribes to it, but waits for that inner Observable to complete before moving on to map and subscribe to the inner Observable generated by the next value from the source.
Here's the flow:
- It takes the first value emitted by the source (outer) Observable.
- It uses that value and your project function to create the first inner Observable.
- It subscribes to this first inner Observable and emits its values.
- If the source Observable emits a second value while the first inner Observable is still running,
concatMap
holds onto that second value. - Only when the first inner Observable completes does
concatMap
use the held second value to create and subscribe to the second inner Observable. - This process continues, effectively creating a queue where inner Observables are executed one after another, strictly in the order dictated by the source Observable.
Think of it as processing tasks in a single-file line: the next task only starts once the current one is completely finished.
Key Characteristics#
- Higher-Order Mapping: Maps values from an outer Observable to inner Observables.
- Sequential Execution: Runs inner Observables one at a time, in order.
- Waits for Completion: Does not subscribe to the next inner Observable until the previous one completes.
- Preserves Order: Guarantees that the output values maintain the order corresponding to the source emissions.
- Use Cases: Ideal when the order of operations is crucial, or when you need to ensure one asynchronous task finishes before the next begins (e.g., to avoid race conditions, maintain data integrity, or process items sequentially). Also useful for implicit rate-limiting when you don't want concurrent requests.
Real-World Example Scenario#
It's Tuesday afternoon here in Bengaluru (around 2:50 PM IST), and imagine you're building a feature where a user can trigger several updates that need to be applied to a database or configuration file in a specific order to maintain consistency.
Scenario: A user is rapidly clicking buttons to add different items to a configuration profile ("Add Feature A", "Enable Setting B", "Add User C"). Each click triggers an API call to update the profile. If these updates happened concurrently (mergeMap
), they might interfere with each other or lead to an inconsistent final state depending on server response times. If you used switchMap
, clicking "Enable Setting B" might cancel the "Add Feature A" request if it was still pending.
You want to ensure the updates are applied strictly in the order the user clicked:
- Wait for "Add Feature A" API call to complete successfully.
- Then execute the "Enable Setting B" API call and wait for it to complete.
- Then execute the "Add User C" API call.
concatMap
enforces this sequential processing.
Code Snippet (Angular Component - Sequential Updates)#
import { Component, OnDestroy } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { Subject, Subscription, Observable, of } from "rxjs";
import { concatMap, catchError, tap, delay } from "rxjs/operators"; // Import concatMap
interface UpdateAction {
id: string; // Unique identifier for the action
type: string;
payload: any;
apiUrl: string;
}
interface UpdateResult {
actionId: string;
success: boolean;
message: string;
}
@Component({
selector: "app-sequential-updates",
template: `
<h4>Sequential Update Demo</h4>
<p>
Rapidly click the buttons below. Updates will be processed one after
another.
</p>
<button
(click)="triggerUpdate('Add Feature A', '/api/features')"
class="btn btn-info me-2"
>
Add Feature A
</button>
<button
(click)="triggerUpdate('Enable Setting B', '/api/settings/b')"
class="btn btn-info me-2"
>
Enable Setting B
</button>
<button
(click)="triggerUpdate('Add User C', '/api/users')"
class="btn btn-info me-2"
>
Add User C
</button>
<div class="mt-3">
<h5>Processing Log:</h5>
<ul class="list-group">
<li
*ngFor="let log of processingLog"
class="list-group-item small"
[ngClass]="{
'list-group-item-success': log.includes('Success'),
'list-group-item-danger': log.includes('Failed'),
'list-group-item-secondary':
log.includes('Queueing') || log.includes('Starting')
}"
>
{{ log }}
</li>
</ul>
</div>
`,
})
export class SequentialUpdatesComponent implements OnDestroy {
processingLog: string[] = [];
// Use a Subject to push actions onto the stream when buttons are clicked
private actionSubject = new Subject<UpdateAction>();
private actionSubscription: Subscription;
constructor(private http: HttpClient) {
// Subscribe to the action stream and use concatMap for processing
this.actionSubscription = this.actionSubject
.pipe(
tap((action) => {
const logMsg = `[${new Date().toLocaleTimeString()}] Queueing: ${
action.type
} (ID: ${action.id})`;
console.log(logMsg);
this.processingLog.push(logMsg);
}),
// concatMap ensures the next action waits until the inner observable (API call) completes
concatMap((action: UpdateAction) => {
const startLogMsg = `[${new Date().toLocaleTimeString()}] Starting API call for: ${
action.type
} (ID: ${action.id})`;
console.log(` ${startLogMsg}`);
this.processingLog.push(startLogMsg);
// Simulate API call - Replace with actual http.post/put
// Adding a delay to better visualize the sequential nature
const innerApiCall$ = of({
success: true,
received: action.payload,
}).pipe(
delay(1500 + Math.random() * 1000) // Simulate network latency (1.5 - 2.5 seconds)
);
// const innerApiCall$ = this.http.post<any>(action.apiUrl, action.payload)
return innerApiCall$.pipe(
// Map the successful result
map((response) => ({
actionId: action.id,
success: true,
message: `Success: ${action.type} (ID: ${action.id}) completed.`,
})),
// Catch errors for *this specific* API call
catchError((error) => {
console.error(
`Error processing ${action.type} (ID: ${action.id}):`,
error
);
// Return an Observable emitting the failure result
return of({
actionId: action.id,
success: false,
message: `Failed: ${action.type} (ID: ${action.id}) - ${
error.message || "Unknown error"
}`,
});
})
); // End of inner pipe
}) // End of concatMap
)
.subscribe({
next: (result: UpdateResult) => {
// This receives results one by one, *in order*, after each API call completes
const logMsg = `[${new Date().toLocaleTimeString()}] ${
result.message
}`;
console.log(logMsg);
this.processingLog.push(logMsg);
},
error: (err) => {
// Error in the main action stream (unlikely with Subject unless error pushed)
const logMsg = `[${new Date().toLocaleTimeString()}] Critical stream error: ${err}`;
console.error(logMsg);
this.processingLog.push(logMsg);
},
complete: () => {
// Only called if the actionSubject itself completes (not typical for button clicks)
const logMsg = `[${new Date().toLocaleTimeString()}] Action stream completed.`;
console.log(logMsg);
this.processingLog.push(logMsg);
},
});
}
triggerUpdate(type: string, apiUrl: string): void {
const action: UpdateAction = {
id: Math.random().toString(36).substring(2, 9), // Generate simple unique ID
type: type,
payload: { timestamp: new Date().toISOString() }, // Example payload
apiUrl: apiUrl,
};
console.log(
`[${new Date().toLocaleTimeString()}] Button clicked, pushing action: ${type} (ID: ${
action.id
})`
);
this.actionSubject.next(action); // Push the action onto the Subject stream
}
ngOnDestroy(): void {
// Clean up the subscription when the component is destroyed
if (this.actionSubscription) {
this.actionSubscription.unsubscribe();
console.log("Sequential updates subscription stopped.");
}
this.actionSubject.complete(); // Also complete the subject
}
}
Explanation:
Subject<UpdateAction>
: We use a Subject (actionSubject
) to act as the source Observable. Button clicks pushUpdateAction
objects onto this Subject usingactionSubject.next(action)
.concatMap((action: UpdateAction) => ...)
: This is the core.- When the
actionSubject
emits an action,concatMap
takes it. - It executes the inner function, which returns the inner Observable (
innerApiCall$
). concatMap
subscribes toinnerApiCall$
.- Crucially: If another action is pushed onto
actionSubject
beforeinnerApiCall$
completes,concatMap
waits. It doesn't execute the inner function for the new action yet. - Only when the current
innerApiCall$
completes (either successfully maps to a result or is handled bycatchError
) doesconcatMap
proceed to process the next queued action fromactionSubject
.
- When the
delay()
: Added inside the simulated API call to make the sequential waiting behavior obvious in the log.- Logging: The console and UI logs will clearly show actions being queued, then starting, then completing one after another, even if the buttons are clicked very rapidly.
Summary#
use concatMap
when the order of execution matters and you need to ensure that asynchronous operations triggered by a stream of events happen sequentially, one completing before the next one begins. It's your tool for enforcing order in asynchronous workflows.