newnew / network_diagnostic_skills.py
joebruce1313's picture
Upload 5 files
bf78963 verified
import os
import platform
import socket
import subprocess
import time
from dns import resolver
import psutil
import requests
from scapy.all import (
ARP,
Ether,
ICMP,
IP,
IPerror,
TCP,
UDP,
sniff,
sr1,
srp,
traceroute,
)
class NetworkDiagnosticSkill:
def __init__(self):
self.os = platform.system()
def ping(self, target_ip: str, packet_size: int = 56, count: int = 1, timeout: int = 1) -> str:
"""
Perform a ping operation on the target IP address.
Args:
target_ip (str): The IP address to ping.
packet_size (int): Size of the ping packet (default: 56 bytes).
count (int): Number of ping packets to send (default: 1).
timeout (int): Timeout in seconds for each packet (default: 1).
Returns:
str: Plain text output of the ping results.
"""
if self.os == "Windows":
ping_cmd = f"ping -n {count} -l {packet_size} -w {timeout*1000} {target_ip}"
else:
ping_cmd = f"ping -c {count} -s {packet_size} -W {timeout} {target_ip}"
try:
result = subprocess.run(ping_cmd, shell=True, check=True, text=True, capture_output=True)
return result.stdout
except subprocess.CalledProcessError as e:
return f"Error: {e.stderr}"
def traceroute(self, target_ip: str, max_hops: int = 30, packet_size: int = 40) -> str:
"""
Perform a traceroute operation to the target IP address.
Args:
target_ip (str): The IP address to trace the route to.
max_hops (int): Maximum number of hops to attempt (default: 30).
packet_size (int): Size of the probe packets (default: 40 bytes).
Returns:
str: Plain text output of the traceroute results.
"""
# Note: scapy's traceroute might require root/admin privileges
try:
ans, unans = traceroute(target_ip, maxttl=max_hops, psize=packet_size, verbose=0)
output = "Traceroute Results:\n"
# Process ans for a more standard output if needed, here's a simple way
hops = {}
for snd, rcv in ans:
if snd.ttl not in hops:
hops[snd.ttl] = rcv.src
for ttl in sorted(hops.keys()):
output += f"Hop {ttl}: {hops[ttl]}\n"
if not hops and unans: # If no successful hops, mention it
output += "No successful hops. Check target or permissions.\n"
return output
except Exception as e:
return f"Traceroute failed: {e}. (Scapy traceroute might require root/admin privileges)"
def help(self) -> str:
"""
Return help text for the network diagnostic skill.
"""
return (
"Network Diagnostic Skill\n"
"-----------------------\n"
"1. ping(target_ip, packet_size=56, count=1, timeout=1)\n"
" Perform a ping operation on the target IP address.\n"
"2. traceroute(target_ip, max_hops=30, packet_size=40)\n"
" Perform a traceroute operation to the target IP address.\n"
)
def describe(self) -> str:
"""
Return a description of the skill.
"""
return (
"This skill provides network diagnostic capabilities including ping and traceroute.\n"
"It supports cross-platform operations and allows customization of packet size,\n"
"number of packets, and timeout values."
)
class DNSLookupSkill:
def lookup(self, domain, record_type='A', dns_server='8.8.8.8'):
try:
res = resolver.Resolver()
res.nameservers = [dns_server]
answer = res.resolve(domain, record_type) # Changed from query to resolve for consistency
return f"DNS {record_type} records for {domain}:\n" + '\n'.join([str(r) for r in answer])
except Exception as e:
return f"DNS lookup failed: {e}"
class PortScannerSkill:
def scan(self, target_ip, start_port=1, end_port=1024):
open_ports = []
try:
for port in range(start_port, end_port + 1):
# Constructing IP/TCP packet for port scanning
# SYN packet is sent (flags='S')
packet = IP(dst=target_ip)/TCP(dport=port, flags='S')
response = sr1(packet, timeout=1, verbose=0) # sr1 sends and receives one packet
if response and response.haslayer(TCP):
# Check for SYN-ACK response (flags=0x12 or 'SA')
if response.getlayer(TCP).flags == 0x12:
open_ports.append(port)
# Send RST to close the connection
rst_packet = IP(dst=target_ip)/TCP(dport=port, sport=response.getlayer(TCP).dport, seq=response.getlayer(TCP).ack, ack=response.getlayer(TCP).seq + 1, flags='R')
sr1(rst_packet, timeout=1, verbose=0)
# Check for RST-ACK response (flags=0x14 or 'RA'), also indicates port is closed but reachable
# elif response.getlayer(TCP).flags == 0x14:
# pass # Port is closed
if open_ports:
return f"Open ports on {target_ip}: {open_ports}"
else:
return f"No open ports found on {target_ip} in range {start_port}-{end_port}."
except Exception as e:
return f"Port scan failed: {e}"
class NetworkInterfaceSkill:
def get_info(self):
try:
info = {}
for interface, snics in psutil.net_if_addrs().items():
info[interface] = []
for snic in snics:
info[interface].append({
'family': str(snic.family),
'address': snic.address,
'netmask': snic.netmask,
'broadcast': snic.broadcast
})
return info
except Exception as e:
return f"Failed to get interface info: {e}"
class BandwidthTestSkill:
def test(self, download_url='http://speedtest.ftp.otenet.gr/files/test100Mb.db', upload_url='http://httpbin.org/post'):
try:
# Download test
start_time = time.time()
response = requests.get(download_url, stream=True)
size = 0
for chunk in response.iter_content(1024):
size += len(chunk)
download_time = time.time() - start_time
download_speed_mbps = (size * 8 / (1024 * 1024)) / download_time if download_time > 0 else 0 # in Mbps
# Upload test - creating a 1MB dummy payload
dummy_payload_size = 1 * 1024 * 1024 # 1MB
dummy_payload = {'file': ('dummy.bin', b'0' * dummy_payload_size)}
start_time = time.time()
requests.post(upload_url, files=dummy_payload)
upload_time = time.time() - start_time
upload_speed_mbps = (dummy_payload_size * 8 / (1024 * 1024)) / upload_time if upload_time > 0 else 0 # in Mbps
return f"Download Speed: {download_speed_mbps:.2f} Mbps\nUpload Speed: {upload_speed_mbps:.2f} Mbps"
except Exception as e:
return f"Bandwidth test failed: {e}"
class PacketSnifferSkill:
def sniff(self, filter_expr='', count=10, timeout=None): # Added timeout
try:
# Ensure user knows this might need privileges
print("Attempting to sniff packets. This may require administrative/root privileges.")
packets = sniff(filter=filter_expr, count=count, timeout=timeout)
if packets:
return "\n".join([packet.summary() for packet in packets])
else:
return "No packets captured."
except PermissionError:
return "Packet sniffing failed: Permission denied. Please run as administrator/root."
except Exception as e:
return f"Packet sniffing failed: {e}"
class ARPScanSkill:
def scan(self, ip_range='192.168.1.0/24'):
clients = []
try:
arp_request = ARP(pdst=ip_range)
broadcast = Ether(dst='ff:ff:ff:ff:ff:ff')
arp_request_broadcast = broadcast/arp_request
answered_list = srp(arp_request_broadcast, timeout=1, verbose=False)[0]
for sent, received in answered_list:
clients.append({'ip': received.psrc, 'mac': received.hwsrc})
if clients:
return clients
else:
return f"No devices found in range {ip_range}."
except Exception as e:
return f"ARP scan failed: {e}"
class TCPConnectionTestSkill:
def test(self, host: str, port: int, timeout: int = 5) -> str:
"""
Test TCP connection to a host and port.
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
if result == 0:
return f"Successfully connected to {host} on port {port}."
else:
return f"Failed to connect to {host} on port {port}. Error: {os.strerror(result)}"
except socket.gaierror:
return f"Failed to resolve hostname {host}."
except Exception as e:
return f"TCP connection test failed: {e}"
finally:
if 'sock' in locals():
sock.close()
class LatencyMonitorSkill:
def monitor(self, target_ip: str, interval: int = 60, duration: int = 3600) -> list:
"""
Monitors latency to a target IP over a duration.
This is a placeholder and would typically involve repeated pings.
"""
# This is a simplified placeholder. A real implementation would
# involve a loop, sleeping, and pinging, then collecting results.
results = []
end_time = time.time() + duration
count = 0
nd_skill = NetworkDiagnosticSkill()
print(f"Starting latency monitoring for {target_ip} for {duration}s with {interval}s interval.")
while time.time() < end_time:
count += 1
ping_result = nd_skill.ping(target_ip, count=1, timeout=max(1, interval -1)) # Ensure timeout is less than interval
results.append(f"Ping {count} at {time.strftime('%Y-%m-%d %H:%M:%S')}: {ping_result.strip()}")
if time.time() + interval < end_time:
time.sleep(interval)
else:
break # Avoid sleeping past duration
return results if results else ["No latency data collected."]
class RouteTableSkill:
def get_routes(self) -> list:
"""
Retrieves the system's route table.
This is a placeholder. A real implementation would parse OS-specific commands.
"""
routes = []
try:
if platform.system() == "Windows":
process = subprocess.Popen(['route', 'print', '-4'], stdout=subprocess.PIPE, text=True) # -4 for IPv4
else: # Linux/macOS
process = subprocess.Popen(['netstat', '-rn'], stdout=subprocess.PIPE, text=True) # -r for routing table, -n for numerical
stdout, stderr = process.communicate()
if process.returncode == 0:
# Basic parsing, this will need to be much more robust for actual use
lines = stdout.strip().split('\n')
# Example placeholder parsing (highly dependent on OS and output format)
if platform.system() == "Windows":
# Find the start of the IPv4 Route Table
try:
start_index = lines.index("IPv4 Route Table") +3 # Skip headers
active_routes_section = True
for line in lines[start_index:]:
if "Persistent Routes:" in line or "IPv6 Route Table" in line :
active_routes_section = False
if not active_routes_section or not line.strip() or "Interface List" in line:
continue
parts = line.split()
if len(parts) >= 4 and parts[0] != "Network": # Basic check
routes.append({
'destination': parts[0],
'netmask': parts[1],
'gateway': parts[2],
'interface': parts[3]
})
except ValueError:
routes.append({'error': 'Could not parse IPv4 route table section'})
else: # Linux/macOS
for line in lines:
parts = line.split()
if len(parts) > 3 and (parts[0] != 'Destination' and parts[0] != 'Kernel'): # Basic check
if parts[0] == "default":
dest = "0.0.0.0"
gateway = parts[1]
netmask = "0.0.0.0" # Often not shown directly for default
iface = parts[-1] # Interface is usually the last column
elif len(parts) >= 8 and platform.system() == "Linux": # Linux 'netstat -rn' example
dest = parts[0]
gateway = parts[1]
netmask = parts[2]
iface = parts[7]
elif len(parts) >= 6 and platform.system() == "Darwin": # macOS 'netstat -rn' example
dest = parts[0]
gateway = parts[1]
# Netmask not directly in this macOS output, usually found via ifconfig or ipconfig
netmask = "N/A"
iface = parts[3] if len(parts) <= 4 else parts[5] # Interface column can shift
else:
continue
routes.append({
'destination': dest,
'gateway': gateway,
'netmask': netmask,
'interface': iface
})
else:
routes.append({'error': f'Command failed: {stderr if stderr else "Unknown error"}'})
except FileNotFoundError:
routes.append({'error': 'netstat or route command not found.'})
except Exception as e:
routes.append({'error': f'Failed to get routes: {str(e)}'})
if not routes:
return [{'destination': 'N/A', 'gateway': 'N/A', 'netmask': 'N/A', 'interface': 'N/A', 'status': 'No routes found or failed to parse'}]
return routes