PyTorch Complete Guide Book


Getting Started
Installation
Latest Stable Version:
# CPU only
pip install torch torchvision torchaudio
# CUDA 11.8
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# CUDA 12.1
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# Apple Silicon (M1/M2) - the default macOS wheels include MPS support
pip install torch torchvision torchaudio
Development Version:
pip install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/cu121
Verification
import torch
import torchvision
import torchaudio
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"MPS available: {torch.backends.mps.is_available()}") # Apple Silicon
print(f"Number of GPUs: {torch.cuda.device_count()}")
Tensor Fundamentals
Creating Tensors
Basic Creation:
import torch
# From lists
tensor_from_list = torch.tensor([1, 2, 3, 4])
tensor_2d = torch.tensor([[1, 2], [3, 4]])
# Specify dtype and device
tensor_float = torch.tensor([1, 2, 3], dtype=torch.float32, device='cuda')
# Common creation functions
zeros = torch.zeros(3, 4)
ones = torch.ones(2, 3)
empty = torch.empty(2, 2) # Uninitialized
eye = torch.eye(3) # Identity matrix
# Random tensors
randn = torch.randn(2, 3) # Normal distribution
rand = torch.rand(2, 3) # Uniform [0, 1)
randint = torch.randint(0, 10, (2, 3)) # Random integers
# From NumPy
import numpy as np
numpy_array = np.array([1, 2, 3])
from_numpy = torch.from_numpy(numpy_array) # Shares memory with numpy_array
Advanced Creation:
# Like operations (same shape/dtype as another tensor)
x = torch.randn(2, 3)
zeros_like = torch.zeros_like(x)
ones_like = torch.ones_like(x)
randn_like = torch.randn_like(x)
# Ranges
arange = torch.arange(0, 10, 2) # [0, 2, 4, 6, 8]
linspace = torch.linspace(0, 1, 5) # [0, 0.25, 0.5, 0.75, 1]
# Complex tensors
complex_tensor = torch.complex(torch.randn(2, 2), torch.randn(2, 2))
Tensor Properties
x = torch.randn(2, 3, 4)
print(f"Shape: {x.shape}") # torch.Size([2, 3, 4])
print(f"Size: {x.size()}") # torch.Size([2, 3, 4])
print(f"Dimensions: {x.ndim}") # 3
print(f"Total elements: {x.numel()}") # 24
print(f"Data type: {x.dtype}") # torch.float32
print(f"Device: {x.device}") # cpu or cuda:0
print(f"Memory layout: {x.layout}") # torch.strided
print(f"Requires grad: {x.requires_grad}") # False
Tensor Operations
Basic Math:
a = torch.tensor([1, 2, 3])
b = torch.tensor([4, 5, 6])
# Element-wise operations
add = a + b # or torch.add(a, b)
sub = a - b # or torch.sub(a, b)
mul = a * b # or torch.mul(a, b)
div = a / b # or torch.div(a, b)
pow = a ** 2 # or torch.pow(a, 2)
# In-place operations (modify original tensor)
a.add_(b) # a = a + b
a.mul_(2) # a = a * 2
Matrix Operations:
# Matrix multiplication
A = torch.randn(3, 4)
B = torch.randn(4, 5)
C = torch.mm(A, B) # or A @ B
# Batch matrix multiplication
batch_A = torch.randn(10, 3, 4)
batch_B = torch.randn(10, 4, 5)
batch_C = torch.bmm(batch_A, batch_B)
# Einstein summation (most flexible)
result = torch.einsum('ij,jk->ik', A, B) # Matrix multiplication
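The same notation covers batched and other contractions, for example:
batch_C2 = torch.einsum('bij,bjk->bik', batch_A, batch_B)  # Same result as torch.bmm above
A_transposed = torch.einsum('ij->ji', A)                   # Transpose
trace = torch.einsum('ii->', torch.randn(4, 4))            # Trace of a square matrix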
Reshaping and Indexing:
x = torch.randn(2, 3, 4)
# Reshaping
reshaped = x.view(6, 4) # View (shares memory)
reshaped = x.reshape(6, 4) # Reshape (may copy)
flattened = x.flatten() # 1D tensor
squeezed = x.squeeze() # Remove dimensions of size 1
unsqueezed = x.unsqueeze(0) # Add dimension of size 1
# Transposing
transposed = x.transpose(0, 1) # Swap dimensions 0 and 1
permuted = x.permute(2, 0, 1) # Reorder dimensions
# Indexing
first_row = x[0] # First matrix
first_element = x[0, 0, 0] # Single element
slice_tensor = x[:, 1:, :] # Slice along dimension 1
Advanced Indexing:
# Boolean indexing
mask = x > 0
positive_values = x[mask]
# Fancy indexing
indices = torch.tensor([0, 1])
selected = x[indices] # Select rows along dimension 0 (x has size 2 there)
# gather and scatter
source = torch.tensor([[1, 2], [3, 4]])
index = torch.tensor([[0, 0], [1, 0]])
gathered = torch.gather(source, 1, index)
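For dim=1, gather reads source[i][index[i][j]], so the call above selects:
print(gathered)  # tensor([[1, 1], [4, 3]])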
Automatic Differentiation
Requires Grad
# Enable gradient computation
x = torch.tensor([2.0], requires_grad=True)
y = torch.tensor([3.0], requires_grad=True)
# Forward pass
z = x * y + x ** 2
loss = z.sum()
# Backward pass
loss.backward()
print(f"x.grad: {x.grad}") # dL/dx
print(f"y.grad: {y.grad}") # dL/dy
Gradient Context Managers
# Disable gradients (inference mode)
with torch.no_grad():
output = model(input_tensor)
# Enable gradients temporarily
x = torch.randn(10, requires_grad=False)
with torch.enable_grad():
x.requires_grad_(True)
y = x.sum()
y.backward()
# Inference mode (faster than no_grad for inference)
with torch.inference_mode():
output = model(input_tensor)
Custom Autograd Functions
class CustomFunction(torch.autograd.Function):
@staticmethod
def forward(ctx, input, weight):
ctx.save_for_backward(input, weight)
output = input @ weight
return output
@staticmethod
def backward(ctx, grad_output):
input, weight = ctx.saved_tensors
grad_input = grad_output @ weight.t()
grad_weight = input.t() @ grad_output
return grad_input, grad_weight
# Usage
custom_fn = CustomFunction.apply
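A short end-to-end check of the custom function, with illustrative shapes:
input = torch.randn(4, 3, requires_grad=True)
weight = torch.randn(3, 2, requires_grad=True)
output = custom_fn(input, weight)   # Calls CustomFunction.forward
output.sum().backward()             # Calls CustomFunction.backward
print(input.grad.shape, weight.grad.shape)  # torch.Size([4, 3]) torch.Size([3, 2])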
Gradient Computation Utilities
# Compute gradients without modifying tensors
def compute_gradients(outputs, inputs):
grads = torch.autograd.grad(
outputs=outputs,
inputs=inputs,
create_graph=True, # For higher-order gradients
retain_graph=True, # Keep graph for multiple backward passes
)
return grads
# Gradient accumulation (simulate a larger effective batch size)
optimizer.zero_grad()
micro_batches = split_batch(batch)
for micro_batch in micro_batches:
    loss = model(micro_batch)               # Assumes the model returns a scalar loss
    (loss / len(micro_batches)).backward()  # backward() accumulates gradients
optimizer.step()
Neural Networks with nn.Module
Basic Module Structure
import torch.nn as nn
import torch.nn.functional as F
class MLP(nn.Module):
def __init__(self, input_size, hidden_size, output_size, dropout_rate=0.1):
super(MLP, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, output_size)
self.dropout = nn.Dropout(dropout_rate)
self.layer_norm = nn.LayerNorm(hidden_size)
def forward(self, x):
x = F.relu(self.fc1(x))
x = self.layer_norm(x)
x = self.dropout(x)
x = F.relu(self.fc2(x))
x = self.dropout(x)
x = self.fc3(x)
return x
# Initialize model
model = MLP(784, 256, 10)
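A quick sanity check with a dummy batch (batch size 32 is arbitrary):
dummy = torch.randn(32, 784)
logits = model(dummy)
print(logits.shape)  # torch.Size([32, 10])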
Common Layers
Linear Layers:
# Fully connected layer
linear = nn.Linear(in_features=512, out_features=256, bias=True)
# Bilinear layer
bilinear = nn.Bilinear(in1_features=128, in2_features=64, out_features=32)
Convolutional Layers:
# 2D Convolution
conv2d = nn.Conv2d(
in_channels=3,
out_channels=64,
kernel_size=3,
stride=1,
padding=1,
dilation=1,
groups=1,
bias=True
)
# Transposed convolution (deconvolution)
conv_transpose = nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1)
# Depthwise separable convolution
depthwise = nn.Conv2d(64, 64, kernel_size=3, groups=64)
pointwise = nn.Conv2d(64, 128, kernel_size=1)
Normalization Layers:
# Batch normalization
batch_norm = nn.BatchNorm2d(64)
# Layer normalization
layer_norm = nn.LayerNorm(512)
# Group normalization
group_norm = nn.GroupNorm(num_groups=8, num_channels=64)
# Instance normalization
instance_norm = nn.InstanceNorm2d(64)
Activation Functions:
# Common activations
relu = nn.ReLU()
gelu = nn.GELU()
swish = nn.SiLU() # Swish/SiLU
leaky_relu = nn.LeakyReLU(negative_slope=0.01)
elu = nn.ELU()
tanh = nn.Tanh()
sigmoid = nn.Sigmoid()
# Advanced activations
mish = nn.Mish()
Attention Mechanisms:
# Multi-head attention
multihead_attn = nn.MultiheadAttention(
embed_dim=512,
num_heads=8,
dropout=0.1,
batch_first=True
)
# Transformer encoder layer
encoder_layer = nn.TransformerEncoderLayer(
d_model=512,
nhead=8,
dim_feedforward=2048,
dropout=0.1,
activation='gelu'
)
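With batch_first=True the attention module takes (batch, seq_len, embed_dim) tensors; a minimal self-attention call:
x = torch.randn(8, 16, 512)                          # (batch, seq_len, embed_dim)
attn_output, attn_weights = multihead_attn(x, x, x)  # Self-attention: query = key = value
print(attn_output.shape)                             # torch.Size([8, 16, 512])
# Note: encoder_layer above defaults to batch_first=False, i.e. (seq_len, batch, embed_dim) input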
Model Utilities
Parameter Management:
# Count parameters
def count_parameters(model):
return sum(p.numel() for p in model.parameters() if p.requires_grad)
# Freeze/unfreeze parameters
def freeze_model(model):
for param in model.parameters():
param.requires_grad = False
def unfreeze_model(model):
for param in model.parameters():
param.requires_grad = True
# Initialize weights
def init_weights(model):
    for m in model.modules():
        if isinstance(m, nn.Linear):
            nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        elif isinstance(m, nn.Conv2d):
            nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
Model Information:
# Model summary
def model_summary(model, input_size):
def hook_fn(module, input, output):
class_name = str(module.__class__).split(".")[-1].split("'")[0]
module_idx = len(summary)
m_key = f"{class_name}-{module_idx+1}"
summary[m_key] = {
"input_shape": list(input[0].size()),
"output_shape": list(output.size()),
"num_params": sum(p.numel() for p in module.parameters())
}
hooks = []
summary = {}
for name, layer in model.named_modules():
if len(list(layer.children())) == 0: # leaf module
hooks.append(layer.register_forward_hook(hook_fn))
# Forward pass
model(torch.zeros(1, *input_size))
# Remove hooks
for h in hooks:
h.remove()
return summary
Data Loading and Processing
Dataset Classes
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
class CustomDataset(Dataset):
def __init__(self, data, targets, transform=None):
self.data = data
self.targets = targets
self.transform = transform
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
sample = self.data[idx]
target = self.targets[idx]
if self.transform:
sample = self.transform(sample)
return sample, target
# Image dataset example
class ImageDataset(Dataset):
def __init__(self, image_paths, labels, transform=None):
self.image_paths = image_paths
self.labels = labels
self.transform = transform
def __len__(self):
return len(self.image_paths)
def __getitem__(self, idx):
from PIL import Image
image = Image.open(self.image_paths[idx])
label = self.labels[idx]
if self.transform:
image = self.transform(image)
return image, label
Data Transformations
# Image transformations
transform_train = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(p=0.5),
transforms.RandomRotation(degrees=10),
transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
transform_val = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Custom transform
class CustomNormalize:
def __init__(self, mean, std):
self.mean = mean
self.std = std
def __call__(self, tensor):
return (tensor - self.mean) / self.std
DataLoader Configuration
# Basic DataLoader
train_loader = DataLoader(
dataset=train_dataset,
batch_size=32,
shuffle=True,
num_workers=4,
pin_memory=True, # Faster GPU transfer
drop_last=True, # Drop incomplete batch
persistent_workers=True # Keep workers alive between epochs
)
# Custom collate function
def custom_collate_fn(batch):
# Handle variable-length sequences
data, targets = zip(*batch)
# Pad sequences to same length
from torch.nn.utils.rnn import pad_sequence
data = pad_sequence(data, batch_first=True, padding_value=0)
targets = torch.stack(targets)
return data, targets
# DataLoader with custom collate
loader = DataLoader(
dataset,
batch_size=32,
collate_fn=custom_collate_fn
)
Built-in Datasets
import torchvision.datasets as datasets
# CIFAR-10
cifar10_train = datasets.CIFAR10(
root='./data',
train=True,
download=True,
transform=transform_train
)
# ImageNet
imagenet = datasets.ImageNet(
root='./data/imagenet',
split='train',
transform=transform_train
)
# Custom folder dataset
folder_dataset = datasets.ImageFolder(
root='./data/custom',
transform=transform_train
)
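Each of these plugs directly into a DataLoader, e.g. with the CIFAR-10 dataset and transform_train from above:
cifar10_loader = DataLoader(cifar10_train, batch_size=32, shuffle=True, num_workers=2)
images, labels = next(iter(cifar10_loader))
print(images.shape, labels.shape)  # e.g. torch.Size([32, 3, 224, 224]) torch.Size([32]) with the crop above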
Training Loops and Optimization
Basic Training Loop
import torch.optim as optim
def train_epoch(model, dataloader, criterion, optimizer, device):
model.train()
running_loss = 0.0
correct = 0
total = 0
for batch_idx, (data, target) in enumerate(dataloader):
data, target = data.to(device), target.to(device)
# Zero gradients
optimizer.zero_grad()
# Forward pass
output = model(data)
loss = criterion(output, target)
# Backward pass
loss.backward()
# Update weights
optimizer.step()
# Statistics
running_loss += loss.item()
_, predicted = output.max(1)
total += target.size(0)
correct += predicted.eq(target).sum().item()
if batch_idx % 100 == 0:
print(f'Batch {batch_idx}, Loss: {loss.item():.4f}')
epoch_loss = running_loss / len(dataloader)
accuracy = 100. * correct / total
return epoch_loss, accuracy
def validate_epoch(model, dataloader, criterion, device):
model.eval()
val_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
for data, target in dataloader:
data, target = data.to(device), target.to(device)
output = model(data)
val_loss += criterion(output, target).item()
_, predicted = output.max(1)
total += target.size(0)
correct += predicted.eq(target).sum().item()
val_loss /= len(dataloader)
accuracy = 100. * correct / total
return val_loss, accuracy
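A minimal driver that ties the two functions together (assumes a val_loader built the same way as train_loader; epoch count and checkpoint path are arbitrary):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
num_epochs = 10
best_val_loss = float('inf')
for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate_epoch(model, val_loader, criterion, device)
    print(f"Epoch {epoch+1}: train loss {train_loss:.4f} acc {train_acc:.2f}% | "
          f"val loss {val_loss:.4f} acc {val_acc:.2f}%")
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pt')  # Keep the best weights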
Optimizers
# SGD with momentum
optimizer = optim.SGD(
model.parameters(),
lr=0.01,
momentum=0.9,
weight_decay=1e-4,
nesterov=True
)
# Adam
optimizer = optim.Adam(
model.parameters(),
lr=1e-3,
betas=(0.9, 0.999),
eps=1e-8,
weight_decay=1e-4
)
# AdamW (decoupled weight decay)
optimizer = optim.AdamW(
model.parameters(),
lr=1e-3,
betas=(0.9, 0.999),
eps=1e-8,
weight_decay=0.01
)
# Different learning rates for different parts
optimizer = optim.Adam([
{'params': model.backbone.parameters(), 'lr': 1e-4},
{'params': model.classifier.parameters(), 'lr': 1e-3}
])
Learning Rate Scheduling
from torch.optim.lr_scheduler import *
# Step decay
scheduler = StepLR(optimizer, step_size=30, gamma=0.1)
# Exponential decay
scheduler = ExponentialLR(optimizer, gamma=0.95)
# Cosine annealing
scheduler = CosineAnnealingLR(optimizer, T_max=100, eta_min=1e-6)
# Reduce on plateau
scheduler = ReduceLROnPlateau(
optimizer,
mode='min',
factor=0.5,
patience=10,
min_lr=1e-6
)
# Warm restart
scheduler = CosineAnnealingWarmRestarts(
optimizer,
T_0=10,
T_mult=2,
eta_min=1e-6
)
# One cycle (for super-convergence)
scheduler = OneCycleLR(
optimizer,
max_lr=0.1,
steps_per_epoch=len(train_loader),
epochs=100
)
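Schedulers are stepped at different granularities: most step once per epoch, ReduceLROnPlateau takes the monitored metric, and OneCycleLR steps once per batch. A rough pattern using the training functions above:
for epoch in range(num_epochs):
    train_loss, _ = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, _ = validate_epoch(model, val_loader, criterion, device)
    scheduler.step()              # StepLR, ExponentialLR, CosineAnnealingLR, ...
    # scheduler.step(val_loss)    # ReduceLROnPlateau: pass the monitored metric instead
    # OneCycleLR is stepped once per batch, inside the training loop, not here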
# Custom scheduler
class WarmupScheduler:
def __init__(self, optimizer, warmup_steps, max_lr):
self.optimizer = optimizer
self.warmup_steps = warmup_steps
self.max_lr = max_lr
self.step_count = 0
def step(self):
self.step_count += 1
if self.step_count <= self.warmup_steps:
lr = self.max_lr * self.step_count / self.warmup_steps
for param_group in self.optimizer.param_groups:
param_group['lr'] = lr
Loss Functions
# Classification
cross_entropy = nn.CrossEntropyLoss()
nll_loss = nn.NLLLoss()
binary_cross_entropy = nn.BCELoss() # Expects probabilities; use nn.BCEWithLogitsLoss for raw logits
# Focal loss has no built-in; see the custom FocalLoss implementation below
# Regression
mse_loss = nn.MSELoss()
mae_loss = nn.L1Loss()
huber_loss = nn.SmoothL1Loss()
# Custom loss
class FocalLoss(nn.Module):
def __init__(self, alpha=1, gamma=2):
super().__init__()
self.alpha = alpha
self.gamma = gamma
self.ce_loss = nn.CrossEntropyLoss(reduction='none')
def forward(self, inputs, targets):
ce_loss = self.ce_loss(inputs, targets)
pt = torch.exp(-ce_loss)
focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
return focal_loss.mean()
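A quick check on random logits (10 classes, batch of 8):
criterion = FocalLoss(alpha=1, gamma=2)
logits = torch.randn(8, 10)           # Raw, unnormalized scores
targets = torch.randint(0, 10, (8,))  # Class indices
loss = criterion(logits, targets)
print(loss.item())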
Advanced Features
Mixed Precision Training
from torch.cuda.amp import autocast, GradScaler
# Initialize scaler
scaler = GradScaler()
def train_with_amp(model, dataloader, criterion, optimizer, device):
model.train()
for data, target in dataloader:
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
# Forward pass with autocast
with autocast():
output = model(data)
loss = criterion(output, target)
# Backward pass with scaling
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
Model Checkpointing
def save_checkpoint(model, optimizer, scheduler, epoch, loss, path):
checkpoint = {
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict() if scheduler else None,
'loss': loss,
'torch_version': torch.__version__
}
torch.save(checkpoint, path)
def load_checkpoint(model, optimizer, scheduler, path, device):
checkpoint = torch.load(path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
if scheduler and checkpoint['scheduler_state_dict']:
scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
return checkpoint['epoch'], checkpoint['loss']
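Typical usage during a run (file name and variables are illustrative):
# Save at the end of an epoch
save_checkpoint(model, optimizer, scheduler, epoch, val_loss, 'checkpoint_latest.pt')

# Resume later, on the same or a different device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
start_epoch, last_loss = load_checkpoint(model, optimizer, scheduler, 'checkpoint_latest.pt', device)
print(f"Resuming from epoch {start_epoch + 1} (loss {last_loss:.4f})")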
Distributed Training
import os
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group("nccl", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def train_ddp(rank, world_size, model_class, dataset):
setup(rank, world_size)
# Create model and move to GPU
model = model_class().to(rank)
model = DDP(model, device_ids=[rank])
# Create distributed sampler
sampler = torch.utils.data.distributed.DistributedSampler(
dataset,
num_replicas=world_size,
rank=rank
)
dataloader = DataLoader(
dataset,
batch_size=32,
sampler=sampler,
num_workers=2
)
# Training loop
for epoch in range(num_epochs):
sampler.set_epoch(epoch) # Important for proper shuffling
# ... training code ...
cleanup()
# Launch distributed training
if __name__ == '__main__':
world_size = torch.cuda.device_count()
mp.spawn(train_ddp, args=(world_size, MyModel, dataset), nprocs=world_size)
Model Compilation (PyTorch 2.0+)
# Compile model for optimization
model = torch.compile(model)
# Different modes
model = torch.compile(model, mode="reduce-overhead") # Faster for smaller models
model = torch.compile(model, mode="max-autotune") # Best performance
model = torch.compile(model, mode="default") # Balanced
# Disable compilation for debugging
model = torch.compile(model, disable=True)
TorchScript
# Tracing
model.eval()
example_input = torch.randn(1, 3, 224, 224)
traced_model = torch.jit.trace(model, example_input)
# Scripting
scripted_model = torch.jit.script(model)
# Save and load
traced_model.save("model_traced.pt")
loaded_model = torch.jit.load("model_traced.pt")
Model Deployment
ONNX Export
import torch.onnx
def export_to_onnx(model, dummy_input, onnx_path):
model.eval()
torch.onnx.export(
model,
dummy_input,
onnx_path,
export_params=True,
opset_version=11,
do_constant_folding=True,
input_names=['input'],
output_names=['output'],
dynamic_axes={
'input': {0: 'batch_size'},
'output': {0: 'batch_size'}
}
)
# Usage
dummy_input = torch.randn(1, 3, 224, 224)
export_to_onnx(model, dummy_input, "model.onnx")
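Assuming the onnx and onnxruntime packages are installed, the export can be validated and run outside PyTorch; a minimal sketch:
import onnx
import onnxruntime as ort

onnx_model = onnx.load("model.onnx")
onnx.checker.check_model(onnx_model)   # Validates the graph structure

session = ort.InferenceSession("model.onnx")
outputs = session.run(None, {'input': dummy_input.numpy()})  # 'input' matches input_names above
print(outputs[0].shape)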
Mobile Deployment
# Optimize for mobile
from torch.utils.mobile_optimizer import optimize_for_mobile
# Trace model
traced_model = torch.jit.trace(model, example_input)
# Optimize for mobile
optimized_model = optimize_for_mobile(traced_model)
# Save for mobile
optimized_model._save_for_lite_interpreter("model_mobile.ptl")
Quantization
# Post-training quantization
import torch.quantization as quant
# Prepare model
model.qconfig = quant.get_default_qconfig('fbgemm')
model_prepared = quant.prepare(model)
# Calibrate with representative data
model_prepared.eval()
with torch.no_grad():
for data, _ in calibration_loader:
model_prepared(data)
# Convert to quantized model
quantized_model = quant.convert(model_prepared)
# Dynamic quantization (easier)
quantized_model = torch.quantization.quantize_dynamic(
model,
{torch.nn.Linear},
dtype=torch.qint8
)
Best Practices
Memory Management
# Clear cache
torch.cuda.empty_cache()
# Monitor memory usage
print(f"Allocated: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
print(f"Cached: {torch.cuda.memory_reserved() / 1024**3:.2f} GB")
# Use gradient checkpointing to trade compute for memory in large models
from torch.utils.checkpoint import checkpoint
def forward_with_checkpoint(model, x):
return checkpoint(model, x)
Reproducibility
import random
import numpy as np
def set_seed(seed):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
set_seed(42)
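DataLoader workers keep their own RNG state, so for fully reproducible shuffling and augmentation PyTorch also suggests seeding workers and the sampler generator; a sketch (dataset stands for any Dataset instance):
def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)

g = torch.Generator()
g.manual_seed(42)

loader = DataLoader(dataset, batch_size=32, shuffle=True,
                    num_workers=4, worker_init_fn=seed_worker, generator=g)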
Debugging and Profiling
# Gradient clipping
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# Detect anomalies in the autograd graph (slow; enable only while debugging)
torch.autograd.set_detect_anomaly(True)
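For profiling, torch.profiler records operator-level CPU/GPU time; a minimal sketch using placeholder model and input_tensor:
from torch.profiler import profile, record_function, ProfilerActivity

with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
    with record_function("model_inference"):
        model(input_tensor)
print(prof.key_averages().table(sort_by="cpu_time_total", row_limit=10))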