Whisper 音视频转写 API 接口文档

api.py

import os
import shutil
import socket
import torch
import whisper
from moviepy.editor import VideoFileClip
import opencc
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request
from fastapi.responses import JSONResponse
from typing import Optional
from fastapi.staticfiles import StaticFiles

app = FastAPI(
    title="Whisper 音视频转写 API",
    description="基于 OpenAI Whisper 模型的音视频转写服务,支持上传文件或使用服务器上的文件生成字幕。",
    version="1.0.0"
)

# 挂载静态目录,用于提供文件下载
app.mount("/static", StaticFiles(directory="/media/ubuntu/SOFT/whisper_test"), name="static")

# 支持的文件扩展名
ALLOWED_EXTENSIONS = {'mp3', 'wav', 'mp4', 'avi', 'mov'}
UPLOAD_DIR = "/media/ubuntu/SOFT/whisper_test/uploads"

# 检查文件扩展名是否允许
def allowed_file(filename: str):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

# 格式化时间戳为 SRT 格式
def format_timestamp(seconds: float) -> str:
    milliseconds = int(seconds * 1000)
    hours = milliseconds // (1000 * 60 * 60)
    minutes = (milliseconds // (1000 * 60)) % 60
    seconds = (milliseconds // 1000) % 60
    milliseconds = milliseconds % 1000
    return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"

# 生成 SRT 文件内容
def generate_srt(transcription_segments) -> str:
    srt_content = ""
    converter = opencc.OpenCC('t2s')  # 繁体转简体
    for i, segment in enumerate(transcription_segments):
        start_time = format_timestamp(segment['start'])  # 获取开始时间戳
        end_time = format_timestamp(segment['end'])  # 获取结束时间戳
        text = converter.convert(segment['text'].strip())  # 繁体转简体
        srt_content += f"{i+1}\n{start_time} --> {end_time}\n{text}\n\n"
    return srt_content

# 处理音频文件并生成 SRT 文件,返回转录文本
def transcribe_audio_to_srt(audio_path: str, srt_path: str, model_name="tiny"):
    device = "cuda" if torch.cuda.is_available() else "cpu"  # 判断是否使用 GPU
    model = whisper.load_model(model_name).to(device)  # 加载模型
    result = model.transcribe(audio_path, language="zh")  # 转录音频
    print("当前模型:",model_name,"转录内容:",result["text"],"\n")
    srt_content = generate_srt(result['segments'])  # 生成 SRT 文件内容
    with open(srt_path, "w", encoding="utf-8") as srt_file:
        srt_file.write(srt_content)  # 将内容写入 SRT 文件
    return result["text"]  # 返回转录的文本内容

# 从视频中提取音频
def extract_audio_from_video(video_path: str, audio_path: str):
    video_clip = VideoFileClip(video_path)  # 读取视频文件
    audio_clip = video_clip.audio  # 获取音频
    audio_clip.write_audiofile(audio_path, codec='libmp3lame', bitrate="192k")  # 保存为 MP3
    audio_clip.close()  # 关闭音频文件
    video_clip.close()  # 关闭视频文件

# 处理单个音频或视频文件,生成 SRT 文件,并保留相对目录结构
def process_file_with_structure(file_path: str, input_dir: str, output_dir: str, model_name="tiny"):
    # 生成相对路径,保持输入和输出目录结构一致
    rel_path = os.path.relpath(file_path, input_dir)
    output_srt_dir = os.path.join(output_dir, os.path.dirname(rel_path))
    os.makedirs(output_srt_dir, exist_ok=True)  # 创建对应的输出目录

    srt_output_path = os.path.join(output_srt_dir, os.path.splitext(os.path.basename(file_path))[0] + ".srt")  # 生成 SRT 文件路径

    if file_path.lower().endswith((".mp3", ".wav")):  # 如果是音频文件
        text_content = transcribe_audio_to_srt(file_path, srt_output_path, model_name)  # 直接处理音频并返回转录文本
    elif file_path.lower().endswith((".mp4", ".avi", ".mov")):  # 如果是视频文件
        audio_path = os.path.join(output_srt_dir, os.path.splitext(os.path.basename(file_path))[0] + "_audio.mp3")
        extract_audio_from_video(file_path, audio_path)  # 提取音频
        text_content = transcribe_audio_to_srt(audio_path, srt_output_path, model_name)  # 处理提取的音频并返回转录文本
        os.remove(audio_path)  # 删除临时音频文件

    return srt_output_path, text_content  # 返回 SRT 文件路径和转录文本

# 遍历目录并处理所有音视频文件,保持目录结构
def process_directory_with_structure(input_dir: str, output_dir: str, model_name="tiny"):
    srt_files = []
    for root, _, files in os.walk(input_dir):
        for file in files:
            if allowed_file(file):
                file_path = os.path.join(root, file)
                srt_output_path, text_content = process_file_with_structure(file_path, input_dir, output_dir, model_name)
                srt_files.append((srt_output_path, text_content))

    return srt_files

# 获取局域网 IP 地址
def get_local_ip():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))  # Google Public DNS
    ip = s.getsockname()[0]
    s.close()
    return ip

# 处理服务器上的文件和目录
@app.post("/transcribe_server/", summary="处理服务器上的目录或文件生成字幕文件", description="通过指定服务器的目录或文件路径,生成字幕文件。")
async def transcribe_server(
    request: Request,
    model: Optional[str] = Form("tiny"),
    input: str = Form(..., description="输入的服务器目录或文件路径"),
    output: Optional[str] = Form(None, description="输出目录路径。如果未指定,则默认在输入路径下创建'srt'文件夹。")
):
    """
    处理服务器上的目录或文件,生成字幕文件。
    """
    input_path = input
    output_path = output

    if not os.path.exists(input_path):
        raise HTTPException(status_code=400, detail="输入路径不存在")

    # 如果是目录
    if os.path.isdir(input_path):
        if not output_path:
            output_path = os.path.join(input_path, "srt")  # 默认在输入路径下创建 srt 文件夹
        srt_files = process_directory_with_structure(input_path, output_path, model)
        
        

        # 创建下载链接
        local_ip = get_local_ip()  # 获取局域网 IP 地址
        download_links = [f"http://{local_ip}:5001/static/{os.path.relpath(srt[0], '/media/ubuntu/SOFT/whisper_test')}" for srt in srt_files]
        
        return JSONResponse(content={
            "input": input_path,
            "output": output_path,
            "srt_files": [srt[0] for srt in srt_files],
            "transcripts": [srt[1] for srt in srt_files],
            "download_links": download_links
        })

    # 如果是文件
    elif os.path.isfile(input_path):
        if not output_path:
            output_path = os.path.join(os.path.dirname(input_path), "srt")  # 默认在输入文件所在目录下创建 srt 文件夹
        srt_file, text_content = process_file_with_structure(input_path, os.path.dirname(input_path), output_path, model)
        
        # 创建下载链接
        local_ip = get_local_ip()  # 获取局域网 IP 地址
        srt_download_link = f"http://{local_ip}:5001/static/{os.path.relpath(srt_file, '/media/ubuntu/SOFT/whisper_test')}"
        
        return JSONResponse(content={
            "input": input_path,
            "output": output_path,
            "srt_file": srt_file,
            "content": text_content,
            "download_link": srt_download_link
        })

    else:
        raise HTTPException(status_code=400, detail="输入路径无效:不是有效的文件或目录")

# 处理客户端上传的文件,生成 SRT 文件并返回下载链接和文本内容
@app.post("/transcribe_client/", summary="处理客户端上传的文件生成字幕文件", description="上传客户端的文件,生成 SRT 文件,并返回下载链接和转录内容。")
async def transcribe_client(
    request: Request,
    model: Optional[str] = Form("tiny"),
    input_file: UploadFile = File(..., description="客户端上传的文件")
):
    """
    处理客户端上传的文件,生成字幕文件,并返回生成的 SRT 文件路径和转录文本。
    """
    if not os.path.exists(UPLOAD_DIR):
        os.makedirs(UPLOAD_DIR)  # 确保临时目录存在

    # 将上传的文件保存到服务器的临时目录
    file_location = os.path.join(UPLOAD_DIR, input_file.filename)
    with open(file_location, "wb") as f:
        shutil.copyfileobj(input_file.file, f)

    input_path = file_location  # 使用上传的文件路径作为输入路径

    if os.path.isfile(input_path):
        output_path = os.path.join(UPLOAD_DIR, "srt")
        srt_file, text_content = process_file_with_structure(input_path, UPLOAD_DIR, output_path, model)
        print("srt_file:",srt_file)
        # 返回下载链接和转录文本
        local_ip = get_local_ip()  # 获取局域网 IP 地址
        
        srt_download_link = f"http://{local_ip}:5001/static/{os.path.relpath(srt_file, '/media/ubuntu/SOFT/whisper_test')}"
        print("srt_download_link",srt_download_link)
        print("",os.path.relpath(srt_file, '/media/ubuntu/SOFT/whisper_test/srt'))
        return JSONResponse(content={
            "input": input_path,
            "output": output_path,
            "srt_file": srt_file,
            "content": text_content,  # 返回转录的文本内容
            "download_link": srt_download_link  # 返回生成 SRT 文件的下载链接
        })

    raise HTTPException(status_code=400, detail="上传的文件无效,必须是音频或视频文件。")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=5001)

项目简介

基于 OpenAI Whisper 模型的音视频转写服务,支持上传文件或使用服务器上的文件生成字幕。该 API 提供了处理音频和视频文件的能力,并将其转录为 SRT 字幕文件。

运行环境

  • Python 3.x
  • FastAPI
  • torch
  • whisper
  • moviepy
  • opencc

安装依赖

在运行该项目之前,请确保安装以下依赖:

pip install fastapi[all] torch moviepy opencc-python-reimplemented

启动服务器

在项目根目录下运行以下命令启动 FastAPI 服务器:

uvicorn main:app --host 0.0.0.0 --port 5001 --reload

接口列表

1. /transcribe_server/

描述:处理服务器上的目录或文件,生成字幕文件。

请求方法POST

请求参数

参数类型必填描述
modelstring使用的 Whisper 模型,默认为 tiny
inputstring输入的服务器目录或文件路径
outputstring输出目录路径,默认为输入路径下创建 srt 文件夹

返回示例

{
    "input": "/path/to/server/directory",
    "output": "/path/to/server/directory/srt",
    "srt_files": ["/path/to/server/directory/srt/file1.srt"],
    "transcripts": ["转录内容"],
    "download_links": ["http://192.168.1.1:5001/static/file1.srt"]
}

2. /transcribe_client/

描述:处理客户端上传的文件,生成字幕文件。

请求方法POST

请求参数

参数类型必填描述
modelstring使用的 Whisper 模型,默认为 tiny
input_fileUploadFile客户端上传的音频或视频文件

返回示例

{
    "input": "/media/ubuntu/SOFT/whisper_test/uploads/example.wav",
    "output": "/media/ubuntu/SOFT/whisper_test/uploads/srt",
    "srt_file": "/media/ubuntu/SOFT/whisper_test/uploads/srt/example.srt",
    "content": "转录后的文本内容",
    "download_link": "http://192.168.1.1:5001/static/example.srt"
}

接口调用示例

使用 Python 调用接口

import requests

# 调用 /transcribe_server 接口
response = requests.post("http://192.168.1.1:5001/transcribe_server/", data={
    "model": "tiny",
    "input": "/path/to/server/directory"
})

print(response.json())

# 调用 /transcribe_client 接口
files = {'input_file': open('C:/path/to/your/example.wav', 'rb')}
response = requests.post("http://192.168.1.1:5001/transcribe_client/", files=files, data={"model": "tiny"})

print(response.json())

使用 cURL 测试接口

调用 /transcribe_server/
curl -X POST "http://192.168.1.1:5001/transcribe_server/" \
    -H "Content-Type: application/x-www-form-urlencoded" \
    -d "model=tiny&input=/path/to/server/directory"
调用 /transcribe_client/
curl -X POST "http://192.168.1.1:5001/transcribe_client/" \
    -F "model=tiny" \
    -F "input_file=@C:/path/to/your/example.wav"

使用 Postman 测试接口

  1. 打开 Postman,创建一个新的请求。
  2. 设置请求方法为 POST
  3. 输入请求 URL,例如 http://192.168.1.1:5001/transcribe_server/http://192.168.1.1:5001/transcribe_client/
  4. Body 选项中,选择 form-data
    • 对于 /transcribe_server/
      • 添加字段 model(可选),值为 tiny
      • 添加字段 input(必填),值为服务器上的目录路径。
    • 对于 /transcribe_client/
      • 添加字段 model(可选),值为 tiny
      • 添加字段 input_file(必填),值为上传的音频或视频文件。
  5. 点击 Send 发送请求,查看返回结果。

注意事项

  • 确保输入的目录或文件路径正确。
  • 上传的文件类型必须为支持的音频或视频格式(mp3, wav, mp4, avi, mov)。
  • 下载链接将在响应中返回,确保使用正确的局域网 IP 地址进行访问。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部