DEV Community

Ayoub Ali
Ayoub Ali

Posted on

Reactive Programming with RxDart in Flutter with Example

Introduction:

Reactive programming is a popular paradigm that enables developers to build highly responsive and scalable applications. When combined with the Flutter framework, it empowers developers to create dynamic and reactive user interfaces. One of the most powerful libraries for reactive programming in Flutter is RxDart. In this blog post, we will explore the fundamentals of reactive programming and demonstrate how RxDart can be leveraged to enhance your Flutter applications.

Outline

Core Concepts of Reactive Programming

At its core, reactive programming revolves around three fundamental concepts:

  1. Observables:
    Observables represent a sequence of values that can change over time. They can emit data or events, and other parts of the application can subscribe to these observables to be notified when new values or events are available.

  2. Streams:
    Streams are a type of observables in reactive programming. They provide a continuous flow of data or events over time. Developers can listen to streams and react to the data or events emitted by them.

  3. Data Flow:
    Reactive programming encourages a unidirectional data flow, where changes in the observables trigger updates in dependent components or operations. This ensures that the application remains responsive and efficiently handles changes without causing unnecessary side effects.

By leveraging these core concepts, reactive programming enables developers to build applications that can react to user interactions, data updates, and other events in a scalable and efficient manner. It promotes a more declarative and event-driven style of programming, making it easier to handle complex asynchronous operations and maintain a responsive user interface.

RxDart Installation and Setup

To install RxDart and set it up in your Flutter project, follow these steps:

  1. Open your Flutter project in an IDE or text editor.

  2. Open the pubspec.yaml file located in the root directory of your Flutter project.

  3. In the dependencies section of the pubspec.yaml file, add the following line:

dependencies:
  rxdart: ^0.27.1
Enter fullscreen mode Exit fullscreen mode

This line specifies that your project will depend on the RxDart library and uses version 0.27.1 (or the latest version available).

  1. Save the pubspec.yaml file.

  2. In your IDE or terminal, run the following command to fetch and install the RxDart library:

flutter pub get
Enter fullscreen mode Exit fullscreen mode

This command will download the RxDart library and make it available for use in your Flutter project.

  1. Once the installation is complete, you can start using RxDart in your Flutter code. Import the RxDart package in the relevant files where you intend to use it:
import 'package:rxdart/rxdart.dart';
Enter fullscreen mode Exit fullscreen mode
  1. You are now ready to utilize RxDart and its reactive programming capabilities in your Flutter project. Refer to the RxDart documentation and examples to learn about different observables, streams, and operators provided by the library.

Remember to import the necessary classes from the RxDart package whenever you need to use them in your code.

Note: Make sure to follow the Flutter and Dart version compatibility requirements specified by the RxDart library.

Key concept of Observable, Stream, StreamController and Subjects in RxDart

Key Concepts in RxDart:

  1. Observable:
    • Observables represent a stream of data or events that can change over time.
    • They allow you to emit values or events and enable other parts of the application to subscribe and react to those emissions.
    • Observables can be created from various sources such as lists, futures, or streams.

Example:

   // Creating an Observable from a List
   final numbers = Observable.fromIterable([1, 2, 3, 4, 5]);

   // Subscribing to the Observable
   final subscription = numbers.listen((number) {
     print('Received number: $number');
   });

   // Output: Received number: 1, Received number: 2, ...
Enter fullscreen mode Exit fullscreen mode
  1. Stream and StreamController:
    • A stream represents a sequence of asynchronous events. It is a core concept in Dart's async programming model.
    • StreamController acts as a source of events for a stream. It allows you to add events to the stream and control its flow.
    • Streams provide a way to handle asynchronous data and enable listening to events emitted by the stream.

Example:

   // Creating a StreamController
   final controller = StreamController<int>();

   // Adding events to the stream
   controller.add(1);
   controller.add(2);
   controller.add(3);

   // Listening to the stream
   final subscription = controller.stream.listen((event) {
     print('Received event: $event');
   });

   // Output: Received event: 1, Received event: 2, ...
Enter fullscreen mode Exit fullscreen mode
  1. Subjects:
    • Subjects are a type of Observable and StreamController combined. They can act as both a source of events and a stream to listen to those events.
    • RxDart provides different types of subjects, such as BehaviorSubject, PublishSubject, and ReplaySubject, each with unique characteristics.
    • Subjects are often used for managing state and broadcasting events within reactive programming.

Example:

   // Creating a BehaviorSubject
   final subject = BehaviorSubject<int>();

   // Subscribing to the BehaviorSubject
   final subscription = subject.listen((value) {
     print('Received value: $value');
   });

   // Emitting values through the BehaviorSubject
   subject.add(1);
   subject.add(2);
   subject.add(3);

   // Output: Received value: 1, Received value: 2, ...
Enter fullscreen mode Exit fullscreen mode

Reactive event handling Combining streams and observables in RxDart

1. Reactive Event Handling:

Reactive event handling refers to the ability of RxDart to handle and react to events in a reactive and efficient manner. It allows you to listen to events from various sources, such as user interactions, network responses, or timer events, and perform actions based on those events. RxDart provides operators and techniques to handle events reactively, enabling you to build responsive and dynamic applications.

Example of Reactive Event Handling:

// Creating an Observable for button presses
final buttonPresses = Observable(controller.stream);

// Subscribing to button presses and reacting to the events
final subscription = buttonPresses.listen((event) {
  print('Button pressed!');
});

// Simulating button presses by adding events to the stream
controller.add(true);
controller.add(false);

// Output: Button pressed!, Button pressed!
Enter fullscreen mode Exit fullscreen mode

2. Combining Streams and Observables:

Combining streams and observables in RxDart allows you to merge, combine, or transform multiple streams or observables into a single stream or observable. This capability is useful when you need to handle multiple data sources or perform complex operations on data emitted by different streams or observables.

Example of Combining Streams and Observables:

// Creating two streams
final stream1 = Stream.fromIterable([1, 2, 3]);
final stream2 = Stream.fromIterable([4, 5, 6]);

// Combining the streams into a single stream
final combinedStream = Rx.concat([stream1, stream2]);

// Subscribing to the combined stream and reacting to the events
final subscription = combinedStream.listen((event) {
  print('Combined event: $event');
});

// Output: Combined event: 1, Combined event: 2, ..., Combined event: 6
Enter fullscreen mode Exit fullscreen mode

In this example, the concat operator from RxDart combines two streams into a single stream, merging their events in the order they occur. The resulting combined stream emits events from both stream1 and stream2.

Advanced Techniques and Best Practices

  1. Throttling and Debouncing:
    • Throttling and debouncing are techniques used to control the rate at which events are emitted.
    • Throttling limits the number of events emitted within a specified time interval.
    • Debouncing delays emitting events until a specified quiet period occurs, discarding any previous events within that period.

Example of Throttling:

   // Creating an Observable from button presses
   final buttonPresses = Observable(controller.stream);

   // Throttling the button presses to emit at most one event per 500 milliseconds
   final throttledButtonPresses = buttonPresses.throttleTime(Duration(milliseconds: 500));

   // Subscribing to the throttled button presses
   final subscription = throttledButtonPresses.listen((event) {
     print('Button pressed!');
   });

   // Simulating multiple button presses
   for (int i = 0; i < 10; i++) {
     controller.add(true);
     await Future.delayed(Duration(milliseconds: 100));
   }

   // Output: Button pressed!
Enter fullscreen mode Exit fullscreen mode

Example of Debouncing:

   // Creating an Observable from search queries
   final searchQueries = Observable(controller.stream);

   // Debouncing the search queries to emit events only after 500 milliseconds of quiet period
   final debouncedSearchQueries = searchQueries.debounceTime(Duration(milliseconds: 500));

   // Subscribing to the debounced search queries
   final subscription = debouncedSearchQueries.listen((query) {
     print('Search query: $query');
     // Perform search operation here
   });

   // Simulating search queries
   controller.add('flutter');
   await Future.delayed(Duration(milliseconds: 200));
   controller.add('rx');
   await Future.delayed(Duration(milliseconds: 200));
   controller.add('dart');
   await Future.delayed(Duration(milliseconds: 800));

   // Output: Search query: dart
Enter fullscreen mode Exit fullscreen mode
  1. Error Handling and Retries:
    • RxDart provides operators to handle errors emitted by observables or streams.
    • Error-handling operators like onErrorResumeNext or catchError allow you to handle errors gracefully and provide fallback mechanisms.
    • You can also implement retry mechanisms using operators like retry or retryWhen to automatically retry failed operations.

Example of Error Handling and Retries:

   // Creating an Observable from a network request
   final request = Observable.fromFuture(fetchDataFromNetwork());

   // Handling errors and providing a fallback value
   final response = request.onErrorResumeNext(Observable.just('Fallback response'));

   // Subscribing to the response
   final subscription = response.listen((data) {
     print('Received data: $data');
   }, onError: (error) {
     print('Error occurred: $error');
   });

   // Output: Received data: Fallback response (in case of error)
Enter fullscreen mode Exit fullscreen mode
  1. Memory Management and Resource Disposal:
    • It is essential to manage resources and dispose of subscriptions and subjects properly to avoid memory leaks.
    • Use the takeUntil or takeWhile operators to automatically dispose of subscriptions when certain conditions are met.
    • Dispose of subscriptions and subjects explicitly using the subscription.cancel() or subject.close() methods when they are no longer needed.

Example of Memory Management and Resource Disposal:

   // Creating an Observable from a timer
   final timer = Observable(Stream.periodic(Duration(seconds: 1), (value) => value));



   // Subscribing to the timer and automatically disposing of the subscription after 5 seconds
   final subscription = timer.takeUntil(Observable.timer(null, Duration(seconds: 5))).listen((value) {
     print('Timer value: $value');
   });

   // Output: Timer value: 0, Timer value: 1, Timer value: 2, Timer value: 3, Timer value: 4

   // Disposing of the subscription explicitly after it is no longer needed
   subscription.cancel();
Enter fullscreen mode Exit fullscreen mode

Testing and Debugging with RxDart

Testing and debugging are crucial aspects of any software development process. Here's how you can approach testing and debugging with RxDart, along with an example:

  1. Testing with RxDart:
    • RxDart provides various utilities and techniques to test observables, streams, and operators.
    • Use the TestWidgetsFlutterBinding.ensureInitialized() method to initialize the test environment before running RxDart tests.
    • Utilize the TestStream class from the rxdart/testing.dart package to create testable streams and observables.
    • Use test-specific operators like materialize() and dematerialize() to convert events into notifications that can be easily asserted.

Example of Testing with RxDart:

   import 'package:rxdart/rxdart.dart';
   import 'package:rxdart/testing.dart';
   import 'package:test/test.dart';

   void main() {
     test('Test observable emits correct values', () {
       // Initialize the test environment
       TestWidgetsFlutterBinding.ensureInitialized();

       // Create a TestStream
       final stream = TestStream<int>();

       // Emit values to the stream
       stream.emit(1);
       stream.emit(2);
       stream.emit(3);
       stream.close();

       // Create an observable from the TestStream
       final observable = Observable(stream);

       // Assert the emitted values
       expect(observable, emitsInOrder([1, 2, 3]));
     });
   }
Enter fullscreen mode Exit fullscreen mode
  1. Debugging with RxDart:
    • RxDart provides debugging operators that help analyze and debug observables and streams during development.
    • The doOnData() operator allows you to inspect each emitted data item, enabling you to log or perform other debugging operations.
    • The doOnError() and doOnDone() operators allow you to handle error and completion events respectively for debugging purposes.

Example of Debugging with RxDart:

   // Creating an observable from a list
   final observable = Observable.fromIterable([1, 2, 3, 4, 5]);

   // Adding the doOnData operator for debugging
   final debugObservable = observable.doOnData((data) {
     print('Data: $data');
   });

   // Subscribing to the debugObservable
   final subscription = debugObservable.listen((data) {
     print('Received data: $data');
   }, onError: (error) {
     print('Error occurred: $error');
   }, onDone: () {
     print('Stream completed');
   });

   // Output:
   // Data: 1
   // Received data: 1
   // Data: 2
   // Received data: 2
   // Data: 3
   // Received data: 3
   // Data: 4
   // Received data: 4
   // Data: 5
   // Received data: 5
   // Stream completed
Enter fullscreen mode Exit fullscreen mode

By applying testing and debugging techniques, you can ensure the correctness and reliability of your RxDart code. Test your observables and streams using the provided testing utilities and leverage debugging operators to gain insights into the behavior of your reactive code during development and troubleshooting processes.

Real-World Example

In this example we will build a fully functional app that search a world from JSON API.

Step -1

In first step we will generate a model for our JSON API.
Use can use QuikeType.io to generate it just paste the JSON schema and you have the model

@immutable
class Words {
  final List<String> names;

  const Words({
    required this.names,
  });

  Words copyWith({
    List<String>? names,
  }) =>
      Words(
        names: names ?? this.names,
      );

  factory Words.fromJson(Map<String, dynamic> json) => Words(
        names: List<String>.from(json["names"].map((x) => x)),
      );

  Map<String, dynamic> toJson() => {
        "names": List<dynamic>.from(names.map((x) => x)),
      };
}
Enter fullscreen mode Exit fullscreen mode

This code provides a way to convert Words objects to JSON and vice versa, making it easy to serialize and deserialize the data for communication or storage purposes.

Step -2

Let's build a heraricary that focuses on creating of classes representing different search result states. Each class represents a specific state, such as loading, no result, error, or with a successful result.

import 'package:flutter/foundation.dart' show immutable;

@immutable
abstract class SearchResult {
  const SearchResult();
}

@immutable
class SearchResultLoading implements SearchResult {
  const SearchResultLoading();
}

@immutable
class SearchResultNoResult implements SearchResult {
  const SearchResultNoResult();
}

@immutable
class SearchResultWithError implements SearchResult {
  final Object? error;

  const SearchResultWithError(this.error);
}

@immutable
class SearchResultWithResult implements SearchResult {
  final List<String> result;
  const SearchResultWithResult(this.result);
}
Enter fullscreen mode Exit fullscreen mode

Step - 3

Let's Write code that demonstrates and performs a search operation on a list of words fetched from an API. This code demonstrates the basic steps involved in performing a search operation using a remote API, caching the results, and extracting the matching words based on a search term.

import 'dart:convert';

import 'package:http/http.dart' as http;

class Api {
  List<String>? _words;
  Api();

// Step - 3

  Future<List<String>> search(String searchTerm) async {
    final term = searchTerm.trim().toLowerCase();
    final cachedResult = _extractWordsUsingSearchTerm(searchTerm);
    if (cachedResult != null) {
      return cachedResult;
    }

    // api calling
    final words = await _getData(
        "https://dl.dropboxusercontent.com/s/s4xik49426frdl4/words.json?dl=0");

    _words = words;

    return _extractWordsUsingSearchTerm(term) ?? [];
  }

// Step - 2

  List<String>? _extractWordsUsingSearchTerm(String word) {
    final cachedWords = _words;
    if (cachedWords != null) {
      List<String> result = [];
      for (final worded in cachedWords) {
        if (worded.contains(word.trim().toLowerCase())) {
          result.add(worded);
        }
      }
      return result;
    } else {
      return null;
    }
  }

// Step - 1
  // Future<List<dynamic>> _getData(String url) => HttpClient()
  //     .getUrl(Uri.parse(url))
  //     .then((request) => request.close())
  //     .then((response) => response.transform(utf8.decoder).join())
  //     .then((jsonString) => json.decode(jsonString) as List<dynamic>);

  Future<List<String>?> _getData(String url) async {
    final response = await http.Client().get(Uri.parse(url));
    final parsed = jsonDecode(response.body)['names'];
    List<String>? names = parsed != null ? List.from(parsed) : null;

    return names;
  }
}

// work only on String not list
extension TrimmedCaseInsensitiveContain on String {
  bool trimmedContains(String other) => trim().toLowerCase().contains(
        other.trim().toLowerCase(),
      );
}
Enter fullscreen mode Exit fullscreen mode

Step - 4

This code demonstrates the setup of a reactive search bloc using RxDart. It establishes a bidirectional communication channel for searching by providing a sink to add search terms and a stream to receive search results. The search terms undergo stream transformations to control the search behavior, and the results are emitted as a stream of SearchResult objects with different states.

The result stream is created by chaining several stream transformations on textChanges. It performs the following operations:

- distinct()

ensures that only distinct search terms are processed.

- debounceTime(const Duration(milliseconds: 350))

delays the processing of the search term stream, allowing a brief duration (350 milliseconds) of inactivity before emitting the latest search term. This helps to reduce unnecessary API calls for rapidly changing search terms.

- switchMap((String searchTerm)

maps each search term to a stream of SearchResult objects based on the search operation. If the search term is empty, it immediately emits a null result. Otherwise, it performs the actual search operation using the api.search method, which returns a Future>. This future is wrapped using Rx.fromCallable and then delayed by 1 second using delay to introduce a delay before emitting the result. The mapped stream is further transformed using map to convert the search results into appropriate SearchResult objects (SearchResultNoResult, SearchResultWithResult, or SearchResultLoading) based on the conditions.

- startWith(const SearchResultLoading())

emits a loading result as the initial value when the search begins.

- onErrorReturnWith((error, _) => SearchResultWithError(error))

handles any errors that occur during the search operation and emits a SearchResultWithError object.

import 'dart:async';

import 'package:flutter/foundation.dart' show immutable;
import 'package:infinite_words/bloc/api.dart';
import 'package:infinite_words/bloc/search_result.dart';
import 'package:rxdart/rxdart.dart';

@immutable
class SearchBloc {
  final Sink<String> search;
  final Stream<SearchResult?> results;

  void dispose() {
    search.close();
  }

  factory SearchBloc({required Api api}) {
    final textChanges = BehaviorSubject<String>();

    final result = textChanges
        .distinct()
        .debounceTime(const Duration(milliseconds: 350))
        .switchMap<SearchResult?>((String searchTerm) {
      if (searchTerm.isEmpty) {
        return Stream<SearchResult?>.value(null);
      } else {
        return Rx.fromCallable(() => api.search(searchTerm))
            .delay(const Duration(seconds: 1))
            .map(
              (results) => results.isEmpty
                  ? const SearchResultNoResult()
                  : SearchResultWithResult(results),
            )
            .startWith(const SearchResultLoading())
            .onErrorReturnWith((error, _) => SearchResultWithError(error));
      }
    });

    return SearchBloc._(
      search: textChanges.sink,
      results: result,
    );
  }
  const SearchBloc._({
    required this.search,
    required this.results,
  });
}
Enter fullscreen mode Exit fullscreen mode

Step - 5 UI Development

Custom widget to display out search result in GridView

import 'package:flutter/material.dart';

class GridViewWidget extends StatelessWidget {
  const GridViewWidget({
    Key? key,
    required this.results,
  }) : super(key: key);

  final List<String> results;

  @override
  Widget build(BuildContext context) {
    return Expanded(
      child: GridView.builder(
        itemCount: results.length,
        gridDelegate: const SliverGridDelegateWithMaxCrossAxisExtent(
            maxCrossAxisExtent: 200,
            childAspectRatio: 3 / 2,
            crossAxisSpacing: 20,
            mainAxisSpacing: 20),
        itemBuilder: (context, index) {
          return Container(
            alignment: Alignment.center,
            decoration: BoxDecoration(
                color: Colors.white, borderRadius: BorderRadius.circular(15)),
            child: Text(
              results[index],
              textAlign: TextAlign.center,
              style: const TextStyle(fontSize: 20, fontWeight: FontWeight.bold),
            ),
          );
        },
      ),
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

Step - 6

This code provides a UI representation of the different states of the search results and handles the appropriate rendering based on the received data.

import 'package:flutter/material.dart';
import 'package:infinite_words/bloc/search_result.dart';
import 'package:infinite_words/widgets/grid_view_widget.dart';

class SearchResultView extends StatelessWidget {
  final Stream<SearchResult?> searchResults;
  const SearchResultView({
    Key? key,
    required this.searchResults,
  }) : super(key: key);

  @override
  Widget build(BuildContext context) {
    return StreamBuilder(
      stream: searchResults,
      builder: (
        BuildContext context,
        AsyncSnapshot snapshot,
      ) {
        if (snapshot.hasData) {
          final result = snapshot.data;
          if (result is SearchResultWithError) {
            return const Center(child: Text('Error'));
          } else if (result is SearchResultLoading) {
            return const Center(child: CircularProgressIndicator());
          } else if (result is SearchResultNoResult) {
            return const Center(child: Text('No Result Found'));
          } else if (result is SearchResultWithResult) {
            final results = result.result;
            return GridViewWidget(results: results);
          } else {
            return const Center(child: Text("Unknown State"));
          }
        } else {
          return const Center(
            child: Text(
              "Waiting.....",
              style: TextStyle(color: Colors.white, fontSize: 18),
            ),
          );
        }
      },
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

Step -7

This code sets up the home page of the application with a text input field for searching words and displays the search results using the SearchResultView widget. The SearchBloc manages the search functionality and emits the search results to the UI.

import 'package:flutter/material.dart';
import 'package:infinite_words/bloc/api.dart';
import 'package:infinite_words/bloc/search_bloc.dart';
import 'package:infinite_words/view/search_result_view.dart';

class HomePage extends StatefulWidget {
  const HomePage({Key? key}) : super(key: key);

  @override
  State<HomePage> createState() => _HomePageState();
}

class _HomePageState extends State<HomePage> {
  late final SearchBloc _bloc;

  @override
  void initState() {
    _bloc = SearchBloc(api: Api());
    super.initState();
  }

  @override
  void dispose() {
    super.dispose();
    _bloc.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      backgroundColor: Colors.black,
      // appBar: AppBar(
      //   title: const Text('Search'),
      // ),
      body: Padding(
        padding: const EdgeInsets.all(10),
        child: Column(
          children: [
            const SizedBox(
              height: 50,
            ),
            TextField(
                decoration: InputDecoration(
                    border: InputBorder.none,
                    filled: true,
                    fillColor: Colors.white,
                    contentPadding: const EdgeInsets.only(
                        left: 14.0, bottom: 6.0, top: 8.0),
                    focusedBorder: OutlineInputBorder(
                      borderSide: const BorderSide(color: Colors.grey),
                      borderRadius: BorderRadius.circular(10.0),
                    ),
                    enabledBorder: UnderlineInputBorder(
                      borderSide: const BorderSide(color: Colors.white),
                      borderRadius: BorderRadius.circular(10.0),
                    ),
                    hintText: "Write a word",
                    hintStyle: const TextStyle(
                      fontSize: 20,
                      // color: Colors.white,
                    )),
                onChanged: _bloc.search.add),
            const SizedBox(
              height: 10,
            ),
            SearchResultView(searchResults: _bloc.results)
          ],
        ),
      ),
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

SourceCode

Top comments (2)

Collapse
 
james2023 profile image
James

This article is written excellently and is extremely helpful.Thank you for sharing.

Fair is a Flutter dynamicization framework developed by the 58 Open Source Team. It utilizes its self-developed Fair Compiler tool to transform Dart source files into projects with dynamic updating widget capabilities. I have learned that Fair framework has gained 2.2 thousand stars on GitHub, which is quite an impressive achievement.

Flutter dynamicization framework provides developers with a convenient way to update and modify the user interface and logic of an application without the need for recompiling and republishing the app. This is particularly useful for rapid iteration and real-time updates of applications.

github.com/wuba/Fair

Collapse
 
prasant10050 profile image
Prasant Kumar

Really, it is very nice and clean article about RXDart. Thank you for sharing