Python集成测试:使用pariko伪装ssh服务器,并记录给它的命令
问题描述
以下是上下文: 我正在编写一个通过SSH(使用pariko)连接到远程服务器的过程,执行一些命令,然后将命令的结果返回给该过程。据我所知,它工作得很好。
我现在正试图通过伪造ssh服务器进行"端到端"测试,以检查是否发送了ssh命令,以及它们的结果是否返回给该过程。
我发现了一个使用paraiko的伪SSH服务器,当我通过真正的SSH客户端连接到它时,它工作得很好,但当我尝试使用pariko ssh客户端连接到它时,我遇到了错误:/
你知道我做错了什么吗?
以下是我的代码:
服务器:
#!/usr/bin/env python
"""Fake SSH Server Utilizing Paramiko"""
import threading
import socket
import sys
import traceback
import paramiko
import os
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
LOG = open("log.txt", "a")
#HOST_KEY = paramiko.RSAKey(filename='keys/private.key')
HOST_KEY = paramiko.RSAKey(filename=os.path.join(__location__, 'id_rsa.new'))
PORT = 2200
def handle_cmd(cmd, chan):
"""Branching statements to handle and prepare a response for a command"""
response = ""
if cmd.startswith("sudo"):
send_ascii("sudo.txt", chan)
return
elif cmd.startswith("ls"):
response = "pw.txt"
elif cmd.startswith("version"):
response = "Super Amazing Awesome (tm) Shell v1.1"
elif cmd.startswith("pwd"):
response = "/home/clippy"
elif cmd.startswith("cd"):
send_ascii("cd.txt", chan)
return
elif cmd.startswith("cat"):
send_ascii("cat.txt", chan)
return
elif cmd.startswith("rm"):
send_ascii("bomb.txt", chan)
response = "You blew up our files! How could you???"
elif cmd.startswith("whoami"):
send_ascii("wizard.txt", chan)
response = "You are a wizard of the internet!"
elif ".exe" in cmd:
response = "Hmm, trying to access .exe files from an ssh terminal..... Your methods are unconventional"
elif cmd.startswith("cmd"):
response = "Command Prompt? We only use respectable shells on this machine.... Sorry"
elif cmd == "help":
send_ascii("help.txt", chan)
return
else:
send_ascii("clippy.txt", chan)
response = "Use the 'help' command to view available commands"
LOG.write(response + "
")
LOG.flush()
chan.send(response + "
")
def send_ascii(filename, chan):
"""Print ascii from a file and send it to the channel"""
with open('ascii/{}'.format(filename)) as text:
chan.send("")
for line in enumerate(text):
LOG.write(line[1])
chan.send(line[1] + "")
LOG.flush()
class FakeSshServer(paramiko.ServerInterface):
"""Settings for paramiko server interface"""
def __init__(self):
self.event = threading.Event()
def check_channel_request(self, kind, chanid):
if kind == 'session':
return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def check_auth_password(self, username, password):
# Accept all passwords as valid by default
return paramiko.AUTH_SUCCESSFUL
def get_allowed_auths(self, username):
return 'password'
def check_channel_shell_request(self, channel):
self.event.set()
return True
def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
return True
def start_server():
"""Init and run the ssh server"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', PORT))
except Exception as err:
print('*** Bind failed: {}'.format(err))
traceback.print_exc()
sys.exit(1)
while True:
try:
sock.listen(100)
print('Listening for connection ...')
client, addr = sock.accept()
except Exception as err:
print('*** Listen/accept failed: {}'.format(err))
traceback.print_exc()
LOG.write("
Connection from: " + addr[0] + "
")
print('Got a connection!')
try:
transport = paramiko.Transport(client)
transport.add_server_key(HOST_KEY)
# Change banner to appear legit on nmap (or other network) scans
transport.local_version = "SSH-2.0-OpenSSH_7.6p1 Ubuntu-4ubuntu0.3"
server = FakeSshServer()
try:
transport.start_server(server=server)
except paramiko.SSHException:
print('*** SSH negotiation failed.')
raise Exception("SSH negotiation failed")
# wait for auth
chan = transport.accept(20)
if chan is None:
print('*** No channel.')
raise Exception("No channel")
server.event.wait(10)
if not server.event.is_set():
print('*** Client never asked for a shell.')
raise Exception("No shell request")
try:
chan.send("Welcome to the my control server
")
run = True
while run:
chan.send("$ ")
command = ""
while not command.endswith(""):
transport = chan.recv(1024)
# Echo input to psuedo-simulate a basic terminal
chan.send(transport)
command += transport.decode("utf-8")
chan.send("
")
command = command.rstrip()
LOG.write("$ " + command + "
")
print(command)
if command == "exit":
run = False
else:
handle_cmd(command, chan)
except Exception as err:
print('!!! Exception: {}: {}'.format(err.__class__, err))
traceback.print_exc()
try:
transport.close()
except Exception:
pass
chan.close()
except Exception as err:
print('!!! Exception: {}: {}'.format(err.__class__, err))
traceback.print_exc()
try:
transport.close()
except Exception:
pass
if __name__ == "__main__":
start_server()
以下是我尝试使用pariko连接到它的代码:
sshClient = paramiko.SSHClient()
sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
sshClient.connect('127.0.0.1', username='bnc', password='pass', port=2200)
sshClient.exec_command('ls')
和
sshClient = paramiko.SSHClient()
sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
sshClient.connect('127.0.0.1', username='bnc', password='pass', port=2200)
channel = sshClient.get_transport().open_session()
channel.get_pty()
channel.invoke_shell()
channel.send('ls
')
在两个测试中,我收到了相同的错误:
Listening for connection ...
Got a connection!
!!! Exception: <class 'socket.error'>: Socket is closed
Traceback (most recent call last):
Listening for connection ...
File "/home/cbrunain/Projects/daa/Python/test/ssh_mock_server.py", line 152, in start_server
chan.send(transport)
File "/home/cbrunain/.local/share/virtualenvs/daa-hnBs4Nn2/lib/python2.7/site-packages/paramiko/channel.py", line 801, in send
return self._send(s, m)
File "/home/cbrunain/.local/share/virtualenvs/daa-hnBs4Nn2/lib/python2.7/site-packages/paramiko/channel.py", line 1198, in _send
raise socket.error("Socket is closed")
error: Socket is closed
No handlers could be found for logger "paramiko.transport"
解决方案
我终于找到了工作正常的东西:https://gist.github.com/cschwede/3e2c025408ab4af531651098331cce45
import logging
import socket
import sys
import threading
import paramiko
logging.basicConfig()
logger = logging.getLogger()
running = True
host_key = paramiko.RSAKey(filename='id_rsa')
def ssh_command_handler(command):
print('default : ', command)
class Server(paramiko.ServerInterface):
def __init__(self):
self.event = threading.Event()
def check_channel_request(self, kind, chanid):
if kind == 'session':
return paramiko.OPEN_SUCCEEDED
def check_auth_publickey(self, username, key):
return paramiko.AUTH_SUCCESSFUL
def get_allowed_auths(self, username):
return 'publickey'
def check_channel_exec_request(self, channel, command):
global running
# This is the command we need to parse
if command == 'exit':
running = False
ssh_command_handler(command)
self.event.set()
return True
def listener():
print('listener')
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 2222))
sock.listen(100)
client, addr = sock.accept()
t = paramiko.Transport(client)
t.set_gss_host(socket.getfqdn(""))
t.load_server_moduli()
t.add_server_key(host_key)
server = Server()
t.start_server(server=server)
# Wait 30 seconds for a command
server.event.wait(30)
t.close()
print('end listener')
def run_server(command_handler):
global running
global ssh_command_handler
ssh_command_handler = command_handler
while running:
try:
listener()
except KeyboardInterrupt:
sys.exit(0)
except Exception as exc:
logger.error(exc)
def run_in_thread(command_handler):
thread = threading.Thread(target=run_server, args=(command_handler,))
thread.start()
if __name__ == '__main__':
run_in_thread(ssh_command_handler)
相关文章