#!/usr/bin/env python2.7
# This file is part of pyGenClean.
#
# pyGenClean is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pyGenClean is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# pyGenClean. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import logging
import argparse
import subprocess
from .. import __version__
logger = logging.getLogger("subset")
[docs]def main(argString=None):
"""The main function of the modile.
:param argString: the options.
:type argString: list
Here are the steps:
1. Prints the options.
2. Subset the data (:py:func:`subset_data`).
.. note::
The type of the output files are determined by the type of the input
files (*e.g.* if the input files are binary files, so will be the
output ones).
"""
# Getting and checking the options
args = parseArgs(argString)
checkArgs(args)
logger.info("Options used:")
for key, value in vars(args).iteritems():
logger.info(" --{} {}".format(key.replace("_", "-"), value))
# Subset the data
logger.info("Subsetting the data using Plink")
subset_data(args)
[docs]def subset_data(options):
"""Subset the data.
:param options: the options.
:type options: argparse.Namespace
Subset the data using either ``--exclude`` or ``--extract``for markers or
``--remove`` or ``keep`` for samples.
"""
# The plink command
plinkCommand = ["plink", "--noweb"]
# The input file prefix
if options.is_bfile:
plinkCommand += ["--bfile", options.ifile, "--make-bed"]
elif options.is_tfile:
plinkCommand += ["--tfile", options.ifile, "--recode", "--transpose",
"--tab"]
elif options.is_file:
plinkCommand += ["--file", options.ifile, "--recode", "--tab"]
# The subset command for SNPs
if options.exclude is not None:
plinkCommand += ["--exclude", options.exclude]
elif options.extract is not None:
plinkCommand += ["--extract", options.extract]
# The subset command for samples
if options.keep is not None:
plinkCommand += ["--keep", options.keep]
elif options.remove is not None:
plinkCommand += ["--remove", options.remove]
# The output prefix
plinkCommand += ["--out", options.out]
# Running the command
runCommand(plinkCommand)
[docs]def runCommand(command):
"""Runs a command.
:param command: the command to run.
:type command: list
If there is a problem, a :py:class:`ProgramError` is raised.
"""
output = None
try:
output = subprocess.check_output(command,
stderr=subprocess.STDOUT, shell=False)
except subprocess.CalledProcessError:
msg = "couldn't run command\n" + " ".join(command)
raise ProgramError(msg)
[docs]def checkArgs(args):
"""Checks the arguments and options.
:param args: an object containing the options of the program.
:type args: argparse.Namespace
:returns: ``True`` if everything was OK.
If there is a problem with an option, an exception is raised using the
:py:class:`ProgramError` class, a message is printed to the
:class:`sys.stderr` and the program exists with code 1.
.. note::
Only one operation for markers and one operation for samples can be
done at a time. Hence, one of ``--exclude`` or ``--extract`` can be
done for markers, and one of ``--remove`` or ``--keep`` can be done for
samples.
"""
# Check the input files
if not args.is_bfile and not args.is_tfile and not args.is_file:
msg = "needs one input file type (--is-bfile, --is-tfile or --is-file)"
raise ProgramError(msg)
if args.is_bfile and not args.is_tfile and not args.is_file:
for fileName in [args.ifile + i for i in [".bed", ".bim", ".fam"]]:
if not os.path.isfile(fileName):
msg = "{}: no such file".format(fileName)
raise ProgramError(msg)
elif args.is_tfile and not args.is_bfile and not args.is_file:
for fileName in [args.ifile + i for i in [".tped", ".tfam"]]:
if not os.path.isfile(fileName):
msg = "{}: no such file".format(fileName)
raise ProgramError(msg)
elif args.is_file and not args.is_bfile and not args.is_tfile:
for fileName in [args.ifile + i for i in [".ped", ".map"]]:
if not os.path.isfile(fileName):
msg = "{}: no such file". format(fileName)
raise ProgramError(msg)
else:
msg = ("needs only one input file type (--is-bfile, --is-tfile or "
"--is-file)")
raise ProgramError(msg)
# Check that we have at least one of exclude, extract remove or keep
if args.exclude is None and args.extract is None and \
args.remove is None and args.keep is None:
msg = "needs at least one of --exclude, --extract, --remove or --keep"
raise ProgramError(msg)
# Check for SNPs
if args.exclude is not None and args.extract is None:
if not os.path.isfile(args.exclude):
msg = "{}: no such file".format(args.exclude)
raise ProgramError(msg)
elif args.extract is not None and args.exclude is None:
if not os.path.isfile(args.extract):
msg = "{}: no such file".format(args.extract)
raise ProgramError(msg)
elif args.exclude is not None and args.extract is not None:
msg = "use only one of --extract or --exclude"
raise ProgramError(msg)
# Check for samples
if args.remove is not None and args.keep is None:
if not os.path.isfile(args.remove):
msg = "{}: no such file".format(args.remove)
raise ProgramError(msg)
elif args.keep is not None and args.remove is None:
if not os.path.isfile(args.keep):
msg = "{}: no such file".format(args.keep)
raise ProgramError(msg)
elif args.remove is not None and args.keep is not None:
msg = "use only one of --keep or --remove"
raise ProgramError(msg)
return True
[docs]def parseArgs(argString=None): # pragma: no cover
"""Parses the command line options and arguments.
:param argString: the parameters.
:type argString: list
:returns: A :py:class:`argparse.Namespace` object created by the
:py:mod:`argparse` module. It contains the values of the
different options.
============== ====== =====================================================
Options Type Description
============== ====== =====================================================
``--ifile`` string The input file prefix.
``--is-bfile`` bool The input file is a bfile
``--is-tfile`` bool The input file is a tfile
``--is-file`` bool The input file is a file
``--exclude`` string A file containing SNPs to exclude from the data set.
``--extract`` string A file containing SNPs to extract from the data set.
``--remove`` string A file containing samples (FID and IID) to remove
from the data set.
``--keep`` string A file containing samples (FID and IID) to keep from
the data set.
``--out`` string The prefix of the output files.
============== ====== =====================================================
.. note::
No option check is done here (except for the one automatically done by
argparse). Those need to be done elsewhere (see :py:func:`checkArgs`).
"""
args = None
if argString is None:
args = parser.parse_args()
else:
args = parser.parse_args(argString)
return args
[docs]class ProgramError(Exception):
"""An :py:class:`Exception` raised in case of a problem.
:param msg: the message to print to the user before exiting.
:type msg: str
"""
def __init__(self, msg):
"""Construction of the :py:class:`ProgramError` class.
:param msg: the message to print to the user
:type msg: str
"""
self.message = str(msg)
def __str__(self):
return self.message
# The parser object
pretty_name = "Data subset"
desc = "Subsets genotype data using Plink."
long_desc = "The script excludes a set of markers and/or samples."
parser = argparse.ArgumentParser(description=desc)
parser.add_argument("-v", "--version", action="version",
version="pyGenClean version {}".format(__version__))
# The INPUT files
group = parser.add_argument_group("Input File")
group.add_argument("--ifile", type=str, metavar="FILE", required=True,
help=("The input file prefix. The format will be specified "
"by --is-bfile, --is-tfile or --is-file, for bfile, "
"tfile and file, respectively."))
group.add_argument("--is-bfile", action="store_true",
help="The file specified by --ifile is a bfile")
group.add_argument("--is-tfile", action="store_true",
help="The file specified by --ifile is a tfile")
group.add_argument("--is-file", action="store_true",
help="The file specified by --ifile is a file")
# The options
group = parser.add_argument_group("Options")
group.add_argument("--exclude", type=str, metavar="FILE",
help=("A file containing SNPs to exclude from the data "
"set."))
group.add_argument("--extract", type=str, metavar="FILE",
help=("A file containing SNPs to extract from the data "
"set."))
group.add_argument("--remove", type=str, metavar="FILE",
help=("A file containing samples (FID and IID) to "
"remove from the data set."))
group.add_argument("--keep", type=str, metavar="FILE",
help=("A file containing samples (FID and IID) to keep "
"from the data set."))
# The OUTPUT files
group = parser.add_argument_group("Output File")
group.add_argument("--out", type=str, metavar="FILE", default="subset",
help=("The prefix of the output files. [default: "
"%(default)s]"))
[docs]def safe_main():
"""A safe version of the main function (that catches ProgramError)."""
try:
main()
except KeyboardInterrupt:
logger.info("Cancelled by user")
sys.exit(0)
except ProgramError as e:
logger.error(e.message)
parser.error(e.message)
if __name__ == "__main__":
safe_main()