# Source code for create_input_txt

#!/usr/bin/env python3
"""
create_input_txt
================

Create an input list of BIDS images to preprocess.

This script scans one or more BIDS dataset roots and writes a text file
containing one **quoted absolute path** per line (e.g., T1w images). It supports:

- Longitudinal (``sub-*/ses-*/anat``) and cross-sectional (``sub-*/anat``) layouts
- Excluding subjects/sessions/runs via ``exclude.yaml``
- Optional filtering by a list of subjects
- Optional filtering by age range (in months), using ``participants.tsv`` or a custom TSV

By default, the exclusion file is expected at::

    <bids_root>/code/qc/raw/exclude.yaml

The output text file can be passed directly to your preprocessing script
(e.g., ``brainprep.py --inputs <output.txt>``).

Notes for Sphinx
----------------
- All logic is import-safe (no work performed at import).
- Use :func:`main` as the CLI entrypoint.

Examples
--------
Single longitudinal dataset::

    python create_input_txt.py /path/to/hc-calgary-preschool \
      -l long \
      --modality T1w \
      -o to_preprocess_hc-calgary-preschool.txt

Multiple datasets at once::

    python create_input_txt.py /path/to/hc-bcp /path/to/hc-calgary-preschool \
      -l long long \
      --modality T1w \
      -o to_preprocess_all.txt

With an age filter (1–7 years = 12–84 months)::

    python create_input_txt.py /path/to/hc-calgary-preschool \
      -l long \
      --modality T1w \
      --min-age-months 12 --max-age-months 84 \
      --age-tsv /path/to/hc-calgary-preschool/participants.tsv \
      --age-col age --age-units years \
      -o to_preprocess_12to84mo.txt
"""

from __future__ import annotations

import argparse
import csv
import fnmatch
import glob
import os
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Set, Tuple, Union

import yaml


# -----------------------------------------------------------------------------
# Small helpers
# -----------------------------------------------------------------------------
def _norm_sub(s: str) -> str:
    """Return *s* as a ``sub-<label>`` participant identifier.

    The value is converted to ``str``; surrounding whitespace is removed first,
    then surrounding double quotes, then surrounding single quotes. A value
    that already starts with ``sub-`` after that cleanup is returned as-is,
    otherwise the ``sub-`` prefix is added.
    """
    label = str(s).strip()
    for quote in ('"', "'"):
        label = label.strip(quote)
    if label.startswith("sub-"):
        return label
    return "sub-" + label


def _norm_ses(s: Union[str, int, float, None]) -> Optional[str]:
    """Normalize a session label into ``ses-XX`` (numeric) or ``ses-<str>``.

    Parameters
    ----------
    s:
        Raw session value (e.g. a TSV cell). ``None`` and blank strings mean
        "no session".

    Returns
    -------
    str or None
        ``None`` when no session is given; the value unchanged (after
        stripping whitespace) when it already starts with ``ses-``;
        ``ses-NN`` (zero-padded to at least two digits) when the value parses
        as a number; otherwise ``ses-<value>``.
    """
    if s is None:
        return None
    label = str(s).strip()
    if not label:
        return None
    if label.startswith("ses-"):
        return label
    try:
        return f"ses-{int(float(label)):02d}"
    except (ValueError, OverflowError):
        # ValueError: not a number (or NaN). OverflowError: float() accepted
        # the text but the value is infinite ("inf", "1e999"), which int()
        # cannot represent -- treat such labels as plain strings too.
        return f"ses-{label}"


def _iter_unwrap_lines(fp: Iterable[str]) -> Iterator[str]:
    """Iterate over *fp*, dropping line endings and whole-line double quotes.

    Trailing ``\\r``/``\\n`` characters are removed from every line. When what
    remains is at least two characters long and both starts and ends with a
    double quote, that outer pair of quotes is removed as well; any other line
    is yielded unchanged.
    """
    for raw in fp:
        text = raw.rstrip("\r\n")
        fully_quoted = len(text) >= 2 and text[0] == '"' and text[-1] == '"'
        yield text[1:-1] if fully_quoted else text


def _robust_dict_reader(path: str) -> Iterator[Dict[str, str]]:
    """
    CSV/TSV DictReader that handles:
    - auto delimiter sniffing (falls back to tab-separated when sniffing fails)
    - "whole line quoted" files (a single field containing embedded separators)
    - a leading UTF-8 byte-order mark (common in spreadsheet exports)

    Parameters
    ----------
    path:
        Path of the delimited text file to read.

    Yields
    ------
    dict
        One mapping per data row. String keys and string values are stripped
        of surrounding whitespace; non-string keys/values produced by
        ``csv.DictReader`` for ragged rows are passed through unchanged.
    """
    # "utf-8-sig" decodes UTF-8 and silently drops a BOM if present; without
    # it the first column name becomes "\ufeffparticipant_id" and every id
    # lookup misses. errors="replace" keeps the reader best-effort if a file
    # contains stray non-UTF-8 bytes.
    with open(path, newline="", encoding="utf-8-sig", errors="replace") as f:
        sample = f.read(8192)
        f.seek(0)
        try:
            dialect = csv.Sniffer().sniff(sample)
        except csv.Error:
            dialect = csv.excel_tab

        # Peek header to detect "one giant quoted field" case
        header = next(csv.reader([f.readline()], dialect=dialect), [])
        f.seek(0)

        if len(header) == 1 and ("," in header[0] or "\t" in header[0]):
            # The separator is hidden inside one quoted field: strip the outer
            # quotes from each line so the dialect can split the columns.
            reader = csv.DictReader(_iter_unwrap_lines(f), dialect=dialect)
        else:
            reader = csv.DictReader(f, dialect=dialect)

        for row in reader:
            yield {
                (k.strip() if isinstance(k, str) else k): (v.strip() if isinstance(v, str) else v)
                for k, v in row.items()
            }


# -----------------------------------------------------------------------------
# Age lookup
# -----------------------------------------------------------------------------
def load_age_lookup(
    bids_root: str,
    layout: str,
    tsv_path: Optional[str] = None,
    pid_col: str = "participant_id",
    ses_col: str = "session",
    age_col: str = "age",
    age_units: str = "years",
) -> Dict[Union[str, Tuple[str, str]], float]:
    """
    Load ages from a TSV/CSV into a lookup dictionary.

    For ``layout="long"`` this returns::

        {(sub_id, ses_id): age_months, ...}

    For ``layout="cross"`` (any value other than ``"long"``) this returns::

        {sub_id: age_months, ...}

    Rows are skipped when the participant id is missing, when the age is
    empty / ``NA`` / ``NaN`` / not parseable as a number / not finite, and --
    for the longitudinal layout -- when the session cell is empty.

    Parameters
    ----------
    bids_root:
        BIDS dataset root.
    layout:
        ``"long"`` or ``"cross"``.
    tsv_path:
        Optional TSV/CSV path. If None, defaults to ``<bids_root>/participants.tsv``.
    pid_col, ses_col, age_col:
        Column names in the TSV.
    age_units:
        Either ``"years"`` or ``"months"``. Years are converted to months.

    Returns
    -------
    dict
        Age lookup in months. Empty when the table file does not exist.
    """
    import math  # function-scope: only needed for the finiteness check below

    if tsv_path is None:
        tsv_path = os.path.join(bids_root, "participants.tsv")

    if not os.path.exists(tsv_path):
        print("The participants.tsv file does not exist...")
        return {}

    in_years = age_units.lower().startswith("year")
    longitudinal = layout == "long"

    lut: Dict[Union[str, Tuple[str, str]], float] = {}
    for row in _robust_dict_reader(tsv_path):
        pid = row.get(pid_col)
        if not pid:
            continue

        age_raw = row.get(age_col, "")
        if age_raw in ("", "NA", "NaN", None):
            continue
        try:
            age = float(age_raw)
        except (TypeError, ValueError):
            continue
        # float() also accepts spellings such as "nan" or "inf". A NaN age
        # compares False against both bounds and would slip through every age
        # filter, so non-finite values are treated as missing.
        if not math.isfinite(age):
            continue

        sub = _norm_sub(pid)
        age_mo = age * 12.0 if in_years else age

        if longitudinal:
            ses = _norm_ses(row.get(ses_col, ""))
            if not ses:
                continue
            lut[(sub, ses)] = age_mo
        else:
            lut[sub] = age_mo

    return lut
# -----------------------------------------------------------------------------
# Excludes handling
# -----------------------------------------------------------------------------
def load_excludes(yaml_path: str) -> List[str]:
    """
    Load exclude identifiers from YAML.

    Supports either:
    - a YAML list
    - a dict containing a list under keys: ``exclude``, ``exclude_paths``, ``exclude_images``

    An empty document, or a recognised key whose value is empty (YAML null),
    is treated as "nothing to exclude".

    Parameters
    ----------
    yaml_path:
        Path of the YAML file to read.

    Returns
    -------
    list[str]
        The exclude identifiers, in file order.

    Raises
    ------
    ValueError
        If the document has an unsupported structure, or if an entry is not a
        string (e.g. an unquoted number such as ``- 10001``).
    """
    keys = ("exclude", "exclude_paths", "exclude_images")

    with open(yaml_path, "r") as f:
        data = yaml.safe_load(f)

    entries = None
    if data is None:
        # Empty file: nothing to exclude.
        entries = []
    elif isinstance(data, list):
        entries = data
    elif isinstance(data, dict):
        for key in keys:
            if key in data and isinstance(data[key], list):
                entries = data[key]
                break
        else:
            # No key holds a list; accept "exclude:" with nothing under it.
            if any(key in data and data[key] is None for key in keys):
                entries = []

    if entries is None:
        raise ValueError(f"Could not parse excludes from {yaml_path!r}")

    # Downstream code calls str methods on every entry; fail here with a clear
    # message instead of an AttributeError/TypeError far from the cause.
    for entry in entries:
        if not isinstance(entry, str):
            raise ValueError(
                f"Non-string exclude entry {entry!r} in {yaml_path!r}; "
                "write identifiers as quoted strings"
            )
    return list(entries)
def expand_excludes(excludes: Sequence[str]) -> List[str]:
    """
    Turn BIDS-style identifiers into glob-like patterns relative to the dataset root.

    Each entry is stripped and has ``os.sep`` replaced by ``/``. Then:

    - entries containing ``/`` or ``*`` are treated as ready-made patterns and
      kept unchanged;
    - ``<sub>_ses-<rest>`` becomes ``<sub>/ses-<rest>/**`` (split at the first
      ``_ses-``);
    - anything else becomes ``<entry>/**``.

    Examples
    --------
    - ``sub-10001`` -> ``sub-10001/**``
    - ``sub-10001_ses-001`` -> ``sub-10001/ses-001/**``

    Returns
    -------
    list[str]
        One pattern per input entry, in input order.
    """

    def _to_pattern(identifier: str) -> str:
        ident = identifier.strip().replace(os.sep, "/")
        if "/" in ident or "*" in ident:
            return ident
        if "_ses-" not in ident:
            return f"{ident}/**"
        subject, _, session = ident.partition("_ses-")
        return f"{subject}/ses-{session}/**"

    return [_to_pattern(item) for item in excludes]
def is_excluded(path: str, patterns: Sequence[str], root: str) -> bool:
    """
    Tell whether *path* matches at least one exclusion pattern.

    *path* is made relative to *root*, its separators are converted to ``/``,
    and the result is tested against each pattern with ``fnmatch.fnmatch``.

    Returns
    -------
    bool
        True on the first matching pattern, False if none match (including
        when *patterns* is empty).
    """
    relative = os.path.relpath(path, root)
    relative = relative.replace(os.sep, "/")
    for pattern in patterns:
        if fnmatch.fnmatch(relative, pattern):
            return True
    return False
# -----------------------------------------------------------------------------
# Core scanning logic
# -----------------------------------------------------------------------------
@dataclass(frozen=True)
class ScanArgs:
    """
    Immutable bundle of the CLI options that the dataset scanner consumes.

    Attributes
    ----------
    exclude_file:
        File name of the exclusion YAML; the scanner looks for it under
        ``<root>/code/qc/raw/``.
    modality:
        Image suffix to collect (e.g. ``T1w``). ``T1w``/``T2w`` are scanned
        through the ``anat`` folders; other values go through a glob pattern.
    min_age_months, max_age_months:
        Inclusive age bounds in months; ``None`` disables that bound.
    pattern:
        Glob override as given on the command line. The scanner receives the
        effective glob separately, so this field is carried for reference.
    """

    exclude_file: str
    modality: Optional[str]
    min_age_months: Optional[int]
    max_age_months: Optional[int]
    pattern: Optional[str]
def process_dir(
    root: str,
    glob_suffix: str,
    layout: str,
    scan_args: ScanArgs,
    allowed_subs: Set[str],
    age_lut: Dict[Union[str, Tuple[str, str]], float],
) -> Set[str]:
    """
    Scan one BIDS dataset root and return a set of matching absolute file paths.

    Applies:
    - exclusion patterns from ``<root>/code/qc/raw/<exclude_file>`` (if present)
    - optional subject filter
    - optional age filter (months), with fallback to parsing ``ses-XXmo`` if present

    Parameters
    ----------
    root:
        Absolute dataset root. It may contain glob metacharacters
        (``*``, ``?``, ``[``); they are matched literally.
    glob_suffix:
        Glob pattern (relative to *root*) used for modalities other than
        ``T1w``/``T2w``; ignored for those two.
    layout:
        ``"long"`` or ``"cross"``.
    scan_args:
        Scanning parameters (exclude file, modality, age range, pattern).
    allowed_subs:
        Set of allowed subject IDs (e.g. ``{"sub-10001", ...}``). Empty means no restriction.
    age_lut:
        Age lookup in months, keyed by ``(sub, ses)`` for the longitudinal
        layout and by ``sub`` otherwise.

    Returns
    -------
    set[str]
        Matching image paths (absolute).
    """
    print(f"Processing {root}")

    is_anat = scan_args.modality in ("T1w", "T2w")
    longitudinal = layout == "long"
    # The root is a literal directory, not a pattern: escape it so characters
    # such as "[" in a folder name do not silently turn the scan into a no-op.
    root_glob = glob.escape(root)

    def _rel(path: str) -> str:
        return os.path.relpath(path, root).replace(os.sep, "/")

    def _first_part_with_prefix(path: str, prefix: str) -> Optional[str]:
        # First component of the root-relative path that starts with *prefix*.
        for part in _rel(path).split("/"):
            if part.startswith(prefix):
                return part
        return None

    def _first_sub_from_path(path: str) -> Optional[str]:
        return _first_part_with_prefix(path, "sub-")

    def _first_ses_from_path(path: str) -> Optional[str]:
        return _first_part_with_prefix(path, "ses-")

    def _age_ok_for_dir(sub: str, ses: Optional[str]) -> bool:
        lo, hi = scan_args.min_age_months, scan_args.max_age_months

        # No filtering requested
        if lo is None and hi is None:
            return True

        # Prefer TSV lookup
        if longitudinal:
            age_mo = age_lut.get((sub, ses)) if ses else None
        else:
            age_mo = age_lut.get(sub)

        # Fallback: parse ses-XXmo if not in TSV
        if age_mo is None and ses:
            if ses.endswith("wk"):
                # Week-labelled sessions are rejected whenever an age filter is active.
                return False
            if ses.endswith("mo"):
                try:
                    age_mo = int(ses[len("ses-") : -len("mo")])
                except ValueError:
                    age_mo = None

        # Unknown age never passes an active filter.
        if age_mo is None:
            return False
        if lo is not None and age_mo < lo:
            return False
        if hi is not None and age_mo > hi:
            return False
        return True

    # --- Excludes ------------------------------------------------------------
    yaml_file = os.path.join(root, "code", "qc", "raw", scan_args.exclude_file)
    raw = load_excludes(yaml_file) if os.path.exists(yaml_file) else []

    # Run-level identifiers need an exact file pattern; everything else is
    # expanded to a "whole subject/session" pattern.
    runs = [e for e in raw if "_run-" in e]
    gens = [e for e in raw if "_run-" not in e]
    patterns = expand_excludes(gens)

    # Add run-level exact patterns
    for run_id in runs:
        session_key, _ = run_id.rsplit("_run-", 1)
        session_path = session_key.replace("_ses-", "/ses-")
        if is_anat:
            pat = f"{session_path}/anat/{run_id}_{scan_args.modality}.nii.gz"
        else:
            pat = f"**/{run_id}_*.nii.gz"
        patterns.append(pat)

    found: Set[str] = set()

    # --- T1w/T2w (anat) branch ----------------------------------------------
    if is_anat:
        layout_parts = ("sub-*", "ses-*", "anat") if longitudinal else ("sub-*", "anat")
        for anat_dir in glob.glob(os.path.join(root_glob, *layout_parts)):
            sub = _first_sub_from_path(anat_dir)
            if sub is None:
                continue
            if allowed_subs and (sub not in allowed_subs):
                continue

            ses = _first_ses_from_path(anat_dir) if longitudinal else None
            if not _age_ok_for_dir(sub, ses):
                continue

            # anat_dir is a concrete directory returned by glob: escape it too.
            img_glob = os.path.join(glob.escape(anat_dir), f"*_{scan_args.modality}.nii.gz")
            for img in glob.glob(img_glob):
                if is_excluded(img, patterns, root):
                    continue
                # The subject was already checked on anat_dir; an image inside
                # it has the same first "sub-" path component.
                found.add(os.path.abspath(img))

    # --- Other modalities (recursive pattern) --------------------------------
    else:
        for img in glob.glob(os.path.join(root_glob, glob_suffix), recursive=True):
            sub = _first_sub_from_path(img)
            if sub is None:
                continue
            if allowed_subs and (sub not in allowed_subs):
                continue

            ses = _first_ses_from_path(img) if longitudinal else None
            if not _age_ok_for_dir(sub, ses):
                continue
            if is_excluded(img, patterns, root):
                continue

            found.add(os.path.abspath(img))

    print(f" found {len(found)} images")
    return found
# -----------------------------------------------------------------------------
# Subject filter
# -----------------------------------------------------------------------------
def norm_sub_id(s: str) -> str:
    """
    Normalize a user-supplied subject ID into ``sub-...`` format.

    The value is converted to ``str`` and stripped of surrounding whitespace,
    then of surrounding double quotes, then of surrounding single quotes.
    ``sub-`` is prepended unless the cleaned value already starts with it.
    """
    cleaned = str(s).strip().strip('"').strip("'")
    prefix = "" if cleaned.startswith("sub-") else "sub-"
    return prefix + cleaned
def load_subjects(subjects_list: Optional[Sequence[str]], subjects_file: Optional[str]) -> Set[str]:
    """
    Load allowed subjects from CLI arguments.

    IDs from both sources are normalized with :func:`norm_sub_id` and merged.
    A *subjects_file* that does not exist is skipped with a printed warning
    rather than silently, because an empty result can change how the caller
    filters subjects.

    Parameters
    ----------
    subjects_list:
        Subject IDs provided directly on the command line.
    subjects_file:
        Optional file containing one subject ID per line. Blank lines are
        ignored; a leading UTF-8 byte-order mark is tolerated.

    Returns
    -------
    set[str]
        Allowed subject IDs (possibly empty).
    """
    subs: Set[str] = set()

    if subjects_list:
        subs.update(norm_sub_id(x) for x in subjects_list)

    if subjects_file:
        if os.path.exists(subjects_file):
            # utf-8-sig drops a BOM that would otherwise be glued to the first
            # ID; errors="replace" keeps reading best-effort on odd bytes.
            with open(subjects_file, encoding="utf-8-sig", errors="replace") as f:
                # norm_sub_id strips whitespace itself; only blank lines need skipping.
                subs.update(norm_sub_id(line) for line in f if line.strip())
        else:
            print(f"WARNING: subjects file {subjects_file!r} does not exist; ignoring it")

    return subs
# -----------------------------------------------------------------------------
# CLI
# -----------------------------------------------------------------------------
def build_argparser() -> argparse.ArgumentParser:
    """
    Create the command-line parser for the script.

    Returns
    -------
    argparse.ArgumentParser
        Parser with the positional dataset roots, the required
        ``--layouts`` / ``--modality`` / ``--output`` options, and the optional
        exclusion, pattern, subject and age-filter options.
    """
    parser = argparse.ArgumentParser(description="List BIDS images with per-dataset layout")
    add = parser.add_argument

    # Datasets and their layouts
    add("bids_dirs", nargs="+", help="BIDS root directories")
    add(
        "-l",
        "--layouts",
        nargs="+",
        choices=["long", "cross"],
        required=True,
        help="Layout per dir: 'long' or 'cross'",
    )

    # What to collect and where to write it
    add(
        "-e",
        "--exclude-file",
        default="exclude.yaml",
        help="YAML file listing identifiers to skip (relative to code/qc/raw/)",
    )
    add(
        "-m",
        "--modality",
        choices=["T1w", "T2w", "FLAIR", "bold", "dwi"],
        required=True,
        help="Suffix to include (e.g. T1w, dwi)",
    )
    add("-p", "--pattern", default=None, help="Override glob pattern for non-anat scans")
    add("-o", "--output", required=True, help="Output .txt file")

    # Age bounds
    add(
        "--min-age-months",
        type=int,
        default=None,
        help="Exclude any session younger than this (months)",
    )
    add(
        "--max-age-months",
        type=int,
        default=None,
        help="Exclude any session older than this (months)",
    )

    # Subject filter
    add(
        "--subjects",
        nargs="*",
        default=None,
        help="Explicit list of subject IDs (sub-XXXX). If set, only these subjects are considered.",
    )
    add(
        "--subjects-file",
        default=None,
        help="Text file with one subject ID per line; combined with --subjects if both given.",
    )

    # Age table
    add(
        "--age-tsv",
        default=None,
        help="Use age from this TSV/CSV (default: <bids_root>/participants.tsv)",
    )
    add(
        "--age-pid-col",
        default="participant_id",
        help="Column name for participant id in age TSV (default: participant_id)",
    )
    add(
        "--age-ses-col",
        default="session",
        help="Column name for session in age TSV (default: session)",
    )
    add("--age-col", default="age", help="Column name for age in age TSV (default: age)")
    add(
        "--age-units",
        choices=["years", "months"],
        default="years",
        help="Units of the age column (default: years)",
    )

    return parser
def main(argv: Optional[Sequence[str]] = None) -> int:
    """
    CLI entrypoint.

    Parses the arguments, scans every dataset root with its own layout, and
    writes the sorted union of matching image paths to the output file, one
    double-quoted path per line.

    Parameters
    ----------
    argv:
        Argument list to parse; ``None`` lets ``argparse`` read ``sys.argv``.

    Returns
    -------
    int
        Exit code (0 on success). Argument errors exit through
        ``parser.error``.
    """
    parser = build_argparser()
    args = parser.parse_args(argv)

    if len(args.layouts) != len(args.bids_dirs):
        parser.error("--layouts must match number of bids_dirs")

    allowed_subs = load_subjects(args.subjects, args.subjects_file)

    scan_args = ScanArgs(
        exclude_file=args.exclude_file,
        modality=args.modality,
        min_age_months=args.min_age_months,
        max_age_months=args.max_age_months,
        pattern=args.pattern,
    )

    # The age table is only consulted when at least one bound is set, so skip
    # reading it (and its "file does not exist" message) otherwise.
    age_filter_requested = args.min_age_months is not None or args.max_age_months is not None

    # Collect matches
    final: Set[str] = set()
    for root, layout in zip(args.bids_dirs, args.layouts):
        root_abs = os.path.abspath(root)

        # Built per (root, layout) pair: the lookup keys depend on the layout,
        # so a table keyed by root alone would be wrong if the same root is
        # listed under two layouts.
        age_lut: Dict[Union[str, Tuple[str, str]], float] = {}
        if age_filter_requested:
            age_lut = load_age_lookup(
                bids_root=root_abs,
                layout=layout,
                tsv_path=args.age_tsv,
                pid_col=args.age_pid_col,
                ses_col=args.age_ses_col,
                age_col=args.age_col,
                age_units=args.age_units,
            )

        if args.pattern:
            glob_s = args.pattern
        elif args.modality not in ("T1w", "T2w") and args.modality:
            glob_s = (
                f"sub-*/ses-*/*/*_{args.modality}.nii.gz"
                if layout == "long"
                else f"sub-*/*/*_{args.modality}.nii.gz"
            )
        else:
            glob_s = "**/*.nii.gz"

        final |= process_dir(
            root_abs,
            glob_s,
            layout,
            scan_args,
            allowed_subs=allowed_subs,
            age_lut=age_lut,
        )

    print(f"Writing {len(final)} paths to {args.output}")
    with open(args.output, "w") as out:
        out.writelines(f'"{pth}"\n' for pth in sorted(final))

    return 0
# Run the CLI only when executed as a script (not on import); the value
# returned by main() is handed to SystemExit and becomes the exit status.
if __name__ == "__main__": raise SystemExit(main())