diff --git a/python/tvm/utils/roofline.py b/python/tvm/utils/roofline.py index 431becdd00d1..6d1ac753e27e 100644 --- a/python/tvm/utils/roofline.py +++ b/python/tvm/utils/roofline.py @@ -18,23 +18,32 @@ from typing import Dict, Union, Optional import numpy as np -from .. import auto_scheduler, relay, tir, nd, IRModule, build, topi, transform +from .. import auto_scheduler, relay, tir, nd, IRModule, build, topi, transform, get_global_func from ..target import Target from ..runtime import profiler_vm, profiling, Device, num_threads from ..script import tir as T from ..ir.instrument import pass_instrument from ..ir.expr import GlobalVar +from ..rpc.base import RPC_SESS_MASK +from ..rpc.client import RPCSession +from ..contrib import utils -def _create_args(mod: IRModule, dev: Device, func_name: str = "main"): +def _create_args(mod: IRModule, dev: Device, func_name: str = "main", remote=None): + if dev.device_type >= RPC_SESS_MASK: + random_fill = remote.get_function("tvm.contrib.random.random_fill") + else: + random_fill = get_global_func("tvm.contrib.random.random_fill") + assert random_fill, "Please make sure USE_RANDOM is ON in config.cmake" args = [] for arg in mod[func_name].params: - args.append( - nd.array( - np.zeros([x.value for x in arg.type_annotation.shape], arg.type_annotation.dtype), - device=dev, - ) + ary = nd.empty( + [x.value for x in arg.type_annotation.shape], + arg.type_annotation.dtype, + device=dev, ) + random_fill(ary) + args.append(ary) return args @@ -103,6 +112,7 @@ def estimate_peak_fma_flops( dev: Device, vec_width: Optional[int] = None, num_vector_registers: Optional[int] = None, + remote: Optional[RPCSession] = None, ) -> float: """ Estimate the maximum number of FLOP/s this target/device combo is capable @@ -123,6 +133,9 @@ def estimate_peak_fma_flops( num_vector_registers : Optional[int] Number of vector registers on the underlying hardware. Will try to infer if no value is provided. + remote : Optional[RPCSession] + Remote session used to upload artifacts for runtime evaluation. Must be + the same session used to create `dev`. Returns ------- @@ -146,7 +159,23 @@ def estimate_peak_fma_flops( ) with transform.PassContext(opt_level=3): f = build(specialized, target=target) - a = nd.array(np.ones((nthreads, num_vector_registers, vec_width), dtype="float32"), device=dev) + + # upload to remote if running over rpc + if dev.device_type >= RPC_SESS_MASK: + if remote is None: + raise RuntimeError("A RPCSession must be provided when using a remote device.") + temp = utils.tempdir() + path = temp.relpath("peak_fma_flops.tar") + f.export_library(path) + remote.upload(path) + f = remote.load_module("peak_fma_flops.tar") + random_fill = remote.get_function("tvm.contrib.random.random_fill") + else: + random_fill = get_global_func("tvm.contrib.random.random_fill") + assert random_fill, "Please make sure USE_RANDOM is ON in config.cmake" + + a = nd.empty((nthreads, num_vector_registers, vec_width), dtype="float32", device=dev) + random_fill(a) times = f.time_evaluator(f.entry_name, dev, repeat=100, number=1)(a) flops = 2 * vec_width * num_vector_registers * nthreads * iters # fma is two flops flop_s = flops / times.min @@ -171,7 +200,12 @@ def peak_bandwidth_tir(a: T.handle, b: T.handle, threads: T.int32, vec_width: T. B[i, l, j] += A[i, k, l, j] -def estimate_peak_bandwidth(target: Target, dev: Device, vec_width: Optional[int] = None) -> float: +def estimate_peak_bandwidth( + target: Target, + dev: Device, + vec_width: Optional[int] = None, + remote: Optional[RPCSession] = None, +) -> float: """Estimate peak memory bandwidth of a target/device combo. Peak bandwidth is estimated by running a small experiment on the underlying @@ -187,6 +221,9 @@ def estimate_peak_bandwidth(target: Target, dev: Device, vec_width: Optional[int Device to measure peak bandwidth on. vec_width : Optional[int] Vector unit width, determined from target if not supplied. + remote : Optional[RPCSession] + Remote session used to upload artifacts for runtime evaluation. Must be + the same session used to create `dev`. Returns ------- @@ -207,13 +244,30 @@ def estimate_peak_bandwidth(target: Target, dev: Device, vec_width: Optional[int ) with transform.PassContext(opt_level=3): f = build(specialized, target=target) + + # upload to remote if running over rpc + if dev.device_type >= RPC_SESS_MASK: + if remote is None: + raise RuntimeError("A RPCSession must be provided when using a remote device.") + temp = utils.tempdir() + path = temp.relpath("peak_bandwidth.tar") + f.export_library(path) + remote.upload(path) + f = remote.load_module("peak_bandwidth.tar") + random_fill = remote.get_function("tvm.contrib.random.random_fill") + else: + random_fill = get_global_func("tvm.contrib.random.random_fill") + assert random_fill, "Please make sure USE_RANDOM is ON in config.cmake" + threads = num_threads() # Data size needs to be larger than last level of cache. We don't have a # way of getting cache sizes, so this number should give us a large enough # size. size = 10**8 // (4 * threads * vec_width) - a = nd.array(np.ones((threads, size, 4, vec_width), dtype="float32"), device=dev) - b = nd.array(np.ones((threads, vec_width, 4), dtype="float32"), device=dev) + a = nd.empty((threads, size, 4, vec_width), dtype="float32", device=dev) + random_fill(a) + b = nd.empty((threads, vec_width, 4), dtype="float32", device=dev) + random_fill(b) times = f.time_evaluator(f.entry_name, dev, repeat=10, number=1)(a, b, threads) return a.numpy().size * 4 / times.min # 4 bytes per float32 @@ -241,6 +295,7 @@ def roofline_from_existing( tir_functions: Dict[GlobalVar, tir.PrimFunc], target: Target, dev: Device, + remote: Optional[RPCSession] = None, ) -> profiling.Report: """Add roofline and other estimated statistics to an existing profiling report. @@ -290,6 +345,9 @@ def roofline_from_existing( TVM target that `report` was generated with. dev : Device Device that `report` was generated with. + remote : Optional[RPCSession] + Remote session used to upload artifacts for runtime evaluation. Must be + the same session used to create `dev`. Returns ------- @@ -299,8 +357,8 @@ def roofline_from_existing( :py:func:`roofline_analysis` for more information on which metrics are included. """ - peak_bandwidth = estimate_peak_bandwidth(target, dev) - peak_flops = estimate_peak_fma_flops(target, dev) + peak_bandwidth = estimate_peak_bandwidth(target, dev, remote=remote) + peak_flops = estimate_peak_fma_flops(target, dev, remote=remote) ridge_point = peak_flops / peak_bandwidth @@ -346,7 +404,11 @@ def roofline_from_existing( def roofline_analysis( - mod: IRModule, params: Dict[str, nd.NDArray], target: Union[str, Target], dev: Device + mod: IRModule, + params: Dict[str, nd.NDArray], + target: Union[str, Target], + dev: Device, + remote: Optional[RPCSession] = None, ) -> profiling.Report: """ Create a profiling report that contains roofline and other estimated @@ -385,6 +447,10 @@ def roofline_analysis( dev : Device Device to run on. + remote : Optional[RPCSession] + Remote session used to upload artifacts for runtime evaluation. Must be + the same session used to create `dev`. + Returns ------- @@ -405,9 +471,18 @@ def roofline_analysis( config=pass_ctx.config, ): lib = relay.vm.compile(mod, params=params, target=target) + # upload to remote if running over rpc + if dev.device_type >= RPC_SESS_MASK: + if remote is None: + raise RuntimeError("A RPCSession must be provided when using a remote device.") + temp = utils.tempdir() + path = temp.relpath("roofline_lib.tar") + lib.mod.export_library(path) + remote.upload(path) + lib = remote.load_module("roofline_lib.tar") vmexec = profiler_vm.VirtualMachineProfiler(lib, dev) - args = _create_args(mod, dev) + args = _create_args(mod, dev, remote=remote) report = vmexec.profile(*args) - return roofline_from_existing(report, save_tir.functions, target, dev) + return roofline_from_existing(report, save_tir.functions, target, dev, remote=remote) diff --git a/tests/python/unittest/test_runtime_profiling.py b/tests/python/unittest/test_runtime_profiling.py index a599c44e36ab..3cc79ab67a2a 100644 --- a/tests/python/unittest/test_runtime_profiling.py +++ b/tests/python/unittest/test_runtime_profiling.py @@ -267,10 +267,23 @@ def test_estimate_peak_fma_flops(target, dev): flops = tvm.utils.estimate_peak_fma_flops(tvm.target.Target(target), dev) # Assume we can achieve 1 GFLOP/s per thread, which is 1 FLOP per cycle on a 1GHz cpu. assert ( - flops > 10**9 * tvm.runtime.num_threads() and flops < 10**14 - ), f"FLOP/s should be between 10^9 * num_threads and 10^14, but it is {flops}" + flops > 10**9 and flops < 10**14 + ), f"FLOP/s should be between 10^9 and 10^14, but it is {flops}" +def test_estimate_peak_fma_flops_rpc(): + target = "llvm -mattr=+fma,+avx2" + server = rpc.Server(key="profiling") + remote = rpc.connect("127.0.0.1", server.port, key="profiling") + dev = remote.device(target) + flops = tvm.utils.estimate_peak_fma_flops(tvm.target.Target(target), dev, remote=remote) + # Assume we can achieve 1 GFLOP/s per thread, which is 1 FLOP per cycle on a 1GHz cpu. + assert ( + flops > 10**9 and flops < 10**14 + ), f"FLOP/s should be between 10^9 and 10^14, but it is {flops}" + + +@tvm.testing.skip_if_32bit(reason="Cannot allocate enough memory on i386") @tvm.testing.parametrize_targets("llvm") def test_estimate_peak_bandwidth(target, dev): # This test uses vectorized instructions so we need a target that supports them @@ -284,6 +297,20 @@ def test_estimate_peak_bandwidth(target, dev): ), f"Bandwidth should be between 10^9 and 10^12, but it is {bandwidth}" +@tvm.testing.skip_if_32bit(reason="Cannot allocate enough memory on i386") +def test_estimate_peak_bandwidth_rpc(): + target = "llvm -mattr=+fma,+avx2" + server = rpc.Server(key="profiling") + remote = rpc.connect("127.0.0.1", server.port, key="profiling") + dev = remote.device(target) + bandwidth = tvm.utils.estimate_peak_bandwidth(tvm.target.Target(target), dev, remote=remote) + # Assume we can achieve 1 GB/s. DDR2 should transfer somewhere around 6 + # GB/s, so this should leave enough wiggle room. + assert ( + bandwidth > 10**9 and bandwidth < 10**12 + ), f"Bandwidth should be between 10^9 and 10^12, but it is {bandwidth}" + + @tvm.testing.skip_if_32bit(reason="Cannot allocate enough memory on i386") @tvm.testing.parametrize_targets("llvm") def test_roofline_analysis(target, dev): @@ -304,6 +331,32 @@ def test_roofline_analysis(target, dev): assert call["Percent of Theoretical Optimal"].ratio >= 0 +@tvm.testing.skip_if_32bit(reason="Cannot allocate enough memory on i386") +def test_roofline_analysis_rpc(): + target = "llvm" + + a = relay.var("a", relay.TensorType((512, 512), "float32")) + b = relay.var("b", relay.TensorType((512, 512), "float32")) + c = relay.nn.dense(a, b) + mod = tvm.IRModule.from_expr(relay.Function([a, b], c)) + params = {} + + server = rpc.Server(key="profiling") + remote = rpc.connect("127.0.0.1", server.port, key="profiling") + dev = remote.device(target) + + report = tvm.utils.roofline_analysis(mod, params, target, dev, remote=remote) + + assert "Bound" in report.table() + assert "Percent of Theoretical Optimal" in report.table() + for call in report.calls: + if "Percent of Theoretical Optimal" in call: + # Ideally we'd like a little tighter bound here, but it is hard to + # know how well this dense will perform without tuning. And we + # don't have an operator that uses a specific number of flops. + assert call["Percent of Theoretical Optimal"].ratio >= 0 + + if __name__ == "__main__": import sys import pytest