【openai, aiofiles, backoff】非同期の段階的推論パイプラインを構築する – LLMで合成データを作ろう
近年、LLM(Large Language Model)を用いた合成データ生成が様々な分野で注目を集めています。特に要約や翻訳といった単純タスクから一歩進んで、段階的推論(Chain-of-Thought)を取り入れた合成データを作りたいケースが増えてきました。本記事では、openai, aiofiles, backoff といったライブラリを活用し、Pythonの非同期処理(asyncio
)を組み合わせて段階的推論パイプラインを構築する方法をご紹介します。
目次
プロジェクトの準備
プロジェクト管理にはuvを使います。
uvをインストールしていない方はこちらを参考にしてください。
# プロジェクトを初期化
uv init my-project
cd my-project
uv add openai aiofiles backoff tqdm
- openai: ChatCompletion 用
- aiofiles: 非同期ファイルI/O
- backoff: レートリミットやネットワークエラー時の自動リトライ
- tqdm: 進捗バー表示
vllmでLLMを使えるようにしていることを想定しています。
macの方はこちらを参考にしてください。
vllmがいつの間にかmacに対応してたので試す
最近、LLM(大規模言語モデル)を手軽に扱う方法として「vLLM」というツールが注目を集めています。私は普段、LLMまわりの実験をするときにvLLMを使ってモデルを立てる...
ディレクトリ構成
本記事でのディレクトリ構成はこのようになります。
my-project
├── README.md
├── pyproject.toml
├── uv.lock
├── .venv
└── src
├── main.py
└── data
├── input
│ ├── 0001.txt
│ ├── 0002.txt
│ └── ...
├── step1_output
├── step2_output
└── failed.jsonl
src/main.py
: これから紹介するメインのPythonコードsrc/data/input/
: 推論対象のファイル群(例:0001.txt
,0002.txt
, …)src/data/step1_output/
: ステップ1(推論下書き)の出力ファイルsrc/data/step2_output/
: ステップ2(最終結論)の出力ファイルsrc/data/failed.jsonl
: 処理失敗時のログがJSON形式で追記されるファイル
src/main.py
import asyncio
import aiofiles
import backoff
import textwrap
import json
import os
from openai import AsyncOpenAI
from tqdm import tqdm
# ----------------------------------
# 1) LLMクライアント初期化
# ----------------------------------
client = AsyncOpenAI(
api_key="EMPTY", # ダミー
base_url="http://0.0.0.0:8000/v1/" # 例: Qwen/Qwen2.5-0.5B-Instruct互換
)
# ----------------------------------
# 2) テキストをチャンク分割するユーティリティ
# ----------------------------------
def chunk_text(text: str, chunk_size: int = 2000) -> list[str]:
"""
chunk_size文字単位で分割し、リストとして返す。
"""
return textwrap.wrap(text, width=chunk_size)
# ----------------------------------
# 3) backoffデコレータ (リトライ)
# ----------------------------------
@backoff.on_exception(backoff.expo, Exception, max_tries=3)
async def step1_reasoning(chunk: str) -> str:
"""
ステップ1: 段階的推論の「下書き」(Chain-of-Thought)を生成する。
例: 論理展開や仮説を詳述してもらう
"""
system_prompt = (
"あなたは複雑な問題を段階的に思考するアシスタントです。\n"
"中間思考(Chain-of-Thought)を丁寧に書き出してください。"
)
user_prompt = f"以下の内容について、推論ステップを示してください:\n\n{chunk}"
completion = await client.chat.completions.create(
model="Qwen/Qwen2.5-0.5B-Instruct",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
max_tokens=800,
temperature=0.7,
top_p=0.9
)
return completion.choices[0].message.content.strip()
@backoff.on_exception(backoff.expo, Exception, max_tries=3)
async def step2_finalize(reasoning_text: str) -> str:
"""
ステップ2: ステップ1の推論下書きを踏まえ、最終回答や結論をまとめる。
"""
system_prompt = (
"あなたは段階的な思考から結論を導くアシスタントです。\n"
"下書きを参考にしつつ、最終的な要点を明確に示してください。"
)
user_prompt = (
"以下の推論下書きを元に、結論をまとめてください:\n\n"
+ reasoning_text
)
completion = await client.chat.completions.create(
model="Qwen/Qwen2.5-0.5B-Instruct",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
max_tokens=800,
temperature=0.5,
top_p=0.9
)
return completion.choices[0].message.content.strip()
# ----------------------------------
# 4) ワーカー (ファイル1件に対する処理)
# ----------------------------------
async def worker(
name: str,
queue: asyncio.Queue,
sem: asyncio.Semaphore,
step1_dir: str,
step2_dir: str,
failed_log: str,
pbar_lock: asyncio.Lock,
pbar: tqdm
):
"""
Queueからファイルパスを取り、step1・step2の推論を行う。
- step1_outputに中間思考を書き出し
- step2_outputに最終結果を書き出し
- 失敗時にはfailed_logに追記
- 処理完了ごとにpbarを1つ進める
"""
while True:
filepath = await queue.get()
if filepath is None:
queue.task_done()
break
try:
async with sem:
# ファイル読み込み
async with aiofiles.open(filepath, 'r', encoding='utf-8') as f:
text = await f.read()
# チャンク分割
chunks = chunk_text(text, 2000)
# ステップ1の結果を連結
step1_results = []
for c in chunks:
r = await step1_reasoning(c)
step1_results.append(r)
step1_text = "\n\n---\n\n".join(step1_results)
# ステップ1出力ファイル
fname = os.path.basename(filepath)
step1_path = os.path.join(step1_dir, fname)
async with aiofiles.open(step1_path, 'w', encoding='utf-8') as out1:
await out1.write(step1_text)
# ステップ2の結果を連結
step2_results = []
for s1 in step1_results:
final_r = await step2_finalize(s1)
step2_results.append(final_r)
step2_text = "\n\n===\n\n".join(step2_results)
# ステップ2出力ファイル
step2_path = os.path.join(step2_dir, fname)
async with aiofiles.open(step2_path, 'w', encoding='utf-8') as out2:
await out2.write(step2_text)
except Exception as e:
# 失敗したらログに記録
err_rec = {"file": filepath, "error": str(e)}
async with aiofiles.open(failed_log, 'a', encoding='utf-8') as ff:
await ff.write(json.dumps(err_rec, ensure_ascii=False) + "\n")
finally:
# ファイル1件終了→進捗を1つ進める
async with pbar_lock:
pbar.update(1)
queue.task_done()
# ----------------------------------
# 5) メインのイベントループ
# ----------------------------------
async def main():
"""
src/data/input/*.txt を対象に、段階的推論の2ステップを実行し、
step1_output/ に中間出力、step2_output/ に最終結果を保存。
失敗があれば failed.jsonl に記録。
"""
current_dir = os.path.dirname(__file__)
data_dir = os.path.join(current_dir, "data")
input_dir = os.path.join(data_dir, "input")
step1_dir = os.path.join(data_dir, "step1_output")
step2_dir = os.path.join(data_dir, "step2_output")
failed_log = os.path.join(data_dir, "failed.jsonl")
# 出力ディレクトリを作成
for d in [step1_dir, step2_dir]:
os.makedirs(d, exist_ok=True)
# 過去の失敗ログを削除
if os.path.exists(failed_log):
os.remove(failed_log)
# 入力ファイル収集
files = [
os.path.join(input_dir, f)
for f in os.listdir(input_dir)
if f.endswith(".txt")
]
# Queueに格納
queue = asyncio.Queue()
for fp in files:
await queue.put(fp)
# 同時実行数
sem = asyncio.Semaphore(5)
# 進捗バー
total_files = len(files)
pbar = tqdm(total=total_files, desc="Multi-step Reasoning")
pbar_lock = asyncio.Lock()
# ワーカーを3つ用意
workers = []
for i in range(3):
w = asyncio.create_task(worker(
f"Worker-{i}",
queue,
sem,
step1_dir,
step2_dir,
failed_log,
pbar_lock,
pbar
))
workers.append(w)
await queue.join()
for _ in workers:
await queue.put(None)
await asyncio.gather(*workers)
pbar.close()
print("---")
print("段階的推論のパイプライン処理が完了しました。")
print(f"失敗があれば {failed_log} に記録されています。")
print("---")
if __name__ == "__main__":
asyncio.run(main())
実行方法
uv run python src/main.py
実行すると、進捗バー(tqdm) が表示され、ファイルごとに1カウントずつ進みます。
仕組みとポイント
- 段階的推論
- step1_reasoning: Chain-of-Thought の下書きを出力
- step2_finalize: 下書きを踏まえて最終回答を作成
これにより、「中間推論+最終結果」の2段階の合成データを生成できます。
- チャンク分割 (chunk_text)
- 大きなテキスト(例: 数千文字超)を適度に分割し、トークン超過を避けつつ処理。
- 各チャンクに対して step1, step2 を適用して連結し、ファイル1つ分の結果とする。
- 非同期処理 (asyncio)
asyncio.Queue
とasyncio.Semaphore
による並列化で、I/O待ち(LLM API応答待ち)を有効活用。- ファイル数が増えても(数十〜数百)効率良く処理可能。
- backoff
- ネットワーク障害やレートリミットを想定し、自動リトライ(最大3回)。
- 長時間運転でも安定する。
- tqdm
- 処理完了ごとに
pbar.update(1)
を呼ぶ方式で、進捗が実際の完了数に同期。 - 大量ファイルがある場合でもリアルタイムの進み具合を把握できる。
- 処理完了ごとに
- ステップごとのアウトプット
step1_output
とstep2_output
に分割保存。- 合成データを「中間推論」と「最終回答」に分けて保存しておくと、後から分析・品質チェック・再利用が容易。
応用例
- 要約+抽象化 … Step1で詳細要約、Step2で要点抽出など
- 翻訳+校正 … Step1で機械翻訳、Step2で自然な文体に再整形
- Q&Aペア生成 … Step1で潜在的な質問を列挙、Step2で回答を付与
- 多言語パイプライン … Step1で中間言語に変換、Step2で別言語に編集
いずれも2段階以上の処理を行うことで、LLMの性能を段階的に引き出し、さらに合成データを多層的に残せます。
これらの手法を使えば、LLMを使った合成データ生成だけでなく、要約・翻訳・QAペア生成など様々なタスクを安定的に大量処理できるようになります。ぜひご活用ください。
コメント