PyTorch has become the framework of choice for deep learning research and increasingly for production. This guide covers advanced techniques for training models efficiently, from distributed training to mixed precision.

Project Structure

A well-organized PyTorch project:

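my_project/
├── configs/
│   └── experiment.yaml
├── data/
│   ├── __init__.py
│   └── dataset.py
├── models/
│   ├── __init__.py
│   └── transformer.py
├── training/
│   ├── __init__.py
│   ├── trainer.py
│   └── losses.py
├── utils/
│   ├── __init__.py
│   └── metrics.py
├── train.py
└── requirements.txt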

Efficient Data Loading

Custom Dataset

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import torch
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
import json

class TextDataset(Dataset):
    """Map-style dataset that tokenizes JSON text records on access.

    The file at ``data_path`` is parsed once in the constructor with
    ``json.load``. The parsed object must support ``len()`` and integer
    indexing, and every record is read through its ``"text"`` and ``"label"``
    keys.

    Args:
        data_path: Location of the JSON file to load.
        tokenizer: Callable invoked as ``tokenizer(text, max_length=...,
            padding="max_length", truncation=True, return_tensors="pt")``.
            Its result is indexed with ``"input_ids"`` and
            ``"attention_mask"``.
        max_length: Value forwarded to the tokenizer as ``max_length``.
    """

    def __init__(self, data_path: Path, tokenizer, max_length: int = 512):
        self.tokenizer = tokenizer
        self.max_length = max_length

        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(data_path, encoding="utf-8") as f:
            self.data = json.load(f)

    def __len__(self):
        """Return the number of loaded records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Tokenize record ``idx`` and return it as a dict of tensors.

        Returns:
            A dict with ``"input_ids"``, ``"attention_mask"`` and ``"labels"``.
        """
        item = self.data[idx]

        encoding = self.tokenizer(
            item["text"],
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        # Drop only the leading batch axis. A bare ``squeeze()`` would also
        # remove the sequence axis when it has length 1 (e.g. max_length=1),
        # producing 0-d tensors that no longer collate as sequences.
        # NOTE(review): assumes the tokenizer output has a leading batch axis
        # of size 1 for a single input string - confirm for your tokenizer.
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "labels": torch.tensor(item["label"]),
        }

Optimized DataLoader

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def create_dataloader(
    dataset,
    batch_size: int,
    is_train: bool = True,
    num_workers: int = 4,
):
    """Build a ``DataLoader`` with settings tuned for training or evaluation.

    Args:
        dataset: Dataset object handed straight to ``DataLoader``.
        batch_size: Number of samples per batch.
        is_train: When true, samples are shuffled and a trailing incomplete
            batch is dropped; when false, order is kept and every sample is
            yielded.
        num_workers: Number of loader worker processes. ``0`` loads data in
            the calling process, which is useful for debugging and on
            platforms where worker processes are problematic.

    Returns:
        The configured ``DataLoader``.
    """
    loader_kwargs = {
        "batch_size": batch_size,
        "shuffle": is_train,
        "num_workers": num_workers,  # Parallel data loading
        "pin_memory": True,  # Faster GPU transfer
        "drop_last": is_train,  # Consistent batch sizes for training
    }

    # DataLoader rejects these two options when there are no worker
    # processes, so only pass them when workers actually exist.
    if num_workers > 0:
        loader_kwargs["prefetch_factor"] = 2  # Prefetch batches
        loader_kwargs["persistent_workers"] = True  # Keep workers alive

    return DataLoader(dataset, **loader_kwargs)

Model Definition Best Practices

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import torch
import torch.nn as nn
import torch.nn.functional as F

class TransformerClassifier(nn.Module):
    """Transformer-encoder sequence classifier.

    Token ids are embedded, passed through ``PositionalEncoding`` and a stack
    of ``nn.TransformerEncoderLayer`` blocks (``batch_first=True``), averaged
    over the sequence axis, and mapped to class logits by a two-layer MLP.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Embedding / encoder feature width.
        nhead: Attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        num_classes: Size of the output logit vector.
        dropout: Dropout probability shared by the positional encoder, the
            encoder layers and the classifier head.
    """

    def __init__(
        self,
        vocab_size: int,
        d_model: int = 512,
        nhead: int = 8,
        num_layers: int = 6,
        num_classes: int = 10,
        dropout: float = 0.1
    ):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, d_model)
        # NOTE(review): PositionalEncoding is not defined in this snippet; it
        # must be importable from the surrounding module.
        self.pos_encoder = PositionalEncoding(d_model, dropout)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=d_model * 4,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)

        self.classifier = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_model // 2, num_classes)
        )

        self._init_weights()

    def _init_weights(self):
        """Apply Xavier-uniform init to every parameter with more than one dim."""
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, x, attention_mask=None):
        """Compute class logits for a batch of token-id sequences.

        Args:
            x: Token ids, indexed into the embedding table. With
                ``batch_first=True`` the layout is (batch, sequence).
            attention_mask: Optional mask with the same layout as ``x`` where
                ``0`` marks padding. Padded positions are hidden from
                attention and excluded from the pooled average.

        Returns:
            Logits produced by the classifier head, one row per sequence.
        """
        x = self.embedding(x)
        x = self.pos_encoder(x)

        # The encoder expects True at positions to ignore, i.e. where the
        # incoming mask is 0.
        padding_mask = None
        if attention_mask is not None:
            padding_mask = attention_mask == 0

        x = self.transformer(x, src_key_padding_mask=padding_mask)

        # Pool over sequence. With a mask, average only the real tokens so
        # that the amount of padding in a batch does not change the pooled
        # representation of a sequence.
        if padding_mask is None:
            pooled = x.mean(dim=1)
        else:
            keep = (~padding_mask).unsqueeze(-1).to(x.dtype)
            # clamp guards against division by zero for an all-padding row.
            pooled = (x * keep).sum(dim=1) / keep.sum(dim=1).clamp(min=1.0)

        return self.classifier(pooled)

Mixed Precision Training

Reduce memory and speed up training with fp16:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from torch.cuda.amp import autocast, GradScaler

def train_epoch(model, dataloader, optimizer, criterion, scaler, device, scheduler=None):
    """Train ``model`` for one pass over ``dataloader`` with mixed precision.

    Each batch is a mapping with ``"input_ids"``, ``"attention_mask"`` and
    ``"labels"`` tensors. The forward pass runs under ``autocast``, gradients
    are scaled by ``scaler``, unscaled before clipping to a max norm of 1.0,
    and applied through ``scaler.step``.

    Args:
        model: Module called as ``model(input_ids, attention_mask)``.
        dataloader: Iterable of batch mappings.
        optimizer: Optimizer updating ``model``'s parameters.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        scaler: ``GradScaler`` used for loss scaling.
        device: Target passed to ``Tensor.to`` for every batch tensor.
        scheduler: Optional per-batch learning-rate scheduler. It is stepped
            after each optimizer step that the scaler did not skip.

    Returns:
        Mean batch loss as a float, or ``0.0`` if the loader yielded nothing.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for batch in dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        optimizer.zero_grad()

        # Mixed precision forward pass
        with autocast():
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)

        # Scaled backward pass
        scaler.scale(loss).backward()

        # Gradient clipping must see the true gradients, so unscale first.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        scale_before = scaler.get_scale()
        scaler.step(optimizer)
        scaler.update()

        # The scaler lowers its scale when it skipped the optimizer step
        # (inf/NaN gradients). Only advance the schedule on real steps so the
        # scheduler never runs ahead of the optimizer.
        if scheduler is not None and scaler.get_scale() >= scale_before:
            scheduler.step()

        total_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        return 0.0
    return total_loss / num_batches

# Usage: build one GradScaler up front and hand the same instance to every
# train_epoch call.
# NOTE(review): num_epochs, model, train_loader, optimizer, criterion and
# device are not defined in this snippet - they must be set up beforehand.
scaler = GradScaler()
for epoch in range(num_epochs):
    loss = train_epoch(model, train_loader, optimizer, criterion, scaler, device)

Distributed Data Parallel Training

Train on multiple GPUs:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

def setup_distributed(rank, world_size):
    """Join the NCCL process group and bind this process to its GPU.

    Args:
        rank: Index of this process; also used as the CUDA device index.
        world_size: Total number of participating processes.
    """
    import os

    # The "env://" rendezvous reads the master address and port from the
    # environment. Launchers such as torchrun export them, but
    # torch.multiprocessing.spawn does not, so fall back to single-node
    # defaults without overriding values that are already set.
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29500")

    # Select the device before creating the NCCL group so that the group's
    # communicators are created on this rank's GPU.
    torch.cuda.set_device(rank)

    dist.init_process_group(
        backend="nccl",
        init_method="env://",
        world_size=world_size,
        rank=rank
    )

def train_distributed(rank, world_size, config):
    """Per-process entry point for DistributedDataParallel training.

    Args:
        rank: Index of this process; used as the CUDA device index.
        world_size: Total number of processes in the group.
        config: Mapping read through the keys ``"model"``, ``"batch_size"``,
            ``"lr"`` and ``"epochs"``. An optional ``"checkpoint_path"`` key
            overrides the default checkpoint file name ``"checkpoint.pt"``.

    NOTE(review): ``train_dataset`` is not defined in this function; it must
    exist at module level in every spawned process - confirm before running.
    """
    setup_distributed(rank, world_size)

    try:
        # Create model and move to GPU
        model = TransformerClassifier(**config["model"]).to(rank)
        model = DDP(model, device_ids=[rank])

        # Distributed sampler
        train_sampler = DistributedSampler(train_dataset, shuffle=True)
        train_loader = DataLoader(
            train_dataset,
            batch_size=config["batch_size"],
            sampler=train_sampler,
            num_workers=4,
            pin_memory=True
        )

        optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])

        # save_checkpoint serializes scheduler.state_dict(), so a scheduler
        # object is required. A constant factor of 1.0 leaves the learning
        # rate untouched while satisfying that interface.
        scheduler = torch.optim.lr_scheduler.LambdaLR(
            optimizer, lr_lambda=lambda _: 1.0
        )

        # Loss and gradient scaler are created per process instead of being
        # looked up as globals that this function never defined.
        criterion = torch.nn.CrossEntropyLoss()
        scaler = torch.cuda.amp.GradScaler()

        checkpoint_path = config.get("checkpoint_path", "checkpoint.pt")

        for epoch in range(config["epochs"]):
            train_sampler.set_epoch(epoch)  # Important for shuffling
            train_epoch(model, train_loader, optimizer, criterion, scaler, rank)

            # Only save from rank 0; unwrap DDP so the checkpoint holds the
            # plain module's parameter names.
            if rank == 0:
                save_checkpoint(
                    model.module, optimizer, scheduler, epoch, checkpoint_path
                )
    finally:
        # Always leave the process group, even when training raises.
        dist.destroy_process_group()

# Launch: start one train_distributed process per CUDA device reported by
# torch. spawn() supplies the process index as the first argument, so only
# the remaining arguments are listed here.
if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    worker_args = (world_size, config)
    torch.multiprocessing.spawn(train_distributed, args=worker_args, nprocs=world_size)

Learning Rate Scheduling

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from torch.optim.lr_scheduler import OneCycleLR, CosineAnnealingWarmRestarts

# OneCycleLR - good for training from scratch.
# Per the PyTorch docs the schedule spans epochs * steps_per_epoch scheduler
# steps, so it is meant to be stepped once per batch (see the loop below).
# NOTE(review): optimizer, num_epochs, train_loader and train_step are not
# defined in this snippet - they must be set up beforehand.
scheduler = OneCycleLR(
    optimizer,
    max_lr=1e-3,
    epochs=num_epochs,
    steps_per_epoch=len(train_loader),
    pct_start=0.1,  # First 10% of the steps ramp the learning rate up
    anneal_strategy="cos"
)

# Step scheduler after each batch, and only after optimizer.step()
for batch in train_loader:
    loss = train_step(batch)
    optimizer.step()
    scheduler.step()

# Cosine annealing with restarts - an alternative to the schedule above;
# this assignment rebinds `scheduler`.
scheduler = CosineAnnealingWarmRestarts(
    optimizer,
    T_0=10,  # First restart after 10 scheduler steps (epochs if stepped per epoch)
    T_mult=2  # Double period after each restart
)

Checkpointing

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def save_checkpoint(model, optimizer, scheduler, epoch, path):
    """Write the training state to ``path`` with ``torch.save``.

    The saved object is a dict holding the epoch number under ``"epoch"`` and
    the ``state_dict()`` of the model, optimizer and scheduler under
    ``"model_state_dict"``, ``"optimizer_state_dict"`` and
    ``"scheduler_state_dict"``.
    """
    state = {"epoch": epoch}
    components = (
        ("model_state_dict", model),
        ("optimizer_state_dict", optimizer),
        ("scheduler_state_dict", scheduler),
    )
    for key, component in components:
        state[key] = component.state_dict()
    torch.save(state, path)

def load_checkpoint(model, optimizer, scheduler, path, map_location="cpu"):
    """Restore training state from a checkpoint file.

    Reads the dict stored at ``path`` and loads its ``"model_state_dict"``,
    ``"optimizer_state_dict"`` and ``"scheduler_state_dict"`` entries into the
    given objects.

    Args:
        model: Module whose parameters are overwritten.
        optimizer: Optimizer to restore, or ``None`` to skip it (e.g. when
            loading weights for inference only).
        scheduler: Scheduler to restore, or ``None`` to skip it.
        path: Checkpoint file to read.
        map_location: Forwarded to ``torch.load``. The default ``"cpu"`` lets
            a checkpoint written on a GPU be opened on a machine without that
            GPU; ``load_state_dict`` then copies the values into the existing
            parameters on whatever device they already live on.

    Returns:
        The value stored under ``"epoch"`` in the checkpoint.
    """
    checkpoint = torch.load(path, map_location=map_location)
    model.load_state_dict(checkpoint["model_state_dict"])
    if optimizer is not None:
        optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    if scheduler is not None:
        scheduler.load_state_dict(checkpoint["scheduler_state_dict"])
    return checkpoint["epoch"]

Logging with Weights & Biases

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import wandb

# Start a run and attach the experiment config to it.
# NOTE(review): config, num_epochs, scheduler and validate are not defined in
# this snippet, and the `...` arguments are placeholders to be filled in.
wandb.init(project="my-project", config=config)

for epoch in range(num_epochs):
    train_loss = train_epoch(...)
    val_loss, val_acc = validate(...)

    # One log call per epoch keeps all metrics on the same step.
    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "val_loss": val_loss,
        "val_accuracy": val_acc,
        "learning_rate": scheduler.get_last_lr()[0]
    })

# Upload the best checkpoint with the run. The file name matches the one the
# training script writes ("best_model.pt"); the previous "model_best.pt" was
# never created anywhere.
wandb.save("best_model.pt")

Full Training Script

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import torch
import yaml
from pathlib import Path

def main():
    """Run the end-to-end training loop described by ``configs/experiment.yaml``.

    Reads the YAML config, builds the train/validation datasets and loaders,
    the model, an AdamW optimizer, a OneCycleLR scheduler, the loss and a
    GradScaler, then trains for ``config["epochs"]`` epochs. After each epoch
    the metrics are printed and ``best_model.pt`` is written whenever the
    validation loss improves.

    NOTE(review): ``tokenizer`` and ``validate`` are not defined in this
    function; they must exist at module level - confirm before running.
    """
    # Load config. YAML is UTF-8 text; do not depend on the locale default.
    with open("configs/experiment.yaml", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # Setup
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Data
    train_dataset = TextDataset(Path(config["data"]["train"]), tokenizer)
    val_dataset = TextDataset(Path(config["data"]["val"]), tokenizer)

    train_loader = create_dataloader(train_dataset, config["batch_size"], True)
    val_loader = create_dataloader(val_dataset, config["batch_size"], False)

    # Model
    model = TransformerClassifier(**config["model"]).to(device)

    # Optimizer
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config["lr"],
        weight_decay=config["weight_decay"]
    )

    # The schedule length is derived from the loader, so the scheduler has to
    # be built after the training loader.
    scheduler = OneCycleLR(
        optimizer,
        max_lr=config["lr"],
        epochs=config["epochs"],
        steps_per_epoch=len(train_loader)
    )

    criterion = nn.CrossEntropyLoss()
    scaler = GradScaler()

    best_val_loss = float("inf")

    for epoch in range(config["epochs"]):
        # NOTE(review): scheduler is passed as a seventh positional argument;
        # confirm that train_epoch's signature accepts it.
        train_loss = train_epoch(
            model, train_loader, optimizer,
            criterion, scaler, device, scheduler
        )
        val_loss, val_acc = validate(model, val_loader, criterion, device)

        print(f"Epoch {epoch}: train_loss={train_loss:.4f}, "
              f"val_loss={val_loss:.4f}, val_acc={val_acc:.4f}")

        # Keep only the checkpoint with the lowest validation loss so far.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            save_checkpoint(model, optimizer, scheduler, epoch, "best_model.pt")


if __name__ == "__main__":
    main()

Conclusion

Production PyTorch training requires:

  • Efficient data loading with proper parallelization
  • Mixed precision for memory and speed
  • Distributed training for multi-GPU scaling
  • Proper logging and checkpointing

At Sajima Solutions, we train and deploy custom models for businesses across the Gulf. Contact us to discuss your machine learning needs.