Skip to content

from

The from() operator is another creation operator, but its main purpose is to convert various other types of objects and data structures into an Observable. It's versatile and can handle things like:

  • Arrays (or array-like objects like NodeList, arguments)
  • Iterables (like Map, Set, or strings)
  • Promises
  • Other Observables (though this is less common as you usually just use the Observable directly)

When given an array or iterable, from() emits each item from that collection one by one, in order, and then completes. When given a Promise, it waits for the Promise to resolve, emits the resolved value as its single next notification, and then completes. If the Promise rejects, from() emits an error notification.

Key Characteristics#

  • Conversion: Its primary role is converting something else into an Observable.
  • Single Argument: It takes only one argument – the object to convert.
  • Emission Behavior:
  • For arrays/iterables: Emits items synchronously, one by one.
  • For Promises: Emits the resolved value asynchronously when the promise settles.
  • Completes: It completes after emitting all items (from iterables) or the resolved value (from promises).

Difference from of()#

This is a common point of confusion:

  • of([1, 2, 3]): Emits the entire array [1, 2, 3] as a single item.
  • from([1, 2, 3]): Emits 1, then 2, then 3 as three separate items.

Real-World Example Scenarios#

  1. Processing Array Items: You might fetch configuration data which arrives as a plain array, but you want to use RxJS operators (map, filter, etc.) to process each item in the array within a stream.
  2. Integrating Promises: You're working within an Angular/RxJS codebase, but need to interact with a browser API or a third-party JavaScript library that returns a Promise. from() lets you easily bring that promise-based result into your RxJS workflow.

Code Snippet 1 (Using from() with an Array)#

Let's say you have an array of user IDs and you want to create an Observable stream that emits each ID individually.

import { Component, OnInit } from "@angular/core";
import { from, Observable } from "rxjs"; // Import 'from'
import { map } from "rxjs/operators";

@Component({
  selector: "app-user-id-processor",
  template: `
    <h4>Processing User IDs:</h4>
    <ul>
      <li *ngFor="let processedId of processedUserIds$ | async">
        {{ processedId }}
      </li>
    </ul>
  `,
})
export class UserIdProcessorComponent implements OnInit {
  userIds: string[] = ["user-001", "user-007", "user-101"];
  processedUserIds$: Observable<string> | undefined;

  ngOnInit(): void {
    console.log("Component initializing...");

    // Convert the userIds array into an Observable stream
    const userIdStream$ = from(this.userIds);

    // Example: Use RxJS operators on the stream from the array
    this.processedUserIds$ = userIdStream$.pipe(
      map((id) => `Processed: ${id.toUpperCase()}`) // Apply an operator to each emitted ID
    );

    console.log(
      "Observable created from array. Subscribing manually for demonstration..."
    );

    this.processedUserIds$.subscribe({
      next: (value) => {
        // Called for each ID ('Processed: USER-001', 'Processed: USER-007', etc.)
        console.log("Received processed ID:", value);
      },
      error: (err) => {
        console.error("Error:", err); // Won't happen here
      },
      complete: () => {
        // Called after the last ID is processed and emitted
        console.log("User ID stream complete!");
      },
    });
    console.log("Subscription processing for array finished (synchronously).");
  }
}

Code Snippet 2 (Using from() with a Promise)#

Imagine you need to use the browser's Workspace API (which returns a Promise) to get some data and integrate it into your component's Observable-based logic.

import { Component, OnInit } from "@angular/core";
import { from, Observable } from "rxjs"; // Import 'from'
import { switchMap, catchError, tap } from "rxjs/operators";

@Component({
  selector: "app-promise-integrator",
  template: `
    <h4>Data from Promise:</h4>
    <div *ngIf="data$ | async as fetchedData">
      Data fetched: {{ fetchedData | json }}
    </div>
    <div *ngIf="errorMessage">Error: {{ errorMessage }}</div>
  `,
})
export class PromiseIntegratorComponent implements OnInit {
  data$: Observable<any> | undefined;
  errorMessage: string = "";

  ngOnInit(): void {
    console.log("Component initializing...");

    // 1. Create a Promise (e.g., using fetch)
    const dataPromise = fetch("https://api.example.com/data") // Example API
      .then((response) => {
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }
        return response.json(); // This also returns a Promise
      });

    console.log("Promise created. Converting to Observable using from()...");

    // 2. Convert the Promise to an Observable using from()
    const promiseAsObservable$ = from(dataPromise);

    // 3. Use the Observable in your RxJS pipeline
    this.data$ = promiseAsObservable$.pipe(
      tap((data) =>
        console.log("Data received from promise via Observable:", data)
      ),
      catchError((error) => {
        // Handle potential errors from the promise (fetch failure, JSON parsing error)
        console.error("Error emitted from promise Observable:", error);
        this.errorMessage = error.message || "Failed to fetch data";
        return from([]); // Return an empty observable to prevent killing the main stream
        // Or: return throwError(() => new Error('Custom error message'));
      })
    );

    console.log(
      "Subscribing to promise-based Observable (will resolve asynchronously)..."
    );
    // AsyncPipe in the template will handle the subscription here.
    // Manual subscription for logging completion:
    this.data$.subscribe({
      next: () => {
        /* Handled by tap above / AsyncPipe */
      },
      error: () => {
        /* Handled by catchError above */
      },
      complete: () => {
        // Called only if the promise resolves successfully and catchError doesn't replace the stream
        if (!this.errorMessage) {
          console.log("Promise-based Observable stream complete!");
        }
      },
    });
  }
}

Explanation:

  • Array Example: from(this.userIds) takes the array and emits each string element individually, allowing operators like map to work on each one.
  • Promise Example: from(dataPromise) takes the promise returned by Workspace().then(...). It waits (asynchronously) for the promise to resolve. If it resolves successfully, the resolved JSON data is emitted as the next value. If the promise rejects (e.g., network error), from() emits an error notification, which we handle with catchError. The stream completes after the single value (or error) is emitted.

Summary#

from() is your go-to operator when you need to turn an array, iterable, or Promise into an Observable stream, typically to process its contents individually or integrate it seamlessly into your existing RxJS pipelines.