Source code for pyGenClean.LaTeX.merge_reports

#!/usr/bin/env python2.7

# This file is part of pyGenClean.
#
# pyGenClean is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# pyGenClean is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# pyGenClean.  If not, see <http://www.gnu.org/licenses/>.


import os
import re
import sys
import time
import shutil
import logging
import argparse
from glob import glob

from . import auto_report
from ..PlinkUtils import get_plink_version


logger = logging.getLogger("merge_reports")


def main(argString=None):
    """Merge the reports of several pyGenClean runs into a single report.

    :param argString: the options (``None`` lets :py:func:`parseArgs` decide
                      where the options come from).

    :type argString: list of strings

    The steps are: parse and check the options, create the output directory
    if required, order the QC directories, merge/copy the required files and,
    finally, generate the merged report.

    """
    # Parsing the options, then validating them
    args = parseArgs(argString)
    checkArgs(args)

    # Logging every option, using its command line spelling
    logger.info("Options used:")
    for key, value in vars(args).iteritems():
        logger.info(" --{} {}".format(key.replace("_", "-"), value))

    # The output directory is created only when it is missing
    out_dir = args.out_dir
    if not os.path.isdir(out_dir):
        os.mkdir(out_dir)

    # The QC directories, ordered by order_qc_dir
    qc_dir = order_qc_dir(args.qc_dir)

    # Merging the files that are required from every run
    merge_required_files(qc_dir, out_dir)

    # The initial files come from the first run...
    first_run_initial = os.path.join(qc_dir[0], "initial_files.txt")
    copy_initial_files(first_run_initial, out_dir)

    # ... while the final files (and final numbers) come from the last one
    last_run_final = os.path.join(qc_dir[-1], "final_files.txt")
    final_nb_markers, final_nb_samples = get_final_numbers(last_run_final,
                                                           out_dir)

    # The TeX summary of every step of every run
    summary_files = get_summary_files(qc_dir)

    # Everything is ready to generate the merged report
    generate_report(out_dir, summary_files, final_nb_markers,
                    final_nb_samples, args)
def order_qc_dir(dirnames):
    """Sort the QC directories by the date found in their name.

    :param dirnames: the list of directories to merge data from.

    :type dirnames: list

    :returns: a new list containing the directories, oldest first.
    :rtype: list

    The date is read from the base name of each directory, after its first
    14 characters (the length of the ``data_clean_up.`` prefix), and must
    follow the ``%Y-%m-%d_%H.%M.%S`` format.

    """
    def _run_date(dirname):
        # A trailing "/" would make the base name empty, hence the rstrip
        stamp = os.path.basename(dirname.rstrip("/"))[14:]
        return time.strptime(stamp, "%Y-%m-%d_%H.%M.%S")

    ordered = list(dirnames)
    ordered.sort(key=_run_date)
    return ordered
def merge_required_files(dirnames, out_dir):
    """Merges the required files from each of the directories.

    :param dirnames: the list of directories to merge data from (the files
                     are merged following the order of this list).
    :param out_dir: the name of the output directory.

    :type dirnames: list
    :type out_dir: str

    Three kinds of merge are done:

    1. ``steps_summary.tex``, ``excluded_markers.txt`` and
       ``excluded_samples.txt`` are concatenated as is;
    2. ``results_summary.txt`` is concatenated, but the first 4 lines of
       every file except the first one are skipped;
    3. ``graphic_paths.txt`` (optional in each directory) are combined: each
       path is joined to its source directory, duplicates are removed, and
       the sorted paths are written relative to ``out_dir``. No file is
       written if no path was found.

    """
    # The list of files to merge by simple concatenation
    fn_to_merge = ("steps_summary.tex", "excluded_markers.txt",
                   "excluded_samples.txt")

    # Merging the files
    for fn in fn_to_merge:
        with open(os.path.join(out_dir, fn), "w") as o_file:
            for dn in dirnames:
                with open(os.path.join(dn, fn), "r") as i_file:
                    o_file.write(i_file.read())

    # Merging the result summary file
    with open(os.path.join(out_dir, "results_summary.txt"), "w") as o_file:
        for i, dn in enumerate(dirnames):
            with open(os.path.join(dn, "results_summary.txt"), "r") as i_file:
                if i != 0:
                    # We skip the first 4 lines (file descriptions), which
                    # were already written from the first directory
                    for _ in range(4):
                        i_file.readline()
                o_file.write(i_file.read())

    # Merging the graphic paths file (a set removes the duplicated paths)
    graphic_paths = set()
    for dn in dirnames:
        fn = os.path.join(dn, "graphic_paths.txt")
        if os.path.isfile(fn):
            with open(fn, "r") as i_file:
                graphic_paths.update(
                    os.path.join(dn, path)
                    for path in i_file.read().splitlines()
                )

    if graphic_paths:
        with open(os.path.join(out_dir, "graphic_paths.txt"), "w") as o_file:
            for path in sorted(graphic_paths):
                # The paths were relative to their own QC directory, they
                # now need to be relative to the output directory
                o_file.write(os.path.relpath(path, out_dir) + "\n")
def copy_initial_files(filename, out_dir):
    """Copy the ``initial_files`` file into the output directory.

    :param filename: the name of the file to copy.
    :param out_dir: the name of the output directory.

    :type filename: str
    :type out_dir: str

    The copy is delegated to :py:func:`shutil.copy`.

    """
    shutil.copy(filename, out_dir)
def get_final_numbers(filename, out_dir):
    """Copy the final_files file and get the number of markers and samples.

    :param filename: the name of the file.
    :param out_dir: the output directory.

    :type filename: str
    :type out_dir: str

    :returns: the final number of markers and samples (in that order), as
              read from the file (hence, as strings).
    :rtype: tuple

    :raises ProgramError: if the file contains no number of samples or no
                          number of markers.

    The file is tab separated. Lines with a single field are skipped. For the
    other lines, the extension of the first field tells what the second field
    is: ``.bim``, ``.tped`` and ``.map`` for the number of markers, ``.fam``,
    ``.ped`` and ``.tfam`` for the number of samples. If many lines match,
    the last one wins.

    """
    # Copying the file
    shutil.copy(filename, out_dir)

    # The extensions of the files describing markers and samples
    marker_exts = {".bim", ".tped", ".map"}
    sample_exts = {".fam", ".ped", ".tfam"}

    # Reading the number of markers and samples
    nb_samples = None
    nb_markers = None
    with open(filename, "r") as i_file:
        for line in i_file:
            row = line.rstrip("\r\n").split("\t")
            if len(row) == 1:
                # No number on this line
                continue

            ext = os.path.splitext(row[0])[1]
            if ext in marker_exts:
                nb_markers = row[1]
            elif ext in sample_exts:
                nb_samples = row[1]

    # Explicit checks (an 'assert' would be removed by 'python -O' and would
    # not be reported to the user as a ProgramError)
    if not nb_samples:
        raise ProgramError("{}: no number of samples".format(filename))
    if not nb_markers:
        raise ProgramError("{}: no number of markers".format(filename))

    return nb_markers, nb_samples
def get_summary_files(dirnames):
    """Gets the TeX summary files for each test.

    :param dirnames: the list of directories to merge data from.

    :type dirnames: list

    :returns: a list of summary file names (absolute paths), following the
              order of ``dirnames`` and, within each directory, the step
              number.
    :rtype: list

    :raises ProgramError: if a step directory contains more than one
                          ``*.summary.tex`` file, or none at all.

    A step directory is a sub-directory whose name starts with a number
    followed by an underscore (*e.g.* ``1_some_step``).

    """
    # A useful regular expression to get step number in the current directory
    step_nb_re = re.compile(r"^([0-9]+)_\S+")

    # The final list of summary files
    final_summary_files = []

    # For each of the directory
    for dn in dirnames:
        # Getting the step directories
        step_dir = [
            n for n in os.listdir(dn)
            if os.path.isdir(os.path.join(dn, n)) and step_nb_re.match(n)
        ]

        # Sorting the step directories (numerically, so that 10 follows 9)
        step_dir.sort(key=lambda x: int(step_nb_re.match(x).group(1)))

        # Getting the summary file of each step directory. The check is done
        # in the same loop so that the error names the faulty directory.
        for sn in step_dir:
            step_path = os.path.join(dn, sn)
            summary_file = glob(os.path.join(step_path, "*.summary.tex"))

            # Checking we have only one summary file
            if len(summary_file) > 1:
                raise ProgramError(
                    "{}: multiple summary files".format(step_path)
                )
            if not summary_file:
                raise ProgramError(
                    "{}: missing summary file".format(step_path)
                )

            final_summary_files.append(summary_file[0])

    return [os.path.abspath(fn) for fn in final_summary_files]
def generate_report(out_dir, latex_summaries, nb_markers, nb_samples,
                    options):
    """Create the merged report using ``auto_report.create_report``.

    :param out_dir: the output directory.
    :param latex_summaries: the list of LaTeX summaries.
    :param nb_markers: the final number of markers.
    :param nb_samples: the final number of samples.
    :param options: the list of options.

    :type out_dir: str
    :type latex_summaries: list
    :type nb_markers: str
    :type nb_samples: str
    :type options: argparse.Namespace

    The report is named ``merged_report.tex`` and is located in ``out_dir``,
    as are all the files given to the report generator (steps summary,
    results summary, initial and final files). The graphic paths file is
    given only if it exists in ``out_dir`` (``None`` otherwise).

    """
    def in_out_dir(name):
        # Every file of the report lives directly in the output directory
        return os.path.join(out_dir, name)

    # The graphic paths file is optional
    graphic_paths_fn = in_out_dir("graphic_paths.txt")
    if not os.path.isfile(graphic_paths_fn):
        graphic_paths_fn = None

    # Creating the automatic report (the project name of the report is the
    # value of the 'report_number' option)
    report_name = in_out_dir("merged_report.tex")
    auto_report.create_report(
        out_dir,
        report_name,
        project_name=options.report_number,
        steps_filename=in_out_dir("steps_summary.tex"),
        summaries=latex_summaries,
        background=options.report_background,
        summary_fn=in_out_dir("results_summary.txt"),
        report_title=options.report_title,
        report_author=options.report_author,
        initial_files=in_out_dir("initial_files.txt"),
        final_files=in_out_dir("final_files.txt"),
        final_nb_markers=nb_markers,
        final_nb_samples=nb_samples,
        plink_version=get_plink_version(),
        graphic_paths_fn=graphic_paths_fn,
    )
def checkArgs(args):
    """Validate the options of the program.

    :param args: an object containing the options of the program.

    :type args: argparse.Namespace

    :returns: ``True`` if everything was OK.

    :raises ProgramError: if one of the QC directories does not exist, if its
                          name does not start with ``data_clean_up.``, or if
                          it lacks one of the required files.

    """
    # The files every pyGenClean directory needs to contain
    required_files = (
        "excluded_markers.txt",
        "excluded_samples.txt",
        "results_summary.txt",
        "steps_summary.tex",
        "initial_files.txt",
        "final_files.txt",
    )

    for dn in args.qc_dir:
        # The directory needs to exist
        if not os.path.isdir(dn):
            raise ProgramError("{}: no such directory".format(dn))

        # The directory needs to be named as a pyGenClean directory (a
        # trailing "/" is removed, otherwise the base name would be empty)
        base_name = os.path.basename(dn.rstrip("/"))
        if not base_name.startswith("data_clean_up."):
            raise ProgramError("{}: not a pyGenClean directory".format(dn))

        # The directory needs to contain all the required files
        for fn in required_files:
            required_fn = os.path.join(dn, fn)
            if os.path.isfile(required_fn):
                continue
            raise ProgramError(
                "{}: missing required file".format(required_fn)
            )

    return True
def parseArgs(argString=None):  # pragma: no cover
    """Parse the options using the module level parser.

    :param argString: the options (if ``None``, :py:mod:`argparse` reads the
                      options from the command line).

    :type argString: list of strings

    :returns: A :py:class:`argparse.Namespace` object created by the
              :py:mod:`argparse` module. It contains the values of the
              different options.

    ======================= ====== ==========================================
            Options          Type                 Description
    ======================= ====== ==========================================
    ``--qc-dir``            List   Directories containing pyGenClean runs.
    ``--report-title``      String The report title.
    ``--report-author``     String The report author.
    ``--report-number``     String The project number (used as project name).
    ``--report-background`` String The background section of the report.
    ``--out-dir``           String The directory of the final report.
    ======================= ====== ==========================================

    .. note::
        No option check is done here (except for the one automatically done
        by argparse). Those need to be done elsewhere (see
        :py:func:`checkArgs`).

    """
    # 'None' is argparse's own default, so both cases need a single call
    return parser.parse_args(argString)
def add_custom_options(parser):
    """Adds custom options to a parser.

    :param parser: the parser (or argument group) to which to add options.

    :type parser: argparse.ArgumentParser

    The added options are ``--report-title``, ``--report-author``,
    ``--report-number`` and ``--report-background``. All of them are strings
    and have a default value.

    """
    parser.add_argument("--report-title", type=str, metavar="TITLE",
                        default="Genetic Data Clean Up",
                        help="The report title. [default: %(default)s]")

    # The help messages of the author and of the number used to be swapped
    parser.add_argument("--report-author", type=str, metavar="AUTHOR",
                        default="pyGenClean",
                        help="The current project author. "
                             "[default: %(default)s]")

    parser.add_argument("--report-number", type=str, metavar="NUMBER",
                        default="Simple Project",
                        help="The current project number. "
                             "[default: %(default)s]")

    parser.add_argument("--report-background", type=str, metavar="BACKGROUND",
                        default="The aim of this project is to perform data "
                                "QC prior to genetic analysis.",
                        help="Text of file containing the background section "
                             "of the report.")
class ProgramError(Exception):
    """An :py:class:`Exception` raised in case of a problem.

    :param msg: the message to print to the user before exiting.

    :type msg: str

    The message is available through the ``message`` attribute, through
    ``str()`` and through ``args``.

    """
    def __init__(self, msg):
        """Construction of the :py:class:`ProgramError` class.

        :param msg: the message to print to the user

        :type msg: str

        """
        self.message = str(msg)

        # Initializing the base class so that 'args' is set: without it,
        # repr() shows no message and the exception cannot be unpickled
        super(ProgramError, self).__init__(self.message)

    def __str__(self):
        return self.message
# The parser object (module level, shared by parseArgs and safe_main)
pretty_name = "Merge reports"
desc = "Merges automatic reports from other pyGenClean runs."
parser = argparse.ArgumentParser(description=desc)

# The INPUT files
group = parser.add_argument_group("Input")
group.add_argument(
    "--qc-dir",
    nargs="+",
    required=True,
    metavar="DIR",
    help="A list of directory containing pyGenClean runs.",
)

# The options (shared report options)
group = parser.add_argument_group("Report Options")
add_custom_options(group)

# The OUTPUT files
group = parser.add_argument_group("Output Directory")
group.add_argument(
    "--out-dir",
    type=str,
    metavar="FILE",
    default="pyGenClean_report",
    help="The name of the directory that will contain the final report. "
         "[default: %(default)s]",
)
def safe_main():
    """Run :py:func:`main`, handling user interruptions and ProgramError.

    A ``KeyboardInterrupt`` is logged and the program exits with code 0. A
    :py:class:`ProgramError` is logged and its message is passed to
    ``parser.error``.

    """
    try:
        main()

    except KeyboardInterrupt:
        logger.info("Cancelled by user")
        sys.exit(0)

    except ProgramError as error:
        message = error.message
        logger.error(message)
        parser.error(message)
# Running the safe version of the main function when executed as a script
if __name__ == "__main__": safe_main()