diff --git a/Makefile.am b/Makefile.am index a66dfda..27fce57 100644 --- a/Makefile.am +++ b/Makefile.am @@ -20,13 +20,13 @@ EXTRA_DIST = autogen.sh include_HEADERS = include/gdsync.h libgdsyncincludedir = $(includedir)/gdsync -libgdsyncinclude_HEADERS = include/gdsync/core.h include/gdsync/device.cuh include/gdsync/mlx5.h include/gdsync/tools.h +libgdsyncinclude_HEADERS = include/gdsync/core.h include/gdsync/device.cuh include/gdsync/mlx5.h include/gdsync/tools.h src_libgdsync_la_CFLAGS = $(AM_CFLAGS) -src_libgdsync_la_SOURCES = src/gdsync.cpp src/memmgr.cpp src/mem.cpp src/objs.cpp src/apis.cpp src/mlx5.cpp include/gdsync.h +src_libgdsync_la_SOURCES = src/gdsync.cpp src/memmgr.cpp src/mem.cpp src/objs.cpp src/apis.cpp src/mlx5.cpp src/mlx5-exp.cpp include/gdsync.h src_libgdsync_la_LDFLAGS = -version-info @VERSION_INFO@ -noinst_HEADERS = src/mem.hpp src/memmgr.hpp src/objs.hpp src/rangeset.hpp src/utils.hpp src/archutils.h src/mlnxutils.h +noinst_HEADERS = src/mem.hpp src/memmgr.hpp src/objs.hpp src/rangeset.hpp src/utils.hpp src/archutils.h src/mlnxutils.h src/mlx5-exp.hpp # if enabled at configure time @@ -36,7 +36,7 @@ bin_PROGRAMS = tests/gds_kernel_latency tests/gds_poll_lat tests/gds_kernel_loop noinst_PROGRAMS = tests/rstest tests/wqtest tests_gds_kernel_latency_SOURCES = tests/gds_kernel_latency.c tests/gpu_kernels.cu tests/pingpong.c tests/gpu.cpp -tests_gds_kernel_latency_LDADD = $(top_builddir)/src/libgdsync.la -lmpi $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) +tests_gds_kernel_latency_LDADD = $(top_builddir)/src/libgdsync.la $(MPILDFLAGS) $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) tests_rstest_SOURCES = tests/rstest.cpp tests_rstest_LDADD = @@ -45,10 +45,10 @@ tests_wqtest_SOURCES = tests/task_queue_test.cpp tests_wqtest_LDADD = $(PTHREAD_LIBS) tests_gds_poll_lat_SOURCES = tests/gds_poll_lat.c tests/gpu.cpp tests/gpu_kernels.cu -tests_gds_poll_lat_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmpi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) +tests_gds_poll_lat_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi $(MPILDFLAGS) $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) tests_gds_sanity_SOURCES = tests/gds_sanity.cpp tests/gpu.cpp tests/gpu_kernels.cu -tests_gds_sanity_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmpi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) +tests_gds_sanity_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi $(MPILDFLAGS) $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) tests_gds_kernel_loopback_latency_SOURCES = tests/gds_kernel_loopback_latency.c tests/pingpong.c tests/gpu.cpp tests/gpu_kernels.cu tests_gds_kernel_loopback_latency_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS) diff --git a/configure.ac b/configure.ac index a79aed6..e20f313 100644 --- a/configure.ac +++ b/configure.ac @@ -93,25 +93,54 @@ else AC_SUBST(LIBGDSTOOLS) fi -AC_ARG_WITH([mpi], - AC_HELP_STRING([--with-mpi], [ Set path to mpi installation ])) -if test x$with_mpi = x || test x$with_mpi = xno; then +AC_ARG_WITH([spectrum-mpi], + AC_HELP_STRING([--with-spectrum-mpi], [ Set path to Spectrum MPI installation ])) +if test x$with_spectrum_mpi = x || test x$with_spectrum_mpi = xno; then # assuming system location mpi_home=/usr - MPICC=$with_home/bin/mpicc - MPICXX=$with_home/bin/mpic++ + MPICC=${mpi_home}/bin/mpicc + MPICXX=${mpi_home}/bin/mpic++ + MPILDFLAGS="-lmpi_ibm" else - if test -d 
$with_mpi; then - mpi_home=$with_mpi + if test -d $with_spectrum_mpi; then + mpi_home=$with_spectrum_mpi MPICC=${mpi_home}/bin/mpicc MPICXX=${mpi_home}/bin/mpic++ CPPFLAGS="$CPPFLAGS -I${mpi_home}/include" LDFLAGS="$LDFLAGS -L${mpi_home}/lib -L${mpi_home}/lib64" + MPILDFLAGS="-lmpi_ibm" else echo "MPI dir does not exist" fi fi +AC_ARG_WITH([mpi], + AC_HELP_STRING([--with-mpi], [ Set path to MPI installation ])) +if test x$with_spectrum_mpi = x || test x$with_spectrum_mpi = xno; then + if test x$with_mpi = x || test x$with_mpi = xno; then + # assuming system location + mpi_home=/usr + MPICC=${mpi_home}/bin/mpicc + MPICXX=${mpi_home}/bin/mpic++ + MPILDFLAGS="-lmpi" + else + if test -d $with_mpi; then + mpi_home=$with_mpi + MPICC=${mpi_home}/bin/mpicc + MPICXX=${mpi_home}/bin/mpic++ + CPPFLAGS="$CPPFLAGS -I${mpi_home}/include" + LDFLAGS="$LDFLAGS -L${mpi_home}/lib -L${mpi_home}/lib64" + MPILDFLAGS="-lmpi" + else + echo "MPI dir does not exist" + fi + fi +fi + +if test x$with_spectrum_mpi != x && test x$with_spectrum_mpi != xno && test x$with_mpi != x && test x$with_mpi != xno; then + AC_MSG_ERROR([--with-mpi and --with-spectrum-mpi are mutually exclusive.]) +fi + dnl Specify CUDA Location AC_ARG_WITH(cuda-toolkit, AC_HELP_STRING([--with-cuda-toolkit=CUDATKDIR], [ Specify CUDA toolkit installation directory (default: /usr/local/cuda)]), @@ -186,6 +215,7 @@ AC_MSG_NOTICE([Setting MPI_PATH = ${mpi_home} ]) AC_SUBST( MPI_PATH, [${mpi_home} ]) AC_SUBST( MPICC, [${MPICC} ]) AC_SUBST( MPICXX, [${MPICXX} ]) +AC_SUBST( MPILDFLAGS, [${MPILDFLAGS} ]) CPPFLAGS="$CPPFLAGS -I$CUDA_DRV_PATH/include -I$CUDA_PATH/include" LDFLAGS="$LDFLAGS -L$CUDA_DRV_PATH/lib64 -L$CUDA_DRV_PATH/lib -L$CUDA_PATH/lib64 -L$CUDA_PATH/lib" diff --git a/include/gdsync/core.h b/include/gdsync/core.h index 7ff0cbb..fa213d3 100644 --- a/include/gdsync/core.h +++ b/include/gdsync/core.h @@ -40,35 +40,43 @@ ((((v) & 0x0000ffffU) >> 0 ) >= (unsigned)GDS_API_MINOR_VERSION) ) typedef enum gds_param { - GDS_PARAM_VERSION, - GDS_NUM_PARAMS + GDS_PARAM_VERSION, + GDS_NUM_PARAMS } gds_param_t; int gds_query_param(gds_param_t param, int *value); enum gds_create_qp_flags { - GDS_CREATE_QP_DEFAULT = 0, - GDS_CREATE_QP_WQ_ON_GPU = 1<<0, - GDS_CREATE_QP_TX_CQ_ON_GPU = 1<<1, - GDS_CREATE_QP_RX_CQ_ON_GPU = 1<<2, - GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5, + GDS_CREATE_QP_DEFAULT = 0, + GDS_CREATE_QP_WQ_ON_GPU = 1<<0, + GDS_CREATE_QP_TX_CQ_ON_GPU = 1<<1, + GDS_CREATE_QP_RX_CQ_ON_GPU = 1<<2, + GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5, }; -typedef struct ibv_exp_qp_init_attr gds_qp_init_attr_t; -typedef struct ibv_exp_send_wr gds_send_wr; +typedef struct ibv_qp_init_attr gds_qp_init_attr_t; +typedef struct ibv_send_wr gds_send_wr; -struct gds_cq { +typedef enum gds_driver_type { + GDS_DRIVER_TYPE_UNSUPPORTED = 0, + GDS_DRIVER_TYPE_MLX5_EXP, + GDS_DRIVER_TYPE_MLX5_DV, + GDS_DRIVER_TYPE_MLX5_DEVX +} gds_driver_type_t; + +typedef struct gds_cq { struct ibv_cq *cq; uint32_t curr_offset; -}; + gds_driver_type_t dtype; +} gds_cq_t; -struct gds_qp { +typedef struct gds_qp { struct ibv_qp *qp; - struct gds_cq send_cq; - struct gds_cq recv_cq; - struct ibv_exp_res_domain * res_domain; + struct gds_cq *send_cq; + struct gds_cq *recv_cq; struct ibv_context *dev_context; -}; + gds_driver_type_t dtype; +} gds_qp_t; /* \brief: Create a peer-enabled QP attached to the specified GPU id. 
* @@ -153,9 +161,13 @@ enum { */ typedef struct gds_send_request { - struct ibv_exp_peer_commit commit; - struct peer_op_wr wr[GDS_SEND_INFO_MAX_OPS]; + gds_driver_type_t dtype; + uint8_t pad0[4]; + uint8_t reserved0[32]; + uint8_t reserved1[56 * GDS_SEND_INFO_MAX_OPS]; + uint8_t pad1[24]; } gds_send_request_t; +static_assert(sizeof(gds_send_request_t) % 64 == 0, "gds_send_request_t must be 64-byte aligned."); int gds_prepare_send(struct gds_qp *qp, gds_send_wr *p_ewr, gds_send_wr **bad_ewr, gds_send_request_t *request); int gds_stream_post_send(CUstream stream, gds_send_request_t *request); @@ -167,9 +179,13 @@ int gds_stream_post_send_all(CUstream stream, int count, gds_send_request_t *req */ typedef struct gds_wait_request { - struct ibv_exp_peer_peek peek; - struct peer_op_wr wr[GDS_WAIT_INFO_MAX_OPS]; + gds_driver_type_t dtype; + uint8_t pad0[4]; + uint8_t reserved0[40]; + uint8_t reserved1[56 * GDS_WAIT_INFO_MAX_OPS]; + uint8_t pad1[16]; } gds_wait_request_t; +static_assert(sizeof(gds_wait_request_t) % 64 == 0, "gds_wait_request_t must be 64-byte aligned."); /** * Initializes a wait request out of the next heading CQE, which is kept in diff --git a/src/apis.cpp b/src/apis.cpp index cd532d7..23577b0 100644 --- a/src/apis.cpp +++ b/src/apis.cpp @@ -51,11 +51,12 @@ #include "utils.hpp" #include "archutils.h" #include "mlnxutils.h" +#include "mlx5-exp.hpp" //----------------------------------------------------------------------------- -static void gds_init_ops(struct peer_op_wr *op, int count) +void gds_init_ops(struct peer_op_wr *op, int count) { int i = count; while (--i) @@ -67,60 +68,46 @@ static void gds_init_ops(struct peer_op_wr *op, int count) static void gds_init_send_info(gds_send_request_t *info) { + gds_mlx5_exp_send_request_t *gmexp_info; gds_dbg("send_request=%p\n", info); memset(info, 0, sizeof(*info)); - info->commit.storage = info->wr; - info->commit.entries = sizeof(info->wr)/sizeof(info->wr[0]); - gds_init_ops(info->commit.storage, info->commit.entries); + info->dtype = GDS_DRIVER_TYPE_MLX5_EXP; + + gmexp_info = to_gds_mexp_send_request(info); + + gds_mlx5_exp_init_send_info(gmexp_info); } //----------------------------------------------------------------------------- static void gds_init_wait_request(gds_wait_request_t *request, uint32_t offset) { + gds_mlx5_exp_wait_request_t *gmexp_request; gds_dbg("wait_request=%p offset=%08x\n", request, offset); memset(request, 0, sizeof(*request)); - request->peek.storage = request->wr; - request->peek.entries = sizeof(request->wr)/sizeof(request->wr[0]); - request->peek.whence = IBV_EXP_PEER_PEEK_ABSOLUTE; - request->peek.offset = offset; - gds_init_ops(request->peek.storage, request->peek.entries); + + request->dtype = GDS_DRIVER_TYPE_MLX5_EXP; + + gmexp_request = to_gds_mexp_wait_request(request); + + gds_mlx5_exp_init_wait_request(gmexp_request, offset); } //----------------------------------------------------------------------------- -static int gds_rollback_qp(struct gds_qp *qp, gds_send_request_t * send_info, enum ibv_exp_rollback_flags flag) +static int gds_rollback_qp(struct gds_qp *qp, gds_send_request_t *send_info) { - struct ibv_exp_rollback_ctx rollback; - int ret=0; + gds_mlx5_exp_qp_t *gmexpqp; + gds_mlx5_exp_send_request_t *gmexp_sreq; assert(qp); - assert(qp->qp); assert(send_info); - if( - flag != IBV_EXP_ROLLBACK_ABORT_UNCOMMITED && - flag != IBV_EXP_ROLLBACK_ABORT_LATE - ) - { - gds_err("erroneous ibv_exp_rollback_flags flag input value\n"); - ret=EINVAL; - goto out; - } - - /* from ibv_exp_peer_commit call 
*/ - rollback.rollback_id = send_info->commit.rollback_id; - /* from ibv_exp_rollback_flag */ - rollback.flags = flag; - /* Reserved for future expensions, must be 0 */ - rollback.comp_mask = 0; - gds_warn("Need to rollback WQE %lx\n", rollback.rollback_id); - ret = ibv_exp_rollback_qp(qp->qp, &rollback); - if(ret) - gds_err("error %d in ibv_exp_rollback_qp\n", ret); -out: - return ret; + gmexpqp = to_gds_mexp_qp(qp); + gmexp_sreq = to_gds_mexp_send_request(send_info); + + return gds_mlx5_exp_rollback_qp(gmexpqp, gmexp_sreq); } //----------------------------------------------------------------------------- @@ -138,7 +125,7 @@ int gds_post_send(struct gds_qp *qp, gds_send_wr *p_ewr, gds_send_wr **bad_ewr) ret = gds_post_pokes_on_cpu(1, &send_info, NULL, 0); if (ret) { gds_err("error %d in gds_post_pokes_on_cpu\n", ret); - ret_roll = gds_rollback_qp(qp, &send_info, IBV_EXP_ROLLBACK_ABORT_LATE); + ret_roll = gds_rollback_qp(qp, &send_info); if (ret_roll) { gds_err("error %d in gds_rollback_qp\n", ret_roll); } @@ -171,33 +158,26 @@ int gds_post_recv(struct gds_qp *qp, struct ibv_recv_wr *wr, struct ibv_recv_wr //----------------------------------------------------------------------------- -int gds_prepare_send(struct gds_qp *qp, gds_send_wr *p_ewr, +int gds_prepare_send(struct gds_qp *gqp, gds_send_wr *p_ewr, gds_send_wr **bad_ewr, gds_send_request_t *request) { int ret = 0; + gds_mlx5_exp_qp_t *gmexpqp; + gds_mlx5_exp_send_request_t *sreq; + gds_init_send_info(request); - assert(qp); - assert(qp->qp); - ret = ibv_exp_post_send(qp->qp, p_ewr, bad_ewr); - if (ret) { + assert(gqp); + assert(gqp->qp); + assert(gqp->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + + gmexpqp = to_gds_mexp_qp(gqp); + sreq = to_gds_mexp_send_request(request); + + ret = gds_mlx5_exp_prepare_send(gmexpqp, p_ewr, bad_ewr, sreq); + if (ret) + gds_err("Error %d in gds_mlx5_exp_prepare_send.\n", ret); - if (ret == ENOMEM) { - // out of space error can happen too often to report - gds_dbg("ENOMEM error %d in ibv_exp_post_send\n", ret); - } else { - gds_err("error %d in ibv_exp_post_send\n", ret); - } - goto out; - } - - ret = ibv_exp_peer_commit_qp(qp->qp, &request->commit); - if (ret) { - gds_err("error %d in ibv_exp_peer_commit_qp\n", ret); - //gds_wait_kernel(); - goto out; - } -out: return ret; } @@ -281,7 +261,9 @@ int gds_stream_post_send_all(CUstream stream, int count, gds_send_request_t *req int gds_prepare_wait_cq(struct gds_cq *cq, gds_wait_request_t *request, int flags) { - int retcode = 0; + gds_mlx5_exp_cq_t *gmexpcq; + gds_mlx5_exp_wait_request_t *gmexp_request; + if (flags != 0) { gds_err("invalid flags != 0\n"); return EINVAL; @@ -289,51 +271,19 @@ int gds_prepare_wait_cq(struct gds_cq *cq, gds_wait_request_t *request, int flag gds_init_wait_request(request, cq->curr_offset++); - retcode = ibv_exp_peer_peek_cq(cq->cq, &request->peek); - if (retcode == -ENOSPC) { - // TODO: handle too few entries - gds_err("not enough ops in peer_peek_cq\n"); - goto out; - } else if (retcode) { - gds_err("error %d in peer_peek_cq\n", retcode); - goto out; - } - //gds_dump_wait_request(request, 1); - out: - return retcode; + gmexpcq = to_gds_mexp_cq(cq); + gmexp_request = to_gds_mexp_wait_request(request); + + return gds_mlx5_exp_prepare_wait_cq(gmexpcq, gmexp_request, flags); } //----------------------------------------------------------------------------- int gds_append_wait_cq(gds_wait_request_t *request, uint32_t *dw, uint32_t val) { - int ret = 0; - unsigned MAX_NUM_ENTRIES = sizeof(request->wr)/sizeof(request->wr[0]); - unsigned n 
= request->peek.entries; - struct peer_op_wr *wr = request->peek.storage; - - if (n + 1 > MAX_NUM_ENTRIES) { - gds_err("no space left to stuff a poke\n"); - ret = ENOMEM; - goto out; - } + gds_mlx5_exp_wait_request_t *gmexp_request = to_gds_mexp_wait_request(request); - // at least 1 op - assert(n); - assert(wr); - - for (; n; --n) wr = wr->next; - assert(wr); - - wr->type = IBV_EXP_PEER_OP_STORE_DWORD; - wr->wr.dword_va.data = val; - wr->wr.dword_va.target_id = 0; // direct mapping, offset IS the address - wr->wr.dword_va.offset = (ptrdiff_t)(dw-(uint32_t*)0); - - ++request->peek.entries; - - out: - return ret; + return gds_mlx5_exp_append_wait_cq(gmexp_request, dw, val); } //----------------------------------------------------------------------------- @@ -354,12 +304,16 @@ int gds_stream_post_wait_cq_all(CUstream stream, int count, gds_wait_request_t * static int gds_abort_wait_cq(struct gds_cq *cq, gds_wait_request_t *request) { + gds_mlx5_exp_cq_t *gmexpcq; + gds_mlx5_exp_wait_request_t *gmexp_request; + assert(cq); assert(request); - struct ibv_exp_peer_abort_peek abort_ctx; - abort_ctx.peek_id = request->peek.peek_id; - abort_ctx.comp_mask = 0; - return ibv_exp_peer_abort_peek_cq(cq->cq, &abort_ctx); + + gmexpcq = to_gds_mexp_cq(cq); + gmexp_request = to_gds_mexp_wait_request(request); + + return gds_mlx5_exp_abort_wait_cq(gmexpcq, gmexp_request); } //----------------------------------------------------------------------------- @@ -554,10 +508,10 @@ static int calc_n_mem_ops(size_t n_descs, gds_descriptor_t *descs, size_t &n_mem gds_descriptor_t *desc = descs + i; switch(desc->tag) { case GDS_TAG_SEND: - n_mem_ops += desc->send->commit.entries + 2; // extra space, ugly + n_mem_ops += gds_mlx5_exp_get_num_send_request_entries(to_gds_mexp_send_request(desc->send)) + 2; // extra space, ugly break; case GDS_TAG_WAIT: - n_mem_ops += desc->wait->peek.entries + 2; // ditto + n_mem_ops += gds_mlx5_exp_get_num_wait_request_entries(to_gds_mexp_wait_request(desc->wait)) + 2; // ditto break; case GDS_TAG_WAIT_VALUE32: case GDS_TAG_WRITE_VALUE32: @@ -616,8 +570,8 @@ int gds_stream_post_descriptors(CUstream stream, size_t n_descs, gds_descriptor_ gds_descriptor_t *desc = descs + i; switch(desc->tag) { case GDS_TAG_SEND: { - gds_send_request_t *sreq = desc->send; - retcode = gds_post_ops(peer, sreq->commit.entries, sreq->commit.storage, params); + gds_mlx5_exp_send_request_t *sreq = to_gds_mexp_send_request(desc->send); + retcode = gds_mlx5_exp_post_send_ops(peer, sreq, params); if (retcode) { gds_err("error %d in gds_post_ops\n", retcode); ret = retcode; @@ -626,15 +580,15 @@ int gds_stream_post_descriptors(CUstream stream, size_t n_descs, gds_descriptor_ break; } case GDS_TAG_WAIT: { - gds_wait_request_t *wreq = desc->wait; + gds_mlx5_exp_wait_request_t *wreq = to_gds_mexp_wait_request(desc->wait); int flags = 0; if (move_flush && i != last_wait) { gds_dbg("discarding FLUSH!\n"); flags = GDS_POST_OPS_DISCARD_WAIT_FLUSH; } - retcode = gds_post_ops(peer, wreq->peek.entries, wreq->peek.storage, params, flags); + retcode = gds_mlx5_exp_stream_post_wait_descriptor(peer, wreq, params, flags); if (retcode) { - gds_err("error %d in gds_post_ops\n", retcode); + gds_err("error %d in gds_mlx5_exp_stream_post_wait_descriptor\n", retcode); ret = retcode; goto out; } @@ -694,8 +648,8 @@ int gds_post_descriptors(size_t n_descs, gds_descriptor_t *descs, int flags) switch(desc->tag) { case GDS_TAG_SEND: { gds_dbg("desc[%zu] SEND\n", i); - gds_send_request_t *sreq = desc->send; - retcode = 
gds_post_ops_on_cpu(sreq->commit.entries, sreq->commit.storage, flags); + gds_mlx5_exp_send_request_t *sreq = to_gds_mexp_send_request(desc->send); + retcode = gds_mlx5_exp_post_send_ops_on_cpu(sreq, flags); if (retcode) { gds_err("error %d in gds_post_ops_on_cpu\n", retcode); ret = retcode; @@ -705,10 +659,10 @@ int gds_post_descriptors(size_t n_descs, gds_descriptor_t *descs, int flags) } case GDS_TAG_WAIT: { gds_dbg("desc[%zu] WAIT\n", i); - gds_wait_request_t *wreq = desc->wait; - retcode = gds_post_ops_on_cpu(wreq->peek.entries, wreq->peek.storage, flags); + gds_mlx5_exp_wait_request_t *wreq = to_gds_mexp_wait_request(desc->wait); + retcode = gds_mlx5_exp_post_wait_descriptor(wreq, flags); if (retcode) { - gds_err("error %d in gds_post_ops_on_cpu\n", retcode); + gds_err("error %d in gds_mlx5_exp_post_wait_descriptor\n", retcode); ret = retcode; goto out; } diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 90d5508..9f1ddde 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -43,6 +43,7 @@ #include "archutils.h" #include "mlnxutils.h" #include "task_queue.hpp" +#include "mlx5-exp.hpp" //----------------------------------------------------------------------------- @@ -996,8 +997,9 @@ int gds_post_pokes(CUstream stream, int count, gds_send_request_t *info, uint32_ } for (int j=0; j<count; j++) { [...] -static void gds_dump_ops(struct peer_op_wr *op, size_t count) -{ - size_t n = 0; - for (; op; op = op->next, ++n) { - gds_dbg("op[%zu] type:%d\n", n, op->type); - switch(op->type) { - case IBV_EXP_PEER_OP_FENCE: { - gds_dbg("FENCE flags=%" PRIu64 "\n", op->wr.fence.fence_flags); - break; - } - case IBV_EXP_PEER_OP_STORE_DWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + - op->wr.dword_va.offset; - gds_dbg("STORE_QWORD data:%x target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", - op->wr.dword_va.data, op->wr.dword_va.target_id, - op->wr.dword_va.offset, dev_ptr); - break; - } - case IBV_EXP_PEER_OP_STORE_QWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.qword_va.target_id)->dptr + - op->wr.qword_va.offset; - gds_dbg("STORE_QWORD data:%" PRIx64 " target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", - op->wr.qword_va.data, op->wr.qword_va.target_id, - op->wr.qword_va.offset, dev_ptr); - break; - } - case IBV_EXP_PEER_OP_COPY_BLOCK: { - CUdeviceptr dev_ptr = range_from_id(op->wr.copy_op.target_id)->dptr + - op->wr.copy_op.offset; - gds_dbg("COPY_BLOCK src:%p len:%zu target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", - op->wr.copy_op.src, op->wr.copy_op.len, - op->wr.copy_op.target_id, op->wr.copy_op.offset, - dev_ptr); - break; - } - case IBV_EXP_PEER_OP_POLL_AND_DWORD: - case IBV_EXP_PEER_OP_POLL_NOR_DWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + - op->wr.dword_va.offset; - gds_dbg("%s data:%08x target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", - (op->type==IBV_EXP_PEER_OP_POLL_AND_DWORD) ? 
"POLL_AND_DW" : "POLL_NOR_SDW", - op->wr.dword_va.data, - op->wr.dword_va.target_id, - op->wr.dword_va.offset, - dev_ptr); - break; - } - default: - gds_err("undefined peer op type %d\n", op->type); - break; - } - } - - assert(count == n); -} - -//----------------------------------------------------------------------------- - void gds_dump_wait_request(gds_wait_request_t *request, size_t count) { - for (size_t j=0; jentries, peek->whence, peek->offset, - peek->peek_id, peek->comp_mask); - gds_dump_ops(peek->storage, peek->entries); - } + gds_mlx5_exp_wait_request_t *gmexp_request; + if (count == 0) + return; + + gmexp_request = to_gds_mexp_wait_request(request); + gds_mlx5_exp_dump_wait_request(gmexp_request, count); } //----------------------------------------------------------------------------- @@ -1678,143 +1622,20 @@ gds_peer *peer_from_stream(CUstream stream) //----------------------------------------------------------------------------- -static ibv_exp_res_domain *gds_create_res_domain(struct ibv_context *context) -{ - if (!context) { - gds_err("invalid context"); - return NULL; - } - - ibv_exp_res_domain_init_attr res_domain_attr; - memset(&res_domain_attr, 0, sizeof(res_domain_attr)); - - res_domain_attr.comp_mask |= IBV_EXP_RES_DOMAIN_THREAD_MODEL; - res_domain_attr.thread_model = IBV_EXP_THREAD_SINGLE; - - ibv_exp_res_domain *res_domain = ibv_exp_create_res_domain(context, &res_domain_attr); - if (!res_domain) { - gds_warn("Can't create resource domain\n"); - } - - return res_domain; -} - -//----------------------------------------------------------------------------- - -static struct gds_cq * -gds_create_cq_internal(struct ibv_context *context, int cqe, - void *cq_context, struct ibv_comp_channel *channel, - int comp_vector, int gpu_id, gds_alloc_cq_flags_t flags, - struct ibv_exp_res_domain * res_domain) -{ - struct gds_cq *gcq = NULL; - ibv_exp_cq_init_attr attr; - gds_peer *peer = NULL; - gds_peer_attr *peer_attr = NULL; - int ret=0; - - if(!context) - { - gds_dbg("Invalid input context\n"); - return NULL; - } - - gcq = (struct gds_cq*)calloc(1, sizeof(struct gds_cq)); - if (!gcq) { - gds_err("cannot allocate memory\n"); - return NULL; - } - - //Here we need to recover peer and peer_attr pointers to set alloc_type and alloc_flags - //before ibv_exp_create_cq - ret = gds_register_peer_by_ordinal(gpu_id, &peer, &peer_attr); - if (ret) { - gds_err("error %d while registering GPU peer\n", ret); - return NULL; - } - assert(peer); - assert(peer_attr); - - peer->alloc_type = gds_peer::CQ; - peer->alloc_flags = flags; - - attr.comp_mask = IBV_EXP_CQ_INIT_ATTR_PEER_DIRECT; - attr.flags = 0; // see ibv_exp_cq_create_flags - attr.peer_direct_attrs = peer_attr; - if (res_domain) { - gds_dbg("using peer->res_domain %p for CQ\n", res_domain); - attr.res_domain = res_domain; - attr.comp_mask |= IBV_EXP_CQ_INIT_ATTR_RES_DOMAIN; - } - - int old_errno = errno; - gcq->cq = ibv_exp_create_cq(context, cqe, cq_context, channel, comp_vector, &attr); - if (!gcq->cq) { - gds_err("error %d in ibv_exp_create_cq, old errno %d\n", errno, old_errno); - return NULL; - } - - return gcq; -} - -//Note: general create cq function, not really used for now! 
-struct gds_cq * -gds_create_cq(struct ibv_context *context, int cqe, - void *cq_context, struct ibv_comp_channel *channel, - int comp_vector, int gpu_id, gds_alloc_cq_flags_t flags) -{ - int ret = 0; - struct gds_cq *gcq = NULL; - //TODO: leak of res_domain - struct ibv_exp_res_domain * res_domain; - gds_dbg("cqe=%d gpu_id=%d cq_flags=%08x\n", cqe, gpu_id, flags); - - gds_peer *peer = NULL; - gds_peer_attr *peer_attr = NULL; - ret = gds_register_peer_by_ordinal(gpu_id, &peer, &peer_attr); - if (ret) { - gds_err("error %d while registering GPU peer\n", ret); - return NULL; - } - assert(peer); - assert(peer_attr); - - peer->alloc_type = gds_peer::CQ; - peer->alloc_flags = flags; - - res_domain = gds_create_res_domain(context); - if (res_domain) - gds_dbg("using res_domain %p\n", res_domain); - else - gds_warn("NOT using res_domain\n"); - - - gcq = gds_create_cq_internal(context, cqe, cq_context, channel, comp_vector, gpu_id, flags, res_domain); - - if (!gcq) { - gds_err("error in gds_create_cq_internal\n"); - return NULL; - } - - return gcq; -} - -//----------------------------------------------------------------------------- - struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds_qp_init_attr_t *qp_attr, int gpu_id, int flags) { int ret = 0; - struct gds_qp *gqp = NULL; - struct ibv_qp *qp = NULL; - struct gds_cq *rx_gcq = NULL, *tx_gcq = NULL; + gds_mlx5_exp_qp_t *gmexpqp = NULL; gds_peer *peer = NULL; gds_peer_attr *peer_attr = NULL; + gds_driver_type dtype; int old_errno = errno; gds_dbg("pd=%p context=%p gpu_id=%d flags=%08x current errno=%d\n", pd, context, gpu_id, flags, errno); assert(pd); assert(context); + assert(context->device); assert(qp_attr); if (flags & ~(GDS_CREATE_QP_WQ_ON_GPU|GDS_CREATE_QP_TX_CQ_ON_GPU|GDS_CREATE_QP_RX_CQ_ON_GPU|GDS_CREATE_QP_WQ_DBREC_ON_GPU)) { @@ -1822,136 +1643,70 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, return NULL; } - gqp = (struct gds_qp*)calloc(1, sizeof(struct gds_qp)); - if (!gqp) { - gds_err("cannot allocate memory\n"); - return NULL; - } - - gqp->dev_context=context; - // peer registration gds_dbg("before gds_register_peer_ex\n"); ret = gds_register_peer_by_ordinal(gpu_id, &peer, &peer_attr); if (ret) { - gds_err("error %d in gds_register_peer_ex\n", ret); - goto err; - } - - gqp->res_domain = gds_create_res_domain(context); - if (gqp->res_domain) - gds_dbg("using gqp->res_domain %p\n", gqp->res_domain); - else - gds_warn("NOT using gqp->res_domain\n"); - - tx_gcq = gds_create_cq_internal(context, qp_attr->cap.max_send_wr, NULL, NULL, 0, gpu_id, - (flags & GDS_CREATE_QP_TX_CQ_ON_GPU) ? GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, - gqp->res_domain); - if (!tx_gcq) { - ret = errno; - gds_err("error %d while creating TX CQ, old_errno=%d\n", ret, old_errno); + gds_err("error %d in gds_register_peer_ex\n", ret); goto err; } - rx_gcq = gds_create_cq_internal(context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, gpu_id, - (flags & GDS_CREATE_QP_RX_CQ_ON_GPU) ? 
GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, - gqp->res_domain); - if (!rx_gcq) { - ret = errno; - gds_err("error %d while creating RX CQ\n", ret); + dtype = gds_get_driver_type(context->device); + if (dtype != GDS_DRIVER_TYPE_MLX5_EXP) { + gds_err("Unsupported IB device\n"); goto err; } - // peer registration - qp_attr->send_cq = tx_gcq->cq; - qp_attr->recv_cq = rx_gcq->cq; - qp_attr->pd = pd; - qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_PD; - - peer->alloc_type = gds_peer::WQ; - peer->alloc_flags = GDS_ALLOC_WQ_DEFAULT | GDS_ALLOC_DBREC_DEFAULT; - if (flags & GDS_CREATE_QP_WQ_ON_GPU) { - gds_err("error, QP WQ on GPU is not supported yet\n"); - goto err; - } - if (flags & GDS_CREATE_QP_WQ_DBREC_ON_GPU) { - gds_warn("QP WQ DBREC on GPU\n"); - peer->alloc_flags |= GDS_ALLOC_DBREC_ON_GPU; - } - qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_PEER_DIRECT; - qp_attr->peer_direct_attrs = peer_attr; - - qp = ibv_exp_create_qp(context, qp_attr); - if (!qp) { - ret = EINVAL; - gds_err("error in ibv_exp_create_qp\n"); + gmexpqp = gds_mlx5_exp_create_qp(pd, context, qp_attr, peer, peer_attr, flags); + if (!gmexpqp) { + gds_err("Error in gds_mlx5_exp_create_qp.\n"); goto err; } - gqp->qp = qp; - gqp->send_cq.cq = qp->send_cq; - gqp->send_cq.curr_offset = 0; - gqp->recv_cq.cq = qp->recv_cq; - gqp->recv_cq.curr_offset = 0; + gds_dbg("created gds_qp=%p\n", gmexpqp->gqp); - gds_dbg("created gds_qp=%p\n", gqp); - - return gqp; + return &gmexpqp->gqp; err: - gds_dbg("destroying QP\n"); - gds_destroy_qp(gqp); - return NULL; } //----------------------------------------------------------------------------- -int gds_destroy_qp(struct gds_qp *gqp) +int gds_destroy_cq(struct gds_cq *gcq) { int retcode = 0; int ret; - if(!gqp) return retcode; + if (!gcq) + return retcode; - if(gqp->qp) - { - ret = ibv_destroy_qp(gqp->qp); - if (ret) { - gds_err("error %d in destroy_qp\n", ret); - retcode = ret; - } - } + // Currently, we support only exp-verbs. + assert(gcq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); - if(gqp->send_cq.cq) - { - ret = ibv_destroy_cq(gqp->send_cq.cq); - if (ret) { - gds_err("error %d in destroy_cq send_cq\n", ret); - retcode = ret; - } - } + gds_mlx5_exp_cq_t *gmexpcq = to_gds_mexp_cq(gcq); - if(gqp->recv_cq.cq) - { - ret = ibv_destroy_cq(gqp->recv_cq.cq); - if (ret) { - gds_err("error %d in destroy_cq recv_cq\n", ret); - retcode = ret; - } - } + retcode = gds_mlx5_exp_destroy_cq(gmexpcq); - if(gqp->res_domain) { - struct ibv_exp_destroy_res_domain_attr attr; //IBV_EXP_DESTROY_RES_DOMAIN_RESERVED - attr.comp_mask=0; - ret = ibv_exp_destroy_res_domain(gqp->dev_context, gqp->res_domain, &attr); - if (ret) { - gds_err("ibv_exp_destroy_res_domain error %d: %s\n", ret, strerror(ret)); - retcode = ret; - } - } + return retcode; +} + +//----------------------------------------------------------------------------- + +int gds_destroy_qp(struct gds_qp *gqp) +{ + int retcode = 0; + int ret; + + if (!gqp) + return retcode; + + // Currently, we support only exp-verbs. 
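+        // (Only the exp-verbs backend is implemented; the MLX5_DV and MLX5_DEVX values reserved in gds_driver_type_t would be dispatched on here once those backends exist.)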
+ assert(gqp->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + + gds_mlx5_exp_qp_t *gmexpqp = to_gds_mexp_qp(gqp); - free(gqp); + retcode = gds_mlx5_exp_destroy_qp(gmexpqp); return retcode; } diff --git a/src/mlx5-exp.cpp b/src/mlx5-exp.cpp new file mode 100644 index 0000000..dfaf91c --- /dev/null +++ b/src/mlx5-exp.cpp @@ -0,0 +1,616 @@ +#include <string.h> +#include <errno.h> +#include <inttypes.h> + +#include "mlx5-exp.hpp" +#include "utils.hpp" + +static ibv_exp_res_domain *gds_mlx5_exp_create_res_domain(struct ibv_context *context) +{ + if (!context) { + gds_err("invalid context"); + return NULL; + } + + ibv_exp_res_domain_init_attr res_domain_attr; + memset(&res_domain_attr, 0, sizeof(res_domain_attr)); + + res_domain_attr.comp_mask |= IBV_EXP_RES_DOMAIN_THREAD_MODEL; + res_domain_attr.thread_model = IBV_EXP_THREAD_SINGLE; + + ibv_exp_res_domain *res_domain = ibv_exp_create_res_domain(context, &res_domain_attr); + if (!res_domain) { + gds_warn("Can't create resource domain\n"); + } + + return res_domain; +} + +//----------------------------------------------------------------------------- + +gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq( + struct ibv_context *context, int cqe, + void *cq_context, struct ibv_comp_channel *channel, + int comp_vector, gds_peer *peer, gds_peer_attr *peer_attr, gds_alloc_cq_flags_t flags, + struct ibv_exp_res_domain *res_domain) +{ + gds_mlx5_exp_cq_t *gmexpcq = NULL; + ibv_exp_cq_init_attr attr; + int ret = 0; + + assert(context); + assert(peer); + assert(peer_attr); + + gmexpcq = (gds_mlx5_exp_cq_t *)calloc(1, sizeof(gds_mlx5_exp_cq_t)); + if (!gmexpcq) { + gds_err("cannot allocate memory\n"); + return NULL; + } + + peer->alloc_type = gds_peer::CQ; + peer->alloc_flags = flags; + + attr.comp_mask = IBV_EXP_CQ_INIT_ATTR_PEER_DIRECT; + attr.flags = 0; // see ibv_exp_cq_create_flags + attr.peer_direct_attrs = peer_attr; + if (res_domain) { + gds_dbg("using peer->res_domain %p for CQ\n", res_domain); + attr.res_domain = res_domain; + attr.comp_mask |= IBV_EXP_CQ_INIT_ATTR_RES_DOMAIN; + gmexpcq->res_domain = res_domain; + } + + int old_errno = errno; + gmexpcq->gcq.cq = ibv_exp_create_cq(context, cqe, cq_context, channel, comp_vector, &attr); + if (!gmexpcq->gcq.cq) { + gds_err("error %d in ibv_exp_create_cq, old errno %d\n", errno, old_errno); + return NULL; + } + + gmexpcq->gcq.dtype = GDS_DRIVER_TYPE_MLX5_EXP; + + return gmexpcq; +} + +//----------------------------------------------------------------------------- + +gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp( + struct ibv_pd *pd, struct ibv_context *context, gds_qp_init_attr_t *qp_attr, + gds_peer *peer, gds_peer_attr *peer_attr, int flags) +{ + int ret = 0; + gds_mlx5_exp_qp_t *gmexpqp = NULL; + struct ibv_qp *qp = NULL; + gds_mlx5_exp_cq_t *rx_gmexpcq = NULL, *tx_gmexpcq = NULL; + struct ibv_exp_qp_init_attr exp_qp_attr = {0,}; + int old_errno = errno; + + assert(pd); + assert(context); + assert(qp_attr); + assert(peer); + assert(peer_attr); + + gmexpqp = (gds_mlx5_exp_qp_t *)calloc(1, sizeof(gds_mlx5_exp_qp_t)); + if (!gmexpqp) { + gds_err("cannot allocate memory\n"); + return NULL; + } + gmexpqp->gqp.dtype = GDS_DRIVER_TYPE_MLX5_EXP; + + gmexpqp->gqp.dev_context = context; + + gmexpqp->res_domain = gds_mlx5_exp_create_res_domain(context); + if (gmexpqp->res_domain) + gds_dbg("using res_domain %p\n", gmexpqp->res_domain); + else + gds_warn("NOT using res_domain\n"); + + tx_gmexpcq = gds_mlx5_exp_create_cq( + context, qp_attr->cap.max_send_wr, NULL, NULL, 0, peer, peer_attr, + (flags & GDS_CREATE_QP_TX_CQ_ON_GPU) ? 
GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, + gmexpqp->res_domain + ); + if (!tx_gmexpcq) { + ret = errno; + gds_err("error %d while creating TX CQ, old_errno=%d\n", ret, old_errno); + goto err; + } + + rx_gmexpcq = gds_mlx5_exp_create_cq( + context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, peer, peer_attr, + (flags & GDS_CREATE_QP_RX_CQ_ON_GPU) ? GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT, + gmexpqp->res_domain + ); + if (!rx_gmexpcq) { + ret = errno; + gds_err("error %d while creating RX CQ\n", ret); + goto err; + } + + // peer registration + peer->alloc_type = gds_peer::WQ; + peer->alloc_flags = GDS_ALLOC_WQ_DEFAULT | GDS_ALLOC_DBREC_DEFAULT; + if (flags & GDS_CREATE_QP_WQ_ON_GPU) { + gds_err("error, QP WQ on GPU is not supported yet\n"); + goto err; + } + if (flags & GDS_CREATE_QP_WQ_DBREC_ON_GPU) { + gds_warn("QP WQ DBREC on GPU\n"); + peer->alloc_flags |= GDS_ALLOC_DBREC_ON_GPU; + } + + exp_qp_attr.send_cq = tx_gmexpcq->gcq.cq; + exp_qp_attr.recv_cq = rx_gmexpcq->gcq.cq; + exp_qp_attr.pd = pd; + exp_qp_attr.comp_mask = IBV_EXP_QP_INIT_ATTR_PD | IBV_EXP_QP_INIT_ATTR_PEER_DIRECT; + exp_qp_attr.peer_direct_attrs = peer_attr; + exp_qp_attr.qp_type = qp_attr->qp_type; + + assert(sizeof(exp_qp_attr.cap) == sizeof(qp_attr->cap)); + + memcpy(&exp_qp_attr.cap, &qp_attr->cap, sizeof(qp_attr->cap)); + + qp = ibv_exp_create_qp(context, &exp_qp_attr); + if (!qp) { + ret = EINVAL; + gds_err("error in ibv_exp_create_qp\n"); + goto err; + } + + tx_gmexpcq->gcq.cq = qp->send_cq; + rx_gmexpcq->gcq.cq = qp->recv_cq; + + gmexpqp->gqp.qp = qp; + gmexpqp->gqp.send_cq = &tx_gmexpcq->gcq; + gmexpqp->gqp.recv_cq = &rx_gmexpcq->gcq; + + gds_dbg("created gds_mlx5_exp_qp=%p\n", gmexpqp); + + return gmexpqp; + +err: + gds_dbg("destroying QP\n"); + gds_mlx5_exp_destroy_qp(gmexpqp); + + return NULL; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_destroy_qp(gds_mlx5_exp_qp_t *gmexpqp) +{ + int retcode = 0; + int ret; + + if (!gmexpqp) + return retcode; + + assert(gmexpqp->gqp.dtype == GDS_DRIVER_TYPE_MLX5_EXP); + + if (gmexpqp->gqp.qp) { + ret = ibv_destroy_qp(gmexpqp->gqp.qp); + if (ret) { + gds_err("error %d in destroy_qp\n", ret); + retcode = ret; + } + } + + if (gmexpqp->gqp.send_cq) { + ret = gds_destroy_cq(gmexpqp->gqp.send_cq); + if (ret) { + gds_err("error %d in destroy_cq send_cq\n", ret); + retcode = ret; + } + } + + if (gmexpqp->gqp.recv_cq) { + ret = gds_destroy_cq(gmexpqp->gqp.recv_cq); + if (ret) { + gds_err("error %d in destroy_cq recv_cq\n", ret); + retcode = ret; + } + } + + if (gmexpqp->res_domain) { + struct ibv_exp_destroy_res_domain_attr attr = {0,}; //IBV_EXP_DESTROY_RES_DOMAIN_RESERVED + ret = ibv_exp_destroy_res_domain(gmexpqp->gqp.dev_context, gmexpqp->res_domain, &attr); + if (ret) { + gds_err("ibv_exp_destroy_res_domain error %d: %s\n", ret, strerror(ret)); + retcode = ret; + } + } + + free(gmexpqp); + + return retcode; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_destroy_cq(gds_mlx5_exp_cq_t *gmexpcq) +{ + int retcode = 0; + int ret; + + if (!gmexpcq) + return retcode; + + assert(gmexpcq->gcq.dtype == GDS_DRIVER_TYPE_MLX5_EXP); + + if (gmexpcq->gcq.cq) { + ret = ibv_destroy_cq(gmexpcq->gcq.cq); + if (ret) { + gds_err("error %d in destroy_cq\n", ret); + retcode = ret; + } + } + + // res_domain will be destroyed in gds_mlx5_exp_destroy_qp. 
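+        // (The res_domain is created once per QP and shared with both CQs, so destroying it here as well would be a double free.)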
+ + free(gmexpcq); + + return retcode; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_prepare_send(gds_mlx5_exp_qp_t *gmexpqp, gds_send_wr *p_ewr, + gds_send_wr **bad_ewr, + gds_mlx5_exp_send_request_t *request) +{ + int ret = 0; + ret = ibv_post_send(gmexpqp->gqp.qp, p_ewr, bad_ewr); + if (ret) { + + if (ret == ENOMEM) { + // out of space error can happen too often to report + gds_dbg("ENOMEM error %d in ibv_post_send\n", ret); + } else { + gds_err("error %d in ibv_post_send\n", ret); + } + goto out; + } + + ret = ibv_exp_peer_commit_qp(gmexpqp->gqp.qp, &request->commit); + if (ret) { + gds_err("error %d in ibv_exp_peer_commit_qp\n", ret); + goto out; + } +out: + return ret; +} + +//----------------------------------------------------------------------------- + +void gds_mlx5_exp_init_send_info(gds_mlx5_exp_send_request_t *info) +{ + gds_dbg("send_request=%p\n", info); + + info->commit.storage = info->wr; + info->commit.entries = sizeof(info->wr)/sizeof(info->wr[0]); + gds_init_ops(info->commit.storage, info->commit.entries); +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_post_send_ops(gds_peer *peer, gds_mlx5_exp_send_request_t *info, gds_op_list_t &ops) +{ + return gds_post_ops(peer, info->commit.entries, info->commit.storage, ops, 0); +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_post_send_ops_on_cpu(gds_mlx5_exp_send_request_t *info, int flags) +{ + return gds_post_ops_on_cpu(info->commit.entries, info->commit.storage, flags); +} + +//----------------------------------------------------------------------------- + +void gds_mlx5_exp_init_wait_request(gds_mlx5_exp_wait_request_t *request, uint32_t offset) +{ + gds_dbg("wait_request=%p offset=%08x\n", request, offset); + request->peek.storage = request->wr; + request->peek.entries = sizeof(request->wr)/sizeof(request->wr[0]); + request->peek.whence = IBV_EXP_PEER_PEEK_ABSOLUTE; + request->peek.offset = offset; + gds_init_ops(request->peek.storage, request->peek.entries); +} + +//----------------------------------------------------------------------------- + +static void gds_mlx5_exp_dump_ops(struct peer_op_wr *op, size_t count) +{ + size_t n = 0; + for (; op; op = op->next, ++n) { + gds_dbg("op[%zu] type:%d\n", n, op->type); + switch(op->type) { + case IBV_EXP_PEER_OP_FENCE: { + gds_dbg("FENCE flags=%" PRIu64 "\n", op->wr.fence.fence_flags); + break; + } + case IBV_EXP_PEER_OP_STORE_DWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + + op->wr.dword_va.offset; + gds_dbg("STORE_DWORD data:%x target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", + op->wr.dword_va.data, op->wr.dword_va.target_id, + op->wr.dword_va.offset, dev_ptr); + break; + } + case IBV_EXP_PEER_OP_STORE_QWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.qword_va.target_id)->dptr + + op->wr.qword_va.offset; + gds_dbg("STORE_QWORD data:%" PRIx64 " target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", + op->wr.qword_va.data, op->wr.qword_va.target_id, + op->wr.qword_va.offset, dev_ptr); + break; + } + case IBV_EXP_PEER_OP_COPY_BLOCK: { + CUdeviceptr dev_ptr = range_from_id(op->wr.copy_op.target_id)->dptr + + op->wr.copy_op.offset; + gds_dbg("COPY_BLOCK src:%p len:%zu target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", + op->wr.copy_op.src, op->wr.copy_op.len, + op->wr.copy_op.target_id, op->wr.copy_op.offset, + dev_ptr); + break; + } + case IBV_EXP_PEER_OP_POLL_AND_DWORD:
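+        // Both poll flavors read the same dword_va descriptor; only the label printed below differs.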
+ case IBV_EXP_PEER_OP_POLL_NOR_DWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + + op->wr.dword_va.offset; + gds_dbg("%s data:%08x target_id:%" PRIx64 " offset:%zu dev_ptr=%llx\n", + (op->type==IBV_EXP_PEER_OP_POLL_AND_DWORD) ? "POLL_AND_DW" : "POLL_NOR_DW", + op->wr.dword_va.data, + op->wr.dword_va.target_id, + op->wr.dword_va.offset, + dev_ptr); + break; + } + default: + gds_err("undefined peer op type %d\n", op->type); + break; + } + } + + assert(count == n); +} + +//----------------------------------------------------------------------------- + +void gds_mlx5_exp_dump_wait_request(gds_mlx5_exp_wait_request_t *request, size_t count) +{ + for (size_t j = 0; j < count; ++j) { + struct ibv_exp_peer_peek *peek = &request[j].peek; + gds_dbg("req[%zu] entries:%u whence:%u offset:%u peek_id:%" PRIx64 " comp_mask:%08x\n", + j, peek->entries, peek->whence, peek->offset, + peek->peek_id, peek->comp_mask); + gds_mlx5_exp_dump_ops(peek->storage, peek->entries); + } +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_prepare_wait_cq(gds_mlx5_exp_cq_t *mexpcq, gds_mlx5_exp_wait_request_t *request, int flags) +{ + int retcode = 0; + + retcode = ibv_exp_peer_peek_cq(mexpcq->gcq.cq, &request->peek); + if (retcode == ENOSPC) { + // TODO: handle too few entries + gds_err("not enough ops in peer_peek_cq\n"); + goto out; + } else if (retcode) { + gds_err("error %d in peer_peek_cq\n", retcode); + goto out; + } + //gds_dump_wait_request(request, 1); + out: + return retcode; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_append_wait_cq(gds_mlx5_exp_wait_request_t *request, uint32_t *dw, uint32_t val) +{ + int ret = 0; + unsigned MAX_NUM_ENTRIES = sizeof(request->wr) / sizeof(request->wr[0]); + unsigned n = request->peek.entries; + struct peer_op_wr *wr = request->peek.storage; + + if (n + 1 > MAX_NUM_ENTRIES) { + gds_err("no space left to stuff a poke\n"); + ret = ENOMEM; + goto out; + } + + // at least 1 op + assert(n); + assert(wr); + + for (; n; --n) + wr = wr->next; + + assert(wr); + + wr->type = IBV_EXP_PEER_OP_STORE_DWORD; + wr->wr.dword_va.data = val; + wr->wr.dword_va.target_id = 0; // direct mapping, offset IS the address + wr->wr.dword_va.offset = (ptrdiff_t)(dw-(uint32_t*)0); + + ++request->peek.entries; + +out: + return ret; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_abort_wait_cq(gds_mlx5_exp_cq_t *gmexpcq, gds_mlx5_exp_wait_request_t *request) +{ + struct ibv_exp_peer_abort_peek abort_ctx; + abort_ctx.peek_id = request->peek.peek_id; + abort_ctx.comp_mask = 0; + return ibv_exp_peer_abort_peek_cq(gmexpcq->gcq.cq, &abort_ctx); +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_stream_post_wait_descriptor(gds_peer *peer, gds_mlx5_exp_wait_request_t *request, gds_op_list_t &params, int flags) +{ + int ret = 0; + + ret = gds_post_ops(peer, request->peek.entries, request->peek.storage, params, flags); + if (ret) + gds_err("error %d in gds_post_ops\n", ret); + + return ret; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_post_wait_descriptor(gds_mlx5_exp_wait_request_t *request, int flags) +{ + int ret = 0; + + ret = gds_post_ops_on_cpu(request->peek.entries, request->peek.storage, flags); + if (ret) + gds_err("error %d in gds_post_ops_on_cpu\n", ret); + + return ret; +} + 
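+// Note: gds_mlx5_exp_get_wait_descs below flattens the peer_op_wr list built by
+// ibv_exp_peer_peek_cq into the gds_mlx5_wait_info_t layout declared in gdsync/mlx5.h:
+// a CQE poll condition (cqe_ptr/cqe_value/cond) plus a dword flag store (flag_ptr/flag_value).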
+//----------------------------------------------------------------------------- + +int gds_mlx5_exp_get_wait_descs(gds_mlx5_wait_info_t *mlx5_i, const gds_mlx5_exp_wait_request_t *request) +{ + int retcode = 0; + size_t n_ops = request->peek.entries; + peer_op_wr *op = request->peek.storage; + size_t n = 0; + + memset(mlx5_i, 0, sizeof(*mlx5_i)); + + for (; op && n < n_ops; op = op->next, ++n) { + switch(op->type) { + case IBV_EXP_PEER_OP_FENCE: { + gds_dbg("OP_FENCE: fence_flags=%" PRIu64 "\n", op->wr.fence.fence_flags); + uint32_t fence_op = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_OP_READ|IBV_EXP_PEER_FENCE_OP_WRITE)); + uint32_t fence_from = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_FROM_CPU|IBV_EXP_PEER_FENCE_FROM_HCA)); + uint32_t fence_mem = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_MEM_SYS|IBV_EXP_PEER_FENCE_MEM_PEER)); + if (fence_op == IBV_EXP_PEER_FENCE_OP_READ) { + gds_dbg("nothing to do for read fences\n"); + break; + } + if (fence_from != IBV_EXP_PEER_FENCE_FROM_HCA) { + gds_err("unexpected from fence\n"); + retcode = EINVAL; + break; + } + gds_err("unsupported fence combination\n"); + retcode = EINVAL; + break; + } + case IBV_EXP_PEER_OP_STORE_DWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + + op->wr.dword_va.offset; + uint32_t data = op->wr.dword_va.data; + gds_dbg("OP_STORE_DWORD dev_ptr=%" PRIx64 " data=%08x\n", (uint64_t)dev_ptr, data); + if (n != 1) { + gds_err("store DWORD is not 2nd op\n"); + retcode = EINVAL; + break; + } + mlx5_i->flag_ptr = (uint32_t*)dev_ptr; + mlx5_i->flag_value = data; + break; + } + case IBV_EXP_PEER_OP_STORE_QWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.qword_va.target_id)->dptr + + op->wr.qword_va.offset; + uint64_t data = op->wr.qword_va.data; + gds_dbg("OP_STORE_QWORD dev_ptr=%" PRIx64 " data=%" PRIx64 "\n", (uint64_t)dev_ptr, (uint64_t)data); + gds_err("unsupported QWORD op\n"); + retcode = EINVAL; + break; + } + case IBV_EXP_PEER_OP_COPY_BLOCK: { + CUdeviceptr dev_ptr = range_from_id(op->wr.copy_op.target_id)->dptr + + op->wr.copy_op.offset; + size_t len = op->wr.copy_op.len; + void *src = op->wr.copy_op.src; + gds_err("unsupported COPY_BLOCK\n"); + retcode = EINVAL; + break; + } + case IBV_EXP_PEER_OP_POLL_AND_DWORD: + case IBV_EXP_PEER_OP_POLL_GEQ_DWORD: + case IBV_EXP_PEER_OP_POLL_NOR_DWORD: { + CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + + op->wr.dword_va.offset; + uint32_t data = op->wr.dword_va.data; + + gds_dbg("OP_POLL_DWORD dev_ptr=%" PRIx64 " data=%08x\n", (uint64_t)dev_ptr, data); + + mlx5_i->cqe_ptr = (uint32_t *)dev_ptr; + mlx5_i->cqe_value = data; + + switch(op->type) { + case IBV_EXP_PEER_OP_POLL_NOR_DWORD: + // GPU SMs can always do NOR + mlx5_i->cond = GDS_WAIT_COND_NOR; + break; + case IBV_EXP_PEER_OP_POLL_GEQ_DWORD: + mlx5_i->cond = GDS_WAIT_COND_GEQ; + break; + case IBV_EXP_PEER_OP_POLL_AND_DWORD: + mlx5_i->cond = GDS_WAIT_COND_AND; + break; + default: + gds_err("unexpected op type\n"); + retcode = EINVAL; + goto err; + } + break; + } + default: + gds_err("undefined peer op type %d\n", op->type); + retcode = EINVAL; + break; + } + err: + if (retcode) { + gds_err("error in fill func at entry n=%zu\n", n); + break; + } + } + return retcode; +} + +//----------------------------------------------------------------------------- + +int gds_mlx5_exp_rollback_qp(gds_mlx5_exp_qp_t *gmexpqp, gds_mlx5_exp_send_request_t *send_info) +{ + struct ibv_exp_rollback_ctx rollback; + int ret = 0; + enum ibv_exp_rollback_flags flag = 
IBV_EXP_ROLLBACK_ABORT_LATE; + + assert(gmexpqp); + assert(gmexpqp->gqp.qp); + assert(send_info); + + /* from ibv_exp_peer_commit call */ + rollback.rollback_id = send_info->commit.rollback_id; + /* from ibv_exp_rollback_flag */ + rollback.flags = flag; + /* Reserved for future extensions, must be 0 */ + rollback.comp_mask = 0; + gds_warn("Need to rollback WQE %lx\n", rollback.rollback_id); + ret = ibv_exp_rollback_qp(gmexpqp->gqp.qp, &rollback); + if (ret) + gds_err("error %d in ibv_exp_rollback_qp\n", ret); + + return ret; +} diff --git a/src/mlx5-exp.hpp b/src/mlx5-exp.hpp new file mode 100644 index 0000000..7e4ba14 --- /dev/null +++ b/src/mlx5-exp.hpp @@ -0,0 +1,113 @@ +#include <stddef.h> +#include <stdint.h> +#include <assert.h> + +#include <infiniband/verbs_exp.h> +#include <infiniband/peer_ops.h> + +#include <gdsync.h> +#include <gdsync/mlx5.h> + +#include "objs.hpp" +#include "utils.hpp" + +typedef struct gds_mlx5_exp_cq { + gds_cq_t gcq; + ibv_exp_res_domain *res_domain; +} gds_mlx5_exp_cq_t; + +typedef struct gds_mlx5_exp_qp { + gds_qp_t gqp; + ibv_exp_res_domain *res_domain; +} gds_mlx5_exp_qp_t; + +typedef struct gds_mlx5_exp_send_request { + gds_driver_type_t dtype; + uint8_t pad0[4]; + struct ibv_exp_peer_commit commit; + struct peer_op_wr wr[GDS_SEND_INFO_MAX_OPS]; + uint8_t pad1[24]; +} gds_mlx5_exp_send_request_t; +static_assert(sizeof(gds_mlx5_exp_send_request_t) % 64 == 0, "gds_mlx5_exp_send_request_t must be 64-byte aligned."); +static_assert(sizeof(gds_mlx5_exp_send_request_t) <= sizeof(gds_send_request_t), "The size of gds_mlx5_exp_send_request_t must be less than or equal to that of gds_send_request_t."); +static_assert(offsetof(gds_mlx5_exp_send_request_t, dtype) == offsetof(gds_send_request_t, dtype), "dtype of gds_mlx5_exp_send_request_t and gds_send_request_t must be at the same offset."); + +typedef struct gds_mlx5_exp_wait_request { + gds_driver_type_t dtype; + uint8_t pad0[4]; + struct ibv_exp_peer_peek peek; + struct peer_op_wr wr[GDS_WAIT_INFO_MAX_OPS]; + uint8_t pad1[16]; +} gds_mlx5_exp_wait_request_t; +static_assert(sizeof(gds_mlx5_exp_wait_request_t) % 64 == 0, "gds_mlx5_exp_wait_request_t must be 64-byte aligned."); +static_assert(sizeof(gds_mlx5_exp_wait_request_t) <= sizeof(gds_wait_request_t), "The size of gds_mlx5_exp_wait_request_t must be less than or equal to that of gds_wait_request_t."); +static_assert(offsetof(gds_mlx5_exp_wait_request_t, dtype) == offsetof(gds_wait_request_t, dtype), "dtype of gds_mlx5_exp_wait_request_t and gds_wait_request_t must be at the same offset."); + +static inline gds_mlx5_exp_cq_t *to_gds_mexp_cq(gds_cq_t *gcq) { + assert(gcq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + return container_of(gcq, gds_mlx5_exp_cq_t, gcq); +} + +static inline gds_mlx5_exp_qp_t *to_gds_mexp_qp(gds_qp_t *gqp) { + assert(gqp->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + return container_of(gqp, gds_mlx5_exp_qp_t, gqp); +} + +static inline gds_mlx5_exp_send_request_t *to_gds_mexp_send_request(gds_send_request_t *gsreq) { + assert(gsreq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + return (gds_mlx5_exp_send_request_t *)(gsreq); +} + +static inline const gds_mlx5_exp_send_request_t *to_gds_mexp_send_request(const gds_send_request_t *gsreq) { + return (const gds_mlx5_exp_send_request_t *)to_gds_mexp_send_request(const_cast<gds_send_request_t *>(gsreq)); +} + +static inline gds_mlx5_exp_wait_request_t *to_gds_mexp_wait_request(gds_wait_request_t *gwreq) { + assert(gwreq->dtype == GDS_DRIVER_TYPE_MLX5_EXP); + return (gds_mlx5_exp_wait_request_t *)(gwreq); +} + +static inline const gds_mlx5_exp_wait_request_t *to_gds_mexp_wait_request(const gds_wait_request_t *gwreq) { 
return (const gds_mlx5_exp_wait_request_t *)to_gds_mexp_wait_request(const_cast<gds_wait_request_t *>(gwreq)); +} + +static inline uint32_t gds_mlx5_exp_get_num_wait_request_entries(gds_mlx5_exp_wait_request_t *gmexp_request) { + return gmexp_request->peek.entries; +} + +static inline uint32_t gds_mlx5_exp_get_num_send_request_entries(gds_mlx5_exp_send_request_t *gmexp_request) { + return gmexp_request->commit.entries; +} + +gds_mlx5_exp_cq_t *gds_mlx5_exp_create_cq( + struct ibv_context *context, int cqe, + void *cq_context, struct ibv_comp_channel *channel, + int comp_vector, gds_peer *peer, gds_peer_attr *peer_attr, gds_alloc_cq_flags_t flags, + struct ibv_exp_res_domain *res_domain); + +gds_mlx5_exp_qp_t *gds_mlx5_exp_create_qp( + struct ibv_pd *pd, struct ibv_context *context, gds_qp_init_attr_t *qp_attr, + gds_peer *peer, gds_peer_attr *peer_attr, int flags); + +int gds_mlx5_exp_destroy_cq(gds_mlx5_exp_cq_t *gmexpcq); +int gds_mlx5_exp_destroy_qp(gds_mlx5_exp_qp_t *gmexpqp); + +int gds_mlx5_exp_prepare_send(gds_mlx5_exp_qp_t *gmexpqp, gds_send_wr *p_ewr, + gds_send_wr **bad_ewr, + gds_mlx5_exp_send_request_t *request); + + +void gds_mlx5_exp_init_send_info(gds_mlx5_exp_send_request_t *info); +int gds_mlx5_exp_post_send_ops(gds_peer *peer, gds_mlx5_exp_send_request_t *info, gds_op_list_t &ops); +int gds_mlx5_exp_post_send_ops_on_cpu(gds_mlx5_exp_send_request_t *info, int flags = 0); + +void gds_mlx5_exp_init_wait_request(gds_mlx5_exp_wait_request_t *request, uint32_t offset); +void gds_mlx5_exp_dump_wait_request(gds_mlx5_exp_wait_request_t *request, size_t count); +int gds_mlx5_exp_prepare_wait_cq(gds_mlx5_exp_cq_t *mexpcq, gds_mlx5_exp_wait_request_t *request, int flags); +int gds_mlx5_exp_append_wait_cq(gds_mlx5_exp_wait_request_t *request, uint32_t *dw, uint32_t val); +int gds_mlx5_exp_abort_wait_cq(gds_mlx5_exp_cq_t *gmexpcq, gds_mlx5_exp_wait_request_t *request); +int gds_mlx5_exp_stream_post_wait_descriptor(gds_peer *peer, gds_mlx5_exp_wait_request_t *request, gds_op_list_t &params, int flags); +int gds_mlx5_exp_post_wait_descriptor(gds_mlx5_exp_wait_request_t *request, int flags); +int gds_mlx5_exp_get_wait_descs(gds_mlx5_wait_info_t *mlx5_i, const gds_mlx5_exp_wait_request_t *request); + +int gds_mlx5_exp_rollback_qp(gds_mlx5_exp_qp_t *gmexpqp, gds_mlx5_exp_send_request_t *send_info); diff --git a/src/mlx5.cpp b/src/mlx5.cpp index a2c7b39..526f544 100644 --- a/src/mlx5.cpp +++ b/src/mlx5.cpp @@ -40,6 +40,7 @@ //#include "mem.hpp" #include "objs.hpp" #include "utils.hpp" +#include "mlx5-exp.hpp" #if 0 union { uint64_t qw; uint32_t dw[2]; } db_val; @@ -51,9 +52,10 @@ //----------------------------------------------------------------------------- -int gds_mlx5_get_send_descs(gds_mlx5_send_info_t *mlx5_i, const gds_send_request_t *request) +int gds_mlx5_get_send_descs(gds_mlx5_send_info_t *mlx5_i, const gds_send_request_t *_request) { int retcode = 0; + const gds_mlx5_exp_send_request_t *request = to_gds_mexp_send_request(_request); size_t n_ops = request->commit.entries; peer_op_wr *op = request->commit.storage; size_t n = 0; @@ -180,107 +182,8 @@ int gds_mlx5_get_send_info(int count, const gds_send_request_t *requests, gds_ml int gds_mlx5_get_wait_descs(gds_mlx5_wait_info_t *mlx5_i, const gds_wait_request_t *request) { - int retcode = 0; - size_t n_ops = request->peek.entries; - peer_op_wr *op = request->peek.storage; - size_t n = 0; - - memset(mlx5_i, 0, sizeof(*mlx5_i)); - - for (; op && n < n_ops; op = op->next, ++n) { - switch(op->type) { - case IBV_EXP_PEER_OP_FENCE: { - 
gds_dbg("OP_FENCE: fence_flags=%" PRIu64 "\n", op->wr.fence.fence_flags); - uint32_t fence_op = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_OP_READ|IBV_EXP_PEER_FENCE_OP_WRITE)); - uint32_t fence_from = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_FROM_CPU|IBV_EXP_PEER_FENCE_FROM_HCA)); - uint32_t fence_mem = (op->wr.fence.fence_flags & (IBV_EXP_PEER_FENCE_MEM_SYS|IBV_EXP_PEER_FENCE_MEM_PEER)); - if (fence_op == IBV_EXP_PEER_FENCE_OP_READ) { - gds_dbg("nothing to do for read fences\n"); - break; - } - if (fence_from != IBV_EXP_PEER_FENCE_FROM_HCA) { - gds_err("unexpected from fence\n"); - retcode = EINVAL; - break; - } - gds_err("unsupported fence combination\n"); - retcode = EINVAL; - break; - } - case IBV_EXP_PEER_OP_STORE_DWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + - op->wr.dword_va.offset; - uint32_t data = op->wr.dword_va.data; - gds_dbg("OP_STORE_DWORD dev_ptr=%" PRIx64 " data=%08x\n", (uint64_t)dev_ptr, data); - if (n != 1) { - gds_err("store DWORD is not 2nd op\n"); - retcode = EINVAL; - break; - } - mlx5_i->flag_ptr = (uint32_t*)dev_ptr; - mlx5_i->flag_value = data; - break; - } - case IBV_EXP_PEER_OP_STORE_QWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.qword_va.target_id)->dptr + - op->wr.qword_va.offset; - uint64_t data = op->wr.qword_va.data; - gds_dbg("OP_STORE_QWORD dev_ptr=%" PRIx64 " data=%" PRIx64 "\n", (uint64_t)dev_ptr, (uint64_t)data); - gds_err("unsupported QWORD op\n"); - retcode = EINVAL; - break; - } - case IBV_EXP_PEER_OP_COPY_BLOCK: { - CUdeviceptr dev_ptr = range_from_id(op->wr.copy_op.target_id)->dptr + - op->wr.copy_op.offset; - size_t len = op->wr.copy_op.len; - void *src = op->wr.copy_op.src; - gds_err("unsupported COPY_BLOCK\n"); - retcode = EINVAL; - break; - } - case IBV_EXP_PEER_OP_POLL_AND_DWORD: - case IBV_EXP_PEER_OP_POLL_GEQ_DWORD: - case IBV_EXP_PEER_OP_POLL_NOR_DWORD: { - CUdeviceptr dev_ptr = range_from_id(op->wr.dword_va.target_id)->dptr + - op->wr.dword_va.offset; - uint32_t data = op->wr.dword_va.data; - - gds_dbg("OP_POLL_DWORD dev_ptr=%" PRIx64 " data=%08x\n", (uint64_t)dev_ptr, data); - - mlx5_i->cqe_ptr = (uint32_t *)dev_ptr; - mlx5_i->cqe_value = data; - - switch(op->type) { - case IBV_EXP_PEER_OP_POLL_NOR_DWORD: - // GPU SMs can always do NOR - mlx5_i->cond = GDS_WAIT_COND_NOR; - break; - case IBV_EXP_PEER_OP_POLL_GEQ_DWORD: - mlx5_i->cond = GDS_WAIT_COND_GEQ; - break; - case IBV_EXP_PEER_OP_POLL_AND_DWORD: - mlx5_i->cond = GDS_WAIT_COND_AND; - break; - default: - gds_err("unexpected op type\n"); - retcode = EINVAL; - goto err; - } - break; - } - default: - gds_err("undefined peer op type %d\n", op->type); - retcode = EINVAL; - break; - } - err: - if (retcode) { - gds_err("error in fill func at entry n=%zu\n", n); - break; - } - } - return retcode; + const gds_mlx5_exp_wait_request_t *gmexp_request = to_gds_mexp_wait_request(request); + return gds_mlx5_exp_get_wait_descs(mlx5_i, gmexp_request); } //----------------------------------------------------------------------------- diff --git a/src/utils.hpp b/src/utils.hpp index b501bda..c5d0774 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -222,6 +222,28 @@ gds_peer *peer_from_stream(CUstream stream); //----------------------------------------------------------------------------- +/* \brief: Get the underlying driver associated with the ibdev. + * + */ +static inline gds_driver_type gds_get_driver_type(struct ibv_device *ibdev) +{ + const char *dev_name = ibv_get_device_name(ibdev); + + // Heuristically guess the driver by the device name. 
diff --git a/tests/gds_kernel_latency.c b/tests/gds_kernel_latency.c
index 63875bf..04370f5 100644
--- a/tests/gds_kernel_latency.c
+++ b/tests/gds_kernel_latency.c
@@ -495,7 +495,7 @@ static int pp_wait_cq(struct pingpong_context *ctx, int is_client)
 {
         int ret;
         if (ctx->peersync) {
-                ret = gds_stream_wait_cq(gpu_stream, &ctx->gds_qp->recv_cq, ctx->consume_rx_cqe);
+                ret = gds_stream_wait_cq(gpu_stream, ctx->gds_qp->recv_cq, ctx->consume_rx_cqe);
         } else {
                 if (is_client) {
                         do {
@@ -542,23 +542,22 @@ static int pp_post_gpu_send(struct pingpong_context *ctx, uint32_t qpn, CUstream
                 .wr_id = PINGPONG_SEND_WRID,
                 .sg_list = &list,
                 .num_sge = 1,
-                .exp_opcode = IBV_EXP_WR_SEND,
-                .exp_send_flags = IBV_EXP_SEND_SIGNALED,
+                .opcode = IBV_WR_SEND,
+                .send_flags = IBV_SEND_SIGNALED,
                 .wr = {
                         .ud = {
                                 .ah = ctx->ah,
                                 .remote_qpn = qpn,
                                 .remote_qkey = 0x11111111
                         }
-                },
-                .comp_mask = 0
+                }
         };
 #if 0
         if (IBV_QPT_UD != gds_qpt) {
                 memset(&ewr, 0, sizeof(ewr));
                 ewr.num_sge = 1;
-                ewr.exp_send_flags = IBV_EXP_SEND_SIGNALED;
-                ewr.exp_opcode = IBV_EXP_WR_SEND;
+                ewr.send_flags = IBV_SEND_SIGNALED;
+                ewr.opcode = IBV_WR_SEND;
                 ewr.wr_id = PINGPONG_SEND_WRID;
                 ewr.sg_list = &list;
                 ewr.next = NULL;
@@ -580,23 +579,22 @@ static int pp_prepare_gpu_send(struct pingpong_context *ctx, uint32_t qpn, gds_s
                 .wr_id = PINGPONG_SEND_WRID,
                 .sg_list = &list,
                 .num_sge = 1,
-                .exp_opcode = IBV_EXP_WR_SEND,
-                .exp_send_flags = IBV_EXP_SEND_SIGNALED,
+                .opcode = IBV_WR_SEND,
+                .send_flags = IBV_SEND_SIGNALED,
                 .wr = {
                         .ud = {
                                 .ah = ctx->ah,
                                 .remote_qpn = qpn,
                                 .remote_qkey = 0x11111111
                         }
-                },
-                .comp_mask = 0
+                }
         };
         if (IBV_QPT_UD != gds_qpt) {
                 memset(&ewr, 0, sizeof(ewr));
                 ewr.num_sge = 1;
-                ewr.exp_send_flags = IBV_EXP_SEND_SIGNALED;
-                ewr.exp_opcode = IBV_EXP_WR_SEND;
+                ewr.send_flags = IBV_SEND_SIGNALED;
+                ewr.opcode = IBV_WR_SEND;
                 ewr.wr_id = PINGPONG_SEND_WRID;
                 ewr.sg_list = &list;
                 ewr.next = NULL;
@@ -676,7 +674,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
                         wdesc->descs[k].tag = GDS_TAG_SEND;
                         wdesc->descs[k].send = &wdesc->send_rq;
                         ++k;
-                        ret = gds_prepare_wait_cq(&ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0);
+                        ret = gds_prepare_wait_cq(ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0);
                         if (ret) {
                                 retcode = -ret;
                                 break;
@@ -685,7 +683,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
                         wdesc->descs[k].tag = GDS_TAG_WAIT;
                         wdesc->descs[k].wait = &wdesc->wait_tx_rq;
                         ++k;
-                        ret = gds_prepare_wait_cq(&ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0);
+                        ret = gds_prepare_wait_cq(ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0);
                         if (ret) {
                                 retcode = -ret;
                                 break;
@@ -715,14 +713,14 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
                                 retcode = -ret;
                                 break;
                         }
-                        ret = gds_stream_wait_cq(gpu_stream, &ctx->gds_qp->send_cq, 0);
+                        ret = gds_stream_wait_cq(gpu_stream, ctx->gds_qp->send_cq, 0);
                         if (ret) {
                                 // TODO: rollback gpu send
                                 gpu_err("error %d in gds_stream_wait_cq\n", ret);
                                 retcode = -ret;
                                 break;
                         }
-                        ret = gds_stream_wait_cq(gpu_stream, &ctx->gds_qp->recv_cq, ctx->consume_rx_cqe);
+                        ret = gds_stream_wait_cq(gpu_stream, ctx->gds_qp->recv_cq, ctx->consume_rx_cqe);
                         if (ret) {
                                 // TODO: rollback gpu send and wait send_cq
                                 gpu_err("[%d] error %d in gds_stream_wait_cq\n", my_rank, ret);
@@ -751,7 +749,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
                 if (ctx->use_desc_apis) {
                         work_desc_t *wdesc = calloc(1, sizeof(*wdesc));
                         int k = 0;
-                        ret = gds_prepare_wait_cq(&ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0);
+                        ret = gds_prepare_wait_cq(ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0);
                         if (ret) {
                                 retcode = -ret;
                                 break;
@@ -773,7 +771,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
                         CUCHECK(cuStreamAddCallback(gpu_stream, post_work_cb, wdesc, 0));
                 }
         } else if (ctx->peersync) {
-                ret = gds_stream_wait_cq(gpu_stream, &ctx->gds_qp->recv_cq, ctx->consume_rx_cqe);
+                ret = gds_stream_wait_cq(gpu_stream, ctx->gds_qp->recv_cq, ctx->consume_rx_cqe);
                 if (ret) {
                         // TODO: rollback gpu send and wait send_cq
                         gpu_err("error %d in gds_stream_wait_cq\n", ret);
@@ -806,7 +804,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
                         wdesc->descs[k].tag = GDS_TAG_SEND;
                         wdesc->descs[k].send = &wdesc->send_rq;
                         ++k;
-                        ret = gds_prepare_wait_cq(&ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0);
+                        ret = gds_prepare_wait_cq(ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0);
                         if (ret) {
                                 retcode = -ret;
                                 break;
@@ -835,7 +833,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
                                 retcode = -ret;
                                 break;
                         }
-                        ret = gds_stream_wait_cq(gpu_stream, &ctx->gds_qp->send_cq, 0);
+                        ret = gds_stream_wait_cq(gpu_stream, ctx->gds_qp->send_cq, 0);
                         if (ret) {
                                 // TODO: rollback gpu send
                                 gpu_err("error %d in gds_stream_wait_cq\n", ret);
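The churn in this test (and in the loopback test that follows) is mechanical
and falls into two patterns: the CQ arguments lose one level of address-of,
because gds_qp now reaches its completion queues through pointers, and the
experimental work-request fields map one-for-one onto their upstream verbs
counterparts. Collected in one place, the migrated UD send request has the
shape sketched below; the identifiers are the tests' own, and .comp_mask is
dropped because plain struct ibv_send_wr, unlike struct ibv_exp_send_wr, has
no such member:

    /* Post-migration shape of the tests' UD send work request. */
    gds_send_wr ewr = {
            .wr_id      = PINGPONG_SEND_WRID,
            .sg_list    = &list,
            .num_sge    = 1,
            .opcode     = IBV_WR_SEND,       /* was .exp_opcode = IBV_EXP_WR_SEND */
            .send_flags = IBV_SEND_SIGNALED, /* was .exp_send_flags = IBV_EXP_SEND_SIGNALED */
            .wr = {
                    .ud = {
                            .ah          = ctx->ah,
                            .remote_qpn  = qpn,
                            .remote_qkey = 0x11111111
                    }
            }
            /* no .comp_mask: struct ibv_send_wr has no such field */
    };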
diff --git a/tests/gds_kernel_loopback_latency.c b/tests/gds_kernel_loopback_latency.c
index b2d209c..f6ccc32 100644
--- a/tests/gds_kernel_loopback_latency.c
+++ b/tests/gds_kernel_loopback_latency.c
@@ -511,16 +511,15 @@ static int pp_post_send(struct pingpong_context *ctx, uint32_t qpn)
                 .wr_id = PINGPONG_SEND_WRID,
                 .sg_list = &list,
                 .num_sge = 1,
-                .exp_opcode = IBV_EXP_WR_SEND,
-                .exp_send_flags = IBV_EXP_SEND_SIGNALED,
+                .opcode = IBV_WR_SEND,
+                .send_flags = IBV_SEND_SIGNALED,
                 .wr = {
                         .ud = {
                                 .ah = ctx->ah,
                                 .remote_qpn = qpn,
                                 .remote_qkey = 0x11111111
                         }
-                },
-                .comp_mask = 0
+                }
         };
         gds_send_wr *bad_ewr;
         return gds_post_send(ctx->gds_qp, &ewr, &bad_ewr);
@@ -538,16 +537,15 @@ static int pp_post_gpu_send(struct pingpong_context *ctx, uint32_t qpn, CUstream
                 .wr_id = PINGPONG_SEND_WRID,
                 .sg_list = &list,
                 .num_sge = 1,
-                .exp_opcode = IBV_EXP_WR_SEND,
-                .exp_send_flags = IBV_EXP_SEND_SIGNALED,
+                .opcode = IBV_WR_SEND,
+                .send_flags = IBV_SEND_SIGNALED,
                 .wr = {
                         .ud = {
                                 .ah = ctx->ah,
                                 .remote_qpn = qpn,
                                 .remote_qkey = 0x11111111
                         }
-                },
-                .comp_mask = 0
+                }
         };
         gds_send_wr *bad_ewr;
         return gds_stream_queue_send(*p_gpu_stream, ctx->gds_qp, &ewr, &bad_ewr);
@@ -565,16 +563,15 @@ static int pp_prepare_gpu_send(struct pingpong_context *ctx, uint32_t qpn, gds_s
                 .wr_id = PINGPONG_SEND_WRID,
                 .sg_list = &list,
                 .num_sge = 1,
-                .exp_opcode = IBV_EXP_WR_SEND,
-                .exp_send_flags = IBV_EXP_SEND_SIGNALED,
+                .opcode = IBV_WR_SEND,
+                .send_flags = IBV_SEND_SIGNALED,
                 .wr = {
                         .ud = {
                                 .ah = ctx->ah,
                                 .remote_qpn = qpn,
                                 .remote_qkey = 0x11111111
                         }
-                },
-                .comp_mask = 0
+                }
         };
         gds_send_wr *bad_ewr;
         //printf("gpu_post_send_on_stream\n");
@@ -655,7 +652,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
                         wdesc->descs[k].send = &wdesc->send_rq;
                         ++k;
 
-                        ret = gds_prepare_wait_cq(&ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0);
+                        ret = gds_prepare_wait_cq(ctx->gds_qp->send_cq, &wdesc->wait_tx_rq, 0);
                         if (ret) {
                                 retcode = -ret;
                                 break;
@@ -665,7 +662,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
                         wdesc->descs[k].wait = &wdesc->wait_tx_rq;
                         ++k;
 
-                        ret = gds_prepare_wait_cq(&ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0);
+                        ret = gds_prepare_wait_cq(ctx->gds_qp->recv_cq, &wdesc->wait_rx_rq, 0);
                         if (ret) {
                                 retcode = -ret;
                                 break;
@@ -697,7 +694,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
                                 break;
                         }
 
-                        ret = gds_stream_wait_cq(gpu_stream_server, &ctx->gds_qp->send_cq, 0);
+                        ret = gds_stream_wait_cq(gpu_stream_server, ctx->gds_qp->send_cq, 0);
                         if (ret) {
                                 // TODO: rollback gpu send
                                 gpu_err("error %d in gds_stream_wait_cq\n", ret);
@@ -705,7 +702,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
 
-                        ret = gds_stream_wait_cq(gpu_stream_server, &ctx->gds_qp->recv_cq, ctx->consume_rx_cqe);
+                        ret = gds_stream_wait_cq(gpu_stream_server, ctx->gds_qp->recv_cq, ctx->consume_rx_cqe);
                         if (ret) {
                                 // TODO: rollback gpu send and wait send_cq
                                 gpu_err("error %d in gds_stream_wait_cq\n", ret);
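The pointer-vs-embedded CQ change is what every gds_stream_wait_cq() and
gds_prepare_wait_cq() hunk above expresses. A minimal sketch of the migrated
call pattern, assuming a QP created with gds_create_qp() and a valid CUstream
(wait_both_cqs() and its parameters are illustrative, not library API):

    /* Sketch: wait on both CQs of a peer-enabled QP from a CUDA stream. */
    static int wait_both_cqs(CUstream stream, gds_qp_t *qp, int rx_flags)
    {
            int ret;

            /* previously: gds_stream_wait_cq(stream, &qp->send_cq, 0); */
            ret = gds_stream_wait_cq(stream, qp->send_cq, 0);
            if (ret)
                    return ret;

            /* previously: gds_stream_wait_cq(stream, &qp->recv_cq, rx_flags); */
            return gds_stream_wait_cq(stream, qp->recv_cq, rx_flags);
    }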