Source code for pyGenClean.NoCallHetero.clean_noCall_hetero_snps

#!/usr/bin/env python2.7

# This file is part of pyGenClean.
#
# pyGenClean is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pyGenClean is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# pyGenClean.  If not, see <http://www.gnu.org/licenses/>.


import os
import sys
import shutil
import logging
import argparse

import numpy as np

from .. import __version__


logger = logging.getLogger("noCall_hetero_snps")


[docs]def main(argString=None): """The main function of the module. :param argString: the options. :type argString: list These are the steps: 1. Prints the options. 2. Reads the ``tfam`` and ``tped`` files and find all heterozygous and all failed markers (:py:func:`processTPEDandTFAM`). """ # Getting and checking the options args = parseArgs(argString) checkArgs(args) logger.info("Options used:") for key, value in vars(args).iteritems(): logger.info(" --{} {}".format(key.replace("_", "-"), value)) # Process the TPED and TFAM file logger.info("Processing the TPED and TFAM file") processTPEDandTFAM(args.tfile + ".tped", args.tfile + ".tfam", args.out)
[docs]def processTPEDandTFAM(tped, tfam, prefix): """Process the TPED and TFAM files. :param tped: the name of the ``tped`` file. :param tfam: the name of the ``tfam`` file. :param prefix: the prefix of the output files. :type tped: str :type tfam: str :type prefix: str Copies the original ``tfam`` file into ``prefix.tfam``. Then, it reads the ``tped`` file and keeps in memory two sets containing the markers which are all failed or which contains only heterozygous genotypes. It creates two output files, ``prefix.allFailed`` and ``prefix.allHetero``, containing the markers that are all failed and are all heterozygous, respectively. .. note:: All heterozygous markers located on the mitochondrial chromosome are not remove. """ # Copying the tfam file try: shutil.copy(tfam, prefix + ".tfam") except IOError: msg = "%s: can't write file" % prefix + ".tfam" raise ProgramError(msg) try: outputFile = open(prefix + ".tped", "w") except IOError: msg = "%s: can't write to file" % prefix + ".tped" raise ProgramError(msg) # The name of the bad SNPs allFailed = set() allHetero = set() with open(tped, 'r') as inputFile: for line in inputFile: row = line.rstrip("\r\n").split("\t") snpInfo = row[:4] chromosome = snpInfo[0] genotypes = np.array([i.upper() for i in row[4:]]) # Testing the genotypes uniqueGenotypes = np.unique(genotypes) if len(uniqueGenotypes) == 1: # We have only one kind of genotype, either all homo, all # hetero or all no call if uniqueGenotypes[0] == "0 0": # This one is a no call allFailed.add(snpInfo[1]) elif len(set(uniqueGenotypes[0].split(" "))) == 2: # There are two different alleles, hence, hetero if chromosome not in {"26", "MT"}: # The SNP is not on a mitochondrial chromosome # (because we want to keep those) allHetero.add(snpInfo[1]) # If the SNP is good (neither in allFailed or allHetero), we keep # the SNP if (snpInfo[1] not in allFailed) and (snpInfo[1] not in allHetero): print >>outputFile, "\t".join(row) outputFile.close() # Now printing the summary files # The SNPs with no calls fileName = prefix + ".allFailed" try: with open(fileName, 'w') as outputFile: if len(allFailed) > 0: print >>outputFile, "\n".join(allFailed) except IOError: msg = "%(fileName)s: can't write to file" % locals() raise ProgramError(msg) # The SNPs with only hetero calls fileName = prefix + ".allHetero" try: with open(fileName, 'w') as outputFile: if len(allHetero) > 0: print >>outputFile, "\n".join(allHetero) except IOError: msg = "%(fileName)s: can't write to file" % locals() raise ProgramError(msg)
[docs]def checkArgs(args): """Checks the arguments and options. :param args: an object containing the options of the program. :type args: argparse.Namespace :returns: ``True`` if everything was OK. If there is a problem with an option, an exception is raised using the :py:class:`ProgramError` class, a message is printed to the :class:`sys.stderr` and the program exists with code 1. """ # Checking the input files for suffix in [".tped", ".tfam"]: fileName = args.tfile + suffix if not os.path.isfile(fileName): msg = "%(fileName)s: no such file" % locals() raise ProgramError(msg) return True
[docs]def parseArgs(argString=None): # pragma: no cover """Parses the command line options and arguments. :param argString: the options. :type argString: list :returns: A :py:class:`argparse.Namespace` object created by the :py:mod:`argparse` module. It contains the values of the different options. =========== ====== ==================================== Options Type Description =========== ====== ==================================== ``--tfile`` string The input file prefix (Plink tfile). ``--out`` string The prefix of the output files =========== ====== ==================================== .. note:: No option check is done here (except for the one automatically done by argparse). Those need to be done elsewhere (see :py:func:`checkArgs`). """ args = None if argString is None: args = parser.parse_args() else: args = parser.parse_args(argString) return args
[docs]class ProgramError(Exception): """An :py:class:`Exception` raised in case of a problem. :param msg: the message to print to the user before exiting. :type msg: str """ def __init__(self, msg): """Construction of the :py:class:`ProgramError` class. :param msg: the message to print to the user :type msg: str """ self.message = str(msg) def __str__(self): return self.message
# The parser object pretty_name = "No calls and heterozygous only markers" desc = 'Removes "no calls" only and heterozygous only markers.' long_desc = ("The script removes completely failed markers (no calls) or " "markers with only heterozygous genotypes.") parser = argparse.ArgumentParser(description=desc) parser.add_argument("-v", "--version", action="version", version="pyGenClean version {}".format(__version__)) # The INPUT files group = parser.add_argument_group("Input File") group.add_argument("--tfile", type=str, metavar="FILE", required=True, help=("The input file prefix (will find the tped and tfam " "file by appending the prefix to .tped and .tfam, " "respectively.")) # The OUTPUT files group = parser.add_argument_group("Output File") group.add_argument("--out", type=str, metavar="FILE", default="clean_noCall_hetero", help=("The prefix of the output files. [default: " "%(default)s]"))
[docs]def safe_main(): """A safe version of the main function (that catches ProgramError).""" try: main() except KeyboardInterrupt: logger.info("Cancelled by user") sys.exit(0) except ProgramError as e: logger.error(e.message) parser.error(e.message)
if __name__ == "__main__": safe_main()