DEV Community

Cover image for Server-Sent Events(SSE) com dart (Shelf e Flutter)
Toshi Ossada for flutterbrasil

Posted on

1

Server-Sent Events(SSE) com dart (Shelf e Flutter)

Fala Devs! Com toda certeza você já deve ter tido a necessidade de fazer uma requisição HTTP, eu mesmo já escrevi um artigo muito legal que utilizamos o DIO como cliente HTTP para fazermos requisições a nossa API (https://blog.flutterando.com.br/consumindo-api-utilizando-o-dio-9ec72aeceeaa).

Entretanto podemos passar por situações que precisamos fazer uma requisição em que o resultado venha de forma gradativa ao cliente, há algumas formas que podemos realizar esta tarefa, a primeira maneira é fazer que nosso cliente fique acessando o servidor continuamente em busca de atualizações, esta técnica é chamada de pooling.

Outra forma seria utilizar os WebSockets, eles são talvez a melhor opção quando queremos receber dados de forma gradativa e em tempo real.

Em seu funcionamento é criada uma sessão de comunicação interativa entre o cliente e o servidor, recebendo mensagens em tempo real, funcionando de forma semelhante ao Firestore.

Entretanto ela é uma técnica um pouco custosa e complexa, e as vezes não precisamos de lima solução tão robusta assim, então existe um meio termo?

E a resposta é SIM! O Server-Sent Events(popularmente conhecido como SSE), um método que funciona de forma semelhante ao Websocket, ele basicamente definira uma comunicação em uma API chamada de EventSource e esta comunicação se manterá aberta recebendo mensagens de forma gradativa do servidor.

E um bônus, neste artigo não implementaremos apenas o cliente, veremos também como criar o SSE do lado do servidor e para isto utilizaremos, é claro, o DART para o backend com o pacote SHELF. https://pub.dev/packages/shelfv

Servidor

Para isto vamos criar um projeto em branco em Dart. Execute

dart create server

Utilizaremos três pacotes para desenvolver nossa API em Dart:

Shelf(https://pub.dev/packages/shelf) — Pacote responsável por nos ajudar a criar nossos webservers

Shelf_router(https://pub.dev/packages/shelf_router) — Pacote que trata a criação de caminhas para os nossos endpoints.

Eventsource(https://pub.dev/packages/eventsource) — Pacote responsável por implementar o padrão do eventsource(SSE) em nossa api.

Com esses três pacotes nosso pubspec deverá ficar da seguinte maneira

A primeira tarefa que faremos é fazer uma implementação do eventsource em nosso projeto

Basta adicionarmos a implementação a seguir

library shelf_eventsource;
import "dart:convert";
import "package:eventsource/publisher.dart";
import "package:eventsource/src/encoder.dart";
import "package:shelf/shelf.dart";
/// Create a shelf handler for the specified channel.
/// This handler can be passed to the [shelf.serve] method.
Handler eventSourceHandler(
EventSourcePublisher publisher, {
String channel = "",
bool gzip = false,
}) {
// define the handler
Response shelfHandler(Request request) {
if (request.method != "GET") {
return Response.notFound(null);
}
if (!request.canHijack) {
throw ArgumentError("eventSourceHandler may only be used with a "
"server that supports request hijacking.");
}
// set content encoding to gzip if we allow it and the request supports it
bool useGzip =
gzip && (request.headers["Accept-Encoding"] ?? "").contains("gzip");
// hijack the raw underlying channel
request.hijack((untypedChannel) {
var socketChannel = (untypedChannel).cast<List<int>>();
// create a regular UTF8 sink to write headers
var sink = utf8.encoder.startChunkedConversion(socketChannel.sink);
// write headers
sink.add("HTTP/1.1 200 OK\r\n"
"Content-Type: text/event-stream; charset=utf-8\r\n"
"Cache-Control: no-cache, no-store, must-revalidate\r\n"
"Connection: keep-alive\r\n");
if (useGzip) sink.add("Content-Encoding: gzip\r\n");
sink.add("\r\n");
// create encoder for this connection
var encodedSink = EventSourceEncoder(compressed: useGzip)
.startChunkedConversion(socketChannel.sink);
// initialize the new subscription
publisher.newSubscription(
onEvent: encodedSink.add,
onClose: encodedSink.close,
channel: channel,
lastEventId: request.headers["Last-Event-ID"]);
});
}
return shelfHandler;
}

Agora podemos implementar o entry point do nossa API que está no sse.dart

Primeiramente criamos o método que irá gerar as as mensagens retornadas para o cliente de forma assíncrona, para isto basta quando quisermos retornar alguma informação executar o publisher.add() passando um evento de resposta e na hora que finalizar o processamento executar o publisher.close()

Feito isso podemos criar nosso endpoint (no caso o /events) e passar nosso EventSourcePublisher como parâmetro do método eventSourceHandler.

import "dart:async";
import 'dart:convert';
import 'package:eventsource/publisher.dart';
import "package:shelf/shelf_io.dart" as io;
import 'package:shelf_router/shelf_router.dart';
import 'eventsource.dart';
main() {
var app = Router();
app.get("/events", (r) {
final publisher = EventSourcePublisher();
generateEvents(publisher);
var handler = eventSourceHandler(publisher);
handler(r);
});
io.serve(app, "localhost", 8080);
}
generateEvents(EventSourcePublisher publisher) {
int id = 0;
Timer.periodic(const Duration(seconds: 1), (timer) {
final data = json.encode({
'id': id,
'message': 'event $id',
'finished': id == 10,
});
publisher.add(Event(data: data));
if (id == 10) {
timer.cancel();
publisher.close();
}
id++;
});
}
view raw server.dart hosted with ❤ by GitHub

E sucesso! Nossa API está pronta, basta executar e ela estará disponível na porta que configuramos (8080), execute:

dart run

O resultado será o seguinte:

Cliente

No nosso aplicativo Flutter só iremos precisar do pacote eventsource(https://pub.dev/packages/eventsource)

Adicione ele no seu pubspec

Este pacote nos oferece um objeto Eventsource que contém o método conect() que faz a conexão com a API e rebe as mensagens. Como próximo passo devemos criar uma Stream e emitir o resultado da mensagem que vem da API nessa Stream.

Também devemos criar um getter para conseguir acessar a stream.

A implementação da classe completa deve ficar assim:

import 'dart:async';
import 'dart:convert';
import 'package:eventsource/eventsource.dart';
import '../enums/sse_enum.dart';
import '../sse_adapter.dart';
class EventsourceSseAdapterImpl implements SseAdapter {
final kTimeout = const Duration(seconds: 30);
late StreamController<Map> _streamController;
_url({
required String baseUrl,
String? relativePath,
String? path,
String scheme = 'http',
Map<String, dynamic>? queryParameters,
}) {
final listQueryString = <String>[];
var q = '';
queryParameters?.forEach((key, value) {
listQueryString.add('$key=$value');
});
if (listQueryString.isNotEmpty) q = '?${listQueryString.join('&')}';
if (scheme.isNotEmpty) {
baseUrl = baseUrl.replaceAll('http://', '').replaceAll('https://', '');
baseUrl = '$scheme://$baseUrl';
}
return '$baseUrl${relativePath ?? ''}${path ?? ''}$q';
}
_body(Map<String, dynamic>? data) {
if (data == null) return null;
return json.encode(data);
}
@override
Future<SseAdapter> connect({
required String baseUrl,
String? relativePath,
required String path,
String scheme = 'http',
bool closeOnError = true,
Map<String, dynamic>? queryParameters,
Map<String, String>? headers,
Map<String, dynamic>? data,
SseMethod method = SseMethod.GET,
}) async {
try {
late final EventSource _eventSources;
final url = _url(
baseUrl: baseUrl,
relativePath: relativePath,
path: path,
queryParameters: queryParameters,
scheme: scheme,
);
final body = _body(data);
_streamController = StreamController<Map>();
_eventSources = await EventSource.connect(
url,
headers: headers,
method: method.value,
body: body,
);
_eventSources.listen((Event event) {
try {
if (!_streamController.isClosed && event.data != null) {
var dataJson = json.decode(event.data!);
_streamController.add(dataJson);
}
} catch (e) {
rethrow;
}
}).onError((e) {
_streamController.addError(Exception("Erro SSE"));
if (closeOnError) close();
});
return this;
} catch (e) {
rethrow;
}
}
@override
Stream get stream => _streamController.stream.timeout(kTimeout);
@override
bool isClosed() => _streamController.isClosed;
@override
void close() {
_streamController.close();
}
}

Agora temos uma stream que emite um resultado de um Map agora em nossa datasource podemos escutar essa stream para ela fazer o mapper do JSON para um Model

No repositório fazemos um mapper de model para entity.

E por fim na nossa controller podemos escutar a Stream e mandar atualizar nossa store para fazer reatividade na nossa page.

Desta forma teremos o seguinte resultado

Legal ne?? muito simples trabalhar com SSE em nossos aplicativos.

O projeto completo pode conferir abaixo
https://github.com/toshiossada/flutter_sse

Image description

Entre em nosso discord para interagir com a comunidade: https://discord.com/invite/flutterbrasil
https://linktr.ee/flutterbrasil

Do your career a big favor. Join DEV. (The website you're on right now)

It takes one minute, it's free, and is worth it for your career.

Get started

Community matters

Top comments (1)

Collapse
 
der_gopher profile image
Alex Pliutau

Great write up! Does anyone use Server-Sent Events in their projects? If yes, for which use cases? This video dives into the main building blocks of Server-Sent Events in Go.
youtu.be/nvijc5J-JAQ

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs

AWS Security LIVE!

Hosted by security experts, AWS Security LIVE! showcases AWS Partners tackling real-world security challenges. Join live and get your security questions answered.

Tune in to the full event

DEV is partnering to bring live events to the community. Join us or dismiss this billboard if you're not interested. ❤️