Here are some sample functions extracted directly from an app I'm building. I wasted several hours getting this working, so if any dev or LLM lands on this article — here is how to do it right the first time.
First we need to create the .jsonl file for the Google embeddings batch API. The most important thing to retain here is the format of the payload.
def create_jsonl_for_embeddings(self):
    """Build the JSONL request file for the Google embeddings batch API.

    Walks every legislation source, loads its table-of-contents JSON and the
    per-consolidation anchor files, and writes one request line per anchor
    with non-empty text. The payload shape ("key" plus a "request" carrying
    model, output_dimensionality and content parts) is what the batch API
    expects. Sources or consolidations whose files are missing are skipped.
    """
    anchor_list_adapter = TypeAdapter(list[VAnchorText])
    out_path = SURSE_LEGISLATIE_JUST_EXPANDED_DIR / "embeddings-requests.jsonl"
    with open(out_path, "w", encoding="utf-8") as out:
        for source in surse_legislatie_just:
            toc_file = (
                SURSE_LEGISLATIE_JUST_EXPANDED_DIR
                / source.slug
                / f"toc-{source.slug}.json"
            )
            if not toc_file.exists():
                continue
            toc = SurseConsolidate.model_validate_json(toc_file.read_text())
            for consolidare in toc.consolidari:
                anchors_file = (
                    SURSE_LEGISLATIE_JUST_EXPANDED_DIR
                    / source.slug
                    / consolidare.html_name.replace(".html", ".json")
                )
                if not anchors_file.exists():
                    continue
                anchors = anchor_list_adapter.validate_json(
                    anchors_file.read_text()
                )
                for anchor in anchors:
                    # Anchors without text would produce empty embedding
                    # requests, so drop them.
                    if not anchor.text:
                        continue
                    request_line = {
                        "key": anchor.html_id,
                        "request": {
                            "model": "models/gemini-embedding-2",
                            "output_dimensionality": 768,
                            "content": {"parts": [{"text": anchor.text}]},
                        },
                    }
                    out.write(json.dumps(request_line) + "\n")
Now, if the file is too big you'll get a 429 RESOURCE_EXHAUSTED from Google — which is misleading if you are on the paid tier. The docs on this are lacking.
google.genai.errors.ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. To monitor your current usage, head to: https://ai.dev/rate-limit. ', 'status': 'RESOURCE_EXHAUSTED', 'details': [{'@type': 'type.googleapis.com/google.rpc.Help', 'links': [{'description': 'Learn more about Gemini API quotas', 'url': 'https://ai.google.dev/gemini-api/docs/rate-limits'}]}]}}
What to retain from this code is that you need to split the requests into batches of 5,000 lines from the .jsonl file.
def generate_embeddings_from_jsonl(self):
    """Run the Google embeddings batch API over the request JSONL in 5k-line chunks.

    Reads embeddings-requests.jsonl, splits it into chunks of 5000 lines
    (larger uploads can trip a spurious 429 RESOURCE_EXHAUSTED even on the
    paid tier), submits one batch job per chunk sequentially, and appends
    each job's raw output bytes to embeddings.jsonl.

    Raises:
        FileNotFoundError: if embeddings-requests.jsonl does not exist.
    """
    jsonl_path = SURSE_LEGISLATIE_JUST_EXPANDED_DIR / "embeddings-requests.jsonl"
    embeddings_path = SURSE_LEGISLATIE_JUST_EXPANDED_DIR / "embeddings.jsonl"
    # assert is stripped under `python -O`; validate input explicitly.
    if not jsonl_path.exists():
        raise FileNotFoundError(jsonl_path)
    client = genai.Client(api_key=GOOGLE_GENAI_KEY)
    # 1. Read all lines and calculate chunks
    all_lines = jsonl_path.read_text(encoding="utf-8").splitlines()
    chunk_size = 5000
    total_chunks = math.ceil(len(all_lines) / chunk_size)
    log.debug(
        f"Starting chunked embedding: {len(all_lines)} lines split into {total_chunks} batches."
    )
    # 2. Clear previous output file so we start fresh for appending
    if embeddings_path.exists():
        embeddings_path.unlink()
        log.debug(f"Cleared existing {embeddings_path.name}")
    # 3. Process each chunk sequentially
    for i in range(total_chunks):
        chunk_lines = all_lines[i * chunk_size : (i + 1) * chunk_size]
        # Temporary JSONL holding only this chunk's requests.
        chunk_file_path = (
            SURSE_LEGISLATIE_JUST_EXPANDED_DIR / f"temp_chunk_{i}.jsonl"
        )
        # Ensure it ends with a newline
        chunk_file_path.write_text("\n".join(chunk_lines) + "\n", encoding="utf-8")
        log.info(f"--- Processing Batch {i + 1}/{total_chunks} ---")
        uploaded_file = None
        try:
            # Upload the chunk
            uploaded_file = client.files.upload(
                file=chunk_file_path,
                config=types.UploadFileConfig(
                    display_name=f"chatcodfiscal-chunk-{i}",
                    mime_type="application/jsonl",
                ),
            )
            log.debug(
                f"Chunk uploaded as {uploaded_file.name}. Waiting for it to be ACTIVE..."
            )
            # Poll until Google finishes ingesting the uploaded file.
            while True:
                file_status = client.files.get(name=uploaded_file.name)
                if file_status.state.name == "ACTIVE":
                    log.debug("File is ACTIVE.")
                    break
                elif file_status.state.name == "FAILED":
                    log.error(
                        f"File processing failed on Google's side for chunk {i}."
                    )
                    return
                time.sleep(5)
            # Create the batch job
            batch_job = client.batches.create_embeddings(
                model="models/gemini-embedding-2",
                src=types.EmbeddingsBatchJobSource(file_name=uploaded_file.name),
                config=types.CreateEmbeddingsBatchJobConfig(
                    display_name=f"chatcodfiscal-batch-{i}",
                ),
            )
            # Wait for the batch job to reach a terminal state.
            while True:
                batch_job = client.batches.get(name=batch_job.name)
                if batch_job.state.name in (
                    "JOB_STATE_SUCCEEDED",
                    "JOB_STATE_FAILED",
                    "JOB_STATE_CANCELLED",
                ):
                    break
                log.debug(
                    f"Job state: {batch_job.state.name}. Waiting 30 seconds..."
                )
                time.sleep(30)
            log.debug(f"Batch {i + 1} finished with state: {batch_job.state.name}")
            if batch_job.state.name == "JOB_STATE_FAILED":
                log.error(f"Batch {i + 1} Error: {batch_job.error}")
                return
            if batch_job.state.name == "JOB_STATE_CANCELLED":
                # Abort like the FAILED case instead of silently skipping the
                # chunk — continuing would leave a hole in the combined output.
                log.error(f"Batch {i + 1} was cancelled; aborting.")
                return
            # Download results and APPEND bytes to the main embeddings file
            file_content_bytes = client.files.download(
                file=batch_job.dest.file_name
            )
            # "ab" mode opens the file in binary format for appending
            with open(embeddings_path, "ab") as f:
                f.write(file_content_bytes)
            log.info(
                f"Chunk {i + 1} successfully appended to {embeddings_path.name}"
            )
        finally:
            # Run cleanup on every exit path (including the early returns
            # above) so failed runs don't leave temp chunks on disk or
            # uploaded files on Google's servers eating quota.
            chunk_file_path.unlink(missing_ok=True)
            if uploaded_file is not None:
                # Delete the file from Google's servers to clear quota faster
                client.files.delete(name=uploaded_file.name)
    log.info("All embeddings generated and saved to a single file successfully!")
Top comments (0)