mergeMap
mergeMap()
is a higher-order mapping operator used to handle scenarios where each value emitted by a source (outer) Observable triggers an asynchronous operation that returns another Observable (an inner Observable).
Here's how it works:
- It takes a value emitted by the source (outer) Observable.
- It uses that value and a function you provide to create a new inner Observable.
- It subscribes to this new inner Observable.
- Crucially (and unlike
switchMap
): If the source Observable emits a new value,mergeMap
does not cancel or unsubscribe from any previous inner Observables that might still be running. - It subscribes to the new inner Observable generated by the new source value and runs it concurrently with any other active inner Observables.
- It then merges the values emitted by all the active inner Observables into a single output stream. The order of the output values depends on when the inner Observables emit, not necessarily the order of the outer source emissions.
Think of it as spawning multiple asynchronous tasks based on incoming triggers and collecting all their results together as they complete, without cancelling anything.
Key Characteristics#
- Higher-Order Mapping: Maps values from an outer Observable to inner Observables.
- Concurrent Execution: Subscribes to and runs multiple inner Observables in parallel.
- Merging Output: Combines emissions from all active inner Observables into a single output stream.
- No Cancellation: Does not cancel previous inner operations when new outer values arrive.
- Use Cases: Ideal when you need to perform multiple asynchronous actions concurrently based on source emissions and want the results from all of them. Useful when the order of completion isn't strictly important, and parallel processing is beneficial.
Real-World Example Scenario#
It's Monday evening here in Bengaluru (around 5:40 PM IST), and imagine you're working on a feature in an Angular application where a user can modify several pieces of data (e.g., multiple settings, or various documents in a list) and then click a single "Save All" button.
- The click event triggers an action.
- You get a list of items that need saving (e.g.,
['settingA', 'settingB', 'documentX']
). - For each item in the list, you need to make a separate API call (e.g.,
http.put('/api/settings/settingA', ...)
,http.put('/api/settings/settingB', ...)
, etc.). - You want these save operations to happen in parallel to make it faster. You don't want to wait for 'settingA' to finish saving before starting the save for 'settingB'.
- You want to get feedback (like a success/error message) for each individual save operation as it completes.
mergeMap
is perfect for this because it will take each item ID, trigger its corresponding API call (inner Observable), run all these API calls concurrently, and merge their results (e.g., success/error responses) into the output stream as they arrive.
Code Snippet (Angular Component - Save All Example)#
import { Component } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { from, of, Observable } from "rxjs";
import { mergeMap, catchError, map, tap } from "rxjs/operators";
interface SaveItem {
id: string;
payload: any;
apiUrl: string;
}
interface SaveResult {
id: string;
success: boolean;
message: string;
response?: any;
}
@Component({
selector: "app-save-all-demo",
template: `
<h4>Concurrent Save Demo</h4>
<button (click)="saveAll()" class="btn btn-primary" [disabled]="isSaving">
{{ isSaving ? "Saving..." : "Save All Changes" }}
</button>
<ul class="list-group mt-2">
<li
*ngFor="let result of saveResults"
class="list-group-item"
[ngClass]="{
'list-group-item-success': result.success,
'list-group-item-danger': !result.success
}"
>
{{ result.message }}
</li>
</ul>
`,
})
export class SaveAllDemoComponent {
saveResults: SaveResult[] = [];
isSaving = false;
constructor(private http: HttpClient) {}
saveAll(): void {
this.isSaving = true;
this.saveResults = []; // Clear previous results
// 1. Define the items that need saving (could come from component state)
const itemsToSave: SaveItem[] = [
{
id: "doc1",
payload: { content: "Updated content for Doc 1" },
apiUrl: "/api/documents/doc1",
},
{
id: "settingA",
payload: { value: true },
apiUrl: "/api/settings/settingA",
},
{
id: "userPrefX",
payload: { theme: "dark" },
apiUrl: "/api/preferences/userPrefX",
},
];
// 2. Create an Observable from the array of items
const itemsSource$ = from(itemsToSave);
// 3. Use mergeMap to process each item concurrently
itemsSource$
.pipe(
tap((item) =>
console.log(
`Starting save process for: ${
item.id
} at ${new Date().toLocaleTimeString()}`
)
),
mergeMap(
// This function is called for each item ('doc1', 'settingA', 'userPrefX')
(itemToSave: SaveItem) => {
console.log(
` [mergeMap] Triggering API call for: ${itemToSave.id}`
);
// Return the inner Observable (the HTTP PUT/POST request) for this item
// mergeMap subscribes to this immediately and runs it concurrently with others.
return this.http
.put<any>(itemToSave.apiUrl, itemToSave.payload)
.pipe(
// Map the successful HTTP response to a SaveResult object
map((response) => ({
id: itemToSave.id,
success: true,
message: `Successfully saved ${itemToSave.id}.`,
response: response,
})),
// Catch errors specific to this *inner* HTTP request
catchError((error) => {
console.error(`Error saving ${itemToSave.id}:`, error);
// Return an Observable emitting a failure SaveResult
// 'of()' creates an Observable that emits the value and completes.
return of({
id: itemToSave.id,
success: false,
message: `Failed to save ${itemToSave.id}: ${
error.statusText || "Unknown error"
}`,
});
})
); // End of inner http observable pipe
} // End of mergeMap project function
) // End of outer pipe
)
.subscribe({
next: (result: SaveResult) => {
// This 'next' handler receives results from *any* of the inner HTTP calls
// as they complete. The order is not guaranteed.
console.log(`Received result: ${result.message}`);
this.saveResults.push(result);
},
error: (err) => {
// This catches errors in the outer stream (e.g., if 'from(itemsToSave)' failed)
// Errors from inner HTTP calls are caught by the inner catchError.
console.error("Outer stream error:", err);
this.isSaving = false;
this.saveResults.push({
id: "GLOBAL_ERROR",
success: false,
message: "An unexpected error occurred in the save process.",
});
},
complete: () => {
// This is called only when the outer stream (itemsSource$) completes AND
// *all* inner Observables spawned by mergeMap have also completed.
console.log(
`All save operations finalized at ${new Date().toLocaleTimeString()}.`
);
this.isSaving = false;
},
});
}
}
Explanation:
from(itemsToSave)
: Creates the outer Observable, emitting eachSaveItem
object one by one.mergeMap((itemToSave: SaveItem) => ...)
: For eachSaveItem
emitted byfrom()
:- It immediately calls the function provided.
- This function returns
this.http.put(...)
, which is the inner Observable representing the API call for that specific item. mergeMap
subscribes to this inner Observable without unsubscribing from any previous ones. Iffrom()
emitsitem1
,item2
,item3
quickly,mergeMap
will likely have three concurrenthttp.put
requests running.
- Inner
map
andcatchError
: These handle the result of each individual API call, transforming it into a standardSaveResult
format whether it succeeds or fails. ThecatchError
prevents a single failed save from stopping the processing of other saves. subscribe({...})
:- The
next
handler receivesSaveResult
objects as soon as any of the concurrent HTTP requests complete. The order might be different from the order initemsToSave
depending on server response times. - The
complete
handler only fires when all items from thefrom()
observable have been processed bymergeMap
, and all the corresponding inner HTTP observables have completed.
- The
Summary#
use mergeMap()
when you need to trigger multiple asynchronous operations based on incoming events/data and want them to run concurrently, collecting all their results as they finish. It's ideal for parallelism where cancellation of previous operations is not needed or desired.