Python 端口扫描器的设计与优化
Python 端口扫描器的设计
- 端口扫描原理
端口是一种数字地址,用于标识网络中的通信终点。开放的端口可以提供服务,而关闭的端口则不能。端口扫描是利用计算机程序扫描指定主机的一个或多个端口来确定其是否处于开放状态,并找到哪些服务运行在这些端口上。
- 端口扫描器设计
端口扫描器的主要设计思路是根据指定的主机和端口列表来进行扫描,检查指定的端口是否处于开放状态,如果开放,则记录下来,并标明该端口所提供的服务类型。
以下是一个简单的端口扫描器的代码实现:
import socket def portscan(host, port): # 创建socket对象 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) # 连接目标主机 try: sock.connect((host, port)) print('[+] Port %d is open' % port) except: pass sock.close() if __name__ == '__main__': host = 'pidancode.com' ports = [21, 22, 80, 443, 8080] for port in ports: portscan(host, port)
在上面的代码中,我们先定义了一个 portscan
函数,它接受两个参数:host
和 port
。这个函数的逻辑很简单:首先创建一个 socket
对象,然后使用 connect
方法尝试连接目标主机的指定端口,如果连接成功,则说明该端口是开放的。
在 main
函数中,我们指定了待扫描的主机和端口列表,然后遍历端口列表,调用 portscan
函数对每个端口进行扫描。
- 单线程扫描的缺点
上面的代码中,我们采用的是单线程的方式进行扫描,一次只能扫描一个端口。如果我们要扫描多个主机的多个端口,那么就需要循环嵌套来实现,这会导致扫描速度特别慢。
为了解决这个问题,我们可以采用多线程或多进程的方式,从而提高扫描速度。
- 多线程扫描实现
下面是采用多线程实现的一个端口扫描器:
import socket import threading import queue def portscan(host, port): # 创建socket对象 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) # 连接目标主机 try: sock.connect((host, port)) print('[+] Port %d is open' % port) except: pass sock.close() def worker(q): while True: host, port = q.get() portscan(host, port) q.task_done() if __name__ == '__main__': hosts = ['pidancode.com', 'www.pidancode.com'] ports = [21, 22, 80, 443, 8080] q = queue.Queue() # 创建线程池 for i in range(10): t = threading.Thread(target=worker, args=(q,)) t.daemon = True t.start() # 添加任务到队列 for host in hosts: for port in ports: q.put((host, port)) # 等待所有任务完成 q.join()
这个端口扫描器采用多线程的方式,可以同时扫描多个主机的多个端口,从而提高扫描速度。为了实现多线程功能,我们使用了 Python 的 queue
模块,同时创建了一个线程池,来处理任务。在 worker
函数中,我们处理队列中的任务,将它们传递给 portscan
函数进行扫描。
- 优化扫描速度
上面采用的多线程方式,虽然提高了扫描速度,但仍然存在一些不足。其中一个瓶颈是 portscan
函数中的 connect
操作。由于 connect
操作是一个阻塞操作,它会等待连接超时或连接成功才会返回。因此,如果我们在扫描时碰到大量关闭的端口,那么 connect
操作就会成为一个很大的瓶颈,导致整个扫描过程特别慢。
为了解决这个问题,我们可以采用非阻塞技术,如 select
或 epoll
。这些特殊的 I/O 模型允许我们异步地向多个目标主机发送连接请求,而不会阻塞线程。当有主机的连接请求完成时,我们可以轮询检查这些连接是否完成,并记录哪些连接已经完成连接,然后对它们执行其他操作。
以下是采用非阻塞方式的一个优化版端口扫描器的例子:
import socket import threading import queue import select def portscan(host, port): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setblocking(False) try: sock.connect((host, port)) except: pass rlist, wlist, xlist = select.select([], [sock], [sock], 1) if sock in wlist: print('[+] Port %d is open' % port) sock.close() def worker(q): while True: host, port = q.get() portscan(host, port) q.task_done() if __name__ == '__main__': hosts = ['pidancode.com', 'www.pidancode.com'] ports = [21, 22, 80, 443, 8080] q = queue.Queue() for i in range(10): t = threading.Thread(target=worker, args=(q,)) t.daemon = True t.start() for host in hosts: for port in ports: q.put((host, port)) q.join()
在上面的代码实现中,我们首先创建了一个非阻塞的 socket 对象,然后通过 connect
方法来发送连接请求,但这不会阻塞线程。接着,我们用 select
函数来轮询检查这个 socket 是否已经连接成功。这个方法会阻塞直到出现下面的情况之一:
- 目标主机的连接请求已经完成,此时
sock
对象将出现在wlist
中; - 目标主机拒绝连接或连接超时,此时
sock
对象将出现在xlist
中; - 发生错误,此时
sock
对象将出现在xlist
中。
如果扫描对象的端口是开放的,那么 sock
对象会出现在 wlist
中,此时我们就可以认为这个端口是开放的。
相关文章