""" Vultr API Client for MCP Server """ import os from typing import Dict, List, Any, Optional import vultr_python_client from vultr_python_client import ApiClient, Configuration from vultr_python_client.apis.tag_to_api import ( InstancesApi, AccountApi, BaremetalApi, BlockApi, DnsApi, FirewallApi, KubernetesApi, LoadBalancerApi, S3Api, ReservedIpApi, SnapshotApi, SshApi, StartupApi, UsersApi, VPCsApi ) from dotenv import load_dotenv load_dotenv() class VultrClient: """Client for interacting with Vultr API""" def __init__(self, api_key: Optional[str] = None): """Initialize Vultr client with API key""" self.api_key = api_key or os.getenv('VULTR_API_KEY') if not self.api_key: raise ValueError("VULTR_API_KEY not found in environment variables") # Configure API client configuration = Configuration() configuration.api_key['Authorization'] = self.api_key configuration.api_key_prefix['Authorization'] = 'Bearer' self.api_client = ApiClient(configuration) # Initialize API instances self.instances = InstancesApi(self.api_client) self.account = AccountApi(self.api_client) self.bare_metal = BaremetalApi(self.api_client) self.block_storage = BlockApi(self.api_client) self.dns = DnsApi(self.api_client) self.firewall = FirewallApi(self.api_client) self.kubernetes = KubernetesApi(self.api_client) self.load_balancer = LoadBalancerApi(self.api_client) self.object_storage = S3Api(self.api_client) self.reserved_ip = ReservedIpApi(self.api_client) self.snapshot = SnapshotApi(self.api_client) self.ssh_key = SshApi(self.api_client) self.startup_script = StartupApi(self.api_client) self.user = UsersApi(self.api_client) self.vpc = VPCsApi(self.api_client) # Account methods def get_account_info(self) -> Dict[str, Any]: """Get account information""" try: account = self.account.get_account() return { 'name': account.account.name, 'email': account.account.email, 'balance': account.account.balance, 'pending_charges': account.account.pending_charges, } except Exception as e: return {'error': str(e)} # Instance methods def list_instances(self, per_page: int = 100) -> List[Dict[str, Any]]: """List all instances""" try: instances = self.instances.list_instances(per_page=per_page) result = [] for instance in instances.instances: result.append({ 'id': instance.id, 'label': instance.label, 'region': instance.region, 'plan': instance.plan, 'status': instance.status, 'power_status': instance.power_status, 'os': instance.os, 'ram': instance.ram, 'disk': instance.disk, 'main_ip': instance.main_ip, 'created': instance.date_created, }) return result except Exception as e: return [{'error': str(e)}] def create_instance(self, **kwargs) -> Dict[str, Any]: """Create a new instance""" try: # Required parameters: region, plan, os_id instance = self.instances.create_instance(**kwargs) return { 'id': instance.instance.id, 'message': 'Instance created successfully', 'details': instance.instance.to_dict() } except Exception as e: return {'error': str(e)} def get_instance(self, instance_id: str) -> Dict[str, Any]: """Get instance details""" try: instance = self.instances.get_instance(instance_id) return instance.instance.to_dict() except Exception as e: return {'error': str(e)} def delete_instance(self, instance_id: str) -> Dict[str, Any]: """Delete an instance""" try: self.instances.delete_instance(instance_id) return {'message': f'Instance {instance_id} deleted successfully'} except Exception as e: return {'error': str(e)} def start_instance(self, instance_id: str) -> Dict[str, Any]: """Start an instance""" try: self.instances.start_instance(instance_id) return {'message': f'Instance {instance_id} started successfully'} except Exception as e: return {'error': str(e)} def stop_instance(self, instance_id: str) -> Dict[str, Any]: """Stop an instance""" try: self.instances.stop_instance(instance_id) return {'message': f'Instance {instance_id} stopped successfully'} except Exception as e: return {'error': str(e)} def reboot_instance(self, instance_id: str) -> Dict[str, Any]: """Reboot an instance""" try: self.instances.reboot_instance(instance_id) return {'message': f'Instance {instance_id} rebooted successfully'} except Exception as e: return {'error': str(e)} # Resource discovery methods def list_regions(self) -> List[Dict[str, Any]]: """List available regions""" try: regions = self.instances.list_regions() result = [] for region in regions.regions: result.append({ 'id': region.id, 'city': region.city, 'country': region.country, 'continent': region.continent, 'options': region.options, }) return result except Exception as e: return [{'error': str(e)}] def list_plans(self) -> List[Dict[str, Any]]: """List available plans""" try: plans = self.instances.list_plans() result = [] for plan in plans.plans: result.append({ 'id': plan.id, 'vcpu_count': plan.vcpu_count, 'ram': plan.ram, 'disk': plan.disk, 'disk_count': plan.disk_count, 'bandwidth': plan.bandwidth, 'monthly_cost': plan.monthly_cost, 'type': plan.type, }) return result except Exception as e: return [{'error': str(e)}] def list_os(self) -> List[Dict[str, Any]]: """List available operating systems""" try: os_list = self.instances.list_os() result = [] for os_item in os_list.os: result.append({ 'id': os_item.id, 'name': os_item.name, 'arch': os_item.arch, 'family': os_item.family, }) return result except Exception as e: return [{'error': str(e)}] # DNS Management Methods def list_dns_domains(self) -> List[Dict[str, Any]]: """List all DNS domains""" try: domains = self.dns.list_dns_domains() result = [] for domain in domains.domains: result.append({ 'domain': domain.domain, 'date_created': domain.date_created, }) return result except Exception as e: return [{'error': str(e)}] def create_dns_domain(self, domain: str, ip: str) -> Dict[str, Any]: """Create a new DNS domain""" try: result = self.dns.create_dns_domain({ 'domain': domain, 'ip': ip }) return { 'message': f'DNS domain {domain} created successfully', 'domain': domain, 'ip': ip } except Exception as e: return {'error': str(e)} def delete_dns_domain(self, domain: str) -> Dict[str, Any]: """Delete a DNS domain""" try: self.dns.delete_dns_domain(domain) return {'message': f'DNS domain {domain} deleted successfully'} except Exception as e: return {'error': str(e)} def get_dns_domain(self, domain: str) -> Dict[str, Any]: """Get DNS domain details""" try: domain_info = self.dns.get_dns_domain(domain) return { 'domain': domain_info.dns_domain.domain, 'date_created': domain_info.dns_domain.date_created, } except Exception as e: return {'error': str(e)} def list_dns_records(self, domain: str) -> List[Dict[str, Any]]: """List DNS records for a domain""" try: records = self.dns.list_dns_domain_records(domain) result = [] for record in records.records: result.append({ 'id': record.id, 'type': record.type, 'name': record.name, 'data': record.data, 'priority': record.priority, 'ttl': record.ttl, }) return result except Exception as e: return [{'error': str(e)}] def create_dns_record(self, domain: str, record_type: str, name: str, data: str, ttl: int = 300, priority: int = 0) -> Dict[str, Any]: """Create a new DNS record""" try: result = self.dns.create_dns_domain_record(domain, { 'type': record_type, 'name': name, 'data': data, 'ttl': ttl, 'priority': priority }) return { 'message': f'DNS record created successfully', 'record_id': result.record.id, 'domain': domain, 'type': record_type, 'name': name, 'data': data } except Exception as e: return {'error': str(e)} def update_dns_record(self, domain: str, record_id: str, **kwargs) -> Dict[str, Any]: """Update a DNS record""" try: self.dns.update_dns_domain_record(domain, record_id, kwargs) return { 'message': f'DNS record {record_id} updated successfully', 'domain': domain, 'record_id': record_id } except Exception as e: return {'error': str(e)} def delete_dns_record(self, domain: str, record_id: str) -> Dict[str, Any]: """Delete a DNS record""" try: self.dns.delete_dns_domain_record(domain, record_id) return { 'message': f'DNS record {record_id} deleted successfully', 'domain': domain, 'record_id': record_id } except Exception as e: return {'error': str(e)}