「Python実践100本ノックシリーズ」を読んで|どんな人におすすめなのか

【openai, aiofiles, backoff】非同期の段階的推論パイプラインを構築する – LLMで合成データを作ろう

近年、LLM(Large Language Model)を用いた合成データ生成が様々な分野で注目を集めています。特に要約翻訳といった単純タスクから一歩進んで、段階的推論(Chain-of-Thought)を取り入れた合成データを作りたいケースが増えてきました。本記事では、openai, aiofiles, backoff といったライブラリを活用し、Pythonの非同期処理asyncio)を組み合わせて段階的推論パイプラインを構築する方法をご紹介します。

目次

プロジェクトの準備

プロジェクト管理にはuvを使います。

uvをインストールしていない方はこちらを参考にしてください。

あわせて読みたい
【Python】uvの使い方を忘れたときに見るための早見表 以下は「uvの使い方をサクッと思い出すため」の早見表です。コマンド例も載せてあるので、コピペして使ってみてください。 インストール関連 インストール # macOS/Linu...
# プロジェクトを初期化
uv init my-project
cd my-project

uv add openai aiofiles backoff tqdm
  • openai: ChatCompletion 用
  • aiofiles: 非同期ファイルI/O
  • backoff: レートリミットやネットワークエラー時の自動リトライ
  • tqdm: 進捗バー表示

vllmでLLMを使えるようにしていることを想定しています。

macの方はこちらを参考にしてください。

あわせて読みたい
vllmがいつの間にかmacに対応してたので試す 最近、LLM(大規模言語モデル)を手軽に扱う方法として「vLLM」というツールが注目を集めています。私は普段、LLMまわりの実験をするときにvLLMを使ってモデルを立てる...

ディレクトリ構成

本記事でのディレクトリ構成はこのようになります。

my-project
├── README.md
├── pyproject.toml
├── uv.lock
├── .venv
└── src
    ├── main.py
    └── data
        ├── input
        │   ├── 0001.txt
        │   ├── 0002.txt
        │   └── ...
        ├── step1_output
        ├── step2_output
        └── failed.jsonl
  • src/main.py: これから紹介するメインのPythonコード
  • src/data/input/: 推論対象のファイル群(例: 0001.txt, 0002.txt, …)
  • src/data/step1_output/: ステップ1(推論下書き)の出力ファイル
  • src/data/step2_output/: ステップ2(最終結論)の出力ファイル
  • src/data/failed.jsonl: 処理失敗時のログがJSON形式で追記されるファイル

src/main.py

import asyncio
import aiofiles
import backoff
import textwrap
import json
import os

from openai import AsyncOpenAI
from tqdm import tqdm

# ----------------------------------
# 1) LLMクライアント初期化
# ----------------------------------
client = AsyncOpenAI(
    api_key="EMPTY",  # ダミー
    base_url="http://0.0.0.0:8000/v1/"  # 例: Qwen/Qwen2.5-0.5B-Instruct互換
)

# ----------------------------------
# 2) テキストをチャンク分割するユーティリティ
# ----------------------------------
def chunk_text(text: str, chunk_size: int = 2000) -> list[str]:
    """
    chunk_size文字単位で分割し、リストとして返す。
    """
    return textwrap.wrap(text, width=chunk_size)

# ----------------------------------
# 3) backoffデコレータ (リトライ)
# ----------------------------------
@backoff.on_exception(backoff.expo, Exception, max_tries=3)
async def step1_reasoning(chunk: str) -> str:
    """
    ステップ1: 段階的推論の「下書き」(Chain-of-Thought)を生成する。
    例: 論理展開や仮説を詳述してもらう
    """
    system_prompt = (
        "あなたは複雑な問題を段階的に思考するアシスタントです。\n"
        "中間思考(Chain-of-Thought)を丁寧に書き出してください。"
    )
    user_prompt = f"以下の内容について、推論ステップを示してください:\n\n{chunk}"

    completion = await client.chat.completions.create(
        model="Qwen/Qwen2.5-0.5B-Instruct",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        max_tokens=800,
        temperature=0.7,
        top_p=0.9
    )
    return completion.choices[0].message.content.strip()

@backoff.on_exception(backoff.expo, Exception, max_tries=3)
async def step2_finalize(reasoning_text: str) -> str:
    """
    ステップ2: ステップ1の推論下書きを踏まえ、最終回答や結論をまとめる。
    """
    system_prompt = (
        "あなたは段階的な思考から結論を導くアシスタントです。\n"
        "下書きを参考にしつつ、最終的な要点を明確に示してください。"
    )
    user_prompt = (
        "以下の推論下書きを元に、結論をまとめてください:\n\n"
        + reasoning_text
    )

    completion = await client.chat.completions.create(
        model="Qwen/Qwen2.5-0.5B-Instruct",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        max_tokens=800,
        temperature=0.5,
        top_p=0.9
    )
    return completion.choices[0].message.content.strip()

# ----------------------------------
# 4) ワーカー (ファイル1件に対する処理)
# ----------------------------------
async def worker(
    name: str,
    queue: asyncio.Queue,
    sem: asyncio.Semaphore,
    step1_dir: str,
    step2_dir: str,
    failed_log: str,
    pbar_lock: asyncio.Lock,
    pbar: tqdm
):
    """
    Queueからファイルパスを取り、step1・step2の推論を行う。
    - step1_outputに中間思考を書き出し
    - step2_outputに最終結果を書き出し
    - 失敗時にはfailed_logに追記
    - 処理完了ごとにpbarを1つ進める
    """
    while True:
        filepath = await queue.get()
        if filepath is None:
            queue.task_done()
            break

        try:
            async with sem:
                # ファイル読み込み
                async with aiofiles.open(filepath, 'r', encoding='utf-8') as f:
                    text = await f.read()

                # チャンク分割
                chunks = chunk_text(text, 2000)

                # ステップ1の結果を連結
                step1_results = []
                for c in chunks:
                    r = await step1_reasoning(c)
                    step1_results.append(r)
                step1_text = "\n\n---\n\n".join(step1_results)

                # ステップ1出力ファイル
                fname = os.path.basename(filepath)
                step1_path = os.path.join(step1_dir, fname)
                async with aiofiles.open(step1_path, 'w', encoding='utf-8') as out1:
                    await out1.write(step1_text)

                # ステップ2の結果を連結
                step2_results = []
                for s1 in step1_results:
                    final_r = await step2_finalize(s1)
                    step2_results.append(final_r)
                step2_text = "\n\n===\n\n".join(step2_results)

                # ステップ2出力ファイル
                step2_path = os.path.join(step2_dir, fname)
                async with aiofiles.open(step2_path, 'w', encoding='utf-8') as out2:
                    await out2.write(step2_text)

        except Exception as e:
            # 失敗したらログに記録
            err_rec = {"file": filepath, "error": str(e)}
            async with aiofiles.open(failed_log, 'a', encoding='utf-8') as ff:
                await ff.write(json.dumps(err_rec, ensure_ascii=False) + "\n")

        finally:
            # ファイル1件終了→進捗を1つ進める
            async with pbar_lock:
                pbar.update(1)
            queue.task_done()


# ----------------------------------
# 5) メインのイベントループ
# ----------------------------------
async def main():
    """
    src/data/input/*.txt を対象に、段階的推論の2ステップを実行し、
    step1_output/ に中間出力、step2_output/ に最終結果を保存。
    失敗があれば failed.jsonl に記録。
    """
    current_dir = os.path.dirname(__file__)
    data_dir = os.path.join(current_dir, "data")

    input_dir = os.path.join(data_dir, "input")
    step1_dir = os.path.join(data_dir, "step1_output")
    step2_dir = os.path.join(data_dir, "step2_output")
    failed_log = os.path.join(data_dir, "failed.jsonl")

    # 出力ディレクトリを作成
    for d in [step1_dir, step2_dir]:
        os.makedirs(d, exist_ok=True)

    # 過去の失敗ログを削除
    if os.path.exists(failed_log):
        os.remove(failed_log)

    # 入力ファイル収集
    files = [
        os.path.join(input_dir, f)
        for f in os.listdir(input_dir)
        if f.endswith(".txt")
    ]

    # Queueに格納
    queue = asyncio.Queue()
    for fp in files:
        await queue.put(fp)

    # 同時実行数
    sem = asyncio.Semaphore(5)

    # 進捗バー
    total_files = len(files)
    pbar = tqdm(total=total_files, desc="Multi-step Reasoning")
    pbar_lock = asyncio.Lock()

    # ワーカーを3つ用意
    workers = []
    for i in range(3):
        w = asyncio.create_task(worker(
            f"Worker-{i}",
            queue,
            sem,
            step1_dir,
            step2_dir,
            failed_log,
            pbar_lock,
            pbar
        ))
        workers.append(w)

    await queue.join()
    for _ in workers:
        await queue.put(None)
    await asyncio.gather(*workers)
    pbar.close()

    print("---")
    print("段階的推論のパイプライン処理が完了しました。")
    print(f"失敗があれば {failed_log} に記録されています。")
    print("---")

if __name__ == "__main__":
    asyncio.run(main())

実行方法

uv run python src/main.py

実行すると、進捗バー(tqdm) が表示され、ファイルごとに1カウントずつ進みます。

仕組みとポイント

  • 段階的推論
    • step1_reasoning: Chain-of-Thought の下書きを出力
    • step2_finalize: 下書きを踏まえて最終回答を作成
      これにより、「中間推論+最終結果」の2段階の合成データを生成できます。
  • チャンク分割 (chunk_text)
    • 大きなテキスト(例: 数千文字超)を適度に分割し、トークン超過を避けつつ処理。
    • 各チャンクに対して step1, step2 を適用して連結し、ファイル1つ分の結果とする。
  • 非同期処理 (asyncio)
    • asyncio.Queueasyncio.Semaphore による並列化で、I/O待ち(LLM API応答待ち)を有効活用。
    • ファイル数が増えても(数十〜数百)効率良く処理可能。
  • backoff
    • ネットワーク障害やレートリミットを想定し、自動リトライ(最大3回)。
    • 長時間運転でも安定する。
  • tqdm
    • 処理完了ごとに pbar.update(1) を呼ぶ方式で、進捗が実際の完了数に同期。
    • 大量ファイルがある場合でもリアルタイムの進み具合を把握できる。
  • ステップごとのアウトプット
    • step1_outputstep2_output に分割保存。
    • 合成データを「中間推論」と「最終回答」に分けて保存しておくと、後から分析・品質チェック・再利用が容易。

応用例

  • 要約+抽象化 … Step1で詳細要約、Step2で要点抽出など
  • 翻訳+校正 … Step1で機械翻訳、Step2で自然な文体に再整形
  • Q&Aペア生成 … Step1で潜在的な質問を列挙、Step2で回答を付与
  • 多言語パイプライン … Step1で中間言語に変換、Step2で別言語に編集

いずれも2段階以上の処理を行うことで、LLMの性能を段階的に引き出し、さらに合成データを多層的に残せます。

これらの手法を使えば、LLMを使った合成データ生成だけでなく、要約・翻訳・QAペア生成など様々なタスクを安定的に大量処理できるようになります。ぜひご活用ください。

おまけ

"""
LLM評価用汎用スクリプト

このスクリプトは、CSVファイルに記載されたプロンプトを使用してLLMを呼び出し、
その応答を収集・分析するための汎用的なツールです。

主な機能:
1. CSVファイルからsystem_promptとuser_promptを読み込む
2. 各行に対して非同期でLLMを呼び出す(並列処理により高速化)
3. レスポンスから特定のパターンで値を抽出(正規表現で設定可能)
4. 結果をJSON形式で保存(元データ、レスポンス、抽出値、エラー情報を含む)
5. 進捗表示と中間保存により、長時間の実行でも安心
6. エラーハンドリングとリトライ機能

必要な入力:
- CSVファイル(必須列: system_prompt, user_prompt)

出力ファイル:
- results_final.json: 全結果データ
- results_intermediate.json: 中間保存用
- summary.json: 実行サマリー(成功率、抽出率などの統計情報)

使用方法:
1. 環境変数 VLLM_API_KEY を設定
2. CSVファイルを data.csv として配置(または DATA_PATH を変更)
3. 必要に応じて設定値(MODEL_NAME、EXTRACTION_PATTERNS等)を調整
4. python script.py で実行
"""

import os
import json
import asyncio
import backoff
import pandas as pd
from tqdm import tqdm
from openai import AsyncOpenAI
import aiofiles
from typing import Optional
import re

# ── 設定 ─────────────────────────────────────────────────────────
API_KEY = os.getenv("VLLM_API_KEY", "your-api-key-here")
BASE_URL = "https://api.openai.com/v1"  # デフォルトのOpenAI API
MODEL_NAME = "gpt-4"
MAX_TOKENS = 4096
TEMPERATURE = 0.7
TOP_P = 1.0
MAX_CONCURRENT = 100  # 同時実行数の上限

# 入力CSVパス(system_prompt, user_prompt列を含む)
DATA_PATH = "data.csv"
# 出力ディレクトリ
OUTPUT_DIR = "outputs"

# ── 正規表現による抽出設定 ─────────────────────────────────────────
# 抽出を行うかどうか
EXTRACT_FROM_RESPONSE = True

# 抽出パターンの正規表現リスト(グループ1が抽出される値)
# 上から順に試され、最初にマッチしたものが使用される
EXTRACTION_PATTERNS = [
    r"<answer>(.+?)</answer>",
    r"<r>(.+?)</r>",
    r"<o>(.+?)</o>",
    r"答え[::]\s*(.+?)(?:\n|$)",
]

# 単一パターンのみ使用したい場合の例
# EXTRACTION_PATTERNS = [r"<answer>(.+?)</answer>"]

# ── OpenAI クライアント & セマフォ ─────────────────────────────────────────
client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL)
semaphore = asyncio.Semaphore(MAX_CONCURRENT)

@backoff.on_exception(backoff.expo, Exception, max_tries=5)
async def fetch_completion(system_prompt: str, user_prompt: str) -> str:
    """Semaphore で同時呼び出しを制御した上で LLM を叩く"""
    async with semaphore:
        resp = await client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=MAX_TOKENS,
            temperature=TEMPERATURE,
            top_p=TOP_P,
        )
        return resp.choices[0].message.content.strip()

def extract_value_from_response(response: str) -> Optional[str]:
    """レスポンスから設定されたパターンで値を抽出"""
    if not EXTRACT_FROM_RESPONSE:
        return None
    
    if response is None:
        return None
    
    # 複数パターンを順に試す
    for pattern in EXTRACTION_PATTERNS:
        match = re.search(pattern, response)
        if match:
            return match.group(1)
    
    return None

async def save_results(results: list, filename: str):
    """結果を非同期に保存"""
    path = os.path.join(OUTPUT_DIR, filename)
    async with aiofiles.open(path, "w", encoding="utf-8") as f:
        await f.write(json.dumps(results, ensure_ascii=False, indent=2))

async def main():
    # ── データ読み込み ────────────────────────────────
    df = pd.read_csv(DATA_PATH)
    
    # 必須列のチェック
    required_cols = ["system_prompt", "user_prompt"]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"必須列が不足しています: {missing_cols}")
    
    # NaN を空文字に置換
    df = df.fillna("")
    
    # 出力ディレクトリ作成
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    
    # ── タスクの準備 ────────────────────────────────
    tasks = []
    
    print(f"データ件数: {len(df)}")
    
    # 各行について1つのタスクを作成
    for idx, row in df.iterrows():
        task = asyncio.create_task(
            fetch_completion(row["system_prompt"], row["user_prompt"])
        )
        tasks.append(task)
    
    # ── 結果の初期化 ────────────────────────────────
    results = []
    for idx, row in df.iterrows():
        result_row = row.to_dict()
        result_row["response"] = None  # 応答を格納
        result_row["extracted_value"] = None  # 抽出値を格納
        result_row["error"] = None     # エラーを格納
        results.append(result_row)
    
    # ── タスク実行と進捗表示 ────────────────────────────────
    completed_count = 0
    save_interval = max(1, len(tasks) // 10)  # 10%ごとに保存
    
    for idx, task in enumerate(tqdm(tasks, desc="LLM呼び出し進捗")):
        try:
            response = await task
            results[idx]["response"] = response
            
            # 設定に応じて値を抽出
            if EXTRACT_FROM_RESPONSE:
                extracted = extract_value_from_response(response)
                results[idx]["extracted_value"] = extracted
                
        except Exception as e:
            error_msg = f"ERROR: {type(e).__name__}: {str(e)}"
            results[idx]["error"] = error_msg
            print(f"\nエラー発生 (行{idx}): {error_msg}")
        
        completed_count += 1
        
        # 定期的に中間結果を保存
        if completed_count % save_interval == 0 or completed_count == len(tasks):
            await save_results(results, "results_intermediate.json")
    
    # ── 最終結果の保存 ────────────────────────────────
    await save_results(results, "results_final.json")
    print(f"\n完了!結果を {OUTPUT_DIR}/results_final.json に保存しました。")
    
    # ── サマリー情報の作成と保存 ────────────────────────────────
    total_tasks = len(tasks)
    successful_tasks = sum(1 for result in results if result["response"] is not None)
    failed_tasks = total_tasks - successful_tasks
    
    # 抽出成功数をカウント
    extracted_count = 0
    if EXTRACT_FROM_RESPONSE:
        extracted_count = sum(1 for result in results if result["extracted_value"] is not None)
    
    summary = {
        "total_rows": len(df),
        "total_tasks": total_tasks,
        "successful_tasks": successful_tasks,
        "failed_tasks": failed_tasks,
        "success_rate": successful_tasks / total_tasks if total_tasks > 0 else 0,
        "extraction_enabled": EXTRACT_FROM_RESPONSE,
        "extraction_patterns": EXTRACTION_PATTERNS if EXTRACT_FROM_RESPONSE else None,
        "extracted_count": extracted_count,
        "extraction_rate": extracted_count / successful_tasks if successful_tasks > 0 else 0,
        "model": MODEL_NAME,
        "temperature": TEMPERATURE,
        "max_tokens": MAX_TOKENS,
    }
    
    await save_results(summary, "summary.json")
    print(f"\nサマリー情報を {OUTPUT_DIR}/summary.json に保存しました。")
    print(f"成功率: {summary['success_rate']:.2%}")
    if EXTRACT_FROM_RESPONSE:
        print(f"抽出率: {summary['extraction_rate']:.2%} (成功したレスポンスのうち)")
        print(f"抽出パターン数: {len(EXTRACTION_PATTERNS)}")

if __name__ == "__main__":
    asyncio.run(main())

この記事が気に入ったら
フォローしてね!

よかったらシェアしてね!
  • URLをコピーしました!

コメント

コメントする

目次