- written by Bishoy Hany
The Monday Morning Problem
Imagine the following scenario: Monday, 9 AM. You grab your coffee, join the stand-up call, and suddenly realise: you have absolutely no idea what you did last week.
The weekend wiped your mental cache clean. Was it the React component refactor? Or was that two weeks ago? You vaguely remember fighting with TypeScript errors on Thursday, but what was the actual solution? Your Git commits say 'fix: update logic'—thanks, past you, very helpful.
Here's the thing: your brain isn't built to be a perfect activity log. But your computer? It remembers everything. That's where PiecesOS comes in.
The Goal
Create an app that tracks what work is done every day.
For Part 1, we will work on:
- Connecting to PiecesOS
- Getting our workstream summaries
- Grouping the summaries by day
- Keeping everything updated in real-time
We’ll hit a UI later on. Let’s take this one step at a time!
Prerequisites
Dart SDK version 3.8.1 or higher (flutter.dev)
Gemini API key (will mention that in part 2
Setting Up
Let’s begin by creating a new flutter app
flutter create daily_recap_app
Before you can start using the Pieces SDK, you’ll need to add it to your Dart or Flutter project. Open your project’s pubspec.yaml file and add the following under ‘dependencies:’:
dependencies:
# The Pieces SDK
pieces_os_client:
git:
url: https://github.com/pieces-app/pieces-os-client-sdk-for-dart.git
web_socket_channel: ^3.0.1
The SDK is automatically generated from our API setup, so it’s mostly made up of code that helps your app talk to Pieces’ api.
We also added the web_socket_channel dependency to connect to the Pieces web-socket
Step 1: Connecting to PiecesOS
Alright, first challenge – how do we even talk to PiecesOS?
Let's first create a new file lib/services/pieces_os_service.dart
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:pieces_os_client/api.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
class PiecesOSService {
// Super important Pieces sometimes could run on other ports so you should implement your own port-finding algo
// here is example using the python sdk
// https://github.com/pieces-app/pieces-os-client-sdk-for-python/blob/db1fa83f21407323b98d21507a59d1e4f176d064/src/pieces_os_client/wrapper/client.py#L139-L168
// We gonna use 39300 for simplicity and PiecesOS will be running on port 39300 for most of the time as well
static const String baseUrl = 'http://localhost:39300';
static const String websocketUrl = 'ws://localhost:39300';
// Used to register application
late final ConnectorApi _connectorApi;
// Used to retrieve the workstream summaries
late final WorkstreamSummaryApi _workstreamSummaryApi;
// Used to retrieve the summary content
late final AnnotationApi _annotationApi;
// Cache for workstream summaries grouped by day
final Map<DateTime, List<WorkstreamSummary>> _summariesByDay = {};
PiecesOSService() {
final client = ApiClient(basePath: baseUrl);
_connectorApi = ConnectorApi(client);
_workstreamSummaryApi = WorkstreamSummaryApi(client);
_annotationApi = AnnotationApi(client);
}
}
Nothing fancy – just setting up our API clients.
Now the fun part – actually connecting:
// existing code from above
Future<Application> connectApplication() async {
final seededApp = SeededTrackedApplication(
name: ApplicationNameEnum.OPEN_SOURCE, // Hey Pieces, I'm open source!
platform: Platform.operatingSystem == "macos"
? PlatformEnum.MACOS
: PlatformEnum.WINDOWS,
version: "0.0.1",
);
final connection = SeededConnectorConnection(
application: seededApp,
);
_context = await _connectorApi.connect(
seededConnectorConnection: connection,
);
print('Successfully connected to Pieces OS!');
return _context!.application;
}
Basically, we're saying, "Hey Pieces, I'm a new app, let me in!" and it gives us back a context with our application info.
Step 2: The WebSocket
This websocket specifically sends us all of the IDs for the users' workstream summaries in Pieces upon first connection. After, it sends only the IDs of either newly created summaries, summaries that have updated data, or summaries that were deleted.
Here's the setup:
// existing code from above
void _startWebSocketListener() {
final wsUrl = '$websocketUrl/workstream_summaries/stream/identifiers';
_wsChannel = WebSocketChannel.connect(Uri.parse(wsUrl));
print('WebSocket connected to $wsUrl');
_wsSubscription = _wsChannel!.stream.listen(
(message) async {
try {
// Important: WebSocket sends JSON strings, decode them first!
final streamedIdentifiers = StreamedIdentifiers.fromJson(
jsonDecode(message),
);
// Loop through each identifier we received
for (final id in streamedIdentifiers!.iterable) {
final summaryId = id.workstreamSummary?.id;
if (summaryId != null) {
await _fetchAndCacheSummary(summaryId);
}
}
} catch (e) {
print('Error processing message: $e');
}
},
onDone: () {
print('WebSocket closed, reconnecting...');
_reconnectWebSocket();
},
onError: (error) {
print('WebSocket error: $error');
_reconnectWebSocket();
},
);
}
Here's the beauty of this approach: The WebSocket just keeps until their app shuts down or they manually close it, sending us identifiers. When we first connect, it dumps ALL existing identifiers on us (could be from a year ago!) and stays open to send us new ones as they're created in real-time. One connection handles everything!
Step 3: Fetching and Caching Summaries
Okay, so we have identifiers. Now what? We need to actually fetch the summary data and organize it.
Here's what a WorkstreamSummary looks like (the important parts anyway):
// This is a model defined in the sdks don't add any thing to the code yet.
class WorkstreamSummary {
String id; // Unique identifier
String? name; // Optional name/title
GroupedTimestamp created; // When it was created
GroupedTimestamp? updated; // When it was last updated
// ... and a bunch of other fields
}
And GroupedTimestamp is basically:
// This is a model defined in the sdks don't add any thing to the code yet.
class GroupedTimestamp {
DateTime value; // The actual timestamp
}
So when we fetch a summary, we need to:
- Get the created date
- Strip the time from the date
- Add the summary to that day's list
- Sort everything by time
Here's the code:
// existing code from above
Future<void> _fetchAndCacheSummary(String identifier) async {
final summary = await _workstreamSummaryApi
.workstreamSummariesSpecificWorkstreamSummarySnapshot(identifier);
if (summary == null) {
return;
}
// Normalize to day (remove time)
final createdDate = summary.created.value;
final dayKey = DateTime(
createdDate.year,
createdDate.month,
createdDate.day,
);
// Create day entry if it doesn't exist
_summariesByDay.putIfAbsent(dayKey, () => []);
// Check for duplicates (avoid adding the same summary twice)
final existingIndex = _summariesByDay[dayKey]!
.indexWhere((s) => s.id == summary.id);
if (existingIndex != -1) {
// Update existing
_summariesByDay[dayKey]![existingIndex] = summary;
} else {
// Add new
_summariesByDay[dayKey]!.add(summary);
}
// Sort by time (most recent first)
_summariesByDay[dayKey]!.sort((a, b) {
return b.created.value.compareTo(a.created.value);
});
// Notify anyone listening
_summariesStreamController.add(Map.unmodifiable(_summariesByDay));
print('Cached summary ${summary.id} for day $dayKey');
}
The _summariesByDay is just aMap<DateTime, List<WorkstreamSummary>> where:
- Key: A DateTime set to midnight (e.g., Nov 4, 2025 at 00:00:00)
- Value: List of all summaries for that day, sorted by time
Let's also add some useful methods to retrieve a summary
// existing code
/// Get summaries for a specific day from cache
List<WorkstreamSummary> getSummariesForDay(DateTime date) {
final dayKey = DateTime(date.year, date.month, date.day);
return _summariesByDay[dayKey] ?? [];
}
/// Get all days that have summaries (sorted most recent first)
List<DateTime> getDaysWithSummaries() {
final days = _summariesByDay.keys.toList();
days.sort((a, b) => b.compareTo(a)); // Most recent first
return days;
}
Step 4: Keeping Everything in Sync
Speaking of streams – let's add a broadcast stream that others can listen to:
// existing code
final StreamController<Map<DateTime, List<WorkstreamSummary>>>
_summariesStreamController = StreamController.broadcast();
Stream<Map<DateTime, List<WorkstreamSummary>>> get summariesStream =>
_summariesStreamController.stream;
So in the UI (when we build it), we can just do:
service.summariesStream.listen((summaries) {
print('Got new data!');
// Update UI here
});
Step 4.5: Waiting for Initial WebSocket Sync
Here's a problem: when the app first starts, the WebSocket dumps all existing summaries on us. This could be hundreds of identifiers! If we try to show the UI or generate a recap before they're all loaded, we'll have incomplete data.
But we only need to wait once – on the first message. After that, the WebSocket just sends new updates in real-time and we don't want to block.
Here's the solution using a Completer:
Step 1: Add Fields and Wait Method
Place this code block right after the PiecesOSService() constructor and before the connectApplication() method:
// Add these fields to the class
Completer<void>? _initialSyncCompleter;
bool get _isInitialSyncComplete =>
_initialSyncCompleter?.isCompleted ?? false;
/// Wait for the initial WebSocket sync to complete
/// Only blocks on the first call, returns immediately after
Future<void> waitForInitialSync() async {
// If already synced, return immediately
if (_isInitialSyncComplete) {
return;
}
// If sync is in progress, wait for it
if (_initialSyncCompleter != null) {
return _initialSyncCompleter!.future;
}
// Start waiting for first sync
_initialSyncCompleter = Completer<void>();
return _initialSyncCompleter!.future;
}
Step 2: Update WebSocket Listener
Replace _wsSubscription with this new section:
_wsSubscription = _wsChannel!.stream.listen(
(message) async {
try {
final streamedIdentifiers = StreamedIdentifiers.fromJson(
jsonDecode(message),
);
// Process all identifiers
for (final id in streamedIdentifiers!.iterable) {
final summaryId = id.workstreamSummary?.id;
if (summaryId != null) {
await _fetchAndCacheSummary(summaryId);
}
}
// Mark initial sync as complete after first message
if (!_isInitialSyncComplete) {
_isInitialSyncComplete = true;
_initialSyncCompleter?.complete();
print('Initial WebSocket sync complete!');
}
} catch (e) {
print('Error processing message: $e');
}
},
// ... onDone, onError handlers
);
Step 3: Use in App Initialization
Place this code in your app's entry point (typically lib/main.dart, create the file if you don't have it):
import 'services/pieces_os_service.dart';
Future<void> main() async {
final service = PiecesOSService();
await service.connectApplication();
service.startWebSocketListener();
// Wait for all existing summaries to load
await service.waitForInitialSync();
// Now we can safely generate recaps or show UI!
print('Ready to go! All summaries loaded.');
}
Step 5: Accessing the Actual Summary Content
WorkstreamSummary objects from the SDK don't directly expose the summary text. The actual content is stored in annotations.
Each summary has annotations, and we need to:
- Loop through the summary's annotations
- Find the one with type
SUMMARY - Fetch that annotation using
AnnotationApi - Extract the text content
Let's add a method to fetch annotation content:
// At the bottom of pieces_os_service.dart
/// Get the summary content from a workstream summary's annotations
Future<String?> getSummaryContent(WorkstreamSummary summary) async {
try {
// Loop through annotations to find the DESCRIPTION type
for (final annotationRef in summary.annotations?.indices.keys.toList() ?? []) {
// Fetch the full annotation using AnnotationApi
final annotation = await _annotationApi
.annotationSpecificAnnotationSnapshot(annotationRef);
if (annotation == null) {
continue;
}
// Check if this is a DESCRIPTION type annotation
if (annotation.type == AnnotationTypeEnum.SUMMARY) {
// Return the text content - this is the actual summary!
return annotation.text;
}
}
return null;
} catch (e) {
print('Error fetching annotation content for ${summary.id}: $e');
return null;
}
}
The SDK separates metadata (ID, timestamp) from content (stored in annotations).
Step 6: Creating a Proper Data Structure
Let's create a proper class for summaries with their content. Add this to lib/models/daily_recap_models.dart:
/// Data class to hold a summary with its content
class SummaryWithContent {
final String id;
final String title;
final String content; // The actual summary text!
final DateTime timestamp;
SummaryWithContent({
required this.id,
required this.title,
required this.content,
required this.timestamp,
});
}
Now, let's add a convenient method to get summaries with their content:
// Don't forget to import the model that we just created
import 'package:daily_recap_app/models/daily_recap_models.dart';
// Below existing code in pieces_os_straervice.dart
/// Get summaries with their content for a specific day
Future<List<SummaryWithContent>> getSummariesWithContentForDay(
DateTime date,
) async {
final summaries = getSummariesForDay(date);
final List<SummaryWithContent> summariesWithContent = [];
for (final summary in summaries) {
final content = await getSummaryContent(summary);
if (content != null && content.isNotEmpty) {
summariesWithContent.add(
SummaryWithContent(
id: summary.id,
title: summary.name,
content: content, // Actual text from annotation!
timestamp: summary.created.value,
),
);
}
}
return summariesWithContent;
}
Perfect! Now we have rich, typed data ready for AI processing and UI display.
Part 1 Summary:
- Connect to PiecesOS
- Open WebSocket for real-time updates
- Fetch and cache summaries
- Group by day
- Stream updates to UI
- Extract summary content from annotations
What is created so far lib/services/pieces_os_service.dart
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:pieces_os_client/api.dart';
import 'package:pieces_os_client/api_client.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:daily_recap_app/models/daily_recap_models.dart';
/// Service to interact with Pieces OS for LTM (Long Term Memory) tracking
class PiecesOSService {
static const String baseUrl = 'http://localhost:39300';
static const String websocketUrl = 'ws://localhost:39300';
// Used to register application
late final ConnectorApi _connectorApi;
// Used to retrieve the workstream summaries
late final WorkstreamSummaryApi _workstreamSummaryApi;
// Used to retrieve the summary content
late final AnnotationApi _annotationApi;
final StreamController<Map<DateTime, List<WorkstreamSummary>>>
_summariesStreamController = StreamController.broadcast();
Stream<Map<DateTime, List<WorkstreamSummary>>> get summariesStream =>
_summariesStreamController.stream;
// Track initial WebSocket sync
bool _isInitialSyncComplete = false;
Completer<void>? _initialSyncCompleter;
PiecesOSService() {
final client = ApiClient(basePath: baseUrl);
_connectorApi = ConnectorApi(client);
_workstreamSummaryApi = WorkstreamSummaryApi(client);
_annotationApi = AnnotationApi(client);
}
Future<Application> connectApplication() async {
final seededApp = SeededTrackedApplication(
name: ApplicationNameEnum.OPEN_SOURCE, // Hey Pieces, I'm open source!
platform: Platform.operatingSystem == "macos"
? PlatformEnum.MACOS
: PlatformEnum.WINDOWS,
version: "0.0.1",
);
final connection = SeededConnectorConnection(
application: seededApp,
);
_context = await _connectorApi.connect(
seededConnectorConnection: connection,
);
print('Successfully connected to Pieces OS!');
return _context!.application;
}
/// Wait for the initial WebSocket sync to complete
/// Only blocks on the first call, returns immediately after
Future<void> waitForInitialSync() async {
// If already synced, return immediately
if (_isInitialSyncComplete) {
return;
}
// If sync is in progress, wait for it
if (_initialSyncCompleter != null) {
return _initialSyncCompleter!.future;
}
// Start waiting for first sync
_initialSyncCompleter = Completer<void>();
return _initialSyncCompleter!.future;
}
void _startWebSocketListener() {
final wsUrl = '$websocketUrl/workstream_summaries/stream/identifiers';
_wsChannel = WebSocketChannel.connect(Uri.parse(wsUrl));
print('WebSocket connected to $wsUrl');
_wsSubscription = _wsChannel!.stream.listen(
(message) async {
try {
// Important: WebSocket sends JSON strings, decode them first!
final streamedIdentifiers = StreamedIdentifiers.fromJson(
jsonDecode(message),
);
// Loop through each identifier we received
for (final id in streamedIdentifiers!.iterable) {
final summaryId = id.workstreamSummary?.id;
if (summaryId != null) {
await _fetchAndCacheSummary(summaryId);
}
}
// Mark initial sync as complete after first message
if (!_isInitialSyncComplete) {
_isInitialSyncComplete = true;
_initialSyncCompleter?.complete();
print('Initial WebSocket sync complete!');
}
} catch (e) {
print('Error processing message: $e');
}
},
onDone: () {
print('WebSocket closed, reconnecting...');
_reconnectWebSocket();
},
onError: (error) {
print('WebSocket error: $error');
_reconnectWebSocket();
},
);
}
Future<void> _fetchAndCacheSummary(String identifier) async {
final summary = await _workstreamSummaryApi
.workstreamSummariesSpecificWorkstreamSummarySnapshot(identifier);
if (summary == null) {
return;
}
// Normalize to day (remove time)
final createdDate = summary.created.value;
final dayKey = DateTime(
createdDate.year,
createdDate.month,
createdDate.day,
);
// Create day entry if it doesn't exist
if (!_summariesByDay.containsKey(dayKey)) {
_summariesByDay[dayKey] = [];
}
// Check for duplicates (avoid adding the same summary twice)
final existingIndex = _summariesByDay[dayKey]!
.indexWhere((s) => s.id == summary.id);
if (existingIndex != -1) {
// Update existing
_summariesByDay[dayKey]![existingIndex] = summary;
} else {
// Add new
_summariesByDay[dayKey]!.add(summary);
}
// Sort by time (most recent first)
_summariesByDay[dayKey]!.sort((a, b) {
return b.created.value.compareTo(a.created.value);
});
// Notify anyone listening
_summariesStreamController.add(Map.unmodifiable(_summariesByDay));
print('Cached summary ${summary.id} for day $dayKey');
}
/// Get summaries for a specific day from cache
List<WorkstreamSummary> getSummariesForDay(DateTime date) {
final dayKey = DateTime(date.year, date.month, date.day);
return _summariesByDay[dayKey] ?? [];
}
/// Get all days that have summaries (sorted most recent first)
List<DateTime> getDaysWithSummaries() {
final days = _summariesByDay.keys.toList();
days.sort((a, b) => b.compareTo(a)); // Most recent first
return days;
}
/// Get ries with their content for a specific day
Future<List<SummaryWithContent>> getSummariesWithContentForDay(
DateTime date,
) async {
final summaries = getSummariesForDay(date);
final List<SummaryWithContent> summariesWithContent = [];
for (final summary in summaries) {
final content = await getSummaryContent(summary);
if (content != null && content.isNotEmpty) {
summariesWithContent.add(
SummaryWithContent(
id: summary.id,
title: summary.name,
content: content,
timestamp: summary.created.value,
),
);
}
}
return summariesWithContent;
}
}
Reference GitHub for viewing the full project.

Top comments (0)