♻️ Refactor everything so it can be extended with providers. Added gitlab support

main
Hugo ODY 3 days ago
parent 3f546b745b
commit c57269dcfd

@ -0,0 +1,258 @@
# 🏗 Architecture du Migration Tool
## Vue d'ensemble
Le Migration Tool a été conçu selon les principes de développement suivants :
- **Single Responsibility Principle (SRP)** : Chaque classe a une responsabilité unique
- **Open/Closed Principle** : Ouvert à l'extension, fermé à la modification
- **Dependency Inversion** : Dépendance sur des abstractions, pas des implémentations
- **Extensibilité** : Facilité d'ajout de nouveaux providers
## Structure du projet
```
GiteaToGithubMigrator/
├── providers/ # Providers pour différents services Git
│ ├── __init__.py
│ ├── base.py # Classes abstraites et modèles
│ ├── factory.py # Factory pour créer les providers
│ ├── source/ # Providers source (Gitea, GitLab, etc.)
│ │ ├── __init__.py
│ │ ├── gitea.py
│ │ └── gitlab.py
│ └── destination/ # Providers destination (GitHub, GitLab, etc.)
│ ├── __init__.py
│ ├── github.py
│ └── gitlab.py
├── core/ # Logique métier centrale
│ ├── __init__.py
│ ├── config.py # Gestion de la configuration
│ └── migration_engine.py # Moteur de migration
├── ui/ # Interface utilisateur
│ ├── __init__.py
│ └── interactive_selector.py
├── main.py # Point d'entrée principal
└── run.sh # Script de lancement
```
## Responsabilités des modules
### 🔧 Providers (providers/)
#### Base (`providers/base.py`)
- **Repository** : Modèle de données unifié pour tous les providers
- **SourceProvider** : Interface abstraite pour les providers source
- **DestinationProvider** : Interface abstraite pour les providers destination
- **Exceptions** : Exceptions spécialisées pour la gestion d'erreurs
#### Factory (`providers/factory.py`)
- Création dynamique des instances de providers
- Enregistrement de nouveaux providers
- Validation des types de providers disponibles
#### Source Providers (`providers/source/`)
- **Gitea** : Implémentation pour Gitea
- **GitLab** : Implémentation pour GitLab (exemple d'extensibilité)
#### Destination Providers (`providers/destination/`)
- **GitHub** : Implémentation pour GitHub
- **GitLab** : Implémentation pour GitLab (exemple d'extensibilité)
### ⚙ Core (`core/`)
#### Configuration (`core/config.py`)
- Gestion centralisée de la configuration
- Support multi-providers via variables d'environnement
- Validation des paramètres de configuration
#### Migration Engine (`core/migration_engine.py`)
- Orchestration du processus de migration
- Gestion des repositories temporaires
- Exécution des commandes Git
- Logging et rapport de progression
### 🎨 UI (`ui/`)
#### Interactive Selector (`ui/interactive_selector.py`)
- Interface interactive pour la sélection de repositories
- Navigation au clavier
- Système de renommage
- Affichage paginated
## Extensibilité
### Ajouter un nouveau provider source
1. **Créer le fichier** : `providers/source/mon_provider.py`
```python
from typing import List, Optional
from ..base import SourceProvider, Repository, ProviderError, ConfigurationError
class MonProviderSourceProvider(SourceProvider):
def _validate_config(self) -> None:
# Valider la configuration spécifique
pass
def get_user_repositories(self) -> List[Repository]:
# Récupérer les repos de l'utilisateur
pass
def get_accessible_repositories(self) -> List[Repository]:
# Récupérer tous les repos accessibles
pass
def get_repository_info(self, owner: str, name: str) -> Optional[Repository]:
# Récupérer les infos d'un repo spécifique
pass
def get_authenticated_clone_url(self, repository: Repository) -> str:
# Générer l'URL de clone authentifiée
pass
```
2. **Enregistrer le provider** dans `providers/factory.py` :
```python
from .source.mon_provider import MonProviderSourceProvider
class ProviderFactory:
_source_providers: Dict[str, Type[SourceProvider]] = {
'gitea': GiteaSourceProvider,
'gitlab': GitLabSourceProvider,
'mon_provider': MonProviderSourceProvider, # Nouveau provider
}
```
3. **Ajouter la configuration** dans `core/config.py` :
```python
def _load_source_config(self) -> Dict[str, Any]:
if self.source_provider == 'mon_provider':
return {
'url': os.getenv('MON_PROVIDER_URL'),
'token': os.getenv('MON_PROVIDER_TOKEN'),
'username': os.getenv('MON_PROVIDER_USERNAME')
}
# ... autres providers
```
### Ajouter un nouveau provider destination
Le processus est identique, mais dans `providers/destination/`.
## Patterns utilisés
### 1. Abstract Factory Pattern
- `ProviderFactory` crée des instances de providers
- Permet d'ajouter de nouveaux providers sans modifier le code existant
### 2. Strategy Pattern
- Les providers implémentent des stratégies différentes pour accéder aux APIs
- Le moteur de migration utilise ces stratégies de manière transparente
### 3. Template Method Pattern
- `SourceProvider` et `DestinationProvider` définissent le squelette des opérations
- Les implémentations concrètes remplissent les détails spécifiques
### 4. Dependency Injection
- Les providers sont injectés dans `MigrationEngine`
- Facilite les tests et la flexibilité
## Configuration
### Variables d'environnement
```bash
# Provider source
SOURCE_PROVIDER=gitea|gitlab
GITEA_URL=https://gitea.example.com
GITEA_TOKEN=your_token
GITEA_USERNAME=your_username
# Provider destination
DESTINATION_PROVIDER=github|gitlab
GITHUB_TOKEN=your_token
GITHUB_USERNAME=your_username
```
### Extensibilité de la configuration
Pour ajouter un nouveau provider, il suffit d'ajouter les variables correspondantes et de modifier `MigrationConfig`.
## Tests
### Structure recommandée
```
tests/
├── unit/
│ ├── providers/
│ │ ├── test_gitea_provider.py
│ │ └── test_github_provider.py
│ ├── core/
│ │ └── test_migration_engine.py
│ └── ui/
│ └── test_interactive_selector.py
├── integration/
│ └── test_full_migration.py
└── fixtures/
└── sample_repositories.json
```
### Exemples de tests
```python
# Test d'un provider
def test_gitea_provider_validates_config():
config = {'url': 'https://gitea.com', 'token': 'token', 'username': 'user'}
provider = GiteaSourceProvider(config)
assert provider.base_url == 'https://gitea.com'
# Test du moteur de migration
def test_migration_engine_handles_errors():
source = Mock(spec=SourceProvider)
dest = Mock(spec=DestinationProvider)
engine = MigrationEngine(source, dest)
# Test error handling...
```
## Bonnes pratiques
### 1. Gestion d'erreurs
- Exceptions spécialisées pour chaque type d'erreur
- Logging approprié à chaque niveau
- Gestion gracieuse des échecs
### 2. Documentation
- Docstrings pour toutes les méthodes publiques
- Type hints pour la clarté du code
- README et documentation d'architecture
### 3. Sécurité
- Tokens jamais loggés
- Nettoyage des repositories temporaires
- Validation des entrées utilisateur
### 4. Performance
- Pagination pour les listes de repositories
- Parallélisation possible des migrations
- Gestion efficace de la mémoire
## Évolutions futures
### Fonctionnalités potentielles
- Migration incrémentale (seulement les changements)
- Support des webhooks
- Interface web
- API REST
- Migration de métadonnées (issues, pull requests)
### Nouveaux providers
- Bitbucket
- Azure DevOps
- Sourcehut
- Codeberg
L'architecture actuelle permet d'ajouter facilement ces fonctionnalités sans restructuration majeure.

@ -1,45 +0,0 @@
"""
Configuration module for Gitea to GitHub migration tool
"""
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
class Config:
"""Configuration class for migration settings"""
def __init__(self):
self.gitea_url = os.getenv('GITEA_URL', 'https://codefirst.iut.uca.fr/git')
self.gitea_token = os.getenv('GITEA_TOKEN')
self.github_token = os.getenv('GITHUB_TOKEN')
self.github_username = os.getenv('GITHUB_USERNAME')
self.gitea_username = os.getenv('GITEA_USERNAME')
# Validate required configuration
self._validate_config()
def _validate_config(self):
"""Validate that all required configuration is present"""
missing = []
if not self.gitea_token:
missing.append('GITEA_TOKEN')
if not self.github_token:
missing.append('GITHUB_TOKEN')
if not self.github_username:
missing.append('GITHUB_USERNAME')
if not self.gitea_username:
missing.append('GITEA_USERNAME')
if missing:
raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
def is_valid(self):
"""Check if configuration is valid"""
try:
self._validate_config()
return True
except ValueError:
return False

@ -0,0 +1,3 @@
"""
Core migration engine and utilities
"""

@ -0,0 +1,76 @@
"""
Configuration management for migration tool
"""
import os
from typing import Dict, Any
from dotenv import load_dotenv
from providers.base import ConfigurationError
# Load environment variables from .env file
load_dotenv()
class MigrationConfig:
"""Configuration manager for migration settings"""
def __init__(self):
self.source_provider = os.getenv('SOURCE_PROVIDER', 'gitea').lower()
self.destination_provider = os.getenv('DESTINATION_PROVIDER', 'github').lower()
self.source_config = self._load_source_config()
self.destination_config = self._load_destination_config()
self._validate_config()
def _load_source_config(self) -> Dict[str, Any]:
"""Load source provider configuration"""
if self.source_provider == 'gitea':
return {
'url': os.getenv('GITEA_URL', 'https://codefirst.iut.uca.fr/git'),
'token': os.getenv('GITEA_TOKEN'),
'username': os.getenv('GITEA_USERNAME')
}
elif self.source_provider == 'gitlab':
return {
'url': os.getenv('GITLAB_URL', 'https://gitlab.com'),
'token': os.getenv('GITLAB_TOKEN'),
'username': os.getenv('GITLAB_USERNAME')
}
else:
raise ConfigurationError(f"Unsupported source provider: {self.source_provider}")
def _load_destination_config(self) -> Dict[str, Any]:
"""Load destination provider configuration"""
if self.destination_provider == 'github':
return {
'token': os.getenv('GITHUB_TOKEN'),
'username': os.getenv('GITHUB_USERNAME')
}
elif self.destination_provider == 'gitlab':
return {
'url': os.getenv('GITLAB_DEST_URL', 'https://gitlab.com'),
'token': os.getenv('GITLAB_DEST_TOKEN'),
'username': os.getenv('GITLAB_DEST_USERNAME')
}
else:
raise ConfigurationError(f"Unsupported destination provider: {self.destination_provider}")
def _validate_config(self) -> None:
"""Validate configuration completeness"""
# Check source config
missing_source = [key for key, value in self.source_config.items() if not value]
if missing_source:
raise ConfigurationError(f"Missing {self.source_provider} source configuration: {', '.join(missing_source)}")
# Check destination config
missing_dest = [key for key, value in self.destination_config.items() if not value]
if missing_dest:
raise ConfigurationError(f"Missing {self.destination_provider} destination configuration: {', '.join(missing_dest)}")
def is_valid(self) -> bool:
"""Check if configuration is valid"""
try:
self._validate_config()
return True
except ConfigurationError:
return False

@ -0,0 +1,164 @@
"""
Main migration engine
"""
import os
import subprocess
import tempfile
import shutil
import logging
from typing import List, Dict
from pathlib import Path
from providers.base import SourceProvider, DestinationProvider, Repository, MigrationError
from .config import MigrationConfig
logger = logging.getLogger(__name__)
class MigrationEngine:
"""Main migration engine that orchestrates repository migrations"""
def __init__(self, source_provider: SourceProvider, destination_provider: DestinationProvider):
self.source_provider = source_provider
self.destination_provider = destination_provider
def migrate_repositories(self, repositories: List[Repository]) -> Dict[str, bool]:
"""Migrate a list of repositories"""
if not repositories:
logger.info("No repositories selected for migration")
return {}
results = {}
for repository in repositories:
target_name = repository.github_name or repository.name
if repository.github_name and repository.github_name != repository.name:
logger.info(f"Migrating repository: {repository.owner}/{repository.name}{target_name}")
else:
logger.info(f"Migrating repository: {repository.owner}/{repository.name}")
try:
success = self._migrate_single_repository(repository, target_name)
display_name = f"{repository.owner}/{repository.name}"
if target_name != repository.name:
display_name += f"{target_name}"
results[display_name] = success
except Exception as e:
logger.error(f"Unexpected error migrating {repository.name}: {e}")
display_name = f"{repository.owner}/{repository.name}"
if target_name != repository.name:
display_name += f"{target_name}"
results[display_name] = False
return results
def _migrate_single_repository(self, repository: Repository, target_name: str) -> bool:
"""Migrate a single repository"""
try:
# Create destination repository
logger.info(f"Creating destination repository: {target_name}")
success = self.destination_provider.create_repository(repository, target_name)
if not success:
logger.error(f"Failed to create destination repository: {target_name}")
return False
# Clone and push repository
logger.info(f"Starting clone and push for repository: {repository.name}")
return self._clone_and_push_repository(repository, target_name)
except MigrationError as e:
logger.error(f"Migration error for repository {repository.name}: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error migrating repository {repository.name}: {e}")
logger.error(f"Exception type: {type(e).__name__}")
logger.error(f"Exception args: {e.args}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
return False
def _clone_and_push_repository(self, repository: Repository, target_name: str) -> bool:
"""Clone repository from source and push to destination"""
temp_dir = None
original_cwd = os.getcwd()
try:
# Create temporary directory
temp_dir = tempfile.mkdtemp(prefix=f"migration_{repository.name}_")
repo_path = Path(temp_dir) / repository.name
# Clone from source
source_url = self.source_provider.get_authenticated_clone_url(repository)
clone_cmd = ['git', 'clone', '--mirror', source_url, str(repo_path)]
logger.info(f"Cloning repository from {self.source_provider.__class__.__name__}: {repository.owner}/{repository.name}")
result = subprocess.run(clone_cmd, capture_output=True, text=True, cwd=temp_dir)
if result.returncode != 0:
logger.error(f"Failed to clone repository: {result.stderr}")
return False
# Verify repository was cloned successfully
if not repo_path.exists():
logger.error(f"Repository directory not found after cloning: {repo_path}")
return False
# Add destination remote
dest_url = self.destination_provider.get_authenticated_push_url(target_name)
add_remote_cmd = ['git', 'remote', 'add', 'destination', dest_url]
result = subprocess.run(add_remote_cmd, capture_output=True, text=True, cwd=str(repo_path))
if result.returncode != 0:
logger.error(f"Failed to add destination remote: {result.stderr}")
return False
# Push to destination
return self._push_to_destination(repository, target_name, repo_path)
except Exception as e:
logger.error(f"Error during repository migration: {e}")
return False
finally:
# Restore original working directory
try:
os.chdir(original_cwd)
except Exception as e:
logger.warning(f"Failed to restore original working directory: {e}")
# Clean up temporary directory
if temp_dir and os.path.exists(temp_dir):
try:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
logger.warning(f"Failed to clean up temporary directory {temp_dir}: {e}")
def _push_to_destination(self, repository: Repository, target_name: str, repo_path: Path) -> bool:
"""Push repository to destination provider"""
if target_name != repository.name:
logger.info(f"Pushing repository to destination: {repository.name}{target_name}")
else:
logger.info(f"Pushing repository to destination: {repository.name}")
# Push branches
push_branches_cmd = ['git', 'push', '--all', 'destination']
result = subprocess.run(push_branches_cmd, capture_output=True, text=True, cwd=str(repo_path))
if result.returncode != 0:
logger.error(f"Failed to push branches to destination: {result.stderr}")
return False
# Push tags (non-blocking)
push_tags_cmd = ['git', 'push', '--tags', 'destination']
result = subprocess.run(push_tags_cmd, capture_output=True, text=True, cwd=str(repo_path))
if result.returncode != 0:
logger.warning(f"Failed to push tags to destination (this is often normal): {result.stderr}")
if target_name != repository.name:
logger.info(f"Successfully migrated repository: {repository.name}{target_name}")
else:
logger.info(f"Successfully migrated repository: {repository.name}")
return True

@ -1,87 +0,0 @@
"""
Gitea API client for repository operations
"""
import requests
import json
from typing import List, Dict, Optional
class GiteaClient:
"""Client for interacting with Gitea API"""
def __init__(self, base_url: str, token: str, username: str):
self.base_url = base_url.rstrip('/')
self.token = token
self.username = username
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'token {token}',
'Content-Type': 'application/json'
})
def get_user_repos(self) -> List[Dict]:
"""Get all repositories owned by the user"""
url = f"{self.base_url}/api/v1/user/repos"
params = {
'limit': 100, # Adjust as needed
'page': 1
}
all_repos = []
while True:
response = self.session.get(url, params=params)
response.raise_for_status()
repos = response.json()
if not repos:
break
all_repos.extend(repos)
params['page'] += 1
# Break if we got less than the limit (last page)
if len(repos) < params.get('limit', 100):
break
return all_repos
def get_repo_info(self, owner: str, repo_name: str) -> Optional[Dict]:
"""Get information about a specific repository"""
url = f"{self.base_url}/api/v1/repos/{owner}/{repo_name}"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException:
return None
def get_repo_clone_url(self, owner: str, repo_name: str) -> str:
"""Get the clone URL for a repository with authentication"""
return f"{self.base_url}/{owner}/{repo_name}.git"
def list_accessible_repos(self) -> List[Dict]:
"""List all repositories the user has access to (including organizations)"""
url = f"{self.base_url}/api/v1/user/repos"
params = {
'limit': 100,
'page': 1
}
all_repos = []
while True:
response = self.session.get(url, params=params)
response.raise_for_status()
repos = response.json()
if not repos:
break
all_repos.extend(repos)
params['page'] += 1
if len(repos) < params.get('limit', 100):
break
return all_repos

@ -1,54 +0,0 @@
"""
GitHub API client for repository operations
"""
from github import Github
from github.GithubException import GithubException
from typing import Optional, Dict
import logging
logger = logging.getLogger(__name__)
class GitHubClient:
"""Client for interacting with GitHub API"""
def __init__(self, token: str, username: str):
self.github = Github(token)
self.username = username
self.user = self.github.get_user()
def create_repository(self, repo_name: str, description: str = "", private: bool = False) -> bool:
"""Create a new repository on GitHub"""
try:
# Check if repository already exists
if self.repository_exists(repo_name):
logger.warning(f"Repository {repo_name} already exists on GitHub")
return True
self.user.create_repo(
name=repo_name,
description=description,
private=private,
auto_init=False # Don't auto-init since we'll push existing content
)
logger.info(f"Created repository: {repo_name}")
return True
except GithubException as e:
logger.error(f"Failed to create repository {repo_name}: {e}")
return False
def repository_exists(self, repo_name: str) -> bool:
"""Check if a repository exists"""
try:
self.user.get_repo(repo_name)
return True
except GithubException:
return False
def get_repo_clone_url(self, repo_name: str) -> str:
"""Get the clone URL for a GitHub repository"""
return f"https://github.com/{self.username}/{repo_name}.git"
def get_authenticated_clone_url(self, repo_name: str, token: str) -> str:
"""Get the authenticated clone URL for pushing"""
return f"https://{token}@github.com/{self.username}/{repo_name}.git"

@ -1,19 +1,26 @@
#!/usr/bin/env python3
"""
Gitea to GitHub Migration Tool
Repository Migration Tool
This script migrates repositories from Gitea to GitHub.
It can migrate all user repositories or specific ones.
A flexible tool for migrating repositories between different Git hosting providers.
Currently supports:
- Source providers: Gitea
- Destination providers: GitHub
Future providers can be easily added through the extensible provider system.
"""
import argparse
import logging
import sys
from colorama import init, Fore, Style
from pathlib import Path
from colorama import init, Fore, Style
from config import Config
from migration_tool import MigrationTool
from core.config import MigrationConfig
from core.migration_engine import MigrationEngine
from providers.factory import ProviderFactory
from providers.base import ConfigurationError, ProviderError, MigrationError
from ui.interactive_selector import select_repositories_interactive
# Initialize colorama for cross-platform colored output
init()
@ -37,9 +44,9 @@ def print_banner():
banner = f"""
{Fore.CYAN}
🚀 Gitea to GitHub Migration Tool 🚀
🚀 Repository Migration Tool 🚀
Migrates your repositories from Gitea to GitHub seamlessly
Migrate repositories between Git hosting providers
{Style.RESET_ALL}
"""
@ -68,14 +75,28 @@ def create_env_template():
env_file = Path('.env')
if not env_file.exists():
template = """# Gitea Configuration
GITEA_URL=https://your-gitea-instance.com
template = """# Source Provider Configuration
SOURCE_PROVIDER=gitea
GITEA_URL=https://codefirst.iut.uca.fr/git
GITEA_TOKEN=your_gitea_personal_access_token
GITEA_USERNAME=your_gitea_username
# GitHub Configuration
# Alternative source provider (GitLab)
# SOURCE_PROVIDER=gitlab
# GITLAB_URL=https://gitlab.com
# GITLAB_TOKEN=your_gitlab_token
# GITLAB_USERNAME=your_gitlab_username
# Destination Provider Configuration
DESTINATION_PROVIDER=github
GITHUB_TOKEN=your_github_personal_access_token
GITHUB_USERNAME=your_github_username
# Alternative destination provider (GitLab)
# DESTINATION_PROVIDER=gitlab
# GITLAB_DEST_URL=https://gitlab.com
# GITLAB_DEST_TOKEN=your_gitlab_dest_token
# GITLAB_DEST_USERNAME=your_gitlab_dest_username
"""
env_file.write_text(template)
print(f"{Fore.YELLOW}📝 Created .env template file. Please fill it with your credentials.{Style.RESET_ALL}")
@ -86,7 +107,7 @@ GITHUB_USERNAME=your_github_username
def main():
"""Main application entry point"""
parser = argparse.ArgumentParser(
description="Migrate repositories from Gitea to GitHub",
description="Migrate repositories between Git hosting providers",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
@ -96,6 +117,10 @@ Examples:
%(prog)s --repos owner/repo1 # Migrate repositories from other owners
%(prog)s --list # List available repositories
%(prog)s --verbose # Enable verbose logging
Supported providers:
Source: """ + ", ".join(ProviderFactory.get_available_source_providers()) + """
Destination: """ + ", ".join(ProviderFactory.get_available_destination_providers()) + """
"""
)
@ -145,23 +170,31 @@ Examples:
try:
# Initialize configuration
config = Config()
config = MigrationConfig()
# Create providers
source_provider = ProviderFactory.create_source_provider(
config.source_provider,
config.source_config
)
destination_provider = ProviderFactory.create_destination_provider(
config.destination_provider,
config.destination_config
)
# Initialize migration tool
migration_tool = MigrationTool(config)
# Initialize migration engine
migration_engine = MigrationEngine(source_provider, destination_provider)
# Handle list command
if args.list:
print(f"{Fore.CYAN}📋 Available repositories:{Style.RESET_ALL}")
repos = migration_tool.list_available_repos()
print(f"{Fore.CYAN}📋 Available repositories from {config.source_provider}:{Style.RESET_ALL}")
repos = source_provider.get_accessible_repositories()
for repo in repos:
owner = repo['owner']['login']
name = repo['name']
private = "🔒 Private" if repo.get('private', False) else "🌐 Public"
description = repo.get('description', 'No description')
private = "🔒 Private" if repo.private else "🌐 Public"
description = repo.description or 'No description'
print(f" {Fore.BLUE}{owner}/{name}{Style.RESET_ALL} - {private}")
print(f" {Fore.BLUE}{repo.owner}/{repo.name}{Style.RESET_ALL} - {private}")
if description:
print(f" 📝 {description}")
@ -171,14 +204,35 @@ Examples:
# Perform migration
if args.repos:
print(f"{Fore.CYAN}🎯 Migrating specific repositories: {', '.join(args.repos)}{Style.RESET_ALL}")
results = migration_tool.migrate_specific_repos(args.repos)
repositories = []
for repo_spec in args.repos:
if '/' in repo_spec:
owner, repo_name = repo_spec.split('/', 1)
else:
owner = config.source_config['username']
repo_name = repo_spec
repo = source_provider.get_repository_info(owner, repo_name)
if repo:
repositories.append(repo)
else:
print(f"{Fore.RED}⚠️ Repository {owner}/{repo_name} not found or not accessible{Style.RESET_ALL}")
results = migration_engine.migrate_repositories(repositories)
else:
# Get all accessible repositories
all_repos = source_provider.get_accessible_repositories()
if args.no_interactive:
print(f"{Fore.CYAN}🚀 Migrating all your repositories automatically...{Style.RESET_ALL}")
results = migration_tool.migrate_all_accessible_repos(interactive=False)
# Filter to only user's repositories
user_repos = source_provider.get_user_repositories()
results = migration_engine.migrate_repositories(user_repos)
else:
print(f"{Fore.CYAN}🎯 Interactive mode - select repositories to migrate{Style.RESET_ALL}")
results = migration_tool.migrate_all_accessible_repos(interactive=True)
username = config.source_config['username']
selected_repos = select_repositories_interactive(all_repos, username)
results = migration_engine.migrate_repositories(selected_repos)
# Print results
if results:
@ -186,11 +240,15 @@ Examples:
else:
print(f"{Fore.YELLOW}⚠️ No repositories found to migrate.{Style.RESET_ALL}")
except ValueError as e:
except ConfigurationError as e:
print(f"{Fore.RED}❌ Configuration error: {e}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}💡 Run '{sys.argv[0]} --setup' to create a configuration template.{Style.RESET_ALL}")
sys.exit(1)
except (ProviderError, MigrationError) as e:
print(f"{Fore.RED}❌ Migration error: {e}{Style.RESET_ALL}")
sys.exit(1)
except Exception as e:
logging.error(f"Unexpected error: {e}")
print(f"{Fore.RED}❌ An unexpected error occurred. Check migration.log for details.{Style.RESET_ALL}")

@ -1,220 +0,0 @@
"""
Main migration tool for transferring repositories from Gitea to GitHub
"""
import os
import subprocess
import tempfile
import shutil
import logging
from typing import List, Dict, Optional
from pathlib import Path
from config import Config
from gitea_client import GiteaClient
from github_client import GitHubClient
from interactive_selector import select_repositories_interactive
logger = logging.getLogger(__name__)
class MigrationTool:
"""Main tool for migrating repositories from Gitea to GitHub"""
def __init__(self, config: Config):
self.config = config
self.gitea_client = GiteaClient(
config.gitea_url,
config.gitea_token,
config.gitea_username
)
self.github_client = GitHubClient(
config.github_token,
config.github_username
)
def migrate_all_accessible_repos(self, interactive: bool = True) -> Dict[str, bool]:
"""Migrate repositories with interactive selection (default) or all user repos"""
logger.info("Fetching accessible repositories from Gitea...")
repos = self.gitea_client.list_accessible_repos()
if not repos:
logger.warning("No repositories found")
return {}
# Interactive selection (default behavior)
if interactive:
selected_repos = select_repositories_interactive(repos, self.config.gitea_username)
else:
# Non-interactive: only user's own repositories
selected_repos = [repo for repo in repos if repo['owner']['login'] == self.config.gitea_username]
if not selected_repos:
logger.info("No repositories selected for migration")
return {}
results = {}
for repo in selected_repos:
repo_name = repo['name']
repo_owner = repo['owner']['login']
github_name = repo.get('github_name', repo_name) # Use renamed name if available
if github_name != repo_name:
logger.info(f"Migrating repository: {repo_owner}/{repo_name}{github_name}")
else:
logger.info(f"Migrating repository: {repo_owner}/{repo_name}")
success = self.migrate_repository(repo)
display_name = f"{repo_owner}/{repo_name}"
if github_name != repo_name:
display_name += f"{github_name}"
results[display_name] = success
return results
def migrate_specific_repos(self, repo_specs: List[str]) -> Dict[str, bool]:
"""
Migrate specific repositories
repo_specs: List of repository specifications in format 'owner/repo' or just 'repo'
"""
results = {}
for repo_spec in repo_specs:
if '/' in repo_spec:
owner, repo_name = repo_spec.split('/', 1)
else:
owner = self.config.gitea_username
repo_name = repo_spec
logger.info(f"Migrating repository: {owner}/{repo_name}")
repo_info = self.gitea_client.get_repo_info(owner, repo_name)
if repo_info:
success = self.migrate_repository(repo_info)
results[f"{owner}/{repo_name}"] = success
else:
logger.error(f"Repository {owner}/{repo_name} not found or not accessible")
results[f"{owner}/{repo_name}"] = False
return results
def migrate_repository(self, repo_info: Dict) -> bool:
"""Migrate a single repository"""
repo_name = repo_info['name']
repo_owner = repo_info['owner']['login']
github_name = repo_info.get('github_name', repo_name) # Use renamed name if available
try:
# Create GitHub repository with the (possibly renamed) name
success = self.github_client.create_repository(
repo_name=github_name,
description=repo_info.get('description', ''),
private=repo_info.get('private', False)
)
if not success:
return False
# Clone and push repository
return self._clone_and_push_repo(repo_owner, repo_name, github_name)
except Exception as e:
logger.error(f"Failed to migrate repository {repo_name}: {e}")
return False
def _clone_and_push_repo(self, repo_owner: str, repo_name: str, github_name: str = None) -> bool:
"""Clone repository from Gitea and push to GitHub"""
if github_name is None:
github_name = repo_name
temp_dir = None
original_cwd = os.getcwd() # Save original working directory
try:
# Create temporary directory
temp_dir = tempfile.mkdtemp(prefix=f"migration_{repo_name}_")
repo_path = Path(temp_dir) / repo_name
# Clone from Gitea
gitea_url = self._get_authenticated_gitea_url(repo_owner, repo_name)
clone_cmd = ['git', 'clone', '--mirror', gitea_url, str(repo_path)]
logger.info(f"Cloning repository from Gitea: {repo_owner}/{repo_name}")
result = subprocess.run(clone_cmd, capture_output=True, text=True, cwd=temp_dir)
if result.returncode != 0:
logger.error(f"Failed to clone repository: {result.stderr}")
return False
# Verify repository was cloned successfully
if not repo_path.exists():
logger.error(f"Repository directory not found after cloning: {repo_path}")
return False
# Add GitHub remote (run command in the repository directory)
github_url = self.github_client.get_authenticated_clone_url(
github_name, # Use the GitHub name (possibly renamed)
self.config.github_token
)
add_remote_cmd = ['git', 'remote', 'add', 'github', github_url]
result = subprocess.run(add_remote_cmd, capture_output=True, text=True, cwd=str(repo_path))
if result.returncode != 0:
logger.error(f"Failed to add GitHub remote: {result.stderr}")
return False
# Push to GitHub (run command in the repository directory)
if github_name != repo_name:
logger.info(f"Pushing repository to GitHub: {repo_name}{github_name}")
else:
logger.info(f"Pushing repository to GitHub: {repo_name}")
# Push branches first
push_branches_cmd = ['git', 'push', '--all', 'github']
result = subprocess.run(push_branches_cmd, capture_output=True, text=True, cwd=str(repo_path))
if result.returncode != 0:
logger.error(f"Failed to push branches to GitHub: {result.stderr}")
return False
# Push tags
push_tags_cmd = ['git', 'push', '--tags', 'github']
result = subprocess.run(push_tags_cmd, capture_output=True, text=True, cwd=str(repo_path))
if result.returncode != 0:
logger.warning(f"Failed to push tags to GitHub (this is often normal): {result.stderr}")
# Don't fail the migration if only tags fail
if github_name != repo_name:
logger.info(f"Successfully migrated repository: {repo_name}{github_name}")
else:
logger.info(f"Successfully migrated repository: {repo_name}")
return True
except Exception as e:
logger.error(f"Error during repository migration: {e}")
return False
finally:
# Restore original working directory
try:
os.chdir(original_cwd)
except Exception as e:
logger.warning(f"Failed to restore original working directory: {e}")
# Clean up temporary directory
if temp_dir and os.path.exists(temp_dir):
try:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
logger.warning(f"Failed to clean up temporary directory {temp_dir}: {e}")
def _get_authenticated_gitea_url(self, owner: str, repo_name: str) -> str:
"""Get authenticated Gitea URL for cloning"""
base_url = self.config.gitea_url.replace('https://', '').replace('http://', '')
return f"https://{self.config.gitea_username}:{self.config.gitea_token}@{base_url}/{owner}/{repo_name}.git"
def list_available_repos(self) -> List[Dict]:
"""List all repositories available for migration"""
return self.gitea_client.list_accessible_repos()

@ -0,0 +1,3 @@
"""
Providers package for different Git hosting services
"""

@ -0,0 +1,96 @@
"""
Abstract base classes for source and destination providers
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class Repository:
"""Repository data model"""
name: str
owner: str
description: str
private: bool
clone_url: str
ssh_url: Optional[str] = None
web_url: Optional[str] = None
default_branch: Optional[str] = None
github_name: Optional[str] = None # For renaming during migration
class SourceProvider(ABC):
"""Abstract base class for source providers (Gitea, GitLab, etc.)"""
def __init__(self, config: Dict):
self.config = config
self._validate_config()
@abstractmethod
def _validate_config(self) -> None:
"""Validate provider-specific configuration"""
pass
@abstractmethod
def get_user_repositories(self) -> List[Repository]:
"""Get repositories owned by the authenticated user"""
pass
@abstractmethod
def get_accessible_repositories(self) -> List[Repository]:
"""Get all repositories accessible to the authenticated user"""
pass
@abstractmethod
def get_repository_info(self, owner: str, name: str) -> Optional[Repository]:
"""Get information about a specific repository"""
pass
@abstractmethod
def get_authenticated_clone_url(self, repository: Repository) -> str:
"""Get authenticated clone URL for a repository"""
pass
class DestinationProvider(ABC):
"""Abstract base class for destination providers (GitHub, GitLab, etc.)"""
def __init__(self, config: Dict):
self.config = config
self._validate_config()
@abstractmethod
def _validate_config(self) -> None:
"""Validate provider-specific configuration"""
pass
@abstractmethod
def create_repository(self, repository: Repository, target_name: str) -> bool:
"""Create a new repository in the destination provider"""
pass
@abstractmethod
def repository_exists(self, name: str) -> bool:
"""Check if a repository exists"""
pass
@abstractmethod
def get_authenticated_push_url(self, name: str) -> str:
"""Get authenticated URL for pushing to repository"""
pass
class MigrationError(Exception):
"""Base exception for migration errors"""
pass
class ProviderError(MigrationError):
"""Exception for provider-specific errors"""
pass
class ConfigurationError(MigrationError):
"""Exception for configuration errors"""
pass

@ -0,0 +1,3 @@
"""
Destination providers for migration tool
"""

@ -0,0 +1,88 @@
"""
GitHub destination provider implementation
"""
import logging
from github import Github
from github.GithubException import GithubException
from typing import Dict
from ..base import DestinationProvider, Repository, ProviderError, ConfigurationError
logger = logging.getLogger(__name__)
class GitHubDestinationProvider(DestinationProvider):
"""GitHub destination provider implementation"""
def _validate_config(self) -> None:
"""Validate GitHub-specific configuration"""
required_keys = ['token', 'username']
missing = [key for key in required_keys if not self.config.get(key)]
if missing:
raise ConfigurationError(f"Missing GitHub configuration: {', '.join(missing)}")
self.token = self.config['token']
self.username = self.config['username']
try:
self.github = Github(self.token)
self.user = self.github.get_user()
except GithubException as e:
raise ConfigurationError(f"Failed to authenticate with GitHub: {e}")
def create_repository(self, repository: Repository, target_name: str) -> bool:
"""Create a new repository on GitHub"""
try:
# Check if repository already exists
if self.repository_exists(target_name):
logger.warning(f"Repository {target_name} already exists on GitHub")
return True
logger.info(f"Creating GitHub repository: {target_name}")
self.user.create_repo(
name=target_name,
description=repository.description or '',
private=repository.private,
auto_init=False # Don't auto-init since we'll push existing content
)
logger.info(f"Successfully created repository: {target_name}")
return True
except GithubException as e:
logger.error(f"GitHub API error creating repository {target_name}: {e}")
logger.error(f"GitHub error status: {e.status}")
logger.error(f"GitHub error data: {e.data}")
# Handle specific GitHub errors
if e.status == 422 and 'already exists' in str(e.data).lower():
logger.warning(f"Repository {target_name} already exists (422 error)")
return True
raise ProviderError(f"Failed to create GitHub repository: {e}")
except Exception as e:
logger.error(f"Unexpected error creating GitHub repository {target_name}: {e}")
logger.error(f"Exception type: {type(e).__name__}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
raise ProviderError(f"Unexpected error creating GitHub repository: {e}")
def repository_exists(self, name: str) -> bool:
"""Check if a repository exists"""
try:
repo = self.user.get_repo(name)
logger.debug(f"Repository {name} exists: {repo.full_name}")
return True
except GithubException as e:
if e.status == 404:
logger.debug(f"Repository {name} does not exist (404)")
return False
else:
logger.warning(f"Error checking if repository {name} exists: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error checking repository existence {name}: {e}")
return False
def get_authenticated_push_url(self, name: str) -> str:
"""Get authenticated URL for pushing to repository"""
return f"https://{self.token}@github.com/{self.username}/{name}.git"

@ -0,0 +1,105 @@
"""
GitLab destination provider implementation (Example for extensibility)
"""
import logging
import requests
from typing import Dict
from ..base import DestinationProvider, Repository, ProviderError, ConfigurationError
logger = logging.getLogger(__name__)
class GitLabDestinationProvider(DestinationProvider):
"""GitLab destination provider implementation"""
def _validate_config(self) -> None:
"""Validate GitLab-specific configuration"""
required_keys = ['url', 'token', 'username']
missing = [key for key in required_keys if not self.config.get(key)]
if missing:
raise ConfigurationError(f"Missing GitLab configuration: {', '.join(missing)}")
self.base_url = self.config['url'].rstrip('/')
self.token = self.config['token']
self.username = self.config['username']
# Setup HTTP session
self.session = requests.Session()
# GitLab uses different token formats - try Private-Token first
if self.token.startswith('glpat-'):
# Personal Access Token
self.session.headers.update({
'Private-Token': self.token,
'Content-Type': 'application/json'
})
else:
# OAuth token or other format
self.session.headers.update({
'Authorization': f'Bearer {self.token}',
'Content-Type': 'application/json'
})
# Verify authentication
try:
response = self.session.get(f"{self.base_url}/api/v4/user")
response.raise_for_status()
self.user_id = response.json()['id']
except requests.RequestException as e:
raise ConfigurationError(f"Failed to authenticate with GitLab: {e}")
def create_repository(self, repository: Repository, target_name: str) -> bool:
"""Create a new repository on GitLab"""
response = None
try:
# Check if repository already exists
if self.repository_exists(target_name):
logger.warning(f"Repository {target_name} already exists on GitLab")
return True
project_data = {
'name': target_name,
'description': repository.description or '',
'visibility': 'private' if repository.private else 'public',
'initialize_with_readme': False
}
response = self.session.post(
f"{self.base_url}/api/v4/projects",
json=project_data
)
response.raise_for_status()
logger.info(f"Created repository: {target_name}")
return True
except requests.RequestException as e:
logger.error(f"Failed to create repository {target_name}: {e}")
if response and response.status_code == 400:
# Repository might already exist or name is invalid
logger.warning(f"Repository creation failed, possibly already exists: {target_name}")
return self.repository_exists(target_name)
elif response and response.status_code == 409:
# Conflict - repository already exists
logger.warning(f"Repository {target_name} already exists (conflict)")
return True
raise ProviderError(f"Failed to create GitLab repository: {e}")
except Exception as e:
logger.error(f"Unexpected error creating repository {target_name}: {e}")
return False
def repository_exists(self, name: str) -> bool:
"""Check if a repository exists"""
project_path = f"{self.username}/{name}"
url = f"{self.base_url}/api/v4/projects/{requests.utils.quote(project_path, safe='')}"
try:
response = self.session.get(url)
return response.status_code == 200
except requests.RequestException:
return False
def get_authenticated_push_url(self, name: str) -> str:
"""Get authenticated URL for pushing to repository"""
base_url = self.base_url.replace('https://', '').replace('http://', '')
return f"https://oauth2:{self.token}@{base_url}/{self.username}/{name}.git"

@ -0,0 +1,63 @@
"""
Factory for creating provider instances
"""
from typing import Dict, Type
from .base import SourceProvider, DestinationProvider, ConfigurationError
from .source.gitea import GiteaSourceProvider
from .source.gitlab import GitLabSourceProvider
from .destination.github import GitHubDestinationProvider
from .destination.gitlab import GitLabDestinationProvider
class ProviderFactory:
"""Factory for creating provider instances"""
_source_providers: Dict[str, Type[SourceProvider]] = {
'gitea': GiteaSourceProvider,
'gitlab': GitLabSourceProvider,
}
_destination_providers: Dict[str, Type[DestinationProvider]] = {
'github': GitHubDestinationProvider,
'gitlab': GitLabDestinationProvider,
}
@classmethod
def create_source_provider(cls, provider_type: str, config: Dict) -> SourceProvider:
"""Create a source provider instance"""
if provider_type not in cls._source_providers:
available = ', '.join(cls._source_providers.keys())
raise ConfigurationError(f"Unknown source provider '{provider_type}'. Available: {available}")
provider_class = cls._source_providers[provider_type]
return provider_class(config)
@classmethod
def create_destination_provider(cls, provider_type: str, config: Dict) -> DestinationProvider:
"""Create a destination provider instance"""
if provider_type not in cls._destination_providers:
available = ', '.join(cls._destination_providers.keys())
raise ConfigurationError(f"Unknown destination provider '{provider_type}'. Available: {available}")
provider_class = cls._destination_providers[provider_type]
return provider_class(config)
@classmethod
def register_source_provider(cls, name: str, provider_class: Type[SourceProvider]) -> None:
"""Register a new source provider"""
cls._source_providers[name] = provider_class
@classmethod
def register_destination_provider(cls, name: str, provider_class: Type[DestinationProvider]) -> None:
"""Register a new destination provider"""
cls._destination_providers[name] = provider_class
@classmethod
def get_available_source_providers(cls) -> list:
"""Get list of available source providers"""
return list(cls._source_providers.keys())
@classmethod
def get_available_destination_providers(cls) -> list:
"""Get list of available destination providers"""
return list(cls._destination_providers.keys())

@ -0,0 +1,3 @@
"""
Source providers for migration tool
"""

@ -0,0 +1,92 @@
"""
Gitea source provider implementation
"""
import requests
from typing import List, Dict, Optional
from ..base import SourceProvider, Repository, ProviderError, ConfigurationError
class GiteaSourceProvider(SourceProvider):
"""Gitea source provider implementation"""
def _validate_config(self) -> None:
"""Validate Gitea-specific configuration"""
required_keys = ['url', 'token', 'username']
missing = [key for key in required_keys if not self.config.get(key)]
if missing:
raise ConfigurationError(f"Missing Gitea configuration: {', '.join(missing)}")
self.base_url = self.config['url'].rstrip('/')
self.token = self.config['token']
self.username = self.config['username']
# Setup HTTP session
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'token {self.token}',
'Content-Type': 'application/json'
})
def get_user_repositories(self) -> List[Repository]:
"""Get repositories owned by the authenticated user"""
repositories = self.get_accessible_repositories()
return [repo for repo in repositories if repo.owner == self.username]
def get_accessible_repositories(self) -> List[Repository]:
"""Get all repositories accessible to the authenticated user"""
url = f"{self.base_url}/api/v1/user/repos"
params = {'limit': 100, 'page': 1}
all_repos = []
while True:
try:
response = self.session.get(url, params=params)
response.raise_for_status()
repos_data = response.json()
if not repos_data:
break
repos = [self._parse_repository(repo_data) for repo_data in repos_data]
all_repos.extend(repos)
params['page'] += 1
if len(repos_data) < params['limit']:
break
except requests.RequestException as e:
raise ProviderError(f"Failed to fetch repositories from Gitea: {e}")
return all_repos
def get_repository_info(self, owner: str, name: str) -> Optional[Repository]:
"""Get information about a specific repository"""
url = f"{self.base_url}/api/v1/repos/{owner}/{name}"
try:
response = self.session.get(url)
response.raise_for_status()
return self._parse_repository(response.json())
except requests.RequestException:
return None
def get_authenticated_clone_url(self, repository: Repository) -> str:
"""Get authenticated clone URL for a repository"""
base_url = self.base_url.replace('https://', '').replace('http://', '')
return f"https://{self.username}:{self.token}@{base_url}/{repository.owner}/{repository.name}.git"
def _parse_repository(self, repo_data: Dict) -> Repository:
"""Parse repository data from Gitea API response"""
return Repository(
name=repo_data['name'],
owner=repo_data['owner']['login'],
description=repo_data.get('description', ''),
private=repo_data.get('private', False),
clone_url=repo_data['clone_url'],
ssh_url=repo_data.get('ssh_url'),
web_url=repo_data.get('html_url'),
default_branch=repo_data.get('default_branch', 'main')
)

@ -0,0 +1,111 @@
"""
GitLab source provider implementation (Example for extensibility)
"""
import requests
from typing import List, Dict, Optional
from ..base import SourceProvider, Repository, ProviderError, ConfigurationError
class GitLabSourceProvider(SourceProvider):
"""GitLab source provider implementation"""
def _validate_config(self) -> None:
"""Validate GitLab-specific configuration"""
required_keys = ['url', 'token', 'username']
missing = [key for key in required_keys if not self.config.get(key)]
if missing:
raise ConfigurationError(f"Missing GitLab configuration: {', '.join(missing)}")
self.base_url = self.config['url'].rstrip('/')
self.token = self.config['token']
self.username = self.config['username']
# Setup HTTP session
self.session = requests.Session()
# GitLab uses different token formats - try Private-Token first
if self.token.startswith('glpat-'):
# Personal Access Token
self.session.headers.update({
'Private-Token': self.token,
'Content-Type': 'application/json'
})
else:
# OAuth token or other format
self.session.headers.update({
'Authorization': f'Bearer {self.token}',
'Content-Type': 'application/json'
})
def get_user_repositories(self) -> List[Repository]:
"""Get repositories owned by the authenticated user"""
repositories = self.get_accessible_repositories()
return [repo for repo in repositories if repo.owner == self.username]
def get_accessible_repositories(self) -> List[Repository]:
"""Get all repositories accessible to the authenticated user"""
url = f"{self.base_url}/api/v4/projects"
params = {
'membership': 'true',
'per_page': 100,
'page': 1
}
all_repos = []
while True:
try:
response = self.session.get(url, params=params)
response.raise_for_status()
repos_data = response.json()
if not repos_data:
break
repos = [self._parse_repository(repo_data) for repo_data in repos_data]
all_repos.extend(repos)
# Check if there are more pages
if 'X-Next-Page' not in response.headers:
break
params['page'] += 1
except requests.RequestException as e:
raise ProviderError(f"Failed to fetch repositories from GitLab: {e}")
return all_repos
def get_repository_info(self, owner: str, name: str) -> Optional[Repository]:
"""Get information about a specific repository"""
# GitLab uses project ID or namespace/project format
project_path = f"{owner}/{name}"
url = f"{self.base_url}/api/v4/projects/{requests.utils.quote(project_path, safe='')}"
try:
response = self.session.get(url)
response.raise_for_status()
return self._parse_repository(response.json())
except requests.RequestException:
return None
def get_authenticated_clone_url(self, repository: Repository) -> str:
"""Get authenticated clone URL for a repository"""
base_url = self.base_url.replace('https://', '').replace('http://', '')
return f"https://oauth2:{self.token}@{base_url}/{repository.owner}/{repository.name}.git"
def _parse_repository(self, repo_data: Dict) -> Repository:
"""Parse repository data from GitLab API response"""
namespace = repo_data['namespace']
owner = namespace['path'] if namespace['kind'] == 'user' else namespace['full_path']
return Repository(
name=repo_data['name'],
owner=owner,
description=repo_data.get('description', ''),
private=repo_data.get('visibility') == 'private',
clone_url=repo_data['http_url_to_repo'],
ssh_url=repo_data.get('ssh_url_to_repo'),
web_url=repo_data.get('web_url'),
default_branch=repo_data.get('default_branch', 'main')
)

@ -12,4 +12,4 @@ else
fi
# Run the migration tool with all passed arguments
python migrate.py "$@"
python main.py "$@"

@ -0,0 +1,3 @@
"""
User interface components
"""

@ -4,21 +4,22 @@ Interactive repository selector for migration tool
import sys
import termios
import tty
from typing import List, Dict, Set
from typing import List, Set
from colorama import Fore, Style, init
from providers.base import Repository
init()
class InteractiveSelector:
"""Interactive repository selector with keyboard navigation"""
def __init__(self, repositories: List[Dict], username: str):
def __init__(self, repositories: List[Repository], username: str):
self.username = username
# Sort repositories: user's repos first, then others, both alphabetically
self.repositories = self._sort_repositories(repositories, username)
# Only select user's own repositories by default
self.selected = set(i for i, repo in enumerate(self.repositories)
if repo['owner']['login'] == username)
if repo.owner == username)
self.current_index = 0
self.page_size = 15 # Number of repos to show per page
self.current_page = 0
@ -73,7 +74,7 @@ class InteractiveSelector:
is_current = i == self.current_index
# Check if we need to add a separator
owner = repo['owner']['login']
owner = repo.owner
is_own_repo = owner == self.username
current_owner_type = "own" if is_own_repo else "others"
@ -87,11 +88,11 @@ class InteractiveSelector:
checkbox = "☑️ " if is_selected else ""
# Repository info
name = repo['name']
private = "🔒" if repo.get('private', False) else "🌐"
name = repo.name
private = "🔒" if repo.private else "🌐"
ownership_indicator = "👤" if is_own_repo else "👥"
description = repo.get('description', 'No description')[:45]
if len(repo.get('description', '')) > 45:
description = (repo.description or 'No description')[:45]
if len(repo.description or '') > 45:
description += "..."
# Highlight current selection
@ -167,7 +168,20 @@ class InteractiveSelector:
self.current_page += 1
self.current_index = self.current_page * self.page_size
def _rename_repositories_interface(self, selected_repos: List[Dict]) -> List[Dict]:
def _sort_repositories(self, repositories: List[Repository], username: str) -> List[Repository]:
"""Sort repositories: user's repos first, then others, both alphabetically"""
def sort_key(repo):
owner = repo.owner
name = repo.name.lower() # Case-insensitive sorting
is_user_repo = owner == username
# Return tuple: (is_not_user_repo, owner.lower(), name)
# This will sort user repos first (False < True), then alphabetically
return (not is_user_repo, owner.lower(), name)
return sorted(repositories, key=sort_key)
def _rename_repositories_interface(self, selected_repos: List[Repository]) -> List[Repository]:
"""Interface for renaming selected repositories"""
print('\033[2J\033[H', end='') # Clear screen
@ -182,9 +196,9 @@ class InteractiveSelector:
renamed_repos = []
for i, repo in enumerate(selected_repos, 1):
owner = repo['owner']['login']
original_name = repo['name']
private = "🔒" if repo.get('private', False) else "🌐"
owner = repo.owner
original_name = repo.name
private = "🔒" if repo.private else "🌐"
print(f"{Fore.YELLOW}📦 Repository {i}/{len(selected_repos)}:{Style.RESET_ALL}")
print(f" Source: {Fore.BLUE}{owner}/{original_name}{Style.RESET_ALL} {private}")
@ -204,10 +218,9 @@ class InteractiveSelector:
new_name = original_name
print(f" {Fore.CYAN} Keeping original name: {original_name}{Style.RESET_ALL}")
# Create new repo dict with updated name
renamed_repo = repo.copy()
renamed_repo['github_name'] = new_name # Add field for GitHub name
renamed_repos.append(renamed_repo)
# Update repository with new GitHub name
repo.github_name = new_name
renamed_repos.append(repo)
print()
# Summary
@ -215,10 +228,10 @@ class InteractiveSelector:
print(f"\n{Fore.CYAN}📋 Migration summary:{Style.RESET_ALL}")
for repo in renamed_repos:
owner = repo['owner']['login']
original_name = repo['name']
github_name = repo['github_name']
private = "🔒" if repo.get('private', False) else "🌐"
owner = repo.owner
original_name = repo.name
github_name = repo.github_name
private = "🔒" if repo.private else "🌐"
if original_name != github_name:
print(f"{Fore.BLUE}{owner}/{original_name}{Style.RESET_ALL}{Fore.GREEN}{github_name}{Style.RESET_ALL} {private}")
@ -228,19 +241,6 @@ class InteractiveSelector:
input(f"\n{Fore.YELLOW}Press ENTER to continue...{Style.RESET_ALL}")
return renamed_repos
def _sort_repositories(self, repositories: List[Dict], username: str) -> List[Dict]:
"""Sort repositories: user's repos first, then others, both alphabetically"""
def sort_key(repo):
owner = repo['owner']['login']
name = repo['name'].lower() # Case-insensitive sorting
is_user_repo = owner == username
# Return tuple: (is_not_user_repo, owner.lower(), name)
# This will sort user repos first (False < True), then alphabetically
return (not is_user_repo, owner.lower(), name)
return sorted(repositories, key=sort_key)
def _is_valid_repo_name(self, name: str) -> bool:
"""Validate GitHub repository name"""
if not name:
@ -258,7 +258,7 @@ class InteractiveSelector:
import re
return bool(re.match(r'^[a-zA-Z0-9._-]+$', name))
def run(self) -> List[Dict]:
def run(self) -> List[Repository]:
"""Run interactive selection and return selected repositories"""
if not self.repositories:
print(f"{Fore.YELLOW}⚠️ No repositories found.{Style.RESET_ALL}")
@ -301,9 +301,9 @@ class InteractiveSelector:
print(f"{Fore.GREEN}✅ Selected {len(selected_repos)} repositories for migration:{Style.RESET_ALL}\n")
for repo in selected_repos:
owner = repo['owner']['login']
name = repo['name']
private = "🔒" if repo.get('private', False) else "🌐"
owner = repo.owner
name = repo.name
private = "🔒" if repo.private else "🌐"
print(f"{Fore.BLUE}{owner}/{name}{Style.RESET_ALL} {private}")
# Ask if user wants to rename repositories
@ -321,12 +321,12 @@ class InteractiveSelector:
return selected_repos
def select_repositories_interactive(repositories: List[Dict], username: str) -> List[Dict]:
def select_repositories_interactive(repositories: List[Repository], username: str) -> List[Repository]:
"""
Interactive repository selection interface
Args:
repositories: List of repository dictionaries from Gitea API
repositories: List of Repository objects from source provider
username: Current user's username to distinguish own repos
Returns:
Loading…
Cancel
Save