# WorkflowManager.py
from typing import Callable, List, Dict, Any, Optional
import threading
import json
import time
import traceback

import numpy as np


class WorkflowContext:
    """Shared state for a workflow run: step results, event listeners, and arbitrary objects."""

    def __init__(self):
        self.data: Dict[str, Any] = {}
        self.should_stop = False
        self.current_step_index = 0
        self.event_listeners: Dict[str, List[Callable[[Dict[str, Any]], None]]] = {}
        self.objects: Dict[str, Any] = {}  # Storage for arbitrary objects

    def set_object(self, key: str, obj: Any):
        self.objects[key] = obj

    def get_object(self, key: str) -> Any:
        return self.objects.get(key)

    def remove_object(self, key: str):
        if key in self.objects:
            del self.objects[key]

    def store_step_result(self, step_id: str, metadata: Dict[str, Any]):
        self.data[step_id] = metadata

    def get_step_result(self, step_id: str) -> Optional[Dict[str, Any]]:
        return self.data.get(step_id)

    def update_metadata(self, step_id: str, key: str, value: Any):
        if step_id not in self.data:
            self.data[step_id] = {}
        self.data[step_id][key] = value

    def on(self, event_name: str, callback: Callable[[Dict[str, Any]], None]):
        self.event_listeners.setdefault(event_name, []).append(callback)

    def emit_event(self, event_name: str, payload: Dict[str, Any]):
        for cb in self.event_listeners.get(event_name, []):
            cb(payload)

    def request_stop(self):
        self.should_stop = True
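
# A minimal illustration of the context on its own (the names here are
# arbitrary, not part of the workflow below):
#
#   ctx = WorkflowContext()
#   ctx.on("progress", lambda payload: print("got:", payload))
#   ctx.set_object("shared_list", [])
#   ctx.emit_event("progress", {"status": "demo"})   # -> got: {'status': 'demo'}
#   ctx.get_object("shared_list").append(42)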


class WorkflowStep:
    """A single step: a main function plus optional pre-/post-processing hooks."""

    def __init__(
        self,
        name: str,
        main_func: Callable[..., Any],
        main_params: Dict[str, Any],
        step_id: str,
        pre_funcs: Optional[List[Callable[..., Any]]] = None,
        pre_params: Optional[Dict[str, Any]] = None,
        post_funcs: Optional[List[Callable[..., Any]]] = None,
        post_params: Optional[Dict[str, Any]] = None,
    ):
        self.name = name
        self.main_func = main_func
        self.main_params = main_params
        self.step_id = step_id
        self.pre_funcs = pre_funcs or []
        self.pre_params = pre_params or {}
        self.post_funcs = post_funcs or []
        self.post_params = post_params or {}
        # Retry handling: "max_retries" is popped from main_params so it is
        # not forwarded to the main function itself.
        self.max_retries = self.main_params.pop("max_retries", 0)

    def run(self, context: WorkflowContext):
        if context.should_stop:
            return None  # Don't run if a stop was requested

        # Metadata travels through pre-funcs, the main function, and post-funcs.
        # Pre/post functions receive the context and this metadata dict merged
        # with their own pre_params/post_params, so they can accept flexible arguments.
        metadata = {
            "step_id": self.step_id,
            **self.main_params,
        }

        # Emit an event before the step starts
        context.emit_event("progress", {"status": "started", "step_id": self.step_id, "name": self.name})

        # Run pre-processing functions
        metadata["pre_result"] = []
        for f in self.pre_funcs:
            merged_pre_params = {**self.pre_params, "context": context, "metadata": metadata}
            result = f(**merged_pre_params)
            metadata["pre_result"].append(result)
            if context.should_stop:
                return None

        # Run the main function with error handling and retries
        retries = self.max_retries
        while True:
            try:
                result = self.main_func(**self.main_params)
                metadata["result"] = result
                break
            except Exception as e:
                metadata["error"] = str(e)
                metadata["traceback"] = traceback.format_exc()
                if retries > 0:
                    retries -= 1
                    context.emit_event("progress", {"status": "retrying", "step_id": self.step_id})
                else:
                    # No retries left: record the failure and stop the workflow
                    context.should_stop = True
                    context.store_step_result(self.step_id, metadata)
                    context.emit_event("progress", {"status": "failed", "step_id": self.step_id})
                    return None

        # Run post-processing functions
        metadata["post_result"] = []
        for f in self.post_funcs:
            merged_post_params = {**self.post_params, "context": context, "metadata": metadata}
            result = f(**merged_post_params)
            metadata["post_result"].append(result)
            if context.should_stop:
                return None

        # Store the final metadata in the context and signal completion
        context.store_step_result(self.step_id, metadata)
        context.emit_event("progress", {"status": "completed", "step_id": self.step_id, "name": self.name})
        return metadata["result"]
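
# Retries are requested through main_params; a hypothetical flaky step could
# look like this (the step below is illustrative, not part of the module):
#
#   step = WorkflowStep(
#       name="Flaky acquisition",
#       main_func=acquire_frame,
#       main_params={"channel": "GFP", "max_retries": 2},  # retried up to 2 times
#       step_id="flaky-0",
#   )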


class Workflow:
    """An ordered list of steps run against a shared WorkflowContext."""

    def __init__(self, steps: List[WorkflowStep]):
        self.steps = steps

    def run(self, context: Optional[WorkflowContext] = None):
        # Either use the given context or create a new one
        context = context or WorkflowContext()
        # Resume from current_step_index if a previous run was stopped
        for i in range(context.current_step_index, len(self.steps)):
            step = self.steps[i]
            if context.should_stop:
                break
            step.run(context)
            context.current_step_index = i + 1  # Update progress for resume
        return context

    def run_in_background(self, context: Optional[WorkflowContext] = None):
        # Run the workflow in a background thread
        t = threading.Thread(target=self.run, args=(context,))
        t.start()
        return t
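
# A background run can be cancelled cooperatively (illustration only, assuming
# a Workflow instance named wf like the one built below):
#
#   ctx = WorkflowContext()
#   t = wf.run_in_background(ctx)
#   ctx.request_stop()   # the run stops once the current step checks should_stop
#   t.join()
#   ctx = wf.run(ctx)    # resume later from current_step_index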


# Example device functions
def move_stage(x: float, y: float, z: float = 0.0):
    print(f"Moving stage to X={x}, Y={y}, Z={z}")
    return (x, y, z)


def autofocus(context: WorkflowContext, metadata: Dict[str, Any]):
    print("Performing autofocus...")
    metadata["autofocus_done"] = True


def save_data(context: WorkflowContext, metadata: Dict[str, Any]):
    print(f"Saving data for step {metadata['step_id']}")
    context.update_metadata(metadata["step_id"], "saved", True)


def set_laser_power(power: float, channel: str):
    print(f"Setting laser power to {power} for channel {channel}")
    return power


def acquire_frame(channel: str):
    print(f"Acquiring frame on channel {channel}")
    frame = np.random.rand(512, 512)
    return frame


def process_data(context: WorkflowContext, metadata: Dict[str, Any]):
    print(f"Processing data for step {metadata['step_id']}...")
    metadata["processed"] = True


def save_frame(context: WorkflowContext, metadata: Dict[str, Any]):
    print(f"Saving frame for step {metadata['step_id']}...")
    metadata["frame_saved"] = True


def wait_time(seconds: int, context: WorkflowContext, metadata: Dict[str, Any]):
    time.sleep(seconds)
    metadata["waited"] = seconds


def add_frame_to_file(frame: np.ndarray, context: WorkflowContext, metadata: Dict[str, Any]):
    print(f"Adding frame to file for step {metadata['step_id']}...")
    metadata["frame_added"] = True


def append_data(context: WorkflowContext, metadata: Dict[str, Any]):
    # Collect each step's main result into a shared buffer stored on the context
    obj = context.get_object("data_buffer")
    if obj is not None:
        obj.append(metadata["result"])


# Example: dynamically generate a workflow
x_positions = [0, 10]
y_positions = [0, 10]
z_positions = [0, 5]
channels = ["Brightfield", "GFP"]
frames = range(2)

workflow_steps = []
for x in x_positions:
    for y in y_positions:
        # Move XY with autofocus pre-hook and save_data post-hook
        workflow_steps.append(WorkflowStep(
            name=f"Move XY to ({x}, {y})",
            main_func=move_stage,
            main_params={"x": x, "y": y, "z": 0},
            step_id=str(len(workflow_steps)),
            pre_funcs=[autofocus],
            post_funcs=[save_data]
        ))
        for z in z_positions:
            workflow_steps.append(WorkflowStep(
                name=f"Move Z to {z}",
                step_id=str(len(workflow_steps)),
                main_func=move_stage,
                main_params={"x": x, "y": y, "z": z},
                pre_funcs=[],
                post_funcs=[]
            ))
            for ch in channels:
                workflow_steps.append(WorkflowStep(
                    name=f"Set laser power for {ch}",
                    step_id=str(len(workflow_steps)),
                    main_func=set_laser_power,
                    main_params={"power": 10, "channel": ch},
                    pre_funcs=[],
                    post_funcs=[]
                ))
                for fr in frames:
                    workflow_steps.append(WorkflowStep(
                        name=f"Acquire frame {fr} on {ch}",
                        step_id=str(len(workflow_steps)),
                        main_func=acquire_frame,
                        main_params={"channel": ch},
                        pre_funcs=[wait_time],
                        pre_params={"seconds": 1},
                        post_funcs=[process_data, save_frame, append_data]
                    ))

# Example event listener: print progress events
def progress_listener(event_data):
    print(f"EVENT: {event_data}")


wf = Workflow(workflow_steps)
context = WorkflowContext()
context.on("progress", progress_listener)

# Put a generic object (here, a plain list) into the context
context.set_object("data_buffer", [])

# Run the workflow in the foreground
context = wf.run(context)
print("Workflow results:", context.data)
all_frames = context.get_object("data_buffer")

# Example: to stop the workflow partway through:
# context.request_stop()

# Example: running in the background instead:
# background_thread = wf.run_in_background(context)


# Example of automating workflow creation from a config file:
function_registry = {
    "move_stage": move_stage,
    "autofocus": autofocus,
    "save_data": save_data,
    "set_laser_power": set_laser_power,
    "acquire_frame": acquire_frame,
    "process_data": process_data,
    "save_frame": save_frame,
    "wait_time": wait_time,
    "add_frame_to_file": add_frame_to_file,
    "append_data": append_data,
}


def load_workflow_from_config(config_path: str) -> Workflow:
    with open(config_path, 'r') as f:
        config = json.load(f)
    steps = []
    for step_def in config["steps"]:
        # Function names in the config are resolved through the registry
        pre_funcs = [function_registry[p] for p in step_def.get("pre_funcs", [])]
        post_funcs = [function_registry[p] for p in step_def.get("post_funcs", [])]
        steps.append(WorkflowStep(
            name=step_def["name"],
            main_func=function_registry[step_def["main_func"]],
            main_params=step_def["main_params"],
            step_id=step_def["step_id"],
            pre_funcs=pre_funcs,
            pre_params=step_def.get("pre_params"),
            post_funcs=post_funcs,
            post_params=step_def.get("post_params"),
        ))
    return Workflow(steps)

# With a properly structured JSON config, workflows for different imaging
# applications can be generated and loaded dynamically.
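
# A minimal config matching the loader above might look like this (hypothetical
# file contents; the field names follow the step_def keys read by the loader):
#
#   {
#     "steps": [
#       {
#         "name": "Move XY to (0, 0)",
#         "step_id": "0",
#         "main_func": "move_stage",
#         "main_params": {"x": 0, "y": 0, "z": 0},
#         "pre_funcs": ["autofocus"],
#         "post_funcs": ["save_data"]
#       },
#       {
#         "name": "Acquire frame on GFP",
#         "step_id": "1",
#         "main_func": "acquire_frame",
#         "main_params": {"channel": "GFP", "max_retries": 2},
#         "pre_funcs": ["wait_time"],
#         "pre_params": {"seconds": 1},
#         "post_funcs": ["process_data", "save_frame"]
#       }
#     ]
#   }
#
# wf_from_config = load_workflow_from_config("workflow_config.json")  # hypothetical path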