Text Generation
Transformers
PyTorch
English
experimental
research
bit-level
transformer
reversible
safety
telemetry
language-modeling
Instructions to use WCNegentropy/BitTransformerLM with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use WCNegentropy/BitTransformerLM with Transformers:
```python
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("text-generation", model="WCNegentropy/BitTransformerLM")
```
```python
# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("WCNegentropy/BitTransformerLM", dtype="auto")
```
- Notebooks
- Google Colab
- Kaggle
- Local Apps
- vLLM
How to use WCNegentropy/BitTransformerLM with vLLM:
Install from pip and serve model
```bash
# Install vLLM from pip:
pip install vllm

# Start the vLLM server:
vllm serve "WCNegentropy/BitTransformerLM"

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:8000/v1/completions" \
  -H "Content-Type: application/json" \
  --data '{
    "model": "WCNegentropy/BitTransformerLM",
    "prompt": "Once upon a time,",
    "max_tokens": 512,
    "temperature": 0.5
  }'
```
Use Docker
docker model run hf.co/WCNegentropy/BitTransformerLM
- SGLang
How to use WCNegentropy/BitTransformerLM with SGLang:
Install from pip and serve model
```bash
# Install SGLang from pip:
pip install sglang

# Start the SGLang server:
python3 -m sglang.launch_server \
  --model-path "WCNegentropy/BitTransformerLM" \
  --host 0.0.0.0 \
  --port 30000

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:30000/v1/completions" \
  -H "Content-Type: application/json" \
  --data '{
    "model": "WCNegentropy/BitTransformerLM",
    "prompt": "Once upon a time,",
    "max_tokens": 512,
    "temperature": 0.5
  }'
```
Use Docker images
```bash
docker run --gpus all \
  --shm-size 32g \
  -p 30000:30000 \
  -v ~/.cache/huggingface:/root/.cache/huggingface \
  --env "HF_TOKEN=<secret>" \
  --ipc=host \
  lmsysorg/sglang:latest \
  python3 -m sglang.launch_server \
    --model-path "WCNegentropy/BitTransformerLM" \
    --host 0.0.0.0 \
    --port 30000

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:30000/v1/completions" \
  -H "Content-Type: application/json" \
  --data '{
    "model": "WCNegentropy/BitTransformerLM",
    "prompt": "Once upon a time,",
    "max_tokens": 512,
    "temperature": 0.5
  }'
```
- Docker Model Runner
How to use WCNegentropy/BitTransformerLM with Docker Model Runner:
docker model run hf.co/WCNegentropy/BitTransformerLM
| #!/usr/bin/env python3 | |
| """ | |
| Final Breakthrough BitTransformerLM Training Script | |
| ================================================= | |
| The complete training script using the ACTUAL BitTransformerLM model | |
| with the breakthrough Fixed RL Adafactor configuration and full | |
| HuggingFace dataset support with checkpoint resumption. | |
| """ | |
| import sys | |
| import os | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any | |
| import torch | |
| import torch.nn.functional as F | |
| from datasets import load_dataset | |
| from huggingface_hub import login | |
| # Add paths for imports | |
| sys.path.append('/data') | |
| sys.path.append('/data/BitTransformerLM') | |
| from bit_transformer import BitTransformerLM, text_to_bits | |
| from BTLM_Extensions import configure_adafactor_optimizer | |
# Root logging configuration, applied once at import time: every record at
# INFO or above is written both to the training log file and to the console
# stream, using a "timestamp - level - message" layout.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_log_handlers = [
    logging.FileHandler('/data/BitTransformerLM/breakthrough_training.log'),
    logging.StreamHandler(),
]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_log_handlers)

# Module-level logger shared by the trainer class and main().
logger = logging.getLogger(__name__)
class BreakthroughTrainer:
    """CPU trainer for BitTransformerLM using a fixed-LR Adafactor optimizer.

    Call order used by ``main``: ``load_and_prepare_dataset`` ->
    ``create_breakthrough_model`` -> ``setup_optimizer`` ->
    ``load_checkpoint`` (optional resume) -> ``train``.

    Required ``config`` keys: ``checkpoint_dir``, ``sequence_length``,
    ``batch_size``, ``num_epochs``, ``learning_rate``, ``weight_decay``,
    ``max_grad_norm``.
    Optional ``config`` keys: ``hf_token`` (falls back to the ``HF_TOKEN``
    environment variable) and ``log_interval`` (steps between in-epoch
    progress log lines; 0 or missing disables them).
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the config, create the checkpoint directory, reset state."""
        self.config = config
        self.device = torch.device('cpu')  # training is pinned to CPU
        self.model = None
        self.optimizer = None
        self.scheduler = None
        self.dataset = None
        self.checkpoint_dir = Path(config['checkpoint_dir'])
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

        # Training state (restored by load_checkpoint when resuming).
        self.current_epoch = 0
        self.total_steps = 0
        self.best_loss = float('inf')
        self.training_history = []

    @staticmethod
    def _chunk_bits(bits, seq_len: int):
        """Split ``bits`` into ``seq_len``-long windows overlapping by half.

        The stride is ``seq_len // 2`` but never less than 1, so a sequence
        length of 1 cannot produce a zero ``range`` step. Only full-length
        windows are returned; a trailing remainder is dropped.
        """
        stride = max(1, seq_len // 2)
        return [
            bits[start:start + seq_len]
            for start in range(0, len(bits) - seq_len + 1, stride)
        ]

    def _redacted_config(self) -> Dict[str, Any]:
        """Return a copy of the config that is safe to write to disk.

        The Hugging Face token is a credential, so it is blanked out instead
        of being persisted into checkpoints and ``training_config.json``.
        """
        safe_config = dict(self.config)
        if 'hf_token' in safe_config:
            safe_config['hf_token'] = None
        return safe_config

    @staticmethod
    def _atomic_save(payload: Dict[str, Any], path: Path) -> None:
        """Serialize ``payload`` to ``path`` without exposing a partial file.

        The data is written to a sibling ``*.tmp`` file and then moved over
        the target with ``os.replace``, so an interrupt in the middle of
        ``torch.save`` leaves the previous checkpoint intact.
        """
        tmp_path = path.with_name(path.name + '.tmp')
        torch.save(payload, tmp_path)
        os.replace(tmp_path, path)

    def load_and_prepare_dataset(self):
        """Load the HF dataset and convert it to a tensor of bit sequences.

        Each sample's text (``original_text`` first, then ``text``) is passed
        through ``text_to_bits``; samples shorter than ``sequence_length``
        bits are skipped and the rest are cut into half-overlapping windows.

        Returns:
            The ``torch.long`` tensor also stored in ``self.dataset``, one
            row per window.

        Raises:
            ValueError: if ``sequence_length`` is below 2 (next-bit
                prediction needs an input and a target position) or if no
                sample yields a usable window.
        """
        logger.info("Loading WCNegentropy/BitTransformerLM dataset...")

        seq_len = self.config['sequence_length']
        if seq_len < 2:
            raise ValueError(f"sequence_length must be >= 2, got {seq_len}")

        # The config documents the token as coming from HF_TOKEN, so fall
        # back to the environment. Calling login(token=None) would drop into
        # an interactive prompt, which breaks unattended runs.
        token = self.config.get('hf_token') or os.environ.get('HF_TOKEN')
        if token:
            login(token=token)
        else:
            logger.warning(
                "No Hugging Face token in config['hf_token'] or HF_TOKEN; skipping login"
            )

        dataset = load_dataset("WCNegentropy/BitTransformerLM")
        train_data = dataset['train']
        num_samples = len(train_data)
        logger.info(f"Dataset loaded: {num_samples} samples")

        valid_count = 0
        training_sequences = []
        for i, sample in enumerate(train_data):
            if i % 1000 == 0:
                logger.info(f"Processing sample {i}/{num_samples}")

            # Prefer 'original_text', fall back to 'text'.
            text = sample.get('original_text') or sample.get('text')
            if not (text and text.strip()):
                continue

            bits = text_to_bits(text)
            if len(bits) < seq_len:
                continue

            valid_count += 1
            # Chunk immediately so full-length bit lists are not all held
            # in memory at once.
            training_sequences.extend(self._chunk_bits(bits, seq_len))

        logger.info(f"Processed {valid_count} valid bit sequences")

        if not training_sequences:
            raise ValueError(
                "No training sequences were produced: no sample contained at least "
                f"{seq_len} bits of text"
            )

        self.dataset = torch.tensor(training_sequences, dtype=torch.long)
        logger.info(f"Created training dataset: {self.dataset.shape}")
        return self.dataset

    def create_breakthrough_model(self):
        """Instantiate the BitTransformerLM and log its parameter count.

        Returns:
            The model, also stored in ``self.model``.
        """
        logger.info("Creating breakthrough 16M parameter BitTransformerLM...")

        self.model = BitTransformerLM(
            d_model=512,
            nhead=16,
            num_layers=8,
            dim_feedforward=1024,  # 2x d_model
            max_seq_len=self.config['sequence_length'],
            lambda_K=0.05,  # telemetry weights
            lambda_C=0.05,
            lambda_S=0.05,
            reversible=True,
            use_checkpoint=True,
            use_autocast=True,
            use_act=True,
            act_threshold=0.9
        ).to(self.device)

        total_params = sum(p.numel() for p in self.model.parameters())
        trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)

        logger.info(f"Model created: {total_params:,} total parameters ({trainable_params:,} trainable)")
        # The pass/fail marks used to be the same mis-encoded glyph in both
        # branches, which made this line meaningless; they are now distinct.
        logger.info(f"Target: ~16M parameters - {'✓' if 15_000_000 <= total_params <= 17_000_000 else '✗'}")

        return self.model

    def setup_optimizer(self):
        """Build the Adafactor optimizer and scheduler with a fixed LR.

        Requires ``self.dataset`` and ``self.model`` to be set already.

        Returns:
            ``(optimizer, scheduler)`` as returned by
            ``configure_adafactor_optimizer``.
        """
        logger.info("Setting up Fixed RL Adafactor optimizer...")

        # Incomplete trailing batches are skipped during training, hence
        # the floor division.
        steps_per_epoch = len(self.dataset) // self.config['batch_size']
        total_steps = steps_per_epoch * self.config['num_epochs']

        self.optimizer, self.scheduler = configure_adafactor_optimizer(
            self.model,
            lr=self.config['learning_rate'],  # explicit fixed LR, not auto-LR
            weight_decay=self.config['weight_decay'],
            total_steps=total_steps
        )

        logger.info(f"Fixed RL Adafactor configured with LR={self.config['learning_rate']}")
        logger.info(f"Total training steps: {total_steps}")

        return self.optimizer, self.scheduler

    def save_checkpoint(self, epoch: int, loss: float, is_best: bool = False,
                        epoch_snapshot: bool = True):
        """Write the full training state to the checkpoint directory.

        Args:
            epoch: Epoch number recorded in the checkpoint; training resumes
                from this value.
            loss: Loss value recorded in the checkpoint.
            is_best: Also write ``checkpoint_best.pt``.
            epoch_snapshot: Also write ``checkpoint_epoch_NNNN.pt``. Pass
                ``False`` for mid-epoch emergency saves so they do not
                overwrite the snapshot of the last completed epoch.

        ``checkpoint_latest.pt`` and ``training_config.json`` are always
        written. The Hugging Face token is redacted from everything saved.
        """
        safe_config = self._redacted_config()
        checkpoint_data = {
            'epoch': epoch,
            'total_steps': self.total_steps,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'scheduler_state_dict': self.scheduler.state_dict() if self.scheduler else None,
            'loss': loss,
            'best_loss': self.best_loss,
            'config': safe_config,
            'training_history': self.training_history,
            'timestamp': datetime.now().isoformat(),
            'model_config': self.model._current_params()  # model hyperparameters
        }

        latest_path = self.checkpoint_dir / 'checkpoint_latest.pt'
        self._atomic_save(checkpoint_data, latest_path)
        logger.info(f"Saved checkpoint: {latest_path}")

        if epoch_snapshot:
            epoch_path = self.checkpoint_dir / f'checkpoint_epoch_{epoch:04d}.pt'
            self._atomic_save(checkpoint_data, epoch_path)

        if is_best:
            best_path = self.checkpoint_dir / 'checkpoint_best.pt'
            self._atomic_save(checkpoint_data, best_path)
            logger.info(f"๐ NEW BEST MODEL! Loss: {loss:.6f} -> {best_path}")

        # Human-readable copy of the (redacted) training config.
        config_path = self.checkpoint_dir / 'training_config.json'
        with open(config_path, 'w') as f:
            json.dump(safe_config, f, indent=2)

    def load_checkpoint(self, checkpoint_path: Optional[str] = None) -> bool:
        """Restore model, optimizer, scheduler and counters from a checkpoint.

        Args:
            checkpoint_path: File to load; defaults to
                ``checkpoint_latest.pt`` in the checkpoint directory.

        Returns:
            ``True`` if the checkpoint was loaded, ``False`` if it does not
            exist or could not be loaded (the error is logged, not raised).
        """
        if checkpoint_path is None:
            checkpoint_path = self.checkpoint_dir / 'checkpoint_latest.pt'

        checkpoint_path = Path(checkpoint_path)
        if not checkpoint_path.exists():
            logger.info("No checkpoint found - starting fresh training")
            return False

        logger.info(f"Loading checkpoint: {checkpoint_path}")
        try:
            # SECURITY NOTE: torch.load unpickles the file. Only load
            # checkpoints from a trusted location.
            checkpoint = torch.load(checkpoint_path, map_location=self.device)

            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            if self.scheduler and checkpoint.get('scheduler_state_dict'):
                self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

            # Counters are only touched once every state dict has loaded.
            self.current_epoch = checkpoint['epoch']
            self.total_steps = checkpoint['total_steps']
            self.best_loss = checkpoint['best_loss']
            self.training_history = checkpoint.get('training_history', [])

            logger.info(f"โ Resumed from epoch {self.current_epoch}, best loss: {self.best_loss:.6f}")
            logger.info(f"Total steps completed: {self.total_steps}")
            return True
        except Exception as e:
            # Best-effort resume: report with traceback and fall back to a
            # fresh run. NOTE: if the failure happened after the model
            # weights were applied, the model keeps those weights.
            logger.error(f"Failed to load checkpoint: {e}", exc_info=True)
            return False

    def training_step(self, batch: torch.Tensor) -> Dict[str, float]:
        """Run one optimization step on ``batch`` (next-bit prediction).

        Args:
            batch: Integer tensor of bit sequences; position ``t`` of the
                model output is scored against bit ``t + 1`` of the input.

        Returns:
            A dict with ``'loss'`` plus one entry per telemetry key (tensor
            values are reduced to their mean, other values passed through).
        """
        batch = batch.to(self.device)
        self.optimizer.zero_grad()

        logits, telemetry = self.model(batch)

        # Drop the last prediction and the first target so they line up.
        pred = logits[:, :-1, :].reshape(-1, 2)
        target = batch[:, 1:].reshape(-1)
        loss = F.cross_entropy(pred, target)

        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config['max_grad_norm'])

        self.optimizer.step()
        if self.scheduler:
            self.scheduler.step()
        self.total_steps += 1

        metrics = {'loss': loss.item()}
        if telemetry:
            for key, value in telemetry.items():
                if torch.is_tensor(value):
                    # Cast first: mean() is not defined for integer/bool tensors.
                    metrics[key] = value.float().mean().item()
                else:
                    metrics[key] = value
        return metrics

    def train_epoch(self) -> Dict[str, float]:
        """Train on every full batch of ``self.dataset`` once.

        Batches are taken in dataset order; a trailing batch smaller than
        ``batch_size`` is skipped.

        Returns:
            ``{'epoch': <1-based epoch>, 'avg_loss': <mean step loss>}``,
            also appended to ``self.training_history``. ``avg_loss`` is
            ``inf`` when no full batch was available.
        """
        epoch_number = self.current_epoch + 1
        logger.info(f"Starting epoch {epoch_number}")

        self.model.train()
        batch_size = self.config['batch_size']
        log_interval = self.config.get('log_interval', 0)
        steps_in_epoch = len(self.dataset) // batch_size

        epoch_losses = []
        # Stopping at len - batch_size + 1 visits exactly the full batches.
        for start in range(0, len(self.dataset) - batch_size + 1, batch_size):
            metrics = self.training_step(self.dataset[start:start + batch_size])
            epoch_losses.append(metrics['loss'])

            if log_interval and len(epoch_losses) % log_interval == 0:
                logger.info(
                    f"Epoch {epoch_number} step {len(epoch_losses)}/{steps_in_epoch}: "
                    f"Loss={metrics['loss']:.6f}"
                )

        avg_loss = sum(epoch_losses) / len(epoch_losses) if epoch_losses else float('inf')

        epoch_summary = {
            'epoch': epoch_number,
            'avg_loss': avg_loss
        }
        self.training_history.append(epoch_summary)

        logger.info(
            f"Epoch {epoch_number} completed: "
            f"Avg Loss={avg_loss:.6f}"
        )
        return epoch_summary

    def _save_emergency_checkpoint(self) -> None:
        """Best-effort save of the current mid-epoch state.

        Only ``checkpoint_latest.pt`` is refreshed. A failure is logged with
        its traceback rather than silently discarded, and never raised, so
        it cannot mask the error that triggered the save.
        """
        try:
            self.save_checkpoint(self.current_epoch, float('inf'), False, epoch_snapshot=False)
        except Exception:
            logger.exception("Failed to save emergency checkpoint")

    def train(self):
        """Run epochs from ``self.current_epoch`` up to ``num_epochs``.

        A checkpoint is saved after every epoch. On ``KeyboardInterrupt`` an
        emergency checkpoint is attempted and the loop stops; on any other
        exception an emergency checkpoint is attempted and the exception is
        re-raised.
        """
        logger.info("๐ STARTING BREAKTHROUGH BITRANSFORMERLM TRAINING!")
        logger.info("Configuration: Fixed RL Adafactor + 16M parameters + CPU training")

        start_epoch = self.current_epoch
        for _ in range(start_epoch, self.config['num_epochs']):
            try:
                epoch_metrics = self.train_epoch()
                avg_loss = epoch_metrics['avg_loss']

                is_best = avg_loss < self.best_loss
                if is_best:
                    self.best_loss = avg_loss

                self.save_checkpoint(self.current_epoch + 1, avg_loss, is_best)
                self.current_epoch += 1

                logger.info(f"=== EPOCH {self.current_epoch} COMPLETE ===")
                logger.info(f"Loss: {avg_loss:.6f} (best: {self.best_loss:.6f})")

                # NOTE(review): the loss is 2-class cross-entropy, where
                # uniform guessing already scores ln(2) ~= 0.693, so this
                # threshold is met by any non-diverged model. Confirm the
                # intended target before relying on this message.
                if avg_loss < 3.0:
                    logger.info("๐ BREAKTHROUGH PERFORMANCE ACHIEVED! Loss < 3.0!")

            except KeyboardInterrupt:
                logger.info("Training interrupted by user")
                self._save_emergency_checkpoint()
                break
            except Exception as e:
                logger.error(f"Error in epoch {self.current_epoch + 1}: {e}")
                self._save_emergency_checkpoint()
                raise
def main():
    """Build the training configuration, wire up the trainer and run it."""
    config = {}

    # Model parameters
    config['sequence_length'] = 512

    # Training parameters
    config['learning_rate'] = 1e-3   # fixed learning rate
    config['weight_decay'] = 0.01
    config['batch_size'] = 4         # adjust based on available memory
    config['num_epochs'] = 50
    config['max_grad_norm'] = 1.0

    # Data parameters
    # Left unset here; intended to come from the HF_TOKEN environment variable.
    config['hf_token'] = None

    # Logging and checkpointing
    config['log_interval'] = 100
    config['checkpoint_dir'] = '/data/BitTransformerLM/checkpoints'

    trainer = BreakthroughTrainer(config)

    # Prepare data, model and optimizer, in that order.
    logger.info("Setting up training components...")
    setup_steps = (
        trainer.load_and_prepare_dataset,
        trainer.create_breakthrough_model,
        trainer.setup_optimizer,
    )
    for run_step in setup_steps:
        run_step()

    # Resume from the latest checkpoint when one exists, then train.
    trainer.load_checkpoint()
    trainer.train()

    for message in (
        "๐ BREAKTHROUGH TRAINING COMPLETED!",
        f"Best loss achieved: {trainer.best_loss:.6f}",
        f"Checkpoints saved to: {trainer.checkpoint_dir}",
    ):
        logger.info(message)


# Run the training only when executed as a script, not on import.
if __name__ == "__main__":
    main()