Python编写的高效网络扫描工具及其原理解析

2023-04-17 00:00:00 高效 解析 编写

一、介绍

网络扫描工具是网络安全和管理中必不可少的一项工具,在网络维护和网络安全中扮演着非常重要的角色。在日常工作中,我们经常需要对局域网或互联网上的主机进行扫描,来获得主机的一些基本信息或发现潜在的网络安全隐患。但是,如果手动进行扫描,工作量会非常大,效率也会比较低。因此,编写一个高效的网络扫描工具非常必要。本文介绍了Python编写的高效网络扫描工具及其原理解析,希望能对读者有所帮助。

二、原理

本文介绍的网络扫描工具基于Python编写,采用了常用的socket编程和多线程技术。本文主要介绍如何使用Python编写高效的端口扫描工具和子网扫描工具。

  1. 端口扫描工具

端口扫描工具是一种常用的网络扫描工具,主要用于检查远程主机上的开放端口。一些常见的网络服务(如HTTP、FTP、SSH等)都运行在特定的端口上,因此端口扫描工具可以用来发现主机上的网络服务。

下面是Python编写的一个简单的端口扫描示例:

import socket

host = "pidancode.com"
port_list = [22, 80, 443, 3389, 5432, 5900]

for port in port_list:
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(2)
        s.connect((host, port))
        print("{}:{} is open".format(host, port))
        s.close()
    except socket.timeout:
        print("{}:{} is closed (timeout)".format(host, port))
    except socket.error:
        print("{}:{} is closed (socket error)".format(host, port))

上面的代码中,首先定义了一个host和一个包含多个端口的列表port_list。然后,遍历这个端口列表,对每个端口进行扫描。对于每个端口,建立一个客户端socket连接,连接超时时间设为2秒。如果连接成功,说明该端口为开放状态,输出提示信息。否则,输出该端口为关闭状态的提示信息。

  1. 子网扫描工具

子网扫描工具是另一种常用的网络扫描工具,主要用于检查指定IP地址范围内的主机是否存活。在指定子网范围时,我们通常使用CIDR(无类域间路由,Classless Inter-Domain Routing)表示法。例如,192.168.0.0/24表示一个以192.168.0为网络地址,子网掩码为255.255.255.0的子网。

下面是Python编写的一个简单的子网扫描示例:

import socket
import struct
import threading

class PingThread(threading.Thread):
    def __init__(self, ip, callback):
        threading.Thread.__init__(self)
        self.ip = ip
        self.callback = callback

    def run(self):
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(1)
            s.connect((self.ip, 80))
            self.callback(self.ip, True)
        except (socket.timeout, OSError):
            self.callback(self.ip, False)
        except:
            pass

def scan_subnets(subnets, thread_num=20):
    ip_list = []
    for subnet in subnets:
        subnet_ip, subnet_mask = subnet.split("/")
        host_num = 2 ** (32 - int(subnet_mask)) - 2
        net_ip = struct.unpack("!I", socket.inet_aton(subnet_ip))[0]
        for i in range(host_num):
            ip = socket.inet_ntoa(struct.pack("!I", net_ip + i + 1))
            ip_list.append(ip)

    results = []
    def ping_callback(ip, is_live):
        if is_live:
            results.append(ip)

    threads = []
    for ip in ip_list:
        thread = PingThread(ip, ping_callback)
        thread.start()
        threads.append(thread)
        if len(threads) >= thread_num:
            for thread in threads:
                thread.join()
            threads = []

    for thread in threads:
        thread.join()

    return results

上面的代码中,首先定义了一个PingThread类,它继承自threading.Thread类。在run方法中,PingThread类会建立一个客户端socket连接,向目标IP地址的80端口发送HTTP请求。如果连接成功,则说明该主机存在,调用回调函数callback并传入True表示该主机存在。否则,说明该主机不存在,调用回调函数callback并传入False表示该主机不存在。

然后,定义了一个scan_subnets函数,这个函数接受一个子网列表subnets和一个线程数thread_num作为参数。在函数内部,遍历子网列表,计算出子网内主机的IP地址范围。然后,对每个IP地址开启一个线程来执行PingThread类的实例。每个线程执行时,将PingThread的实例和回调函数传入,以便获取回调函数的调用结果。

最后,将所有的线程join,等待线程执行完成,返回活跃的主机IP地址列表。

三、代码演示

使用上面的代码演示端口扫描和子网扫描。

  1. 端口扫描

使用上面介绍的Python代码,扫描pidancode.com的端口:

import socket

host = "pidancode.com"
port_list = [22, 80, 443, 3389, 5432, 5900]

for port in port_list:
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(2)
        s.connect((host, port))
        print("{}:{} is open".format(host, port))
        s.close()
    except socket.timeout:
        print("{}:{} is closed (timeout)".format(host, port))
    except socket.error:
        print("{}:{} is closed (socket error)".format(host, port))

输出结果如下:

pidancode.com:22 is closed (socket error)
pidancode.com:80 is open
pidancode.com:443 is open
pidancode.com:3389 is closed (socket error)
pidancode.com:5432 is closed (socket error)
pidancode.com:5900 is closed (socket error)

可以看到,pidancode.com开放了80和443端口,其他端口均为关闭状态。

  1. 子网扫描

使用上面介绍的Python代码,扫描192.168.0.0/24的子网:

import socket
import struct
import threading

class PingThread(threading.Thread):
    def __init__(self, ip, callback):
        threading.Thread.__init__(self)
        self.ip = ip
        self.callback = callback

    def run(self):
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(1)
            s.connect((self.ip, 80))
            self.callback(self.ip, True)
        except (socket.timeout, OSError):
            self.callback(self.ip, False)
        except:
            pass

def scan_subnets(subnets, thread_num=20):
    ip_list = []
    for subnet in subnets:
        subnet_ip, subnet_mask = subnet.split("/")
        host_num = 2 ** (32 - int(subnet_mask)) - 2
        net_ip = struct.unpack("!I", socket.inet_aton(subnet_ip))[0]
        for i in range(host_num):
            ip = socket.inet_ntoa(struct.pack("!I", net_ip + i + 1))
            ip_list.append(ip)

    results = []
    def ping_callback(ip, is_live):
        if is_live:
            results.append(ip)

    threads = []
    for ip in ip_list:
        thread = PingThread(ip, ping_callback)
        thread.start()
        threads.append(thread)
        if len(threads) >= thread_num:
            for thread in threads:
                thread.join()
            threads = []

    for thread in threads:
        thread.join()

    return results

subnets = ["192.168.0.0/24"]
result = scan_subnets(subnets)
print("active hosts in {}: {}".format(subnets[0], result))

输出结果如下:

active hosts in 192.168.0.0/24: ['192.168.0.1']

可以看到,仅有一个主机192.168.0.1处于活跃状态。

相关文章