Disaster Recovery with Git Reflog

David Childs
•
•

Master Git reflog to recover lost commits, undo destructive operations, and implement bulletproof disaster recovery strategies for your repositories.

Git reflog is your time machine and safety net rolled into one. After recovering countless "lost" commits and saving teams from disaster, I've learned that understanding reflog is the difference between panic and confidence when things go wrong. Here's your complete guide to Git's most powerful recovery tool.

Understanding Git Reflog

Reflog Fundamentals

# reflog_manager.py
import subprocess
import re
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from dataclasses import dataclass

@dataclass
class ReflogEntry:
    """One parsed reflog record.

    Each attribute holds the value of the Git pretty-format placeholder
    noted beside it, as requested by ``ReflogManager.parse_reflog``.
    """

    hash: str  # full commit hash (%H)
    ref: str  # reflog selector such as HEAD@{0} (%gD)
    action: str  # reflog subject describing the operation (%gs)
    message: str  # subject line of the commit itself (%s)
    timestamp: datetime  # parsed from the commit's author date (%ai)
    author: str  # author name (%an)

class ReflogManager:
    """Inspect a repository's reflog and rescue commits no ref points to.

    Every Git command is run with ``repo_path`` as its working directory.
    """

    # Git expands %x1f to the ASCII unit separator. Unlike "|", it does not
    # occur in ordinary commit subjects or reflog messages, so splitting on
    # it cannot shift the fields.
    _FIELD_SEP = "\x1f"
    _REFLOG_FORMAT = "--format=" + "%x1f".join(
        ["%H", "%gD", "%gs", "%s", "%ai", "%an"]
    )

    # (compiled pattern, description) pairs tested against a reflog subject.
    _DANGEROUS_PATTERNS = [
        (re.compile(r'reset.*--hard', re.IGNORECASE), 'Hard reset detected'),
        # What `git reset` really writes to the reflog; the mode flag
        # (--hard/--soft/--mixed) is not recorded there.
        (re.compile(r'^reset: moving to'), 'Reset detected'),
        (re.compile(r'rebase.*abort', re.IGNORECASE), 'Aborted rebase'),
        (re.compile(r'merge.*abort', re.IGNORECASE), 'Aborted merge'),
        # Flags must stand alone and keep their case; otherwise
        # "checkout: moving from main to hot-fix" counts as a forced checkout
        # and "-d" is confused with "-D".
        (re.compile(r'checkout.*\s-f(?:\s|$)'), 'Force checkout'),
        (re.compile(r'branch.*\s-D(?:\s|$)'), 'Force branch deletion'),
        (re.compile(r'push.*--force', re.IGNORECASE), 'Force push detected'),
        (re.compile(r'forced-update', re.IGNORECASE), 'Forced ref update detected'),
    ]

    def __init__(self, repo_path: str = "."):
        self.repo_path = repo_path

    def _git(self, *args: str) -> str:
        """Run ``git <args>`` in the repository and return its stdout.

        Returns an empty string when Git exits non-zero, so a failed query is
        handled like one without results.
        """
        result = subprocess.run(
            ["git", *args],
            cwd=self.repo_path,
            capture_output=True,
            text=True,
        )
        return result.stdout if result.returncode == 0 else ""

    @staticmethod
    def _parse_author_date(raw: str) -> datetime:
        """Convert ``%ai`` output to a naive datetime.

        ``%ai`` looks like ``2024-01-15 10:30:45 -0500``. Only the date and
        time tokens are used, so positive and negative UTC offsets are both
        accepted and the wall-clock time is kept as written.

        Raises:
            ValueError: if ``raw`` does not start with a date and a time.
        """
        date_part, time_part = raw.split()[:2]
        return datetime.fromisoformat(f"{date_part} {time_part}")

    @classmethod
    def _classify_action(cls, action: str) -> List[str]:
        """Return the description of every dangerous pattern ``action`` matches."""
        return [
            description
            for pattern, description in cls._DANGEROUS_PATTERNS
            if pattern.search(action)
        ]

    def parse_reflog(self, ref: str = "HEAD", days: int = 30) -> List["ReflogEntry"]:
        """Parse reflog entries for analysis.

        Args:
            ref: the ref whose reflog is read.
            days: only entries from the last ``days`` days are requested.

        Returns:
            Entries in the order Git prints them (newest first). Lines that
            cannot be parsed are skipped. Returns ``[]`` when Git fails.

        Note:
            ``timestamp`` is the commit's author date (``%ai``), not the time
            the reflog entry itself was recorded.
        """
        # Drop microseconds: Git ignores fractional seconds in dates anyway.
        since_date = (datetime.now() - timedelta(days=days)).replace(microsecond=0)

        output = self._git(
            "reflog", "show", ref,
            self._REFLOG_FORMAT,
            f"--since={since_date.isoformat(sep=' ')}",
        )

        entries = []
        for line in output.splitlines():
            parts = line.split(self._FIELD_SEP)
            if len(parts) < 6:
                continue
            try:
                timestamp = self._parse_author_date(parts[4])
            except ValueError:
                continue
            entries.append(ReflogEntry(
                hash=parts[0],
                ref=parts[1],
                action=parts[2],
                message=parts[3],
                timestamp=timestamp,
                author=parts[5],
            ))

        return entries

    def find_lost_commits(self) -> List[str]:
        """Find commits that are no longer referenced.

        A commit is "lost" when it appears in HEAD's reflog but is not
        reachable from any ref (``git rev-list --all``).

        Returns:
            One ``hash|subject|author|date`` string per lost commit, sorted by
            hash so the result is deterministic.
        """
        reflog_commits = set(self._git("reflog", "--format=%H").split())
        reachable_commits = set(self._git("rev-list", "--all").split())

        lost_details = []
        for commit in sorted(reflog_commits - reachable_commits):
            try:
                info = self._git(
                    "show", "--format=%H|%s|%an|%ai", "-s", commit
                ).strip()
            except OSError:
                # Best effort: skip a commit whose lookup could not be run.
                continue
            if info:  # empty when git could not describe the commit
                lost_details.append(info)

        return lost_details

    def recover_commit(self, commit_hash: str,
                       recovery_branch: Optional[str] = None) -> bool:
        """Recover a lost commit by pointing a new branch at it.

        Args:
            commit_hash: the commit to preserve.
            recovery_branch: branch name to create; defaults to
                ``recovery-<first 8 characters of the hash>``.

        Returns:
            True when the branch was created, False when Git refused.
        """
        if not recovery_branch:
            recovery_branch = f"recovery-{commit_hash[:8]}"

        try:
            subprocess.run(
                ["git", "branch", recovery_branch, commit_hash],
                cwd=self.repo_path,
                check=True
            )
        except subprocess.CalledProcessError as e:
            print(f"Failed to recover commit: {e}")
            return False

        print(f"Recovered commit {commit_hash[:8]} to branch {recovery_branch}")
        return True

    def analyze_dangerous_operations(self, ref: str = "HEAD",
                                     days: int = 30) -> List[Dict]:
        """Identify potentially dangerous operations in reflog.

        Args:
            ref: the ref whose reflog is scanned.
            days: how far back to look.

        Returns:
            One dict per (entry, matching pattern) with the keys
            ``timestamp``, ``operation``, ``description``, ``commit`` and
            ``author``.
        """
        dangerous_ops = []
        for entry in self.parse_reflog(ref, days):
            for description in self._classify_action(entry.action):
                dangerous_ops.append({
                    'timestamp': entry.timestamp,
                    'operation': entry.action,
                    'description': description,
                    'commit': entry.hash,
                    'author': entry.author
                })

        return dangerous_ops

Recovery Operations

Comprehensive Recovery Tools

#!/bin/bash
# recovery_tools.sh

# Recover from accidental reset
recover_from_reset() {
    # Restore HEAD to where it pointed just before the most recent `git reset`.
    # Asks for confirmation, then hard-resets. Returns 1 when no reset is
    # found in the reflog or the reset itself fails.
    local before_reset

    echo "Recovering from accidental reset..."

    # Show recent resets
    echo "Recent reset operations:"
    git reflog | grep -E "reset:" | head -5

    # A reflog line carries the commit HEAD pointed to AFTER the operation,
    # so the pre-reset commit is the entry right below the newest "reset:"
    # line (the reflog is printed newest-first).
    before_reset=$(git reflog --format='%H %gs' |
        awk 'found { print $1; exit } $2 == "reset:" { found = 1 }')

    if [ -z "$before_reset" ]; then
        echo "No recent reset found"
        return 1
    fi

    echo "Found previous HEAD at: $before_reset"

    # Show what would be recovered
    echo "Commits that would be recovered:"
    git log --oneline "$before_reset" ^HEAD

    # reset --hard also discards uncommitted changes, so say so up front.
    if [ -n "$(git status --porcelain --untracked-files=no)" ]; then
        echo "Warning: uncommitted changes to tracked files will be discarded."
    fi

    read -p "Recover to this state? (y/n) " -n 1 -r
    echo
    if [[ $REPLY =~ ^[Yy]$ ]]; then
        git reset --hard "$before_reset" || return 1
        echo "Recovery complete!"
    fi
}

# Recover deleted branch
recover_deleted_branch() {
    # Recreate a deleted branch from what HEAD's reflog remembers about it.
    #   $1 - name of the deleted branch
    # Returns 1 when the name is missing, no trace of the branch is found,
    # or the branch cannot be created.
    local branch_name=$1
    local branch_tip

    if [ -z "$branch_name" ]; then
        echo "Usage: recover_deleted_branch <branch-name>" >&2
        return 1
    fi

    echo "Searching for deleted branch: $branch_name"

    # "checkout: moving from X to Y" is logged with Y's commit, so the tip X
    # had when it was left is the entry right below that line. index() does a
    # literal match, and the trailing " to " stops "feat" matching "feature".
    branch_tip=$(git reflog --format='%H %gs' |
        awk -v needle="checkout: moving from $branch_name to " \
            'found { print $1; exit } index($0, needle) { found = 1 }')

    if [ -z "$branch_tip" ]; then
        # Try alternative search: newest entry that mentions the name at all
        branch_tip=$(git reflog --format='%H %gs' |
            grep -F -e "$branch_name" | head -1 | cut -d' ' -f1)
    fi

    if [ -z "$branch_tip" ]; then
        echo "Could not find branch $branch_name in reflog"
        return 1
    fi

    echo "Found branch tip at: $branch_tip"

    # Show branch content
    echo "Branch contained these commits:"
    git log --oneline -10 "$branch_tip"

    read -p "Recreate branch $branch_name? (y/n) " -n 1 -r
    echo
    if [[ $REPLY =~ ^[Yy]$ ]]; then
        # Only claim success if Git really created the branch (it refuses,
        # for example, when a branch of that name already exists).
        if git branch "$branch_name" "$branch_tip"; then
            echo "Branch $branch_name recovered!"
        else
            echo "Could not recreate branch $branch_name" >&2
            return 1
        fi
    fi
}

# Recover from bad rebase
recover_from_bad_rebase() {
    # Interactive recovery after a rebase went wrong: locate the pre-rebase
    # HEAD, show how it differs from the current HEAD, then offer to reset to
    # it, branch from it, or cherry-pick onto it.
    # Returns 1 when the original HEAD cannot be found or a Git step fails.
    local orig choice branch_name commits commit

    echo "Recovering from bad rebase..."

    # Let Git resolve ORIG_HEAD rather than reading .git/ORIG_HEAD, so this
    # also works from a subdirectory.
    # NOTE(review): ORIG_HEAD is also rewritten by reset and merge; confirm it
    # still refers to the rebase before trusting it.
    if orig=$(git rev-parse --verify --quiet ORIG_HEAD); then
        echo "Found ORIG_HEAD at: $orig"
    else
        # The "rebase (start)" entry records the commit that was checked out
        # to rebase onto; the original HEAD is the entry right below it.
        orig=$(git reflog --format='%H %gs' |
            awk 'found { print $1; exit } /rebase.*start/ { found = 1 }')
        if [ -n "$orig" ]; then
            echo "Found pre-rebase HEAD at: $orig"
        fi
    fi

    if [ -z "$orig" ]; then
        echo "Could not find original HEAD before rebase"
        return 1
    fi

    # Show difference
    echo "Current state vs original:"
    git log --oneline --graph HEAD "^$orig" | head -20
    echo "---"
    git log --oneline --graph "$orig" ^HEAD | head -20

    echo "Options:"
    echo "  1) Reset to original state (lose rebase)"
    echo "  2) Create branch at original state"
    echo "  3) Cherry-pick specific commits"
    echo "  4) Cancel"

    read -r -p "Choice [1-4]: " choice

    case $choice in
        1)
            git reset --hard "$orig" || return 1
            echo "Reset to original state"
            ;;
        2)
            read -r -p "Branch name: " branch_name
            git branch "$branch_name" "$orig" || return 1
            echo "Created branch $branch_name at original state"
            ;;
        3)
            echo "Commits to cherry-pick:"
            git log --oneline HEAD "^$orig"
            read -r -p "Enter commit SHAs (space-separated): " commits
            git checkout "$orig" || return 1
            # $commits is left unquoted on purpose: it is a space-separated list.
            for commit in $commits; do
                # Stop at the first failure instead of piling further picks
                # on top of an unresolved conflict.
                if ! git cherry-pick "$commit"; then
                    echo "Cherry-pick of $commit stopped; resolve it before picking the rest." >&2
                    return 1
                fi
            done
            ;;
        4)
            echo "Cancelled"
            ;;
        *)
            echo "Invalid choice: $choice" >&2
            return 1
            ;;
    esac
}

# Recover lost stash
recover_lost_stash() {
    # Walk unreachable commits, pick out those that look like dropped
    # stashes, and offer to put each one back on the stash list.
    local candidates commit subject

    echo "Searching for lost stashes..."

    # fsck prints "unreachable commit <sha>"; keep only the commit hashes.
    candidates=$(git fsck --unreachable |
        awk '$1 == "unreachable" && $2 == "commit" { print $3 }')

    for commit in $candidates; do
        subject=$(git show --format="%s" -s "$commit")

        # Stash commits are titled "WIP on <branch>: ..." or "On <branch>: ...".
        # "index on ..." commits are a stash's internal index parent, not a
        # stash themselves, so they are not offered.
        case $subject in
            "WIP on"*|"On "*) ;;
            *) continue ;;
        esac

        # A stash is a merge commit; skip anything without a second parent.
        git rev-parse --verify --quiet "$commit^2" >/dev/null || continue

        echo ""
        echo "Found stash: $commit"
        git show --stat "$commit"

        read -p "Recover this stash? (y/n/q) " -n 1 -r
        echo

        case $REPLY in
            y|Y)
                if git stash store -m "Recovered stash" "$commit"; then
                    echo "Stash recovered!"
                else
                    echo "Could not store $commit as a stash" >&2
                fi
                ;;
            q|Q)
                break
                ;;
        esac
    done
}

Advanced Recovery Scenarios

# advanced_recovery.py
class AdvancedRecovery:
    """Interactive recovery flows built on top of ``ReflogManager``.

    Args:
        repo_path: working directory for every Git command (default ``"."``).
    """

    def __init__(self, repo_path: str = "."):
        self.repo_path = repo_path
        self.reflog_manager = ReflogManager(repo_path)

    def _run_git(self, *args, **kwargs):
        """Run ``git <args>`` in the repository; kwargs go to ``subprocess.run``."""
        return subprocess.run(["git", *args], cwd=self.repo_path, **kwargs)

    @staticmethod
    def _find_forced_update(reflog_entries):
        """Locate the newest forced update in a newest-first reflog listing.

        Only the reflog action is inspected. The commit subject is ignored,
        because a subject that merely contains "force" says nothing about
        how the ref was moved.

        Returns:
            ``(entry, previous_hash)``. ``entry`` is None when no forced
            update exists; ``previous_hash`` is None when the forced update
            is the oldest entry available.
        """
        for index, entry in enumerate(reflog_entries):
            if "forced-update" in entry.action:
                if index + 1 < len(reflog_entries):
                    return entry, reflog_entries[index + 1].hash
                return entry, None
        return None, None

    def recover_from_force_push(self, branch: str, remote: str = "origin"):
        """Recover from accidental force push.

        Args:
            branch: ref whose reflog is searched; also the remote branch name
                used when restoring the remote.
            remote: remote to push to when restoring.

        Returns:
            False when no forced update (or no earlier state) is found or the
            chosen Git command fails; True otherwise.
        """
        print(f"Recovering from force push on {branch}")

        reflog_entries = self.reflog_manager.parse_reflog(branch)
        force_push_entry, previous_commit = self._find_forced_update(reflog_entries)

        if force_push_entry is None:
            print("No force push detected in recent history")
            return False

        if not previous_commit:
            print("Could not find commit before force push")
            return False

        print(f"Found previous state: {previous_commit[:8]}")

        # A..B lists what B has and A lacks, so the commits that were lost
        # are those in the previous state but not in the forced one.
        lost_commits = self._run_git(
            "log", "--oneline", f"{force_push_entry.hash}..{previous_commit}",
            capture_output=True,
            text=True
        ).stdout

        print("Commits that were force-pushed away:")
        print(lost_commits)

        # Offer recovery options
        print("\nRecovery options:")
        print("1. Reset local branch to previous state")
        print("2. Create recovery branch")
        print("3. Force push to restore remote")

        choice = input("Choose option (1-3): ")

        if choice == "1":
            if self._run_git("reset", "--hard", previous_commit).returncode != 0:
                print("Reset failed")
                return False
            print(f"Local branch reset to {previous_commit[:8]}")
        elif choice == "2":
            recovery_branch = f"recovery-{branch}-{datetime.now().strftime('%Y%m%d')}"
            if self._run_git("branch", recovery_branch, previous_commit).returncode != 0:
                print(f"Could not create recovery branch: {recovery_branch}")
                return False
            print(f"Created recovery branch: {recovery_branch}")
        elif choice == "3":
            confirm = input("This will overwrite remote. Are you sure? (yes/no): ")
            if confirm.lower() == "yes":
                pushed = self._run_git(
                    "push", "--force-with-lease",
                    remote, f"{previous_commit}:{branch}"
                )
                if pushed.returncode != 0:
                    print("Push failed; remote was not restored")
                    return False
                print("Remote restored")

        return True

    def recover_merge_conflict_resolution(self):
        """Recover lost merge conflict resolution.

        Lists recent reflog entries whose commit message mentions "merge",
        lets the user pick one, shows the changes that merge introduced
        relative to its first parent, and optionally applies them to the
        working tree with ``git apply``.
        """
        # Case-insensitive: default merge subjects start with "Merge".
        listing = self._run_git(
            "reflog", "--regexp-ignore-case", "--grep=merge",
            capture_output=True,
            text=True
        ).stdout

        # ''.split('\n') would give [''], which is truthy; drop blank lines.
        merge_commits = [line for line in listing.splitlines() if line.strip()]

        if not merge_commits:
            print("No recent merges found")
            return

        candidates = merge_commits[:5]
        print("Recent merges:")
        for i, commit in enumerate(candidates):
            print(f"{i}: {commit}")

        try:
            choice = int(input("Select merge to recover (number): "))
        except ValueError:
            print("Invalid selection")
            return
        if not 0 <= choice < len(candidates):
            print("Invalid selection")
            return
        merge_ref = candidates[choice].split()[0]

        # Get the merge commit
        merge_commit = self._run_git(
            "rev-parse", merge_ref,
            capture_output=True,
            text=True
        ).stdout.strip()

        parents = self._run_git(
            "show", "--format=%P", "-s", merge_commit,
            capture_output=True,
            text=True
        ).stdout.strip().split()

        if len(parents) != 2:
            print("Not a merge commit")
            return

        # Build the patch once and display exactly what would be applied.
        patch = self._run_git(
            "diff", f"{parents[0]}..{merge_commit}",
            capture_output=True,
            text=True
        ).stdout

        print("\nConflict resolution from merge:")
        print(patch)

        if not patch.strip():
            print("Nothing to apply")
            return

        apply = input("Apply this resolution? (y/n): ")
        if apply.lower() == 'y':
            applied = self._run_git(
                "apply", "-",
                input=patch,
                capture_output=True,
                text=True
            )

            if applied.returncode == 0:
                print("Resolution applied successfully")
            else:
                print(f"Failed to apply resolution: {applied.stderr}")

Reflog Analysis and Visualization

Reflog Analytics

# reflog_analytics.py
import matplotlib.pyplot as plt
import pandas as pd
from collections import defaultdict

class ReflogAnalytics:
    """Charts and text reports derived from ``ReflogManager`` data."""

    def __init__(self):
        self.reflog_manager = ReflogManager()

    def analyze_activity_patterns(self, days: int = 30,
                                  output_path: str = 'reflog_analysis.png',
                                  show: bool = True):
        """Analyze repository activity patterns.

        Draws four charts (activity per hour, activity per day, operation
        types, dangerous operations per day) and saves them as one image.

        Args:
            days: how many days of reflog history to read.
            output_path: where the image is written.
            show: whether to also open the figure with ``plt.show()``; pass
                False for unattended runs.

        Returns:
            A dict with ``hourly_activity``, ``daily_activity`` (keyed by ISO
            date string), ``action_types`` and the ``dangerous_operations``
            count.
        """
        entries = self.reflog_manager.parse_reflog(days=days)

        hourly_activity = defaultdict(int)
        daily_activity = defaultdict(int)
        action_types = defaultdict(int)

        for entry in entries:
            hourly_activity[entry.timestamp.hour] += 1
            daily_activity[entry.timestamp.date()] += 1
            # split() returns the whole string when there is no ':'
            action_types[entry.action.split(':')[0]] += 1

        dangerous_ops = self.reflog_manager.analyze_dangerous_operations()
        risk_counts = defaultdict(int)
        for op in dangerous_ops:
            risk_counts[op['timestamp'].date()] += 1

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        try:
            # Hourly activity
            hours = list(range(24))
            axes[0, 0].bar(hours, [hourly_activity.get(h, 0) for h in hours])
            axes[0, 0].set_xlabel('Hour of Day')
            axes[0, 0].set_ylabel('Number of Operations')
            axes[0, 0].set_title('Activity by Hour')

            # Daily activity
            active_days = sorted(daily_activity)
            axes[0, 1].plot(active_days, [daily_activity[d] for d in active_days])
            axes[0, 1].set_xlabel('Date')
            axes[0, 1].set_ylabel('Number of Operations')
            axes[0, 1].set_title('Daily Activity')
            axes[0, 1].tick_params(axis='x', rotation=45)

            # Action types (a pie chart needs at least one slice)
            if action_types:
                axes[1, 0].pie(list(action_types.values()),
                               labels=list(action_types.keys()),
                               autopct='%1.1f%%')
            axes[1, 0].set_title('Operation Types')

            # Risk analysis
            if risk_counts:
                risk_days = sorted(risk_counts)
                axes[1, 1].bar(risk_days, [risk_counts[d] for d in risk_days],
                               color='red')
                axes[1, 1].set_xlabel('Date')
                axes[1, 1].set_ylabel('Dangerous Operations')
                axes[1, 1].set_title('Risk Operations Over Time')

            plt.tight_layout()
            plt.savefig(output_path)
            if show:
                plt.show()
        finally:
            # Release the figure so repeated calls do not accumulate them.
            plt.close(fig)

        return {
            'hourly_activity': dict(hourly_activity),
            'daily_activity': {str(k): v for k, v in daily_activity.items()},
            'action_types': dict(action_types),
            'dangerous_operations': len(dangerous_ops)
        }

    def generate_recovery_report(self) -> str:
        """Generate comprehensive recovery report.

        Returns:
            Markdown text with three sections: lost commits (first 10),
            recent dangerous operations (first 10) and recovery points from
            the last 7 days (first 20).
        """
        report = []
        report.append("# Git Repository Recovery Report")
        report.append(f"Generated: {datetime.now().isoformat()}")
        report.append("")

        # Lost commits
        lost_commits = self.reflog_manager.find_lost_commits()
        report.append(f"## Lost Commits: {len(lost_commits)}")
        for commit_info in lost_commits[:10]:
            # Layout is hash|subject|author|date. The hash is first and the
            # author/date are last, so split from both ends and let the
            # subject keep any '|' it contains.
            commit_hash, sep, rest = commit_info.partition('|')
            tail = rest.rsplit('|', 2)
            if sep and len(tail) == 3:
                subject, author, _date = tail
                report.append(f"- {commit_hash[:8]}: {subject} by {author}")
        report.append("")

        # Dangerous operations
        dangerous_ops = self.reflog_manager.analyze_dangerous_operations()
        report.append(f"## Recent Dangerous Operations: {len(dangerous_ops)}")
        for op in dangerous_ops[:10]:
            report.append(f"- {op['timestamp']}: {op['description']}")
        report.append("")

        # Recovery points
        report.append("## Recovery Points")
        entries = self.reflog_manager.parse_reflog(days=7)

        checkpoints = [
            entry for entry in entries
            if any(word in entry.action.lower()
                   for word in ['commit', 'merge', 'checkout'])
        ]

        for checkpoint in checkpoints[:20]:
            report.append(f"- {checkpoint.timestamp}: {checkpoint.hash[:8]} - {checkpoint.action}")

        return '\n'.join(report)

Automation and Prevention

Automated Backup System

#!/bin/bash
# backup_system.sh

# Create reflog backup
backup_reflog() {
    # Copy the repository's reflog directory into <git-dir>/reflog_backups,
    # archive backups older than 7 days and delete archives older than 30.
    # Returns 1 when not in a repository, there is nothing to back up, or
    # the copy fails.
    local git_dir backup_dir timestamp target old

    # Ask Git for the real git dir so this also works from a subdirectory.
    git_dir=$(git rev-parse --git-dir) || return 1
    backup_dir="$git_dir/reflog_backups"
    timestamp=$(date +%Y%m%d_%H%M%S)
    target="$backup_dir/logs_$timestamp"

    if [ ! -d "$git_dir/logs" ]; then
        echo "No reflogs to back up in $git_dir/logs" >&2
        return 1
    fi

    mkdir -p "$backup_dir" || return 1

    # Backup all reflogs
    cp -r "$git_dir/logs" "$target" || return 1

    # Compress old backups. Each backup is a directory, which gzip cannot
    # compress, so pack it with tar and remove the original on success.
    find "$backup_dir" -maxdepth 1 -type d -name 'logs_*' -mtime +7 -print |
        while IFS= read -r old; do
            tar -czf "$old.tar.gz" -C "$backup_dir" "$(basename "$old")" &&
                rm -rf "$old"
        done

    # Remove very old backups
    find "$backup_dir" -maxdepth 1 -type f -name '*.gz' -mtime +30 -delete

    echo "Reflog backed up to $target"
}

# Create safety snapshot before dangerous operation
create_safety_snapshot() {
    # Pin the current commit to a timestamped safety branch before a risky
    # operation and record it in <git-dir>/safety_log.
    #   $1 - human-readable name of the operation about to run
    # Only committed history is preserved; uncommitted changes are not.
    # Returns 1 when not in a repository or the branch cannot be created.
    local operation=$1
    local git_dir safety_branch

    git_dir=$(git rev-parse --git-dir) || return 1

    echo "Creating safety snapshot before: $operation"

    # Create temporary branch
    safety_branch="safety-$(date +%Y%m%d-%H%M%S)"
    if ! git branch "$safety_branch"; then
        echo "Could not create safety branch $safety_branch" >&2
        return 1
    fi

    # Store in reflog. refs/safety/* is outside the namespaces that get a
    # reflog automatically, so one must be requested or -m is discarded.
    git update-ref --create-reflog -m "Safety snapshot before $operation" refs/safety/HEAD HEAD

    # Log the snapshot
    echo "$(date): $safety_branch - Before $operation" >> "$git_dir/safety_log"

    echo "Safety snapshot created: $safety_branch"
    echo "To recover: git checkout $safety_branch"
}

# Protect against dangerous operations
safe_git() {
    # Wrapper around git that takes a safety snapshot before hard resets,
    # force pushes and rebases, and asks for confirmation before a force push.
    #   $1   - git subcommand
    #   $2.. - arguments passed through to that subcommand
    # Returns git's exit status, or 1 when the command is missing or the
    # force push is declined.
    local command=$1
    local arg confirm
    local forced=0

    if [ -z "$command" ]; then
        echo "Usage: safe_git <command> [args...]" >&2
        return 1
    fi
    shift

    case $command in
        reset)
            if [[ "$*" == *"--hard"* ]]; then
                create_safety_snapshot "hard reset"
            fi
            ;;
        push)
            # Catch the short flag too, plus every --force* variant.
            for arg in "$@"; do
                case $arg in
                    -f|--force*) forced=1 ;;
                esac
            done
            if [ "$forced" -eq 1 ]; then
                create_safety_snapshot "force push"
                echo "Warning: Force push detected!"
                read -r -p "Are you sure? (yes/no): " confirm
                if [ "$confirm" != "yes" ]; then
                    # return, not exit: exit would terminate the whole shell
                    # when this file has been sourced.
                    return 1
                fi
            fi
            ;;
        rebase)
            create_safety_snapshot "rebase"
            ;;
    esac

    # Execute command
    git "$command" "$@"
}

# Periodic reflog maintenance
maintain_reflog() {
    # Periodic maintenance: back up the reflogs, expire entries older than
    # 90 days while keeping unreachable ones, then run a gentle gc.
    # Returns 1 without expiring anything when the backup fails.
    echo "Performing reflog maintenance..."

    # Back up first, while the entries about to be expired still exist.
    if ! backup_reflog; then
        echo "Backup failed; skipping reflog expiry" >&2
        return 1
    fi

    # Expire old reflog entries (keep 90 days) but keep unreachable entries,
    # which are the ones recovery depends on. Both options must go in one
    # call: a separate --expire run would already prune unreachable entries
    # using the default unreachable-expiry setting.
    git reflog expire --expire=90.days.ago --expire-unreachable=never --all

    # Gentle garbage collection. gc expires reflogs itself using the
    # gc.reflogExpire* settings, so pin them to the same policy for this run.
    git -c gc.reflogExpire=90.days.ago -c gc.reflogExpireUnreachable=never \
        gc --auto --prune=30.days.ago

    echo "Reflog maintenance complete"
}

Recovery Hooks

#!/usr/bin/env python3
# recovery_hooks.py

import sys
import subprocess
import json
from pathlib import Path

class RecoveryHooks:
    """Safety hooks to run around Git commands.

    Configuration is read from ``.git/recovery_config.json`` (relative to the
    current directory); any key missing there falls back to its default.
    """

    def __init__(self):
        self.config_file = Path(".git/recovery_config.json")
        self.load_config()

    @staticmethod
    def _default_config() -> dict:
        """Return a fresh copy of the built-in configuration."""
        return {
            'auto_backup': True,
            'protect_branches': ['main', 'master', 'production'],
            'snapshot_before': ['reset --hard', 'rebase', 'filter-branch'],
            'max_reflog_entries': 10000
        }

    def load_config(self):
        """Load recovery configuration into ``self.config``.

        Values from the config file override the defaults key by key, so a
        file that sets only some keys cannot cause a ``KeyError`` later. An
        unreadable or malformed file is reported and ignored.
        """
        config = self._default_config()

        if self.config_file.exists():
            try:
                with open(self.config_file, 'r') as f:
                    loaded = json.load(f)
            except (OSError, json.JSONDecodeError) as exc:
                print(f"Warning: ignoring {self.config_file}: {exc}")
            else:
                if isinstance(loaded, dict):
                    config.update(loaded)
                else:
                    print(f"Warning: ignoring {self.config_file}: expected a JSON object")

        self.config = config

    def pre_command_hook(self, command: str):
        """Hook to run before dangerous commands.

        Takes a snapshot when ``command`` contains one of the configured
        ``snapshot_before`` fragments, and asks for confirmation before a
        risky command on a protected branch (exits with status 1 if the user
        declines).
        """
        # Check if command needs protection
        for protected_cmd in self.config['snapshot_before']:
            if protected_cmd in command:
                self.create_snapshot(f"Before: {command}")
                break

        # Warn about protected branches
        current_branch = subprocess.run(
            ["git", "branch", "--show-current"],
            capture_output=True,
            text=True
        ).stdout.strip()

        if current_branch in self.config['protect_branches']:
            if any(danger in command for danger in ['force', 'reset --hard', 'rebase']):
                print(f"⚠️  Warning: Dangerous operation on protected branch {current_branch}")
                response = input("Continue? (yes/no): ")
                if response.lower() != 'yes':
                    sys.exit(1)

    def create_snapshot(self, message: str):
        """Create recovery snapshot.

        Points ``refs/snapshots/<timestamp>`` at the current HEAD and records
        ``message`` in that ref's reflog.
        """
        # Imported here so the method does not rely on a module-level import.
        from datetime import datetime

        snapshot_ref = f"refs/snapshots/{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        # refs/snapshots/* gets no reflog automatically; without
        # --create-reflog the -m message would be discarded.
        result = subprocess.run([
            "git", "update-ref", "--create-reflog", "-m", message,
            snapshot_ref, "HEAD"
        ])

        if result.returncode == 0:
            print(f"📸 Snapshot created: {snapshot_ref}")
        else:
            print(f"Could not create snapshot: {snapshot_ref}")

    def show_recovery_guide(self):
        """Print a short reminder of the basic reflog recovery commands."""
        print("\nRecovery guide:")
        print("  git reflog                  # list where HEAD has been")
        print("  git reset --hard HEAD@{n}   # move the current branch back to entry n")
        print("  git branch <name> <commit>  # pin a lost commit to a new branch")
        print("  git fsck --lost-found       # list dangling commits")

    def post_command_hook(self, command: str, exit_code: int):
        """Hook to run after commands.

        Does nothing when ``exit_code`` is 0; otherwise offers an interactive
        menu of recovery actions.
        """
        if exit_code != 0:
            # Command failed, offer recovery
            print("\n❌ Command failed. Recovery options:")
            print("1. Show recent reflog entries")
            print("2. Reset to previous state")
            print("3. Show recovery guide")
            print("4. Do nothing")

            choice = input("Choose (1-4): ")

            if choice == "1":
                subprocess.run(["git", "reflog", "-10"])
            elif choice == "2":
                subprocess.run(["git", "reset", "--hard", "HEAD@{1}"])
                print("Reset to previous state")
            elif choice == "3":
                self.show_recovery_guide()

Best Practices Checklist

  • Understand reflog expiration settings
  • Regular reflog backups for critical repos
  • Create snapshots before dangerous operations
  • Document recovery procedures
  • Train team on reflog usage
  • Set appropriate gc.reflogExpire values
  • Monitor reflog size
  • Use descriptive reflog messages
  • Test recovery procedures regularly
  • Implement pre-operation hooks
  • Keep reflog for deleted branches
  • Use reflog in incident response
  • Automate common recovery scenarios
  • Create recovery branches for important states
  • Document force push procedures

Conclusion

Git reflog is your safety net and time machine combined. Master it, and you will rarely lose committed work in Git again. The key is understanding that Git keeps more history than you see, and reflog is your window into that hidden history. Remember its limits, though: reflog only records states that were committed, and its entries eventually expire. Implement proper backup strategies, automate recovery procedures, and sleep soundly knowing you can recover from almost any Git disaster.

Share this article

DC

David Childs

Consulting Systems Engineer with over 10 years of experience building scalable infrastructure and helping organizations optimize their technology stack.

Related Articles