Note: Go to the end to download the full example code.
Running a model on EQSQL#
In this example we show how to run a Polaris model on the EQ/SQL cluster.
Imports#
import shutil
from pathlib import Path
from polaris import __file__ as polaris_file
from polaris.hpc.eq_utils import query_workers, task_queue
from polaris.hpc.eqsql.eq import insert_task
from polaris.hpc.eqsql.task import Task
from polaris.utils.db_config import DbConfig
from polaris.utils.dir_utils import mkdir_p
from sqlalchemy import create_engine
# Open a SQLAlchemy engine against the EQ/SQL job database used by the cluster.
engine = DbConfig.eqsql_db().create_engine()
See the status of workers#
The following cell gets a (styled) pandas DataFrame that shows what each of the workers in the cluster is up to.
# Show each worker's current state; workers idle longer than the threshold are cleared from the view.
query_workers(engine, style_df=True, clear_idle_threshold="3 minutes")
See the queue#
The following cell shows the status of currently running or queued jobs.
# List currently queued and running tasks on the cluster.
task_queue(engine)
Running#
Setup#
# Name used to group this batch of runs (appears in run ids and the results path).
experiment = "your_experiment_name"
# Shared directory where the example's source scripts are copied for workers to execute.
target_dir = Path("/mnt/p/ShareAndFileTransfer/For Jamie/tmp/src_transfer")
# Root directory under which each run writes its results (results_dir/<experiment>/<run_id>).
results_dir = Path("/mnt/p/ShareAndFileTransfer/For Jamie")
# Directory containing the base model folders (one sub-folder per model name).
model_base_dir = Path("/mnt/p/VMS_Software/15-CI-CD-Artifacts/Polaris/models/RUN")
# Root of the local polaris checkout, derived from the installed package location.
polaris_dir = Path(polaris_file).parent.parent
results_dir.mkdir(exist_ok=True, parents=True)
# Copy files from local to file share
def copy_self_to_server():
    """Copy the run_convergence example scripts to the shared ``target_dir``.

    Deletes any previous copy of the directory first so stale files from an
    earlier run cannot linger, then copies every top-level ``*.py`` file from
    the local polaris checkout.
    """
    src_dir = polaris_dir / "polaris" / "hpc" / "eqsql" / "examples" / "run_convergence"
    # Start from a clean slate on the file share.
    if target_dir.exists():
        shutil.rmtree(target_dir)
    mkdir_p(target_dir)
    for src_file in src_dir.glob("*.py"):
        # target_dir is already a Path — no need to re-wrap it in Path().
        shutil.copyfile(src_file, target_dir / src_file.name)
def get_input(run_id, experiment, polaris_exe=None, model_name="Grid", num_threads=24):
    """Build the input payload dict describing a single model run.

    ``polaris_exe`` may be either a full path to an existing executable or a CI
    branch name, in which case the standard CI artifact path is substituted.
    Raises RuntimeError when no executable/branch is given.
    """
    # The Grid test model is tiny; always run it single-threaded.
    if "Grid" in model_name:
        num_threads = 1

    # Resolve the executable: an existing filesystem path is used as-is,
    # anything else is treated as a CI branch name.
    if polaris_exe is None:
        raise RuntimeError("Please provide a polaris exe (branch name or full path to exe)")
    ci_exe_path = "/mnt/p/VMS_Software/15-CI-CD-Artifacts/Polaris/polaris-linux/{branch}/ubuntu-20.04/Integrated_Model"
    if Path(polaris_exe).exists():
        resolved_exe = str(polaris_exe)
    else:
        resolved_exe = ci_exe_path.format(branch=polaris_exe)

    run_config = {"do_skim": False, "do_abm_init": False, "num_abm_runs": 1, "num_threads": num_threads}
    return {
        "run-id": run_id,
        "where-is-base-model": str(model_base_dir / model_name),
        "put-results-here": str(results_dir / experiment / run_id),
        "overwrite-model-files": False,
        "run-config": run_config,
        "where-is-polaris-exe": resolved_exe,
    }
def run(run_id, experiment, polaris_exe=None, model_name="Grid", worker_id="VMS-C.*", num_threads=24) -> Task:
    """Submit a single model run to the EQ/SQL cluster and return its Task handle.

    The final run id is ``<experiment>-<model_name>-<run_id>``; the job is
    routed to workers whose id matches the ``worker_id`` regex.
    """
    run_id = f"{experiment}-{model_name}-{run_id}"
    # The worker runs main.py from the directory we copied to the file share.
    run_model_task_def = {"task-type": "python-module", "directory": str(target_dir), "entry-point": "main.py"}
    with engine.connect() as conn:
        # Renamed from `input` to avoid shadowing the builtin.
        task_input = get_input(run_id, experiment, polaris_exe=polaris_exe, model_name=model_name, num_threads=num_threads)
        task = insert_task(
            conn=conn, worker_id=worker_id, definition=run_model_task_def, exp_id=run_id, input=task_input
        ).value
    return task
# Push the example scripts to the file share so cluster workers can run them.
copy_self_to_server()
The following submits the job to the cluster and gets back a Task object which can be used to interact with that job on the cluster. We can see the status of a task, cancel it, or get any log messages that have been recorded.
Note that the following cells are commented out to avoid submission of jobs by the automated CI systems.
# Submit the task for running
# task = run(
# "develop", experiment="demo", model_name="Bloomington", polaris_exe="develop/latest", worker_id="VMS-C.*", num_threads=20
# )
# Check on task progress
# task.update_from_db(engine)
# task.get_logs(engine)