Source code for

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import os
import sys
import shlex
import types
import subprocess as sp
import docker
from docker.models.containers import Container, ExecResult
import logging

logger = logging.getLogger(__name__)

STDOUT = sys.stdout

[docs]def get_conda_root(): output = sp.check_output(shlex.split("conda info --json")) m ="\"root_prefix\":\s+\"(\S+)\",$", output.decode("utf-8"), re.MULTILINE) try: return except AttributeError: logger.error("Failed to set conda root prefix") raise
[docs]class shell: """Class wrapper for shell commands. Wrapper for running shell commands. The wrapper accepts arguments for seamlessly running in docker/singularity containers. Based on snakemake shell implementation by Johannes Köster. Args: cmd (str): command string container (Container): docker/singularity container instance conda_env (str): conda environment to activate conda_env_list (list): additional conda environment paths to add to PATH. Note: only the conda environment names need be provided as the conda root is inferred from the current conda environment conda_root (str): conda root path. Use in case automatically inferring the root fails image (Image): docker/singularity image to run in iterable (bool): return output as iterable read (bool): read and return output async (bool): run asynchronously path_list (list): prefix command with additional paths stdout (_io.TextIOWrapper, int): stdout file object stderr (int): stderr special value stream (bool): stream containerized run; alias to iterable detach (bool): detach containerized run; alias to async process_prefix (str): temporarily override class process prefix. In containerized settings, using 'source activate' together with the automatically generated class process prefix 'set -euo pipefail' may fail Returns: stdout if read is set, an iterable if iterable is set, a process (either :py:mod:`subprocess.Popen` or :py:mod:`docker.models.containers.Container`) if async is set, None otherwise """ _process_args = {} _process_prefix = ""
[docs] @classmethod def executable(cls, cmd): if os.path.split(cmd)[-1] == "bash": cls._process_prefix = "set -euo pipefail;" cls._process_args["executable"] = cmd
[docs] @classmethod def prefix(cls, prefix): cls._process_prefix = prefix
def __new__(cls, cmd, container=None, conda_env=None, conda_env_list=[], conda_root=None, image=None, iterable=False, read=False, async=False, path_list=[], **kwargs): stdout = sp.PIPE if iterable or async or read else kwargs.pop("stdout", STDOUT) stderr = kwargs.pop("stderr", STDOUT) close_fds = sys.platform != 'win32' plist = path_list if kwargs.get("stream", False): iterable = kwargs.pop("stream") if kwargs.get("detach", False): async = kwargs.pop("detach") if conda_env_list: if not conda_root: conda_root = get_conda_root() for env in conda_env_list: p = os.path.join(conda_root, "envs", env, "bin") plist.append(p) if plist: plist.append("$PATH") path = "PATH=\"{}\";".format(":".join(plist)) else: path = "" env_prefix = "" if conda_env: # Run conda environment env_prefix = "source activate {};".format(conda_env)"Activating conda environment {}.".format(conda_env)) cmd = "{} {} {} {}".format( kwargs.pop("process_prefix", cls._process_prefix), path, env_prefix, cmd.rstrip()) if container or image: if cls._process_args["executable"]: cmd = "{} -c '{}'".format(cls._process_args["executable"], cmd) if container: try: proc = container.exec_run(cmd, stream=iterable, detach=async, **kwargs) if async: proc = proc.output except: raise elif image: try: client = docker.from_env() proc =, command=cmd, detach=async, **kwargs) except: raise else: proc = sp.Popen(cmd, bufsize=-1, shell=True, stdout=stdout, stderr=stderr, close_fds=close_fds, **cls._process_args) if iterable: return cls.iter_stdout(proc, cmd) if read: proc = cls.read_stdout(proc) return cls.stdout(proc, cmd, ret=proc) elif async: return proc return cls.stdout(proc, cmd)
[docs] @staticmethod def stdout(proc, cmd, ret=None): if isinstance(proc, sp.Popen): retcode = proc.wait() if retcode: raise sp.CalledProcessError(retcode, cmd) return ret elif isinstance(proc, str): return ret elif isinstance(proc, bytes): if ret is None: return ret else: return ret.decode() else: return ret
[docs] @staticmethod def read_stdout(proc): if isinstance(proc, sp.Popen): return elif isinstance(proc, ExecResult): return proc.output elif isinstance(proc, str): return proc elif isinstance(proc, bytes): return proc.decode()
[docs] @staticmethod def iter_stdout(proc, cmd): if isinstance(proc, types.GeneratorType): for l in proc: if isinstance(l, bytes): for k in l.decode().split("\n"): yield k else: yield l[:-1] raise StopIteration elif isinstance(proc, Container): for l in proc.logs(stream=True): yield l[:-1].decode() raise StopIteration elif isinstance(proc, ExecResult): for l in proc.output: if isinstance(l, bytes): for k in l.decode().split("\n"): yield k else: yield l[:-1].decode() raise StopIteration elif isinstance(proc, bytes): for l in proc.decode().split("\n"): yield l raise StopIteration elif isinstance(proc, str): return proc for l in proc.stdout: yield l[:-1].decode() retcode = proc.wait() if retcode: raise sp.CalledProcessError(retcode, cmd)
if "SHELL" in os.environ: shell.executable(os.environ["SHELL"])