Master Git reflog to recover lost commits, undo destructive operations, and implement bulletproof disaster recovery strategies for your repositories.
Git reflog is your time machine and safety net rolled into one. After recovering countless "lost" commits and saving teams from disaster, I've learned that understanding reflog is the difference between panic and confidence when things go wrong. Here's your complete guide to Git's most powerful recovery tool.
Understanding Git Reflog
Reflog Fundamentals
# reflog_manager.py
import subprocess
import re
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class ReflogEntry:
    """A single reflog line, split into its individual fields.

    The field order doubles as the positional constructor signature, so it
    must stay in step with the placeholders requested by
    ``ReflogManager.parse_reflog``.
    """

    # Full commit hash the entry points at (git placeholder %H).
    hash: str
    # Reflog selector such as "HEAD@{2}" (git placeholder %gD).
    ref: str
    # Reflog subject, i.e. the operation git recorded (git placeholder %gs).
    action: str
    # Subject line of the commit itself (git placeholder %s).
    message: str
    # Author date of the commit (%ai) with the UTC offset stripped.
    timestamp: datetime
    # Author name of the commit (git placeholder %an).
    author: str
class ReflogManager:
    """Inspect and recover from a repository's reflog via the ``git`` CLI.

    Every git command is executed with ``repo_path`` as working directory.
    """

    # ASCII unit separator, emitted by git for the ``%x1f`` placeholder.
    # Used instead of '|' because commit subjects and author names may
    # legitimately contain a pipe, which used to shift the parsed fields.
    _FIELD_SEP = "\x1f"

    def __init__(self, repo_path: str = "."):
        self.repo_path = repo_path

    def _git(self, *args: str) -> "subprocess.CompletedProcess":
        """Run ``git <args>`` inside the repository and capture text output."""
        return subprocess.run(
            ["git", *args],
            cwd=self.repo_path,
            capture_output=True,
            text=True,
        )

    @staticmethod
    def _parse_timestamp(raw: str) -> datetime:
        """Convert git's ``%ai`` date into a naive ``datetime``.

        ``%ai`` looks like ``2024-01-15 10:30:00 -0500``. The UTC offset is
        dropped by position rather than by searching for ``" +"``, so
        negative offsets are handled the same way as positive ones.

        Raises:
            ValueError: if ``raw`` does not start with a date and a time.
        """
        fields = raw.split()
        if len(fields) < 2:
            raise ValueError(f"unrecognised git date: {raw!r}")
        return datetime.fromisoformat(f"{fields[0]} {fields[1]}")

    def parse_reflog(self, ref: str = "HEAD", days: int = 30) -> List["ReflogEntry"]:
        """Parse reflog entries for analysis.

        Args:
            ref: Ref whose reflog is read.
            days: Only entries newer than this many days are requested.

        Returns:
            Entries in the order git prints them. Lines that do not have
            exactly six fields or whose date cannot be parsed are skipped.
            An empty list is returned when git produces no output.
        """
        since_date = datetime.now() - timedelta(days=days)
        fmt = "%x1f".join(("%H", "%gD", "%gs", "%s", "%ai", "%an"))
        result = self._git(
            "reflog", "show", ref,
            f"--format={fmt}",
            f"--since={since_date.isoformat()}",
        )

        entries = []
        for line in result.stdout.strip().split('\n'):
            if not line:
                continue
            parts = line.split(self._FIELD_SEP)
            if len(parts) != 6:
                continue
            commit, selector, action, subject, raw_date, author = parts
            try:
                # NOTE(review): %ai is the commit's author date, not the time
                # the reflog entry was written — confirm this is intended.
                timestamp = self._parse_timestamp(raw_date)
            except ValueError:
                continue
            entries.append(ReflogEntry(
                hash=commit,
                ref=selector,
                action=action,
                message=subject,
                timestamp=timestamp,
                author=author,
            ))
        return entries

    def find_lost_commits(self) -> List[str]:
        """Find commits that are no longer referenced.

        A commit counts as lost when it appears in the default reflog but is
        not listed by ``git rev-list --all``.

        Returns:
            One ``hash|subject|author|date`` string per lost commit, in
            reflog order. Commits whose details cannot be read are omitted.
        """
        # dict.fromkeys de-duplicates while keeping git's output order, so
        # the result is deterministic (a plain set is not).
        reflog_commits = dict.fromkeys(
            self._git("reflog", "--format=%H").stdout.split()
        )
        reachable_commits = set(self._git("rev-list", "--all").stdout.split())

        lost_details = []
        for commit in reflog_commits:
            if commit in reachable_commits:
                continue
            shown = self._git("show", "--format=%H|%s|%an|%ai", "-s", commit)
            info = shown.stdout.strip()
            # subprocess.run without check=True never raises on a git error;
            # the exit status has to be inspected explicitly.
            if shown.returncode == 0 and info:
                lost_details.append(info)
        return lost_details

    def recover_commit(self, commit_hash: str,
                       recovery_branch: Optional[str] = None) -> bool:
        """Recover a lost commit by creating a branch that points at it.

        Args:
            commit_hash: Commit to keep alive.
            recovery_branch: Branch name to create; defaults to
                ``recovery-<first 8 characters of commit_hash>``.

        Returns:
            True when ``git branch`` succeeded, False otherwise.
        """
        if not recovery_branch:
            recovery_branch = f"recovery-{commit_hash[:8]}"
        try:
            subprocess.run(
                ["git", "branch", recovery_branch, commit_hash],
                cwd=self.repo_path,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            print(f"Failed to recover commit: {e}")
            return False
        print(f"Recovered commit {commit_hash[:8]} to branch {recovery_branch}")
        return True

    def analyze_dangerous_operations(self, days: int = 30) -> List[Dict]:
        """Identify potentially dangerous operations in the HEAD reflog.

        Args:
            days: How far back to look; forwarded to ``parse_reflog``.

        Returns:
            One dict per (entry, matching pattern) pair with the keys
            ``timestamp``, ``operation``, ``description``, ``commit`` and
            ``author``.
        """
        # NOTE(review): these patterns are matched against the reflog
        # subject. Reflog subjects presumably do not contain command-line
        # flags such as --hard / -f / -D / --force, so several patterns may
        # never match — verify against real reflog output. 'checkout.*-f'
        # also matches any checkout of a branch whose name contains "-f".
        dangerous_patterns = [
            (re.compile(pattern, re.IGNORECASE), description)
            for pattern, description in (
                (r'reset.*--hard', 'Hard reset detected'),
                (r'rebase.*abort', 'Aborted rebase'),
                (r'merge.*abort', 'Aborted merge'),
                (r'checkout.*-f', 'Force checkout'),
                (r'branch.*-D', 'Force branch deletion'),
                (r'push.*--force', 'Force push detected'),
            )
        ]

        dangerous_ops = []
        for entry in self.parse_reflog(days=days):
            for pattern, description in dangerous_patterns:
                if pattern.search(entry.action):
                    dangerous_ops.append({
                        'timestamp': entry.timestamp,
                        'operation': entry.action,
                        'description': description,
                        'commit': entry.hash,
                        'author': entry.author,
                    })
        return dangerous_ops
Recovery Operations
Comprehensive Recovery Tools
#!/bin/bash
# recovery_tools.sh
# Recover from accidental reset
#
# Locates the most recent "reset:" entry in HEAD's reflog and offers to move
# the current branch back to the commit HEAD pointed at just before it.
# Returns 1 when no such entry (or no older entry) exists.
recover_from_reset() {
    local before_reset

    echo "Recovering from accidental reset..."

    # Show recent resets
    echo "Recent reset operations:"
    git reflog | grep -E "reset:" | head -5

    # The hash on a reflog line is where HEAD ended up AFTER that operation,
    # so the pre-reset commit is the hash on the next (older) line. Matching
    # the subject's first word avoids hits on commit messages that merely
    # contain "reset:".
    before_reset=$(git reflog --format='%h %gs' |
        awk 'found { print $1; exit } $2 == "reset:" { found = 1 }')

    if [ -z "$before_reset" ]; then
        echo "No recent reset found"
        return 1
    fi

    echo "Found previous HEAD at: $before_reset"

    # Show what would be recovered
    echo "Commits that would be recovered:"
    git log --oneline "$before_reset" ^HEAD

    echo "Warning: uncommitted changes in the working tree will be discarded."
    read -p "Recover to this state? (y/n) " -n 1 -r
    echo

    if [[ $REPLY =~ ^[Yy]$ ]]; then
        if git reset --hard "$before_reset"; then
            echo "Recovery complete!"
        else
            echo "Recovery failed"
            return 1
        fi
    fi
}
# Recover deleted branch
#
# Usage: recover_deleted_branch <branch-name>
#
# Searches HEAD's reflog for the last time the branch was checked out and
# offers to recreate it at the commit it pointed to then. Returns 1 when the
# name is missing, the branch already exists, nothing is found, or the branch
# cannot be created.
recover_deleted_branch() {
    local branch_name=$1
    local branch_tip

    if [ -z "$branch_name" ]; then
        echo "Usage: recover_deleted_branch <branch-name>"
        return 1
    fi

    if git show-ref --verify --quiet "refs/heads/$branch_name"; then
        echo "Branch $branch_name already exists"
        return 1
    fi

    echo "Searching for deleted branch: $branch_name"

    # "checkout: moving from X to Y" carries Y's commit. X's tip at the moment
    # it was left is therefore the hash on the next (older) reflog line.
    # Comparing whole fields (not a regex) keeps "feature" from matching
    # "feature-2".
    branch_tip=$(git reflog --format='%h %gs' |
        awk -v b="$branch_name" '
            found { print $1; exit }
            $2 == "checkout:" && $4 == "from" && $5 == b { found = 1 }')

    if [ -z "$branch_tip" ]; then
        # Fall back to the last time the branch was switched TO; that line
        # carries the branch's commit at that moment.
        branch_tip=$(git reflog --format='%h %gs' |
            awk -v b="$branch_name" '
                $2 == "checkout:" && $6 == "to" && $7 == b { print $1; exit }')
    fi

    if [ -z "$branch_tip" ]; then
        echo "Could not find branch $branch_name in reflog"
        return 1
    fi

    echo "Found branch tip at: $branch_tip"

    # Show branch content
    echo "Branch contained these commits:"
    git log --oneline -10 "$branch_tip"

    read -p "Recreate branch $branch_name? (y/n) " -n 1 -r
    echo

    if [[ $REPLY =~ ^[Yy]$ ]]; then
        if git branch "$branch_name" "$branch_tip"; then
            echo "Branch $branch_name recovered!"
        else
            echo "Could not recreate branch $branch_name"
            return 1
        fi
    fi
}
# Recover from bad rebase
#
# Finds the commit HEAD pointed at before the last rebase (ORIG_HEAD, or the
# reflog entry preceding the most recent "rebase (start)") and offers to
# reset to it, branch from it, or cherry-pick onto it. Returns 1 when the
# original commit cannot be found or the chosen action fails.
recover_from_bad_rebase() {
    local orig choice branch_name commits commit

    echo "Recovering from bad rebase..."

    # Ask git to resolve ORIG_HEAD instead of reading .git/ORIG_HEAD, so this
    # also works when the current directory is not the repository root.
    # NOTE(review): other commands (e.g. reset, merge) also write ORIG_HEAD,
    # so it may not belong to the rebase — confirm with the log shown below.
    orig=$(git rev-parse --verify --quiet ORIG_HEAD)
    if [ -n "$orig" ]; then
        echo "Found ORIG_HEAD at: $orig"
    else
        # The "rebase (start)" line holds the commit that was checked out to
        # begin the rebase; the pre-rebase HEAD is on the next (older) line.
        orig=$(git reflog --format='%H %gs' |
            awk 'found { print $1; exit } $2 == "rebase" && /\(start\)/ { found = 1 }')
        if [ -n "$orig" ]; then
            echo "Found pre-rebase HEAD at: $orig"
        fi
    fi

    if [ -z "$orig" ]; then
        echo "Could not find original HEAD before rebase"
        return 1
    fi

    # Show difference
    echo "Current state vs original:"
    git log --oneline --graph HEAD "^$orig" | head -20
    echo "---"
    git log --oneline --graph "$orig" ^HEAD | head -20

    echo "Options:"
    echo " 1) Reset to original state (lose rebase)"
    echo " 2) Create branch at original state"
    echo " 3) Cherry-pick specific commits"
    echo " 4) Cancel"
    read -p "Choice [1-4]: " choice

    case $choice in
        1)
            git reset --hard "$orig" || return 1
            echo "Reset to original state"
            ;;
        2)
            read -p "Branch name: " branch_name
            if [ -z "$branch_name" ]; then
                echo "No branch name given"
                return 1
            fi
            git branch "$branch_name" "$orig" || return 1
            echo "Created branch $branch_name at original state"
            ;;
        3)
            echo "Commits to cherry-pick:"
            git log --oneline HEAD "^$orig"
            read -p "Enter commit SHAs (space-separated): " commits
            git checkout "$orig" || return 1
            # Intentionally unquoted: the answer is a space-separated list.
            for commit in $commits; do
                # Stop at the first failure so later picks are not applied on
                # top of an unresolved conflict.
                if ! git cherry-pick "$commit"; then
                    echo "Cherry-pick of $commit failed; resolve it and continue manually"
                    return 1
                fi
            done
            ;;
        4)
            echo "Cancelled"
            ;;
        *)
            echo "Invalid choice: $choice"
            return 1
            ;;
    esac
}
# Recover lost stash
#
# Walks the unreachable commits reported by `git fsck`, shows each one that
# looks like a stash and offers to put it back on the stash list.
recover_lost_stash() {
    local stashes commit

    echo "Searching for lost stashes..."

    # Find all unreachable commits ("unreachable commit <sha>" lines)
    stashes=$(git fsck --unreachable | awk '$1 == "unreachable" && $2 == "commit" { print $3 }')

    for commit in $stashes; do
        # A stash commit has a "WIP on ..." / "On ..." subject and at least
        # two parents. "index on ..." commits are deliberately excluded:
        # they are the stash's index parent, not a stash that can be applied.
        if git show --format="%s" -s "$commit" | grep -qE "^(WIP on|On )" &&
            git rev-parse --verify --quiet "$commit^2" >/dev/null; then
            echo ""
            echo "Found stash: $commit"
            git show --stat "$commit"

            read -p "Recover this stash? (y/n/q) " -n 1 -r
            echo

            case $REPLY in
                y|Y)
                    if git stash store -m "Recovered stash" "$commit"; then
                        echo "Stash recovered!"
                    else
                        echo "Could not store $commit as a stash"
                    fi
                    ;;
                q|Q)
                    break
                    ;;
            esac
        fi
    done
}
Advanced Recovery Scenarios
# advanced_recovery.py
class AdvancedRecovery:
    """Interactive recovery workflows built on top of ``ReflogManager``."""

    def __init__(self):
        self.reflog_manager = ReflogManager()

    def _git(self, *args: str, capture: bool = True,
             stdin_text: Optional[str] = None):
        """Run ``git <args>`` in the manager's repository.

        With ``capture=False`` git writes straight to the terminal, which is
        what the interactive steps want.
        """
        return subprocess.run(
            ["git", *args],
            cwd=self.reflog_manager.repo_path,
            capture_output=capture,
            text=True,
            input=stdin_text,
        )

    @staticmethod
    def _parse_choice(raw: str, count: int) -> Optional[int]:
        """Return ``raw`` as an index into ``count`` items, or None if invalid."""
        try:
            index = int(raw.strip())
        except ValueError:
            return None
        return index if 0 <= index < count else None

    def recover_from_force_push(self, branch: str, remote: str = "origin"):
        """Recover from an accidental force push.

        Looks for the first reflog entry of ``branch`` that mentions a forced
        update, treats the next (older) entry as the state to restore and
        lets the user pick how to restore it.

        Args:
            branch: Ref whose reflog is searched and, for option 3, the
                remote branch that is overwritten.
            remote: Remote used for option 3.

        Returns:
            False when no forced update or no earlier entry is found, or when
            the chosen git command fails; True otherwise.
        """
        print(f"Recovering from force push on {branch}")

        reflog_entries = self.reflog_manager.parse_reflog(branch)
        force_index = next(
            (i for i, entry in enumerate(reflog_entries)
             if "forced-update" in entry.action
             or "force" in entry.message.lower()),
            None,
        )
        if force_index is None:
            print("No force push detected in recent history")
            return False
        if force_index + 1 >= len(reflog_entries):
            print("Could not find commit before force push")
            return False

        force_push_entry = reflog_entries[force_index]
        # Entries are newest-first, so the following one is the older state.
        previous_commit = reflog_entries[force_index + 1].hash
        print(f"Found previous state: {previous_commit[:8]}")

        # A..B lists commits reachable from B but not from A. The commits
        # that were pushed away are those reachable from the previous state
        # and no longer from the forced one.
        lost_commits = self._git(
            "log", "--oneline", f"{force_push_entry.hash}..{previous_commit}"
        ).stdout
        print("Commits that were force-pushed away:")
        print(lost_commits)

        print("\nRecovery options:")
        print("1. Reset local branch to previous state")
        print("2. Create recovery branch")
        print("3. Force push to restore remote")
        choice = input("Choose option (1-3): ")

        if choice == "1":
            # NOTE(review): this resets whichever branch is checked out,
            # which is not necessarily `branch` — confirm with callers.
            if self._git("reset", "--hard", previous_commit, capture=False).returncode != 0:
                print("Reset failed")
                return False
            print(f"Local branch reset to {previous_commit[:8]}")
        elif choice == "2":
            recovery_branch = f"recovery-{branch}-{datetime.now().strftime('%Y%m%d')}"
            if self._git("branch", recovery_branch, previous_commit, capture=False).returncode != 0:
                print(f"Could not create recovery branch: {recovery_branch}")
                return False
            print(f"Created recovery branch: {recovery_branch}")
        elif choice == "3":
            confirm = input("This will overwrite remote. Are you sure? (yes/no): ")
            if confirm.lower() == "yes":
                pushed = self._git(
                    "push", "--force-with-lease",
                    remote, f"{previous_commit}:{branch}",
                    capture=False,
                )
                if pushed.returncode != 0:
                    print("Push failed; remote not restored")
                    return False
                print("Remote restored")
        return True

    def recover_merge_conflict_resolution(self):
        """Re-apply the changes of a merge commit found in the reflog.

        Lists reflog entries whose reflog message mentions "merge", lets the
        user pick one, shows the diff between the merge's two parents and
        optionally applies the diff from the first parent to the merge commit
        onto the working tree.
        """
        # --grep-reflog matches the reflog message ("merge <branch>: ...",
        # "commit (merge): ..."); plain --grep matches the commit message.
        listing = self._git("reflog", "--grep-reflog=merge").stdout
        # Filtering is required: "".split("\n") yields [""], never [].
        merge_commits = [line for line in listing.splitlines() if line.strip()]
        if not merge_commits:
            print("No recent merges found")
            return

        shown = merge_commits[:5]
        print("Recent merges:")
        for i, commit in enumerate(shown):
            print(f"{i}: {commit}")

        choice = self._parse_choice(
            input("Select merge to recover (number): "), len(shown)
        )
        if choice is None:
            print("Invalid selection")
            return
        merge_ref = shown[choice].split()[0]

        resolved = self._git("rev-parse", merge_ref)
        merge_commit = resolved.stdout.strip()
        if resolved.returncode != 0 or not merge_commit:
            print(f"Could not resolve {merge_ref}")
            return

        parents = self._git(
            "show", "--format=%P", "-s", merge_commit
        ).stdout.strip().split()
        if len(parents) != 2:
            print("Not a merge commit")
            return

        # The original limited this diff to the files the same diff reports
        # as changed, which is every file in it — so the plain diff is
        # equivalent and avoids passing a newline-joined list as one path.
        # NOTE(review): this is the diff between the two parents; consider
        # `git show --cc <merge>` if only the resolution itself is wanted.
        print("\nConflict resolution from merge:")
        self._git("diff", f"{parents[0]}...{parents[1]}", capture=False)

        apply = input("Apply this resolution? (y/n): ")
        if apply.lower() != 'y':
            return

        patch = self._git("diff", f"{parents[0]}..{merge_commit}").stdout
        applied = self._git("apply", "-", stdin_text=patch)
        if applied.returncode == 0:
            print("Resolution applied successfully")
        else:
            print(f"Failed to apply resolution: {applied.stderr}")
Reflog Analysis and Visualization
Reflog Analytics
# reflog_analytics.py
import matplotlib.pyplot as plt
import pandas as pd
from collections import defaultdict
class ReflogAnalytics:
    """Charts and text reports derived from ``ReflogManager`` data."""

    def __init__(self):
        self.reflog_manager = ReflogManager()

    def analyze_activity_patterns(self, days: int = 30):
        """Analyze repository activity patterns.

        Draws a 2x2 figure (activity by hour, activity by day, operation
        types, dangerous operations per day), saves it as
        ``reflog_analysis.png`` in the current directory and shows it.

        Args:
            days: Reflog window passed to ``parse_reflog``.

        Returns:
            Dict with ``hourly_activity``, ``daily_activity`` (keys are date
            strings), ``action_types`` and the ``dangerous_operations`` count.
        """
        entries = self.reflog_manager.parse_reflog(days=days)

        hourly_activity = defaultdict(int)
        daily_activity = defaultdict(int)
        action_types = defaultdict(int)
        for entry in entries:
            # Text before the first ':' is the operation name; partition
            # returns the whole string when there is no ':'.
            action = entry.action.partition(':')[0]
            hourly_activity[entry.timestamp.hour] += 1
            daily_activity[entry.timestamp.date()] += 1
            action_types[action] += 1

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Hourly activity
        hours = list(range(24))
        axes[0, 0].bar(hours, [hourly_activity.get(h, 0) for h in hours])
        axes[0, 0].set_xlabel('Hour of Day')
        axes[0, 0].set_ylabel('Number of Operations')
        axes[0, 0].set_title('Activity by Hour')

        # Daily activity
        active_days = sorted(daily_activity)
        axes[0, 1].plot(active_days, [daily_activity[d] for d in active_days])
        axes[0, 1].set_xlabel('Date')
        axes[0, 1].set_ylabel('Number of Operations')
        axes[0, 1].set_title('Daily Activity')
        axes[0, 1].tick_params(axis='x', rotation=45)

        # Action types
        axes[1, 0].pie(list(action_types.values()),
                       labels=list(action_types.keys()),
                       autopct='%1.1f%%')
        axes[1, 0].set_title('Operation Types')

        # Risk analysis
        # NOTE(review): called without arguments, so this uses the
        # manager's own default window rather than `days`.
        dangerous_ops = self.reflog_manager.analyze_dangerous_operations()
        risk_counts = defaultdict(int)
        for op in dangerous_ops:
            risk_counts[op['timestamp'].date()] += 1
        if risk_counts:
            risk_days = sorted(risk_counts)
            axes[1, 1].bar(risk_days, [risk_counts[d] for d in risk_days],
                           color='red')
            axes[1, 1].set_xlabel('Date')
            axes[1, 1].set_ylabel('Dangerous Operations')
            axes[1, 1].set_title('Risk Operations Over Time')

        plt.tight_layout()
        plt.savefig('reflog_analysis.png')
        plt.show()
        # Release the figure; pyplot otherwise keeps every figure alive, so
        # repeated calls would accumulate memory.
        plt.close(fig)

        return {
            'hourly_activity': dict(hourly_activity),
            'daily_activity': {str(k): v for k, v in daily_activity.items()},
            'action_types': dict(action_types),
            'dangerous_operations': len(dangerous_ops)
        }

    def generate_recovery_report(self) -> str:
        """Generate a Markdown recovery report.

        Returns:
            Newline-joined report with up to 10 lost commits, up to 10
            dangerous operations and up to 20 recovery points (reflog
            entries from the last 7 days whose action mentions commit,
            merge or checkout).
        """
        report = [
            "# Git Repository Recovery Report",
            f"Generated: {datetime.now().isoformat()}",
            "",
        ]

        # Lost commits
        lost_commits = self.reflog_manager.find_lost_commits()
        report.append(f"## Lost Commits: {len(lost_commits)}")
        for commit_info in lost_commits[:10]:
            # Layout is hash|subject|author|date. Only the subject is likely
            # to contain '|', so peel the hash off the left and author/date
            # off the right instead of splitting on every pipe.
            commit_hash, sep, rest = commit_info.partition('|')
            tail = rest.rsplit('|', 2)
            if not sep or len(tail) != 3:
                continue
            subject, author, _date = tail
            report.append(f"- {commit_hash[:8]}: {subject} by {author}")
        report.append("")

        # Dangerous operations
        dangerous_ops = self.reflog_manager.analyze_dangerous_operations()
        report.append(f"## Recent Dangerous Operations: {len(dangerous_ops)}")
        report.extend(
            f"- {op['timestamp']}: {op['description']}"
            for op in dangerous_ops[:10]
        )
        report.append("")

        # Recovery points
        report.append("## Recovery Points")
        keywords = ('commit', 'merge', 'checkout')
        checkpoints = [
            entry
            for entry in self.reflog_manager.parse_reflog(days=7)
            if any(word in entry.action.lower() for word in keywords)
        ]
        report.extend(
            f"- {checkpoint.timestamp}: {checkpoint.hash[:8]} - {checkpoint.action}"
            for checkpoint in checkpoints[:20]
        )
        return '\n'.join(report)
Automation and Prevention
Automated Backup System
#!/bin/bash
# backup_system.sh
# Create reflog backup
#
# Copies the repository's logs directory to
# <git-dir>/reflog_backups/logs_<timestamp>, archives backups older than 7
# days and deletes archives older than 30 days. Returns 1 when not inside a
# repository or when the copy fails.
backup_reflog() {
    local git_dir backup_dir timestamp old

    # Ask git for the directory so this also works outside the repo root.
    git_dir=$(git rev-parse --git-dir) || return 1
    backup_dir="$git_dir/reflog_backups"
    timestamp=$(date +%Y%m%d_%H%M%S)

    mkdir -p "$backup_dir" || return 1

    # Backup all reflogs
    cp -r "$git_dir/logs" "$backup_dir/logs_$timestamp" || return 1

    # Compress old backups. Each backup is a directory, which gzip cannot
    # compress, so archive it with tar and drop the directory afterwards.
    # -type d keeps already-created archives from matching "logs_*" again.
    find "$backup_dir" -mindepth 1 -maxdepth 1 -type d -name "logs_*" -mtime +7 -print0 |
        while IFS= read -r -d '' old; do
            tar -czf "$old.tar.gz" -C "$backup_dir" "$(basename "$old")" &&
                rm -rf "$old"
        done

    # Remove very old backups
    find "$backup_dir" -maxdepth 1 -type f -name "*.gz" -mtime +30 -delete

    echo "Reflog backed up to $backup_dir/logs_$timestamp"
}
# Create safety snapshot before dangerous operation
#
# Usage: create_safety_snapshot <operation description>
#
# Creates a timestamped safety-* branch at HEAD, records the snapshot under
# refs/safety/HEAD and appends a line to <git-dir>/safety_log. Returns 1 when
# the branch cannot be created.
create_safety_snapshot() {
    local operation=$1
    local safety_branch git_dir

    echo "Creating safety snapshot before: $operation"

    # Create temporary branch
    safety_branch="safety-$(date +%Y%m%d-%H%M%S)"
    if ! git branch "$safety_branch"; then
        echo "Could not create safety branch $safety_branch" >&2
        return 1
    fi

    # Store in reflog. --create-reflog is needed because git does not keep a
    # reflog for refs outside its standard namespaces by default, so the -m
    # message would otherwise be discarded.
    git update-ref --create-reflog -m "Safety snapshot before $operation" refs/safety/HEAD HEAD

    # Log the snapshot
    git_dir=$(git rev-parse --git-dir) || return 1
    echo "$(date): $safety_branch - Before $operation" >> "$git_dir/safety_log"

    echo "Safety snapshot created: $safety_branch"
    echo "To recover: git checkout $safety_branch"
}
# Protect against dangerous operations
#
# Usage: safe_git <git-subcommand> [args...]
#
# Takes a safety snapshot before `reset --hard`, forced pushes and rebases,
# asks for confirmation before a forced push, then runs the git command.
# Returns 1 without running git when a forced push is declined; otherwise
# returns git's exit status.
safe_git() {
    local command=$1
    shift

    local arg confirm
    local hard=0 force=0
    for arg in "$@"; do
        case $arg in
            --hard) hard=1 ;;
            # --force* also covers --force-with-lease, as the original
            # substring test did; -f is the short form it missed.
            --force*|-f) force=1 ;;
        esac
    done

    case $command in
        reset)
            if [ "$hard" -eq 1 ]; then
                create_safety_snapshot "hard reset"
            fi
            ;;
        push)
            if [ "$force" -eq 1 ]; then
                echo "Warning: Force push detected!"
                read -p "Are you sure? (yes/no): " confirm
                # return, not exit: exit would terminate the user's shell
                # when this function has been sourced into it.
                if [ "$confirm" != "yes" ]; then
                    return 1
                fi
                # Snapshot only once the push is really going to happen, so
                # a declined push leaves no stray safety branch behind.
                create_safety_snapshot "force push"
            fi
            ;;
        rebase)
            create_safety_snapshot "rebase"
            ;;
    esac

    # Execute command
    git "$command" "$@"
}
# Periodic reflog maintenance
#
# Backs up the reflogs, expires entries older than 90 days while keeping
# entries for unreachable commits, then runs an automatic garbage collection.
maintain_reflog() {
    echo "Performing reflog maintenance..."

    # Back up first: expiring reflog entries is the destructive step.
    backup_reflog

    # Expire old reflog entries (keep 90 days) but keep important entries.
    # Both limits must be given in ONE invocation: any limit left out falls
    # back to its configured default, so two separate commands would let the
    # first one prune unreachable entries before "never" is ever applied.
    git reflog expire --expire=90.days.ago --expire-unreachable=never --all

    # Gentle garbage collection. gc expires reflogs itself using the
    # gc.reflogExpire* settings, so pass the same limits for this run.
    git -c gc.reflogExpire=90.days.ago -c gc.reflogExpireUnreachable=never \
        gc --auto --prune=30.days.ago

    echo "Reflog maintenance complete"
}
Recovery Hooks
# recovery_hooks.py
#!/usr/bin/env python3
import sys
import subprocess
import json
from pathlib import Path
class RecoveryHooks:
    """Pre/post command hooks that snapshot and guard risky git operations.

    Settings are read from ``.git/recovery_config.json`` relative to the
    current directory; missing settings fall back to built-in defaults.
    """

    def __init__(self):
        self.config_file = Path(".git/recovery_config.json")
        self.load_config()

    @staticmethod
    def _default_config() -> dict:
        """Return a fresh copy of the built-in defaults."""
        return {
            'auto_backup': True,
            'protect_branches': ['main', 'master', 'production'],
            'snapshot_before': ['reset --hard', 'rebase', 'filter-branch'],
            'max_reflog_entries': 10000
        }

    def load_config(self):
        """Load recovery configuration into ``self.config``.

        Values from the config file override the defaults key by key, so a
        file that only sets some options no longer causes ``KeyError`` when
        the other options are read. An unreadable or non-object JSON file is
        reported on stderr and the defaults are used.
        """
        config = self._default_config()
        if self.config_file.exists():
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except json.JSONDecodeError as exc:
                print(f"Ignoring invalid {self.config_file}: {exc}", file=sys.stderr)
            else:
                if isinstance(loaded, dict):
                    config.update(loaded)
                else:
                    print(f"Ignoring {self.config_file}: expected a JSON object",
                          file=sys.stderr)
        self.config = config

    def pre_command_hook(self, command: str):
        """Hook to run before dangerous commands.

        Creates a snapshot when ``command`` contains one of the configured
        ``snapshot_before`` strings, and asks for confirmation (exiting with
        status 1 if declined) before a force / hard reset / rebase on a
        protected branch.
        """
        if any(protected_cmd in command
               for protected_cmd in self.config['snapshot_before']):
            self.create_snapshot(f"Before: {command}")

        current_branch = subprocess.run(
            ["git", "branch", "--show-current"],
            capture_output=True,
            text=True
        ).stdout.strip()

        if current_branch not in self.config['protect_branches']:
            return
        if any(danger in command for danger in ['force', 'reset --hard', 'rebase']):
            print(f"⚠️ Warning: Dangerous operation on protected branch {current_branch}")
            response = input("Continue? (yes/no): ")
            if response.lower() != 'yes':
                sys.exit(1)

    def create_snapshot(self, message: str):
        """Create a recovery snapshot ref pointing at HEAD.

        The ref is named ``refs/snapshots/<YYYYmmdd_HHMMSS>``.
        """
        # Imported here because this snippet's own import list does not
        # include datetime.
        from datetime import datetime

        snapshot_ref = f"refs/snapshots/{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        subprocess.run([
            "git", "update-ref", "-m", message, snapshot_ref, "HEAD"
        ])
        print(f"📸 Snapshot created: {snapshot_ref}")

    def show_recovery_guide(self):
        """Print a short list of commands for common recovery situations."""
        print("\nRecovery guide:")
        print("  git reflog                   # list where HEAD has pointed")
        print("  git branch <name> <commit>   # keep a commit on a new branch")
        print("  git reset --hard HEAD@{n}    # move the current branch to entry n")
        print("  git fsck --lost-found        # list dangling commits")
        print("  git stash list               # list stashed changes")

    def post_command_hook(self, command: str, exit_code: int):
        """Hook to run after commands; offers recovery when one failed.

        Args:
            command: The command that was run (not used by this method).
            exit_code: Its exit status; nothing happens when it is 0.
        """
        if exit_code == 0:
            return

        print("\n❌ Command failed. Recovery options:")
        print("1. Show recent reflog entries")
        print("2. Reset to previous state")
        print("3. Show recovery guide")
        print("4. Do nothing")
        choice = input("Choose (1-4): ")

        if choice == "1":
            subprocess.run(["git", "reflog", "-10"])
        elif choice == "2":
            # NOTE(review): HEAD@{1} is the previous reflog position, which
            # is only the pre-command state if the failed command moved
            # HEAD — confirm before relying on this option.
            result = subprocess.run(["git", "reset", "--hard", "HEAD@{1}"])
            if result.returncode == 0:
                print("Reset to previous state")
            else:
                print("Reset failed")
        elif choice == "3":
            self.show_recovery_guide()
Best Practices Checklist
Conclusion
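Git reflog is your safety net and time machine combined. Master it, and you will rarely lose committed work in Git again. The key is understanding that Git keeps more history than you see, and reflog is your window into that hidden history. Keep its limits in mind, though: the reflog is local to each clone, its entries expire (by default after 90 days, or 30 days for commits that are no longer reachable), and it cannot bring back changes that were never committed or stashed. Implement proper backup strategies, automate recovery procedures, and sleep soundly knowing you can recover from almost any Git disaster.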