Checkpoint on new builder/runner architecture
Signed-off-by: Matthew Ballance <[email protected]>
mballance committed Jan 22, 2025
1 parent 3518534 commit 5edb3c5
Showing 45 changed files with 696 additions and 952 deletions.
14 changes: 14 additions & 0 deletions docs/Roadmap.md
@@ -12,6 +12,18 @@ are evaluated.
- Operations on input and output data
- Operations on task parameters
- Package fragments
- Define task status (see the sketch after this list). Tasks can have at least two types of failures:
  - Pass/Fail: Fail halts successors
  - Fail must come with a message and extra info
  - Status markers/counters
    - errors / warnings / notes
  - Want a known fileset type to capture logfiles and related info
- Central params and datasets?
  - Datasets preserve dependency relationships
  - Datasets are the best way to aggregate settings
  - Typed parameter sets
  - Dependencies provide the order in which to evaluate
  - Operations on variables
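A minimal sketch of what such a status record could look like, assuming the pydantic-model style used elsewhere in this commit; every name below is hypothetical rather than part of the diff:

```python
from typing import List, Optional
from pydantic import BaseModel

class TaskMarker(BaseModel):
    """One error/warning/note reported by a task."""
    severity : str = "error"            # "error" | "warning" | "note"
    msg      : str = ""

class TaskStatus(BaseModel):
    passed   : bool = True              # Fail halts successors
    msg      : Optional[str] = None     # Fail must come with a message
    markers  : List[TaskMarker] = []    # markers carry the extra info
    errors   : int = 0                  # status counters
    warnings : int = 0
    notes    : int = 0
```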

## 2.0.0
- Parameterized package definition and use
@@ -29,11 +41,13 @@ are evaluated.
- JQ-based data extraction
- YAML task templates / expansions
- Support for annotating job requirements
- Support capturing schema for structured task data (see the sketch after this list)
- Mark tasks as producing and accepting certain data
  - FileSet task `produces` fileset of `type`
  - SimImage task `accepts` systemVerilogSource, verilogSource, verilogPreCompLib, etc.
  => Mostly useful for checking and suggestions
  => As more are marked, can treat as more binding
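One possible shape for these markings, sketched against the `Task` class this commit exports from `dv_flow.mgr`; the `produces`/`accepts` attributes are assumptions for illustration, not API defined by this commit:

```python
from dv_flow.mgr import Task

# Hypothetical declarations; the attribute names are illustrative only.
class FileSet(Task):
    produces = ["systemVerilogSource"]      # fileset of `type`

class SimImage(Task):
    accepts = ["systemVerilogSource",       # inputs it can consume
               "verilogSource",
               "verilogPreCompLib"]
```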

# Library

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -18,8 +18,8 @@ description = "DV Flow Manager is a build system for silicon design"
license = {file = "LICENSE" }

[project.scripts]
dvfm = "dv_flow_mgr.__main__:main"
dfm = "dv_flow.mgr.__main__:main"

[tool.setuptools.package-data]
dv_Flow_mgr = ['share/*']
"dv_flow.mgr" = ['share/*']

@@ -1,6 +1,6 @@

from .package_def import *
from .session import *
from .task_graph_runner import *
from .task import *
from .task_data import *

@@ -7,7 +7,7 @@ def get_parser():
    subparsers = parser.add_subparsers(required=True)

    run_parser = subparsers.add_parser('run', help='run a flow')
    run_parser.add_argument("tasks", nargs='+', help="tasks to run")
    run_parser.add_argument("tasks", nargs='*', help="tasks to run")
    run_parser.set_defaults(func=CmdRun())

    return parser
90 changes: 90 additions & 0 deletions src/dv_flow/mgr/cmds/cmd_run.py
@@ -0,0 +1,90 @@
import asyncio
import os
from ..task_graph_runner import TaskGraphRunner
from ..util import loadProjPkgDef
from ..task_graph_builder import TaskGraphBuilder
from ..task_graph_runner_local import TaskGraphRunnerLocal
from ..pkg_rgy import PkgRgy


class CmdRun(object):

    def __call__(self, args):

        # First, find the project we're working with
        pkg = loadProjPkgDef(os.getcwd())

        if pkg is None:
            raise Exception("Failed to find a 'flow.dv' file that defines a package in %s or its parent directories" % os.getcwd())

        print("pkg: %s" % pkg.name)

        if len(args.tasks) == 0:
            # No task was specified: list the available tasks and return
            tasks = []
            for task in pkg.tasks:
                tasks.append(task)
            for frag in pkg.fragment_l:
                for task in frag.tasks:
                    tasks.append(task)
            tasks.sort(key=lambda x: x.name)

            max_name_len = 0
            for t in tasks:
                if len(t.name) > max_name_len:
                    max_name_len = len(t.name)

            print("No task specified. Available Tasks:")
            for t in tasks:
                desc = t.desc
                if desc is None or desc == "":
                    desc = "<no description>"
                print("%s - %s" % (t.name.ljust(max_name_len), desc))
            return

        # Create a session around <pkg>
        # Need to select a backend
        # Need somewhere to store project config data
        # Maybe separate into a task-graph builder and a task-graph runner

        # TODO: allow user to specify run root -- maybe relative to some fixed directory?
        rundir = os.path.join(pkg.basedir, "rundir")

        builder = TaskGraphBuilder(root_pkg=pkg, rundir=rundir)
        runner = TaskGraphRunnerLocal(rundir)

        tasks = []

        for spec in args.tasks:
            task = builder.mkTaskGraph(spec)
            tasks.append(task)

        asyncio.run(runner.run(tasks))

        # rgy = PkgRgy.inst()
        # rgy.registerPackage(pkg)

        # srcdir = os.getcwd()

        # session = Session(srcdir, rundir)

        # package = session.load(srcdir)

        # graphs = []
        # for task in args.tasks:
        #     if task.find(".") == -1:
        #         task = package.name + "." + task
        #     subgraph = session.mkTaskGraph(task)
        #     graphs.append(subgraph)

        # awaitables = [subgraph.do_run() for subgraph in graphs]
        # print("%d awaitables" % len(awaitables))

        # out = asyncio.get_event_loop().run_until_complete(asyncio.gather(*awaitables))

        # print("out: %s" % str(out))

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
100 changes: 96 additions & 4 deletions src/dv_flow_mgr/package_def.py → src/dv_flow/mgr/package_def.py
@@ -19,7 +19,9 @@
#* Author:
#*
#****************************************************************************
import os
import json
import yaml
import importlib
import sys
import pydantic
@@ -32,7 +34,7 @@
from .package_import_spec import PackageImportSpec, PackageSpec
from .task import TaskCtor, TaskParams
from .task_def import TaskDef, TaskSpec
from .tasklib.builtin_pkg import TaskNull
from .std.task_null import TaskNull


class PackageDef(BaseModel):
@@ -144,7 +146,8 @@ def mkTaskCtor(self, session, task, srcdir, tasks_m) -> TaskCtor:
            else:
                mod = sys.modules[modname]
        except ModuleNotFoundError as e:
            raise Exception("Failed to import module %s" % modname)
            raise Exception("Failed to import module %s (basedir=%s): %s" % (
                modname, self.basedir, str(e)))

        if not hasattr(mod, clsname):
            raise Exception("Class %s not found in module %s" % (clsname, modname))
@@ -170,7 +173,15 @@ def mkTaskCtor(self, session, task, srcdir, tasks_m) -> TaskCtor:
"str" : str,
"int" : int,
"float" : float,
"bool" : bool
"bool" : bool,
"list" : List
}
pdflt_m = {
"str" : "",
"int" : 0,
"float" : 0.0,
"bool" : False,
"list" : []
}
for p in task.params.keys():
param = task.params[p]
@@ -185,7 +196,7 @@ def mkTaskCtor(self, session, task, srcdir, tasks_m) -> TaskCtor:
if "value" in param.keys():
field_m[p] = (ptype, param["value"])
else:
field_m[p] = (ptype, )
field_m[p] = (ptype, pdflt_m[ptype_s])
else:
if p not in field_m.keys():
raise Exception("Field %s not found" % p)
@@ -208,3 +219,84 @@ def mkTaskCtor(self, session, task, srcdir, tasks_m) -> TaskCtor:

        return ctor_t

    @staticmethod
    def load(path, exp_pkg_name=None):
        return PackageDef._loadPkgDef(path, exp_pkg_name, [])

    @staticmethod
    def _loadPkgDef(root, exp_pkg_name, file_s):
        if root in file_s:
            raise Exception("Recursive file processing @ %s: %s" % (root, ",".join(file_s)))
        file_s.append(root)
        ret = None
        with open(root, "r") as fp:
            print("open %s" % root)
            doc = yaml.load(fp, Loader=yaml.FullLoader)
            if "package" not in doc.keys():
                raise Exception("Missing 'package' key in %s" % root)
            pkg = PackageDef(**(doc["package"]))
            pkg.basedir = os.path.dirname(root)

            # for t in pkg.tasks:
            #     t.basedir = os.path.dirname(root)

            if exp_pkg_name is not None:
                if exp_pkg_name != pkg.name:
                    raise Exception("Package name mismatch: %s != %s" % (exp_pkg_name, pkg.name))
                # else:
                #     self._pkg_m[exp_pkg_name] = [PackageSpec(pkg.name)
                #     self._pkg_spec_s.append(PackageSpec(pkg.name))

            # if not len(self._pkg_spec_s):
            #     self._pkg_spec_s.append(PackageSpec(pkg.name))
            # else:
            #     self._pkg_def_m[PackageSpec(pkg.name)] = pkg

            print("pkg: %s" % str(pkg))

            print("fragments: %s" % str(pkg.fragments))
            for spec in pkg.fragments:
                PackageDef._loadFragmentSpec(pkg, spec, file_s)

        file_s.pop()

        return pkg

    @staticmethod
    def _loadFragmentSpec(pkg, spec, file_s):
        # We're either going to have:
        # - File path
        # - Directory path

        if os.path.isfile(os.path.join(pkg.basedir, spec)):
            PackageDef._loadFragmentFile(pkg, os.path.join(pkg.basedir, spec), file_s)
        elif os.path.isdir(os.path.join(pkg.basedir, spec)):
            PackageDef._loadFragmentDir(pkg, os.path.join(pkg.basedir, spec), file_s)
        else:
            raise Exception("Fragment spec %s not found" % spec)

    @staticmethod
    def _loadFragmentDir(pkg, dir, file_s):
        for file in os.listdir(dir):
            if os.path.isdir(os.path.join(dir, file)):
                PackageDef._loadFragmentDir(pkg, os.path.join(dir, file), file_s)
            elif os.path.isfile(os.path.join(dir, file)) and file == "flow.dv":
                PackageDef._loadFragmentFile(pkg, os.path.join(dir, file), file_s)

    @staticmethod
    def _loadFragmentFile(pkg, file, file_s):
        if file in file_s:
            raise Exception("Recursive file processing @ %s: %s" % (file, ", ".join(file_s)))
        file_s.append(file)

        with open(file, "r") as fp:
            doc = yaml.load(fp, Loader=yaml.FullLoader)
            print("doc: %s" % str(doc), flush=True)
            if "fragment" in doc.keys():
                # Merge the package definition
                frag = FragmentDef(**(doc["fragment"]))
                frag.basedir = os.path.dirname(file)
                pkg.fragment_l.append(frag)
            else:
                print("Warning: file %s is not a fragment" % file)
File renamed without changes.
File renamed without changes.
78 changes: 78 additions & 0 deletions src/dv_flow/mgr/pkg_rgy.py
@@ -0,0 +1,78 @@
import os
import sys
from typing import Dict, Tuple
from .package_def import PackageDef

class PkgRgy(object):
    _inst = None

    def __init__(self):
        self._pkgpath = []
        self._pkg_m : Dict[str, Tuple[str,PackageDef]] = {}

    def hasPackage(self, name, search_path=False):
        if name in self._pkg_m.keys():
            return True
        elif search_path:
            for p in self._pkgpath:
                if os.path.exists(os.path.join(p, name)):
                    return True
        return False

    def getPackage(self, name):
        if name in self._pkg_m.keys():
            if self._pkg_m[name][1] is None:
                # Load the package on first use
                pkg_def = PackageDef.load(self._pkg_m[name][0])
                self._pkg_m[name] = (
                    self._pkg_m[name][0],
                    pkg_def
                )
            return self._pkg_m[name][1]
        else:
            # Go search the package path
            return None

    def registerPackage(self, pkg_def):
        if pkg_def.name in self._pkg_m.keys():
            raise Exception("Duplicate package %s" % pkg_def.name)
        # Store a (path, definition) tuple, consistent with getPackage
        self._pkg_m[pkg_def.name] = (None, pkg_def)

    def _discover_plugins(self):
        # Register built-in package
        self._pkg_m["std"] = (os.path.join(os.path.dirname(__file__), "std/flow.dv"), None)

        if sys.version_info < (3,10):
            from importlib_metadata import entry_points
        else:
            from importlib.metadata import entry_points

        discovered_plugins = entry_points(group='dv_flow.mgr')
        for p in discovered_plugins:
            try:
                mod = p.load()

                if hasattr(mod, "dvfm_packages"):
                    pkg_m = mod.dvfm_packages()

                    for name,path in pkg_m.items():
                        if name in self._pkg_m.keys():
                            raise Exception("Package %s already registered using path %s. Conflicting path: %s" % (
                                name, self._pkg_m[name][0], path))
                        self._pkg_m[name] = (path, None)
            except Exception as e:
                print("Error loading plugin %s: %s" % (p.name, str(e)))
                raise e

        # self._pkgs = {}
        # for pkg in self._load_pkg_list():
        #     self._pkgs[pkg.name] = pkg

    @classmethod
    def inst(cls):
        if cls._inst is None:
            cls._inst = cls()
            cls._inst._discover_plugins()
        return cls._inst
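A sketch of how an external plugin is assumed to hook into the `dv_flow.mgr` entry-point group that `_discover_plugins` scans; the plugin name and module layout are hypothetical:

```python
# In the plugin's pyproject.toml:
#
#   [project.entry-points."dv_flow.mgr"]
#   my_pkg = "my_pkg.dfm"
#
# my_pkg/dfm.py then provides the package-name -> flow.dv map
# that _discover_plugins() reads via dvfm_packages():
import os

def dvfm_packages():
    pkg_dir = os.path.dirname(os.path.abspath(__file__))
    return {"my_pkg": os.path.join(pkg_dir, "flow.dv")}
```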
File renamed without changes.
@@ -4,8 +4,8 @@
import glob
import pydantic.dataclasses as dc
from typing import List, Tuple
from dv_flow_mgr import Task, TaskData, TaskMemento
from dv_flow_mgr import FileSet as _FileSet
from dv_flow.mgr import Task, TaskData, TaskMemento
from dv_flow.mgr import FileSet as _FileSet

class TaskFileSetMemento(TaskMemento):
    files : List[Tuple[str,float]] = dc.Field(default_factory=list)
@@ -45,7 +45,7 @@ async def run(self, input : TaskData) -> TaskData:
        for file in included_files:
            if not any(glob.fnmatch.fnmatch(file, os.path.join(glob_root, pattern)) for pattern in self.params.exclude):
                memento.files.append((file, os.path.getmtime(os.path.join(glob_root, file))))
                fs.files.append(file[len(glob_root):])
                fs.files.append(file[len(glob_root)+1:])

        # Check to see if the filelist or fileset have changed
        # Only bother doing this if the upstream task data has not changed
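The `+1` in the replacement line strips the path separator along with the root prefix, so the stored path is relative; a quick illustration:

```python
glob_root = "/proj/src"
file = "/proj/src/rtl/top.sv"

print(file[len(glob_root):])     # "/rtl/top.sv" -- leading separator kept
print(file[len(glob_root)+1:])   # "rtl/top.sv"  -- proper relative path
```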