Skip to content

Benchmark rambo #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions examples/rambo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
"""

Examples:
python rambo.py -nevts 10 -nout 10 -b sharpy -i 10000
mpiexec -n 3 python rambo.py -nevts 64 -nout 64 -b sharpy -i 100

"""

import argparse
import time as time_mod

import numpy

import sharpy

try:
import mpi4py

mpi4py.rc.finalize = False
from mpi4py import MPI

comm_rank = MPI.COMM_WORLD.Get_rank()
comm = MPI.COMM_WORLD
except ImportError:
comm_rank = 0
comm = None


def info(s):
if comm_rank == 0:
print(s)


def naive_erf(x):
"""
Error function (erf) implementation

Adapted from formula 7.1.26 in
Abramowitz and Stegun, "Handbook of Mathematical Functions", 1965.
"""
y = numpy.abs(x)

a1 = 0.254829592
a2 = -0.284496736
a3 = 1.421413741
a4 = -1.453152027
a5 = 1.061405429
p = 0.3275911

t = 1.0 / (1.0 + p * y)
f = (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t
return numpy.sign(x) * (1.0 - f * numpy.exp(-y * y))


def sp_rambo(sp, sp_C1, sp_F1, sp_Q1, sp_output, nevts, nout):
sp_C = 2.0 * sp_C1 - 1.0
sp_S = sp.sqrt(1 - sp.square(sp_C))
sp_F = 2.0 * sp.pi * sp_F1
sp_Q = -sp.log(sp_Q1)

sp_output[:, :, 0] = sp.reshape(sp_Q, (nevts, nout, 1))
sp_output[:, :, 1] = sp.reshape(
sp_Q * sp_S * sp.sin(sp_F), (nevts, nout, 1)
)
sp_output[:, :, 2] = sp.reshape(
sp_Q * sp_S * sp.cos(sp_F), (nevts, nout, 1)
)
sp_output[:, :, 3] = sp.reshape(sp_Q * sp_C, (nevts, nout, 1))

sharpy.sync()


def np_rambo(np, C1, F1, Q1, output, nevts, nout):
C = 2.0 * C1 - 1.0
S = np.sqrt(1 - np.square(C))
F = 2.0 * np.pi * F1
Q = -np.log(Q1)

output[:, :, 0] = Q
output[:, :, 1] = Q * S * np.sin(F)
output[:, :, 2] = Q * S * np.cos(F)
output[:, :, 3] = Q * C


def initialize(np, nevts, nout, seed, dtype):
np.random.seed(seed)
C1 = np.random.rand(nevts, nout)
F1 = np.random.rand(nevts, nout)
Q1 = np.random.rand(nevts, nout) * np.random.rand(nevts, nout)
return (C1, F1, Q1, np.zeros((nevts, nout, 4), dtype))


def run(nevts, nout, backend, iterations, datatype):
if backend == "sharpy":
import sharpy as np
from sharpy import fini, init, sync

rambo = sp_rambo

init(False)
elif backend == "numpy":
import numpy as np

if comm is not None:
assert (
comm.Get_size() == 1
), "Numpy backend only supports serial execution."

fini = sync = lambda x=None: None
rambo = np_rambo
else:
raise ValueError(f'Unknown backend: "{backend}"')

dtype = {
"f32": np.float32,
"f64": np.float64,
}[datatype]

info(f"Using backend: {backend}")
info(f"Number of events: {nevts}")
info(f"Number of outputs: {nout}")
info(f"Datatype: {datatype}")

seed = 7777
C1, F1, Q1, output = initialize(np, nevts, nout, seed, dtype)
sync()

# verify
if backend == "sharpy":
sp_rambo(sharpy, C1, F1, Q1, output, nevts, nout)
# sync() !! not work here?
np_C1 = sharpy.to_numpy(C1)
np_F1 = sharpy.to_numpy(F1)
np_Q1 = sharpy.to_numpy(Q1)
np_output = numpy.zeros((nevts, nout, 4))
np_rambo(numpy, np_C1, np_F1, np_Q1, np_output, nevts, nout)
assert numpy.allclose(sharpy.to_numpy(output), np_output)

def eval():
tic = time_mod.perf_counter()
rambo(np, C1, F1, Q1, output, nevts, nout)
toc = time_mod.perf_counter()
return toc - tic

# warm-up run
t_warm = eval()

# evaluate
info(f"Running {iterations} iterations")
time_list = []
for i in range(iterations):
time_list.append(eval())

# get max time over mpi ranks
if comm is not None:
t_warm = comm.allreduce(t_warm, MPI.MAX)
time_list = comm.allreduce(time_list, MPI.MAX)

t_min = numpy.min(time_list)
t_max = numpy.max(time_list)
t_med = numpy.median(time_list)
# perf_rate = nopt / t_med / 1e6 # million options per second
init_overhead = t_warm - t_med
if backend == "sharpy":
info(f"Estimated initialization overhead: {init_overhead:.5f} s")
info(f"Min. duration: {t_min:.5f} s")
info(f"Max. duration: {t_max:.5f} s")
info(f"Median duration: {t_med:.5f} s")
# info(f"Median rate: {perf_rate:.5f} Mopts/s")

fini()


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Run rambo benchmark",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

parser.add_argument(
"-nevts",
"--num_events",
type=int,
default=10,
help="Number of events to evaluate.",
)
parser.add_argument(
"-nout",
"--num_outputs",
type=int,
default=10,
help="Number of outputs to evaluate.",
)

parser.add_argument(
"-b",
"--backend",
type=str,
default="sharpy",
choices=["sharpy", "numpy"],
help="Backend to use.",
)

parser.add_argument(
"-i",
"--iterations",
type=int,
default=10,
help="Number of iterations to run.",
)
parser.add_argument(
"-d",
"--datatype",
type=str,
default="f64",
choices=["f32", "f64"],
help="Datatype for model state variables",
)
args = parser.parse_args()
nevts, nout = args.num_events, args.num_outputs
run(
nevts,
nout,
args.backend,
args.iterations,
args.datatype,
)
20 changes: 16 additions & 4 deletions sharpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@
from ._sharpy import sync
from .ndarray import ndarray


# Lazy load submodules
def __getattr__(name):
if name == "random":
import sharpy.random as random

return random
elif name == "numpy":
import sharpy.numpy as numpy

return numpy

if "_fallback" in globals():
return _fallback(name)


_sharpy_cw = bool(int(getenv("SHARPY_CW", False)))

pi = 3.1415926535897932384626433
Expand Down Expand Up @@ -185,7 +201,3 @@ def __getattr__(self, name):
dt.linalg.norm(...)
"""
return _fallback(name, self._func)

def __getattr__(name):
"Attempt to find a fallback in fallback-lib"
return _fallback(name)
11 changes: 0 additions & 11 deletions sharpy/random.py

This file was deleted.

37 changes: 37 additions & 0 deletions sharpy/random/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import numpy as np

import sharpy as sp
from sharpy import float64
from sharpy.numpy import fromfunction


def uniform(low, high, size, device="", team=1):
data = np.random.uniform(low, high, size)
if len(data.shape) == 0:
sp_data = sp.empty(())
sp_data[()] = data[()]
return sp_data
return fromfunction(
lambda *index: data[index],
data.shape,
dtype=float64,
device=device,
team=team,
)


def rand(*shape, device="", team=1):
data = np.random.rand(*shape)
if isinstance(data, float):
return data
return fromfunction(
lambda *index: data[index],
data.shape,
dtype=float64,
device=device,
team=team,
)


def seed(s):
np.random.seed(s)
5 changes: 4 additions & 1 deletion src/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ struct DeferredService : public DeferredT<Service::service_promise_type,
// drop from dep manager
dm.drop(_a);
// and from registry
Registry::del(_a);
dm.addReady(_a, [this](id_type guid) {
assert(this->_a == guid);
Registry::del(guid);
});
break;
}
case RUN:
Expand Down
45 changes: 40 additions & 5 deletions src/idtr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,21 @@ void unpack(void *in, SHARPY::DTypeId dtype, const int64_t *sizes,
});
}

/// copy contiguous block of data into a possibly strided array
void unpack1(void *in, SHARPY::DTypeId dtype, const int64_t *sizes,
const int64_t *strides, uint64_t ndim, void *out) {
if (!in || !sizes || !strides || !out) {
return;
}
dispatch(dtype, out, [sizes, strides, ndim, in](auto *out_) {
auto in_ = static_cast<decltype(out_)>(in);
SHARPY::forall(0, out_, sizes, strides, ndim, [&in_](auto *out) {
*out = *in_;
++in_;
});
});
}

template <typename T>
void copy_(uint64_t d, uint64_t &pos, T *cptr, const int64_t *sizes,
const int64_t *strides, const uint64_t *chunks, uint64_t nd,
Expand Down Expand Up @@ -489,21 +504,41 @@ WaitHandleBase *_idtr_copy_reshape(SHARPY::DTypeId sharpytype,
}
}

int64_t oStride = std::accumulate(oDataStridesPtr, oDataStridesPtr + oNDims,
1, std::multiplies<int64_t>());
void *rBuff = oDataPtr;
if (oStride != 1) {
rBuff = new char[sizeof_dtype(sharpytype) * myOSz];
}

SHARPY::Buffer sendbuff(totSSz * sizeof_dtype(sharpytype), 2);
bufferizeN(iNDims, iDataPtr, iDataShapePtr, iDataStridesPtr, sharpytype, N,
lsOffs.data(), lsEnds.data(), sendbuff.data());
auto hdl = tc->alltoall(sendbuff.data(), sszs.data(), soffs.data(),
sharpytype, oDataPtr, rszs.data(), roffs.data());
sharpytype, rBuff, rszs.data(), roffs.data());

if (no_async) {
tc->wait(hdl);
if (oStride != 1) {
unpack1(rBuff, sharpytype, oDataShapePtr, oDataStridesPtr, oNDims,
oDataPtr);
delete[](char *) rBuff;
}
return nullptr;
}

auto wait = [tc = tc, hdl = hdl, sendbuff = std::move(sendbuff),
sszs = std::move(sszs), soffs = std::move(soffs),
rszs = std::move(rszs),
roffs = std::move(roffs)]() { tc->wait(hdl); };
auto wait = [tc, hdl, oStride, rBuff, sharpytype, oDataShapePtr,
oDataStridesPtr, oNDims, oDataPtr,
sendbuff = std::move(sendbuff), sszs = std::move(sszs),
soffs = std::move(soffs), rszs = std::move(rszs),
roffs = std::move(roffs)]() {
tc->wait(hdl);
if (oStride != 1) {
unpack1(rBuff, sharpytype, oDataShapePtr, oDataStridesPtr, oNDims,
oDataPtr);
delete[](char *) rBuff;
}
};
assert(sendbuff.empty() && sszs.empty() && soffs.empty() && rszs.empty() &&
roffs.empty());
return mkWaitHandle(std::move(wait));
Expand Down
Loading
Loading