DEV Community

Cover image for Building Daily Stand-Up Generator using Pieces API - Part 1: The SDK overview
Pieces 🌟
Pieces 🌟

Posted on

Building Daily Stand-Up Generator using Pieces API - Part 1: The SDK overview

- written by Bishoy Hany

The Monday Morning Problem

Imagine the following scenario: Monday, 9 AM. You grab your coffee, join the stand-up call, and suddenly realise: you have absolutely no idea what you did last week.
The weekend wiped your mental cache clean. Was it the React component refactor? Or was that two weeks ago? You vaguely remember fighting with TypeScript errors on Thursday, but what was the actual solution? Your Git commits say 'fix: update logic'—thanks, past you, very helpful.

Here's the thing: your brain isn't built to be a perfect activity log. But your computer? It remembers everything. That's where PiecesOS comes in.

The Goal

Create an app that tracks what work is done every day.

For Part 1, we will work on:

  • Connecting to PiecesOS
  • Getting our workstream summaries
  • Grouping the summaries by day
  • Keeping everything updated in real-time

We’ll hit a UI later on. Let’s take this one step at a time!

Prerequisites

Dart SDK version 3.8.1 or higher (flutter.dev)
Gemini API key (will mention that in part 2

Setting Up

Let’s begin by creating a new flutter app

flutter create daily_recap_app
Enter fullscreen mode Exit fullscreen mode

Before you can start using the Pieces SDK, you’ll need to add it to your Dart or Flutter project. Open your project’s pubspec.yaml file and add the following under ‘dependencies:’:

dependencies:
  # The Pieces SDK
  pieces_os_client:
    git:
      url: https://github.com/pieces-app/pieces-os-client-sdk-for-dart.git

  web_socket_channel: ^3.0.1
Enter fullscreen mode Exit fullscreen mode

The SDK is automatically generated from our API setup, so it’s mostly made up of code that helps your app talk to Pieces’ api.
We also added the web_socket_channel dependency to connect to the Pieces web-socket

Step 1: Connecting to PiecesOS

Alright, first challenge – how do we even talk to PiecesOS?

Let's first create a new file lib/services/pieces_os_service.dart

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:pieces_os_client/api.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

class PiecesOSService {
  // Super important Pieces sometimes could run on other ports so you should implement your own port-finding algo
  // here is example using the python sdk
  // https://github.com/pieces-app/pieces-os-client-sdk-for-python/blob/db1fa83f21407323b98d21507a59d1e4f176d064/src/pieces_os_client/wrapper/client.py#L139-L168
  // We gonna use 39300 for simplicity and PiecesOS will be running on port 39300 for most of the time as well
  static const String baseUrl = 'http://localhost:39300';
  static const String websocketUrl = 'ws://localhost:39300';

  // Used to register application
  late final ConnectorApi _connectorApi;
  // Used to retrieve the workstream summaries
  late final WorkstreamSummaryApi _workstreamSummaryApi;

  // Used to retrieve the summary content
  late final AnnotationApi _annotationApi;

  // Cache for workstream summaries grouped by day
  final Map<DateTime, List<WorkstreamSummary>> _summariesByDay = {};

  PiecesOSService() {
    final client = ApiClient(basePath: baseUrl);
    _connectorApi = ConnectorApi(client);
    _workstreamSummaryApi = WorkstreamSummaryApi(client);
    _annotationApi = AnnotationApi(client);
  }
}

Enter fullscreen mode Exit fullscreen mode

Nothing fancy – just setting up our API clients.

Now the fun part – actually connecting:

 // existing code from above
  Future<Application> connectApplication() async {
    final seededApp = SeededTrackedApplication(
      name: ApplicationNameEnum.OPEN_SOURCE,  // Hey Pieces, I'm open source!
      platform: Platform.operatingSystem == "macos"
          ? PlatformEnum.MACOS
          : PlatformEnum.WINDOWS,
      version: "0.0.1",
    );

    final connection = SeededConnectorConnection(
      application: seededApp,
    );

    _context = await _connectorApi.connect(
      seededConnectorConnection: connection,
    );

    print('Successfully connected to Pieces OS!');
    return _context!.application;
  }

Enter fullscreen mode Exit fullscreen mode

Basically, we're saying, "Hey Pieces, I'm a new app, let me in!" and it gives us back a context with our application info.

Step 2: The WebSocket

This websocket specifically sends us all of the IDs for the users' workstream summaries in Pieces upon first connection. After, it sends only the IDs of either newly created summaries, summaries that have updated data, or summaries that were deleted.

Here's the setup:

  // existing code from above
  void _startWebSocketListener() {
    final wsUrl = '$websocketUrl/workstream_summaries/stream/identifiers';
    _wsChannel = WebSocketChannel.connect(Uri.parse(wsUrl));

    print('WebSocket connected to $wsUrl');

    _wsSubscription = _wsChannel!.stream.listen(
      (message) async {
        try {
          // Important: WebSocket sends JSON strings, decode them first!
          final streamedIdentifiers = StreamedIdentifiers.fromJson(
            jsonDecode(message),
          );

          // Loop through each identifier we received
          for (final id in streamedIdentifiers!.iterable) {
            final summaryId = id.workstreamSummary?.id;
            if (summaryId != null) {
              await _fetchAndCacheSummary(summaryId);
            }
          }
        } catch (e) {
          print('Error processing message: $e');
        }
      },
      onDone: () {
        print('WebSocket closed, reconnecting...');
        _reconnectWebSocket();
      },
      onError: (error) {
        print('WebSocket error: $error');
        _reconnectWebSocket();
      },
    );
  }

Enter fullscreen mode Exit fullscreen mode

Here's the beauty of this approach: The WebSocket just keeps until their app shuts down or they manually close it, sending us identifiers. When we first connect, it dumps ALL existing identifiers on us (could be from a year ago!) and stays open to send us new ones as they're created in real-time. One connection handles everything!
Step 3: Fetching and Caching Summaries
Okay, so we have identifiers. Now what? We need to actually fetch the summary data and organize it.

Here's what a WorkstreamSummary looks like (the important parts anyway):

// This is a model defined in the sdks don't add any thing to the code yet.

class WorkstreamSummary {
  String id;              // Unique identifier
  String? name;           // Optional name/title
  GroupedTimestamp created;  // When it was created
  GroupedTimestamp? updated; // When it was last updated
  // ... and a bunch of other fields
}

Enter fullscreen mode Exit fullscreen mode

And GroupedTimestamp is basically:

// This is a model defined in the sdks don't add any thing to the code yet.

class GroupedTimestamp {
  DateTime value;  // The actual timestamp
}

Enter fullscreen mode Exit fullscreen mode

So when we fetch a summary, we need to:

  • Get the created date
  • Strip the time from the date
  • Add the summary to that day's list
  • Sort everything by time

Here's the code:

 // existing code from above
  Future<void> _fetchAndCacheSummary(String identifier) async {
    final summary = await _workstreamSummaryApi
        .workstreamSummariesSpecificWorkstreamSummarySnapshot(identifier);

    if (summary == null) {
      return;
    }
    // Normalize to day (remove time)
    final createdDate = summary.created.value;
    final dayKey = DateTime(
      createdDate.year,
      createdDate.month,
      createdDate.day,
    );

    // Create day entry if it doesn't exist
    _summariesByDay.putIfAbsent(dayKey, () => []);

    // Check for duplicates (avoid adding the same summary twice)
    final existingIndex = _summariesByDay[dayKey]!
        .indexWhere((s) => s.id == summary.id);

    if (existingIndex != -1) {
      // Update existing
      _summariesByDay[dayKey]![existingIndex] = summary;
    } else {
      // Add new
      _summariesByDay[dayKey]!.add(summary);
    }

    // Sort by time (most recent first)
    _summariesByDay[dayKey]!.sort((a, b) {
      return b.created.value.compareTo(a.created.value);
    });

    // Notify anyone listening
    _summariesStreamController.add(Map.unmodifiable(_summariesByDay));

    print('Cached summary ${summary.id} for day $dayKey');
  }

Enter fullscreen mode Exit fullscreen mode

The _summariesByDay is just aMap<DateTime, List<WorkstreamSummary>> where:

  • Key: A DateTime set to midnight (e.g., Nov 4, 2025 at 00:00:00)
  • Value: List of all summaries for that day, sorted by time

Let's also add some useful methods to retrieve a summary

 // existing code
  /// Get summaries for a specific day from cache
  List<WorkstreamSummary> getSummariesForDay(DateTime date) {
    final dayKey = DateTime(date.year, date.month, date.day);
    return _summariesByDay[dayKey] ?? [];
  }

  /// Get all days that have summaries (sorted most recent first)
  List<DateTime> getDaysWithSummaries() {
    final days = _summariesByDay.keys.toList();
    days.sort((a, b) => b.compareTo(a)); // Most recent first
    return days;
  }

Enter fullscreen mode Exit fullscreen mode

Step 4: Keeping Everything in Sync
Speaking of streams – let's add a broadcast stream that others can listen to:

// existing code
final StreamController<Map<DateTime, List<WorkstreamSummary>>>
    _summariesStreamController = StreamController.broadcast();

Stream<Map<DateTime, List<WorkstreamSummary>>> get summariesStream =>
    _summariesStreamController.stream;

Enter fullscreen mode Exit fullscreen mode

So in the UI (when we build it), we can just do:

service.summariesStream.listen((summaries) {
  print('Got new data!');
  // Update UI here
});

Enter fullscreen mode Exit fullscreen mode

Step 4.5: Waiting for Initial WebSocket Sync

Here's a problem: when the app first starts, the WebSocket dumps all existing summaries on us. This could be hundreds of identifiers! If we try to show the UI or generate a recap before they're all loaded, we'll have incomplete data.

But we only need to wait once – on the first message. After that, the WebSocket just sends new updates in real-time and we don't want to block.

Here's the solution using a Completer:

Step 1: Add Fields and Wait Method

Place this code block right after the PiecesOSService() constructor and before the connectApplication() method:

// Add these fields to the class
Completer<void>? _initialSyncCompleter;
bool get _isInitialSyncComplete =>
      _initialSyncCompleter?.isCompleted ?? false;
/// Wait for the initial WebSocket sync to complete
/// Only blocks on the first call, returns immediately after
Future<void> waitForInitialSync() async {
  // If already synced, return immediately
  if (_isInitialSyncComplete) {
    return;
  }

  // If sync is in progress, wait for it
  if (_initialSyncCompleter != null) {
    return _initialSyncCompleter!.future;
  }

  // Start waiting for first sync
  _initialSyncCompleter = Completer<void>();

  return _initialSyncCompleter!.future;
}

Enter fullscreen mode Exit fullscreen mode

Step 2: Update WebSocket Listener

Replace _wsSubscription with this new section:

_wsSubscription = _wsChannel!.stream.listen(
  (message) async {
    try {
      final streamedIdentifiers = StreamedIdentifiers.fromJson(
        jsonDecode(message),
      );

      // Process all identifiers
      for (final id in streamedIdentifiers!.iterable) {
        final summaryId = id.workstreamSummary?.id;
        if (summaryId != null) {
          await _fetchAndCacheSummary(summaryId);
        }
      }

      // Mark initial sync as complete after first message
      if (!_isInitialSyncComplete) {
        _isInitialSyncComplete = true;
        _initialSyncCompleter?.complete();
        print('Initial WebSocket sync complete!');
      }
    } catch (e) {
      print('Error processing message: $e');
    }
  },
  // ... onDone, onError handlers
);

Enter fullscreen mode Exit fullscreen mode

Step 3: Use in App Initialization

Place this code in your app's entry point (typically lib/main.dart, create the file if you don't have it):

import 'services/pieces_os_service.dart';

Future<void> main() async {
  final service = PiecesOSService();

  await service.connectApplication();
  service.startWebSocketListener();

  // Wait for all existing summaries to load
  await service.waitForInitialSync();

  // Now we can safely generate recaps or show UI!
  print('Ready to go! All summaries loaded.');
}

Enter fullscreen mode Exit fullscreen mode

Step 5: Accessing the Actual Summary Content

WorkstreamSummary objects from the SDK don't directly expose the summary text. The actual content is stored in annotations.

Each summary has annotations, and we need to:

  1. Loop through the summary's annotations
  2. Find the one with type SUMMARY
  3. Fetch that annotation using AnnotationApi
  4. Extract the text content

Let's add a method to fetch annotation content:

// At the bottom of pieces_os_service.dart

/// Get the summary content from a workstream summary's annotations
Future<String?> getSummaryContent(WorkstreamSummary summary) async {
  try {
    // Loop through annotations to find the DESCRIPTION type
    for (final annotationRef in summary.annotations?.indices.keys.toList() ?? []) {
      // Fetch the full annotation using AnnotationApi
      final annotation = await _annotationApi
          .annotationSpecificAnnotationSnapshot(annotationRef);

      if (annotation == null) {
        continue;
      }
      // Check if this is a DESCRIPTION type annotation
      if (annotation.type == AnnotationTypeEnum.SUMMARY) {
        // Return the text content - this is the actual summary!
        return annotation.text;
      }
    }

    return null;
  } catch (e) {
    print('Error fetching annotation content for ${summary.id}: $e');
    return null;
  }
}

Enter fullscreen mode Exit fullscreen mode

The SDK separates metadata (ID, timestamp) from content (stored in annotations).

Step 6: Creating a Proper Data Structure

Let's create a proper class for summaries with their content. Add this to lib/models/daily_recap_models.dart:

/// Data class to hold a summary with its content
class SummaryWithContent {
  final String id;
  final String title;
  final String content;        // The actual summary text!
  final DateTime timestamp;

  SummaryWithContent({
    required this.id,
    required this.title,
    required this.content,
    required this.timestamp,
  });
}

Enter fullscreen mode Exit fullscreen mode

Now, let's add a convenient method to get summaries with their content:

// Don't forget to import the model that we just created
import 'package:daily_recap_app/models/daily_recap_models.dart';

// Below existing code in pieces_os_straervice.dart

/// Get summaries with their content for a specific day
Future<List<SummaryWithContent>> getSummariesWithContentForDay(
  DateTime date,
) async {
  final summaries = getSummariesForDay(date);
  final List<SummaryWithContent> summariesWithContent = [];

  for (final summary in summaries) {
    final content = await getSummaryContent(summary);
    if (content != null && content.isNotEmpty) {
      summariesWithContent.add(
        SummaryWithContent(
          id: summary.id,
          title: summary.name,
          content: content,              // Actual text from annotation!
          timestamp: summary.created.value,
        ),
      );
    }
  }

  return summariesWithContent;
}

Enter fullscreen mode Exit fullscreen mode

Perfect! Now we have rich, typed data ready for AI processing and UI display.

Part 1 Summary:

  1. Connect to PiecesOS
  2. Open WebSocket for real-time updates
  3. Fetch and cache summaries
  4. Group by day
  5. Stream updates to UI
  6. Extract summary content from annotations

What is created so far lib/services/pieces_os_service.dart

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:pieces_os_client/api.dart';
import 'package:pieces_os_client/api_client.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:daily_recap_app/models/daily_recap_models.dart';

/// Service to interact with Pieces OS for LTM (Long Term Memory) tracking
class PiecesOSService {
  static const String baseUrl = 'http://localhost:39300';
  static const String websocketUrl = 'ws://localhost:39300';

  // Used to register application
  late final ConnectorApi _connectorApi;
  // Used to retrieve the workstream summaries
  late final WorkstreamSummaryApi _workstreamSummaryApi;

  // Used to retrieve the summary content
  late final AnnotationApi _annotationApi;

  final StreamController<Map<DateTime, List<WorkstreamSummary>>>
      _summariesStreamController = StreamController.broadcast();

  Stream<Map<DateTime, List<WorkstreamSummary>>> get summariesStream =>
      _summariesStreamController.stream;

  // Track initial WebSocket sync
  bool _isInitialSyncComplete = false;
  Completer<void>? _initialSyncCompleter;

  PiecesOSService() {
    final client = ApiClient(basePath: baseUrl);
    _connectorApi = ConnectorApi(client);
    _workstreamSummaryApi = WorkstreamSummaryApi(client);
    _annotationApi = AnnotationApi(client);
  }

  Future<Application> connectApplication() async {
    final seededApp = SeededTrackedApplication(
      name: ApplicationNameEnum.OPEN_SOURCE,  // Hey Pieces, I'm open source!
      platform: Platform.operatingSystem == "macos"
          ? PlatformEnum.MACOS
          : PlatformEnum.WINDOWS,
      version: "0.0.1",
    );

    final connection = SeededConnectorConnection(
      application: seededApp,
    );

    _context = await _connectorApi.connect(
      seededConnectorConnection: connection,
    );

    print('Successfully connected to Pieces OS!');
    return _context!.application;
  }

  /// Wait for the initial WebSocket sync to complete
  /// Only blocks on the first call, returns immediately after
  Future<void> waitForInitialSync() async {
    // If already synced, return immediately
    if (_isInitialSyncComplete) {
      return;
    }

    // If sync is in progress, wait for it
    if (_initialSyncCompleter != null) {
      return _initialSyncCompleter!.future;
    }

    // Start waiting for first sync
    _initialSyncCompleter = Completer<void>();
    return _initialSyncCompleter!.future;
  }

  void _startWebSocketListener() {
    final wsUrl = '$websocketUrl/workstream_summaries/stream/identifiers';
    _wsChannel = WebSocketChannel.connect(Uri.parse(wsUrl));

    print('WebSocket connected to $wsUrl');

    _wsSubscription = _wsChannel!.stream.listen(
      (message) async {
        try {
          // Important: WebSocket sends JSON strings, decode them first!
          final streamedIdentifiers = StreamedIdentifiers.fromJson(
            jsonDecode(message),
          );

          // Loop through each identifier we received
          for (final id in streamedIdentifiers!.iterable) {
            final summaryId = id.workstreamSummary?.id;
            if (summaryId != null) {
              await _fetchAndCacheSummary(summaryId);
            }
          }

          // Mark initial sync as complete after first message
          if (!_isInitialSyncComplete) {
            _isInitialSyncComplete = true;
            _initialSyncCompleter?.complete();
            print('Initial WebSocket sync complete!');
          }
        } catch (e) {
          print('Error processing message: $e');
        }
      },
      onDone: () {
        print('WebSocket closed, reconnecting...');
        _reconnectWebSocket();
      },
      onError: (error) {
        print('WebSocket error: $error');
        _reconnectWebSocket();
      },
    );
  }

  Future<void> _fetchAndCacheSummary(String identifier) async {
    final summary = await _workstreamSummaryApi
        .workstreamSummariesSpecificWorkstreamSummarySnapshot(identifier);

    if (summary == null) {
      return;
    }
    // Normalize to day (remove time)
    final createdDate = summary.created.value;
    final dayKey = DateTime(
      createdDate.year,
      createdDate.month,
      createdDate.day,
    );

    // Create day entry if it doesn't exist
    if (!_summariesByDay.containsKey(dayKey)) {
      _summariesByDay[dayKey] = [];
    }

    // Check for duplicates (avoid adding the same summary twice)
    final existingIndex = _summariesByDay[dayKey]!
        .indexWhere((s) => s.id == summary.id);

    if (existingIndex != -1) {
      // Update existing
      _summariesByDay[dayKey]![existingIndex] = summary;
    } else {
      // Add new
      _summariesByDay[dayKey]!.add(summary);
    }

    // Sort by time (most recent first)
    _summariesByDay[dayKey]!.sort((a, b) {
      return b.created.value.compareTo(a.created.value);
    });

    // Notify anyone listening
    _summariesStreamController.add(Map.unmodifiable(_summariesByDay));

    print('Cached summary ${summary.id} for day $dayKey');
  }

  /// Get summaries for a specific day from cache
  List<WorkstreamSummary> getSummariesForDay(DateTime date) {
    final dayKey = DateTime(date.year, date.month, date.day);
    return _summariesByDay[dayKey] ?? [];
  }

  /// Get all days that have summaries (sorted most recent first)
  List<DateTime> getDaysWithSummaries() {
    final days = _summariesByDay.keys.toList();
    days.sort((a, b) => b.compareTo(a)); // Most recent first
    return days;
  }

  /// Get ries with their content for a specific day
  Future<List<SummaryWithContent>> getSummariesWithContentForDay(
    DateTime date,
  ) async {
    final summaries = getSummariesForDay(date);
    final List<SummaryWithContent> summariesWithContent = [];

    for (final summary in summaries) {
      final content = await getSummaryContent(summary);
      if (content != null && content.isNotEmpty) {
        summariesWithContent.add(
          SummaryWithContent(
            id: summary.id,
            title: summary.name,
            content: content,
            timestamp: summary.created.value,
          ),
        );
      }
    }

    return summariesWithContent;
  }
}


Enter fullscreen mode Exit fullscreen mode

Reference GitHub for viewing the full project.

Top comments (0)