PICurv 0.1.0
A Parallel Particle-In-Cell Solver for Curvilinear LES
pic.flow
Go to the documentation of this file.
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4"""!
5@file pic.flow
6@brief A comprehensive conductor script for the PIC-Flow simulation platform.
7
8This script acts as the central user interface for running simulations,
9managing configurations, and orchestrating the entire end-to-end workflow.
10It translates user-friendly YAML files into C-solver compatible control files,
11supports full multi-block configurations, and provides live log streaming.
12It features intelligent, content-based config file discovery and robustly
13manages data I/O paths for the post-processor. It also supports Slurm job
14generation/submission and parameter sweeps via job arrays.
15"""
16
17import yaml
18import sys
19import os
20import argparse
21import subprocess
22import shutil
23import glob
24import csv
25import json
26import itertools
27import re
28import shlex
29import copy
30import numpy as np
31from datetime import datetime
32import time
33
34try:
35 import matplotlib.pyplot as plt
36except ImportError:
37 plt = None
38
39# --- Global Path Definitions ---
40SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__))
41BIN_DIR = SCRIPT_PATH
42PROJECT_ROOT = os.path.dirname(BIN_DIR)
43
44# Standardized error codes used for CLI/validation reporting.
45ERROR_CODE_CLI_USAGE_INVALID = "CLI_USAGE_INVALID"
46ERROR_CODE_CFG_MISSING_SECTION = "CFG_MISSING_SECTION"
47ERROR_CODE_CFG_MISSING_KEY = "CFG_MISSING_KEY"
48ERROR_CODE_CFG_INVALID_TYPE = "CFG_INVALID_TYPE"
49ERROR_CODE_CFG_INVALID_VALUE = "CFG_INVALID_VALUE"
50ERROR_CODE_CFG_FILE_NOT_FOUND = "CFG_FILE_NOT_FOUND"
51ERROR_CODE_CFG_GRID_PARSE = "CFG_GRID_PARSE"
52ERROR_CODE_CFG_INCONSISTENT_COMBO = "CFG_INCONSISTENT_COMBO"
53
54_ERROR_HINTS = {
55 ERROR_CODE_CLI_USAGE_INVALID: "Run 'pic.flow <command> --help' to see valid argument combinations.",
56 ERROR_CODE_CFG_MISSING_SECTION: "Add the missing section using examples/master_template/*.yml as reference.",
57 ERROR_CODE_CFG_MISSING_KEY: "Add the missing key in the referenced YAML file.",
58 ERROR_CODE_CFG_INVALID_TYPE: "Fix the value type to match the documented schema in docs/pages/14_Config_Contract.md.",
59 ERROR_CODE_CFG_INVALID_VALUE: "Adjust the value to a supported range/enum from the config reference pages.",
60 ERROR_CODE_CFG_FILE_NOT_FOUND: "Fix the path or create the missing file before running again.",
61 ERROR_CODE_CFG_GRID_PARSE: "Validate grid file format and numeric payload (block count, dims, coordinates).",
62 ERROR_CODE_CFG_INCONSISTENT_COMBO: "Fix conflicting options/keys so the configuration is internally consistent.",
63}
64
65
66def _sanitize_error_field(value) -> str:
67 """Normalize error fields into a single-line string."""
68 if value is None:
69 return "-"
70 text = str(value).strip()
71 if not text:
72 return "-"
73 return " ".join(text.splitlines())
74
75
76def emit_structured_error(code: str, key: str = "-", file_path: str = "-",
77 message: str = "", hint: str = None, stream=None):
78 """Emit one standardized error line for tooling and users."""
79 if stream is None:
80 stream = sys.stderr
81 resolved_hint = hint if hint is not None else _ERROR_HINTS.get(code, "-")
82 print(
83 f"ERROR {_sanitize_error_field(code)} | "
84 f"key={_sanitize_error_field(key)} | "
85 f"file={_sanitize_error_field(file_path)} | "
86 f"message={_sanitize_error_field(message)} | "
87 f"hint={_sanitize_error_field(resolved_hint)}",
88 file=stream,
89 )
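# Illustrative output only: given the f-string above, a missing-key failure would
# appear on stderr as one pipe-delimited line along the lines of
#   ERROR CFG_MISSING_KEY | key=run_control.dt_physical | file=case.yml | message=Missing key 'run_control.dt_physical'. | hint=Add the missing key in the referenced YAML file.
# (the key/file values here are hypothetical; the format mirrors emit_structured_error).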
90
91
92def fail_cli_usage(message: str, hint: str = None):
93 """Emit a structured CLI usage error and exit with code 2."""
94 emit_structured_error(
95 ERROR_CODE_CLI_USAGE_INVALID,
96 key="-",
97 file_path="-",
98 message=message,
99 hint=hint or _ERROR_HINTS[ERROR_CODE_CLI_USAGE_INVALID],
100 )
101 sys.exit(2)
102
103
104def _split_error_file_and_message(raw_error: str):
105 """Split '<file>: <message>' style validation strings when possible."""
106 text = str(raw_error).strip()
107 match = re.match(r"^(?P<file>[^:]+):\s*(?P<msg>.+)$", text)
108 if not match:
109 return "-", text
110 file_candidate = match.group("file").strip()
111 msg = match.group("msg").strip()
112 known_suffixes = (".yml", ".yaml", ".cfg", ".picgrid", ".control", ".run", ".txt")
113 if "/" in file_candidate or file_candidate.endswith(known_suffixes):
114 return file_candidate, msg
115 return "-", text
116
117
118def _extract_key_path(message: str) -> str:
119 """Best-effort key-path extraction from free-form validation messages."""
120 dotted = re.search(r"\b([A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z0-9_\[\]-]+)+)\b", message)
121 if dotted:
122 return dotted.group(1)
123
124 bracketed = re.search(r"\b([A-Za-z_][A-Za-z0-9_]*\[[^\]]+\](?:\[[^\]]+\])*)\b", message)
125 if bracketed:
126 return bracketed.group(1)
127
128 quoted = re.findall(r"'([A-Za-z0-9_.\[\]-]+)'", message)
129 for token in quoted:
130 if "." in token or "[" in token or token.isidentifier():
131 return token
132 return "-"
133
134
135def _classify_error_code(message: str) -> str:
136 """Map existing validation/error messages to the standardized code set."""
137 msg = message.lower()
138 if "missing required section" in msg:
139 return ERROR_CODE_CFG_MISSING_SECTION
140 if "missing required key" in msg or "missing key" in msg:
141 return ERROR_CODE_CFG_MISSING_KEY
142 if "not found" in msg or "does not exist" in msg:
143 return ERROR_CODE_CFG_FILE_NOT_FOUND
144 if "invalid dimensions line" in msg or "invalid coordinate row" in msg or "grid file" in msg:
145 return ERROR_CODE_CFG_GRID_PARSE
146 if (
147 "must both be periodic" in msg
148 or "inconsistent periodicity" in msg
149 or "mismatch" in msg
150 or "requires --" in msg
151 or "must be 1 (auto) or exactly" in msg
152 ):
153 return ERROR_CODE_CFG_INCONSISTENT_COMBO
154 if (
155 "must be a mapping" in msg
156 or "must be a list" in msg
157 or "must be a string" in msg
158 or "must be a boolean" in msg
159 or "must be either" in msg
160 ):
161 return ERROR_CODE_CFG_INVALID_TYPE
162 return ERROR_CODE_CFG_INVALID_VALUE
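# Worked example (hypothetical message, real helpers above):
#   msg = "missing key 'properties.scaling.length_ref'."
#   _extract_key_path(msg)    -> "properties.scaling.length_ref"
#   _classify_error_code(msg) -> ERROR_CODE_CFG_MISSING_KEY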
163
164# ==============================================================================
165# HELPER FUNCTIONS
166# ==============================================================================
167
168def read_yaml_file(filepath: str) -> dict:
169 """!
170 @brief Safely reads a YAML file and returns its content.
171 @param[in] filepath Path to the YAML file.
172 @return A dictionary containing the parsed YAML content.
173 @throws SystemExit if the file is not found or cannot be parsed.
174 """
175 if not os.path.exists(filepath):
176 emit_structured_error(
177 ERROR_CODE_CFG_FILE_NOT_FOUND,
178 key="-",
179 file_path=filepath,
180 message="Configuration file not found.",
181 )
182 sys.exit(1)
183 try:
184 with open(filepath, 'r') as f:
185 return yaml.safe_load(f)
186 except yaml.YAMLError as e:
187 emit_structured_error(
188 ERROR_CODE_CFG_INVALID_VALUE,
189 key="-",
190 file_path=filepath,
191 message=f"YAML parse error: {e}",
192 hint="Fix YAML syntax/indentation and retry validation.",
193 )
194 sys.exit(1)
195
196def write_yaml_file(filepath: str, data: dict):
197 """Write YAML with stable ordering for generated study artifacts."""
198 os.makedirs(os.path.dirname(filepath), exist_ok=True)
199 with open(filepath, "w") as f:
200 yaml.safe_dump(data, f, sort_keys=False)
201
202def write_json_file(filepath: str, payload: dict):
203 """Write JSON metadata/manifests with a stable, readable format."""
204 os.makedirs(os.path.dirname(filepath), exist_ok=True)
205 with open(filepath, "w") as f:
206 json.dump(payload, f, indent=2, sort_keys=True)
207 f.write("\n")
208
209def resolve_path(anchor_file: str, candidate: str) -> str:
210 """Resolve a potentially relative path against a source YAML file path."""
211 if candidate is None:
212 return None
213 if os.path.isabs(candidate):
214 return os.path.abspath(candidate)
215 return os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(anchor_file)), candidate))
216
217def absolutize_case_external_paths(case_cfg: dict, case_anchor_path: str):
218 """Convert external grid/generator paths in case config to absolute paths."""
219 grid_cfg = case_cfg.get("grid", {})
220 if not isinstance(grid_cfg, dict):
221 return
222 mode = grid_cfg.get("mode")
223 if mode == "file":
224 source_file = grid_cfg.get("source_file")
225 if isinstance(source_file, str):
226 grid_cfg["source_file"] = resolve_path(case_anchor_path, source_file)
227 elif mode == "grid_gen":
228 gen = grid_cfg.get("generator", {})
229 if isinstance(gen, dict):
230 for key in ("script", "config_file"):
231 val = gen.get(key)
232 if isinstance(val, str):
233 gen[key] = resolve_path(case_anchor_path, val)
234
235def get_git_commit() -> str:
236 """Best-effort git commit lookup for run/study manifests."""
237 try:
238 result = subprocess.run(
239 ["git", "rev-parse", "HEAD"],
240 cwd=PROJECT_ROOT,
241 text=True,
242 capture_output=True,
243 check=False
244 )
245 if result.returncode == 0:
246 return result.stdout.strip()
247 except Exception:
248 pass
249 return None
250
251def is_valid_email(email: str) -> bool:
252 """Lightweight email validation for scheduler notifications."""
253 if not isinstance(email, str):
254 return False
255 pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
256 return re.match(pattern, email.strip()) is not None
257
258def normalize_statistics_task(task_name: str) -> str:
259 """!
260 @brief Normalizes user-facing statistics task names to C pipeline keywords.
261 @param[in] task_name Task name from YAML.
262 @return Canonical keyword accepted by C statistics pipeline.
263 @throws ValueError if task is unsupported.
264 """
265 if task_name is None:
266 raise ValueError("statistics task cannot be None")
267 raw = str(task_name).strip()
268 if raw == "ComputeMSD":
269 return raw
270 normalized = raw.lower().replace("-", "_").replace(" ", "_")
271 aliases = {
272 "msd": "ComputeMSD",
273 "compute_msd": "ComputeMSD",
274 "computemsd": "ComputeMSD",
275 }
276 mapped = aliases.get(normalized)
277 if mapped is None:
278 raise ValueError(f"Unsupported statistics task '{task_name}'. Currently supported: 'msd'.")
279 return mapped
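# Example (illustrative): "msd", "compute_msd", "computemsd", and "ComputeMSD"
# all normalize to the canonical keyword "ComputeMSD"; any other task name
# raises ValueError.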
280
281def _iter_nonempty_noncomment_lines(file_obj):
282 """Yield (lineno, stripped_line) for non-empty, non-comment lines."""
283 for lineno, raw in enumerate(file_obj, start=1):
284 line = raw.strip()
285 if not line or line.startswith("#"):
286 continue
287 yield lineno, line
288
289def validate_and_nondimensionalize_picgrid(source_grid: str, dest_grid: str, L_ref: float, expected_nblk: int = None) -> dict:
290 """!
291 @brief Validates PICGRID payload and writes a non-dimensionalized copy.
292 @details Accepts files with or without a leading "PICGRID" token. Output is always
293 written in canonical PICGRID format with header and per-block dims.
294 @param[in] source_grid Input grid file path.
295 @param[in] dest_grid Output grid file path.
296 @param[in] L_ref Reference length for non-dimensionalization.
297 @param[in] expected_nblk Optional expected block count.
298 @return Summary dictionary with nblk, dims, and total_nodes.
299 @throws ValueError on malformed grid.
300 """
301 if L_ref == 0.0:
302 raise ValueError("length_ref must be non-zero when processing grid coordinates.")
303 if not os.path.isfile(source_grid):
304 raise ValueError(f"Grid file not found: {source_grid}")
305
306 with open(source_grid, "r") as fin:
307 line_iter = _iter_nonempty_noncomment_lines(fin)
308 try:
309 _, first_token = next(line_iter)
310 except StopIteration:
311 raise ValueError(f"Grid file '{source_grid}' is empty.")
312
313 if first_token == "PICGRID":
314 try:
315 _, nblk_line = next(line_iter)
316 except StopIteration:
317 raise ValueError(f"Grid file '{source_grid}' missing block count after PICGRID header.")
318 else:
319 nblk_line = first_token
320
321 try:
322 nblk = int(nblk_line)
323 except ValueError:
324 raise ValueError(f"Invalid block count '{nblk_line}' in grid file '{source_grid}'.")
325 if nblk <= 0:
326 raise ValueError(f"Grid file '{source_grid}' has non-positive block count: {nblk}.")
327 if expected_nblk is not None and nblk != expected_nblk:
328 raise ValueError(
329 f"Grid file block count mismatch: case expects {expected_nblk}, grid contains {nblk}."
330 )
331
332 dims = []
333 for bi in range(nblk):
334 try:
335 lineno, dim_line = next(line_iter)
336 except StopIteration:
337 raise ValueError(f"Grid file '{source_grid}' missing dimensions for block {bi}.")
338 parts = dim_line.split()
339 if len(parts) != 3:
340 raise ValueError(
341 f"Invalid dimensions line at {source_grid}:{lineno}. Expected 3 integers, got: '{dim_line}'."
342 )
343 try:
344 im, jm, km = (int(parts[0]), int(parts[1]), int(parts[2]))
345 except ValueError:
346 raise ValueError(
347 f"Invalid dimensions line at {source_grid}:{lineno}. Non-integer values: '{dim_line}'."
348 )
349 if im <= 0 or jm <= 0 or km <= 0:
350 raise ValueError(
351 f"Invalid block dimensions at {source_grid}:{lineno}: ({im}, {jm}, {km}). Must be > 0."
352 )
353 dims.append((im, jm, km))
354
355 total_nodes_expected = sum(im * jm * km for (im, jm, km) in dims)
356 os.makedirs(os.path.dirname(dest_grid), exist_ok=True)
357 with open(dest_grid, "w") as fout:
358 fout.write("PICGRID\n")
359 fout.write(f"{nblk}\n")
360 for (im, jm, km) in dims:
361 fout.write(f"{im} {jm} {km}\n")
362
363 total_nodes_seen = 0
364 for lineno, coord_line in line_iter:
365 parts = coord_line.split()
366 if len(parts) != 3:
367 raise ValueError(
368 f"Invalid coordinate row at {source_grid}:{lineno}. Expected 3 floats, got: '{coord_line}'."
369 )
370 try:
371 x = float(parts[0]) / L_ref
372 y = float(parts[1]) / L_ref
373 z = float(parts[2]) / L_ref
374 except ValueError:
375 raise ValueError(
376 f"Invalid coordinate row at {source_grid}:{lineno}. Non-numeric values: '{coord_line}'."
377 )
378 total_nodes_seen += 1
379 if total_nodes_seen > total_nodes_expected:
380 raise ValueError(
381 f"Grid file '{source_grid}' has more coordinates ({total_nodes_seen}) than expected ({total_nodes_expected})."
382 )
383 fout.write(f"{x:.8e} {y:.8e} {z:.8e}\n")
384
385 if total_nodes_seen != total_nodes_expected:
386 raise ValueError(
387 f"Grid file '{source_grid}' has {total_nodes_seen} coordinates, expected {total_nodes_expected} from header."
388 )
389
390 return {"nblk": nblk, "dims": dims, "total_nodes": total_nodes_expected}
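# For reference, a minimal grid payload accepted above (values are illustrative):
# an optional "PICGRID" header, the block count, one "im jm km" line per block,
# then one "x y z" row per node (the validator checks the grand total against
# the header). Comment lines (#) and blank lines are ignored, and coordinates
# are divided by length_ref on output.
#   PICGRID
#   1
#   2 2 2
#   0.0 0.0 0.0
#   ...            (eight coordinate rows in total for this 2x2x2 block)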
391
392def run_grid_generator(case_path: str, run_dir: str, grid_cfg: dict) -> str:
393 """!
394 @brief Runs scripts/grid.gen to produce a PICGRID file for this run.
395 @param[in] case_path Path to case.yml (used for relative path resolution).
396 @param[in] run_dir Run directory path.
397 @param[in] grid_cfg The grid config section from case.yml.
398 @return Absolute path to generated dimensional PICGRID file.
399 @throws ValueError on invalid config or generator failure.
400 """
401 generator = grid_cfg.get("generator", {})
402 if not isinstance(generator, dict):
403 raise ValueError("grid.generator must be a mapping when grid.mode is 'grid_gen'.")
404
405 case_dir = os.path.dirname(os.path.abspath(case_path))
406 gridgen_script = generator.get("script", os.path.join(SCRIPT_PATH, "grid.gen"))
407 if not os.path.isabs(gridgen_script):
408 gridgen_script = os.path.abspath(os.path.join(case_dir, gridgen_script))
409 if not os.path.isfile(gridgen_script):
410 raise ValueError(f"grid.gen script not found: {gridgen_script}")
411
412 config_file = generator.get("config_file")
413 if not config_file:
414 raise ValueError("grid.generator.config_file is required when grid.mode is 'grid_gen'.")
415 if not os.path.isabs(config_file):
416 config_file = os.path.abspath(os.path.join(case_dir, config_file))
417 if not os.path.isfile(config_file):
418 raise ValueError(f"grid.generator.config_file not found: {config_file}")
419
420 output_file = generator.get("output_file", os.path.join("config", "grid.generated.picgrid"))
421 if not os.path.isabs(output_file):
422 output_file = os.path.abspath(os.path.join(run_dir, output_file))
423 os.makedirs(os.path.dirname(output_file), exist_ok=True)
424
425 grid_type = generator.get("grid_type")
426 cli_args = generator.get("cli_args", [])
427 if cli_args is None:
428 cli_args = []
429 if not isinstance(cli_args, list):
430 raise ValueError("grid.generator.cli_args must be a list of CLI tokens.")
431
432 cmd = [sys.executable, gridgen_script, "-c", config_file]
433 if grid_type:
434 cmd.append(str(grid_type))
435 cmd.extend([str(token) for token in cli_args])
436 cmd.extend(["--output", output_file])
437
438 vts_file = generator.get("vts_file")
439 if vts_file:
440 if not os.path.isabs(vts_file):
441 vts_file = os.path.abspath(os.path.join(run_dir, vts_file))
442 os.makedirs(os.path.dirname(vts_file), exist_ok=True)
443 cmd.extend(["--vts", vts_file])
444
445 stats_file = generator.get("stats_file")
446 if stats_file:
447 if not os.path.isabs(stats_file):
448 stats_file = os.path.abspath(os.path.join(run_dir, stats_file))
449 os.makedirs(os.path.dirname(stats_file), exist_ok=True)
450 cmd.extend(["--stats-file", stats_file])
451
452 print(f"[INFO] Grid generator command: {' '.join(cmd)}")
453 result = subprocess.run(cmd, cwd=case_dir, text=True, capture_output=True)
454 if result.returncode != 0:
455 stderr = (result.stderr or "").strip()
456 stdout = (result.stdout or "").strip()
457 details = stderr if stderr else stdout
458 raise ValueError(
459 f"grid.gen failed with exit code {result.returncode}. Details:\n{details}"
460 )
461 if result.stdout:
462 print(result.stdout.strip())
463 if result.stderr:
464 print(result.stderr.strip())
465
466 if not os.path.isfile(output_file):
467 raise ValueError(f"grid.gen did not produce expected output file: {output_file}")
468
469 return output_file
470
471BC_FACE_MAP = {
472 "-xi": "-Xi",
473 "+xi": "+Xi",
474 "-eta": "-Eta",
475 "+eta": "+Eta",
476 "-zeta": "-Zeta",
477 "+zeta": "+Zeta",
478}
479
480BC_TYPE_MAP = {
481 "wall": "WALL",
482 "symmetry": "SYMMETRY",
483 "inlet": "INLET",
484 "outlet": "OUTLET",
485 "periodic": "PERIODIC",
486}
487
488BC_HANDLER_SPECS = {
489 # Only handlers that are implemented end-to-end in the current C path are allowed.
490 "noslip": {
491 "types": {"WALL"},
492 "required_params": set(),
493 "optional_params": set(),
494 },
495 "constant_velocity": {
496 "types": {"INLET"},
497 "required_params": {"vx", "vy", "vz"},
498 "optional_params": set(),
499 },
500 "conservation": {
501 "types": {"OUTLET"},
502 "required_params": set(),
503 "optional_params": set(),
504 },
505 "parabolic": {
506 "types": {"INLET"},
507 "required_params": {"v_max"},
508 "optional_params": set(),
509 },
510 "geometric": {
511 "types": {"PERIODIC"},
512 "required_params": set(),
513 "optional_params": set(),
514 },
515 "constant_flux": {
516 "types": {"PERIODIC"},
517 "required_params": {"target_flux"},
518 "optional_params": {"apply_trim"},
519 },
520}
521
522_NUMERIC_BC_PARAMS = {"vx", "vy", "vz", "v_max", "target_flux"}
523_BOOL_BC_PARAMS = {"apply_trim"}
524
525def _to_float(value, field_name: str) -> float:
526 """Convert a YAML scalar to float with a clear error message."""
527 try:
528 return float(value)
529 except (TypeError, ValueError):
530 raise ValueError(f"'{field_name}' must be numeric (got {value!r}).")
531
532def _to_bool(value, field_name: str) -> bool:
533 """Convert a YAML scalar/string to bool with a clear error message."""
534 if isinstance(value, bool):
535 return value
536 if isinstance(value, str):
537 raw = value.strip().lower()
538 if raw in {"true", "1", "yes"}:
539 return True
540 if raw in {"false", "0", "no"}:
541 return False
542 raise ValueError(f"'{field_name}' must be boolean (got {value!r}).")
543
544def normalize_boundary_conditions_layout(all_blocks_bcs, num_blocks: int):
545 """
546 Normalize boundary_conditions to list-of-lists form and validate block count.
547 """
548 if not all_blocks_bcs:
549 raise ValueError("The 'boundary_conditions' section in case.yml is empty.")
550
551 is_simple_list = isinstance(all_blocks_bcs[0], dict)
552 if num_blocks == 1 and is_simple_list:
553 all_blocks_bcs = [all_blocks_bcs]
554 elif is_simple_list and num_blocks > 1:
555 raise ValueError(
556 f"case.yml declares {num_blocks} blocks but boundary_conditions is a single face-list. "
557 "Use a list-of-lists, one inner list per block."
558 )
559
560 if len(all_blocks_bcs) != num_blocks:
561 raise ValueError(
562 f"Mismatch: case.yml declares {num_blocks} block(s) but found {len(all_blocks_bcs)} BC definitions."
563 )
564 return all_blocks_bcs
565
566def validate_and_prepare_boundary_conditions(case_cfg: dict):
567 """
568 Validate BC entries against currently supported C-side handlers/types and
569 return normalized entries ready for bcs.run generation.
570 """
571 num_blocks = int(case_cfg.get('models', {}).get('domain', {}).get('blocks', 1))
572 scales = case_cfg.get('properties', {}).get('scaling', {})
573 L_ref = _to_float(scales.get('length_ref'), "properties.scaling.length_ref")
574 U_ref = _to_float(scales.get('velocity_ref'), "properties.scaling.velocity_ref")
575 if U_ref == 0.0:
576 raise ValueError("properties.scaling.velocity_ref must be non-zero for non-dimensionalization.")
577 if L_ref == 0.0:
578 raise ValueError("properties.scaling.length_ref must be non-zero for non-dimensionalization.")
579
580 all_blocks_bcs = normalize_boundary_conditions_layout(case_cfg.get('boundary_conditions', []), num_blocks)
581 prepared_blocks = []
582
583 expected_faces = {"-Xi", "+Xi", "-Eta", "+Eta", "-Zeta", "+Zeta"}
584 axis_pairs = [("-Xi", "+Xi"), ("-Eta", "+Eta"), ("-Zeta", "+Zeta")]
585
586 for bi, block_bcs in enumerate(all_blocks_bcs):
587 if not isinstance(block_bcs, list):
588 raise ValueError(f"boundary_conditions[{bi}] must be a list of face configs.")
589
590 prepared_block = []
591 seen_faces = {}
592
593 for idx, bc in enumerate(block_bcs):
594 if not isinstance(bc, dict):
595 raise ValueError(f"boundary_conditions[{bi}][{idx}] must be a mapping.")
596
597 for req in ("face", "type", "handler"):
598 if req not in bc:
599 raise ValueError(f"boundary_conditions[{bi}][{idx}] missing required key '{req}'.")
600
601 face_raw = str(bc["face"]).strip()
602 face_key = face_raw.lower()
603 face = BC_FACE_MAP.get(face_key)
604 if face is None:
605 raise ValueError(
606 f"Unsupported BC face '{face_raw}' at boundary_conditions[{bi}][{idx}]. "
607 f"Supported: {sorted(expected_faces)}."
608 )
609 if face in seen_faces:
610 raise ValueError(f"Duplicate face '{face}' in boundary_conditions[{bi}] (entries {seen_faces[face]} and {idx}).")
611 seen_faces[face] = idx
612
613 bc_type_raw = str(bc["type"]).strip()
614 bc_type = BC_TYPE_MAP.get(bc_type_raw.lower())
615 if bc_type is None:
616 raise ValueError(
617 f"Unsupported BC type '{bc_type_raw}' for face {face} in block {bi}. "
618 f"Supported: {sorted(set(BC_TYPE_MAP.values()))}."
619 )
620
621 handler = str(bc["handler"]).strip().lower()
622 handler_spec = BC_HANDLER_SPECS.get(handler)
623 if handler_spec is None:
624 raise ValueError(
625 f"Unsupported BC handler '{bc['handler']}' for face {face} in block {bi}. "
626 f"Supported now: {sorted(BC_HANDLER_SPECS.keys())}."
627 )
628 if bc_type not in handler_spec["types"]:
629 raise ValueError(
630 f"Invalid BC combination on block {bi}, face {face}: type '{bc_type}' cannot use handler '{handler}'."
631 )
632
633 params = bc.get("params", {})
634 if params is None:
635 params = {}
636 if not isinstance(params, dict):
637 raise ValueError(f"'params' for block {bi}, face {face} must be a mapping.")
638
639 # Reject legacy structured keys explicitly.
640 if "vector" in params or "velocity" in params:
641 raise ValueError(
642 f"Legacy params key ('vector'/'velocity') found on block {bi}, face {face}. "
643 "Use scalar keys 'vx', 'vy', 'vz'."
644 )
645
646 required = handler_spec["required_params"]
647 optional = handler_spec["optional_params"]
648 allowed = required | optional
649
650 missing = sorted(required - set(params.keys()))
651 if missing:
652 raise ValueError(
653 f"Missing required params for handler '{handler}' on block {bi}, face {face}: {missing}."
654 )
655 unknown = sorted(set(params.keys()) - allowed)
656 if unknown:
657 raise ValueError(
658 f"Unknown params for handler '{handler}' on block {bi}, face {face}: {unknown}. "
659 f"Allowed: {sorted(allowed)}."
660 )
661
662 converted_params = {}
663 for key, value in params.items():
664 if key in _NUMERIC_BC_PARAMS:
665 numeric = _to_float(value, f"boundary_conditions[{bi}][{idx}].params.{key}")
666 if key in {"vx", "vy", "vz", "v_max"}:
667 converted_params[key] = numeric / U_ref
668 elif key == "target_flux":
669 converted_params[key] = numeric / (U_ref * (L_ref ** 2))
670 elif key in _BOOL_BC_PARAMS:
671 converted_params[key] = _to_bool(value, f"boundary_conditions[{bi}][{idx}].params.{key}")
672 else:
673 # Defensive fallback; should not happen due to the unknown-key gate above.
674 converted_params[key] = value
675
676 prepared_block.append({
677 "face": face,
678 "type": bc_type,
679 "handler": handler,
680 "params": converted_params,
681 })
682
683 missing_faces = sorted(expected_faces - set(seen_faces.keys()))
684 if missing_faces:
685 raise ValueError(
686 f"boundary_conditions[{bi}] is incomplete. Missing faces: {missing_faces}. "
687 "Provide all six faces explicitly."
688 )
689
690 # Pairwise periodic consistency checks.
691 face_map = {entry["face"]: entry for entry in prepared_block}
692 for neg_face, pos_face in axis_pairs:
693 neg = face_map[neg_face]
694 pos = face_map[pos_face]
695 neg_periodic = (neg["type"] == "PERIODIC")
696 pos_periodic = (pos["type"] == "PERIODIC")
697 if neg_periodic != pos_periodic:
698 raise ValueError(
699 f"Inconsistent periodicity in block {bi}: {neg_face} and {pos_face} must both be PERIODIC or neither."
700 )
701
702 driven_handlers = {"constant_flux"}
703 if (neg["handler"] in driven_handlers) or (pos["handler"] in driven_handlers):
704 if neg["handler"] != pos["handler"]:
705 raise ValueError(
706 f"In block {bi}, driven periodic handlers on {neg_face}/{pos_face} must match exactly."
707 )
708 if not (neg_periodic and pos_periodic):
709 raise ValueError(
710 f"In block {bi}, driven periodic handler '{neg['handler']}' requires PERIODIC type on both faces."
711 )
712
713 prepared_blocks.append(prepared_block)
714
715 return prepared_blocks
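# A sketch of a single-block 'boundary_conditions' section that satisfies the
# checks above (values are illustrative). Multi-block cases use a list of such
# lists, one inner list per block. Velocities are divided by velocity_ref and
# target_flux by velocity_ref * length_ref**2 before bcs.run generation.
#   boundary_conditions:
#     - { face: "-xi",   type: inlet,    handler: constant_velocity, params: { vx: 1.0, vy: 0.0, vz: 0.0 } }
#     - { face: "+xi",   type: outlet,   handler: conservation }
#     - { face: "-eta",  type: wall,     handler: noslip }
#     - { face: "+eta",  type: wall,     handler: noslip }
#     - { face: "-zeta", type: periodic, handler: geometric }
#     - { face: "+zeta", type: periodic, handler: geometric }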
716
717def validate_solver_configs(case_cfg: dict, solver_cfg: dict, monitor_cfg: dict,
718 case_path: str, solver_path: str, monitor_path: str):
719 """!
720 @brief Validates all solver input configs before any work is done.
721 @details Checks for required sections, required keys, and physical sanity.
722 Collects every problem found and exits with a structured error report.
723 @param[in] case_cfg Parsed case YAML dictionary.
724 @param[in] solver_cfg Parsed solver YAML dictionary.
725 @param[in] monitor_cfg Parsed monitor YAML dictionary.
726 @param[in] case_path Path to case file (for error messages).
727 @param[in] solver_path Path to solver file (for error messages).
728 @param[in] monitor_path Path to monitor file (for error messages).
729 @throws SystemExit on validation failure.
730 """
731 errors = []
732
733 # --- case.yml: required top-level sections ---
734 required_case_sections = ['properties', 'run_control', 'grid', 'models', 'boundary_conditions']
735 for section in required_case_sections:
736 if section not in case_cfg:
737 errors.append(f" {case_path}: missing required section '{section}'.")
738
739 if errors:
740 _print_validation_errors(errors)
741
742 # --- case.yml: properties sub-keys ---
743 props = case_cfg.get('properties', {})
744 for group, keys in [('scaling', ['length_ref', 'velocity_ref']),
745 ('fluid', ['density', 'viscosity']),
746 ('initial_conditions', ['u_physical', 'v_physical', 'w_physical'])]:
747 sub = props.get(group, {})
748 if not sub:
749 errors.append(f" {case_path}: missing 'properties.{group}' section.")
750 else:
751 for k in keys:
752 if k not in sub:
753 errors.append(f" {case_path}: missing key 'properties.{group}.{k}'.")
754 if group == 'initial_conditions' and 'mode' in sub:
755 try:
756 normalize_field_init_mode(sub.get('mode'))
757 except ValueError as e:
758 errors.append(f" {case_path}: {e}")
759
760 # --- case.yml: run_control sub-keys ---
761 rc = case_cfg.get('run_control', {})
762 for k in ['start_step', 'total_steps', 'dt_physical']:
763 if k not in rc:
764 errors.append(f" {case_path}: missing key 'run_control.{k}'.")
765
766 # --- Physical sanity checks ---
767 try:
768 density = float(props.get('fluid', {}).get('density', 0))
769 viscosity = float(props.get('fluid', {}).get('viscosity', 0))
770 dt = float(rc.get('dt_physical', 0))
771 if density <= 0:
772 errors.append(f" {case_path}: 'properties.fluid.density' must be positive (got {density}).")
773 if viscosity < 0:
774 errors.append(f" {case_path}: 'properties.fluid.viscosity' must be non-negative (got {viscosity}).")
775 if dt <= 0:
776 errors.append(f" {case_path}: 'run_control.dt_physical' must be positive (got {dt}).")
777 except (TypeError, ValueError):
778 pass # Will be caught later during processing
779
780 # --- case.yml: grid mode ---
781 grid_cfg = case_cfg.get('grid', {})
782 grid_mode = grid_cfg.get('mode')
783 valid_grid_modes = ['file', 'programmatic_c', 'grid_gen']
784 if grid_mode not in valid_grid_modes:
785 errors.append(f" {case_path}: 'grid.mode' must be one of {valid_grid_modes} (got '{grid_mode}').")
786 elif grid_mode == 'file':
787 source_file = grid_cfg.get('source_file')
788 if not source_file:
789 errors.append(f" {case_path}: 'grid.source_file' is required when grid.mode is 'file'.")
790 else:
791 source_abs = source_file if os.path.isabs(source_file) else os.path.abspath(os.path.join(os.path.dirname(case_path), source_file))
792 if not os.path.isfile(source_abs):
793 errors.append(f" {case_path}: grid.source_file does not exist: {source_abs}")
794 elif grid_mode == 'programmatic_c':
795 grid_settings = grid_cfg.get('programmatic_settings')
796 if not grid_settings:
797 errors.append(f" {case_path}: 'grid.programmatic_settings' is required when grid.mode is 'programmatic_c'.")
798 elif not isinstance(grid_settings, dict):
799 errors.append(f" {case_path}: 'grid.programmatic_settings' must be a mapping.")
800 else:
801 for p_key in ('da_processors_x', 'da_processors_y', 'da_processors_z'):
802 p_val = grid_settings.get(p_key)
803 if isinstance(p_val, (list, tuple)):
804 errors.append(
805 f" {case_path}: grid.programmatic_settings.{p_key} must be a scalar integer. "
806 "Per-block MPI decomposition is not implemented on the C side; DMDA layout is global."
807 )
808 elif p_val is not None and (not isinstance(p_val, int) or p_val <= 0):
809 errors.append(
810 f" {case_path}: grid.programmatic_settings.{p_key} must be a positive integer when provided (got {p_val})."
811 )
812 elif grid_mode == 'grid_gen':
813 gen_cfg = grid_cfg.get('generator')
814 if not isinstance(gen_cfg, dict):
815 errors.append(f" {case_path}: 'grid.generator' must be a mapping when grid.mode is 'grid_gen'.")
816 else:
817 config_file = gen_cfg.get('config_file')
818 if not config_file:
819 errors.append(f" {case_path}: 'grid.generator.config_file' is required for grid.mode='grid_gen'.")
820 else:
821 config_abs = config_file if os.path.isabs(config_file) else os.path.abspath(os.path.join(os.path.dirname(case_path), config_file))
822 if not os.path.isfile(config_abs):
823 errors.append(f" {case_path}: grid.generator.config_file does not exist: {config_abs}")
824
825 grid_type = gen_cfg.get('grid_type')
826 if grid_type is not None and str(grid_type) not in {'cpipe', 'pipe', 'warp'}:
827 errors.append(f" {case_path}: grid.generator.grid_type must be one of ['cpipe','pipe','warp'] (got '{grid_type}').")
828
829 cli_args = gen_cfg.get('cli_args', [])
830 if cli_args is not None and not isinstance(cli_args, list):
831 errors.append(f" {case_path}: grid.generator.cli_args must be a list of CLI tokens.")
832
833 # --- case.yml: boundary_conditions strict validation ---
834 try:
835 validate_and_prepare_boundary_conditions(case_cfg)
836 except ValueError as e:
837 errors.append(f" {case_path}: {e}")
838
839 # --- case.yml: particle initialization validation ---
840 particles_cfg = case_cfg.get('models', {}).get('physics', {}).get('particles', {})
841 if particles_cfg and not isinstance(particles_cfg, dict):
842 errors.append(f" {case_path}: 'models.physics.particles' must be a mapping.")
843 elif isinstance(particles_cfg, dict):
844 init_mode_raw = particles_cfg.get('init_mode', 'Surface')
845 try:
846 pinit_code = normalize_particle_init_mode(init_mode_raw)
847 except ValueError as e:
848 errors.append(f" {case_path}: {e}")
849 pinit_code = None
850
851 restart_mode = particles_cfg.get('restart_mode')
852 if restart_mode is not None and str(restart_mode).lower() not in {"init", "load"}:
853 errors.append(
854 f" {case_path}: models.physics.particles.restart_mode must be 'init' or 'load' (got '{restart_mode}')."
855 )
856
857 if pinit_code == 2:
858 point_cfg = particles_cfg.get('point_source', {})
859 if not isinstance(point_cfg, dict):
860 errors.append(f" {case_path}: models.physics.particles.point_source must be a mapping when init_mode is PointSource.")
861 else:
862 for coord in ('x', 'y', 'z'):
863 if coord not in point_cfg:
864 errors.append(
865 f" {case_path}: models.physics.particles.point_source.{coord} is required when init_mode is PointSource."
866 )
867
868 # --- solver.yml: basic structure ---
869 if not isinstance(solver_cfg, dict) or not solver_cfg:
870 errors.append(f" {solver_path}: solver config is empty or not a valid YAML mapping.")
871 else:
872 strategy_cfg = solver_cfg.get('strategy', {})
873 if not isinstance(strategy_cfg, dict):
874 errors.append(f" {solver_path}: 'strategy' must be a mapping.")
875 elif 'implicit' in strategy_cfg:
876 errors.append(
877 f" {solver_path}: legacy key 'strategy.implicit' is not supported. "
878 "Use 'strategy.momentum_solver' with named solver values."
879 )
880 if isinstance(strategy_cfg, dict) and 'momentum_solver' in strategy_cfg:
881 try:
882 normalize_momentum_solver_type(strategy_cfg['momentum_solver'])
883 except ValueError as e:
884 errors.append(f" {solver_path}: {e}")
885
886 op_mode_cfg = solver_cfg.get('operation_mode', {})
887 if op_mode_cfg is not None and not isinstance(op_mode_cfg, dict):
888 errors.append(f" {solver_path}: 'operation_mode' must be a mapping when provided.")
889 elif isinstance(op_mode_cfg, dict):
890 analytical_type = op_mode_cfg.get('analytical_type')
891 if analytical_type is not None and not isinstance(analytical_type, str):
892 errors.append(f" {solver_path}: 'operation_mode.analytical_type' must be a string when provided.")
893
894 ms_cfg = solver_cfg.get('momentum_solver', {})
895 if ms_cfg is not None and not isinstance(ms_cfg, dict):
896 errors.append(f" {solver_path}: 'momentum_solver' must be a mapping when provided.")
897 elif isinstance(ms_cfg, dict):
898 legacy_flat_keys = {
899 'max_pseudo_steps', 'absolute_tol', 'relative_tol', 'step_tol',
900 'pseudo_cfl', 'rk4_residual_noise_allowance_factor'
901 }
902 present_legacy = sorted(legacy_flat_keys.intersection(ms_cfg.keys()))
903 if present_legacy:
904 errors.append(
905 f" {solver_path}: legacy flat keys in 'momentum_solver' are not supported: {present_legacy}. "
906 "Use solver-specific sub-blocks (e.g., momentum_solver.dual_time_picard_rk4)."
907 )
908
909 if 'type' in ms_cfg:
910 try:
911 normalize_momentum_solver_type(ms_cfg['type'])
912 except ValueError as e:
913 errors.append(f" {solver_path}: {e}")
914
915 allowed_ms_keys = {'type', 'dual_time_picard_rk4'}
916 unknown_ms_keys = sorted(set(ms_cfg.keys()) - allowed_ms_keys)
917 if unknown_ms_keys:
918 errors.append(
919 f" {solver_path}: unsupported momentum_solver blocks/keys: {unknown_ms_keys}. "
920 "Currently supported: 'dual_time_picard_rk4' (plus optional 'type')."
921 )
922
923 selected_solver = None
924 if isinstance(strategy_cfg, dict) and 'momentum_solver' in strategy_cfg:
925 try:
926 selected_solver = normalize_momentum_solver_type(strategy_cfg['momentum_solver'])
927 except ValueError:
928 pass
929 if selected_solver is None and 'type' in ms_cfg:
930 try:
931 selected_solver = normalize_momentum_solver_type(ms_cfg['type'])
932 except ValueError:
933 pass
934 if selected_solver is None:
935 selected_solver = "DUALTIME_PICARD_RK4"
936
937 if selected_solver != "DUALTIME_PICARD_RK4" and 'dual_time_picard_rk4' in ms_cfg:
938 errors.append(
939 f" {solver_path}: momentum_solver.dual_time_picard_rk4 is set but selected solver is "
940 f"{selected_solver}. Solver-specific blocks must match the selected solver."
941 )
942
943 dt_picard_cfg = ms_cfg.get('dual_time_picard_rk4')
944 if dt_picard_cfg is not None:
945 if not isinstance(dt_picard_cfg, dict):
946 errors.append(f" {solver_path}: momentum_solver.dual_time_picard_rk4 must be a mapping.")
947 else:
948 allowed_dt_keys = {
949 'max_pseudo_steps', 'absolute_tol', 'relative_tol', 'step_tol',
950 'pseudo_cfl', 'rk4_residual_noise_allowance_factor'
951 }
952 unknown_dt_keys = sorted(set(dt_picard_cfg.keys()) - allowed_dt_keys)
953 if unknown_dt_keys:
954 errors.append(
955 f" {solver_path}: unsupported keys in momentum_solver.dual_time_picard_rk4: {unknown_dt_keys}."
956 )
957 if 'pseudo_cfl' in dt_picard_cfg:
958 pcfl_cfg = dt_picard_cfg['pseudo_cfl']
959 if not isinstance(pcfl_cfg, dict):
960 errors.append(f" {solver_path}: momentum_solver.dual_time_picard_rk4.pseudo_cfl must be a mapping.")
961 else:
962 allowed_pcfl_keys = {'initial', 'minimum', 'maximum', 'growth_factor', 'reduction_factor'}
963 unknown_pcfl_keys = sorted(set(pcfl_cfg.keys()) - allowed_pcfl_keys)
964 if unknown_pcfl_keys:
965 errors.append(
966 f" {solver_path}: unsupported keys in momentum_solver.dual_time_picard_rk4.pseudo_cfl: {unknown_pcfl_keys}."
967 )
968
969 # --- monitor.yml: basic structure ---
970 if not isinstance(monitor_cfg, dict) or not monitor_cfg:
971 errors.append(f" {monitor_path}: monitor config is empty or not a valid YAML mapping.")
972 else:
973 io_cfg = monitor_cfg.get('io', {})
974 freq = io_cfg.get('data_output_frequency')
975 if freq is not None and (not isinstance(freq, int) or freq <= 0):
976 errors.append(f" {monitor_path}: 'io.data_output_frequency' must be a positive integer (got {freq}).")
977 solver_mon_cfg = monitor_cfg.get('solver_monitoring')
978 if solver_mon_cfg is not None and not isinstance(solver_mon_cfg, dict):
979 errors.append(f" {monitor_path}: 'solver_monitoring' must be a mapping of <flag>: <value>.")
980
981 if errors:
982 _print_validation_errors(errors)
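# A sketch of the momentum_solver shape the checks above accept (values are
# illustrative; only the 'dual_time_picard_rk4' block is supported, and the
# exact strings accepted for strategy.momentum_solver / momentum_solver.type
# are defined by normalize_momentum_solver_type elsewhere in this file):
#   momentum_solver:
#     dual_time_picard_rk4:
#       max_pseudo_steps: 50
#       absolute_tol: 1.0e-8
#       relative_tol: 1.0e-6
#       step_tol: 1.0e-8
#       rk4_residual_noise_allowance_factor: 10.0
#       pseudo_cfl: { initial: 1.0, minimum: 0.1, maximum: 10.0, growth_factor: 1.1, reduction_factor: 0.5 }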
983
984
985def validate_post_config(post_cfg: dict, post_path: str):
986 """!
987 @brief Validates the post-processing config before running the post-processor.
988 @param[in] post_cfg Parsed post-processing YAML dictionary.
989 @param[in] post_path Path to post file (for error messages).
990 @throws SystemExit on validation failure.
991 """
992 errors = []
993
994 if not isinstance(post_cfg, dict) or not post_cfg:
995 errors.append(f" {post_path}: post-processing config is empty or not a valid YAML mapping.")
996 _print_validation_errors(errors)
997
998 # --- run_control ---
999 if 'run_control' not in post_cfg:
1000 errors.append(f" {post_path}: missing required section 'run_control'.")
1001
1002 # --- io section ---
1003 io_cfg = post_cfg.get('io', {})
1004 if not io_cfg:
1005 errors.append(f" {post_path}: missing required section 'io'.")
1006 else:
1007 for k in ['output_directory', 'output_filename_prefix']:
1008 if k not in io_cfg:
1009 errors.append(f" {post_path}: missing required key 'io.{k}'.")
1010 input_extensions = io_cfg.get('input_extensions')
1011 if input_extensions is not None:
1012 if not isinstance(input_extensions, dict):
1013 errors.append(f" {post_path}: 'io.input_extensions' must be a mapping when provided.")
1014 else:
1015 for ext_key in ('eulerian', 'particle'):
1016 ext_val = input_extensions.get(ext_key)
1017 if ext_val is not None and not isinstance(ext_val, str):
1018 errors.append(f" {post_path}: 'io.input_extensions.{ext_key}' must be a string extension.")
1019
1020 averaged_fields = io_cfg.get('eulerian_fields_averaged')
1021 if averaged_fields is not None and not isinstance(averaged_fields, list):
1022 errors.append(f" {post_path}: 'io.eulerian_fields_averaged' must be a list when provided.")
1023
1024 # --- Check eulerian_pipeline entries have 'task' key ---
1025 for i, entry in enumerate(post_cfg.get('eulerian_pipeline', [])):
1026 if not isinstance(entry, dict) or 'task' not in entry:
1027 errors.append(f" {post_path}: 'eulerian_pipeline[{i}]' is missing the 'task' key. "
1028 "Check YAML indentation (each entry needs '- task: ...' with proper spacing).")
1029
1030 # --- Check statistics pipeline entries ---
1031 stats_cfg = post_cfg.get('statistics_pipeline')
1032 stats_entries = []
1033 if stats_cfg is not None:
1034 if isinstance(stats_cfg, list):
1035 stats_entries = stats_cfg
1036 elif isinstance(stats_cfg, dict):
1037 stats_entries = stats_cfg.get('tasks', [])
1038 if not isinstance(stats_entries, list):
1039 errors.append(f" {post_path}: 'statistics_pipeline.tasks' must be a list.")
1040 stats_output_prefix = stats_cfg.get('output_prefix')
1041 if stats_output_prefix is not None and not isinstance(stats_output_prefix, str):
1042 errors.append(f" {post_path}: 'statistics_pipeline.output_prefix' must be a string.")
1043 else:
1044 errors.append(
1045 f" {post_path}: 'statistics_pipeline' must be either a list of tasks or a mapping with a 'tasks' list."
1046 )
1047 for i, entry in enumerate(stats_entries):
1048 if isinstance(entry, str):
1049 task_name = entry
1050 elif isinstance(entry, dict) and 'task' in entry:
1051 task_name = entry.get('task')
1052 else:
1053 errors.append(
1054 f" {post_path}: statistics task entry {i} must be either a string or a mapping with key 'task'."
1055 )
1056 continue
1057 try:
1058 normalize_statistics_task(task_name)
1059 except ValueError as e:
1060 errors.append(f" {post_path}: {e}")
1061
1062 if errors:
1063 _print_validation_errors(errors)
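# A sketch of a post.yml that passes validate_post_config (values and task
# names other than 'msd' are placeholders; eulerian entries are only checked
# here for the presence of a 'task' key):
#   run_control: {}
#   io:
#     output_directory: results/post
#     output_filename_prefix: flow
#   eulerian_pipeline:
#     - task: SomeEulerianTask
#   statistics_pipeline:
#     tasks: [msd]          # a plain list of tasks also works
#     output_prefix: stats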
1064
1065def validate_cluster_config(cluster_cfg: dict, cluster_path: str):
1066 """Validate Slurm scheduler configuration from cluster.yml."""
1067 errors = []
1068 if not isinstance(cluster_cfg, dict) or not cluster_cfg:
1069 errors.append(f" {cluster_path}: cluster config is empty or not a valid YAML mapping.")
1070 _print_validation_errors(errors)
1071
1072 scheduler = cluster_cfg.get("scheduler", {})
1073 if not isinstance(scheduler, dict):
1074 errors.append(f" {cluster_path}: 'scheduler' must be a mapping.")
1075 else:
1076 scheduler_type = scheduler.get("type", "slurm")
1077 if str(scheduler_type).lower() != "slurm":
1078 errors.append(f" {cluster_path}: scheduler.type must be 'slurm' in v1 (got '{scheduler_type}').")
1079
1080 resources = cluster_cfg.get("resources", {})
1081 if not isinstance(resources, dict):
1082 errors.append(f" {cluster_path}: 'resources' must be a mapping.")
1083 else:
1084 for req in ("account", "nodes", "ntasks_per_node", "mem", "time"):
1085 if req not in resources:
1086 errors.append(f" {cluster_path}: missing required key 'resources.{req}'.")
1087 for int_key in ("nodes", "ntasks_per_node"):
1088 if int_key in resources:
1089 val = resources.get(int_key)
1090 if not isinstance(val, int) or val <= 0:
1091 errors.append(f" {cluster_path}: resources.{int_key} must be a positive integer (got {val}).")
1092 for str_key in ("account", "mem", "time", "partition"):
1093 if str_key in resources and resources.get(str_key) is not None:
1094 if not isinstance(resources.get(str_key), str):
1095 errors.append(f" {cluster_path}: resources.{str_key} must be a string when provided.")
1096
1097 notifications = cluster_cfg.get("notifications", {})
1098 if notifications is not None and not isinstance(notifications, dict):
1099 errors.append(f" {cluster_path}: 'notifications' must be a mapping when provided.")
1100 elif isinstance(notifications, dict):
1101 mail_user = notifications.get("mail_user")
1102 if mail_user is not None and not is_valid_email(mail_user):
1103 errors.append(f" {cluster_path}: notifications.mail_user is not a valid email '{mail_user}'.")
1104 mail_type = notifications.get("mail_type")
1105 if mail_type is not None and not isinstance(mail_type, str):
1106 errors.append(f" {cluster_path}: notifications.mail_type must be a string when provided.")
1107
1108 execution = cluster_cfg.get("execution", {})
1109 if execution is not None and not isinstance(execution, dict):
1110 errors.append(f" {cluster_path}: 'execution' must be a mapping when provided.")
1111 elif isinstance(execution, dict):
1112 module_setup = execution.get("module_setup", [])
1113 if module_setup is not None and not isinstance(module_setup, list):
1114 errors.append(f" {cluster_path}: execution.module_setup must be a list of shell lines.")
1115 elif isinstance(module_setup, list):
1116 for i, line in enumerate(module_setup):
1117 if not isinstance(line, str):
1118 errors.append(f" {cluster_path}: execution.module_setup[{i}] must be a string.")
1119
1120 launcher = execution.get("launcher", "srun")
1121 if launcher is not None and not isinstance(launcher, str):
1122 errors.append(f" {cluster_path}: execution.launcher must be a string when provided.")
1123 launcher_args = execution.get("launcher_args", [])
1124 if launcher_args is not None and not isinstance(launcher_args, list):
1125 errors.append(f" {cluster_path}: execution.launcher_args must be a list of CLI tokens.")
1126 elif isinstance(launcher_args, list):
1127 for i, token in enumerate(launcher_args):
1128 if not isinstance(token, (str, int, float)):
1129 errors.append(f" {cluster_path}: execution.launcher_args[{i}] must be a scalar CLI token.")
1130
1131 extra_sbatch = execution.get("extra_sbatch")
1132 if extra_sbatch is not None and not isinstance(extra_sbatch, (dict, list)):
1133 errors.append(f" {cluster_path}: execution.extra_sbatch must be a mapping or list when provided.")
1134
1135 if errors:
1136 _print_validation_errors(errors)
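# A sketch of a cluster.yml accepted above (account, partition, e-mail, and
# module names are placeholders):
#   scheduler: { type: slurm }
#   resources:
#     account: myaccount
#     nodes: 2
#     ntasks_per_node: 32
#     mem: 64G
#     time: "04:00:00"
#     partition: standard
#   notifications: { mail_user: user@example.com, mail_type: END }
#   execution:
#     module_setup: ["module load openmpi"]
#     launcher: srun
#     launcher_args: []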
1137
1138def validate_study_config(study_cfg: dict, study_path: str):
1139 """Validate sweep/study specification from study.yml."""
1140 errors = []
1141 if not isinstance(study_cfg, dict) or not study_cfg:
1142 errors.append(f" {study_path}: study config is empty or not a valid YAML mapping.")
1143 _print_validation_errors(errors)
1144
1145 base_cfgs = study_cfg.get("base_configs")
1146 if not isinstance(base_cfgs, dict):
1147 errors.append(f" {study_path}: missing required mapping 'base_configs'.")
1148 else:
1149 for req in ("case", "solver", "monitor", "post"):
1150 path_val = base_cfgs.get(req)
1151 if not path_val or not isinstance(path_val, str):
1152 errors.append(f" {study_path}: base_configs.{req} must be a path string.")
1153 else:
1154 resolved = resolve_path(study_path, path_val)
1155 if not os.path.isfile(resolved):
1156 errors.append(f" {study_path}: base_configs.{req} does not exist: {resolved}")
1157
1158 study_type = study_cfg.get("study_type")
1159 allowed_types = {"grid_independence", "timestep_independence", "sensitivity"}
1160 if study_type not in allowed_types:
1161 errors.append(
1162 f" {study_path}: study_type must be one of {sorted(allowed_types)} (got '{study_type}')."
1163 )
1164
1165 parameters = study_cfg.get("parameters")
1166 if not isinstance(parameters, dict) or not parameters:
1167 errors.append(f" {study_path}: 'parameters' must be a non-empty mapping of key->list.")
1168 else:
1169 allowed_roots = {"case", "solver", "monitor", "post"}
1170 for key, values in parameters.items():
1171 if not isinstance(key, str) or "." not in key:
1172 errors.append(
1173 f" {study_path}: parameter key '{key}' must use '<target>.<yaml.path>' format."
1174 )
1175 continue
1176 root = key.split(".", 1)[0]
1177 if root not in allowed_roots:
1178 errors.append(
1179 f" {study_path}: parameter key '{key}' must start with one of {sorted(allowed_roots)}."
1180 )
1181 if not isinstance(values, list) or len(values) == 0:
1182 errors.append(f" {study_path}: parameters.{key} must be a non-empty list.")
1183
1184 metrics = study_cfg.get("metrics", [])
1185 if metrics is not None and not isinstance(metrics, list):
1186 errors.append(f" {study_path}: 'metrics' must be a list when provided.")
1187 elif isinstance(metrics, list):
1188 for i, metric in enumerate(metrics):
1189 if isinstance(metric, str):
1190 continue
1191 if not isinstance(metric, dict):
1192 errors.append(
1193 f" {study_path}: metrics[{i}] must be a string or mapping."
1194 )
1195 continue
1196 if "name" not in metric:
1197 errors.append(f" {study_path}: metrics[{i}] missing required key 'name'.")
1198 if "source" not in metric:
1199 errors.append(f" {study_path}: metrics[{i}] missing required key 'source'.")
1200
1201 plotting = study_cfg.get("plotting", {})
1202 if plotting is not None and not isinstance(plotting, dict):
1203 errors.append(f" {study_path}: 'plotting' must be a mapping when provided.")
1204 elif isinstance(plotting, dict):
1205 enabled = plotting.get("enabled")
1206 if enabled is not None and not isinstance(enabled, bool):
1207 errors.append(f" {study_path}: plotting.enabled must be boolean when provided.")
1208 output_format = plotting.get("output_format")
1209 if output_format is not None and output_format not in {"png", "pdf", "svg"}:
1210 errors.append(f" {study_path}: plotting.output_format must be one of ['png','pdf','svg'].")
1211
1212 execution = study_cfg.get("execution", {})
1213 if execution is not None and not isinstance(execution, dict):
1214 errors.append(f" {study_path}: 'execution' must be a mapping when provided.")
1215 elif isinstance(execution, dict):
1216 max_conc = execution.get("max_concurrent_array_tasks")
1217 if max_conc is not None and (not isinstance(max_conc, int) or max_conc <= 0):
1218 errors.append(
1219 f" {study_path}: execution.max_concurrent_array_tasks must be a positive integer when provided."
1220 )
1221
1222 if errors:
1223 _print_validation_errors(errors)
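# A sketch of a study.yml accepted above (paths and values are illustrative;
# base_configs paths must exist relative to study.yml, and metric 'source'
# semantics are defined by the study runner):
#   base_configs: { case: case.yml, solver: solver.yml, monitor: monitor.yml, post: post.yml }
#   study_type: timestep_independence
#   parameters:
#     case.run_control.dt_physical: [0.002, 0.001, 0.0005]
#   metrics:
#     - { name: example_metric, source: results.csv }
#   plotting: { enabled: true, output_format: png }
#   execution: { max_concurrent_array_tasks: 4 }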
1224
1225def _deep_set(container: dict, dotted_path: str, value):
1226 """Set nested dictionary value, creating intermediate maps when needed."""
1227 keys = dotted_path.split(".")
1228 current = container
1229 for key in keys[:-1]:
1230 if key not in current or not isinstance(current[key], dict):
1231 current[key] = {}
1232 current = current[key]
1233 current[keys[-1]] = value
1234
1235def expand_parameter_matrix(parameters: dict) -> list:
1236 """Expand study parameter lists into cartesian-product combinations."""
1237 param_keys = list(parameters.keys())
1238 all_values = [parameters[k] for k in param_keys]
1239 combos = []
1240 for combo in itertools.product(*all_values):
1241 combos.append(dict(zip(param_keys, combo)))
1242 return combos
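# Example (illustrative; 'solver.some.key' is a placeholder path):
#   expand_parameter_matrix({"case.run_control.dt_physical": [0.001, 0.0005],
#                            "solver.some.key": [1, 2]})
# returns 4 combinations, the first being
#   {"case.run_control.dt_physical": 0.001, "solver.some.key": 1}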
1243
1244def get_cluster_total_tasks(cluster_cfg: dict) -> int:
1245 resources = cluster_cfg.get("resources", {})
1246 return int(resources.get("nodes", 1)) * int(resources.get("ntasks_per_node", 1))
1247
1248def normalize_extension(ext: str) -> str:
1249 if ext is None:
1250 return None
1251 return str(ext).strip().lstrip(".")
1252
1253def render_slurm_script(
1254 script_path: str,
1255 job_name: str,
1256 cluster_cfg: dict,
1257 command: list,
1258 workdir: str,
1259 stdout_path: str,
1260 stderr_path: str = None,
1261 env_vars: dict = None,
1262 array_spec: str = None
1263):
1264 """Render a Slurm batch script for a single command."""
1265 resources = cluster_cfg.get("resources", {})
1266 notifications = cluster_cfg.get("notifications", {}) or {}
1267 execution = cluster_cfg.get("execution", {}) or {}
1268 extra_sbatch = execution.get("extra_sbatch")
1269 module_setup = execution.get("module_setup", []) or []
1270
1271 if stderr_path is None:
1272 stderr_path = stdout_path.replace(".out", ".err")
1273
1274 lines = [
1275 "#!/bin/bash",
1276 f"#SBATCH --job-name={job_name}",
1277 f"#SBATCH --nodes={resources['nodes']}",
1278 f"#SBATCH --ntasks-per-node={resources['ntasks_per_node']}",
1279 f"#SBATCH --mem={resources['mem']}",
1280 f"#SBATCH --time={resources['time']}",
1281 f"#SBATCH --output={stdout_path}",
1282 f"#SBATCH --error={stderr_path}",
1283 f"#SBATCH --account={resources['account']}",
1284 ]
1285 partition = resources.get("partition")
1286 if partition:
1287 lines.append(f"#SBATCH --partition={partition}")
1288 if array_spec:
1289 lines.append(f"#SBATCH --array={array_spec}")
1290 mail_user = notifications.get("mail_user")
1291 mail_type = notifications.get("mail_type")
1292 if mail_user:
1293 lines.append(f"#SBATCH --mail-user={mail_user}")
1294 if mail_type:
1295 lines.append(f"#SBATCH --mail-type={mail_type}")
1296
1297 if isinstance(extra_sbatch, dict):
1298 for key, value in extra_sbatch.items():
1299 flag = str(key)
1300 if not flag.startswith("--"):
1301 flag = f"--{flag}"
1302 if isinstance(value, bool):
1303 if value:
1304 lines.append(f"#SBATCH {flag}")
1305 elif value is not None:
1306 lines.append(f"#SBATCH {flag}={value}")
1307 elif isinstance(extra_sbatch, list):
1308 for token in extra_sbatch:
1309 lines.append(f"#SBATCH {token}")
1310
1311 lines.extend(
1312 [
1313 "",
1314 "set -euo pipefail",
1315 "",
1316 f"cd {shlex.quote(workdir)}",
1317 'echo "[$(date)] Starting job ${SLURM_JOB_NAME} (${SLURM_JOB_ID})"',
1318 'echo "[$(date)] Working directory: $PWD"',
1319 ]
1320 )
1321
1322 for setup_line in module_setup:
1323 lines.append(str(setup_line))
1324
1325 if env_vars:
1326 for key, value in env_vars.items():
1327 lines.append(f"export {key}={shlex.quote(str(value))}")
1328
1329 cmd = " ".join(shlex.quote(str(tok)) for tok in command)
1330 lines.append(cmd)
1331 lines.append('echo "[$(date)] Job completed."')
1332
1333 os.makedirs(os.path.dirname(script_path), exist_ok=True)
1334 with open(script_path, "w") as f:
1335 f.write("\n".join(lines) + "\n")
1336 os.chmod(script_path, 0o755)
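# The rendered script starts with a header like the following (job name and
# paths are placeholders; the resource values come from cluster.yml):
#   #!/bin/bash
#   #SBATCH --job-name=my_run
#   #SBATCH --nodes=2
#   #SBATCH --ntasks-per-node=32
#   #SBATCH --mem=64G
#   #SBATCH --time=04:00:00
#   #SBATCH --output=logs/my_run.out
#   #SBATCH --error=logs/my_run.err
#   #SBATCH --account=myaccount
# followed by optional partition/array/mail/extra_sbatch directives,
# 'set -euo pipefail', a cd into the working directory, module_setup lines,
# exported environment variables, the launch command, and a completion echo.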
1337
1338def build_cluster_launch_command(cluster_cfg: dict, executable: str, executable_args: list) -> list:
1339 """Build scheduler launcher command (srun/mpirun/custom) from cluster config."""
1340 execution = cluster_cfg.get("execution", {}) or {}
1341 launcher = execution.get("launcher", "srun")
1342 launcher_args = [str(x) for x in execution.get("launcher_args", [])]
1343 ntasks = get_cluster_total_tasks(cluster_cfg)
1344
1345 if launcher and launcher.lower() == "srun":
1346 has_n = any(token in {"-n", "--ntasks"} for token in launcher_args)
1347 cmd = ["srun"] + launcher_args
1348 if not has_n:
1349 cmd += ["-n", str(ntasks)]
1350 return cmd + [executable] + executable_args
1351
1352 if launcher and launcher.lower() == "mpirun":
1353 has_np = any(token in {"-np", "-n"} for token in launcher_args)
1354 cmd = ["mpirun"] + launcher_args
1355 if not has_np:
1356 cmd += ["-np", str(ntasks)]
1357 return cmd + [executable] + executable_args
1358
1359 # Custom launcher or no launcher.
1360 cmd = []
1361 if launcher:
1362 cmd.append(str(launcher))
1363 cmd += launcher_args
1364 cmd += [executable] + executable_args
1365 return cmd
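# Example (illustrative; the executable and its arguments are placeholders):
# with 2 nodes x 32 tasks per node and the default 'srun' launcher,
#   build_cluster_launch_command(cfg, "./picsolver", ["-control", "control.run"])
# returns ["srun", "-n", "64", "./picsolver", "-control", "control.run"];
# an explicit -n/--ntasks (or -np/-n for mpirun) in launcher_args suppresses
# the injected task count.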
1366
1367def parse_slurm_job_id(sbatch_output: str) -> str:
1368 """Extract numeric job id from standard sbatch output."""
1369 match = re.search(r"Submitted batch job\s+(\d+)", sbatch_output or "")
1370 return match.group(1) if match else None
1371
1372def submit_sbatch(script_path: str, dependency: str = None) -> dict:
1373 """Submit sbatch script and return submission metadata."""
1374 cmd = ["sbatch"]
1375 if dependency:
1376 cmd.append(f"--dependency=afterok:{dependency}")
1377 cmd.append(script_path)
1378 result = subprocess.run(cmd, text=True, capture_output=True, check=False)
1379 metadata = {
1380 "command": cmd,
1381 "returncode": result.returncode,
1382 "stdout": (result.stdout or "").strip(),
1383 "stderr": (result.stderr or "").strip(),
1384 "script": script_path,
1385 }
1386 if result.returncode != 0:
1387 print(f"[FATAL] sbatch submission failed for {script_path}\n{metadata['stderr']}", file=sys.stderr)
1388 sys.exit(result.returncode)
1389 metadata["job_id"] = parse_slurm_job_id(metadata["stdout"])
1390 if not metadata["job_id"]:
1391 print(
1392 f"[FATAL] Could not parse Slurm job id from sbatch output: {metadata['stdout']}",
1393 file=sys.stderr
1394 )
1395 sys.exit(1)
1396 return metadata
1397
1398
1399def _print_validation_errors(errors: list):
1400 """!
1401 @brief Prints validation errors and exits.
1402 @param[in] errors List of error message strings.
1403 """
1404 print(f"\n[FATAL] Configuration validation failed with {len(errors)} issue(s):", file=sys.stderr)
1405 for raw_error in errors:
1406 file_path, message = _split_error_file_and_message(raw_error)
1407 key_path = _extract_key_path(message)
1408 code = _classify_error_code(message)
1409 emit_structured_error(code, key=key_path, file_path=file_path, message=message)
1410 print(
1411 "\nHint: See examples/master_template/ for valid config structure and "
1412 "docs/pages/14_Config_Contract.md for key-level contract details.",
1413 file=sys.stderr,
1414 )
1415 sys.exit(1)
1416
1417
1418def generate_header(run_id: str, source_files: dict) -> str:
1419 """!
1420 @brief Creates a standard header block for all generated files.
1421 @param[in] run_id The unique identifier for the current simulation run.
1422 @param[in] source_files A dictionary of source profile files used.
1423 @return A formatted string containing the header.
1424 """
1425 header_parts = [
1426 "# ==============================================================================",
1427 "# AUTO-GENERATED CONFIGURATION FILE",
1428 "# ------------------------------------------------------------------------------",
1429 f"# Run ID: {run_id}",
1430 f"# Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
1431 "#",
1432 "# Source Configuration:"
1433 ]
1434 for name, path in source_files.items():
1435 header_parts.append(f"# - {name:<12}: {os.path.basename(path)}")
1436 header_parts.extend([
1437 "#",
1438 "# DO NOT EDIT THIS FILE MANUALLY. IT IS A MACHINE-READABLE ARTIFACT.",
1439 "# ==============================================================================\n"
1440 ])
1441 return "\n".join(header_parts)
1442
1443def generate_simple_list_file(run_dir: str, run_id: str, cfg: dict, section: str, key: str, filename: str, header_sources: dict) -> str:
1444 """!
1445 @brief Generic function to create a file containing a simple list of strings.
1446 @param[in] run_dir The path to the main run directory.
1447 @param[in] run_id The unique identifier for the run.
1448 @param[in] cfg The dictionary containing the configuration data.
1449 @param[in] section The top-level key in the cfg dictionary.
1450 @param[in] key The second-level key whose value is the list of strings.
1451 @param[in] filename The name of the file to generate (e.g., 'whitelist.run').
1452 @param[in] header_sources A dictionary of source files for the header.
1453 @return The absolute path to the generated file.
1454 """
1455 print(f"[INFO] Generating {filename}...")
1456 config_dir = os.path.join(run_dir, "config")
1457 file_path = os.path.join(config_dir, filename)
1458
1459 lines = [generate_header(run_id, header_sources)]
1460 items = cfg.get(section, {}).get(key, [])
1461 lines.extend(items)
1462
1463 with open(file_path, "w") as f: f.write("\n".join(lines))
1464 print(f"[SUCCESS] Generated {filename}: {os.path.relpath(file_path)}")
1465 return os.path.abspath(file_path)
1466
1467def generate_multi_block_bcs(run_dir: str, run_id: str, case_cfg: dict, source_files: dict) -> list:
1468 """!
1469 @brief Parses multi-block BCs from YAML, generates a .run file for each block,
1470 and returns a list of their absolute paths.
1471 @details Handles both simple list format (for single-block cases) and a
1472 list-of-lists (for multi-block cases) for boundary conditions.
1473 @param[in] run_dir The path to the main run directory.
1474 @param[in] run_id The unique identifier for the run.
1475 @param[in] case_cfg The parsed case.yml configuration dictionary.
1476 @param[in] source_files A dictionary of source files for the header.
1477 @return A list of absolute paths to the generated BC files.
1478 @throws ValueError if the number of BC definitions does not match the number of blocks.
1479 """
1480 print("[INFO] Generating boundary condition files...")
1481 config_dir = os.path.join(run_dir, "config")
1482 num_blocks = int(case_cfg.get('models', {}).get('domain', {}).get('blocks', 1))
1483 prepared_blocks = validate_and_prepare_boundary_conditions(case_cfg)
1484
1485 generated_files = []
1486 for i, block_bcs_list in enumerate(prepared_blocks):
1487 file_name = "bcs.run" if num_blocks == 1 else f"bcs_block{i}.run"
1488 bcs_file_path = os.path.join(config_dir, file_name)
1489 bcs_lines = [generate_header(run_id, source_files)]
1490
1491 for bc in block_bcs_list:
1492 face, bc_type, handler = bc['face'], bc['type'], bc['handler']
1493 params_str = ""
1494 if bc.get('params'):
1495 parts = []
1496 for k, v in bc['params'].items():
1497 if isinstance(v, bool):
1498 value_str = "true" if v else "false"
1499 else:
1500 value_str = str(v)
1501 parts.append(f"{k}={value_str}")
1502 params_str = " ".join(parts)
1503 bcs_lines.append(f"{face:<20s} {bc_type:<12s} {handler:<20s} {params_str}")
1504
1505 with open(bcs_file_path, "w") as f: f.write("\n".join(bcs_lines))
1506
1507 print(f"[SUCCESS] Generated BCs for Block {i}: {os.path.relpath(bcs_file_path)}")
1508 generated_files.append(os.path.abspath(bcs_file_path))
1509
1510 return generated_files
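# Illustrative sketch of one emitted row (the face/type/handler strings below are
# hypothetical placeholders, not values mandated by the C solver): a YAML entry
#   {face: "-Xi", type: "INLET", handler: "constant_velocity", params: {vx: 1.0, pulsatile: false}}
# is written as the column-aligned line
#   -Xi                  INLET        constant_velocity    vx=1.0 pulsatile=false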
1511
1512def format_flag_value(value):
1513 """!
1514 @brief Converts Python types to C-style command-line flag values.
1515 @param[in] value The Python object to convert (bool, list, or other).
1516 @return A string representation suitable for a C command-line parser.
1517 """
1518 if isinstance(value, bool):
1519 return "1" if value else "0"
1520 if isinstance(value, list):
1521 return ",".join(map(str, value))
1522 return str(value)
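# Illustrative conversions (sketch): format_flag_value(True) -> "1",
# format_flag_value([8, 8, 4]) -> "8,8,4", format_flag_value(0.75) -> "0.75".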
1523
1524def normalize_momentum_solver_type(value: str) -> str:
1525 """!
1526 @brief Normalizes user-facing momentum solver names to C-enum CLI values.
1527 @param[in] value Human-readable or enum-like momentum solver string.
1528 @return Canonical value accepted by -mom_solver_type.
1529 @throws ValueError if the input cannot be mapped.
1530 """
1531 if value is None:
1532 raise ValueError("momentum solver type cannot be None")
1533
1534 raw = str(value).strip()
1535 direct = {
1536 "EXPLICIT_RK",
1537 "DUALTIME_PICARD_RK4",
1538 "DUALTIME_NK_ARNOLDI",
1539 "DUALTIME_NK_ANALYTICAL_JACOBIAN"
1540 }
1541 if raw in direct:
1542 return raw
1543
1544 normalized = raw.lower().replace("-", " ").replace("_", " ")
1545 normalized = " ".join(normalized.split())
1546 aliases = {
1547 "explicit rk4": "EXPLICIT_RK",
1548 "explicit rk": "EXPLICIT_RK",
1549 "explicit runge kutta 4": "EXPLICIT_RK",
1550 "explicit runge kutta": "EXPLICIT_RK",
1551 "dual time picard rk4": "DUALTIME_PICARD_RK4",
1552 "dualtime picard rk4": "DUALTIME_PICARD_RK4",
1553 "dual time picard": "DUALTIME_PICARD_RK4",
1554 "dualtime picard": "DUALTIME_PICARD_RK4",
1555 "dual time nk arnoldi": "DUALTIME_NK_ARNOLDI",
1556 "dualtime nk arnoldi": "DUALTIME_NK_ARNOLDI",
1557 "dual time nk analytical jacobian": "DUALTIME_NK_ANALYTICAL_JACOBIAN",
1558 "dualtime nk analytical jacobian": "DUALTIME_NK_ANALYTICAL_JACOBIAN"
1559 }
1560 mapped = aliases.get(normalized)
1561 if mapped is None:
1562 raise ValueError(
1563 f"Unknown momentum solver '{value}'. Use one of: "
1564 "'Explicit RK4', 'Dual Time Picard RK4', 'Dual Time NK Arnoldi', "
1565 "'Dual Time NK Analytical Jacobian' (or canonical enum values)."
1566 )
1567 return mapped
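# Illustrative sketch: the canonical enum and friendlier spellings resolve to the same
# CLI value, e.g. both "EXPLICIT_RK" and "Explicit RK4" return "EXPLICIT_RK", and
# "dual-time picard rk4" returns "DUALTIME_PICARD_RK4".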
1568
1569def normalize_field_init_mode(value: str) -> int:
1570 """!
1571 @brief Normalizes user-facing field init mode names to C enum/int codes (-finit).
1572 @param[in] value Human-readable or enum-like field initialization mode.
1573 @return Canonical integer code accepted by -finit.
1574 @throws ValueError if the input cannot be mapped.
1575 """
1576 if value is None:
1577 raise ValueError("field initialization mode cannot be None")
1578
1579 raw = str(value).strip()
1580 direct = {"0": 0, "1": 1, "2": 2}
1581 if raw in direct:
1582 return direct[raw]
1583
1584 normalized = raw.lower().replace("-", " ").replace("_", " ")
1585 normalized = " ".join(normalized.split())
1586 aliases = {
1587 "zero": 0,
1588 "constant": 1,
1589 "poiseuille": 2,
1590 }
1591 mapped = aliases.get(normalized)
1592 if mapped is None:
1593 raise ValueError(
1594 f"Unknown initial_conditions mode '{value}'. Use one of: 'Zero', 'Constant', 'Poiseuille' (or 0/1/2)."
1595 )
1596 return mapped
1597
1598def normalize_particle_init_mode(value: str) -> int:
1599 """!
1600 @brief Normalizes particle init mode names to C enum/int codes (-pinit).
1601 @param[in] value Human-readable or enum-like particle initialization mode.
1602 @return Canonical integer code accepted by -pinit.
1603 @throws ValueError if the input cannot be mapped.
1604 """
1605 if value is None:
1606 raise ValueError("particle init mode cannot be None")
1607
1608 raw = str(value).strip()
1609 direct = {"0": 0, "1": 1, "2": 2, "3": 3}
1610 if raw in direct:
1611 return direct[raw]
1612
1613 normalized = raw.lower().replace("-", " ").replace("_", " ")
1614 normalized = " ".join(normalized.split())
1615 aliases = {
1616 "surface": 0,
1617 "surface random": 0,
1618 "volume": 1,
1619 "volumetric": 1,
1620 "point source": 2,
1621 "pointsource": 2,
1622 "surface edges": 3,
1623 "surfaceedges": 3,
1624 "surface at edges": 3,
1625 }
1626 mapped = aliases.get(normalized)
1627 if mapped is None:
1628 raise ValueError(
1629 f"Unknown particle init_mode '{value}'. Use one of: "
1630 "'Surface', 'Volume', 'PointSource', 'SurfaceEdges' (or 0/1/2/3)."
1631 )
1632 return mapped
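# Illustrative sketch, mirroring the field-init helper above: "Surface" -> 0,
# "Volume" -> 1, "PointSource" -> 2, "SurfaceEdges" -> 3; the strings "0".."3"
# pass through unchanged as integer codes.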
1633
1634def append_passthrough_flags(control_lines: list, options: dict):
1635 """!
1636 @brief Appends raw CLI flags to the control list from a {flag: value} dict.
1637 @details Boolean `true` is emitted as a switch with no value. Boolean `false`
1638 is skipped. All other values are emitted as "<flag> <value>".
1639 @param[out] control_lines The destination list of control-file lines.
1640 @param[in] options Mapping of raw CLI flags to values.
1641 """
1642 if not options:
1643 return
1644 for flag, value in options.items():
1645 if isinstance(value, bool):
1646 if value:
1647 control_lines.append(str(flag))
1648 continue
1649 control_lines.append(f"{flag} {format_flag_value(value)}")
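# Illustrative sketch: append_passthrough_flags(lines, {"-ksp_monitor": True,
# "-log_view": False, "-ksp_rtol": 1e-6}) appends "-ksp_monitor" as a bare switch,
# skips "-log_view", and appends "-ksp_rtol 1e-06".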
1650
1651def parse_and_add_model_flags(case_cfg: dict, control_lines: list):
1652 """!
1653 @brief Parses the 'models' section of case.yml and adds corresponding C-solver flags.
1654 @param[in] case_cfg The parsed case.yml configuration dictionary.
1655 @param[out] control_lines A list of strings to which C-flags will be appended.
1656 """
1657 models = case_cfg.get('models', {})
1658 FLAG_MAP = {
1659 'domain': {'blocks': '-nblk', 'i_periodic': '-i_periodic', 'j_periodic': '-j_periodic', 'k_periodic': '-k_periodic'},
1660 'physics.fsi': {'immersed': '-imm', 'moving_fsi': '-fsi'},
1661 'physics.particles': {'count': '-numParticles'},
1662 'physics.turbulence': {'les': '-les', 'rans': '-rans', 'wall_function': '-wallfunction'},
1663 'statistics': {'time_averaging': '-averaging'}
1664 }
1665 for section_path, flags in FLAG_MAP.items():
1666 current_level = models
1667 try:
1668 for key in section_path.split('.'): current_level = current_level[key]
1669 for yaml_key, flag in flags.items():
1670 if yaml_key in current_level:
1671 control_lines.append(f"{flag} {format_flag_value(current_level[yaml_key])}")
1672 except KeyError: continue
1673
1674 if models.get('physics', {}).get('dimensionality') == '2D':
1675 control_lines.append("-TwoD 1")
1676
1677 particles_cfg = models.get('physics', {}).get('particles', {})
1678 p_init_mode_str = particles_cfg.get('init_mode', 'Surface')
1679 pinit_code = normalize_particle_init_mode(p_init_mode_str)
1680 control_lines.append(f"-pinit {pinit_code}")
1681 print(f" - Particle Initialization Mode: {p_init_mode_str} (Code: {pinit_code})")
1682
1683 if pinit_code == 2:
1684 point_cfg = particles_cfg.get('point_source', {})
1685 if not isinstance(point_cfg, dict):
1686 raise ValueError("models.physics.particles.point_source must be a mapping when init_mode is PointSource.")
1687 try:
1688 psrc_x = float(point_cfg['x'])
1689 psrc_y = float(point_cfg['y'])
1690 psrc_z = float(point_cfg['z'])
1691 except (KeyError, TypeError, ValueError):
1692 raise ValueError("PointSource init_mode requires numeric point_source.{x,y,z} values.")
1693 control_lines.append(f"-psrc_x {psrc_x}")
1694 control_lines.append(f"-psrc_y {psrc_y}")
1695 control_lines.append(f"-psrc_z {psrc_z}")
1696 print(f" - Particle Point Source: ({psrc_x}, {psrc_y}, {psrc_z})")
1697
1698 p_restart_mode = particles_cfg.get('restart_mode')
1699 if p_restart_mode:
1700 p_restart_mode_normalized = str(p_restart_mode).lower()
1701 if p_restart_mode_normalized not in {"init", "load"}:
1702 raise ValueError(f"Unknown particle restart_mode '{p_restart_mode}'. Options are 'init' or 'load'.")
1703 control_lines.append(f"-particle_restart_mode \"{p_restart_mode}\"")
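# Illustrative sketch of the FLAG_MAP translation: a case.yml containing
#   models: {physics: {turbulence: {les: true}, particles: {count: 50000}}}
# contributes "-les 1" and "-numParticles 50000" to control_lines, alongside the
# "-pinit <code>" flag that is always emitted from particles.init_mode (default "Surface" -> 0).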
1704
1705def parse_solver_config(solver_cfg: dict) -> dict:
1706 """!
1707 @brief Parses the structured solver.yml into a flat dictionary of {flag: value}.
1708 @param[in] solver_cfg The parsed solver.yml configuration dictionary.
1709 @return A dictionary where keys are C-solver flags and values are the corresponding settings.
1710 """
1711 flags = {}
1712 if 'operation_mode' in solver_cfg and isinstance(solver_cfg['operation_mode'], dict):
1713 op_mode = solver_cfg['operation_mode']
1714 if 'eulerian_field_source' in op_mode:
1715 flags['-euler_field_source'] = f"\"{op_mode.get('eulerian_field_source', 'solve')}\""
1716 if 'analytical_type' in op_mode and op_mode.get('analytical_type') is not None:
1717 flags['-analytical_type'] = f"\"{op_mode.get('analytical_type')}\""
1718
1719 selected_solver = None
1720 if 'strategy' in solver_cfg:
1721 s = solver_cfg['strategy']
1722 if 'central_diff' in s:
1723 flags['-central'] = format_flag_value(s['central_diff'])
1724 # Preferred selector.
1725 if 'momentum_solver' in s:
1726 selected_solver = normalize_momentum_solver_type(s['momentum_solver'])
1727 elif 'implicit' in s:
1728 raise ValueError("Legacy key 'strategy.implicit' is not supported. Use 'strategy.momentum_solver'.")
1729
1730 ms = solver_cfg.get('momentum_solver', {})
1731 if isinstance(ms, dict) and 'type' in ms and selected_solver is None:
1732 selected_solver = normalize_momentum_solver_type(ms['type'])
1733
1734 if selected_solver is None:
1735 selected_solver = "DUALTIME_PICARD_RK4"
1736 flags['-mom_solver_type'] = f"\"{selected_solver}\""
1737
1738 if 'tolerances' in solver_cfg:
1739 t = solver_cfg['tolerances']
1740 tol_map = {
1741 'max_iterations': '-mom_max_pseudo_steps',
1742 'absolute_tol': '-mom_atol',
1743 'relative_tol': '-mom_rtol',
1744 'step_tol': '-imp_stol'
1745 }
1746 for key, flag in tol_map.items():
1747 if key in t:
1748 flags[flag] = t[key]
1749
1750 def _append_dualtime_options(cfg: dict):
1751 if 'max_pseudo_steps' in cfg:
1752 flags['-mom_max_pseudo_steps'] = cfg['max_pseudo_steps']
1753 if 'absolute_tol' in cfg:
1754 flags['-mom_atol'] = cfg['absolute_tol']
1755 if 'relative_tol' in cfg:
1756 flags['-mom_rtol'] = cfg['relative_tol']
1757 if 'step_tol' in cfg:
1758 flags['-imp_stol'] = cfg['step_tol']
1759 if 'pseudo_cfl' in cfg:
1760 pcfl = cfg['pseudo_cfl']
1761 if 'initial' in pcfl:
1762 flags['-pseudo_cfl'] = pcfl['initial']
1763 if 'minimum' in pcfl:
1764 flags['-min_pseudo_cfl'] = pcfl['minimum']
1765 if 'maximum' in pcfl:
1766 flags['-max_pseudo_cfl'] = pcfl['maximum']
1767 if 'growth_factor' in pcfl:
1768 flags['-pseudo_cfl_growth_factor'] = pcfl['growth_factor']
1769 if 'reduction_factor' in pcfl:
1770 flags['-pseudo_cfl_reduction_factor'] = pcfl['reduction_factor']
1771 if 'rk4_residual_noise_allowance_factor' in cfg:
1772 flags['-mom_dt_rk4_residual_norm_noise_allowance_factor'] = cfg['rk4_residual_noise_allowance_factor']
1773
1774 if isinstance(ms, dict):
1775 allowed_ms_keys = {'type', 'dual_time_picard_rk4'}
1776 unknown_ms_keys = sorted(set(ms.keys()) - allowed_ms_keys)
1777 if unknown_ms_keys:
1778 raise ValueError(
1779 f"Unsupported momentum_solver keys/blocks: {unknown_ms_keys}. "
1780 "Currently supported block: 'dual_time_picard_rk4'."
1781 )
1782
1783 dt_picard_cfg = ms.get('dual_time_picard_rk4')
1784 if dt_picard_cfg is not None:
1785 if selected_solver != "DUALTIME_PICARD_RK4":
1786 raise ValueError(
1787 f"momentum_solver.dual_time_picard_rk4 is set but selected solver is {selected_solver}."
1788 )
1789 if not isinstance(dt_picard_cfg, dict):
1790 raise ValueError("momentum_solver.dual_time_picard_rk4 must be a mapping.")
1791 _append_dualtime_options(dt_picard_cfg)
1792 if 'pressure_solver' in solver_cfg:
1793 ps = solver_cfg['pressure_solver']
1794 if 'tolerance' in ps: flags['-poisson_tol'] = ps['tolerance']
1795 if 'multigrid' in ps:
1796 mg = ps['multigrid']
1797 mg_map = {'levels': '-mg_level', 'pre_sweeps': '-mg_pre_it', 'post_sweeps': '-mg_post_it'}
1798 for key, flag in mg_map.items():
1799 if key in mg: flags[flag] = mg[key]
1800 if 'semi_coarsening' in mg:
1801 sc = mg['semi_coarsening']
1802 if 'i' in sc: flags['-mg_i_semi'] = format_flag_value(sc['i'])
1803 if 'j' in sc: flags['-mg_j_semi'] = format_flag_value(sc['j'])
1804 if 'k' in sc: flags['-mg_k_semi'] = format_flag_value(sc['k'])
1805 if 'level_solvers' in mg:
1806 for level_name, settings in mg['level_solvers'].items():
1807 level_num = level_name.split('_')[-1]
1808 for key, value in settings.items():
1809 flags[f"-ps_mg_levels_{level_num}_{key}"] = format_flag_value(value)
1810 if 'petsc_passthrough_options' in solver_cfg:
1811 passthrough = solver_cfg['petsc_passthrough_options']
1812 if passthrough:
1813 for key, value in passthrough.items():
1814 flags[key] = format_flag_value(value)
1815 return flags
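# Illustrative sketch of the YAML-to-flag flattening: a solver.yml such as
#   strategy: {central_diff: true, momentum_solver: "Dual Time Picard RK4"}
#   tolerances: {absolute_tol: 1.0e-8}
# would yield
#   {"-central": "1", "-mom_solver_type": '"DUALTIME_PICARD_RK4"', "-mom_atol": 1e-08}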
1816
1817def generate_solver_control_file(run_dir, run_id, configs, num_procs, monitor_files):
1818 """!
1819 @brief Generates the main .control file for the C-solver.
1820 @details Orchestrates the conversion of all YAML configurations (case, solver, monitor)
1821 into a single, machine-readable file of command-line flags.
1822 @param[in] run_dir The path to the main run directory.
1823 @param[in] run_id The unique identifier for the run.
1824 @param[in] configs A dictionary containing the parsed YAML data.
1825 @param[in] num_procs The number of MPI processes for the run.
1826 @param[in] monitor_files A dictionary containing paths to generated monitor files.
1827 @return The absolute path to the generated solver control file.
1828 """
1829 print("[INFO] Generating master solver control file...")
1830 case_cfg, solver_cfg, monitor_cfg = configs['case'], configs['solver'], configs['monitor']
1831 source_files = {'Case': configs['case_path'], 'Solver': configs['solver_path'], 'Monitor': configs['monitor_path']}
1832
1833 control_lines = []
1834 try:
1835 props, run_ctrl = case_cfg['properties'], case_cfg['run_control']
1836 scales, fluid, ic = props['scaling'], props['fluid'], props['initial_conditions']
1837 L_ref, U_ref, rho, mu = float(scales['length_ref']), float(scales['velocity_ref']), float(fluid['density']), float(fluid['viscosity'])
1838 reynolds = (rho * U_ref * L_ref) / mu if mu != 0 else float('inf')
1839 dt_phys = float(run_ctrl['dt_physical'])
1840 T_ref = L_ref / U_ref if U_ref != 0 else float('inf')
1841 dt_nondim = dt_phys / T_ref if T_ref != float('inf') else 0.0
1842 u, v, w = float(ic['u_physical']), float(ic['v_physical']), float(ic['w_physical'])
1843 finit_mode_str = ic.get('mode', 'Constant')
1844 finit_code = normalize_field_init_mode(finit_mode_str)
1845 print(f" - Reynolds Number (Re) = {reynolds:.4f}")
1846 print(f" - Non-Dimensional dt* = {dt_nondim:.6f}")
1847 print(f" - Field Initialization Mode: {finit_mode_str} (Code: {finit_code})")
1848 control_lines.extend([
1849 f"-start_step {run_ctrl['start_step']}", f"-totalsteps {run_ctrl['total_steps']}",
1850 f"-ren {reynolds}", f"-dt {dt_nondim}", f"-finit {finit_code}",
1851 f"-ucont_x {u/U_ref if U_ref!=0 else 0}", f"-ucont_y {v/U_ref if U_ref!=0 else 0}", f"-ucont_z {w/U_ref if U_ref!=0 else 0}",
1852 f"-scaling_L_ref {L_ref}", f"-scaling_U_ref {U_ref}", f"-scaling_rho_ref {rho}"
1853 ])
1854 except (KeyError, TypeError, ZeroDivisionError, ValueError) as e:
1855 print(f"[FATAL] Error processing case.yml: {e}", file=sys.stderr)
1856 sys.exit(1)
1857
1858 try:
1859 bcs_files = generate_multi_block_bcs(run_dir, run_id, case_cfg, source_files)
1860 except ValueError as e:
1861 print(f"[FATAL] Invalid boundary_conditions in case.yml: {e}", file=sys.stderr)
1862 sys.exit(1)
1863 control_lines.append(f"-bcs_files \"{','.join(bcs_files)}\"")
1864
1865 # --- Add paths for the generated whitelist and profile monitor files ---
1866 control_lines.append(f"-whitelist_config_file {monitor_files['whitelist']}")
1867 control_lines.append(f"-profile_config_file {monitor_files['profile']}")
1868
1869 grid_cfg = case_cfg.get('grid', {})
1870 grid_mode = grid_cfg.get('mode')
1871 expected_nblk = int(case_cfg.get('models', {}).get('domain', {}).get('blocks', 1))
1872
1873 if grid_mode == 'file':
1874 print("[INFO] Grid Mode: Using external file...")
1875 case_file_dir = os.path.dirname(configs['case_path'])
1876 source_grid = grid_cfg['source_file']
1877 if not os.path.isabs(source_grid):
1878 source_grid = os.path.abspath(os.path.join(case_file_dir, source_grid))
1879 nondim_grid_path = os.path.join(run_dir, "config", "grid.run")
1880 try:
1881 summary = validate_and_nondimensionalize_picgrid(
1882 source_grid, nondim_grid_path, L_ref, expected_nblk=expected_nblk
1883 )
1884 print(
1885 f"[SUCCESS] Validated and non-dimensionalized grid: {os.path.relpath(nondim_grid_path)} "
1886 f"(nblk={summary['nblk']}, total_nodes={summary['total_nodes']})"
1887 )
1888 control_lines.append(f"-grid_file {nondim_grid_path}")
1889 except Exception as e:
1890 print(f"[FATAL] Failed to process grid file '{source_grid}': {e}", file=sys.stderr)
1891 sys.exit(1)
1892 elif grid_mode == 'grid_gen':
1893 print("[INFO] Grid Mode: Generating external grid via grid.gen...")
1894 nondim_grid_path = os.path.join(run_dir, "config", "grid.run")
1895 try:
1896 generated_grid = run_grid_generator(configs['case_path'], run_dir, grid_cfg)
1897 summary = validate_and_nondimensionalize_picgrid(
1898 generated_grid, nondim_grid_path, L_ref, expected_nblk=expected_nblk
1899 )
1900 print(
1901 f"[SUCCESS] grid.gen output validated and non-dimensionalized: {os.path.relpath(nondim_grid_path)} "
1902 f"(nblk={summary['nblk']}, total_nodes={summary['total_nodes']})"
1903 )
1904 control_lines.append(f"-grid_file {nondim_grid_path}")
1905 except Exception as e:
1906 print(f"[FATAL] Grid generation failed: {e}", file=sys.stderr)
1907 sys.exit(1)
1908 elif grid_mode == 'programmatic_c':
1909 print("[INFO] Grid Mode: Programmatic C...")
1910 grid_settings = dict(grid_cfg.get('programmatic_settings', {}))
1911 control_lines.append("-grid")
1912 px, py, pz = grid_settings.get('da_processors_x'), grid_settings.get('da_processors_y'), grid_settings.get('da_processors_z')
1913 if any(isinstance(p, (list, tuple)) for p in [px, py, pz] if p is not None):
1914 raise ValueError(
1915 "da_processors_x/y/z must be scalar integers. "
1916 "Per-block MPI decomposition is not implemented on the C side; DMDA layout is global."
1917 )
1918 if num_procs > 1 and all(p is not None for p in [px, py, pz]):
1919 if not all(isinstance(p, int) and p > 0 for p in [px, py, pz]):
1920 raise ValueError("da_processors_x/y/z must be positive integers when provided.")
1921 total_layout = px * py * pz
1922 if total_layout != num_procs:
1923 raise ValueError(f"Processor layout mismatch: product ({total_layout}) != processes ({num_procs}).")
1924 print(f"[INFO] Applying user-defined processor layout for {num_procs} processes.")
1925 else:
1926 if num_procs == 1: print("[INFO] Serial run, ignoring da_processors layout.")
1927 else: print("[INFO] Letting PETSc automatically determine processor layout.")
1928 for p_key in ['da_processors_x', 'da_processors_y', 'da_processors_z']:
1929 grid_settings.pop(p_key, None)
1930 for key, value in grid_settings.items(): control_lines.append(f"-{key} {format_flag_value(value)}")
1931 else:
1932 raise ValueError(f"Unknown or missing grid mode '{grid_mode}' in case.yml.")
1933
1934 parse_and_add_model_flags(case_cfg, control_lines)
1935
1936 if 'solver_parameters' in case_cfg:
1937 params = case_cfg['solver_parameters']
1938 if params:
1939 for key, value in params.items():
1940 control_lines.append(f"{key} {format_flag_value(value)}")
1941
1942 try:
1943 solver_flags = parse_solver_config(solver_cfg)
1944 except ValueError as e:
1945 print(f"[FATAL] Invalid solver.yml settings: {e}", file=sys.stderr)
1946 sys.exit(1)
1947 for flag, value in solver_flags.items(): control_lines.append(f"{flag} {value}")
1948
1949 append_passthrough_flags(control_lines, monitor_cfg.get('solver_monitoring', {}))
1950
1951 io_cfg = monitor_cfg.get('io', {})
1952 if 'data_output_frequency' in io_cfg: control_lines.append(f"-tio {io_cfg['data_output_frequency']}")
1953 if 'particle_log_interval' in io_cfg: control_lines.append(f"-logfreq {io_cfg['particle_log_interval']}")
1954 if 'directories' in io_cfg:
1955 dirs = io_cfg['directories']
1956 if 'output' in dirs: control_lines.append(f"-output_dir {dirs['output']}")
1957 if 'restart' in dirs: control_lines.append(f"-restart_dir {dirs['restart']}")
1958 if 'log' in dirs: control_lines.append(f"-log_dir {dirs['log']}")
1959 if 'eulerian_subdir' in dirs: control_lines.append(f"-euler_subdir {dirs['eulerian_subdir']}")
1960 if 'particle_subdir' in dirs: control_lines.append(f"-particle_subdir {dirs['particle_subdir']}")
1961
1962 final_content = generate_header(run_id, source_files) + "\n".join(control_lines)
1963 control_file_path = os.path.join(run_dir, "config", f"{run_id}.control")
1964 with open(control_file_path, "w") as f: f.write(final_content)
1965 print(f"[SUCCESS] Generated solver control file: {os.path.relpath(control_file_path)}")
1966 return os.path.abspath(control_file_path)
1967
1968def generate_post_recipe_file(run_dir: str, run_id: str, post_cfg: dict, source_files: dict) -> str:
1969 """!
1970 @brief Generates a key=value config file (post.run) for the C post-processor.
1971 @details Translates the structured post-processing YAML into the specific flat
1972 key-value format required by the C executable, including complex,
1973 semicolon-separated pipeline strings.
1974 @param[in] run_dir The path to the main run directory.
1975 @param[in] run_id The unique identifier for the run.
1976 @param[in] post_cfg The parsed post-profile YAML configuration dictionary.
1977 @param[in] source_files A dictionary of source files for the header.
1978 @return The absolute path to the generated post.run recipe file.
1979 """
1980 print("[INFO] Generating post-processor recipe file (post.run)...")
1981 config_dir = os.path.join(run_dir, "config")
1982 post_recipe_path = os.path.join(config_dir, "post.run")
1983
1984 lines = [generate_header(run_id, source_files)]
1985
1986 c_config = {}
1987
1988 # --- 1. Process Run Control ---
1989 # Accept snake_case names (preferred) with camelCase fallback for backwards compatibility.
1990 rc = post_cfg.get('run_control', {})
1991 c_config['startTime'] = rc.get('start_step', rc.get('startTime', 0))
1992 c_config['endTime'] = rc.get('end_step', rc.get('endTime', 10))
1993 c_config['timeStep'] = rc.get('step_interval', rc.get('timeStep', 1))
1994
1995 # --- 2. Process Global Operations ---
1996 eulerian_pipeline_parts = []
1997 if post_cfg.get('global_operations', {}).get('dimensionalize', False):
1998 eulerian_pipeline_parts.append('DimensionalizeAllLoadedFields')
1999
2000 # --- 3. Build Eulerian Pipeline String ---
2001 for task in post_cfg.get('eulerian_pipeline', []):
2002 task_name = task.get('task')
2003 if task_name == 'q_criterion':
2004 eulerian_pipeline_parts.append('ComputeQCriterion')
2005 elif task_name == 'normalize_field':
2006 field = task.get('field', 'P')
2007 eulerian_pipeline_parts.append(f'NormalizeRelativeField:{field}')
2008 ref_point = task.get('reference_point', [1, 1, 1])
2009 c_config['reference_ip'] = ref_point[0]
2010 c_config['reference_jp'] = ref_point[1]
2011 c_config['reference_kp'] = ref_point[2]
2012 elif task_name == 'nodal_average':
2013 in_field = task.get('input_field')
2014 out_field = task.get('output_field')
2015 if in_field and out_field:
2016 eulerian_pipeline_parts.append(f'CellToNodeAverage:{in_field}>{out_field}')
2017
2018 if eulerian_pipeline_parts:
2019 c_config['process_pipeline'] = ";".join(eulerian_pipeline_parts)
2020
2021 # --- 4. Build Lagrangian Pipeline String ---
2022 lagrangian_pipeline_parts = []
2023 for task in post_cfg.get('lagrangian_pipeline', []):
2024 task_name = task.get('task')
2025 if task_name == 'specific_ke':
2026 in_field = task.get('input_field')
2027 out_field = task.get('output_field')
2028 if in_field and out_field:
2029 lagrangian_pipeline_parts.append(f'ComputeSpecificKE:{in_field}>{out_field}')
2030
2031 if lagrangian_pipeline_parts:
2032 c_config['particle_pipeline'] = ";".join(lagrangian_pipeline_parts)
2033
2034 # --- 4B. Build Statistics Pipeline String ---
2035 statistics_pipeline_parts = []
2036 statistics_output_prefix = None
2037 stats_cfg = post_cfg.get('statistics_pipeline')
2038 stats_entries = []
2039 if isinstance(stats_cfg, list):
2040 stats_entries = stats_cfg
2041 elif isinstance(stats_cfg, dict):
2042 stats_entries = stats_cfg.get('tasks', [])
2043 statistics_output_prefix = stats_cfg.get('output_prefix')
2044
2045 for entry in stats_entries:
2046 if isinstance(entry, str):
2047 task_name = entry
2048 elif isinstance(entry, dict):
2049 task_name = entry.get('task')
2050 else:
2051 continue
2052 try:
2053 statistics_pipeline_parts.append(normalize_statistics_task(task_name))
2054 except ValueError:
2055 # validation should catch this earlier; keep generation tolerant.
2056 continue
2057
2058 if statistics_pipeline_parts:
2059 c_config['statistics_pipeline'] = ";".join(statistics_pipeline_parts)
2060 if statistics_output_prefix is None:
2061 statistics_output_prefix = post_cfg.get('statistics_output_prefix')
2062 if statistics_output_prefix:
2063 c_config['statistics_output_prefix'] = statistics_output_prefix
2064
2065 # --- 5. Process I/O ---
2066 io = post_cfg.get('io', {})
2067 c_config['output_prefix'] = io.get('output_directory','viz')+'/'+io.get('output_filename_prefix', 'Field')
2068 c_config['particle_output_prefix'] = io.get('output_directory','viz')+'/'+io.get('particle_filename_prefix', 'Particle')
2069 c_config['output_particles'] = io.get('output_particles', False)
2070 c_config['particle_output_freq'] = io.get('particle_subsampling_frequency', 1)
2071 c_config['output_fields_instantaneous'] = ",".join(io.get('eulerian_fields', []))
2072 c_config['output_fields_averaged'] = ",".join(io.get('eulerian_fields_averaged', []))
2073 c_config['particle_fields_instantaneous'] = ",".join(io.get('particle_fields', []))
2074 input_extensions = io.get('input_extensions', {})
2075 if isinstance(input_extensions, dict):
2076 e_ext = input_extensions.get('eulerian')
2077 p_ext = input_extensions.get('particle')
2078 if e_ext:
2079 c_config['eulerianExt'] = str(e_ext).strip().lstrip('.')
2080 if p_ext:
2081 c_config['particleExt'] = str(p_ext).strip().lstrip('.')
2082
2083 # --- 6. Add Source Directory ---
2084 if 'source_data' in post_cfg and 'directory' in post_cfg['source_data']:
2085 c_config['source_directory'] = post_cfg['source_data']['directory']
2086
2087 # --- 7. Write the final file ---
2088 for key, value in c_config.items():
2089 if value is not None and str(value) != "":
2090 lines.append(f"{key} = {value}")
2091
2092 with open(post_recipe_path, "w") as f: f.write("\n".join(lines))
2093 print(f"[SUCCESS] Generated post-processor recipe: {os.path.relpath(post_recipe_path)}")
2094 return os.path.abspath(post_recipe_path)
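# Illustrative sketch of the emitted recipe: a minimal post profile with
# run_control {start_step: 0, end_step: 100}, dimensionalize enabled, and a
# q_criterion task produces key=value lines such as
#   startTime = 0
#   endTime = 100
#   timeStep = 1
#   process_pipeline = DimensionalizeAllLoadedFields;ComputeQCriterion
#   output_prefix = viz/Field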
2095
2096def execute_command(command: list, run_dir: str, log_filename: str, monitor_cfg: dict = None):
2097 """!
2098 @brief Executes a command, streaming its output to the console and a log file.
2099 ...
2100 @param[in] monitor_cfg Optional. If provided, used to set LOG_LEVEL in a custom environment.
2101 If None, the process inherits the parent's environment directly.
2102 """
2103 # Create the log directory if it doesn't exist.
2104 log_dir = os.path.join(run_dir, "logs")
2105 os.makedirs(log_dir, exist_ok=True)
2106
2107 log_path = os.path.join(log_dir, log_filename)
2108 print(f"[INFO] Launching Command...\n > {' '.join(command)}")
2109 print(f" Log file: {os.path.relpath(log_path)}")
2110 print("-" * 60)
2111
2112 # --- Environment Handling ---
2113 popen_kwargs = {
2114 "stdout": subprocess.PIPE, "stderr": subprocess.STDOUT,
2115 "cwd": run_dir, "bufsize": 1, "universal_newlines": True,
2116 "encoding": 'utf-8', "errors": 'replace'
2117 }
2118
2119 if monitor_cfg:
2120 print("[INFO] Creating custom environment to set LOG_LEVEL.")
2121 run_env = os.environ.copy()
2122 verbosity = monitor_cfg.get('logging', {}).get('verbosity', 'INFO').upper()
2123 run_env['LOG_LEVEL'] = verbosity
2124 print(f"[INFO] Setting LOG_LEVEL={verbosity} for C executable.")
2125 popen_kwargs['env'] = run_env
2126 else:
2127 print("[INFO] Using inherited environment for process.")
2128
2129 print("-" * 60)
2130 try:
2131 # Pass the constructed keyword arguments dictionary to Popen
2132 process = subprocess.Popen(command, **popen_kwargs)
2133
2134 with open(log_path, "w") as log_file:
2135 for line in process.stdout:
2136 sys.stdout.write(line)
2137 log_file.write(line)
2138 process.wait()
2139 return_code = process.returncode
2140 print("-" * 60)
2141 if return_code == 0:
2142 print("[SUCCESS] Execution finished.")
2143 else:
2144 print(f"[FATAL] Execution failed with exit code {return_code}. Check log: {os.path.relpath(log_path)}", file=sys.stderr)
2145 sys.exit(return_code)
2146 except FileNotFoundError:
2147 print(f"[FATAL] Command not found or is not executable: '{command[0]}'", file=sys.stderr)
2148 print(" Please check that the path is correct and the file has execute permissions.", file=sys.stderr)
2149 sys.exit(1)
2150 except Exception as e:
2151 print(f"[FATAL] An unexpected error occurred during execution: {e}", file=sys.stderr)
2152 sys.exit(1)
2153
2154def auto_identify_run_inputs(config_dir: str):
2155 """Auto-detect case.yml, monitor.yml, and *.control in a run config directory."""
2156 all_yml_files = glob.glob(os.path.join(config_dir, "*.yml"))
2157 case_path, monitor_path = None, None
2158 for f_path in all_yml_files:
2159 try:
2160 content = read_yaml_file(f_path)
2161 if not isinstance(content, dict):
2162 continue
2163 if 'models' in content and 'boundary_conditions' in content:
2164 case_path = f_path
2165 elif 'io' in content and 'logging' in content:
2166 monitor_path = f_path
2167 except Exception as e:
2168 print(f"[WARNING] Could not parse or inspect '{f_path}': {e}", file=sys.stderr)
2169 try:
2170 solver_control_path = glob.glob(os.path.join(config_dir, "*.control"))[0]
2171 except IndexError:
2172 solver_control_path = None
2173 return case_path, monitor_path, solver_control_path
2174
2175def resolve_post_source_directory(run_dir: str, monitor_cfg: dict, post_cfg: dict, strict: bool = True) -> str:
2176 """Resolve post source directory token and optionally enforce existence."""
2177 solver_output_dir_rel = monitor_cfg.get('io', {}).get('directories', {}).get('output', 'results')
2178 solver_output_dir_abs = os.path.join(run_dir, solver_output_dir_rel)
2179 source_dir_template = post_cfg.get('source_data', {}).get('directory', '<solver_output_dir>')
2180 if source_dir_template == '<solver_output_dir>':
2181 resolved_source_dir = solver_output_dir_abs
2182 print(f"[INFO] Post-processor source data: {os.path.relpath(resolved_source_dir)}")
2183 else:
2184 resolved_source_dir = os.path.abspath(os.path.join(run_dir, source_dir_template))
2185 print(f"[INFO] Post-processor source data (user-defined): {os.path.relpath(resolved_source_dir)}")
2186
2187 if strict and (not os.path.isdir(resolved_source_dir) or not os.listdir(resolved_source_dir)):
2188 print(
2189 f"[FATAL] Source data directory for post-processing not found or empty: {os.path.relpath(resolved_source_dir)}",
2190 file=sys.stderr
2191 )
2192 sys.exit(1)
2193 if not strict and (not os.path.isdir(resolved_source_dir) or not os.listdir(resolved_source_dir)):
2194 print("[WARNING] Source data directory is not available yet; keeping deferred path for scheduled post job.")
2195 return resolved_source_dir
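# Illustrative sketch of the directory token: with the default value
# "<solver_output_dir>" for source_data.directory, a run at runs/cavity_20250101-120000
# whose monitor io.directories.output is "results" resolves to
# runs/cavity_20250101-120000/results; any other value is interpreted relative to the run directory.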
2196
2197def render_slurm_array_stage_script(
2198 script_path: str,
2199 job_name: str,
2200 cluster_cfg: dict,
2201 array_spec: str,
2202 case_index_tsv: str,
2203 stage: str,
2204 solver_exe: str,
2205 post_exe: str,
2206 stdout_path: str,
2207 stderr_path: str
2208):
2209 """Render array script that maps SLURM_ARRAY_TASK_ID to per-case run artifacts."""
2210 resources = cluster_cfg.get("resources", {})
2211 notifications = cluster_cfg.get("notifications", {}) or {}
2212 execution = cluster_cfg.get("execution", {}) or {}
2213 module_setup = execution.get("module_setup", []) or []
2214 extra_sbatch = execution.get("extra_sbatch")
2215
2216 lines = [
2217 "#!/bin/bash",
2218 f"#SBATCH --job-name={job_name}",
2219 f"#SBATCH --nodes={resources['nodes']}",
2220 f"#SBATCH --ntasks-per-node={resources['ntasks_per_node']}",
2221 f"#SBATCH --mem={resources['mem']}",
2222 f"#SBATCH --time={resources['time']}",
2223 f"#SBATCH --output={stdout_path}",
2224 f"#SBATCH --error={stderr_path}",
2225 f"#SBATCH --account={resources['account']}",
2226 f"#SBATCH --array={array_spec}",
2227 ]
2228 partition = resources.get("partition")
2229 if partition:
2230 lines.append(f"#SBATCH --partition={partition}")
2231 mail_user = notifications.get("mail_user")
2232 mail_type = notifications.get("mail_type")
2233 if mail_user:
2234 lines.append(f"#SBATCH --mail-user={mail_user}")
2235 if mail_type:
2236 lines.append(f"#SBATCH --mail-type={mail_type}")
2237 if isinstance(extra_sbatch, dict):
2238 for key, value in extra_sbatch.items():
2239 flag = str(key)
2240 if not flag.startswith("--"):
2241 flag = f"--{flag}"
2242 if isinstance(value, bool):
2243 if value:
2244 lines.append(f"#SBATCH {flag}")
2245 elif value is not None:
2246 lines.append(f"#SBATCH {flag}={value}")
2247 elif isinstance(extra_sbatch, list):
2248 for token in extra_sbatch:
2249 lines.append(f"#SBATCH {token}")
2250
2251 lines.extend([
2252 "",
2253 "set -euo pipefail",
2254 "",
2255 f'CASE_INDEX_FILE={shlex.quote(case_index_tsv)}',
2256 'LINE=$(sed -n "$((SLURM_ARRAY_TASK_ID + 1))p" "$CASE_INDEX_FILE")',
2257 'if [ -z "$LINE" ]; then',
2258 ' echo "No case entry for array index ${SLURM_ARRAY_TASK_ID}" >&2',
2259 ' exit 1',
2260 "fi",
2261 "IFS=$'\\t' read -r CASE_INDEX CASE_ID RUN_DIR CONTROL_FILE POST_RECIPE_FILE LOG_LEVEL POST_PREFIX <<< \"$LINE\"",
2262 'cd "$RUN_DIR"',
2263 'echo "[$(date)] Starting case ${CASE_ID} (array index ${SLURM_ARRAY_TASK_ID})"',
2264 'export LOG_LEVEL="${LOG_LEVEL}"',
2265 ])
2266
2267 for setup_line in module_setup:
2268 lines.append(str(setup_line))
2269
2270 if stage == "solve":
2271 cmd = build_cluster_launch_command(
2272 cluster_cfg,
2273 solver_exe,
2274 ["-control_file", "$CONTROL_FILE"]
2275 )
2276 else:
2277 cmd = build_cluster_launch_command(
2278 cluster_cfg,
2279 post_exe,
2280 ["-control_file", "$CONTROL_FILE", "-postprocessing_config_file", "$POST_RECIPE_FILE"]
2281 )
2282
2283 # Keep shell variables unresolved inside sbatch script.
2284 def _token(tok: str) -> str:
2285 if tok.startswith("$"):
2286 return tok
2287 return shlex.quote(str(tok))
2288
2289 lines.append(" ".join(_token(t) for t in cmd))
2290 lines.append('echo "[$(date)] Completed case ${CASE_ID}"')
2291
2292 os.makedirs(os.path.dirname(script_path), exist_ok=True)
2293 with open(script_path, "w") as f:
2294 f.write("\n".join(lines) + "\n")
2295 os.chmod(script_path, 0o755)
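# Illustrative sketch of one tab-separated row in the case-index file read by the array
# script above (paths are hypothetical):
#   0<TAB>case_000<TAB>/scratch/runs/case_000<TAB>/scratch/runs/case_000/config/case_000.control<TAB>/scratch/runs/case_000/config/post.run<TAB>INFO<TAB>viz/Field
# SLURM_ARRAY_TASK_ID selects the row, and its seven fields populate CASE_INDEX, CASE_ID,
# RUN_DIR, CONTROL_FILE, POST_RECIPE_FILE, LOG_LEVEL, and POST_PREFIX.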
2296
2297def extract_metric_from_csv(case_dir: str, spec: dict):
2298 """Extract a scalar metric from a CSV source."""
2299 file_glob = spec.get("file_glob", "**/*_msd.csv")
2300 candidates = sorted(glob.glob(os.path.join(case_dir, file_glob), recursive=True))
2301 if not candidates:
2302 return None
2303 csv_path = candidates[0]
2304 rows = []
2305 with open(csv_path, "r", newline="") as f:
2306 reader = csv.DictReader(f)
2307 if reader.fieldnames:
2308 for row in reader:
2309 rows.append(row)
2310 if not rows:
2311 return None
2312 column = spec.get("column")
2313 if not column:
2314 for name in reversed(reader.fieldnames):
2315 if name and name.lower() not in {"step", "time", "timestep"}:
2316 column = name
2317 break
2318 if not column:
2319 return None
2320 values = []
2321 for row in rows:
2322 try:
2323 values.append(float(row[column]))
2324 except Exception:
2325 continue
2328 if not values:
2329 return None
2330 reduction = str(spec.get("reduction", "last")).lower()
2331 if reduction == "mean":
2332 return float(np.mean(values))
2333 if reduction == "min":
2334 return float(np.min(values))
2335 if reduction == "max":
2336 return float(np.max(values))
2337 return float(values[-1])
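# Illustrative metric spec (sketch): {"file_glob": "**/*_msd.csv", "column": "msd",
# "reduction": "mean"} averages the "msd" column of the first matching CSV under the
# case directory; omitting "column" picks the last column not named step/time/timestep,
# and the default reduction "last" keeps the final row's value.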
2338
2339def extract_metric_from_log(case_dir: str, spec: dict):
2340 """Extract a scalar metric from a log file using regex."""
2341 file_glob = spec.get("file_glob", "logs/*.log")
2342 regex = spec.get("regex")
2343 if not regex:
2344 return None
2345 candidates = sorted(glob.glob(os.path.join(case_dir, file_glob), recursive=True))
2346 if not candidates:
2347 return None
2348 pattern = re.compile(regex)
2349 values = []
2350 for path in candidates:
2351 try:
2352 with open(path, "r", encoding="utf-8", errors="replace") as f:
2353 for line in f:
2354 m = pattern.search(line)
2355 if m:
2356 try:
2357 values.append(float(m.group(1)))
2358 except Exception:
2359 pass
2360 except OSError:
2361 continue
2362 if not values:
2363 return None
2364 reduction = str(spec.get("reduction", "last")).lower()
2365 if reduction == "mean":
2366 return float(np.mean(values))
2367 if reduction == "min":
2368 return float(np.min(values))
2369 if reduction == "max":
2370 return float(np.max(values))
2371 return float(values[-1])
2372
2373def normalize_metric_spec(metric):
2374 """Normalize study metric definitions to a common dictionary form."""
2375 if isinstance(metric, str):
2376 if metric.lower() in {"msd", "msd_final"}:
2377 return {
2378 "name": "msd_final",
2379 "source": "statistics_csv",
2380 "file_glob": "**/*_msd.csv",
2381 "reduction": "last",
2382 }
2383 return {"name": metric, "source": "log_regex", "regex": metric}
2384 return dict(metric)
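# Illustrative sketch: the shorthand string "msd" expands to
#   {"name": "msd_final", "source": "statistics_csv", "file_glob": "**/*_msd.csv", "reduction": "last"}
# while any other bare string is treated as a log-regex metric, e.g. a study.yml entry
# "Residual\s*=\s*([0-9.eE+-]+)" becomes {"name": <that string>, "source": "log_regex", "regex": <that string>}.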
2385
2386def aggregate_study_metrics(study_cfg: dict, cases: list, results_dir: str) -> str:
2387 """Collect metric values from generated case directories into one CSV."""
2388 metrics = study_cfg.get("metrics", [])
2389 if not metrics:
2390 metrics = ["msd_final"]
2391 normalized_specs = [normalize_metric_spec(m) for m in metrics]
2392
2393 rows = []
2394 for case in cases:
2395 row = {"case_id": case["case_id"]}
2396 for p_key, p_val in case["parameters"].items():
2397 row[p_key] = p_val
2398 for spec in normalized_specs:
2399 name = spec.get("name", "metric")
2400 source = str(spec.get("source", "")).lower()
2401 if source in {"statistics_csv", "csv"}:
2402 row[name] = extract_metric_from_csv(case["run_dir"], spec)
2403 elif source in {"log_regex", "log"}:
2404 row[name] = extract_metric_from_log(case["run_dir"], spec)
2405 else:
2406 row[name] = None
2407 rows.append(row)
2408
2409 if not rows:
2410 return None
2411
2412 all_keys = []
2413 seen = set()
2414 for row in rows:
2415 for k in row.keys():
2416 if k not in seen:
2417 seen.add(k)
2418 all_keys.append(k)
2419
2420 os.makedirs(results_dir, exist_ok=True)
2421 out_csv = os.path.join(results_dir, "metrics_table.csv")
2422 with open(out_csv, "w", newline="") as f:
2423 writer = csv.DictWriter(f, fieldnames=all_keys)
2424 writer.writeheader()
2425 writer.writerows(rows)
2426 print(f"[SUCCESS] Aggregated metrics table: {os.path.relpath(out_csv)}")
2427 return out_csv
2428
2429def infer_plot_x_axis(study_cfg: dict, rows: list):
2430 """Infer x-axis key/values for study plots."""
2431 params = list((study_cfg.get("parameters") or {}).keys())
2432 if not params or not rows:
2433 return None, None
2434
2435 study_type = study_cfg.get("study_type")
2436 if study_type == "grid_independence":
2437 has_im = "case.grid.programmatic_settings.im" in params
2438 has_jm = "case.grid.programmatic_settings.jm" in params
2439 has_km = "case.grid.programmatic_settings.km" in params
2440 if has_im and has_jm and has_km:
2441 xs = []
2442 for row in rows:
2443 try:
2444 im = float(row["case.grid.programmatic_settings.im"])
2445 jm = float(row["case.grid.programmatic_settings.jm"])
2446 km = float(row["case.grid.programmatic_settings.km"])
2447 xs.append((im * jm * km) ** (1.0 / 3.0))
2448 except Exception:
2449 return None, None
2450 return "N^(1/3)", xs
2451
2452 primary = params[0]
2453 xs = []
2454 for row in rows:
2455 try:
2456 xs.append(float(row[primary]))
2457 except Exception:
2458 return None, None
2459 return primary, xs
2460
2461def generate_study_plots(study_cfg: dict, metrics_csv: str, plots_dir: str):
2462 """Generate metric-vs-parameter plots for completed studies."""
2463 plotting_cfg = study_cfg.get("plotting", {}) or {}
2464 if plotting_cfg.get("enabled", True) is False:
2465 print("[INFO] Plotting disabled by study.yml.")
2466 return []
2467 if plt is None:
2468 print("[WARNING] matplotlib not available; skipping plot generation.")
2469 return []
2470 if not metrics_csv or not os.path.isfile(metrics_csv):
2471 return []
2472
2473 with open(metrics_csv, "r", newline="") as f:
2474 reader = csv.DictReader(f)
2475 rows = list(reader)
2476 if not rows:
2477 return []
2478
2479 x_name, x_values = infer_plot_x_axis(study_cfg, rows)
2480 if not x_name or x_values is None:
2481 print("[WARNING] Could not infer numeric x-axis for plots; skipping.")
2482 return []
2483
2484 metric_keys = []
2485 param_keys = list((study_cfg.get("parameters") or {}).keys())
2486 for key in rows[0].keys():
2487 if key in {"case_id"}:
2488 continue
2489 if key in param_keys:
2490 continue
2491 metric_keys.append(key)
2492
2493 out_format = plotting_cfg.get("output_format", "png")
2494 os.makedirs(plots_dir, exist_ok=True)
2495 generated = []
2496 for metric in metric_keys:
2497 y_values = []
2498 ok = True
2499 for row in rows:
2500 try:
2501 y_values.append(float(row[metric]))
2502 except Exception:
2503 ok = False
2504 break
2505 if not ok:
2506 continue
2507 plt.figure(figsize=(7.0, 4.2))
2508 plt.plot(x_values, y_values, marker="o", linewidth=1.5)
2509 plt.xlabel(x_name)
2510 plt.ylabel(metric)
2511 plt.title(f"{metric} vs {x_name}")
2512 plt.grid(True, alpha=0.3)
2513 out_path = os.path.join(plots_dir, f"{metric}_vs_{x_name.replace('/', '_')}.{out_format}")
2514 plt.tight_layout()
2515 plt.savefig(out_path, dpi=150)
2516 plt.close()
2517 generated.append(out_path)
2518 if generated:
2519 print(f"[SUCCESS] Generated {len(generated)} plot(s) in {os.path.relpath(plots_dir)}")
2520 return generated
2521
2522
2523def _command_to_string(command_tokens: list) -> str:
2524 """Render a command list as a shell-safe display string."""
2525 return " ".join(shlex.quote(str(tok)) for tok in command_tokens)
2526
2527
2528def _resolve_post_source_directory_preview(run_dir: str, monitor_cfg: dict, post_cfg: dict) -> str:
2529 """Resolve post source directory without side effects or stdout/stderr output."""
2530 solver_output_dir_rel = monitor_cfg.get('io', {}).get('directories', {}).get('output', 'results')
2531 solver_output_dir_abs = os.path.join(run_dir, solver_output_dir_rel)
2532 source_dir_template = post_cfg.get('source_data', {}).get('directory', '<solver_output_dir>')
2533 if source_dir_template == '<solver_output_dir>':
2534 return solver_output_dir_abs
2535 return os.path.abspath(os.path.join(run_dir, source_dir_template))
2536
2537
2538def build_run_dry_plan(args) -> dict:
2539 """Build a no-write execution plan for `run --dry-run`."""
2540 plan = {
2541 "mode": "dry-run",
2542 "created_at": datetime.now().isoformat(),
2543 "warnings": [],
2544 "inputs": {},
2545 "stages": {},
2546 "artifacts": [],
2547 }
2548
2549 if args.dry_run and args.no_submit:
2550 plan["warnings"].append("--dry-run takes precedence over --no-submit; no files will be written.")
2551
2552 cluster_mode = bool(getattr(args, "cluster", None))
2553 cluster_cfg = None
2554 cluster_path = None
2555 effective_num_procs = args.num_procs
2556 run_id = None
2557 run_dir = None
2558 solver_control_path = None
2559 loaded_case_cfg = None
2560 loaded_monitor_cfg = None
2561
2562 if cluster_mode:
2563 cluster_path = os.path.abspath(args.cluster)
2564 cluster_cfg = read_yaml_file(cluster_path)
2565 validate_cluster_config(cluster_cfg, cluster_path)
2566 scheduler_type = str(cluster_cfg.get("scheduler", {}).get("type", "slurm")).lower()
2567 if args.scheduler and args.scheduler.lower() != scheduler_type:
2568 emit_structured_error(
2569 ERROR_CODE_CFG_INCONSISTENT_COMBO,
2570 key="scheduler.type",
2571 file_path=cluster_path,
2572 message=f"--scheduler={args.scheduler} does not match cluster.yml scheduler.type={scheduler_type}.",
2573 )
2574 sys.exit(1)
2575 if scheduler_type != "slurm":
2576 emit_structured_error(
2577 ERROR_CODE_CFG_INVALID_VALUE,
2578 key="scheduler.type",
2579 file_path=cluster_path,
2580 message=f"Unsupported scheduler '{scheduler_type}'. Only Slurm is supported in v1.",
2581 )
2582 sys.exit(1)
2583 cluster_tasks = get_cluster_total_tasks(cluster_cfg)
2584 if args.num_procs not in (1, cluster_tasks):
2585 emit_structured_error(
2586 ERROR_CODE_CFG_INCONSISTENT_COMBO,
2587 key="resources.ntasks_per_node",
2588 file_path=cluster_path,
2589 message=(
2590 "--num-procs must be 1 (auto) or exactly nodes*ntasks_per_node "
2591 f"({cluster_tasks}) in cluster mode."
2592 ),
2593 )
2594 sys.exit(1)
2595 effective_num_procs = cluster_tasks
2596 plan["launch_mode"] = "slurm"
2597 plan["inputs"]["cluster"] = cluster_path
2598 else:
2599 if getattr(args, "scheduler", None):
2600 fail_cli_usage("--scheduler requires --cluster in this version.")
2601 plan["launch_mode"] = "local"
2602
2603 if args.solve:
2604 case_path = os.path.abspath(args.case)
2605 solver_path = os.path.abspath(args.solver)
2606 monitor_path = os.path.abspath(args.monitor)
2607 loaded_case_cfg = read_yaml_file(case_path)
2608 solver_cfg = read_yaml_file(solver_path)
2609 loaded_monitor_cfg = read_yaml_file(monitor_path)
2610 validate_solver_configs(loaded_case_cfg, solver_cfg, loaded_monitor_cfg, case_path, solver_path, monitor_path)
2611
2612 case_name = os.path.splitext(os.path.basename(case_path))[0]
2613 timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
2614 run_id = f"{case_name}_{timestamp}"
2615 run_dir = os.path.abspath(os.path.join("runs", run_id))
2616
2617 config_dir = os.path.join(run_dir, "config")
2618 scheduler_dir = os.path.join(run_dir, "scheduler")
2619 logs_dir = os.path.join(run_dir, "logs")
2620 solver_control_path = os.path.join(config_dir, f"{run_id}.control")
2621 whitelist_path = os.path.join(config_dir, "whitelist.run")
2622 profile_path = os.path.join(config_dir, "profile.run")
2623
2624 plan["run_id_preview"] = run_id
2625 plan["run_dir_preview"] = run_dir
2626 plan["inputs"].update({"case": case_path, "solver": solver_path, "monitor": monitor_path})
2627 plan["artifacts"].extend(
2628 [
2629 run_dir,
2630 config_dir,
2631 logs_dir,
2632 os.path.join(run_dir, "results"),
2633 scheduler_dir,
2634 os.path.join(config_dir, "case.yml"),
2635 os.path.join(config_dir, "solver.yml"),
2636 os.path.join(config_dir, "monitor.yml"),
2637 whitelist_path,
2638 profile_path,
2639 solver_control_path,
2640 os.path.join(run_dir, "manifest.json"),
2641 ]
2642 )
2643 if cluster_mode:
2644 plan["artifacts"].append(os.path.join(config_dir, "cluster.yml"))
2645 plan["artifacts"].append(os.path.join(scheduler_dir, "submission.json"))
2646
2647 solver_exe = os.path.join(BIN_DIR, "picsolver")
2648 solver_args = ["-control_file", solver_control_path]
2649 if cluster_mode:
2650 solver_script = os.path.join(scheduler_dir, "solver.sbatch")
2651 solver_cmd = build_cluster_launch_command(cluster_cfg, solver_exe, solver_args)
2652 plan["artifacts"].append(solver_script)
2653 plan["stages"]["solve"] = {
2654 "mode": "slurm",
2655 "script": solver_script,
2656 "launch_command": solver_cmd,
2657 "launch_command_string": _command_to_string(solver_cmd),
2658 }
2659 else:
2660 solver_cmd = [solver_exe] + solver_args
2661 if effective_num_procs > 1:
2662 solver_cmd = ["mpiexec", "-n", str(effective_num_procs)] + solver_cmd
2663 plan["stages"]["solve"] = {
2664 "mode": "local",
2665 "launch_command": solver_cmd,
2666 "launch_command_string": _command_to_string(solver_cmd),
2667 }
2668
2669 if args.post_process:
2670 post_path = os.path.abspath(args.post)
2671 plan["inputs"]["post"] = post_path
2672 post_cfg = read_yaml_file(post_path)
2673 validate_post_config(post_cfg, post_path)
2674
2675 if args.run_dir:
2676 run_dir = os.path.abspath(args.run_dir)
2677 if not os.path.isdir(run_dir):
2678 emit_structured_error(
2679 ERROR_CODE_CFG_FILE_NOT_FOUND,
2680 key="run-dir",
2681 file_path=run_dir,
2682 message="Specified run directory not found.",
2683 )
2684 sys.exit(1)
2685 run_id = os.path.basename(run_dir)
2686 elif not args.solve:
2687 fail_cli_usage("--post-process requires --run-dir when not used with --solve.")
2688
2689 if args.run_dir:
2690 config_dir = os.path.join(run_dir, "config")
2691 case_path, monitor_path, solver_control_path = auto_identify_run_inputs(config_dir)
2692 if not all([case_path, monitor_path, solver_control_path]):
2693 emit_structured_error(
2694 ERROR_CODE_CFG_MISSING_KEY,
2695 key="run_dir.config",
2696 file_path=config_dir,
2697 message=(
2698 "Could not auto-identify required run inputs "
2699 "(case.yml/monitor.yml/*.control) in run config directory."
2700 ),
2701 )
2702 sys.exit(1)
2703 loaded_case_cfg = read_yaml_file(case_path)
2704 loaded_monitor_cfg = read_yaml_file(monitor_path)
2705 else:
2706 config_dir = os.path.join(run_dir, "config")
2707 case_path = os.path.join(config_dir, "case.yml")
2708 monitor_path = os.path.join(config_dir, "monitor.yml")
2709 if solver_control_path is None:
2710 solver_control_path = os.path.join(config_dir, f"{run_id}.control")
2711
2712 resolved_source_dir = _resolve_post_source_directory_preview(run_dir, loaded_monitor_cfg, post_cfg)
2713 post_recipe_path = os.path.join(config_dir, "post.run")
2714 output_dir_rel = post_cfg.get("io", {}).get("output_directory")
2715 output_prefix = post_cfg.get("io", {}).get("output_filename_prefix")
2716 if not output_dir_rel or not output_prefix:
2717 emit_structured_error(
2718 ERROR_CODE_CFG_MISSING_KEY,
2719 key="io.output_directory/io.output_filename_prefix",
2720 file_path=post_path,
2721 message="Missing required post IO keys.",
2722 )
2723 sys.exit(1)
2724 output_dir_abs = os.path.abspath(os.path.join(run_dir, output_dir_rel))
2725 post_exe = os.path.join(BIN_DIR, "postprocessor")
2726 post_args = ["-control_file", solver_control_path, "-postprocessing_config_file", post_recipe_path]
2727 plan["artifacts"].extend([post_recipe_path, output_dir_abs])
2728
2729 if cluster_mode:
2730 scheduler_dir = os.path.join(run_dir, "scheduler")
2731 post_script = os.path.join(scheduler_dir, "post.sbatch")
2732 post_cmd = build_cluster_launch_command(cluster_cfg, post_exe, post_args)
2733 plan["artifacts"].append(post_script)
2734 plan["stages"]["post-process"] = {
2735 "mode": "slurm",
2736 "script": post_script,
2737 "source_data_directory": resolved_source_dir,
2738 "launch_command": post_cmd,
2739 "launch_command_string": _command_to_string(post_cmd),
2740 }
2741 else:
2742 post_cmd = [post_exe] + post_args
2743 if effective_num_procs > 1:
2744 post_cmd = ["mpiexec", "-n", str(effective_num_procs)] + post_cmd
2745 plan["stages"]["post-process"] = {
2746 "mode": "local",
2747 "source_data_directory": resolved_source_dir,
2748 "launch_command": post_cmd,
2749 "launch_command_string": _command_to_string(post_cmd),
2750 }
2751
2752 # Preserve insertion order while removing duplicates.
2753 deduped = []
2754 seen = set()
2755 for item in plan["artifacts"]:
2756 if item not in seen:
2757 seen.add(item)
2758 deduped.append(item)
2759 plan["artifacts"] = deduped
2760 if run_id and "run_id_preview" not in plan:
2761 plan["run_id_preview"] = run_id
2762 if run_dir and "run_dir_preview" not in plan:
2763 plan["run_dir_preview"] = run_dir
2764 plan["num_procs_effective"] = effective_num_procs
2765 return plan
2766
2767
2768def render_run_dry_plan(plan: dict, output_format: str = "text"):
2769 """Render dry-run plan in human or JSON format."""
2770 if output_format == "json":
2771 print(json.dumps(plan, indent=2, sort_keys=True))
2772 return
2773
2774 print("\n" + "=" * 60)
2775 print(" DRY-RUN PLAN")
2776 print("=" * 60)
2777 print(f" Launch mode : {plan.get('launch_mode')}")
2778 print(f" Created at : {plan.get('created_at')}")
2779 if plan.get("run_id_preview"):
2780 print(f" Run ID preview : {plan.get('run_id_preview')}")
2781 if plan.get("run_dir_preview"):
2782 print(f" Run dir preview: {plan.get('run_dir_preview')}")
2783 print(f" MPI processes : {plan.get('num_procs_effective')}")
2784 if plan.get("warnings"):
2785 print(" Warnings :")
2786 for warning in plan["warnings"]:
2787 print(f" - {warning}")
2788
2789 if plan.get("inputs"):
2790 print("\n Inputs:")
2791 for key, value in plan["inputs"].items():
2792 print(f" - {key}: {value}")
2793
2794 if plan.get("stages"):
2795 print("\n Planned stage commands:")
2796 for stage, details in plan["stages"].items():
2797 print(f" - {stage} ({details.get('mode')}):")
2798 print(f" {details.get('launch_command_string')}")
2799
2800 print("\n Planned artifacts (no files created in dry-run):")
2801 for artifact in plan.get("artifacts", []):
2802 print(f" - {artifact}")
2803 print("=" * 60)
2804
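# Illustrative sketch only (abridged; values are hypothetical and not taken
# from a real run): with `--dry-run --format json`, render_run_dry_plan prints
# the plan dict assembled by build_run_dry_plan, roughly of the form:
#
#   {
#     "artifacts": ["/abs/runs/<run_id>/config/post.run", "..."],
#     "created_at": "...",
#     "launch_mode": "local",
#     "num_procs_effective": 4,
#     "run_dir_preview": "/abs/runs/<run_id>",
#     "run_id_preview": "<run_id>",
#     "stages": {
#       "post-process": {
#         "launch_command_string": "mpiexec -n 4 .../postprocessor -control_file ... -postprocessing_config_file ...",
#         "mode": "local",
#         "source_data_directory": "..."
#       }
#     }
#   }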
2805
2806def validate_workflow(args):
2807 """Implements `pic.flow validate` without launching solver/post workflows."""
2808 checked = []
2809 solver_group_selected = any([args.case, args.solver, args.monitor])
2810 any_group_selected = solver_group_selected or any([args.post, args.cluster, args.study])
2811
2812 if not any_group_selected:
2813 fail_cli_usage(
2814 "validate requires at least one config group. Provide solver trio and/or --post/--cluster/--study.",
2815 hint="Example: pic.flow validate --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml",
2816 )
2817
2818 if solver_group_selected and not all([args.case, args.solver, args.monitor]):
2819 fail_cli_usage("When solver validation is requested, --case, --solver, and --monitor are all required.")
2820
2821 if solver_group_selected:
2822 case_path = os.path.abspath(args.case)
2823 solver_path = os.path.abspath(args.solver)
2824 monitor_path = os.path.abspath(args.monitor)
2825 case_cfg = read_yaml_file(case_path)
2826 solver_cfg = read_yaml_file(solver_path)
2827 monitor_cfg = read_yaml_file(monitor_path)
2828 validate_solver_configs(case_cfg, solver_cfg, monitor_cfg, case_path, solver_path, monitor_path)
2829 checked.extend([case_path, solver_path, monitor_path])
2830
2831 post_cfg = None
2832 if args.post:
2833 post_path = os.path.abspath(args.post)
2834 post_cfg = read_yaml_file(post_path)
2835 validate_post_config(post_cfg, post_path)
2836 checked.append(post_path)
2837
2838 cluster_cfg = None
2839 if args.cluster:
2840 cluster_path = os.path.abspath(args.cluster)
2841 cluster_cfg = read_yaml_file(cluster_path)
2842 validate_cluster_config(cluster_cfg, cluster_path)
2843 checked.append(cluster_path)
2844
2845 study_cfg = None
2846 if args.study:
2847 study_path = os.path.abspath(args.study)
2848 study_cfg = read_yaml_file(study_path)
2849 validate_study_config(study_cfg, study_path)
2850 checked.append(study_path)
2851
2852 if args.strict and post_cfg is not None:
2853 post_path = os.path.abspath(args.post)
2854 source_dir = post_cfg.get("source_data", {}).get("directory")
2855 if source_dir and source_dir != "<solver_output_dir>":
2856 resolved = resolve_path(post_path, source_dir)
2857 if not os.path.isdir(resolved):
2858 emit_structured_error(
2859 ERROR_CODE_CFG_FILE_NOT_FOUND,
2860 key="source_data.directory",
2861 file_path=post_path,
2862 message=f"strict mode: source_data.directory resolves to missing directory '{resolved}'.",
2863 )
2864 sys.exit(1)
2865
2866 if args.strict and study_cfg is not None:
2867 study_path = os.path.abspath(args.study)
2868 base_cfgs = study_cfg.get("base_configs", {})
2869 if isinstance(base_cfgs, dict):
2870 base_case_path = resolve_path(study_path, base_cfgs.get("case"))
2871 base_solver_path = resolve_path(study_path, base_cfgs.get("solver"))
2872 base_monitor_path = resolve_path(study_path, base_cfgs.get("monitor"))
2873 base_post_path = resolve_path(study_path, base_cfgs.get("post"))
2874 if all([base_case_path, base_solver_path, base_monitor_path]):
2875 validate_solver_configs(
2876 read_yaml_file(base_case_path),
2877 read_yaml_file(base_solver_path),
2878 read_yaml_file(base_monitor_path),
2879 base_case_path,
2880 base_solver_path,
2881 base_monitor_path,
2882 )
2883 if base_post_path:
2884 validate_post_config(read_yaml_file(base_post_path), base_post_path)
2885
2886 print(f"[SUCCESS] Validation completed for {len(checked)} file(s).")
2887 for path in checked:
2888 print(f" - {path}")
2889
2890def run_workflow(args):
2891 """Main orchestrator for the 'run' command (local and Slurm modes)."""
2892 if getattr(args, "dry_run", False):
2893 plan = build_run_dry_plan(args)
2894 render_run_dry_plan(plan, output_format=getattr(args, "output_format", "text"))
2895 return
2896
2897 run_dir = None
2898 run_id = None
2899 output_dir_abs = None
2900 workflow_start = time.time()
2901 stages_completed = []
2902 configs = None
2903 submission_meta = {"launch_mode": "local", "stages": {}}
2904
2905 cluster_mode = bool(getattr(args, "cluster", None))
2906 cluster_cfg = None
2907 cluster_path = None
2908 effective_num_procs = args.num_procs
2909
2910 if cluster_mode:
2911 cluster_path = os.path.abspath(args.cluster)
2912 cluster_cfg = read_yaml_file(cluster_path)
2913 validate_cluster_config(cluster_cfg, cluster_path)
2914 scheduler_type = str(cluster_cfg.get("scheduler", {}).get("type", "slurm")).lower()
2915 if args.scheduler and args.scheduler.lower() != scheduler_type:
2916 print(
2917 f"[FATAL] --scheduler={args.scheduler} does not match cluster.yml scheduler.type={scheduler_type}.",
2918 file=sys.stderr
2919 )
2920 sys.exit(1)
2921 if scheduler_type != "slurm":
2922 print(f"[FATAL] Unsupported scheduler '{scheduler_type}'. Only Slurm is supported in v1.", file=sys.stderr)
2923 sys.exit(1)
2924 cluster_tasks = get_cluster_total_tasks(cluster_cfg)
2925 if args.num_procs not in (1, cluster_tasks):
2926 print(
2927 f"[FATAL] In cluster mode, --num-procs must be 1 (auto) or exactly nodes*ntasks_per_node ({cluster_tasks}).",
2928 file=sys.stderr
2929 )
2930 sys.exit(1)
2931 effective_num_procs = cluster_tasks
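        # Example (hypothetical cluster.yml values): with nodes: 2 and
        # ntasks_per_node: 16, get_cluster_total_tasks returns 32, so the only
        # valid --num-procs values are 1 (auto) or 32, and 32 MPI tasks are used.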
2932 submission_meta["launch_mode"] = "slurm"
2933 submission_meta["cluster_config"] = cluster_path
2934 submission_meta["no_submit"] = bool(args.no_submit)
2935 print(f"[INFO] Cluster mode enabled (Slurm). Using {effective_num_procs} MPI tasks from cluster.yml.")
2936 elif getattr(args, "scheduler", None):
2937 print("[FATAL] --scheduler requires --cluster in this version.", file=sys.stderr)
2938 sys.exit(1)
2939
2940 # --- Stage 1: Solver (if requested) ---
2941 if args.solve:
2942 configs = {
2943 'case': read_yaml_file(args.case), 'case_path': os.path.abspath(args.case),
2944 'solver': read_yaml_file(args.solver), 'solver_path': os.path.abspath(args.solver),
2945 'monitor': read_yaml_file(args.monitor), 'monitor_path': os.path.abspath(args.monitor)
2946 }
2947
2948 print("\n[INFO] Validating configuration files...")
2949 validate_solver_configs(
2950 configs['case'], configs['solver'], configs['monitor'],
2951 args.case, args.solver, args.monitor
2952 )
2953 print("[SUCCESS] All configuration files passed validation.\n")
2954
2955 case_name = os.path.splitext(os.path.basename(args.case))[0]
2956 timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
2957 run_id = f"{case_name}_{timestamp}"
2958 run_dir = os.path.abspath(os.path.join("runs", run_id))
2959
2960 config_dir = os.path.join(run_dir, "config")
2961        for d in [config_dir, os.path.join(run_dir, "logs"), os.path.join(run_dir, "results"), os.path.join(run_dir, "scheduler")]:
2962 os.makedirs(d, exist_ok=True)
2963 print(f"[INFO] Created new self-contained run directory: {os.path.relpath(run_dir)}")
2964
2965 shutil.copy(args.case, os.path.join(config_dir, "case.yml"))
2966 shutil.copy(args.solver, os.path.join(config_dir, "solver.yml"))
2967 shutil.copy(args.monitor, os.path.join(config_dir, "monitor.yml"))
2968 if cluster_mode:
2969 shutil.copy(cluster_path, os.path.join(config_dir, "cluster.yml"))
2970
2971 print("\n" + "="*25 + " SOLVER STAGE " + "="*25)
2972 source_files = {'Case': args.case, 'Solver': args.solver, 'Monitor': args.monitor}
2973 print("[INFO] Generating monitoring files (whitelist.run, profile.run)...")
2974 whitelist_path = generate_simple_list_file(
2975 run_dir, run_id, configs['monitor'], 'logging', 'enabled_functions', 'whitelist.run', source_files
2976 )
2977 profile_path = generate_simple_list_file(
2978 run_dir, run_id, configs['monitor'], 'profiling', 'critical_functions', 'profile.run', source_files
2979 )
2980
2981 monitor_files = {'whitelist': whitelist_path, 'profile': profile_path}
2982 control_file = generate_solver_control_file(run_dir, run_id, configs, effective_num_procs, monitor_files)
2983
2984 solver_exe = os.path.join(BIN_DIR, "picsolver")
2985 solver_args = ["-control_file", control_file]
2986 if cluster_mode:
2987 scheduler_dir = os.path.join(run_dir, "scheduler")
2988 solver_script = os.path.join(scheduler_dir, "solver.sbatch")
2989 solver_log = os.path.join(run_dir, "logs", "solver_%j.out")
2990 solver_err = os.path.join(run_dir, "logs", "solver_%j.err")
2991 solver_cmd = build_cluster_launch_command(cluster_cfg, solver_exe, solver_args)
2992 render_slurm_script(
2993 solver_script,
2994 f"{run_id}_solve",
2995 cluster_cfg,
2996 solver_cmd,
2997 run_dir,
2998 solver_log,
2999 solver_err,
3000            env_vars={"LOG_LEVEL": str(configs['monitor'].get('logging', {}).get('verbosity', 'INFO')).upper()},
3001 )
3002 submission_meta["stages"]["solve"] = {"script": solver_script, "submitted": False}
3003 print(f"[SUCCESS] Generated solver Slurm script: {os.path.relpath(solver_script)}")
3004 if not args.no_submit:
3005 submit_info = submit_sbatch(solver_script)
3006 submission_meta["stages"]["solve"].update(submit_info)
3007 submission_meta["stages"]["solve"]["submitted"] = True
3008 print(f"[SUCCESS] Submitted solver job: {submit_info['job_id']}")
3009 stages_completed.append('solve')
3010 else:
3011 command = [solver_exe] + solver_args
3012 if effective_num_procs > 1:
3013 command = ["mpiexec", "-n", str(effective_num_procs)] + command
3014 execute_command(command, run_dir, f"{run_id}_solver.log", configs['monitor'])
3015 stages_completed.append('solve')
3016
3017 # --- Stage 2: Post-Processing (if requested) ---
3018 if args.post_process:
3019 if args.run_dir:
3020 run_dir = os.path.abspath(args.run_dir)
3021 if not os.path.isdir(run_dir):
3022 print(f"[FATAL] Specified run directory not found: {run_dir}", file=sys.stderr)
3023 sys.exit(1)
3024 print(f"[INFO] Operating on existing run directory: {os.path.relpath(run_dir)}")
3025 run_id = os.path.basename(run_dir)
3026 elif not args.solve:
3027 print("[FATAL] --post-process requires --run-dir when not used with --solve.", file=sys.stderr)
3028 sys.exit(1)
3029
3030 print("\n" + "="*20 + " POST-PROCESSING STAGE " + "="*20)
3031 config_dir = os.path.join(run_dir, "config")
3032 case_path, monitor_path, solver_control_path = auto_identify_run_inputs(config_dir)
3033
3034 if not all([case_path, monitor_path, solver_control_path]):
3035 print(f"[FATAL] Could not automatically identify required config files in {config_dir}", file=sys.stderr)
3036 if not case_path:
3037 print(" - No 'case' file found (expected 'models' + 'boundary_conditions').", file=sys.stderr)
3038 if not monitor_path:
3039 print(" - No 'monitor' file found (expected 'io' + 'logging').", file=sys.stderr)
3040 if not solver_control_path:
3041 print(" - No '.control' file found.", file=sys.stderr)
3042 sys.exit(1)
3043
3044 print(f"[INFO] Auto-identified Case file: {os.path.basename(case_path)}")
3045 print(f"[INFO] Auto-identified Monitor file: {os.path.basename(monitor_path)}")
3046
3047 case_cfg = read_yaml_file(case_path)
3048 monitor_cfg = read_yaml_file(monitor_path)
3049 post_cfg = read_yaml_file(args.post)
3050
3051 print("[INFO] Validating post-processing configuration...")
3052 validate_post_config(post_cfg, args.post)
3053 print("[SUCCESS] Post-processing configuration passed validation.\n")
3054
3055 strict_source_check = not (cluster_mode and args.solve)
3056 resolved_source_dir = resolve_post_source_directory(run_dir, monitor_cfg, post_cfg, strict=strict_source_check)
3057 if 'source_data' not in post_cfg:
3058 post_cfg['source_data'] = {}
3059 post_cfg['source_data']['directory'] = resolved_source_dir
3060
3061 post_io_cfg = post_cfg.get('io', {})
3062 try:
3063 output_dir_rel = post_io_cfg['output_directory']
3064 output_prefix = post_io_cfg['output_filename_prefix']
3065 except KeyError as e:
3066 print(f"[FATAL] Missing required key '{e.args[0]}' in the 'io' section of {args.post}", file=sys.stderr)
3067 sys.exit(1)
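        # Illustrative sketch of the 'io' section this stage expects in post.yml
        # (key names taken from the lookups above; values are hypothetical):
        #
        #   io:
        #     output_directory: results/post      # resolved relative to the run directory
        #     output_filename_prefix: averaged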
3068
3069 output_dir_abs = os.path.abspath(os.path.join(run_dir, output_dir_rel))
3070 os.makedirs(output_dir_abs, exist_ok=True)
3071 print(f"[INFO] Post-processor output directory: {os.path.relpath(output_dir_abs)}")
3072
3073 source_files_post = {'Case': case_path, 'Post-Profile': args.post}
3074 post_recipe_file = generate_post_recipe_file(run_dir, run_id, post_cfg, source_files_post)
3075
3076 post_exe = os.path.join(BIN_DIR, "postprocessor")
3077 post_args = ["-control_file", solver_control_path, "-postprocessing_config_file", post_recipe_file]
3078 if cluster_mode:
3079 scheduler_dir = os.path.join(run_dir, "scheduler")
3080 os.makedirs(scheduler_dir, exist_ok=True)
3081 post_script = os.path.join(scheduler_dir, "post.sbatch")
3082 post_log = os.path.join(run_dir, "logs", "post_%j.out")
3083 post_err = os.path.join(run_dir, "logs", "post_%j.err")
3084 post_cmd = build_cluster_launch_command(cluster_cfg, post_exe, post_args)
3085 render_slurm_script(
3086 post_script,
3087 f"{run_id}_post",
3088 cluster_cfg,
3089 post_cmd,
3090 run_dir,
3091 post_log,
3092 post_err,
3093            env_vars={"LOG_LEVEL": str(monitor_cfg.get('logging', {}).get('verbosity', 'INFO')).upper()},
3094 )
3095 submission_meta["stages"]["post-process"] = {"script": post_script, "submitted": False}
3096 print(f"[SUCCESS] Generated post Slurm script: {os.path.relpath(post_script)}")
3097
3098 if not args.no_submit:
3099 dependency_job = None
3100 if args.solve:
3101 dependency_job = submission_meta.get("stages", {}).get("solve", {}).get("job_id")
3102 submit_info = submit_sbatch(post_script, dependency=dependency_job)
3103 submission_meta["stages"]["post-process"].update(submit_info)
3104 submission_meta["stages"]["post-process"]["submitted"] = True
3105 if dependency_job:
3106 submission_meta["stages"]["post-process"]["dependency"] = f"afterok:{dependency_job}"
3107 print(f"[SUCCESS] Submitted post job: {submit_info['job_id']}")
3108 stages_completed.append('post-process')
3109 else:
3110 command = [post_exe] + post_args
3111 if effective_num_procs > 1:
3112 command = ["mpiexec", "-n", str(effective_num_procs)] + command
3113 execute_command(command, run_dir, f"{run_id}_{output_prefix}.log", monitor_cfg)
3114 stages_completed.append('post-process')
3115
3116 if run_dir:
3117 manifest = {
3118 "run_id": run_id,
3119 "created_at": datetime.now().isoformat(),
3120 "launch_mode": "slurm" if cluster_mode else "local",
3121 "git_commit": get_git_commit(),
3122 "num_procs": effective_num_procs,
3123 "stages_requested": {"solve": bool(args.solve), "post_process": bool(args.post_process)},
3124 "stages_completed_or_submitted": stages_completed,
3125 "inputs": {},
3126 }
3127 if args.solve:
3128 manifest["inputs"]["case"] = os.path.abspath(args.case)
3129 manifest["inputs"]["solver"] = os.path.abspath(args.solver)
3130 manifest["inputs"]["monitor"] = os.path.abspath(args.monitor)
3131 if args.post_process:
3132 manifest["inputs"]["post"] = os.path.abspath(args.post)
3133 if cluster_mode:
3134 manifest["inputs"]["cluster"] = cluster_path
3135 write_json_file(os.path.join(run_dir, "scheduler", "submission.json"), submission_meta)
3136 write_json_file(os.path.join(run_dir, "manifest.json"), manifest)
3137
3138 if stages_completed:
3139 elapsed = time.time() - workflow_start
3140 mins, secs = divmod(int(elapsed), 60)
3141 hrs, mins = divmod(mins, 60)
3142 if hrs > 0:
3143 time_str = f"{hrs}h {mins}m {secs}s"
3144 elif mins > 0:
3145 time_str = f"{mins}m {secs}s"
3146 else:
3147 time_str = f"{secs}s"
3148
3149 print("\n" + "=" * 60)
3150 print(" RUN SUMMARY")
3151 print("=" * 60)
3152 print(f" Run ID : {run_id}")
3153 print(f" Run directory : {os.path.relpath(run_dir)}")
3154 print(f" Wall-clock : {time_str}")
3155 print(f" Stages : {', '.join(stages_completed)}")
3156 print(f" Launch mode : {'slurm' if cluster_mode else 'local'}")
3157 print(f" MPI processes : {effective_num_procs}")
3158 if args.solve and configs:
3159 total_steps = configs['case'].get('run_control', {}).get('total_steps', '?')
3160 result_dir = os.path.join(run_dir, configs['monitor'].get('io', {}).get('directories', {}).get('output', 'results'))
3161 print(f" Steps run : {total_steps}")
3162 print(f" Solver output : {os.path.relpath(result_dir)}")
3163 if 'post-process' in stages_completed and output_dir_abs:
3164 print(f" Post output : {os.path.relpath(output_dir_abs)}")
3165 print(f" Logs : {os.path.relpath(os.path.join(run_dir, 'logs'))}")
3166 if cluster_mode:
3167 submission_file = os.path.join(run_dir, "scheduler", "submission.json")
3168 print(f" Submission meta: {os.path.relpath(submission_file)}")
3169 print("=" * 60)
3170
3171def sweep_workflow(args):
3172 """Study/sweep orchestration using Slurm job arrays."""
3173 study_path = os.path.abspath(args.study)
3174 cluster_path = os.path.abspath(args.cluster)
3175
3176 study_cfg = read_yaml_file(study_path)
3177 cluster_cfg = read_yaml_file(cluster_path)
3178 validate_study_config(study_cfg, study_path)
3179 validate_cluster_config(cluster_cfg, cluster_path)
3180
3181 study_name = os.path.splitext(os.path.basename(study_path))[0]
3182 timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
3183 study_id = f"{study_name}_{timestamp}"
3184 study_dir = os.path.abspath(os.path.join("studies", study_id))
3185 cases_dir = os.path.join(study_dir, "cases")
3186 scheduler_dir = os.path.join(study_dir, "scheduler")
3187 results_dir = os.path.join(study_dir, "results")
3188 logs_dir = os.path.join(study_dir, "logs")
3189 for path in [cases_dir, scheduler_dir, results_dir, logs_dir]:
3190 os.makedirs(path, exist_ok=True)
3191
3192 print(f"[INFO] Creating study directory: {os.path.relpath(study_dir)}")
3193 shutil.copy(study_path, os.path.join(study_dir, "study.yml"))
3194 shutil.copy(cluster_path, os.path.join(study_dir, "cluster.yml"))
3195
3196 base_cfgs = study_cfg["base_configs"]
3197 base_paths = {k: resolve_path(study_path, v) for k, v in base_cfgs.items()}
3198 base_case = read_yaml_file(base_paths["case"])
3199 base_solver = read_yaml_file(base_paths["solver"])
3200 base_monitor = read_yaml_file(base_paths["monitor"])
3201 base_post = read_yaml_file(base_paths["post"])
3202 validate_solver_configs(base_case, base_solver, base_monitor, base_paths["case"], base_paths["solver"], base_paths["monitor"])
3203 validate_post_config(base_post, base_paths["post"])
3204
3205 combinations = expand_parameter_matrix(study_cfg["parameters"])
3206 if not combinations:
3207 print("[FATAL] Study parameter matrix expanded to zero cases.", file=sys.stderr)
3208 sys.exit(1)
3209 print(f"[INFO] Expanded sweep matrix to {len(combinations)} case(s).")
3210
3211 cluster_tasks = get_cluster_total_tasks(cluster_cfg)
3212 case_entries = []
3213 case_index_file = os.path.join(scheduler_dir, "case_index.tsv")
3214
3215 for idx, combo in enumerate(combinations):
3216 case_id = f"case_{idx:04d}"
3217 run_dir = os.path.join(cases_dir, case_id)
3218 config_dir = os.path.join(run_dir, "config")
3219 os.makedirs(config_dir, exist_ok=True)
3220 os.makedirs(os.path.join(run_dir, "logs"), exist_ok=True)
3221 os.makedirs(os.path.join(run_dir, "results"), exist_ok=True)
3222
3223 case_cfg = copy.deepcopy(base_case)
3224 solver_cfg = copy.deepcopy(base_solver)
3225 monitor_cfg = copy.deepcopy(base_monitor)
3226 post_cfg = copy.deepcopy(base_post)
3227 target_map = {"case": case_cfg, "solver": solver_cfg, "monitor": monitor_cfg, "post": post_cfg}
3228 for full_key, value in combo.items():
3229 root, nested = full_key.split(".", 1)
3230 _deep_set(target_map[root], nested, value)
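        # For a hypothetical override "solver.time.dt": 0.002, root is "solver"
        # and nested is "time.dt", so _deep_set presumably updates
        # solver_cfg["time"]["dt"] before the per-case files are written.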
3231
3232 # Preserve file-based/grid-gen workflows when study cases are materialized
3233 # into new directories by rewriting external paths as absolute.
3234 absolutize_case_external_paths(case_cfg, base_paths["case"])
3235
3236 case_path = os.path.join(config_dir, "case.yml")
3237 solver_path = os.path.join(config_dir, "solver.yml")
3238 monitor_path = os.path.join(config_dir, "monitor.yml")
3239 post_path = os.path.join(config_dir, "post.yml")
3240 write_yaml_file(case_path, case_cfg)
3241 write_yaml_file(solver_path, solver_cfg)
3242 write_yaml_file(monitor_path, monitor_cfg)
3243 write_yaml_file(post_path, post_cfg)
3244
3245 validate_solver_configs(case_cfg, solver_cfg, monitor_cfg, case_path, solver_path, monitor_path)
3246 validate_post_config(post_cfg, post_path)
3247
3248 source_files = {'Case': case_path, 'Solver': solver_path, 'Monitor': monitor_path}
3249 whitelist_path = generate_simple_list_file(run_dir, case_id, monitor_cfg, 'logging', 'enabled_functions', 'whitelist.run', source_files)
3250 profile_path = generate_simple_list_file(run_dir, case_id, monitor_cfg, 'profiling', 'critical_functions', 'profile.run', source_files)
3251 monitor_files = {"whitelist": whitelist_path, "profile": profile_path}
3252 configs = {
3253 "case": case_cfg, "case_path": case_path,
3254 "solver": solver_cfg, "solver_path": solver_path,
3255 "monitor": monitor_cfg, "monitor_path": monitor_path
3256 }
3257 control_file = generate_solver_control_file(run_dir, case_id, configs, cluster_tasks, monitor_files)
3258
3259 source_dir = resolve_post_source_directory(run_dir, monitor_cfg, post_cfg, strict=False)
3260 if 'source_data' not in post_cfg:
3261 post_cfg['source_data'] = {}
3262 post_cfg['source_data']['directory'] = source_dir
3263 output_prefix = post_cfg.get("io", {}).get("output_filename_prefix", "post")
3264 post_recipe = generate_post_recipe_file(run_dir, case_id, post_cfg, {'Case': case_path, 'Post-Profile': post_path})
3265
3266 case_entries.append({
3267 "index": idx,
3268 "case_id": case_id,
3269 "run_dir": os.path.abspath(run_dir),
3270 "control_file": control_file,
3271 "post_recipe_file": post_recipe,
3272 "log_level": str(monitor_cfg.get("logging", {}).get("verbosity", "INFO")).upper(),
3273 "post_prefix": output_prefix,
3274 "parameters": combo,
3275 })
3276
3277 with open(case_index_file, "w") as f:
3278 for entry in case_entries:
3279 f.write(
3280 "\t".join(
3281 [
3282 str(entry["index"]),
3283 entry["case_id"],
3284 entry["run_dir"],
3285 entry["control_file"],
3286 entry["post_recipe_file"],
3287 entry["log_level"],
3288 entry["post_prefix"],
3289 ]
3290 ) + "\n"
3291 )
3292 print(f"[SUCCESS] Wrote sweep case index: {os.path.relpath(case_index_file)}")
3293
3294 max_idx = len(case_entries) - 1
3295 max_conc = study_cfg.get("execution", {}).get("max_concurrent_array_tasks")
3296 array_spec = f"0-{max_idx}"
3297 if max_conc:
3298 array_spec = f"{array_spec}%{max_conc}"
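    # Example: 12 cases with max_concurrent_array_tasks: 4 (hypothetical) give
    # array_spec "0-11%4"; the Slurm %N suffix caps how many array tasks run
    # concurrently.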
3299
3300 solver_exe = os.path.join(BIN_DIR, "picsolver")
3301 post_exe = os.path.join(BIN_DIR, "postprocessor")
3302 solver_array_script = os.path.join(scheduler_dir, "solver_array.sbatch")
3303 post_array_script = os.path.join(scheduler_dir, "post_array.sbatch")
3304 render_slurm_array_stage_script(
3305 solver_array_script,
3306 f"{study_id}_solve",
3307 cluster_cfg,
3308 array_spec,
3309 case_index_file,
3310 "solve",
3311 solver_exe,
3312 post_exe,
3313 os.path.join(logs_dir, "solver_%A_%a.out"),
3314 os.path.join(logs_dir, "solver_%A_%a.err")
3315 )
3316 render_slurm_array_stage_script(
3317 post_array_script,
3318 f"{study_id}_post",
3319 cluster_cfg,
3320 array_spec,
3321 case_index_file,
3322 "post",
3323 solver_exe,
3324 post_exe,
3325 os.path.join(logs_dir, "post_%A_%a.out"),
3326 os.path.join(logs_dir, "post_%A_%a.err")
3327 )
3328 print(f"[SUCCESS] Generated Slurm array scripts in {os.path.relpath(scheduler_dir)}")
3329
3330 submission = {
3331 "launch_mode": "slurm",
3332 "study_id": study_id,
3333 "solver_array": {"script": solver_array_script, "submitted": False},
3334 "post_array": {"script": post_array_script, "submitted": False},
3335 "no_submit": bool(args.no_submit),
3336 }
3337 if not args.no_submit:
3338 solver_submit = submit_sbatch(solver_array_script)
3339 submission["solver_array"].update(solver_submit)
3340 submission["solver_array"]["submitted"] = True
3341 post_submit = submit_sbatch(post_array_script, dependency=solver_submit["job_id"])
3342 submission["post_array"].update(post_submit)
3343 submission["post_array"]["submitted"] = True
3344 submission["post_array"]["dependency"] = f"afterok:{solver_submit['job_id']}"
3345 print(f"[SUCCESS] Submitted solver array job: {solver_submit['job_id']}")
3346 print(f"[SUCCESS] Submitted post array job: {post_submit['job_id']}")
3347
3348 metrics_csv = aggregate_study_metrics(study_cfg, case_entries, results_dir)
3349 plots = generate_study_plots(study_cfg, metrics_csv, os.path.join(results_dir, "plots"))
3350
3351 summary = {
3352 "study_id": study_id,
3353 "created_at": datetime.now().isoformat(),
3354 "git_commit": get_git_commit(),
3355 "study_type": study_cfg.get("study_type"),
3356 "num_cases": len(case_entries),
3357 "paths": {
3358 "study_dir": study_dir,
3359 "case_index": case_index_file,
3360 "solver_array_script": solver_array_script,
3361 "post_array_script": post_array_script,
3362 "metrics_table": metrics_csv,
3363 "plots_dir": os.path.join(results_dir, "plots"),
3364 },
3365 "submission": submission,
3366 }
3367 write_json_file(os.path.join(scheduler_dir, "submission.json"), submission)
3368 write_json_file(os.path.join(study_dir, "study_manifest.json"), summary)
3369 write_json_file(os.path.join(results_dir, "summary.json"), {"study_id": study_id, "metrics_csv": metrics_csv, "plots": plots})
3370
3371 print("\n" + "=" * 60)
3372 print(" STUDY SUMMARY")
3373 print("=" * 60)
3374 print(f" Study ID : {study_id}")
3375 print(f" Study directory : {os.path.relpath(study_dir)}")
3376 print(f" Cases generated : {len(case_entries)}")
3377 print(f" Array spec : {array_spec}")
3378 print(f" Solver script : {os.path.relpath(solver_array_script)}")
3379 print(f" Post script : {os.path.relpath(post_array_script)}")
3380 if metrics_csv:
3381 print(f" Metrics table : {os.path.relpath(metrics_csv)}")
3382 if plots:
3383 print(f" Plots : {os.path.relpath(os.path.join(results_dir, 'plots'))}")
3384 print("=" * 60)
3385
3386
3387def init_case(args):
3388 """!
3389 @brief Implements the 'init' command.
3390 @details Creates a new case study directory by copying a template. It can then
3391 either create relative symbolic links to the project's executables
3392 (default) or create a full copy of them for a self-contained study.
3393 @param[in] args The command-line arguments parsed by argparse.
3394 """
3395 template_path = os.path.join(PROJECT_ROOT, "examples", args.template_name)
3396 # The destination path is relative to the current working directory.
3397 dest_path = os.path.abspath(os.path.join(os.getcwd(), args.dest_name if args.dest_name else args.template_name))
3398
3399 if not os.path.isdir(template_path):
3400 print(f"[FATAL] Case template '{args.template_name}' not found at '{template_path}'", file=sys.stderr)
3401 sys.exit(1)
3402 if os.path.exists(dest_path):
3403 print(f"[FATAL] Destination directory '{dest_path}' already exists.", file=sys.stderr)
3404 sys.exit(1)
3405
3406 print(f"[INFO] Initializing new case '{os.path.basename(dest_path)}' from template '{args.template_name}'...")
3407
3408 shutil.copytree(template_path, dest_path)
3409 print(f"[SUCCESS] Copied template files to: {dest_path}")
3410
3411 if args.copy_binaries:
3412 print("[INFO] Copying project binaries for a self-contained case...")
3413 else:
3414 print("[INFO] Creating symbolic links to project binaries...")
3415
3416 main_bin_dir = os.path.join(PROJECT_ROOT, "bin")
3417
3418 try:
3419 # We only want to copy/link the actual executables, not the orchestrator script itself.
3420 binaries = [f for f in os.listdir(main_bin_dir) if f != 'pic.flow' and os.path.isfile(os.path.join(main_bin_dir, f))]
3421 if not binaries:
3422 print("[WARNING] Main project bin/ directory contains no executables. Nothing to link or copy.", file=sys.stderr)
3423 return
3424
3425 for binary_name in binaries:
3426 source_path_abs = os.path.abspath(os.path.join(main_bin_dir, binary_name))
3427 dest_file_path = os.path.join(dest_path, binary_name)
3428
3429 if args.copy_binaries:
3430 # Use copy2 to preserve permissions and metadata
3431 shutil.copy2(source_path_abs, dest_file_path)
3432 print(f" - Copied '{binary_name}'")
3433 else:
3434 relative_source_path = os.path.relpath(source_path_abs, start=dest_path)
3435 os.symlink(relative_source_path, dest_file_path)
3436 print(f" - Linked '{binary_name}'")
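                # Example (illustrative): for a case directory created directly
                # under the project root, the link would be e.g.
                # picsolver -> ../bin/picsolver; the relative depth adjusts to
                # wherever the case directory actually lives.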
3437
3438 print("[SUCCESS] Binaries are now available in your study directory.")
3439 print(" You can now 'cd' into your study and run commands locally (e.g., './picsolver ...').")
3440
3441 except Exception as e:
3442 print(f"[ERROR] Failed to process binaries: {e}", file=sys.stderr)
3443 print(" Your case files were copied, but you will need to run commands from the project root.", file=sys.stderr)
3444
3445def build_project(args):
3446 """!
3447 @brief Implements the 'build' command.
3448 @details Executes the top-level build.sh script, passing through any
3449 additional arguments directly to 'make'. This allows for building,
3450 cleaning, and other Makefile targets via the orchestrator.
3451 @param[in] args The command-line arguments parsed by argparse.
3452 """
3453
3454 print("\n" + "="*27 + " BUILD STAGE " + "="*27)
3455 build_script_path = os.path.join(PROJECT_ROOT, "build.sh")
3456
3457 if not os.path.isfile(build_script_path):
3458 print(f"[FATAL] Build script not found at expected location: {build_script_path}", file=sys.stderr)
3459 print(" Please ensure 'build.sh' exists in the project root directory.", file=sys.stderr)
3460 sys.exit(1)
3461
3462    # Invoke build.sh explicitly through /bin/bash, forwarding any
3463    # passthrough arguments straight to make.
3464 command = ['/bin/bash', build_script_path] + args.make_args
3465 # For the build process, we don't have a monitor.yml, so we pass an empty
3466 # dict to execute_command. The command should be run in the project root.
3467 execute_command(command, PROJECT_ROOT, "build.log", {})
3468
3469
3470
3471
3472# ==============================================================================
3473# MAIN COMMAND-LINE INTERFACE PARSER
3474# ==============================================================================
3475
3476def _add_run_parser(subparsers):
3477 """Attach `run` parser with staged execution and dry-run support."""
3478 p_run = subparsers.add_parser(
3479 "run",
3480 help="Execute a simulation workflow (solve and/or post-process).",
3481 formatter_class=argparse.RawTextHelpFormatter,
3482 description=(
3483 "Execute solver and/or post-processing stages.\n\n"
3484 "Examples:\n"
3485 " pic.flow run --solve --post-process -n 8 --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml\n"
3486 " pic.flow run --post-process --run-dir runs/my_run --post post.yml\n"
3487 " pic.flow run --solve --post-process --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml --cluster cluster.yml --no-submit\n"
3488 " pic.flow run --solve --post-process --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml --dry-run\n"
3489 " pic.flow run --solve --post-process --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml --dry-run --format json"
3490 ),
3491 epilog="Next: run `pic.flow validate ...` first for config-only checks.",
3492 )
3493 run_group = p_run.add_argument_group("stages")
3494 run_group.add_argument("--solve", action="store_true", help="Execute the solver stage (creates a new run directory).")
3495 run_group.add_argument("--post-process", action="store_true", help="Execute the post-processing stage on a run directory.")
3496
3497 solver_group = p_run.add_argument_group("solver inputs (required for --solve)")
3498 solver_group.add_argument("--case", help="Path to the case definition file (e.g., case.yml).")
3499 solver_group.add_argument("--solver", help="Path to the solver settings profile (e.g., solver.yml).")
3500 solver_group.add_argument("--monitor", help="Path to the monitoring and I/O profile (e.g., monitor.yml).")
3501
3502 post_group = p_run.add_argument_group("post-processor inputs (required for --post-process)")
3503 post_group.add_argument("--run-dir", help="Path to an existing run directory to post-process.\n(Not needed if running with --solve in the same command).")
3504 post_group.add_argument("--post", help="Path to the post-processing recipe file (e.g., post.yml).")
3505
3506 p_run.add_argument("-n", "--num-procs", type=int, default=1, help="Number of MPI processes for either stage.")
3507 p_run.add_argument("--cluster", help="Path to cluster.yml for Slurm execution mode.")
3508 p_run.add_argument("--scheduler", help="Explicit scheduler selector (currently 'slurm').")
3509 p_run.add_argument("--no-submit", action="store_true", help="Generate Slurm scripts/manifests but do not call sbatch.")
3510 p_run.add_argument("--dry-run", action="store_true", help="Resolve and print planned commands/artifacts without writing files.")
3511 p_run.add_argument(
3512 "--format",
3513 dest="output_format",
3514 choices=["text", "json"],
3515 default="text",
3516 help="Output format for --dry-run (default: text).",
3517 )
3518 return p_run
3519
3520
3521def _add_sweep_parser(subparsers):
3522 p_sweep = subparsers.add_parser(
3523 "sweep",
3524 help="Launch a Slurm-based parameter sweep/study.",
3525 formatter_class=argparse.RawTextHelpFormatter,
3526 description=(
3527 "Launch matrix studies from study.yml + cluster.yml.\n\n"
3528 "Examples:\n"
3529 " pic.flow sweep --study study.yml --cluster cluster.yml\n"
3530 " pic.flow sweep --study study.yml --cluster cluster.yml --no-submit"
3531 ),
3532 epilog="Next: inspect studies/<study_id>/results/metrics_table.csv for aggregated metrics.",
3533 )
3534 p_sweep.add_argument("--study", required=True, help="Path to study.yml defining parameter matrix and metrics.")
3535 p_sweep.add_argument("--cluster", required=True, help="Path to cluster.yml defining Slurm resources.")
3536 p_sweep.add_argument("--no-submit", action="store_true", help="Generate all study artifacts without submitting jobs.")
3537 return p_sweep
3538
3539
3540def _add_validate_parser(subparsers):
3541 p_validate = subparsers.add_parser(
3542 "validate",
3543 help="Validate config files without launching solver/post.",
3544 formatter_class=argparse.RawTextHelpFormatter,
3545 description=(
3546 "Validate one or more config roles. No solver/post execution and no run/study artifact writes.\n\n"
3547 "Examples:\n"
3548 " pic.flow validate --case case.yml --solver solver.yml --monitor monitor.yml\n"
3549 " pic.flow validate --post post.yml --cluster cluster.yml\n"
3550 " pic.flow validate --study study.yml --cluster cluster.yml --strict"
3551 ),
3552 epilog="Next: run `pic.flow run --dry-run ...` to inspect resolved commands/artifacts.",
3553 )
3554 p_validate.add_argument("--case", help="Path to case.yml")
3555 p_validate.add_argument("--solver", help="Path to solver.yml")
3556 p_validate.add_argument("--monitor", help="Path to monitor.yml")
3557 p_validate.add_argument("--post", help="Path to post.yml")
3558 p_validate.add_argument("--cluster", help="Path to cluster.yml")
3559 p_validate.add_argument("--study", help="Path to study.yml")
3560 p_validate.add_argument("--strict", action="store_true", help="Enable additional strict checks for selected roles.")
3561 return p_validate
3562
3563
3564def _add_init_parser(subparsers):
3565 p_init = subparsers.add_parser(
3566 "init",
3567 help="Initialize a new case study directory from a template.",
3568 formatter_class=argparse.RawTextHelpFormatter,
3569 description=(
3570 "Create a study directory from examples/<template_name>.\n\n"
3571 "Examples:\n"
3572 " pic.flow init flat_channel --dest my_case\n"
3573 " pic.flow init bent_channel --dest my_bent_case --copy-binaries"
3574 ),
3575 epilog="Next: run `pic.flow validate --case ... --solver ... --monitor ...` before execution.",
3576 )
3577 p_init.add_argument("template_name", help="Name of the case template directory to copy (e.g., 'flat_channel').")
3578 p_init.add_argument(
3579 "--dest",
3580 dest="dest_name",
3581 help="Optional name for the new directory. Defaults to the template name.\nPath is relative to your current working directory.",
3582 )
3583 p_init.add_argument(
3584 "--copy-binaries",
3585 action="store_true",
3586 help="Copy executables into the case directory instead of symlinking.\nThis creates a fully portable, self-contained case study.",
3587 )
3588 return p_init
3589
3590
3591def _add_build_parser(subparsers):
3592 p_build = subparsers.add_parser(
3593 "build",
3594 help="Build project executables using the Makefile.",
3595 formatter_class=argparse.RawTextHelpFormatter,
3596 description=(
3597 "Calls the project's build.sh script. Any arguments provided after 'build'\n"
3598 "are passed directly to make.\n\n"
3599 "Examples:\n"
3600 " pic.flow build\n"
3601 " pic.flow build clean-project\n"
3602 " pic.flow build SYSTEM=cluster\n"
3603 " pic.flow build postprocessor"
3604 ),
3605 epilog="Next: run `pic.flow --help` or `pic.flow run --help` for execution commands.",
3606 )
3607 p_build.add_argument(
3608 "make_args",
3609 nargs=argparse.REMAINDER,
3610 help="Arguments to pass directly to the make command (e.g., 'clean-project').",
3611 )
3612 return p_build
3613
3614
3615def build_main_parser():
3616 """Build and return the top-level CLI parser."""
3617 parser = argparse.ArgumentParser(
3618 description="pic.flow: A comprehensive conductor for the PIC-Flow CFD simulation platform.",
3619 formatter_class=argparse.RawTextHelpFormatter,
3620 epilog=(
3621 "Examples:\n"
3622 " pic.flow validate --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml\n"
3623 " pic.flow run --solve --post-process --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml --dry-run\n"
3624 " pic.flow run --solve --post-process --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml --cluster cluster.yml --no-submit\n"
3625 " pic.flow sweep --study study.yml --cluster cluster.yml\n\n"
3626 "Next commands:\n"
3627 " - First run: pic.flow init ... -> pic.flow validate ... -> pic.flow run ...\n"
3628 " - Config debugging: pic.flow validate ...\n"
3629 " - Launch planning: pic.flow run ... --dry-run"
3630 ),
3631 )
3632 subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands")
3633 _add_run_parser(subparsers)
3634 _add_sweep_parser(subparsers)
3635 _add_validate_parser(subparsers)
3636 _add_init_parser(subparsers)
3637 _add_build_parser(subparsers)
3638 return parser
3639
3640
3641def dispatch_command(args):
3642 """Validate argument combinations and dispatch to command handlers."""
3643 if args.command == "run":
3644 if not args.solve and not args.post_process:
3645 fail_cli_usage("At least one stage (--solve or --post-process) must be selected.")
3646 if args.solve and (not args.case or not args.solver or not args.monitor):
3647 fail_cli_usage("--solve requires --case, --solver, and --monitor.")
3648 if args.post_process and not args.post:
3649 fail_cli_usage("--post-process requires --post.")
3650 if args.scheduler and not args.cluster:
3651 fail_cli_usage("--scheduler requires --cluster in this version.")
3652 run_workflow(args)
3653 return
3654 if args.command == "sweep":
3655 sweep_workflow(args)
3656 return
3657 if args.command == "validate":
3658 validate_workflow(args)
3659 return
3660 if args.command == "init":
3661 init_case(args)
3662 return
3663 if args.command == "build":
3664 build_project(args)
3665 return
3666 fail_cli_usage(f"Unsupported command '{args.command}'.")
3667
3668
3669if __name__ == "__main__":
3670 main_parser = build_main_parser()
3671 dispatch_command(main_parser.parse_args())