Source code for pyGenClean.FlagMAF.flag_maf_zero

#!/usr/bin/env python2.7

# This file is part of pyGenClean.
#
# pyGenClean is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pyGenClean is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# pyGenClean.  If not, see <http://www.gnu.org/licenses/>.


import os
import sys
import logging
import argparse
import subprocess

from .. import __version__
from ..PlinkUtils import createRowFromPlinkSpacedOutput


logger = logging.getLogger("flag_maf_zero")


[docs]def main(argString=None): """The main function. :param argString: the options. :type argString: list These are the steps: 1. Prints the options. 2. Computes the frequencies using Plinl (:py:func:`computeFrequency`). 3. Finds markers with MAF of 0, and saves them in a file (:py:func:`findSnpWithMaf0`). """ # Getting and checking the options args = parseArgs(argString) checkArgs(args) logger.info("Options used:") for key, value in vars(args).iteritems(): logger.info(" --{} {}".format(key.replace("_", "-"), value)) # Compute frequency using plink logger.info("Computing the frequencies using Plink") computeFrequency(args) # Read the freqency file logger.info("Flagging SNPs with MAF = 0") findSnpWithMaf0(args.out + ".frq", args.out)
[docs]def findSnpWithMaf0(freqFileName, prefix): """Finds SNPs with MAF of 0 and put them in a file. :param freqFileName: the name of the frequency file. :param prefix: the prefix of all the files. :type freqFileName: str :type prefix: str Reads a frequency file from Plink, and find markers with a minor allele frequency of zero. """ maf_0_set = set() na_set = set() try: with open(freqFileName, "r") as inputFile: headerIndex = None for i, line in enumerate(inputFile): row = createRowFromPlinkSpacedOutput(line) if i == 0: # We have the header headerIndex = dict([ (row[i], i) for i in xrange(len(row)) ]) for columnName in ["SNP", "MAF"]: if columnName not in headerIndex: msg = "%(freqFileName)s: no column named " \ "%(columnName)s" % locals() raise ProgramError(msg) else: # We have data snpName = row[headerIndex["SNP"]] snpMAF = row[headerIndex["MAF"]] if snpMAF == "0": # We want to flag this SNP maf_0_set.add(snpName) elif snpMAF == "NA": # We want to flag this SNP, because the MAF est NA na_set.add(snpName) except IOError: msg = "%(freqFileName)s: no such file" % locals() raise ProgramError(msg) # Creating the output files if len(maf_0_set) == 0: logger.info(" - There are no markers with MAF 0") else: logger.info(" - There are {} markers with MAF 0".format( len(maf_0_set), )) outputFile = None try: with open(prefix + ".list", "w") as output_file: for marker_name in maf_0_set: print >>output_file, marker_name except IOError: msg = "{}.list: can't write file".format(prefix) raise ProgramError(msg) if len(na_set) > 0: logger.info(" - There are {} markers with NA MAF".format(len(na_set))) try: with open(prefix + ".na_list", "w") as output_file: for marker_name in na_set: print >>output_file, marker_name except IOError: msg = "{}.na_list: can't write file".format(prefix) raise ProgramError(msg)
[docs]def computeFrequency(options): """Compute the frequency of the SNPs. :param options: the options. :type options: argparse.Namespace """ # The plink command plinkCommand = ["plink", "--noweb", "--bfile", options.bfile, "--freq", "--out", options.out] output = None try: output = subprocess.check_output(plinkCommand, stderr=subprocess.STDOUT, shell=False) except subprocess.CalledProcessError: msg = "couldn't run command\n" + " ".join(plinkCommand) raise ProgramError(msg)
[docs]def checkArgs(args): """Checks the arguments and options. :param args: a :py:class:`argparse.Namespace` object containing the options of the program. :type args: argparse.Namespace :returns: ``True`` if everything was OK. If there is a problem with an option, an exception is raised using the :py:class:`ProgramError` class, a message is printed to the :class:`sys.stderr` and the program exists with code 1. """ # Check if we have the tped and the tfam files for fileName in [args.bfile + i for i in [".bed", ".bim", ".fam"]]: if not os.path.isfile(fileName): msg = "%(fileName)s: no such file" % locals() raise ProgramError(msg) return True
[docs]def parseArgs(argString=None): # pragma: no cover """Parses the command line options and arguments. :param argString: the options. :type argString: list :returns: A :py:class:`argparse.Namespace` object created by the :py:mod:`argparse` module. It contains the values of the different options. =========== ====== ========================================== Options Type Description =========== ====== ========================================== ``--bfile`` string The input file prefix (Plink binary file). ``--out`` string The prefix of the output files. =========== ====== ========================================== .. note:: No option check is done here (except for the one automatically done by argparse). Those need to be done elsewhere (see :py:func:`checkArgs`). """ args = None if argString is None: args = parser.parse_args() else: args = parser.parse_args(argString) return args
[docs]class ProgramError(Exception): """An :py:class:`Exception` raised in case of a problem. :param msg: the message to print to the user before exiting. :type msg: str """ def __init__(self, msg): """Construction of the :py:class:`ProgramError` class. :param msg: the message to print to the user :type msg: str """ self.message = str(msg) def __str__(self): return self.message
# The parser object pretty_name = "MAF flagging" desc = "Flags SNPs with MAF of 0." long_desc = ("The script flags markers with a minor allele frequency of 0 " r"\textit{i.e} monomorphic markers).") parser = argparse.ArgumentParser(description=desc) parser.add_argument("-v", "--version", action="version", version="pyGenClean version {}".format(__version__)) # The INPUT files group = parser.add_argument_group("Input File") group.add_argument("--bfile", type=str, metavar="FILE", required=True, help=("The input file prefix (will find the plink binary " "files by appending the prefix to the .bim, .bed and " ".fam files, respectively.")) # The OUTPUT files group = parser.add_argument_group("Output File") group.add_argument("--out", type=str, metavar="FILE", default="flag_maf_0", help=("The prefix of the output files. [default: " "%(default)s]"))
[docs]def safe_main(): """A safe version of the main function (that catches ProgramError).""" try: main() except KeyboardInterrupt: logger.info("Cancelled by user") sys.exit(0) except ProgramError as e: logger.error(e.message) parser.error(e.message)
if __name__ == "__main__": safe_main()