Running a model on EQSQL

Running a model on EQSQL#

In this example we show how to run a Polaris model on the EQ/SQL cluster.

Imports#

import shutil
from pathlib import Path

from polaris import __file__ as polaris_file
from polaris.hpc.eq_utils import query_workers, task_queue
from polaris.hpc.eqsql.eq import insert_task
from polaris.hpc.eqsql.task import Task
from polaris.utils.db_config import DbConfig
from polaris.utils.dir_utils import mkdir_p
from sqlalchemy import create_engine

db_config = DbConfig()
engine = create_engine(db_config.conn_string(), echo=False, isolation_level="AUTOCOMMIT")
engine.connect()

See the status of workers#

The following cell gets a (styled) pandas DataFrame that shows what each of the workers in the cluster is up to.

query_workers(engine, style_df=True, clear_idle_threshold="3 minutes")

See the queue#

The following cells shows the status of currently running or queued jobs

task_queue(engine)

Running#

Setup#

experiment = "your_experiment_name"
target_dir = Path("/mnt/p/ShareAndFileTransfer/For Jamie/tmp/src_transfer")
results_dir = Path("/mnt/p/ShareAndFileTransfer/For Jamie")
model_base_dir = Path("/mnt/p/VMS_Software/15-CI-CD-Artifacts/Polaris/models/RUN")
polaris_dir = Path(polaris_file).parent.parent

results_dir.mkdir(exist_ok=True, parents=True)


# Copy files from local to file share
def copy_self_to_server():
    src_dir = polaris_dir / "polaris" / "hpc" / "eqsql" / "examples" / "run_convergence"
    if target_dir.exists():
        shutil.rmtree(target_dir)
    mkdir_p(target_dir)
    for i in src_dir.glob("*.py"):
        shutil.copyfile(i, Path(target_dir) / i.name)


def get_input(run_id, experiment, polaris_exe=None, model_name="Grid", num_threads=24):
    num_threads = 1 if "Grid" in model_name else num_threads

    # Find the exe
    if polaris_exe is None:
        raise RuntimeError("Please provide a polaris exe (branch name or full path to exe)")
    ci_exe_path = "/mnt/p/VMS_Software/15-CI-CD-Artifacts/Polaris/polaris-linux/{branch}/ubuntu-20.04/Integrated_Model"
    polaris_exe = str(polaris_exe) if Path(polaris_exe).exists() else ci_exe_path.format(branch=polaris_exe)

    return {
        "run-id": run_id,
        "where-is-base-model": str(model_base_dir / model_name),
        "put-results-here": str(results_dir / experiment / run_id),
        "overwrite-model-files": False,
        "run-config": {"do_skim": False, "do_abm_init": False, "num_abm_runs": 1, "num_threads": num_threads},
        "where-is-polaris-exe": polaris_exe,
    }


def run(run_id, experiment, polaris_exe=None, model_name="Grid", worker_id="VMS-C.*", num_threads=24) -> Task:
    run_id = f"{experiment}-{model_name}-{run_id}"
    run_model_task_def = {"task-type": "python-module", "directory": str(target_dir), "entry-point": "main.py"}
    with engine.connect() as conn:
        input = get_input(run_id, experiment, polaris_exe=polaris_exe, model_name=model_name, num_threads=num_threads)
        task = insert_task(
            conn=conn, worker_id=worker_id, definition=run_model_task_def, exp_id=run_id, input=input
        ).value
        return task


copy_self_to_server()

The following submits the job to the cluster and gets back a Task object which can be used to interact with that job on the cluster. We can see the status of a task, cancel it, or get any log messages that have been recorded.

Note that the following cells are commented out to avoid submission of jobs by the automated CI systems.

# Submit the task for running
# task = run(
#     "develop", experiment="demo", model_name="Bloomington", polaris_exe="develop/latest", worker_id="VMS-C.*", num_threads=20
# )
# Check on task progress
# task.update_from_db(engine)
# task.get_logs(engine)

Gallery generated by Sphinx-Gallery