-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathht_workgraph.py
140 lines (120 loc) · 4.17 KB
/
ht_workgraph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Workgraph to run high-throughput calculations."""
from pathlib import Path
from typing import Callable, Union
from aiida.engine import CalcJob, WorkChain
from aiida.orm import Str
from aiida_workgraph import WorkGraph, task
from ase.io import read
from aiida_mlip.helpers.help_load import load_structure
@task.graph_builder(outputs=[{"name": "final_structures", "from": "context.structs"}])
def build_ht_calc(
    calc: Union[CalcJob, Callable, WorkChain, WorkGraph],
    folder: Union[Path, str, Str],
    calc_inputs: dict,
    input_struct_key: str = "struct",
    final_struct_key: str = "final_structure",
    recursive: bool = True,
) -> WorkGraph:
    """
    Build high throughput calculation WorkGraph.

    The `calc` must take a structure, by default `struct`, as one of its inputs.
    Tasks will then be created to carry out the calculation for each structure file in
    `folder`.

    Parameters
    ----------
    calc : Union[CalcJob, Callable, WorkChain, WorkGraph]
        Calculation to be performed on all structures.
    folder : Union[Path, str, Str]
        Path to the folder containing input structure files.
    calc_inputs : dict
        Dictionary of inputs, shared by all the calculations. It is not modified;
        each task receives its own copy with `input_struct_key` set to the loaded
        structure (overriding any value already stored under that key).
    input_struct_key : str
        Keyword for input structure for `calc`. Default is "struct".
    final_struct_key : str
        Key for final structure output from `calc`. Default is "final_structure".
    recursive : bool
        Whether to search `folder` recursively. Default is True.

    Returns
    -------
    WorkGraph
        The workgraph with calculation tasks for each structure.

    Raises
    ------
    FileNotFoundError
        If `folder` has no valid structure files.
    """
    wg = WorkGraph()

    # Normalise every accepted folder type to a pathlib.Path.
    if isinstance(folder, Str):
        folder = Path(folder.value)
    elif isinstance(folder, str):
        folder = Path(folder)

    pattern = "**/*" if recursive else "*"

    # Stays None until at least one file has been loaded successfully.
    structure = None
    for file in filter(Path.is_file, folder.glob(pattern)):
        # Probe the file with ASE first: anything it cannot parse is treated as
        # "not a structure file" and skipped. The broad except is deliberate,
        # since the folder may contain arbitrary non-structure files.
        try:
            read(file)
        except Exception:
            continue

        structure = load_structure(file)

        # Build a fresh mapping per task so the caller's `calc_inputs` is never
        # mutated and no structure leaks from one call into the next.
        task_inputs = {**calc_inputs, input_struct_key: structure}

        # NOTE(review): files sharing a stem (e.g. same name in different
        # sub-folders, or differing only by extension) get the same task name
        # and context key - confirm how WorkGraph handles such collisions.
        calc_task = wg.add_task(
            calc,
            name=f"calc_{file.stem}",
            **task_inputs,
        )
        # Store this task's final structure under `structs.<stem>` in the
        # context, which the decorator exposes as `final_structures`.
        calc_task.set_context({final_struct_key: f"structs.{file.stem}"})

    if structure is None:
        raise FileNotFoundError(
            f"{folder} is empty or has no readable structure files."
        )
    return wg
def get_ht_workgraph(
    calc: Union[CalcJob, Callable, WorkChain, WorkGraph],
    folder: Union[Path, str, Str],
    calc_inputs: dict,
    input_struct_key: str = "struct",
    final_struct_key: str = "final_structure",
    recursive: bool = True,
    max_number_jobs: int = 10,
) -> WorkGraph:
    """
    Create the top-level WorkGraph that runs `calc` on every structure in a folder.

    A single `build_ht_calc` task named "ht_calc" is added to a WorkGraph named
    "ht_calculation", and its `final_structures` output is exposed as a grouped
    output of the returned graph.

    Parameters
    ----------
    calc : Union[CalcJob, Callable, WorkChain, WorkGraph]
        Calculation to be performed on all structures.
    folder : Union[Path, str, Str]
        Path to the folder containing input structure files.
    calc_inputs : dict
        Dictionary of inputs, shared by all the calculations. Must not contain
        `input_struct_key`.
    input_struct_key : str
        Keyword for input structure for `calc`. Default is "struct".
    final_struct_key : str
        Key for final structure output from `calc`. Default is "final_structure".
    recursive : bool
        Whether to search `folder` recursively. Default is True.
    max_number_jobs : int
        Max number of subprocesses running within the WorkGraph. Default is 10.

    Returns
    -------
    WorkGraph
        The workgraph ready to be submitted.
    """
    # Arguments forwarded unchanged to the graph builder task.
    builder_kwargs = {
        "calc": calc,
        "folder": folder,
        "calc_inputs": calc_inputs,
        "input_struct_key": input_struct_key,
        "final_struct_key": final_struct_key,
        "recursive": recursive,
    }
    # Re-export the builder task's collected structures at the top level.
    final_structures_output = {
        "name": "final_structures",
        "from": "ht_calc.final_structures",
    }

    workgraph = WorkGraph("ht_calculation")
    workgraph.add_task(build_ht_calc, name="ht_calc", **builder_kwargs)
    workgraph.group_outputs = [final_structures_output]
    workgraph.max_number_jobs = max_number_jobs
    return workgraph