feat: add NetBox MCP server for infrastructure management
Comprehensive MCP server covering the entire NetBox REST API: - DCIM: sites, racks, devices, interfaces, cables, power - IPAM: prefixes, IP addresses, VLANs, VRFs, ASNs - Circuits: providers, circuits, terminations - Virtualization: clusters, VMs, VM interfaces - Tenancy: tenants, contacts, contact assignments - VPN: tunnels, IKE/IPSec policies, L2VPN - Wireless: WLANs, links, groups - Extras: tags, custom fields, webhooks, audit log Features: - 100+ MCP tools for full CRUD operations - Auto-pagination handling - Hybrid config (system + project level) - Available prefix/IP allocation support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
428
mcp-servers/netbox/mcp_server/tools/vpn.py
Normal file
428
mcp-servers/netbox/mcp_server/tools/vpn.py
Normal file
@@ -0,0 +1,428 @@
|
||||
"""
|
||||
VPN tools for NetBox MCP Server.
|
||||
|
||||
Covers: Tunnels, Tunnel Groups, Tunnel Terminations, IKE/IPSec Policies, and L2VPN.
|
||||
"""
|
||||
import logging
|
||||
from typing import List, Dict, Optional, Any
|
||||
from ..netbox_client import NetBoxClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VPNTools:
|
||||
"""Tools for VPN operations in NetBox"""
|
||||
|
||||
def __init__(self, client: NetBoxClient):
|
||||
self.client = client
|
||||
self.base_endpoint = 'vpn'
|
||||
|
||||
# ==================== Tunnel Groups ====================
|
||||
|
||||
async def list_tunnel_groups(
|
||||
self,
|
||||
name: Optional[str] = None,
|
||||
slug: Optional[str] = None,
|
||||
**kwargs
|
||||
) -> List[Dict]:
|
||||
"""List all tunnel groups."""
|
||||
params = {k: v for k, v in {'name': name, 'slug': slug, **kwargs}.items() if v is not None}
|
||||
return self.client.list(f'{self.base_endpoint}/tunnel-groups', params=params)
|
||||
|
||||
async def get_tunnel_group(self, id: int) -> Dict:
|
||||
"""Get a specific tunnel group by ID."""
|
||||
return self.client.get(f'{self.base_endpoint}/tunnel-groups', id)
|
||||
|
||||
async def create_tunnel_group(
|
||||
self,
|
||||
name: str,
|
||||
slug: str,
|
||||
description: Optional[str] = None,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""Create a new tunnel group."""
|
||||
data = {'name': name, 'slug': slug, **kwargs}
|
||||
if description:
|
||||
data['description'] = description
|
||||
return self.client.create(f'{self.base_endpoint}/tunnel-groups', data)
|
||||
|
||||
async def update_tunnel_group(self, id: int, **kwargs) -> Dict:
|
||||
"""Update a tunnel group."""
|
||||
return self.client.patch(f'{self.base_endpoint}/tunnel-groups', id, kwargs)
|
||||
|
||||
async def delete_tunnel_group(self, id: int) -> None:
|
||||
"""Delete a tunnel group."""
|
||||
self.client.delete(f'{self.base_endpoint}/tunnel-groups', id)
|
||||
|
||||
# ==================== Tunnels ====================
|
||||
|
||||
async def list_tunnels(
|
||||
self,
|
||||
name: Optional[str] = None,
|
||||
status: Optional[str] = None,
|
||||
group_id: Optional[int] = None,
|
||||
encapsulation: Optional[str] = None,
|
||||
tenant_id: Optional[int] = None,
|
||||
**kwargs
|
||||
) -> List[Dict]:
|
||||
"""List all tunnels with optional filtering."""
|
||||
params = {k: v for k, v in {
|
||||
'name': name, 'status': status, 'group_id': group_id,
|
||||
'encapsulation': encapsulation, 'tenant_id': tenant_id, **kwargs
|
||||
}.items() if v is not None}
|
||||
return self.client.list(f'{self.base_endpoint}/tunnels', params=params)
|
||||
|
||||
async def get_tunnel(self, id: int) -> Dict:
|
||||
"""Get a specific tunnel by ID."""
|
||||
return self.client.get(f'{self.base_endpoint}/tunnels', id)
|
||||
|
||||
async def create_tunnel(
|
||||
self,
|
||||
name: str,
|
||||
status: str = 'active',
|
||||
encapsulation: str = 'ipsec-tunnel',
|
||||
group: Optional[int] = None,
|
||||
ipsec_profile: Optional[int] = None,
|
||||
tenant: Optional[int] = None,
|
||||
tunnel_id: Optional[int] = None,
|
||||
description: Optional[str] = None,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""Create a new tunnel."""
|
||||
data = {'name': name, 'status': status, 'encapsulation': encapsulation, **kwargs}
|
||||
for key, val in [
|
||||
('group', group), ('ipsec_profile', ipsec_profile),
|
||||
('tenant', tenant), ('tunnel_id', tunnel_id), ('description', description)
|
||||
]:
|
||||
if val is not None:
|
||||
data[key] = val
|
||||
return self.client.create(f'{self.base_endpoint}/tunnels', data)
|
||||
|
||||
async def update_tunnel(self, id: int, **kwargs) -> Dict:
|
||||
"""Update a tunnel."""
|
||||
return self.client.patch(f'{self.base_endpoint}/tunnels', id, kwargs)
|
||||
|
||||
async def delete_tunnel(self, id: int) -> None:
|
||||
"""Delete a tunnel."""
|
||||
self.client.delete(f'{self.base_endpoint}/tunnels', id)
|
||||
|
||||
# ==================== Tunnel Terminations ====================
|
||||
|
||||
async def list_tunnel_terminations(
|
||||
self,
|
||||
tunnel_id: Optional[int] = None,
|
||||
role: Optional[str] = None,
|
||||
**kwargs
|
||||
) -> List[Dict]:
|
||||
"""List all tunnel terminations."""
|
||||
params = {k: v for k, v in {
|
||||
'tunnel_id': tunnel_id, 'role': role, **kwargs
|
||||
}.items() if v is not None}
|
||||
return self.client.list(f'{self.base_endpoint}/tunnel-terminations', params=params)
|
||||
|
||||
async def get_tunnel_termination(self, id: int) -> Dict:
|
||||
"""Get a specific tunnel termination by ID."""
|
||||
return self.client.get(f'{self.base_endpoint}/tunnel-terminations', id)
|
||||
|
||||
async def create_tunnel_termination(
|
||||
self,
|
||||
tunnel: int,
|
||||
role: str,
|
||||
termination_type: str,
|
||||
termination_id: int,
|
||||
outside_ip: Optional[int] = None,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""Create a new tunnel termination."""
|
||||
data = {
|
||||
'tunnel': tunnel, 'role': role,
|
||||
'termination_type': termination_type, 'termination_id': termination_id, **kwargs
|
||||
}
|
||||
if outside_ip:
|
||||
data['outside_ip'] = outside_ip
|
||||
return self.client.create(f'{self.base_endpoint}/tunnel-terminations', data)
|
||||
|
||||
async def update_tunnel_termination(self, id: int, **kwargs) -> Dict:
|
||||
"""Update a tunnel termination."""
|
||||
return self.client.patch(f'{self.base_endpoint}/tunnel-terminations', id, kwargs)
|
||||
|
||||
async def delete_tunnel_termination(self, id: int) -> None:
|
||||
"""Delete a tunnel termination."""
|
||||
self.client.delete(f'{self.base_endpoint}/tunnel-terminations', id)
|
||||
|
||||
# ==================== IKE Proposals ====================
|
||||
|
||||
async def list_ike_proposals(self, name: Optional[str] = None, **kwargs) -> List[Dict]:
|
||||
"""List all IKE proposals."""
|
||||
params = {k: v for k, v in {'name': name, **kwargs}.items() if v is not None}
|
||||
return self.client.list(f'{self.base_endpoint}/ike-proposals', params=params)
|
||||
|
||||
async def get_ike_proposal(self, id: int) -> Dict:
|
||||
"""Get a specific IKE proposal by ID."""
|
||||
return self.client.get(f'{self.base_endpoint}/ike-proposals', id)
|
||||
|
||||
async def create_ike_proposal(
|
||||
self,
|
||||
name: str,
|
||||
authentication_method: str,
|
||||
encryption_algorithm: str,
|
||||
authentication_algorithm: str,
|
||||
group: int,
|
||||
sa_lifetime: Optional[int] = None,
|
||||
description: Optional[str] = None,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""Create a new IKE proposal."""
|
||||
data = {
|
||||
'name': name, 'authentication_method': authentication_method,
|
||||
'encryption_algorithm': encryption_algorithm,
|
||||
'authentication_algorithm': authentication_algorithm, 'group': group, **kwargs
|
||||
}
|
||||
if sa_lifetime:
|
||||
data['sa_lifetime'] = sa_lifetime
|
||||
if description:
|
||||
data['description'] = description
|
||||
return self.client.create(f'{self.base_endpoint}/ike-proposals', data)
|
||||
|
||||
async def update_ike_proposal(self, id: int, **kwargs) -> Dict:
|
||||
"""Update an IKE proposal."""
|
||||
return self.client.patch(f'{self.base_endpoint}/ike-proposals', id, kwargs)
|
||||
|
||||
async def delete_ike_proposal(self, id: int) -> None:
|
||||
"""Delete an IKE proposal."""
|
||||
self.client.delete(f'{self.base_endpoint}/ike-proposals', id)
|
||||
|
||||
# ==================== IKE Policies ====================
|
||||
|
||||
async def list_ike_policies(self, name: Optional[str] = None, **kwargs) -> List[Dict]:
|
||||
"""List all IKE policies."""
|
||||
params = {k: v for k, v in {'name': name, **kwargs}.items() if v is not None}
|
||||
return self.client.list(f'{self.base_endpoint}/ike-policies', params=params)
|
||||
|
||||
async def get_ike_policy(self, id: int) -> Dict:
|
||||
"""Get a specific IKE policy by ID."""
|
||||
return self.client.get(f'{self.base_endpoint}/ike-policies', id)
|
||||
|
||||
async def create_ike_policy(
|
||||
self,
|
||||
name: str,
|
||||
version: int,
|
||||
mode: str,
|
||||
proposals: List[int],
|
||||
preshared_key: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""Create a new IKE policy."""
|
||||
data = {'name': name, 'version': version, 'mode': mode, 'proposals': proposals, **kwargs}
|
||||
if preshared_key:
|
||||
data['preshared_key'] = preshared_key
|
||||
if description:
|
||||
data['description'] = description
|
||||
return self.client.create(f'{self.base_endpoint}/ike-policies', data)
|
||||
|
||||
async def update_ike_policy(self, id: int, **kwargs) -> Dict:
|
||||
"""Update an IKE policy."""
|
||||
return self.client.patch(f'{self.base_endpoint}/ike-policies', id, kwargs)
|
||||
|
||||
async def delete_ike_policy(self, id: int) -> None:
|
||||
"""Delete an IKE policy."""
|
||||
self.client.delete(f'{self.base_endpoint}/ike-policies', id)
|
||||
|
||||
# ==================== IPSec Proposals ====================
|
||||
|
||||
async def list_ipsec_proposals(self, name: Optional[str] = None, **kwargs) -> List[Dict]:
|
||||
"""List all IPSec proposals."""
|
||||
params = {k: v for k, v in {'name': name, **kwargs}.items() if v is not None}
|
||||
return self.client.list(f'{self.base_endpoint}/ipsec-proposals', params=params)
|
||||
|
||||
async def get_ipsec_proposal(self, id: int) -> Dict:
|
||||
"""Get a specific IPSec proposal by ID."""
|
||||
return self.client.get(f'{self.base_endpoint}/ipsec-proposals', id)
|
||||
|
||||
async def create_ipsec_proposal(
|
||||
self,
|
||||
name: str,
|
||||
encryption_algorithm: str,
|
||||
authentication_algorithm: str,
|
||||
sa_lifetime_seconds: Optional[int] = None,
|
||||
sa_lifetime_data: Optional[int] = None,
|
||||
description: Optional[str] = None,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""Create a new IPSec proposal."""
|
||||
data = {
|
||||
'name': name, 'encryption_algorithm': encryption_algorithm,
|
||||
'authentication_algorithm': authentication_algorithm, **kwargs
|
||||
}
|
||||
for key, val in [
|
||||
('sa_lifetime_seconds', sa_lifetime_seconds),
|
||||
('sa_lifetime_data', sa_lifetime_data), ('description', description)
|
||||
]:
|
||||
if val is not None:
|
||||
data[key] = val
|
||||
return self.client.create(f'{self.base_endpoint}/ipsec-proposals', data)
|
||||
|
||||
async def update_ipsec_proposal(self, id: int, **kwargs) -> Dict:
|
||||
"""Update an IPSec proposal."""
|
||||
return self.client.patch(f'{self.base_endpoint}/ipsec-proposals', id, kwargs)
|
||||
|
||||
async def delete_ipsec_proposal(self, id: int) -> None:
|
||||
"""Delete an IPSec proposal."""
|
||||
self.client.delete(f'{self.base_endpoint}/ipsec-proposals', id)
|
||||
|
||||
# ==================== IPSec Policies ====================
|
||||
|
||||
async def list_ipsec_policies(self, name: Optional[str] = None, **kwargs) -> List[Dict]:
|
||||
"""List all IPSec policies."""
|
||||
params = {k: v for k, v in {'name': name, **kwargs}.items() if v is not None}
|
||||
return self.client.list(f'{self.base_endpoint}/ipsec-policies', params=params)
|
||||
|
||||
async def get_ipsec_policy(self, id: int) -> Dict:
|
||||
"""Get a specific IPSec policy by ID."""
|
||||
return self.client.get(f'{self.base_endpoint}/ipsec-policies', id)
|
||||
|
||||
async def create_ipsec_policy(
|
||||
self,
|
||||
name: str,
|
||||
proposals: List[int],
|
||||
pfs_group: Optional[int] = None,
|
||||
description: Optional[str] = None,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""Create a new IPSec policy."""
|
||||
data = {'name': name, 'proposals': proposals, **kwargs}
|
||||
if pfs_group:
|
||||
data['pfs_group'] = pfs_group
|
||||
if description:
|
||||
data['description'] = description
|
||||
return self.client.create(f'{self.base_endpoint}/ipsec-policies', data)
|
||||
|
||||
async def update_ipsec_policy(self, id: int, **kwargs) -> Dict:
|
||||
"""Update an IPSec policy."""
|
||||
return self.client.patch(f'{self.base_endpoint}/ipsec-policies', id, kwargs)
|
||||
|
||||
async def delete_ipsec_policy(self, id: int) -> None:
|
||||
"""Delete an IPSec policy."""
|
||||
self.client.delete(f'{self.base_endpoint}/ipsec-policies', id)
|
||||
|
||||
# ==================== IPSec Profiles ====================
|
||||
|
||||
async def list_ipsec_profiles(self, name: Optional[str] = None, **kwargs) -> List[Dict]:
|
||||
"""List all IPSec profiles."""
|
||||
params = {k: v for k, v in {'name': name, **kwargs}.items() if v is not None}
|
||||
return self.client.list(f'{self.base_endpoint}/ipsec-profiles', params=params)
|
||||
|
||||
async def get_ipsec_profile(self, id: int) -> Dict:
|
||||
"""Get a specific IPSec profile by ID."""
|
||||
return self.client.get(f'{self.base_endpoint}/ipsec-profiles', id)
|
||||
|
||||
async def create_ipsec_profile(
|
||||
self,
|
||||
name: str,
|
||||
mode: str,
|
||||
ike_policy: int,
|
||||
ipsec_policy: int,
|
||||
description: Optional[str] = None,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""Create a new IPSec profile."""
|
||||
data = {'name': name, 'mode': mode, 'ike_policy': ike_policy, 'ipsec_policy': ipsec_policy, **kwargs}
|
||||
if description:
|
||||
data['description'] = description
|
||||
return self.client.create(f'{self.base_endpoint}/ipsec-profiles', data)
|
||||
|
||||
async def update_ipsec_profile(self, id: int, **kwargs) -> Dict:
|
||||
"""Update an IPSec profile."""
|
||||
return self.client.patch(f'{self.base_endpoint}/ipsec-profiles', id, kwargs)
|
||||
|
||||
async def delete_ipsec_profile(self, id: int) -> None:
|
||||
"""Delete an IPSec profile."""
|
||||
self.client.delete(f'{self.base_endpoint}/ipsec-profiles', id)
|
||||
|
||||
# ==================== L2VPN ====================
|
||||
|
||||
async def list_l2vpns(
|
||||
self,
|
||||
name: Optional[str] = None,
|
||||
slug: Optional[str] = None,
|
||||
type: Optional[str] = None,
|
||||
tenant_id: Optional[int] = None,
|
||||
**kwargs
|
||||
) -> List[Dict]:
|
||||
"""List all L2VPNs with optional filtering."""
|
||||
params = {k: v for k, v in {
|
||||
'name': name, 'slug': slug, 'type': type, 'tenant_id': tenant_id, **kwargs
|
||||
}.items() if v is not None}
|
||||
return self.client.list(f'{self.base_endpoint}/l2vpns', params=params)
|
||||
|
||||
async def get_l2vpn(self, id: int) -> Dict:
|
||||
"""Get a specific L2VPN by ID."""
|
||||
return self.client.get(f'{self.base_endpoint}/l2vpns', id)
|
||||
|
||||
async def create_l2vpn(
|
||||
self,
|
||||
name: str,
|
||||
slug: str,
|
||||
type: str,
|
||||
identifier: Optional[int] = None,
|
||||
tenant: Optional[int] = None,
|
||||
description: Optional[str] = None,
|
||||
import_targets: Optional[List[int]] = None,
|
||||
export_targets: Optional[List[int]] = None,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""Create a new L2VPN."""
|
||||
data = {'name': name, 'slug': slug, 'type': type, **kwargs}
|
||||
for key, val in [
|
||||
('identifier', identifier), ('tenant', tenant), ('description', description),
|
||||
('import_targets', import_targets), ('export_targets', export_targets)
|
||||
]:
|
||||
if val is not None:
|
||||
data[key] = val
|
||||
return self.client.create(f'{self.base_endpoint}/l2vpns', data)
|
||||
|
||||
async def update_l2vpn(self, id: int, **kwargs) -> Dict:
|
||||
"""Update an L2VPN."""
|
||||
return self.client.patch(f'{self.base_endpoint}/l2vpns', id, kwargs)
|
||||
|
||||
async def delete_l2vpn(self, id: int) -> None:
|
||||
"""Delete an L2VPN."""
|
||||
self.client.delete(f'{self.base_endpoint}/l2vpns', id)
|
||||
|
||||
# ==================== L2VPN Terminations ====================
|
||||
|
||||
async def list_l2vpn_terminations(
|
||||
self,
|
||||
l2vpn_id: Optional[int] = None,
|
||||
**kwargs
|
||||
) -> List[Dict]:
|
||||
"""List all L2VPN terminations."""
|
||||
params = {k: v for k, v in {'l2vpn_id': l2vpn_id, **kwargs}.items() if v is not None}
|
||||
return self.client.list(f'{self.base_endpoint}/l2vpn-terminations', params=params)
|
||||
|
||||
async def get_l2vpn_termination(self, id: int) -> Dict:
|
||||
"""Get a specific L2VPN termination by ID."""
|
||||
return self.client.get(f'{self.base_endpoint}/l2vpn-terminations', id)
|
||||
|
||||
async def create_l2vpn_termination(
|
||||
self,
|
||||
l2vpn: int,
|
||||
assigned_object_type: str,
|
||||
assigned_object_id: int,
|
||||
**kwargs
|
||||
) -> Dict:
|
||||
"""Create a new L2VPN termination."""
|
||||
data = {
|
||||
'l2vpn': l2vpn, 'assigned_object_type': assigned_object_type,
|
||||
'assigned_object_id': assigned_object_id, **kwargs
|
||||
}
|
||||
return self.client.create(f'{self.base_endpoint}/l2vpn-terminations', data)
|
||||
|
||||
async def update_l2vpn_termination(self, id: int, **kwargs) -> Dict:
|
||||
"""Update an L2VPN termination."""
|
||||
return self.client.patch(f'{self.base_endpoint}/l2vpn-terminations', id, kwargs)
|
||||
|
||||
async def delete_l2vpn_termination(self, id: int) -> None:
|
||||
"""Delete an L2VPN termination."""
|
||||
self.client.delete(f'{self.base_endpoint}/l2vpn-terminations', id)
|
||||
Reference in New Issue
Block a user