Data Pipeline

The NanoChat data pipeline provides efficient distributed loading and tokenization of pretraining data. The system streams text data from parquet files, tokenizes it on-the-fly, and yields properly formatted training batches. This document covers the data format, loading mechanisms, and distributed processing capabilities.

Overview

Source: nanochat/dataloader.py:1-8

python
from collections import deque

import torch
import pyarrow.parquet as pq

from nanochat.common import get_dist_info
from nanochat.dataset import list_parquet_files
from nanochat.tokenizer import get_tokenizer

The data pipeline consists of three main components:

  1. Dataset Management: Downloading and organizing parquet files
  2. Distributed Loading: Reading data across multiple processes/GPUs
  3. Tokenization Pipeline: Converting text to tokens with proper batching
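
A minimal sketch of how the three stages fit together at a call site (the batch size and sequence length here are illustrative, not repository defaults):

python
# 1) Dataset management: fetch shards into the local cache (done once, via the CLI):
#        python -m nanochat.dataset -n 8
# 2) Distributed loading + 3) tokenization: stream (inputs, targets) batches on the fly
from nanochat.dataloader import tokenizing_distributed_data_loader

loader = tokenizing_distributed_data_loader(4, 1024, split="train", device="cuda")  # B=4, T=1024 are illustrative
inputs, targets = next(loader)  # two (4, 1024) LongTensors on the target device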

Dataset Format and Management

Dataset Structure

Source: nanochat/dataset.py:15-25

python
# The URL on the internet where the data is hosted and downloaded from on demand
BASE_URL = "https://huggingface.co/datasets/karpathy/fineweb-edu-100b-shuffle/resolve/main"
MAX_SHARD = 1822 # the last datashard is shard_01822.parquet
index_to_filename = lambda index: f"shard_{index:05d}.parquet" # format of the filenames
base_dir = get_base_dir()
DATA_DIR = os.path.join(base_dir, "base_data")
os.makedirs(DATA_DIR, exist_ok=True)

The dataset consists of:

  • 1,823 parquet shards (shard_00000.parquet through shard_01822.parquet) containing preprocessed text
  • ~250M characters per shard (compressed to ~100MB)
  • Row groups of 1024 documents for efficient loading
  • Hosted on HuggingFace for on-demand download
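
For instance, the shard naming rule and cache layout defined above work as follows (the cache directory shown is illustrative; the real one comes from get_base_dir()):

python
import os

# Same formatting rule as nanochat/dataset.py
index_to_filename = lambda index: f"shard_{index:05d}.parquet"

print(index_to_filename(0))     # shard_00000.parquet
print(index_to_filename(1822))  # shard_01822.parquet  (the last shard, MAX_SHARD)

# A shard's remote URL and local cache path are built the same way:
base_url = "https://huggingface.co/datasets/karpathy/fineweb-edu-100b-shuffle/resolve/main"
data_dir = os.path.expanduser("~/.cache/nanochat/base_data")  # illustrative; derived from get_base_dir() in the repo
print(f"{base_url}/{index_to_filename(3)}")
print(os.path.join(data_dir, index_to_filename(3)))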

Dataset Preparation

The original dataset preparation process is documented for reference:

Source: dev/repackage_data_reference.py:1-25

python
"""
Repackage the FinewebEdu-100B dataset into shards:

- each shard is ~100MB in size (after zstd compression)
- parquets are written with row group size of 1000
- shuffle the dataset

This will be uploaded to HuggingFace for hosting.
The big deal is that our DataLoader will be able to stream
the data and cache it along the way on disk, decreasing the
training latency.
"""

# Source dataset
dataset_kwargs = {
    "path": "HuggingFaceFW/fineweb-edu",
    "split": "train", 
    "name": "sample-100BT", # ~100B GPT-2 tokens
}
ds = load_dataset(**dataset_kwargs)
ds = ds.shuffle(seed=42)  # Shuffle to scramble the order

File Management

Source: nanochat/dataset.py:30-42

python
def list_parquet_files(data_dir=None):
    """ Looks into a data dir and returns full paths to all parquet files. """
    data_dir = DATA_DIR if data_dir is None else data_dir
    parquet_files = sorted([
        f for f in os.listdir(data_dir)
        if f.endswith('.parquet') and not f.endswith('.tmp')
    ])
    parquet_paths = [os.path.join(data_dir, f) for f in parquet_files]
    return parquet_paths
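
As the loader below shows, the last shard in this sorted list is held out for validation and the rest are used for training. A quick sketch:

python
from nanochat.dataset import list_parquet_files

parquet_paths = list_parquet_files()
assert len(parquet_paths) > 0, "No shards found; run python -m nanochat.dataset first"

train_paths = parquet_paths[:-1]  # all but the last shard
val_paths = parquet_paths[-1:]    # the last shard is reserved for validation
print(f"{len(train_paths)} train shards, {len(val_paths)} val shard")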

Distributed Data Loading

Main Data Loader Function

The core data loader supports distributed training with resumption capabilities:

Source: nanochat/dataloader.py:9-23

python
def tokenizing_distributed_data_loader_with_state(B, T, split, tokenizer_threads=4, 
                                                tokenizer_batch_size=128, device="cuda", 
                                                resume_state_dict=None):
    """
    Stream pretraining text from parquet files, tokenize, yield training batches.

    This implementation supports approximate resume training.
    Instead of turning this into a Class, we return the state_dict with every batch,
    and the caller can pass in a state_dict to resume training from a desired point.
    Note that this resumption is only *approximate* for simplicity.
    We won't repeat the same documents but we might skip a few.

    Perfect state resumption is possible but would be a lot more bloated, probably not worth it.
    """
    assert split in ["train", "val"], "split must be 'train' or 'val'"

Parameters

  • B, T: Batch size and sequence length for output tensors
  • split: "train" or "val" (uses different parquet files)
  • tokenizer_threads: Number of threads for parallel tokenization
  • tokenizer_batch_size: Number of documents to tokenize together
  • device: Target device for tensors
  • resume_state_dict: Optional state for resuming training
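
A hedged sketch of a call site (batch size and sequence length are illustrative; the state_dict handling mirrors the training integration shown later):

python
from nanochat.dataloader import tokenizing_distributed_data_loader_with_state

loader = tokenizing_distributed_data_loader_with_state(
    B=32, T=2048, split="train",         # 32 sequences of 2048 tokens per device (illustrative)
    tokenizer_threads=4, tokenizer_batch_size=128,
    device="cuda",
    resume_state_dict=None,              # or a previously saved {"pq_idx": ..., "rg_idx": ...}
)
inputs, targets, state_dict = next(loader)
# inputs, targets: (32, 2048) LongTensors; targets are inputs shifted right by one token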

Distributed Iteration Logic

Source: nanochat/dataloader.py:27-50

python
def document_batches():
    parquet_paths = list_parquet_files()
    assert len(parquet_paths) != 0, "No dataset parquet files found, did you run dataset.py?"
    parquet_paths = parquet_paths[:-1] if split == "train" else parquet_paths[-1:]
    resume_pq_idx = resume_state_dict["pq_idx"] if resume_state_dict is not None else 0
    resume_rg_idx = resume_state_dict["rg_idx"] if resume_state_dict is not None else None
    first_pass = True
    pq_idx = resume_pq_idx
    
    while True:  # iterate infinitely (multi-epoch)
        pq_idx = resume_pq_idx if first_pass else 0
        while pq_idx < len(parquet_paths):  # iterate over all parquet files
            filepath = parquet_paths[pq_idx]
            pf = pq.ParquetFile(filepath)
            
            # Handle distributed processing and resumption
            if first_pass and (resume_rg_idx is not None) and (pq_idx == resume_pq_idx):
                base_idx = resume_rg_idx // ddp_world_size
                base_idx += 1  # advance by 1 to avoid repeating data
                rg_idx = base_idx * ddp_world_size + ddp_rank
                if rg_idx >= pf.num_row_groups:
                    pq_idx += 1
                    continue
                resume_rg_idx = None
            else:
                rg_idx = ddp_rank

Row Group Processing

Source: nanochat/dataloader.py:55-68

python
while rg_idx < pf.num_row_groups:
    rg = pf.read_row_group(rg_idx)
    batch = rg.column('text').to_pylist()  # each batch is a parquet group, e.g. 1024 rows
    
    # the tokenizer encode might want to go in even smaller batches, e.g. 128 rows
    for i in range(0, len(batch), tokenizer_batch_size):
        yield batch[i:i+tokenizer_batch_size], (pq_idx, rg_idx)
    
    rg_idx += ddp_world_size  # advance to the next row group (in DDP)
pq_idx += 1  # advance to the next parquet file

Each process reads different row groups within each parquet file:

  • Process 0: Reads row groups 0, 8, 16, ... (with world_size=8)
  • Process 1: Reads row groups 1, 9, 17, ...
  • Process N: Reads row groups N, N+8, N+16, ...
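
The assignment is a simple stride over row-group indices, so no coordination between processes is needed. A small illustration (the helper name here is ours, not from the repository):

python
def row_groups_for_rank(ddp_rank, ddp_world_size, num_row_groups):
    """Row groups read by a given rank under the strided assignment."""
    return list(range(ddp_rank, num_row_groups, ddp_world_size))

print(row_groups_for_rank(0, 8, 32))  # [0, 8, 16, 24]
print(row_groups_for_rank(1, 8, 32))  # [1, 9, 17, 25]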

Tokenization Pipeline

Token Buffer Management

Source: nanochat/dataloader.py:72-88

python
# Now emit batches of tokens.
needed_tokens = B * T + 1  # +1 is because we also need the target at the last token
# get the tokenizer and the bos token
tokenizer = get_tokenizer()
bos_token = tokenizer.get_bos_token_id()
# scratch buffer holds the tokens for one iteration
token_buffer = deque()  # we stream tokens on the right and pop from the left

while True:
    # Accumulate enough tokens for one iteration before yielding.
    while len(token_buffer) < needed_tokens:
        doc_batch, (pq_idx, rg_idx) = next(batches)
        token_lists = tokenizer.encode(doc_batch, prepend=bos_token, num_threads=tokenizer_threads)
        for tokens in token_lists:
            token_buffer.extend(tokens)

The tokenizer processes documents in batches:

  1. Document batches are read from parquet files
  2. Parallel tokenization processes multiple documents simultaneously
  3. BOS tokens are prepended to each document
  4. Token buffer accumulates tokens until enough for one training batch
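
A small sketch of the tokenization step in isolation, assuming a tokenizer has already been trained and is available via get_tokenizer() (the document texts are illustrative):

python
from nanochat.tokenizer import get_tokenizer

tokenizer = get_tokenizer()
bos = tokenizer.get_bos_token_id()

docs = ["First document text...", "Second document text..."]
token_lists = tokenizer.encode(docs, prepend=bos, num_threads=4)
# token_lists is a list of token-id lists, each beginning with the BOS token
assert all(tokens[0] == bos for tokens in token_lists)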

Batch Construction

Source: nanochat/dataloader.py:75-95

python
# Move tokens from the deque into the scratch buffer
tokens = [token_buffer.popleft() for _ in range(needed_tokens)]

# CUDA supports memory pinning for asynchronous transfers between CPU and GPU
use_cuda_optimizations = device == "cuda"
scratch = torch.tensor(tokens, dtype=torch.long, pin_memory=use_cuda_optimizations)

# Create the inputs/targets as 1D tensors
inputs_cpu = scratch[:-1]
targets_cpu = scratch[1:]

# Reshape to 2D and move to GPU async
inputs = inputs_cpu.view(B, T).to(device=device, non_blocking=use_cuda_optimizations)
targets = targets_cpu.view(B, T).to(device=device, non_blocking=use_cuda_optimizations)

state_dict = {"pq_idx": pq_idx, "rg_idx": rg_idx}  # for approximate resume training
yield inputs, targets, state_dict

Memory Optimizations

  1. Pinned Memory: Used for faster CPU->GPU transfers when available
  2. Asynchronous Transfer: Non-blocking GPU transfers overlap with computation
  3. Deque Buffer: Efficient token streaming with O(1) append/pop operations
  4. Memory Layout: Reshaping from 1D to 2D avoids unnecessary copies

Simplified Interface

Source: nanochat/dataloader.py:92-95

python
def tokenizing_distributed_data_loader(*args, **kwargs):
    # helper function that only emits the inputs/targets and not the state_dict
    for inputs, targets, state_dict in tokenizing_distributed_data_loader_with_state(*args, **kwargs):
        yield inputs, targets
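
This is the variant used for the validation loader in base training. A typical evaluation-style loop might look like this (the number of steps and the shapes are illustrative):

python
val_loader = tokenizing_distributed_data_loader(32, 2048, split="val", device="cuda")
for step in range(100):          # evaluate on 100 batches
    x, y = next(val_loader)      # no state_dict to keep track of
    # ... forward pass / loss computation ...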

Dataset Iteration Utilities

Batched Parquet Reading

Source: nanochat/dataset.py:44-55

python
def parquets_iter_batched(split, start=0, step=1):
    """
    Iterate through the dataset, in batches of underlying row_groups for efficiency.
    - split can be "train" or "val". the last parquet file will be val.
    - start/step are useful for skipping rows in DDP. e.g. start=rank, step=world_size
    """
    assert split in ["train", "val"], "split must be 'train' or 'val'"
    parquet_paths = list_parquet_files()
    parquet_paths = parquet_paths[:-1] if split == "train" else parquet_paths[-1:]
    for filepath in parquet_paths:
        pf = pq.ParquetFile(filepath)
        for rg_idx in range(start, pf.num_row_groups, step):
            rg = pf.read_row_group(rg_idx)
            texts = rg.column('text').to_pylist()
            yield texts

This utility function is used by tokenizer training and other preprocessing tasks.
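
For example, tokenizer training can stream raw text from the training split until a character budget is met (the budget below is illustrative and matches the --max_chars value used in speedrun.sh):

python
from nanochat.dataset import parquets_iter_batched

num_chars = 0
for texts in parquets_iter_batched(split="train"):
    for text in texts:
        num_chars += len(text)
    if num_chars >= 2_000_000_000:   # stop once ~2B characters have been seen
        break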

Integration with Training

Base Training Integration

Source: scripts/base_train.py:200-205

python
# Initialize the DataLoaders for train/val
dataloader_resume_state_dict = None if not resuming else meta_data["dataloader_state_dict"]
train_loader = tokenizing_distributed_data_loader_with_state(
    args.device_batch_size, args.max_seq_len, split="train", 
    device=device, resume_state_dict=dataloader_resume_state_dict
)
build_val_loader = lambda: tokenizing_distributed_data_loader(
    args.device_batch_size, args.max_seq_len, split="val", device=device
)
x, y, dataloader_state_dict = next(train_loader)  # kick off load of the very first batch

Resumption Support

The data loader supports approximate training resumption by tracking:

  • pq_idx: Current parquet file index
  • rg_idx: Current row group index within the file

This state is saved with model checkpoints and can be used to resume training from approximately the same data position.
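
A minimal sketch of how this state travels through a checkpoint; the file name and checkpoint layout here are illustrative, and B, T, device stand in for the training configuration:

python
import torch

# When checkpointing, persist the most recent loader state returned with each batch
torch.save({"dataloader_state_dict": dataloader_state_dict}, "loader_state.pt")

# When resuming, feed the saved state back into the loader
meta = torch.load("loader_state.pt")
train_loader = tokenizing_distributed_data_loader_with_state(
    B, T, split="train", device=device,
    resume_state_dict=meta["dataloader_state_dict"],
)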

Data Download and Management

On-Demand Download

Source: nanochat/dataset.py:58-85

python
def download_single_file(index):
    """ Downloads a single file index, with some backoff """
    
    # Construct the local filepath for this file and skip if it already exists
    filename = index_to_filename(index)
    filepath = os.path.join(DATA_DIR, filename)
    if os.path.exists(filepath):
        print(f"Skipping {filepath} (already exists)")
        return True

    # Construct the remote URL for this file
    url = f"{BASE_URL}/{filename}"
    print(f"Downloading {filename}...")

    # Download with retries
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(url, stream=True, timeout=30)
            response.raise_for_status()
            
            # Write to temporary file first, then rename
            temp_filepath = filepath + ".tmp"
            with open(temp_filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            os.rename(temp_filepath, filepath)
            return True
            
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt < max_attempts:
                time.sleep(2 ** attempt)  # exponential backoff
    return False
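
The function above is typically driven over the first N shard indices; the normal entry point is the CLI shown in speedrun.sh (python -m nanochat.dataset -n N). A simple sequential sketch, using download_single_file from above (the helper name is ours):

python
def download_first_n(n):
    # Fetch shards 0..n-1, skipping any that already exist on disk
    results = [download_single_file(i) for i in range(n)]
    failed = [i for i, ok in enumerate(results) if not ok]
    if failed:
        print(f"Failed to download shards: {failed}")

download_first_n(8)  # mirrors `python -m nanochat.dataset -n 8`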

Usage in Training Scripts

Training scripts typically download the required number of shards before starting:

Source: speedrun.sh:55-76

bash
# Download the first ~2B characters of pretraining dataset
python -m nanochat.dataset -n 8

# Immediately kick off downloading more shards in the background
# The d20 model needs ~240 shards for Chinchilla-optimal training
python -m nanochat.dataset -n 240 &
DATASET_DOWNLOAD_PID=$!

# Train tokenizer while downloading continues
python -m scripts.tok_train --max_chars=2000000000 --vocab_size=65536

# Wait for full dataset download to complete
wait $DATASET_DOWNLOAD_PID

Performance Characteristics

Throughput Optimization

  1. Streaming: Data is processed as it's read, avoiding large memory usage
  2. Parallel Tokenization: Multi-threaded encoding of text batches
  3. Efficient Parquet Format: Columnar storage with compression
  4. Row Group Granularity: 1024 documents per row group for optimal I/O

Memory Efficiency

  1. Token Buffer: A deque streams tokens with O(1) append/pop operations
  2. Batch Processing: Documents are tokenized in configurable batch sizes
  3. Memory Pinning: Optimized GPU transfers when available
  4. Lazy Loading: Files are only read when needed

Distributed Scaling

  1. Process-level Parallelism: Each GPU process reads different data chunks
  2. Load Balancing: Row groups are distributed evenly across processes
  3. Minimal Coordination: No inter-process communication required
  4. Resumption Support: Training can continue from checkpoints

The data pipeline provides efficient, scalable data loading that supports both single-GPU and distributed training while maintaining simplicity and performance.


Sources:

  • nanochat/dataloader.py (distributed data loading implementation)
  • nanochat/dataset.py (dataset management and utilities)
  • dev/repackage_data_reference.py (dataset preparation documentation)
  • speedrun.sh (integration example)