Data Pipeline
The nanochat data pipeline provides efficient distributed loading and tokenization of pretraining data. The system streams text from parquet files, tokenizes it on the fly, and yields fixed-shape (B, T) input/target batches. This document covers the data format, loading mechanisms, and distributed processing capabilities.
Overview
Source: nanochat/dataloader.py:1-8
from collections import deque
import torch
import pyarrow.parquet as pq
from nanochat.common import get_dist_info
from nanochat.dataset import list_parquet_files
from nanochat.tokenizer import get_tokenizer
The data pipeline consists of three main components:
- Dataset Management: Downloading and organizing parquet files
- Distributed Loading: Reading data across multiple processes/GPUs
- Tokenization Pipeline: Converting text to tokens with proper batching
Dataset Format and Management
Dataset Structure
Source: nanochat/dataset.py:15-25
# The URL on the internet where the data is hosted and downloaded from on demand
BASE_URL = "https://huggingface.co/datasets/karpathy/fineweb-edu-100b-shuffle/resolve/main"
MAX_SHARD = 1822 # the last datashard is shard_01822.parquet
index_to_filename = lambda index: f"shard_{index:05d}.parquet" # format of the filenames
base_dir = get_base_dir()
DATA_DIR = os.path.join(base_dir, "base_data")
os.makedirs(DATA_DIR, exist_ok=True)
The dataset consists of:
- 1,823 parquet shards (shard_00000 through shard_01822) containing preprocessed text
- ~250M characters per shard (~100MB after zstd compression)
- Row groups of 1024 documents for efficient loading
- Hosted on HuggingFace for on-demand download
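To make the layout concrete, here is how a shard index maps to its filename, remote URL, and local cache path (a small sketch; the index value is arbitrary and the names are assumed to be importable from the module shown above):

import os
from nanochat.dataset import BASE_URL, DATA_DIR, index_to_filename

index = 3
filename = index_to_filename(index)            # "shard_00003.parquet"
url = f"{BASE_URL}/{filename}"                 # remote file on HuggingFace
local_path = os.path.join(DATA_DIR, filename)  # cached copy on local disk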
Dataset Preparation
The original dataset preparation process is documented for reference:
Source: dev/repackage_data_reference.py:1-25
"""
Repackage the FinewebEdu-100B dataset into shards:
- each shard is ~100MB in size (after zstd compression)
- parquets are written with row group size of 1000
- shuffle the dataset
This will be uploaded to HuggingFace for hosting.
The big deal is that our DataLoader will be able to stream
the data and cache it along the way on disk, decreasing the
training latency.
"""
# Source dataset
dataset_kwargs = {
"path": "HuggingFaceFW/fineweb-edu",
"split": "train",
"name": "sample-100BT", # ~100B GPT-2 tokens
}
ds = load_dataset(**dataset_kwargs)
ds = ds.shuffle(seed=42) # Shuffle to scramble the order
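The shard-writing step itself is not reproduced here; the following is only a rough sketch of what writing one shard with pyarrow could look like under the constraints stated in the docstring (write_shard and its arguments are assumptions, not the reference script's code):

import pyarrow as pa
import pyarrow.parquet as pq

def write_shard(texts, path, row_group_size):
    # Illustrative: write one chunk of documents as a zstd-compressed
    # parquet shard with a fixed row group size.
    table = pa.table({"text": texts})
    pq.write_table(table, path, compression="zstd", row_group_size=row_group_size)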
File Management
Source: nanochat/dataset.py:30-42
def list_parquet_files(data_dir=None):
""" Looks into a data dir and returns full paths to all parquet files. """
data_dir = DATA_DIR if data_dir is None else data_dir
parquet_files = sorted([
f for f in os.listdir(data_dir)
if f.endswith('.parquet') and not f.endswith('.tmp')
])
parquet_paths = [os.path.join(data_dir, f) for f in parquet_files]
return parquet_paths
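A quick usage sketch; the train/val convention shown here (last shard held out for validation) mirrors the loader code later on this page:

from nanochat.dataset import list_parquet_files

paths = list_parquet_files()
print(f"{len(paths)} shards available locally")
train_paths, val_paths = paths[:-1], paths[-1:]  # last shard is the val split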
Distributed Data Loading
Main Data Loader Function
The core data loader supports distributed training with resumption capabilities:
Source: nanochat/dataloader.py:9-23
def tokenizing_distributed_data_loader_with_state(B, T, split, tokenizer_threads=4,
tokenizer_batch_size=128, device="cuda",
resume_state_dict=None):
"""
Stream pretraining text from parquet files, tokenize, yield training batches.
This implementation supports approximate resume training.
Instead of turning this into a Class, we return the state_dict with every batch,
and the caller can pass in a state_dict to resume training from a desired point.
Note that this resumption is only *approximate* for simplicity.
We won't repeat the same documents but we might skip a few.
Perfect state resumption is possible but would be a lot more bloated, probably not worth it.
"""
assert split in ["train", "val"], "split must be 'train' or 'val'"
Parameters
- B, T: Batch size and sequence length for output tensors
- split: "train" or "val" (uses different parquet files)
- tokenizer_threads: Number of threads for parallel tokenization
- tokenizer_batch_size: Number of documents to tokenize together
- device: Target device for tensors
- resume_state_dict: Optional state for resuming training
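A sketch of constructing the loader with these parameters (the values are illustrative; a trained tokenizer and downloaded shards are assumed to exist):

from nanochat.dataloader import tokenizing_distributed_data_loader_with_state

train_loader = tokenizing_distributed_data_loader_with_state(
    B=32, T=2048, split="train",
    tokenizer_threads=4, tokenizer_batch_size=128,
    device="cuda", resume_state_dict=None,
)
inputs, targets, state_dict = next(train_loader)  # each yield is one (B, T) batch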
Distributed Iteration Logic
Source: nanochat/dataloader.py:27-50
def document_batches():
parquet_paths = list_parquet_files()
assert len(parquet_paths) != 0, "No dataset parquet files found, did you run dataset.py?"
parquet_paths = parquet_paths[:-1] if split == "train" else parquet_paths[-1:]
resume_pq_idx = resume_state_dict["pq_idx"] if resume_state_dict is not None else 0
resume_rg_idx = resume_state_dict["rg_idx"] if resume_state_dict is not None else None
first_pass = True
pq_idx = resume_pq_idx
while True: # iterate infinitely (multi-epoch)
pq_idx = resume_pq_idx if first_pass else 0
while pq_idx < len(parquet_paths): # iterate over all parquet files
filepath = parquet_paths[pq_idx]
pf = pq.ParquetFile(filepath)
# Handle distributed processing and resumption
if first_pass and (resume_rg_idx is not None) and (pq_idx == resume_pq_idx):
base_idx = resume_rg_idx // ddp_world_size
base_idx += 1 # advance by 1 to avoid repeating data
rg_idx = base_idx * ddp_world_size + ddp_rank
if rg_idx >= pf.num_row_groups:
pq_idx += 1
continue
resume_rg_idx = None
else:
rg_idx = ddp_rank
Row Group Processing
Source: nanochat/dataloader.py:55-68
while rg_idx < pf.num_row_groups:
rg = pf.read_row_group(rg_idx)
batch = rg.column('text').to_pylist() # each batch is a parquet group, e.g. 1024 rows
# the tokenizer encode might want to go in even smaller batches, e.g. 128 rows
for i in range(0, len(batch), tokenizer_batch_size):
yield batch[i:i+tokenizer_batch_size], (pq_idx, rg_idx)
rg_idx += ddp_world_size # advance to the next row group (in DDP)
pq_idx += 1 # advance to the next parquet file
Each process reads different row groups within each parquet file:
- Process 0: Reads row groups 0, 8, 16, ... (with world_size=8)
- Process 1: Reads row groups 1, 9, 17, ...
- Process N: Reads row groups N, N+8, N+16, ...
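The assignment is a plain stride pattern. The standalone sketch below prints which row groups each rank would read for a file with 20 row groups and 8 processes (numbers are arbitrary):

world_size = 8
num_row_groups = 20
for rank in range(world_size):
    assigned = list(range(rank, num_row_groups, world_size))
    print(f"rank {rank}: row groups {assigned}")
# rank 0: [0, 8, 16], rank 1: [1, 9, 17], ...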
Tokenization Pipeline
Token Buffer Management
Source: nanochat/dataloader.py:72-88
# Now emit batches of tokens.
needed_tokens = B * T + 1 # +1 is because we also need the target at the last token
# get the tokenizer and the bos token
tokenizer = get_tokenizer()
bos_token = tokenizer.get_bos_token_id()
# scratch buffer holds the tokens for one iteration
token_buffer = deque() # we stream tokens on the right and pop from the left
while True:
# Accumulate enough tokens for one iteration before yielding.
while len(token_buffer) < needed_tokens:
doc_batch, (pq_idx, rg_idx) = next(batches)
token_lists = tokenizer.encode(doc_batch, prepend=bos_token, num_threads=tokenizer_threads)
for tokens in token_lists:
token_buffer.extend(tokens)
The tokenizer processes documents in batches:
- Document batches are read from parquet files
- Parallel tokenization processes multiple documents simultaneously
- BOS tokens are prepended to each document
- The token buffer accumulates tokens until it holds the B * T + 1 needed for one training batch, as illustrated below
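Conceptually, the buffer ends up holding one long token stream in which every document is preceded by its BOS token. A toy illustration (token IDs are made up):

from collections import deque

bos = 0  # made-up BOS id
token_lists = [[bos, 11, 12, 13], [bos, 21, 22]]  # two tokenized documents
token_buffer = deque()
for tokens in token_lists:
    token_buffer.extend(tokens)
# token_buffer now holds 0, 11, 12, 13, 0, 21, 22 (documents packed back to back)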
Batch Construction
Source: nanochat/dataloader.py:75-95
# Move tokens from the deque into the scratch buffer
tokens = [token_buffer.popleft() for _ in range(needed_tokens)]
# CUDA supports memory pinning for asynchronous transfers between CPU and GPU
use_cuda_optimizations = device == "cuda"
scratch = torch.tensor(tokens, dtype=torch.long, pin_memory=use_cuda_optimizations)
# Create the inputs/targets as 1D tensors
inputs_cpu = scratch[:-1]
targets_cpu = scratch[1:]
# Reshape to 2D and move to GPU async
inputs = inputs_cpu.view(B, T).to(device=device, non_blocking=use_cuda_optimizations)
targets = targets_cpu.view(B, T).to(device=device, non_blocking=use_cuda_optimizations)
state_dict = {"pq_idx": pq_idx, "rg_idx": rg_idx} # for approximate resume training
yield inputs, targets, state_dict
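The +1 in needed_tokens is what makes the next-token targets line up: inputs and targets are two views of the same stream, offset by one position. A toy example:

import torch

B, T = 2, 3
scratch = torch.arange(B * T + 1)   # 7 tokens: 0..6
inputs = scratch[:-1].view(B, T)    # [[0, 1, 2], [3, 4, 5]]
targets = scratch[1:].view(B, T)    # [[1, 2, 3], [4, 5, 6]]
# targets[b, t] is the token that follows inputs[b, t] in the stream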
Memory Optimizations
- Pinned Memory: Used for faster CPU->GPU transfers when the target device is CUDA
- Asynchronous Transfer: Non-blocking GPU transfers overlap with computation
- Deque Buffer: Efficient token streaming with O(1) append/pop operations
- Memory Layout: Reshaping from 1D to 2D avoids unnecessary copies
Simplified Interface
Source: nanochat/dataloader.py:92-95
def tokenizing_distributed_data_loader(*args, **kwargs):
# helper function that only emits the inputs/targets and not the state_dict
for inputs, targets, state_dict in tokenizing_distributed_data_loader_with_state(*args, **kwargs):
yield inputs, targets
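Evaluation code that never needs to resume can consume the simplified generator directly (a sketch; the model call and step count are placeholders):

from nanochat.dataloader import tokenizing_distributed_data_loader

eval_steps = 100  # illustrative
val_loader = tokenizing_distributed_data_loader(B=32, T=2048, split="val", device="cuda")
for _ in range(eval_steps):
    x, y = next(val_loader)
    # loss = model(x, y); accumulate metrics ...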
Dataset Iteration Utilities
Batched Parquet Reading
Source: nanochat/dataset.py:44-55
def parquets_iter_batched(split, start=0, step=1):
"""
Iterate through the dataset, in batches of underlying row_groups for efficiency.
- split can be "train" or "val". the last parquet file will be val.
- start/step are useful for skipping rows in DDP. e.g. start=rank, step=world_size
"""
assert split in ["train", "val"], "split must be 'train' or 'val'"
parquet_paths = list_parquet_files()
parquet_paths = parquet_paths[:-1] if split == "train" else parquet_paths[-1:]
for filepath in parquet_paths:
pf = pq.ParquetFile(filepath)
for rg_idx in range(start, pf.num_row_groups, step):
rg = pf.read_row_group(rg_idx)
texts = rg.column('text').to_pylist()
yield texts
This utility function is used by tokenizer training and other preprocessing tasks.
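For example, a consumer that gathers a fixed character budget of raw text might look like this (a hypothetical sketch, not the actual tokenizer-training code):

from nanochat.dataset import parquets_iter_batched

char_budget = 2_000_000_000  # ~2B characters, matching the speedrun example below
docs, chars = [], 0
for texts in parquets_iter_batched("train"):
    docs.extend(texts)
    chars += sum(len(t) for t in texts)
    if chars >= char_budget:
        break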
Integration with Training
Base Training Integration
Source: scripts/base_train.py:200-205
# Initialize the DataLoaders for train/val
dataloader_resume_state_dict = None if not resuming else meta_data["dataloader_state_dict"]
train_loader = tokenizing_distributed_data_loader_with_state(
args.device_batch_size, args.max_seq_len, split="train",
device=device, resume_state_dict=dataloader_resume_state_dict
)
build_val_loader = lambda: tokenizing_distributed_data_loader(
args.device_batch_size, args.max_seq_len, split="val", device=device
)
x, y, dataloader_state_dict = next(train_loader) # kick off load of the very first batch
Resumption Support
The data loader supports approximate training resumption by tracking:
- pq_idx: Current parquet file index
- rg_idx: Current row group index within the file
This state is saved with model checkpoints and can be used to resume training from approximately the same data position.
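A sketch of one save/restore cycle (the checkpoint filename and metadata layout are illustrative, not the training script's exact format):

import torch
from nanochat.dataloader import tokenizing_distributed_data_loader_with_state

# During training: keep the most recent loader state alongside the checkpoint
x, y, loader_state = next(train_loader)
torch.save({"dataloader_state_dict": loader_state}, "meta.pt")

# On restart: hand the saved state back to the loader
meta = torch.load("meta.pt")
train_loader = tokenizing_distributed_data_loader_with_state(
    B=32, T=2048, split="train", device="cuda",
    resume_state_dict=meta["dataloader_state_dict"],
)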
Data Download and Management
On-Demand Download
Source: nanochat/dataset.py:58-85
def download_single_file(index):
""" Downloads a single file index, with some backoff """
# Construct the local filepath for this file and skip if it already exists
filename = index_to_filename(index)
filepath = os.path.join(DATA_DIR, filename)
if os.path.exists(filepath):
print(f"Skipping {filepath} (already exists)")
return True
# Construct the remote URL for this file
url = f"{BASE_URL}/{filename}"
print(f"Downloading {filename}...")
# Download with retries
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
# Write to temporary file first, then rename
temp_filepath = filepath + ".tmp"
with open(temp_filepath, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
os.rename(temp_filepath, filepath)
return True
except Exception as e:
print(f"Attempt {attempt} failed: {e}")
if attempt < max_attempts:
time.sleep(2 ** attempt) # exponential backoff
return False
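A minimal sketch of fetching the first N shards with a thread pool (download_first_n is a hypothetical helper; the actual CLI entry point may parallelize differently):

from concurrent.futures import ThreadPoolExecutor
from nanochat.dataset import download_single_file

def download_first_n(n, max_workers=4):
    # Fetch shards 0..n-1; download_single_file skips files already on disk.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(download_single_file, range(n)))
    return all(results)

download_first_n(8)  # e.g., enough shards to train the tokenizer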
Usage in Training Scripts
Training scripts typically download the required number of shards before starting:
Source: speedrun.sh:55-76
# Download the first ~2B characters of pretraining dataset
python -m nanochat.dataset -n 8
# Immediately kick off downloading more shards in the background
# The d20 model needs ~240 shards for Chinchilla-optimal training
python -m nanochat.dataset -n 240 &
DATASET_DOWNLOAD_PID=$!
# Train tokenizer while downloading continues
python -m scripts.tok_train --max_chars=2000000000 --vocab_size=65536
# Wait for full dataset download to complete
wait $DATASET_DOWNLOAD_PID
Performance Characteristics
Throughput Optimization
- Streaming: Data is tokenized as it is read, so the full dataset never needs to fit in memory
- Parallel Tokenization: Multi-threaded encoding of text batches
- Efficient Parquet Format: Columnar storage with compression
- Row Group Granularity: 1024 documents per row group for optimal I/O
Memory Efficiency
- Token Buffer: A deque holds pending tokens with O(1) appends and pops
- Batch Processing: Documents are tokenized in configurable batch sizes
- Memory Pinning: Optimized GPU transfers when available
- Lazy Loading: Files are only read when needed
Distributed Scaling
- Process-level Parallelism: Each GPU process reads different data chunks
- Load Balancing: Row groups are distributed evenly across processes
- Minimal Coordination: No inter-process communication required
- Resumption Support: Training can continue from checkpoints
The data pipeline provides efficient, scalable data loading that supports both single-GPU and distributed training while maintaining simplicity and performance.
Sources:
- nanochat/dataloader.py (distributed data loading implementation)
- nanochat/dataset.py (dataset management and utilities)
- dev/repackage_data_reference.py (dataset preparation documentation)
- speedrun.sh (integration example)