DEV Community

Alin Climente
Alin Climente

Posted on

How to create gemini-embedding-2 using Google Batch API

Here are some sample functions extracted directly from an app I'm building. I wasted a few hours getting this working, so if any dev or LLM comes across this article — here is how to do it right the first time.

First we need to create the .jsonl file for the Google embeddings Batch API. The most important thing to note here is the format of the payload.


    def create_jsonl_for_embeddings(self):
        """Build the JSONL request file consumed by the Google embeddings Batch API.

        Walks every legislation source, its table of contents, and each
        consolidation's anchor file, emitting one request line per anchor
        that has non-empty text. Each line's "key" is the anchor's html_id
        so batch results can be joined back to their anchors later.
        """
        adapter = TypeAdapter(list[VAnchorText])
        out_path = SURSE_LEGISLATIE_JUST_EXPANDED_DIR / "embeddings-requests.jsonl"

        def request_lines():
            # Yield one newline-terminated JSON request per usable anchor.
            for source in surse_legislatie_just:
                toc_file = (
                    SURSE_LEGISLATIE_JUST_EXPANDED_DIR
                    / source.slug
                    / f"toc-{source.slug}.json"
                )
                if not toc_file.exists():
                    continue

                toc = SurseConsolidate.model_validate_json(toc_file.read_text())

                for consolidare in toc.consolidari:
                    anchors_file = (
                        SURSE_LEGISLATIE_JUST_EXPANDED_DIR
                        / source.slug
                        / consolidare.html_name.replace(".html", ".json")
                    )
                    if not anchors_file.exists():
                        continue

                    for anchor in adapter.validate_json(anchors_file.read_text()):
                        # Anchors without text would produce empty embedding requests.
                        if not anchor.text:
                            continue
                        yield json.dumps(
                            {
                                "key": anchor.html_id,
                                "request": {
                                    "model": "models/gemini-embedding-2",
                                    "output_dimensionality": 768,
                                    "content": {"parts": [{"text": anchor.text}]},
                                },
                            }
                        ) + "\n"

        with open(out_path, "w", encoding="utf-8") as out:
            out.writelines(request_lines())
Enter fullscreen mode Exit fullscreen mode

Now, if the file is too big you'll get a 429 RESOURCE_EXHAUSTED error from Google — which is misleading if you are on the paid tier. The documentation on this is lacking.

google.genai.errors.ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. To monitor your current usage, head to: https://ai.dev/rate-limit. ', 'status': 'RESOURCE_EXHAUSTED', 'details': [{'@type': 'type.googleapis.com/google.rpc.Help', 'links': [{'description': 'Learn more about Gemini API quotas', 'url': 'https://ai.google.dev/gemini-api/docs/rate-limits'}]}]}}
Enter fullscreen mode Exit fullscreen mode

What to retain from this code is that you need to batch the requests from the .jsonl file into chunks of 5,000 lines.

    def generate_embeddings_from_jsonl(self):
        """Run the prepared embeddings-requests.jsonl through the Google Batch API.

        Splits the requests into 5,000-line chunks (larger uploads trigger a
        spurious 429 RESOURCE_EXHAUSTED even on the paid tier), runs one batch
        embedding job per chunk, and appends each successful job's result bytes
        to embeddings.jsonl.

        Returns early if a chunk's upload or batch job fails; in all cases the
        local temp chunk file and the uploaded remote file are cleaned up
        (the previous version leaked both on the failure paths).
        """
        jsonl_path = SURSE_LEGISLATIE_JUST_EXPANDED_DIR / "embeddings-requests.jsonl"
        embeddings_path = SURSE_LEGISLATIE_JUST_EXPANDED_DIR / "embeddings.jsonl"
        assert jsonl_path.exists()

        client = genai.Client(api_key=GOOGLE_GENAI_KEY)

        # 1. Read all lines and calculate chunks
        all_lines = jsonl_path.read_text(encoding="utf-8").splitlines()
        chunk_size = 5000  # >5k lines per upload triggers the bogus 429
        total_chunks = math.ceil(len(all_lines) / chunk_size)

        log.debug(
            f"Starting chunked embedding: {len(all_lines)} lines split into {total_chunks} batches."
        )

        # 2. Clear previous output file so we start fresh for appending
        if embeddings_path.exists():
            embeddings_path.unlink()
            log.debug(f"Cleared existing {embeddings_path.name}")

        # 3. Process each chunk sequentially
        for i in range(total_chunks):
            chunk_lines = all_lines[i * chunk_size : (i + 1) * chunk_size]

            # Create a temporary JSONL for this specific chunk.
            # Ensure it ends with a newline.
            chunk_file_path = (
                SURSE_LEGISLATIE_JUST_EXPANDED_DIR / f"temp_chunk_{i}.jsonl"
            )
            chunk_file_path.write_text("\n".join(chunk_lines) + "\n", encoding="utf-8")

            log.info(f"--- Processing Batch {i + 1}/{total_chunks} ---")

            uploaded_file = None
            try:
                # Upload the chunk
                uploaded_file = client.files.upload(
                    file=chunk_file_path,
                    config=types.UploadFileConfig(
                        display_name=f"chatcodfiscal-chunk-{i}",
                        mime_type="application/jsonl",
                    ),
                )

                log.debug(
                    f"Chunk uploaded as {uploaded_file.name}. Waiting for it to be ACTIVE..."
                )
                # Poll until the uploaded file is usable (or rejected).
                while True:
                    file_status = client.files.get(name=uploaded_file.name)
                    if file_status.state.name == "ACTIVE":
                        log.debug("File is ACTIVE.")
                        break
                    if file_status.state.name == "FAILED":
                        log.error(
                            f"File processing failed on Google's side for chunk {i}."
                        )
                        return
                    time.sleep(5)

                # Create the batch job
                batch_job = client.batches.create_embeddings(
                    model="models/gemini-embedding-2",
                    src=types.EmbeddingsBatchJobSource(file_name=uploaded_file.name),
                    config=types.CreateEmbeddingsBatchJobConfig(
                        display_name=f"chatcodfiscal-batch-{i}",
                    ),
                )

                # Wait for the batch job to reach a terminal state
                while True:
                    batch_job = client.batches.get(name=batch_job.name)
                    if batch_job.state.name in (
                        "JOB_STATE_SUCCEEDED",
                        "JOB_STATE_FAILED",
                        "JOB_STATE_CANCELLED",
                    ):
                        break
                    log.debug(
                        f"Job state: {batch_job.state.name}. Waiting 30 seconds..."
                    )
                    time.sleep(30)

                log.debug(f"Batch {i + 1} finished with state: {batch_job.state.name}")

                if batch_job.state.name == "JOB_STATE_FAILED":
                    log.error(f"Batch {i + 1} Error: {batch_job.error}")
                    return

                if batch_job.state.name == "JOB_STATE_SUCCEEDED":
                    # Download results and APPEND bytes to the main embeddings file
                    file_content_bytes = client.files.download(
                        file=batch_job.dest.file_name
                    )

                    # "ab" mode opens the file in binary format for appending
                    with open(embeddings_path, "ab") as f:
                        f.write(file_content_bytes)

                    log.info(
                        f"Chunk {i + 1} successfully appended to {embeddings_path.name}"
                    )
            finally:
                # Always clean up — including on the early-return failure paths.
                # Clean up temporary chunk file from disk.
                chunk_file_path.unlink(missing_ok=True)

                # Delete the file from Google's servers to clear quota faster.
                # Best-effort: a failed delete must not mask the real outcome.
                if uploaded_file is not None:
                    try:
                        client.files.delete(name=uploaded_file.name)
                    except Exception:
                        log.warning(
                            f"Could not delete remote file {uploaded_file.name}"
                        )

        log.info("All embeddings generated and saved to a single file successfully!")
Enter fullscreen mode Exit fullscreen mode

Top comments (0)