Source code for pybgen.pybgen

"""Module that reads BGEN files."""

# This file is part of pybgen.
#
# The MIT License (MIT)
#
# Copyright (c) 2017 Louis-Philippe Lemieux Perreault
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.


from __future__ import division

import os
import sys
import zlib
import logging
import sqlite3
from math import ceil
from struct import unpack
from io import UnsupportedOperation

import numpy as np

from six.moves import range

try:
    import zstandard as zstd
    HAS_ZSTD = True
except ImportError:
    HAS_ZSTD = False


__author__ = "Louis-Philippe Lemieux Perreault"
__copyright__ = "Copyright 2017 Louis-Philippe Lemieux Perreault"
__license__ = "MIT"


__all__ = ["PyBGEN"]


# The logger
logger = logging.getLogger(__name__)


# The python version
PYTHON_VERSION = sys.version_info.major


class _Variant(object):
    __slots__ = ("name", "chrom", "pos", "a1", "a2")

    def __init__(self, name, chrom, pos, a1, a2):
        self.name = name
        self.chrom = chrom
        self.pos = pos
        self.a1 = a1
        self.a2 = a2

    def __repr__(self):
        return "<Variant {} chr{}:{}_{}/{}>".format(
            self.name, self.chrom, self.pos, self.a1, self.a2,
        )


[docs]class PyBGEN(object): """Reads and store a set of BGEN files. Args: fn (str): The name of the BGEN file. mode (str): The open mode for the BGEN file. prob_t (float): The probability threshold (optional). probs_only (boolean): Return only the probabilities instead of dosage. Reads or write BGEN files. .. code-block:: python from pybgen import PyBGEN # Reading a BGEN file with PyBGEN("bgen_file_name") as bgen: pass """ def __init__(self, fn, mode="r", prob_t=0.9, _skip_index=False, probs_only=False): """Initializes a new PyBGEN instance.""" # The mode self._mode = mode # What to return self._return_probs = probs_only if self._mode == "r": # Parsing the file self._bgen = open(fn, "rb") self._parse_header() # Did the samples were parsed? if not self._has_sample: self._samples = None # Connecting to the index self._skip_index = _skip_index if not _skip_index: if not os.path.isfile(fn + ".bgi"): raise IOError("{}: no such file".format(fn + ".bgi")) self._connect_index() # The probability self.prob_t = prob_t # Seeking to the first variant of the file self._bgen.seek(self._first_variant_block) elif self._mode == "w": raise NotImplementedError("'w' mode not yet implemented") else: raise ValueError("invalid mode: '{}'".format(self._mode)) def __repr__(self): """The representation of the PyBGEN object.""" if self._mode == "r": return "PyBGEN({:,d} samples; {:,d} variants)".format( self._nb_samples, self._nb_variants, ) return 'PyBGEN(mode="w")' def __iter__(self): """The __iter__ function.""" if self._mode != "r": raise UnsupportedOperation("not available in 'w' mode") return self def __next__(self): """The __next__ function.""" return self.next() def __enter__(self): """Entering the context manager.""" return self def __exit__(self, *args): """Exiting the context manager.""" self.close()
[docs] def close(self): """Closes the BGEN object.""" # Closing the BGEN file self._bgen.close() # Closing the index file (if in read mode) if self._mode == "r" and not self._skip_index: self._bgen_db.close()
@property def nb_variants(self): """Returns the number of markers. Returns: int: The number of markers in the dataset. """ if self._mode != "r": raise UnsupportedOperation("not available in 'w' mode") return self._nb_variants @property def nb_samples(self): """Returns the number of samples. Returns: int: The number of samples in the dataset. """ if self._mode != "r": raise UnsupportedOperation("not available in 'w' mode") return self._nb_samples @property def samples(self): """Returns the samples. Returns: tuple: The samples. """ return self._samples
[docs] def next(self): """Returns the next variant. Returns: tuple: The variant's information and its genotypes (dosage) as :py:class:`numpy.ndarray`. """ if self._mode != "r": raise UnsupportedOperation("not available in 'w' mode") if self._bgen.tell() > self._last_variant_block: raise StopIteration() return self._read_current_variant()
[docs] def iter_variants(self): """Iterates over variants from the beginning of the BGEN file. Returns: tuple: A variant and the dosage. """ if self._mode != "r": raise UnsupportedOperation("not available in 'w' mode") # Seeking back to the first variant block self._bgen.seek(self._first_variant_block) # Return itself (the generator) return self
[docs] def iter_variants_in_region(self, chrom, start, end): """Iterates over variants in a specific region. Args: chrom (str): The name of the chromosome. start (int): The starting position of the region. end (int): The ending position of the region. """ # Getting the region from the index file self._bgen_index.execute( "SELECT file_start_position " "FROM Variant " "WHERE chromosome = ? AND position >= ? AND position <= ?", (chrom, start, end), ) # Fetching all the seek positions seek_positions = [_[0] for _ in self._bgen_index.fetchall()] return self._iter_seeks(seek_positions)
[docs] def iter_variants_by_names(self, names): """Iterates over variants using a list of names. Args: names (list): A list of names to extract specific variants. """ # Fetching all the seek positions seek_positions = self._get_seeks_for_names(names) return self._iter_seeks(seek_positions)
[docs] def get_specific_variant(self, chrom, pos, ref, alt): """Get specific variant with allele lookup Args: chrom (str): The name of the chromosome. pos (int): The starting position of the region. ref (str): The reference allele. alt (str): The alternative allele. Returns: list: A list containing all the value for a given variant. The list has more than one item if there are duplicated variants. """ # Getting the region from the index file self._bgen_index.execute( "SELECT file_start_position " "FROM Variant " "WHERE chromosome = ? AND position = ? AND allele1 = ? AND " " allele2 = ?", (chrom, pos, ref, alt), ) # Fetching all the seek positions seek_positions = [_[0] for _ in self._bgen_index.fetchall()] # Fetching seek positions, we return the variant results = list(self._iter_seeks(seek_positions)) if not results: raise ValueError("{}:{} {}/{}: variant not found" "".format(chrom, pos, ref, alt)) return results
[docs] def iter_variant_info(self): """Iterate over marker information.""" self._bgen_index.execute( "SELECT chromosome, position, rsid, allele1, allele2 FROM Variant", ) # The array size array_size = 1000 # Fetching the results results = self._bgen_index.fetchmany(array_size) while results: for chrom, pos, rsid, a1, a2 in results: yield _Variant(rsid, chrom, pos, a1, a2) results = self._bgen_index.fetchmany(array_size)
def _iter_seeks(self, seeks): """Iterate over seek positions.""" for seek in seeks: self._bgen.seek(seek) yield self._read_current_variant() def _get_seeks_for_names(self, names): """Gets the seek values for each names.""" # Generating a temporary table that will contain the markers to extract self._bgen_index.execute("CREATE TEMPORARY TABLE tnames (name text)") self._bgen_index.executemany( "INSERT INTO tnames VALUES (?)", [(n, ) for n in names], ) # Fetching the seek positions self._bgen_index.execute( "SELECT file_start_position " "FROM Variant " "WHERE rsid IN (SELECT name FROM tnames)", ) return tuple(_[0] for _ in self._bgen_index.fetchall())
[docs] def get_variant(self, name): """Gets the values for a given variant. Args: name (str): The name of the variant. Returns: list: A list containing all the value for a given variant. The list has more than one item if there are duplicated variants. """ if self._mode != "r": raise UnsupportedOperation("not available in 'w' mode") # Fetching the variant self._bgen_index.execute( "SELECT file_start_position FROM Variant WHERE rsid = ?", (name, ) ) # Fetching all the seek positions seek_positions = [_[0] for _ in self._bgen_index.fetchall()] # Constructing the results results = list(self._iter_seeks(seek_positions)) if not results: raise ValueError("{}: name not found".format(name)) return results
def _read_current_variant(self): """Reads the current variant.""" # Getting the variant's information var_id, rs_id, chrom, pos, alleles = self._get_curr_variant_info() # Getting the variant's dosage dosage = self._get_curr_variant_data() return _Variant(rs_id, chrom, pos, *alleles), dosage def _get_curr_variant_info(self): """Gets the current variant's information.""" if self._layout == 1: n = unpack("<I", self._bgen.read(4))[0] if n != self._nb_samples: raise ValueError( "{}: invalid BGEN file".format(self._bgen.name), ) # Reading the variant id var_id = self._bgen.read(unpack("<H", self._bgen.read(2))[0]).decode() # Reading the variant rsid rs_id = self._bgen.read(unpack("<H", self._bgen.read(2))[0]).decode() # Reading the chromosome chrom = self._bgen.read(unpack("<H", self._bgen.read(2))[0]).decode() # Reading the position pos = unpack("<I", self._bgen.read(4))[0] # Getting the number of alleles nb_alleles = 2 if self._layout == 2: nb_alleles = unpack("<H", self._bgen.read(2))[0] # Getting the alleles alleles = [] for _ in range(nb_alleles): alleles.append(self._bgen.read( unpack("<I", self._bgen.read(4))[0] ).decode()) return var_id, rs_id, chrom, pos, tuple(alleles) def _get_curr_variant_probs_layout_1(self): """Gets the current variant's probabilities (layout 1).""" c = self._nb_samples if self._is_compressed: c = unpack("<I", self._bgen.read(4))[0] # Getting the probabilities probs = np.frombuffer( self._decompress(self._bgen.read(c)), dtype="u2", ) / 32768 probs.shape = (self._nb_samples, 3) return probs def _get_curr_variant_probs_layout_2(self): """Gets the current variant's probabilities (layout 2).""" # The total length C of the rest of the data for this variant c = unpack("<I", self._bgen.read(4))[0] # The number of bytes to read to_read = c # D = C if no compression d = c if self._is_compressed: # The total length D of the probability data after # decompression d = unpack("<I", self._bgen.read(4))[0] to_read = c - 4 # Reading the data and checking data = self._decompress(self._bgen.read(to_read)) if len(data) != d: raise ValueError( "{}: invalid BGEN file".format(self._bgen.name) ) # Checking the number of samples n = unpack("<I", data[:4])[0] if n != self._nb_samples: raise ValueError( "{}: invalid BGEN file".format(self._bgen.name) ) data = data[4:] # Checking the number of alleles (we only accept 2 alleles) nb_alleles = unpack("<H", data[:2])[0] if nb_alleles != 2: raise ValueError( "{}: only two alleles are " "supported".format(self._bgen.name) ) data = data[2:] # TODO: Check ploidy for sexual chromosomes # The minimum and maximum for ploidy (we only accept ploidy of 2) min_ploidy = _byte_to_int(data[0]) max_ploidy = _byte_to_int(data[1]) if min_ploidy != 2 or max_ploidy != 2: raise ValueError( "{}: only accepting ploidy of " "2".format(self._bgen.name) ) data = data[2:] # Check the list of N bytes for missingness (since we assume only # diploid values for each sample) ploidy_info = np.frombuffer(data[:n], dtype=np.uint8) ploidy_info = np.unpackbits( ploidy_info.reshape(1, ploidy_info.shape[0]).T, axis=1, ) missing_data = ploidy_info[:, 0] == 1 data = data[n:] # TODO: Permit phased data # Is the data phased? is_phased = data[0] == 1 if is_phased: raise ValueError( "{}: only accepting unphased data".format(self._bgen.name) ) data = data[1:] # The number of bits used to encode each probabilities b = _byte_to_int(data[0]) data = data[1:] # Reading the probabilities (don't forget we allow only for diploid # values) probs = None if b == 8: probs = np.frombuffer(data, dtype=np.uint8) elif b == 16: probs = np.frombuffer(data, dtype=np.uint16) elif b == 32: probs = np.frombuffer(data, dtype=np.uint32) else: probs = _pack_bits(data, b) # Changing shape and computing dosage probs.shape = (self._nb_samples, 2) return probs / (2**b - 1), missing_data def _get_curr_variant_data(self): """Gets the current variant's dosage or probabilities.""" if self._layout == 1: # Getting the probabilities probs = self._get_curr_variant_probs_layout_1() if self._return_probs: # Returning the probabilities return probs else: # Returning the dosage return self._layout_1_probs_to_dosage(probs) else: # Getting the probabilities probs, missing_data = self._get_curr_variant_probs_layout_2() if self._return_probs: # Getting the alternative allele homozygous probabilities last_probs = self._get_layout_2_last_probs(probs) # Stacking the probabilities last_probs.shape = (last_probs.shape[0], 1) full_probs = np.hstack((probs, last_probs)) # Setting the missing to NaN full_probs[missing_data] = np.nan # Returning the probabilities return full_probs else: # Computing the dosage dosage = self._layout_2_probs_to_dosage(probs) # Setting the missing to NaN dosage[missing_data] = np.nan # Returning the dosage return dosage def _layout_1_probs_to_dosage(self, probs): """Transforms probability values to dosage (from layout 1)""" # Constructing the dosage dosage = 2 * probs[:, 2] + probs[:, 1] if self.prob_t > 0: dosage[~np.any(probs >= self.prob_t, axis=1)] = np.nan return dosage @staticmethod def _get_layout_2_last_probs(probs): """Gets the layout 2 last probabilities (homo alternative).""" return 1 - np.sum(probs, axis=1) def _layout_2_probs_to_dosage(self, probs): """Transforms probability values to dosage (from layout 2).""" # Computing the last genotype's probabilities last_probs = self._get_layout_2_last_probs(probs) # Constructing the dosage dosage = 2 * last_probs + probs[:, 1] # Setting low quality to NaN if self.prob_t > 0: good_probs = ( np.any(probs >= self.prob_t, axis=1) | (last_probs >= self.prob_t) ) dosage[~good_probs] = np.nan return dosage def _parse_header(self): """Parses the header (header and sample blocks).""" # Parsing the header block self._parse_header_block() # Parsing the sample block (if any) if self._has_sample: self._parse_sample_block() def _parse_header_block(self): """Parses the header block.""" # Getting the data offset (the start point of the data self._offset = unpack("<I", self._bgen.read(4))[0] self._first_variant_block = self._offset + 4 # Getting the header size self._header_size = unpack("<I", self._bgen.read(4))[0] # Getting the number of samples and variants self._nb_variants = unpack("<I", self._bgen.read(4))[0] self._nb_samples = unpack("<I", self._bgen.read(4))[0] # Checking the magic number magic = self._bgen.read(4) if magic != b"bgen": # The magic number might be 0, then if unpack("<I", magic)[0] != 0: raise ValueError( "{}: invalid BGEN file.".format(self._bgen.name) ) # Passing through the "free data area" self._bgen.read(self._header_size - 20) # Reading the flag flag = np.frombuffer(self._bgen.read(4), dtype=np.uint8) flag = np.unpackbits(flag.reshape(1, flag.shape[0]).T, axis=1) # Getting the compression type from the layout compression = _bits_to_int(flag[0, -2:]) self._is_compressed = False if compression == 0: # No decompression required self._decompress = self._no_decompress elif compression == 1: # ZLIB decompression self._decompress = zlib.decompress self._is_compressed = True elif compression == 2: if not HAS_ZSTD: raise ValueError("zstandard module is not installed") # ZSTANDARD decompression (needs to be check) self._decompress = zstd.ZstdDecompressor().decompress self._is_compressed = True # Getting the layout layout = _bits_to_int(flag[0, -6:-2]) if layout == 0: raise ValueError( "{}: invalid BGEN file".format(self._bgen.name) ) elif layout == 1: self._layout = 1 elif layout == 2: self._layout = 2 else: raise ValueError( "{}: {} invalid layout type".format(self._bgen.name, layout) ) # Checking if samples are in the file self._has_sample = flag[-1, 0] == 1 def _parse_sample_block(self): """Parses the sample block.""" # Getting the block size block_size = unpack("<I", self._bgen.read(4))[0] if block_size + self._header_size > self._offset: raise ValueError( "{}: invalid BGEN file".format(self._bgen.name) ) # Checking the number of samples n = unpack("<I", self._bgen.read(4))[0] if n != self._nb_samples: raise ValueError( "{}: invalid BGEN file".format(self._bgen.name) ) # Getting the sample information samples = [] for i in range(self._nb_samples): size = unpack("<H", self._bgen.read(2))[0] samples.append(self._bgen.read(size).decode()) self._samples = tuple(samples) # Just a check with the header if len(self.samples) != self._nb_samples: raise ValueError("{}: number of samples different between header " "and sample block".format(self._bgen.name)) def _connect_index(self): """Connect to the index (which is an SQLITE database).""" self._bgen_db = sqlite3.connect(self._bgen.name + ".bgi") self._bgen_index = self._bgen_db.cursor() # Fetching the number of variants and the first and last seek position self._bgen_index.execute( "SELECT COUNT (rsid), " " MIN (file_start_position), " " MAX (file_start_position) " "FROM Variant" ) result = self._bgen_index.fetchone() nb_markers = result[0] first_variant_block = result[1] self._last_variant_block = result[2] # Checking the number of markers if nb_markers != self._nb_variants: raise ValueError( "{}: number of markers different between header ({:,d}) " "and index file ({:,d})".format( self._bgen.name, self._nb_variants, nb_markers, ) ) # Checking the first variant seek position if first_variant_block != self._first_variant_block: raise ValueError("{}: invalid index".format(self._bgen.name)) @staticmethod def _no_decompress(data): return data
def _bits_to_int(bits): """Converts bits to int.""" result = 0 for bit in bits: result = (result << 1) | bit return result def _byte_to_int_python3(byte): """Converts a byte to a int for python 3.""" return byte def _byte_to_int_python2(byte): """Converts a byte to a int for python 2.""" return unpack("<B", byte)[0] _byte_to_int = _byte_to_int_python3 if PYTHON_VERSION < 3: _byte_to_int = _byte_to_int_python2 def _pack_bits(data, b): """Unpacks BGEN probabilities (as bits).""" # Getting the data from the bytes data = np.fromiter( ((_byte_to_int(byte) >> i) & 1 for byte in data for i in range(8)), dtype=bool, ) data.shape = (data.shape[0] // b, b) # Finding the closest full bytes (if required) # TODO: Improve this so that it is more efficient full_bytes = data[:, ::-1] if data.shape[1] % 8 != 0: nb_bits = int(ceil(b / 8)) * 8 full_bytes = np.zeros((data.shape[0], nb_bits), dtype=bool) full_bytes[:, -b:] += data[:, ::-1] # Packing the bits packed = np.packbits(full_bytes, axis=1) # Left-shifting for final value final = packed[:, 0] for i in range(1, packed.shape[1]): final = np.left_shift(final, 8, dtype=np.uint) | packed[:, i] return final