Source code for pfd.entrypoint.submit

from copy import deepcopy
import os
import copy
from turtle import up
from typing import Dict, List, Optional, Union
from pathlib import Path
import json
import dpdata
import dflow
import pfd
import re
import ase
import warnings
import time
from dflow import ArgoStep, Step, Steps, Workflow, upload_artifact, download_artifact
from pfd import train
from pfd.entrypoint.args import (
    normalize as normalize_args,
)
from pfd.entrypoint.common import global_config_workflow, expand_idx

from pfd.exploration.task.stage import ExplorationStage
from pfd.train import train_styles
#from pfd.exploration import explore_styles
from pfd.fp import fp_styles
from pfd.flow import wf_styles
from pfd.superop import PrepRunFp

from pfd.op import (
    CollectData,
    SelectConfs,
    ModelTestOP,
)

from pfd.flow import (
    ExplTrainBlock,
    ExplTrainLoop,
    PFD
)

from pfd.exploration.selector import (
    ConfFilters,
    ConfSelectorFrames,
    conf_filter_styles,
)
from pfd.exploration.converge import ConfFiltersConv, ConfFilterConv
from pfd.exploration.render import TrajRender
from pfd.exploration.scheduler import Scheduler
from pfd.constants import default_image
from pfd.utils.step_config import normalize as normalize_step_dict
from pfd.utils import (
    upload_artifact_and_print_uri,
    get_artifact_from_uri,
    matched_step_key,
    sort_slice_ops,
    print_keys_in_nice_format,
)

from pfd.superop import PrepRunExpl, PrepRunCaly
from pfd.op import (
    PrepASE,
    RunASE,
    PrepCalyInput,
    PrepCalyASEOptim,
    RunCalyASEOptim,
    CollRunCaly,
    CalyEvoStep,
    CalyEvoStepMerge,
)
from pfd.exploration.task import (
    AseTaskGroup, CalyTaskGroup
)


explore_styles = {
    "ase":{
        "preprun": PrepRunExpl,
        "prep": PrepASE,
        "run": RunASE,
        "task_grp": AseTaskGroup,
    },
    "calypso":{
        "preprun": PrepRunCaly,
        "prep": PrepCalyInput,
        "run": CalyEvoStep,
        "task_grp": CalyTaskGroup,
    }
}




default_config = normalize_step_dict({"template_config": {"image": default_image}})


[docs] def get_conf_filters(config): conf_filters = None if len(config) > 0: conf_filters = ConfFilters() for c in config: c = deepcopy(c) conf_filter = conf_filter_styles[c.pop("type")](**c) conf_filters.add(conf_filter) return conf_filters
[docs] def get_conf_filters_conv(config): conf_filters_conv = ConfFiltersConv() if len(config) > 0: for c in config: c = deepcopy(c) conf_filter_conv = ConfFilterConv.get_filter(c.pop("type"))(**c) conf_filters_conv.add(conf_filter_conv) return conf_filters_conv
[docs] def make_pfd_op( fp_style: str = "vasp", train_style: str = "dp", explore_style: str = "ase", wf_style: str = "finetune", #pert_gen_step_config: dict = default_config, prep_fp_config: dict = default_config, run_fp_config: dict = default_config, train_config: dict = default_config, #run_train_config: dict = default_config, prep_explore_config: dict = default_config, run_explore_config: dict = default_config, scheduler_config: dict = default_config, collect_data_config: dict = default_config, select_confs_config: dict = default_config, evaluate_config: dict = default_config, #inference_step_config: dict = default_config, upload_python_packages: Optional[List[os.PathLike]] = None, init_train: bool = False, init_fp: bool = False, ): ## initiate fp op if fp_style in fp_styles.keys(): prep_run_fp_op = PrepRunFp( "prep-run-fp", fp_styles[fp_style]["prep"], fp_styles[fp_style]["run"], prep_config=prep_fp_config, run_config=run_fp_config, upload_python_packages=upload_python_packages, ) else: raise RuntimeError(f"unknown fp_style {fp_style}") ## initiate DP train op if train_style in train_styles.keys(): train_op = train_styles[train_style] else: raise NotImplementedError( f"Training style {train_style} has not been implemented!" ) if explore_style == "ase": prep_run_explore_op = explore_styles[explore_style]["preprun"]( "prep-run-explore-step", explore_styles[explore_style]["prep"], explore_styles[explore_style]["run"], prep_config=prep_explore_config, run_config=run_explore_config, upload_python_packages=upload_python_packages, ) elif explore_style in ["calypso","calypso:merge"]: if explore_style == "calypso": caly_evo_step_op = CalyEvoStep( name="caly-eval-step", collect_run_caly=CollRunCaly, prep_ase_optim=PrepCalyASEOptim, run_ase_optim=RunCalyASEOptim, prep_config=prep_explore_config, run_config=run_explore_config, ) else: caly_evo_step_op = CalyEvoStepMerge( name="caly-eval-step", collect_run_caly=CollRunCaly, prep_ase_optim=PrepCalyASEOptim, run_ase_optim=RunCalyASEOptim, prep_config=prep_explore_config, run_config=run_explore_config, ) prep_run_explore_op = PrepRunCaly( "prep-run-caly-step", PrepCalyInput, caly_evo_step_op, prep_config=prep_explore_config, run_config=run_explore_config, upload_python_packages=upload_python_packages, ) else: raise ValueError(f"Explore style {explore_style} has not been implemented!") expl_train_block_op = ExplTrainBlock( name="blk", prep_run_explore_op=prep_run_explore_op, prep_run_fp_op=prep_run_fp_op, collect_data_op=CollectData, select_confs_op=SelectConfs, prep_run_train_op=train_op, evaluate_op= ModelTestOP, train_config=train_config, collect_data_config=collect_data_config, select_confs_config=select_confs_config, evaluate_config=evaluate_config, upload_python_packages=upload_python_packages, ) expl_train_loop_op = ExplTrainLoop( name="loop", expl_train_blk_op=expl_train_block_op, stage_scheduler_op=wf_styles[wf_style], ## implement a default scheduler scheduler_config=scheduler_config, upload_python_packages=upload_python_packages, ) pfd_op = PFD( name="pfd", prep_run_fp_op=prep_run_fp_op, collect_data_op=CollectData, train_op=train_op, expl_train_loop_op=expl_train_loop_op, train_config=train_config, collect_data_step_config=collect_data_config, upload_python_packages=upload_python_packages, init_train=init_train, init_fp=init_fp, ) return pfd_op
[docs] def get_systems_from_data(data, data_prefix=None): data = [data] if isinstance(data, str) else data assert isinstance(data, list) if data_prefix is not None: data = [os.path.join(data_prefix, ii) for ii in data] return data
[docs] class FlowGen: def __init__( self, config: Dict, debug: bool = False, download_path: Union[Path, str] = Path("./"), ): self._download_path = download_path if debug is True: os.environ["DFLOW_DEBUG"] = "1" elif os.environ.get("DFLOW_DEBUG"): del os.environ["DFLOW_DEBUG"] self._config = normalize_args(config) global_config_workflow(self._config) print("dflow mode: %s" % dflow.config["mode"]) self.workflow = Workflow(name=self._config["name"]) self._wf_type = config["task"].get("type") self._set_wf(self._config) @property def wf_type(self): return self._wf_type @property def download_path(self): if isinstance(self._download_path, str): return Path(self._download_path) else: return self._download_path def _set_wf(self, config: Dict): """ Build a workflow from the OP templates of model finetune """ # executor config start with prep_* or run_* default_config = config["default_step_config"] run_train_config = config["step_configs"].get( "run_train_config", default_config ) prep_fp_config = config["step_configs"].get("perp_fp_config", default_config) run_fp_config = config["step_configs"].get("run_fp_config", default_config) run_collect_data_config = config["step_configs"].get( "collect_data_config", default_config ) run_select_confs_config = config["step_configs"].get( "select_confs_config", default_config ) prep_explore_config = config["step_configs"].get( "prep_explore_config", default_config ) run_explore_config = config["step_configs"].get( "run_explore_config", default_config ) run_scheduler_config = config["step_configs"].get( "run_scheduler_config", default_config ) run_evaluate_config = config["step_configs"].get( "run_evaluate_config", default_config ) # uploaded python packages upload_python_packages = [] if custom_packages := config.get("upload_python_packages"): upload_python_packages.extend(custom_packages) upload_python_packages.extend(list(dpdata.__path__)) upload_python_packages.extend(list(dflow.__path__)) upload_python_packages.extend(list(pfd.__path__)) upload_python_packages.extend(list(ase.__path__)) ##### task configs task_type = config["task"].get("type", "finetune") max_iter = config["task"]["max_iter"] if task_type == 'dist': init_train=False init_fp=False if max_iter > 1: warnings.warn( "In most cases, there is absolutely no need for more than one training iteration for knowledge distillation!" ) elif task_type == 'finetune': init_train = config["task"]["init_train"] if init_train is False: print("No initial training before exploration") init_fp = config["task"]["init_fp"] if init_fp is False: print("Initial fp calculation skipped") #### train config train_style = config["train"]["type"] train_config = config["train"]["config"] if task_type == 'finetune': train_config["finetune_mode"] = True else: train_config["finetune_mode"] = False # read custom training template if isinstance(config["train"]["template_script"], str): with open(config["train"]["template_script"], "r") as fp: template_script = json.load(fp) elif isinstance(config["train"]["template_script"], dict): template_script = config["train"]["template_script"] #### explore config explore_style = config["exploration"]["type"] expl_stages = config["exploration"]["stages"] explore_config = config["exploration"]["config"] #### confs selection config select_confs_config = config["select_confs"] conf_filters = get_conf_filters(select_confs_config.pop("frame_filter")) render = TrajRender.get_driver(explore_style)() conf_selector = ConfSelectorFrames( render, conf_filters=conf_filters ) #### model evaluation config evaluate_config = config["evaluate"] ##### collect_data_config collect_data_config = {"test_size": evaluate_config.pop('test_size',0.1)} #### read init confs if config["inputs"]["init_confs"]["confs_paths"] is not None: init_confs_prefix = config["inputs"]["init_confs"]["prefix"] init_confs = config["inputs"]["init_confs"]["confs_paths"] init_confs = get_systems_from_data(init_confs, init_confs_prefix) else: raise RuntimeError("init_confs must be provided") #### read init fp confs if config["inputs"]["init_fp_confs"]["confs_paths"] is not None: init_fp_confs_prefix = config["inputs"]["init_fp_confs"]["prefix"] init_fp_confs = config["inputs"]["init_fp_confs"]["confs_paths"] init_fp_confs = get_systems_from_data(init_fp_confs, init_fp_confs_prefix) else: init_fp_confs = [] init_fp_confs = upload_artifact_and_print_uri(init_fp_confs, "init_fp_confs") #### create expl tasks from the init_confs task_grp_style = explore_styles[explore_style]["task_grp"] _expl_stages=[] for stg in expl_stages: expl_stage=ExplorationStage() for task_grp in stg: # Use the unified method for creating task groups expl_stage.add_task_group( task_grp_style.make_task_grp_from_conf( task_grp, init_confs, ) ) _expl_stages.append(expl_stage) #### scheduler for workflow management scheduler = Scheduler( explore_stages=_expl_stages, max_iter=max_iter, train_config=train_config ) #### upload init_data if config["inputs"]["init_data_uri"] is not None: init_data = get_artifact_from_uri(config["inputs"]["init_data_uri"]) elif config["inputs"]["init_data_sys"] is not None: init_data_prefix = config["inputs"]["init_data_prefix"] init_data = config["inputs"]["init_data_sys"] init_data = get_systems_from_data(init_data, init_data_prefix) init_data = upload_artifact_and_print_uri(init_data, "init_data") else: init_train = init_fp init_data = upload_artifact([]) #### upload init models init_model_paths = config["inputs"]["base_model_path"] if config["inputs"]["base_model_uri"] is not None: print("Using uploaded model at: ", config["inputs"]["base_model_uri"]) init_model = get_artifact_from_uri(config["inputs"]["base_model_uri"]) elif init_model_paths: init_model = upload_artifact_and_print_uri(init_model_paths, "base_model") else: raise FileNotFoundError("Pre-trained model must exist!") fp_config = {} fp_inputs_config = config["fp"]["inputs_config"] fp_type = config["fp"]["type"] if task_type == 'dist': fp_type = 'ase' fp_inputs = fp_styles[fp_type]["inputs"](**fp_inputs_config) fp_config["inputs"] = fp_inputs fp_config["run"] = config["fp"]["run_config"] fp_config["extra_output_files"] = config["fp"].get("extra_output_files", []) # aimd exploration init_fp_config = {} if init_fp: init_fp_config.update(fp_config) init_fp_inputs_config = copy.deepcopy(fp_inputs_config) init_fp_inputs_config.update(config["init_fp"]["inputs_config"]) init_fp_inputs = fp_styles[fp_type]["inputs"](**init_fp_inputs_config) init_fp_config["inputs"] = init_fp_inputs # make pfd op pfd_op = make_pfd_op( wf_style=task_type, fp_style=fp_type, train_style=train_style, explore_style=explore_style, prep_fp_config=prep_fp_config, run_fp_config=run_fp_config, train_config=run_train_config, prep_explore_config=prep_explore_config, run_explore_config=run_explore_config, scheduler_config=run_scheduler_config, collect_data_config=run_collect_data_config, select_confs_config=run_select_confs_config, evaluate_config=run_evaluate_config, upload_python_packages=upload_python_packages, init_train=init_train, init_fp=init_fp, ) pfd_step = Step( "workflow", template=pfd_op, parameters={ "block_id": "finetune", "explore_config": explore_config, "conf_selector": conf_selector, "select_confs_config": select_confs_config, "scheduler": scheduler, # training "template_script": template_script, "train_config":train_config, # fp_calculation "fp_config": fp_config, "init_fp_config":init_fp_config, #"aimd_config": aimd_config, #"aimd_sample_conf": aimd_sample_conf, "collect_data_config": collect_data_config, "evaluate_config": evaluate_config, }, artifacts={ "init_model": init_model, "expl_model": init_model, "init_data": init_data, "init_confs": init_confs, "init_fp_confs": init_fp_confs #"iter_data": iter_data, }, ) self.workflow.add(pfd_step) def _moniter(self): while True: time.sleep(4) step_info = self.workflow.query() wf_status = self.workflow.query_status() if wf_status == "Failed": raise RuntimeError( f"Workflow failed (ID: {self.workflow.id}, UID: {self.workflow.uid})" ) try: dist_post = step_info.get_step(name="finetune")[0] except IndexError: continue if dist_post["phase"] == "Succeeded": print( f"Distillation finished (ID: {self.workflow.id}, UID: {self.workflow.uid})" ) print("Retrieving completed tasks to local...") download_artifact( artifact=dist_post.outputs.artifacts["model"], path=self.download_path, ) break
[docs] def submit( self, reuse_step: Optional[List[ArgoStep]] = None, no_submission: bool = False, only_submit: bool = True, ): if not no_submission: self.workflow.submit(reuse_step=reuse_step) else: return self.workflow if not only_submit: self._moniter()
[docs] def successful_step_keys(wf, unsuccessful_step_keys: bool = False): """[From DPGEN2] Get the keys of all successful steps in the workflow. Args: wf (_type_): The workflow object. unsuccessful_step_keys (bool, optional): If True, include keys of unsuccessful steps. Defaults to False. Returns: list: A list of successful step keys. """ all_step_keys = [] steps = wf.query_step() # For reused steps whose startedAt are identical, sort them by key steps.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key)) for step in steps: if not unsuccessful_step_keys: if step.key is not None and step.phase == "Succeeded": all_step_keys.append(step.key) else: if step.key is not None: all_step_keys.append(step.key) return all_step_keys
[docs] def get_superop(key): """[From DPGEN2] Get the super operation key for a given step key. Args: key (str): The step key. Returns: str: The super operation key, or None if not found. """ if "prep-expl" in key: return key.replace("prep-expl", "prep-run-explore") elif "run-expl-" in key: return re.sub("run-expl-[0-9]*", "prep-run-explore", key) elif "prep-fp" in key: return key.replace("prep-fp", "prep-run-fp") elif "run-fp-" in key: return re.sub("run-fp-[0-9]*", "prep-run-fp", key) return None
[docs] def fold_keys(all_step_keys): folded_keys = {} for key in all_step_keys: is_superop = False for superop in ["prep-run-explore", "prep-run-fp"]: if superop in key: if key not in folded_keys: folded_keys[key] = [] is_superop = True break if is_superop: continue superop = get_superop(key) # if its super OP is succeeded, fold it into its super OP if superop is not None and superop in all_step_keys: if superop not in folded_keys: folded_keys[superop] = [] folded_keys[superop].append(key) else: folded_keys[key] = [key] for k, v in folded_keys.items(): if v == []: folded_keys[k] = [k] return folded_keys
[docs] def get_resubmit_keys(wf, unsuccessful_step_keys: bool = False): """[From DPGEN2] Get the keys of all steps in the workflow for resubmission. """ all_step_keys = successful_step_keys(wf, unsuccessful_step_keys) # legal step keys step_keys = [ "train", "prep-run-explore", "prep-expl", "run-expl", "select-confs", "prep-run-fp", "prep-fp", "run-fp", "collect-data", "evaluate", # "scheduler", ] all_step_keys = matched_step_key( all_step_keys, step_keys, ) all_step_keys = sort_slice_ops( all_step_keys, ["run-expl", "run-fp"], ) folded_keys = fold_keys(all_step_keys) return folded_keys
[docs] def resubmit_workflow( wf_config, wfid, list_steps=False, reuse=None, fold=False, unsuccessful_step_keys: bool = False, **kwargs, ): global_config_workflow(normalize_args(wf_config)) old_wf = Workflow(id=wfid) folded_keys = get_resubmit_keys(old_wf, unsuccessful_step_keys) all_step_keys = sum(folded_keys.values(), []) if list_steps: prt_str = print_keys_in_nice_format( all_step_keys, ["run-train", "run-lmp", "run-fp"] ) print(prt_str) return if reuse is None: return reuse_idx = expand_idx(reuse) reused_keys = [all_step_keys[ii] for ii in reuse_idx] if fold: reused_folded_keys = {} for key in reused_keys: superop = get_superop(key) if superop is not None: if superop not in reused_folded_keys: reused_folded_keys[superop] = [] reused_folded_keys[superop].append(key) else: reused_folded_keys[key] = [key] for k, v in reused_folded_keys.items(): # reuse the super OP iif all steps within it are reused if v != [k] and k in folded_keys and set(v) == set(folded_keys[k]): reused_folded_keys[k] = [k] reused_keys = sum(reused_folded_keys.values(), []) print(reused_keys) reuse_step = old_wf.query_step(key=reused_keys) # For reused steps whose startedAt are identical, sort them by key reuse_step.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key)) wf = FlowGen(wf_config) wf.submit(reuse_step=reuse_step, **kwargs)