Python集成测试:使用paramiko伪装SSH服务器,并记录发送给它的命令

2022-09-01 00:00:00 python ssh paramiko

问题描述

以下是上下文: 我正在编写一个程序,它通过SSH(使用paramiko)连接到远程服务器,执行一些命令,然后把命令的结果返回给该程序。据我所知,它工作得很好。

我现在正试图通过伪造ssh服务器进行"端到端"测试,以检查是否发送了ssh命令,以及它们的结果是否返回给该过程。

我发现了一个使用paramiko的伪SSH服务器,当我通过真正的SSH客户端连接到它时,它工作得很好,但当我尝试使用paramiko的SSH客户端连接到它时,却遇到了错误:/

你知道我做错了什么吗?

以下是我的代码:

服务器:

#!/usr/bin/env python
"""Fake SSH Server Utilizing Paramiko"""
import threading
import socket
import sys
import traceback
import paramiko
import os

# Absolute directory of this script, used to locate the host key file.
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
# Session log, opened in append mode at import time and kept open for the
# lifetime of the process; the path is relative to the working directory.
LOG = open("log.txt", "a")
# Earlier key location, kept for reference:
#HOST_KEY = paramiko.RSAKey(filename='keys/private.key')
# RSA host key the fake server presents, loaded from 'id_rsa.new' next to
# this script.
HOST_KEY = paramiko.RSAKey(filename=os.path.join(__location__, 'id_rsa.new'))
# TCP port the fake SSH server binds to.
PORT = 2200


def handle_cmd(cmd, chan):
    """Prepare and send the canned response for one shell command.

    Args:
        cmd: Command line typed by the client; the caller strips trailing
            whitespace before passing it in.
        chan: Channel-like object exposing ``send``.

    Commands answered purely with ASCII art (``sudo``, ``cd``, ``cat`` and
    ``help``) return as soon as the art has been sent. Every other command
    ends with a one-line text response that is appended to the log and then
    sent to the client.
    """
    response = ""
    if cmd.startswith("sudo"):
        send_ascii("sudo.txt", chan)
        return
    elif cmd.startswith("ls"):
        response = "pw.txt"
    elif cmd.startswith("version"):
        response = "Super Amazing Awesome (tm) Shell v1.1"
    elif cmd.startswith("pwd"):
        response = "/home/clippy"
    elif cmd.startswith("cd"):
        send_ascii("cd.txt", chan)
        return
    elif cmd.startswith("cat"):
        send_ascii("cat.txt", chan)
        return
    elif cmd.startswith("rm"):
        send_ascii("bomb.txt", chan)
        response = "You blew up our files! How could you???"
    elif cmd.startswith("whoami"):
        send_ascii("wizard.txt", chan)
        response = "You are a wizard of the internet!"
    elif ".exe" in cmd:
        response = "Hmm, trying to access .exe files from an ssh terminal..... Your methods are unconventional"
    elif cmd.startswith("cmd"):
        response = "Command Prompt? We only use respectable shells on this machine.... Sorry"
    elif cmd == "help":
        send_ascii("help.txt", chan)
        return
    else:
        send_ascii("clippy.txt", chan)
        response = "Use the 'help' command to view available commands"

    # The escape sequences were lost from these literals, which left them
    # unterminated. The log gets a plain newline; the client gets CRLF so a
    # terminal returns the cursor to the first column.
    LOG.write(response + "\n")
    LOG.flush()
    chan.send(response + "\r\n")


def send_ascii(filename, chan):
    """Log an ASCII-art file and stream it to the channel line by line.

    Args:
        filename: Name of a file inside the ``ascii/`` directory, resolved
            relative to the current working directory.
        chan: Channel-like object exposing ``send``.

    Raises:
        IOError/OSError: If the file cannot be opened.
    """
    with open('ascii/{}'.format(filename)) as text:
        # The "\r" escapes had been stripped from these literals, leaving
        # no-op empty strings. Each line read from the file already ends in
        # "\n"; the added carriage return moves a terminal's cursor back to
        # the first column.
        chan.send("\r")
        for line in text:
            LOG.write(line)
            chan.send(line + "\r")
    LOG.flush()


class FakeSshServer(paramiko.ServerInterface):
    """Permissive paramiko server policy used by the fake SSH server.

    Any password is accepted, only 'session' channels may be opened, and
    PTY and shell requests are always granted. ``event`` is set once a
    client has asked for a shell.
    """

    def __init__(self):
        # Signalled by check_channel_shell_request.
        self.event = threading.Event()

    def get_allowed_auths(self, username):
        """Advertise password authentication only."""
        return 'password'

    def check_auth_password(self, username, password):
        """Accept every username/password pair."""
        return paramiko.AUTH_SUCCESSFUL

    def check_channel_request(self, kind, chanid):
        """Permit 'session' channels and refuse every other kind."""
        if kind != 'session':
            return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
        return paramiko.OPEN_SUCCEEDED

    def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
        """Grant every pseudo-terminal request."""
        return True

    def check_channel_shell_request(self, channel):
        """Grant the shell request and flag that one was received."""
        self.event.set()
        return True


def _read_command(chan):
    """Read and echo client input until a line terminator arrives.

    Returns the raw text including its terminator, or None when the channel
    reports end-of-stream (``recv`` returned no data).
    """
    command = ""
    # The terminator literal had been stripped to "", which made the loop
    # condition always false so no input was ever read. Interactive
    # terminals send "\r" for Enter; scripted clients commonly send "\n".
    while not command.endswith(("\r", "\n")):
        data = chan.recv(1024)
        if not data:
            return None
        # Echo input to pseudo-simulate a basic terminal
        chan.send(data)
        command += data.decode("utf-8")
    return command


def _run_shell(chan):
    """Greet the client, then answer commands until 'exit' or disconnect."""
    chan.send("Welcome to the my control server\r\n\r\n")
    while True:
        chan.send("$ ")
        command = _read_command(chan)
        if command is None:
            # Client went away without typing 'exit'.
            break
        chan.send("\r\n")
        command = command.rstrip()
        LOG.write("$ " + command + "\n")
        print(command)
        if command == "exit":
            break
        handle_cmd(command, chan)


def start_server():
    """Init and run the ssh server.

    Binds a TCP socket on PORT and then serves clients one at a time,
    forever. For each client it performs the SSH handshake, waits for a
    session channel and a shell request, and runs the fake interactive
    shell. Failures while serving a client are printed and the server goes
    back to accepting. The process exits with status 1 if the socket cannot
    be bound.
    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', PORT))
    except Exception as err:
        print('*** Bind failed: {}'.format(err))
        traceback.print_exc()
        sys.exit(1)

    while True:
        try:
            sock.listen(100)
            print('Listening for connection ...')
            client, addr = sock.accept()
        except Exception as err:
            print('*** Listen/accept failed: {}'.format(err))
            traceback.print_exc()
            # Nothing was accepted: retry rather than fall through with an
            # undefined (or stale) client/addr.
            continue

        LOG.write("\n\nConnection from: " + addr[0] + "\n")
        print('Got a connection!')
        transport = None
        try:
            transport = paramiko.Transport(client)
            transport.add_server_key(HOST_KEY)
            # Change banner to appear legit on nmap (or other network) scans
            transport.local_version = "SSH-2.0-OpenSSH_7.6p1 Ubuntu-4ubuntu0.3"
            server = FakeSshServer()
            try:
                transport.start_server(server=server)
            except paramiko.SSHException:
                print('*** SSH negotiation failed.')
                raise Exception("SSH negotiation failed")
            # wait for auth
            chan = transport.accept(20)
            if chan is None:
                print('*** No channel.')
                raise Exception("No channel")

            server.event.wait(10)
            if not server.event.is_set():
                print('*** Client never asked for a shell.')
                raise Exception("No shell request")

            try:
                _run_shell(chan)
            finally:
                chan.close()

        except Exception as err:
            print('!!! Exception: {}: {}'.format(err.__class__, err))
            traceback.print_exc()
        finally:
            # Always release the connection, on success as well as failure.
            # Cleanup is best-effort: a failure to close must not stop the
            # accept loop.
            try:
                if transport is not None:
                    transport.close()
                else:
                    client.close()
            except Exception:
                pass


# Run the fake SSH server only when this file is executed as a script.
if __name__ == "__main__":
    start_server()

以下是我尝试使用paramiko连接到它的代码:

    # First attempt: connect with a password and run one command via exec.
    sshClient = paramiko.SSHClient()
    # Accept the fake server's host key without prior knowledge of it.
    sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    sshClient.connect('127.0.0.1', username='bnc', password='pass', port=2200)

    # NOTE(review): the fake server only sets its event on a *shell* request
    # and gives up when none arrives, so an exec request presumably never
    # reaches its command loop - confirm against the server code.
    sshClient.exec_command('ls')

    # Second attempt: open a session channel by hand and drive an
    # interactive shell on it.
    sshClient = paramiko.SSHClient()
    # Accept the fake server's host key without prior knowledge of it.
    sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    sshClient.connect('127.0.0.1', username='bnc', password='pass', port=2200)

    channel = sshClient.get_transport().open_session()
    channel.get_pty()
    channel.invoke_shell()
    # The newline escape had been lost, leaving an unterminated literal.
    channel.send('ls\n')

在两个测试中,我收到了相同的错误:

Listening for connection ...
Got a connection!
!!! Exception: <class 'socket.error'>: Socket is closed
Traceback (most recent call last):
Listening for connection ...
  File "/home/cbrunain/Projects/daa/Python/test/ssh_mock_server.py", line 152, in start_server
    chan.send(transport)
  File "/home/cbrunain/.local/share/virtualenvs/daa-hnBs4Nn2/lib/python2.7/site-packages/paramiko/channel.py", line 801, in send
    return self._send(s, m)
  File "/home/cbrunain/.local/share/virtualenvs/daa-hnBs4Nn2/lib/python2.7/site-packages/paramiko/channel.py", line 1198, in _send
    raise socket.error("Socket is closed")
error: Socket is closed
No handlers could be found for logger "paramiko.transport"

解决方案

我终于找到了工作正常的东西:https://gist.github.com/cschwede/3e2c025408ab4af531651098331cce45

import logging
import socket
import sys
import threading

import paramiko

# Configure default logging and use the root logger for server errors.
logging.basicConfig()
logger = logging.getLogger()

# Loop flag polled by run_server; cleared when a client executes 'exit'.
running = True
# RSA host key presented to clients, loaded at import time from 'id_rsa'
# relative to the working directory.
host_key = paramiko.RSAKey(filename='id_rsa')


def ssh_command_handler(command):
    """Default command handler: print the received command to stdout."""
    prefix = 'default : '
    print(prefix, command)


class Server(paramiko.ServerInterface):
    """Server policy that accepts any public key and records exec requests.

    Every command a client asks to execute is passed to the module-level
    ``ssh_command_handler``. ``event`` is set after each exec request so
    the listener can stop waiting.
    """

    def __init__(self):
        # Signalled once an exec request has been handled.
        self.event = threading.Event()

    def check_channel_request(self, kind, chanid):
        """Permit 'session' channels and explicitly refuse anything else."""
        if kind == 'session':
            return paramiko.OPEN_SUCCEEDED
        # Falling off the end returned None, which is not a valid
        # channel-open result code.
        return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED

    def check_auth_publickey(self, username, key):
        """Accept every public key."""
        return paramiko.AUTH_SUCCESSFUL

    def get_allowed_auths(self, username):
        """Advertise public-key authentication only."""
        return 'publickey'

    def check_channel_exec_request(self, channel, command):
        """Hand the requested command to the handler and accept the request.

        An 'exit' command also clears the module-level ``running`` flag.
        """
        global running
        # This is the command we need to parse. It may arrive as bytes, so
        # compare against both spellings; the handler still receives the
        # value exactly as delivered.
        if command in ('exit', b'exit'):
            running = False
        ssh_command_handler(command)
        self.event.set()
        return True


def listener():
    """Accept one SSH client on port 2222 and wait for it to send a command.

    Blocks until a client connects, performs the SSH handshake with the
    permissive ``Server`` policy, then waits up to 30 seconds for an exec
    request. The transport and both sockets are released before returning,
    including when an error is raised part-way through.
    """
    print('listener')
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client = None
    t = None
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', 2222))

        sock.listen(100)
        client, _ = sock.accept()

        t = paramiko.Transport(client)
        t.set_gss_host(socket.getfqdn(""))
        t.load_server_moduli()
        t.add_server_key(host_key)
        server = Server()
        t.start_server(server=server)

        # Wait 30 seconds for a command
        server.event.wait(30)
    finally:
        # Release resources explicitly instead of relying on garbage
        # collection, so the next call can bind the port again.
        if t is not None:
            t.close()
        elif client is not None:
            client.close()
        sock.close()
    print('end listener')


def run_server(command_handler):
    """Serve clients one after another until the ``running`` flag is cleared.

    Installs ``command_handler`` as the module-level ``ssh_command_handler``
    and then calls ``listener`` repeatedly. A KeyboardInterrupt exits with
    status 0; any other exception is logged and the loop continues.
    """
    global running, ssh_command_handler
    ssh_command_handler = command_handler
    while True:
        if not running:
            break
        try:
            listener()
        except KeyboardInterrupt:
            sys.exit(0)
        except Exception as exc:
            logger.error(exc)


def run_in_thread(command_handler):
    """Start ``run_server`` in a background thread and return that thread.

    Args:
        command_handler: Callable invoked with each command a client asks
            the server to execute.

    Returns:
        The started ``threading.Thread``. Callers that ignore the return
        value behave exactly as before; tests can ``join`` it to wait for
        the server to stop.
    """
    thread = threading.Thread(target=run_server, args=(command_handler,))
    thread.start()
    return thread


# When executed as a script, serve in a background thread using the default
# handler, which just prints each command.
if __name__ == '__main__':
    run_in_thread(ssh_command_handler)

相关文章