Introduction:
Reactive programming is a popular paradigm that enables developers to build highly responsive and scalable applications. When combined with the Flutter framework, it empowers developers to create dynamic and reactive user interfaces. One of the most powerful libraries for reactive programming in Flutter is RxDart. In this blog post, we will explore the fundamentals of reactive programming and demonstrate how RxDart can be leveraged to enhance your Flutter applications.
Core Concepts of Reactive Programming
At its core, reactive programming revolves around three fundamental concepts:
Observables:
Observables represent a sequence of values that can change over time. They can emit data or events, and other parts of the application can subscribe to them to be notified when new values or events are available.
Streams:
Streams are a type of observable; in Dart, the built-in Stream class plays this role. They provide a continuous flow of data or events over time. Developers can listen to streams and react to the data or events they emit.
Data Flow:
Reactive programming encourages a unidirectional data flow, where changes in the observables trigger updates in dependent components or operations. This keeps the application responsive and lets it handle changes without causing unnecessary side effects.
By leveraging these core concepts, reactive programming enables developers to build applications that can react to user interactions, data updates, and other events in a scalable and efficient manner. It promotes a more declarative and event-driven style of programming, making it easier to handle complex asynchronous operations and maintain a responsive user interface.
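To make these ideas concrete, here is a minimal sketch that uses only Dart's built-in dart:async library (no RxDart yet). The names celsiusReadings and toFahrenheitLabels are hypothetical and exist only for this illustration.

```dart
import 'dart:async';

// Hypothetical source: a stream of temperature readings in Celsius.
Stream<double> celsiusReadings() => Stream.fromIterable([20.0, 21.5, 19.0]);

// Data flows one way: source -> transformation -> listener.
Stream<String> toFahrenheitLabels(Stream<double> celsius) => celsius
    .map((c) => c * 9 / 5 + 32)
    .map((f) => '${f.toStringAsFixed(1)} F');

Future<void> main() async {
  // The listener reacts to each value as it arrives.
  await for (final label in toFahrenheitLabels(celsiusReadings())) {
    print(label);
  }
}
```

The listener never reaches back into the source; it only declares what should happen for each value, which is the declarative style described above.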
RxDart Installation and Setup
To install RxDart and set it up in your Flutter project, follow these steps:
- Open your Flutter project in an IDE or text editor.
- Open the pubspec.yaml file located in the root directory of your Flutter project.
- In the dependencies section of the pubspec.yaml file, add the following line:
dependencies:
  rxdart: ^0.27.1
This line specifies that your project depends on the RxDart library. The caret constraint ^0.27.1 accepts version 0.27.1 and any later 0.27.x release; check pub.dev for the latest version available.
- Save the pubspec.yaml file.
- In your IDE or terminal, run the following command to fetch and install the RxDart library:
flutter pub get
This command will download the RxDart library and make it available for use in your Flutter project.
- Once the installation is complete, you can start using RxDart in your Flutter code. Import the RxDart package in the relevant files where you intend to use it:
import 'package:rxdart/rxdart.dart';
- You are now ready to utilize RxDart and its reactive programming capabilities in your Flutter project. Refer to the RxDart documentation and examples to learn about different observables, streams, and operators provided by the library.
Remember to import the necessary classes from the RxDart package whenever you need to use them in your code.
Note: Make sure to follow the Flutter and Dart version compatibility requirements specified by the RxDart library.
Key Concepts in RxDart: Observable, Stream, StreamController, and Subjects
- Observable:
- Observables represent a stream of data or events that can change over time.
- They allow you to emit values or events and enable other parts of the application to subscribe and react to those emissions.
- Observables can be created from various sources such as lists, futures, or streams.
- Note: RxDart releases before 0.23 shipped an Observable class that wrapped Stream. It has since been removed, so with the version installed above you work with Dart's Stream directly and RxDart adds its operators as extension methods.
Example:
// Creating a stream from a List
final numbers = Stream.fromIterable([1, 2, 3, 4, 5]);
// Subscribing to the stream
final subscription = numbers.listen((number) {
  print('Received number: $number');
});
// Output: Received number: 1, Received number: 2, ...
- Stream and StreamController:
- A stream represents a sequence of asynchronous events. It is a core concept in Dart's async programming model.
- StreamController acts as a source of events for a stream. It allows you to add events to the stream and control its flow.
- Streams provide a way to handle asynchronous data and enable listening to events emitted by the stream.
Example:
import 'dart:async';
// Creating a StreamController
final controller = StreamController<int>();
// Adding events to the stream (a single-subscription controller buffers them until a listener attaches)
controller.add(1);
controller.add(2);
controller.add(3);
// Listening to the stream
final subscription = controller.stream.listen((event) {
  print('Received event: $event');
});
// Output: Received event: 1, Received event: 2, ...
- Subjects:
- Subjects are a Stream and a StreamController combined. They can act as both a source of events and a stream to listen to those events.
- RxDart provides different types of subjects, such as BehaviorSubject, PublishSubject, and ReplaySubject, each with unique characteristics.
- Subjects are often used for managing state and broadcasting events within reactive programming.
Example:
// Creating a BehaviorSubject
final subject = BehaviorSubject<int>();
// Subscribing to the BehaviorSubject
final subscription = subject.listen((value) {
  print('Received value: $value');
});
// Emitting values through the BehaviorSubject
subject.add(1);
subject.add(2);
subject.add(3);
// Output: Received value: 1, Received value: 2, ...
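The difference between the three subject types shows up when a listener subscribes late. The sketch below is illustrative only: lateSubscriberSees is a hypothetical helper, not part of RxDart, and it assumes the rxdart dependency from the setup section.

```dart
import 'package:rxdart/rxdart.dart';

// Hypothetical helper: adds 1, 2, 3 before anyone listens, then subscribes,
// adds 4, and returns everything the late subscriber received.
Future<List<int>> lateSubscriberSees(Subject<int> subject) async {
  subject
    ..add(1)
    ..add(2)
    ..add(3);
  final received = <int>[];
  final subscription = subject.listen(received.add);
  subject.add(4);
  await subject.close();
  await subscription.cancel();
  return received;
}

Future<void> main() async {
  // PublishSubject: only events added after subscribing are delivered.
  print('PublishSubject: ${await lateSubscriberSees(PublishSubject<int>())}');
  // BehaviorSubject: the latest value is replayed first, then new events.
  print('BehaviorSubject: ${await lateSubscriberSees(BehaviorSubject<int>())}');
  // ReplaySubject: all earlier values are replayed first, then new events.
  print('ReplaySubject: ${await lateSubscriberSees(ReplaySubject<int>())}');
}
```

As a rule of thumb, BehaviorSubject suits state that always has a current value, PublishSubject suits one-off events, and ReplaySubject suits cases where late listeners need the history.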
Reactive Event Handling and Combining Streams in RxDart
1. Reactive Event Handling:
Reactive event handling refers to the ability of RxDart to handle and react to events in a reactive and efficient manner. It allows you to listen to events from various sources, such as user interactions, network responses, or timer events, and perform actions based on those events. RxDart provides operators and techniques to handle events reactively, enabling you to build responsive and dynamic applications.
Example of Reactive Event Handling:
// Creating a controller whose stream carries button presses
final controller = StreamController<bool>();
final buttonPresses = controller.stream;
// Subscribing to button presses and reacting to the events
final subscription = buttonPresses.listen((event) {
  print('Button pressed!');
});
// Simulating button presses by adding events to the stream
controller.add(true);
controller.add(false);
// Output: Button pressed!, Button pressed!
2. Combining Streams and Observables:
Combining streams and observables in RxDart allows you to merge, combine, or transform multiple streams or observables into a single stream or observable. This capability is useful when you need to handle multiple data sources or perform complex operations on data emitted by different streams or observables.
Example of Combining Streams and Observables:
// Creating two streams
final stream1 = Stream.fromIterable([1, 2, 3]);
final stream2 = Stream.fromIterable([4, 5, 6]);
// Combining the streams into a single stream
final combinedStream = Rx.concat([stream1, stream2]);
// Subscribing to the combined stream and reacting to the events
final subscription = combinedStream.listen((event) {
  print('Combined event: $event');
});
// Output: Combined event: 1, Combined event: 2, ..., Combined event: 6
In this example, the concat operator from RxDart joins two streams into a single stream. It subscribes to them one after another, so the combined stream emits every event from stream1 and only then the events from stream2. To interleave events in the order they occur, use Rx.merge instead.
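Besides concat, two other combinators cover most everyday cases. The following is a minimal sketch, assuming the rxdart dependency from the setup section; mergedTicks and pairedLabels are hypothetical names used only here.

```dart
import 'package:rxdart/rxdart.dart';

// merge: forwards events from every source as soon as they arrive,
// so events from the two timers are interleaved.
Stream<String> mergedTicks() => Rx.merge([
      Stream.periodic(const Duration(milliseconds: 100), (i) => 'fast $i')
          .take(3),
      Stream.periodic(const Duration(milliseconds: 250), (i) => 'slow $i')
          .take(2),
    ]);

// combineLatest2: once both sources have produced a value, emits a combined
// value every time either source emits.
Stream<String> pairedLabels(Stream<String> letters, Stream<int> numbers) =>
    Rx.combineLatest2(
      letters,
      numbers,
      (String letter, int number) => '$letter$number',
    );

Future<void> main() async {
  await mergedTicks().forEach(print);
  await pairedLabels(
    Stream.fromIterable(['a', 'b']),
    Stream.fromIterable([1, 2]),
  ).forEach(print);
}
```

Use merge when the sources carry the same kind of event, and combineLatest when the UI needs the latest value from several different sources at once, such as validating a form.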
Advanced Techniques and Best Practices
- Throttling and Debouncing:
- Throttling and debouncing are techniques used to control the rate at which events are emitted.
- Throttling limits the number of events emitted within a specified time interval.
- Debouncing delays emitting events until a specified quiet period occurs, discarding any previous events within that period.
Example of Throttling:
// Creating a controller whose stream carries button presses
final controller = StreamController<bool>();
final buttonPresses = controller.stream;
// Throttling the button presses to emit at most one event per 500 milliseconds
final throttledButtonPresses = buttonPresses.throttleTime(const Duration(milliseconds: 500));
// Subscribing to the throttled button presses
final subscription = throttledButtonPresses.listen((event) {
  print('Button pressed!');
});
// Simulating multiple button presses (run this inside an async function)
for (int i = 0; i < 10; i++) {
  controller.add(true);
  await Future.delayed(const Duration(milliseconds: 100));
}
// Output: Button pressed! (about once per 500 ms window, so roughly twice for this one-second burst)
Example of Debouncing:
// Creating a controller whose stream carries search queries
final controller = StreamController<String>();
final searchQueries = controller.stream;
// Debouncing the search queries to emit events only after a 500 millisecond quiet period
final debouncedSearchQueries = searchQueries.debounceTime(const Duration(milliseconds: 500));
// Subscribing to the debounced search queries
final subscription = debouncedSearchQueries.listen((query) {
  print('Search query: $query');
  // Perform search operation here
});
// Simulating search queries (run this inside an async function)
controller.add('flutter');
await Future.delayed(const Duration(milliseconds: 200));
controller.add('rx');
await Future.delayed(const Duration(milliseconds: 200));
controller.add('dart');
await Future.delayed(const Duration(milliseconds: 800));
// Output: Search query: dart
- Error Handling and Retries:
- RxDart provides operators to handle errors emitted by streams.
- Error-handling operators like onErrorResumeNext and onErrorReturnWith, alongside Dart's built-in handleError, allow you to handle errors gracefully and provide fallback mechanisms. (catchError is the equivalent on a Future, not a stream operator.)
- You can also implement retry mechanisms using the Rx.retry and Rx.retryWhen factories to automatically retry failed operations.
Example of Error Handling and Retries:
// Creating a stream from a network request (fetchDataFromNetwork stands in for your own Future-returning call)
final request = Stream.fromFuture(fetchDataFromNetwork());
// Handling errors and providing a fallback value
final response = request.onErrorResumeNext(Stream.value('Fallback response'));
// Subscribing to the response
final subscription = response.listen((data) {
  print('Received data: $data');
}, onError: (error) {
  print('Error occurred: $error');
});
// Output: Received data: Fallback response (in case of error)
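The example above covers the fallback path but not retries. Here is a small sketch of a retry using Rx.retry, which takes a factory that builds a fresh stream for every attempt. The requestWithRetry function and its simulated flaky request are hypothetical and stand in for a real network call.

```dart
import 'package:rxdart/rxdart.dart';

// Hypothetical request that fails a fixed number of times before succeeding.
Stream<String> requestWithRetry({
  int failuresBeforeSuccess = 2,
  int maxRetries = 3,
}) {
  var attempts = 0;

  Future<String> flakyRequest() async {
    attempts++;
    if (attempts <= failuresBeforeSuccess) {
      throw Exception('attempt $attempts failed');
    }
    return 'succeeded on attempt $attempts';
  }

  // The factory runs again for each retry, so every attempt gets a new Future.
  return Rx.retry<String>(
    () => Stream.fromFuture(flakyRequest()),
    maxRetries,
  );
}

Future<void> main() async {
  await for (final data in requestWithRetry()) {
    print('Received data: $data');
  }
}
```

Passing a factory rather than a stream matters here: a stream built from a single Future would simply replay the same failure, while the factory triggers a new request each time.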
- Memory Management and Resource Disposal:
- It is essential to manage resources and dispose of subscriptions and subjects properly to avoid memory leaks.
- Use the takeUntil or takeWhile operators to automatically dispose of subscriptions when certain conditions are met.
- Dispose of subscriptions and subjects explicitly using the subscription.cancel() or subject.close() methods when they are no longer needed.
Example of Memory Management and Resource Disposal:
// Creating a stream from a periodic timer
final timer = Stream.periodic(const Duration(seconds: 1), (value) => value);
// Subscribing to the timer; takeUntil completes the stream once the 5-second timer fires
final subscription = timer.takeUntil(Rx.timer(null, const Duration(seconds: 5))).listen((value) {
  print('Timer value: $value');
});
// Output: Timer value: 0, Timer value: 1, Timer value: 2, Timer value: 3 (a fifth value may or may not arrive, because it races the 5-second timer)
// Disposing of the subscription explicitly once it is no longer needed (for example, in a widget's dispose method)
subscription.cancel();
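takeWhile works the same way but stops on a condition instead of a second stream. The sketch below uses only dart:async; collectWhileBelow is a hypothetical helper written for this illustration.

```dart
import 'dart:async';

// Hypothetical helper: feeds 0..9 into a controller and collects values
// until the predicate fails.
Future<List<int>> collectWhileBelow(int limit) async {
  final controller = StreamController<int>();
  final received = <int>[];
  final done = Completer<void>();

  // takeWhile completes the stream and cancels the upstream subscription
  // as soon as a value fails the predicate.
  controller.stream
      .takeWhile((value) => value < limit)
      .listen(received.add, onDone: done.complete);

  for (var i = 0; i < 10; i++) {
    controller.add(i);
  }

  await done.future;
  // Close the controller explicitly once it is no longer needed.
  await controller.close();
  return received;
}

Future<void> main() async {
  print(await collectWhileBelow(3)); // prints [0, 1, 2]
}
```

The operator handles the subscription for you, but the controller is still your responsibility, which is why it is closed explicitly at the end.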
Testing and Debugging with RxDart
Testing and debugging are crucial aspects of any software development process. Here's how you can approach testing and debugging with RxDart, along with an example:
- Testing with RxDart:
- RxDart streams are ordinary Dart streams, so the stream matchers from package:test (emits, emitsInOrder, emitsDone, emitsError) work on them directly.
- Call TestWidgetsFlutterBinding.ensureInitialized() from flutter_test only when the code under test depends on Flutter bindings; pure stream tests do not need it.
- Use a StreamController or a subject to create testable streams and drive them with known events.
- Use operators like materialize() and dematerialize() to convert events into notification objects that can be easily asserted.
Example of Testing with RxDart:
import 'dart:async';
import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';
void main() {
  test('Test stream emits correct values', () async {
    // Create a controller to drive the stream under test
    final controller = StreamController<int>();
    // Apply an RxDart operator so there is something to verify
    final stream = controller.stream.startWith(0);
    // Register the expectation before emitting; it completes once the matcher is satisfied
    final expectation = expectLater(stream, emitsInOrder([0, 1, 2, 3, emitsDone]));
    // Emit values to the stream
    controller.add(1);
    controller.add(2);
    controller.add(3);
    controller.close();
    // Wait for the assertion to finish
    await expectation;
  });
}
- Debugging with RxDart:
- RxDart provides debugging operators that help analyze and debug streams during development.
- The doOnData() operator allows you to inspect each emitted data item, enabling you to log or perform other debugging operations.
- The doOnError() and doOnDone() operators allow you to observe error and completion events respectively for debugging purposes.
Example of Debugging with RxDart:
// Creating a stream from a list
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
// Adding the doOnData operator for debugging
final debugStream = stream.doOnData((data) {
  print('Data: $data');
});
// Subscribing to the debugStream
final subscription = debugStream.listen((data) {
  print('Received data: $data');
}, onError: (error) {
  print('Error occurred: $error');
}, onDone: () {
  print('Stream completed');
});
// Output:
// Data: 1
// Received data: 1
// Data: 2
// Received data: 2
// Data: 3
// Received data: 3
// Data: 4
// Received data: 4
// Data: 5
// Received data: 5
// Stream completed
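The other two debugging operators can be chained in the same way. This is a sketch only, assuming the rxdart dependency from the setup section; numbersThenError and traceStream are hypothetical names.

```dart
import 'package:rxdart/rxdart.dart';

// Hypothetical source that emits two values and then fails.
Stream<int> numbersThenError() async* {
  yield 1;
  yield 2;
  throw StateError('boom');
}

// Records every data, error, and done event that passes through the chain.
Future<List<String>> traceStream() async {
  final log = <String>[];
  await numbersThenError()
      .doOnData((value) => log.add('data $value'))
      .doOnError((error, stackTrace) => log.add('error $error'))
      .doOnDone(() => log.add('done'))
      // Swallow the error after logging it so drain() completes normally.
      .handleError((Object _) {})
      .drain<void>();
  return log;
}

Future<void> main() async {
  (await traceStream()).forEach(print);
}
```

The doOn operators only observe events; they do not consume or change them, so the error still has to be handled further down the chain.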
By applying testing and debugging techniques, you can ensure the correctness and reliability of your RxDart code. Test your observables and streams using the provided testing utilities and leverage debugging operators to gain insights into the behavior of your reactive code during development and troubleshooting processes.
Real-World Example
In this example we will build a fully functional app that searches for a word in a list fetched from a JSON API.
Step - 1
In the first step we will generate a model for our JSON API.
You can use quicktype.io to generate it: just paste the JSON and you have the model.
import 'package:flutter/foundation.dart' show immutable;
@immutable
class Words {
  final List<String> names;
  const Words({
    required this.names,
  });
  Words copyWith({
    List<String>? names,
  }) =>
      Words(
        names: names ?? this.names,
      );
  factory Words.fromJson(Map<String, dynamic> json) => Words(
        names: List<String>.from(json["names"].map((x) => x)),
      );
  Map<String, dynamic> toJson() => {
        "names": List<dynamic>.from(names.map((x) => x)),
      };
}
This code provides a way to convert Words objects to JSON and vice versa, making it easy to serialize and deserialize the data for communication or storage purposes.
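As a quick check of the round trip, the sketch below decodes a JSON string into a Words object and encodes it again. To stay runnable outside Flutter it uses a trimmed copy of the model without the @immutable annotation; roundTrip is a hypothetical helper.

```dart
import 'dart:convert';

// Trimmed copy of the Words model, without the Flutter-only annotation.
class Words {
  final List<String> names;
  const Words({required this.names});

  factory Words.fromJson(Map<String, dynamic> json) =>
      Words(names: List<String>.from(json['names'] as List));

  Map<String, dynamic> toJson() => {'names': names};
}

// Hypothetical helper: JSON text -> Words -> JSON text.
String roundTrip(String rawJson) {
  final words = Words.fromJson(jsonDecode(rawJson) as Map<String, dynamic>);
  return jsonEncode(words.toJson());
}

void main() {
  print(roundTrip('{"names":["flutter","dart"]}'));
  // prints {"names":["flutter","dart"]}
}
```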
Step - 2
Let's build a hierarchy of classes representing the different search result states. Each class represents a specific state: loading, no result, error, or a successful result.
import 'package:flutter/foundation.dart' show immutable;
@immutable
abstract class SearchResult {
  const SearchResult();
}
@immutable
class SearchResultLoading implements SearchResult {
  const SearchResultLoading();
}
@immutable
class SearchResultNoResult implements SearchResult {
  const SearchResultNoResult();
}
@immutable
class SearchResultWithError implements SearchResult {
  final Object? error;
  const SearchResultWithError(this.error);
}
@immutable
class SearchResultWithResult implements SearchResult {
  final List<String> result;
  const SearchResultWithResult(this.result);
}
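Modelling each state as its own class lets consumers branch on the type. Here is a small sketch of that idea in plain Dart; the classes are repeated without the Flutter annotation so the snippet runs on its own, and describe is a hypothetical function, not part of the app.

```dart
abstract class SearchResult {
  const SearchResult();
}

class SearchResultLoading implements SearchResult {
  const SearchResultLoading();
}

class SearchResultNoResult implements SearchResult {
  const SearchResultNoResult();
}

class SearchResultWithError implements SearchResult {
  final Object? error;
  const SearchResultWithError(this.error);
}

class SearchResultWithResult implements SearchResult {
  final List<String> result;
  const SearchResultWithResult(this.result);
}

// Hypothetical helper: turns a state into a short label.
// A null state means no search has been requested yet.
String describe(SearchResult? state) {
  if (state is SearchResultLoading) return 'loading';
  if (state is SearchResultNoResult) return 'no results';
  if (state is SearchResultWithError) return 'error: ${state.error}';
  if (state is SearchResultWithResult) return '${state.result.length} result(s)';
  return 'idle';
}

void main() {
  print(describe(const SearchResultLoading())); // prints loading
  print(describe(const SearchResultWithResult(['dart', 'dash']))); // prints 2 result(s)
}
```

The UI step later in this post uses the same kind of type checks to decide which widget to show.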
Step - 3
Let's write code that performs a search on a list of words fetched from an API. It shows the basic steps involved: calling a remote API, caching the results, and extracting the words that match a search term.
import 'dart:convert';
import 'package:http/http.dart' as http;
class Api {
  // Word list cached after the first successful fetch
  List<String>? _words;
  Api();
  // Returns the words that contain the search term, fetching the list on first use
  Future<List<String>> search(String searchTerm) async {
    final term = searchTerm.trim().toLowerCase();
    final cachedResult = _extractWordsUsingSearchTerm(term);
    if (cachedResult != null) {
      return cachedResult;
    }
    // No cache yet: call the API, store the words, then filter them
    final words = await _getData(
        "https://dl.dropboxusercontent.com/s/s4xik49426frdl4/words.json?dl=0");
    _words = words;
    return _extractWordsUsingSearchTerm(term) ?? [];
  }
  // Filters the cached words; returns null when nothing has been cached yet
  List<String>? _extractWordsUsingSearchTerm(String word) {
    final cachedWords = _words;
    if (cachedWords == null) {
      return null;
    }
    final term = word.trim().toLowerCase();
    return cachedWords.where((cached) => cached.contains(term)).toList();
  }
  // Downloads the JSON document and reads its "names" array
  Future<List<String>?> _getData(String url) async {
    final response = await http.get(Uri.parse(url));
    final parsed = jsonDecode(response.body)['names'];
    return parsed != null ? List<String>.from(parsed) : null;
  }
}
// Case-insensitive, whitespace-tolerant contains; works on a String, not on a list
extension TrimmedCaseInsensitiveContain on String {
  bool trimmedContains(String other) => trim().toLowerCase().contains(
        other.trim().toLowerCase(),
      );
}
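The trimmedContains extension is declared but never called in the Api class. The sketch below shows how it could be used to filter a word list regardless of case or surrounding whitespace; filterWords is a hypothetical helper added only for this illustration.

```dart
extension TrimmedCaseInsensitiveContain on String {
  bool trimmedContains(String other) =>
      trim().toLowerCase().contains(other.trim().toLowerCase());
}

// Hypothetical helper: keeps the words that contain the term,
// ignoring case and surrounding whitespace on both sides.
List<String> filterWords(List<String> words, String term) =>
    words.where((word) => word.trimmedContains(term)).toList();

void main() {
  print(filterWords(['Flutter', 'dart', 'Rust'], ' FLU ')); // prints [Flutter]
}
```

Whether to match case-insensitively on the word side as well depends on the data the API returns, so treat this as one option rather than a required change.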
Step - 4
This code demonstrates the setup of a reactive search bloc using RxDart. It establishes a bidirectional communication channel for searching by providing a sink to add search terms and a stream to receive search results. The search terms undergo stream transformations to control the search behavior, and the results are emitted as a stream of SearchResult objects with different states.
The result stream is created by chaining several stream transformations on textChanges. It performs the following operations:
- distinct() skips a search term that is equal to the previous one, so the same term is not processed twice in a row.
- debounceTime(const Duration(milliseconds: 350)) delays the processing of the search term stream, waiting for a brief period of inactivity (350 milliseconds) before emitting the latest search term. This helps to reduce unnecessary API calls for rapidly changing search terms.
- switchMap((String searchTerm) { ... }) maps each search term to a stream of SearchResult objects and stops listening to the previous term's stream when a new term arrives. If the search term is empty, it immediately emits a null result. Otherwise, it performs the actual search using the api.search method, which returns a Future<List<String>>. This future is wrapped using Rx.fromCallable and then delayed by 1 second using delay. The mapped stream is further transformed using map to convert the search results into SearchResultNoResult or SearchResultWithResult, depending on whether the list is empty.
- startWith(const SearchResultLoading()) emits a loading result as the initial value when the search begins.
- onErrorReturnWith((error, _) => SearchResultWithError(error)) handles any errors that occur during the search operation and emits a SearchResultWithError object.
import 'dart:async';
import 'package:flutter/foundation.dart' show immutable;
import 'package:infinite_words/bloc/api.dart';
import 'package:infinite_words/bloc/search_result.dart';
import 'package:rxdart/rxdart.dart';
@immutable
class SearchBloc {
  final Sink<String> search;
  final Stream<SearchResult?> results;
  void dispose() {
    search.close();
  }
  factory SearchBloc({required Api api}) {
    final textChanges = BehaviorSubject<String>();
    final result = textChanges
        .distinct()
        .debounceTime(const Duration(milliseconds: 350))
        .switchMap<SearchResult?>((String searchTerm) {
      if (searchTerm.isEmpty) {
        return Stream<SearchResult?>.value(null);
      } else {
        return Rx.fromCallable(() => api.search(searchTerm))
            .delay(const Duration(seconds: 1))
            .map(
              (results) => results.isEmpty
                  ? const SearchResultNoResult()
                  : SearchResultWithResult(results),
            )
            .startWith(const SearchResultLoading())
            .onErrorReturnWith((error, _) => SearchResultWithError(error));
      }
    });
    return SearchBloc._(
      search: textChanges.sink,
      results: result,
    );
  }
  const SearchBloc._({
    required this.search,
    required this.results,
  });
}
Step - 5 UI Development
A custom widget to display our search results in a GridView.
import 'package:flutter/material.dart';
class GridViewWidget extends StatelessWidget {
const GridViewWidget({
Key? key,
required this.results,
}) : super(key: key);
final List<String> results;
@override
Widget build(BuildContext context) {
return Expanded(
child: GridView.builder(
itemCount: results.length,
gridDelegate: const SliverGridDelegateWithMaxCrossAxisExtent(
maxCrossAxisExtent: 200,
childAspectRatio: 3 / 2,
crossAxisSpacing: 20,
mainAxisSpacing: 20),
itemBuilder: (context, index) {
return Container(
alignment: Alignment.center,
decoration: BoxDecoration(
color: Colors.white, borderRadius: BorderRadius.circular(15)),
child: Text(
results[index],
textAlign: TextAlign.center,
style: const TextStyle(fontSize: 20, fontWeight: FontWeight.bold),
),
);
},
),
);
}
}
Step - 6
This code provides a UI representation of the different states of the search results and handles the appropriate rendering based on the received data.
import 'package:flutter/material.dart';
import 'package:infinite_words/bloc/search_result.dart';
import 'package:infinite_words/widgets/grid_view_widget.dart';
class SearchResultView extends StatelessWidget {
final Stream<SearchResult?> searchResults;
const SearchResultView({
Key? key,
required this.searchResults,
}) : super(key: key);
@override
Widget build(BuildContext context) {
return StreamBuilder<SearchResult?>(
stream: searchResults,
builder: (
BuildContext context,
AsyncSnapshot<SearchResult?> snapshot,
) {
if (snapshot.hasData) {
final result = snapshot.data;
if (result is SearchResultWithError) {
return const Center(child: Text('Error'));
} else if (result is SearchResultLoading) {
return const Center(child: CircularProgressIndicator());
} else if (result is SearchResultNoResult) {
return const Center(child: Text('No Result Found'));
} else if (result is SearchResultWithResult) {
final results = result.result;
return GridViewWidget(results: results);
} else {
return const Center(child: Text("Unknown State"));
}
} else {
return const Center(
child: Text(
"Waiting.....",
style: TextStyle(color: Colors.white, fontSize: 18),
),
);
}
},
);
}
}
Step - 7
This code sets up the home page of the application with a text input field for searching words and displays the search results using the SearchResultView widget. The SearchBloc manages the search functionality and emits the search results to the UI.
import 'package:flutter/material.dart';
import 'package:infinite_words/bloc/api.dart';
import 'package:infinite_words/bloc/search_bloc.dart';
import 'package:infinite_words/view/search_result_view.dart';
class HomePage extends StatefulWidget {
const HomePage({Key? key}) : super(key: key);
@override
State<HomePage> createState() => _HomePageState();
}
class _HomePageState extends State<HomePage> {
late final SearchBloc _bloc;
@override
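void initState() {
// Call super.initState() first, then create the bloc
super.initState();
_bloc = SearchBloc(api: Api());
}
@override
void dispose() {
// Release the bloc before super.dispose(), which should come last
_bloc.dispose();
super.dispose();
}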
@override
Widget build(BuildContext context) {
return Scaffold(
backgroundColor: Colors.black,
// appBar: AppBar(
// title: const Text('Search'),
// ),
body: Padding(
padding: const EdgeInsets.all(10),
child: Column(
children: [
const SizedBox(
height: 50,
),
TextField(
decoration: InputDecoration(
border: InputBorder.none,
filled: true,
fillColor: Colors.white,
contentPadding: const EdgeInsets.only(
left: 14.0, bottom: 6.0, top: 8.0),
focusedBorder: OutlineInputBorder(
borderSide: const BorderSide(color: Colors.grey),
borderRadius: BorderRadius.circular(10.0),
),
enabledBorder: UnderlineInputBorder(
borderSide: const BorderSide(color: Colors.white),
borderRadius: BorderRadius.circular(10.0),
),
hintText: "Write a word",
hintStyle: const TextStyle(
fontSize: 20,
// color: Colors.white,
)),
onChanged: _bloc.search.add),
const SizedBox(
height: 10,
),
SearchResultView(searchResults: _bloc.results)
],
),
),
);
}
}