Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ AM_CPPFLAGS += -I$(srcdir)/src
#AM_CPPFLAGS += -I$(CUDA_PATH)/include
AM_CPPFLAGS += -D__STDC_FORMAT_MACROS

#AM_LDFLAGS = -L$(CUDA_PATH)/lib64
AM_LDFLAGS = -lmlx5
LIBGDSTOOLS = @LIBGDSTOOLS@
LIBNVTX = @LIBNVTX@

Expand Down Expand Up @@ -73,7 +73,7 @@ bin_PROGRAMS = tests/gds_kernel_latency tests/gds_poll_lat tests/gds_kernel_loop
noinst_PROGRAMS = tests/rstest tests/wqtest

tests_gds_kernel_latency_SOURCES = tests/gds_kernel_latency.c tests/gpu_kernels.cu tests/pingpong.c tests/gpu.cpp
tests_gds_kernel_latency_LDADD = $(top_builddir)/src/libgdsync.la -lmpi $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)
tests_gds_kernel_latency_LDADD = $(top_builddir)/src/libgdsync.la -lmpi $(LIBGDSTOOLS) -lgdrapi -lmlx5 $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)

tests_rstest_SOURCES = tests/rstest.cpp
tests_rstest_LDADD =
Expand All @@ -82,13 +82,13 @@ tests_wqtest_SOURCES = tests/task_queue_test.cpp
tests_wqtest_LDADD = $(PTHREAD_LIBS)

tests_gds_poll_lat_SOURCES = tests/gds_poll_lat.c tests/gpu.cpp tests/gpu_kernels.cu
tests_gds_poll_lat_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmpi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)
tests_gds_poll_lat_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmlx5 -lmpi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)

tests_gds_sanity_SOURCES = tests/gds_sanity.cpp tests/gpu.cpp tests/gpu_kernels.cu
tests_gds_sanity_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmpi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)
tests_gds_sanity_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmlx5 -lmpi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)

tests_gds_kernel_loopback_latency_SOURCES = tests/gds_kernel_loopback_latency.c tests/pingpong.c tests/gpu.cpp tests/gpu_kernels.cu
tests_gds_kernel_loopback_latency_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)
tests_gds_kernel_loopback_latency_LDADD = $(top_builddir)/src/libgdsync.la $(LIBGDSTOOLS) -lgdrapi -lmlx5 $(LIBNVTX) -lcuda -lcudart $(PTHREAD_LIBS)

endif

Expand Down
22 changes: 20 additions & 2 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ else
fi
fi

AC_ARG_WITH([libmlx5],
AC_HELP_STRING([--with-libmlx5], [ Set path to libmlx5s installation ]))
if test x$with_libmlx5 = x || test x$with_libmlx5 = xno; then
want_libmlx5=no
else
want_libmlx5=yes
if test -d $with_libmlx5; then
CPPFLAGS="$CPPFLAGS -I$with_libmlx5/include"
LDFLAGS="$LDFLAGS -L$with_libmlx5/lib"
fi
fi

AC_ARG_WITH([gdrcopy],
AC_HELP_STRING([--with-gdrcopy], [ Set path to gdrcopy installation ]))
if test x$with_gdrcopy = x || test x$with_gdrcopy = xno; then
Expand Down Expand Up @@ -149,6 +161,13 @@ AC_CHECK_LIB(ibverbs, ibv_exp_create_qp,

AC_CHECK_HEADER(infiniband/peer_ops.h, [],
AC_MSG_ERROR([<infiniband/peer_ops.h> not found. libgdsync requires verbs peer-direct support.]))

AC_CHECK_HEADERS([infiniband/mlx5dv.h], [],
AC_MSG_ERROR([<infiniband/mlx5dv.h> not found. libgdsync requires direct verbs support.]))

AC_CHECK_DECLS([mlx5dv_init_obj],
[], [], [[#include <infiniband/mlx5dv.h>]])

AC_HEADER_STDC

dnl Checks for typedefs, structures, and compiler characteristics.
Expand All @@ -175,12 +194,11 @@ LDFLAGS="$LDFLAGS -L$CUDA_DRV_PATH/lib64 -L$CUDA_DRV_PATH/lib -L$CUDA_PATH/lib64
NVCCFLAGS="$NVCCFLAGS"
CUDA_CFLAGS="$CUDA_CFLAGS"
CUDA_LDFLAGS="-L$CUDA_DRV_PATH/lib64 -L$CUDA_DRV_PATH/lib -L$CUDA_PATH/lib64 -L$CUDA_PATH/lib"
CUDA_LIBS="-lcuda -lcudart -lcufft"
CUDA_LIBS="-lcuda -lcudart -lcufft -lmlx5"
NVCCFLAGS="$NVCCFLAGS $CUDA_CFLAGS $CUDA_LDFLAGS $CUDA_LIBS"
AC_SUBST(NVCC, [nvcc])
AC_SUBST(NVCCFLAGS)


dnl AC_CHECK_MEMBER([union CUstreamBatchMemOpParams_union.flushRemoteWrites],
dnl [AC_SUBST( HAS_CUDA_MEMOP_FLUSH_REMOTE_WRITES, 1 )],
dnl [AC_MSG_NOTICE([flushRemoteWrites is not defined])],
Expand Down
53 changes: 52 additions & 1 deletion include/gdsync/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
( ((((v) & 0xffff0000U) >> 16) == GDS_API_MAJOR_VERSION) && \
((((v) & 0x0000ffffU) >> 0 ) >= GDS_API_MINOR_VERSION) )

#define IBV_EXP_SEND_GET_INFO (1 << 28)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the code still keep this flag in send_flags? I think it would be better to use a separate field so that future send flags in libibverbs won't conflict with this definition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem here is that gds_send_wr is simply ibv_exp_send_wr.
maybe we want to have a new flags arg for gds_prepare_send().


typedef enum gds_param {
GDS_PARAM_VERSION,
GDS_NUM_PARAMS
Expand Down Expand Up @@ -68,6 +70,11 @@ struct gds_qp {
struct gds_cq recv_cq;
struct ibv_exp_res_domain * res_domain;
struct ibv_context *dev_context;

void* swq; //send work queue pointer
size_t swq_cnt; //counter tracking swq location
size_t swq_size; //size of the swq (Blocks)
size_t swq_stride; //size of Blocks
};

/* \brief: Create a peer-enabled QP attached to the specified GPU id.
Expand Down Expand Up @@ -159,8 +166,23 @@ typedef enum gds_update_send_info_type {
* Represents a posted send operation on a particular QP
*/

#define GDS_SEND_MAX_SGE 16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to enforce this limitation? Maybe check it in gds_create_qp?


struct ptr_to_sge{
uintptr_t ptr_to_size;
uintptr_t ptr_to_lkey;
uintptr_t ptr_to_addr;
int offset;
};

struct gds_swr_info{
size_t num_sge;
struct ptr_to_sge sge_list[GDS_SEND_MAX_SGE];
size_t wr_id;
};

typedef struct gds_send_request_info {
struct ibv_qp_swr_info swr_info;
struct gds_swr_info swr_info;
//Size info
uintptr_t ptr_to_size_wqe_h;
CUdeviceptr ptr_to_size_wqe_d;
Expand Down Expand Up @@ -350,6 +372,35 @@ int gds_stream_post_descriptors(CUstream stream, size_t n_descs, gds_descriptor_
*/
int gds_post_descriptors(size_t n_descs, gds_descriptor_t *descs, int flags);

/**
* \brief: TODO
*
*
* \param flags - TODO
*
* \return
* 0 on success or one standard errno error
*
*
* Notes:
* - TODO.
*/
int gds_report_post(struct gds_qp *gqp /*, struct gds_send_wr* wr*/);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function documentation is still missing. I guess this function advances the tracking in the gds_qp struct of the current producer index with the given send wr size?


/**
* \brief: TODO
*
*
* \param flags - TODO
*
* \return
* 0 on success or one standard errno error
*
*
* Notes:
* - TODO.
*/
int gds_query_last_info(struct gds_qp* qp, struct gds_swr_info* gds_info);

/*
* Local variables:
Expand Down
82 changes: 74 additions & 8 deletions src/apis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,28 +127,29 @@ static int gds_rollback_qp(struct gds_qp *qp, gds_send_request_t * send_info, en
//-----------------------------------------------------------------------------

#define ntohll(x) (((uint64_t)(ntohl((int)((x << 32) >> 32))) << 32) | (uint32_t)ntohl(((int)(x >> 32))))
static void gds_dump_swr(const char * func_name, struct ibv_qp_swr_info swr_info)

static void gds_dump_swr(const char * func_name, struct gds_swr_info swr_info)
{
gds_dbg("[%s] wr_id=%lx, num_sge=%d\n", func_name, swr_info.wr_id, swr_info.num_sge);

for(int j=0; j < swr_info.num_sge; j++)
{
gds_dbg("[%s] SGE=%d, Size ptr=0x%08x, Size=%d (0x%08x), +offset=%d\n",
gds_dbg("[%s] SGE=%d, Size ptr=00x%lx, Size=%d (0x%08x), +offset=%d\n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a typo here (00x), and you might want to put debugging print changes in a separate patch, to make the review easier.

func_name,
j,
(uintptr_t)swr_info.sge_list[j].ptr_to_size,
(uint32_t) ntohl( ((uint32_t*)swr_info.sge_list[j].ptr_to_size)[0]) ,
(uint32_t) ((uint32_t*)swr_info.sge_list[j].ptr_to_size)[0],
((uint32_t) ntohl( ((uint32_t*)swr_info.sge_list[j].ptr_to_size)[0]) ) + swr_info.sge_list[j].offset );

gds_dbg("[%s] SGE=%d, lkey ptr=0x%08x, lkey=%d (0x%08x)\n",
gds_dbg("[%s] SGE=%d, lkey ptr=00x%lx, lkey=%d (0x%08x)\n",
func_name,
j,
(uintptr_t)swr_info.sge_list[j].ptr_to_lkey,
(uint32_t) ntohl( ((uint32_t*)swr_info.sge_list[j].ptr_to_lkey)[0]) ,
(uint32_t) ((uint32_t*)swr_info.sge_list[j].ptr_to_lkey)[0]);

gds_dbg("[%s] SGE=%d, Addr ptr=%lx, Addr=%lx -offset=%lx\n",
gds_dbg("[%s] SGE=%d, Addr ptr=00x%lx, Addr=%lx -offset=%lx\n",
func_name,
j,
(uintptr_t)swr_info.sge_list[j].ptr_to_addr,
Expand Down Expand Up @@ -233,7 +234,7 @@ int gds_prepare_send(struct gds_qp *qp, gds_send_wr *p_ewr,
(int)p_ewr->sg_list[i].length
);
}
memset(&(request->gds_sinfo.swr_info), 0, sizeof(struct ibv_qp_swr_info));
memset(&(request->gds_sinfo.swr_info), 0, sizeof(struct gds_swr_info));
}

ret = ibv_exp_post_send(qp->qp, p_ewr, bad_ewr);
Expand All @@ -246,18 +247,19 @@ int gds_prepare_send(struct gds_qp *qp, gds_send_wr *p_ewr,
}
goto out;
}

if(get_info)
{
ret = ibv_exp_query_send_info(qp->qp, p_ewr->wr_id, &(request->gds_sinfo.swr_info));
ret = gds_query_last_info(qp, &(request->gds_sinfo.swr_info));
if(ret)
{
fprintf(stderr, "ibv_exp_query_send_info returned %d: %s\n", ret, strerror(ret));
fprintf(stderr, "gds_query_last_info returned %d: %s\n", ret, strerror(ret));
goto out;
}

gds_dump_swr("gds_prepare_send", request->gds_sinfo.swr_info);
}
ret = gds_report_post(qp /*, p_ewr*/); //increment counter.

ret = ibv_exp_peer_commit_qp(qp->qp, &request->commit);
if (ret) {
Expand Down Expand Up @@ -1180,6 +1182,70 @@ int gds_post_descriptors(size_t n_descs, gds_descriptor_t *descs, int flags)
return ret;
}

struct mlx5_sge{
uint32_t byte_count;
uint32_t key;
uint64_t addr;
};

struct mlx5_send_wqe{
uint32_t opmod_wqeidx_opcode;
uint32_t qpn_ds;
uint64_t ctrl34;
struct mlx5_sge sge;
};

int gds_report_post(struct gds_qp *qp /*, struct gds_send_wr* wr*/){
++(qp->swq_cnt);
return 0;
/*//Smarter Alternative for cases we use larger wqes:
struct mlx5_send_wqe* wqe = (struct mlx5_send_wqe*) ((char*) qp->swq + qp->swq_stride * ((qp->swq_cnt) % qp->swq_size));
size_t ds = (ntohl(wqe->qpn_ds) & (0x0000007f));
size_t wqes_per_block = (qp->swq_stride / sizeof(mlx5_sge));
size_t num_blocks = ds / wqes_per_block + !!(ds % wqes_per_block);
(qp->swq_cnt)+=num_blocks;
return 0;
*/
}

int gds_query_last_info(struct gds_qp *qp, struct gds_swr_info* gds_info){
struct mlx5_send_wqe* wqe = (struct mlx5_send_wqe*) ((char*) qp->swq + qp->swq_stride * ((qp->swq_cnt) % qp->swq_size));

size_t base_blocks = 1;
switch (ntohl(wqe->opmod_wqeidx_opcode) & (0x000000ff)){
case IBV_WR_RDMA_WRITE:
case IBV_WR_RDMA_WRITE_WITH_IMM:
case IBV_WR_RDMA_READ:
base_blocks = 2;
break;
case IBV_WR_SEND:
default:
base_blocks = 1;
}

gds_info->num_sge = (ntohl(wqe->qpn_ds) & (0x0000007f)) - base_blocks;

struct mlx5_sge* sge = &(wqe->sge);
size_t blocks_per_wqe = (qp->swq_stride / sizeof(mlx5_sge));

uint16_t blocks_left = ((qp->swq_size - (qp->swq_cnt % qp->swq_size)) * qp->swq_stride) - base_blocks;
//we need to monitor how many blocks we have left before wrap around.

for (size_t i = 0; i< gds_info->num_sge; ++i){
gds_info->sge_list[i].ptr_to_size = (uintptr_t) &(sge->byte_count);
gds_info->sge_list[i].ptr_to_lkey = (uintptr_t) &(sge->key);
gds_info->sge_list[i].ptr_to_addr = (uintptr_t) &(sge->addr);
gds_info->sge_list[i].offset = 0; //why is that here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does the offset field stand for?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@e-ago do you remember why you had offset in the first place?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did you find this? Considering file https://github.com/gpudirect/libmlx5/blob/expose_send_params/src/qp.c the offset is modified here
qp->swr_info[qp->cur_swr].sge[qp->swr_info[qp->cur_swr].cur_sge].offset = offset;
and here
swr_info->sge_list[j].offset = qp->swr_info[i].sge[j].offset

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the offset field affecting the data written into the wqe in some way?

if (i == blocks_left){
sge = (struct mlx5_sge*) qp->swq;
} else {
(++sge);
}
}
gds_info->wr_id = 1; //just exists to match old API.
return 0;
}

//-----------------------------------------------------------------------------

/*
Expand Down
42 changes: 40 additions & 2 deletions src/gdsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
#include "mlnxutils.h"
#include "task_queue.hpp"

extern "C" {
#include <infiniband/mlx5dv.h>
}

//-----------------------------------------------------------------------------

void gds_assert(const char *cond, const char *file, unsigned line, const char *function)
Expand Down Expand Up @@ -1871,7 +1875,34 @@ gds_create_cq(struct ibv_context *context, int cqe,

//-----------------------------------------------------------------------------

struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context,

int gds_add_dv_qp_ctx(struct gds_qp* gqp){
struct mlx5dv_obj dv_obj = {};
struct mlx5dv_qp* dv_qp = (struct mlx5dv_qp *)malloc(sizeof(struct mlx5dv_qp));
memset((void *)&dv_obj, 0, sizeof(struct mlx5dv_obj));

dv_obj.qp.in = gqp->qp;
dv_obj.qp.out = dv_qp;
int ret = mlx5dv_init_obj(&dv_obj, MLX5DV_OBJ_QP);

if (ret){
free(dv_qp);
return ret;
}

gqp->swq_cnt = 0;
gqp->swq_size = dv_qp->sq.wqe_cnt;
gqp->swq = dv_qp->sq.buf;
gqp->swq_stride = dv_qp->sq.stride;
free(dv_qp);
gds_dbg("extracted dv_qp context=%p\n", gqp);
return 0;
}

//-----------------------------------------------------------------------------

struct gds_qp*
gds_create_qp(struct ibv_pd *pd, struct ibv_context *context,
gds_qp_init_attr_t *qp_attr, int gpu_id, int flags)
{
int ret = 0;
Expand Down Expand Up @@ -1964,8 +1995,15 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context,
gqp->recv_cq.cq = qp->recv_cq;
gqp->recv_cq.curr_offset = 0;

gds_dbg("created gds_qp=%p\n", gqp);

ret = gds_add_dv_qp_ctx(gqp);
if (ret){
ret = EINVAL;
gds_err("error in gds_add_dv_qp_ctx\n");
goto err;
}

gds_dbg("created gds_qp=%p\n", gqp);
return gqp;

err:
Expand Down
2 changes: 1 addition & 1 deletion tests/gds_kernel_latency.c
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ static int pp_post_work(struct pingpong_context *ctx, int n_posts, int rcnt, uin
if(ctx->validate)
{
cudaDeviceSynchronize();

MPI_Barrier(MPI_COMM_WORLD);
cudaMemcpy(ctx->validate_buf, ctx->rxbuf, ctx->size, cudaMemcpyDefault);
char *value = (char*)ctx->validate_buf;
char expected=i%CHAR_MAX;
Expand Down