DEV Community

drake
drake

Posted on

OpenAI批处理的调用方式

  • 先将API KEY 配置到环境变量OPENAI_API_KEY

  • 代码示例

import time
import os
import requests
import json
from ebooklib import epub
from bs4 import BeautifulSoup
from openai import OpenAI
from PyPDF2 import PdfReader, PdfWriter

# How to use the OpenAI API: https://platform.openai.com/docs/quickstart?language-preference=python
# NOTE(review): `client` is not referenced by any code visible in this file; the
# Translate class below calls the REST endpoints through `requests` instead.
client = OpenAI()


class Translate:
    """
    Translate a PDF or EPUB book into Chinese through the OpenAI Batch API.

    Workflow (see ``run``): extract the text of every page/chapter, write one
    chat-completion request per page into a JSONL batch file, upload that
    file, create a batch job, poll until the job finishes, then download the
    output file.

    Pricing noted by the original author: GPT-4o-mini input costs $0.15 per
    million tokens and output $0.60 per million tokens; batch processing is
    billed at half price.
    Batch API reference: https://platform.openai.com/docs/api-reference/batch/create
    """

    # Batch statuses from which the job can never reach 'completed'.
    FAILED_STATUSES = ('failed', 'expired', 'cancelled')
    # (connect, read) timeouts in seconds, so a dead connection cannot hang forever.
    REQUEST_TIMEOUT = (10, 300)

    def __init__(self, source_file):
        """
        :param source_file: path of the ``.pdf`` or ``.epub`` file to translate.
        """
        self.file_path = source_file
        self.output_pdf = "output_translated1.pdf"
        self.batch_file = 'batch_input.json'
        self.batch_file_job_done = 'batch_output.json'
        # The API key is read from the environment; it is validated lazily in
        # _auth_headers so that offline steps (build_batch) work without it.
        self.api_key = os.environ.get('OPENAI_API_KEY')

    def _auth_headers(self, json_body=False):
        """Return the request headers, raising if no API key is configured."""
        if not self.api_key:
            raise RuntimeError('OPENAI_API_KEY is not set')
        headers = {'Authorization': f'Bearer {self.api_key}'}
        if json_body:
            headers['Content-Type'] = 'application/json'
        return headers

    @staticmethod
    def _json_or_raise(response):
        """Decode a JSON response, raising RuntimeError on a non-2xx status."""
        if not response.ok:
            raise RuntimeError(
                f'OpenAI API request failed: {response.status_code} {response.text}'
            )
        return response.json()

    def _build_request_lines(self, texts):
        """
        Turn an iterable of page texts into the JSONL payload.

        Blank pages are skipped; the remaining pages are numbered from 1 and
        each request line is terminated by a newline.
        """
        lines = []
        for text in texts:
            # `or ''` guards against an extractor handing back None.
            text = (text or '').strip()
            if not text:
                continue
            lines.append(self.build_batch_line(len(lines) + 1, text) + '\n')
        return ''.join(lines)

    def extract_text_from_pdf_translate(self):
        """
        Extract the text of every PDF page and return the newline-separated
        batch request lines (one request per non-empty page).
        """
        reader = PdfReader(self.file_path)
        return self._build_request_lines(
            page.extract_text() for page in reader.pages
        )

    def extract_text_from_epub_translate(self):
        """
        Extract the text of every XHTML document in the EPUB and return the
        newline-separated batch request lines (one request per non-empty item).
        """
        book = epub.read_epub(self.file_path, options={"ignore_ncx": True})
        texts = (
            BeautifulSoup(item.get_content(), 'html.parser').get_text()
            for item in book.get_items()
            # Only body content, identified by its MIME type.
            if item.media_type == 'application/xhtml+xml'
        )
        return self._build_request_lines(texts)

    def build_batch(self):
        """
        Extract the whole book and write all request lines to ``self.batch_file``.

        The file type is decided by the real extension (case-insensitive), not
        by a substring match, so ``dir.pdf/book.epub`` is handled as an EPUB.

        :return: True when the batch file was written, False when the file
                 type is unsupported (a message is printed in that case).
        """
        suffix = os.path.splitext(self.file_path)[1].lower()
        if suffix == '.pdf':
            request_json_all_lines = self.extract_text_from_pdf_translate()
        elif suffix == '.epub':
            request_json_all_lines = self.extract_text_from_epub_translate()
        else:
            print('文件类型有误')
            return False
        # Write the batch request file.
        with open(self.batch_file, 'w', encoding='utf-8') as f:
            f.write(request_json_all_lines)
        return True

    def build_batch_line(self, id, text):
        """
        Build one batch request object and return it as a single JSON line.

        :param id: request number; sent as the string ``custom_id``.
                   (The name shadows the builtin but is kept for callers.)
        :param text: the text to translate.
        :return: JSON string without a trailing newline.
        """
        data = {
            "custom_id": str(id),
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [
                    {"role": "system", "content": "You are a translation assistant."},
                    {"role": "user", "content": f"将该文本翻译成中文: {text}"}
                ]
            }
        }
        return json.dumps(data)

    def upload_batchfile(self):
        """
        Upload the batch file with purpose ``batch`` and return its file id.

        Sample response recorded by the original author (the ``purpose``
        value presumably predates the switch to 'batch' - verify):
        {
          "object": "file",
          "id": "file-VnK61ScVxBsuZfGCRWn7Lc",
          "purpose": "fine-tune",
          "filename": "batch_input.json",
          "bytes": 5922839,
          "created_at": 1735787655,
          "status": "processed",
          "status_details": null
        }

        :raises RuntimeError: if the API key is missing or the upload fails.
        """
        url = 'https://api.openai.com/v1/files'
        # The context manager closes the handle even when the request raises.
        with open(self.batch_file, 'rb') as batch_fp:
            files = {
                'file': (self.batch_file, batch_fp),
                'purpose': (None, 'batch'),
            }
            response = requests.post(
                url,
                headers=self._auth_headers(),
                files=files,
                timeout=self.REQUEST_TIMEOUT,
            )
        file_id = self._json_or_raise(response)['id']
        print(f'批处理文件上传成功,input_file_id: {file_id}')
        return file_id

    def create_batch_request(self, input_file_id):
        """
        Create the batch job for an uploaded file and return the batch job id.

        Sample response:
        {
          "id": "batch_67760717ee4481909b10277c1227dcb7",
          "object": "batch",
          "endpoint": "/v1/chat/completions",
          "errors": null,
          "input_file_id": "file-78RvssonMDx1qE9XpQy7K3",
          "completion_window": "24h",
          "status": "validating",
          "output_file_id": null,
          "error_file_id": null,
          "created_at": 1735788312,
          "in_progress_at": null,
          "expires_at": 1735874712,
          "finalizing_at": null,
          "completed_at": null,
          "failed_at": null,
          "expired_at": null,
          "cancelling_at": null,
          "cancelled_at": null,
          "request_counts": {"total": 0, "completed": 0, "failed": 0},
          "metadata": null
        }

        :param input_file_id: id returned by ``upload_batchfile``.
        :raises RuntimeError: if the API key is missing or the request fails.
        """
        url = 'https://api.openai.com/v1/batches'
        payload = {
            'input_file_id': input_file_id,
            'endpoint': '/v1/chat/completions',
            'completion_window': '24h'
        }
        response = requests.post(
            url,
            headers=self._auth_headers(json_body=True),
            data=json.dumps(payload),
            timeout=self.REQUEST_TIMEOUT,
        )
        batch_job_id = self._json_or_raise(response)['id']
        print(f'批处理文件被接受并处理的回执任务ID:{batch_job_id}')
        return batch_job_id

    def commit_job(self):
        """
        Build the batch file, upload it and create the batch job.

        :return: the batch job id.
        :raises ValueError: if the source file type is unsupported, instead of
                            uploading a stale or missing batch file.
        """
        if not self.build_batch():
            raise ValueError(f'Unsupported source file type: {self.file_path!r}')
        # Uploading may fail; errors surface as RuntimeError.
        input_file_id = self.upload_batchfile()
        return self.create_batch_request(input_file_id)

    def retrieve_batch(self, batch_job_id):
        """
        Fetch the batch object, print a progress summary and return it.

        Sample response:
        {
            "id": "batch_67760717ee4481909b10277c1227dcb7",
            "object": "batch",
            "endpoint": "/v1/chat/completions",
            "errors": null,
            "input_file_id": "file-78RvssonMDx1qE9XpQy7K3",
            "completion_window": "24h",
            "status": "in_progress",
            "output_file_id": null,
            "error_file_id": null,
            "created_at": 1735788312,
            "in_progress_at": 1735788313,
            "expires_at": 1735874712,
            "finalizing_at": null,
            "completed_at": null,
            "failed_at": null,
            "expired_at": null,
            "cancelling_at": null,
            "cancelled_at": null,
            "request_counts": {"total": 110, "completed": 60, "failed": 0},
            "metadata": null
        }

        :return: the decoded batch object (dict).
        :raises RuntimeError: if the API key is missing or the request fails.
        """
        url = f'https://api.openai.com/v1/batches/{batch_job_id}'
        response = requests.get(
            url,
            headers=self._auth_headers(json_body=True),
            timeout=self.REQUEST_TIMEOUT,
        )
        data = self._json_or_raise(response)
        counts = data['request_counts']
        time_format = '%Y-%m-%d %H:%M:%S'
        # Timestamps are epoch seconds; show them in local time.
        created_at = time.strftime(time_format, time.localtime(data['created_at']))
        expires_at = time.strftime(time_format, time.localtime(data['expires_at']))
        print(
            f'创建时间:{created_at}\n过期时间:{expires_at}\n'
            f'总请求数:{counts["total"]}\n完成数:{counts["completed"]}\n'
            f'失败数:{counts["failed"]}\n'
        )
        print('------'*10)
        return data

    def wait_batch_job_done(self, batch_job_id, poll_interval=100):
        """
        Poll the batch job until it completes, then download its output file
        to ``self.batch_file_job_done``.

        :param batch_job_id: id returned by ``create_batch_request``.
        :param poll_interval: seconds to sleep between status checks.
        :raises RuntimeError: if the job ends as failed, expired or cancelled
                              (previously this looped forever).
        """
        while True:
            data = self.retrieve_batch(batch_job_id)
            status = data['status']
            if status == 'completed':
                output_file_id = data['output_file_id']
                print(f'批处理任务全部处理完成,output_file_id:{output_file_id}')
                break
            if status in self.FAILED_STATUSES:
                raise RuntimeError(
                    f'Batch job {batch_job_id} ended with status {status!r}: '
                    f'{data.get("errors")}'
                )
            time.sleep(poll_interval)

        if not output_file_id:
            # A completed job can lack an output file; avoid requesting
            # /files/None/content and point at the error file instead.
            print(
                'Batch completed without an output file; error_file_id:',
                data.get('error_file_id'),
            )
            return

        # Retrieve the file content and save it locally.
        # https://platform.openai.com/docs/api-reference/files/retrieve-contents
        url = f'https://api.openai.com/v1/files/{output_file_id}/content'
        response = requests.get(
            url, headers=self._auth_headers(), timeout=self.REQUEST_TIMEOUT
        )
        if response.status_code == 200:
            with open(self.batch_file_job_done, 'wb') as file:
                file.write(response.content)
            print('File content downloaded successfully to file.jsonl')
        else:
            print('Failed to download file:', response.status_code, response.text)

    def run(self):
        """Submit the book to OpenAI and wait for the translated output."""
        batch_job_id = self.commit_job()
        self.wait_batch_job_done(batch_job_id)

if __name__ == '__main__':
    # Script entry point: run the full batch-translation workflow on the sample book.
    source_file = 'Principles of Neural Science.epub'
    translator = Translate(source_file)
    translator.run()
Enter fullscreen mode Exit fullscreen mode

Top comments (1)

Collapse
 
dragon72463399 profile image
drake

打印示例:


创建时间:2025-01-02 11:25:12
过期时间:2025-01-03 11:25:12
总请求数:110
完成数:110
失败数:0


批处理任务全部处理完成,output_file_id:file-Qof7D5GpC99RdsHciFem5f
File content downloaded successfully to file.jsonl