Ryan Ratner for Ditto

Originally published at ditto.live

DO cross the streams! Introducing Combine Support in Ditto - build complex workflows from multiple reactive streams

Historically, iOS developers had to use delegates, polling timers, notification centers, and callbacks to build reactive architectures. While these techniques are useful for simple callbacks, they falter when dealing with multiple events in robust event-driven apps. Creating complex chains, combinations, and permutations of multiple reactive streams is incredibly difficult. Thus, reactive libraries like the open source Rx suite [1] and Apple's Combine were created to solve this exact issue.

Since version 1.1.1 we have several Combine.Publisher extension methods on our long-running callback APIs. Notably:

These new extension methods make it easy to use reactive techniques in your iOS or macOS apps without an additional library. We consider these APIs stable: they are available for iOS 13.0+ and macOS 10.15+. Of all the extension methods, you'll probably use liveQueryPublisher the most. It is a method on DittoPendingCursorOperation that returns a Publisher, and it drives most of the synchronization behavior of your app. Here's an example:

var cancellables = Set<AnyCancellable>()

// observe all documents in a collection
ditto.store["cars"]
  .findAll()
  .liveQueryPublisher()
  .sink { (documents, event) in
    // do something with documents and events
  }
  .store(in: &cancellables)

// or you can observe documents matching a query
ditto.store["cars"]
  .find("color == $args.color && mileage > $args.mileage", args: [ "color": "red", "mileage": 5000])
  .sort("mileage")
  .limit(50)
  .liveQueryPublisher()
  .sink { (documents, event) in
    // do something with documents and events
  }
  .store(in: &cancellables)

Stopping the live query is identical to stopping the publisher:

cancellable = nil // if you have a single cancellable, stop it by setting it to nil

cancellables.removeAll() // removeAll will stop all attached publishers and their respective live queries
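To see why releasing a cancellable is enough, here is a minimal sketch that uses only Combine. The PassthroughSubject is a hypothetical stand-in for a live query publisher (it is not a Ditto API), and the names subject, received, and wasCancelled are ours, chosen for illustration:

```swift
import Combine

// Hypothetical stand-in for a live query: a subject that never completes on its own.
let subject = PassthroughSubject<Int, Never>()
var received: [Int] = []
var wasCancelled = false

var cancellable: AnyCancellable? = subject
    .handleEvents(receiveCancel: { wasCancelled = true }) // observe the cancellation
    .sink { value in received.append(value) }

subject.send(1)     // delivered while the cancellable is alive
cancellable = nil   // releasing the AnyCancellable cancels the subscription
subject.send(2)     // no longer delivered

print(received, wasCancelled)
```

The same mechanism is why a sink whose result is never stored stops receiving values right away: the returned AnyCancellable is released immediately.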

Our users often pair Codable with Ditto documents. Since decoding a Codable can fail (a type mismatch is the most common cause), you may want to handle decode errors as documents are streamed out. Each time the query results change, liveQueryPublisher emits the current results as an Array<DittoDocument>.

struct Car: Codable {
  var _id: String
  var name: String
  var mileage: Float
  var isSold: Bool
}

ditto.store["cars"]
  .findAll()
  .liveQueryPublisher()
  .flatMap({ (docs, _) in docs.publisher })
  .tryMap({ try $0.typed(as: Car.self).value })
  .sink(receiveCompletion: { completion in
    // a thrown decoding error arrives here as .failure and ends the stream
    if case .failure(let error) = completion {
      print("Decoding a document failed: \(error)")
    }
  }, receiveValue: { (car: Car) in
    print("Successfully decoded a car \(car)")
  })
  .store(in: &cancellables)
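One caveat worth keeping in mind: in Combine, once tryMap throws, the stream finishes with a failure and later values are not delivered. The sketch below illustrates this with plain Combine, using JSONDecoder as a stand-in for Ditto documents and typed(as:). The payloads, the trimmed-down Car struct, and the variable names are assumptions made for illustration:

```swift
import Combine
import Foundation

// Trimmed-down Car for the sketch; the real struct has more fields.
struct Car: Codable {
    var _id: String
    var name: String
}

// Three hypothetical payloads; the second one is missing "name".
let payloads: [Data] = [
    #"{"_id": "a", "name": "Civic"}"#,
    #"{"_id": "b"}"#,
    #"{"_id": "c", "name": "Model 3"}"#
].map { Data($0.utf8) }

var decoded: [String] = []
var failed = false

let cancellable = payloads.publisher
    .tryMap { try JSONDecoder().decode(Car.self, from: $0) }
    .sink(receiveCompletion: { completion in
        if case .failure = completion { failed = true }
    }, receiveValue: { car in
        decoded.append(car.name)
    })

// Only the first car is decoded; the third payload is never reached.
print(decoded, failed)
```

If you would rather skip bad documents and keep the stream alive, a non-throwing operator such as compactMap with try? is one option.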

There may be times when your application would like to collect only the successfully decoded Car objects. You may want this functionality if you're not interested in stale or malformed legacy documents. Here we use flatMap to turn each emitted [DittoDocument] array into a publisher that emits one DittoDocument at a time. For each emitted DittoDocument we use compactMap with the decoding function. Notice the try? here: if any Car fails to decode, the closure returns nil, and compactMap skips nil values. Finally we use collect to gather the decoded Car objects and repackage them into an Array. Because collect only emits once its upstream finishes, and a live query does not finish until it is cancelled, we apply collect to the per-result documents publisher inside flatMap rather than to the whole chain.

let cancellable = ditto.store["cars"]
  .findAll()
  .liveQueryPublisher()
  .flatMap({ (docs, _) in
    // docs.publisher is finite, so collect emits once per query result
    docs.publisher
      .compactMap({ try? $0.typed(as: Car.self).value })
      .collect()
  })
  .sink { cars in
    print("Successfully decoded cars: \(cars). Cars that failed to decode were removed from the array.")
  }
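Combine's collect waits for its upstream to finish before emitting, so on a stream that stays open it only produces arrays when it is applied to a finite inner publisher. The following sketch shows that behavior with plain Combine. The PassthroughSubject named batches is a hypothetical stand-in for a live query, and parsing strings into Int stands in for decoding documents:

```swift
import Combine

// Hypothetical stand-in for a live query that emits a fresh batch on every change.
let batches = PassthroughSubject<[String], Never>()
var results: [[Int]] = []

let cancellable = batches
    .flatMap { (batch: [String]) in
        batch.publisher
            .compactMap { Int($0) } // nil (unparseable) entries are skipped
            .collect()              // the inner publisher is finite, so this emits once per batch
    }
    .sink { parsed in results.append(parsed) }

batches.send(["1", "x", "3"]) // "x" fails to parse and is dropped
batches.send(["4"])

print(results)
```

Each batch produces its own array even though the outer subject never completes.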

SQL JOIN-like behavior with Ditto using Combine

A question that we get all the time is "How do I perform SQL-like JOINs with Ditto?" While Ditto's current interface can't handle relationships like a traditional SQL database, our Combine support can help us achieve the same effect [2].

Let's say you're trying to build a menu view in SwiftUI like the first image shown below.

[Image: Combine menu]

It's likely that you'll use a SwiftUI List with multiple Sections with the help of ForEach. Assume the documents in each collection look like the following:

// products
{
  "_id": "chicken-sandwich",
  "name": "Chicken Sandwich",
  "detail": "Grilled chicken, tomatoes, lettuce, and mustard",
  "categoryId": "entrees"
}

// categories
{
  "_id": "entrees",
  "name": "Main Courses and Entrees",
  "isOnSale": false
}

We can create a Codable type to represent each document type. Notice that we've added Identifiable to help ForEach iteration:

struct Category: Codable {
    var _id: String
    var name: String
    var isOnSale: Bool
}

extension Category: Identifiable {
    var id: String {
        return self._id
    }
}

struct Product: Codable {
    var _id: String
    var name: String
    var detail: String
    var categoryId: String
}

extension Product: Identifiable {
    var id: String {
        return self._id
    }
}
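As a quick sanity check of how the document shape lines up with the struct, here is a small sketch that decodes the sample product with Foundation's JSONDecoder. JSONDecoder is only a stand-in here (it is an assumption for illustration, not how Ditto decodes documents internally), and Identifiable is folded into the struct declaration for brevity:

```swift
import Foundation

struct Product: Codable, Identifiable {
    var _id: String
    var name: String
    var detail: String
    var categoryId: String

    // Identifiable is satisfied by forwarding to the document _id.
    // Computed properties are not part of the synthesized Codable keys.
    var id: String { _id }
}

let json = """
{
  "_id": "chicken-sandwich",
  "name": "Chicken Sandwich",
  "detail": "Grilled chicken, tomatoes, lettuce, and mustard",
  "categoryId": "entrees"
}
"""

// try? yields nil if a key is missing or a type does not match.
let product = try? JSONDecoder().decode(Product.self, from: Data(json.utf8))
print(product?.id ?? "decoding failed")
```

A missing or mistyped field makes the whole decode fail, which is why every non-optional property in the struct needs a matching field in the document.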

Notice that the product has a categoryId; this is our foreign key. Linking these foreign keys with our earlier APIs wasn't very straightforward. However, with our new Combine extensions, we can use the combineLatest function to emit a single callback for both the products and categories collections.

First we will need to create a JOIN-ed struct that will house our nested values:

struct CategorizedProducts {
  // This is the category
  var category: Category
  // These are the products filtered by the category above
  var products: [Product]
}

// We add `Identifiable` to help `ForEach` iteration.
// Since this is unique by inner category._id property, we ensure
// to return its value
extension CategorizedProducts: Identifiable {
  var id: String {
      return self.category._id
  }
}

To populate CategorizedProducts we need to build our combineLatest implementation. First we need publishers for both the categories and products collections. We use Codable to map documents into concrete data types in the .tryMap operator.

let categoriesPublisher = categoriesCollection.findAll().liveQueryPublisher()
    .tryMap({ try $0.documents.map({ try $0.typed(as: Category.self).value }) })

let productsPublisher = productsCollection.findAll().liveQueryPublisher()
    .tryMap({ try $0.documents.map({ try $0.typed(as: Product.self).value }) })

Finally, we can combine the latest values of each publisher using .combineLatest and .map. In the .map function, we iterate over each category and use it to create a CategorizedProducts object and filter all products by the categoryId.

let cancellable = categoriesPublisher.combineLatest(productsPublisher)
    .map { (categories, products) in
        return categories.map({ category -> CategorizedProducts in
            let filteredProducts = products.filter { product in product.categoryId == category._id }
            return CategorizedProducts(category: category, products: filteredProducts)
        })
    }
    // tryMap makes the failure type Error, so sink needs a completion handler
    .sink(receiveCompletion: { completion in
      print("categorized products stream ended:", completion)
    }, receiveValue: { categorizedProducts in
      print("categories with their products", categorizedProducts)
    })
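To try the join logic without a Ditto instance, here is a self-contained sketch that swaps the live query publishers for CurrentValueSubject values. The subjects, the join helper, and the snapshots array are hypothetical names introduced for illustration, and the structs are trimmed to the fields the join needs:

```swift
import Combine

struct Category { var _id: String; var name: String }
struct Product { var _id: String; var name: String; var categoryId: String }
struct CategorizedProducts { var category: Category; var products: [Product] }

// Pure join: attach to each category the products whose foreign key matches it.
func join(_ categories: [Category], _ products: [Product]) -> [CategorizedProducts] {
    categories.map { category in
        CategorizedProducts(
            category: category,
            products: products.filter { $0.categoryId == category._id }
        )
    }
}

// Hypothetical stand-ins for the two live query publishers.
let categories = CurrentValueSubject<[Category], Never>([
    Category(_id: "entrees", name: "Main Courses and Entrees")
])
let products = CurrentValueSubject<[Product], Never>([])

var snapshots: [[CategorizedProducts]] = []

let cancellable = categories.combineLatest(products)
    .map { pair in join(pair.0, pair.1) }
    .sink { joined in snapshots.append(joined) }

// A change on either side produces a freshly joined snapshot.
products.send([
    Product(_id: "chicken-sandwich", name: "Chicken Sandwich", categoryId: "entrees")
])

print(snapshots.count, snapshots.last?.first?.products.count ?? 0)
```

Keeping the join as a pure function makes it easy to unit test separately from the publishers that feed it.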

If any updates, inserts, or deletions occur in the products or categories collections, you'll always get a new set of categorizedProducts. To show this menu we can iterate over each categorizedProducts in SwiftUI like so:

List {
  ForEach(viewModel.categorizedProducts) { categorizedProducts in
    Section(categorizedProducts.category.name) {
      ForEach(categorizedProducts.products) { product in
        VStack(alignment: .leading) {
          Text(product.name)
              .bold()
          Text(product.detail)
              .font(.caption)
        }
      }
    }
  }
}

For an example project using this technique, check out the source code on GitHub here.

Making SQL JOIN-like behavior more efficient

Using .combineLatest to get SQL JOIN-like behavior gives you both a reactive API and better management of relational models. However, it's important to know that this is just an approximation. Remember, Ditto live queries sync exactly what you tell them to sync from the mesh. The example above will sync all categories and all products. This behavior may be desirable for many use cases, but let's take a look at some ways we can reduce what is synced over the mesh.

You can use the queries to limit what documents are of interest by specifying a more selective query. Let's say you're only interested in getting CategorizedProducts where the category is one of "appetizers", "entrees", or "desserts". We could now filter using a query with .find instead of .findAll on the categories collection.

let categoriesPublisher = categoriesCollection
    .find("contains($args.categoryIds, _id)", args: ["categoryIds": ["appetizers", "entrees", "desserts"]])
    .liveQueryPublisher()
    .tryMap({ try $0.documents.map({ try $0.typed(as: Category.self).value }) })

let productsPublisher = productsCollection
    .find("contains($args.categoryIds, categoryId)", args: ["categoryIds": ["appetizers", "entrees", "desserts"]])
    .liveQueryPublisher()
    .tryMap({ try $0.documents.map({ try $0.typed(as: Product.self).value }) })

let cancellable = categoriesPublisher.combineLatest(productsPublisher)
    .map { (categories, products) in
        return categories.map({ category -> CategorizedProducts in
            let filteredProducts = products.filter { product in product.categoryId == category._id }
            return CategorizedProducts(category: category, products: filteredProducts)
        })
    }
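    // tryMap makes the failure type Error, so sink needs a completion handler
    .sink(receiveCompletion: { completion in
      print("categorized products stream ended:", completion)
    }, receiveValue: { categorizedProducts in
      print("categories with their products where categoryId are appetizers, entrees, desserts", categorizedProducts)
    })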

Now the device will only sync the categories and products that match the specified category _ids.

Advanced Scenarios for SQL JOIN-like behavior

Let's say we only want to show CategorizedProducts where Category.isOnSale == true. Achieving SQL JOIN-like behavior here is not as straightforward as in the example above, because we are querying on a property that exists only in the categories collection. Previously we queried on both the primary key Category._id and the foreign key Product.categoryId. This case is harder and requires a decent understanding of Combine's operators.

First we will need to create a live query of categories where isOnSale == true.

let categoriesPublisher = categoriesCollection
    .find("isOnSale == true")
    .liveQueryPublisher()
    .tryMap({ try $0.documents.map({ try $0.typed(as: Category.self).value }) })

Since we don't know ahead of time which categories will be used to filter the products, we need to filter them after the values have returned from the categories live query publisher. Once we've received the categories that fit the isOnSale == true query, we can then map the result into an AnyPublisher<[CategorizedProducts], Never>. We use AnyPublisher<[CategorizedProducts], Never> for brevity so that the entire chain isn't convoluted with complex generics.

Once the categories publisher emits data, we extract an array of categoryIds: [String] and feed it to the products live query publisher, filtering on the categoryId foreign key property. Next, inside the categories publisher's map, we use our .combineLatest technique to map and filter the CategorizedProducts.

The most important operator in this chain is the switchToLatest near the end. While it can be tricky for your eyes to follow, switchToLatest cancels the previous inner publisher whenever the top-level publisher emits a new one. This is critical because we absolutely want to dispose of the old products live query when the categories change. Categories may change if a new category with isOnSale == true is added, if an existing category's isOnSale becomes false, or if a category is deleted. Without switchToLatest we would get mismatched products from previous categories.

let categoriesPublisher = categoriesCollection
    .find("isOnSale == true")
    .liveQueryPublisher()
    .tryMap({ try $0.documents.map({ try $0.typed(as: Category.self).value }) })


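// keep the returned cancellable alive, otherwise the subscription ends immediately
let cancellable = categoriesPublisher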
    .map({ categories -> AnyPublisher<[CategorizedProducts], Never> in
        // retrieve the categoryIds for all that were on sale.
        let categoryIds: [String] = categories.map{ $0._id }

        let productsPublisher = self.productsCollection
            .find("contains($args.categoryIds, categoryId)", args: ["categoryIds": categoryIds])
            .liveQueryPublisher()
            .tryMap({ try $0.documents.map({ try $0.typed(as: Product.self).value }) })
            .catch({ _ in Just([]) })
            .eraseToAnyPublisher()

        // we now create CategorizedProducts from the filtered categories and filtered products
        return Just(categories).combineLatest(productsPublisher)
            .map { (categories, products) -> [CategorizedProducts] in
                return categories.map({ category -> CategorizedProducts in
                    let filteredProducts = products.filter { product in product.categoryId == category._id }
                    return CategorizedProducts(category: category, products: filteredProducts)
                })
            }
            .eraseToAnyPublisher()
    })
    .switchToLatest() // extremely important so that we dispose of the products publisher if the categories change
    .catch({ _ in Just([]) })
    .sink { categorizedProducts in
      // completely filtered categorizedProducts
    }
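If the role of switchToLatest is hard to see in the full chain, the following stripped-down sketch may help. It uses only Combine: the outer subject plays the part of the categories publisher, and the inner subjects play the part of per-category products live queries. All names here are hypothetical and none of them are Ditto APIs:

```swift
import Combine

// Outer stream: which key is currently of interest.
let outer = PassthroughSubject<String, Never>()

// One inner stream per key, standing in for per-category product queries.
let inner: [String: PassthroughSubject<Int, Never>] = [
    "a": PassthroughSubject<Int, Never>(),
    "b": PassthroughSubject<Int, Never>()
]

var received: [String] = []

let cancellable = outer
    .map { (key: String) -> AnyPublisher<String, Never> in
        inner[key]!
            .map { value in "\(key)\(value)" }
            .eraseToAnyPublisher()
    }
    .switchToLatest() // cancels the previous inner subscription on each new key
    .sink { received.append($0) }

outer.send("a")
inner["a"]!.send(1) // delivered as "a1"
outer.send("b")     // the subscription to inner["a"] is cancelled here
inner["a"]!.send(2) // dropped: "a" is no longer the latest
inner["b"]!.send(3) // delivered as "b3"

print(received)
```

Replacing switchToLatest with a plain flatMap in this sketch would keep the "a" subscription alive, which is the stale-data situation the article warns about.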

Now if any category is added, removed, or updated to match isOnSale == true, we will instantly retrieve a new set of CategorizedProducts with live queries specifically limited to the matched Category._id values.

As you can see, this really shows off the power of Combine with Ditto. We understand that the last example is a complex and verbose way to achieve SQL JOIN-like behavior, and we are working hard on adding native support directly within Ditto.

We're extremely excited to see all the new iOS and macOS applications that leverage Combine with Ditto. Building reactive applications has always been a core tenet of Ditto's design philosophy. Now with Combine, you can have incredible control over Ditto's live query system to build complex and robust event-driven apps.

Footnotes


  1. Rx is a suite of libraries that gained a tremendous foothold for designing reactive streams. There are libraries for JavaScript (RxJS), Swift (RxSwift), Java (RxJava), .NET (Reactive Extensions), and more. If you're building Ditto applications beyond Swift, you should take a look at these libraries. Most of the operators available in Apple's Combine framework have equivalent functions.

  2. We get so many requests for relationships, aggregates, syncing specific fields and more that we're planning on adding this functionality directly in an upcoming iteration of our Ditto query language. This will likely look like SQL but will have some changes to accommodate working with Ditto's distributed database semantics. 
