DEV Community

guoliwu
guoliwu

Posted on

This Week(s) in DocArray

It’s already been a month since the last alpha release of DocArray v2. Since then a lot has happened: we’ve merged features that we’re really proud of and keep crying tears of joy and misery trying to coerce Python into doing what we want. If you want to learn about interesting Python edge cases or follow the advancement of DocArray v2 development then you’re at the right place in this dev blog!

For those who don’t know, DocArray is a library for representing, sending, and storing multi-modal data, with a focus on applications in ML and Neural Search. The project just moved to the Linux foundation AI and Data and to celebrate its first birthday we decided to rewrite it from scratch, mainly because of a design shift and a will to solidify the codebase from the ground up.

MultiModalDataset

As part of our goal to make DocArray the go-to library for representing, sending, and storing multi-modal data, we‘ve added a MultiModalDataset class to easily convert DocumentArrays into PyTorch Dataset compliant datasets that can be used in the PyTorch DataLoader.

All you need is a DocumentArray and a dictionary of preprocessing functions and you’re up and running!

from docarray import BaseDocument, DocumentArray
from docarray.data import MultiModalDataset
from docarray.documents import Text
from torch.utils.data import DataLoader

class Thesis(BaseDocument):
    title: Text

class Student(BaseDocument):
    thesis: Thesis

da: DocumentArray[Student] = get_students()
ds: MultiModalDataset[Student] = MultiModalDataset[Student](
    da,
    preprocessing={'thesis.title': embed_title, 'thesis': normalize_embedding},
)
loader: DataLoader = DataLoader(
    ds, batch_size=4, collate_fn=MultiModalDataset[Student].collate_fn
)

# Use your loader just like any other dataloader for awesome DL training
Enter fullscreen mode Exit fullscreen mode

If you’re interested in using DocArray for training, check out our example notebook, or take a peek at implementation details of MultiModalDataset.

TensorFlow support

After recently adding PyTorch support, we’ve now gone on to add TensorFlow support to DocArray v2. Like with PyTorch, we planned on subclassing the tensorflow.Tensor class with our TensorFlowTensor class. By doing so we want to allow DocArray to run operations on it while also being able to hand over our TensorFlowTensor instance to ML models or TensorFlow functions without TensorFlow being confused about this instance’s class but instead recognizing it as its own. Since we implemented this for PyTorch already, this should be easy, right?

But stop, not so fast. At first glance, TensorFlow tensors seem to be of class tf.Tensor, right?

import tensorflow as tf

tensor = tf.zeros((5,))
tensor
Enter fullscreen mode Exit fullscreen mode
<tf.Tensor: shape=(5,), dtype=float32, numpy=array([0., 0., 0., 0., 0.], dtype=float32)>
Enter fullscreen mode Exit fullscreen mode

When trying to subclass tf.Tensor though, we notice that this does not seem to work:

from typing import Any, Type, Union, cast

import tensorflow as tf
from docarray.typing.tensor.abstract_tensor import AbstractTensor
from pydantic.tools import parse_obj_as

class TensorFlowTensor(AbstractTensor, tf.Tensor):
    @classmethod
    def validate(cls, value, field, config) -> 'TensorFlowTensor':
        if isinstance(value, tf.Tensor):
            value.__class__ = cls
            return cast(TensorFlowTensor, value)
        else:
            raise ValueError(f'Expected a tf.Tensor, got {type(value)}')

our_tensor = parse_obj_as(TensorFlowTensor, tf.zeros((5,)))  # will fail
Enter fullscreen mode Exit fullscreen mode

Parsing a tf.Tensor as TensorFlowTensor will fail:

pydantic.error_wrappers.ValidationError: 1 validation error for ParsingModel[TensorFlowTensor]
__root__
  __class__ assignment: 'TensorFlowTensor' object layout differs from 'tensorflow.python.framework.ops.EagerTensor' (type=type_error)
Enter fullscreen mode Exit fullscreen mode

But wait, here they talk about an EagerTensor, not tf.Tensor. This is because TensorFlow actually supports eager execution and as well as graph execution. It defaults to eager execution, where operations are evaluated immediately. In graph execution, a computational graph is constructed for later evaluation.

So maybe we just need to extend TensorFlow’s EagerTensor then!

This, however, doesn’t work either, because the class EagerTensor is created on the fly, which is why trying to extend this class will fail with:

TypeError: type 'tensorflow.python.framework.ops.EagerTensor' is not an acceptable base type.

With all that being said, we’ve decided to go with the following solution for now:

Instead of extending TensorFlow’s tensor, we store a tf.Tensor instance as an attribute of our TensorFlowTensor class. Therefore if you want to perform operations on the tensor data or hand it over to your ML model, you have to explicitly access the .tensor attribute:

import tensorflow as tf
from docarray.typing import TensorFlowTensor

t = TensorFlowTensor(tensor=tf.zeros((224, 224)))

# tensorflow functions
broadcasted = tf.broadcast_to(t.tensor, (3, 224, 224))
broadcasted = tf.broadcast_to(t.unwrap(), (3, 224, 224))
broadcasted = tf.broadcast_to(t, (3, 224, 224))  # this will fail
Enter fullscreen mode Exit fullscreen mode

In future we plan to take a closer look and find a solution that enables handling TensorFlowTensors just like our TorchTensors. In particular, we plan to investigate if there’s an equivalent in TensorFlow to Torch’s __torch_function__(), which we told you about in the previous blog post. With such an equivalent and some tricks here and there we hope to enable smooth usage or our TensorFlowTensor class and make it feel like it’s a subclass of TensorFlow’s tensor, without it actually being one.

Nested class and multiprocessing

As part of our goal to make DocArray the go-to library for representing, sending, and storing multi-modal data, it’s important that DocumentArrays support multiprocessing, namely processing on multi-CPU cores.

In particular, we recently implemented a MultiModalDataset class to easily convert a DocumentArray into a dataset that can be used in the PyTorch DataLoader. The PyTorch DataLoader wraps the Python multiprocessing module to implement preprocessing with multiple CPUs.

The problem

One of the well-known issues with multiprocessing is that it doesn’t support classes that are declared inside a function:

def get_class():
    class B:
        ...

    return B

MyClass = get_class()

def foo(*args):
    return MyClass()

import multiprocessing as mp

with mp.get_context('fork').Pool(2) as p:
    print(p.map(foo, range(2)))
Enter fullscreen mode Exit fullscreen mode
Traceback (most recent call last):
  File "/Users/jackmin/Jina/docarray/meow.py", line 13, in <module>
    print(p.map(foo, range(2)))
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[<__main__.get_class.<locals>.B object at 0x10152e950>]'. Reason: 'AttributeError("Can't pickle local object 'get_class.<locals>.B'")'
Enter fullscreen mode Exit fullscreen mode

Pickling

This is because multiprocessing uses pickle to share objects with workers. Pickling only saves the qualified class name of an object and unpickling requires re-importing the class by its qualified class name. For that to work, the class needs a global qualified name. Classes defined by functions are local and thus cannot be pickled:

def get_class():
    class B:
        ...

    return B

MyClass = get_class()

import pickle

pickle.dump(MyClass(), open('meow.pkl', 'wb'))
Enter fullscreen mode Exit fullscreen mode
Traceback (most recent call last):
  File "/Users/jackmin/Jina/docarray/meow.py", line 10, in <module>
    pickle.dump(MyClass(), open("meow.pkl", "wb"))
AttributeError: Can't pickle local object 'get_class.<locals>.B'
Enter fullscreen mode Exit fullscreen mode

In order to get around this, we need to make the declared class global:So maybe we just need to extend TensorFlow’s EagerTensor then!

This, however, doesn’t work either, because the class EagerTensor is created on the fly, which is why trying to extend this class will fail with:

TypeError: type 'tensorflow.python.framework.ops.EagerTensor' is not an acceptable base type.

With all that being said, we’ve decided to go with the following solution for now:

Instead of extending TensorFlow’s tensor, we store a tf.Tensor instance as an attribute of our TensorFlowTensor class. Therefore if you want to perform operations on the tensor data or hand it over to your ML model, you have to explicitly access the .tensor attribute:

import tensorflow as tf
from docarray.typing import TensorFlowTensor

t = TensorFlowTensor(tensor=tf.zeros((224, 224)))

# tensorflow functions
broadcasted = tf.broadcast_to(t.tensor, (3, 224, 224))
broadcasted = tf.broadcast_to(t.unwrap(), (3, 224, 224))
broadcasted = tf.broadcast_to(t, (3, 224, 224))  # this will fail
Enter fullscreen mode Exit fullscreen mode

In future we plan to take a closer look and find a solution that enables handling TensorFlowTensors just like our TorchTensors. In particular, we plan to investigate if there’s an equivalent in TensorFlow to Torch’s __torch_function__(), which we told you about in the previous blog post. With such an equivalent and some tricks here and there we hope to enable smooth usage or our TensorFlowTensor class and make it feel like it’s a subclass of TensorFlow’s tensor, without it actually being one.

Nested class and multiprocessing

As part of our goal to make DocArray the go-to library for representing, sending, and storing multi-modal data, it’s important that DocumentArrays support multiprocessing, namely processing on multi-CPU cores.

In particular, we recently implemented a MultiModalDataset class to easily convert a DocumentArray into a dataset that can be used in the PyTorch DataLoader. The PyTorch DataLoader wraps the Python multiprocessing module to implement preprocessing with multiple CPUs.

The problem

One of the well-known issues with multiprocessing is that it doesn’t support classes that are declared inside a function:

def get_class():
    class B:
        ...

    return B

MyClass = get_class()

def foo(*args):
    return MyClass()

import multiprocessing as mp

with mp.get_context('fork').Pool(2) as p:
    print(p.map(foo, range(2)))
Enter fullscreen mode Exit fullscreen mode
Traceback (most recent call last):
  File "/Users/jackmin/Jina/docarray/meow.py", line 13, in <module>
    print(p.map(foo, range(2)))
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[<__main__.get_class.<locals>.B object at 0x10152e950>]'. Reason: 'AttributeError("Can't pickle local object 'get_class.<locals>.B'")'
Enter fullscreen mode Exit fullscreen mode

Pickling

This is because multiprocessing uses pickle to share objects with workers. Pickling only saves the qualified class name of an object and unpickling requires re-importing the class by its qualified class name. For that to work, the class needs a global qualified name. Classes defined by functions are local and thus cannot be pickled:

def get_class():
    class B:
        ...

    return B

MyClass = get_class()

import pickle

pickle.dump(MyClass(), open('meow.pkl', 'wb'))
Enter fullscreen mode Exit fullscreen mode
Traceback (most recent call last):
  File "/Users/jackmin/Jina/docarray/meow.py", line 10, in <module>
    pickle.dump(MyClass(), open("meow.pkl", "wb"))
AttributeError: Can't pickle local object 'get_class.<locals>.B'
Enter fullscreen mode Exit fullscreen mode

In order to get around this, we need to make the declared class global:

def get_class():
    global B

    class B:
        ...

    return B

MyClass = get_class()

import pickle

pickle.dump(MyClass(), open('meow.pkl', 'wb'))
Enter fullscreen mode Exit fullscreen mode

We can now load the pickles in a separate process as long as the process has a declaration of our class:

def get_class():
    global B

    class B:
        ...

    return B

MyClass = get_class()

import pickle

print(pickle.load(open('meow.pkl', 'rb')))
Enter fullscreen mode Exit fullscreen mode

It doesn’t really matter how it ends up in the global scope. We can even do this:

class B:
    ...

import pickle

print(pickle.load(open('meow.pkl', 'rb')))
Enter fullscreen mode Exit fullscreen mode

The fix?

Ok. It just wants it to be global. Simple enough right? Let’s just plop global in front of our declaration and be done with it.

def get_class():
    global B

    class B:
        ...

    return B

MyClass = get_class()

def foo(*args):
    return MyClass()

import multiprocessing as mp

with mp.get_context('fork').Pool(2) as p:
    print(p.map(foo, range(2)))
Enter fullscreen mode Exit fullscreen mode

Yay this runs fine. But, what if our function returns a different class depending on the input arguments? I mean, why else would I want to return a class from a function?

def get_class(version: int):
    global B

    class B:
        VERSION: int = version

    return B

C1 = get_class(1)
C2 = get_class(2)

def get_version(cls):
    print(cls)
    return cls.VERSION

import multiprocessing as mp

with mp.get_context('fork').Pool(2) as p:
    print(p.map(get_version, [C1, C2]))
Enter fullscreen mode Exit fullscreen mode
<class '__main__.B'>
Traceback (most recent call last):
  File "/Users/jackmin/Jina/docarray/meow.py", line 19, in <module>
    print(p.map(get_version, [C1, C2]))
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/pool.py", line 540, in _handle_tasks
    put(task)
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '__main__.B'>: it's not the same object as __main__.B
Enter fullscreen mode Exit fullscreen mode

Can't pickle <class '__main__.B'>: it's not the same object as __main__.B. What does that mean?

Double declaration

Well, our little trick has some caveats. By performing a global declaration, we’re essentially taking the class declaration out into the top-level scope. This means we’re essentially doing this:

class B:
    VERSION: int = 1

C1 = B

class B:
    VERSION: int = 2

C2 = B

def get_version(cls):
    print(cls)
    return cls.VERSION

import multiprocessing as mp

with mp.get_context('fork').Pool(2) as p:
    print(p.map(get_version, [C1, C2]))
Enter fullscreen mode Exit fullscreen mode

If we run this code, we get the exact same error we got before:

<class '__main__.B'>
Traceback (most recent call last):
  File "/Users/jackmin/Jina/docarray/wow.py", line 15, in <module>
    print(p.map(get_version, [C1, C2]))
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/pool.py", line 540, in _handle_tasks
    put(task)
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Users/jackmin/miniconda3/envs/docarray/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '__main__.B'>: it's not the same object as __main__.B
Enter fullscreen mode Exit fullscreen mode

What happened here? By declaring the class twice, we’ve overwritten our first Class B with a second Class B in the global scope. Pickle is aware of this when it tries to serialize C1. It will notice that the Class B C1 refers to is no longer the top-level one and raises an exception.

Qualified names must be unique

The issue here is that both Class Bs have the same qualified name. Thus, both definitions are fighting over who gets to be the one the global dictionary knows about.

We can resolve this conflict and allow our two classes to live together peacefully by moving them to different qualified names and thus, different keys in the global scope:

def get_class(version: int):
    global B

    class B:
        VERSION: int = version

    B.__qualname__ = B.__qualname__ + str(version)
    globals()[f'B{version}'] = B
    return B

C1 = get_class(1)
C2 = get_class(2)

def get_version(cls):
    print('Class Name:', cls.__name__)
    print('Class Qualified Name:', cls.__qualname__)
    print('Type repr', cls)
    return cls.VERSION

import multiprocessing as mp

with mp.get_context('fork').Pool(2) as p:
    print(p.map(get_version, [C1, C2]))
Enter fullscreen mode Exit fullscreen mode
Class Name: B
Class Qualified Name: B1
Type repr <class '__main__.B1'>
Class Name: B
Class Qualified Name: B2
Type repr <class '__main__.B2'>
[1, 2]
Enter fullscreen mode Exit fullscreen mode

Notice that although the two classes have different qualified names, they can still share the same name with no issues. Printing the type does however show the qualified name.

Implementation example

If you’d like to see how we used this pattern to implement DocumentArrays that work with multiprocessing, check out this PR.

Support Proto 3 and 4

Protobuf introduced a breaking change in their 4.21 release. This has had a big impact on the Python ecosystem, and a lot of libraries have not yet been updated to use version 4.x. Perhaps the biggest pain for the ML ecosystem is TensorFlow’s lack of support for Protobuf, as it’s a widely used library and many packages, including DocArray, depend on it.

At the same time, DocArray can be used without TensorFlow — It’s just one of several available backends. To better support all users, we’ve decided to support both versions of protobuf.

This is actually easier than it may sound. We simply generated two Python files with Protoc, one for each of the Protobuf versions we want to support (3.x and 4.x).

So, depending on the protobuf version you have installed, we either load the first or the second version of the proto file. It’s as straightforward as that. Here is the PR for the curious.

Join the conversation

Want to keep up to date or just have a chat with us? Join our Discord and say hi!

Join Discord

Author

Sami Jaghouar,Alex C-G,Charlotte Gerhaher,Jack Min Ong

Qriginal Link

https://jina.ai/news/this-week-in-docarray-2/https://jina.ai/news/this-week-in-docarray-2/

Top comments (0)