DEV Community

drake
drake

Posted on

OpenAI 批处理翻译 PDF 文档

import time
import os
import requests
import json
from fpdf import FPDF
from ebooklib import epub
from bs4 import BeautifulSoup
from openai import OpenAI
from retry import retry
from PyPDF2 import PdfReader
import re

# How to use the OpenAI API: https://platform.openai.com/docs/quickstart?language-preference=python
# The bare string below is the author's note (in Chinese). It says the script
# runs in batch mode and that translating one book (an 833-page English
# document) cost roughly USD 0.2 (about CNY 1.4).
"""
批处理模式
每本书(833页的英文文档)翻译成本大概是0.2美元,合人民币1.4元;相当划算了!!!
"""
# Module-level OpenAI client, created at import time. The Translate class uses
# it to upload the batch input file, create the batch job and download the
# result file.
client = OpenAI()

class PDF(FPDF):
    """FPDF subclass that tries to use a Chinese TrueType font and numbers pages.

    The font is looked up at ``./kaiti.ttf``. ``self.font_available`` records
    whether it was registered; when it was not, every font selection falls
    back to ``Arial``.
    """

    def __init__(self):
        super().__init__()
        self.setup_chinese_font()

    def setup_chinese_font(self):
        """Register the ``kaiti`` font if its file exists and set ``font_available``.

        Any exception raised while touching or registering the font is
        reported and treated as "font not available".
        """
        font_path = './kaiti.ttf'
        try:
            if not os.path.exists(font_path):
                print("字体文件不存在,将使用默认字体")
                self.font_available = False
                return

            # Reset the file's access/modification time to "now" before
            # loading it (the original author's timestamp workaround).
            now = time.time()
            os.utime(font_path, (now, now))

            # Register the font without the legacy ``uni`` argument.
            self.add_font('kaiti', '', font_path)
            self.font_available = True
            print("中文字体加载成功")
        except Exception as e:
            print(f"字体加载失败: {e},将使用默认字体")
            self.font_available = False

    def set_chinese_font(self, size=12):
        """Select the Chinese font at ``size``, or ``Arial`` when it is unusable."""
        if not self.font_available:
            self.set_font('Arial', '', size)
            return

        try:
            self.set_font('kaiti', '', size)
        except Exception as e:
            print(f"设置中文字体失败: {e},使用默认字体")
            self.set_font('Arial', '', size)

    def footer(self):
        """Draw the centred page number 15 units above the bottom of the page."""
        self.set_y(-15)
        self.set_chinese_font(8)
        try:
            page_label = f'{self.page_no()}'
            self.cell(0, 10, page_label, 0, 0, 'C')
        except Exception as e:
            print(f"页脚添加失败: {e}")
            # Fall back to an English footer rendered with Arial.
            self.set_font('Arial', '', 8)
            fallback_label = f'Page {self.page_no()}'
            self.cell(0, 10, fallback_label, 0, 0, 'C')

class Topdf:
    """Write text (a TXT file or a list of translated pages) into a PDF file.

    Rendering is done with the ``PDF`` class defined in this module.
    """

    # ASCII control characters that are stripped from the text; tab (\x09),
    # line feed (\x0A) and carriage return (\x0D) are deliberately kept.
    _CONTROL_CHARS = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')

    # Typographic punctuation mapped to plain ASCII in a single pass.
    _PUNCTUATION_TABLE = str.maketrans({
        '\u2018': "'",    # left single quotation mark
        '\u2019': "'",    # right single quotation mark
        '\u201C': '"',    # left double quotation mark
        '\u201D': '"',    # right double quotation mark
        '\u2013': '-',    # en dash
        '\u2014': '--',   # em dash
        '\u2026': '...',  # horizontal ellipsis
    })

    def __init__(self, pdf_path, txt_file_path):
        """Store the output and input paths.

        Args:
            pdf_path: Path of the PDF file to write.
            txt_file_path: Path of the TXT file read by
                ``to_pdf_from_text_file``.
        """
        self.pdf_file = pdf_path
        self.txt_file = txt_file_path

    def clean_text(self, text):
        """Return ``text`` with problematic characters removed or simplified.

        Non-string input is converted with ``str()``. Control characters are
        dropped, typographic quotes/dashes/ellipsis become ASCII, and every
        code point above U+FFFF (outside the BMP) is removed.
        """
        if not isinstance(text, str):
            text = str(text)

        text = self._CONTROL_CHARS.sub('', text)
        text = text.translate(self._PUNCTUATION_TABLE)
        return ''.join(ch for ch in text if ord(ch) <= 0xFFFF)

    def _new_pdf(self):
        """Create a ``PDF`` with the margins, first page and font used by this class."""
        pdf = PDF()
        pdf.set_left_margin(15)
        pdf.set_right_margin(15)
        pdf.add_page()
        pdf.set_auto_page_break(auto=True, margin=20)
        pdf.set_chinese_font(12)
        return pdf

    def safe_add_text(self, pdf, text):
        """Add ``text`` to ``pdf`` without letting a rendering error propagate.

        The text is cleaned with ``clean_text`` and written with
        ``multi_cell`` across the width between the margins. If that fails, a
        placeholder line is written instead; if even the placeholder fails,
        the failure is reported and the text is skipped.
        """
        try:
            cleaned_text = self.clean_text(text)
            available_width = pdf.w - pdf.l_margin - pdf.r_margin
            # multi_cell wraps long lines automatically.
            pdf.multi_cell(available_width, 10, cleaned_text)
        except Exception as e:
            print(f"添加文本时出错: {e}")
            try:
                pdf.multi_cell(0, 10, "[文本显示错误]")
            except Exception as fallback_error:
                # Best effort only: report instead of silently swallowing, and
                # do not catch KeyboardInterrupt/SystemExit as a bare
                # ``except`` would.
                print(f"添加错误提示时出错: {fallback_error}")

    def to_pdf_from_text_file(self):
        """Render the whole content of ``self.txt_file`` into ``self.pdf_file``.

        Errors are reported on stdout and not raised.
        """
        try:
            pdf = self._new_pdf()

            with open(self.txt_file, 'r', encoding='utf-8') as file:
                text = file.read()

            self.safe_add_text(pdf, text)

            pdf.output(self.pdf_file)
            print(f"TXT内容已成功保存到 {self.pdf_file}")
        except Exception as e:
            print(f"生成PDF时出错: {e}")

    def to_pdf(self, pages_list):
        """Render each entry of ``pages_list`` into ``self.pdf_file``.

        Entries are written in order and separated by a small vertical gap.
        Errors are reported on stdout and not raised.

        Args:
            pages_list: Sequence of translated page texts.
        """
        try:
            pdf = self._new_pdf()

            total = len(pages_list)
            for i, page in enumerate(pages_list):
                print(f"正在处理第 {i+1}/{total} 页...")
                self.safe_add_text(pdf, page)

                # Add spacing between entries, but not after the last one.
                if i < total - 1:
                    pdf.ln(5)

            pdf.output(self.pdf_file)
            print(f"翻译结果内容已成功保存到 {self.pdf_file}")
        except Exception as e:
            print(f"生成PDF时出错: {e}")

class Translate:
    """Translate a PDF or EPUB into Chinese with gpt-4o-mini via the OpenAI Batch API.

    ``run`` drives the workflow: extract text, write a JSONL batch input file,
    upload it, create the batch job, poll until it ends, download the output
    file and save the translation as TXT and PDF. All working files live in
    the ``files/`` directory.
    """

    # Batch statuses after which no further progress will happen, mapped to
    # the message printed for each.
    _TERMINAL_FAILURE_MESSAGES = {
        'failed': '批处理任务失败!',
        'expired': '批处理任务已过期!',
        'cancelled': '批处理任务已取消!',
    }

    def __init__(self, source_file):
        """Prepare all file paths and read the API key.

        Args:
            source_file: Name of the file to translate, relative to ``files/``.

        Raises:
            ValueError: If the ``OPENAI_API_KEY`` environment variable is unset.
        """
        directory = 'files/'
        os.makedirs(directory, exist_ok=True)

        # splitext keeps dots inside the name ("a.b.pdf" -> "a.b"), unlike
        # split('.')[0].
        stem, extension = os.path.splitext(source_file)

        self.file_path = os.path.join(directory, source_file)
        self.batch_file = os.path.join(directory, 'batch_input.jsonl')
        self.batch_file_job_done = os.path.join(directory, 'batch_output.jsonl')
        self.output_txt = os.path.join(directory, f"{stem}.txt")

        # For a PDF input, "<stem>.pdf" would be the source file itself, so the
        # translated PDF gets a suffix instead of overwriting the original.
        pdf_stem = f"{stem}_translated" if extension.lower() == '.pdf' else stem
        self.output_pdf = os.path.join(directory, f"{pdf_stem}.pdf")

        self.api_key = os.environ.get('OPENAI_API_KEY')
        if not self.api_key:
            raise ValueError("请设置 OPENAI_API_KEY 环境变量")

    def extract_text_from_pdf_translate(self):
        """Extract text page by page from the PDF and build the batch request lines.

        Pages without text, or whose extraction fails, are skipped.

        Returns:
            The JSONL content (one request per line, each newline-terminated),
            or ``""`` if the file could not be processed.
        """
        try:
            reader = PdfReader(self.file_path)
            request_lines = []

            print(f"开始处理PDF文件,共 {len(reader.pages)} 页")

            for i, page in enumerate(reader.pages):
                try:
                    # Guard against extract_text() yielding None.
                    page_text = (page.extract_text() or "").strip()
                    if not page_text:
                        print(f"第 {i+1} 页无文本内容,跳过")
                        continue

                    # custom_id counts only the pages that contain text.
                    num = len(request_lines) + 1
                    print(f"处理第 {num} 页文本...")
                    request_lines.append(self.build_batch_line(num, page_text))

                except Exception as e:
                    print(f"处理第 {i+1} 页时出错: {e}")
                    continue

            print(f"PDF文本提取完成,共处理 {len(request_lines)} 页")
            return ''.join(line + '\n' for line in request_lines)

        except Exception as e:
            print(f"PDF文件处理失败: {e}")
            return ""

    def extract_text_from_epub_translate(self):
        """Extract text from each XHTML item of the EPUB and build the batch request lines.

        Items that are not XHTML, are empty, or fail to parse are skipped.

        Returns:
            The JSONL content (one request per line, each newline-terminated),
            or ``""`` if the file could not be processed.
        """
        try:
            book = epub.read_epub(self.file_path, options={"ignore_ncx": True})
            request_lines = []

            print("开始处理EPUB文件...")

            for item in book.get_items():
                try:
                    if item.media_type != 'application/xhtml+xml':
                        continue

                    soup = BeautifulSoup(item.get_content(), 'html.parser')
                    page_text = soup.get_text().strip()
                    if not page_text:
                        continue

                    num = len(request_lines) + 1
                    print(f"处理第 {num} 章节...")
                    request_lines.append(self.build_batch_line(num, page_text))

                except Exception as e:
                    print(f"处理章节时出错: {e}")
                    continue

            print(f"EPUB文本提取完成,共处理 {len(request_lines)} 个章节")
            return ''.join(line + '\n' for line in request_lines)

        except Exception as e:
            print(f"EPUB文件处理失败: {e}")
            return ""

    def build_batch(self):
        """Write the batch input file for the source document.

        The extractor is chosen from the file extension (``.pdf`` or
        ``.epub``, case-insensitive).

        Returns:
            ``True`` if the batch file was written, ``False`` otherwise.
        """
        print("开始构建批处理请求...")

        # Use the real extension rather than a substring match on the path.
        extension = os.path.splitext(self.file_path)[1].lower()
        if extension == '.pdf':
            request_json_all_lines = self.extract_text_from_pdf_translate()
        elif extension == '.epub':
            request_json_all_lines = self.extract_text_from_epub_translate()
        else:
            print(f'不支持的文件类型: {self.file_path}')
            return False

        if not request_json_all_lines.strip():
            print('无法从文档中抽取文本!')
            return False

        try:
            with open(self.batch_file, 'w', encoding='utf-8') as f:
                f.write(request_json_all_lines)
            print(f"批处理请求文件已生成: {self.batch_file}")
            return True
        except Exception as e:
            print(f"生成批处理文件失败: {e}")
            return False

    def build_batch_line(self, id, text):
        """Build one JSON-encoded chat-completion request for the batch file.

        Args:
            id: Sequence number stored as the request's ``custom_id``; results
                are re-ordered by it in ``save_translation_result``.
            text: Source text to translate. Text longer than 8000 characters
                is truncated and suffixed with ``"..."``.

        Returns:
            The request serialized as a single-line JSON string.
        """
        # Cap the text length to keep the request's token count bounded.
        max_length = 8000
        if len(text) > max_length:
            # Truncation loses content, so make it visible in the log.
            print(f'请求 {id} 的文本超过 {max_length} 个字符,已截断')
            text = text[:max_length] + "..."

        data = {
            "custom_id": str(id),
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [
                    {"role": "system", "content": "You are a professional translation assistant. Translate the given text into Chinese accurately and naturally."},
                    {"role": "user", "content": f"请将以下文本翻译成中文,保持原文的格式和结构:\n\n{text}"}
                ],
                "max_tokens": 4000
            }
        }
        return json.dumps(data, ensure_ascii=False)

    def upload_batchfile(self):
        """Upload the batch input file and return its file id.

        Raises:
            Exception: Re-raises whatever the upload raised, after logging it.
        """
        try:
            with open(self.batch_file, "rb") as f:
                batch_input_file = client.files.create(
                    file=f,
                    purpose="batch"
                )
            print(f'批处理文件上传成功,input_file_id: {batch_input_file.id}')
            return batch_input_file.id
        except Exception as e:
            print(f'批处理文件上传失败: {e}')
            raise

    def create_batch_request(self, input_file_id):
        """Create the batch job for an uploaded input file and return the job id.

        Raises:
            Exception: Re-raises whatever the API call raised, after logging it.
        """
        try:
            batch_job = client.batches.create(
                input_file_id=input_file_id,
                endpoint="/v1/chat/completions",
                completion_window="24h",
                metadata={
                    "description": "Translation batch job"
                }
            )
            batch_job_id = batch_job.id
            print(f'批处理任务已提交,任务ID:{batch_job_id}')
            return batch_job_id
        except Exception as e:
            print(f'提交批处理请求失败: {e}')
            raise

    def commit_job(self):
        """Build, upload and submit the batch job.

        Returns:
            The batch job id, or ``None`` if any step failed.
        """
        if not self.build_batch():
            return None

        try:
            input_file_id = self.upload_batchfile()
            return self.create_batch_request(input_file_id)
        except Exception as e:
            print(f'提交任务失败: {e}')
            return None

    @staticmethod
    def _format_timestamp(timestamp):
        """Format a Unix timestamp in local time, or return ``'N/A'`` when missing."""
        if timestamp is None:
            return 'N/A'
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))

    @retry(tries=10, delay=10)
    def retrieve_batch(self, batch_job_id, timeout=30):
        """Fetch and print the current state of a batch job.

        Args:
            batch_job_id: Id of the batch job to query.
            timeout: Seconds to wait for the HTTP response, so that a stalled
                connection cannot block the program forever.

        Returns:
            The decoded JSON body of the batch object.

        Raises:
            Exception: Re-raised after logging; the ``retry`` decorator then
                repeats the call (10 tries, 10 seconds apart).
        """
        try:
            url = f'https://api.openai.com/v1/batches/{batch_job_id}'
            headers = {
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json'
            }
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()

            data = response.json()

            # The summary is informational only, so missing fields must not
            # trigger a retry of the whole request.
            counts = data.get('request_counts') or {}
            print(f'任务状态: {data.get("status")}')
            print(f'创建时间: {self._format_timestamp(data.get("created_at"))}')
            print(f'过期时间: {self._format_timestamp(data.get("expires_at"))}')
            print(f'总请求数: {counts.get("total", "N/A")}')
            print(f'完成数: {counts.get("completed", "N/A")}')
            print(f'失败数: {counts.get("failed", "N/A")}')
            print('-' * 60)

            return data

        except Exception as e:
            print(f'获取批处理状态失败: {e}')
            raise

    def wait_batch_job_done(self, batch_job_id, poll_interval=100, max_consecutive_errors=5):
        """Poll the batch job until it ends and download its output file.

        Args:
            batch_job_id: Id of the batch job to wait for.
            poll_interval: Seconds to sleep between two status checks.
            max_consecutive_errors: Number of consecutive failed checks or
                downloads after which waiting is abandoned.

        Returns:
            ``True`` if the job completed and its output file was downloaded;
            ``False`` if the job failed, expired, was cancelled, produced no
            output file, or could not be checked.
        """
        print(f"等待批处理任务完成: {batch_job_id}")

        consecutive_errors = 0
        while True:
            try:
                data = self.retrieve_batch(batch_job_id)
                status = data['status']

                if status == 'completed':
                    output_file_id = data.get('output_file_id')
                    if not output_file_id:
                        # Completed without an output file: nothing to save.
                        print('批处理任务已完成,但没有输出文件!')
                        if data.get('error_file_id'):
                            print(f'错误文件ID: {data["error_file_id"]}')
                        return False

                    print(f'批处理任务完成!输出文件ID: {output_file_id}')
                    return self.download_result_file(output_file_id)

                if status in self._TERMINAL_FAILURE_MESSAGES:
                    print(self._TERMINAL_FAILURE_MESSAGES[status])
                    if data.get('error_file_id'):
                        print(f'错误文件ID: {data["error_file_id"]}')
                    return False

                # Still running: a successful check resets the error counter.
                consecutive_errors = 0
                print(f'任务进行中,状态: {status},等待{poll_interval}秒后再次检查...')
                time.sleep(poll_interval)

            except Exception as e:
                consecutive_errors += 1
                print(f'检查任务状态时出错: {e}')
                if consecutive_errors >= max_consecutive_errors:
                    # Avoid polling forever on a persistent error such as an
                    # unknown job id.
                    print(f'连续 {consecutive_errors} 次检查失败,放弃等待')
                    return False
                time.sleep(poll_interval)

    def download_result_file(self, output_file_id):
        """Download the batch output file to ``self.batch_file_job_done``.

        Returns:
            ``True`` if content was written, ``False`` if the download was empty.

        Raises:
            Exception: Re-raises whatever the download raised, after logging it.
        """
        try:
            content = client.files.content(output_file_id)
            content_bytes = content.read()

            if not content_bytes:
                print('下载的文件内容为空')
                return False

            with open(self.batch_file_job_done, 'wb') as file:
                file.write(content_bytes)
            print(f'结果文件下载成功: {self.batch_file_job_done}')
            return True

        except Exception as e:
            print(f'下载结果文件失败: {e}')
            raise

    def save_translation_result(self):
        """Convert the downloaded batch output into a TXT file and a PDF file.

        Results are ordered by their numeric ``custom_id``. A result without a
        usable translation is replaced by a placeholder so the page order is
        preserved. Errors are reported on stdout and not raised.
        """
        try:
            origin_data_list = []

            with open(self.batch_file_job_done, 'r', encoding='utf-8') as file:
                for line in file:
                    if not line.strip():
                        continue
                    try:
                        origin_data_list.append(json.loads(line))
                    except json.JSONDecodeError as e:
                        print(f'解析JSON行时出错: {e}')

            if not origin_data_list:
                print('没有找到有效的翻译结果')
                return

            # The output file is not guaranteed to be in request order.
            data_list = sorted(origin_data_list, key=lambda x: int(x["custom_id"]))

            print(f'开始处理 {len(data_list)} 条翻译结果...')

            pages_content_translated = []
            for i, data in enumerate(data_list):
                try:
                    # "response" may be absent or null for a failed request.
                    response = data.get('response') or {}
                    if 'body' not in response:
                        print(f'第 {i+1} 条结果格式异常,跳过')
                        pages_content_translated.append("[翻译失败]")
                        continue

                    content = response['body']['choices'][0]['message']['content']
                    if not isinstance(content, str):
                        # A null content would break the join below.
                        print(f'第 {i+1} 条结果格式异常,跳过')
                        pages_content_translated.append("[翻译失败]")
                        continue

                    pages_content_translated.append(content)
                    print(f'处理翻译结果 {i+1}/{len(data_list)}')
                except Exception as e:
                    print(f'处理第 {i+1} 条翻译结果时出错: {e}')
                    pages_content_translated.append("[翻译错误]")

            text = '\n\n'.join(pages_content_translated)
            with open(self.output_txt, 'w', encoding='utf-8') as f:
                f.write(text)
            print(f'翻译结果已保存为TXT: {self.output_txt}')

            print('开始生成PDF文件...')
            Topdf(self.output_pdf, self.output_txt).to_pdf(pages_content_translated)
            print(f'翻译结果已保存为PDF: {self.output_pdf}')

        except Exception as e:
            print(f'保存翻译结果时出错: {e}')

    def run(self, batch_job_id=None):
        """Run the whole translation workflow.

        Args:
            batch_job_id: Id of an already submitted batch job to resume.
                When omitted, a new job is built and submitted first.
        """
        try:
            if not batch_job_id:
                print("开始新的翻译任务...")
                batch_job_id = self.commit_job()
                if not batch_job_id:
                    print("任务提交失败")
                    return
            else:
                print(f"继续处理现有任务: {batch_job_id}")

            # Only save results when the job really produced an output file;
            # otherwise a stale or missing output file would be parsed.
            if not self.wait_batch_job_done(batch_job_id):
                print("批处理任务未成功完成,没有可保存的翻译结果")
                return

            self.save_translation_result()

            print("翻译任务完成!")

        except Exception as e:
            print(f'运行翻译程序时出错: {e}')

if __name__ == '__main__':
    # Script entry point.
    # Usage: python <script> [source_file] [batch_job_id]
    #   source_file  - file name inside the files/ directory; defaults to the
    #                  book used in the original article.
    #   batch_job_id - id of a previously submitted batch job to resume; when
    #                  omitted a new job is submitted.
    import sys

    try:
        cli_args = sys.argv[1:]
        source_file = cli_args[0] if cli_args else "Linden's Handbook of Batteries.pdf"
        batch_job_id = cli_args[1] if len(cli_args) > 1 else None

        translator = Translate(source_file)
        translator.run(batch_job_id)

    except Exception as e:
        print(f'程序执行出错: {e}')
        # Signal the failure to the calling shell.
        sys.exit(1)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)