Skip to content

vllm.entrypoints.openai.run_batch

BatchFrontendArgs

Bases: BaseFrontendArgs

Arguments for the batch runner frontend.

Source code in vllm/entrypoints/openai/run_batch.py
@config
class BatchFrontendArgs(BaseFrontendArgs):
    """Arguments for the batch runner frontend."""

    input_file: str | None = None
    """The path or url to a single input file. Currently supports local file
    paths, or the http protocol (http or https). If a URL is specified,
    the file should be available via HTTP GET."""
    output_file: str | None = None
    """The path or url to a single output file. Currently supports
    local file paths, or web (http or https) urls. If a URL is specified,
    the file should be available via HTTP PUT."""
    output_tmp_dir: str | None = None
    """The directory to store the output file before uploading it
    to the output URL."""
    enable_metrics: bool = False
    """Enable Prometheus metrics"""
    host: str | None = None
    """Host name for the Prometheus metrics server
    (only needed if enable-metrics is set)."""
    port: int = 8000
    """Port number for the Prometheus metrics server
    (only needed if enable-metrics is set)."""
    url: str = "0.0.0.0"
    """[DEPRECATED] Host name for the Prometheus metrics server
    (only needed if enable-metrics is set). Use --host instead."""

    @classmethod
    def _customize_cli_kwargs(
        cls,
        frontend_kwargs: dict[str, Any],
    ) -> dict[str, Any]:
        frontend_kwargs = super()._customize_cli_kwargs(frontend_kwargs)

        frontend_kwargs["input_file"]["flags"] = ["-i"]
        frontend_kwargs["input_file"]["required"] = True
        frontend_kwargs["output_file"]["flags"] = ["-o"]
        frontend_kwargs["output_file"]["required"] = True

        frontend_kwargs["enable_metrics"]["action"] = "store_true"

        frontend_kwargs["url"]["deprecated"] = True
        return frontend_kwargs

enable_metrics class-attribute instance-attribute

enable_metrics: bool = False

Enable Prometheus metrics

host class-attribute instance-attribute

host: str | None = None

Host name for the Prometheus metrics server (only needed if enable-metrics is set).

input_file class-attribute instance-attribute

input_file: str | None = None

The path or url to a single input file. Currently supports local file paths, or the http protocol (http or https). If a URL is specified, the file should be available via HTTP GET.

output_file class-attribute instance-attribute

output_file: str | None = None

The path or url to a single output file. Currently supports local file paths, or web (http or https) urls. If a URL is specified, the file should be available via HTTP PUT.

output_tmp_dir class-attribute instance-attribute

output_tmp_dir: str | None = None

The directory to store the output file before uploading it to the output URL.

port class-attribute instance-attribute

port: int = 8000

Port number for the Prometheus metrics server (only needed if enable-metrics is set).

url class-attribute instance-attribute

url: str = '0.0.0.0'

[DEPRECATED] Host name for the Prometheus metrics server (only needed if enable-metrics is set). Use --host instead.

BatchRequestInput

Bases: OpenAIBaseModel

The per-line object of the batch input file.

NOTE: Currently only the /v1/chat/completions endpoint is supported.

Source code in vllm/entrypoints/openai/run_batch.py
class BatchRequestInput(OpenAIBaseModel):
    """
    The per-line object of the batch input file.

    NOTE: Currently only the `/v1/chat/completions` endpoint is supported.
    """

    # A developer-provided per-request id that will be used to match outputs to
    # inputs. Must be unique for each request in a batch.
    custom_id: str

    # The HTTP method to be used for the request. Currently only POST is
    # supported.
    method: str

    # The OpenAI API relative URL to be used for the request. Currently
    # /v1/chat/completions is supported.
    url: str

    # The parameters of the request.
    body: BatchRequestInputBody

    @field_validator("body", mode="plain")
    @classmethod
    def check_type_for_url(cls, value: Any, info: ValidationInfo):
        # Use url to disambiguate models
        url: str = info.data["url"]
        if url == "/v1/chat/completions":
            return ChatCompletionRequest.model_validate(value)
        if url == "/v1/embeddings":
            return TypeAdapter(EmbeddingRequest).validate_python(value)
        if url.endswith("/score"):
            return TypeAdapter(ScoreRequest).validate_python(value)
        if url.endswith("/rerank"):
            return RerankRequest.model_validate(value)
        if url == "/v1/audio/transcriptions":
            return BatchTranscriptionRequest.model_validate(value)
        if url == "/v1/audio/translations":
            return BatchTranslationRequest.model_validate(value)
        return TypeAdapter(BatchRequestInputBody).validate_python(value)

BatchRequestOutput

Bases: OpenAIBaseModel

The per-line object of the batch output and error files

Source code in vllm/entrypoints/openai/run_batch.py
class BatchRequestOutput(OpenAIBaseModel):
    """
    The per-line object of the batch output and error files
    """

    id: str

    # A developer-provided per-request id that will be used to match outputs to
    # inputs.
    custom_id: str

    response: BatchResponseData | None

    # For requests that failed with a non-HTTP error, this will contain more
    # information on the cause of the failure.
    error: Any | None

BatchTranscriptionRequest

Bases: TranscriptionRequest

Batch transcription request that uses file_url instead of file.

This class extends TranscriptionRequest but replaces the file field with file_url to support batch processing from audio files written in JSON format.

Source code in vllm/entrypoints/openai/run_batch.py
class BatchTranscriptionRequest(TranscriptionRequest):
    """
    Batch transcription request that uses file_url instead of file.

    This class extends TranscriptionRequest but replaces the file field
    with file_url to support batch processing from audio files written in JSON format.
    """

    file_url: str = Field(
        ...,
        description=(
            "Either a URL of the audio or a data URL with base64 encoded audio data. "
        ),
    )

    # Override file to be optional and unused for batch processing
    file: UploadFile | None = Field(default=None, exclude=True)  # type: ignore[assignment]

    @model_validator(mode="before")
    @classmethod
    def validate_no_file(cls, data: Any):
        """Ensure file field is not provided in batch requests."""
        if isinstance(data, dict) and "file" in data:
            raise ValueError(
                "The 'file' field is not supported in batch requests. "
                "Use 'file_url' instead."
            )
        return data

validate_no_file classmethod

validate_no_file(data: Any)

Ensure file field is not provided in batch requests.

Source code in vllm/entrypoints/openai/run_batch.py
@model_validator(mode="before")
@classmethod
def validate_no_file(cls, data: Any):
    """Ensure file field is not provided in batch requests."""
    if isinstance(data, dict) and "file" in data:
        raise ValueError(
            "The 'file' field is not supported in batch requests. "
            "Use 'file_url' instead."
        )
    return data

BatchTranslationRequest

Bases: TranslationRequest

Batch translation request that uses file_url instead of file.

This class extends TranslationRequest but replaces the file field with file_url to support batch processing from audio files written in JSON format.

Source code in vllm/entrypoints/openai/run_batch.py
class BatchTranslationRequest(TranslationRequest):
    """
    Batch translation request that uses file_url instead of file.

    This class extends TranslationRequest but replaces the file field
    with file_url to support batch processing from audio files written in JSON format.
    """

    file_url: str = Field(
        ...,
        description=(
            "Either a URL of the audio or a data URL with base64 encoded audio data. "
        ),
    )

    # Override file to be optional and unused for batch processing
    file: UploadFile | None = Field(default=None, exclude=True)  # type: ignore[assignment]

    @model_validator(mode="before")
    @classmethod
    def validate_no_file(cls, data: Any):
        """Ensure file field is not provided in batch requests."""
        if isinstance(data, dict) and "file" in data:
            raise ValueError(
                "The 'file' field is not supported in batch requests. "
                "Use 'file_url' instead."
            )
        return data

validate_no_file classmethod

validate_no_file(data: Any)

Ensure file field is not provided in batch requests.

Source code in vllm/entrypoints/openai/run_batch.py
@model_validator(mode="before")
@classmethod
def validate_no_file(cls, data: Any):
    """Ensure file field is not provided in batch requests."""
    if isinstance(data, dict) and "file" in data:
        raise ValueError(
            "The 'file' field is not supported in batch requests. "
            "Use 'file_url' instead."
        )
    return data

build_endpoint_registry async

build_endpoint_registry(
    engine_client: EngineClient, args: Namespace
) -> dict[str, dict[str, Any]]

Build the endpoint registry with all serving objects and handler configurations.

Parameters:

Name Type Description Default
engine_client EngineClient

The engine client

required
args Namespace

Command line arguments

required

Returns:

Type Description
dict[str, dict[str, Any]]

Dictionary mapping endpoint keys to their configurations

Source code in vllm/entrypoints/openai/run_batch.py
async def build_endpoint_registry(
    engine_client: EngineClient,
    args: Namespace,
) -> dict[str, dict[str, Any]]:
    """
    Build the endpoint registry with all serving objects and handler configurations.

    Args:
        engine_client: The engine client
        args: Command line arguments

    Returns:
        Dictionary mapping endpoint keys to their configurations
    """
    supported_tasks = await engine_client.get_supported_tasks()
    logger.info("Supported tasks: %s", supported_tasks)

    # Create a state object to hold serving objects
    state = State()

    # Initialize all serving objects using init_app_state
    # This provides full functionality including chat template processing,
    # LoRA support, tool servers, etc.
    await init_app_state(engine_client, state, args, supported_tasks)

    # Get serving objects from state (defaulting to None if not set)
    openai_serving_chat = getattr(state, "openai_serving_chat", None)
    openai_serving_embedding = getattr(state, "openai_serving_embedding", None)
    openai_serving_scores = getattr(state, "openai_serving_scores", None)
    openai_serving_transcription = getattr(state, "openai_serving_transcription", None)
    openai_serving_translation = getattr(state, "openai_serving_translation", None)

    # Registry of endpoint configurations
    endpoint_registry: dict[str, dict[str, Any]] = {
        "completions": {
            "url_matcher": lambda url: url == "/v1/chat/completions",
            "handler_getter": lambda: (
                openai_serving_chat.create_chat_completion
                if openai_serving_chat is not None
                else None
            ),
            "wrapper_fn": None,
        },
        "embeddings": {
            "url_matcher": lambda url: url == "/v1/embeddings",
            "handler_getter": lambda: (
                openai_serving_embedding.create_embedding
                if openai_serving_embedding is not None
                else None
            ),
            "wrapper_fn": None,
        },
        "score": {
            "url_matcher": lambda url: url.endswith("/score"),
            "handler_getter": lambda: (
                openai_serving_scores.create_score
                if openai_serving_scores is not None
                else None
            ),
            "wrapper_fn": None,
        },
        "rerank": {
            "url_matcher": lambda url: url.endswith("/rerank"),
            "handler_getter": lambda: (
                openai_serving_scores.do_rerank
                if openai_serving_scores is not None
                else None
            ),
            "wrapper_fn": None,
        },
        "transcriptions": {
            "url_matcher": lambda url: url == "/v1/audio/transcriptions",
            "handler_getter": lambda: (
                openai_serving_transcription.create_transcription
                if openai_serving_transcription is not None
                else None
            ),
            "wrapper_fn": make_transcription_wrapper(is_translation=False),
        },
        "translations": {
            "url_matcher": lambda url: url == "/v1/audio/translations",
            "handler_getter": lambda: (
                openai_serving_translation.create_translation
                if openai_serving_translation is not None
                else None
            ),
            "wrapper_fn": make_transcription_wrapper(is_translation=True),
        },
    }

    return endpoint_registry

download_bytes_from_url async

download_bytes_from_url(url: str) -> bytes

Download data from a URL or decode from a data URL.

Parameters:

Name Type Description Default
url str

Either an HTTP/HTTPS URL or a data URL (data:...;base64,...)

required

Returns:

Type Description
bytes

Data as bytes

Source code in vllm/entrypoints/openai/run_batch.py
async def download_bytes_from_url(url: str) -> bytes:
    """
    Download data from a URL or decode from a data URL.

    Args:
        url: Either an HTTP/HTTPS URL or a data URL (data:...;base64,...)

    Returns:
        Data as bytes
    """
    parsed = urlparse(url)

    # Handle data URLs (base64 encoded)
    if parsed.scheme == "data":
        # Format: data:...;base64,<base64_data>
        if "," in url:
            header, data = url.split(",", 1)
            if "base64" in header:
                return base64.b64decode(data)
            else:
                raise ValueError(f"Unsupported data URL encoding: {header}")
        else:
            raise ValueError(f"Invalid data URL format: {url}")

    # Handle HTTP/HTTPS URLs
    elif parsed.scheme in ("http", "https"):
        async with (
            aiohttp.ClientSession() as session,
            session.get(url) as resp,
        ):
            if resp.status != 200:
                raise Exception(
                    f"Failed to download data from URL: {url}. Status: {resp.status}"
                )
            return await resp.read()

    else:
        raise ValueError(
            f"Unsupported URL scheme: {parsed.scheme}. "
            "Supported schemes: http, https, data"
        )

handle_endpoint_request

handle_endpoint_request(
    request: BatchRequestInput,
    tracker: BatchProgressTracker,
    url_matcher: Callable[[str], bool],
    handler_getter: Callable[[], Callable | None],
    wrapper_fn: WrapperFn | None = None,
) -> Awaitable[BatchRequestOutput] | None

Generic handler for endpoint requests.

Parameters:

Name Type Description Default
request BatchRequestInput

The batch request input

required
tracker BatchProgressTracker

Progress tracker for the batch

required
url_matcher Callable[[str], bool]

Function that takes a URL and returns True if it matches

required
handler_getter Callable[[], Callable | None]

Function that returns the handler function or None

required
wrapper_fn WrapperFn | None

Optional function to wrap the handler (e.g., for transcriptions)

None

Returns:

Type Description
Awaitable[BatchRequestOutput] | None

Awaitable[BatchRequestOutput] if the request was handled,

Awaitable[BatchRequestOutput] | None

None if URL didn't match

Source code in vllm/entrypoints/openai/run_batch.py
def handle_endpoint_request(
    request: BatchRequestInput,
    tracker: BatchProgressTracker,
    url_matcher: Callable[[str], bool],
    handler_getter: Callable[[], Callable | None],
    wrapper_fn: WrapperFn | None = None,
) -> Awaitable[BatchRequestOutput] | None:
    """
    Generic handler for endpoint requests.

    Args:
        request: The batch request input
        tracker: Progress tracker for the batch
        url_matcher: Function that takes a URL and returns True if it matches
        handler_getter: Function that returns the handler function or None
        wrapper_fn: Optional function to wrap the handler (e.g., for transcriptions)

    Returns:
        Awaitable[BatchRequestOutput] if the request was handled,
        None if URL didn't match
    """
    if not url_matcher(request.url):
        return None

    handler_fn = handler_getter()
    if handler_fn is None:
        error_msg = f"Model does not support endpoint: {request.url}"
        return make_async_error_request_output(request, error_msg=error_msg)

    # Apply wrapper if provided (e.g., for transcriptions/translations)
    if wrapper_fn is not None:
        handler_fn = wrapper_fn(handler_fn)

    tracker.submitted()
    return run_request(handler_fn, request, tracker)

make_transcription_wrapper

make_transcription_wrapper(
    is_translation: bool,
) -> WrapperFn

Factory function to create a wrapper for transcription/translation handlers. The wrapper converts BatchTranscriptionRequest or BatchTranslationRequest to TranscriptionRequest or TranslationRequest and calls the appropriate handler.

Parameters:

Name Type Description Default
is_translation bool

If True, process as translation; otherwise process as transcription

required

Returns:

Type Description
WrapperFn

A function that takes a handler and returns a wrapped handler

Source code in vllm/entrypoints/openai/run_batch.py
def make_transcription_wrapper(is_translation: bool) -> WrapperFn:
    """
    Factory function to create a wrapper for transcription/translation handlers.
    The wrapper converts BatchTranscriptionRequest or BatchTranslationRequest
    to TranscriptionRequest or TranslationRequest and calls the appropriate handler.

    Args:
        is_translation: If True, process as translation; otherwise process
            as transcription

    Returns:
        A function that takes a handler and returns a wrapped handler
    """

    def wrapper(handler_fn: Callable):
        async def transcription_wrapper(
            batch_request_body: (BatchTranscriptionRequest | BatchTranslationRequest),
        ) -> (
            TranscriptionResponse
            | TranscriptionResponseVerbose
            | TranslationResponse
            | TranslationResponseVerbose
            | ErrorResponse
        ):
            try:
                # Download data from URL
                audio_data = await download_bytes_from_url(batch_request_body.file_url)

                # Create a mock file from the downloaded audio data
                mock_file = UploadFile(
                    file=BytesIO(audio_data),
                    filename="audio.bin",
                )

                # Convert batch request to regular request
                # by copying all fields except file_url and setting file to mock_file
                request_dict = batch_request_body.model_dump(exclude={"file_url"})
                request_dict["file"] = mock_file

                if is_translation:
                    # Create TranslationRequest from BatchTranslationRequest
                    translation_request = TranslationRequest.model_validate(
                        request_dict
                    )
                    return await handler_fn(audio_data, translation_request)
                else:
                    # Create TranscriptionRequest from BatchTranscriptionRequest
                    transcription_request = TranscriptionRequest.model_validate(
                        request_dict
                    )
                    return await handler_fn(audio_data, transcription_request)
            except Exception as e:
                operation = "translation" if is_translation else "transcription"
                return ErrorResponse(
                    error=ErrorInfo(
                        message=f"Failed to process {operation}: {str(e)}",
                        type="BadRequestError",
                        code=HTTPStatus.BAD_REQUEST.value,
                    )
                )

        return transcription_wrapper

    return wrapper

upload_data async

upload_data(
    output_url: str, data_or_file: str, from_file: bool
) -> None

Upload a local file to a URL. output_url: The URL to upload the file to. data_or_file: Either the data to upload or the path to the file to upload. from_file: If True, data_or_file is the path to the file to upload.

Source code in vllm/entrypoints/openai/run_batch.py
async def upload_data(output_url: str, data_or_file: str, from_file: bool) -> None:
    """
    Upload a local file to a URL.
    output_url: The URL to upload the file to.
    data_or_file: Either the data to upload or the path to the file to upload.
    from_file: If True, data_or_file is the path to the file to upload.
    """
    # Timeout is a common issue when uploading large files.
    # We retry max_retries times before giving up.
    max_retries = 5
    # Number of seconds to wait before retrying.
    delay = 5

    for attempt in range(1, max_retries + 1):
        try:
            # We increase the timeout to 1000 seconds to allow
            # for large files (default is 300).
            async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=1000)
            ) as session:
                if from_file:
                    with open(data_or_file, "rb") as file:
                        async with session.put(output_url, data=file) as response:
                            if response.status != 200:
                                raise Exception(
                                    f"Failed to upload file.\n"
                                    f"Status: {response.status}\n"
                                    f"Response: {response.text()}"
                                )
                else:
                    async with session.put(output_url, data=data_or_file) as response:
                        if response.status != 200:
                            raise Exception(
                                f"Failed to upload data.\n"
                                f"Status: {response.status}\n"
                                f"Response: {response.text()}"
                            )

        except Exception as e:
            if attempt < max_retries:
                logger.error(
                    "Failed to upload data (attempt %d). Error message: %s.\nRetrying in %d seconds...",  # noqa: E501
                    attempt,
                    e,
                    delay,
                )
                await asyncio.sleep(delay)
            else:
                raise Exception(
                    f"Failed to upload data (attempt {attempt}). Error message: {str(e)}."  # noqa: E501
                ) from e

write_file async

write_file(
    path_or_url: str,
    batch_outputs: list[BatchRequestOutput],
    output_tmp_dir: str,
) -> None

Write batch_outputs to a file or upload to a URL. path_or_url: The path or URL to write batch_outputs to. batch_outputs: The list of batch outputs to write. output_tmp_dir: The directory to store the output file before uploading it to the output URL.

Source code in vllm/entrypoints/openai/run_batch.py
async def write_file(
    path_or_url: str, batch_outputs: list[BatchRequestOutput], output_tmp_dir: str
) -> None:
    """
    Write batch_outputs to a file or upload to a URL.
    path_or_url: The path or URL to write batch_outputs to.
    batch_outputs: The list of batch outputs to write.
    output_tmp_dir: The directory to store the output file before uploading it
    to the output URL.
    """
    if path_or_url.startswith("http://") or path_or_url.startswith("https://"):
        if output_tmp_dir is None:
            logger.info("Writing outputs to memory buffer")
            output_buffer = StringIO()
            for o in batch_outputs:
                print(o.model_dump_json(), file=output_buffer)
            output_buffer.seek(0)
            logger.info("Uploading outputs to %s", path_or_url)
            await upload_data(
                path_or_url,
                output_buffer.read().strip().encode("utf-8"),
                from_file=False,
            )
        else:
            # Write responses to a temporary file and then upload it to the URL.
            with tempfile.NamedTemporaryFile(
                mode="w",
                encoding="utf-8",
                dir=output_tmp_dir,
                prefix="tmp_batch_output_",
                suffix=".jsonl",
            ) as f:
                logger.info("Writing outputs to temporary local file %s", f.name)
                await write_local_file(f.name, batch_outputs)
                logger.info("Uploading outputs to %s", path_or_url)
                await upload_data(path_or_url, f.name, from_file=True)
    else:
        logger.info("Writing outputs to local file %s", path_or_url)
        await write_local_file(path_or_url, batch_outputs)

write_local_file async

write_local_file(
    output_path: str,
    batch_outputs: list[BatchRequestOutput],
) -> None

Write the responses to a local file. output_path: The path to write the responses to. batch_outputs: The list of batch outputs to write.

Source code in vllm/entrypoints/openai/run_batch.py
async def write_local_file(
    output_path: str, batch_outputs: list[BatchRequestOutput]
) -> None:
    """
    Write the responses to a local file.
    output_path: The path to write the responses to.
    batch_outputs: The list of batch outputs to write.
    """
    # We should make this async, but as long as run_batch runs as a
    # standalone program, blocking the event loop won't affect performance.
    with open(output_path, "w", encoding="utf-8") as f:
        for o in batch_outputs:
            print(o.model_dump_json(), file=f)