Skip to content

API Reference

Auto-generated reference for the cardio_risk_rf package via mkdocstrings. Each module below lists its public classes and functions with their docstrings and type signatures.

Package root

cardio_risk_rf

Production-grade cardiovascular risk tabular classifier.

Data

cardio

sulianova Cardiovascular Disease dataset loader + stratified split.

Canonical main-pipeline dataset for cardio-risk-rf. 70000 patients, binary target cardio at ~50/50 positive rate. See https://www.kaggle.com/datasets/sulianova/cardiovascular-disease-dataset

Raw columns (source CSV): id;age;gender;height;weight;ap_hi;ap_lo;cholesterol;gluc;smoke;alco;active;cardio

  • id is dropped on load.
  • age is converted from days to integer years (roughly 30-65 in this dataset) during load so downstream pipelines, SHAP plots, and the serving API all work in a human-readable unit.
  • gender is left as 1/2 (source encoding) — LightGBM and RandomForest treat it as a numeric feature without issue.
  • cholesterol and gluc are ordinal (1=normal, 2=above normal, 3=well above normal); kept as numeric for trees.
  • smoke, alco, active are binary 0/1.
  • ap_hi/ap_lo are systolic/diastolic BP (mmHg).
  • Target cardio is 0 (no CVD) / 1 (CVD).

Functions

load_cardio
load_cardio(csv_path: str | Path) -> pd.DataFrame

Read the sulianova Cardiovascular Disease CSV.

The source file is ;-separated; some Kaggle forks re-export it with , — both delimiters are auto-detected. id is dropped. age is converted from days to integer years. Column order is stabilised to FEATURES + [TARGET].

Source code in src/cardio_risk_rf/data/cardio.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def load_cardio(csv_path: str | Path) -> pd.DataFrame:
    """Load the sulianova Cardiovascular Disease CSV into a tidy DataFrame.

    The delimiter (`;` in the original export, `,` in some Kaggle forks) is
    sniffed from the header line. The `id` column is dropped, `age` is
    rescaled from days to whole years, and the columns are reordered to
    FEATURES + [TARGET]. Raises ValueError if expected columns are absent.
    """
    path = Path(csv_path)
    with path.open("r", encoding="utf-8") as handle:
        header = handle.readline()
    # More ';' than ',' in the header ⇒ the original ;-separated export.
    delimiter = ";" if header.count(";") > header.count(",") else ","
    frame = pd.read_csv(path, sep=delimiter)

    if "id" in frame.columns:
        frame = frame.drop(columns=["id"])

    expected = {TARGET, *FEATURES}
    missing = expected - set(frame.columns)
    if missing:
        raise ValueError(f"sulianova CSV missing columns: {sorted(missing)}")

    # Raw ages are in days; store integer years for human-readable downstream use.
    frame["age"] = (frame["age"] / 365.25).round().astype("int64")
    frame = frame[[*FEATURES, TARGET]].copy()
    frame[TARGET] = frame[TARGET].astype("int64")
    return frame
split_stratified
split_stratified(
    df: DataFrame,
    *,
    seed: int = 42,
    train_ratio: float = 0.7,
    val_ratio: float = 0.15,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]

Return (train, val, test) with stratification on the target column.

Source code in src/cardio_risk_rf/data/cardio.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def split_stratified(
    df: pd.DataFrame,
    *,
    seed: int = 42,
    train_ratio: float = 0.70,
    val_ratio: float = 0.15,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Return (train, val, test) stratified on the target column.

    Ratios are validated up front; the test share is the remainder
    ``1 - train_ratio - val_ratio`` and must be strictly positive.
    """
    if not 0 < train_ratio < 1:
        raise ValueError(f"train_ratio out of range: {train_ratio}")
    if not 0 < val_ratio < 1 - train_ratio:
        raise ValueError(f"val_ratio out of range: {val_ratio}")
    test_ratio = 1.0 - train_ratio - val_ratio

    # Stage 1: carve off the training rows, leaving a stratified holdout pool.
    train_df, holdout = train_test_split(
        df,
        test_size=1 - train_ratio,
        stratify=df[TARGET],
        random_state=seed,
    )
    # Stage 2: split the holdout pool, rescaling the test share to its size.
    val_df, test_df = train_test_split(
        holdout,
        test_size=test_ratio / (val_ratio + test_ratio),
        stratify=holdout[TARGET],
        random_state=seed,
    )
    return (
        train_df.reset_index(drop=True),
        val_df.reset_index(drop=True),
        test_df.reset_index(drop=True),
    )

dataset

Dataset implementations.

Functions

load_dataset
load_dataset(csv_path: Path | str) -> pd.DataFrame

Load a CSV into a dataframe.

Source code in src/cardio_risk_rf/data/dataset.py
10
11
12
def load_dataset(csv_path: Path | str) -> pd.DataFrame:
    """Load a CSV into a dataframe."""
    return pd.read_csv(csv_path)

framingham

Framingham Heart Study loader + stratified split.

Dataset columns (as published on Kaggle): male, age, education, currentSmoker, cigsPerDay, BPMeds, prevalentStroke, prevalentHyp, diabetes, totChol, sysBP, diaBP, BMI, heartRate, glucose, TenYearCHD (target).

Functions

load_framingham
load_framingham(csv_path: str | Path) -> pd.DataFrame

Read the Framingham CSV and return it with a stable column order.

Source code in src/cardio_risk_rf/data/framingham.py
36
37
38
39
40
41
42
43
44
def load_framingham(csv_path: str | Path) -> pd.DataFrame:
    """Read the Framingham CSV and return it with a stable column order.

    Raises ValueError if any expected feature or the target column is absent.
    """
    frame = pd.read_csv(csv_path)
    missing = {TARGET, *FEATURES} - set(frame.columns)
    if missing:
        raise ValueError(f"Framingham CSV missing columns: {sorted(missing)}")
    # Stabilise column order and coerce the target to an integer dtype.
    ordered = frame[[*FEATURES, TARGET]].copy()
    ordered[TARGET] = ordered[TARGET].astype("int64")
    return ordered
split_stratified
split_stratified(
    df: DataFrame,
    *,
    seed: int = 42,
    train_ratio: float = 0.7,
    val_ratio: float = 0.15,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]

Return (train, val, test) with stratification on the target column.

Source code in src/cardio_risk_rf/data/framingham.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def split_stratified(
    df: pd.DataFrame,
    *,
    seed: int = 42,
    train_ratio: float = 0.70,
    val_ratio: float = 0.15,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Stratified (train, val, test) split on the target column.

    The test share is whatever remains after train and val; both ratios
    are validated so that the remainder is strictly positive.
    """
    if not 0 < train_ratio < 1:
        raise ValueError(f"train_ratio out of range: {train_ratio}")
    if not 0 < val_ratio < 1 - train_ratio:
        raise ValueError(f"val_ratio out of range: {val_ratio}")
    test_ratio = 1.0 - train_ratio - val_ratio

    # Two-stage split: peel off train first, then divide the remainder.
    train_part, remainder = train_test_split(
        df,
        test_size=1 - train_ratio,
        stratify=df[TARGET],
        random_state=seed,
    )
    # Within the remainder, test takes test_ratio / (val_ratio + test_ratio).
    val_part, test_part = train_test_split(
        remainder,
        test_size=test_ratio / (val_ratio + test_ratio),
        stratify=remainder[TARGET],
        random_state=seed,
    )
    train_part = train_part.reset_index(drop=True)
    val_part = val_part.reset_index(drop=True)
    test_part = test_part.reset_index(drop=True)
    return train_part, val_part, test_part

prepare

CLI to produce data/processed/{train,val,test}.parquet from raw CSV.

Functions

Models

factory

Build the two production sklearn Pipelines: main (LGBM) + baseline (RF).

Functions

build_main
build_main(
    *, scale_pos_weight: float, random_state: int = 42, **lgbm_overrides: Any
) -> Pipeline

LightGBM classifier; passthrough preprocessing — native NaN handling.

Source code in src/cardio_risk_rf/models/factory.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def build_main(
    *,
    scale_pos_weight: float,
    random_state: int = 42,
    **lgbm_overrides: Any,
) -> Pipeline:
    """LightGBM main classifier wrapped in a one-step Pipeline.

    No preprocessing step is needed because LightGBM handles NaNs natively.
    Keyword overrides take precedence over the tuned defaults below.
    """
    defaults: dict[str, Any] = {
        "objective": "binary",
        "metric": "average_precision",
        "n_estimators": 500,
        "learning_rate": 0.05,
        "num_leaves": 31,
        "max_depth": -1,
        "min_child_samples": 20,
        "subsample": 0.85,
        "subsample_freq": 1,
        "colsample_bytree": 0.85,
        "reg_alpha": 0.0,
        "reg_lambda": 0.0,
        "class_weight": None,
        "scale_pos_weight": float(scale_pos_weight),
        "random_state": random_state,
        "n_jobs": -1,
        "verbosity": -1,
    }
    # Caller overrides win over defaults.
    merged = {**defaults, **lgbm_overrides}
    return Pipeline(steps=[("clf", LGBMClassifier(**merged))])
build_baseline
build_baseline(*, random_state: int = 42, **rf_overrides: Any) -> Pipeline

RandomForest baseline; median imputation because RF cannot split on NaN.

Source code in src/cardio_risk_rf/models/factory.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def build_baseline(
    *,
    random_state: int = 42,
    **rf_overrides: Any,
) -> Pipeline:
    """RandomForest baseline Pipeline with median imputation.

    RandomForest cannot route NaNs at split time, so a median SimpleImputer
    precedes the classifier. Keyword overrides win over the defaults below.
    """
    defaults: dict[str, Any] = {
        "n_estimators": 500,
        "max_depth": None,
        "min_samples_split": 2,
        "min_samples_leaf": 1,
        "max_features": "sqrt",
        "class_weight": "balanced",
        "random_state": random_state,
        "n_jobs": -1,
    }
    merged = {**defaults, **rf_overrides}
    steps = [
        ("imputer", SimpleImputer(strategy="median")),
        ("clf", RandomForestClassifier(**merged)),
    ]
    return Pipeline(steps=steps)

sklearn_pipeline

scikit-learn pipeline builder.

Functions

build_pipeline
build_pipeline(model_name: str = 'lgbm', **model_params: Any) -> Pipeline

Build an sklearn Pipeline by model name.

Source code in src/cardio_risk_rf/models/sklearn_pipeline.py
13
14
15
16
17
18
19
20
21
def build_pipeline(model_name: str = "lgbm", **model_params: Any) -> Pipeline:
    """Build an sklearn Pipeline by model name."""
    if model_name == "lgbm":
        clf = lgb.LGBMClassifier(random_state=42, **model_params)
    elif model_name == "random_forest":
        clf = RandomForestClassifier(random_state=42, **model_params)
    else:
        raise ValueError(f"Unknown model: {model_name}")
    return Pipeline([("scaler", StandardScaler()), ("clf", clf)])

Training

train

Training orchestration for main (LightGBM + Optuna) and baseline (RF + GridSearchCV).

Functions

Evaluation

calibration

Reliability diagram for probabilistic binary predictions.

Functions

save_calibration_plot
save_calibration_plot(
    y_true: ndarray,
    probs: ndarray,
    out_path: str | Path,
    *,
    bins: int = 10,
    title: str = "Calibration curve",
) -> None

Write a reliability-diagram PNG.

Source code in src/cardio_risk_rf/evaluation/calibration.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def save_calibration_plot(
    y_true: np.ndarray,
    probs: np.ndarray,
    out_path: str | Path,
    *,
    bins: int = 10,
    title: str = "Calibration curve",
) -> None:
    """Render a reliability diagram and write it to `out_path` as a PNG.

    Bins are quantile-based, so each plotted point summarises an equal
    share of the predictions. Parent directories are created as needed.
    """
    observed, predicted = calibration_curve(
        y_true, probs, n_bins=bins, strategy="quantile"
    )
    fig, ax = plt.subplots(figsize=(5, 5), dpi=120)
    # The diagonal is a perfectly calibrated model.
    ax.plot([0, 1], [0, 1], linestyle="--", color="gray", label="Perfect")
    ax.plot(predicted, observed, marker="o", linewidth=2, label="Model")
    ax.set_xlabel("Predicted probability")
    ax.set_ylabel("Observed positive rate")
    ax.set_title(title)
    ax.set_xlim(0.0, 1.0)
    ax.set_ylim(0.0, 1.0)
    ax.legend(loc="lower right")
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    target = Path(out_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(target)
    plt.close(fig)

evaluate

CLI: score a trained model on a Parquet split.

Functions

metrics

Binary classification metrics for the tabular pipeline.

Functions

compute_metrics
compute_metrics(
    y_true: ndarray, probs: ndarray, *, threshold: float = 0.5
) -> dict[str, Any]

Return a flat dict with ROC-AUC / PR-AUC / F1 / Brier for reporting.

Source code in src/cardio_risk_rf/evaluation/metrics.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def compute_metrics(
    y_true: np.ndarray,
    probs: np.ndarray,
    *,
    threshold: float = 0.5,
) -> dict[str, Any]:
    """Return a flat dict with ROC-AUC / PR-AUC / F1 / Brier for reporting.

    `probs` are positive-class probabilities; `threshold` only affects the
    hard labels used by F1. Raises ValueError on mismatched or empty input.
    """
    y_true = np.asarray(y_true).astype(int)
    probs = np.asarray(probs).astype(float)
    if y_true.shape != probs.shape:
        raise ValueError(f"shape mismatch: {y_true.shape} vs {probs.shape}")
    if y_true.size == 0:
        raise ValueError("empty y_true")

    hard_labels = (probs >= threshold).astype(int)
    report: dict[str, Any] = {}
    report["roc_auc"] = float(roc_auc_score(y_true, probs))
    report["pr_auc"] = float(average_precision_score(y_true, probs))
    report["f1"] = float(f1_score(y_true, hard_labels))
    report["brier"] = float(brier_score_loss(y_true, probs))
    report["threshold"] = float(threshold)
    report["n"] = int(y_true.size)
    report["positive_rate"] = float(y_true.mean())
    return report

summary

Merge main + baseline metrics into a single summary JSON.

Inference

predict

Inference CLI — load a checkpoint and predict on input(s).

Functions

load_model
load_model(path: str | Path) -> Any

Load a joblib checkpoint from disk.

Source code in src/cardio_risk_rf/inference/predict.py
17
18
19
def load_model(path: str | Path) -> Any:
    """Deserialise and return the joblib checkpoint stored at *path*."""
    checkpoint = joblib.load(path)
    return checkpoint
predict
predict(model: Any, features: dict[str, Any]) -> dict[str, Any]

Run inference on a single feature mapping and return pred + class probabilities.

Source code in src/cardio_risk_rf/inference/predict.py
22
23
24
25
26
27
28
29
def predict(model: Any, features: dict[str, Any]) -> dict[str, Any]:
    """Run inference on a single feature mapping and return pred + class probabilities."""
    import pandas as pd

    x = pd.DataFrame([features]).astype(float)
    proba = model.predict_proba(x)[0].tolist()
    pred = int(model.predict(x)[0])
    return {"pred": pred, "proba": proba}
main
main() -> None

CLI entry point — parse args, load model, predict, print JSON.

Source code in src/cardio_risk_rf/inference/predict.py
32
33
34
35
36
37
38
39
40
41
def main() -> None:
    """CLI entry point — parse args, load model, predict, print JSON.

    `--input` must be a JSON object mapping feature names to values,
    e.g. '{"age": 50, "ap_hi": 120}'. Raises ValueError when the payload
    is valid JSON but not an object.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--checkpoint", required=True, help="joblib model checkpoint path")
    parser.add_argument("--input", required=True, help="JSON object of feature name -> value")
    args = parser.parse_args()
    configure_logging()
    model = load_model(args.checkpoint)
    # BUG FIX: args.input is a CLI string, but predict() requires a dict —
    # the previous code passed the raw string and crashed inside pandas.
    features = json.loads(args.input)
    if not isinstance(features, dict):
        raise ValueError("--input must be a JSON object of feature name -> value")
    result = predict(model, features)
    print(json.dumps(result, indent=2))

Explainability

explain

SHAP wrappers for the LightGBM main model and RF baseline.

TreeExplainer supports both natively. For global reports we pass a sample of val or test; for per-instance serving we pass a single row.

Serving

dependencies

Dependency injection — singleton model loader.

Functions

get_model cached
get_model() -> Any

Singleton-load the checkpoint from MODEL_PATH.

Source code in src/cardio_risk_rf/serving/dependencies.py
15
16
17
18
19
20
21
22
@lru_cache(maxsize=1)
def get_model() -> Any:
    """Singleton-load the checkpoint pointed to by the `MODEL_PATH` env var.

    Raises RuntimeError when the variable is unset or empty; the lru_cache
    keeps the loaded model alive for the lifetime of the process.
    """
    checkpoint = os.environ.get("MODEL_PATH")
    if not checkpoint:
        raise RuntimeError("MODEL_PATH env var not set")
    log.info("model.load", path=checkpoint)
    return load_model(checkpoint)

errors

Exception types and handlers.

main

FastAPI application.

Functions

lifespan async
lifespan(app: FastAPI) -> AsyncIterator[None]

FastAPI lifespan — eagerly load the model (best-effort) on startup.

Source code in src/cardio_risk_rf/serving/main.py
25
26
27
28
29
30
31
32
33
34
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """FastAPI lifespan hook — warm the model cache at startup, best-effort.

    A failed load is logged as a warning rather than aborting boot, so the
    API can still come up (and /health can report the degraded state).
    """
    configure_logging(json_output=True)
    try:
        get_model()
    except Exception as exc:
        log.warning("startup.model_not_loaded", error=str(exc))
    else:
        log.info("startup.model_loaded")
    yield
add_request_id async
add_request_id(
    request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response

Middleware — inject a UUID request id and echo it back as X-Request-ID.

Source code in src/cardio_risk_rf/serving/main.py
47
48
49
50
51
52
53
54
55
@app.middleware("http")
async def add_request_id(
    request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response:
    """Tag every request with a fresh UUID4 and echo it as `X-Request-ID`."""
    # Stash the id on request.state so downstream handlers/loggers can use it.
    request.state.request_id = str(uuid.uuid4())
    response = await call_next(request)
    # Read it back from state so any handler-side change is what we echo.
    response.headers["X-Request-ID"] = request.state.request_id
    return response

routes

FastAPI routes: /health and /predict (main + baseline by query param).

Classes

schemas

Pydantic request/response schemas for /predict.

Fields match the sulianova Cardiovascular Disease Dataset schema (11 features + binary cardio target). age is in years (source data is in days; converted at load time in data/cardio.py).

Classes

PatientFeatures

Bases: BaseModel

Single-patient feature payload; 11 sulianova cardio-risk features.

ShapEntry

Bases: BaseModel

Single SHAP contribution for one feature.

PredictionResponse

Bases: BaseModel

Response schema for /predict.

Classes
Config

Pydantic configuration — allow populating model fields by name or alias.

Utilities

hf_hub

HuggingFace Hub helpers.

logging

Structured logging configuration.

Functions

configure_logging
configure_logging(level: str = 'INFO', json_output: bool = False) -> None

Initialise stdlib + structlog with JSON or console rendering.

Source code in src/cardio_risk_rf/utils/logging.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def configure_logging(level: str = "INFO", json_output: bool = False) -> None:
    """Initialise stdlib + structlog with JSON or console rendering.

    Both the stdlib root logger and structlog's filtering wrapper honour
    `level`; all output goes to stdout.
    """
    logging.basicConfig(format="%(message)s", stream=sys.stdout, level=level.upper())
    # Pick the final renderer first, then assemble the processor chain.
    renderer: Any = (
        structlog.processors.JSONRenderer()
        if json_output
        else structlog.dev.ConsoleRenderer()
    )
    processors: list[Any] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        renderer,
    ]
    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, level.upper())),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )
get_logger
get_logger(name: str | None = None) -> structlog.stdlib.BoundLogger

Return a structlog bound logger; call after configure_logging.

Source code in src/cardio_risk_rf/utils/logging.py
34
35
36
37
def get_logger(name: str | None = None) -> structlog.stdlib.BoundLogger:
    """Return a structlog bound logger; call `configure_logging` first."""
    # structlog.get_logger returns Any; the signature pins the useful type.
    return structlog.get_logger(name)

seed

Deterministic seeding across libraries.

Functions

seed_everything
seed_everything(seed: int = 42) -> None

Seed Python, NumPy, and (optionally) PyTorch for deterministic behaviour.

Source code in src/cardio_risk_rf/utils/seed.py
11
12
13
14
15
16
17
18
19
20
21
22
23
def seed_everything(seed: int = 42) -> None:
    """Seed Python, NumPy, and (if installed) PyTorch for reproducible runs.

    PYTHONHASHSEED is exported so spawned subprocesses inherit it as well.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    try:
        import torch  # type: ignore[import-not-found]  # template stub; revisit in backport
    except ImportError:
        return  # torch is optional; nothing more to seed without it
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True