ブログ一覧へ
Laravel開発

Laravel + Python Workerでファイル処理を非同期化する方法

重いCSV処理をRedisキューとPython Workerで非同期化する実装パターン。Laravel Horizon・ECS Taskによる構成。

2026-05-10Laravel開発

なぜ非同期処理が必要か

CSVファイルの処理は、行数によっては数十秒〜数分かかることがあります。 PHPのHTTPリクエスト内でこれを実行すると:

  • タイムアウト(30秒制限など)
  • ユーザーがずっと待たされる
  • サーバーリソースを占有する

これを解決するのが非同期処理です。

アーキテクチャ全体像

[ブラウザ]
    ↓ ファイルアップロード
[Laravel API]
    ↓ S3にファイル保存 + Redisにジョブ投入
[Redis Queue]
    ↓ ジョブを取り出す
[Python Worker]
    ↓ S3からファイル取得 → pandas処理 → 結果をS3に保存
[ブラウザ]
    ↓ ポーリング or WebSocket でステータス確認
    完了通知 → 結果ダウンロード

Laravel側の実装

ジョブクラスの定義

// app/Jobs/ProcessCsvJob.php
class ProcessCsvJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(
        private string $taskId,
        private string $s3Key,
        private string $instruction,
    ) {}

    public function handle(): void
    {
        // Redisにジョブ情報を格納
        // Python Workerが監視しているキューに積む
        Redis::lpush('python_jobs', json_encode([
            'task_id'     => $this->taskId,
            's3_key'      => $this->s3Key,
            'instruction' => $this->instruction,
        ]));
    }
}

コントローラーでジョブを投入

// app/Http/Controllers/ProcessController.php
public function upload(Request $request): JsonResponse
{
    $file = $request->file('csv');
    $taskId = Str::uuid()->toString();
    
    // S3にアップロード
    $s3Key = "uploads/{$taskId}/{$file->getClientOriginalName()}";
    Storage::disk('s3')->put($s3Key, $file->get());
    
    // タスクをDBに記録
    Task::create([
        'id'          => $taskId,
        'status'      => 'pending',
        'instruction' => $request->input('instruction'),
    ]);
    
    // ジョブをキューに投入
    ProcessCsvJob::dispatch($taskId, $s3Key, $request->input('instruction'));
    
    return response()->json(['task_id' => $taskId]);
}

Python Worker側の実装

# worker.py
import redis
import json
import boto3
import pandas as pd
from io import StringIO

r = redis.Redis(host='redis', port=6379)
s3 = boto3.client('s3')

def process_job(job: dict):
    task_id = job['task_id']
    s3_key = job['s3_key']
    
    # S3からCSVを取得
    obj = s3.get_object(Bucket='my-bucket', Key=s3_key)
    df = pd.read_csv(obj['Body'])
    
    # 処理(AI計画に基づいて実行)
    result_df = execute_plan(df, job['instruction'])
    
    # 結果をS3に保存
    output_key = f"results/{task_id}/output.xlsx"
    buffer = BytesIO()
    result_df.to_excel(buffer, index=False)
    s3.put_object(Bucket='my-bucket', Key=output_key, Body=buffer.getvalue())
    
    # 完了をLaravelに通知(DBステータス更新)
    update_task_status(task_id, 'completed', output_key)

# メインループ
while True:
    _, job_data = r.brpop('python_jobs', timeout=10)
    if job_data:
        job = json.loads(job_data)
        process_job(job)

AWS ECSでのデプロイ

# docker-compose.yml(本番はECS Task Definitionに変換)
services:
  worker:
    build: ./python-worker
    environment:
      - REDIS_URL=redis://redis:6379
      - AWS_DEFAULT_REGION=ap-northeast-1
    depends_on:
      - redis
  
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

ステータス確認API

// ポーリング用エンドポイント
public function status(string $taskId): JsonResponse
{
    $task = Task::findOrFail($taskId);
    
    return response()->json([
        'status'      => $task->status,         // pending / processing / completed / failed
        'download_url' => $task->status === 'completed'
            ? Storage::disk('s3')->temporaryUrl($task->result_key, now()->addHours(1))
            : null,
    ]);
}

まとめ

  • Laravel → Redis → Python の流れで重い処理を分離できる
  • Python Workerはスケールアウトが容易(ECS Task数を増やすだけ)
  • S3の署名付きURLでセキュアなファイルダウンロードを実現

この構成が、SOFTBASEの「AI Excel/CSV自動処理」の基盤になっています。

ご意見・お問い合わせ

記事の内容についてご質問・ご意見はお気軽に。