6@brief A comprehensive conductor script for the PIC-Flow simulation platform.
8This script acts as the central user interface for running simulations,
9managing configurations, and orchestrating the entire end-to-end workflow.
10It translates user-friendly YAML files into C-solver compatible control files,
11supports full multi-block configurations, and provides live log streaming.
12It features intelligent, content-based config file discovery and robustly
13manages data I/O paths for the post-processor. It also supports Slurm job
14generation/submission and parameter sweeps via job arrays.
31from datetime import datetime
35 import matplotlib.pyplot as plt
# --- Global Path Definitions ---
# Directory containing this script, with symlinks resolved via realpath.
SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__))
# NOTE(review): BIN_DIR is assigned on a line not visible in this view —
# presumably derived from SCRIPT_PATH; confirm before relying on this layout.
PROJECT_ROOT = os.path.dirname(BIN_DIR)
# Standardized error codes used for CLI/validation reporting.
ERROR_CODE_CLI_USAGE_INVALID = "CLI_USAGE_INVALID"
ERROR_CODE_CFG_MISSING_SECTION = "CFG_MISSING_SECTION"
ERROR_CODE_CFG_MISSING_KEY = "CFG_MISSING_KEY"
ERROR_CODE_CFG_INVALID_TYPE = "CFG_INVALID_TYPE"
ERROR_CODE_CFG_INVALID_VALUE = "CFG_INVALID_VALUE"
ERROR_CODE_CFG_FILE_NOT_FOUND = "CFG_FILE_NOT_FOUND"
ERROR_CODE_CFG_GRID_PARSE = "CFG_GRID_PARSE"
ERROR_CODE_CFG_INCONSISTENT_COMBO = "CFG_INCONSISTENT_COMBO"
# Per-code remediation hints surfaced alongside structured error lines.
# NOTE(review): the mapping assignment line (e.g. `_ERROR_HINTS = {`) is not
# visible in this view — confirm it immediately precedes these entries.
    ERROR_CODE_CLI_USAGE_INVALID: "Run 'pic.flow <command> --help' to see valid argument combinations.",
    ERROR_CODE_CFG_MISSING_SECTION: "Add the missing section using examples/master_template/*.yml as reference.",
    ERROR_CODE_CFG_MISSING_KEY: "Add the missing key in the referenced YAML file.",
    ERROR_CODE_CFG_INVALID_TYPE: "Fix the value type to match the documented schema in docs/pages/14_Config_Contract.md.",
    ERROR_CODE_CFG_INVALID_VALUE: "Adjust the value to a supported range/enum from the config reference pages.",
    ERROR_CODE_CFG_FILE_NOT_FOUND: "Fix the path or create the missing file before running again.",
    ERROR_CODE_CFG_GRID_PARSE: "Validate grid file format and numeric payload (block count, dims, coordinates).",
    ERROR_CODE_CFG_INCONSISTENT_COMBO: "Fix conflicting options/keys so the configuration is internally consistent.",
def _sanitize_error_field(value) -> str:
    """Normalize error fields into a single-line string."""
    # NOTE(review): guard lines (likely handling None/empty input) are elided
    # in this view — confirm upstream handling before assuming str() suffices.
    text = str(value).strip()
    # Collapse embedded newlines so the emitted error record stays one line.
    return " ".join(text.splitlines())
def emit_structured_error(code: str, key: str = "-", file_path: str = "-",
                          message: str = "", hint: str = None, stream=None):
    """Emit one standardized error line for tooling and users."""
    # Fall back to the canonical per-code hint when the caller supplies none.
    resolved_hint = hint if hint is not None else _ERROR_HINTS.get(code, "-")
    # NOTE(review): the call (presumably print(...)) wrapping these f-strings
    # is elided in this view. Every field is sanitized to keep one record per line.
        f"ERROR {_sanitize_error_field(code)} | "
        f"key={_sanitize_error_field(key)} | "
        f"file={_sanitize_error_field(file_path)} | "
        f"message={_sanitize_error_field(message)} | "
        f"hint={_sanitize_error_field(resolved_hint)}",
def fail_cli_usage(message: str, hint: str = None):
    """Emit a structured CLI usage error and exit with code 2."""
    # NOTE(review): the key/file/message arguments and the sys.exit(2) call
    # are on lines elided in this view.
    emit_structured_error(
        ERROR_CODE_CLI_USAGE_INVALID,
        hint=hint or _ERROR_HINTS[ERROR_CODE_CLI_USAGE_INVALID],
def _split_error_file_and_message(raw_error: str):
    """Split '<file>: <message>' style validation strings when possible."""
    text = str(raw_error).strip()
    match = re.match(r"^(?P<file>[^:]+):\s*(?P<msg>.+)$", text)
    # NOTE(review): the `if match:` guard and the no-match fallback return are
    # elided in this view.
        file_candidate = match.group("file").strip()
        msg = match.group("msg").strip()
        # Only treat the prefix as a file when it looks path-like, to avoid
        # splitting ordinary "key: value" prose.
        known_suffixes = (".yml", ".yaml", ".cfg", ".picgrid", ".control", ".run", ".txt")
        if "/" in file_candidate or file_candidate.endswith(known_suffixes):
            return file_candidate, msg
def _extract_key_path(message: str) -> str:
    """Best-effort key-path extraction from free-form validation messages."""
    # Dotted paths like 'properties.scaling.length_ref' take priority.
    dotted = re.search(r"\b([A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z0-9_\[\]-]+)+)\b", message)
        return dotted.group(1)
    # Bracketed indices like 'boundary_conditions[0][2]'.
    bracketed = re.search(r"\b([A-Za-z_][A-Za-z0-9_]*\[[^\]]+\](?:\[[^\]]+\])*)\b", message)
        return bracketed.group(1)
    # Last resort: single-quoted tokens that look like keys.
    # NOTE(review): the guard/loop lines around these branches are elided.
    quoted = re.findall(r"'([A-Za-z0-9_.\[\]-]+)'", message)
        if "." in token or "[" in token or token.isidentifier():
def _classify_error_code(message: str) -> str:
    """Map existing validation/error messages to the standardized code set."""
    msg = message.lower()
    if "missing required section" in msg:
        return ERROR_CODE_CFG_MISSING_SECTION
    if "missing required key" in msg or "missing key" in msg:
        return ERROR_CODE_CFG_MISSING_KEY
    if "not found" in msg or "does not exist" in msg:
        return ERROR_CODE_CFG_FILE_NOT_FOUND
    if "invalid dimensions line" in msg or "invalid coordinate row" in msg or "grid file" in msg:
        return ERROR_CODE_CFG_GRID_PARSE
    # NOTE(review): the `if (` lines opening the two multi-line conditions
    # below are elided in this view.
        "must both be periodic" in msg
        or "inconsistent periodicity" in msg
        or "requires --" in msg
        or "must be 1 (auto) or exactly" in msg
        return ERROR_CODE_CFG_INCONSISTENT_COMBO
        "must be a mapping" in msg
        or "must be a list" in msg
        or "must be a string" in msg
        or "must be a boolean" in msg
        or "must be either" in msg
        return ERROR_CODE_CFG_INVALID_TYPE
    # Default bucket for everything not matched above.
    return ERROR_CODE_CFG_INVALID_VALUE
164# ==============================================================================
166# ==============================================================================
def read_yaml_file(filepath: str) -> dict:
    """
    @brief Safely reads a YAML file and returns its content.
    @param[in] filepath Path to the YAML file.
    @return A dictionary containing the parsed YAML content.
    @throws SystemExit if the file is not found or cannot be parsed.
    """
    if not os.path.exists(filepath):
        emit_structured_error(
            ERROR_CODE_CFG_FILE_NOT_FOUND,
            message="Configuration file not found.",
    # NOTE(review): the `try:` opening this block and remaining call arguments
    # are elided in this view.
        with open(filepath, 'r') as f:
            return yaml.safe_load(f)
    except yaml.YAMLError as e:
        emit_structured_error(
            ERROR_CODE_CFG_INVALID_VALUE,
            message=f"YAML parse error: {e}",
            hint="Fix YAML syntax/indentation and retry validation.",
def write_yaml_file(filepath: str, data: dict):
    """Write YAML with stable ordering for generated study artifacts.

    @param[in] filepath Destination path; parent directories are created on demand.
    @param[in] data Mapping to serialize; insertion order is preserved
               (sort_keys=False) so generated files stay diff-friendly.
    """
    parent = os.path.dirname(filepath)
    # os.makedirs("") raises FileNotFoundError, so skip it for bare filenames
    # written into the current working directory.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(filepath, "w") as f:
        yaml.safe_dump(data, f, sort_keys=False)
def write_json_file(filepath: str, payload: dict):
    """Write JSON metadata/manifests with a stable, readable format.

    @param[in] filepath Destination path; parent directories are created on
               demand. A bare filename (no directory component) is written to
               the current working directory.
    @param[in] payload Mapping to serialize; keys are sorted for stable diffs.
    """
    parent = os.path.dirname(filepath)
    # os.makedirs("") raises FileNotFoundError, so only create directories
    # when the path actually has a directory component.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(filepath, "w") as f:
        json.dump(payload, f, indent=2, sort_keys=True)
def resolve_path(anchor_file: str, candidate: str) -> str:
    """Resolve a potentially relative path against a source YAML file path."""
    # NOTE(review): the body of the None-guard below is elided in this view —
    # presumably it returns None for a missing candidate; confirm.
    if candidate is None:
    if os.path.isabs(candidate):
        return os.path.abspath(candidate)
    # Relative paths are anchored at the directory containing anchor_file.
    return os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(anchor_file)), candidate))
def absolutize_case_external_paths(case_cfg: dict, case_anchor_path: str):
    """Convert external grid/generator paths in case config to absolute paths."""
    grid_cfg = case_cfg.get("grid", {})
    # NOTE(review): the body of this type-guard and the mode=='file' branch
    # header are elided in this view.
    if not isinstance(grid_cfg, dict):
    mode = grid_cfg.get("mode")
        source_file = grid_cfg.get("source_file")
        if isinstance(source_file, str):
            # Mutates the config in place: source_file becomes absolute.
            grid_cfg["source_file"] = resolve_path(case_anchor_path, source_file)
    elif mode == "grid_gen":
        gen = grid_cfg.get("generator", {})
        if isinstance(gen, dict):
            # Generator script and its config are also anchored at the case file.
            for key in ("script", "config_file"):
                if isinstance(val, str):
                    gen[key] = resolve_path(case_anchor_path, val)
def get_git_commit() -> str:
    """Best-effort git commit lookup for run/study manifests."""
    # NOTE(review): the enclosing try/except, remaining subprocess.run keyword
    # arguments, and the failure-path return value are elided in this view.
        result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
        if result.returncode == 0:
            return result.stdout.strip()
def is_valid_email(email: str) -> bool:
    """Lightweight email validation for scheduler notifications.

    @param[in] email Candidate address; any type is accepted.
    @return True only for strings shaped like 'local@domain.tld'.
    """
    if not isinstance(email, str):
        # Non-strings (None, numbers) can never be valid addresses, and
        # re.match would raise TypeError on them.
        return False
    # Exactly one '@', no whitespace, and at least one '.' after the '@'.
    pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
    return re.match(pattern, email.strip()) is not None
def normalize_statistics_task(task_name: str) -> str:
    """
    @brief Normalizes user-facing statistics task names to C pipeline keywords.
    @param[in] task_name Task name from YAML.
    @return Canonical keyword accepted by C statistics pipeline.
    @throws ValueError if task is unsupported.
    """
    if task_name is None:
        raise ValueError("statistics task cannot be None")
    raw = str(task_name).strip()
    # Already-canonical spelling passes through untouched.
    if raw == "ComputeMSD":
        return raw
    # Case/punctuation-insensitive matching: 'Compute-MSD', 'compute msd', 'msd', ...
    normalized = raw.lower().replace("-", "_").replace(" ", "_")
    aliases = {
        "msd": "ComputeMSD",
        "compute_msd": "ComputeMSD",
        "computemsd": "ComputeMSD",
    }
    mapped = aliases.get(normalized)
    if mapped is None:
        raise ValueError(f"Unsupported statistics task '{task_name}'. Currently supported: 'msd'.")
    return mapped
281def _iter_nonempty_noncomment_lines(file_obj):
282 """Yield (lineno, stripped_line) for non-empty, non-comment lines."""
283 for lineno, raw in enumerate(file_obj, start=1):
285 if not line or line.startswith("#"):
def validate_and_nondimensionalize_picgrid(source_grid: str, dest_grid: str, L_ref: float, expected_nblk: int = None) -> dict:
    """
    @brief Validates PICGRID payload and writes a non-dimensionalized copy.
    @details Accepts files with or without leading "PICGRID" token. Output is always
             written in canonical PICGRID format with header and per-block dims.
    @param[in] source_grid Input grid file path.
    @param[in] dest_grid Output grid file path.
    @param[in] L_ref Reference length for non-dimensionalization.
    @param[in] expected_nblk Optional expected block count.
    @return Summary dictionary with nblk, dims, and total_nodes.
    @throws ValueError on malformed grid.
    """
    # NOTE(review): several guard lines (`if`/`try` headers, `raise ValueError(`
    # wrappers, accumulator initializers) are elided in this view; surviving
    # lines keep their original nesting.
        raise ValueError("length_ref must be non-zero when processing grid coordinates.")
    if not os.path.isfile(source_grid):
        raise ValueError(f"Grid file not found: {source_grid}")
    with open(source_grid, "r") as fin:
        line_iter = _iter_nonempty_noncomment_lines(fin)
            _, first_token = next(line_iter)
        except StopIteration:
            raise ValueError(f"Grid file '{source_grid}' is empty.")
        # Optional "PICGRID" magic token; otherwise the first line is the count.
        if first_token == "PICGRID":
                _, nblk_line = next(line_iter)
            except StopIteration:
                raise ValueError(f"Grid file '{source_grid}' missing block count after PICGRID header.")
            nblk_line = first_token
            nblk = int(nblk_line)
            raise ValueError(f"Invalid block count '{nblk_line}' in grid file '{source_grid}'.")
            raise ValueError(f"Grid file '{source_grid}' has non-positive block count: {nblk}.")
        if expected_nblk is not None and nblk != expected_nblk:
                f"Grid file block count mismatch: case expects {expected_nblk}, grid contains {nblk}."
        # One "im jm km" dimensions line per block.
        for bi in range(nblk):
                lineno, dim_line = next(line_iter)
            except StopIteration:
                raise ValueError(f"Grid file '{source_grid}' missing dimensions for block {bi}.")
            parts = dim_line.split()
                    f"Invalid dimensions line at {source_grid}:{lineno}. Expected 3 integers, got: '{dim_line}'."
                im, jm, km = (int(parts[0]), int(parts[1]), int(parts[2]))
                    f"Invalid dimensions line at {source_grid}:{lineno}. Non-integer values: '{dim_line}'."
            if im <= 0 or jm <= 0 or km <= 0:
                    f"Invalid block dimensions at {source_grid}:{lineno}: ({im}, {jm}, {km}). Must be > 0."
            dims.append((im, jm, km))
        # The header fixes how many coordinate rows must follow.
        total_nodes_expected = sum(im * jm * km for (im, jm, km) in dims)
        os.makedirs(os.path.dirname(dest_grid), exist_ok=True)
        with open(dest_grid, "w") as fout:
            fout.write("PICGRID\n")
            fout.write(f"{nblk}\n")
            for (im, jm, km) in dims:
                fout.write(f"{im} {jm} {km}\n")
            # Stream coordinates through, dividing each by L_ref.
            for lineno, coord_line in line_iter:
                parts = coord_line.split()
                        f"Invalid coordinate row at {source_grid}:{lineno}. Expected 3 floats, got: '{coord_line}'."
                    x = float(parts[0]) / L_ref
                    y = float(parts[1]) / L_ref
                    z = float(parts[2]) / L_ref
                        f"Invalid coordinate row at {source_grid}:{lineno}. Non-numeric values: '{coord_line}'."
                total_nodes_seen += 1
                if total_nodes_seen > total_nodes_expected:
                        f"Grid file '{source_grid}' has more coordinates ({total_nodes_seen}) than expected ({total_nodes_expected})."
                fout.write(f"{x:.8e} {y:.8e} {z:.8e}\n")
        if total_nodes_seen != total_nodes_expected:
                f"Grid file '{source_grid}' has {total_nodes_seen} coordinates, expected {total_nodes_expected} from header."
    return {"nblk": nblk, "dims": dims, "total_nodes": total_nodes_expected}
def run_grid_generator(case_path: str, run_dir: str, grid_cfg: dict) -> str:
    """
    @brief Runs scripts/grid.gen to produce a PICGRID file for this run.
    @param[in] case_path Path to case.yml (used for relative path resolution).
    @param[in] run_dir Run directory path.
    @param[in] grid_cfg The grid config section from case.yml.
    @return Absolute path to generated dimensional PICGRID file.
    @throws ValueError on invalid config or generator failure.
    """
    # NOTE(review): some guard lines (`if ...:` headers, final return) are
    # elided in this view; surviving lines keep their original nesting.
    generator = grid_cfg.get("generator", {})
    if not isinstance(generator, dict):
        raise ValueError("grid.generator must be a mapping when grid.mode is 'grid_gen'.")
    # Relative generator paths are resolved against the case file's directory.
    case_dir = os.path.dirname(os.path.abspath(case_path))
    gridgen_script = generator.get("script", os.path.join(SCRIPT_PATH, "grid.gen"))
    if not os.path.isabs(gridgen_script):
        gridgen_script = os.path.abspath(os.path.join(case_dir, gridgen_script))
    if not os.path.isfile(gridgen_script):
        raise ValueError(f"grid.gen script not found: {gridgen_script}")
    config_file = generator.get("config_file")
        raise ValueError("grid.generator.config_file is required when grid.mode is 'grid_gen'.")
    if not os.path.isabs(config_file):
        config_file = os.path.abspath(os.path.join(case_dir, config_file))
    if not os.path.isfile(config_file):
        raise ValueError(f"grid.generator.config_file not found: {config_file}")
    # Output path defaults into the run directory's config/ subfolder.
    output_file = generator.get("output_file", os.path.join("config", "grid.generated.picgrid"))
    if not os.path.isabs(output_file):
        output_file = os.path.abspath(os.path.join(run_dir, output_file))
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    grid_type = generator.get("grid_type")
    cli_args = generator.get("cli_args", [])
    if not isinstance(cli_args, list):
        raise ValueError("grid.generator.cli_args must be a list of CLI tokens.")
    # Build the generator command line: script -c <cfg> [type] [args] --output <f>.
    cmd = [sys.executable, gridgen_script, "-c", config_file]
        cmd.append(str(grid_type))
    cmd.extend([str(token) for token in cli_args])
    cmd.extend(["--output", output_file])
    # Optional VTS visualization output.
    vts_file = generator.get("vts_file")
        if not os.path.isabs(vts_file):
            vts_file = os.path.abspath(os.path.join(run_dir, vts_file))
        os.makedirs(os.path.dirname(vts_file), exist_ok=True)
        cmd.extend(["--vts", vts_file])
    # Optional grid statistics output.
    stats_file = generator.get("stats_file")
        if not os.path.isabs(stats_file):
            stats_file = os.path.abspath(os.path.join(run_dir, stats_file))
        os.makedirs(os.path.dirname(stats_file), exist_ok=True)
        cmd.extend(["--stats-file", stats_file])
    print(f"[INFO] Grid generator command: {' '.join(cmd)}")
    result = subprocess.run(cmd, cwd=case_dir, text=True, capture_output=True)
    if result.returncode != 0:
        # Prefer stderr for diagnostics; fall back to stdout when empty.
        stderr = (result.stderr or "").strip()
        stdout = (result.stdout or "").strip()
        details = stderr if stderr else stdout
            f"grid.gen failed with exit code {result.returncode}. Details:\n{details}"
        print(result.stdout.strip())
        print(result.stderr.strip())
    if not os.path.isfile(output_file):
        raise ValueError(f"grid.gen did not produce expected output file: {output_file}")
    # NOTE(review): the mapping assignment lines (e.g. `BC_TYPE_MAP = {`,
    # `BC_HANDLER_SPECS = {`) and several handler-name keys are elided in this
    # view; surviving entries keep their original nesting.
    "symmetry": "SYMMETRY",
    "periodic": "PERIODIC",
# Only handlers that are implemented end-to-end in current C path are allowed.
        "required_params": set(),
        "optional_params": set(),
    "constant_velocity": {
        "required_params": {"vx", "vy", "vz"},
        "optional_params": set(),
        "required_params": set(),
        "optional_params": set(),
        "required_params": {"v_max"},
        "optional_params": set(),
        "types": {"PERIODIC"},
        "required_params": set(),
        "optional_params": set(),
        "types": {"PERIODIC"},
        "required_params": {"target_flux"},
        "optional_params": {"apply_trim"},
# Parameter names that must convert cleanly to float / bool respectively.
_NUMERIC_BC_PARAMS = {"vx", "vy", "vz", "v_max", "target_flux"}
_BOOL_BC_PARAMS = {"apply_trim"}
525def _to_float(value, field_name: str) -> float:
526 """Convert a YAML scalar to float with a clear error message."""
529 except (TypeError, ValueError):
530 raise ValueError(f"'{field_name}' must be numeric (got {value!r}).")
532def _to_bool(value, field_name: str) -> bool:
533 """Convert a YAML scalar/string to bool with a clear error message."""
534 if isinstance(value, bool):
536 if isinstance(value, str):
537 raw = value.strip().lower()
538 if raw in {"true", "1", "yes"}:
540 if raw in {"false", "0", "no"}:
542 raise ValueError(f"'{field_name}' must be boolean (got {value!r}).")
def normalize_boundary_conditions_layout(all_blocks_bcs, num_blocks: int):
    """
    Normalize boundary_conditions to list-of-lists form and validate block count.

    @param[in] all_blocks_bcs Either a flat list of face dicts (single-block
               shorthand) or a list of per-block face lists.
    @param[in] num_blocks Declared block count from case.yml.
    @return List-of-lists: exactly one inner face-config list per block.
    @throws ValueError on empty input or any block-count mismatch.
    """
    if not all_blocks_bcs:
        raise ValueError("The 'boundary_conditions' section in case.yml is empty.")

    # A flat list of dicts means the user wrote one block's faces directly.
    is_simple_list = isinstance(all_blocks_bcs[0], dict)
    if num_blocks == 1 and is_simple_list:
        all_blocks_bcs = [all_blocks_bcs]
    elif is_simple_list and num_blocks > 1:
        raise ValueError(
            f"case.yml declares {num_blocks} blocks but boundary_conditions is a single face-list. "
            "Use a list-of-lists, one inner list per block."
        )

    if len(all_blocks_bcs) != num_blocks:
        raise ValueError(
            f"Mismatch: case.yml declares {num_blocks} block(s) but found {len(all_blocks_bcs)} BC definitions."
        )
    return all_blocks_bcs
def validate_and_prepare_boundary_conditions(case_cfg: dict):
    """
    Validate BC entries against currently supported C-side handlers/types and
    return normalized entries ready for bcs.run generation.
    """
    # NOTE(review): several guard lines (`if`/`try` headers, `raise ValueError(`
    # wrappers, per-block initializers) are elided in this view; surviving
    # lines keep their original nesting.
    num_blocks = int(case_cfg.get('models', {}).get('domain', {}).get('blocks', 1))
    scales = case_cfg.get('properties', {}).get('scaling', {})
    L_ref = _to_float(scales.get('length_ref'), "properties.scaling.length_ref")
    U_ref = _to_float(scales.get('velocity_ref'), "properties.scaling.velocity_ref")
        raise ValueError("properties.scaling.velocity_ref must be non-zero for non-dimensionalization.")
        raise ValueError("properties.scaling.length_ref must be non-zero for non-dimensionalization.")
    all_blocks_bcs = normalize_boundary_conditions_layout(case_cfg.get('boundary_conditions', []), num_blocks)
    # All six logical faces must be present on every block.
    expected_faces = {"-Xi", "+Xi", "-Eta", "+Eta", "-Zeta", "+Zeta"}
    axis_pairs = [("-Xi", "+Xi"), ("-Eta", "+Eta"), ("-Zeta", "+Zeta")]
    for bi, block_bcs in enumerate(all_blocks_bcs):
        if not isinstance(block_bcs, list):
            raise ValueError(f"boundary_conditions[{bi}] must be a list of face configs.")
        for idx, bc in enumerate(block_bcs):
            if not isinstance(bc, dict):
                raise ValueError(f"boundary_conditions[{bi}][{idx}] must be a mapping.")
            for req in ("face", "type", "handler"):
                    raise ValueError(f"boundary_conditions[{bi}][{idx}] missing required key '{req}'.")
            # Face names are matched case-insensitively via BC_FACE_MAP.
            face_raw = str(bc["face"]).strip()
            face_key = face_raw.lower()
            face = BC_FACE_MAP.get(face_key)
                    f"Unsupported BC face '{face_raw}' at boundary_conditions[{bi}][{idx}]. "
                    f"Supported: {sorted(expected_faces)}."
            if face in seen_faces:
                raise ValueError(f"Duplicate face '{face}' in boundary_conditions[{bi}] (entries {seen_faces[face]} and {idx}).")
            seen_faces[face] = idx
            bc_type_raw = str(bc["type"]).strip()
            bc_type = BC_TYPE_MAP.get(bc_type_raw.lower())
                    f"Unsupported BC type '{bc_type_raw}' for face {face} in block {bi}. "
                    f"Supported: {sorted(set(BC_TYPE_MAP.values()))}."
            handler = str(bc["handler"]).strip().lower()
            handler_spec = BC_HANDLER_SPECS.get(handler)
            if handler_spec is None:
                    f"Unsupported BC handler '{bc['handler']}' for face {face} in block {bi}. "
                    f"Supported now: {sorted(BC_HANDLER_SPECS.keys())}."
            if bc_type not in handler_spec["types"]:
                    f"Invalid BC combination on block {bi}, face {face}: type '{bc_type}' cannot use handler '{handler}'."
            params = bc.get("params", {})
            if not isinstance(params, dict):
                raise ValueError(f"'params' for block {bi}, face {face} must be a mapping.")
            # Reject legacy structured keys explicitly.
            if "vector" in params or "velocity" in params:
                    f"Legacy params key ('vector'/'velocity') found on block {bi}, face {face}. "
                    "Use scalar keys 'vx', 'vy', 'vz'."
            # Required/optional params come from the handler's spec table.
            required = handler_spec["required_params"]
            optional = handler_spec["optional_params"]
            allowed = required | optional
            missing = sorted(required - set(params.keys()))
                    f"Missing required params for handler '{handler}' on block {bi}, face {face}: {missing}."
            unknown = sorted(set(params.keys()) - allowed)
                    f"Unknown params for handler '{handler}' on block {bi}, face {face}: {unknown}. "
                    f"Allowed: {sorted(allowed)}."
            # Non-dimensionalize numeric params with the reference scales.
            converted_params = {}
            for key, value in params.items():
                if key in _NUMERIC_BC_PARAMS:
                    numeric = _to_float(value, f"boundary_conditions[{bi}][{idx}].params.{key}")
                    if key in {"vx", "vy", "vz", "v_max"}:
                        converted_params[key] = numeric / U_ref
                    elif key == "target_flux":
                        # Volumetric flux scales as velocity * area.
                        converted_params[key] = numeric / (U_ref * (L_ref ** 2))
                elif key in _BOOL_BC_PARAMS:
                    converted_params[key] = _to_bool(value, f"boundary_conditions[{bi}][{idx}].params.{key}")
                    # Defensive fallback; should not happen due to the unknown-key gate above.
                    converted_params[key] = value
            prepared_block.append({
                "params": converted_params,
        missing_faces = sorted(expected_faces - set(seen_faces.keys()))
                f"boundary_conditions[{bi}] is incomplete. Missing faces: {missing_faces}. "
                "Provide all six faces explicitly."
        # Pairwise periodic consistency checks.
        face_map = {entry["face"]: entry for entry in prepared_block}
        for neg_face, pos_face in axis_pairs:
            neg = face_map[neg_face]
            pos = face_map[pos_face]
            neg_periodic = (neg["type"] == "PERIODIC")
            pos_periodic = (pos["type"] == "PERIODIC")
            if neg_periodic != pos_periodic:
                    f"Inconsistent periodicity in block {bi}: {neg_face} and {pos_face} must both be PERIODIC or neither."
            # Flux-driven handlers must match on both faces of an axis.
            driven_handlers = {"constant_flux"}
            if (neg["handler"] in driven_handlers) or (pos["handler"] in driven_handlers):
                if neg["handler"] != pos["handler"]:
                        f"In block {bi}, driven periodic handlers on {neg_face}/{pos_face} must match exactly."
                if not (neg_periodic and pos_periodic):
                        f"In block {bi}, driven periodic handler '{neg['handler']}' requires PERIODIC type on both faces."
        prepared_blocks.append(prepared_block)
    return prepared_blocks
def validate_solver_configs(case_cfg: dict, solver_cfg: dict, monitor_cfg: dict,
                            case_path: str, solver_path: str, monitor_path: str):
    """
    @brief Validates all solver input configs before any work is done.
    @details Checks for required sections, required keys, and physical sanity.
             Exits with a clear error message on the first problem found.
    @param[in] case_cfg Parsed case YAML dictionary.
    @param[in] solver_cfg Parsed solver YAML dictionary.
    @param[in] monitor_cfg Parsed monitor YAML dictionary.
    @param[in] case_path Path to case file (for error messages).
    @param[in] solver_path Path to solver file (for error messages).
    @param[in] monitor_path Path to monitor file (for error messages).
    @throws SystemExit on validation failure.
    """
    # NOTE(review): several lines (the `errors` list initializer, `if` headers,
    # `errors.append(` / `try:` wrappers) are elided in this view; surviving
    # lines keep their original nesting.
    # --- case.yml: required top-level sections ---
    required_case_sections = ['properties', 'run_control', 'grid', 'models', 'boundary_conditions']
    for section in required_case_sections:
        if section not in case_cfg:
            errors.append(f" {case_path}: missing required section '{section}'.")
        _print_validation_errors(errors)
    # --- case.yml: properties sub-keys ---
    props = case_cfg.get('properties', {})
    for group, keys in [('scaling', ['length_ref', 'velocity_ref']),
                        ('fluid', ['density', 'viscosity']),
                        ('initial_conditions', ['u_physical', 'v_physical', 'w_physical'])]:
        sub = props.get(group, {})
            errors.append(f" {case_path}: missing 'properties.{group}' section.")
                    errors.append(f" {case_path}: missing key 'properties.{group}.{k}'.")
        if group == 'initial_conditions' and 'mode' in sub:
                normalize_field_init_mode(sub.get('mode'))
            except ValueError as e:
                errors.append(f" {case_path}: {e}")
    # --- case.yml: run_control sub-keys ---
    rc = case_cfg.get('run_control', {})
    for k in ['start_step', 'total_steps', 'dt_physical']:
            errors.append(f" {case_path}: missing key 'run_control.{k}'.")
    # --- Physical sanity checks ---
        density = float(props.get('fluid', {}).get('density', 0))
        viscosity = float(props.get('fluid', {}).get('viscosity', 0))
        dt = float(rc.get('dt_physical', 0))
            errors.append(f" {case_path}: 'properties.fluid.density' must be positive (got {density}).")
            errors.append(f" {case_path}: 'properties.fluid.viscosity' must be non-negative (got {viscosity}).")
            errors.append(f" {case_path}: 'run_control.dt_physical' must be positive (got {dt}).")
    except (TypeError, ValueError):
        pass # Will be caught later during processing
    # --- case.yml: grid mode ---
    grid_cfg = case_cfg.get('grid', {})
    grid_mode = grid_cfg.get('mode')
    valid_grid_modes = ['file', 'programmatic_c', 'grid_gen']
    if grid_mode not in valid_grid_modes:
        errors.append(f" {case_path}: 'grid.mode' must be one of {valid_grid_modes} (got '{grid_mode}').")
    elif grid_mode == 'file':
        source_file = grid_cfg.get('source_file')
            errors.append(f" {case_path}: 'grid.source_file' is required when grid.mode is 'file'.")
            # Relative paths resolve against the case file's directory.
            source_abs = source_file if os.path.isabs(source_file) else os.path.abspath(os.path.join(os.path.dirname(case_path), source_file))
            if not os.path.isfile(source_abs):
                errors.append(f" {case_path}: grid.source_file does not exist: {source_abs}")
    elif grid_mode == 'programmatic_c':
        grid_settings = grid_cfg.get('programmatic_settings')
        if not grid_settings:
            errors.append(f" {case_path}: 'grid.programmatic_settings' is required when grid.mode is 'programmatic_c'.")
        elif not isinstance(grid_settings, dict):
            errors.append(f" {case_path}: 'grid.programmatic_settings' must be a mapping.")
            for p_key in ('da_processors_x', 'da_processors_y', 'da_processors_z'):
                p_val = grid_settings.get(p_key)
                if isinstance(p_val, (list, tuple)):
                        f" {case_path}: grid.programmatic_settings.{p_key} must be a scalar integer. "
                        "Per-block MPI decomposition is not implemented on the C side; DMDA layout is global."
                elif p_val is not None and (not isinstance(p_val, int) or p_val <= 0):
                        f" {case_path}: grid.programmatic_settings.{p_key} must be a positive integer when provided (got {p_val})."
    elif grid_mode == 'grid_gen':
        gen_cfg = grid_cfg.get('generator')
        if not isinstance(gen_cfg, dict):
            errors.append(f" {case_path}: 'grid.generator' must be a mapping when grid.mode is 'grid_gen'.")
            config_file = gen_cfg.get('config_file')
                errors.append(f" {case_path}: 'grid.generator.config_file' is required for grid.mode='grid_gen'.")
                config_abs = config_file if os.path.isabs(config_file) else os.path.abspath(os.path.join(os.path.dirname(case_path), config_file))
                if not os.path.isfile(config_abs):
                    errors.append(f" {case_path}: grid.generator.config_file does not exist: {config_abs}")
            grid_type = gen_cfg.get('grid_type')
            if grid_type is not None and str(grid_type) not in {'cpipe', 'pipe', 'warp'}:
                errors.append(f" {case_path}: grid.generator.grid_type must be one of ['cpipe','pipe','warp'] (got '{grid_type}').")
            cli_args = gen_cfg.get('cli_args', [])
            if cli_args is not None and not isinstance(cli_args, list):
                errors.append(f" {case_path}: grid.generator.cli_args must be a list of CLI tokens.")
    # --- case.yml: boundary_conditions strict validation ---
        validate_and_prepare_boundary_conditions(case_cfg)
    except ValueError as e:
        errors.append(f" {case_path}: {e}")
    # --- case.yml: particle initialization validation ---
    particles_cfg = case_cfg.get('models', {}).get('physics', {}).get('particles', {})
    if particles_cfg and not isinstance(particles_cfg, dict):
        errors.append(f" {case_path}: 'models.physics.particles' must be a mapping.")
    elif isinstance(particles_cfg, dict):
        init_mode_raw = particles_cfg.get('init_mode', 'Surface')
            pinit_code = normalize_particle_init_mode(init_mode_raw)
        except ValueError as e:
            errors.append(f" {case_path}: {e}")
        restart_mode = particles_cfg.get('restart_mode')
        if restart_mode is not None and str(restart_mode).lower() not in {"init", "load"}:
                f" {case_path}: models.physics.particles.restart_mode must be 'init' or 'load' (got '{restart_mode}')."
            # PointSource mode needs an explicit x/y/z emission location.
            point_cfg = particles_cfg.get('point_source', {})
            if not isinstance(point_cfg, dict):
                errors.append(f" {case_path}: models.physics.particles.point_source must be a mapping when init_mode is PointSource.")
                for coord in ('x', 'y', 'z'):
                    if coord not in point_cfg:
                            f" {case_path}: models.physics.particles.point_source.{coord} is required when init_mode is PointSource."
    # --- solver.yml: basic structure ---
    if not isinstance(solver_cfg, dict) or not solver_cfg:
        errors.append(f" {solver_path}: solver config is empty or not a valid YAML mapping.")
    strategy_cfg = solver_cfg.get('strategy', {})
    if not isinstance(strategy_cfg, dict):
        errors.append(f" {solver_path}: 'strategy' must be a mapping.")
    elif 'implicit' in strategy_cfg:
            f" {solver_path}: legacy key 'strategy.implicit' is not supported. "
            "Use 'strategy.momentum_solver' with named solver values."
    if isinstance(strategy_cfg, dict) and 'momentum_solver' in strategy_cfg:
            normalize_momentum_solver_type(strategy_cfg['momentum_solver'])
        except ValueError as e:
            errors.append(f" {solver_path}: {e}")
    op_mode_cfg = solver_cfg.get('operation_mode', {})
    if op_mode_cfg is not None and not isinstance(op_mode_cfg, dict):
        errors.append(f" {solver_path}: 'operation_mode' must be a mapping when provided.")
    elif isinstance(op_mode_cfg, dict):
        analytical_type = op_mode_cfg.get('analytical_type')
        if analytical_type is not None and not isinstance(analytical_type, str):
            errors.append(f" {solver_path}: 'operation_mode.analytical_type' must be a string when provided.")
    ms_cfg = solver_cfg.get('momentum_solver', {})
    if ms_cfg is not None and not isinstance(ms_cfg, dict):
        errors.append(f" {solver_path}: 'momentum_solver' must be a mapping when provided.")
    elif isinstance(ms_cfg, dict):
        # Flat tuning keys must live in solver-specific sub-blocks instead.
            'max_pseudo_steps', 'absolute_tol', 'relative_tol', 'step_tol',
            'pseudo_cfl', 'rk4_residual_noise_allowance_factor'
        present_legacy = sorted(legacy_flat_keys.intersection(ms_cfg.keys()))
                f" {solver_path}: legacy flat keys in 'momentum_solver' are not supported: {present_legacy}. "
                "Use solver-specific sub-blocks (e.g., momentum_solver.dual_time_picard_rk4)."
                normalize_momentum_solver_type(ms_cfg['type'])
            except ValueError as e:
                errors.append(f" {solver_path}: {e}")
        allowed_ms_keys = {'type', 'dual_time_picard_rk4'}
        unknown_ms_keys = sorted(set(ms_cfg.keys()) - allowed_ms_keys)
                f" {solver_path}: unsupported momentum_solver blocks/keys: {unknown_ms_keys}. "
                "Currently supported: 'dual_time_picard_rk4' (plus optional 'type')."
    # Resolve the selected solver: strategy wins, then momentum_solver.type,
    # then the default.
    selected_solver = None
    if isinstance(strategy_cfg, dict) and 'momentum_solver' in strategy_cfg:
            selected_solver = normalize_momentum_solver_type(strategy_cfg['momentum_solver'])
    if selected_solver is None and 'type' in ms_cfg:
            selected_solver = normalize_momentum_solver_type(ms_cfg['type'])
    if selected_solver is None:
        selected_solver = "DUALTIME_PICARD_RK4"
    if selected_solver != "DUALTIME_PICARD_RK4" and 'dual_time_picard_rk4' in ms_cfg:
            f" {solver_path}: momentum_solver.dual_time_picard_rk4 is set but selected solver is "
            f"{selected_solver}. Solver-specific blocks must match the selected solver."
    dt_picard_cfg = ms_cfg.get('dual_time_picard_rk4')
    if dt_picard_cfg is not None:
        if not isinstance(dt_picard_cfg, dict):
            errors.append(f" {solver_path}: momentum_solver.dual_time_picard_rk4 must be a mapping.")
                'max_pseudo_steps', 'absolute_tol', 'relative_tol', 'step_tol',
                'pseudo_cfl', 'rk4_residual_noise_allowance_factor'
            unknown_dt_keys = sorted(set(dt_picard_cfg.keys()) - allowed_dt_keys)
                    f" {solver_path}: unsupported keys in momentum_solver.dual_time_picard_rk4: {unknown_dt_keys}."
            if 'pseudo_cfl' in dt_picard_cfg:
                pcfl_cfg = dt_picard_cfg['pseudo_cfl']
                if not isinstance(pcfl_cfg, dict):
                    errors.append(f" {solver_path}: momentum_solver.dual_time_picard_rk4.pseudo_cfl must be a mapping.")
                    allowed_pcfl_keys = {'initial', 'minimum', 'maximum', 'growth_factor', 'reduction_factor'}
                    unknown_pcfl_keys = sorted(set(pcfl_cfg.keys()) - allowed_pcfl_keys)
                    if unknown_pcfl_keys:
                            f" {solver_path}: unsupported keys in momentum_solver.dual_time_picard_rk4.pseudo_cfl: {unknown_pcfl_keys}."
    # --- monitor.yml: basic structure ---
    if not isinstance(monitor_cfg, dict) or not monitor_cfg:
        errors.append(f" {monitor_path}: monitor config is empty or not a valid YAML mapping.")
    io_cfg = monitor_cfg.get('io', {})
    freq = io_cfg.get('data_output_frequency')
    if freq is not None and (not isinstance(freq, int) or freq <= 0):
        errors.append(f" {monitor_path}: 'io.data_output_frequency' must be a positive integer (got {freq}).")
    solver_mon_cfg = monitor_cfg.get('solver_monitoring')
    if solver_mon_cfg is not None and not isinstance(solver_mon_cfg, dict):
        errors.append(f" {monitor_path}: 'solver_monitoring' must be a mapping of <flag>: <value>.")
        _print_validation_errors(errors)
def validate_post_config(post_cfg: dict, post_path: str):
    """
    @brief Validates the post-processing config before running the post-processor.
    @param[in] post_cfg Parsed post-processing YAML dictionary.
    @param[in] post_path Path to post file (for error messages).
    @throws SystemExit on validation failure.
    """
    # [elided in excerpt: ``errors = []`` initializer]
    if not isinstance(post_cfg, dict) or not post_cfg:
        errors.append(f" {post_path}: post-processing config is empty or not a valid YAML mapping.")
        _print_validation_errors(errors)  # fatal: nothing further can be checked

    # --- run_control ---
    if 'run_control' not in post_cfg:
        errors.append(f" {post_path}: missing required section 'run_control'.")

    # --- io section ---
    io_cfg = post_cfg.get('io', {})
    # [elided in excerpt: guard ``if 'io' not in post_cfg:`` before this append]
    errors.append(f" {post_path}: missing required section 'io'.")
    for k in ['output_directory', 'output_filename_prefix']:
        # [elided in excerpt: guard ``if k not in io_cfg:`` before this append]
        errors.append(f" {post_path}: missing required key 'io.{k}'.")
    input_extensions = io_cfg.get('input_extensions')
    if input_extensions is not None:
        if not isinstance(input_extensions, dict):
            errors.append(f" {post_path}: 'io.input_extensions' must be a mapping when provided.")
        for ext_key in ('eulerian', 'particle'):
            ext_val = input_extensions.get(ext_key)
            if ext_val is not None and not isinstance(ext_val, str):
                errors.append(f" {post_path}: 'io.input_extensions.{ext_key}' must be a string extension.")
    averaged_fields = io_cfg.get('eulerian_fields_averaged')
    if averaged_fields is not None and not isinstance(averaged_fields, list):
        errors.append(f" {post_path}: 'io.eulerian_fields_averaged' must be a list when provided.")

    # --- Check eulerian_pipeline entries have 'task' key ---
    for i, entry in enumerate(post_cfg.get('eulerian_pipeline', [])):
        if not isinstance(entry, dict) or 'task' not in entry:
            errors.append(f" {post_path}: 'eulerian_pipeline[{i}]' is missing the 'task' key. "
                          "Check YAML indentation (each entry needs '- task: ...' with proper spacing).")

    # --- Check statistics pipeline entries ---
    # Accepted forms: a bare list of tasks, or a mapping with a 'tasks' list
    # plus an optional 'output_prefix' string.
    stats_cfg = post_cfg.get('statistics_pipeline')
    if stats_cfg is not None:
        if isinstance(stats_cfg, list):
            stats_entries = stats_cfg
        elif isinstance(stats_cfg, dict):
            stats_entries = stats_cfg.get('tasks', [])
            if not isinstance(stats_entries, list):
                errors.append(f" {post_path}: 'statistics_pipeline.tasks' must be a list.")
            stats_output_prefix = stats_cfg.get('output_prefix')
            if stats_output_prefix is not None and not isinstance(stats_output_prefix, str):
                errors.append(f" {post_path}: 'statistics_pipeline.output_prefix' must be a string.")
        # [elided in excerpt: ``else:`` branch with ``errors.append(`` wrapping the message below]
            f" {post_path}: 'statistics_pipeline' must be either a list of tasks or a mapping with a 'tasks' list."
        for i, entry in enumerate(stats_entries):
            if isinstance(entry, str):
            # [elided in excerpt: ``task_name = entry`` for the bare-string form]
            elif isinstance(entry, dict) and 'task' in entry:
                task_name = entry.get('task')
            # [elided in excerpt: ``else:`` branch appending the message below, then continue]
                f" {post_path}: statistics task entry {i} must be either a string or a mapping with key 'task'."
            # [elided in excerpt: ``try:`` wrapping the normalization call]
            normalize_statistics_task(task_name)
            except ValueError as e:
                errors.append(f" {post_path}: {e}")

    _print_validation_errors(errors)
def validate_cluster_config(cluster_cfg: dict, cluster_path: str):
    """Validate Slurm scheduler configuration from cluster.yml.

    Checks scheduler type, required resource keys, notification settings and
    execution options; failures are reported via _print_validation_errors()
    (which raises SystemExit).
    """
    # [elided in excerpt: ``errors = []`` initializer]
    if not isinstance(cluster_cfg, dict) or not cluster_cfg:
        errors.append(f" {cluster_path}: cluster config is empty or not a valid YAML mapping.")
        _print_validation_errors(errors)  # fatal: nothing further can be checked

    # --- scheduler: only Slurm is supported in v1 ---
    scheduler = cluster_cfg.get("scheduler", {})
    if not isinstance(scheduler, dict):
        errors.append(f" {cluster_path}: 'scheduler' must be a mapping.")
    scheduler_type = scheduler.get("type", "slurm")  # slurm is the default scheduler
    if str(scheduler_type).lower() != "slurm":
        errors.append(f" {cluster_path}: scheduler.type must be 'slurm' in v1 (got '{scheduler_type}').")

    # --- resources: required keys plus basic typing ---
    resources = cluster_cfg.get("resources", {})
    if not isinstance(resources, dict):
        errors.append(f" {cluster_path}: 'resources' must be a mapping.")
    for req in ("account", "nodes", "ntasks_per_node", "mem", "time"):
        if req not in resources:
            errors.append(f" {cluster_path}: missing required key 'resources.{req}'.")
    for int_key in ("nodes", "ntasks_per_node"):
        if int_key in resources:
            val = resources.get(int_key)
            if not isinstance(val, int) or val <= 0:
                errors.append(f" {cluster_path}: resources.{int_key} must be a positive integer (got {val}).")
    for str_key in ("account", "mem", "time", "partition"):
        if str_key in resources and resources.get(str_key) is not None:
            if not isinstance(resources.get(str_key), str):
                errors.append(f" {cluster_path}: resources.{str_key} must be a string when provided.")

    # --- notifications (optional): mail settings for #SBATCH --mail-* ---
    notifications = cluster_cfg.get("notifications", {})
    if notifications is not None and not isinstance(notifications, dict):
        errors.append(f" {cluster_path}: 'notifications' must be a mapping when provided.")
    elif isinstance(notifications, dict):
        mail_user = notifications.get("mail_user")
        if mail_user is not None and not is_valid_email(mail_user):
            errors.append(f" {cluster_path}: notifications.mail_user is not a valid email '{mail_user}'.")
        mail_type = notifications.get("mail_type")
        if mail_type is not None and not isinstance(mail_type, str):
            errors.append(f" {cluster_path}: notifications.mail_type must be a string when provided.")

    # --- execution (optional): module setup, launcher and extra sbatch args ---
    execution = cluster_cfg.get("execution", {})
    if execution is not None and not isinstance(execution, dict):
        errors.append(f" {cluster_path}: 'execution' must be a mapping when provided.")
    elif isinstance(execution, dict):
        module_setup = execution.get("module_setup", [])
        if module_setup is not None and not isinstance(module_setup, list):
            errors.append(f" {cluster_path}: execution.module_setup must be a list of shell lines.")
        elif isinstance(module_setup, list):
            for i, line in enumerate(module_setup):
                if not isinstance(line, str):
                    errors.append(f" {cluster_path}: execution.module_setup[{i}] must be a string.")

        launcher = execution.get("launcher", "srun")  # default matches build_cluster_launch_command()
        if launcher is not None and not isinstance(launcher, str):
            errors.append(f" {cluster_path}: execution.launcher must be a string when provided.")
        launcher_args = execution.get("launcher_args", [])
        if launcher_args is not None and not isinstance(launcher_args, list):
            errors.append(f" {cluster_path}: execution.launcher_args must be a list of CLI tokens.")
        elif isinstance(launcher_args, list):
            for i, token in enumerate(launcher_args):
                if not isinstance(token, (str, int, float)):
                    errors.append(f" {cluster_path}: execution.launcher_args[{i}] must be a scalar CLI token.")

        extra_sbatch = execution.get("extra_sbatch")
        if extra_sbatch is not None and not isinstance(extra_sbatch, (dict, list)):
            errors.append(f" {cluster_path}: execution.extra_sbatch must be a mapping or list when provided.")

    _print_validation_errors(errors)
def validate_study_config(study_cfg: dict, study_path: str):
    """Validate sweep/study specification from study.yml.

    Verifies base config paths, study type, the parameter matrix, metric
    declarations, and plotting/execution options; failures are reported via
    _print_validation_errors() (which raises SystemExit).
    """
    # [elided in excerpt: ``errors = []`` initializer]
    if not isinstance(study_cfg, dict) or not study_cfg:
        errors.append(f" {study_path}: study config is empty or not a valid YAML mapping.")
        _print_validation_errors(errors)  # fatal: nothing further can be checked

    # --- base_configs: four required, existing YAML file paths ---
    base_cfgs = study_cfg.get("base_configs")
    if not isinstance(base_cfgs, dict):
        errors.append(f" {study_path}: missing required mapping 'base_configs'.")
    for req in ("case", "solver", "monitor", "post"):
        path_val = base_cfgs.get(req)
        if not path_val or not isinstance(path_val, str):
            errors.append(f" {study_path}: base_configs.{req} must be a path string.")
        # Paths are resolved relative to the study.yml location.
        resolved = resolve_path(study_path, path_val)
        if not os.path.isfile(resolved):
            errors.append(f" {study_path}: base_configs.{req} does not exist: {resolved}")

    # --- study_type: closed enumeration ---
    study_type = study_cfg.get("study_type")
    allowed_types = {"grid_independence", "timestep_independence", "sensitivity"}
    if study_type not in allowed_types:
        # [elided in excerpt: ``errors.append(`` wrapping the message below]
            f" {study_path}: study_type must be one of {sorted(allowed_types)} (got '{study_type}')."

    # --- parameters: '<target>.<yaml.path>' -> non-empty value list ---
    parameters = study_cfg.get("parameters")
    if not isinstance(parameters, dict) or not parameters:
        errors.append(f" {study_path}: 'parameters' must be a non-empty mapping of key->list.")
    allowed_roots = {"case", "solver", "monitor", "post"}
    for key, values in parameters.items():
        if not isinstance(key, str) or "." not in key:
            # [elided in excerpt: ``errors.append(`` wrapping the message below]
                f" {study_path}: parameter key '{key}' must use '<target>.<yaml.path>' format."
        root = key.split(".", 1)[0]  # target file the dotted path applies to
        if root not in allowed_roots:
            # [elided in excerpt: ``errors.append(`` wrapping the message below]
                f" {study_path}: parameter key '{key}' must start with one of {sorted(allowed_roots)}."
        if not isinstance(values, list) or len(values) == 0:
            errors.append(f" {study_path}: parameters.{key} must be a non-empty list.")

    # --- metrics (optional): bare strings or {name, source} mappings ---
    metrics = study_cfg.get("metrics", [])
    if metrics is not None and not isinstance(metrics, list):
        errors.append(f" {study_path}: 'metrics' must be a list when provided.")
    elif isinstance(metrics, list):
        for i, metric in enumerate(metrics):
            if isinstance(metric, str):
            # [elided in excerpt: ``continue`` — bare metric names are accepted as-is]
            if not isinstance(metric, dict):
                # [elided in excerpt: ``errors.append(`` wrapping the message below]
                    f" {study_path}: metrics[{i}] must be a string or mapping."
            if "name" not in metric:
                errors.append(f" {study_path}: metrics[{i}] missing required key 'name'.")
            if "source" not in metric:
                errors.append(f" {study_path}: metrics[{i}] missing required key 'source'.")

    # --- plotting (optional) ---
    plotting = study_cfg.get("plotting", {})
    if plotting is not None and not isinstance(plotting, dict):
        errors.append(f" {study_path}: 'plotting' must be a mapping when provided.")
    elif isinstance(plotting, dict):
        enabled = plotting.get("enabled")
        if enabled is not None and not isinstance(enabled, bool):
            errors.append(f" {study_path}: plotting.enabled must be boolean when provided.")
        output_format = plotting.get("output_format")
        if output_format is not None and output_format not in {"png", "pdf", "svg"}:
            errors.append(f" {study_path}: plotting.output_format must be one of ['png','pdf','svg'].")

    # --- execution (optional): job-array concurrency cap ---
    execution = study_cfg.get("execution", {})
    if execution is not None and not isinstance(execution, dict):
        errors.append(f" {study_path}: 'execution' must be a mapping when provided.")
    elif isinstance(execution, dict):
        max_conc = execution.get("max_concurrent_array_tasks")
        if max_conc is not None and (not isinstance(max_conc, int) or max_conc <= 0):
            # [elided in excerpt: ``errors.append(`` wrapping the message below]
                f" {study_path}: execution.max_concurrent_array_tasks must be a positive integer when provided."

    _print_validation_errors(errors)
1225def _deep_set(container: dict, dotted_path: str, value):
1226 """Set nested dictionary value, creating intermediate maps when needed."""
1227 keys = dotted_path.split(".")
1229 for key in keys[:-1]:
1230 if key not in current or not isinstance(current[key], dict):
1232 current = current[key]
1233 current[keys[-1]] = value
def expand_parameter_matrix(parameters: dict) -> list:
    """Expand study parameter lists into cartesian-product combinations.

    @param[in] parameters Mapping of dotted parameter key -> list of values.
    @return List of dicts, one per combination, mapping each key to one value.
    """
    param_keys = list(parameters.keys())
    all_values = [parameters[k] for k in param_keys]
    # itertools.product yields one value-tuple per combination; zip pairs each
    # value back with its originating parameter key.
    return [dict(zip(param_keys, combo)) for combo in itertools.product(*all_values)]
def get_cluster_total_tasks(cluster_cfg: dict) -> int:
    """Total MPI task count implied by the cluster resources
    (nodes * ntasks_per_node, each defaulting to 1)."""
    res = cluster_cfg.get("resources", {})
    nodes = int(res.get("nodes", 1))
    per_node = int(res.get("ntasks_per_node", 1))
    return nodes * per_node
def normalize_extension(ext: str) -> str:
    """Return *ext* as a bare extension: surrounding whitespace and any
    leading dots removed (e.g. ' .vtk' -> 'vtk')."""
    cleaned = str(ext).strip()
    return cleaned.lstrip(".")
def render_slurm_script(
    # NOTE(review): the leading positional parameters (script path, cluster
    # config, job name, command tokens, workdir, stdout path, ...) are elided
    # in this excerpt; the body references script_path, cluster_cfg, job_name,
    # command, workdir and stdout_path — confirm the exact signature upstream.
    stderr_path: str = None,
    env_vars: dict = None,
    array_spec: str = None
    """Render a Slurm batch script for a single command.

    Writes an executable bash script whose #SBATCH directives come from the
    cluster.yml 'resources'/'notifications'/'execution' sections, followed by
    module-setup lines, exported environment variables and the launch command.
    """
    resources = cluster_cfg.get("resources", {})
    notifications = cluster_cfg.get("notifications", {}) or {}
    execution = cluster_cfg.get("execution", {}) or {}
    extra_sbatch = execution.get("extra_sbatch")
    module_setup = execution.get("module_setup", []) or []

    # Default stderr file sits next to stdout, with .err substituted for .out.
    if stderr_path is None:
        stderr_path = stdout_path.replace(".out", ".err")

    # --- #SBATCH directive block (the ``lines = ["#!/bin/bash", ...`` opener
    # is elided in this excerpt) ---
        f"#SBATCH --job-name={job_name}",
        f"#SBATCH --nodes={resources['nodes']}",
        f"#SBATCH --ntasks-per-node={resources['ntasks_per_node']}",
        f"#SBATCH --mem={resources['mem']}",
        f"#SBATCH --time={resources['time']}",
        f"#SBATCH --output={stdout_path}",
        f"#SBATCH --error={stderr_path}",
        f"#SBATCH --account={resources['account']}",
    partition = resources.get("partition")
    # [elided in excerpt: ``if partition:`` guard]
        lines.append(f"#SBATCH --partition={partition}")
    # [elided in excerpt: ``if array_spec:`` guard — job-array submissions only]
        lines.append(f"#SBATCH --array={array_spec}")
    mail_user = notifications.get("mail_user")
    mail_type = notifications.get("mail_type")
    # [elided in excerpt: ``if mail_user:`` / ``if mail_type:`` guards]
        lines.append(f"#SBATCH --mail-user={mail_user}")
        lines.append(f"#SBATCH --mail-type={mail_type}")

    # extra_sbatch: mapping form emits --key[=value]; list form emits raw tokens.
    if isinstance(extra_sbatch, dict):
        for key, value in extra_sbatch.items():
            # [elided in excerpt: normalisation of ``key`` into ``flag``]
            if not flag.startswith("--"):
            if isinstance(value, bool):
                # Boolean True becomes a bare switch (False case elided here).
                lines.append(f"#SBATCH {flag}")
            elif value is not None:
                lines.append(f"#SBATCH {flag}={value}")
    elif isinstance(extra_sbatch, list):
        for token in extra_sbatch:
            lines.append(f"#SBATCH {token}")

    # --- script body (the ``lines.extend([`` opener is elided here) ---
        "set -euo pipefail",
        f"cd {shlex.quote(workdir)}",
        'echo "[$(date)] Starting job ${SLURM_JOB_NAME} (${SLURM_JOB_ID})"',
        'echo "[$(date)] Working directory: $PWD"',
    for setup_line in module_setup:
        lines.append(str(setup_line))
    # [elided in excerpt: ``if env_vars:`` guard]
    for key, value in env_vars.items():
        lines.append(f"export {key}={shlex.quote(str(value))}")
    cmd = " ".join(shlex.quote(str(tok)) for tok in command)
    # [elided in excerpt: the line appending ``cmd`` to ``lines``]
    lines.append('echo "[$(date)] Job completed."')

    os.makedirs(os.path.dirname(script_path), exist_ok=True)
    with open(script_path, "w") as f:
        f.write("\n".join(lines) + "\n")
    os.chmod(script_path, 0o755)  # make the rendered script directly executable
def build_cluster_launch_command(cluster_cfg: dict, executable: str, executable_args: list) -> list:
    """Build scheduler launcher command (srun/mpirun/custom) from cluster config.

    @param[in] cluster_cfg Parsed cluster.yml dictionary.
    @param[in] executable Path to the binary to launch.
    @param[in] executable_args CLI arguments passed to the executable.
    @return Full argv list, e.g. ['srun', '-n', '8', './solver', ...].
    """
    execution = cluster_cfg.get("execution", {}) or {}
    launcher = execution.get("launcher", "srun")
    launcher_args = [str(x) for x in execution.get("launcher_args", [])]
    ntasks = get_cluster_total_tasks(cluster_cfg)

    if launcher and launcher.lower() == "srun":
        # Inject -n only when the user has not already fixed the task count.
        has_n = any(token in {"-n", "--ntasks"} for token in launcher_args)
        cmd = ["srun"] + launcher_args
        if not has_n:
            cmd += ["-n", str(ntasks)]
        return cmd + [executable] + executable_args

    if launcher and launcher.lower() == "mpirun":
        has_np = any(token in {"-np", "-n"} for token in launcher_args)
        cmd = ["mpirun"] + launcher_args
        if not has_np:
            cmd += ["-np", str(ntasks)]
        return cmd + [executable] + executable_args

    # Custom launcher or no launcher.
    cmd = []
    if launcher:
        cmd.append(str(launcher))
    cmd += launcher_args
    cmd += [executable] + executable_args
    return cmd
def parse_slurm_job_id(sbatch_output: str) -> "str | None":
    """Extract the numeric job id from standard sbatch output.

    @param[in] sbatch_output Text printed by sbatch, e.g. 'Submitted batch job 123'.
    @return The job id as a string, or None when no id can be found
            (the previous ``-> str`` annotation missed the None case).
    """
    # ``or ""`` tolerates None/empty output from a failed submission.
    match = re.search(r"Submitted batch job\s+(\d+)", sbatch_output or "")
    return match.group(1) if match else None
def submit_sbatch(script_path: str, dependency: str = None) -> dict:
    """Submit sbatch script and return submission metadata.

    @param[in] script_path Path to the rendered batch script.
    @param[in] dependency Optional job id; submitted with --dependency=afterok:<id>.
    @return Mapping with 'returncode', 'stdout', 'stderr', 'script' and 'job_id'.
    @throws SystemExit when sbatch fails or no job id can be parsed.
    """
    cmd = ["sbatch"]
    if dependency:
        cmd.append(f"--dependency=afterok:{dependency}")
    cmd.append(script_path)
    # check=False: we report the failure ourselves with the captured stderr.
    result = subprocess.run(cmd, text=True, capture_output=True, check=False)
    metadata = {
        "returncode": result.returncode,
        "stdout": (result.stdout or "").strip(),
        "stderr": (result.stderr or "").strip(),
        "script": script_path,
    }
    if result.returncode != 0:
        print(f"[FATAL] sbatch submission failed for {script_path}\n{metadata['stderr']}", file=sys.stderr)
        sys.exit(result.returncode)
    metadata["job_id"] = parse_slurm_job_id(metadata["stdout"])
    if not metadata["job_id"]:
        print(
            f"[FATAL] Could not parse Slurm job id from sbatch output: {metadata['stdout']}",
            file=sys.stderr,
        )
        sys.exit(1)
    return metadata
1399def _print_validation_errors(errors: list):
1401 @brief Prints validation errors and exits.
1402 @param[in] errors List of error message strings.
1404 print(f"\n[FATAL] Configuration validation failed with {len(errors)} issue(s):", file=sys.stderr)
1405 for raw_error in errors:
1406 file_path, message = _split_error_file_and_message(raw_error)
1407 key_path = _extract_key_path(message)
1408 code = _classify_error_code(message)
1409 emit_structured_error(code, key=key_path, file_path=file_path, message=message)
1411 "\nHint: See examples/master_template/ for valid config structure and "
1412 "docs/pages/14_Config_Contract.md for key-level contract details.",
def generate_header(run_id: str, source_files: dict) -> str:
    """
    @brief Creates a standard header block for all generated files.
    @param[in] run_id The unique identifier for the current simulation run.
    @param[in] source_files A dictionary of source profile files used.
    @return A formatted string containing the header.
    """
    header_parts = [
        "# ==============================================================================",
        "# AUTO-GENERATED CONFIGURATION FILE",
        "# ------------------------------------------------------------------------------",
        f"# Run ID: {run_id}",
        f"# Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "# Source Configuration:"
    ]
    # One line per contributing YAML source, name padded for alignment.
    for name, path in source_files.items():
        header_parts.append(f"# - {name:<12}: {os.path.basename(path)}")
    header_parts.extend([
        "# DO NOT EDIT THIS FILE MANUALLY. IT IS A MACHINE-READABLE ARTIFACT.",
        "# ==============================================================================\n"
    ])
    return "\n".join(header_parts)
def generate_simple_list_file(run_dir: str, run_id: str, cfg: dict, section: str, key: str, filename: str, header_sources: dict) -> str:
    """
    @brief Generic function to create a file containing a simple list of strings.
    @param[in] run_dir The path to the main run directory.
    @param[in] run_id The unique identifier for the run.
    @param[in] cfg The dictionary containing the configuration data.
    @param[in] section The top-level key in the cfg dictionary.
    @param[in] key The second-level key whose value is the list of strings.
    @param[in] filename The name of the file to generate (e.g., 'whitelist.run').
    @param[in] header_sources A dictionary of source files for the header.
    @return The absolute path to the generated file.
    """
    print(f"[INFO] Generating (unknown)...")
    config_dir = os.path.join(run_dir, "config")
    file_path = os.path.join(config_dir, filename)
    os.makedirs(config_dir, exist_ok=True)  # config dir may not exist yet
    lines = [generate_header(run_id, header_sources)]
    items = cfg.get(section, {}).get(key, [])
    # One entry per line after the generated header.
    lines.extend(str(item) for item in items)
    with open(file_path, "w") as f: f.write("\n".join(lines))
    print(f"[SUCCESS] Generated (unknown): {os.path.relpath(file_path)}")
    return os.path.abspath(file_path)
def generate_multi_block_bcs(run_dir: str, run_id: str, case_cfg: dict, source_files: dict) -> list:
    """
    @brief Parses multi-block BCs from YAML, generates a .run file for each block,
    and returns a list of their absolute paths.
    @details Handles both simple list format (for single-block cases) and a
    list-of-lists (for multi-block cases) for boundary conditions.
    @param[in] run_dir The path to the main run directory.
    @param[in] run_id The unique identifier for the run.
    @param[in] case_cfg The parsed case.yml configuration dictionary.
    @param[in] source_files A dictionary of source files for the header.
    @return A list of absolute paths to the generated BC files.
    @throws ValueError if the number of BC definitions does not match the number of blocks.
    """
    print("[INFO] Generating boundary condition files...")
    config_dir = os.path.join(run_dir, "config")
    num_blocks = int(case_cfg.get('models', {}).get('domain', {}).get('blocks', 1))
    prepared_blocks = validate_and_prepare_boundary_conditions(case_cfg)

    generated_files = []
    for i, block_bcs_list in enumerate(prepared_blocks):
        # Single-block cases keep the flat historical name for compatibility.
        file_name = "bcs.run" if num_blocks == 1 else f"bcs_block{i}.run"
        bcs_file_path = os.path.join(config_dir, file_name)
        bcs_lines = [generate_header(run_id, source_files)]

        for bc in block_bcs_list:
            face, bc_type, handler = bc['face'], bc['type'], bc['handler']
            params_str = ""
            if bc.get('params'):
                parts = []
                for k, v in bc['params'].items():
                    # C-side parser expects lowercase true/false for booleans.
                    if isinstance(v, bool):
                        value_str = "true" if v else "false"
                    else:
                        value_str = str(v)
                    parts.append(f"{k}={value_str}")
                params_str = " ".join(parts)
            # Fixed-width columns keep the .run file human-scannable.
            bcs_lines.append(f"{face:<20s} {bc_type:<12s} {handler:<20s} {params_str}")

        with open(bcs_file_path, "w") as f: f.write("\n".join(bcs_lines))
        print(f"[SUCCESS] Generated BCs for Block {i}: {os.path.relpath(bcs_file_path)}")
        generated_files.append(os.path.abspath(bcs_file_path))

    return generated_files
def format_flag_value(value):
    """
    @brief Converts Python types to C-style command-line flag values.
    @param[in] value The Python object to convert (bool, list, or other).
    @return A string representation suitable for a C command-line parser.
    """
    if isinstance(value, bool):
        # C parsers expect 1/0 rather than True/False.
        return "1" if value else "0"
    if isinstance(value, list):
        # Lists are flattened to comma-separated tokens.
        return ",".join(map(str, value))
    # Fallback: plain string conversion (ints, floats, strings pass through).
    return str(value)
def normalize_momentum_solver_type(value: str) -> str:
    """
    @brief Normalizes user-facing momentum solver names to C-enum CLI values.
    @param[in] value Human-readable or enum-like momentum solver string.
    @return Canonical value accepted by -mom_solver_type.
    @throws ValueError if the input cannot be mapped.
    """
    if value is None:
        raise ValueError("momentum solver type cannot be None")
    raw = str(value).strip()
    # Pass canonical enum values straight through (case-insensitively).
    canonical = {
        "EXPLICIT_RK",
        "DUALTIME_PICARD_RK4",
        "DUALTIME_NK_ARNOLDI",
        "DUALTIME_NK_ANALYTICAL_JACOBIAN"
    }
    if raw.upper() in canonical:
        return raw.upper()
    # Fold dashes/underscores/extra whitespace so 'Dual-Time_Picard RK4'
    # and friends all normalize to the same alias key.
    normalized = raw.lower().replace("-", " ").replace("_", " ")
    normalized = " ".join(normalized.split())
    aliases = {
        "explicit rk4": "EXPLICIT_RK",
        "explicit rk": "EXPLICIT_RK",
        "explicit runge kutta 4": "EXPLICIT_RK",
        "explicit runge kutta": "EXPLICIT_RK",
        "dual time picard rk4": "DUALTIME_PICARD_RK4",
        "dualtime picard rk4": "DUALTIME_PICARD_RK4",
        "dual time picard": "DUALTIME_PICARD_RK4",
        "dualtime picard": "DUALTIME_PICARD_RK4",
        "dual time nk arnoldi": "DUALTIME_NK_ARNOLDI",
        "dualtime nk arnoldi": "DUALTIME_NK_ARNOLDI",
        "dual time nk analytical jacobian": "DUALTIME_NK_ANALYTICAL_JACOBIAN",
        "dualtime nk analytical jacobian": "DUALTIME_NK_ANALYTICAL_JACOBIAN"
    }
    mapped = aliases.get(normalized)
    if mapped is None:
        raise ValueError(
            f"Unknown momentum solver '{value}'. Use one of: "
            "'Explicit RK4', 'Dual Time Picard RK4', 'Dual Time NK Arnoldi', "
            "'Dual Time NK Analytical Jacobian' (or canonical enum values)."
        )
    return mapped
def normalize_field_init_mode(value: str) -> int:
    """
    @brief Normalizes user-facing field init mode names to C enum/int codes (-finit).
    @param[in] value Human-readable or enum-like field initialization mode.
    @return Canonical integer code accepted by -finit.
    @throws ValueError if the input cannot be mapped.
    """
    if value is None:
        raise ValueError("field initialization mode cannot be None")
    raw = str(value).strip()
    # Numeric codes (as int or string) pass straight through.
    direct = {"0": 0, "1": 1, "2": 2}
    if raw in direct:
        return direct[raw]
    # Fold dashes/underscores/extra whitespace before the alias lookup.
    normalized = raw.lower().replace("-", " ").replace("_", " ")
    normalized = " ".join(normalized.split())
    aliases = {
        "zero": 0,
        "constant": 1,
        "poiseuille": 2
    }
    mapped = aliases.get(normalized)
    if mapped is None:
        raise ValueError(
            f"Unknown initial_conditions mode '{value}'. Use one of: 'Zero', 'Constant', 'Poiseuille' (or 0/1/2)."
        )
    return mapped
def normalize_particle_init_mode(value: str) -> int:
    """
    @brief Normalizes particle init mode names to C enum/int codes (-pinit).
    @param[in] value Human-readable or enum-like particle initialization mode.
    @return Canonical integer code accepted by -pinit.
    @throws ValueError if the input cannot be mapped.
    """
    if value is None:
        raise ValueError("particle init mode cannot be None")
    raw = str(value).strip()
    # Numeric codes (as int or string) pass straight through.
    direct = {"0": 0, "1": 1, "2": 2, "3": 3}
    if raw in direct:
        return direct[raw]
    # Fold dashes/underscores/extra whitespace before the alias lookup.
    normalized = raw.lower().replace("-", " ").replace("_", " ")
    normalized = " ".join(normalized.split())
    aliases = {
        "surface": 0,
        "surface random": 0,
        "volume": 1,
        "volume random": 1,
        "point source": 2,
        "pointsource": 2,
        "surface edges": 3,
        "surfaceedges": 3,
        "surface at edges": 3,
    }
    mapped = aliases.get(normalized)
    if mapped is None:
        raise ValueError(
            f"Unknown particle init_mode '{value}'. Use one of: "
            "'Surface', 'Volume', 'PointSource', 'SurfaceEdges' (or 0/1/2/3)."
        )
    return mapped
def append_passthrough_flags(control_lines: list, options: dict):
    """
    @brief Appends raw CLI flags to the control list from a {flag: value} dict.
    @details Boolean `true` is emitted as a switch with no value. Boolean `false`
    is skipped. All other values are emitted as "<flag> <value>".
    @param[out] control_lines The destination list of control-file lines.
    @param[in] options Mapping of raw CLI flags to values.
    """
    if not options:
        return  # nothing to pass through
    for flag, value in options.items():
        if isinstance(value, bool):
            if value:  # True -> bare switch; False -> omitted entirely
                control_lines.append(str(flag))
            continue
        control_lines.append(f"{flag} {format_flag_value(value)}")
def parse_and_add_model_flags(case_cfg: dict, control_lines: list):
    """
    @brief Parses the 'models' section of case.yml and adds corresponding C-solver flags.
    @param[in] case_cfg The parsed case.yml configuration dictionary.
    @param[out] control_lines A list of strings to which C-flags will be appended.
    """
    models = case_cfg.get('models', {})
    # Declarative mapping: dotted YAML section path -> {yaml_key: C flag}.
    # [elided in excerpt: the ``FLAG_MAP = {`` opener]
        'domain': {'blocks': '-nblk', 'i_periodic': '-i_periodic', 'j_periodic': '-j_periodic', 'k_periodic': '-k_periodic'},
        'physics.fsi': {'immersed': '-imm', 'moving_fsi': '-fsi'},
        'physics.particles': {'count': '-numParticles'},
        'physics.turbulence': {'les': '-les', 'rans': '-rans', 'wall_function': '-wallfunction'},
        'statistics': {'time_averaging': '-averaging'}
    for section_path, flags in FLAG_MAP.items():
        current_level = models
        # [elided in excerpt: ``try:`` wrapping the descent below]
        for key in section_path.split('.'): current_level = current_level[key]
        for yaml_key, flag in flags.items():
            if yaml_key in current_level:
                control_lines.append(f"{flag} {format_flag_value(current_level[yaml_key])}")
        except KeyError: continue  # section absent in this case.yml: skip its flags

    if models.get('physics', {}).get('dimensionality') == '2D':
        control_lines.append("-TwoD 1")

    particles_cfg = models.get('physics', {}).get('particles', {})
    p_init_mode_str = particles_cfg.get('init_mode', 'Surface')  # 'Surface' is the default mode
    pinit_code = normalize_particle_init_mode(p_init_mode_str)
    control_lines.append(f"-pinit {pinit_code}")
    print(f" - Particle Initialization Mode: {p_init_mode_str} (Code: {pinit_code})")

    # [elided in excerpt: guard restricting the point-source block to the
    # PointSource init mode — presumably ``if pinit_code == 2:``, TODO confirm]
    point_cfg = particles_cfg.get('point_source', {})
    if not isinstance(point_cfg, dict):
        raise ValueError("models.physics.particles.point_source must be a mapping when init_mode is PointSource.")
    # [elided in excerpt: ``try:`` wrapping the coordinate conversions below]
    psrc_x = float(point_cfg['x'])
    psrc_y = float(point_cfg['y'])
    psrc_z = float(point_cfg['z'])
    except (KeyError, TypeError, ValueError):
        raise ValueError("PointSource init_mode requires numeric point_source.{x,y,z} values.")
    control_lines.append(f"-psrc_x {psrc_x}")
    control_lines.append(f"-psrc_y {psrc_y}")
    control_lines.append(f"-psrc_z {psrc_z}")
    print(f" - Particle Point Source: ({psrc_x}, {psrc_y}, {psrc_z})")

    p_restart_mode = particles_cfg.get('restart_mode')
    # [elided in excerpt: ``if p_restart_mode is not None:`` guard — TODO confirm]
    p_restart_mode_normalized = str(p_restart_mode).lower()
    if p_restart_mode_normalized not in {"init", "load"}:
        raise ValueError(f"Unknown particle restart_mode '{p_restart_mode}'. Options are 'init' or 'load'.")
    control_lines.append(f"-particle_restart_mode \"{p_restart_mode}\"")
def parse_solver_config(solver_cfg: dict) -> dict:
    """
    @brief Parses the structured solver.yml into a flat dictionary of {flag: value}.
    @param[in] solver_cfg The parsed solver.yml configuration dictionary.
    @return A dictionary where keys are C-solver flags and values are the corresponding settings.
    """
    # [elided in excerpt: ``flags = {}`` initializer]
    # --- operation mode: string flags are quoted for the C parser ---
    if 'operation_mode' in solver_cfg and isinstance(solver_cfg['operation_mode'], dict):
        op_mode = solver_cfg['operation_mode']
        if 'eulerian_field_source' in op_mode:
            flags['-euler_field_source'] = f"\"{op_mode.get('eulerian_field_source', 'solve')}\""
        if 'analytical_type' in op_mode and op_mode.get('analytical_type') is not None:
            flags['-analytical_type'] = f"\"{op_mode.get('analytical_type')}\""

    # --- momentum solver selection: strategy key wins over momentum_solver.type ---
    selected_solver = None
    if 'strategy' in solver_cfg:
        s = solver_cfg['strategy']
        if 'central_diff' in s:
            flags['-central'] = format_flag_value(s['central_diff'])
        # Preferred selector.
        if 'momentum_solver' in s:
            selected_solver = normalize_momentum_solver_type(s['momentum_solver'])
        elif 'implicit' in s:
            raise ValueError("Legacy key 'strategy.implicit' is not supported. Use 'strategy.momentum_solver'.")

    ms = solver_cfg.get('momentum_solver', {})
    if isinstance(ms, dict) and 'type' in ms and selected_solver is None:
        selected_solver = normalize_momentum_solver_type(ms['type'])
    if selected_solver is None:
        selected_solver = "DUALTIME_PICARD_RK4"  # project default solver
    flags['-mom_solver_type'] = f"\"{selected_solver}\""

    # --- top-level tolerances block -> individual C flags ---
    if 'tolerances' in solver_cfg:
        t = solver_cfg['tolerances']
        # [elided in excerpt: the ``tol_map = {`` opener]
            'max_iterations': '-mom_max_pseudo_steps',
            'absolute_tol': '-mom_atol',
            'relative_tol': '-mom_rtol',
            'step_tol': '-imp_stol'
        for key, flag in tol_map.items():
            # [elided in excerpt: ``if key in t:`` guard]
                flags[flag] = t[key]

    def _append_dualtime_options(cfg: dict):
        # Translate the dual_time_picard_rk4 sub-block into individual C flags.
        if 'max_pseudo_steps' in cfg:
            flags['-mom_max_pseudo_steps'] = cfg['max_pseudo_steps']
        if 'absolute_tol' in cfg:
            flags['-mom_atol'] = cfg['absolute_tol']
        if 'relative_tol' in cfg:
            flags['-mom_rtol'] = cfg['relative_tol']
        if 'step_tol' in cfg:
            flags['-imp_stol'] = cfg['step_tol']
        if 'pseudo_cfl' in cfg:
            pcfl = cfg['pseudo_cfl']
            if 'initial' in pcfl:
                flags['-pseudo_cfl'] = pcfl['initial']
            if 'minimum' in pcfl:
                flags['-min_pseudo_cfl'] = pcfl['minimum']
            if 'maximum' in pcfl:
                flags['-max_pseudo_cfl'] = pcfl['maximum']
            if 'growth_factor' in pcfl:
                flags['-pseudo_cfl_growth_factor'] = pcfl['growth_factor']
            if 'reduction_factor' in pcfl:
                flags['-pseudo_cfl_reduction_factor'] = pcfl['reduction_factor']
        if 'rk4_residual_noise_allowance_factor' in cfg:
            flags['-mom_dt_rk4_residual_norm_noise_allowance_factor'] = cfg['rk4_residual_noise_allowance_factor']

    if isinstance(ms, dict):
        allowed_ms_keys = {'type', 'dual_time_picard_rk4'}
        unknown_ms_keys = sorted(set(ms.keys()) - allowed_ms_keys)
        # [elided in excerpt: ``if unknown_ms_keys: raise ValueError(`` wrapping the message below]
            f"Unsupported momentum_solver keys/blocks: {unknown_ms_keys}. "
            "Currently supported block: 'dual_time_picard_rk4'."
        dt_picard_cfg = ms.get('dual_time_picard_rk4')
        if dt_picard_cfg is not None:
            # A solver-specific block must match the selected solver.
            if selected_solver != "DUALTIME_PICARD_RK4":
                # [elided in excerpt: ``raise ValueError(`` wrapping the message below]
                f"momentum_solver.dual_time_picard_rk4 is set but selected solver is {selected_solver}."
            if not isinstance(dt_picard_cfg, dict):
                raise ValueError("momentum_solver.dual_time_picard_rk4 must be a mapping.")
            _append_dualtime_options(dt_picard_cfg)

    # --- pressure (Poisson) solver and multigrid options ---
    if 'pressure_solver' in solver_cfg:
        ps = solver_cfg['pressure_solver']
        if 'tolerance' in ps: flags['-poisson_tol'] = ps['tolerance']
        if 'multigrid' in ps:
            mg = ps['multigrid']
            mg_map = {'levels': '-mg_level', 'pre_sweeps': '-mg_pre_it', 'post_sweeps': '-mg_post_it'}
            for key, flag in mg_map.items():
                if key in mg: flags[flag] = mg[key]
            if 'semi_coarsening' in mg:
                sc = mg['semi_coarsening']
                if 'i' in sc: flags['-mg_i_semi'] = format_flag_value(sc['i'])
                if 'j' in sc: flags['-mg_j_semi'] = format_flag_value(sc['j'])
                if 'k' in sc: flags['-mg_k_semi'] = format_flag_value(sc['k'])
            if 'level_solvers' in mg:
                for level_name, settings in mg['level_solvers'].items():
                    level_num = level_name.split('_')[-1]  # e.g. 'level_2' -> '2'
                    for key, value in settings.items():
                        flags[f"-ps_mg_levels_{level_num}_{key}"] = format_flag_value(value)

    # --- raw PETSc passthrough: emitted verbatim as flag/value pairs ---
    if 'petsc_passthrough_options' in solver_cfg:
        passthrough = solver_cfg['petsc_passthrough_options']
        # [elided in excerpt: guard over ``passthrough`` — presumably a mapping check]
        for key, value in passthrough.items():
            flags[key] = format_flag_value(value)
1817def generate_solver_control_file(run_dir, run_id, configs, num_procs, monitor_files):
1819 @brief Generates the main .control file for the C-solver.
1820 @details Orchestrates the conversion of all YAML configurations (case, solver, monitor)
1821 into a single, machine-readable file of command-line flags.
1822 @param[in] run_dir The path to the main run directory.
1823 @param[in] run_id The unique identifier for the run.
1824 @param[in] configs A dictionary containing the parsed YAML data.
1825 @param[in] num_procs The number of MPI processes for the run.
1826 @param[in] monitor_files A dictionary containing paths to generated monitor files.
1827 @return The absolute path to the generated solver control file.
1829 print("[INFO] Generating master solver control file...")
1830 case_cfg, solver_cfg, monitor_cfg = configs['case'], configs['solver'], configs['monitor']
1831 source_files = {'Case': configs['case_path'], 'Solver': configs['solver_path'], 'Monitor': configs['monitor_path']}
1835 props, run_ctrl = case_cfg['properties'], case_cfg['run_control']
1836 scales, fluid, ic = props['scaling'], props['fluid'], props['initial_conditions']
1837 L_ref, U_ref, rho, mu = float(scales['length_ref']), float(scales['velocity_ref']), float(fluid['density']), float(fluid['viscosity'])
1838 reynolds = (rho * U_ref * L_ref) / mu if mu != 0 else float('inf')
1839 dt_phys = float(run_ctrl['dt_physical'])
1840 T_ref = L_ref / U_ref if U_ref != 0 else float('inf')
1841 dt_nondim = dt_phys / T_ref if T_ref != float('inf') else 0.0
1842 u, v, w = float(ic['u_physical']), float(ic['v_physical']), float(ic['w_physical'])
1843 finit_mode_str = ic.get('mode', 'Constant')
1844 finit_code = normalize_field_init_mode(finit_mode_str)
1845 print(f" - Reynolds Number (Re) = {reynolds:.4f}")
1846 print(f" - Non-Dimensional dt* = {dt_nondim:.6f}")
1847 print(f" - Field Initialization Mode: {finit_mode_str} (Code: {finit_code})")
1848 control_lines.extend([
1849 f"-start_step {run_ctrl['start_step']}", f"-totalsteps {run_ctrl['total_steps']}",
1850 f"-ren {reynolds}", f"-dt {dt_nondim}", f"-finit {finit_code}",
1851 f"-ucont_x {u/U_ref if U_ref!=0 else 0}", f"-ucont_y {v/U_ref if U_ref!=0 else 0}", f"-ucont_z {w/U_ref if U_ref!=0 else 0}",
1852 f"-scaling_L_ref {L_ref}", f"-scaling_U_ref {U_ref}", f"-scaling_rho_ref {rho}"
1854 except (KeyError, TypeError, ZeroDivisionError, ValueError) as e:
1855 print(f"[FATAL] Error processing case.yml: {e}", file=sys.stderr)
1859 bcs_files = generate_multi_block_bcs(run_dir, run_id, case_cfg, source_files)
1860 except ValueError as e:
1861 print(f"[FATAL] Invalid boundary_conditions in case.yml: {e}", file=sys.stderr)
1863 control_lines.append(f"-bcs_files \"{','.join(bcs_files)}\"")
1865 # --- CORRECTED: Add paths for whitelist and profile files ---
1866 control_lines.append(f"-whitelist_config_file {monitor_files['whitelist']}")
1867 control_lines.append(f"-profile_config_file {monitor_files['profile']}")
1869 grid_cfg = case_cfg.get('grid', {})
1870 grid_mode = grid_cfg.get('mode')
1871 expected_nblk = int(case_cfg.get('models', {}).get('domain', {}).get('blocks', 1))
1873 if grid_mode == 'file':
1874 print("[INFO] Grid Mode: Using external file...")
1875 case_file_dir = os.path.dirname(configs['case_path'])
1876 source_grid = grid_cfg['source_file']
1877 if not os.path.isabs(source_grid):
1878 source_grid = os.path.abspath(os.path.join(case_file_dir, source_grid))
1879 nondim_grid_path = os.path.join(run_dir, "config", "grid.run")
1881 summary = validate_and_nondimensionalize_picgrid(
1882 source_grid, nondim_grid_path, L_ref, expected_nblk=expected_nblk
1885 f"[SUCCESS] Validated and non-dimensionalized grid: {os.path.relpath(nondim_grid_path)} "
1886 f"(nblk={summary['nblk']}, total_nodes={summary['total_nodes']})"
1888 control_lines.append(f"-grid_file {nondim_grid_path}")
1889 except Exception as e:
1890 print(f"[FATAL] Failed to process grid file '{source_grid}': {e}", file=sys.stderr)
1892 elif grid_mode == 'grid_gen':
1893 print("[INFO] Grid Mode: Generating external grid via grid.gen...")
1894 nondim_grid_path = os.path.join(run_dir, "config", "grid.run")
1896 generated_grid = run_grid_generator(configs['case_path'], run_dir, grid_cfg)
1897 summary = validate_and_nondimensionalize_picgrid(
1898 generated_grid, nondim_grid_path, L_ref, expected_nblk=expected_nblk
1901 f"[SUCCESS] grid.gen output validated and non-dimensionalized: {os.path.relpath(nondim_grid_path)} "
1902 f"(nblk={summary['nblk']}, total_nodes={summary['total_nodes']})"
1904 control_lines.append(f"-grid_file {nondim_grid_path}")
1905 except Exception as e:
1906 print(f"[FATAL] Grid generation failed: {e}", file=sys.stderr)
1908 elif grid_mode == 'programmatic_c':
1909 print("[INFO] Grid Mode: Programmatic C...")
1910 grid_settings = dict(grid_cfg.get('programmatic_settings', {}))
1911 control_lines.append("-grid")
1912 px, py, pz = grid_settings.get('da_processors_x'), grid_settings.get('da_processors_y'), grid_settings.get('da_processors_z')
1913 if any(isinstance(p, (list, tuple)) for p in [px, py, pz] if p is not None):
1915 "da_processors_x/y/z must be scalar integers. "
1916 "Per-block MPI decomposition is not implemented on the C side; DMDA layout is global."
1918 if num_procs > 1 and all(p is not None for p in [px, py, pz]):
1919 if not all(isinstance(p, int) and p > 0 for p in [px, py, pz]):
1920 raise ValueError("da_processors_x/y/z must be positive integers when provided.")
1921 total_layout = px * py * pz
1922 if total_layout != num_procs:
1923 raise ValueError(f"Processor layout mismatch: product ({total_layout}) != processes ({num_procs}).")
1924 print(f"[INFO] Applying user-defined processor layout for {num_procs} processes.")
1926 if num_procs == 1: print("[INFO] Serial run, ignoring da_processors layout.")
1927 else: print("[INFO] Letting PETSc automatically determine processor layout.")
1928 for p_key in ['da_processors_x', 'da_processors_y', 'da_processors_z']:
1929 grid_settings.pop(p_key, None)
1930 for key, value in grid_settings.items(): control_lines.append(f"-{key} {format_flag_value(value)}")
1932 raise ValueError(f"Unknown or missing grid mode '{grid_mode}' in case.yml.")
1934 parse_and_add_model_flags(case_cfg, control_lines)
1936 if 'solver_parameters' in case_cfg:
1937 params = case_cfg['solver_parameters']
1939 for key, value in params.items():
1940 control_lines.append(f"{key} {format_flag_value(value)}")
1943 solver_flags = parse_solver_config(solver_cfg)
1944 except ValueError as e:
1945 print(f"[FATAL] Invalid solver.yml settings: {e}", file=sys.stderr)
1947 for flag, value in solver_flags.items(): control_lines.append(f"{flag} {value}")
1949 append_passthrough_flags(control_lines, monitor_cfg.get('solver_monitoring', {}))
1951 io_cfg = monitor_cfg.get('io', {})
1952 if 'data_output_frequency' in io_cfg: control_lines.append(f"-tio {io_cfg['data_output_frequency']}")
1953 if 'particle_log_interval' in io_cfg: control_lines.append(f"-logfreq {io_cfg['particle_log_interval']}")
1954 if 'directories' in io_cfg:
1955 dirs = io_cfg['directories']
1956 if 'output' in dirs: control_lines.append(f"-output_dir {dirs['output']}")
1957 if 'restart' in dirs: control_lines.append(f"-restart_dir {dirs['restart']}")
1958 if 'log' in dirs: control_lines.append(f"-log_dir {dirs['log']}")
1959 if 'eulerian_subdir' in dirs: control_lines.append(f"-euler_subdir {dirs['eulerian_subdir']}")
1960 if 'particle_subdir' in dirs: control_lines.append(f"-particle_subdir {dirs['particle_subdir']}")
1962 final_content = generate_header(run_id, source_files) + "\n".join(control_lines)
1963 control_file_path = os.path.join(run_dir, "config", f"{run_id}.control")
1964 with open(control_file_path, "w") as f: f.write(final_content)
1965 print(f"[SUCCESS] Generated solver control file: {os.path.relpath(control_file_path)}")
1966 return os.path.abspath(control_file_path)
1968def generate_post_recipe_file(run_dir: str, run_id: str, post_cfg: dict, source_files: dict) -> str:
1970 @brief Generates a key=value config file (post.run) for the C post-processor.
1971 @details Translates the structured post-processing YAML into the specific flat
1972 key-value format required by the C executable, including complex,
1973 semicolon-separated pipeline strings.
1974 @param[in] run_dir The path to the main run directory.
1975 @param[in] run_id The unique identifier for the run.
1976 @param[in] post_cfg The parsed post-profile YAML configuration dictionary.
1977 @param[in] source_files A dictionary of source files for the header.
1978 @return The absolute path to the generated post.run recipe file.
1980 print("[INFO] Generating post-processor recipe file (post.run)...")
1981 config_dir = os.path.join(run_dir, "config")
1982 post_recipe_path = os.path.join(config_dir, "post.run")
1984 lines = [generate_header(run_id, source_files)]
1988 # --- 1. Process Run Control ---
1989 # Accept snake_case names (preferred) with camelCase fallback for backwards compatibility.
1990 rc = post_cfg.get('run_control', {})
1991 c_config['startTime'] = rc.get('start_step', rc.get('startTime', 0))
1992 c_config['endTime'] = rc.get('end_step', rc.get('endTime', 10))
1993 c_config['timeStep'] = rc.get('step_interval', rc.get('timeStep', 1))
1995 # --- 2. Process Global Operations ---
1996 eulerian_pipeline_parts = []
1997 if post_cfg.get('global_operations', {}).get('dimensionalize', False):
1998 eulerian_pipeline_parts.append('DimensionalizeAllLoadedFields')
2000 # --- 3. Build Eulerian Pipeline String ---
2001 for task in post_cfg.get('eulerian_pipeline', []):
2002 task_name = task.get('task')
2003 if task_name == 'q_criterion':
2004 eulerian_pipeline_parts.append('ComputeQCriterion')
2005 elif task_name == 'normalize_field':
2006 field = task.get('field', 'P')
2007 eulerian_pipeline_parts.append(f'NormalizeRelativeField:{field}')
2008 ref_point = task.get('reference_point', [1, 1, 1])
2009 c_config['reference_ip'] = ref_point[0]
2010 c_config['reference_jp'] = ref_point[1]
2011 c_config['reference_kp'] = ref_point[2]
2012 elif task_name == 'nodal_average':
2013 in_field = task.get('input_field')
2014 out_field = task.get('output_field')
2015 if in_field and out_field:
2016 eulerian_pipeline_parts.append(f'CellToNodeAverage:{in_field}>{out_field}')
2018 if eulerian_pipeline_parts:
2019 c_config['process_pipeline'] = ";".join(eulerian_pipeline_parts)
2021 # --- 4. Build Lagrangian Pipeline String ---
2022 lagrangian_pipeline_parts = []
2023 for task in post_cfg.get('lagrangian_pipeline', []):
2024 task_name = task.get('task')
2025 if task_name == 'specific_ke':
2026 in_field = task.get('input_field')
2027 out_field = task.get('output_field')
2028 if in_field and out_field:
2029 lagrangian_pipeline_parts.append(f'ComputeSpecificKE:{in_field}>{out_field}')
2031 if lagrangian_pipeline_parts:
2032 c_config['particle_pipeline'] = ";".join(lagrangian_pipeline_parts)
2034 # --- 4B. Build Statistics Pipeline String ---
2035 statistics_pipeline_parts = []
2036 statistics_output_prefix = None
2037 stats_cfg = post_cfg.get('statistics_pipeline')
2039 if isinstance(stats_cfg, list):
2040 stats_entries = stats_cfg
2041 elif isinstance(stats_cfg, dict):
2042 stats_entries = stats_cfg.get('tasks', [])
2043 statistics_output_prefix = stats_cfg.get('output_prefix')
2045 for entry in stats_entries:
2046 if isinstance(entry, str):
2048 elif isinstance(entry, dict):
2049 task_name = entry.get('task')
2053 statistics_pipeline_parts.append(normalize_statistics_task(task_name))
2055 # validation should catch this earlier; keep generation tolerant.
2058 if statistics_pipeline_parts:
2059 c_config['statistics_pipeline'] = ";".join(statistics_pipeline_parts)
2060 if statistics_output_prefix is None:
2061 statistics_output_prefix = post_cfg.get('statistics_output_prefix')
2062 if statistics_output_prefix:
2063 c_config['statistics_output_prefix'] = statistics_output_prefix
2065 # --- 5. Process I/O ---
2066 io = post_cfg.get('io', {})
2067 c_config['output_prefix'] = io.get('output_directory','viz')+'/'+io.get('output_filename_prefix', 'Field')
2068 c_config['particle_output_prefix'] = io.get('output_directory','viz')+'/'+io.get('particle_filename_prefix', 'Particle')
2069 c_config['output_particles'] = io.get('output_particles', False)
2070 c_config['particle_output_freq'] = io.get('particle_subsampling_frequency', 1)
2071 c_config['output_fields_instantaneous'] = ",".join(io.get('eulerian_fields', []))
2072 c_config['output_fields_averaged'] = ",".join(io.get('eulerian_fields_averaged', []))
2073 c_config['particle_fields_instantaneous'] = ",".join(io.get('particle_fields', []))
2074 input_extensions = io.get('input_extensions', {})
2075 if isinstance(input_extensions, dict):
2076 e_ext = input_extensions.get('eulerian')
2077 p_ext = input_extensions.get('particle')
2079 c_config['eulerianExt'] = str(e_ext).strip().lstrip('.')
2081 c_config['particleExt'] = str(p_ext).strip().lstrip('.')
2083 # --- 6. Add Source Directory ---
2084 if 'source_data' in post_cfg and 'directory' in post_cfg['source_data']:
2085 c_config['source_directory'] = post_cfg['source_data']['directory']
2087 # --- 7. Write the final file ---
2088 for key, value in c_config.items():
2089 if value is not None and str(value) != "":
2090 lines.append(f"{key} = {value}")
2092 with open(post_recipe_path, "w") as f: f.write("\n".join(lines))
2093 print(f"[SUCCESS] Generated post-processor recipe: {os.path.relpath(post_recipe_path)}")
2094 return os.path.abspath(post_recipe_path)
2096def execute_command(command: list, run_dir: str, log_filename: str, monitor_cfg: dict = None):
2098 @brief Executes a command, streaming its output to the console and a log file.
2100 @param[in] monitor_cfg Optional. If provided, used to set LOG_LEVEL in a custom environment.
2101 If None, the process inherits the parent's environment directly.
2103 # Create the log directory if it doesn't exist.
2104 log_dir = os.path.join(run_dir, "logs")
2105 os.makedirs(log_dir, exist_ok=True)
2107 log_path = os.path.join(log_dir, log_filename)
2108 print(f"[INFO] Launching Command...\n > {' '.join(command)}")
2109 print(f" Log file: {os.path.relpath(log_path)}")
2112 # --- Environment Handling ---
2114 "stdout": subprocess.PIPE, "stderr": subprocess.STDOUT,
2115 "cwd": run_dir, "bufsize": 1, "universal_newlines": True,
2116 "encoding": 'utf-8', "errors": 'replace'
2120 print("[INFO] Creating custom environment to set LOG_LEVEL.")
2121 run_env = os.environ.copy()
2122 verbosity = monitor_cfg.get('logging', {}).get('verbosity', 'INFO').upper()
2123 run_env['LOG_LEVEL'] = verbosity
2124 print(f"[INFO] Setting LOG_LEVEL={verbosity} for C executable.")
2125 popen_kwargs['env'] = run_env
2127 print("[INFO] Using inherited environment for process.")
2131 # Pass the constructed keyword arguments dictionary to Popen
2132 process = subprocess.Popen(command, **popen_kwargs)
2134 with open(log_path, "w") as log_file:
2135 for line in process.stdout:
2136 sys.stdout.write(line)
2137 log_file.write(line)
2139 return_code = process.returncode
2141 if return_code == 0:
2142 print(f"[SUCCESS] Execution finished successfully.")
2144 print(f"[FATAL] Execution failed with exit code {return_code}. Check log: {os.path.relpath(log_path)}", file=sys.stderr)
2145 sys.exit(return_code)
2146 except FileNotFoundError:
2147 print(f"[FATAL] Command not found or is not executable: '{command[0]}'", file=sys.stderr)
2148 print(" Please check that the path is correct and the file has execute permissions.", file=sys.stderr)
2150 except Exception as e:
2151 print(f"[FATAL] An unexpected error occurred during execution: {e}", file=sys.stderr)
2154def auto_identify_run_inputs(config_dir: str):
2155 """Auto-detect case.yml, monitor.yml, and *.control in a run config directory."""
2156 all_yml_files = glob.glob(os.path.join(config_dir, "*.yml"))
2157 case_path, monitor_path = None, None
2158 for f_path in all_yml_files:
2160 content = read_yaml_file(f_path)
2161 if not isinstance(content, dict):
2163 if 'models' in content and 'boundary_conditions' in content:
2165 elif 'io' in content and 'logging' in content:
2166 monitor_path = f_path
2167 except Exception as e:
2168 print(f"[WARNING] Could not parse or inspect '{f_path}': {e}", file=sys.stderr)
2170 solver_control_path = glob.glob(os.path.join(config_dir, "*.control"))[0]
2172 solver_control_path = None
2173 return case_path, monitor_path, solver_control_path
2175def resolve_post_source_directory(run_dir: str, monitor_cfg: dict, post_cfg: dict, strict: bool = True) -> str:
2176 """Resolve post source directory token and optionally enforce existence."""
2177 solver_output_dir_rel = monitor_cfg.get('io', {}).get('directories', {}).get('output', 'results')
2178 solver_output_dir_abs = os.path.join(run_dir, solver_output_dir_rel)
2179 source_dir_template = post_cfg.get('source_data', {}).get('directory', '<solver_output_dir>')
2180 if source_dir_template == '<solver_output_dir>':
2181 resolved_source_dir = solver_output_dir_abs
2182 print(f"[INFO] Post-processor source data: {os.path.relpath(resolved_source_dir)}")
2184 resolved_source_dir = os.path.abspath(os.path.join(run_dir, source_dir_template))
2185 print(f"[INFO] Post-processor source data (user-defined): {os.path.relpath(resolved_source_dir)}")
2187 if strict and (not os.path.isdir(resolved_source_dir) or not os.listdir(resolved_source_dir)):
2189 f"[FATAL] Source data directory for post-processing not found or empty: {os.path.relpath(resolved_source_dir)}",
2193 if not strict and (not os.path.isdir(resolved_source_dir) or not os.listdir(resolved_source_dir)):
2194 print("[WARNING] Source data directory is not available yet; keeping deferred path for scheduled post job.")
2195 return resolved_source_dir
2197def render_slurm_array_stage_script(
2202 case_index_tsv: str,
2209 """Render array script that maps SLURM_ARRAY_TASK_ID to per-case run artifacts."""
2210 resources = cluster_cfg.get("resources", {})
2211 notifications = cluster_cfg.get("notifications", {}) or {}
2212 execution = cluster_cfg.get("execution", {}) or {}
2213 module_setup = execution.get("module_setup", []) or []
2214 extra_sbatch = execution.get("extra_sbatch")
2218 f"#SBATCH --job-name={job_name}",
2219 f"#SBATCH --nodes={resources['nodes']}",
2220 f"#SBATCH --ntasks-per-node={resources['ntasks_per_node']}",
2221 f"#SBATCH --mem={resources['mem']}",
2222 f"#SBATCH --time={resources['time']}",
2223 f"#SBATCH --output={stdout_path}",
2224 f"#SBATCH --error={stderr_path}",
2225 f"#SBATCH --account={resources['account']}",
2226 f"#SBATCH --array={array_spec}",
2228 partition = resources.get("partition")
2230 lines.append(f"#SBATCH --partition={partition}")
2231 mail_user = notifications.get("mail_user")
2232 mail_type = notifications.get("mail_type")
2234 lines.append(f"#SBATCH --mail-user={mail_user}")
2236 lines.append(f"#SBATCH --mail-type={mail_type}")
2237 if isinstance(extra_sbatch, dict):
2238 for key, value in extra_sbatch.items():
2240 if not flag.startswith("--"):
2242 if isinstance(value, bool):
2244 lines.append(f"#SBATCH {flag}")
2245 elif value is not None:
2246 lines.append(f"#SBATCH {flag}={value}")
2247 elif isinstance(extra_sbatch, list):
2248 for token in extra_sbatch:
2249 lines.append(f"#SBATCH {token}")
2253 "set -euo pipefail",
2255 f'CASE_INDEX_FILE={shlex.quote(case_index_tsv)}',
2256 'LINE=$(sed -n "$((SLURM_ARRAY_TASK_ID + 1))p" "$CASE_INDEX_FILE")',
2257 'if [ -z "$LINE" ]; then',
2258 ' echo "No case entry for array index ${SLURM_ARRAY_TASK_ID}" >&2',
2261 "IFS=$'\\t' read -r CASE_INDEX CASE_ID RUN_DIR CONTROL_FILE POST_RECIPE_FILE LOG_LEVEL POST_PREFIX <<< \"$LINE\"",
2263 'echo "[$(date)] Starting case ${CASE_ID} (array index ${SLURM_ARRAY_TASK_ID})"',
2264 'export LOG_LEVEL="${LOG_LEVEL}"',
2267 for setup_line in module_setup:
2268 lines.append(str(setup_line))
2270 if stage == "solve":
2271 cmd = build_cluster_launch_command(
2274 ["-control_file", "$CONTROL_FILE"]
2277 cmd = build_cluster_launch_command(
2280 ["-control_file", "$CONTROL_FILE", "-postprocessing_config_file", "$POST_RECIPE_FILE"]
2283 # Keep shell variables unresolved inside sbatch script.
2284 def _token(tok: str) -> str:
2285 if tok.startswith("$"):
2287 return shlex.quote(str(tok))
2289 lines.append(" ".join(_token(t) for t in cmd))
2290 lines.append('echo "[$(date)] Completed case ${CASE_ID}"')
2292 os.makedirs(os.path.dirname(script_path), exist_ok=True)
2293 with open(script_path, "w") as f:
2294 f.write("\n".join(lines) + "\n")
2295 os.chmod(script_path, 0o755)
2297def extract_metric_from_csv(case_dir: str, spec: dict):
2298 """Extract a scalar metric from a CSV source."""
2299 file_glob = spec.get("file_glob", "**/*_msd.csv")
2300 candidates = sorted(glob.glob(os.path.join(case_dir, file_glob), recursive=True))
2303 csv_path = candidates[0]
2305 with open(csv_path, "r", newline="") as f:
2306 reader = csv.DictReader(f)
2307 if reader.fieldnames:
2312 column = spec.get("column")
2314 for name in reversed(reader.fieldnames):
2315 if name and name.lower() not in {"step", "time", "timestep"}:
2323 values.append(float(row[column]))
2330 reduction = str(spec.get("reduction", "last")).lower()
2331 if reduction == "mean":
2332 return float(np.mean(values))
2333 if reduction == "min":
2334 return float(np.min(values))
2335 if reduction == "max":
2336 return float(np.max(values))
2337 return float(values[-1])
2339def extract_metric_from_log(case_dir: str, spec: dict):
2340 """Extract a scalar metric from a log file using regex."""
2341 file_glob = spec.get("file_glob", "logs/*.log")
2342 regex = spec.get("regex")
2345 candidates = sorted(glob.glob(os.path.join(case_dir, file_glob), recursive=True))
2348 pattern = re.compile(regex)
2350 for path in candidates:
2352 with open(path, "r", encoding="utf-8", errors="replace") as f:
2354 m = pattern.search(line)
2357 values.append(float(m.group(1)))
2364 reduction = str(spec.get("reduction", "last")).lower()
2365 if reduction == "mean":
2366 return float(np.mean(values))
2367 if reduction == "min":
2368 return float(np.min(values))
2369 if reduction == "max":
2370 return float(np.max(values))
2371 return float(values[-1])
2373def normalize_metric_spec(metric):
2374 """Normalize study metric definitions to a common dictionary form."""
2375 if isinstance(metric, str):
2376 if metric.lower() in {"msd", "msd_final"}:
2378 "name": "msd_final",
2379 "source": "statistics_csv",
2380 "file_glob": "**/*_msd.csv",
2381 "reduction": "last",
2383 return {"name": metric, "source": "log_regex", "regex": metric}
2386def aggregate_study_metrics(study_cfg: dict, cases: list, results_dir: str) -> str:
2387 """Collect metric values from generated case directories into one CSV."""
2388 metrics = study_cfg.get("metrics", [])
2390 metrics = ["msd_final"]
2391 normalized_specs = [normalize_metric_spec(m) for m in metrics]
2395 row = {"case_id": case["case_id"]}
2396 for p_key, p_val in case["parameters"].items():
2398 for spec in normalized_specs:
2399 name = spec.get("name", "metric")
2400 source = str(spec.get("source", "")).lower()
2401 if source in {"statistics_csv", "csv"}:
2402 row[name] = extract_metric_from_csv(case["run_dir"], spec)
2403 elif source in {"log_regex", "log"}:
2404 row[name] = extract_metric_from_log(case["run_dir"], spec)
2415 for k in row.keys():
2420 os.makedirs(results_dir, exist_ok=True)
2421 out_csv = os.path.join(results_dir, "metrics_table.csv")
2422 with open(out_csv, "w", newline="") as f:
2423 writer = csv.DictWriter(f, fieldnames=all_keys)
2424 writer.writeheader()
2425 writer.writerows(rows)
2426 print(f"[SUCCESS] Aggregated metrics table: {os.path.relpath(out_csv)}")
2429def infer_plot_x_axis(study_cfg: dict, rows: list):
2430 """Infer x-axis key/values for study plots."""
2431 params = list((study_cfg.get("parameters") or {}).keys())
2432 if not params or not rows:
2435 study_type = study_cfg.get("study_type")
2436 if study_type == "grid_independence":
2437 has_im = "case.grid.programmatic_settings.im" in params
2438 has_jm = "case.grid.programmatic_settings.jm" in params
2439 has_km = "case.grid.programmatic_settings.km" in params
2440 if has_im and has_jm and has_km:
2444 im = float(row["case.grid.programmatic_settings.im"])
2445 jm = float(row["case.grid.programmatic_settings.jm"])
2446 km = float(row["case.grid.programmatic_settings.km"])
2447 xs.append((im * jm * km) ** (1.0 / 3.0))
2450 return "N^(1/3)", xs
2456 xs.append(float(row[primary]))
2461def generate_study_plots(study_cfg: dict, metrics_csv: str, plots_dir: str):
2462 """Generate metric-vs-parameter plots for completed studies."""
2463 plotting_cfg = study_cfg.get("plotting", {}) or {}
2464 if plotting_cfg.get("enabled", True) is False:
2465 print("[INFO] Plotting disabled by study.yml.")
2468 print("[WARNING] matplotlib not available; skipping plot generation.")
2470 if not metrics_csv or not os.path.isfile(metrics_csv):
2473 with open(metrics_csv, "r", newline="") as f:
2474 reader = csv.DictReader(f)
2479 x_name, x_values = infer_plot_x_axis(study_cfg, rows)
2480 if not x_name or x_values is None:
2481 print("[WARNING] Could not infer numeric x-axis for plots; skipping.")
2485 param_keys = list((study_cfg.get("parameters") or {}).keys())
2486 for key in rows[0].keys():
2487 if key in {"case_id"}:
2489 if key in param_keys:
2491 metric_keys.append(key)
2493 out_format = plotting_cfg.get("output_format", "png")
2494 os.makedirs(plots_dir, exist_ok=True)
2496 for metric in metric_keys:
2501 y_values.append(float(row[metric]))
2507 plt.figure(figsize=(7.0, 4.2))
2508 plt.plot(x_values, y_values, marker="o", linewidth=1.5)
2511 plt.title(f"{metric} vs {x_name}")
2512 plt.grid(True, alpha=0.3)
2513 out_path = os.path.join(plots_dir, f"{metric}_vs_{x_name.replace('/', '_')}.{out_format}")
2515 plt.savefig(out_path, dpi=150)
2517 generated.append(out_path)
2519 print(f"[SUCCESS] Generated {len(generated)} plot(s) in {os.path.relpath(plots_dir)}")
def _command_to_string(command_tokens: list) -> str:
    """Render a command list as a shell-safe display string."""
    # Coerce every token to str so ints (e.g. process counts) are accepted,
    # then quote each one so the joined result is copy-paste safe.
    quoted_tokens = [shlex.quote(str(token)) for token in command_tokens]
    return " ".join(quoted_tokens)
def _resolve_post_source_directory_preview(run_dir: str, monitor_cfg: dict, post_cfg: dict) -> str:
    """Resolve post source directory without side effects or stdout/stderr output."""
    # Directory the solver writes results into, relative to the run directory
    # (falls back to 'results' when monitor.yml does not specify one).
    output_subdir = monitor_cfg.get('io', {}).get('directories', {}).get('output', 'results')
    template = post_cfg.get('source_data', {}).get('directory', '<solver_output_dir>')
    if template != '<solver_output_dir>':
        # Explicit user-supplied directory: anchor it at the run directory.
        return os.path.abspath(os.path.join(run_dir, template))
    # Default token: use the solver's own output directory.
    return os.path.join(run_dir, output_subdir)
2538def build_run_dry_plan(args) -> dict:
2539 """Build a no-write execution plan for `run --dry-run`."""
2542 "created_at": datetime.now().isoformat(),
2549 if args.dry_run and args.no_submit:
2550 plan["warnings"].append("--dry-run takes precedence over --no-submit; no files will be written.")
2552 cluster_mode = bool(getattr(args, "cluster", None))
2555 effective_num_procs = args.num_procs
2558 solver_control_path = None
2559 loaded_case_cfg = None
2560 loaded_monitor_cfg = None
2563 cluster_path = os.path.abspath(args.cluster)
2564 cluster_cfg = read_yaml_file(cluster_path)
2565 validate_cluster_config(cluster_cfg, cluster_path)
2566 scheduler_type = str(cluster_cfg.get("scheduler", {}).get("type", "slurm")).lower()
2567 if args.scheduler and args.scheduler.lower() != scheduler_type:
2568 emit_structured_error(
2569 ERROR_CODE_CFG_INCONSISTENT_COMBO,
2570 key="scheduler.type",
2571 file_path=cluster_path,
2572 message=f"--scheduler={args.scheduler} does not match cluster.yml scheduler.type={scheduler_type}.",
2575 if scheduler_type != "slurm":
2576 emit_structured_error(
2577 ERROR_CODE_CFG_INVALID_VALUE,
2578 key="scheduler.type",
2579 file_path=cluster_path,
2580 message=f"Unsupported scheduler '{scheduler_type}'. Only Slurm is supported in v1.",
2583 cluster_tasks = get_cluster_total_tasks(cluster_cfg)
2584 if args.num_procs not in (1, cluster_tasks):
2585 emit_structured_error(
2586 ERROR_CODE_CFG_INCONSISTENT_COMBO,
2587 key="resources.ntasks_per_node",
2588 file_path=cluster_path,
2590 "--num-procs must be 1 (auto) or exactly nodes*ntasks_per_node "
2591 f"({cluster_tasks}) in cluster mode."
2595 effective_num_procs = cluster_tasks
2596 plan["launch_mode"] = "slurm"
2597 plan["inputs"]["cluster"] = cluster_path
2599 if getattr(args, "scheduler", None):
2600 fail_cli_usage("--scheduler requires --cluster in this version.")
2601 plan["launch_mode"] = "local"
2604 case_path = os.path.abspath(args.case)
2605 solver_path = os.path.abspath(args.solver)
2606 monitor_path = os.path.abspath(args.monitor)
2607 loaded_case_cfg = read_yaml_file(case_path)
2608 solver_cfg = read_yaml_file(solver_path)
2609 loaded_monitor_cfg = read_yaml_file(monitor_path)
2610 validate_solver_configs(loaded_case_cfg, solver_cfg, loaded_monitor_cfg, case_path, solver_path, monitor_path)
2612 case_name = os.path.splitext(os.path.basename(case_path))[0]
2613 timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
2614 run_id = f"{case_name}_{timestamp}"
2615 run_dir = os.path.abspath(os.path.join("runs", run_id))
2617 config_dir = os.path.join(run_dir, "config")
2618 scheduler_dir = os.path.join(run_dir, "scheduler")
2619 logs_dir = os.path.join(run_dir, "logs")
2620 solver_control_path = os.path.join(config_dir, f"{run_id}.control")
2621 whitelist_path = os.path.join(config_dir, "whitelist.run")
2622 profile_path = os.path.join(config_dir, "profile.run")
2624 plan["run_id_preview"] = run_id
2625 plan["run_dir_preview"] = run_dir
2626 plan["inputs"].update({"case": case_path, "solver": solver_path, "monitor": monitor_path})
2627 plan["artifacts"].extend(
2632 os.path.join(run_dir, "results"),
2634 os.path.join(config_dir, "case.yml"),
2635 os.path.join(config_dir, "solver.yml"),
2636 os.path.join(config_dir, "monitor.yml"),
2639 solver_control_path,
2640 os.path.join(run_dir, "manifest.json"),
2644 plan["artifacts"].append(os.path.join(config_dir, "cluster.yml"))
2645 plan["artifacts"].append(os.path.join(scheduler_dir, "submission.json"))
2647 solver_exe = os.path.join(BIN_DIR, "picsolver")
2648 solver_args = ["-control_file", solver_control_path]
2650 solver_script = os.path.join(scheduler_dir, "solver.sbatch")
2651 solver_cmd = build_cluster_launch_command(cluster_cfg, solver_exe, solver_args)
2652 plan["artifacts"].append(solver_script)
2653 plan["stages"]["solve"] = {
2655 "script": solver_script,
2656 "launch_command": solver_cmd,
2657 "launch_command_string": _command_to_string(solver_cmd),
2660 solver_cmd = [solver_exe] + solver_args
2661 if effective_num_procs > 1:
2662 solver_cmd = ["mpiexec", "-n", str(effective_num_procs)] + solver_cmd
2663 plan["stages"]["solve"] = {
2665 "launch_command": solver_cmd,
2666 "launch_command_string": _command_to_string(solver_cmd),
2669 if args.post_process:
2670 post_path = os.path.abspath(args.post)
2671 plan["inputs"]["post"] = post_path
2672 post_cfg = read_yaml_file(post_path)
2673 validate_post_config(post_cfg, post_path)
2676 run_dir = os.path.abspath(args.run_dir)
2677 if not os.path.isdir(run_dir):
2678 emit_structured_error(
2679 ERROR_CODE_CFG_FILE_NOT_FOUND,
2682 message="Specified run directory not found.",
2685 run_id = os.path.basename(run_dir)
2686 elif not args.solve:
2687 fail_cli_usage("--post-process requires --run-dir when not used with --solve.")
2690 config_dir = os.path.join(run_dir, "config")
2691 case_path, monitor_path, solver_control_path = auto_identify_run_inputs(config_dir)
2692 if not all([case_path, monitor_path, solver_control_path]):
2693 emit_structured_error(
2694 ERROR_CODE_CFG_MISSING_KEY,
2695 key="run_dir.config",
2696 file_path=config_dir,
2698 "Could not auto-identify required run inputs "
2699 "(case.yml/monitor.yml/*.control) in run config directory."
2703 loaded_case_cfg = read_yaml_file(case_path)
2704 loaded_monitor_cfg = read_yaml_file(monitor_path)
2706 config_dir = os.path.join(run_dir, "config")
2707 case_path = os.path.join(config_dir, "case.yml")
2708 monitor_path = os.path.join(config_dir, "monitor.yml")
2709 if solver_control_path is None:
2710 solver_control_path = os.path.join(config_dir, f"{run_id}.control")
2712 resolved_source_dir = _resolve_post_source_directory_preview(run_dir, loaded_monitor_cfg, post_cfg)
2713 post_recipe_path = os.path.join(config_dir, "post.run")
2714 output_dir_rel = post_cfg.get("io", {}).get("output_directory")
2715 output_prefix = post_cfg.get("io", {}).get("output_filename_prefix")
2716 if not output_dir_rel or not output_prefix:
2717 emit_structured_error(
2718 ERROR_CODE_CFG_MISSING_KEY,
2719 key="io.output_directory/io.output_filename_prefix",
2720 file_path=post_path,
2721 message="Missing required post IO keys.",
2724 output_dir_abs = os.path.abspath(os.path.join(run_dir, output_dir_rel))
2725 post_exe = os.path.join(BIN_DIR, "postprocessor")
2726 post_args = ["-control_file", solver_control_path, "-postprocessing_config_file", post_recipe_path]
2727 plan["artifacts"].extend([post_recipe_path, output_dir_abs])
2730 scheduler_dir = os.path.join(run_dir, "scheduler")
2731 post_script = os.path.join(scheduler_dir, "post.sbatch")
2732 post_cmd = build_cluster_launch_command(cluster_cfg, post_exe, post_args)
2733 plan["artifacts"].append(post_script)
2734 plan["stages"]["post-process"] = {
2736 "script": post_script,
2737 "source_data_directory": resolved_source_dir,
2738 "launch_command": post_cmd,
2739 "launch_command_string": _command_to_string(post_cmd),
2742 post_cmd = [post_exe] + post_args
2743 if effective_num_procs > 1:
2744 post_cmd = ["mpiexec", "-n", str(effective_num_procs)] + post_cmd
2745 plan["stages"]["post-process"] = {
2747 "source_data_directory": resolved_source_dir,
2748 "launch_command": post_cmd,
2749 "launch_command_string": _command_to_string(post_cmd),
2752 # Preserve insertion order while removing duplicates.
2755 for item in plan["artifacts"]:
2756 if item not in seen:
2758 deduped.append(item)
2759 plan["artifacts"] = deduped
2760 if run_id and "run_id_preview" not in plan:
2761 plan["run_id_preview"] = run_id
2762 if run_dir and "run_dir_preview" not in plan:
2763 plan["run_dir_preview"] = run_dir
2764 plan["num_procs_effective"] = effective_num_procs
# NOTE(review): this file is an elided, line-numbered dump — the leading
# integers on each line are extraction residue (original file line numbers),
# and gaps in that numbering mark missing source lines (e.g. the early
# return after the JSON branch, presumably at 2772-2773 — TODO confirm
# against the full file). Code below is left byte-identical.
2768def render_run_dry_plan(plan: dict, output_format: str = "text"):
2769 """Render dry-run plan in human or JSON format."""
# Machine-readable mode: dump the entire plan dict; sort_keys gives stable,
# diff-friendly output. An elided line presumably returns here so the text
# rendering below is skipped — TODO confirm.
2770 if output_format == "json":
2771 print(json.dumps(plan, indent=2, sort_keys=True))
# Human-readable mode: banner, then one line per top-level plan field.
# plan.get(...) is used throughout so absent keys render as None instead
# of raising.
2774 print("\n" + "=" * 60)
2775 print(" DRY-RUN PLAN")
2777 print(f" Launch mode : {plan.get('launch_mode')}")
2778 print(f" Created at : {plan.get('created_at')}")
# Previews are optional: only printed when the plan was able to compute them.
2779 if plan.get("run_id_preview"):
2780 print(f" Run ID preview : {plan.get('run_id_preview')}")
2781 if plan.get("run_dir_preview"):
2782 print(f" Run dir preview: {plan.get('run_dir_preview')}")
2783 print(f" MPI processes : {plan.get('num_procs_effective')}")
# Optional sections, each rendered as a bulleted list when present.
2784 if plan.get("warnings"):
2785 print(" Warnings :")
2786 for warning in plan["warnings"]:
2787 print(f" - {warning}")
2789 if plan.get("inputs"):
2791 for key, value in plan["inputs"].items():
2792 print(f" - {key}: {value}")
# Stage entries carry a pre-formatted 'launch_command_string' built by the
# planner, so no shell quoting is done here.
2794 if plan.get("stages"):
2795 print("\n Planned stage commands:")
2796 for stage, details in plan["stages"].items():
2797 print(f" - {stage} ({details.get('mode')}):")
2798 print(f" {details.get('launch_command_string')}")
# Artifacts are paths that a real run WOULD create; dry-run only lists them.
2800 print("\n Planned artifacts (no files created in dry-run):")
2801 for artifact in plan.get("artifacts", []):
2802 print(f" - {artifact}")
# NOTE(review): elided dump — leading integers are extraction residue and
# numbering gaps mark missing lines (e.g. the `checked = []` initialization
# and the `if args.post:` / `if args.cluster:` / `if args.study:` guards
# around the per-group sections are not visible here — TODO confirm).
2806def validate_workflow(args):
2807 """Implements `pic.flow validate` without launching solver/post workflows."""
# The solver trio (--case/--solver/--monitor) is all-or-nothing; the other
# groups (--post/--cluster/--study) are independent opt-ins.
2809 solver_group_selected = any([args.case, args.solver, args.monitor])
2810 any_group_selected = solver_group_selected or any([args.post, args.cluster, args.study])
# Reject a bare `validate` with no config group at all (fail_cli_usage call
# opener is on an elided line).
2812 if not any_group_selected:
2814 "validate requires at least one config group. Provide solver trio and/or --post/--cluster/--study.",
2815 hint="Example: pic.flow validate --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml",
2818 if solver_group_selected and not all([args.case, args.solver, args.monitor]):
2819 fail_cli_usage("When solver validation is requested, --case, --solver, and --monitor are all required.")
# --- Solver trio: load all three YAMLs and cross-validate them together. ---
2821 if solver_group_selected:
2822 case_path = os.path.abspath(args.case)
2823 solver_path = os.path.abspath(args.solver)
2824 monitor_path = os.path.abspath(args.monitor)
2825 case_cfg = read_yaml_file(case_path)
2826 solver_cfg = read_yaml_file(solver_path)
2827 monitor_cfg = read_yaml_file(monitor_path)
2828 validate_solver_configs(case_cfg, solver_cfg, monitor_cfg, case_path, solver_path, monitor_path)
2829 checked.extend([case_path, solver_path, monitor_path])
# --- Post config (guarded by an elided `if args.post:` — TODO confirm). ---
2833 post_path = os.path.abspath(args.post)
2834 post_cfg = read_yaml_file(post_path)
2835 validate_post_config(post_cfg, post_path)
2836 checked.append(post_path)
# --- Cluster config (guard elided). ---
2840 cluster_path = os.path.abspath(args.cluster)
2841 cluster_cfg = read_yaml_file(cluster_path)
2842 validate_cluster_config(cluster_cfg, cluster_path)
2843 checked.append(cluster_path)
# --- Study config (guard elided). ---
2847 study_path = os.path.abspath(args.study)
2848 study_cfg = read_yaml_file(study_path)
2849 validate_study_config(study_cfg, study_path)
2850 checked.append(study_path)
# Strict mode additionally checks that the post source directory exists on
# disk (skipping the "<solver_output_dir>" placeholder, which is resolved
# only at run time).
2852 if args.strict and post_cfg is not None:
2853 post_path = os.path.abspath(args.post)
2854 source_dir = post_cfg.get("source_data", {}).get("directory")
2855 if source_dir and source_dir != "<solver_output_dir>":
2856 resolved = resolve_path(post_path, source_dir)
2857 if not os.path.isdir(resolved):
2858 emit_structured_error(
2859 ERROR_CODE_CFG_FILE_NOT_FOUND,
2860 key="source_data.directory",
2861 file_path=post_path,
2862 message=f"strict mode: source_data.directory resolves to missing directory '{resolved}'.",
# Strict mode also recursively validates a study's base configs, resolving
# their paths relative to the study file's location.
2866 if args.strict and study_cfg is not None:
2867 study_path = os.path.abspath(args.study)
2868 base_cfgs = study_cfg.get("base_configs", {})
2869 if isinstance(base_cfgs, dict):
2870 base_case_path = resolve_path(study_path, base_cfgs.get("case"))
2871 base_solver_path = resolve_path(study_path, base_cfgs.get("solver"))
2872 base_monitor_path = resolve_path(study_path, base_cfgs.get("monitor"))
2873 base_post_path = resolve_path(study_path, base_cfgs.get("post"))
2874 if all([base_case_path, base_solver_path, base_monitor_path]):
2875 validate_solver_configs(
2876 read_yaml_file(base_case_path),
2877 read_yaml_file(base_solver_path),
2878 read_yaml_file(base_monitor_path),
2884 validate_post_config(read_yaml_file(base_post_path), base_post_path)
# Final report: count and (per an elided loop body) list every validated file.
2886 print(f"[SUCCESS] Validation completed for {len(checked)} file(s).")
2887 for path in checked:
# NOTE(review): elided dump — leading integers are extraction residue;
# numbering gaps mark missing lines (sys.exit calls, `if args.solve:` /
# `if cluster_mode:` guards, render_slurm_script keyword arguments, the
# manifest dict opener, etc. — TODO confirm against the full file).
2890def run_workflow(args):
2891 """Main orchestrator for the 'run' command (local and Slurm modes)."""
# Dry-run short-circuits: build and render the plan, touch no files.
# (A `return` presumably follows on an elided line.)
2892 if getattr(args, "dry_run", False):
2893 plan = build_run_dry_plan(args)
2894 render_run_dry_plan(plan, output_format=getattr(args, "output_format", "text"))
# Workflow-wide state: output_dir_abs is filled by the post stage and read
# by the summary; stages_completed drives the summary/manifest.
2899 output_dir_abs = None
2900 workflow_start = time.time()
2901 stages_completed = []
2903 submission_meta = {"launch_mode": "local", "stages": {}}
2905 cluster_mode = bool(getattr(args, "cluster", None))
2908 effective_num_procs = args.num_procs
# --- Cluster (Slurm) mode setup: validate cluster.yml and reconcile the
# requested MPI task count with the cluster topology. The enclosing
# `if cluster_mode:` guard is on an elided line — TODO confirm.
2911 cluster_path = os.path.abspath(args.cluster)
2912 cluster_cfg = read_yaml_file(cluster_path)
2913 validate_cluster_config(cluster_cfg, cluster_path)
2914 scheduler_type = str(cluster_cfg.get("scheduler", {}).get("type", "slurm")).lower()
# --scheduler, if given, must agree with cluster.yml's scheduler.type.
2915 if args.scheduler and args.scheduler.lower() != scheduler_type:
2917 f"[FATAL] --scheduler={args.scheduler} does not match cluster.yml scheduler.type={scheduler_type}.",
2921 if scheduler_type != "slurm":
2922 print(f"[FATAL] Unsupported scheduler '{scheduler_type}'. Only Slurm is supported in v1.", file=sys.stderr)
# --num-procs must be 1 (meaning "auto: take the cluster total") or exactly
# nodes*ntasks_per_node; anything else is a fatal mismatch.
2924 cluster_tasks = get_cluster_total_tasks(cluster_cfg)
2925 if args.num_procs not in (1, cluster_tasks):
2927 f"[FATAL] In cluster mode, --num-procs must be 1 (auto) or exactly nodes*ntasks_per_node ({cluster_tasks}).",
2931 effective_num_procs = cluster_tasks
2932 submission_meta["launch_mode"] = "slurm"
2933 submission_meta["cluster_config"] = cluster_path
2934 submission_meta["no_submit"] = bool(args.no_submit)
2935 print(f"[INFO] Cluster mode enabled (Slurm). Using {effective_num_procs} MPI tasks from cluster.yml.")
2936 elif getattr(args, "scheduler", None):
2937 print("[FATAL] --scheduler requires --cluster in this version.", file=sys.stderr)
2940 # --- Stage 1: Solver (if requested) ---
# configs bundles parsed YAML + absolute path per input (dict opener and the
# `if args.solve:` guard are on elided lines).
2943 'case': read_yaml_file(args.case), 'case_path': os.path.abspath(args.case),
2944 'solver': read_yaml_file(args.solver), 'solver_path': os.path.abspath(args.solver),
2945 'monitor': read_yaml_file(args.monitor), 'monitor_path': os.path.abspath(args.monitor)
2948 print("\n[INFO] Validating configuration files...")
2949 validate_solver_configs(
2950 configs['case'], configs['solver'], configs['monitor'],
2951 args.case, args.solver, args.monitor
2953 print("[SUCCESS] All configuration files passed validation.\n")
# New self-contained run directory: runs/<case>_<timestamp>, with the input
# configs copied in so the run is reproducible on its own.
2955 case_name = os.path.splitext(os.path.basename(args.case))[0]
2956 timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
2957 run_id = f"{case_name}_{timestamp}"
2958 run_dir = os.path.abspath(os.path.join("runs", run_id))
2960 config_dir = os.path.join(run_dir, "config")
# NOTE(review): "logs" and "results" here are bare names, so they are created
# relative to the CWD rather than under run_dir (unlike config/scheduler) —
# looks suspicious; verify against the intended layout.
2961 for d in [config_dir, "logs", "results", os.path.join(run_dir, "scheduler")]:
2962 os.makedirs(d, exist_ok=True)
2963 print(f"[INFO] Created new self-contained run directory: {os.path.relpath(run_dir)}")
2965 shutil.copy(args.case, os.path.join(config_dir, "case.yml"))
2966 shutil.copy(args.solver, os.path.join(config_dir, "solver.yml"))
2967 shutil.copy(args.monitor, os.path.join(config_dir, "monitor.yml"))
# cluster.yml copy presumably guarded by an elided `if cluster_mode:`.
2969 shutil.copy(cluster_path, os.path.join(config_dir, "cluster.yml"))
2971 print("\n" + "="*25 + " SOLVER STAGE " + "="*25)
2972 source_files = {'Case': args.case, 'Solver': args.solver, 'Monitor': args.monitor}
# Generate the C-solver's sidecar monitoring files from monitor.yml.
2973 print("[INFO] Generating monitoring files (whitelist.run, profile.run)...")
2974 whitelist_path = generate_simple_list_file(
2975 run_dir, run_id, configs['monitor'], 'logging', 'enabled_functions', 'whitelist.run', source_files
2977 profile_path = generate_simple_list_file(
2978 run_dir, run_id, configs['monitor'], 'profiling', 'critical_functions', 'profile.run', source_files
2981 monitor_files = {'whitelist': whitelist_path, 'profile': profile_path}
2982 control_file = generate_solver_control_file(run_dir, run_id, configs, effective_num_procs, monitor_files)
2984 solver_exe = os.path.join(BIN_DIR, "picsolver")
2985 solver_args = ["-control_file", control_file]
# Slurm path: render solver.sbatch (%j in log names = Slurm job id) and
# submit unless --no-submit. (Guard and most render_slurm_script kwargs are
# elided.)
2987 scheduler_dir = os.path.join(run_dir, "scheduler")
2988 solver_script = os.path.join(scheduler_dir, "solver.sbatch")
2989 solver_log = os.path.join(run_dir, "logs", "solver_%j.out")
2990 solver_err = os.path.join(run_dir, "logs", "solver_%j.err")
2991 solver_cmd = build_cluster_launch_command(cluster_cfg, solver_exe, solver_args)
2992 render_slurm_script(
3000 env_vars={"LOG_LEVEL": configs['monitor'].get('logging', {}).get('verbosity', 'INFO').upper()},
3002 submission_meta["stages"]["solve"] = {"script": solver_script, "submitted": False}
3003 print(f"[SUCCESS] Generated solver Slurm script: {os.path.relpath(solver_script)}")
3004 if not args.no_submit:
3005 submit_info = submit_sbatch(solver_script)
3006 submission_meta["stages"]["solve"].update(submit_info)
3007 submission_meta["stages"]["solve"]["submitted"] = True
3008 print(f"[SUCCESS] Submitted solver job: {submit_info['job_id']}")
3009 stages_completed.append('solve')
# Local path: run picsolver directly, via mpiexec when more than one rank.
3011 command = [solver_exe] + solver_args
3012 if effective_num_procs > 1:
3013 command = ["mpiexec", "-n", str(effective_num_procs)] + command
3014 execute_command(command, run_dir, f"{run_id}_solver.log", configs['monitor'])
3015 stages_completed.append('solve')
3017 # --- Stage 2: Post-Processing (if requested) ---
3018 if args.post_process:
# When not chained after --solve, --run-dir selects an existing run
# directory (the `if args.run_dir:` guard is on an elided line).
3020 run_dir = os.path.abspath(args.run_dir)
3021 if not os.path.isdir(run_dir):
3022 print(f"[FATAL] Specified run directory not found: {run_dir}", file=sys.stderr)
3024 print(f"[INFO] Operating on existing run directory: {os.path.relpath(run_dir)}")
3025 run_id = os.path.basename(run_dir)
3026 elif not args.solve:
3027 print("[FATAL] --post-process requires --run-dir when not used with --solve.", file=sys.stderr)
3030 print("\n" + "="*20 + " POST-PROCESSING STAGE " + "="*20)
# Content-based discovery of the run's configs (no fixed filenames assumed).
3031 config_dir = os.path.join(run_dir, "config")
3032 case_path, monitor_path, solver_control_path = auto_identify_run_inputs(config_dir)
3034 if not all([case_path, monitor_path, solver_control_path]):
3035 print(f"[FATAL] Could not automatically identify required config files in {config_dir}", file=sys.stderr)
3037 print(" - No 'case' file found (expected 'models' + 'boundary_conditions').", file=sys.stderr)
3038 if not monitor_path:
3039 print(" - No 'monitor' file found (expected 'io' + 'logging').", file=sys.stderr)
3040 if not solver_control_path:
3041 print(" - No '.control' file found.", file=sys.stderr)
3044 print(f"[INFO] Auto-identified Case file: {os.path.basename(case_path)}")
3045 print(f"[INFO] Auto-identified Monitor file: {os.path.basename(monitor_path)}")
3047 case_cfg = read_yaml_file(case_path)
3048 monitor_cfg = read_yaml_file(monitor_path)
3049 post_cfg = read_yaml_file(args.post)
3051 print("[INFO] Validating post-processing configuration...")
3052 validate_post_config(post_cfg, args.post)
3053 print("[SUCCESS] Post-processing configuration passed validation.\n")
# In cluster mode chained after --solve, the solver output does not exist
# yet (the job may still be queued), so the existence check is relaxed.
3055 strict_source_check = not (cluster_mode and args.solve)
3056 resolved_source_dir = resolve_post_source_directory(run_dir, monitor_cfg, post_cfg, strict=strict_source_check)
3057 if 'source_data' not in post_cfg:
3058 post_cfg['source_data'] = {}
3059 post_cfg['source_data']['directory'] = resolved_source_dir
# Required IO keys; missing ones are reported via KeyError (the `try:`
# opener is on an elided line).
3061 post_io_cfg = post_cfg.get('io', {})
3063 output_dir_rel = post_io_cfg['output_directory']
3064 output_prefix = post_io_cfg['output_filename_prefix']
3065 except KeyError as e:
3066 print(f"[FATAL] Missing required key '{e.args[0]}' in the 'io' section of {args.post}", file=sys.stderr)
3069 output_dir_abs = os.path.abspath(os.path.join(run_dir, output_dir_rel))
3070 os.makedirs(output_dir_abs, exist_ok=True)
3071 print(f"[INFO] Post-processor output directory: {os.path.relpath(output_dir_abs)}")
3073 source_files_post = {'Case': case_path, 'Post-Profile': args.post}
3074 post_recipe_file = generate_post_recipe_file(run_dir, run_id, post_cfg, source_files_post)
3076 post_exe = os.path.join(BIN_DIR, "postprocessor")
3077 post_args = ["-control_file", solver_control_path, "-postprocessing_config_file", post_recipe_file]
# Slurm path for post stage; mirrors the solver stage above.
3079 scheduler_dir = os.path.join(run_dir, "scheduler")
3080 os.makedirs(scheduler_dir, exist_ok=True)
3081 post_script = os.path.join(scheduler_dir, "post.sbatch")
3082 post_log = os.path.join(run_dir, "logs", "post_%j.out")
3083 post_err = os.path.join(run_dir, "logs", "post_%j.err")
3084 post_cmd = build_cluster_launch_command(cluster_cfg, post_exe, post_args)
3085 render_slurm_script(
3093 env_vars={"LOG_LEVEL": monitor_cfg.get('logging', {}).get('verbosity', 'INFO').upper()},
3095 submission_meta["stages"]["post-process"] = {"script": post_script, "submitted": False}
3096 print(f"[SUCCESS] Generated post Slurm script: {os.path.relpath(post_script)}")
# If a solver job was submitted in this invocation, chain the post job with
# afterok so it only starts once the solver succeeds.
3098 if not args.no_submit:
3099 dependency_job = None
3101 dependency_job = submission_meta.get("stages", {}).get("solve", {}).get("job_id")
3102 submit_info = submit_sbatch(post_script, dependency=dependency_job)
3103 submission_meta["stages"]["post-process"].update(submit_info)
3104 submission_meta["stages"]["post-process"]["submitted"] = True
3106 submission_meta["stages"]["post-process"]["dependency"] = f"afterok:{dependency_job}"
3107 print(f"[SUCCESS] Submitted post job: {submit_info['job_id']}")
3108 stages_completed.append('post-process')
# Local path: run postprocessor directly.
3110 command = [post_exe] + post_args
3111 if effective_num_procs > 1:
3112 command = ["mpiexec", "-n", str(effective_num_procs)] + command
3113 execute_command(command, run_dir, f"{run_id}_{output_prefix}.log", monitor_cfg)
3114 stages_completed.append('post-process')
# --- Provenance: manifest.json + scheduler/submission.json (manifest dict
# opener is on an elided line).
3119 "created_at": datetime.now().isoformat(),
3120 "launch_mode": "slurm" if cluster_mode else "local",
3121 "git_commit": get_git_commit(),
3122 "num_procs": effective_num_procs,
3123 "stages_requested": {"solve": bool(args.solve), "post_process": bool(args.post_process)},
3124 "stages_completed_or_submitted": stages_completed,
3128 manifest["inputs"]["case"] = os.path.abspath(args.case)
3129 manifest["inputs"]["solver"] = os.path.abspath(args.solver)
3130 manifest["inputs"]["monitor"] = os.path.abspath(args.monitor)
3131 if args.post_process:
3132 manifest["inputs"]["post"] = os.path.abspath(args.post)
3134 manifest["inputs"]["cluster"] = cluster_path
3135 write_json_file(os.path.join(run_dir, "scheduler", "submission.json"), submission_meta)
3136 write_json_file(os.path.join(run_dir, "manifest.json"), manifest)
# --- Human-readable run summary with adaptive wall-clock formatting. ---
3138 if stages_completed:
3139 elapsed = time.time() - workflow_start
3140 mins, secs = divmod(int(elapsed), 60)
3141 hrs, mins = divmod(mins, 60)
3143 time_str = f"{hrs}h {mins}m {secs}s"
3145 time_str = f"{mins}m {secs}s"
3147 time_str = f"{secs}s"
3149 print("\n" + "=" * 60)
3150 print(" RUN SUMMARY")
3152 print(f" Run ID : {run_id}")
3153 print(f" Run directory : {os.path.relpath(run_dir)}")
3154 print(f" Wall-clock : {time_str}")
3155 print(f" Stages : {', '.join(stages_completed)}")
3156 print(f" Launch mode : {'slurm' if cluster_mode else 'local'}")
3157 print(f" MPI processes : {effective_num_procs}")
3158 if args.solve and configs:
3159 total_steps = configs['case'].get('run_control', {}).get('total_steps', '?')
3160 result_dir = os.path.join(run_dir, configs['monitor'].get('io', {}).get('directories', {}).get('output', 'results'))
3161 print(f" Steps run : {total_steps}")
3162 print(f" Solver output : {os.path.relpath(result_dir)}")
3163 if 'post-process' in stages_completed and output_dir_abs:
3164 print(f" Post output : {os.path.relpath(output_dir_abs)}")
3165 print(f" Logs : {os.path.relpath(os.path.join(run_dir, 'logs'))}")
3167 submission_file = os.path.join(run_dir, "scheduler", "submission.json")
3168 print(f" Submission meta: {os.path.relpath(submission_file)}")
# NOTE(review): elided dump — leading integers are extraction residue;
# numbering gaps mark missing lines (e.g. `case_entries = []`, the TSV row
# join/write, several render_slurm_array_stage_script arguments — TODO
# confirm against the full file).
3171def sweep_workflow(args):
3172 """Study/sweep orchestration using Slurm job arrays."""
# Validate both the study definition and the cluster description up front.
3173 study_path = os.path.abspath(args.study)
3174 cluster_path = os.path.abspath(args.cluster)
3176 study_cfg = read_yaml_file(study_path)
3177 cluster_cfg = read_yaml_file(cluster_path)
3178 validate_study_config(study_cfg, study_path)
3179 validate_cluster_config(cluster_cfg, cluster_path)
# Study workspace: studies/<study>_<timestamp>/{cases,scheduler,results,logs},
# with the input YAMLs copied in for reproducibility.
3181 study_name = os.path.splitext(os.path.basename(study_path))[0]
3182 timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
3183 study_id = f"{study_name}_{timestamp}"
3184 study_dir = os.path.abspath(os.path.join("studies", study_id))
3185 cases_dir = os.path.join(study_dir, "cases")
3186 scheduler_dir = os.path.join(study_dir, "scheduler")
3187 results_dir = os.path.join(study_dir, "results")
3188 logs_dir = os.path.join(study_dir, "logs")
3189 for path in [cases_dir, scheduler_dir, results_dir, logs_dir]:
3190 os.makedirs(path, exist_ok=True)
3192 print(f"[INFO] Creating study directory: {os.path.relpath(study_dir)}")
3193 shutil.copy(study_path, os.path.join(study_dir, "study.yml"))
3194 shutil.copy(cluster_path, os.path.join(study_dir, "cluster.yml"))
# Base configs are resolved relative to the study file and validated once
# before the per-case overrides are applied.
3196 base_cfgs = study_cfg["base_configs"]
3197 base_paths = {k: resolve_path(study_path, v) for k, v in base_cfgs.items()}
3198 base_case = read_yaml_file(base_paths["case"])
3199 base_solver = read_yaml_file(base_paths["solver"])
3200 base_monitor = read_yaml_file(base_paths["monitor"])
3201 base_post = read_yaml_file(base_paths["post"])
3202 validate_solver_configs(base_case, base_solver, base_monitor, base_paths["case"], base_paths["solver"], base_paths["monitor"])
3203 validate_post_config(base_post, base_paths["post"])
# Cartesian expansion of the parameter matrix; zero cases is a user error.
3205 combinations = expand_parameter_matrix(study_cfg["parameters"])
3206 if not combinations:
3207 print("[FATAL] Study parameter matrix expanded to zero cases.", file=sys.stderr)
3209 print(f"[INFO] Expanded sweep matrix to {len(combinations)} case(s).")
3211 cluster_tasks = get_cluster_total_tasks(cluster_cfg)
3213 case_index_file = os.path.join(scheduler_dir, "case_index.tsv")
# --- Materialize one self-contained run directory per combination. ---
3215 for idx, combo in enumerate(combinations):
3216 case_id = f"case_{idx:04d}"
3217 run_dir = os.path.join(cases_dir, case_id)
3218 config_dir = os.path.join(run_dir, "config")
3219 os.makedirs(config_dir, exist_ok=True)
3220 os.makedirs(os.path.join(run_dir, "logs"), exist_ok=True)
3221 os.makedirs(os.path.join(run_dir, "results"), exist_ok=True)
# Deep-copy the base configs so each case's overrides stay isolated, then
# apply "root.dotted.key" overrides into the matching config dict.
3223 case_cfg = copy.deepcopy(base_case)
3224 solver_cfg = copy.deepcopy(base_solver)
3225 monitor_cfg = copy.deepcopy(base_monitor)
3226 post_cfg = copy.deepcopy(base_post)
3227 target_map = {"case": case_cfg, "solver": solver_cfg, "monitor": monitor_cfg, "post": post_cfg}
3228 for full_key, value in combo.items():
3229 root, nested = full_key.split(".", 1)
3230 _deep_set(target_map[root], nested, value)
3232 # Preserve file-based/grid-gen workflows when study cases are materialized
3233 # into new directories by rewriting external paths as absolute.
3234 absolutize_case_external_paths(case_cfg, base_paths["case"])
# Write the mutated configs into the case directory and re-validate them,
# so a bad override fails here rather than inside a queued array task.
3236 case_path = os.path.join(config_dir, "case.yml")
3237 solver_path = os.path.join(config_dir, "solver.yml")
3238 monitor_path = os.path.join(config_dir, "monitor.yml")
3239 post_path = os.path.join(config_dir, "post.yml")
3240 write_yaml_file(case_path, case_cfg)
3241 write_yaml_file(solver_path, solver_cfg)
3242 write_yaml_file(monitor_path, monitor_cfg)
3243 write_yaml_file(post_path, post_cfg)
3245 validate_solver_configs(case_cfg, solver_cfg, monitor_cfg, case_path, solver_path, monitor_path)
3246 validate_post_config(post_cfg, post_path)
# Per-case solver sidecar files and control file (same artifacts as a
# single `run --solve`, keyed by case_id instead of run_id).
3248 source_files = {'Case': case_path, 'Solver': solver_path, 'Monitor': monitor_path}
3249 whitelist_path = generate_simple_list_file(run_dir, case_id, monitor_cfg, 'logging', 'enabled_functions', 'whitelist.run', source_files)
3250 profile_path = generate_simple_list_file(run_dir, case_id, monitor_cfg, 'profiling', 'critical_functions', 'profile.run', source_files)
3251 monitor_files = {"whitelist": whitelist_path, "profile": profile_path}
3253 "case": case_cfg, "case_path": case_path,
3254 "solver": solver_cfg, "solver_path": solver_path,
3255 "monitor": monitor_cfg, "monitor_path": monitor_path
3257 control_file = generate_solver_control_file(run_dir, case_id, configs, cluster_tasks, monitor_files)
# Post recipe: non-strict source resolution because the solver output does
# not exist yet at generation time.
3259 source_dir = resolve_post_source_directory(run_dir, monitor_cfg, post_cfg, strict=False)
3260 if 'source_data' not in post_cfg:
3261 post_cfg['source_data'] = {}
3262 post_cfg['source_data']['directory'] = source_dir
3263 output_prefix = post_cfg.get("io", {}).get("output_filename_prefix", "post")
3264 post_recipe = generate_post_recipe_file(run_dir, case_id, post_cfg, {'Case': case_path, 'Post-Profile': post_path})
# Index entry consumed by the array scripts (via SLURM_ARRAY_TASK_ID row
# lookup — presumably; the TSV write below is partially elided).
3266 case_entries.append({
3269 "run_dir": os.path.abspath(run_dir),
3270 "control_file": control_file,
3271 "post_recipe_file": post_recipe,
3272 "log_level": str(monitor_cfg.get("logging", {}).get("verbosity", "INFO")).upper(),
3273 "post_prefix": output_prefix,
3274 "parameters": combo,
# --- Write the tab-separated case index used by the array tasks. ---
3277 with open(case_index_file, "w") as f:
3278 for entry in case_entries:
3282 str(entry["index"]),
3285 entry["control_file"],
3286 entry["post_recipe_file"],
3288 entry["post_prefix"],
3292 print(f"[SUCCESS] Wrote sweep case index: {os.path.relpath(case_index_file)}")
# Slurm array spec "0-<N-1>", optionally throttled with "%max_concurrent".
3294 max_idx = len(case_entries) - 1
3295 max_conc = study_cfg.get("execution", {}).get("max_concurrent_array_tasks")
3296 array_spec = f"0-{max_idx}"
3298 array_spec = f"{array_spec}%{max_conc}"
# --- Render one array script per stage (%A = array job id, %a = task id). ---
3300 solver_exe = os.path.join(BIN_DIR, "picsolver")
3301 post_exe = os.path.join(BIN_DIR, "postprocessor")
3302 solver_array_script = os.path.join(scheduler_dir, "solver_array.sbatch")
3303 post_array_script = os.path.join(scheduler_dir, "post_array.sbatch")
3304 render_slurm_array_stage_script(
3305 solver_array_script,
3306 f"{study_id}_solve",
3313 os.path.join(logs_dir, "solver_%A_%a.out"),
3314 os.path.join(logs_dir, "solver_%A_%a.err")
3316 render_slurm_array_stage_script(
3325 os.path.join(logs_dir, "post_%A_%a.out"),
3326 os.path.join(logs_dir, "post_%A_%a.err")
3328 print(f"[SUCCESS] Generated Slurm array scripts in {os.path.relpath(scheduler_dir)}")
# Submission record; post array is chained afterok on the solver array.
3331 "launch_mode": "slurm",
3332 "study_id": study_id,
3333 "solver_array": {"script": solver_array_script, "submitted": False},
3334 "post_array": {"script": post_array_script, "submitted": False},
3335 "no_submit": bool(args.no_submit),
3337 if not args.no_submit:
3338 solver_submit = submit_sbatch(solver_array_script)
3339 submission["solver_array"].update(solver_submit)
3340 submission["solver_array"]["submitted"] = True
3341 post_submit = submit_sbatch(post_array_script, dependency=solver_submit["job_id"])
3342 submission["post_array"].update(post_submit)
3343 submission["post_array"]["submitted"] = True
3344 submission["post_array"]["dependency"] = f"afterok:{solver_submit['job_id']}"
3345 print(f"[SUCCESS] Submitted solver array job: {solver_submit['job_id']}")
3346 print(f"[SUCCESS] Submitted post array job: {post_submit['job_id']}")
# Aggregation/plots run now, at generation time — before array jobs have
# produced output; presumably these tolerate missing results. TODO confirm.
3348 metrics_csv = aggregate_study_metrics(study_cfg, case_entries, results_dir)
3349 plots = generate_study_plots(study_cfg, metrics_csv, os.path.join(results_dir, "plots"))
# Study-level provenance artifacts (dict opener elided).
3352 "study_id": study_id,
3353 "created_at": datetime.now().isoformat(),
3354 "git_commit": get_git_commit(),
3355 "study_type": study_cfg.get("study_type"),
3356 "num_cases": len(case_entries),
3358 "study_dir": study_dir,
3359 "case_index": case_index_file,
3360 "solver_array_script": solver_array_script,
3361 "post_array_script": post_array_script,
3362 "metrics_table": metrics_csv,
3363 "plots_dir": os.path.join(results_dir, "plots"),
3365 "submission": submission,
3367 write_json_file(os.path.join(scheduler_dir, "submission.json"), submission)
3368 write_json_file(os.path.join(study_dir, "study_manifest.json"), summary)
3369 write_json_file(os.path.join(results_dir, "summary.json"), {"study_id": study_id, "metrics_csv": metrics_csv, "plots": plots})
# Human-readable study summary.
3371 print("\n" + "=" * 60)
3372 print(" STUDY SUMMARY")
3374 print(f" Study ID : {study_id}")
3375 print(f" Study directory : {os.path.relpath(study_dir)}")
3376 print(f" Cases generated : {len(case_entries)}")
3377 print(f" Array spec : {array_spec}")
3378 print(f" Solver script : {os.path.relpath(solver_array_script)}")
3379 print(f" Post script : {os.path.relpath(post_array_script)}")
3381 print(f" Metrics table : {os.path.relpath(metrics_csv)}")
3383 print(f" Plots : {os.path.relpath(os.path.join(results_dir, 'plots'))}")
# NOTE(review): the `def` line of this 'init' command handler (original
# line ~3387) is elided from this dump; only the docstring and body are
# visible. Leading integers are extraction residue.
3389 @brief Implements the 'init' command.
3390 @details Creates a new case study directory by copying a template. It can then
3391 either create relative symbolic links to the project's executables
3392 (default) or create a full copy of them for a self-contained study.
3393 @param[in] args The command-line arguments parsed by argparse.
# Template lives under <project>/examples/<template_name>.
3395 template_path = os.path.join(PROJECT_ROOT, "examples", args.template_name)
3396 # The destination path is relative to the current working directory.
3397 dest_path = os.path.abspath(os.path.join(os.getcwd(), args.dest_name if args.dest_name else args.template_name))
# Fail fast on a missing template or an existing destination (never
# overwrite user data). sys.exit calls are presumably on elided lines.
3399 if not os.path.isdir(template_path):
3400 print(f"[FATAL] Case template '{args.template_name}' not found at '{template_path}'", file=sys.stderr)
3402 if os.path.exists(dest_path):
3403 print(f"[FATAL] Destination directory '{dest_path}' already exists.", file=sys.stderr)
3406 print(f"[INFO] Initializing new case '{os.path.basename(dest_path)}' from template '{args.template_name}'...")
3408 shutil.copytree(template_path, dest_path)
3409 print(f"[SUCCESS] Copied template files to: {dest_path}")
# --copy-binaries chooses full copies over the default relative symlinks.
3411 if args.copy_binaries:
3412 print("[INFO] Copying project binaries for a self-contained case...")
3414 print("[INFO] Creating symbolic links to project binaries...")
3416 main_bin_dir = os.path.join(PROJECT_ROOT, "bin")
# The binary-handling section appears to be inside a try block (see the
# `except Exception` at the end) — opener elided, TODO confirm.
3419 # We only want to copy/link the actual executables, not the orchestrator script itself.
3420 binaries = [f for f in os.listdir(main_bin_dir) if f != 'pic.flow' and os.path.isfile(os.path.join(main_bin_dir, f))]
3422 print("[WARNING] Main project bin/ directory contains no executables. Nothing to link or copy.", file=sys.stderr)
3425 for binary_name in binaries:
3426 source_path_abs = os.path.abspath(os.path.join(main_bin_dir, binary_name))
3427 dest_file_path = os.path.join(dest_path, binary_name)
3429 if args.copy_binaries:
3430 # Use copy2 to preserve permissions and metadata
3431 shutil.copy2(source_path_abs, dest_file_path)
3432 print(f" - Copied '{binary_name}'")
# Relative symlinks keep the case directory relocatable together with the
# project tree.
3434 relative_source_path = os.path.relpath(source_path_abs, start=dest_path)
3435 os.symlink(relative_source_path, dest_file_path)
3436 print(f" - Linked '{binary_name}'")
3438 print("[SUCCESS] Binaries are now available in your study directory.")
3439 print(" You can now 'cd' into your study and run commands locally (e.g., './picsolver ...').")
# Best-effort: binary copy/link failure is non-fatal since the case files
# were already copied successfully.
3441 except Exception as e:
3442 print(f"[ERROR] Failed to process binaries: {e}", file=sys.stderr)
3443 print(" Your case files were copied, but you will need to run commands from the project root.", file=sys.stderr)
# NOTE(review): elided dump — leading integers are extraction residue; the
# docstring delimiters and the sys.exit after the fatal message are on
# elided lines.
3445def build_project(args):
3447 @brief Implements the 'build' command.
3448 @details Executes the top-level build.sh script, passing through any
3449 additional arguments directly to 'make'. This allows for building,
3450 cleaning, and other Makefile targets via the orchestrator.
3451 @param[in] args The command-line arguments parsed by argparse.
3454 print("\n" + "="*27 + " BUILD STAGE " + "="*27)
3455 build_script_path = os.path.join(PROJECT_ROOT, "build.sh")
# Fail fast with guidance when the build entry point is absent.
3457 if not os.path.isfile(build_script_path):
3458 print(f"[FATAL] Build script not found at expected location: {build_script_path}", file=sys.stderr)
3459 print(" Please ensure 'build.sh' exists in the project root directory.", file=sys.stderr)
3462 # The command is the script itself, plus any passthrough arguments for make.
3463 # command = [build_script_path] + args.make_args
# Invoked via /bin/bash explicitly so build.sh need not be executable.
3464 command = ['/bin/bash', build_script_path] + args.make_args
3465 # For the build process, we don't have a monitor.yml, so we pass an empty
3466 # dict to execute_command. The command should be run in the project root.
3467 execute_command(command, PROJECT_ROOT, "build.log", {})
3472# ==============================================================================
3473# MAIN COMMAND-LINE INTERFACE PARSER
3474# ==============================================================================
3476def _add_run_parser(subparsers):
3477 """Attach `run` parser with staged execution and dry-run support."""
3478 p_run = subparsers.add_parser(
3480 help="Execute a simulation workflow (solve and/or post-process).",
3481 formatter_class=argparse.RawTextHelpFormatter,
3483 "Execute solver and/or post-processing stages.\n\n"
3485 " pic.flow run --solve --post-process -n 8 --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml\n"
3486 " pic.flow run --post-process --run-dir runs/my_run --post post.yml\n"
3487 " pic.flow run --solve --post-process --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml --cluster cluster.yml --no-submit\n"
3488 " pic.flow run --solve --post-process --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml --dry-run\n"
3489 " pic.flow run --solve --post-process --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml --dry-run --format json"
3491 epilog="Next: run `pic.flow validate ...` first for config-only checks.",
3493 run_group = p_run.add_argument_group("stages")
3494 run_group.add_argument("--solve", action="store_true", help="Execute the solver stage (creates a new run directory).")
3495 run_group.add_argument("--post-process", action="store_true", help="Execute the post-processing stage on a run directory.")
3497 solver_group = p_run.add_argument_group("solver inputs (required for --solve)")
3498 solver_group.add_argument("--case", help="Path to the case definition file (e.g., case.yml).")
3499 solver_group.add_argument("--solver", help="Path to the solver settings profile (e.g., solver.yml).")
3500 solver_group.add_argument("--monitor", help="Path to the monitoring and I/O profile (e.g., monitor.yml).")
3502 post_group = p_run.add_argument_group("post-processor inputs (required for --post-process)")
3503 post_group.add_argument("--run-dir", help="Path to an existing run directory to post-process.\n(Not needed if running with --solve in the same command).")
3504 post_group.add_argument("--post", help="Path to the post-processing recipe file (e.g., post.yml).")
3506 p_run.add_argument("-n", "--num-procs", type=int, default=1, help="Number of MPI processes for either stage.")
3507 p_run.add_argument("--cluster", help="Path to cluster.yml for Slurm execution mode.")
3508 p_run.add_argument("--scheduler", help="Explicit scheduler selector (currently 'slurm').")
3509 p_run.add_argument("--no-submit", action="store_true", help="Generate Slurm scripts/manifests but do not call sbatch.")
3510 p_run.add_argument("--dry-run", action="store_true", help="Resolve and print planned commands/artifacts without writing files.")
3513 dest="output_format",
3514 choices=["text", "json"],
3516 help="Output format for --dry-run (default: text).",
3521def _add_sweep_parser(subparsers):
3522 p_sweep = subparsers.add_parser(
3524 help="Launch a Slurm-based parameter sweep/study.",
3525 formatter_class=argparse.RawTextHelpFormatter,
3527 "Launch matrix studies from study.yml + cluster.yml.\n\n"
3529 " pic.flow sweep --study study.yml --cluster cluster.yml\n"
3530 " pic.flow sweep --study study.yml --cluster cluster.yml --no-submit"
3532 epilog="Next: inspect studies/<study_id>/results/metrics_table.csv for aggregated metrics.",
3534 p_sweep.add_argument("--study", required=True, help="Path to study.yml defining parameter matrix and metrics.")
3535 p_sweep.add_argument("--cluster", required=True, help="Path to cluster.yml defining Slurm resources.")
3536 p_sweep.add_argument("--no-submit", action="store_true", help="Generate all study artifacts without submitting jobs.")
3540def _add_validate_parser(subparsers):
3541 p_validate = subparsers.add_parser(
3543 help="Validate config files without launching solver/post.",
3544 formatter_class=argparse.RawTextHelpFormatter,
3546 "Validate one or more config roles. No solver/post execution and no run/study artifact writes.\n\n"
3548 " pic.flow validate --case case.yml --solver solver.yml --monitor monitor.yml\n"
3549 " pic.flow validate --post post.yml --cluster cluster.yml\n"
3550 " pic.flow validate --study study.yml --cluster cluster.yml --strict"
3552 epilog="Next: run `pic.flow run --dry-run ...` to inspect resolved commands/artifacts.",
3554 p_validate.add_argument("--case", help="Path to case.yml")
3555 p_validate.add_argument("--solver", help="Path to solver.yml")
3556 p_validate.add_argument("--monitor", help="Path to monitor.yml")
3557 p_validate.add_argument("--post", help="Path to post.yml")
3558 p_validate.add_argument("--cluster", help="Path to cluster.yml")
3559 p_validate.add_argument("--study", help="Path to study.yml")
3560 p_validate.add_argument("--strict", action="store_true", help="Enable additional strict checks for selected roles.")
3564def _add_init_parser(subparsers):
3565 p_init = subparsers.add_parser(
3567 help="Initialize a new case study directory from a template.",
3568 formatter_class=argparse.RawTextHelpFormatter,
3570 "Create a study directory from examples/<template_name>.\n\n"
3572 " pic.flow init flat_channel --dest my_case\n"
3573 " pic.flow init bent_channel --dest my_bent_case --copy-binaries"
3575 epilog="Next: run `pic.flow validate --case ... --solver ... --monitor ...` before execution.",
3577 p_init.add_argument("template_name", help="Name of the case template directory to copy (e.g., 'flat_channel').")
3578 p_init.add_argument(
3581 help="Optional name for the new directory. Defaults to the template name.\nPath is relative to your current working directory.",
3583 p_init.add_argument(
3585 action="store_true",
3586 help="Copy executables into the case directory instead of symlinking.\nThis creates a fully portable, self-contained case study.",
3591def _add_build_parser(subparsers):
3592 p_build = subparsers.add_parser(
3594 help="Build project executables using the Makefile.",
3595 formatter_class=argparse.RawTextHelpFormatter,
3597 "Calls the project's build.sh script. Any arguments provided after 'build'\n"
3598 "are passed directly to make.\n\n"
3601 " pic.flow build clean-project\n"
3602 " pic.flow build SYSTEM=cluster\n"
3603 " pic.flow build postprocessor"
3605 epilog="Next: run `pic.flow --help` or `pic.flow run --help` for execution commands.",
3607 p_build.add_argument(
3609 nargs=argparse.REMAINDER,
3610 help="Arguments to pass directly to the make command (e.g., 'clean-project').",
def build_main_parser():
    """Build and return the top-level CLI parser with all subcommands attached."""
    # NOTE(review): the epilog wrapper, its section headers, and the final
    # 'return parser' were reconstructed (the __main__ guard requires a
    # returned parser) — confirm the exact epilog text against VCS.
    parser = argparse.ArgumentParser(
        description="pic.flow: A comprehensive conductor for the PIC-Flow CFD simulation platform.",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=(
            "Examples:\n"
            "  pic.flow validate --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml\n"
            "  pic.flow run --solve --post-process --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml --dry-run\n"
            "  pic.flow run --solve --post-process --case case.yml --solver solver.yml --monitor monitor.yml --post post.yml --cluster cluster.yml --no-submit\n"
            "  pic.flow sweep --study study.yml --cluster cluster.yml\n\n"
            "Typical workflows:\n"
            "  - First run: pic.flow init ... -> pic.flow validate ... -> pic.flow run ...\n"
            "  - Config debugging: pic.flow validate ...\n"
            "  - Launch planning: pic.flow run ... --dry-run"
        ),
    )
    # 'required=True' makes bare 'pic.flow' fail with a usage message instead
    # of silently doing nothing.
    subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands")
    _add_run_parser(subparsers)
    _add_sweep_parser(subparsers)
    _add_validate_parser(subparsers)
    _add_init_parser(subparsers)
    _add_build_parser(subparsers)
    return parser
def dispatch_command(args):
    """Validate argument combinations and dispatch to command handlers."""
    if args.command == "run":
        # Cross-argument checks argparse cannot express on its own.
        if not args.solve and not args.post_process:
            fail_cli_usage("At least one stage (--solve or --post-process) must be selected.")
        if args.solve and (not args.case or not args.solver or not args.monitor):
            fail_cli_usage("--solve requires --case, --solver, and --monitor.")
        if args.post_process and not args.post:
            fail_cli_usage("--post-process requires --post.")
        if args.scheduler and not args.cluster:
            fail_cli_usage("--scheduler requires --cluster in this version.")
        # NOTE(review): handler name reconstructed by analogy with
        # sweep_workflow/validate_workflow — confirm against VCS.
        run_workflow(args)
        return
    if args.command == "sweep":
        sweep_workflow(args)
        return
    if args.command == "validate":
        validate_workflow(args)
        return
    if args.command == "init":
        # NOTE(review): handler name reconstructed — confirm against VCS.
        init_case(args)
        return
    if args.command == "build":
        build_project(args)
        return
    # Defensive: unreachable while add_subparsers(required=True) holds, but
    # keeps behavior defined if a new subcommand is added without a handler.
    fail_cli_usage(f"Unsupported command '{args.command}'.")
if __name__ == "__main__":
    # CLI entry point: build the parser, parse argv, and hand off to the
    # dispatcher in one chained call.
    dispatch_command(build_main_parser().parse_args())