Files
yichuan520030910320 46f6cc100b Initial commit
2025-06-30 09:05:05 +00:00

376 lines
13 KiB
Python

import argparse
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import numpy as np
import torch
from torch import nn
from transformers import AutoModel
from tqdm import tqdm
from contextlib import contextmanager
import math
@dataclass
class BenchmarkConfig:
model_path: str
batch_sizes: List[int]
seq_length: int
num_runs: int
use_fp16: bool = True
use_cuda_graphs: bool = False
use_flash_attention: bool = False
max_batch_size: int = 256 # Maximum batch size before splitting
class CUDAGraphContainer:
"""Container for managing CUDA graphs for different batch sizes."""
def __init__(self, model: nn.Module, seq_length: int, max_batch_size: int):
self.model = model
self.seq_length = seq_length
self.max_batch_size = max_batch_size
self.graphs: Dict[int, CUDAGraphWrapper] = {}
def get_or_create(self, batch_size: int) -> 'CUDAGraphWrapper':
# For CUDA graphs, we always use the actual batch size or max_batch_size
effective_batch_size = min(batch_size, self.max_batch_size)
if effective_batch_size not in self.graphs:
self.graphs[effective_batch_size] = CUDAGraphWrapper(
self.model, effective_batch_size, self.seq_length
)
return self.graphs[effective_batch_size]
class CUDAGraphWrapper:
"""Wrapper for CUDA graph capture and replay."""
def __init__(self, model: nn.Module, batch_size: int, seq_length: int):
self.model = model
self.static_input = self._create_random_batch(batch_size, seq_length)
self.static_attention_mask = torch.ones_like(self.static_input)
# Warm up
self._warmup()
# Capture graph
self.graph = torch.cuda.CUDAGraph()
with torch.cuda.graph(self.graph):
self.static_output = self.model(
input_ids=self.static_input,
attention_mask=self.static_attention_mask
)
def _create_random_batch(self, batch_size: int, seq_length: int) -> torch.Tensor:
return torch.randint(
0, 1000, (batch_size, seq_length),
device="cuda",
dtype=torch.long
)
def _warmup(self, num_warmup: int = 3):
with torch.no_grad():
for _ in range(num_warmup):
self.model(
input_ids=self.static_input,
attention_mask=self.static_attention_mask
)
def __call__(self, input_ids: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
self.static_input.copy_(input_ids)
self.static_attention_mask.copy_(attention_mask)
self.graph.replay()
return self.static_output
class ModelOptimizer:
"""Applies various optimizations to the model."""
@staticmethod
def optimize(model: nn.Module, config: BenchmarkConfig) -> nn.Module:
print("\nApplying model optimizations:")
# Move to GPU
model = model.cuda()
print("- Model moved to GPU")
# FP16
if config.use_fp16:
model = model.half()
print("- Using FP16 precision")
# Check if using SDPA
if torch.version.cuda and float(torch.version.cuda[:3]) >= 11.6:
if hasattr(torch.nn.functional, 'scaled_dot_product_attention'):
print("- Using PyTorch SDPA (scaled_dot_product_attention)")
# No need to do anything as it's automatically enabled
else:
print("- PyTorch SDPA not available")
# Flash Attention
if config.use_flash_attention:
try:
from flash_attn.flash_attention import FlashAttention
print("- Flash Attention 2 available")
if hasattr(model.config, "attention_mode"):
model.config.attention_mode = "flash_attention_2"
print(" - Enabled Flash Attention 2 mode")
except ImportError:
print("- Flash Attention not available")
# Optimize LayerNorm
try:
num_layernorms = 0
for module in model.modules():
if isinstance(module, torch.nn.LayerNorm):
module.forward = torch.jit.script(module.forward)
num_layernorms += 1
if num_layernorms > 0:
print(f"- Optimized {num_layernorms} LayerNorm modules with TorchScript")
except Exception as e:
print(f"- LayerNorm optimization failed: {e}")
# Memory efficient attention
try:
from xformers.ops import memory_efficient_attention
model.enable_xformers_memory_efficient_attention()
print("- Enabled xformers memory efficient attention")
except (ImportError, AttributeError):
print("- Xformers not available")
model.eval()
print("- Model set to eval mode")
return model
class Timer:
"""Handles accurate GPU timing using CUDA events."""
def __init__(self):
self.start_event = torch.cuda.Event(enable_timing=True)
self.end_event = torch.cuda.Event(enable_timing=True)
@contextmanager
def timing(self):
self.start_event.record()
yield
self.end_event.record()
self.end_event.synchronize()
def elapsed_time(self) -> float:
return self.start_event.elapsed_time(self.end_event) / 1000 # ms to seconds
class Benchmark:
"""Main benchmark runner."""
def __init__(self, config: BenchmarkConfig):
self.config = config
self.model = self._load_model()
self.cuda_graphs = (
CUDAGraphContainer(self.model, config.seq_length, config.max_batch_size)
if config.use_cuda_graphs
else None
)
self.timer = Timer()
def _load_model(self) -> nn.Module:
print(f"Loading model from {self.config.model_path}...")
model = AutoModel.from_pretrained(self.config.model_path)
return ModelOptimizer.optimize(model, self.config)
def _create_random_batch(self, batch_size: int) -> torch.Tensor:
return torch.randint(
0, 1000,
(batch_size, self.config.seq_length),
device="cuda",
dtype=torch.long
)
def _run_inference(
self,
input_ids: torch.Tensor,
cuda_graph_wrapper: Optional[CUDAGraphWrapper] = None
) -> Tuple[float, torch.Tensor]:
attention_mask = torch.ones_like(input_ids)
original_batch_size = input_ids.shape[0]
print(f"Original input_ids shape: {input_ids.shape}")
# Split large batches to avoid OOM
max_batch_size = self.config.max_batch_size
if original_batch_size > max_batch_size:
print(f"Splitting batch of size {original_batch_size} into chunks of {max_batch_size}")
total_time = 0
outputs = []
with torch.no_grad():
for i in range(0, original_batch_size, max_batch_size):
end_idx = min(i + max_batch_size, original_batch_size)
batch_slice = input_ids[i:end_idx]
mask_slice = attention_mask[i:end_idx]
print(f"Processing chunk {i//max_batch_size + 1}: shape {batch_slice.shape}")
# Use CUDA graph if available (with the smaller batch size)
chunk_cuda_graph = None
if cuda_graph_wrapper is not None:
chunk_cuda_graph = self.cuda_graphs.get_or_create(batch_slice.shape[0])
with self.timer.timing():
if chunk_cuda_graph is not None:
chunk_output = chunk_cuda_graph(batch_slice, mask_slice)
else:
chunk_output = self.model(input_ids=batch_slice, attention_mask=mask_slice)
total_time += self.timer.elapsed_time()
outputs.append(chunk_output.last_hidden_state)
# Combine outputs
combined_output = torch.cat(outputs, dim=0)
print(f"Combined output shape: {combined_output.shape}")
# Create a wrapper object similar to model output to maintain consistency
class DummyOutput:
def __init__(self, hidden_states):
self.last_hidden_state = hidden_states
output = DummyOutput(combined_output)
return total_time, output
else:
# Process normally for small batches
with torch.no_grad(), self.timer.timing():
if cuda_graph_wrapper is not None:
output = cuda_graph_wrapper(input_ids, attention_mask)
else:
output = self.model(input_ids=input_ids, attention_mask=attention_mask)
print(f"Output shape: {output.last_hidden_state.shape}")
return self.timer.elapsed_time(), output
def run(self) -> Dict[int, Dict[str, float]]:
results = {}
for batch_size in self.config.batch_sizes:
print(f"\nTesting batch size: {batch_size}")
times = []
# Get or create CUDA graph for this batch size
cuda_graph_wrapper = None
if self.cuda_graphs is not None:
if batch_size <= self.config.max_batch_size:
cuda_graph_wrapper = self.cuda_graphs.get_or_create(batch_size)
else:
# For large batches, we'll use the max_batch_size graph in chunks
cuda_graph_wrapper = True # Just a flag to indicate we want to use CUDA graphs
# Pre-allocate input tensor
input_ids = self._create_random_batch(batch_size)
# Run benchmark
for run_idx in tqdm(range(self.config.num_runs), desc=f"Batch size {batch_size}"):
elapsed_time, _ = self._run_inference(input_ids, cuda_graph_wrapper)
times.append(elapsed_time)
print(f"Run {run_idx+1}: {elapsed_time:.4f}s")
# Calculate statistics
avg_time = np.mean(times)
std_time = np.std(times)
throughput = batch_size / avg_time
results[batch_size] = {
"avg_time": avg_time,
"std_time": std_time,
"throughput": throughput,
}
print(f"Avg Time: {avg_time:.4f}s ± {std_time:.4f}s")
print(f"Throughput: {throughput:.2f} sequences/second")
return results
def main():
parser = argparse.ArgumentParser(description="Model Inference Benchmark")
parser.add_argument(
"--model_path",
type=str,
default="facebook/contriever",
help="Path to the model",
)
parser.add_argument(
"--batch_sizes",
type=str,
default="1,2,4,8,16,32,64,128,256,512,1024,2048,4096",
help="Comma-separated list of batch sizes",
)
parser.add_argument(
"--seq_length",
type=int,
default=256,
help="Sequence length for input",
)
parser.add_argument(
"--num_runs",
type=int,
default=5,
help="Number of runs for each batch size",
)
parser.add_argument(
"--no_fp16",
action="store_true",
help="Disable FP16 inference",
)
parser.add_argument(
"--use_cuda_graphs",
action="store_true",
help="Enable CUDA Graphs optimization",
)
parser.add_argument(
"--use_flash_attention",
action="store_true",
help="Enable Flash Attention 2 if available",
)
parser.add_argument(
"--max_batch_size",
type=int,
default=256,
help="Maximum batch size before splitting to prevent OOM",
)
args = parser.parse_args()
config = BenchmarkConfig(
model_path=args.model_path,
batch_sizes=[int(bs) for bs in args.batch_sizes.split(",")],
seq_length=args.seq_length,
num_runs=args.num_runs,
use_fp16=not args.no_fp16,
use_cuda_graphs=args.use_cuda_graphs,
use_flash_attention=args.use_flash_attention,
max_batch_size=args.max_batch_size,
)
benchmark = Benchmark(config)
results = benchmark.run()
# Print overall summary
print("\n===== BENCHMARK SUMMARY =====")
print(f"Model: {config.model_path}")
print(f"Sequence Length: {config.seq_length}")
print(f"FP16: {config.use_fp16}")
print(f"CUDA Graphs: {config.use_cuda_graphs}")
print(f"Flash Attention: {config.use_flash_attention}")
print(f"Max Batch Size: {config.max_batch_size}")
print("\nResults:")
print("\nBatch Size | Avg Time (s) | Throughput (seq/s)")
print("-" * 50)
for bs in sorted(results.keys()):
r = results[bs]
print(f"{bs:^10} | {r['avg_time']:^12.4f} | {r['throughput']:^17.2f}")
if __name__ == "__main__":
main()