Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ EXTRA_DIST = autogen.sh

include_HEADERS = include/gdsync.h
libgdsyncincludedir = $(includedir)/gdsync
libgdsyncinclude_HEADERS = include/gdsync/core.h include/gdsync/device.cuh include/gdsync/mlx5.h include/gdsync/tools.h
libgdsyncinclude_HEADERS = include/gdsync/core.h include/gdsync/device.cuh include/gdsync/mlx5.h include/gdsync/tools.h

src_libgdsync_la_CFLAGS = $(AM_CFLAGS)
src_libgdsync_la_SOURCES = src/gdsync.cpp src/memmgr.cpp src/mem.cpp src/objs.cpp src/apis.cpp src/mlx5.cpp include/gdsync.h
src_libgdsync_la_SOURCES = src/gdsync.cpp src/memmgr.cpp src/mem.cpp src/objs.cpp src/apis.cpp src/mlx5.cpp src/mlx5-exp.cpp include/gdsync.h
src_libgdsync_la_LDFLAGS = -version-info @VERSION_INFO@

noinst_HEADERS = src/mem.hpp src/memmgr.hpp src/objs.hpp src/rangeset.hpp src/utils.hpp src/archutils.h src/mlnxutils.h
noinst_HEADERS = src/mem.hpp src/memmgr.hpp src/objs.hpp src/rangeset.hpp src/utils.hpp src/archutils.h src/mlnxutils.h src/mlx5-exp.hpp

# if enabled at configure time

Expand All @@ -36,7 +36,7 @@ bin_PROGRAMS = tests/gds_kernel_latency tests/gds_poll_lat tests/gds_kernel_loop
noinst_PROGRAMS = tests/rstest tests/wqtest

tests_gds_kernel_latency_SOURCES = tests/gds_kernel_latency.c tests/gpu_kernels.cu tests/pingpong.c tests/gpu.cpp
tests_gds_kernel_latency_LDADD = $(top_builddir)/src/libgdsync.la -lmpi $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)
tests_gds_kernel_latency_LDADD = $(top_builddir)/src/libgdsync.la $(MPILDFLAGS) $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)

tests_rstest_SOURCES = tests/rstest.cpp
tests_rstest_LDADD =
Expand All @@ -45,10 +45,10 @@ tests_wqtest_SOURCES = tests/task_queue_test.cpp
tests_wqtest_LDADD = $(PTHREAD_LIBS)

tests_gds_poll_lat_SOURCES = tests/gds_poll_lat.c tests/gpu.cpp tests/gpu_kernels.cu
tests_gds_poll_lat_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmpi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)
tests_gds_poll_lat_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi $(MPILDFLAGS) $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)

tests_gds_sanity_SOURCES = tests/gds_sanity.cpp tests/gpu.cpp tests/gpu_kernels.cu
tests_gds_sanity_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmpi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)
tests_gds_sanity_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi $(MPILDFLAGS) $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)

tests_gds_kernel_loopback_latency_SOURCES = tests/gds_kernel_loopback_latency.c tests/pingpong.c tests/gpu.cpp tests/gpu_kernels.cu
tests_gds_kernel_loopback_latency_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)
Expand Down
44 changes: 37 additions & 7 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -93,25 +93,54 @@ else
AC_SUBST(LIBGDSTOOLS)
fi

AC_ARG_WITH([mpi],
AC_HELP_STRING([--with-mpi], [ Set path to mpi installation ]))
if test x$with_mpi = x || test x$with_mpi = xno; then
AC_ARG_WITH([spectrum-mpi],
AC_HELP_STRING([--with-spectrum-mpi], [ Set path to Spectrum MPI installation ]))
if test x$with_spectrum_mpi = x || test x$with_spectrum_mpi = xno; then
# assuming system location
mpi_home=/usr
MPICC=$with_home/bin/mpicc
MPICXX=$with_home/bin/mpic++
MPICC=/bin/mpicc
MPICXX=/bin/mpic++
MPILDFLAGS="-lmpi_ibm"
else
if test -d $with_mpi; then
mpi_home=$with_mpi
if test -d $with_spectrum_mpi; then
mpi_home=$with_spectrum_mpi
MPICC=${mpi_home}/bin/mpicc
MPICXX=${mpi_home}/bin/mpic++
CPPFLAGS="$CPPFLAGS -I${mpi_home}/include"
LDFLAGS="$LDFLAGS -L${mpi_home}/lib -L${mpi_home}/lib64"
MPILDFLAGS="-lmpi_ibm"
else
echo "MPI dir does not exist"
fi
fi

AC_ARG_WITH([mpi],
AC_HELP_STRING([--with-mpi], [ Set path to MPI installation ]))
if test x$with_spectrum_mpi = x || test x$with_spectrum_mpi == xno; then
if test x$with_mpi = x || test x$with_mpi = xno; then
# assuming system location
mpi_home=/usr
MPICC=/bin/mpicc
MPICXX=/bin/mpic++
MPILDFLAGS="-lmpi"
else
if test -d $with_mpi; then
mpi_home=$with_mpi
MPICC=${mpi_home}/bin/mpicc
MPICXX=${mpi_home}/bin/mpic++
CPPFLAGS="$CPPFLAGS -I${mpi_home}/include"
LDFLAGS="$LDFLAGS -L${mpi_home}/lib -L${mpi_home}/lib64"
MPILDFLAGS="-lmpi"
else
echo "MPI dir does not exist"
fi
fi
fi

if test x$with_spectrum_mpi != x && test x$with_spectrum_mpi != xno && test x$with_mpi != x && test x$with_mpi != xno; then
AC_MSG_ERROR([--with-mpi and --with-spectrum-mpi are mutually exclusive.])
fi

dnl Specify CUDA Location
AC_ARG_WITH(cuda-toolkit,
AC_HELP_STRING([--with-cuda-toolkit=CUDATKDIR], [ Specify CUDA toolkit installation directory (default: /usr/local/cuda)]),
Expand Down Expand Up @@ -186,6 +215,7 @@ AC_MSG_NOTICE([Setting MPI_PATH = ${mpi_home} ])
AC_SUBST( MPI_PATH, [${mpi_home} ])
AC_SUBST( MPICC, [${MPICC} ])
AC_SUBST( MPICXX, [${MPICXX} ])
AC_SUBST( MPILDFLAGS, [${MPILDFLAGS} ])

CPPFLAGS="$CPPFLAGS -I$CUDA_DRV_PATH/include -I$CUDA_PATH/include"
LDFLAGS="$LDFLAGS -L$CUDA_DRV_PATH/lib64 -L$CUDA_DRV_PATH/lib -L$CUDA_PATH/lib64 -L$CUDA_PATH/lib"
Expand Down
47 changes: 29 additions & 18 deletions include/gdsync/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,43 @@
((((v) & 0x0000ffffU) >> 0 ) >= (unsigned)GDS_API_MINOR_VERSION) )

typedef enum gds_param {
GDS_PARAM_VERSION,
GDS_NUM_PARAMS
GDS_PARAM_VERSION,
GDS_NUM_PARAMS
} gds_param_t;

int gds_query_param(gds_param_t param, int *value);

enum gds_create_qp_flags {
GDS_CREATE_QP_DEFAULT = 0,
GDS_CREATE_QP_WQ_ON_GPU = 1<<0,
GDS_CREATE_QP_TX_CQ_ON_GPU = 1<<1,
GDS_CREATE_QP_RX_CQ_ON_GPU = 1<<2,
GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5,
GDS_CREATE_QP_DEFAULT = 0,
GDS_CREATE_QP_WQ_ON_GPU = 1<<0,
GDS_CREATE_QP_TX_CQ_ON_GPU = 1<<1,
GDS_CREATE_QP_RX_CQ_ON_GPU = 1<<2,
GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5,
};

typedef struct ibv_exp_qp_init_attr gds_qp_init_attr_t;
typedef struct ibv_exp_send_wr gds_send_wr;
typedef struct ibv_qp_init_attr gds_qp_init_attr_t;
typedef struct ibv_send_wr gds_send_wr;

struct gds_cq {
typedef enum gds_driver_type {
GDS_DRIVER_TYPE_UNSUPPORTED = 0,
GDS_DRIVER_TYPE_MLX5_EXP,
GDS_DRIVER_TYPE_MLX5_DV,
GDS_DRIVER_TYPE_MLX5_DEVX
} gds_driver_type_t;

typedef struct gds_cq {
struct ibv_cq *cq;
uint32_t curr_offset;
};
gds_driver_type_t dtype;
} gds_cq_t;

struct gds_qp {
typedef struct gds_qp {
struct ibv_qp *qp;
struct gds_cq send_cq;
struct gds_cq recv_cq;
struct ibv_exp_res_domain * res_domain;
struct gds_cq *send_cq;
struct gds_cq *recv_cq;
struct ibv_context *dev_context;
};
gds_driver_type_t dtype;
} gds_qp_t;

/* \brief: Create a peer-enabled QP attached to the specified GPU id.
*
Expand Down Expand Up @@ -167,8 +175,11 @@ int gds_stream_post_send_all(CUstream stream, int count, gds_send_request_t *req
*/

typedef struct gds_wait_request {
struct ibv_exp_peer_peek peek;
struct peer_op_wr wr[GDS_WAIT_INFO_MAX_OPS];
gds_driver_type_t dtype;
uint8_t pad0[4];
uint8_t reserved0[40];
uint8_t reserved1[56 * GDS_WAIT_INFO_MAX_OPS];
uint8_t pad1[16];
} gds_wait_request_t;

/**
Expand Down
122 changes: 45 additions & 77 deletions src/apis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@
#include "utils.hpp"
#include "archutils.h"
#include "mlnxutils.h"
#include "mlx5-exp.hpp"


//-----------------------------------------------------------------------------

static void gds_init_ops(struct peer_op_wr *op, int count)
void gds_init_ops(struct peer_op_wr *op, int count)
{
int i = count;
while (--i)
Expand All @@ -79,13 +80,15 @@ static void gds_init_send_info(gds_send_request_t *info)

static void gds_init_wait_request(gds_wait_request_t *request, uint32_t offset)
{
gds_mlx5_exp_wait_request_t *gmexp_request;
gds_dbg("wait_request=%p offset=%08x\n", request, offset);
memset(request, 0, sizeof(*request));
request->peek.storage = request->wr;
request->peek.entries = sizeof(request->wr)/sizeof(request->wr[0]);
request->peek.whence = IBV_EXP_PEER_PEEK_ABSOLUTE;
request->peek.offset = offset;
gds_init_ops(request->peek.storage, request->peek.entries);

request->dtype = GDS_DRIVER_TYPE_MLX5_EXP;

gmexp_request = to_gds_mexp_wait_request(request);

gds_mlx5_exp_init_wait_request(gmexp_request, offset);
}

//-----------------------------------------------------------------------------
Expand Down Expand Up @@ -171,33 +174,24 @@ int gds_post_recv(struct gds_qp *qp, struct ibv_recv_wr *wr, struct ibv_recv_wr

//-----------------------------------------------------------------------------

int gds_prepare_send(struct gds_qp *qp, gds_send_wr *p_ewr,
int gds_prepare_send(struct gds_qp *gqp, gds_send_wr *p_ewr,
gds_send_wr **bad_ewr,
gds_send_request_t *request)
{
int ret = 0;
gds_mlx5_exp_qp_t *gmexpqp;

gds_init_send_info(request);
assert(qp);
assert(qp->qp);
ret = ibv_exp_post_send(qp->qp, p_ewr, bad_ewr);
if (ret) {
assert(gqp);
assert(gqp->qp);
assert(gqp->dtype == GDS_DRIVER_TYPE_MLX5_EXP);

gmexpqp = to_gds_mexp_qp(gqp);

ret = gds_mlx5_exp_prepare_send(gmexpqp, p_ewr, bad_ewr, request);
if (ret)
gds_err("Error %d in gds_mlx5_exp_prepare_send.\n", ret);

if (ret == ENOMEM) {
// out of space error can happen too often to report
gds_dbg("ENOMEM error %d in ibv_exp_post_send\n", ret);
} else {
gds_err("error %d in ibv_exp_post_send\n", ret);
}
goto out;
}

ret = ibv_exp_peer_commit_qp(qp->qp, &request->commit);
if (ret) {
gds_err("error %d in ibv_exp_peer_commit_qp\n", ret);
//gds_wait_kernel();
goto out;
}
out:
return ret;
}

Expand Down Expand Up @@ -281,59 +275,29 @@ int gds_stream_post_send_all(CUstream stream, int count, gds_send_request_t *req

int gds_prepare_wait_cq(struct gds_cq *cq, gds_wait_request_t *request, int flags)
{
int retcode = 0;
gds_mlx5_exp_cq_t *gmexpcq;
gds_mlx5_exp_wait_request_t *gmexp_request;

if (flags != 0) {
gds_err("invalid flags != 0\n");
return EINVAL;
}

gds_init_wait_request(request, cq->curr_offset++);

retcode = ibv_exp_peer_peek_cq(cq->cq, &request->peek);
if (retcode == -ENOSPC) {
// TODO: handle too few entries
gds_err("not enough ops in peer_peek_cq\n");
goto out;
} else if (retcode) {
gds_err("error %d in peer_peek_cq\n", retcode);
goto out;
}
//gds_dump_wait_request(request, 1);
out:
return retcode;
gmexpcq = to_gds_mexp_cq(cq);
gmexp_request = to_gds_mexp_wait_request(request);

return gds_mlx5_exp_prepare_wait_cq(gmexpcq, gmexp_request, flags);
}

//-----------------------------------------------------------------------------

int gds_append_wait_cq(gds_wait_request_t *request, uint32_t *dw, uint32_t val)
{
int ret = 0;
unsigned MAX_NUM_ENTRIES = sizeof(request->wr)/sizeof(request->wr[0]);
unsigned n = request->peek.entries;
struct peer_op_wr *wr = request->peek.storage;

if (n + 1 > MAX_NUM_ENTRIES) {
gds_err("no space left to stuff a poke\n");
ret = ENOMEM;
goto out;
}

// at least 1 op
assert(n);
assert(wr);

for (; n; --n) wr = wr->next;
assert(wr);
gds_mlx5_exp_wait_request_t *gmexp_request = to_gds_mexp_wait_request(request);

wr->type = IBV_EXP_PEER_OP_STORE_DWORD;
wr->wr.dword_va.data = val;
wr->wr.dword_va.target_id = 0; // direct mapping, offset IS the address
wr->wr.dword_va.offset = (ptrdiff_t)(dw-(uint32_t*)0);

++request->peek.entries;

out:
return ret;
return gds_mlx5_exp_append_wait_cq(gmexp_request, dw, val);
}

//-----------------------------------------------------------------------------
Expand All @@ -354,12 +318,16 @@ int gds_stream_post_wait_cq_all(CUstream stream, int count, gds_wait_request_t *

static int gds_abort_wait_cq(struct gds_cq *cq, gds_wait_request_t *request)
{
gds_mlx5_exp_cq_t *gmexpcq;
gds_mlx5_exp_wait_request_t *gmexp_request;

assert(cq);
assert(request);
struct ibv_exp_peer_abort_peek abort_ctx;
abort_ctx.peek_id = request->peek.peek_id;
abort_ctx.comp_mask = 0;
return ibv_exp_peer_abort_peek_cq(cq->cq, &abort_ctx);

gmexpcq = to_gds_mexp_cq(cq);
gmexp_request = to_gds_mexp_wait_request(request);

return gds_mlx5_exp_abort_wait_cq(gmexpcq, gmexp_request);
}

//-----------------------------------------------------------------------------
Expand Down Expand Up @@ -557,7 +525,7 @@ static int calc_n_mem_ops(size_t n_descs, gds_descriptor_t *descs, size_t &n_mem
n_mem_ops += desc->send->commit.entries + 2; // extra space, ugly
break;
case GDS_TAG_WAIT:
n_mem_ops += desc->wait->peek.entries + 2; // ditto
n_mem_ops += gds_mlx5_exp_get_num_wait_request_entries(to_gds_mexp_wait_request(desc->wait)) + 2; // ditto
break;
case GDS_TAG_WAIT_VALUE32:
case GDS_TAG_WRITE_VALUE32:
Expand Down Expand Up @@ -626,15 +594,15 @@ int gds_stream_post_descriptors(CUstream stream, size_t n_descs, gds_descriptor_
break;
}
case GDS_TAG_WAIT: {
gds_wait_request_t *wreq = desc->wait;
gds_mlx5_exp_wait_request_t *wreq = to_gds_mexp_wait_request(desc->wait);
int flags = 0;
if (move_flush && i != last_wait) {
gds_dbg("discarding FLUSH!\n");
flags = GDS_POST_OPS_DISCARD_WAIT_FLUSH;
}
retcode = gds_post_ops(peer, wreq->peek.entries, wreq->peek.storage, params, flags);
retcode = gds_mlx5_exp_stream_post_wait_descriptor(peer, wreq, params, flags);
if (retcode) {
gds_err("error %d in gds_post_ops\n", retcode);
gds_err("error %d in gds_mlx5_exp_stream_post_wait_descriptor\n", retcode);
ret = retcode;
goto out;
}
Expand Down Expand Up @@ -705,10 +673,10 @@ int gds_post_descriptors(size_t n_descs, gds_descriptor_t *descs, int flags)
}
case GDS_TAG_WAIT: {
gds_dbg("desc[%zu] WAIT\n", i);
gds_wait_request_t *wreq = desc->wait;
retcode = gds_post_ops_on_cpu(wreq->peek.entries, wreq->peek.storage, flags);
gds_mlx5_exp_wait_request_t *wreq = to_gds_mexp_wait_request(desc->wait);
retcode = gds_mlx5_exp_post_wait_descriptor(wreq, flags);
if (retcode) {
gds_err("error %d in gds_post_ops_on_cpu\n", retcode);
gds_err("error %d in gds_mlx5_exp_post_wait_descriptor\n", retcode);
ret = retcode;
goto out;
}
Expand Down
Loading