DEV Community

drake
drake

Posted on

OpenAI批处理的调用方式

  • 先将 API Key 配置到环境变量 OPENAI_API_KEY 中

  • 代码示例

import time
import os
import requests
import json
from ebooklib import epub
from bs4 import BeautifulSoup
from openai import OpenAI
from PyPDF2 import PdfReader, PdfWriter

# How to use the OpenAI API: https://platform.openai.com/docs/quickstart?language-preference=python
# NOTE(review): this client object is not referenced by the Translate class
# visible in this file, which calls the REST endpoints through `requests` —
# confirm whether it is still needed.
client = OpenAI()


class Translate:
    """Translate a PDF or EPUB book into Chinese through the OpenAI Batch API.

    Workflow (see ``run``): extract the text of every page/chapter, write one
    chat-completion request per page into a JSONL batch file, upload that
    file, create a batch job, poll until the job finishes and download the
    result file.

    Pricing noted by the original author: GPT-4o-mini costs $0.15 per million
    input tokens and $0.60 per million output tokens; batch requests are
    billed at half price.
    Batch API reference: https://platform.openai.com/docs/api-reference/batch/create
    """

    # Batch statuses after which the job can no longer reach "completed".
    _TERMINAL_FAILURE_STATUSES = ('failed', 'expired', 'cancelled')
    # Seconds to wait on a single HTTP operation before giving up, so a
    # stalled connection cannot hang the script forever.
    _REQUEST_TIMEOUT = 60

    def __init__(self, source_file):
        """Remember the source path and read the API key from the environment.

        Args:
            source_file: Path of the ``.pdf`` or ``.epub`` file to translate.
        """
        self.file_path = source_file
        self.output_pdf = "output_translated1.pdf"
        self.batch_file = 'batch_input.json'
        self.batch_file_job_done = 'batch_output.json'
        # The API key is taken from the OPENAI_API_KEY environment variable.
        self.api_key = os.environ.get('OPENAI_API_KEY')

    def _auth_headers(self, json_body=False):
        """Return the Authorization header, plus a JSON Content-Type if asked."""
        headers = {'Authorization': f'Bearer {self.api_key}'}
        if json_body:
            headers['Content-Type'] = 'application/json'
        return headers

    def _file_kind(self):
        """Return ``'pdf'``, ``'epub'`` or ``None`` based on the file suffix."""
        suffix = os.path.splitext(self.file_path)[1].lower()
        return {'.pdf': 'pdf', '.epub': 'epub'}.get(suffix)

    def _build_request_lines(self, texts):
        """Turn an iterable of raw texts into newline-terminated request lines.

        Blank texts are skipped; the remaining ones are numbered from 1 and
        that number becomes the request's ``custom_id``.
        """
        lines = []
        for text in texts:
            text = text.strip()
            if not text:
                continue
            lines.append(self.build_batch_line(len(lines) + 1, text) + '\n')
        return ''.join(lines)

    def extract_text_from_pdf_translate(self):
        """Extract text from each PDF page and build the batch request lines.

        Returns:
            One JSON request per non-empty page, each terminated by ``\\n``.
        """
        reader = PdfReader(self.file_path)
        # Guard against a page yielding no text object at all.
        page_texts = ((page.extract_text() or '') for page in reader.pages)
        return self._build_request_lines(page_texts)

    def extract_text_from_epub_translate(self):
        """Extract text from each EPUB document and build the request lines.

        Returns:
            One JSON request per non-empty XHTML item, each terminated by
            ``\\n``.
        """
        book = epub.read_epub(self.file_path, options={"ignore_ncx": True})
        # Only XHTML items (selected by MIME type) are treated as body text.
        chapter_texts = (
            BeautifulSoup(item.get_content(), 'html.parser').get_text()
            for item in book.get_items()
            if item.media_type == 'application/xhtml+xml'
        )
        return self._build_request_lines(chapter_texts)

    def build_batch(self):
        """Extract the whole book and write all requests to ``self.batch_file``.

        Returns:
            ``True`` when the batch file was written, ``False`` when the
            source file is neither a PDF nor an EPUB (a message is printed
            and nothing is written).
        """
        kind = self._file_kind()
        if kind == 'pdf':
            request_json_all_lines = self.extract_text_from_pdf_translate()
        elif kind == 'epub':
            request_json_all_lines = self.extract_text_from_epub_translate()
        else:
            print('文件类型有误')
            return False
        with open(self.batch_file, 'w', encoding='utf-8') as f:
            f.write(request_json_all_lines)
        return True

    def build_batch_line(self, id, text):
        """Build one batch request (a single JSONL line, without newline).

        Args:
            id: Sequence number of the request; stored as the string
                ``custom_id``.
            text: Source text to be translated.

        Returns:
            The request serialized as a JSON string.
        """
        data = {
            "custom_id": str(id),
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [
                    {"role": "system", "content": "You are a translation assistant."},
                    {"role": "user", "content": f"将该文本翻译成中文: {text}"}
                ]
            }
        }
        return json.dumps(data)

    def upload_batchfile(self):
        """Upload the batch file and return the resulting ``input_file_id``.

        Sample response body recorded by the original author:
        {
          "object": "file",
          "id": "file-VnK61ScVxBsuZfGCRWn7Lc",
          "purpose": "fine-tune",
          "filename": "batch_input.json",
          "bytes": 5922839,
          "created_at": 1735787655,
          "status": "processed",
          "status_details": null
        }

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        url = 'https://api.openai.com/v1/files'
        # Keep the file open only for the duration of the upload.
        with open(self.batch_file, 'rb') as batch_fp:
            files = {
                'file': (self.batch_file, batch_fp),
                'purpose': (None, 'batch')
            }
            response = requests.post(
                url,
                headers=self._auth_headers(),
                files=files,
                timeout=self._REQUEST_TIMEOUT,
            )
        # Surface API errors directly instead of a KeyError on 'id' below.
        response.raise_for_status()
        file_id = response.json()['id']
        print(f'批处理文件上传成功,input_file_id: {file_id}')
        return file_id

    def create_batch_request(self, input_file_id):
        """Submit the batch job and return its ``batch_job_id``.

        Sample response body recorded by the original author:
        {
          "id": "batch_67760717ee4481909b10277c1227dcb7",
          "object": "batch",
          "endpoint": "/v1/chat/completions",
          "errors": null,
          "input_file_id": "file-78RvssonMDx1qE9XpQy7K3",
          "completion_window": "24h",
          "status": "validating",
          "output_file_id": null,
          "error_file_id": null,
          "created_at": 1735788312,
          "in_progress_at": null,
          "expires_at": 1735874712,
          "finalizing_at": null,
          "completed_at": null,
          "failed_at": null,
          "expired_at": null,
          "cancelling_at": null,
          "cancelled_at": null,
          "request_counts": {
            "total": 0,
            "completed": 0,
            "failed": 0
          },
          "metadata": null
        }

        Args:
            input_file_id: File id returned by ``upload_batchfile``.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        url = 'https://api.openai.com/v1/batches'
        payload = {
            'input_file_id': input_file_id,
            'endpoint': '/v1/chat/completions',
            'completion_window': '24h'
        }
        response = requests.post(
            url,
            headers=self._auth_headers(json_body=True),
            data=json.dumps(payload),
            timeout=self._REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        batch_job_id = response.json()['id']
        print(f'批处理文件被接受并处理的回执任务ID:{batch_job_id}')
        return batch_job_id

    def commit_job(self):
        """Build the batch file, upload it and create the batch job.

        Returns:
            The id of the created batch job.

        Raises:
            ValueError: If the source file type is unsupported, so that a
                stale batch file from an earlier run is never uploaded.
        """
        if not self.build_batch():
            raise ValueError(f'Unsupported source file type: {self.file_path!r}')
        # The upload step is the one the original author flagged as flaky.
        input_file_id = self.upload_batchfile()
        return self.create_batch_request(input_file_id)

    def retrieve_batch(self, batch_job_id):
        """Fetch the batch object, print a progress summary and return it.

        The response has the same shape as the one shown in
        ``create_batch_request``; while running, ``status`` is
        ``"in_progress"`` and ``request_counts`` is populated, e.g.
        ``{"total": 110, "completed": 60, "failed": 0}``.

        Args:
            batch_job_id: Id returned by ``create_batch_request``.

        Returns:
            The decoded batch object as a dict.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        url = f'https://api.openai.com/v1/batches/{batch_job_id}'
        response = requests.get(
            url,
            headers=self._auth_headers(json_body=True),
            timeout=self._REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        data = response.json()

        def fmt(epoch_seconds):
            # Timestamps are epoch seconds; show them in local time.
            return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(epoch_seconds))

        created_at = fmt(data['created_at'])
        expires_at = fmt(data['expires_at'])
        counts = data['request_counts']
        print(
            f'创建时间:{created_at}\n过期时间:{expires_at}\n'
            f'总请求数:{counts["total"]}\n完成数:{counts["completed"]}\n'
            f'失败数:{counts["failed"]}\n'
        )
        print('------'*10)
        return data

    def wait_batch_job_done(self, batch_job_id, poll_interval=100):
        """Poll until the batch job finishes, then download its output file.

        Args:
            batch_job_id: Id returned by ``create_batch_request``.
            poll_interval: Seconds to sleep between status checks.

        Raises:
            RuntimeError: If the job ends as failed, expired or cancelled;
                without this check the loop would never terminate.
        """
        while True:
            data = self.retrieve_batch(batch_job_id)
            status = data['status']
            if status == 'completed':
                break
            if status in self._TERMINAL_FAILURE_STATUSES:
                raise RuntimeError(
                    f'Batch job {batch_job_id} ended with status {status!r}: '
                    f'{data.get("errors")}'
                )
            time.sleep(poll_interval)

        output_file_id = data['output_file_id']
        print(f'批处理任务全部处理完成,output_file_id:{output_file_id}')
        if not output_file_id:
            # A completed job can have no output file (e.g. when every
            # request failed); do not request ".../files/None/content".
            print('No output file was produced; error_file_id:', data.get('error_file_id'))
            return

        # https://platform.openai.com/docs/api-reference/files/retrieve-contents
        url = f'https://api.openai.com/v1/files/{output_file_id}/content'
        response = requests.get(
            url,
            headers=self._auth_headers(),
            timeout=self._REQUEST_TIMEOUT,
        )
        if response.status_code == 200:
            with open(self.batch_file_job_done, 'wb') as file:
                file.write(response.content)
            print(f'File content downloaded successfully to {self.batch_file_job_done}')
        else:
            print('Failed to download file:', response.status_code, response.text)

    def run(self):
        """Submit the translation job to OpenAI and wait for its result."""
        batch_job_id = self.commit_job()
        self.wait_batch_job_done(batch_job_id)

if __name__ == '__main__':
    import sys

    # Accept the book path as an optional command-line argument; fall back to
    # the original sample file name so running the script bare still works.
    source_file = sys.argv[1] if len(sys.argv) > 1 else 'Principles of Neural Science.epub'
    Translate(source_file).run()
Enter fullscreen mode Exit fullscreen mode

Image of Timescale

🚀 pgai Vectorizer: SQLAlchemy and LiteLLM Make Vector Search Simple

We built pgai Vectorizer to simplify embedding management for AI applications—without needing a separate database or complex infrastructure. Since launch, developers have created over 3,000 vectorizers on Timescale Cloud, with many more self-hosted.

Read more

Top comments (1)

Collapse
 
dragon72463399 profile image
drake

打印示例:


创建时间:2025-01-02 11:25:12
过期时间:2025-01-03 11:25:12
总请求数:110
完成数:110
失败数:0


批处理任务全部处理完成,output_file_id:file-Qof7D5GpC99RdsHciFem5f
File content downloaded successfully to file.jsonl

Postmark Image

Speedy emails, satisfied customers

Are delayed transactional emails costing you user satisfaction? Postmark delivers your emails almost instantly, keeping your customers happy and connected.

Sign up