# Source code for mlonmcu.platform.tvm.tvm_tune_platform
#
# Copyright (c) 2022 TUM Department of Electrical and Computer Engineering.
#
# This file is part of MLonMCU.
# See https://github.com/tum-ei-eda/mlonmcu.git for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""TVM Tune Platform"""
import re
import os
import time
import tempfile
import tarfile
import concurrent
import concurrent.futures  # explicit: `import concurrent` alone does not load the futures submodule
from pathlib import Path

from mlonmcu.artifact import Artifact, ArtifactFormat
from mlonmcu.config import str2bool
from mlonmcu.flow.tvm.backend.python_utils import prepare_python_environment
from mlonmcu.flow.tvm.backend.tuner import (
    get_autotuning_defaults,
    get_autotvm_defaults,
    get_autoscheduler_defaults,
    get_metascheduler_defaults,
)
from mlonmcu.flow.tvm.backend.tvmc_utils import (
    get_rpc_tvmc_args,
    get_target_tvmc_args,
    get_disabled_pass_tvmc_args,
)
from mlonmcu.logging import get_logger
from mlonmcu.setup import utils
from mlonmcu.target.metrics import Metrics

from .tvm_target_platform import TvmTargetPlatform
from ..platform import TunePlatform
logger = get_logger()
class TvmTunePlatform(TunePlatform, TvmTargetPlatform):
    """TVM Tune platform class."""

    # Tuner features offered on top of the inherited platform features.
    FEATURES = TunePlatform.FEATURES | TvmTargetPlatform.FEATURES | {"autotvm", "autoscheduler", "metascheduler"}

    # Platform defaults; per-tuner defaults are merged in with a name prefix
    # (e.g. "autotvm_tuner", "autoscheduler_include_simple_tasks", ...).
    DEFAULTS = {
        **TunePlatform.DEFAULTS,
        **TvmTargetPlatform.DEFAULTS,
        "experimental_tvmc_tune_tasks": False,
        "experimental_tvmc_tune_visualize": False,
        # "experimental_tvmc_tune_wandb": False, # TODO
        "enable_wandb": False,
        "min_repeat_ms": 0,
        **{("autotuning_" + key): value for key, value in get_autotuning_defaults().items()},
        **{("autotvm_" + key): value for key, value in get_autotvm_defaults().items()},
        **{("autoscheduler_" + key): value for key, value in get_autoscheduler_defaults().items()},
        **{("metascheduler_" + key): value for key, value in get_metascheduler_defaults().items()},
    }

    REQUIRED = TunePlatform.REQUIRED | TvmTargetPlatform.REQUIRED
@property
def tune_tasks(self):
    """Task selection: which tasks should be tuned in the session."""
    return self.config["tune_tasks"]

@property
def experimental_tvmc_tune_tasks(self):
    """Whether the experimental per-task tvmc tuning interface may be used."""
    raw = self.config["experimental_tvmc_tune_tasks"]
    if isinstance(raw, (bool, int)):
        return raw
    return str2bool(raw)

@property
def experimental_tvmc_tune_visualize(self):
    """Whether the experimental tuning-progress visualization may be used."""
    raw = self.config["experimental_tvmc_tune_visualize"]
    if isinstance(raw, (bool, int)):
        return raw
    return str2bool(raw)

@property
def enable_wandb(self):
    """Whether the Weights & Biases callback should be enabled."""
    raw = self.config["enable_wandb"]
    if isinstance(raw, (bool, int)):
        return raw
    return str2bool(raw)

@property
def min_repeat_ms(self):
    """Minimum duration (in ms) of a single measurement repeat."""
    return int(self.config["min_repeat_ms"])
def invoke_tvmc_tune(self, *args, target=None, **kwargs):
    """Run the `tvmc tune` subcommand via the shared tvmc wrapper."""
    return self.invoke_tvmc("tune", *args, target=target, **kwargs)
def get_tune_args(self, model, backend, target, out, trials, early_stopping):
    """Assemble the `tvmc tune` command line shared by all tuners.

    Arguments cover the target/RPC setup, trial budget, measurement options
    and the output record file; the model path is appended last.
    """
    max_parallel = int(self.config.get("autotuning_max_parallel", 1))
    timeout = int(self.config.get("autotuning_timeout", 1000))
    results_file = self.config.get("autotuning_results_file", None)
    desired_layout = backend.config.get("desired_layout", None)
    args = list(
        get_target_tvmc_args(
            backend.target,
            extra_targets=backend.extra_targets,
            target_details=backend.get_target_details(),
            extra_target_details=backend.extra_target_details,
        )
    )
    if desired_layout is not None:
        args += ["--desired-layout", desired_layout]
    args += get_rpc_tvmc_args(self.use_rpc, self.rpc_key, self.rpc_hostname, self.rpc_port)
    args += get_disabled_pass_tvmc_args(backend.disabled_passes)
    # TODO: missing: pass config etc.
    if early_stopping > 0:
        args += ["--early-stopping", str(early_stopping)]
    args += ["--parallel", str(max_parallel)]
    # Timeout applies per parallel batch, hence the scaling.
    args += ["--timeout", str(timeout * max_parallel)]
    args += ["--trials", str(trials)]
    args += ["--number", str(self.number)]  # TODO: increase while tuning?
    args += ["--repeat", str(self.repeat)]  # TODO: increase while tuning?
    args += ["--min-repeat-ms", str(self.min_repeat_ms)]
    if results_file is not None:
        args += ["--tuning-records", results_file]
    args += ["--output", str(out)]
    if self.config["autotuning_tasks"]:
        assert self.experimental_tvmc_tune_tasks, f"{self.name}.tune_tasks requires experimental_tvmc_tune_tasks"
        args += ["--tasks", str(self.config["autotuning_tasks"])]
    if self.enable_wandb:
        args.append("--wandb-callback")
    args.append(model)
    return args
def get_autotvm_tune_args(self, model, backend, target, out, trials_global, early_stopping):
    """Build the `tvmc tune` command line for AutoTVM-based tuning.

    Extends the common tune args with the tuner selection and (optionally)
    the tuning-progress visualization flags.
    """
    ret = self.get_tune_args(model, backend, target, out, trials_global, early_stopping)
    tuner = self.config.get("autotvm_tuner", "ga")
    assert tuner in ["ga", "gridsearch", "random", "xgb", "xgb_knob", "xgb-rank"]
    ret.extend(["--tuner", tuner])
    if self.config["autotuning_visualize"]:
        to_file = self.config["autotuning_visualize_file"]
        if not to_file:
            to_file = "viz.png"
        live = self.config["autotuning_visualize_live"]
        # BUG FIX: this previously asserted experimental_tvmc_tune_tasks with a
        # message naming a non-existent flag; the visualize feature is gated by
        # the (otherwise unused) experimental_tvmc_tune_visualize property.
        assert self.experimental_tvmc_tune_visualize, "requires experimental_tvmc_tune_visualize"
        visualize_arg = to_file
        if live:
            visualize_arg += ",live"
        ret.extend(["--visualize", visualize_arg])
    return ret
def get_autoscheduler_tune_args(self, model, backend, target, out, trials_global, early_stopping):
    """Build the `tvmc tune` command line for AutoScheduler-based tuning."""
    assert not self.enable_wandb, "WANDB callback not yet supported by AutoScheduler"
    args = self.get_tune_args(model, backend, target, out, trials_global, early_stopping)
    args.append("--enable-autoscheduler")
    if self.config.get("autoscheduler_include_simple_tasks", False):
        args.append("--include-simple-tasks")
    if self.config.get("autoscheduler_log_estimated_latency", False):
        args.append("--log-estimated-latency")
    # Forward target hardware details (iterating an empty dict is a no-op).
    for key, value in target.get_hardware_details().items():
        args += [f"--{key}", str(value)]
    return args
def get_metascheduler_tune_args(self, model, backend, target, out, trials_global, trials_single, early_stopping):
    """Build the `tvmc tune` command line for MetaScheduler-based tuning."""
    args = self.get_tune_args(model, backend, target, out, trials_global, early_stopping)
    args.append("--enable-metascheduler")
    if trials_single:
        args += ["--trials-per-task", str(trials_single)]
    return args
def _tune_model(self, model_path, backend, target):
    """Tune `model_path` using AutoTVM, AutoScheduler or MetaScheduler.

    Exactly one tuner may be enabled in the config (asserted below). Returns a
    pair of dicts ({name: artifacts}, {name: metrics}) with a "default" entry
    plus optional per-task entries ("task0", ...) when tuning in parallel.
    """
    autotvm_enable = self.config["autotvm_enable"]
    autoscheduler_enable = self.config["autoscheduler_enable"]
    metascheduler_enable = self.config["metascheduler_enable"]
    if not autotvm_enable and not autoscheduler_enable and not metascheduler_enable:
        # Tuning not enabled! (Might happen if abstract autotune feature is used!)
        return {}, {}
    assert [autotvm_enable, autoscheduler_enable, metascheduler_enable].count(
        True
    ) == 1, "Can not use AutoTVM and AutoScheduler/MetaScheduler at the same time"
    results_file = self.config["autotuning_results_file"]
    append = self.config["autotuning_append"]
    num_workers = self.config["autotuning_num_workers"]
    artifacts = []
    verbose = False
    if self.print_outputs:
        verbose = True

    def remove_empty(inp):
        # Drop blank lines from a list of record lines.
        return [line for line in inp if len(line.strip()) > 0]

    def count_failed_trials(inp):
        # Failed measurements are logged with a cost of 1000000000.0.
        cnt = 0
        for line in inp.split("\n"):
            m = re.compile(r".*\[1000000000\.0\].*").match(line)
            if m:
                cnt += 1
        return cnt

    def get_max_flops(out, prefix="M"):
        # Extract the last reported best FLOPS value from the tvmc output.
        # BUG FIX: the pattern was a plain (non-f) string, so "{prefix}" was
        # matched literally and this always returned -1.
        res = re.compile(rf"\d+\.\d+\s*\/\s*(\d+\.\d+)\s+{prefix}FLOPS").findall(out)
        if len(res) > 0:
            return res[-1]
        return -1

    # pick best records
    def _pick_best(backend, records, verbose=False):
        # Invoke `python -m tvm.autotvm.record --mode pick` to keep only the
        # best record per task.
        with tempfile.TemporaryDirectory() as tmp_dir:
            in_file = Path(tmp_dir) / "tuning_results.log.txt"
            with open(in_file, "w") as handle:
                handle.write(records)
            out_file = Path(tmp_dir) / "best_tuning_results.log.txt"
            args = [
                "--mode",
                "pick",
                "--i",
                in_file,
                "--o",
                out_file,
            ]
            env = prepare_python_environment(backend.tvm_pythonpath, backend.tvm_build_dir, backend.tvm_configs_dir)
            utils.python("-m", "tvm.autotvm.record", *args, live=verbose, env=env)
            with open(out_file, "r") as handle:
                content_best = handle.read()
            return content_best

    content = ""
    total_size = None  # summed config space size over all tasks (parallel mode only)
    visualize_raw = None
    if num_workers is not None:
        if isinstance(num_workers, str):
            num_workers = int(num_workers)
        assert isinstance(num_workers, int) and num_workers >= 0
    trials_global = self.config.get("autotuning_trials", 10)
    trials_single = self.config.get("autotuning_trials_single", None)
    if not isinstance(trials_global, int):
        trials_global = int(trials_global)
    if trials_single is not None and not isinstance(trials_single, int):
        trials_single = int(trials_single)
    trials = trials_single if trials_single is not None else trials_global
    assert isinstance(trials, int)
    early_stopping = self.config.get("autotuning_early_stopping", None)
    if early_stopping is None:
        early_stopping = max(trials, 10)  # Let's see if this default works out...
    if not isinstance(early_stopping, int):
        early_stopping = int(early_stopping)
    assert isinstance(early_stopping, int)
    if metascheduler_enable:
        assert not append, "append not supported by MetaScheduler"
        assert num_workers is None or int(num_workers) == 0, "num_workers > 0 not supported by MetaScheduler"
        assert not self.config["autotuning_visualize"], "autotuning_visualize not supported by MetaScheduler"
        sub_metrics = {}
        sub_artifacts = {}
        with tempfile.TemporaryDirectory() as tmp_dir:
            # out_file = Path(tmp_dir) / "tuning_results.log.txt"
            tmp_dir = Path(tmp_dir)
            work_dir = tmp_dir / "work_dir"
            tune_args = self.get_metascheduler_tune_args(
                model_path, backend, target, work_dir, trials_global, trials_single, 0
            )
            out = self.invoke_tvmc_tune(*tune_args, target=target, cwd=tmp_dir)
            # Archive the whole MetaScheduler work dir as a single artifact.
            with tarfile.open(tmp_dir / "work_dir.tar", "w") as tar:
                for file in os.listdir(work_dir):
                    tar.add(work_dir / file, arcname=os.path.join("work_dir", file))
            raw = None
            with open(tmp_dir / "work_dir.tar", "rb") as tar:
                raw = tar.read()
            artifact = Artifact(
                "work_dir.tar", raw=raw, fmt=ArtifactFormat.ARCHIVE, flags=["records", "metascheduler"]
            )
    elif autotvm_enable or autoscheduler_enable:
        if append:
            # Seed the tuning log with previous results so tuning can resume.
            if results_file is not None:
                with open(results_file, "r") as handle:
                    content = handle.read()
        sub_metrics = {}
        sub_artifacts = {}
        if num_workers is not None and num_workers > 0:
            # Parallel mode: one worker thread per task via tvmc's --tasks filter.
            assert self.experimental_tvmc_tune_tasks, "num_workers requires experimental_tvmc_tune_tasks=1"
            # TODO: fix
            assert self.config["autotuning_tasks"] is None, "tune_tasks not supported together with num_workers > 0"

            def get_tune_tasks():
                # Query tvmc for the tunable tasks and their config space sizes.
                with tempfile.TemporaryDirectory() as tmp_dir:
                    out_file = Path(tmp_dir) / "tuning_results.log.txt"
                    if autotvm_enable:
                        tune_args = self.get_autotvm_tune_args(model_path, backend, target, out_file, 1, 0)
                    elif autoscheduler_enable:
                        tune_args = self.get_autoscheduler_tune_args(model_path, backend, target, out_file, 1, 0)
                    else:
                        assert False
                    out = self.invoke_tvmc_tune(
                        *tune_args, "--tasks", "list", target=target, cwd=tmp_dir, live=False
                    )
                    lines = out.split("\n")
                    for i, line in enumerate(lines):
                        if "Available Tasks for tuning" in line:
                            lines = lines[i + 1 :]
                            break
                    # tasks = [line.split(". ", 1)[1] for line in lines if len(line.strip()) > 0]
                    # Get config space sizes
                    matches = re.compile(r"(\d+). Task.*\(len=(\d+)\)").findall(out)
                    sizes = list(map(lambda x: (int(x[0]), int(x[1])), matches))
                    return sizes

            # num_tasks = len(get_tune_tasks())
            tune_tasks = get_tune_tasks()
            workers = []
            with concurrent.futures.ThreadPoolExecutor(num_workers) as executor:
                # for i in range(num_tasks):
                for i, task_len in tune_tasks:
                    if total_size is None:
                        total_size = 0
                    total_size += task_len
                    logger.debug(f"Created worker for task {i}")

                    def do_work(idx, prepend, task_len):
                        # Tune a single task in its own temp dir; returns stats tuple.
                        nonlocal trials_single, trials_global, early_stopping
                        t0 = time.time()
                        with tempfile.TemporaryDirectory() as tmp_dir:
                            out_file = Path(tmp_dir) / "tuning_results.log.txt"
                            with open(out_file, "w") as handle:
                                handle.write(prepend)
                            if trials_single == 0 or (
                                trials_single is None
                            ):  # 0: auto, None: do not limit per task
                                trials_single = max(1, trials_global // len(tune_tasks))
                                early_stopping = max(trials_single, 10)  # Let's see if this default works out...
                            if autotvm_enable:
                                tune_args = self.get_autotvm_tune_args(
                                    model_path, backend, target, out_file, trials_single, early_stopping
                                )
                            elif autoscheduler_enable:
                                tune_args = self.get_autoscheduler_tune_args(
                                    model_path, backend, target, out_file, trials_single, early_stopping
                                )
                            else:
                                assert False
                            out = self.invoke_tvmc_tune(*tune_args, "--tasks", str(idx), target=target, cwd=tmp_dir)
                            with open(out_file, "r") as handle:
                                content = handle.read()
                            visualize_raw_task = None
                            if self.config["autotuning_visualize"]:
                                to_file = self.config["autotuning_visualize_file"]
                                if not to_file or to_file is True:
                                    to_file = Path(tmp_dir) / "viz.png"
                                else:
                                    to_file = Path(to_file)
                                assert to_file.is_file()
                                with open(to_file, "rb") as handle:
                                    visualize_raw_task = handle.read()
                            # content_best = _pick_best(backend, content, verbose=verbose)
                            sub_trials = len(remove_empty(content.split("\n")))
                            sub_failed_trials = count_failed_trials(content)
                            max_flops = get_max_flops(out)
                            t1 = time.time()
                            return (
                                out,
                                content,
                                task_len,
                                sub_trials,
                                sub_failed_trials,
                                max_flops,
                                t1 - t0,
                                visualize_raw_task,
                            )

                    workers.append(executor.submit(do_work, i, content, task_len))
            all_out = ""
            all_content = ""
            for i, w in enumerate(workers):
                logger.debug(f"Worker {i}: pending")
                metrics_ = Metrics()
                artifacts_ = []
                try:
                    ret = w.result()
                    logger.debug(f"Worker {i}: done")
                    out, content, size, tuned, failed, max_flops, duration, visualize_raw_task = ret
                    all_out += out
                    all_content += content
                    metrics_.add("Config Space Size", size, True)
                    metrics_.add("Total Trials", tuned, True)
                    metrics_.add("Failed Trials", failed, True)
                    metrics_.add("Max. MFLOPS", max_flops, True)
                    metrics_.add("Tune Duration [s]", duration, True)
                    # BUG FIX: was `duration / tuned + failed`, which due to
                    # precedence added a trial *count* to a duration. `tuned`
                    # already counts failed trials (both are line counts of the
                    # same records file); also guard against a zero denominator.
                    metrics_.add("Tune Duration per Trial [s]", duration / max(tuned, 1), True)
                    if early_stopping < trials_single:
                        early = tuned + failed < min(trials_single, size)
                    else:
                        early = False
                    metrics_.add("Early Stopped", early, True)
                    if visualize_raw_task:
                        visualize_artifact = Artifact(
                            f"tuning_progress_task{i}.png",
                            raw=visualize_raw_task,
                            fmt=ArtifactFormat.RAW,
                            flags=["visualize"],
                        )
                        artifacts_.append(visualize_artifact)
                except AssertionError:
                    logger.exception(f"Worker {i}: failed")
                    metrics_.add("Failed Tuning", True)
                sub_metrics[f"task{i}"] = metrics_
                sub_artifacts[f"task{i}"] = artifacts_
            out = all_out
            content = all_content
        else:
            # Sequential mode: a single tvmc tune invocation over all tasks.
            with tempfile.TemporaryDirectory() as tmp_dir:
                out_file = Path(tmp_dir) / "tuning_results.log.txt"
                with open(out_file, "w") as handle:
                    handle.write(content)
                if autotvm_enable:
                    tune_args = self.get_autotvm_tune_args(
                        model_path, backend, target, out_file, trials_global, early_stopping
                    )
                elif autoscheduler_enable:
                    tune_args = self.get_autoscheduler_tune_args(
                        model_path, backend, target, out_file, trials_global, early_stopping
                    )  # TODO: expose per_task trials
                else:
                    assert False
                out = self.invoke_tvmc_tune(*tune_args, target=target, cwd=tmp_dir)
                with open(out_file, "r") as handle:
                    content = handle.read()
                visualize_raw = None
                if self.config["autotuning_visualize"]:
                    to_file = self.config["autotuning_visualize_file"]
                    if not to_file or to_file is True:
                        to_file = Path(tmp_dir) / "viz.png"
                    else:
                        # BUG FIX: was `Path(tmp_dir)` (a directory), which could
                        # never satisfy is_file(); use the configured file path,
                        # mirroring the parallel worker code path above.
                        to_file = Path(to_file)
                    assert to_file.is_file()
                    with open(to_file, "rb") as handle:
                        visualize_raw = handle.read()
    else:
        # NOTE(review): unreachable — the early return above already handled the
        # "no tuner enabled" case; kept as-is for safety.
        if results_file is None:
            return {}, {}
        assert Path(results_file).is_file()
        with open(results_file, "r") as handle:
            content = handle.read()
    if metascheduler_enable:
        artifacts.append(artifact)
        # TODO: get num trials etc.
        metrics = Metrics()
    elif autotvm_enable or autoscheduler_enable:
        flag = "autotvm" if not autoscheduler_enable else "autoscheduler"
        artifact = Artifact(
            "tuning_results.log.txt", content=content, fmt=ArtifactFormat.TEXT, flags=["records", flag]
        )
        artifacts.append(artifact)
        if visualize_raw:
            visualize_artifact = Artifact(
                "tuning_progress.png", raw=visualize_raw, fmt=ArtifactFormat.RAW, flags=["visualize"]
            )
            artifacts.append(visualize_artifact)
        metrics = Metrics()
        if total_size is not None:
            metrics.add("Config Space Size", total_size, True)
        content_best = _pick_best(backend, content, verbose=verbose)
        total_trials = len(remove_empty(content.split("\n")))
        metrics.add("Total Trials", total_trials, True)
        failed_trials = count_failed_trials(content)
        metrics.add("Failed Trials", failed_trials, True)
        if len(content_best) > 0:
            artifact_ = Artifact("best_tuning_results.log.txt", content=content_best, fmt=ArtifactFormat.TEXT)
            artifacts.append(artifact_)
            num_tuned = len(remove_empty(content_best.split("\n")))
            metrics.add("Tuned Tasks", num_tuned, True)
        else:
            artifact_ = Artifact("best_tuning_results.log.txt", content="", fmt=ArtifactFormat.TEXT)
            artifacts.append(artifact_)
            metrics.add("Tuned Tasks", 0, True)
    if autotvm_enable or autoscheduler_enable or metascheduler_enable:
        stdout_artifact = Artifact(
            "tvmc_tune_out.log", content=out, fmt=ArtifactFormat.TEXT
        )  # TODO: rename to tvmaot_out.log?
        artifacts.append(stdout_artifact)
    return {"default": artifacts, **sub_artifacts}, {"default": metrics, **sub_metrics}