Skip to content

Reads I/O

Low-level FASTQ reading and barcode library loading. Used internally by heuristic_discover(), tnseq_discover(), and count_barcodes().

read_barcode_fasta

read_barcode_fasta(path: str) -> set[str]

Read barcode library from FASTA. Supports .gz, .zst.

Parameters:

Name Type Description Default
path str

Path to FASTA file.

required

Returns:

Type Description
set[str]

Set of uppercase barcode sequences.

Raises:

Type Description
ValueError

If the file format is unsupported.

Examples:

>>> barcodes = read_barcode_fasta("barcodes.fasta")
Source code in src/seqchain/io/reads.py
def read_barcode_fasta(path: str) -> set[str]:
    """Read barcode library from FASTA. Supports .gz, .zst.

    Every non-header, non-blank line is treated as one barcode: it is
    stripped of surrounding whitespace and upper-cased. Blank lines (for
    example a trailing empty line at the end of the file) are ignored so
    that an empty string never ends up in the returned set.

    Args:
        path: Path to FASTA file.

    Returns:
        Set of uppercase barcode sequences.

    Raises:
        ValueError: If file format unsupported.

    Examples:
        >>> barcodes = read_barcode_fasta("barcodes.fasta")
    """
    barcodes: set[str] = set()
    with open_reads_file(path, "rt") as f:
        for line in f:
            line = line.strip()
            # Skip blank lines and FASTA header lines; an empty "barcode"
            # would otherwise break the uniform-length validation later.
            if not line or line.startswith(">"):
                continue
            barcodes.add(line.upper())
    return barcodes

validate_barcodes

validate_barcodes(barcodes: set[str]) -> int

Validate a barcode set: it must contain at least 10 barcodes, each at most 1000 bp long, and all of the same length.

Parameters:

Name Type Description Default
barcodes set[str]

Set of barcode sequences.

required

Returns:

Type Description
int

Barcode length (all must be equal).

Raises:

Type Description
ValueError

If validation fails.

Examples:

>>> bc_len = validate_barcodes({"ATCG" * 5} | set(...))
Source code in src/seqchain/io/reads.py
def validate_barcodes(barcodes: set[str]) -> int:
    """Check a barcode library and report its common barcode length.

    The library must hold at least 10 barcodes, every barcode must have the
    same length, and that length must not exceed 1000 bp.

    Args:
        barcodes: Set of barcode sequences.

    Returns:
        The length shared by all barcodes.

    Raises:
        ValueError: If there are fewer than 10 barcodes, the lengths are
            not uniform, or the common length is greater than 1000.

    Examples:
        >>> validate_barcodes({f"ACGT{i:04d}" for i in range(10)})
        8
    """
    too_few = len(barcodes) < 10
    if too_few:
        raise ValueError(
            "The input contains fewer than 10 sequences. "
            "Please provide at least 10 short barcodes."
        )

    # At least ten entries exist here, so taking a reference barcode is safe.
    expected_len = len(next(iter(barcodes)))
    for candidate in barcodes:
        if len(candidate) != expected_len:
            raise ValueError("All barcodes must be the same length.")

    if expected_len <= 1000:
        return expected_len
    raise ValueError(
        "Barcode length exceeds 1000bp. Provide short barcodes."
    )

read_fastq_chunks

read_fastq_chunks(path: str, chunk_size: int = 65536, path2: str | None = None) -> Iterator[tuple[list[str], list[str] | None]]

Stream reads from FASTQ or .reads file in chunks. Supports .gz, .zst.

Yields (reads1, reads2) tuples where reads2 is None for single-end data. Each element is a list of read sequences (strings).

Parameters:

Name Type Description Default
path str

Path to reads file.

required
chunk_size int

Number of reads per chunk.

65536
path2 str | None

Optional path to paired-end reads file.

None

Yields:

Type Description
tuple[list[str], list[str] | None]

Tuples of (reads1_chunk, reads2_chunk_or_None).

Examples:

>>> for r1, r2 in read_fastq_chunks("reads.fastq", chunk_size=1000):
...     print(len(r1))
Source code in src/seqchain/io/reads.py
def read_fastq_chunks(
    path: str,
    chunk_size: int = 65536,
    path2: str | None = None,
) -> Iterator[tuple[list[str], list[str] | None]]:
    """Stream reads from FASTQ or .reads file in chunks. Supports .gz, .zst.

    Yields ``(reads1, reads2)`` tuples where ``reads2`` is ``None`` for
    single-end data. Each element is a list of read sequences (strings).
    For paired-end data both lists in a tuple always have the same length.

    FASTQ input is consumed as 4-line records and only the sequence line is
    kept; ``.reads`` input is consumed one sequence per line. The file type
    is taken from the extension of ``path`` after removing a trailing
    ``.gz``/``.zst``.

    Because this is a generator, all validation errors are raised when
    iteration starts (or when the offending record is reached), not when
    the function is called.

    Args:
        path: Path to reads file.
        chunk_size: Number of reads per chunk. Must be at least 1.
        path2: Optional path to paired-end reads file.

    Yields:
        Tuples of ``(reads1_chunk, reads2_chunk_or_None)``.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1, the extension of
            ``path`` is unsupported, a FASTQ file ends in the middle of a
            record, or the paired files do not contain the same number of
            reads.

    Examples:
        >>> for r1, r2 in read_fastq_chunks("reads.fastq", chunk_size=1000):
        ...     print(len(r1))
    """
    # A non-positive chunk size would make the ">= chunk_size" test fire on
    # every read while never draining the buffer.
    if chunk_size < 1:
        raise ValueError(
            f"chunk_size must be a positive integer, got {chunk_size!r}."
        )

    # Determine file type from extension (strip compression suffix first)
    stripped = path
    if path.endswith((".gz", ".zst")):
        stripped = os.path.splitext(path)[0]

    if stripped.endswith((".fastq", ".fq")):
        file_type = "fastq"
    elif stripped.endswith(".reads"):
        file_type = "reads"
    else:
        raise ValueError(
            f"Unsupported file type for {path!r}. Must be .fastq, .fq, or .reads."
        )

    paired = bool(path2)
    reads1: list[str] = []
    reads2: list[str] = []

    with open_reads_file(path, "rt") as f1, (
        open_reads_file(path2, "rt") if paired else nullcontext()
    ) as f2:
        iter1 = iter(f1)
        iter2 = iter(f2) if paired else iter(())

        while True:
            if file_type == "fastq":
                # Read the 4-line FASTQ record. If the first next()
                # raises StopIteration, that's a clean EOF at a record
                # boundary. If any of the remaining three raise, the
                # file is truncated mid-record.
                try:
                    next(iter1)  # @SEQUENCE_ID
                except StopIteration:
                    break
                try:
                    reads1.append(next(iter1).strip())
                    next(iter1)  # +
                    next(iter1)  # QUALITY
                except StopIteration:
                    raise ValueError(
                        f"Truncated FASTQ record in {path}: "
                        f"file ended mid-record"
                    ) from None

                if paired:
                    try:
                        next(iter2)  # @SEQUENCE_ID
                    except StopIteration:
                        raise ValueError(
                            f"Truncated FASTQ record in {path2}: "
                            f"R2 file ended before R1"
                        ) from None
                    try:
                        reads2.append(next(iter2).strip())
                        next(iter2)  # +
                        next(iter2)  # QUALITY
                    except StopIteration:
                        raise ValueError(
                            f"Truncated FASTQ record in {path2}: "
                            f"file ended mid-record"
                        ) from None
            else:
                try:
                    reads1.append(next(iter1).strip())
                except StopIteration:
                    break
                if paired:
                    try:
                        reads2.append(next(iter2).strip())
                    except StopIteration:
                        # R1 already contributed a read for this pair, so
                        # continuing would yield lists of unequal length.
                        raise ValueError(
                            f"Paired reads files differ in length: "
                            f"{path2} ended before {path}"
                        ) from None

            # The buffer is checked after every single read, so it holds
            # exactly chunk_size reads here and can be handed out whole.
            if len(reads1) >= chunk_size:
                yield (reads1, reads2 if paired else None)
                reads1 = []
                reads2 = []

        # R1 is exhausted. Any remaining non-blank content in R2 means the
        # two files are not properly paired; trailing blank lines are
        # tolerated.
        if paired and any(line.strip() for line in iter2):
            raise ValueError(
                f"Paired reads files differ in length: "
                f"{path} ended before {path2}"
            )

        if reads1:
            yield (reads1, reads2 if paired else None)

open_reads_file

open_reads_file(path: str, mode: str = 'rt') -> IO[Any]

Open a reads file with appropriate decompression.

Parameters:

Name Type Description Default
path str

Path to file (.fastq, .fq, .reads, .fasta, .fa, with optional .gz/.zst suffix).

required
mode str

File mode.

'rt'

Returns:

Type Description
IO[Any]

File handle.

Raises:

Type Description
ValueError

If the extension is not recognized.

Examples:

>>> with open_reads_file("reads.fastq.gz", "rt") as f:
...     pass
Source code in src/seqchain/io/reads.py
def open_reads_file(path: str, mode: str = "rt") -> IO[Any]:
    """Return an open handle for a reads/FASTA file, decompressing if needed.

    The opener is chosen purely from the file name: a ``.gz`` suffix uses
    ``gzip.open``, a ``.zst`` suffix uses ``zstd.open``, and the plain
    extensions ``.fastq``, ``.fq``, ``.reads``, ``.fasta`` and ``.fa`` use
    the built-in ``open``.

    Args:
        path: Path to file (.fastq, .fq, .reads, .fasta, .fa, with
            optional .gz/.zst suffix).
        mode: File mode.

    Returns:
        File handle.

    Raises:
        ValueError: If the extension is not recognized.
        FileNotFoundError: If the file does not exist; re-raised with a
            message naming the reads file.

    Examples:
        >>> with open_reads_file("reads.fastq.gz", "rt") as f:
        ...     pass
    """
    plain_suffixes = (".fastq", ".fq", ".reads", ".fasta", ".fa")

    # Compression suffixes take priority over the plain extensions.
    if path.endswith(".gz"):
        opener = gzip.open
    elif path.endswith(".zst"):
        opener = zstd.open
    elif path.endswith(plain_suffixes):
        opener = open
    else:
        raise ValueError(
            f"{path!r} does not appear to be a supported file: "
            f".fastq, .fq, .reads, .fasta, .fa (optionally .gz/.zst)."
        )

    try:
        return opener(path, mode)
    except FileNotFoundError:
        raise FileNotFoundError(f"Reads file not found: {path}") from None