diff --git a/python/README.md b/python/README.md index 0bbdfa4..3f90165 100644 --- a/python/README.md +++ b/python/README.md @@ -13,10 +13,9 @@ To compare this approach with Ansible or Terraform, see > **Catalog:** [`catalog.yaml`](../catalog.yaml) lists every Python example with > prerequisites, inputs, and outputs. Sections below follow the same format. -> **Note:** These scripts are runnable illustrations, not a tested library. -> There are no unit tests by design - CI validates only lint and formatting -> via Ruff. When you adapt a script for production, add tests appropriate -> to your environment. +> **Note:** These scripts are runnable illustrations. Unit tests live in +> `Unit_tests/` and can be run with `pytest Unit_tests/`. CI validates lint +> and formatting via Ruff in addition to running the test suite. --- @@ -47,9 +46,10 @@ All scripts read connection details from environment variables. export ONTAP_HOST=10.0.0.1 # cluster management LIF export ONTAP_USER=admin # default: admin export ONTAP_PASS=your_password +export ONTAP_TIMEOUT=30 # optional: request timeout in seconds (default: 90) ``` -Or use an env file: +Or use an env file and pass it to scripts that support `--env-file`: ```bash # cluster.env @@ -59,13 +59,46 @@ ONTAP_PASS=your_password ``` ```bash +# Linux / macOS set -a && source cluster.env && set +a + +# Windows PowerShell +Get-Content cluster.env | ForEach-Object { + if ($_ -match '^([^#][^=]*)=(.*)$') { [System.Environment]::SetEnvironmentVariable($Matches[1].Trim(), $Matches[2].Trim()) } +} +``` + +Scripts that accept `--env-file` (e.g. `cluster_setup_basic.py`) can also load +the file directly: + +```bash +python cluster_setup_basic.py --env-file cluster.env ``` > SSL verification is disabled by default to support environments that use > self-signed certificates. We recommend setting `ONTAP_VERIFY_SSL=true` > once CA-signed certificates are in place. +### SSL / CA-bundle configuration + +| Knob | Description | +|---|---| +| Default | `verify_ssl=False` — SSL certificate errors are suppressed so scripts work out-of-the-box with self-signed certs. | +| `ONTAP_VERIFY_SSL=true` | Enable full certificate verification (recommended for production). | +| `REQUESTS_CA_BUNDLE` | Path to a custom CA bundle PEM file when your cluster uses an internal/private CA. | + +To enable verification with a custom CA bundle: + +```bash +export ONTAP_VERIFY_SSL=true +export REQUESTS_CA_BUNDLE=/path/to/your/ca-bundle.pem +python cluster_info.py +``` + +For more details and common SSL errors, see the +[SSL / TLS errors section](../docs/troubleshooting.md#ssl--tls-errors) in the +troubleshooting guide. + --- ## Examples @@ -313,13 +346,13 @@ python snapmirror_cleanup_test_failover.py |---|---| | `ontap_client.py` | Reusable ONTAP REST client (session management, auth, polling, error handling) | | `cluster_info.py` | Get cluster version + node list | +| `cluster_setup_basic.py` | Create a new ONTAP cluster from two pre-cluster nodes | | `nfs_provision.py` | Create NFS volume with export policy | -| `cifs_provision.py` | Create CIFS share with volume and ACL | -| `cluster_setup_basic.py` | Create cluster from two pre-cluster nodes | -| `snapmirror_provision_src_managed.py` | SnapMirror provision (source-managed view) | -| `snapmirror_provision_dest_managed.py` | SnapMirror provision (destination-managed view) | -| `snapmirror_test_failover.py` | SnapMirror test failover via FlexClone | -| `snapmirror_cleanup_test_failover.py` | Clean up test failover clone | +| `cifs_provision.py` | Create CIFS/SMB share (optionally create CIFS server) | +| `snapmirror_provision_src_managed.py` | Provision a SnapMirror relationship from the source cluster | +| `snapmirror_provision_dest_managed.py` | Provision a SnapMirror relationship from the destination cluster | +| `snapmirror_test_failover.py` | Create a FlexClone of the SnapMirror destination for test failover | +| `snapmirror_cleanup_test_failover.py` | Delete the FlexClone created by a test failover | | `requirements.txt` | Python dependencies | ## Code Patterns @@ -328,8 +361,14 @@ These scripts demonstrate several patterns you can reuse: - **`OntapClient.from_env()`** - builds a configured client from environment variables so credentials never appear in code -- **`client.poll_job(uuid)`** - polls an async ONTAP job until completion with - configurable interval and timeout +- **`client.poll_job(uuid)`** - polls an async ONTAP job until completion; + accepts keyword args `interval` (seconds between polls, default 5) and + `timeout` (max seconds to wait, default 300); raises `RuntimeError` on + job failure and `TimeoutError` on timeout +- **`client.wait_snapmirrored(rel_uuid)`** - polls a SnapMirror relationship + until its state reaches `snapmirrored`; accepts `interval` and `max_wait` +- **`client.update_auth(username, password)`** - replaces session credentials + mid-workflow (used by `cluster_setup_basic.py` after cluster creation) - **Context manager** - `with OntapClient.from_env() as client:` ensures the HTTP session is properly closed - **Structured logging** - all output goes through `logging`, not `print()`, diff --git a/python/cifs_provision.py b/python/cifs_provision.py index 4dfe231..fd90579 100644 --- a/python/cifs_provision.py +++ b/python/cifs_provision.py @@ -27,9 +27,8 @@ import logging import os import sys -from pathlib import Path -from ontap_client import OntapApiError, OntapClient +from ontap_client import OntapApiError, OntapClient, load_env_file logging.basicConfig( level=logging.INFO, @@ -38,16 +37,15 @@ logger = logging.getLogger(__name__) ENV = { - "ONTAP_HOST": "", # cluster management IP — set here or via ONTAP_HOST env var + "ONTAP_HOST": "", # cluster management IP — set via ONTAP_HOST env var "ONTAP_USER": "admin", "ONTAP_PASS": "", # never hardcode — set via ONTAP_PASS env var - "SVM_NAME": "vs1", - "VOLUME_NAME": "vol_002", + "SVM_NAME": "vs0", + "VOLUME_NAME": "volume", "VOLUME_SIZE": "100MB", "AGGR_NAME": "", # required — set via --aggregate or AGGR_NAME env var - "CLIENT_MATCH": "0.0.0.0/0", # required — set via --client-match or CLIENT_MATCH env var - "SHARE_NAME": "cifs_share_demo", - "SHARE_COMMENT": "Provisioned by orchestrio", + "SHARE_NAME": "cifs_test", + "SHARE_COMMENT": "Provisioned by PACE", "ACL_USER": "Everyone", "ACL_PERMISSION": "full_control", "CIFS_SERVER_NAME": "ONTAP-CIFS", @@ -55,23 +53,6 @@ } -def _load_env_file(path: str) -> None: - """Load KEY=VALUE pairs from an env file into os.environ (dotenv style).""" - p = Path(path) - if not p.is_file(): - logger.error("Env file not found: %s", path) - sys.exit(1) - for lineno, raw in enumerate(p.read_text().splitlines(), start=1): - line = raw.strip() - if not line or line.startswith("#"): - continue - if "=" not in line: - logger.error("Env file %s line %d: expected KEY=VALUE, got: %s", path, lineno, line) - sys.exit(1) - key, _, value = line.partition("=") - os.environ.setdefault(key.strip(), value.strip()) - - def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description="Provision a CIFS share") p.add_argument( @@ -114,33 +95,41 @@ def _pick(cli_val: str | None, env_key: str, default: str = "") -> str: return cli_val or os.environ.get(env_key) or ENV.get(env_key, "") or default -def _resolve_config(args: argparse.Namespace) -> dict[str, str | bool]: - """Load env file and CLI args, then return the resolved configuration dict.""" +def _require(value: str, flag: str, env_key: str) -> str: + """Return *value* or exit with a clear error if it is empty.""" + if not value: + logger.error("--%s is required (or set %s in env / --env-file)", flag, env_key) + sys.exit(1) + return value + + +def _resolve_config(args: argparse.Namespace) -> tuple[dict[str, str], bool]: + """Load env file and CLI args, then return (str_config, create_cifs_server). + + Returns a tuple of: + - A ``dict[str, str]`` with all string config values. + - A ``bool`` indicating whether to auto-create the CIFS server. + """ if args.env_file: - _load_env_file(args.env_file) + load_env_file(args.env_file) for key, value in ENV.items(): if value and key not in os.environ: os.environ[key] = value - aggregate = _pick(args.aggregate, "AGGR_NAME") - if not aggregate: - logger.error("--aggregate is required (or set AGGR_NAME in env / --env-file)") - sys.exit(1) - - return { - "svm": _pick(args.svm, "SVM_NAME", "vs0"), - "volume": _pick(args.volume, "VOLUME_NAME", "cifs_test_env"), + cfg: dict[str, str] = { + "aggregate": _require(_pick(args.aggregate, "AGGR_NAME"), "aggregate", "AGGR_NAME"), + "svm": _require(_pick(args.svm, "SVM_NAME"), "svm", "SVM_NAME"), + "volume": _require(_pick(args.volume, "VOLUME_NAME"), "volume", "VOLUME_NAME"), + "share_name": _require(_pick(args.share_name, "SHARE_NAME"), "share-name", "SHARE_NAME"), "size": _pick(args.size, "VOLUME_SIZE", "100MB"), - "aggregate": aggregate, - "share_name": _pick(args.share_name, "SHARE_NAME", "cifs_share_demo"), - "share_comment": _pick(args.share_comment, "SHARE_COMMENT", "Provisioned by orchestrio"), + "share_comment": _pick(args.share_comment, "SHARE_COMMENT", "Provisioned by PACE"), "acl_user": _pick(args.acl_user, "ACL_USER", "Everyone"), "acl_permission": _pick(args.acl_permission, "ACL_PERMISSION", "full_control"), - "create_cifs_server": args.create_cifs_server, "cifs_server_name": _pick(args.cifs_server_name, "CIFS_SERVER_NAME", "ONTAP-CIFS"), "workgroup": _pick(args.workgroup, "CIFS_WORKGROUP", "WORKGROUP"), } + return cfg, bool(args.create_cifs_server) def _ensure_cifs_server( @@ -302,7 +291,8 @@ def _verify_and_log_acls(client: OntapClient, svm_uuid: str, share_name: str) -> def main() -> None: - cfg = _resolve_config(parse_args()) + """Resolve configuration, then orchestrate CIFS server, volume, share, and ACL setup.""" + cfg, create_cifs_server = _resolve_config(parse_args()) svm = cfg["svm"] volume = cfg["volume"] size = cfg["size"] @@ -314,7 +304,7 @@ def main() -> None: with OntapClient.from_env() as client: _ensure_cifs_server( - client, svm, cfg["create_cifs_server"], cfg["cifs_server_name"], cfg["workgroup"] + client, svm, create_cifs_server, cfg["cifs_server_name"], cfg["workgroup"] ) job_result = _ensure_volume_ntfs(client, svm, volume, size, aggregate) diff --git a/python/cluster_info.py b/python/cluster_info.py index 6b29ec5..d7edff3 100644 --- a/python/cluster_info.py +++ b/python/cluster_info.py @@ -3,11 +3,12 @@ # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. -"""Retrieve storage cluster version and list all nodes with serial numbers. +"""Retrieve storage cluster version, nodes, and aggregates. Steps: - 1. GET /cluster — retrieve cluster name and ONTAP version - 2. GET /cluster/nodes — list all nodes with serial numbers + 1. GET /cluster — cluster name, ONTAP version, contact, location + 2. GET /cluster/nodes — list all nodes with serial numbers + 3. GET /storage/aggregates — list aggregates with state and used space Prerequisites:: @@ -34,13 +35,16 @@ def main() -> None: + """Retrieve cluster version and print all node names with serial numbers.""" with OntapClient.from_env() as client: - # Step 1 — cluster version - cluster = client.get("/cluster", fields="name,version") + # Step 1 — cluster version, contact, and location + cluster = client.get("/cluster", fields="name,version,contact,location") logger.info( - "Cluster: %s — ONTAP %s", + "Cluster: %s — ONTAP %s contact=%s location=%s", cluster.get("name", "unknown"), cluster.get("version", {}).get("full", "unknown"), + cluster.get("contact", "—"), + cluster.get("location", "—"), ) # Step 2 — node list with serial numbers @@ -55,6 +59,23 @@ def main() -> None: node.get("serial_number", "—"), ) + # Step 3 — aggregate list with state and used space + aggr_resp = client.get("/storage/aggregates", fields="name,state,space.block_storage") + aggr_records = aggr_resp.get("records", []) + logger.info("Aggregates in cluster: %d", aggr_resp.get("num_records", len(aggr_records))) + + for aggr in aggr_records: + block = aggr.get("space", {}).get("block_storage", {}) + total = block.get("size", 0) + used = block.get("used", 0) + used_pct = (used / total * 100) if total else 0 + logger.info( + " %-30s state: %-10s used: %.1f%%", + aggr.get("name", "—"), + aggr.get("state", "—"), + used_pct, + ) + if __name__ == "__main__": try: diff --git a/python/cluster_setup_basic.py b/python/cluster_setup_basic.py index 3c7c250..88c6299 100644 --- a/python/cluster_setup_basic.py +++ b/python/cluster_setup_basic.py @@ -2,7 +2,7 @@ # © 2026 NetApp, Inc. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. -"""Create a storage cluster from two pre-cluster nodes. +"""Create a storage cluster from two pre-cluster nodes (ONTAP 9 unified). Steps: 1. discover_nodes — GET /api/cluster/nodes (membership=available, retry 3x/30s) 2. discover_local — isolate the local node (has management_interfaces != null) @@ -33,9 +33,8 @@ import os import sys import time -from pathlib import Path -from ontap_client import OntapClient +from ontap_client import OntapClient, load_env_file logging.basicConfig( level=logging.INFO, @@ -50,7 +49,7 @@ "ONTAP_HOST": "", # Node 1 management IP — set via ONTAP_HOST env var "ONTAP_USER": "admin", "ONTAP_PASS": "", # leave empty for pre-cluster nodes - "CLUSTER_NAME": "", # set via CLUSTER_NAME env var + "CLUSTER_NAME": "cluster1", # set via CLUSTER_NAME env var "CLUSTER_PASS": "", # set via CLUSTER_PASS env var — choose your cluster admin password "CLUSTER_MGMT_IP": "", # cluster management IP — set via CLUSTER_MGMT_IP env var "CLUSTER_NETMASK": "", # set via CLUSTER_NETMASK env var @@ -59,26 +58,18 @@ } # --------------------------------------------------------------------------- -_NODE_FIELDS_SETS = [ - # newest (9.19+) - ( - "name,model,state,ha,version,serial_number,membership," - "cluster_interfaces,management_interfaces,metrocluster,disaggregated,san_optimized" - ), - # 9.18 without disaggregated - ( - "name,model,state,ha,version,serial_number,membership," - "cluster_interfaces,management_interfaces,metrocluster,san_optimized" - ), - # 9.14 and older — minimal safe set - ( - "name,model,state,ha,version,serial_number,membership," - "cluster_interfaces,management_interfaces,metrocluster" - ), -] +# ONTAP 9 unified — node discovery fields +_NODE_FIELDS = ( + "name,model,state,ha,version,serial_number,membership," + "cluster_interfaces,management_interfaces,metrocluster" +) def _env(key: str, required: bool = True) -> str: + """Return the value for *key* from INPUTS or os.environ. + + Logs an error and exits if *required* is True and the value is empty. + """ val = INPUTS.get(key) or os.environ.get(key, "") if required and not val: logger.error( @@ -95,23 +86,11 @@ def _env(key: str, required: bool = True) -> str: def _get_nodes(client: OntapClient, **kwargs) -> dict: - """GET /cluster/nodes, trying progressively reduced field sets for older ONTAP versions.""" - last_exc: Exception | None = None - for fields in _NODE_FIELDS_SETS: - try: - return client.get("/cluster/nodes", fields=fields, **kwargs) - except Exception as exc: - if "262197" in str(exc): - logger.warning( - "discover: field unsupported on this version, retrying with reduced fields" - ) - last_exc = exc - continue - raise - raise last_exc # type: ignore[misc] + """GET /cluster/nodes with the standard ONTAP 9 unified field set.""" + return client.get("/cluster/nodes", fields=_NODE_FIELDS, **kwargs) -def discover_nodes(client: OntapClient, attempts: int = 3, delay: int = 30) -> dict: +def discover_nodes(client: OntapClient, attempts: int = 3, delay: int = 30) -> dict: # type: ignore[return] """Step 1 — discover available nodes, retry up to 3 times.""" for attempt in range(1, attempts + 1): try: @@ -212,27 +191,19 @@ def create_cluster(client: OntapClient, local: dict, partner: dict) -> dict: def track_job(client: OntapClient, job_uuid: str) -> dict: - """Step 5 — poll job until state != running (switch to cluster password first).""" - # After cluster creation the node switches to cluster mode — use CLUSTER_PASS - client._session.auth = (_env("ONTAP_USER"), _env("CLUSTER_PASS")) + """Step 5 — switch to cluster credentials then poll the job until completion. - while True: - result = client.get( - f"/cluster/jobs/{job_uuid}", - fields=("code,description,end_time,error,message,start_time,state,uuid"), - ) - state = result.get("state", "unknown") - logger.info("track_job — state=%s", state) - if state != "running": - if state != "success": - raise RuntimeError( - f"Cluster job ended with state='{state}': {result.get('error')}" - ) - return result - time.sleep(10) + After ``POST /cluster`` the node transitions to full cluster mode and + requires the new cluster-admin password. Authentication is updated via + :meth:`~ontap_client.OntapClient.update_auth` before polling begins. + """ + # After cluster creation the node switches to cluster mode — use CLUSTER_PASS + client.update_auth(_env("ONTAP_USER"), _env("CLUSTER_PASS")) + return client.poll_job(job_uuid, interval=10, timeout=1800) def main() -> None: + """Orchestrate all five cluster-setup steps and log the resulting cluster URL.""" host = _env("ONTAP_HOST") user = _env("ONTAP_USER") passwd = os.environ.get("ONTAP_PASS", "") # empty on pre-cluster nodes @@ -258,13 +229,11 @@ def main() -> None: def _load_env_file(path: str) -> None: - """Load KEY=VALUE pairs from a .env file into the INPUTS dict.""" - for line in Path(path).read_text(encoding="utf-8").splitlines(): - line = line.strip() - if not line or line.startswith("#") or "=" not in line: - continue - key, _, value = line.partition("=") - INPUTS[key.strip()] = value.strip().strip('"').strip("'") + """Load KEY=VALUE pairs from a .env file into both os.environ and the INPUTS dict.""" + load_env_file(path) + for key in list(INPUTS): + if val := os.environ.get(key): + INPUTS[key] = val if __name__ == "__main__": diff --git a/python/nfs_provision.py b/python/nfs_provision.py index b8b6d13..584567d 100644 --- a/python/nfs_provision.py +++ b/python/nfs_provision.py @@ -43,9 +43,8 @@ import logging import os import sys -from pathlib import Path -from ontap_client import OntapClient +from ontap_client import OntapClient, load_env_file logging.basicConfig( level=logging.INFO, @@ -54,34 +53,17 @@ logger = logging.getLogger(__name__) ENV = { - "ONTAP_HOST": "", # cluster management IP — set here or via ONTAP_HOST env var + "ONTAP_HOST": "", # cluster management IP — set via ONTAP_HOST env var "ONTAP_USER": "admin", "ONTAP_PASS": "", # never hardcode — set via ONTAP_PASS env var - "SVM_NAME": "vs1", - "VOLUME_NAME": "vol_001", + "SVM_NAME": "vs0", + "VOLUME_NAME": "vol", "VOLUME_SIZE": "100MB", - "AGGR_NAME": "sti232_vsim_sr091o_aggr1", # required — set via --aggregate or AGGR_NAME env var + "AGGR_NAME": "", # required — set via --aggregate or AGGR_NAME env var "CLIENT_MATCH": "0.0.0.0/0", } -def _load_env_file(path: str) -> None: - """Load KEY=VALUE pairs from an env file into os.environ (dotenv style).""" - p = Path(path) - if not p.is_file(): - logger.error("Env file not found: %s", path) - sys.exit(1) - for lineno, raw in enumerate(p.read_text().splitlines(), start=1): - line = raw.strip() - if not line or line.startswith("#"): - continue - if "=" not in line: - logger.error("Env file %s line %d: expected KEY=VALUE, got: %s", path, lineno, line) - sys.exit(1) - key, _, value = line.partition("=") - os.environ.setdefault(key.strip(), value.strip()) - - def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description="Provision an NFS volume") p.add_argument( @@ -104,15 +86,22 @@ def _pick(arg: str | None, env_key: str, default: str = "") -> str: def _resolve_config(args: argparse.Namespace) -> tuple[str, str, str, str, str]: - """Push ENV defaults into os.environ then resolve final values from all sources.""" + """Load env file (if provided) and resolve final values from all sources. + + Also pushes ENV block defaults into os.environ so ``OntapClient.from_env()`` + can pick up ``ONTAP_HOST`` / ``ONTAP_PASS`` from the ENV block when no real + env vars are set. + """ + if args.env_file: + load_env_file(args.env_file) for key, value in ENV.items(): if value and key not in os.environ: os.environ[key] = value - svm = _pick(args.svm, "SVM_NAME", "vs0") - volume = _pick(args.volume, "VOLUME_NAME", "vol_nfs_test_01") + svm = _pick(args.svm, "SVM_NAME") + volume = _pick(args.volume, "VOLUME_NAME") size = _pick(args.size, "VOLUME_SIZE", "100MB") aggregate = _pick(args.aggregate, "AGGR_NAME") - client_match = _pick(args.client_match, "CLIENT_MATCH", "0.0.0.0/0") + client_match = _pick(args.client_match, "CLIENT_MATCH") return svm, volume, size, aggregate, client_match @@ -220,13 +209,21 @@ def _assign_policy(client: OntapClient, volume_uuid: str, policy_name: str) -> N def main() -> None: + """Parse args, resolve config, then provision volume, export policy, and client rule.""" args = parse_args() - if args.env_file: - _load_env_file(args.env_file) svm, volume, size, aggregate, client_match = _resolve_config(args) + if not svm: + logger.error("--svm is required (or set SVM_NAME in env / --env-file)") + sys.exit(1) + if not volume: + logger.error("--volume is required (or set VOLUME_NAME in env / --env-file)") + sys.exit(1) if not aggregate: logger.error("--aggregate is required (or set AGGR_NAME in env / --env-file)") sys.exit(1) + if not client_match: + logger.error("--client-match is required (or set CLIENT_MATCH in env / --env-file)") + sys.exit(1) policy_name = f"{volume}_export_policy" with OntapClient.from_env() as client: volume_uuid = _ensure_volume(client, svm, volume, size, aggregate) diff --git a/python/ontap_client.py b/python/ontap_client.py index 1f9e41e..5228587 100644 --- a/python/ontap_client.py +++ b/python/ontap_client.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 # © 2026 NetApp, Inc. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # See the NOTICE file in the repo root for trademark and attribution details. @@ -11,6 +12,14 @@ with OntapClient.from_env() as client: cluster = client.get("/cluster", fields="version") print(cluster["name"], cluster["version"]["full"]) + +Environment variables:: + + ONTAP_HOST cluster management LIF (required) + ONTAP_PASS admin password (required) + ONTAP_USER username, default admin + ONTAP_VERIFY_SSL set to 'true' to enable SSL verification, default false + ONTAP_TIMEOUT request timeout in seconds, default 90 """ from __future__ import annotations @@ -26,6 +35,8 @@ logger = logging.getLogger("ontap_client") +__all__ = ["OntapClient", "OntapApiError", "load_env_file"] + # All examples in this repo disable SSL verification to support environments # that use self-signed certificates. We recommend setting # ONTAP_VERIFY_SSL=true once CA-signed certificates are in place. The @@ -33,7 +44,7 @@ # is disabled. urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -_DEFAULT_TIMEOUT = 30 +_DEFAULT_TIMEOUT = 90 _DEFAULT_HEADERS = { "Accept": "application/hal+json", "Content-Type": "application/json", @@ -99,6 +110,15 @@ def __exit__(self, *exc: object) -> None: def close(self) -> None: self._session.close() + def update_auth(self, username: str, password: str) -> None: + """Replace the HTTP Basic-Auth credentials on the underlying session. + + Use this when the cluster switches authentication context mid-workflow + (e.g. after ``POST /cluster`` when the node moves from pre-cluster mode + to full cluster mode and requires the new cluster admin password). + """ + self._session.auth = (username, password) + # -- Factory ------------------------------------------------------------ @classmethod @@ -110,7 +130,8 @@ def from_env(cls) -> OntapClient: Optional (with defaults): ``ONTAP_USER`` (default ``admin``), - ``ONTAP_VERIFY_SSL`` (default ``false``) + ``ONTAP_VERIFY_SSL`` (default ``false``), + ``ONTAP_TIMEOUT`` (default ``90`` seconds) """ host = os.environ.get("ONTAP_HOST", "") if not host: @@ -121,11 +142,15 @@ def from_env(cls) -> OntapClient: logger.error("ONTAP_PASS environment variable is required") sys.exit(1) + _timeout_env = os.environ.get("ONTAP_TIMEOUT") + timeout = int(_timeout_env) if _timeout_env is not None else _DEFAULT_TIMEOUT + return cls( host=host, username=os.environ.get("ONTAP_USER", "admin"), password=password, verify_ssl=os.environ.get("ONTAP_VERIFY_SSL", "false").lower() == "true", + timeout=timeout, ) # -- HTTP helpers ------------------------------------------------------- @@ -140,7 +165,15 @@ def _request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]: url = self._url(path) logger.debug("%s %s", method, url) - resp = self._session.request(method, url, **kwargs) + try: + resp = self._session.request(method, url, **kwargs) + except requests.exceptions.Timeout as exc: + raise RuntimeError( + f"{method} {url} timed out after {kwargs['timeout']} s — " + "the cluster may be busy or unreachable. " + "Increase the timeout via OntapClient(..., timeout=) if needed." + ) from exc + if not resp.ok: raise OntapApiError(resp) @@ -151,7 +184,6 @@ def _request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]: def get(self, path: str, *, fields: str = "", **params: str) -> dict[str, Any]: if fields: params["fields"] = fields - params.setdefault("return_timeout", "120") return self._request("GET", path, params=params) def post(self, path: str, body: dict[str, Any]) -> dict[str, Any]: @@ -206,3 +238,76 @@ def poll_job( raise TimeoutError(f"Job {job_uuid} did not complete within {timeout}s") time.sleep(interval) + + def wait_snapmirrored( + self, + rel_uuid: str, + *, + interval: int = 15, + max_wait: int = 1800, + ) -> dict[str, Any]: + """Poll a SnapMirror relationship until its state becomes ``snapmirrored``. + + Args: + rel_uuid: UUID of the SnapMirror relationship to watch. + interval: Seconds between polls (default 15). + max_wait: Maximum total seconds to wait before raising (default 1800). + + Returns: + The final relationship record when state == ``snapmirrored``. + + Raises: + :class:`RuntimeError` if ``max_wait`` is exceeded. + """ + elapsed = 0 + while elapsed < max_wait: + result = self.get( + f"/snapmirror/relationships/{rel_uuid}", + fields="state,lag_time,healthy", + ) + state = result.get("state", "unknown") + logger.info("Relationship %s — state: %s", rel_uuid, state) + if state == "snapmirrored": + return result + time.sleep(interval) + elapsed += interval + raise RuntimeError(f"Timed out waiting for relationship {rel_uuid} to reach snapmirrored") + + +# --------------------------------------------------------------------------- +# Shared utilities +# --------------------------------------------------------------------------- + + +def load_env_file(path: str) -> None: + """Load ``KEY=VALUE`` pairs from a file into :data:`os.environ` (dotenv style). + + Rules: + - Blank lines and lines starting with ``#`` are ignored. + - Values are set via :func:`os.environ.setdefault` so existing env vars + take precedence. + - Surrounding single or double quotes on values are stripped. + + Args: + path: Path to the env file. The script exits with an error message if + the file does not exist or contains a malformed line. + """ + from pathlib import Path # local import to avoid top-level dependency + + p = Path(path) + if not p.is_file(): + logger.error("Env file not found: %s", path) + import sys + + sys.exit(1) + for lineno, raw in enumerate(p.read_text(encoding="utf-8").splitlines(), start=1): + line = raw.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + logger.error("Env file %s line %d: expected KEY=VALUE, got: %s", path, lineno, line) + import sys + + sys.exit(1) + key, _, value = line.partition("=") + os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'")) diff --git a/python/snapmirror_cleanup_test_failover.py b/python/snapmirror_cleanup_test_failover.py index 55f5ff2..667257f 100644 --- a/python/snapmirror_cleanup_test_failover.py +++ b/python/snapmirror_cleanup_test_failover.py @@ -53,17 +53,21 @@ # USER INPUTS — fill in your values here before running # --------------------------------------------------------------------------- INPUTS = { - "CLUSTER_A": "", # first cluster management IP — never hardcode - "CLUSTER_B": "", # second cluster management IP — never hardcode + "CLUSTER_A": "", # first cluster management IP — set via CLUSTER_A env var + "CLUSTER_B": "", # second cluster management IP — set via CLUSTER_B env var "DEST_USER": "admin", - "DEST_PASS": "", # set via DEST_PASS env var + "DEST_PASS": "", # set via DEST_PASS env var — never hardcode "SOURCE_VOLUME": "", # source volume name (e.g. vol_rw_01) - "SOURCE_SVM": "vs0", # source SVM name + "SOURCE_SVM": "", # source SVM name } # --------------------------------------------------------------------------- def _env(key: str, default: str = "") -> str: + """Return the value for *key* from INPUTS or os.environ. + + Logs an error and exits if the resolved value is empty and no *default* is given. + """ val = INPUTS.get(key) or os.environ.get(key, default) if not val: logger.error( @@ -74,16 +78,6 @@ def _env(key: str, default: str = "") -> str: return val -def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: - while True: - result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") - state = result.get("state", "unknown") - logger.info(" job %s — state=%s", job_uuid, state) - if state != "running": - return result - time.sleep(interval) - - def _pick_cluster_by_relationship( cluster_a: str, cluster_b: str, @@ -96,13 +90,12 @@ def _pick_cluster_by_relationship( source_path = f"{source_svm}:{source_volume}" for host in (cluster_a, cluster_b): try: - client = OntapClient(host, user, passwd, verify_ssl=False, timeout=20) - resp = client.get( - "/snapmirror/relationships", - fields="uuid,source.path,destination.path,state,healthy", - **{"source.path": source_path, "max_records": "1"}, - ) - client.close() + with OntapClient(host, user, passwd, verify_ssl=False, timeout=20) as client: + resp = client.get( + "/snapmirror/relationships", + fields="uuid,source.path,destination.path,state,healthy", + **{"source.path": source_path, "max_records": "1"}, + ) if resp.get("num_records", 0) >= 1: return host, resp["records"][0] except Exception as exc: @@ -156,7 +149,7 @@ def _remove_smas_and_bring_online( ) job_uuid = resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) except Exception as exc: logger.warning("delete_smas_rel %s — %s (continuing)", smas_uuid, exc) if smas_resp.get("num_records", 0) == 0: @@ -168,7 +161,7 @@ def _remove_smas_and_bring_online( ) job_uuid = resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) except Exception as exc: logger.warning("bring_online — %s (continuing)", exc) @@ -184,7 +177,7 @@ def _unmount_clone(client: OntapClient, clone_uuid: str) -> None: ) job_uuid = resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) return except Exception as exc: logger.warning("unmount_clone attempt %d/6 — %s", attempt, exc) @@ -204,7 +197,7 @@ def _offline_clone(client: OntapClient, clone_uuid: str) -> None: ) job_uuid = resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) except Exception as exc: logger.warning("offline_clone — %s", exc) @@ -218,7 +211,7 @@ def _delete_and_confirm_clone( resp = client.delete(f"/storage/volumes/{clone_uuid}?return_timeout=120") job_uuid = resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) except Exception as exc: logger.warning("delete_clone — %s", exc) confirm = client.get( @@ -238,6 +231,7 @@ def _delete_and_confirm_clone( def main() -> None: + """Find the tagged FlexClone from a test failover and delete it through all cleanup phases.""" cluster_a = _env("CLUSTER_A") cluster_b = _env("CLUSTER_B") dest_user = _env("DEST_USER") diff --git a/python/snapmirror_provision_dest_managed.py b/python/snapmirror_provision_dest_managed.py index 4fa4c96..7930581 100644 --- a/python/snapmirror_provision_dest_managed.py +++ b/python/snapmirror_provision_dest_managed.py @@ -26,11 +26,21 @@ Prerequisites: 1. pip install -r requirements.txt - 2. ONTAP 9.8+ on both clusters - 3. SnapMirror licence installed on both clusters - 4. At least one intercluster LIF on each cluster + 2. ONTAP 9.8+ on both clusters (mixed versions supported; dest cluster drives all API calls) + 3. SnapMirror licence installed on BOTH clusters — source AND destination. + Verify via: ``system license show -package SnapMirror`` on each cluster. + 4. At least one intercluster LIF on each cluster, AND: + - IC LIFs on both clusters must be on a mutually routable subnet, OR + TCP 11104 and 11105 must be open between the two sets of IC LIF IPs. + - SnapMirror data transfers are always initiated by the DESTINATION cluster + connecting outbound to the source cluster on TCP 11104/11105. + - The script checks IC LIF subnets at startup and warns if they are disjoint. + - Tip: run ``network interface show -role intercluster`` on each cluster to + confirm the IPs and verify reachability with ``ping`` before running this script. 5. Cluster peer relationship already exists between source and dest clusters + (or will be auto-created by this script if none is found) 6. SVM peer relationship already exists (source SVM <-> dest SVM) + (or will be auto-created by this script if none is found) 7. Source RW volume (SOURCE_VOLUME) already exists on SOURCE_SVM 8. At least one online aggregate on the destination cluster 9. Admin credentials for both clusters @@ -52,7 +62,7 @@ import sys import time -from ontap_client import OntapClient +from ontap_client import OntapApiError, OntapClient logging.basicConfig( level=logging.INFO, @@ -67,18 +77,22 @@ "SOURCE_HOST": "", # source cluster management IP — set via SOURCE_HOST env var "SOURCE_USER": "admin", "SOURCE_PASS": "", # set via SOURCE_PASS env var — never hardcode - "SOURCE_SVM": "svm1", # source SVM name - "SOURCE_VOLUME": "vol_py1", # source RW volume name + "SOURCE_SVM": "", # source SVM name + "SOURCE_VOLUME": "", # source RW volume name "DEST_HOST": "", # destination cluster management IP — set via DEST_HOST env var "DEST_USER": "admin", "DEST_PASS": "", # set via DEST_PASS env var — never hardcode - "DEST_SVM": "vs0", # destination SVM name + "DEST_SVM": "", # destination SVM name "SM_POLICY": "Asynchronous", # SnapMirror policy (optional) } # --------------------------------------------------------------------------- def _env(key: str, default: str = "") -> str: + """Return the value for *key* from INPUTS or os.environ. + + Logs an error and exits if the resolved value is empty and no *default* is given. + """ val = INPUTS.get(key) or os.environ.get(key, default) if not val: logger.error( @@ -89,47 +103,29 @@ def _env(key: str, default: str = "") -> str: return val -def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: - while True: - result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") - state = result.get("state", "unknown") - logger.info(" job %s — state=%s", job_uuid, state) - if state != "running": - return result - time.sleep(interval) - +def _get_ic_lif_ips(client: OntapClient) -> list[str]: + """Return intercluster LIF IPs from the given cluster. -def _wait_snapmirrored( - client: OntapClient, rel_uuid: str, interval: int = 15, max_wait: int = 1800 -) -> dict: - elapsed = 0 - while elapsed < max_wait: - result = client.get( - f"/snapmirror/relationships/{rel_uuid}", - fields="state,lag_time,healthy", + Uses server-side query filters for reliability across ONTAP versions: + first tries ``service_policy.name=default-intercluster`` (9.8+ REST + model), then ``services=intercluster_core`` as a fallback. Results + from both queries are merged and deduplicated. + """ + seen: set[str] = set() + for qp in ( + {"service_policy.name": "default-intercluster"}, + {"services": "intercluster_core"}, + ): + resp = client.get( + "/network/ip/interfaces", + fields="name,ip.address", + **{"max_records": "50", **qp}, ) - state = result.get("state", "unknown") - logger.info(" relationship %s — state=%s", rel_uuid, state) - if state == "snapmirrored": - return result - time.sleep(interval) - elapsed += interval - raise RuntimeError(f"Timed out waiting for relationship {rel_uuid} to reach snapmirrored") - - -def _get_ic_lif_ips(client: OntapClient) -> list[str]: - """Return intercluster LIF IPs from the given cluster.""" - resp = client.get( - "/network/ip/interfaces", - fields="name,ip.address,services", - **{"max_records": "50"}, - ) - return [ - r["ip"]["address"] - for r in resp.get("records", []) - if any("intercluster" in str(s) for s in r.get("services", [])) - and r.get("ip", {}).get("address") - ] + for r in resp.get("records", []): + ip = r.get("ip", {}).get("address", "") + if ip: + seen.add(ip) + return list(seen) def _check_ic_lif_preconditions( @@ -146,7 +142,7 @@ def _check_ic_lif_preconditions( logger.error( "PRE-CONDITION FAILED | Source cluster has no intercluster LIFs.\n" " SnapMirror requires at least one IC LIF on each cluster.\n" - " Create one via System Manager: Network → IP Interfaces → Add → Role: Intercluster\n" + " Create one via System Manager: Network -> IP Interfaces -> Add -> Role: Intercluster\n" " Or via CLI: network interface create -role intercluster -home-port e0d " "-address -netmask " ) @@ -155,7 +151,7 @@ def _check_ic_lif_preconditions( logger.error( "PRE-CONDITION FAILED | Dest cluster has no intercluster LIFs.\n" " SnapMirror requires at least one IC LIF on each cluster.\n" - " Create one via System Manager: Network → IP Interfaces → Add → Role: Intercluster\n" + " Create one via System Manager: Network -> IP Interfaces -> Add -> Role: Intercluster\n" " Or via CLI: network interface create -role intercluster -home-port e0d " "-address -netmask " ) @@ -342,7 +338,7 @@ def _create_svm_peer_relationship( ) peer_job = resp.get("job", {}).get("uuid", "") if peer_job: - _poll_job(dst, peer_job) + dst.poll_job(peer_job) logger.info("SVM PEER | created '%s' <-> '%s'", dest_svm, source_svm) except Exception as exc: exc_s = str(exc) @@ -412,11 +408,20 @@ def _phase_a_source_preflight( src_cluster.get("name"), src_cluster.get("version", {}).get("full"), ) - src_vol_resp = src.get( - "/storage/volumes", - fields="name,uuid,state,type,space.size", - **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, - ) + try: + src_vol_resp = src.get( + "/storage/volumes", + fields="name,uuid,state,type,space.size", + **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, + ) + except OntapApiError as exc: + logger.error( + "ABORTED — could not query source volume '%s' on %s: %s", + source_volume, + source_host, + exc, + ) + sys.exit(1) if src_vol_resp.get("num_records", 0) == 0: logger.error( "ABORTED — source volume '%s' not found on %s", @@ -472,7 +477,7 @@ def _phase_d_setup_relationship( ) job_uuid = create_resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(dst, job_uuid) + dst.poll_job(job_uuid) except Exception as exc: logger.info("create_and_initialize_relationship — %s (may already exist)", exc) @@ -528,6 +533,7 @@ def _phase_d_setup_relationship( def main() -> None: + """Orchestrate all six phases (A–F) to provision a SnapMirror relationship from the destination cluster.""" source_host = _env("SOURCE_HOST") source_user = _env("SOURCE_USER") source_pass = _env("SOURCE_PASS") @@ -616,7 +622,7 @@ def main() -> None: ) logger.info("=== Phase E: Convergence polling ===") - _wait_snapmirrored(dst, rel_uuid) + dst.wait_snapmirrored(rel_uuid) logger.info("=== Phase F: Final validation ===") final = dst.get( diff --git a/python/snapmirror_provision_src_managed.py b/python/snapmirror_provision_src_managed.py index 481d8fe..b2179a5 100644 --- a/python/snapmirror_provision_src_managed.py +++ b/python/snapmirror_provision_src_managed.py @@ -19,10 +19,21 @@ Prerequisites: 1. pip install -r requirements.txt 2. ONTAP 9.8+ on both clusters - 3. SnapMirror licence installed on both clusters - 4. At least one intercluster LIF on each cluster - 5. Cluster peer relationship already exists between source and dest clusters - 6. SVM peer relationship already exists (source SVM <-> dest SVM) + 3. SnapMirror licence installed on BOTH clusters — source AND destination. + Verify via: ``system license show -package SnapMirror`` on each cluster. + 4. At least one intercluster LIF on each cluster, AND: + - IC LIFs on both clusters must be on a mutually routable subnet, OR + TCP 11104 and 11105 must be open between the two sets of IC LIF IPs. + - SnapMirror data transfers are always initiated by the DESTINATION cluster + connecting outbound to the source cluster on TCP 11104/11105. + - Tip: run ``network interface show -role intercluster`` on each cluster to + confirm the IPs and verify reachability with ``ping`` before running this script. + 5. Cluster peer relationship MUST already exist between source and dest clusters. + (Unlike snapmirror_provision_dest_managed.py, this script does NOT auto-create it.) + Create via: ``cluster peer create`` on both clusters, or System Manager. + 6. SVM peer relationship MUST already exist (source SVM <-> dest SVM). + (Unlike snapmirror_provision_dest_managed.py, this script does NOT auto-create it.) + Create via: ``vserver peer create`` on the source cluster, or System Manager. 7. Source RW volume (SOURCE_VOLUME) already exists on SOURCE_SVM 8. At least one online aggregate on the destination cluster 9. Admin credentials for both clusters @@ -42,9 +53,8 @@ import logging import os import sys -import time -from ontap_client import OntapClient +from ontap_client import OntapApiError, OntapClient logging.basicConfig( level=logging.INFO, @@ -56,21 +66,25 @@ # USER INPUTS — fill in your values here before running # --------------------------------------------------------------------------- INPUTS = { - "SOURCE_HOST": "", # source cluster management IP — never hardcode + "SOURCE_HOST": "", # source cluster management IP — set via SOURCE_HOST env var "SOURCE_USER": "admin", - "SOURCE_PASS": "", # set via SOURCE_PASS env var - "SOURCE_SVM": "vs0", # source SVM name + "SOURCE_PASS": "", # set via SOURCE_PASS env var — never hardcode + "SOURCE_SVM": "", # source SVM name "SOURCE_VOLUME": "", # source RW volume name - "DEST_HOST": "", # destination cluster management IP — never hardcode + "DEST_HOST": "", # destination cluster management IP — set via DEST_HOST env var "DEST_USER": "admin", - "DEST_PASS": "", # set via DEST_PASS env var - "DEST_SVM": "vs1", # destination SVM name + "DEST_PASS": "", # set via DEST_PASS env var — never hardcode + "DEST_SVM": "", # destination SVM name "SM_POLICY": "Asynchronous", # SnapMirror policy (optional) } # --------------------------------------------------------------------------- def _env(key: str, default: str = "") -> str: + """Return the value for *key* from INPUTS or os.environ. + + Logs an error and exits if the resolved value is empty and no *default* is given. + """ val = INPUTS.get(key) or os.environ.get(key, default) if not val: logger.error( @@ -81,35 +95,271 @@ def _env(key: str, default: str = "") -> str: return val -def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: - while True: - result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") - state = result.get("state", "unknown") - logger.info(" job %s — state=%s", job_uuid, state) - if state != "running": - return result - time.sleep(interval) - - -def _wait_snapmirrored( - client: OntapClient, rel_uuid: str, interval: int = 15, max_wait: int = 1800 +def _phase_a_source_preflight( + src: OntapClient, source_svm: str, source_volume: str, source_host: str ) -> dict: - elapsed = 0 - while elapsed < max_wait: - result = client.get( - f"/snapmirror/relationships/{rel_uuid}", - fields="state,lag_time,healthy", + """Verify source cluster connectivity and validate the source volume. + + Returns the source volume record. Aborts if missing or DP type. + """ + src_cluster = src.get("/cluster", fields="name,version") + logger.info( + "SOURCE CLUSTER | name=%s | ontap=%s", + src_cluster.get("name"), + src_cluster.get("version", {}).get("full"), + ) + src_vol_resp = src.get( + "/storage/volumes", + fields="name,uuid,state,type,space.size", + **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, + ) + if src_vol_resp.get("num_records", 0) == 0: + logger.error( + "ABORTED — source volume '%s' not found on %s", + source_volume, + source_host, ) - state = result.get("state", "unknown") - logger.info(" relationship %s — state=%s", rel_uuid, state) - if state == "snapmirrored": - return result - time.sleep(interval) - elapsed += interval - raise RuntimeError(f"Timed out waiting for relationship {rel_uuid} to reach snapmirrored") + sys.exit(1) + src_vol = src_vol_resp["records"][0] + if src_vol.get("type") == "dp": + logger.error("ABORTED — source volume is type=dp; specify the RW volume") + sys.exit(1) + logger.info( + "SOURCE VOLUME | name=%s | uuid=%s | state=%s | type=%s | size=%s", + src_vol["name"], + src_vol["uuid"], + src_vol["state"], + src_vol["type"], + src_vol.get("space", {}).get("size"), + ) + return src_vol + + +def _phase_b_dest_preflight(dst: OntapClient) -> tuple[str, str]: + """Query the dest cluster for its peer name and best available aggregate. + + Returns ``(peer_name, aggr_name)``. + """ + dst_cluster = dst.get("/cluster", fields="name,version") + logger.info( + "DEST CLUSTER | name=%s | ontap=%s", + dst_cluster.get("name"), + dst_cluster.get("version", {}).get("full"), + ) + peer_resp = dst.get( + "/cluster/peers", + fields="name,status.state", + **{"max_records": "1"}, + ) + peer_name = peer_resp.get("records", [{}])[0].get("name", "") + logger.info("CLUSTER PEER | name=%s", peer_name) + + aggr_resp = dst.get( + "/storage/aggregates", + fields="name,space.block_storage.available", + state="online", + **{"max_records": "1", "order_by": "space.block_storage.available desc"}, + ) + aggr_name = aggr_resp.get("records", [{}])[0].get("name", "") + logger.info("DEST AGGREGATE | name=%s", aggr_name) + return peer_name, aggr_name + + +def _phase_c_dest_volume_setup( + dst: OntapClient, + dest_svm: str, + dest_volume: str, + aggr_name: str, + src_vol: dict, +) -> None: + """Create the destination DP volume if it does not already exist.""" + check_dest = dst.get( + "/storage/volumes", + fields="name,uuid,state,type", + **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, + ) + if check_dest.get("num_records", 0) == 0: + logger.info("Creating dest DP volume '%s' on '%s'...", dest_volume, aggr_name) + try: + dst.post( + "/storage/volumes?return_timeout=120", + body={ + "name": dest_volume, + "type": "dp", + "svm": {"name": dest_svm}, + "aggregates": [{"name": aggr_name}], + "size": str(src_vol.get("space", {}).get("size", "")), + }, + ) + except Exception as exc: + logger.warning("create_dest_volume — %s (may already exist)", exc) + else: + logger.info("Dest volume '%s' already exists — skipping create", dest_volume) + + dst_vol_resp = dst.get( + "/storage/volumes", + fields="name,uuid,state,type", + **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, + ) + dst_vol = dst_vol_resp.get("records", [{}])[0] + logger.info( + "DEST VOLUME | name=%s | uuid=%s | state=%s | type=%s", + dst_vol.get("name"), + dst_vol.get("uuid"), + dst_vol.get("state"), + dst_vol.get("type"), + ) + + +def _phase_d_setup_relationship( + dst: OntapClient, + source_svm: str, + source_volume: str, + dest_svm: str, + dest_volume: str, + peer_name: str, + sm_policy: str, +) -> None: + """Create and initialize the SnapMirror relationship (idempotent).""" + existing = dst.get( + "/snapmirror/relationships", + fields="uuid,state,healthy", + **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + ) + logger.info("RELATIONSHIP CHECK | existing=%d", existing.get("num_records", 0)) + try: + create_resp = dst.post( + "/snapmirror/relationships?return_timeout=120", + body={ + "source": { + "path": f"{source_svm}:{source_volume}", + "cluster": {"name": peer_name}, + }, + "destination": {"path": f"{dest_svm}:{dest_volume}"}, + "policy": {"name": sm_policy}, + }, + ) + job_uuid = create_resp.get("job", {}).get("uuid") + if job_uuid: + dst.poll_job(job_uuid) + except Exception as exc: + exc_s = str(exc) + if "9895992" in exc_s: + logger.error( + "ABORTED — SVM peer relationship is not in peered state.\n" + " Error : %s\n" + " Fix : Check the SVM peer state with:\n" + " vserver peer show -vserver %s\n" + " Wait for state=peered, or delete and re-create the SVM peer.", + exc_s, + dest_svm, + ) + sys.exit(1) + logger.warning("create_and_initialize_relationship — %s (may already exist)", exc) + + +def _abort_ic_lif_unreachable( + src: OntapClient, + dst: OntapClient, + source_host: str, + dest_host: str, + exc: OntapApiError, +) -> None: + """Log a detailed error about unreachable IC LIFs and abort.""" + + def _ic_ips(c: OntapClient) -> list[str]: + seen: set[str] = set() + for qp in ( + {"service_policy.name": "default-intercluster"}, + {"services": "intercluster_core"}, + ): + for r in c.get( + "/network/ip/interfaces", + fields="ip.address", + **{"max_records": "50", **qp}, + ).get("records", []): + ip = r.get("ip", {}).get("address", "") + if ip: + seen.add(ip) + return list(seen) + + src_ips = _ic_ips(src) + dst_ips = _ic_ips(dst) + logger.error( + "ABORTED — SnapMirror initialize failed: source volume not reachable.\n" + " ONTAP error : %s\n" + " Likely cause: TCP 11104/11105 is blocked between IC LIFs.\n" + " src IC LIFs : %s\n" + " dst IC LIFs : %s\n" + " Note : SnapMirror transfers are always initiated by the DEST\n" + " cluster. With SOURCE=%s and DEST=%s, the dest cluster\n" + " (%s) must reach the source (%s) on TCP 11104/11105.\n" + " Fix : Open TCP 11104/11105 in that direction, or swap\n" + " SOURCE/DEST so the working direction is used.", + exc, + src_ips, + dst_ips, + source_host, + dest_host, + dest_host, + source_host, + ) + sys.exit(1) + + +def _phase_e_convergence_polling( + src: OntapClient, + dst: OntapClient, + source_host: str, + dest_host: str, + dest_svm: str, + dest_volume: str, +) -> str: + """Kick off the initial transfer and wait until the relationship is snapmirrored. + + Returns the relationship UUID. + """ + rel_resp = dst.get( + "/snapmirror/relationships", + fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", + **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + ) + rel_records = rel_resp.get("records", []) + if not rel_records: + logger.error( + "ABORTED — SnapMirror relationship not found for '%s:%s'.\n" + " The relationship was not created in Phase D — check the errors above.", + dest_svm, + dest_volume, + ) + sys.exit(1) + rel = rel_records[0] + rel_uuid = rel.get("uuid", "") + logger.info( + "RELATIONSHIP FOUND | uuid=%s | state=%s | healthy=%s", + rel_uuid, + rel.get("state"), + rel.get("healthy"), + ) + try: + dst.post( + f"/snapmirror/relationships/{rel_uuid}/transfers?return_timeout=120", + body={}, + ) + except OntapApiError as exc: + err = exc.detail.get("error", {}) if isinstance(exc.detail, dict) else {} + if err.get("code") == "13303812" and "not found" in err.get("message", ""): + _abort_ic_lif_unreachable(src, dst, source_host, dest_host, exc) + logger.warning("initialize_relationship — %s (may already be initialized)", exc) + except Exception as exc: + logger.warning("initialize_relationship — %s (may already be initialized)", exc) + + dst.wait_snapmirrored(rel_uuid) + return rel_uuid def main() -> None: + """Orchestrate all six phases (A–F) to provision a SnapMirror relationship from the source cluster.""" source_host = _env("SOURCE_HOST") source_user = _env("SOURCE_USER") source_pass = _env("SOURCE_PASS") @@ -129,151 +379,23 @@ def main() -> None: with src, dst: logger.info("=== Phase A: Source pre-flight ===") - src_cluster = src.get("/cluster", fields="name,version") - logger.info( - "SOURCE CLUSTER | name=%s | ontap=%s", - src_cluster.get("name"), - src_cluster.get("version", {}).get("full"), - ) - - src_vol_resp = src.get( - "/storage/volumes", - fields="name,uuid,state,type,space.size", - **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, - ) - if src_vol_resp.get("num_records", 0) == 0: - logger.error( - "ABORTED — source volume '%s' not found on %s", - source_volume, - source_host, - ) - sys.exit(1) - src_vol = src_vol_resp["records"][0] - if src_vol.get("type") == "dp": - logger.error("ABORTED — source volume is type=dp; specify the RW volume") - sys.exit(1) - logger.info( - "SOURCE VOLUME | name=%s | uuid=%s | state=%s | type=%s | size=%s", - src_vol["name"], - src_vol["uuid"], - src_vol["state"], - src_vol["type"], - src_vol.get("space", {}).get("size"), - ) + src_vol = _phase_a_source_preflight(src, source_svm, source_volume, source_host) logger.info("=== Phase B: Dest pre-flight ===") - dst_cluster = dst.get("/cluster", fields="name,version") - logger.info( - "DEST CLUSTER | name=%s | ontap=%s", - dst_cluster.get("name"), - dst_cluster.get("version", {}).get("full"), - ) - - peer_resp = dst.get( - "/cluster/peers", - fields="name,status.state", - **{"max_records": "1"}, - ) - peer_name = peer_resp.get("records", [{}])[0].get("name", "") - logger.info("CLUSTER PEER | name=%s", peer_name) - - aggr_resp = dst.get( - "/storage/aggregates", - fields="name,space.block_storage.available", - state="online", - **{"max_records": "1", "order_by": "space.block_storage.available desc"}, - ) - aggr_name = aggr_resp.get("records", [{}])[0].get("name", "") - logger.info("DEST AGGREGATE | name=%s", aggr_name) + peer_name, aggr_name = _phase_b_dest_preflight(dst) logger.info("=== Phase C: Dest volume setup ===") - check_dest = dst.get( - "/storage/volumes", - fields="name,uuid,state,type", - **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, - ) - if check_dest.get("num_records", 0) == 0: - logger.info("Creating dest DP volume '%s' on '%s'…", dest_volume, aggr_name) - try: - dst.post( - "/storage/volumes?return_timeout=120", - body={ - "name": dest_volume, - "type": "dp", - "svm": {"name": dest_svm}, - "aggregates": [{"name": aggr_name}], - "size": str(src_vol.get("space", {}).get("size", "")), - }, - ) - except Exception as exc: - logger.warning("create_dest_volume — %s (may already exist)", exc) - else: - logger.info("Dest volume '%s' already exists — skipping create", dest_volume) - - dst_vol_resp = dst.get( - "/storage/volumes", - fields="name,uuid,state,type", - **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, - ) - dst_vol = dst_vol_resp.get("records", [{}])[0] - logger.info( - "DEST VOLUME | name=%s | uuid=%s | state=%s | type=%s", - dst_vol.get("name"), - dst_vol.get("uuid"), - dst_vol.get("state"), - dst_vol.get("type"), - ) + _phase_c_dest_volume_setup(dst, dest_svm, dest_volume, aggr_name, src_vol) logger.info("=== Phase D: Relationship setup ===") - existing = dst.get( - "/snapmirror/relationships", - fields="uuid,state,healthy", - **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + _phase_d_setup_relationship( + dst, source_svm, source_volume, dest_svm, dest_volume, peer_name, sm_policy ) - logger.info("RELATIONSHIP CHECK | existing=%d", existing.get("num_records", 0)) - - try: - create_resp = dst.post( - "/snapmirror/relationships?return_timeout=120", - body={ - "source": { - "path": f"{source_svm}:{source_volume}", - "cluster": {"name": peer_name}, - }, - "destination": {"path": f"{dest_svm}:{dest_volume}"}, - "policy": {"name": sm_policy}, - }, - ) - job_uuid = create_resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(dst, job_uuid) - except Exception as exc: - logger.warning("create_and_initialize_relationship — %s (may already exist)", exc) logger.info("=== Phase E: Convergence polling ===") - rel_resp = dst.get( - "/snapmirror/relationships", - fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", - **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + rel_uuid = _phase_e_convergence_polling( + src, dst, source_host, dest_host, dest_svm, dest_volume ) - rel = rel_resp.get("records", [{}])[0] - rel_uuid = rel.get("uuid", "") - logger.info( - "RELATIONSHIP FOUND | uuid=%s | state=%s | healthy=%s", - rel_uuid, - rel.get("state"), - rel.get("healthy"), - ) - - try: - dst.post( - f"/snapmirror/relationships/{rel_uuid}/transfers?return_timeout=120", - body={}, - ) - except Exception as exc: - logger.warning("initialize_relationship — %s (may already be initialized)", exc) - - _wait_snapmirrored(dst, rel_uuid) logger.info("=== Phase F: Final validation ===") final = dst.get( diff --git a/python/snapmirror_test_failover.py b/python/snapmirror_test_failover.py index 48b2aba..cdfa680 100644 --- a/python/snapmirror_test_failover.py +++ b/python/snapmirror_test_failover.py @@ -41,7 +41,6 @@ import logging import os import sys -import time from ontap_client import OntapClient @@ -55,16 +54,20 @@ # USER INPUTS — fill in your values here before running # --------------------------------------------------------------------------- INPUTS = { - "CLUSTER_A": "", # first cluster management IP — never hardcode - "CLUSTER_B": "", # second cluster management IP — never hardcode + "CLUSTER_A": "", # first cluster management IP — set via CLUSTER_A env var + "CLUSTER_B": "", # second cluster management IP — set via CLUSTER_B env var "DEST_USER": "admin", - "DEST_PASS": "", # set via DEST_PASS env var + "DEST_PASS": "", # set via DEST_PASS env var — never hardcode "SOURCE_VOLUME": "", # source volume name, or * to auto-detect } # --------------------------------------------------------------------------- def _env(key: str, default: str = "") -> str: + """Return the value for *key* from INPUTS or os.environ. + + Logs an error and exits if the resolved value is empty and no *default* is given. + """ val = INPUTS.get(key) or os.environ.get(key, default) if not val: logger.error( @@ -75,34 +78,6 @@ def _env(key: str, default: str = "") -> str: return val -def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: - while True: - result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") - state = result.get("state", "unknown") - logger.info(" job %s — state=%s", job_uuid, state) - if state != "running": - return result - time.sleep(interval) - - -def _wait_snapmirrored( - client: OntapClient, rel_uuid: str, interval: int = 15, max_wait: int = 1800 -) -> dict: - elapsed = 0 - while elapsed < max_wait: - result = client.get( - f"/snapmirror/relationships/{rel_uuid}", - fields="state,lag_time,healthy", - ) - state = result.get("state", "unknown") - logger.info(" relationship %s — state=%s", rel_uuid, state) - if state == "snapmirrored": - return result - time.sleep(interval) - elapsed += interval - raise RuntimeError(f"Timed out waiting for relationship {rel_uuid} to reach snapmirrored") - - def _pick_cluster( cluster_a: str, cluster_b: str, user: str, passwd: str, vol_name_filter: str ) -> tuple[str, dict]: @@ -114,18 +89,17 @@ def _pick_cluster( for host in (cluster_a, cluster_b): try: - client = OntapClient(host, user, passwd, verify_ssl=False, timeout=20) - resp = client.get( - "/storage/volumes", - fields="name,create_time,uuid,svm.name,state,space.size", - **{ - "type": "dp", - "name": dest_filter, - "order_by": "create_time desc", - "max_records": "1", - }, - ) - client.close() + with OntapClient(host, user, passwd, verify_ssl=False, timeout=20) as client: + resp = client.get( + "/storage/volumes", + fields="name,create_time,uuid,svm.name,state,space.size", + **{ + "type": "dp", + "name": dest_filter, + "order_by": "create_time desc", + "max_records": "1", + }, + ) if resp.get("num_records", 0) >= 1: best_cluster = host best_vol = resp["records"][0] @@ -141,6 +115,7 @@ def _pick_cluster( def main() -> None: + """Auto-detect target cluster, create a FlexClone for test failover, then resync SnapMirror.""" cluster_a = _env("CLUSTER_A") cluster_b = _env("CLUSTER_B") dest_user = _env("DEST_USER") @@ -225,7 +200,7 @@ def main() -> None: ) job_uuid = clone_resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid) + client.poll_job(job_uuid) except Exception as exc: logger.warning("create_test_clone — %s (may already exist)", exc) @@ -278,11 +253,11 @@ def main() -> None: ) job_uuid = resync_resp.get("job", {}).get("uuid") if job_uuid: - _poll_job(client, job_uuid, interval=10) + client.poll_job(job_uuid) except Exception as exc: logger.warning("resync_sm_relationship — %s", exc) - _wait_snapmirrored(client, rel_uuid) + client.wait_snapmirrored(rel_uuid) logger.info("=== TEST FAILOVER COMPLETE — SnapMirror resynced ===")