#!/usr/bin/env python2.7
# This file is part of pyGenClean.
#
# pyGenClean is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pyGenClean is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# pyGenClean. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import sys
import glob
import logging
import argparse
import subprocess
from collections import namedtuple
from .. import __version__
from ..PlinkUtils import createRowFromPlinkSpacedOutput
logger = logging.getLogger("plate_bias")
[docs]def main(argString=None):
"""The main function of this module.
:param argString: the options.
:type argString: list
These are the steps:
1. Runs a plate bias analysis using Plink
(:py:func:`executePlateBiasAnalysis`).
2. Extracts the list of significant markers after plate bias analysis
(:py:func:`extractSignificantSNPs`).
3. Computes the frequency of all significant markers after plate bias
analysis (:py:func:`computeFrequencyOfSignificantSNPs`).
"""
# Getting and checking the options
args = parseArgs(argString)
checkArgs(args)
logger.info("Options used:")
for key, value in vars(args).iteritems():
logger.info(" --{} {}".format(key.replace("_", "-"), value))
# Run plink
logger.info("Running Plink to check the plate bias")
executePlateBiasAnalysis(args)
# Extract significant SNPs
logger.info("Extracting significant SNPs")
assocResults = extractSignificantSNPs(args.out)
# Remove significant SNPs using plink
logger.info("Computing frequency of significant SNPs")
maf = computeFrequencyOfSignificantSNPs(args)
# Create the final summary file
logger.info("Creating the summary file")
createSummaryFile(assocResults, maf, args.out)
[docs]def createSummaryFile(results, maf, prefix):
"""Creat the final summary file containing plate bias results.
:param results: the list of all the significant results.
:param maf: the minor allele frequency of the significant results.
:param prefix: the prefix of all the files.
:type results: list
:type maf: dict
:type prefix: str
"""
o_filename = prefix + ".significant_SNPs.summary"
try:
with open(o_filename, "w") as o_file:
print >>o_file, "\t".join(("chrom", "pos", "name", "maf", "p",
"odds", "plate"))
for row in results:
print >>o_file, "\t".join((
row.chrom,
row.pos,
row.name,
maf.get(row.name, "N/A"),
row.p,
row.odds,
row.plate,
))
except IOError:
msg = "{}: cannot write file".format(o_filename)
raise ProgramError(msg)
[docs]def computeFrequencyOfSignificantSNPs(options):
"""Computes the frequency of the significant markers.
:param options: the options.
:type options: argparse.Namespace
Extract a list of markers (significant after plate bias analysis) and
computes their frequencies.
"""
# The plink command
plinkCommand = ["plink", "--noweb", "--bfile", options.bfile, "--extract",
options.out + ".significant_SNPs.txt", "--freq", "--out",
options.out + ".significant_SNPs"]
runCommand(plinkCommand)
# Reading the frequency file
maf = {}
with open(options.out + ".significant_SNPs.frq", "r") as i_file:
header = {
name: i for i, name in
enumerate(createRowFromPlinkSpacedOutput(i_file.readline()))
}
for required_col in ("SNP", "MAF"):
if required_col not in header:
msg = "{}: missing column {}".format(
script_prefix + ".significant_SNPs.frq",
required_col,
)
raise ProgramError(msg)
for line in i_file:
row = createRowFromPlinkSpacedOutput(line)
maf[row[header["SNP"]]] = row[header["MAF"]]
return maf
[docs]def executePlateBiasAnalysis(options):
"""Execute the plate bias analysis with Plink.
:param options: the options.
:type options: argparse.Namespace
"""
# The plink command
plinkCommand = ["plink", "--noweb", "--bfile", options.bfile,
"--loop-assoc", options.loop_assoc, "--fisher",
"--pfilter", str(options.pfilter), "--out", options.out]
runCommand(plinkCommand)
[docs]def runCommand(command):
"""Run a command.
:param command: the command to run.
:type command: list
Tries to run a command. If it fails, raise a :py:class:`ProgramError`. This
function uses the :py:mod:`subprocess` module.
.. warning::
The variable ``command`` should be a list of strings (no other type).
"""
output = None
try:
output = subprocess.check_output(command,
stderr=subprocess.STDOUT, shell=False)
except subprocess.CalledProcessError:
msg = "couldn't run command\n" + " ".join(command)
raise ProgramError(msg)
[docs]def checkArgs(args):
"""Checks the arguments and options.
:param args: an object containing the options of the program.
:type args: argparse.Namespace
:returns: ``True`` if everything was OK.
If there is a problem with an option, an exception is raised using the
:py:class:`ProgramError` class, a message is printed to the
:class:`sys.stderr` and the program exists with code 1.
"""
# Check if we have the tped and the tfam files
for fileName in [args.bfile + i for i in [".bed", ".bim", ".fam"]]:
if not os.path.isfile(fileName):
msg = "%(fileName)s: no such file" % locals()
raise ProgramError(msg)
# Check the plate bias file
if not os.path.isfile(args.loop_assoc):
msg = "%s: no such file" % args.loop_assoc
raise ProgramError(msg)
return True
[docs]def parseArgs(argString=None): # pragma: no cover
"""Parses the command line options and arguments.
:param argString: the options.
:type argString: list
:returns: A :py:class:`argparse.Namespace` object created by the
:py:mod:`argparse` module. It contains the values of the
different options.
================ ====== ==================================================
Options Type Description
================ ====== ==================================================
``--bfile`` string The input file prefix (Plink binary).
``--loop-assoc`` string The file containing the plate organization of each
samples.
``--pfilter`` float The significance threshold used for the plate
effect.
``--out`` string The prefix of the output files.
================ ====== ==================================================
.. note::
No option check is done here (except for the one automatically done by
argparse). Those need to be done elsewhere (see :py:func:`checkArgs`).
"""
args = None
if argString is None:
args = parser.parse_args()
else:
args = parser.parse_args(argString)
return args
[docs]class ProgramError(Exception):
"""An :py:class:`Exception` raised in case of a problem.
:param msg: the message to print to the user before exiting.
:type msg: str
"""
def __init__(self, msg):
"""Construction of the :py:class:`ProgramError` class.
:param msg: the message to print to the user
:type msg: str
"""
self.message = str(msg)
def __str__(self):
return self.message
# The parser object
pretty_name = "Plate bias"
desc = "Checks for plate bias."
long_desc = ("The script identifies markers with plate bias, based on the "
"plates used to dilute DNA samples.")
parser = argparse.ArgumentParser(description=desc)
parser.add_argument("-v", "--version", action="version",
version="pyGenClean version {}".format(__version__))
# The INPUT files
group = parser.add_argument_group("Input File")
group.add_argument("--bfile", type=str, metavar="FILE", required=True,
help=("The input file prefix (will find the plink binary "
"files by appending the prefix to the .bim, .bed "
"and .fam files, respectively."))
group.add_argument("--loop-assoc", type=str, metavar="FILE", required=True,
help=("The file containing the plate organization of "
"each samples. Must contains three column (with no "
"header): famID, indID and plateName."))
# The options
group = parser.add_argument_group("Options")
group.add_argument("--pfilter", type=float, metavar="FLOAT", default=1e-7,
help=("The significance threshold used for the plate "
"effect. [default: %(default).1e]"))
# The OUTPUT files
group = parser.add_argument_group("Output File")
group.add_argument("--out", type=str, metavar="FILE",
default="plate_bias",
help=("The prefix of the output files. [default: "
"%(default)s]"))
[docs]def safe_main():
"""A safe version of the main function (that catches ProgramError)."""
try:
main()
except KeyboardInterrupt:
logger.info("Cancelled by user")
sys.exit(0)
except ProgramError as e:
logger.error(e.message)
parser.error(e.message)
if __name__ == "__main__":
safe_main()