concatMap

concatMap() is a higher-order mapping operator. It maps each value from a source (outer) Observable to an inner Observable and subscribes to it, then waits for that inner Observable to complete before mapping the next source value and subscribing to its inner Observable.

Here's the flow:

  1. It takes the first value emitted by the source (outer) Observable.
  2. It uses that value and your project function to create the first inner Observable.
  3. It subscribes to this first inner Observable and emits its values.
  4. If the source Observable emits a second value while the first inner Observable is still running, concatMap holds onto that second value.
  5. Only when the first inner Observable completes does concatMap use the held second value to create and subscribe to the second inner Observable.
  6. This process continues, effectively creating a queue where inner Observables are executed one after another, strictly in the order dictated by the source Observable.

Think of it as processing tasks in a single-file line: the next task only starts once the current one is completely finished.
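
The steps above can be sketched as a tiny queue. This is a simplified, synchronous model in plain TypeScript, not RxJS's actual implementation; the class ConcatQueueModel and its methods emit and completeActive are hypothetical names made up for illustration.

```typescript
// Simplified model of concatMap's queueing. Hypothetical names, not the RxJS API.
class ConcatQueueModel {
  private readonly waiting: string[] = []; // source values held while an inner task runs
  private active: string | null = null; // value whose inner task is currently running
  readonly log: string[] = [];

  // The source (outer) Observable emits a value.
  emit(value: string): void {
    if (this.active !== null) {
      // An inner task is still running: hold the value, do not start anything yet.
      this.waiting.push(value);
      this.log.push(`hold ${value}`);
    } else {
      this.start(value);
    }
  }

  // The currently running inner task completes.
  completeActive(): void {
    if (this.active === null) return;
    this.log.push(`complete ${this.active}`);
    this.active = null;
    const nextValue = this.waiting.shift(); // oldest held value goes first
    if (nextValue !== undefined) this.start(nextValue);
  }

  private start(value: string): void {
    this.active = value;
    this.log.push(`start ${value}`);
  }
}

const queue = new ConcatQueueModel();
queue.emit("A"); // nothing running, so A starts immediately
queue.emit("B"); // A still running, B is held
queue.emit("C"); // A still running, C is held behind B
queue.completeActive(); // A completes, B starts
queue.completeActive(); // B completes, C starts
queue.completeActive(); // C completes, queue is empty

console.log(queue.log);
// ["start A", "hold B", "hold C", "complete A", "start B", "complete B", "start C", "complete C"]
```

Note that at most one task is ever "active" in this model, which is the single-file line described above.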

Key Characteristics

  • Higher-Order Mapping: Maps values from an outer Observable to inner Observables.
  • Sequential Execution: Runs inner Observables one at a time, in order.
  • Waits for Completion: Does not subscribe to the next inner Observable until the previous one completes.
  • Preserves Order: Guarantees that the output values maintain the order corresponding to the source emissions.
  • Use Cases: Ideal when the order of operations is crucial, or when you need to ensure one asynchronous task finishes before the next begins (e.g., to avoid race conditions, maintain data integrity, or process items sequentially). Also useful for implicit rate-limiting when you don't want concurrent requests.

Real-World Example Scenario

Imagine you're building a feature where a user can trigger several updates that must be applied to a database or configuration file in a specific order to maintain consistency.

Scenario: A user is rapidly clicking buttons to add different items to a configuration profile ("Add Feature A", "Enable Setting B", "Add User C"). Each click triggers an API call to update the profile. If these updates ran concurrently (as they would with mergeMap), they might interfere with each other or leave the profile in an inconsistent final state, depending on server response times. If you used switchMap, clicking "Enable Setting B" would cancel the "Add Feature A" request if it was still pending.

You want to ensure the updates are applied strictly in the order the user clicked:

  1. Wait for "Add Feature A" API call to complete successfully.
  2. Then execute the "Enable Setting B" API call and wait for it to complete.
  3. Then execute the "Add User C" API call.

concatMap enforces this sequential processing.
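
To make the difference concrete, here is a rough sketch that replays the three clicks on a virtual timeline under each strategy. The click times and latencies are invented, simulate and completionOrder are hypothetical helpers, and the model only captures start, finish, and cancel timing rather than real HTTP behaviour.

```typescript
// Hypothetical timing model. Click times and latencies are made-up numbers.
interface Click {
  name: string;
  at: number; // ms at which the user clicked (clicks must be sorted by this)
  latency: number; // ms the simulated request takes once started
}

interface Outcome {
  name: string;
  start: number;
  end: number | null; // null means the request was cancelled before finishing
}

type Strategy = "concat" | "merge" | "switch";

function simulate(strategy: Strategy, clicks: Click[]): Outcome[] {
  const outcomes: Outcome[] = [];
  let previousEnd = 0;
  clicks.forEach((click, i) => {
    if (strategy === "concat") {
      // Start only after the previous request has finished.
      const start = Math.max(click.at, previousEnd);
      previousEnd = start + click.latency;
      outcomes.push({ name: click.name, start, end: previousEnd });
    } else if (strategy === "merge") {
      // Start immediately; requests overlap.
      outcomes.push({ name: click.name, start: click.at, end: click.at + click.latency });
    } else {
      // Start immediately; a later click cancels a request that is still pending.
      const next = clicks[i + 1];
      const end = click.at + click.latency;
      const cancelled = next !== undefined && next.at < end;
      outcomes.push({ name: click.name, start: click.at, end: cancelled ? null : end });
    }
  });
  return outcomes;
}

// Names of the requests in the order they finish (cancelled ones are omitted).
function completionOrder(outcomes: Outcome[]): string[] {
  return outcomes
    .filter((o): o is Outcome & { end: number } => o.end !== null)
    .sort((a, b) => a.end - b.end)
    .map((o) => o.name);
}

const clicks: Click[] = [
  { name: "Add Feature A", at: 0, latency: 2500 },
  { name: "Enable Setting B", at: 100, latency: 1500 },
  { name: "Add User C", at: 200, latency: 2000 },
];

const concatOrder = completionOrder(simulate("concat", clicks));
const mergeOrder = completionOrder(simulate("merge", clicks));
const switchOrder = completionOrder(simulate("switch", clicks));

console.log(concatOrder); // ["Add Feature A", "Enable Setting B", "Add User C"]
console.log(mergeOrder); // ["Enable Setting B", "Add User C", "Add Feature A"]
console.log(switchOrder); // ["Add User C"]
```

Only the concat strategy finishes the updates in click order: under merge the fastest request wins, and under switch only the last click survives.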

Code Snippet (Angular Component - Sequential Updates)

import { Component, OnDestroy } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { Subject, Subscription, of } from "rxjs";
import { concatMap, catchError, map, tap, delay } from "rxjs/operators"; // map is required by the inner pipe below

interface UpdateAction {
  id: string; // Unique identifier for the action
  type: string;
  payload: any;
  apiUrl: string;
}

interface UpdateResult {
  actionId: string;
  success: boolean;
  message: string;
}

@Component({
  selector: "app-sequential-updates",
  template: `
    <h4>Sequential Update Demo</h4>
    <p>
      Rapidly click the buttons below. Updates will be processed one after
      another.
    </p>
    <button
      (click)="triggerUpdate('Add Feature A', '/api/features')"
      class="btn btn-info me-2"
    >
      Add Feature A
    </button>
    <button
      (click)="triggerUpdate('Enable Setting B', '/api/settings/b')"
      class="btn btn-info me-2"
    >
      Enable Setting B
    </button>
    <button
      (click)="triggerUpdate('Add User C', '/api/users')"
      class="btn btn-info me-2"
    >
      Add User C
    </button>

    <div class="mt-3">
      <h5>Processing Log:</h5>
      <ul class="list-group">
        <li
          *ngFor="let log of processingLog"
          class="list-group-item small"
          [ngClass]="{
            'list-group-item-success': log.includes('Success'),
            'list-group-item-danger': log.includes('Failed'),
            'list-group-item-secondary':
              log.includes('Queueing') || log.includes('Starting')
          }"
        >
          {{ log }}
        </li>
      </ul>
    </div>
  `,
})
export class SequentialUpdatesComponent implements OnDestroy {
  processingLog: string[] = [];

  // Use a Subject to push actions onto the stream when buttons are clicked
  private actionSubject = new Subject<UpdateAction>();
  private actionSubscription: Subscription;

  constructor(private http: HttpClient) {
    // Subscribe to the action stream and use concatMap for processing
    this.actionSubscription = this.actionSubject
      .pipe(
        tap((action) => {
          const logMsg = `[${new Date().toLocaleTimeString()}] Queueing: ${
            action.type
          } (ID: ${action.id})`;
          console.log(logMsg);
          this.processingLog.push(logMsg);
        }),
        // concatMap ensures the next action waits until the inner observable (API call) completes
        concatMap((action: UpdateAction) => {
          const startLogMsg = `[${new Date().toLocaleTimeString()}] Starting API call for: ${
            action.type
          } (ID: ${action.id})`;
          console.log(`   ${startLogMsg}`);
          this.processingLog.push(startLogMsg);

          // Simulate API call - Replace with actual http.post/put
          // Adding a delay to better visualize the sequential nature
          const innerApiCall$ = of({
            success: true,
            received: action.payload,
          }).pipe(
            delay(1500 + Math.random() * 1000) // Simulate network latency (1.5 - 2.5 seconds)
          );
          // const innerApiCall$ = this.http.post<any>(action.apiUrl, action.payload)

          return innerApiCall$.pipe(
            // Map the successful result
            map((response) => ({
              actionId: action.id,
              success: true,
              message: `Success: ${action.type} (ID: ${action.id}) completed.`,
            })),
            // Catch errors for *this specific* API call
            catchError((error) => {
              console.error(
                `Error processing ${action.type} (ID: ${action.id}):`,
                error
              );
              // Return an Observable emitting the failure result
              return of({
                actionId: action.id,
                success: false,
                message: `Failed: ${action.type} (ID: ${action.id}) - ${
                  error.message || "Unknown error"
                }`,
              });
            })
          ); // End of inner pipe
        }) // End of concatMap
      )
      .subscribe({
        next: (result: UpdateResult) => {
          // This receives results one by one, *in order*, after each API call completes
          const logMsg = `[${new Date().toLocaleTimeString()}] ${
            result.message
          }`;
          console.log(logMsg);
          this.processingLog.push(logMsg);
        },
        error: (err) => {
          // Error in the main action stream (unlikely with Subject unless error pushed)
          const logMsg = `[${new Date().toLocaleTimeString()}] Critical stream error: ${err}`;
          console.error(logMsg);
          this.processingLog.push(logMsg);
        },
        complete: () => {
          // Only called if the actionSubject itself completes (not typical for button clicks)
          const logMsg = `[${new Date().toLocaleTimeString()}] Action stream completed.`;
          console.log(logMsg);
          this.processingLog.push(logMsg);
        },
      });
  }

  triggerUpdate(type: string, apiUrl: string): void {
    const action: UpdateAction = {
      id: Math.random().toString(36).substring(2, 9), // Short random ID (fine for a demo, not guaranteed unique)
      type: type,
      payload: { timestamp: new Date().toISOString() }, // Example payload
      apiUrl: apiUrl,
    };
    console.log(
      `[${new Date().toLocaleTimeString()}] Button clicked, pushing action: ${type} (ID: ${
        action.id
      })`
    );
    this.actionSubject.next(action); // Push the action onto the Subject stream
  }

  ngOnDestroy(): void {
    // Clean up the subscription when the component is destroyed
    if (this.actionSubscription) {
      this.actionSubscription.unsubscribe();
      console.log("Sequential updates subscription stopped.");
    }
    this.actionSubject.complete(); // Also complete the subject
  }
}

Explanation:

  1. Subject<UpdateAction>: We use a Subject (actionSubject) to act as the source Observable. Button clicks push UpdateAction objects onto this Subject using actionSubject.next(action).
  2. concatMap((action: UpdateAction) => ...): This is the core.
    • When the actionSubject emits an action, concatMap takes it.
    • It calls the project function, which returns the inner Observable (innerApiCall$ piped through map and catchError).
    • concatMap subscribes to that inner Observable.
    • Crucially: If another action is pushed onto actionSubject before the inner Observable completes, concatMap holds it in a queue. It doesn't call the project function for the new action yet.
    • Only when the current inner Observable completes (whether it emitted a success result or catchError replaced an error with a failure result) does concatMap proceed to the next queued action from actionSubject.
  3. delay(): Added inside the simulated API call to make the sequential waiting behavior obvious in the log.
  4. Logging: The console and UI logs will clearly show actions being queued, then starting, then completing one after another, even if the buttons are clicked very rapidly.
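
One detail worth spelling out is why catchError sits inside the inner pipe. In RxJS, an error from an inner Observable that is not caught there reaches the outer subscriber and ends the subscription, so later actions are never processed. The sketch below is a loose, synchronous model of that difference in plain TypeScript; callApi and processInOrder are hypothetical helpers, and a thrown exception stands in for an error notification.

```typescript
type Result = { action: string; success: boolean };

// Hypothetical stand-in for an API call: it fails for the action named in `failing`.
function callApi(action: string, failing: string): Result {
  if (action === failing) throw new Error(`${action} rejected`);
  return { action, success: true };
}

// Processes actions one after another.
// catchPerAction = true models catchError inside the inner pipe;
// catchPerAction = false models an inner error that nobody handles.
function processInOrder(actions: string[], failing: string, catchPerAction: boolean): Result[] {
  const results: Result[] = [];
  try {
    for (const action of actions) {
      if (catchPerAction) {
        try {
          results.push(callApi(action, failing));
        } catch {
          // Turn the failure into an ordinary result so the sequence keeps going.
          results.push({ action, success: false });
        }
      } else {
        results.push(callApi(action, failing));
      }
    }
  } catch {
    // An unhandled failure ends the whole sequence; remaining actions are skipped.
  }
  return results;
}

const actions = ["Add Feature A", "Enable Setting B", "Add User C"];
const withCatch = processInOrder(actions, "Enable Setting B", true);
const withoutCatch = processInOrder(actions, "Enable Setting B", false);

console.log(withCatch.map((r) => `${r.action}: ${r.success}`));
// ["Add Feature A: true", "Enable Setting B: false", "Add User C: true"]
console.log(withoutCatch.map((r) => `${r.action}: ${r.success}`));
// ["Add Feature A: true"]
```

With the per-action catch, a failed update is reported and the queue moves on to "Add User C"; without it, processing stops at the first failure.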

Summary

Use concatMap when the order of execution matters and you need asynchronous operations triggered by a stream of events to run sequentially, each one completing before the next begins. It's your tool for enforcing order in asynchronous workflows.