Python subprocess: 调用命令行程序

By | 8月 4, 2023

参考文档:The subprocess Module: Wrapping Programs With Python

subprocess模块,根据输入的命令参数,起一个新的process。Python2.4引入的,用于替代os module(os.system和os.open也可以调用命令行程序)。

subprocess.run()函数是 Python 3.5 加入的,它阻塞式起一个新的process,当前进程等待子进程执行完成。能使用run()函数则推荐使用run函数,对于某些特殊情况要精细控制子进程时,才需要调用底层的Popen接口。

run()

命令参数 – sequence/string

命令参数接收的是个sequence。

>>> import subprocess
>>> subprocess.run(["python", "timer.py", "5"])
Starting timer of 5 seconds
.....Done!
CompletedProcess(args=['python', 'timer.py', '5'], returncode=0)

如果是个string,可以用自带的shlex.split()转换成sequence。适用于当你在命令行执行命令,参数较多又有引号时,不太好手动分成sequence;或者为了可读性不想分成sequence,想着以后直接copy到terminal里执行。

>>> import shlex
>>> shlex.split("python timer.py 5")
['python', 'timer.py', '5']

>>> subprocess.run(shlex.split("python timer.py 5"))

捕获执行异常

通常我们需要捕获三种异常:

  • subprocess.CalledProcessError:check=True时,如果returncode不是0,抛此异常。
  • subprocess.TimeoutExpired:timeout=2(单位是秒),2秒之内没完成,抛此异常。
  • FileNotFoundError:可执行文件或命令不存在,抛此异常。
import subprocess

try:
    subprocess.run(
        ["python", "timer.py", "5"], timeout=10, check=True
    )
except FileNotFoundError as exc:
    print(f"Process failed because the executable could not be found.\n{exc}")
except subprocess.CalledProcessError as exc:
    print(
        f"Process failed because did not return a successful return code. "
        f"Returned {exc.returncode}\n{exc}"
    )
except subprocess.TimeoutExpired as exc:
    print(f"Process timed out.\n{exc}")

shell=True

当shell设为True时,则使用shell执行指定的命令。适用情况:需要使用shell的功能,如管道、文件通配符、环境变量以及~展开到用户目录。

注意:shell=True时,命令可以是一个字符串。

subprocess.run("ls /usr/bin | grep pycode", shell=True)

shell=True 底层使用 [“sh”, “-c”, …] 执行,所以我们可以手动使用shell执行命令:

>>> import subprocess
>>> subprocess.run(["bash", "-c", "ls /usr/bin | grep pycode"])

与子进程通讯

当用 subprocess.run() 创建子进程时,它继承了parent process的I/O Streams

  • stdin 是键盘。
  • stdout 和 stderr 是Screen,print的内容会显示在terminal上。

直接让子进程print的内容打在terminal上,通常不是我们想要的,我们想要获取print的内容。

capture_output=True:截获stdout和stderr

capture_out=True时,print不会打在terminal上,它们存在CompletedProcess对象里。

  • CompletedProcess::stdout
  • CompletedProcess::stderr

题外话,python怎样向stderr print?

  • print('I am error', file=sys.stderr)
>>> import subprocess
>>> magic_number_process = subprocess.run(
...     ["python", "magic_number.py"], capture_output=True
... )
>>> magic_number_process.stdout
b'769\n'

注意 .stdout 和 .stdout 是 byte sequence。

capture_output=True相当于同时将stdout和stderr设置为subprocess.PIPE。

>>> import subprocess
>>> magic_number_process = subprocess.run(
...     ["python", "magic_number.py"],
...     stdout=subprocess.PIPE,
...     stderr=subprocess.PIPE
... )
...
>>> magic_number_process.stdout
b'769\n'

也可以将stdout重定向到file object。

>>> from tempfile import TemporaryFile
>>> with TemporaryFile() as f:
...     ls_process = subprocess.run(["python", "magic_number.py"], stdout=f)
...     f.seek(0)
...     print(f.read().decode("utf-8"))
...
0
554

text=True / encoding=”utf-8″:stream解码为字符串

默认情况下,子进程的输入(input参数),子进程的输出(CompletedProcess的stderr和stdout)都是字节序列。如果要用字符串,就要使用 text=True,表明用默认编码方式解码byte sequence;或者 encoding=’utf-8’,手动指定编码方式。

这是hello.py,它从stdin读入一个名字,然后print一句话。既有输入,又有输出。

print("What's your name?")
name = input()
print(f"Hi {name}, nice to meet you.")

text=True默认编码,input是字符串,stdout也是字符串。

>>> import subprocess
>>> cp = subprocess.run(['python', 'hello.py'], text=True, input='Alex',
...                     capture_output=True)
>>> cp.stdout
"What's your name?\nHi Alex, nice to meet you.\n"

使用run()模拟pipe

执行两个命令,上个命令的输出时下个命令的输入。

>>> import subprocess
>>> ls_process = subprocess.run(["ls", "/usr/bin"], stdout=subprocess.PIPE)
>>> grep_process = subprocess.run(
...     ["grep", "python"], input=ls_process.stdout, stdout=subprocess.PIPE
... )
>>> print(grep_process.stdout.decode("utf-8"))
python3
python3-config
python3.8
python3.8-config
...

grep_process的输入设置给input而不是stdin,因为.stdout输出的不是文件对象,是字节对象,不能传给stdin。

下面的例子,第一个命令输出到文件,该文件是第二个命令的输入。

>>> import subprocess
>>> from tempfile import TemporaryFile
>>> with TemporaryFile() as f:
...     ls_process = subprocess.run(["ls", "/usr/bin"], stdout=f)
...     f.seek(0)
...     grep_process = subprocess.run(
...         ["grep", "python"], stdin=f, stdout=subprocess.PIPE
...     )
...
0 # from f.seek(0)
>>> print(grep_process.stdout.decode("utf-8"))
python3
python3-config
python3.8
python3.8-config
...

Popen()

Popen()会创建一个新的process,它不会阻塞当前process。

  • Popen.poll() – 检查子进程是否已被终止。终止返回exit code,否则返回None。
  • Popen.wait(timeout=None) – 等待子进程结束,返回exit code。
  • Popen.returncode
    • 此进程的退出码,由 poll() 和 wait() 设置(以及直接由 communicate() 设置)。None 值 表示此进程仍未结束。
  • Popen.communicate(input=None, timeout=None) – 与子进程交互
    • input 参数为要发送给子进程的数据。 如果流是以文本模式打开的,则 input 必须为字符串。 在其他情况下,它必须为字节串。
    • communicate() 返回一个 (stdout_data, stderr_data) 元组。如果文件以文本模式打开则为字符串;否则字节。

每隔1秒查询子进程结束没有。

p = subprocess.Popen(['python', 'calculate.py'])
while p.poll() is None:
    print('still running')
    time.sleep(1)

等待子进程结束,并读取结果。

p = subprocess.Popen(['python', 'hello.py'], text=True,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
print(p.returncode)

使用Pipe,并行执行两个命令

使用run()执行两个命令,是一个一个执行的;使用Popen()使,两个命令是并行的。前一个的输出时后一个的输入。

import subprocess

ls_process = subprocess.Popen(["ls", "/usr/bin"], stdout=subprocess.PIPE)
grep_process = subprocess.Popen(
    ["grep", "python"], stdin=ls_process.stdout, stdout=subprocess.PIPE
)

for line in grep_process.stdout:
    print(line.decode("utf-8").strip())

run()返回的是CompletedProcess,Popen()返回的是Popen对象。CompletedProcess是的stdout、stderr是string或byte string。Popen对象里的stdout、stderr是stream。