Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 52 additions & 13 deletions python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ To compare this approach with Ansible or Terraform, see
> **Catalog:** [`catalog.yaml`](../catalog.yaml) lists every Python example with
> prerequisites, inputs, and outputs. Sections below follow the same format.

> **Note:** These scripts are runnable illustrations, not a tested library.
> There are no unit tests by design - CI validates only lint and formatting
> via Ruff. When you adapt a script for production, add tests appropriate
> to your environment.
> **Note:** These scripts are runnable illustrations. Unit tests live in
> `Unit_tests/` and can be run with `pytest Unit_tests/`. CI validates lint
> and formatting via Ruff in addition to running the test suite.

---

Expand Down Expand Up @@ -47,9 +46,10 @@ All scripts read connection details from environment variables.
export ONTAP_HOST=10.0.0.1 # cluster management LIF
export ONTAP_USER=admin # default: admin
export ONTAP_PASS=your_password
export ONTAP_TIMEOUT=30 # optional: request timeout in seconds (default: 90)
```

Or use an env file:
Or use an env file and pass it to scripts that support `--env-file`:

```bash
# cluster.env
Expand All @@ -59,13 +59,46 @@ ONTAP_PASS=your_password
```

```bash
# Linux / macOS
set -a && source cluster.env && set +a

# Windows PowerShell
Get-Content cluster.env | ForEach-Object {
if ($_ -match '^([^#][^=]*)=(.*)$') { [System.Environment]::SetEnvironmentVariable($Matches[1].Trim(), $Matches[2].Trim()) }
}
```

Scripts that accept `--env-file` (e.g. `cluster_setup_basic.py`) can also load
the file directly:

```bash
python cluster_setup_basic.py --env-file cluster.env
```

> SSL verification is disabled by default to support environments that use
> self-signed certificates. We recommend setting `ONTAP_VERIFY_SSL=true`
> once CA-signed certificates are in place.

### SSL / CA-bundle configuration

| Knob | Description |
|---|---|
| Default | `verify_ssl=False` — SSL certificate errors are suppressed so scripts work out-of-the-box with self-signed certs. |
| `ONTAP_VERIFY_SSL=true` | Enable full certificate verification (recommended for production). |
| `REQUESTS_CA_BUNDLE` | Path to a custom CA bundle PEM file when your cluster uses an internal/private CA. |

To enable verification with a custom CA bundle:

```bash
export ONTAP_VERIFY_SSL=true
export REQUESTS_CA_BUNDLE=/path/to/your/ca-bundle.pem
python cluster_info.py
```

For more details and common SSL errors, see the
[SSL / TLS errors section](../docs/troubleshooting.md#ssl--tls-errors) in the
troubleshooting guide.

---

## Examples
Expand Down Expand Up @@ -313,13 +346,13 @@ python snapmirror_cleanup_test_failover.py
|---|---|
| `ontap_client.py` | Reusable ONTAP REST client (session management, auth, polling, error handling) |
| `cluster_info.py` | Get cluster version + node list |
| `cluster_setup_basic.py` | Create a new ONTAP cluster from two pre-cluster nodes |
| `nfs_provision.py` | Create NFS volume with export policy |
| `cifs_provision.py` | Create CIFS share with volume and ACL |
| `cluster_setup_basic.py` | Create cluster from two pre-cluster nodes |
| `snapmirror_provision_src_managed.py` | SnapMirror provision (source-managed view) |
| `snapmirror_provision_dest_managed.py` | SnapMirror provision (destination-managed view) |
| `snapmirror_test_failover.py` | SnapMirror test failover via FlexClone |
| `snapmirror_cleanup_test_failover.py` | Clean up test failover clone |
| `cifs_provision.py` | Create CIFS/SMB share (optionally create CIFS server) |
| `snapmirror_provision_src_managed.py` | Provision a SnapMirror relationship from the source cluster |
| `snapmirror_provision_dest_managed.py` | Provision a SnapMirror relationship from the destination cluster |
| `snapmirror_test_failover.py` | Create a FlexClone of the SnapMirror destination for test failover |
| `snapmirror_cleanup_test_failover.py` | Delete the FlexClone created by a test failover |
| `requirements.txt` | Python dependencies |

## Code Patterns
Expand All @@ -328,8 +361,14 @@ These scripts demonstrate several patterns you can reuse:

- **`OntapClient.from_env()`** - builds a configured client from environment
variables so credentials never appear in code
- **`client.poll_job(uuid)`** - polls an async ONTAP job until completion with
configurable interval and timeout
- **`client.poll_job(uuid)`** - polls an async ONTAP job until completion;
accepts keyword args `interval` (seconds between polls, default 5) and
`timeout` (max seconds to wait, default 300); raises `RuntimeError` on
job failure and `TimeoutError` on timeout
- **`client.wait_snapmirrored(rel_uuid)`** - polls a SnapMirror relationship
until its state reaches `snapmirrored`; accepts `interval` and `max_wait`
- **`client.update_auth(username, password)`** - replaces session credentials
mid-workflow (used by `cluster_setup_basic.py` after cluster creation)
- **Context manager** - `with OntapClient.from_env() as client:` ensures the
HTTP session is properly closed
- **Structured logging** - all output goes through `logging`, not `print()`,
Expand Down
74 changes: 32 additions & 42 deletions python/cifs_provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
import logging
import os
import sys
from pathlib import Path

from ontap_client import OntapApiError, OntapClient
from ontap_client import OntapApiError, OntapClient, load_env_file

logging.basicConfig(
level=logging.INFO,
Expand All @@ -38,40 +37,22 @@
logger = logging.getLogger(__name__)

ENV = {
"ONTAP_HOST": "", # cluster management IP — set here or via ONTAP_HOST env var
"ONTAP_HOST": "", # cluster management IP — set via ONTAP_HOST env var
"ONTAP_USER": "admin",
"ONTAP_PASS": "", # never hardcode — set via ONTAP_PASS env var
"SVM_NAME": "vs1",
"VOLUME_NAME": "vol_002",
"SVM_NAME": "vs0",
"VOLUME_NAME": "volume",
"VOLUME_SIZE": "100MB",
"AGGR_NAME": "", # required — set via --aggregate or AGGR_NAME env var
"CLIENT_MATCH": "0.0.0.0/0", # required — set via --client-match or CLIENT_MATCH env var
"SHARE_NAME": "cifs_share_demo",
"SHARE_COMMENT": "Provisioned by orchestrio",
"SHARE_NAME": "cifs_test",
"SHARE_COMMENT": "Provisioned by PACE",
"ACL_USER": "Everyone",
"ACL_PERMISSION": "full_control",
"CIFS_SERVER_NAME": "ONTAP-CIFS",
"CIFS_WORKGROUP": "WORKGROUP",
}


def _load_env_file(path: str) -> None:
"""Load KEY=VALUE pairs from an env file into os.environ (dotenv style)."""
p = Path(path)
if not p.is_file():
logger.error("Env file not found: %s", path)
sys.exit(1)
for lineno, raw in enumerate(p.read_text().splitlines(), start=1):
line = raw.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
logger.error("Env file %s line %d: expected KEY=VALUE, got: %s", path, lineno, line)
sys.exit(1)
key, _, value = line.partition("=")
os.environ.setdefault(key.strip(), value.strip())


def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Provision a CIFS share")
p.add_argument(
Expand Down Expand Up @@ -114,33 +95,41 @@ def _pick(cli_val: str | None, env_key: str, default: str = "") -> str:
return cli_val or os.environ.get(env_key) or ENV.get(env_key, "") or default


def _resolve_config(args: argparse.Namespace) -> dict[str, str | bool]:
"""Load env file and CLI args, then return the resolved configuration dict."""
def _require(value: str, flag: str, env_key: str) -> str:
"""Return *value* or exit with a clear error if it is empty."""
if not value:
logger.error("--%s is required (or set %s in env / --env-file)", flag, env_key)
sys.exit(1)
return value


def _resolve_config(args: argparse.Namespace) -> tuple[dict[str, str], bool]:
"""Load env file and CLI args, then return (str_config, create_cifs_server).

Returns a tuple of:
- A ``dict[str, str]`` with all string config values.
- A ``bool`` indicating whether to auto-create the CIFS server.
"""
if args.env_file:
_load_env_file(args.env_file)
load_env_file(args.env_file)

for key, value in ENV.items():
if value and key not in os.environ:
os.environ[key] = value

aggregate = _pick(args.aggregate, "AGGR_NAME")
if not aggregate:
logger.error("--aggregate is required (or set AGGR_NAME in env / --env-file)")
sys.exit(1)

return {
"svm": _pick(args.svm, "SVM_NAME", "vs0"),
"volume": _pick(args.volume, "VOLUME_NAME", "cifs_test_env"),
cfg: dict[str, str] = {
"aggregate": _require(_pick(args.aggregate, "AGGR_NAME"), "aggregate", "AGGR_NAME"),
"svm": _require(_pick(args.svm, "SVM_NAME"), "svm", "SVM_NAME"),
"volume": _require(_pick(args.volume, "VOLUME_NAME"), "volume", "VOLUME_NAME"),
"share_name": _require(_pick(args.share_name, "SHARE_NAME"), "share-name", "SHARE_NAME"),
"size": _pick(args.size, "VOLUME_SIZE", "100MB"),
"aggregate": aggregate,
"share_name": _pick(args.share_name, "SHARE_NAME", "cifs_share_demo"),
"share_comment": _pick(args.share_comment, "SHARE_COMMENT", "Provisioned by orchestrio"),
"share_comment": _pick(args.share_comment, "SHARE_COMMENT", "Provisioned by PACE"),
"acl_user": _pick(args.acl_user, "ACL_USER", "Everyone"),
"acl_permission": _pick(args.acl_permission, "ACL_PERMISSION", "full_control"),
"create_cifs_server": args.create_cifs_server,
"cifs_server_name": _pick(args.cifs_server_name, "CIFS_SERVER_NAME", "ONTAP-CIFS"),
"workgroup": _pick(args.workgroup, "CIFS_WORKGROUP", "WORKGROUP"),
}
return cfg, bool(args.create_cifs_server)


def _ensure_cifs_server(
Expand Down Expand Up @@ -302,7 +291,8 @@ def _verify_and_log_acls(client: OntapClient, svm_uuid: str, share_name: str) ->


def main() -> None:
cfg = _resolve_config(parse_args())
"""Resolve configuration, then orchestrate CIFS server, volume, share, and ACL setup."""
cfg, create_cifs_server = _resolve_config(parse_args())
svm = cfg["svm"]
volume = cfg["volume"]
size = cfg["size"]
Expand All @@ -314,7 +304,7 @@ def main() -> None:

with OntapClient.from_env() as client:
_ensure_cifs_server(
client, svm, cfg["create_cifs_server"], cfg["cifs_server_name"], cfg["workgroup"]
client, svm, create_cifs_server, cfg["cifs_server_name"], cfg["workgroup"]
)

job_result = _ensure_volume_ntfs(client, svm, volume, size, aggregate)
Expand Down
33 changes: 27 additions & 6 deletions python/cluster_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
# SPDX-License-Identifier: Apache-2.0
# See the NOTICE file in the repo root for trademark and attribution details.

"""Retrieve storage cluster version and list all nodes with serial numbers.
"""Retrieve storage cluster version, nodes, and aggregates.

Steps:
1. GET /cluster — retrieve cluster name and ONTAP version
2. GET /cluster/nodes — list all nodes with serial numbers
1. GET /cluster — cluster name, ONTAP version, contact, location
2. GET /cluster/nodes — list all nodes with serial numbers
3. GET /storage/aggregates — list aggregates with state and used space

Prerequisites::

Expand All @@ -34,13 +35,16 @@


def main() -> None:
"""Retrieve cluster version and print all node names with serial numbers."""
with OntapClient.from_env() as client:
# Step 1 — cluster version
cluster = client.get("/cluster", fields="name,version")
# Step 1 — cluster version, contact, and location
cluster = client.get("/cluster", fields="name,version,contact,location")
logger.info(
"Cluster: %s — ONTAP %s",
"Cluster: %s — ONTAP %s contact=%s location=%s",
cluster.get("name", "unknown"),
cluster.get("version", {}).get("full", "unknown"),
cluster.get("contact", "—"),
cluster.get("location", "—"),
)

# Step 2 — node list with serial numbers
Expand All @@ -55,6 +59,23 @@ def main() -> None:
node.get("serial_number", "—"),
)

# Step 3 — aggregate list with state and used space
aggr_resp = client.get("/storage/aggregates", fields="name,state,space.block_storage")
aggr_records = aggr_resp.get("records", [])
logger.info("Aggregates in cluster: %d", aggr_resp.get("num_records", len(aggr_records)))

for aggr in aggr_records:
block = aggr.get("space", {}).get("block_storage", {})
total = block.get("size", 0)
used = block.get("used", 0)
used_pct = (used / total * 100) if total else 0
logger.info(
" %-30s state: %-10s used: %.1f%%",
aggr.get("name", "—"),
aggr.get("state", "—"),
used_pct,
)


if __name__ == "__main__":
try:
Expand Down
Loading
Loading