From ce78eb62a3684355829e159943235fc2c045b3ca Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Wed, 4 Aug 2021 01:20:19 -0400 Subject: [PATCH 01/19] Added --with-spectrum-mpi to configure --- Makefile.am | 6 +++--- configure.ac | 44 +++++++++++++++++++++++++++++++++++++------- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/Makefile.am b/Makefile.am index a66dfda..ac9a0cc 100644 --- a/Makefile.am +++ b/Makefile.am @@ -36,7 +36,7 @@ bin_PROGRAMS = tests/gds_kernel_latency tests/gds_poll_lat tests/gds_kernel_loop noinst_PROGRAMS = tests/rstest tests/wqtest tests_gds_kernel_latency_SOURCES = tests/gds_kernel_latency.c tests/gpu_kernels.cu tests/pingpong.c tests/gpu.cpp -tests_gds_kernel_latency_LDADD = $(top_builddir)/src/libgdsync.la -lmpi $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) +tests_gds_kernel_latency_LDADD = $(top_builddir)/src/libgdsync.la $(MPILDFLAGS) $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) tests_rstest_SOURCES = tests/rstest.cpp tests_rstest_LDADD = @@ -45,10 +45,10 @@ tests_wqtest_SOURCES = tests/task_queue_test.cpp tests_wqtest_LDADD = $(PTHREAD_LIBS) tests_gds_poll_lat_SOURCES = tests/gds_poll_lat.c tests/gpu.cpp tests/gpu_kernels.cu -tests_gds_poll_lat_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmpi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) +tests_gds_poll_lat_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi $(MPILDFLAGS) $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) tests_gds_sanity_SOURCES = tests/gds_sanity.cpp tests/gpu.cpp tests/gpu_kernels.cu -tests_gds_sanity_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmpi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) +tests_gds_sanity_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi $(MPILDFLAGS) $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) tests_gds_kernel_loopback_latency_SOURCES = tests/gds_kernel_loopback_latency.c tests/pingpong.c tests/gpu.cpp tests/gpu_kernels.cu tests_gds_kernel_loopback_latency_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) diff --git a/configure.ac b/configure.ac index a79aed6..e20f313 100644 --- a/configure.ac +++ b/configure.ac @@ -93,25 +93,54 @@ else AC_SUBST(LIBGDSTOOLS) fi -AC_ARG_WITH([mpi], - AC_HELP_STRING([--with-mpi], [ Set path to mpi installation ])) -if test x$with_mpi = x || test x$with_mpi = xno; then +AC_ARG_WITH([spectrum-mpi], + AC_HELP_STRING([--with-spectrum-mpi], [ Set path to Spectrum MPI installation ])) +if test x$with_spectrum_mpi = x || test x$with_spectrum_mpi = xno; then # assuming system location mpi_home=/usr - MPICC=$with_home/bin/mpicc - MPICXX=$with_home/bin/mpic++ + MPICC=/bin/mpicc + MPICXX=/bin/mpic++ + MPILDFLAGS="-lmpi_ibm" else - if test -d $with_mpi; then - mpi_home=$with_mpi + if test -d $with_spectrum_mpi; then + mpi_home=$with_spectrum_mpi MPICC=${mpi_home}/bin/mpicc MPICXX=${mpi_home}/bin/mpic++ CPPFLAGS="$CPPFLAGS -I${mpi_home}/include" LDFLAGS="$LDFLAGS -L${mpi_home}/lib -L${mpi_home}/lib64" + MPILDFLAGS="-lmpi_ibm" else echo "MPI dir does not exist" fi fi +AC_ARG_WITH([mpi], + AC_HELP_STRING([--with-mpi], [ Set path to MPI installation ])) +if test x$with_spectrum_mpi = x || test x$with_spectrum_mpi == xno; then + if test x$with_mpi = x || test x$with_mpi = xno; then + # assuming system location + mpi_home=/usr + MPICC=/bin/mpicc + MPICXX=/bin/mpic++ + MPILDFLAGS="-lmpi" + else + if test -d $with_mpi; then + mpi_home=$with_mpi + MPICC=${mpi_home}/bin/mpicc + MPICXX=${mpi_home}/bin/mpic++ + CPPFLAGS="$CPPFLAGS -I${mpi_home}/include" + LDFLAGS="$LDFLAGS -L${mpi_home}/lib -L${mpi_home}/lib64" + MPILDFLAGS="-lmpi" + else + echo "MPI dir does not exist" + fi + fi +fi + +if test x$with_spectrum_mpi != x && test x$with_spectrum_mpi != xno && test x$with_mpi != x && test x$with_mpi != xno; then + AC_MSG_ERROR([--with-mpi and --with-spectrum-mpi are mutually exclusive.]) +fi + dnl Specify CUDA Location AC_ARG_WITH(cuda-toolkit, AC_HELP_STRING([--with-cuda-toolkit=CUDATKDIR], [ Specify CUDA toolkit installation directory (default: /usr/local/cuda)]), @@ -186,6 +215,7 @@ AC_MSG_NOTICE([Setting MPI_PATH = ${mpi_home} ]) AC_SUBST( MPI_PATH, [${mpi_home} ]) AC_SUBST( MPICC, [${MPICC} ]) AC_SUBST( MPICXX, [${MPICXX} ]) +AC_SUBST( MPILDFLAGS, [${MPILDFLAGS} ]) CPPFLAGS="$CPPFLAGS -I$CUDA_DRV_PATH/include -I$CUDA_PATH/include" LDFLAGS="$LDFLAGS -L$CUDA_DRV_PATH/lib64 -L$CUDA_DRV_PATH/lib -L$CUDA_PATH/lib64 -L$CUDA_PATH/lib" From 66409ac64785c884889e3d85df10a3a34bc2e8f9 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Wed, 4 Aug 2021 21:42:55 -0400 Subject: [PATCH 02/19] Introduced GDS_DRIVER_TYPE to gds_qp and gds_cq --- include/gdsync/core.h | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/include/gdsync/core.h b/include/gdsync/core.h index 7ff0cbb..87a4cc3 100644 --- a/include/gdsync/core.h +++ b/include/gdsync/core.h @@ -40,26 +40,33 @@ ((((v) & 0x0000ffffU) >> 0 ) >= (unsigned)GDS_API_MINOR_VERSION) ) typedef enum gds_param { - GDS_PARAM_VERSION, - GDS_NUM_PARAMS + GDS_PARAM_VERSION, + GDS_NUM_PARAMS } gds_param_t; int gds_query_param(gds_param_t param, int *value); enum gds_create_qp_flags { - GDS_CREATE_QP_DEFAULT = 0, - GDS_CREATE_QP_WQ_ON_GPU = 1<<0, - GDS_CREATE_QP_TX_CQ_ON_GPU = 1<<1, - GDS_CREATE_QP_RX_CQ_ON_GPU = 1<<2, - GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5, + GDS_CREATE_QP_DEFAULT = 0, + GDS_CREATE_QP_WQ_ON_GPU = 1<<0, + GDS_CREATE_QP_TX_CQ_ON_GPU = 1<<1, + GDS_CREATE_QP_RX_CQ_ON_GPU = 1<<2, + GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5, }; typedef struct ibv_exp_qp_init_attr gds_qp_init_attr_t; typedef struct ibv_exp_send_wr gds_send_wr; +typedef enum gds_driver_type { + GDS_DRIVER_TYPE_MLX5_EXP = 0, + GDS_DRIVER_TYPE_MLX5_DV, + GDS_DRIVER_TYPE_MLX5_DEVX +} gds_driver_type_t; + struct gds_cq { struct ibv_cq *cq; uint32_t curr_offset; + gds_driver_type_t dtype; }; struct gds_qp { @@ -68,6 +75,7 @@ struct gds_qp { struct gds_cq recv_cq; struct ibv_exp_res_domain * res_domain; struct ibv_context *dev_context; + gds_driver_type_t dtype; }; /* \brief: Create a peer-enabled QP attached to the specified GPU id. From fdd2270a1a2736657c6385e75d0991b6146a6829 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Wed, 4 Aug 2021 21:43:15 -0400 Subject: [PATCH 03/19] Set gds_qp and gds_cq dtype to MLX5_EXP in the creation functions --- src/gdsync.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 90d5508..e863ec0 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -1754,6 +1754,8 @@ gds_create_cq_internal(struct ibv_context *context, int cqe, return NULL; } + gcq->dtype = GDS_DRIVER_TYPE_MLX5_EXP; + return gcq; } @@ -1893,6 +1895,7 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gqp->send_cq.curr_offset = 0; gqp->recv_cq.cq = qp->recv_cq; gqp->recv_cq.curr_offset = 0; + gqp->dtype = GDS_DRIVER_TYPE_MLX5_EXP; gds_dbg("created gds_qp=%p\n", gqp); From 0073f813b84bf27579ba5d6868808fc349baef4d Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Wed, 4 Aug 2021 22:34:26 -0400 Subject: [PATCH 04/19] Changed the definition of gds_qp_init_attr_t and gds_send_wr. Also, removed res_domain from the gds_qp struct --- include/gdsync/core.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/include/gdsync/core.h b/include/gdsync/core.h index 87a4cc3..4b61e86 100644 --- a/include/gdsync/core.h +++ b/include/gdsync/core.h @@ -54,8 +54,8 @@ enum gds_create_qp_flags { GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5, }; -typedef struct ibv_exp_qp_init_attr gds_qp_init_attr_t; -typedef struct ibv_exp_send_wr gds_send_wr; +typedef struct ibv_qp_init_attr gds_qp_init_attr_t; +typedef struct ibv_send_wr gds_send_wr; typedef enum gds_driver_type { GDS_DRIVER_TYPE_MLX5_EXP = 0, @@ -73,7 +73,6 @@ struct gds_qp { struct ibv_qp *qp; struct gds_cq send_cq; struct gds_cq recv_cq; - struct ibv_exp_res_domain * res_domain; struct ibv_context *dev_context; gds_driver_type_t dtype; }; From 78eadbb4c899d66c8073f4e8e26ed79ed6a15d9a Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Thu, 5 Aug 2021 01:26:25 -0400 Subject: [PATCH 05/19] Implemented gds_get_driver_type --- include/gdsync/core.h | 7 ++++--- src/utils.hpp | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/include/gdsync/core.h b/include/gdsync/core.h index 4b61e86..c438594 100644 --- a/include/gdsync/core.h +++ b/include/gdsync/core.h @@ -58,7 +58,8 @@ typedef struct ibv_qp_init_attr gds_qp_init_attr_t; typedef struct ibv_send_wr gds_send_wr; typedef enum gds_driver_type { - GDS_DRIVER_TYPE_MLX5_EXP = 0, + GDS_DRIVER_TYPE_UNSUPPORTED = 0, + GDS_DRIVER_TYPE_MLX5_EXP, GDS_DRIVER_TYPE_MLX5_DV, GDS_DRIVER_TYPE_MLX5_DEVX } gds_driver_type_t; @@ -71,8 +72,8 @@ struct gds_cq { struct gds_qp { struct ibv_qp *qp; - struct gds_cq send_cq; - struct gds_cq recv_cq; + struct gds_cq *send_cq; + struct gds_cq *recv_cq; struct ibv_context *dev_context; gds_driver_type_t dtype; }; diff --git a/src/utils.hpp b/src/utils.hpp index b501bda..de2aaa8 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -222,6 +222,22 @@ gds_peer *peer_from_stream(CUstream stream); //----------------------------------------------------------------------------- +/* \brief: Get the underlying driver associated with the ibdev. + * + */ +static inline gds_driver_type gds_get_driver_type(struct ibv_device *ibdev) +{ + const char *dev_name = ibv_get_device_name(ibdev); + + // Heuristically guess the driver by the device name. + // Until we find a better way to do so... + if (strstr(dev_name, "mlx5") != NULL) + return GDS_DRIVER_TYPE_MLX5_EXP; + return GDS_DRIVER_TYPE_UNSUPPORTED; +} + +//----------------------------------------------------------------------------- + /* * Local variables: * c-indent-level: 8 From aade2abf15285314c88841a9a3b7c8d0948f0afc Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Thu, 5 Aug 2021 01:27:08 -0400 Subject: [PATCH 06/19] Initial implementation of mlx5-exp.cpp/hpp and moved create/destroy qp/cq functions to mlx5-exp --- src/gdsync.cpp | 125 ------------------------ src/mlx5-exp.cpp | 245 +++++++++++++++++++++++++++++++++++++++++++++++ src/mlx5-exp.hpp | 20 ++++ 3 files changed, 265 insertions(+), 125 deletions(-) create mode 100644 src/mlx5-exp.cpp create mode 100644 src/mlx5-exp.hpp diff --git a/src/gdsync.cpp b/src/gdsync.cpp index e863ec0..5a14c51 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -1678,131 +1678,6 @@ gds_peer *peer_from_stream(CUstream stream) //----------------------------------------------------------------------------- -static ibv_exp_res_domain *gds_create_res_domain(struct ibv_context *context) -{ - if (!context) { - gds_err("invalid context"); - return NULL; - } - - ibv_exp_res_domain_init_attr res_domain_attr; - memset(&res_domain_attr, 0, sizeof(res_domain_attr)); - - res_domain_attr.comp_mask |= IBV_EXP_RES_DOMAIN_THREAD_MODEL; - res_domain_attr.thread_model = IBV_EXP_THREAD_SINGLE; - - ibv_exp_res_domain *res_domain = ibv_exp_create_res_domain(context, &res_domain_attr); - if (!res_domain) { - gds_warn("Can't create resource domain\n"); - } - - return res_domain; -} - -//----------------------------------------------------------------------------- - -static struct gds_cq * -gds_create_cq_internal(struct ibv_context *context, int cqe, - void *cq_context, struct ibv_comp_channel *channel, - int comp_vector, int gpu_id, gds_alloc_cq_flags_t flags, - struct ibv_exp_res_domain * res_domain) -{ - struct gds_cq *gcq = NULL; - ibv_exp_cq_init_attr attr; - gds_peer *peer = NULL; - gds_peer_attr *peer_attr = NULL; - int ret=0; - - if(!context) - { - gds_dbg("Invalid input context\n"); - return NULL; - } - - gcq = (struct gds_cq*)calloc(1, sizeof(struct gds_cq)); - if (!gcq) { - gds_err("cannot allocate memory\n"); - return NULL; - } - - //Here we need to recover peer and peer_attr pointers to set alloc_type and alloc_flags - //before ibv_exp_create_cq - ret = gds_register_peer_by_ordinal(gpu_id, &peer, &peer_attr); - if (ret) { - gds_err("error %d while registering GPU peer\n", ret); - return NULL; - } - assert(peer); - assert(peer_attr); - - peer->alloc_type = gds_peer::CQ; - peer->alloc_flags = flags; - - attr.comp_mask = IBV_EXP_CQ_INIT_ATTR_PEER_DIRECT; - attr.flags = 0; // see ibv_exp_cq_create_flags - attr.peer_direct_attrs = peer_attr; - if (res_domain) { - gds_dbg("using peer->res_domain %p for CQ\n", res_domain); - attr.res_domain = res_domain; - attr.comp_mask |= IBV_EXP_CQ_INIT_ATTR_RES_DOMAIN; - } - - int old_errno = errno; - gcq->cq = ibv_exp_create_cq(context, cqe, cq_context, channel, comp_vector, &attr); - if (!gcq->cq) { - gds_err("error %d in ibv_exp_create_cq, old errno %d\n", errno, old_errno); - return NULL; - } - - gcq->dtype = GDS_DRIVER_TYPE_MLX5_EXP; - - return gcq; -} - -//Note: general create cq function, not really used for now! -struct gds_cq * -gds_create_cq(struct ibv_context *context, int cqe, - void *cq_context, struct ibv_comp_channel *channel, - int comp_vector, int gpu_id, gds_alloc_cq_flags_t flags) -{ - int ret = 0; - struct gds_cq *gcq = NULL; - //TODO: leak of res_domain - struct ibv_exp_res_domain * res_domain; - gds_dbg("cqe=%d gpu_id=%d cq_flags=%08x\n", cqe, gpu_id, flags); - - gds_peer *peer = NULL; - gds_peer_attr *peer_attr = NULL; - ret = gds_register_peer_by_ordinal(gpu_id, &peer, &peer_attr); - if (ret) { - gds_err("error %d while registering GPU peer\n", ret); - return NULL; - } - assert(peer); - assert(peer_attr); - - peer->alloc_type = gds_peer::CQ; - peer->alloc_flags = flags; - - res_domain = gds_create_res_domain(context); - if (res_domain) - gds_dbg("using res_domain %p\n", res_domain); - else - gds_warn("NOT using res_domain\n"); - - - gcq = gds_create_cq_internal(context, cqe, cq_context, channel, comp_vector, gpu_id, flags, res_domain); - - if (!gcq) { - gds_err("error in gds_create_cq_internal\n"); - return NULL; - } - - return gcq; -} - -//----------------------------------------------------------------------------- - struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds_qp_init_attr_t *qp_attr, int gpu_id, int flags) { diff --git a/src/mlx5-exp.cpp b/src/mlx5-exp.cpp new file mode 100644 index 0000000..9e4efe4 --- /dev/null +++ b/src/mlx5-exp.cpp @@ -0,0 +1,245 @@ +#include +#include +#include + +#include "mlx5-exp.hpp" +#include "utils.hpp" + +static ibv_exp_res_domain *gds_mlx5_exp_create_res_domain(struct ibv_context *context) +{ + if (!context) { + gds_err("invalid context"); + return NULL; + } + + ibv_exp_res_domain_init_attr res_domain_attr; + memset(&res_domain_attr, 0, sizeof(res_domain_attr)); + + res_domain_attr.comp_mask |= IBV_EXP_RES_DOMAIN_THREAD_MODEL; + res_domain_attr.thread_model = IBV_EXP_THREAD_SINGLE; + + ibv_exp_res_domain *res_domain = ibv_exp_create_res_domain(context, &res_domain_attr); + if (!res_domain) { + gds_warn("Can't create resource domain\n"); + } + + return res_domain; +} + +//----------------------------------------------------------------------------- + +static struct gds_mlx5_exp_cq_t * +gds_mlx5_exp_create_cq(struct ibv_context *context, int cqe, + void *cq_context, struct ibv_comp_channel *channel, + int comp_vector, int gpu_id, gds_alloc_cq_flags_t flags, + struct ibv_exp_res_domain *res_domain) +{ + struct gds_mlx5_exp_cq_t *gmexpcq = NULL; + ibv_exp_cq_init_attr attr; + gds_peer *peer = NULL; + gds_peer_attr *peer_attr = NULL; + int ret = 0; + + if (!context) + { + gds_dbg("Invalid input context\n"); + return NULL; + } + + gmexpcq = (gds_mlx5_exp_cq_t *)calloc(1, sizeof(gds_mlx5_exp_cq_t)); + if (!gmexpcq) { + gds_err("cannot allocate memory\n"); + return NULL; + } + + //Here we need to recover peer and peer_attr pointers to set alloc_type and alloc_flags + //before ibv_exp_create_cq + ret = gds_register_peer_by_ordinal(gpu_id, &peer, &peer_attr); + if (ret) { + gds_err("error %d while registering GPU peer\n", ret); + return NULL; + } + assert(peer); + assert(peer_attr); + + peer->alloc_type = gds_peer::CQ; + peer->alloc_flags = flags; + + attr.comp_mask = IBV_EXP_CQ_INIT_ATTR_PEER_DIRECT; + attr.flags = 0; // see ibv_exp_cq_create_flags + attr.peer_direct_attrs = peer_attr; + if (res_domain) { + gds_dbg("using peer->res_domain %p for CQ\n", res_domain); + attr.res_domain = res_domain; + attr.comp_mask |= IBV_EXP_CQ_INIT_ATTR_RES_DOMAIN; + gmexpcq->res_domain = res_domain; + } + + int old_errno = errno; + gmexpcq->gcq.cq = ibv_exp_create_cq(context, cqe, cq_context, channel, comp_vector, &attr); + if (!gmexpcq->gcq.cq) { + gds_err("error %d in ibv_exp_create_cq, old errno %d\n", errno, old_errno); + return NULL; + } + + gmexpcq->gcq.dtype = GDS_DRIVER_TYPE_MLX5_EXP; + + return gmexpcq; +} + +//----------------------------------------------------------------------------- + +struct gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp(struct ibv_pd *pd, struct ibv_context *context, + gds_qp_init_attr_t *qp_attr, int gpu_id, int flags) +{ + int ret = 0; + gds_mlx5_exp_qp_t *gmexpqp = NULL; + struct ibv_qp *qp = NULL; + gds_mlx5_exp_cq_t *rx_gmexpcq = NULL, *tx_gmexpcq = NULL; + gds_peer *peer = NULL; + gds_peer_attr *peer_attr = NULL; + struct ibv_qp_init_attr exp_qp_attr = {0,}; + int old_errno = errno; + + assert(pd); + assert(context); + assert(qp_attr); + + gmexpqp = (gds_mlx5_exp_qp_t *)calloc(1, sizeof(struct gds_qp)); + if (!gqp) { + gds_err("cannot allocate memory\n"); + return NULL; + } + gmexpqp->gqp.dtype = GDS_DRIVER_TYPE_MLX5_EXP; + + gmexpqp->gqp.dev_context = context; + + // peer registration + gds_dbg("before gds_register_peer_ex\n"); + ret = gds_register_peer_by_ordinal(gpu_id, &peer, &peer_attr); + if (ret) { + gds_err("error %d in gds_register_peer_ex\n", ret); + goto err; + } + + gmexpqp->res_domain = gds_create_res_domain(context); + if (gmexpqp->res_domain) + gds_dbg("using res_domain %p\n", gmexpqp->res_domain); + else + gds_warn("NOT using res_domain\n"); + + tx_gmexpcq = gds_mlx5_exp_create_cq(context, qp_attr->cap.max_send_wr, NULL, NULL, 0, gpu_id, + (flags & GDS_CREATE_QP_TX_CQ_ON_GPU) ? GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, + gmexpqp->res_domain); + if (!tx_gmexpcq) { + ret = errno; + gds_err("error %d while creating TX CQ, old_errno=%d\n", ret, old_errno); + goto err; + } + + rx_gmexpcq = gds_mlx5_exp_create_cq(context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, gpu_id, + (flags & GDS_CREATE_QP_RX_CQ_ON_GPU) ? GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, + gmexpqp->res_domain); + if (!rx_gmexpcq) { + ret = errno; + gds_err("error %d while creating RX CQ\n", ret); + goto err; + } + + // peer registration + peer->alloc_type = gds_peer::WQ; + peer->alloc_flags = GDS_ALLOC_WQ_DEFAULT | GDS_ALLOC_DBREC_DEFAULT; + if (flags & GDS_CREATE_QP_WQ_ON_GPU) { + gds_err("error, QP WQ on GPU is not supported yet\n"); + goto err; + } + if (flags & GDS_CREATE_QP_WQ_DBREC_ON_GPU) { + gds_warn("QP WQ DBREC on GPU\n"); + peer->alloc_flags |= GDS_ALLOC_DBREC_ON_GPU; + } + + exp_qp_attr = { + .send_cq = tx_gmexpcq->gcq.cq, + .recv_cq = rx_gmexpcq->gcq.cq, + .pd = pd, + .comp_mask = IBV_EXP_QP_INIT_ATTR_PD | IBV_EXP_QP_INIT_ATTR_PEER_DIRECT, + .peer_direct_attrs = peer_attr, + .qp_type = qp_attr->qp_type + }; + + assert(sizeof(exp_qp_attr.cap) == sizeof(qp_attr->cap)); + + memcpy(&exp_qp_attr.cap, &qp_attr->cap, sizeof(qp_attr->cap)); + + qp = ibv_exp_create_qp(context, qp_attr); + if (!qp) { + ret = EINVAL; + gds_err("error in ibv_exp_create_qp\n"); + goto err; + } + + gmexpqp->gqp.qp = qp; + gmexpqp->gqp.send_cq = tx_gmexpcq->gcq; + gmexpqp->gqp.recv_cq = rx_gmexpcq->gcq; + + gds_dbg("created gds_mlx5_exp_qp=%p\n", gmexpqp); + + return gmexpqp; + +err: + gds_dbg("destroying QP\n"); + gds_mlx5_exp_destroy_qp(gmexpqp); + + return NULL; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_destroy_qp(gds_mlx5_exp_qp_t *gmexpqp) +{ + int retcode = 0; + int ret; + + if (!gmexpqp) + return retcode; + + assert(gmexpqp->gqp.dtype == GDS_DRIVER_TYPE_MLX5_EXP); + + if (gmexpqp->gqp.qp) { + ret = ibv_destroy_qp(gmexpqp->gqp.qp); + if (ret) { + gds_err("error %d in destroy_qp\n", ret); + retcode = ret; + } + } + + if (gmexpqp->gqp.send_cq) { + ret = gds_destroy_cq(gmexpqp->gqp.send_cq); + if (ret) { + gds_err("error %d in destroy_cq send_cq\n", ret); + retcode = ret; + } + } + + if (gmexpqp->gqp.recv_cq) { + ret = gds_destroy_cq(gmexpqp->gqp.recv_cq); + if (ret) { + gds_err("error %d in destroy_cq recv_cq\n", ret); + retcode = ret; + } + } + + if (gmexpqp->res_domain) { + struct ibv_exp_destroy_res_domain_attr attr = {0,}; //IBV_EXP_DESTROY_RES_DOMAIN_RESERVED + ret = ibv_exp_destroy_res_domain(gmexpqp->gqp.dev_context, gmexpqp->res_domain, &attr); + if (ret) { + gds_err("ibv_exp_destroy_res_domain error %d: %s\n", ret, strerror(ret)); + retcode = ret; + } + } + + free(gmexpqp); + + return retcode; +} + diff --git a/src/mlx5-exp.hpp b/src/mlx5-exp.hpp new file mode 100644 index 0000000..214342a --- /dev/null +++ b/src/mlx5-exp.hpp @@ -0,0 +1,20 @@ +typedef struct gds_mlx5_exp_cq { + gds_cq_t gcq; + ibv_exp_res_domain *res_domain +} gds_mlx5_exp_cq_t; + +typedef struct gds_mlx5_exp_qp { + gds_qp_t gqp; + ibv_exp_res_domain *res_domain +} gds_mlx5_exp_qp_t; + +static inline gds_mlx5_exp_cq_t *to_gds_mexp_cq(gds_cq_t *gcq) { + assert(gcq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + return container_of(gcq, gds_mlx5_exp_cq_t, gcq); +} + +static inline gds_mlx5_exp_qp_t *to_gds_mexp_qp(gds_qp_t *gqp) { + assert(gcq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + return container_of(gqp, gds_mlx5_exp_qp_t, gqp); +} + From 5e789db4f759d8b0ce94bbf8f6834aa7ce1f7588 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Thu, 5 Aug 2021 02:12:31 -0400 Subject: [PATCH 07/19] Implemented gds_mlx5_exp_destroy_cq --- src/mlx5-exp.cpp | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/src/mlx5-exp.cpp b/src/mlx5-exp.cpp index 9e4efe4..47dc80a 100644 --- a/src/mlx5-exp.cpp +++ b/src/mlx5-exp.cpp @@ -28,8 +28,7 @@ static ibv_exp_res_domain *gds_mlx5_exp_create_res_domain(struct ibv_context *co //----------------------------------------------------------------------------- -static struct gds_mlx5_exp_cq_t * -gds_mlx5_exp_create_cq(struct ibv_context *context, int cqe, +gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq(struct ibv_context *context, int cqe, void *cq_context, struct ibv_comp_channel *channel, int comp_vector, int gpu_id, gds_alloc_cq_flags_t flags, struct ibv_exp_res_domain *res_domain) @@ -89,7 +88,7 @@ gds_mlx5_exp_create_cq(struct ibv_context *context, int cqe, //----------------------------------------------------------------------------- -struct gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp(struct ibv_pd *pd, struct ibv_context *context, +gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds_qp_init_attr_t *qp_attr, int gpu_id, int flags) { int ret = 0; @@ -243,3 +242,29 @@ int gds_mlx5_exp_destroy_qp(gds_mlx5_exp_qp_t *gmexpqp) return retcode; } +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_destroy_cq(gds_mlx5_exp_cq_t *gmexpcq) +{ + int retcode = 0; + int ret; + + if (!gmexpcq) + return retcode; + + assert(gmexpcq->gcq.dtype == GDS_DRIVER_TYPE_MLX5_EXP); + + if (gmexpcq->gcq.cq) { + ret = ibv_destroy_cq(gmexpcq->gcq.cq); + if (ret) { + gds_err("error %d in destroy_cq\n", ret); + retcode = ret; + } + } + + // res_domain will be destroyed in gds_mlx5_exp_destroy_qp. + + free(gmexpcq); + + return retcode; +} From 2a8ac7d93bc93e961626c5f4e240945d53a7042b Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Thu, 5 Aug 2021 02:19:03 -0400 Subject: [PATCH 08/19] Implemented gds_destroy_qp and gds_destroy_cq by connecting to gds_mlx5_exp_* --- src/gdsync.cpp | 61 ++++++++++++++++++++---------------------------- src/mlx5-exp.hpp | 13 +++++++++++ 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 5a14c51..862c0da 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -1785,51 +1785,40 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, //----------------------------------------------------------------------------- -int gds_destroy_qp(struct gds_qp *gqp) +int gds_destroy_cq(struct gds_cq *gcq) { int retcode = 0; int ret; - if(!gqp) return retcode; + if (!gcq) + return retcode; - if(gqp->qp) - { - ret = ibv_destroy_qp(gqp->qp); - if (ret) { - gds_err("error %d in destroy_qp\n", ret); - retcode = ret; - } - } + // Currently, we support only exp-verbs. + assert(gcq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); - if(gqp->send_cq.cq) - { - ret = ibv_destroy_cq(gqp->send_cq.cq); - if (ret) { - gds_err("error %d in destroy_cq send_cq\n", ret); - retcode = ret; - } - } + gds_mlx5_exp_cq_t *gmexpcq = to_gds_mexp_cq(gcq); - if(gqp->recv_cq.cq) - { - ret = ibv_destroy_cq(gqp->recv_cq.cq); - if (ret) { - gds_err("error %d in destroy_cq recv_cq\n", ret); - retcode = ret; - } - } + retcode = gds_mlx5_exp_destroy_cq(gmexpcq); - if(gqp->res_domain) { - struct ibv_exp_destroy_res_domain_attr attr; //IBV_EXP_DESTROY_RES_DOMAIN_RESERVED - attr.comp_mask=0; - ret = ibv_exp_destroy_res_domain(gqp->dev_context, gqp->res_domain, &attr); - if (ret) { - gds_err("ibv_exp_destroy_res_domain error %d: %s\n", ret, strerror(ret)); - retcode = ret; - } - } + return retcode; +} + +//----------------------------------------------------------------------------- + +int gds_destroy_qp(struct gds_qp *gqp) +{ + int retcode = 0; + int ret; + + if (!gqp) + return retcode; + + // Currently, we support only exp-verbs. + assert(gqp->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + + gds_mlx5_exp_qp_t *gmexpqp = to_gds_mexp_qp(gqp); - free(gqp); + retcode = gds_mlx5_exp_destroy_qp(gmexpqp); return retcode; } diff --git a/src/mlx5-exp.hpp b/src/mlx5-exp.hpp index 214342a..0f525ee 100644 --- a/src/mlx5-exp.hpp +++ b/src/mlx5-exp.hpp @@ -1,3 +1,6 @@ +#include +#include + typedef struct gds_mlx5_exp_cq { gds_cq_t gcq; ibv_exp_res_domain *res_domain @@ -18,3 +21,13 @@ static inline gds_mlx5_exp_qp_t *to_gds_mexp_qp(gds_qp_t *gqp) { return container_of(gqp, gds_mlx5_exp_qp_t, gqp); } +gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq(struct ibv_context *context, int cqe, + void *cq_context, struct ibv_comp_channel *channel, + int comp_vector, int gpu_id, gds_alloc_cq_flags_t flags, + struct ibv_exp_res_domain *res_domain); + +gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp(struct ibv_pd *pd, struct ibv_context *context, + gds_qp_init_attr_t *qp_attr, int gpu_id, int flags); + +int gds_mlx5_exp_destroy_cq(gds_mlx5_exp_cq_t *gmexpcq); +int gds_mlx5_exp_destroy_qp(gds_mlx5_exp_qp_t *gmexpqp); From a9351f2bfabd7c1d44adf29c4cba10f73c615744 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Thu, 5 Aug 2021 03:11:13 -0400 Subject: [PATCH 09/19] Reimplemented gds_create_qp by connecting to gds_mlx5_exp_create_qp --- src/gdsync.cpp | 80 ++++++++---------------------------------------- src/mlx5-exp.cpp | 60 +++++++++++++++--------------------- src/mlx5-exp.hpp | 14 +++++---- src/utils.hpp | 4 +++ 4 files changed, 48 insertions(+), 110 deletions(-) diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 862c0da..56d2743 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -1682,16 +1682,16 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds_qp_init_attr_t *qp_attr, int gpu_id, int flags) { int ret = 0; - struct gds_qp *gqp = NULL; - struct ibv_qp *qp = NULL; - struct gds_cq *rx_gcq = NULL, *tx_gcq = NULL; + gds_mlx5_exp_qp_t *gmexpqp = NULL; gds_peer *peer = NULL; gds_peer_attr *peer_attr = NULL; + gds_driver_type dtype; int old_errno = errno; gds_dbg("pd=%p context=%p gpu_id=%d flags=%08x current errno=%d\n", pd, context, gpu_id, flags, errno); assert(pd); assert(context); + assert(context->device); assert(qp_attr); if (flags & ~(GDS_CREATE_QP_WQ_ON_GPU|GDS_CREATE_QP_TX_CQ_ON_GPU|GDS_CREATE_QP_RX_CQ_ON_GPU|GDS_CREATE_QP_WQ_DBREC_ON_GPU)) { @@ -1699,87 +1699,31 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, return NULL; } - gqp = (struct gds_qp*)calloc(1, sizeof(struct gds_qp)); - if (!gqp) { - gds_err("cannot allocate memory\n"); - return NULL; - } - - gqp->dev_context=context; - // peer registration gds_dbg("before gds_register_peer_ex\n"); ret = gds_register_peer_by_ordinal(gpu_id, &peer, &peer_attr); if (ret) { - gds_err("error %d in gds_register_peer_ex\n", ret); - goto err; - } - - gqp->res_domain = gds_create_res_domain(context); - if (gqp->res_domain) - gds_dbg("using gqp->res_domain %p\n", gqp->res_domain); - else - gds_warn("NOT using gqp->res_domain\n"); - - tx_gcq = gds_create_cq_internal(context, qp_attr->cap.max_send_wr, NULL, NULL, 0, gpu_id, - (flags & GDS_CREATE_QP_TX_CQ_ON_GPU) ? GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, - gqp->res_domain); - if (!tx_gcq) { - ret = errno; - gds_err("error %d while creating TX CQ, old_errno=%d\n", ret, old_errno); + gds_err("error %d in gds_register_peer_ex\n", ret); goto err; } - rx_gcq = gds_create_cq_internal(context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, gpu_id, - (flags & GDS_CREATE_QP_RX_CQ_ON_GPU) ? GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, - gqp->res_domain); - if (!rx_gcq) { - ret = errno; - gds_err("error %d while creating RX CQ\n", ret); + dtype = gds_get_driver_type(context->device); + if (dtype != GDS_DRIVER_TYPE_MLX5_EXP) { + gds_err("Unsupported IB device\n"); goto err; } - // peer registration - qp_attr->send_cq = tx_gcq->cq; - qp_attr->recv_cq = rx_gcq->cq; - qp_attr->pd = pd; - qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_PD; - - peer->alloc_type = gds_peer::WQ; - peer->alloc_flags = GDS_ALLOC_WQ_DEFAULT | GDS_ALLOC_DBREC_DEFAULT; - if (flags & GDS_CREATE_QP_WQ_ON_GPU) { - gds_err("error, QP WQ on GPU is not supported yet\n"); - goto err; - } - if (flags & GDS_CREATE_QP_WQ_DBREC_ON_GPU) { - gds_warn("QP WQ DBREC on GPU\n"); - peer->alloc_flags |= GDS_ALLOC_DBREC_ON_GPU; - } - qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_PEER_DIRECT; - qp_attr->peer_direct_attrs = peer_attr; - - qp = ibv_exp_create_qp(context, qp_attr); - if (!qp) { - ret = EINVAL; - gds_err("error in ibv_exp_create_qp\n"); + gmexpqp = gds_mlx5_exp_create_qp(pd, context, qp_attr, peer, peer_attr, flags); + if (!gmexpqp) { + gds_err("Error in gds_mlx5_exp_create_qp.\n"); goto err; } - gqp->qp = qp; - gqp->send_cq.cq = qp->send_cq; - gqp->send_cq.curr_offset = 0; - gqp->recv_cq.cq = qp->recv_cq; - gqp->recv_cq.curr_offset = 0; - gqp->dtype = GDS_DRIVER_TYPE_MLX5_EXP; - - gds_dbg("created gds_qp=%p\n", gqp); + gds_dbg("created gds_qp=%p\n", gmexpqp->gqp); - return gqp; + return gmexpqp->gqp; err: - gds_dbg("destroying QP\n"); - gds_destroy_qp(gqp); - return NULL; } diff --git a/src/mlx5-exp.cpp b/src/mlx5-exp.cpp index 47dc80a..b39fa16 100644 --- a/src/mlx5-exp.cpp +++ b/src/mlx5-exp.cpp @@ -28,10 +28,11 @@ static ibv_exp_res_domain *gds_mlx5_exp_create_res_domain(struct ibv_context *co //----------------------------------------------------------------------------- -gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq(struct ibv_context *context, int cqe, - void *cq_context, struct ibv_comp_channel *channel, - int comp_vector, int gpu_id, gds_alloc_cq_flags_t flags, - struct ibv_exp_res_domain *res_domain) +gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq( + struct ibv_context *context, int cqe, + void *cq_context, struct ibv_comp_channel *channel, + int comp_vector, gds_peer *peer, gds_peer_attr *peer_attr, gds_alloc_cq_flags_t flags, + struct ibv_exp_res_domain *res_domain) { struct gds_mlx5_exp_cq_t *gmexpcq = NULL; ibv_exp_cq_init_attr attr; @@ -39,11 +40,9 @@ gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq(struct ibv_context *context, int cqe, gds_peer_attr *peer_attr = NULL; int ret = 0; - if (!context) - { - gds_dbg("Invalid input context\n"); - return NULL; - } + assert(context); + assert(peer); + assert(peer_attr); gmexpcq = (gds_mlx5_exp_cq_t *)calloc(1, sizeof(gds_mlx5_exp_cq_t)); if (!gmexpcq) { @@ -51,16 +50,6 @@ gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq(struct ibv_context *context, int cqe, return NULL; } - //Here we need to recover peer and peer_attr pointers to set alloc_type and alloc_flags - //before ibv_exp_create_cq - ret = gds_register_peer_by_ordinal(gpu_id, &peer, &peer_attr); - if (ret) { - gds_err("error %d while registering GPU peer\n", ret); - return NULL; - } - assert(peer); - assert(peer_attr); - peer->alloc_type = gds_peer::CQ; peer->alloc_flags = flags; @@ -88,8 +77,9 @@ gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq(struct ibv_context *context, int cqe, //----------------------------------------------------------------------------- -gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp(struct ibv_pd *pd, struct ibv_context *context, - gds_qp_init_attr_t *qp_attr, int gpu_id, int flags) +gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp( + struct ibv_pd *pd, struct ibv_context *context, gds_qp_init_attr_t *qp_attr, + gds_peer *peer, gds_peer_attr *peer_attr, int flags) { int ret = 0; gds_mlx5_exp_qp_t *gmexpqp = NULL; @@ -103,8 +93,10 @@ gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp(struct ibv_pd *pd, struct ibv_context assert(pd); assert(context); assert(qp_attr); + assert(peer); + assert(peer_attr); - gmexpqp = (gds_mlx5_exp_qp_t *)calloc(1, sizeof(struct gds_qp)); + gmexpqp = (gds_mlx5_exp_qp_t *)calloc(1, sizeof(gds_mlx5_exp_qp_t)); if (!gqp) { gds_err("cannot allocate memory\n"); return NULL; @@ -113,32 +105,28 @@ gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp(struct ibv_pd *pd, struct ibv_context gmexpqp->gqp.dev_context = context; - // peer registration - gds_dbg("before gds_register_peer_ex\n"); - ret = gds_register_peer_by_ordinal(gpu_id, &peer, &peer_attr); - if (ret) { - gds_err("error %d in gds_register_peer_ex\n", ret); - goto err; - } - gmexpqp->res_domain = gds_create_res_domain(context); if (gmexpqp->res_domain) gds_dbg("using res_domain %p\n", gmexpqp->res_domain); else gds_warn("NOT using res_domain\n"); - tx_gmexpcq = gds_mlx5_exp_create_cq(context, qp_attr->cap.max_send_wr, NULL, NULL, 0, gpu_id, - (flags & GDS_CREATE_QP_TX_CQ_ON_GPU) ? GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, - gmexpqp->res_domain); + tx_gmexpcq = gds_mlx5_exp_create_cq( + context, qp_attr->cap.max_send_wr, NULL, NULL, 0, peer, peer_attr, + (flags & GDS_CREATE_QP_TX_CQ_ON_GPU) ? GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, + gmexpqp->res_domain + ); if (!tx_gmexpcq) { ret = errno; gds_err("error %d while creating TX CQ, old_errno=%d\n", ret, old_errno); goto err; } - rx_gmexpcq = gds_mlx5_exp_create_cq(context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, gpu_id, - (flags & GDS_CREATE_QP_RX_CQ_ON_GPU) ? GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, - gmexpqp->res_domain); + rx_gmexpcq = gds_mlx5_exp_create_cq( + context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, peer, peer_attr, + (flags & GDS_CREATE_QP_RX_CQ_ON_GPU) ? GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, + gmexpqp->res_domain + ); if (!rx_gmexpcq) { ret = errno; gds_err("error %d while creating RX CQ\n", ret); diff --git a/src/mlx5-exp.hpp b/src/mlx5-exp.hpp index 0f525ee..20e94f3 100644 --- a/src/mlx5-exp.hpp +++ b/src/mlx5-exp.hpp @@ -21,13 +21,15 @@ static inline gds_mlx5_exp_qp_t *to_gds_mexp_qp(gds_qp_t *gqp) { return container_of(gqp, gds_mlx5_exp_qp_t, gqp); } -gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq(struct ibv_context *context, int cqe, - void *cq_context, struct ibv_comp_channel *channel, - int comp_vector, int gpu_id, gds_alloc_cq_flags_t flags, - struct ibv_exp_res_domain *res_domain); +gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq( + struct ibv_context *context, int cqe, + void *cq_context, struct ibv_comp_channel *channel, + int comp_vector, gds_peer *peer, gds_peer_attr *peer_attr, gds_alloc_cq_flags_t flags, + struct ibv_exp_res_domain *res_domain); -gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp(struct ibv_pd *pd, struct ibv_context *context, - gds_qp_init_attr_t *qp_attr, int gpu_id, int flags); +gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp( + struct ibv_pd *pd, struct ibv_context *context, gds_qp_init_attr_t *qp_attr, + gds_peer *peer, gds_peer_attr *peer_attr, int flags); int gds_mlx5_exp_destroy_cq(gds_mlx5_exp_cq_t *gmexpcq); int gds_mlx5_exp_destroy_qp(gds_mlx5_exp_qp_t *gmexpqp); diff --git a/src/utils.hpp b/src/utils.hpp index de2aaa8..1bacbb6 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -238,6 +238,10 @@ static inline gds_driver_type gds_get_driver_type(struct ibv_device *ibdev) //----------------------------------------------------------------------------- +int gds_destroy_cq(struct gds_cq *gcq); + +//----------------------------------------------------------------------------- + /* * Local variables: * c-indent-level: 8 From e18866864b7da32f97bd2a73900cab7b552c453c Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Thu, 5 Aug 2021 03:16:09 -0400 Subject: [PATCH 10/19] Moved gds_send_wr from exp-verbs to ib-verbs in gds_kernel_latency.c --- tests/gds_kernel_latency.c | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/tests/gds_kernel_latency.c b/tests/gds_kernel_latency.c index 63875bf..428dadb 100644 --- a/tests/gds_kernel_latency.c +++ b/tests/gds_kernel_latency.c @@ -542,23 +542,22 @@ static int pp_post_gpu_send(struct pingpong_context *ctx, uint32_t qpn, CUstream .wr_id = PINGPONG_SEND_WRID, .sg_list = &list, .num_sge = 1, - .exp_opcode = IBV_EXP_WR_SEND, - .exp_send_flags = IBV_EXP_SEND_SIGNALED, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, .wr = { .ud = { .ah = ctx->ah, .remote_qpn = qpn, .remote_qkey = 0x11111111 } - }, - .comp_mask = 0 + } }; #if 0 if (IBV_QPT_UD != gds_qpt) { memset(&ewr, 0, sizeof(ewr)); ewr.num_sge = 1; - ewr.exp_send_flags = IBV_EXP_SEND_SIGNALED; - ewr.exp_opcode = IBV_EXP_WR_SEND; + ewr.send_flags = IBV_SEND_SIGNALED; + ewr.opcode = IBV_WR_SEND; ewr.wr_id = PINGPONG_SEND_WRID; ewr.sg_list = &list; ewr.next = NULL; @@ -580,23 +579,22 @@ static int pp_prepare_gpu_send(struct pingpong_context *ctx, uint32_t qpn, gds_s .wr_id = PINGPONG_SEND_WRID, .sg_list = &list, .num_sge = 1, - .exp_opcode = IBV_EXP_WR_SEND, - .exp_send_flags = IBV_EXP_SEND_SIGNALED, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, .wr = { .ud = { .ah = ctx->ah, .remote_qpn = qpn, .remote_qkey = 0x11111111 } - }, - .comp_mask = 0 + } }; if (IBV_QPT_UD != gds_qpt) { memset(&ewr, 0, sizeof(ewr)); ewr.num_sge = 1; - ewr.exp_send_flags = IBV_EXP_SEND_SIGNALED; - ewr.exp_opcode = IBV_EXP_WR_SEND; + ewr.send_flags = IBV_SEND_SIGNALED; + ewr.opcode = IBV_WR_SEND; ewr.wr_id = PINGPONG_SEND_WRID; ewr.sg_list = &list; ewr.next = NULL; From 07d03a035c53876f29e10ca4e7608ba96399d270 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Thu, 5 Aug 2021 03:20:27 -0400 Subject: [PATCH 11/19] Fixed compile issues in gdsync.cpp and mlx5-exp.hpp --- include/gdsync/core.h | 8 ++++---- src/gdsync.cpp | 3 ++- src/mlx5-exp.hpp | 8 +++++--- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/include/gdsync/core.h b/include/gdsync/core.h index c438594..a74299d 100644 --- a/include/gdsync/core.h +++ b/include/gdsync/core.h @@ -64,19 +64,19 @@ typedef enum gds_driver_type { GDS_DRIVER_TYPE_MLX5_DEVX } gds_driver_type_t; -struct gds_cq { +typedef struct gds_cq { struct ibv_cq *cq; uint32_t curr_offset; gds_driver_type_t dtype; -}; +} gds_cq_t; -struct gds_qp { +typedef struct gds_qp { struct ibv_qp *qp; struct gds_cq *send_cq; struct gds_cq *recv_cq; struct ibv_context *dev_context; gds_driver_type_t dtype; -}; +} gds_qp_t; /* \brief: Create a peer-enabled QP attached to the specified GPU id. * diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 56d2743..23907fe 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -43,6 +43,7 @@ #include "archutils.h" #include "mlnxutils.h" #include "task_queue.hpp" +#include "mlx5-exp.hpp" //----------------------------------------------------------------------------- @@ -1721,7 +1722,7 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds_dbg("created gds_qp=%p\n", gmexpqp->gqp); - return gmexpqp->gqp; + return &gmexpqp->gqp; err: return NULL; diff --git a/src/mlx5-exp.hpp b/src/mlx5-exp.hpp index 20e94f3..60bb1fd 100644 --- a/src/mlx5-exp.hpp +++ b/src/mlx5-exp.hpp @@ -1,14 +1,16 @@ #include #include +#include + typedef struct gds_mlx5_exp_cq { gds_cq_t gcq; - ibv_exp_res_domain *res_domain + ibv_exp_res_domain *res_domain; } gds_mlx5_exp_cq_t; typedef struct gds_mlx5_exp_qp { gds_qp_t gqp; - ibv_exp_res_domain *res_domain + ibv_exp_res_domain *res_domain; } gds_mlx5_exp_qp_t; static inline gds_mlx5_exp_cq_t *to_gds_mexp_cq(gds_cq_t *gcq) { @@ -17,7 +19,7 @@ static inline gds_mlx5_exp_cq_t *to_gds_mexp_cq(gds_cq_t *gcq) { } static inline gds_mlx5_exp_qp_t *to_gds_mexp_qp(gds_qp_t *gqp) { - assert(gcq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + assert(gqp->dtype == GDS_DRIVER_TYPE_MLX5_EXP); return container_of(gqp, gds_mlx5_exp_qp_t, gqp); } From 173841f3174272eca05d55f7a8c838e7ecbaa01b Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Fri, 6 Aug 2021 00:54:30 -0400 Subject: [PATCH 12/19] Added mlx5-exp.cpp to the compile list --- Makefile.am | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile.am b/Makefile.am index ac9a0cc..27fce57 100644 --- a/Makefile.am +++ b/Makefile.am @@ -20,13 +20,13 @@ EXTRA_DIST = autogen.sh include_HEADERS = include/gdsync.h libgdsyncincludedir = $(includedir)/gdsync -libgdsyncinclude_HEADERS = include/gdsync/core.h include/gdsync/device.cuh include/gdsync/mlx5.h include/gdsync/tools.h +libgdsyncinclude_HEADERS = include/gdsync/core.h include/gdsync/device.cuh include/gdsync/mlx5.h include/gdsync/tools.h src_libgdsync_la_CFLAGS = $(AM_CFLAGS) -src_libgdsync_la_SOURCES = src/gdsync.cpp src/memmgr.cpp src/mem.cpp src/objs.cpp src/apis.cpp src/mlx5.cpp include/gdsync.h +src_libgdsync_la_SOURCES = src/gdsync.cpp src/memmgr.cpp src/mem.cpp src/objs.cpp src/apis.cpp src/mlx5.cpp src/mlx5-exp.cpp include/gdsync.h src_libgdsync_la_LDFLAGS = -version-info @VERSION_INFO@ -noinst_HEADERS = src/mem.hpp src/memmgr.hpp src/objs.hpp src/rangeset.hpp src/utils.hpp src/archutils.h src/mlnxutils.h +noinst_HEADERS = src/mem.hpp src/memmgr.hpp src/objs.hpp src/rangeset.hpp src/utils.hpp src/archutils.h src/mlnxutils.h src/mlx5-exp.hpp # if enabled at configure time From 8d1ed503001c40dd6120b4d61c83f07b4395f631 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Fri, 6 Aug 2021 00:55:35 -0400 Subject: [PATCH 13/19] Moved gds_prepare_send to gds_mlx5_exp_* and fixed compile errors in mlx5-exp.* --- src/apis.cpp | 34 ++++++++++--------------- src/mlx5-exp.cpp | 64 ++++++++++++++++++++++++++++++++++-------------- src/mlx5-exp.hpp | 11 +++++++++ 3 files changed, 69 insertions(+), 40 deletions(-) diff --git a/src/apis.cpp b/src/apis.cpp index cd532d7..0801771 100644 --- a/src/apis.cpp +++ b/src/apis.cpp @@ -51,6 +51,7 @@ #include "utils.hpp" #include "archutils.h" #include "mlnxutils.h" +#include "mlx5-exp.hpp" //----------------------------------------------------------------------------- @@ -171,33 +172,24 @@ int gds_post_recv(struct gds_qp *qp, struct ibv_recv_wr *wr, struct ibv_recv_wr //----------------------------------------------------------------------------- -int gds_prepare_send(struct gds_qp *qp, gds_send_wr *p_ewr, +int gds_prepare_send(struct gds_qp *gqp, gds_send_wr *p_ewr, gds_send_wr **bad_ewr, gds_send_request_t *request) { int ret = 0; + gds_mlx5_exp_qp_t *gmexpqp; + gds_init_send_info(request); - assert(qp); - assert(qp->qp); - ret = ibv_exp_post_send(qp->qp, p_ewr, bad_ewr); - if (ret) { + assert(gqp); + assert(gqp->qp); + assert(gqp->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + + gmexpqp = to_gds_mexp_qp(gqp); + + ret = gds_mlx5_exp_prepare_send(gmexpqp, p_ewr, bad_ewr, request); + if (ret) + gds_err("Error %d in gds_mlx5_exp_prepare_send.\n", ret); - if (ret == ENOMEM) { - // out of space error can happen too often to report - gds_dbg("ENOMEM error %d in ibv_exp_post_send\n", ret); - } else { - gds_err("error %d in ibv_exp_post_send\n", ret); - } - goto out; - } - - ret = ibv_exp_peer_commit_qp(qp->qp, &request->commit); - if (ret) { - gds_err("error %d in ibv_exp_peer_commit_qp\n", ret); - //gds_wait_kernel(); - goto out; - } -out: return ret; } diff --git a/src/mlx5-exp.cpp b/src/mlx5-exp.cpp index b39fa16..f76cf12 100644 --- a/src/mlx5-exp.cpp +++ b/src/mlx5-exp.cpp @@ -34,10 +34,8 @@ gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq( int comp_vector, gds_peer *peer, gds_peer_attr *peer_attr, gds_alloc_cq_flags_t flags, struct ibv_exp_res_domain *res_domain) { - struct gds_mlx5_exp_cq_t *gmexpcq = NULL; + gds_mlx5_exp_cq_t *gmexpcq = NULL; ibv_exp_cq_init_attr attr; - gds_peer *peer = NULL; - gds_peer_attr *peer_attr = NULL; int ret = 0; assert(context); @@ -85,9 +83,7 @@ gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp( gds_mlx5_exp_qp_t *gmexpqp = NULL; struct ibv_qp *qp = NULL; gds_mlx5_exp_cq_t *rx_gmexpcq = NULL, *tx_gmexpcq = NULL; - gds_peer *peer = NULL; - gds_peer_attr *peer_attr = NULL; - struct ibv_qp_init_attr exp_qp_attr = {0,}; + struct ibv_exp_qp_init_attr exp_qp_attr = {0,}; int old_errno = errno; assert(pd); @@ -97,7 +93,7 @@ gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp( assert(peer_attr); gmexpqp = (gds_mlx5_exp_qp_t *)calloc(1, sizeof(gds_mlx5_exp_qp_t)); - if (!gqp) { + if (!gmexpqp) { gds_err("cannot allocate memory\n"); return NULL; } @@ -105,7 +101,7 @@ gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp( gmexpqp->gqp.dev_context = context; - gmexpqp->res_domain = gds_create_res_domain(context); + gmexpqp->res_domain = gds_mlx5_exp_create_res_domain(context); if (gmexpqp->res_domain) gds_dbg("using res_domain %p\n", gmexpqp->res_domain); else @@ -145,29 +141,30 @@ gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp( peer->alloc_flags |= GDS_ALLOC_DBREC_ON_GPU; } - exp_qp_attr = { - .send_cq = tx_gmexpcq->gcq.cq, - .recv_cq = rx_gmexpcq->gcq.cq, - .pd = pd, - .comp_mask = IBV_EXP_QP_INIT_ATTR_PD | IBV_EXP_QP_INIT_ATTR_PEER_DIRECT, - .peer_direct_attrs = peer_attr, - .qp_type = qp_attr->qp_type - }; + exp_qp_attr.send_cq = tx_gmexpcq->gcq.cq; + exp_qp_attr.recv_cq = rx_gmexpcq->gcq.cq; + exp_qp_attr.pd = pd; + exp_qp_attr.comp_mask = IBV_EXP_QP_INIT_ATTR_PD | IBV_EXP_QP_INIT_ATTR_PEER_DIRECT; + exp_qp_attr.peer_direct_attrs = peer_attr; + exp_qp_attr.qp_type = qp_attr->qp_type; assert(sizeof(exp_qp_attr.cap) == sizeof(qp_attr->cap)); memcpy(&exp_qp_attr.cap, &qp_attr->cap, sizeof(qp_attr->cap)); - qp = ibv_exp_create_qp(context, qp_attr); + qp = ibv_exp_create_qp(context, &exp_qp_attr); if (!qp) { ret = EINVAL; gds_err("error in ibv_exp_create_qp\n"); goto err; } + tx_gmexpcq->gcq.cq = qp->send_cq; + rx_gmexpcq->gcq.cq = qp->recv_cq; + gmexpqp->gqp.qp = qp; - gmexpqp->gqp.send_cq = tx_gmexpcq->gcq; - gmexpqp->gqp.recv_cq = rx_gmexpcq->gcq; + gmexpqp->gqp.send_cq = &tx_gmexpcq->gcq; + gmexpqp->gqp.recv_cq = &rx_gmexpcq->gcq; gds_dbg("created gds_mlx5_exp_qp=%p\n", gmexpqp); @@ -256,3 +253,32 @@ int gds_mlx5_exp_destroy_cq(gds_mlx5_exp_cq_t *gmexpcq) return retcode; } + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_prepare_send(gds_mlx5_exp_qp_t *gmexpqp, gds_send_wr *p_ewr, + gds_send_wr **bad_ewr, + gds_send_request_t *request) +{ + int ret = 0; + ret = ibv_post_send(gmexpqp->gqp.qp, p_ewr, bad_ewr); + if (ret) { + + if (ret == ENOMEM) { + // out of space error can happen too often to report + gds_dbg("ENOMEM error %d in ibv_post_send\n", ret); + } else { + gds_err("error %d in ibv_post_send\n", ret); + } + goto out; + } + + ret = ibv_exp_peer_commit_qp(gmexpqp->gqp.qp, &request->commit); + if (ret) { + gds_err("error %d in ibv_exp_peer_commit_qp\n", ret); + goto out; + } +out: + return ret; +} + diff --git a/src/mlx5-exp.hpp b/src/mlx5-exp.hpp index 60bb1fd..d289c83 100644 --- a/src/mlx5-exp.hpp +++ b/src/mlx5-exp.hpp @@ -1,8 +1,15 @@ +#include +#include +#include + #include #include #include +#include "objs.hpp" +#include "utils.hpp" + typedef struct gds_mlx5_exp_cq { gds_cq_t gcq; ibv_exp_res_domain *res_domain; @@ -35,3 +42,7 @@ gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp( int gds_mlx5_exp_destroy_cq(gds_mlx5_exp_cq_t *gmexpcq); int gds_mlx5_exp_destroy_qp(gds_mlx5_exp_qp_t *gmexpqp); + +int gds_mlx5_exp_prepare_send(gds_mlx5_exp_qp_t *gmexpqp, gds_send_wr *p_ewr, + gds_send_wr **bad_ewr, + gds_send_request_t *request); From 10e164ec794c24df2a997a4f8db69385f7256ca9 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Fri, 6 Aug 2021 00:56:18 -0400 Subject: [PATCH 14/19] Modified gds_kernel_* applications to fit the new API/structs --- tests/gds_kernel_latency.c | 18 +++++++++--------- tests/gds_kernel_loopback_latency.c | 29 +++++++++++++---------------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/tests/gds_kernel_latency.c b/tests/gds_kernel_latency.c index 428dadb..04370f5 100644 --- a/tests/gds_kernel_latency.c +++ b/tests/gds_kernel_latency.c @@ -495,7 +495,7 @@ static int pp_wait_cq(struct pingpong_context *ctx, int is_client) { int ret; if (ctx->peersync) { - ret = gds_stream_wait_cq(gpu_stream, &ctx->gds_qp->recv_cq, ctx->consume_rx_cqe); + ret = gds_stream_wait_cq(gpu_stream, ctx->gds_qp->recv_cq, ctx->consume_rx_cqe); } else { if (is_client) { do { @@ -674,7 +674,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin wdesc->descs[k].tag = GDS_TAG_SEND; wdesc->descs[k].send = &wdesc->send_rq; ++k; - ret = gds_prepare_wait_cq(&ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0); + ret = gds_prepare_wait_cq(ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0); if (ret) { retcode = -ret; break; @@ -683,7 +683,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin wdesc->descs[k].tag = GDS_TAG_WAIT; wdesc->descs[k].wait = &wdesc->wait_tx_rq; ++k; - ret = gds_prepare_wait_cq(&ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0); + ret = gds_prepare_wait_cq(ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0); if (ret) { retcode = -ret; break; @@ -713,14 +713,14 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin retcode = -ret; break; } - ret = gds_stream_wait_cq(gpu_stream, &ctx->gds_qp->send_cq, 0); + ret = gds_stream_wait_cq(gpu_stream, ctx->gds_qp->send_cq, 0); if (ret) { // TODO: rollback gpu send gpu_err("error %d in gds_stream_wait_cq\n", ret); retcode = -ret; break; } - ret = gds_stream_wait_cq(gpu_stream, &ctx->gds_qp->recv_cq, ctx->consume_rx_cqe); + ret = gds_stream_wait_cq(gpu_stream, ctx->gds_qp->recv_cq, ctx->consume_rx_cqe); if (ret) { // TODO: rollback gpu send and wait send_cq gpu_err("[%d] error %d in gds_stream_wait_cq\n", my_rank, ret); @@ -749,7 +749,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin if (ctx->use_desc_apis) { work_desc_t *wdesc = calloc(1, sizeof(*wdesc)); int k = 0; - ret = gds_prepare_wait_cq(&ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0); + ret = gds_prepare_wait_cq(ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0); if (ret) { retcode = -ret; break; @@ -771,7 +771,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin CUCHECK(cuStreamAddCallback(gpu_stream, post_work_cb, wdesc, 0)); } } else if (ctx->peersync) { - ret = gds_stream_wait_cq(gpu_stream, &ctx->gds_qp->recv_cq, ctx->consume_rx_cqe); + ret = gds_stream_wait_cq(gpu_stream, ctx->gds_qp->recv_cq, ctx->consume_rx_cqe); if (ret) { // TODO: rollback gpu send and wait send_cq gpu_err("error %d in gds_stream_wait_cq\n", ret); @@ -804,7 +804,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin wdesc->descs[k].tag = GDS_TAG_SEND; wdesc->descs[k].send = &wdesc->send_rq; ++k; - ret = gds_prepare_wait_cq(&ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0); + ret = gds_prepare_wait_cq(ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0); if (ret) { retcode = -ret; break; @@ -833,7 +833,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin retcode = -ret; break; } - ret = gds_stream_wait_cq(gpu_stream, &ctx->gds_qp->send_cq, 0); + ret = gds_stream_wait_cq(gpu_stream, ctx->gds_qp->send_cq, 0); if (ret) { // TODO: rollback gpu send gpu_err("error %d in gds_stream_wait_cq\n", ret); diff --git a/tests/gds_kernel_loopback_latency.c b/tests/gds_kernel_loopback_latency.c index b2d209c..f6ccc32 100644 --- a/tests/gds_kernel_loopback_latency.c +++ b/tests/gds_kernel_loopback_latency.c @@ -511,16 +511,15 @@ static int pp_post_send(struct pingpong_context *ctx, uint32_t qpn) .wr_id = PINGPONG_SEND_WRID, .sg_list = &list, .num_sge = 1, - .exp_opcode = IBV_EXP_WR_SEND, - .exp_send_flags = IBV_EXP_SEND_SIGNALED, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, .wr = { .ud = { .ah = ctx->ah, .remote_qpn = qpn, .remote_qkey = 0x11111111 } - }, - .comp_mask = 0 + } }; gds_send_wr *bad_ewr; return gds_post_send(ctx->gds_qp, &ewr, &bad_ewr); @@ -538,16 +537,15 @@ static int pp_post_gpu_send(struct pingpong_context *ctx, uint32_t qpn, CUstream .wr_id = PINGPONG_SEND_WRID, .sg_list = &list, .num_sge = 1, - .exp_opcode = IBV_EXP_WR_SEND, - .exp_send_flags = IBV_EXP_SEND_SIGNALED, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, .wr = { .ud = { .ah = ctx->ah, .remote_qpn = qpn, .remote_qkey = 0x11111111 } - }, - .comp_mask = 0 + } }; gds_send_wr *bad_ewr; return gds_stream_queue_send(*p_gpu_stream, ctx->gds_qp, &ewr, &bad_ewr); @@ -565,16 +563,15 @@ static int pp_prepare_gpu_send(struct pingpong_context *ctx, uint32_t qpn, gds_s .wr_id = PINGPONG_SEND_WRID, .sg_list = &list, .num_sge = 1, - .exp_opcode = IBV_EXP_WR_SEND, - .exp_send_flags = IBV_EXP_SEND_SIGNALED, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, .wr = { .ud = { .ah = ctx->ah, .remote_qpn = qpn, .remote_qkey = 0x11111111 } - }, - .comp_mask = 0 + } }; gds_send_wr *bad_ewr; //printf("gpu_post_send_on_stream\n"); @@ -655,7 +652,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin wdesc->descs[k].send = &wdesc->send_rq; ++k; - ret = gds_prepare_wait_cq(&ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0); + ret = gds_prepare_wait_cq(ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0); if (ret) { retcode = -ret; break; @@ -665,7 +662,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin wdesc->descs[k].wait = &wdesc->wait_tx_rq; ++k; - ret = gds_prepare_wait_cq(&ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0); + ret = gds_prepare_wait_cq(ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0); if (ret) { retcode = -ret; break; @@ -697,7 +694,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin break; } - ret = gds_stream_wait_cq(gpu_stream_server, &ctx->gds_qp->send_cq, 0); + ret = gds_stream_wait_cq(gpu_stream_server, ctx->gds_qp->send_cq, 0); if (ret) { // TODO: rollback gpu send gpu_err("error %d in gds_stream_wait_cq\n", ret); @@ -705,7 +702,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin break; } - ret = gds_stream_wait_cq(gpu_stream_server, &ctx->gds_qp->recv_cq, ctx->consume_rx_cqe); + ret = gds_stream_wait_cq(gpu_stream_server, ctx->gds_qp->recv_cq, ctx->consume_rx_cqe); if (ret) { // TODO: rollback gpu send and wait send_cq gpu_err("error %d in gds_stream_wait_cq\n", ret); From b1cab0511afe6efe5227e00ed7e790e064884394 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Wed, 11 Aug 2021 04:06:38 -0400 Subject: [PATCH 15/19] Moved gds_wait_request to gds_mlx5_exp_wait_request and made the former an opaque struct --- include/gdsync/core.h | 7 +++++-- src/mlx5-exp.hpp | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/include/gdsync/core.h b/include/gdsync/core.h index a74299d..30fc977 100644 --- a/include/gdsync/core.h +++ b/include/gdsync/core.h @@ -175,8 +175,11 @@ int gds_stream_post_send_all(CUstream stream, int count, gds_send_request_t *req */ typedef struct gds_wait_request { - struct ibv_exp_peer_peek peek; - struct peer_op_wr wr[GDS_WAIT_INFO_MAX_OPS]; + gds_driver_type_t dtype; + uint8_t pad0[4]; + uint8_t reserved0[40]; + uint8_t reserved1[56 * GDS_WAIT_INFO_MAX_OPS]; + uint8_t pad1[16]; } gds_wait_request_t; /** diff --git a/src/mlx5-exp.hpp b/src/mlx5-exp.hpp index d289c83..a92e0e6 100644 --- a/src/mlx5-exp.hpp +++ b/src/mlx5-exp.hpp @@ -6,6 +6,7 @@ #include #include +#include #include "objs.hpp" #include "utils.hpp" @@ -20,6 +21,18 @@ typedef struct gds_mlx5_exp_qp { ibv_exp_res_domain *res_domain; } gds_mlx5_exp_qp_t; +typedef struct gds_mlx5_exp_wait_request { + gds_driver_type_t dtype; + uint8_t pad0[4]; + struct ibv_exp_peer_peek peek; + struct peer_op_wr wr[GDS_WAIT_INFO_MAX_OPS]; + uint8_t pad1[16]; +} gds_mlx5_exp_wait_request_t; + +static_assert(sizeof(gds_mlx5_exp_wait_request_t) % 64 == 0, "gds_mlx5_exp_wait_request_t must be 64-byte aligned."); +static_assert(sizeof(gds_mlx5_exp_wait_request_t) <= sizeof(gds_wait_request_t), "The size of gds_mlx5_exp_wait_request_t must be less than or equal to that of gds_wait_request_t."); +static_assert(offsetof(gds_mlx5_exp_wait_request_t, dtype) == offsetof(gds_wait_request_t, dtype), "dtype of gds_mlx5_exp_wait_request_t and gds_wait_request_t must be at the same offset."); + static inline gds_mlx5_exp_cq_t *to_gds_mexp_cq(gds_cq_t *gcq) { assert(gcq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); return container_of(gcq, gds_mlx5_exp_cq_t, gcq); @@ -30,6 +43,19 @@ static inline gds_mlx5_exp_qp_t *to_gds_mexp_qp(gds_qp_t *gqp) { return container_of(gqp, gds_mlx5_exp_qp_t, gqp); } +static inline gds_mlx5_exp_wait_request_t *to_gds_mexp_wait_request(gds_wait_request_t *gwreq) { + assert(gwreq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + return (gds_mlx5_exp_wait_request_t *)(gwreq); +} + +static inline const gds_mlx5_exp_wait_request_t *to_gds_mexp_wait_request(const gds_wait_request_t *gwreq) { + return (const gds_mlx5_exp_wait_request_t *)to_gds_mexp_wait_request((const gds_wait_request_t *)gwreq); +} + +static inline uint32_t gds_mlx5_exp_get_num_wait_request_entries(gds_mlx5_exp_wait_request_t *gmexp_request) { + return gmexp_request->peek.entries; +} + gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq( struct ibv_context *context, int cqe, void *cq_context, struct ibv_comp_channel *channel, @@ -46,3 +72,12 @@ int gds_mlx5_exp_destroy_qp(gds_mlx5_exp_qp_t *gmexpqp); int gds_mlx5_exp_prepare_send(gds_mlx5_exp_qp_t *gmexpqp, gds_send_wr *p_ewr, gds_send_wr **bad_ewr, gds_send_request_t *request); + +void gds_mlx5_exp_init_wait_request(gds_mlx5_exp_wait_request_t *request, uint32_t offset); +void gds_mlx5_exp_dump_wait_request(gds_mlx5_exp_wait_request_t *request, size_t count); +int gds_mlx5_exp_prepare_wait_cq(gds_mlx5_exp_cq_t *mexpcq, gds_mlx5_exp_wait_request_t *request, int flags); +int gds_mlx5_exp_append_wait_cq(gds_mlx5_exp_wait_request_t *request, uint32_t *dw, uint32_t val); +int gds_mlx5_exp_abort_wait_cq(gds_mlx5_exp_cq_t *gmexpcq, gds_mlx5_exp_wait_request_t *request); +int gds_mlx5_exp_stream_post_wait_descriptor(gds_peer *peer, gds_mlx5_exp_wait_request_t *request, gds_op_list_t ¶ms, int flags); +int gds_mlx5_exp_post_wait_descriptor(gds_mlx5_exp_wait_request_t *request, int flags); +int gds_mlx5_exp_get_wait_descs(gds_mlx5_wait_info_t *mlx5_i, const gds_mlx5_exp_wait_request_t *request); From 17b246f2ec485f97d324a36f6937b8c02dd646a4 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Wed, 11 Aug 2021 04:07:49 -0400 Subject: [PATCH 16/19] Moved gds_wait_request related functions to mlx5-exp --- src/apis.cpp | 88 ++++++--------- src/gdsync.cpp | 71 +----------- src/mlx5-exp.cpp | 280 +++++++++++++++++++++++++++++++++++++++++++++++ src/mlx5.cpp | 104 +----------------- src/utils.hpp | 2 + 5 files changed, 323 insertions(+), 222 deletions(-) diff --git a/src/apis.cpp b/src/apis.cpp index 0801771..e9ad2ed 100644 --- a/src/apis.cpp +++ b/src/apis.cpp @@ -56,7 +56,7 @@ //----------------------------------------------------------------------------- -static void gds_init_ops(struct peer_op_wr *op, int count) +void gds_init_ops(struct peer_op_wr *op, int count) { int i = count; while (--i) @@ -80,13 +80,15 @@ static void gds_init_send_info(gds_send_request_t *info) static void gds_init_wait_request(gds_wait_request_t *request, uint32_t offset) { + gds_mlx5_exp_wait_request_t *gmexp_request; gds_dbg("wait_request=%p offset=%08x\n", request, offset); memset(request, 0, sizeof(*request)); - request->peek.storage = request->wr; - request->peek.entries = sizeof(request->wr)/sizeof(request->wr[0]); - request->peek.whence = IBV_EXP_PEER_PEEK_ABSOLUTE; - request->peek.offset = offset; - gds_init_ops(request->peek.storage, request->peek.entries); + + request->dtype = GDS_DRIVER_TYPE_MLX5_EXP; + + gmexp_request = to_gds_mexp_wait_request(request); + + gds_mlx5_exp_init_wait_request(gmexp_request, offset); } //----------------------------------------------------------------------------- @@ -273,7 +275,9 @@ int gds_stream_post_send_all(CUstream stream, int count, gds_send_request_t *req int gds_prepare_wait_cq(struct gds_cq *cq, gds_wait_request_t *request, int flags) { - int retcode = 0; + gds_mlx5_exp_cq_t *gmexpcq; + gds_mlx5_exp_wait_request_t *gmexp_request; + if (flags != 0) { gds_err("invalid flags != 0\n"); return EINVAL; @@ -281,51 +285,19 @@ int gds_prepare_wait_cq(struct gds_cq *cq, gds_wait_request_t *request, int flag gds_init_wait_request(request, cq->curr_offset++); - retcode = ibv_exp_peer_peek_cq(cq->cq, &request->peek); - if (retcode == -ENOSPC) { - // TODO: handle too few entries - gds_err("not enough ops in peer_peek_cq\n"); - goto out; - } else if (retcode) { - gds_err("error %d in peer_peek_cq\n", retcode); - goto out; - } - //gds_dump_wait_request(request, 1); - out: - return retcode; + gmexpcq = to_gds_mexp_cq(cq); + gmexp_request = to_gds_mexp_wait_request(request); + + return gds_mlx5_exp_prepare_wait_cq(gmexpcq, gmexp_request, flags); } //----------------------------------------------------------------------------- int gds_append_wait_cq(gds_wait_request_t *request, uint32_t *dw, uint32_t val) { - int ret = 0; - unsigned MAX_NUM_ENTRIES = sizeof(request->wr)/sizeof(request->wr[0]); - unsigned n = request->peek.entries; - struct peer_op_wr *wr = request->peek.storage; - - if (n + 1 > MAX_NUM_ENTRIES) { - gds_err("no space left to stuff a poke\n"); - ret = ENOMEM; - goto out; - } + gds_mlx5_exp_wait_request_t *gmexp_request = to_gds_mexp_wait_request(request); - // at least 1 op - assert(n); - assert(wr); - - for (; n; --n) wr = wr->next; - assert(wr); - - wr->type = IBV_EXP_PEER_OP_STORE_DWORD; - wr->wr.dword_va.data = val; - wr->wr.dword_va.target_id = 0; // direct mapping, offset IS the address - wr->wr.dword_va.offset = (ptrdiff_t)(dw-(uint32_t*)0); - - ++request->peek.entries; - - out: - return ret; + return gds_mlx5_exp_append_wait_cq(gmexp_request, dw, val); } //----------------------------------------------------------------------------- @@ -346,12 +318,16 @@ int gds_stream_post_wait_cq_all(CUstream stream, int count, gds_wait_request_t * static int gds_abort_wait_cq(struct gds_cq *cq, gds_wait_request_t *request) { + gds_mlx5_exp_cq_t *gmexpcq; + gds_mlx5_exp_wait_request_t *gmexp_request; + assert(cq); assert(request); - struct ibv_exp_peer_abort_peek abort_ctx; - abort_ctx.peek_id = request->peek.peek_id; - abort_ctx.comp_mask = 0; - return ibv_exp_peer_abort_peek_cq(cq->cq, &abort_ctx); + + gmexpcq = to_gds_mexp_cq(cq); + gmexp_request = to_gds_mexp_wait_request(request); + + return gds_mlx5_exp_abort_wait_cq(gmexpcq, gmexp_request); } //----------------------------------------------------------------------------- @@ -549,7 +525,7 @@ static int calc_n_mem_ops(size_t n_descs, gds_descriptor_t *descs, size_t &n_mem n_mem_ops += desc->send->commit.entries + 2; // extra space, ugly break; case GDS_TAG_WAIT: - n_mem_ops += desc->wait->peek.entries + 2; // ditto + n_mem_ops += gds_mlx5_exp_get_num_wait_request_entries(to_gds_mexp_wait_request(desc->wait)) + 2; // ditto break; case GDS_TAG_WAIT_VALUE32: case GDS_TAG_WRITE_VALUE32: @@ -618,15 +594,15 @@ int gds_stream_post_descriptors(CUstream stream, size_t n_descs, gds_descriptor_ break; } case GDS_TAG_WAIT: { - gds_wait_request_t *wreq = desc->wait; + gds_mlx5_exp_wait_request_t *wreq = to_gds_mexp_wait_request(desc->wait); int flags = 0; if (move_flush && i != last_wait) { gds_dbg("discarding FLUSH!\n"); flags = GDS_POST_OPS_DISCARD_WAIT_FLUSH; } - retcode = gds_post_ops(peer, wreq->peek.entries, wreq->peek.storage, params, flags); + retcode = gds_mlx5_exp_stream_post_wait_descriptor(peer, wreq, params, flags); if (retcode) { - gds_err("error %d in gds_post_ops\n", retcode); + gds_err("error %d in gds_mlx5_exp_stream_post_wait_descriptor\n", retcode); ret = retcode; goto out; } @@ -697,10 +673,10 @@ int gds_post_descriptors(size_t n_descs, gds_descriptor_t *descs, int flags) } case GDS_TAG_WAIT: { gds_dbg("desc[%zu] WAIT\n", i); - gds_wait_request_t *wreq = desc->wait; - retcode = gds_post_ops_on_cpu(wreq->peek.entries, wreq->peek.storage, flags); + gds_mlx5_exp_wait_request_t *wreq = to_gds_mexp_wait_request(desc->wait); + retcode = gds_mlx5_exp_post_wait_descriptor(wreq, flags); if (retcode) { - gds_err("error %d in gds_post_ops_on_cpu\n", retcode); + gds_err("error %d in gds_mlx5_exp_post_wait_descriptor\n", retcode); ret = retcode; goto out; } diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 23907fe..670bc45 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -1169,73 +1169,14 @@ int gds_post_pokes_on_cpu(int count, gds_send_request_t *info, uint32_t *dw, uin //----------------------------------------------------------------------------- -static void gds_dump_ops(struct peer_op_wr *op, size_t count) -{ - size_t n = 0; - for (; op; op = op->next, ++n) { - gds_dbg("op[%zu] type:%d\n", n, op->type); - switch(op->type) { - case IBV_EXP_PEER_OP_FENCE: { - gds_dbg("FENCE flags=%" PRIu64 "\n", op->wr.fence.fence_flags); - break; - } - case IBV_EXP_PEER_OP_STORE_DWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + - op->wr.dword_va.offset; - gds_dbg("STORE_QWORD data:%x target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", - op->wr.dword_va.data, op->wr.dword_va.target_id, - op->wr.dword_va.offset, dev_ptr); - break; - } - case IBV_EXP_PEER_OP_STORE_QWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.qword_va.target_id)->dptr + - op->wr.qword_va.offset; - gds_dbg("STORE_QWORD data:%" PRIx64 " target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", - op->wr.qword_va.data, op->wr.qword_va.target_id, - op->wr.qword_va.offset, dev_ptr); - break; - } - case IBV_EXP_PEER_OP_COPY_BLOCK: { - CUdeviceptr dev_ptr = range_from_id(op->wr.copy_op.target_id)->dptr + - op->wr.copy_op.offset; - gds_dbg("COPY_BLOCK src:%p len:%zu target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", - op->wr.copy_op.src, op->wr.copy_op.len, - op->wr.copy_op.target_id, op->wr.copy_op.offset, - dev_ptr); - break; - } - case IBV_EXP_PEER_OP_POLL_AND_DWORD: - case IBV_EXP_PEER_OP_POLL_NOR_DWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + - op->wr.dword_va.offset; - gds_dbg("%s data:%08x target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", - (op->type==IBV_EXP_PEER_OP_POLL_AND_DWORD) ? "POLL_AND_DW" : "POLL_NOR_SDW", - op->wr.dword_va.data, - op->wr.dword_va.target_id, - op->wr.dword_va.offset, - dev_ptr); - break; - } - default: - gds_err("undefined peer op type %d\n", op->type); - break; - } - } - - assert(count == n); -} - -//----------------------------------------------------------------------------- - void gds_dump_wait_request(gds_wait_request_t *request, size_t count) { - for (size_t j=0; jentries, peek->whence, peek->offset, - peek->peek_id, peek->comp_mask); - gds_dump_ops(peek->storage, peek->entries); - } + gds_mlx5_exp_wait_request_t *gmexp_request; + if (count == 0) + return; + + gmexp_request = to_gds_mexp_wait_request(request); + gds_mlx5_exp_dump_wait_request(gmexp_request, count); } //----------------------------------------------------------------------------- diff --git a/src/mlx5-exp.cpp b/src/mlx5-exp.cpp index f76cf12..8d7e939 100644 --- a/src/mlx5-exp.cpp +++ b/src/mlx5-exp.cpp @@ -282,3 +282,283 @@ int gds_mlx5_exp_prepare_send(gds_mlx5_exp_qp_t *gmexpqp, gds_send_wr *p_ewr, return ret; } +//----------------------------------------------------------------------------- + +void gds_mlx5_exp_init_wait_request(gds_mlx5_exp_wait_request_t *request, uint32_t offset) +{ + gds_dbg("wait_request=%p offset=%08x\n", request, offset); + request->peek.storage = request->wr; + request->peek.entries = sizeof(request->wr)/sizeof(request->wr[0]); + request->peek.whence = IBV_EXP_PEER_PEEK_ABSOLUTE; + request->peek.offset = offset; + gds_init_ops(request->peek.storage, request->peek.entries); +} + +//----------------------------------------------------------------------------- + +static void gds_mlx5_exp_dump_ops(struct peer_op_wr *op, size_t count) +{ + size_t n = 0; + for (; op; op = op->next, ++n) { + gds_dbg("op[%zu] type:%d\n", n, op->type); + switch(op->type) { + case IBV_EXP_PEER_OP_FENCE: { + gds_dbg("FENCE flags=%" PRIu64 "\n", op->wr.fence.fence_flags); + break; + } + case IBV_EXP_PEER_OP_STORE_DWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + + op->wr.dword_va.offset; + gds_dbg("STORE_QWORD data:%x target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", + op->wr.dword_va.data, op->wr.dword_va.target_id, + op->wr.dword_va.offset, dev_ptr); + break; + } + case IBV_EXP_PEER_OP_STORE_QWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.qword_va.target_id)->dptr + + op->wr.qword_va.offset; + gds_dbg("STORE_QWORD data:%" PRIx64 " target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", + op->wr.qword_va.data, op->wr.qword_va.target_id, + op->wr.qword_va.offset, dev_ptr); + break; + } + case IBV_EXP_PEER_OP_COPY_BLOCK: { + CUdeviceptr dev_ptr = range_from_id(op->wr.copy_op.target_id)->dptr + + op->wr.copy_op.offset; + gds_dbg("COPY_BLOCK src:%p len:%zu target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", + op->wr.copy_op.src, op->wr.copy_op.len, + op->wr.copy_op.target_id, op->wr.copy_op.offset, + dev_ptr); + break; + } + case IBV_EXP_PEER_OP_POLL_AND_DWORD: + case IBV_EXP_PEER_OP_POLL_NOR_DWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + + op->wr.dword_va.offset; + gds_dbg("%s data:%08x target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", + (op->type==IBV_EXP_PEER_OP_POLL_AND_DWORD) ? "POLL_AND_DW" : "POLL_NOR_SDW", + op->wr.dword_va.data, + op->wr.dword_va.target_id, + op->wr.dword_va.offset, + dev_ptr); + break; + } + default: + gds_err("undefined peer op type %d\n", op->type); + break; + } + } + + assert(count == n); +} + +//----------------------------------------------------------------------------- + +void gds_mlx5_exp_dump_wait_request(gds_mlx5_exp_wait_request_t *request, size_t count) +{ + for (size_t j = 0; j < count; ++j) { + struct ibv_exp_peer_peek *peek = &request[j].peek; + gds_dbg("req[%zu] entries:%u whence:%u offset:%u peek_id:%" PRIx64 " comp_mask:%08x\n", + j, peek->entries, peek->whence, peek->offset, + peek->peek_id, peek->comp_mask); + gds_mlx5_exp_dump_ops(peek->storage, peek->entries); + } +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_prepare_wait_cq(gds_mlx5_exp_cq_t *mexpcq, gds_mlx5_exp_wait_request_t *request, int flags) +{ + int retcode = 0; + + retcode = ibv_exp_peer_peek_cq(mexpcq->gcq.cq, &request->peek); + if (retcode == ENOSPC) { + // TODO: handle too few entries + gds_err("not enough ops in peer_peek_cq\n"); + goto out; + } else if (retcode) { + gds_err("error %d in peer_peek_cq\n", retcode); + goto out; + } + //gds_dump_wait_request(request, 1); + out: + return retcode; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_append_wait_cq(gds_mlx5_exp_wait_request_t *request, uint32_t *dw, uint32_t val) +{ + int ret = 0; + unsigned MAX_NUM_ENTRIES = sizeof(request->wr) / sizeof(request->wr[0]); + unsigned n = request->peek.entries; + struct peer_op_wr *wr = request->peek.storage; + + if (n + 1 > MAX_NUM_ENTRIES) { + gds_err("no space left to stuff a poke\n"); + ret = ENOMEM; + goto out; + } + + // at least 1 op + assert(n); + assert(wr); + + for (; n; --n) + wr = wr->next; + + assert(wr); + + wr->type = IBV_EXP_PEER_OP_STORE_DWORD; + wr->wr.dword_va.data = val; + wr->wr.dword_va.target_id = 0; // direct mapping, offset IS the address + wr->wr.dword_va.offset = (ptrdiff_t)(dw-(uint32_t*)0); + + ++request->peek.entries; + +out: + return ret; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_abort_wait_cq(gds_mlx5_exp_cq_t *gmexpcq, gds_mlx5_exp_wait_request_t *request) +{ + struct ibv_exp_peer_abort_peek abort_ctx; + abort_ctx.peek_id = request->peek.peek_id; + abort_ctx.comp_mask = 0; + return ibv_exp_peer_abort_peek_cq(gmexpcq->gcq.cq, &abort_ctx); +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_stream_post_wait_descriptor(gds_peer *peer, gds_mlx5_exp_wait_request_t *request, gds_op_list_t ¶ms, int flags) +{ + int ret = 0; + + ret = gds_post_ops(peer, request->peek.entries, request->peek.storage, params, flags); + if (ret) + gds_err("error %d in gds_post_ops\n", ret); + + return ret; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_post_wait_descriptor(gds_mlx5_exp_wait_request_t *request, int flags) +{ + int ret = 0; + + ret = gds_post_ops_on_cpu(request->peek.entries, request->peek.storage, flags); + if (ret) + gds_err("error %d in gds_post_ops_on_cpu\n", ret); + + return ret; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_get_wait_descs(gds_mlx5_wait_info_t *mlx5_i, const gds_mlx5_exp_wait_request_t *request) +{ + int retcode = 0; + size_t n_ops = request->peek.entries; + peer_op_wr *op = request->peek.storage; + size_t n = 0; + + memset(mlx5_i, 0, sizeof(*mlx5_i)); + + for (; op && n < n_ops; op = op->next, ++n) { + switch(op->type) { + case IBV_EXP_PEER_OP_FENCE: { + gds_dbg("OP_FENCE: fence_flags=%" PRIu64 "\n", op->wr.fence.fence_flags); + uint32_t fence_op = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_OP_READ|IBV_EXP_PEER_FENCE_OP_WRITE)); + uint32_t fence_from = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_FROM_CPU|IBV_EXP_PEER_FENCE_FROM_HCA)); + uint32_t fence_mem = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_MEM_SYS|IBV_EXP_PEER_FENCE_MEM_PEER)); + if (fence_op == IBV_EXP_PEER_FENCE_OP_READ) { + gds_dbg("nothing to do for read fences\n"); + break; + } + if (fence_from != IBV_EXP_PEER_FENCE_FROM_HCA) { + gds_err("unexpected from fence\n"); + retcode = EINVAL; + break; + } + gds_err("unsupported fence combination\n"); + retcode = EINVAL; + break; + } + case IBV_EXP_PEER_OP_STORE_DWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + + op->wr.dword_va.offset; + uint32_t data = op->wr.dword_va.data; + gds_dbg("OP_STORE_DWORD dev_ptr=%" PRIx64 " data=%08x\n", (uint64_t)dev_ptr, data); + if (n != 1) { + gds_err("store DWORD is not 2nd op\n"); + retcode = EINVAL; + break; + } + mlx5_i->flag_ptr = (uint32_t*)dev_ptr; + mlx5_i->flag_value = data; + break; + } + case IBV_EXP_PEER_OP_STORE_QWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.qword_va.target_id)->dptr + + op->wr.qword_va.offset; + uint64_t data = op->wr.qword_va.data; + gds_dbg("OP_STORE_QWORD dev_ptr=%" PRIx64 " data=%" PRIx64 "\n", (uint64_t)dev_ptr, (uint64_t)data); + gds_err("unsupported QWORD op\n"); + retcode = EINVAL; + break; + } + case IBV_EXP_PEER_OP_COPY_BLOCK: { + CUdeviceptr dev_ptr = range_from_id(op->wr.copy_op.target_id)->dptr + + op->wr.copy_op.offset; + size_t len = op->wr.copy_op.len; + void *src = op->wr.copy_op.src; + gds_err("unsupported COPY_BLOCK\n"); + retcode = EINVAL; + break; + } + case IBV_EXP_PEER_OP_POLL_AND_DWORD: + case IBV_EXP_PEER_OP_POLL_GEQ_DWORD: + case IBV_EXP_PEER_OP_POLL_NOR_DWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + + op->wr.dword_va.offset; + uint32_t data = op->wr.dword_va.data; + + gds_dbg("OP_POLL_DWORD dev_ptr=%" PRIx64 " data=%08x\n", (uint64_t)dev_ptr, data); + + mlx5_i->cqe_ptr = (uint32_t *)dev_ptr; + mlx5_i->cqe_value = data; + + switch(op->type) { + case IBV_EXP_PEER_OP_POLL_NOR_DWORD: + // GPU SMs can always do NOR + mlx5_i->cond = GDS_WAIT_COND_NOR; + break; + case IBV_EXP_PEER_OP_POLL_GEQ_DWORD: + mlx5_i->cond = GDS_WAIT_COND_GEQ; + break; + case IBV_EXP_PEER_OP_POLL_AND_DWORD: + mlx5_i->cond = GDS_WAIT_COND_AND; + break; + default: + gds_err("unexpected op type\n"); + retcode = EINVAL; + goto err; + } + break; + } + default: + gds_err("undefined peer op type %d\n", op->type); + retcode = EINVAL; + break; + } + err: + if (retcode) { + gds_err("error in fill func at entry n=%zu\n", n); + break; + } + } + return retcode; +} diff --git a/src/mlx5.cpp b/src/mlx5.cpp index a2c7b39..b026374 100644 --- a/src/mlx5.cpp +++ b/src/mlx5.cpp @@ -40,6 +40,7 @@ //#include "mem.hpp" #include "objs.hpp" #include "utils.hpp" +#include "mlx5-exp.hpp" #if 0 union { uint64_t qw; uint32_t dw[2]; } db_val; @@ -180,107 +181,8 @@ int gds_mlx5_get_send_info(int count, const gds_send_request_t *requests, gds_ml int gds_mlx5_get_wait_descs(gds_mlx5_wait_info_t *mlx5_i, const gds_wait_request_t *request) { - int retcode = 0; - size_t n_ops = request->peek.entries; - peer_op_wr *op = request->peek.storage; - size_t n = 0; - - memset(mlx5_i, 0, sizeof(*mlx5_i)); - - for (; op && n < n_ops; op = op->next, ++n) { - switch(op->type) { - case IBV_EXP_PEER_OP_FENCE: { - gds_dbg("OP_FENCE: fence_flags=%" PRIu64 "\n", op->wr.fence.fence_flags); - uint32_t fence_op = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_OP_READ|IBV_EXP_PEER_FENCE_OP_WRITE)); - uint32_t fence_from = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_FROM_CPU|IBV_EXP_PEER_FENCE_FROM_HCA)); - uint32_t fence_mem = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_MEM_SYS|IBV_EXP_PEER_FENCE_MEM_PEER)); - if (fence_op == IBV_EXP_PEER_FENCE_OP_READ) { - gds_dbg("nothing to do for read fences\n"); - break; - } - if (fence_from != IBV_EXP_PEER_FENCE_FROM_HCA) { - gds_err("unexpected from fence\n"); - retcode = EINVAL; - break; - } - gds_err("unsupported fence combination\n"); - retcode = EINVAL; - break; - } - case IBV_EXP_PEER_OP_STORE_DWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + - op->wr.dword_va.offset; - uint32_t data = op->wr.dword_va.data; - gds_dbg("OP_STORE_DWORD dev_ptr=%" PRIx64 " data=%08x\n", (uint64_t)dev_ptr, data); - if (n != 1) { - gds_err("store DWORD is not 2nd op\n"); - retcode = EINVAL; - break; - } - mlx5_i->flag_ptr = (uint32_t*)dev_ptr; - mlx5_i->flag_value = data; - break; - } - case IBV_EXP_PEER_OP_STORE_QWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.qword_va.target_id)->dptr + - op->wr.qword_va.offset; - uint64_t data = op->wr.qword_va.data; - gds_dbg("OP_STORE_QWORD dev_ptr=%" PRIx64 " data=%" PRIx64 "\n", (uint64_t)dev_ptr, (uint64_t)data); - gds_err("unsupported QWORD op\n"); - retcode = EINVAL; - break; - } - case IBV_EXP_PEER_OP_COPY_BLOCK: { - CUdeviceptr dev_ptr = range_from_id(op->wr.copy_op.target_id)->dptr + - op->wr.copy_op.offset; - size_t len = op->wr.copy_op.len; - void *src = op->wr.copy_op.src; - gds_err("unsupported COPY_BLOCK\n"); - retcode = EINVAL; - break; - } - case IBV_EXP_PEER_OP_POLL_AND_DWORD: - case IBV_EXP_PEER_OP_POLL_GEQ_DWORD: - case IBV_EXP_PEER_OP_POLL_NOR_DWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + - op->wr.dword_va.offset; - uint32_t data = op->wr.dword_va.data; - - gds_dbg("OP_POLL_DWORD dev_ptr=%" PRIx64 " data=%08x\n", (uint64_t)dev_ptr, data); - - mlx5_i->cqe_ptr = (uint32_t *)dev_ptr; - mlx5_i->cqe_value = data; - - switch(op->type) { - case IBV_EXP_PEER_OP_POLL_NOR_DWORD: - // GPU SMs can always do NOR - mlx5_i->cond = GDS_WAIT_COND_NOR; - break; - case IBV_EXP_PEER_OP_POLL_GEQ_DWORD: - mlx5_i->cond = GDS_WAIT_COND_GEQ; - break; - case IBV_EXP_PEER_OP_POLL_AND_DWORD: - mlx5_i->cond = GDS_WAIT_COND_AND; - break; - default: - gds_err("unexpected op type\n"); - retcode = EINVAL; - goto err; - } - break; - } - default: - gds_err("undefined peer op type %d\n", op->type); - retcode = EINVAL; - break; - } - err: - if (retcode) { - gds_err("error in fill func at entry n=%zu\n", n); - break; - } - } - return retcode; + const gds_mlx5_exp_wait_request_t *gmexp_request = to_gds_mexp_wait_request(request); + return gds_mlx5_exp_get_wait_descs(mlx5_i, gmexp_request); } //----------------------------------------------------------------------------- diff --git a/src/utils.hpp b/src/utils.hpp index 1bacbb6..c5d0774 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -240,6 +240,8 @@ static inline gds_driver_type gds_get_driver_type(struct ibv_device *ibdev) int gds_destroy_cq(struct gds_cq *gcq); +void gds_init_ops(struct peer_op_wr *op, int count); + //----------------------------------------------------------------------------- /* From 6be27ac863da603a2d7928b07e59750b6e02f5d0 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Thu, 9 Sep 2021 04:41:13 -0400 Subject: [PATCH 17/19] Changed the definition of gds_send_request_t --- include/gdsync/core.h | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/include/gdsync/core.h b/include/gdsync/core.h index 30fc977..fa213d3 100644 --- a/include/gdsync/core.h +++ b/include/gdsync/core.h @@ -161,9 +161,13 @@ enum { */ typedef struct gds_send_request { - struct ibv_exp_peer_commit commit; - struct peer_op_wr wr[GDS_SEND_INFO_MAX_OPS]; + gds_driver_type_t dtype; + uint8_t pad0[4]; + uint8_t reserved0[32]; + uint8_t reserved1[56 * GDS_SEND_INFO_MAX_OPS]; + uint8_t pad1[24]; } gds_send_request_t; +static_assert(sizeof(gds_send_request_t) % 64 == 0, "gds_send_request_t must be 64-byte aligned."); int gds_prepare_send(struct gds_qp *qp, gds_send_wr *p_ewr, gds_send_wr **bad_ewr, gds_send_request_t *request); int gds_stream_post_send(CUstream stream, gds_send_request_t *request); @@ -181,6 +185,7 @@ typedef struct gds_wait_request { uint8_t reserved1[56 * GDS_WAIT_INFO_MAX_OPS]; uint8_t pad1[16]; } gds_wait_request_t; +static_assert(sizeof(gds_wait_request_t) % 64 == 0, "gds_wait_request_t must be 64-byte aligned."); /** * Initializes a wait request out of the next heading CQE, which is kept in From f02657839f251cd89d2ba4bc37e511cc464bfac1 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Thu, 9 Sep 2021 04:41:56 -0400 Subject: [PATCH 18/19] Added gds_mlx5_exp_send_request_t definition and supported functions --- src/mlx5-exp.hpp | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/src/mlx5-exp.hpp b/src/mlx5-exp.hpp index a92e0e6..7e4ba14 100644 --- a/src/mlx5-exp.hpp +++ b/src/mlx5-exp.hpp @@ -21,6 +21,17 @@ typedef struct gds_mlx5_exp_qp { ibv_exp_res_domain *res_domain; } gds_mlx5_exp_qp_t; +typedef struct gds_mlx5_exp_send_request { + gds_driver_type_t dtype; + uint8_t pad0[4]; + struct ibv_exp_peer_commit commit; + struct peer_op_wr wr[GDS_SEND_INFO_MAX_OPS]; + uint8_t pad1[24]; +} gds_mlx5_exp_send_request_t; +static_assert(sizeof(gds_mlx5_exp_send_request_t) % 64 == 0, "gds_mlx5_exp_send_request_t must be 64-byte aligned."); +static_assert(sizeof(gds_mlx5_exp_send_request_t) <= sizeof(gds_send_request_t), "The size of gds_mlx5_exp_send_request_t must be less than or equal to that of gds_send_request_t."); +static_assert(offsetof(gds_mlx5_exp_send_request_t, dtype) == offsetof(gds_send_request_t, dtype), "dtype of gds_mlx5_exp_send_request_t and gds_send_request_t must be at the same offset."); + typedef struct gds_mlx5_exp_wait_request { gds_driver_type_t dtype; uint8_t pad0[4]; @@ -28,7 +39,6 @@ typedef struct gds_mlx5_exp_wait_request { struct peer_op_wr wr[GDS_WAIT_INFO_MAX_OPS]; uint8_t pad1[16]; } gds_mlx5_exp_wait_request_t; - static_assert(sizeof(gds_mlx5_exp_wait_request_t) % 64 == 0, "gds_mlx5_exp_wait_request_t must be 64-byte aligned."); static_assert(sizeof(gds_mlx5_exp_wait_request_t) <= sizeof(gds_wait_request_t), "The size of gds_mlx5_exp_wait_request_t must be less than or equal to that of gds_wait_request_t."); static_assert(offsetof(gds_mlx5_exp_wait_request_t, dtype) == offsetof(gds_wait_request_t, dtype), "dtype of gds_mlx5_exp_wait_request_t and gds_wait_request_t must be at the same offset."); @@ -43,6 +53,15 @@ static inline gds_mlx5_exp_qp_t *to_gds_mexp_qp(gds_qp_t *gqp) { return container_of(gqp, gds_mlx5_exp_qp_t, gqp); } +static inline gds_mlx5_exp_send_request_t *to_gds_mexp_send_request(gds_send_request_t *gsreq) { + assert(gsreq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + return (gds_mlx5_exp_send_request_t *)(gsreq); +} + +static inline const gds_mlx5_exp_send_request_t *to_gds_mexp_send_request(const gds_send_request_t *gsreq) { + return (const gds_mlx5_exp_send_request_t *)to_gds_mexp_send_request((const gds_send_request_t *)gsreq); +} + static inline gds_mlx5_exp_wait_request_t *to_gds_mexp_wait_request(gds_wait_request_t *gwreq) { assert(gwreq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); return (gds_mlx5_exp_wait_request_t *)(gwreq); @@ -56,6 +75,10 @@ static inline uint32_t gds_mlx5_exp_get_num_wait_request_entries(gds_mlx5_exp_wa return gmexp_request->peek.entries; } +static inline uint32_t gds_mlx5_exp_get_num_send_request_entries(gds_mlx5_exp_send_request_t *gmexp_request) { + return gmexp_request->commit.entries; +} + gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq( struct ibv_context *context, int cqe, void *cq_context, struct ibv_comp_channel *channel, @@ -71,7 +94,12 @@ int gds_mlx5_exp_destroy_qp(gds_mlx5_exp_qp_t *gmexpqp); int gds_mlx5_exp_prepare_send(gds_mlx5_exp_qp_t *gmexpqp, gds_send_wr *p_ewr, gds_send_wr **bad_ewr, - gds_send_request_t *request); + gds_mlx5_exp_send_request_t *request); + + +void gds_mlx5_exp_init_send_info(gds_mlx5_exp_send_request_t *info); +int gds_mlx5_exp_post_send_ops(gds_peer *peer, gds_mlx5_exp_send_request_t *info, gds_op_list_t &ops); +int gds_mlx5_exp_post_send_ops_on_cpu(gds_mlx5_exp_send_request_t *info, int flags = 0); void gds_mlx5_exp_init_wait_request(gds_mlx5_exp_wait_request_t *request, uint32_t offset); void gds_mlx5_exp_dump_wait_request(gds_mlx5_exp_wait_request_t *request, size_t count); @@ -81,3 +109,5 @@ int gds_mlx5_exp_abort_wait_cq(gds_mlx5_exp_cq_t *gmexpcq, gds_mlx5_exp_wait_req int gds_mlx5_exp_stream_post_wait_descriptor(gds_peer *peer, gds_mlx5_exp_wait_request_t *request, gds_op_list_t ¶ms, int flags); int gds_mlx5_exp_post_wait_descriptor(gds_mlx5_exp_wait_request_t *request, int flags); int gds_mlx5_exp_get_wait_descs(gds_mlx5_wait_info_t *mlx5_i, const gds_mlx5_exp_wait_request_t *request); + +int gds_mlx5_exp_rollback_qp(gds_mlx5_exp_qp_t *gmexpqp, gds_mlx5_exp_send_request_t *send_info); From 92b09f04923211e3bf6f968f369e4c4b3bc38dd6 Mon Sep 17 00:00:00 2001 From: Pak Markthub Date: Thu, 9 Sep 2021 04:42:33 -0400 Subject: [PATCH 19/19] Moved APIs that use gds_send_request_t to mlx5-exp.cpp --- src/apis.cpp | 58 ++++++++++++++++++------------------------------ src/gdsync.cpp | 6 +++-- src/mlx5-exp.cpp | 54 +++++++++++++++++++++++++++++++++++++++++++- src/mlx5.cpp | 3 ++- 4 files changed, 81 insertions(+), 40 deletions(-) diff --git a/src/apis.cpp b/src/apis.cpp index e9ad2ed..23577b0 100644 --- a/src/apis.cpp +++ b/src/apis.cpp @@ -68,12 +68,15 @@ void gds_init_ops(struct peer_op_wr *op, int count) static void gds_init_send_info(gds_send_request_t *info) { + gds_mlx5_exp_send_request_t *gmexp_info; gds_dbg("send_request=%p\n", info); memset(info, 0, sizeof(*info)); - info->commit.storage = info->wr; - info->commit.entries = sizeof(info->wr)/sizeof(info->wr[0]); - gds_init_ops(info->commit.storage, info->commit.entries); + info->dtype = GDS_DRIVER_TYPE_MLX5_EXP; + + gmexp_info = to_gds_mexp_send_request(info); + + gds_mlx5_exp_init_send_info(gmexp_info); } //----------------------------------------------------------------------------- @@ -93,37 +96,18 @@ static void gds_init_wait_request(gds_wait_request_t *request, uint32_t offset) //----------------------------------------------------------------------------- -static int gds_rollback_qp(struct gds_qp *qp, gds_send_request_t * send_info, enum ibv_exp_rollback_flags flag) +static int gds_rollback_qp(struct gds_qp *qp, gds_send_request_t *send_info) { - struct ibv_exp_rollback_ctx rollback; - int ret=0; + gds_mlx5_exp_qp_t *gmexpqp; + gds_mlx5_exp_send_request_t *gmexp_sreq; assert(qp); - assert(qp->qp); assert(send_info); - if( - flag != IBV_EXP_ROLLBACK_ABORT_UNCOMMITED && - flag != IBV_EXP_ROLLBACK_ABORT_LATE - ) - { - gds_err("erroneous ibv_exp_rollback_flags flag input value\n"); - ret=EINVAL; - goto out; - } - - /* from ibv_exp_peer_commit call */ - rollback.rollback_id = send_info->commit.rollback_id; - /* from ibv_exp_rollback_flag */ - rollback.flags = flag; - /* Reserved for future expensions, must be 0 */ - rollback.comp_mask = 0; - gds_warn("Need to rollback WQE %lx\n", rollback.rollback_id); - ret = ibv_exp_rollback_qp(qp->qp, &rollback); - if(ret) - gds_err("error %d in ibv_exp_rollback_qp\n", ret); -out: - return ret; + gmexpqp = to_gds_mexp_qp(qp); + gmexp_sreq = to_gds_mexp_send_request(send_info); + + return gds_mlx5_exp_rollback_qp(gmexpqp, gmexp_sreq); } //----------------------------------------------------------------------------- @@ -141,7 +125,7 @@ int gds_post_send(struct gds_qp *qp, gds_send_wr *p_ewr, gds_send_wr **bad_ewr) ret = gds_post_pokes_on_cpu(1, &send_info, NULL, 0); if (ret) { gds_err("error %d in gds_post_pokes_on_cpu\n", ret); - ret_roll = gds_rollback_qp(qp, &send_info, IBV_EXP_ROLLBACK_ABORT_LATE); + ret_roll = gds_rollback_qp(qp, &send_info); if (ret_roll) { gds_err("error %d in gds_rollback_qp\n", ret_roll); } @@ -180,6 +164,7 @@ int gds_prepare_send(struct gds_qp *gqp, gds_send_wr *p_ewr, { int ret = 0; gds_mlx5_exp_qp_t *gmexpqp; + gds_mlx5_exp_send_request_t *sreq; gds_init_send_info(request); assert(gqp); @@ -187,8 +172,9 @@ int gds_prepare_send(struct gds_qp *gqp, gds_send_wr *p_ewr, assert(gqp->dtype == GDS_DRIVER_TYPE_MLX5_EXP); gmexpqp = to_gds_mexp_qp(gqp); + sreq = to_gds_mexp_send_request(request); - ret = gds_mlx5_exp_prepare_send(gmexpqp, p_ewr, bad_ewr, request); + ret = gds_mlx5_exp_prepare_send(gmexpqp, p_ewr, bad_ewr, sreq); if (ret) gds_err("Error %d in gds_mlx5_exp_prepare_send.\n", ret); @@ -522,7 +508,7 @@ static int calc_n_mem_ops(size_t n_descs, gds_descriptor_t *descs, size_t &n_mem gds_descriptor_t *desc = descs + i; switch(desc->tag) { case GDS_TAG_SEND: - n_mem_ops += desc->send->commit.entries + 2; // extra space, ugly + n_mem_ops += gds_mlx5_exp_get_num_send_request_entries(to_gds_mexp_send_request(desc->send)) + 2; // extra space, ugly break; case GDS_TAG_WAIT: n_mem_ops += gds_mlx5_exp_get_num_wait_request_entries(to_gds_mexp_wait_request(desc->wait)) + 2; // ditto @@ -584,8 +570,8 @@ int gds_stream_post_descriptors(CUstream stream, size_t n_descs, gds_descriptor_ gds_descriptor_t *desc = descs + i; switch(desc->tag) { case GDS_TAG_SEND: { - gds_send_request_t *sreq = desc->send; - retcode = gds_post_ops(peer, sreq->commit.entries, sreq->commit.storage, params); + gds_mlx5_exp_send_request_t *sreq = to_gds_mexp_send_request(desc->send); + retcode = gds_mlx5_exp_post_send_ops(peer, sreq, params); if (retcode) { gds_err("error %d in gds_post_ops\n", retcode); ret = retcode; @@ -662,8 +648,8 @@ int gds_post_descriptors(size_t n_descs, gds_descriptor_t *descs, int flags) switch(desc->tag) { case GDS_TAG_SEND: { gds_dbg("desc[%zu] SEND\n", i); - gds_send_request_t *sreq = desc->send; - retcode = gds_post_ops_on_cpu(sreq->commit.entries, sreq->commit.storage, flags); + gds_mlx5_exp_send_request_t *sreq = to_gds_mexp_send_request(desc->send); + retcode = gds_mlx5_exp_post_send_ops_on_cpu(sreq, flags); if (retcode) { gds_err("error %d in gds_post_ops_on_cpu\n", retcode); ret = retcode; diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 670bc45..9f1ddde 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -997,8 +997,9 @@ int gds_post_pokes(CUstream stream, int count, gds_send_request_t *info, uint32_ } for (int j=0; jgqp.qp, p_ewr, bad_ewr); @@ -284,6 +284,31 @@ int gds_mlx5_exp_prepare_send(gds_mlx5_exp_qp_t *gmexpqp, gds_send_wr *p_ewr, //----------------------------------------------------------------------------- +void gds_mlx5_exp_init_send_info(gds_mlx5_exp_send_request_t *info) +{ + gds_dbg("send_request=%p\n", info); + + info->commit.storage = info->wr; + info->commit.entries = sizeof(info->wr)/sizeof(info->wr[0]); + gds_init_ops(info->commit.storage, info->commit.entries); +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_post_send_ops(gds_peer *peer, gds_mlx5_exp_send_request_t *info, gds_op_list_t &ops) +{ + return gds_post_ops(peer, info->commit.entries, info->commit.storage, ops, 0); +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_post_send_ops_on_cpu(gds_mlx5_exp_send_request_t *info, int flags) +{ + return gds_post_ops_on_cpu(info->commit.entries, info->commit.storage, flags); +} + +//----------------------------------------------------------------------------- + void gds_mlx5_exp_init_wait_request(gds_mlx5_exp_wait_request_t *request, uint32_t offset) { gds_dbg("wait_request=%p offset=%08x\n", request, offset); @@ -562,3 +587,30 @@ int gds_mlx5_exp_get_wait_descs(gds_mlx5_wait_info_t *mlx5_i, const gds_mlx5_exp } return retcode; } + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_rollback_qp(gds_mlx5_exp_qp_t *gmexpqp, gds_mlx5_exp_send_request_t *send_info) +{ + struct ibv_exp_rollback_ctx rollback; + int ret = 0; + enum ibv_exp_rollback_flags flag = IBV_EXP_ROLLBACK_ABORT_LATE; + + assert(gmexpqp); + assert(gmexpqp->gqp.qp); + assert(send_info); + + /* from ibv_exp_peer_commit call */ + rollback.rollback_id = send_info->commit.rollback_id; + /* from ibv_exp_rollback_flag */ + rollback.flags = flag; + /* Reserved for future expensions, must be 0 */ + rollback.comp_mask = 0; + gds_warn("Need to rollback WQE %lx\n", rollback.rollback_id); + ret = ibv_exp_rollback_qp(gmexpqp->gqp.qp, &rollback); + if (ret) + gds_err("error %d in ibv_exp_rollback_qp\n", ret); + +out: + return ret; +} diff --git a/src/mlx5.cpp b/src/mlx5.cpp index b026374..526f544 100644 --- a/src/mlx5.cpp +++ b/src/mlx5.cpp @@ -52,9 +52,10 @@ //----------------------------------------------------------------------------- -int gds_mlx5_get_send_descs(gds_mlx5_send_info_t *mlx5_i, const gds_send_request_t *request) +int gds_mlx5_get_send_descs(gds_mlx5_send_info_t *mlx5_i, const gds_send_request_t *_request) { int retcode = 0; + const gds_mlx5_exp_send_request_t *request = to_gds_mexp_send_request(_request); size_t n_ops = request->commit.entries; peer_op_wr *op = request->commit.storage; size_t n = 0;