Spaces:
Sleeping
Sleeping
| import os | |
| import platform | |
| import socket | |
| import subprocess | |
| import time | |
| from dns import resolver | |
| import psutil | |
| import requests | |
| from scapy.all import ( | |
| ARP, | |
| Ether, | |
| ICMP, | |
| IP, | |
| IPerror, | |
| TCP, | |
| UDP, | |
| sniff, | |
| sr1, | |
| srp, | |
| traceroute, | |
| ) | |
| class NetworkDiagnosticSkill: | |
| def __init__(self): | |
| self.os = platform.system() | |
| def ping(self, target_ip: str, packet_size: int = 56, count: int = 1, timeout: int = 1) -> str: | |
| """ | |
| Perform a ping operation on the target IP address. | |
| Args: | |
| target_ip (str): The IP address to ping. | |
| packet_size (int): Size of the ping packet (default: 56 bytes). | |
| count (int): Number of ping packets to send (default: 1). | |
| timeout (int): Timeout in seconds for each packet (default: 1). | |
| Returns: | |
| str: Plain text output of the ping results. | |
| """ | |
| if self.os == "Windows": | |
| ping_cmd = f"ping -n {count} -l {packet_size} -w {timeout*1000} {target_ip}" | |
| else: | |
| ping_cmd = f"ping -c {count} -s {packet_size} -W {timeout} {target_ip}" | |
| try: | |
| result = subprocess.run(ping_cmd, shell=True, check=True, text=True, capture_output=True) | |
| return result.stdout | |
| except subprocess.CalledProcessError as e: | |
| return f"Error: {e.stderr}" | |
| def traceroute(self, target_ip: str, max_hops: int = 30, packet_size: int = 40) -> str: | |
| """ | |
| Perform a traceroute operation to the target IP address. | |
| Args: | |
| target_ip (str): The IP address to trace the route to. | |
| max_hops (int): Maximum number of hops to attempt (default: 30). | |
| packet_size (int): Size of the probe packets (default: 40 bytes). | |
| Returns: | |
| str: Plain text output of the traceroute results. | |
| """ | |
| # Note: scapy's traceroute might require root/admin privileges | |
| try: | |
| ans, unans = traceroute(target_ip, maxttl=max_hops, psize=packet_size, verbose=0) | |
| output = "Traceroute Results:\n" | |
| # Process ans for a more standard output if needed, here's a simple way | |
| hops = {} | |
| for snd, rcv in ans: | |
| if snd.ttl not in hops: | |
| hops[snd.ttl] = rcv.src | |
| for ttl in sorted(hops.keys()): | |
| output += f"Hop {ttl}: {hops[ttl]}\n" | |
| if not hops and unans: # If no successful hops, mention it | |
| output += "No successful hops. Check target or permissions.\n" | |
| return output | |
| except Exception as e: | |
| return f"Traceroute failed: {e}. (Scapy traceroute might require root/admin privileges)" | |
| def help(self) -> str: | |
| """ | |
| Return help text for the network diagnostic skill. | |
| """ | |
| return ( | |
| "Network Diagnostic Skill\n" | |
| "-----------------------\n" | |
| "1. ping(target_ip, packet_size=56, count=1, timeout=1)\n" | |
| " Perform a ping operation on the target IP address.\n" | |
| "2. traceroute(target_ip, max_hops=30, packet_size=40)\n" | |
| " Perform a traceroute operation to the target IP address.\n" | |
| ) | |
| def describe(self) -> str: | |
| """ | |
| Return a description of the skill. | |
| """ | |
| return ( | |
| "This skill provides network diagnostic capabilities including ping and traceroute.\n" | |
| "It supports cross-platform operations and allows customization of packet size,\n" | |
| "number of packets, and timeout values." | |
| ) | |
| class DNSLookupSkill: | |
| def lookup(self, domain, record_type='A', dns_server='8.8.8.8'): | |
| try: | |
| res = resolver.Resolver() | |
| res.nameservers = [dns_server] | |
| answer = res.resolve(domain, record_type) # Changed from query to resolve for consistency | |
| return f"DNS {record_type} records for {domain}:\n" + '\n'.join([str(r) for r in answer]) | |
| except Exception as e: | |
| return f"DNS lookup failed: {e}" | |
| class PortScannerSkill: | |
| def scan(self, target_ip, start_port=1, end_port=1024): | |
| open_ports = [] | |
| try: | |
| for port in range(start_port, end_port + 1): | |
| # Constructing IP/TCP packet for port scanning | |
| # SYN packet is sent (flags='S') | |
| packet = IP(dst=target_ip)/TCP(dport=port, flags='S') | |
| response = sr1(packet, timeout=1, verbose=0) # sr1 sends and receives one packet | |
| if response and response.haslayer(TCP): | |
| # Check for SYN-ACK response (flags=0x12 or 'SA') | |
| if response.getlayer(TCP).flags == 0x12: | |
| open_ports.append(port) | |
| # Send RST to close the connection | |
| rst_packet = IP(dst=target_ip)/TCP(dport=port, sport=response.getlayer(TCP).dport, seq=response.getlayer(TCP).ack, ack=response.getlayer(TCP).seq + 1, flags='R') | |
| sr1(rst_packet, timeout=1, verbose=0) | |
| # Check for RST-ACK response (flags=0x14 or 'RA'), also indicates port is closed but reachable | |
| # elif response.getlayer(TCP).flags == 0x14: | |
| # pass # Port is closed | |
| if open_ports: | |
| return f"Open ports on {target_ip}: {open_ports}" | |
| else: | |
| return f"No open ports found on {target_ip} in range {start_port}-{end_port}." | |
| except Exception as e: | |
| return f"Port scan failed: {e}" | |
| class NetworkInterfaceSkill: | |
| def get_info(self): | |
| try: | |
| info = {} | |
| for interface, snics in psutil.net_if_addrs().items(): | |
| info[interface] = [] | |
| for snic in snics: | |
| info[interface].append({ | |
| 'family': str(snic.family), | |
| 'address': snic.address, | |
| 'netmask': snic.netmask, | |
| 'broadcast': snic.broadcast | |
| }) | |
| return info | |
| except Exception as e: | |
| return f"Failed to get interface info: {e}" | |
| class BandwidthTestSkill: | |
| def test(self, download_url='http://speedtest.ftp.otenet.gr/files/test100Mb.db', upload_url='http://httpbin.org/post'): | |
| try: | |
| # Download test | |
| start_time = time.time() | |
| response = requests.get(download_url, stream=True) | |
| size = 0 | |
| for chunk in response.iter_content(1024): | |
| size += len(chunk) | |
| download_time = time.time() - start_time | |
| download_speed_mbps = (size * 8 / (1024 * 1024)) / download_time if download_time > 0 else 0 # in Mbps | |
| # Upload test - creating a 1MB dummy payload | |
| dummy_payload_size = 1 * 1024 * 1024 # 1MB | |
| dummy_payload = {'file': ('dummy.bin', b'0' * dummy_payload_size)} | |
| start_time = time.time() | |
| requests.post(upload_url, files=dummy_payload) | |
| upload_time = time.time() - start_time | |
| upload_speed_mbps = (dummy_payload_size * 8 / (1024 * 1024)) / upload_time if upload_time > 0 else 0 # in Mbps | |
| return f"Download Speed: {download_speed_mbps:.2f} Mbps\nUpload Speed: {upload_speed_mbps:.2f} Mbps" | |
| except Exception as e: | |
| return f"Bandwidth test failed: {e}" | |
| class PacketSnifferSkill: | |
| def sniff(self, filter_expr='', count=10, timeout=None): # Added timeout | |
| try: | |
| # Ensure user knows this might need privileges | |
| print("Attempting to sniff packets. This may require administrative/root privileges.") | |
| packets = sniff(filter=filter_expr, count=count, timeout=timeout) | |
| if packets: | |
| return "\n".join([packet.summary() for packet in packets]) | |
| else: | |
| return "No packets captured." | |
| except PermissionError: | |
| return "Packet sniffing failed: Permission denied. Please run as administrator/root." | |
| except Exception as e: | |
| return f"Packet sniffing failed: {e}" | |
| class ARPScanSkill: | |
| def scan(self, ip_range='192.168.1.0/24'): | |
| clients = [] | |
| try: | |
| arp_request = ARP(pdst=ip_range) | |
| broadcast = Ether(dst='ff:ff:ff:ff:ff:ff') | |
| arp_request_broadcast = broadcast/arp_request | |
| answered_list = srp(arp_request_broadcast, timeout=1, verbose=False)[0] | |
| for sent, received in answered_list: | |
| clients.append({'ip': received.psrc, 'mac': received.hwsrc}) | |
| if clients: | |
| return clients | |
| else: | |
| return f"No devices found in range {ip_range}." | |
| except Exception as e: | |
| return f"ARP scan failed: {e}" | |
| class TCPConnectionTestSkill: | |
| def test(self, host: str, port: int, timeout: int = 5) -> str: | |
| """ | |
| Test TCP connection to a host and port. | |
| """ | |
| try: | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| sock.settimeout(timeout) | |
| result = sock.connect_ex((host, port)) | |
| if result == 0: | |
| return f"Successfully connected to {host} on port {port}." | |
| else: | |
| return f"Failed to connect to {host} on port {port}. Error: {os.strerror(result)}" | |
| except socket.gaierror: | |
| return f"Failed to resolve hostname {host}." | |
| except Exception as e: | |
| return f"TCP connection test failed: {e}" | |
| finally: | |
| if 'sock' in locals(): | |
| sock.close() | |
| class LatencyMonitorSkill: | |
| def monitor(self, target_ip: str, interval: int = 60, duration: int = 3600) -> list: | |
| """ | |
| Monitors latency to a target IP over a duration. | |
| This is a placeholder and would typically involve repeated pings. | |
| """ | |
| # This is a simplified placeholder. A real implementation would | |
| # involve a loop, sleeping, and pinging, then collecting results. | |
| results = [] | |
| end_time = time.time() + duration | |
| count = 0 | |
| nd_skill = NetworkDiagnosticSkill() | |
| print(f"Starting latency monitoring for {target_ip} for {duration}s with {interval}s interval.") | |
| while time.time() < end_time: | |
| count += 1 | |
| ping_result = nd_skill.ping(target_ip, count=1, timeout=max(1, interval -1)) # Ensure timeout is less than interval | |
| results.append(f"Ping {count} at {time.strftime('%Y-%m-%d %H:%M:%S')}: {ping_result.strip()}") | |
| if time.time() + interval < end_time: | |
| time.sleep(interval) | |
| else: | |
| break # Avoid sleeping past duration | |
| return results if results else ["No latency data collected."] | |
| class RouteTableSkill: | |
| def get_routes(self) -> list: | |
| """ | |
| Retrieves the system's route table. | |
| This is a placeholder. A real implementation would parse OS-specific commands. | |
| """ | |
| routes = [] | |
| try: | |
| if platform.system() == "Windows": | |
| process = subprocess.Popen(['route', 'print', '-4'], stdout=subprocess.PIPE, text=True) # -4 for IPv4 | |
| else: # Linux/macOS | |
| process = subprocess.Popen(['netstat', '-rn'], stdout=subprocess.PIPE, text=True) # -r for routing table, -n for numerical | |
| stdout, stderr = process.communicate() | |
| if process.returncode == 0: | |
| # Basic parsing, this will need to be much more robust for actual use | |
| lines = stdout.strip().split('\n') | |
| # Example placeholder parsing (highly dependent on OS and output format) | |
| if platform.system() == "Windows": | |
| # Find the start of the IPv4 Route Table | |
| try: | |
| start_index = lines.index("IPv4 Route Table") +3 # Skip headers | |
| active_routes_section = True | |
| for line in lines[start_index:]: | |
| if "Persistent Routes:" in line or "IPv6 Route Table" in line : | |
| active_routes_section = False | |
| if not active_routes_section or not line.strip() or "Interface List" in line: | |
| continue | |
| parts = line.split() | |
| if len(parts) >= 4 and parts[0] != "Network": # Basic check | |
| routes.append({ | |
| 'destination': parts[0], | |
| 'netmask': parts[1], | |
| 'gateway': parts[2], | |
| 'interface': parts[3] | |
| }) | |
| except ValueError: | |
| routes.append({'error': 'Could not parse IPv4 route table section'}) | |
| else: # Linux/macOS | |
| for line in lines: | |
| parts = line.split() | |
| if len(parts) > 3 and (parts[0] != 'Destination' and parts[0] != 'Kernel'): # Basic check | |
| if parts[0] == "default": | |
| dest = "0.0.0.0" | |
| gateway = parts[1] | |
| netmask = "0.0.0.0" # Often not shown directly for default | |
| iface = parts[-1] # Interface is usually the last column | |
| elif len(parts) >= 8 and platform.system() == "Linux": # Linux 'netstat -rn' example | |
| dest = parts[0] | |
| gateway = parts[1] | |
| netmask = parts[2] | |
| iface = parts[7] | |
| elif len(parts) >= 6 and platform.system() == "Darwin": # macOS 'netstat -rn' example | |
| dest = parts[0] | |
| gateway = parts[1] | |
| # Netmask not directly in this macOS output, usually found via ifconfig or ipconfig | |
| netmask = "N/A" | |
| iface = parts[3] if len(parts) <= 4 else parts[5] # Interface column can shift | |
| else: | |
| continue | |
| routes.append({ | |
| 'destination': dest, | |
| 'gateway': gateway, | |
| 'netmask': netmask, | |
| 'interface': iface | |
| }) | |
| else: | |
| routes.append({'error': f'Command failed: {stderr if stderr else "Unknown error"}'}) | |
| except FileNotFoundError: | |
| routes.append({'error': 'netstat or route command not found.'}) | |
| except Exception as e: | |
| routes.append({'error': f'Failed to get routes: {str(e)}'}) | |
| if not routes: | |
| return [{'destination': 'N/A', 'gateway': 'N/A', 'netmask': 'N/A', 'interface': 'N/A', 'status': 'No routes found or failed to parse'}] | |
| return routes |