diff --git a/Makefile.am b/Makefile.am index d91c363..9e2fd21 100644 --- a/Makefile.am +++ b/Makefile.am @@ -21,7 +21,7 @@ libgdsyncincludedir = $(includedir)/gdsync libgdsyncinclude_HEADERS = include/gdsync/core.h include/gdsync/device.cuh include/gdsync/mlx5.h include/gdsync/tools.h src_libgdsync_la_CFLAGS = $(AM_CFLAGS) -src_libgdsync_la_SOURCES = src/gdsync.cpp src/memmgr.cpp src/mem.cpp src/objs.cpp src/apis.cpp src/mlx5.cpp include/gdsync.h +src_libgdsync_la_SOURCES = src/gdsync.cpp src/memmgr.cpp src/mem.cpp src/objs.cpp src/apis.cpp src/mlx5.cpp src/flusher.cpp include/gdsync.h src_libgdsync_la_LDFLAGS = -version-info 2:0:1 noinst_HEADERS = src/mem.hpp src/memmgr.hpp src/objs.hpp src/rangeset.hpp src/utils.hpp src/archutils.h src/mlnxutils.h diff --git a/include/gdsync/core.h b/include/gdsync/core.h index ef501c0..932da03 100644 --- a/include/gdsync/core.h +++ b/include/gdsync/core.h @@ -47,11 +47,14 @@ typedef enum gds_param { int gds_query_param(gds_param_t param, int *value); enum gds_create_qp_flags { - GDS_CREATE_QP_DEFAULT = 0, - GDS_CREATE_QP_WQ_ON_GPU = 1<<0, - GDS_CREATE_QP_TX_CQ_ON_GPU = 1<<1, - GDS_CREATE_QP_RX_CQ_ON_GPU = 1<<2, - GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5, + GDS_CREATE_QP_DEFAULT = 0, + GDS_CREATE_QP_WQ_ON_GPU = 1<<0, + GDS_CREATE_QP_TX_CQ_ON_GPU = 1<<1, + GDS_CREATE_QP_RX_CQ_ON_GPU = 1<<2, + GDS_CREATE_QP_GPU_INVALIDATE_TX_CQ = 1 << 3, + GDS_CREATE_QP_GPU_INVALIDATE_RX_CQ = 1 << 4, + GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5, + GDS_CREATE_QP_FLUSHER = 1<<6, }; typedef struct ibv_exp_qp_init_attr gds_qp_init_attr_t; diff --git a/src/apis.cpp b/src/apis.cpp index 0f29f2f..b727778 100644 --- a/src/apis.cpp +++ b/src/apis.cpp @@ -49,7 +49,7 @@ #include "utils.hpp" #include "memmgr.hpp" //#include "mem.hpp" - +#include "flusher.hpp" //----------------------------------------------------------------------------- @@ -622,7 +622,9 @@ int gds_stream_post_descriptors(CUstream stream, size_t n_descs, gds_descriptor_ // move flush to last wait in the whole batch if (n_waits && no_network_descs_after_entry(n_descs, descs, last_wait)) { gds_dbg("optimizing FLUSH to last wait i=%zu\n", last_wait); - move_flush = true; + if( gds_use_native_flusher() ) move_flush=true; + else n_mem_ops += gds_flusher_count_op(); + gds_dbg("optimizing FLUSH to last wait i=%zu, n_mem_ops: %d, flusher ops: %d\n", last_wait, n_mem_ops, gds_flusher_count_op()); } // alternatively, remove flush for wait is next op is a wait too diff --git a/src/flusher.cpp b/src/flusher.cpp new file mode 100644 index 0000000..b501ae6 --- /dev/null +++ b/src/flusher.cpp @@ -0,0 +1,906 @@ +/* Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of NVIDIA CORPORATION nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "flusher.hpp" + +struct flusher_qp_info * flusher_qp=NULL; + +//-------------------------------- STATIC ------------------------------------ +static int * flsign_h; +static gds_flusher_buf flread_d; +static gds_flusher_buf flack_d; +static int flusher_value=0; +static pthread_t flusher_thread; +static int gds_use_flusher = -1; +static int local_gpu_id=0; + +static const char * flusher_int_to_str(int flusher_int) +{ + if(flusher_int == GDS_FLUSHER_NONE) + return "No flusher"; + else if(flusher_int == GDS_FLUSHER_NATIVE) + return "GPU native flusher"; + else if(flusher_int == GDS_FLUSHER_CPU) + return "CPU thread"; + else if(flusher_int == GDS_FLUSHER_NIC) + return "NIC RDMA PUT"; + else + return "Unknown"; +} + +static inline bool gds_flusher_service_active() { + if(gds_use_flusher == GDS_FLUSHER_CPU || gds_use_flusher == GDS_FLUSHER_NIC) + return true; + else + return false; +} +#define CHECK_FLUSHER_SERVICE() \ + if(gds_flusher_service_active() == false) \ + { \ + gds_dbg("Flusher service not active (%d)\n", gds_use_flusher); \ + goto out; \ + } + +#define ROUND_TO(V,PS) ((((V) + (PS) - 1)/(PS)) * (PS)) + +bool gds_use_native_flusher() +{ + if(gds_use_flusher == GDS_FLUSHER_NATIVE) + return true; + + return false; +} + +static int gds_flusher_pin_buffer(gds_flusher_buf * fl_mem, size_t req_size, int type_mem) +{ + int ret=0; + CUcontext gpu_ctx; + CUdevice gpu_device; + size_t size = ROUND_TO(req_size, GDS_GPU_PAGE_SIZE); + + gds_dbg("GPU%u: malloc req_size=%zu size=%zu\n", local_gpu_id, req_size, size); + + if (!fl_mem) { + gds_err("invalid params\n"); + return EINVAL; + } + + // NOTE: gpu_id's primary context is assumed to be the right one + // breaks horribly with multiple contexts + int num_gpus; + do { + CUresult err = cuDeviceGetCount(&num_gpus); + if (CUDA_SUCCESS == err) { + break; + } else if (CUDA_ERROR_NOT_INITIALIZED == err) { + gds_err("CUDA error %d in cuDeviceGetCount, calling cuInit\n", err); + CUCHECK(cuInit(0)); + // try again + continue; + } else { + gds_err("CUDA error %d in cuDeviceGetCount, returning EIO\n", err); + return EIO; + } + } while(0); + gds_dbg("num_gpus=%d\n", num_gpus); + if (local_gpu_id >= num_gpus) { + gds_err("invalid num_GPUs=%d while requesting GPU id %d\n", num_gpus, local_gpu_id); + return EINVAL; + } + + CUCHECK(cuDeviceGet(&gpu_device, local_gpu_id)); + gds_dbg("local_gpu_id=%d gpu_device=%d\n", local_gpu_id, gpu_device); + // TODO: check for existing context before switching to the interop one + CUCHECK(cuDevicePrimaryCtxRetain(&gpu_ctx, gpu_device)); + CUCHECK(cuCtxPushCurrent(gpu_ctx)); + assert(gpu_ctx != NULL); + + gds_mem_desc_t *desc = (gds_mem_desc_t *)calloc(1, sizeof(gds_mem_desc_t)); + if (!desc) { + gds_err("error while allocating mem 
desc\n"); + ret = ENOMEM; + goto out; + } + + ret = gds_alloc_mapped_memory(desc, size, type_mem); + if (ret) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + + fl_mem->buf_h = desc->h_ptr; + fl_mem->buf_d = desc->d_ptr; + fl_mem->desc = desc; + + out: + if (ret) + free(desc); // desc can be NULL + + CUCHECK(cuCtxPopCurrent(NULL)); + CUCHECK(cuDevicePrimaryCtxRelease(gpu_device)); + + return ret; +} + +static int gds_flusher_free_pinned_buffer(gds_flusher_buf * fl_mem) +{ + int ret = 0; + CUcontext gpu_ctx; + CUdevice gpu_device; + + gds_dbg("GPU%u: mfree\n", local_gpu_id); + + if (!fl_mem->desc) { + gds_err("invalid handle\n"); + return EINVAL; + } + + if (!fl_mem->buf_h) { + gds_err("invalid host_addr\n"); + return EINVAL; + } + + // NOTE: gpu_id's primary context is assumed to be the right one + // breaks horribly with multiple contexts + + CUCHECK(cuDeviceGet(&gpu_device, local_gpu_id)); + CUCHECK(cuDevicePrimaryCtxRetain(&gpu_ctx, gpu_device)); + CUCHECK(cuCtxPushCurrent(gpu_ctx)); + assert(gpu_ctx != NULL); + + gds_mem_desc_t *desc = (gds_mem_desc_t *)fl_mem->desc; + ret = gds_free_mapped_memory(desc); + if (ret) { + gds_err("error %d while freeing mapped GPU buffers\n", ret); + } + free(desc); + + CUCHECK(cuCtxPopCurrent(NULL)); + CUCHECK(cuDevicePrimaryCtxRelease(gpu_device)); + + return ret; +} + +static int gds_flusher_create_qp() +{ + struct ibv_port_attr port_attr; + int qp_flags=0; + int attr_flags=0; + int ret=0; + struct ibv_ah_attr ib_ah_attr; + struct ibv_qp_attr attr; + gds_qp_init_attr_t qp_init_attr; + + qp_flags |= GDS_CREATE_QP_FLUSHER; + //qp_flags |= GDS_CREATE_QP_GPU_INVALIDATE_RX_CQ; + qp_flags |= GDS_CREATE_QP_GPU_INVALIDATE_TX_CQ; + + // -------------------------------------------- + + memset(&qp_init_attr, 0, sizeof(gds_qp_init_attr_t)); + + qp_init_attr.send_cq = 0; + qp_init_attr.recv_cq = 0; + qp_init_attr.cap.max_send_wr = 512; + qp_init_attr.cap.max_recv_wr = 512; + qp_init_attr.cap.max_send_sge = 1; + qp_init_attr.cap.max_recv_sge = 1; + qp_init_attr.cap.max_inline_data = 0; //The flusher must not inline data!! + qp_init_attr.qp_type = IBV_QPT_RC; //UD & RDMA_WRITE are not compatible! 
+ + flusher_qp->loopback_qp = gds_create_qp(flusher_qp->pd, flusher_qp->context, &qp_init_attr, flusher_qp->gpu_id, qp_flags); + if (!flusher_qp->loopback_qp) { + gds_err("gds_set_loopback_qp returned NULL\n"); + ret=EINVAL; + goto out; + } + + // -------------------------------------------- + + memset(&attr, 0, sizeof(struct ibv_qp_attr)); + attr.qp_state = IBV_QPS_INIT; + attr.pkey_index = 0; + attr.port_num = flusher_qp->ib_port; + attr.qkey = GDS_FLUSHER_QKEY; + attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE; + + attr_flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS; + + if (ibv_modify_qp(flusher_qp->loopback_qp->qp, &attr, attr_flags)) { + gds_err("Failed to modify QP to INIT\n"); + goto clean_qp; + } + + if(ibv_query_port(flusher_qp->context, flusher_qp->ib_port, &port_attr)) + { + fprintf(stderr, "Failed to modify QP to INIT\n"); + goto clean_qp; + } + + flusher_qp->lid=port_attr.lid; + flusher_qp->qpn=flusher_qp->loopback_qp->qp->qp_num; + flusher_qp->psn=0; //lrand48() & 0xffffff; + + // -------------------------------------------- + + memset(&attr, 0, sizeof(struct ibv_qp_attr)); + attr.qp_state = IBV_QPS_RTR; + attr.path_mtu = port_attr.active_mtu; + attr.dest_qp_num = flusher_qp->qpn; + attr.rq_psn = flusher_qp->psn; + attr.ah_attr.dlid = flusher_qp->lid; + attr.max_dest_rd_atomic = 1; + attr.min_rnr_timer = 12; + attr.ah_attr.is_global = 0; + attr.ah_attr.sl = 0; + attr.ah_attr.src_path_bits = 0; + attr.ah_attr.port_num = flusher_qp->ib_port; + attr_flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER | IBV_QP_MAX_DEST_RD_ATOMIC; + + if (ibv_modify_qp(flusher_qp->loopback_qp->qp, &attr, attr_flags)) { + gds_err("Failed to modify QP to RTR\n"); + goto clean_qp; + } + + // -------------------------------------------- + + memset(&attr, 0, sizeof(struct ibv_qp_attr)); + attr.qp_state = IBV_QPS_RTS; + attr.sq_psn = 0; + attr.timeout = 20; + attr.retry_cnt = 7; + attr.rnr_retry = 7; + attr.max_rd_atomic = 1; + attr_flags = IBV_QP_STATE | IBV_QP_SQ_PSN | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_MAX_QP_RD_ATOMIC; + + if (ibv_modify_qp(flusher_qp->loopback_qp->qp, &attr, attr_flags)) { + gds_err("Failed to modify QP to RTS\n"); + goto clean_qp; + } + + out: + return ret; + + clean_qp: + gds_destroy_qp(flusher_qp->loopback_qp); + return EINVAL; + +} + +static int gds_flusher_prepare_put_dmem(gds_send_request_t * send_info, struct gds_flusher_buf * src_fbuf, struct gds_flusher_buf * dst_fbuf) +{ + int ret=0; + struct ibv_sge list; + gds_send_wr p_ewr; + gds_send_wr *bad_ewr; + + assert(send_info); + assert(src_fbuf); + assert(dst_fbuf); + + //send read word + memset(&list, 0, sizeof(struct ibv_sge)); + list.addr = (uintptr_t) (src_fbuf->buf_d); + list.length = src_fbuf->size; + list.lkey = src_fbuf->mr->lkey; + + memset(&p_ewr, 0, sizeof(gds_send_wr)); + p_ewr.next = NULL; + p_ewr.exp_send_flags = 0; //IBV_EXP_SEND_SIGNALED; + p_ewr.exp_opcode = IBV_EXP_WR_RDMA_WRITE; + p_ewr.wr_id = 1; + p_ewr.num_sge = 1; + p_ewr.sg_list = &list; + p_ewr.wr.rdma.remote_addr = dst_fbuf->buf_d; + p_ewr.wr.rdma.rkey = dst_fbuf->mr->rkey; + + ret = gds_prepare_send(flusher_qp->loopback_qp, &p_ewr, &bad_ewr, send_info); + if (ret) { + gds_err("error %d in gds_prepare_send\n", ret); + goto out; + } + + out: + return ret; +} +//------------------------------- COMMON ------------------------------------ +int gds_flusher_get_envars() +{ + if (-1 == 
gds_use_flusher) { + const char *env = getenv("GDS_FLUSHER_TYPE"); + if (env) + { + gds_use_flusher = atoi(env); + if( + gds_use_flusher != GDS_FLUSHER_NONE && + gds_use_flusher != GDS_FLUSHER_NATIVE && + gds_use_flusher != GDS_FLUSHER_CPU && + gds_use_flusher != GDS_FLUSHER_NIC + ) + { + gds_err("Erroneous flusher type=%d (allowed values 0-%d)\n", gds_use_flusher, GDS_FLUSHER_NIC); + gds_use_flusher=GDS_FLUSHER_NONE; + } + } + else + gds_use_flusher = GDS_FLUSHER_NONE; + + gds_warn("GDS_FLUSHER_TYPE=%d\n", gds_use_flusher); + } + return gds_use_flusher; +} + +int gds_flusher_setup() +{ + gds_flusher_get_envars(); + + if(gds_use_flusher == GDS_FLUSHER_NATIVE) + { +//At least CUDA 9.2 +#if (CUDA_VERSION >= 9020) + + int attr = 0; + CUresult result = CUDA_SUCCESS; + + result = cuDeviceGetAttribute(&attr, CU_DEVICE_ATTRIBUTE_CAN_USE_WAIT_VALUE_FLUSH, local_gpu_id); + if (CUDA_SUCCESS != result) { + const char *err_str = NULL; + cuGetErrorString(result, &err_str); + gds_err("got CUDA result %d (%s) while submitting batch operations:\n", result, err_str); + exit(EXIT_FAILURE); + } + if(attr == 0) + { + gds_warn("GPU #%d doesn't support native flusher\n", local_gpu_id); + gds_use_flusher=GDS_FLUSHER_NONE; + } +#endif + + if(gds_use_flusher == GDS_FLUSHER_NATIVE) + gds_warn("Using GPU native flusher\n"); + } + else + { + if(gds_use_flusher == GDS_FLUSHER_NONE) + gds_warn("No flusher service nor GPU native flusher\n"); + else + gds_warn("Using flusher service '%s' (%d)\n", flusher_int_to_str(gds_use_flusher), gds_use_flusher); + } + + return 0; +} + +int gds_flusher_init(struct ibv_pd *pd, struct ibv_context *context, int gpu_id) +{ + int ret = 0; + unsigned int flag = 1; + + local_gpu_id=gpu_id; + gds_flusher_setup(); + CHECK_FLUSHER_SERVICE(); + + flread_d.size=1*sizeof(int); + flack_d.size=1*sizeof(int); + + gds_dbg("gds_use_flusher=%d\n", gds_use_flusher); + + if(gds_use_flusher == GDS_FLUSHER_CPU) + { + // ------------------ READ WORD ------------------ + ret = gds_flusher_pin_buffer(&flread_d, flread_d.size, GDS_MEMORY_GPU); + if (ret) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + CUCHECK(cuMemsetD32(flread_d.buf_d, 0, 1)); + gds_dbg("ReadWord pinned. size: %d buf_d=%p buf_h=%p\n", flread_d.size, flread_d.buf_d, flread_d.buf_h); + + // ------------------ ACK WORD ------------------ + ret = gds_flusher_pin_buffer(&flack_d, flack_d.size, GDS_MEMORY_GPU); + if (ret) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + CUCHECK(cuMemsetD32(flack_d.buf_d, 0, 1)); + gds_dbg("Ackword pinned. size: %d buf_d=%p buf_h=%p\n", flack_d.size, flack_d.buf_d, flack_d.buf_h); + + // ------------------ SIGNAL WORD ------------------ + CUCHECK(cuMemAllocHost((void**)&flsign_h, 1*sizeof(int))); + memset(flsign_h, 0, sizeof(int)); + gds_dbg("SignalWord on Host Mem %p\n", flsign_h); + + // ------------------ THREAD ------------------ + gds_flusher_start_thread(&flusher_thread); + } + else if(gds_use_flusher == GDS_FLUSHER_NIC) + { + // ------------------ READ WORD ------------------ + ret = gds_flusher_pin_buffer(&flread_d, flread_d.size, GDS_MEMORY_GPU); + if (ret) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + CUCHECK(cuMemsetD32(flread_d.buf_d, 0, 1)); + gds_dbg("ReadWord pinned. 
size: %d buf_d=%p buf_h=%p\n", flread_d.size, flread_d.buf_d, flread_d.buf_h); + + // ------------------ ACK WORD ------------------ + ret = gds_flusher_pin_buffer(&flack_d, flack_d.size, GDS_MEMORY_GPU); + if (ret) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + CUCHECK(cuMemsetD32(flack_d.buf_d, 0, 1)); + gds_dbg("Ackword pinned. size: %d buf_d=%p buf_h=%p\n", flack_d.size, flack_d.buf_d, flack_d.buf_h); + + if(flusher_qp == NULL) + { + flusher_qp = (struct flusher_qp_info *) calloc (1, sizeof(struct flusher_qp_info)); + if(!flusher_qp) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + + flusher_qp->gpu_id = local_gpu_id; + flusher_qp->ib_port=GDS_FLUSHER_PORT; + flusher_qp->context=context; + flusher_qp->pd=pd; + + ret = gds_flusher_create_qp(); + if (ret) { + gds_err("error %d gds_flusher_create_qp\n", ret); + goto out; + } + + flread_d.mr = ibv_reg_mr(flusher_qp->pd, (void*)flread_d.buf_d, flread_d.size, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE); + if (!flread_d.mr) { + gds_err("Couldn't register MR\n"); + ret=EINVAL; + goto out; + } + + gds_dbg("flread_d ibv_reg_mr addr:%p size:%zu flags=0x%08x, reg=%p lkey=%x, rkey=%x\n", + flread_d.buf_d, flread_d.size, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE, flread_d.mr, flread_d.mr->lkey, flread_d.mr->rkey); + + flack_d.mr = ibv_reg_mr(flusher_qp->pd, (void*)flack_d.buf_d, flack_d.size, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE); + if (!flread_d.mr) { + gds_err("Couldn't register MR\n"); + ret=EINVAL; + goto out; + } + + gds_dbg("flack_d ibv_reg_mr addr:%p size:%zu flags=0x%08x, reg=%p lkey=%x, rkey=%x\n", + flack_d.buf_d, flack_d.size, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE, flack_d.mr, flack_d.mr->lkey, flack_d.mr->rkey); + + } + } + + if(!ret) + gds_warn("Flusher initialized\n"); + + out: + return ret; +} + +int gds_flusher_destroy() +{ + int ret = 0; + unsigned int flag = 1; + + CHECK_FLUSHER_SERVICE(); + + gds_dbg("gds_use_flusher=%d\n", gds_use_flusher); + + if(gds_use_flusher == GDS_FLUSHER_CPU) + { + ret=gds_flusher_stop_thread(flusher_thread); + if(ret) + { + gds_err("gds_fill_poke error %d\n", ret); + goto out; + } + + ret=gds_flusher_free_pinned_buffer(&(flread_d)); + if (ret) { + gds_err("error %d while freeing mapped GPU buffers\n", ret); + goto out; + } + + ret=gds_flusher_free_pinned_buffer(&(flack_d)); + if (ret) { + gds_err("error %d while freeing mapped GPU buffers\n", ret); + goto out; + } + + CUCHECK(cuMemFreeHost(flsign_h)); + + gds_dbg("Device words unpinned\n"); + } + else if(gds_use_flusher == GDS_FLUSHER_NIC) + { + if(!flusher_qp) { + gds_err("error !flusher_qp\n"); + ret=EINVAL; + goto out; + } + + if(!(flusher_qp->loopback_qp)) { + gds_err("error !loopback_qp\n"); + ret=EINVAL; + goto out; + } + + ret = ibv_destroy_qp(flusher_qp->loopback_qp->qp); + if (ret) { + gds_err("error %d in destroy_qp\n", ret); + goto out; + } + + assert(flusher_qp->loopback_qp->send_cq.cq); + ret = ibv_destroy_cq(flusher_qp->loopback_qp->send_cq.cq); + if (ret) { + gds_err("error %d in destroy_cq send_cq\n", ret); + goto out; + } + //send_cq == recv_cq + + if(flread_d.mr) { + ret = ibv_dereg_mr(flread_d.mr); + if (ret) { + gds_err("error %d in ibv_dereg_mr\n", ret); + goto out; + } + } + + if(flack_d.mr) { + ret = ibv_dereg_mr(flack_d.mr); + if (ret) { + gds_err("error %d in ibv_dereg_mr\n", ret); + goto out; + } + } + + free(flusher_qp->loopback_qp); + free(flusher_qp); + flusher_qp=NULL; + + gds_dbg("Flusher QP destroyed\n"); + + } + + 
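+        // Nothing to tear down for GDS_FLUSHER_NATIVE/GDS_FLUSHER_NONE:
+        // CHECK_FLUSHER_SERVICE() above already returned in those cases.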
if(!ret) gds_warn("Flusher destroyed\n"); + + out: + return ret; +} + +int gds_flusher_count_op() +{ + if(gds_use_flusher == GDS_FLUSHER_CPU) + return GDS_FLUSHER_OP_CPU; + + if(gds_use_flusher == GDS_FLUSHER_NIC) + return GDS_FLUSHER_OP_NIC; + + //in case of native flusher or no flusher + return 0; +} + +//Not actually used for now! +int gds_flusher_post_stream(CUstream stream) +{ + gds_descriptor_t desc[3]; + gds_send_request_t send_info; + int k = 0, ret = 0; + + CHECK_FLUSHER_SERVICE(); + + if(gds_use_flusher == GDS_FLUSHER_CPU) + { + //write32 signal + desc[k].tag = GDS_TAG_WRITE_VALUE32; + desc[k].write32.ptr = (uint32_t *) flsign_h; + desc[k].write32.value = flusher_value+1; + desc[k].write32.flags = GDS_MEMORY_HOST; + ++k; + + //wait32 ackword + desc[k].tag = GDS_TAG_WAIT_VALUE32; + desc[k].wait32.ptr = (uint32_t *)flack_d.buf_d; + desc[k].wait32.value = flusher_value+1; + desc[k].wait32.cond_flags = GDS_WAIT_COND_EQ; + desc[k].wait32.flags = GDS_MEMORY_GPU; + ++k; + } + else + { + //flusher NIC + //write read word + desc[k].tag = GDS_TAG_WRITE_VALUE32; + desc[k].write32.ptr = (uint32_t *) flread_d.buf_d; + desc[k].write32.value = flusher_value+1; + desc[k].write32.flags = GDS_MEMORY_GPU; + ++k; + //write order respected? + + + ret=gds_flusher_prepare_put_dmem(&send_info, &flread_d, &flack_d); + if(ret) + { + gds_err("gds_flusher_prepare_put_dmem, err: %d\n", ret); + goto out; + } + + desc[k].tag = GDS_TAG_SEND; + desc[k].send = &send_info; + ++k; + + //wait32 ackword + desc[k].tag = GDS_TAG_WAIT_VALUE32; + desc[k].wait32.ptr = (uint32_t *)flack_d.buf_d; + desc[k].wait32.value = flusher_value+1; + desc[k].wait32.cond_flags = GDS_WAIT_COND_EQ; + desc[k].wait32.flags = GDS_MEMORY_GPU; + ++k; + } + + ret = gds_stream_post_descriptors(stream, k, desc, 0); + if (ret) + { + gds_err("gds_stream_post_descriptors, err: %d\n", ret); + return EINVAL; + } + //not multithread safe! 
+ flusher_value++; + + out: + return ret; +} + +int gds_flusher_add_ops(CUstreamBatchMemOpParams *params, int &idx) +{ + int ret=0, tmp_idx=0; + gds_send_request_t send_info; + + CHECK_FLUSHER_SERVICE(); + + if(gds_use_flusher == GDS_FLUSHER_CPU) + { + gds_dbg("gds_fill_poke flsign_h=%p, flusher_value+1=%d, idx=%d\n", flsign_h, flusher_value+1, idx); + ret = gds_fill_poke(params+idx, (uint32_t*)flsign_h, flusher_value+1, GDS_MEMORY_GPU); + if(ret) + { + gds_err("gds_fill_poke error %d\n", ret); + goto out; + } + ++idx; + + gds_dbg("gds_fill_poll flack_d.buf_d=%p, flusher_value+1=%d, idx=%d\n", flack_d.buf_d, flusher_value+1, idx); + ret = gds_fill_poll(params+idx, (uint32_t*)flack_d.buf_d, flusher_value+1, GDS_WAIT_COND_EQ, GDS_MEMORY_GPU); + if(ret) + { + gds_err("gds_fill_poll error %d\n", ret); + goto out; + } + ++idx; + } + else if(gds_use_flusher == GDS_FLUSHER_NIC) + { + ret = gds_fill_poke(params+idx, (uint32_t*)flread_d.buf_d, flusher_value+1, GDS_MEMORY_GPU); + if(ret) + { + gds_err("gds_fill_poke error %d\n", ret); + goto out; + } + gds_dbg("gds_fill_poke done flread_d.buf_d=%p, flusher_value+1=%d, idx=%d\n", flread_d.buf_d, flusher_value+1, idx); + + ++idx; + + ret=gds_flusher_prepare_put_dmem(&send_info, &flread_d, &flack_d); + if(ret) + { + gds_err("gds_flusher_prepare_put_dmem, err: %d\n", ret); + goto out; + } + + ret = gds_post_ops(send_info.commit.entries, send_info.commit.storage, params, idx); + if (ret) { + gds_err("error %d in gds_post_ops\n", ret); + goto out; + } + gds_dbg("gds_post_ops send_info.commit.entries=%p, flusher_value+1=%d, idx=%d\n", send_info.commit.entries, flusher_value+1, idx); + + ret = gds_fill_poll(params+idx, (uint32_t*)flack_d.buf_d, flusher_value+1, GDS_WAIT_COND_EQ, GDS_MEMORY_GPU); + if(ret) + { + gds_err("gds_fill_poll error %d\n", ret); + goto out; + } + gds_dbg("gds_fill_poll flack_d.buf_d=%p, flusher_value+1=%d, idx=%d\n", flack_d.buf_d, flusher_value+1, idx); + ++idx; + + } + + gds_dbg("Final idx=%d\n", idx); + + ++flusher_value; + + out: + return ret; + +} + +int gds_flusher_start_thread(pthread_t *fThread) //, threadFunc, void *arg) +{ + int ret=0; + + CHECK_FLUSHER_SERVICE(); + + if(fThread == NULL) + { + gds_warn("error input"); + return EINVAL; + } + + gds_dbg("Create Thread\n"); + + if(pthread_create(fThread, NULL, gds_flusher_func_thread, NULL) != 0) { + gds_err("pthread_create, err: %d\n", ret); + return EINVAL; + } + + out: + return ret; +} + +int gds_flusher_stop_thread(pthread_t fThread) +{ + int ret=0; + void * tret; + + CHECK_FLUSHER_SERVICE(); + + ret=pthread_cancel(fThread); + if(ret) + { + gds_err("pthread_cancel, ret: %d\n", ret); + return EINVAL; + } + + ret=pthread_join(fThread, &tret); + if(ret) + { + gds_err("pthread_join, ret: %d, thread ret: %ld\n", ret, (long)tret); + return EINVAL; + } + + out: + return ret; +} + +#if 0 +#include + +#define TIMER_DEF(n) struct timeval temp_1_##n={0,0}, temp_2_##n={0,0} +#define TIMER_START(n) gettimeofday(&temp_1_##n, (struct timezone*)0) +#define TIMER_STOP(n) gettimeofday(&temp_2_##n, (struct timezone*)0) +#define TIMER_ELAPSED(n) ((temp_2_##n.tv_sec-temp_1_##n.tv_sec)*1.e6+(temp_2_##n.tv_usec-temp_1_##n.tv_usec)) + +//-------------------------------- NVTX ----------------------------------------- +#include "nvToolsExt.h" +const uint32_t colors[] = { 0x0000ff00, 0x000000ff, 0x00ffff00, 0x00ff00ff, 0x0000ffff, 0x00ff0000, 0x00ffffff }; +const int num_colors = sizeof(colors)/sizeof(uint32_t); + +#define PUSH_RANGE(name,cid) { \ + int color_id = cid; \ + color_id = 
color_id%num_colors;\ + nvtxEventAttributes_t eventAttrib = {0}; \ + eventAttrib.version = NVTX_VERSION; \ + eventAttrib.size = NVTX_EVENT_ATTRIB_STRUCT_SIZE; \ + eventAttrib.colorType = NVTX_COLOR_ARGB; \ + eventAttrib.color = colors[color_id]; \ + eventAttrib.messageType = NVTX_MESSAGE_TYPE_ASCII; \ + eventAttrib.message.ascii = name; \ + nvtxRangePushEx(&eventAttrib); \ +} +#define POP_RANGE nvtxRangePop(); +#endif +//------------------------------------------------------------------------------- + +typedef int64_t gds_us_t; +static inline gds_us_t gds_get_time_us() +{ + struct timespec ts; + int ret = clock_gettime(CLOCK_MONOTONIC, &ts); + if (ret) { + fprintf(stderr, "error in gettime %d/%s\n", errno, strerror(errno)); + exit(EXIT_FAILURE); + } + return (gds_us_t)ts.tv_sec * 1000 * 1000 + (gds_us_t)ts.tv_nsec / 1000; +} + +void * gds_flusher_func_thread(void * arg) +{ + int last_value=0; + int tmp=0; + gds_us_t start, end; + gds_us_t delta1; + gds_us_t delta2; + gds_us_t delta3; + gds_us_t delta4; + int local_rank = atoi(getenv("OMPI_COMM_WORLD_LOCAL_RANK")); + + //Should be the default setting + //pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); + + while(1) //avoid mutex for kill condition + { + gds_dbg("Thread waiting on flsign_h=%p, current last_value=%d\n", flsign_h, last_value); + + //PUSH_RANGE("THREAD", 1); + //start = gds_get_time_us(); + while( (ACCESS_ONCE( *flsign_h )) <= last_value ) { pthread_testcancel(); } + //end = gds_get_time_us(); + //delta1 = end - start; + + //start = gds_get_time_us(); + last_value = ACCESS_ONCE( *flsign_h ); + rmb(); + //end = gds_get_time_us(); + //delta2 = end - start; + + gds_dbg("Thread last_value=%d\n", last_value); + + //start = gds_get_time_us(); + tmp = ACCESS_ONCE ( *((int*)flread_d.buf_h) ); //Should be always 0! + wmb(); + //end = gds_get_time_us(); + //delta3 = end - start; + + //start = gds_get_time_us(); + //gds_dbg("Thread tmp=%d after wmb\n", tmp); + ACCESS_ONCE ( *((int*)flack_d.buf_h) ) = last_value; + wmb(); + //end = gds_get_time_us(); + //delta4 = end - start; + + //POP_RANGE; + //if(local_rank == 0) + // gds_warn("thread --> last_value: %d, while polling time: %.2f us, sign read time: %.2f us, read_d time: %.2f us, write ack time: %.2f us\n", last_value, (double)delta1, (double)delta2, (double)delta3, (double)delta4); + } + + return NULL; +} + +//----------------------------------------------------------------------------- +/* + * Local variables: + * c-indent-level: 8 + * c-basic-offset: 8 + * tab-width: 8 + * indent-tabs-mode: nil + * End: + */ diff --git a/src/flusher.hpp b/src/flusher.hpp new file mode 100644 index 0000000..8dd6696 --- /dev/null +++ b/src/flusher.hpp @@ -0,0 +1,118 @@ +/* Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of NVIDIA CORPORATION nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if HAVE_CONFIG_H +# include +#endif /* HAVE_CONFIG_H */ + +#include +#include +#include +#include + +#include +#include +#include +using namespace std; + +#include +//#include +#include +#include + +#include +#include +#include + +#include "gdsync.h" +#include "gdsync/tools.h" +#include "objs.hpp" +#include "utils.hpp" +#include "memmgr.hpp" +#include "archutils.h" + +typedef enum gds_flusher_type { + GDS_FLUSHER_NONE=0, + GDS_FLUSHER_NATIVE, + GDS_FLUSHER_CPU, + GDS_FLUSHER_NIC +} gds_flusher_type_t; + +#define GDS_FLUSHER_OP_NATIVE 0 +#define GDS_FLUSHER_OP_CPU 2 +#define GDS_FLUSHER_OP_NIC 5 + +#define GDS_FLUSHER_PORT 1 +#define GDS_FLUSHER_QKEY 0 //0x11111111 + +typedef struct gds_flusher_buf { + CUdeviceptr buf_d; + void * buf_h; + int size; + gdr_mh_t mh; + gds_mem_desc_t *desc; + struct ibv_mr * mr; +} gds_flusher_buf; +typedef gds_flusher_buf * gds_flusher_buf_t; + +typedef struct flusher_qp_info { + struct gds_qp *loopback_qp; + struct ibv_pd *pd; + struct ibv_context *context; + int gpu_id; + struct ibv_ah * ah; + char gid_string[INET6_ADDRSTRLEN]; + union ibv_gid gid_bin; + int lid; + int qpn; + int psn; + int ib_port; +} flusher_qp_info; +typedef flusher_qp_info * flusher_qp_info_t; + +//----------------------------------------------------------------------------- +bool gds_use_native_flusher(); +int gds_flusher_setup(); +int gds_flusher_get_envars(); +int gds_flusher_init(struct ibv_pd *pd, struct ibv_context *context, int gpu_id); +int gds_flusher_destroy(); +int gds_flusher_count_op(); +int gds_flusher_post_stream(CUstream stream); +int gds_flusher_add_ops(CUstreamBatchMemOpParams *params, int &idx); + +int gds_flusher_start_thread(pthread_t *fThread); +int gds_flusher_stop_thread(pthread_t fThread); +void * gds_flusher_func_thread(void *); +//----------------------------------------------------------------------------- +/* + * Local variables: + * c-indent-level: 8 + * c-basic-offset: 8 + * tab-width: 8 + * indent-tabs-mode: nil + * End: + */ diff --git a/src/gdsync.cpp b/src/gdsync.cpp index a52319e..892a9a9 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -42,7 +42,7 @@ #include "objs.hpp" #include "archutils.h" #include "mlnxutils.h" - +#include "flusher.hpp" //----------------------------------------------------------------------------- int gds_dbg_enabled() @@ -94,6 +94,7 @@ int gds_dbg_enabled() // TODO: use corret value // TODO: make it dependent upon the particular GPU const size_t GDS_GPU_MAX_INLINE_SIZE = 256; +const size_t GDS_MAX_BATCH_OPS = 256; //----------------------------------------------------------------------------- @@ -208,7 +209,7 @@ void gds_dump_param(CUstreamBatchMemOpParams *param) case CU_STREAM_MEM_OP_WAIT_VALUE_32: gds_info("WAIT32 addr:%p alias:%p value:%08x 
flags:%08x\n", (void*)param->waitValue.address, - (void*)param->writeValue.alias, + (void*)param->waitValue.alias, param->waitValue.value, param->waitValue.flags); break; @@ -436,8 +437,10 @@ static int gds_fill_poll(CUstreamBatchMemOpParams *param, CUdeviceptr ptr, uint3 retcode = EINVAL; goto out; } - if (need_flush) - param->waitValue.flags |= CU_STREAM_WAIT_VALUE_FLUSH; + + //No longer supported since CUDA 9.1 + if (need_flush) param->waitValue.flags |= CU_STREAM_WAIT_VALUE_FLUSH; + gds_dbg("op=%d addr=%p value=%08x cond=%s flags=%08x\n", param->operation, (void*)param->waitValue.address, @@ -478,7 +481,7 @@ int gds_stream_batch_ops(CUstream stream, int nops, CUstreamBatchMemOpParams *pa #endif gds_dbg("nops=%d flags=%08x\n", nops, cuflags); - if (nops > 256) { + if (nops > GDS_MAX_BATCH_OPS) { gds_warn("batch size might be too big, stream=%p nops=%d params=%p flags=%08x\n", stream, nops, params, flags); //return EINVAL; } @@ -561,7 +564,7 @@ int gds_post_ops(size_t n_ops, struct peer_op_wr *op, CUstreamBatchMemOpParams * for (; op && n < n_ops; op = op->next, ++n) { //int flags = 0; - gds_dbg("op[%zu] type:%08x\n", n, op->type); + gds_dbg("op[%zu] type:%08x, idx: %d\n", n, op->type, idx); switch(op->type) { case IBV_EXP_PEER_OP_FENCE: { gds_dbg("OP_FENCE: fence_flags=%"PRIu64"\n", op->wr.fence.fence_flags); @@ -731,9 +734,9 @@ int gds_post_ops(size_t n_ops, struct peer_op_wr *op, CUstreamBatchMemOpParams * uint32_t data = op->wr.dword_va.data; // TODO: properly handle a following fence instead of blidly flushing int flags = 0; - if (!(post_flags & GDS_POST_OPS_DISCARD_WAIT_FLUSH)) + if (!(post_flags & GDS_POST_OPS_DISCARD_WAIT_FLUSH) && gds_use_native_flusher()) flags |= GDS_WAIT_POST_FLUSH; - + gds_dbg("OP_WAIT_DWORD dev_ptr=%llx data=%"PRIx32"\n", dev_ptr, data); switch(op->type) { @@ -756,7 +759,18 @@ int gds_post_ops(size_t n_ops, struct peer_op_wr *op, CUstreamBatchMemOpParams * goto out; } retcode = gds_fill_poll(params+idx, dev_ptr, data, poll_cond, flags); - ++idx; + ++idx; + + retcode = gds_flusher_add_ops(params, idx); + if(retcode) + { + retcode = EINVAL; + gds_err("error in gds_flusher_add_ops func (idx=%d)\n", n, idx); + goto out; + } + //gds_warn("poll params\n"); + //gds_dump_params(idx, params); + break; } default: @@ -1372,13 +1386,14 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds gds_peer *peer = NULL; gds_peer_attr *peer_attr = NULL; int old_errno = errno; + bool is_qp_flusher = (flags & GDS_CREATE_QP_FLUSHER); gds_dbg("pd=%p context=%p gpu_id=%d flags=%08x errno=%d\n", pd, context, gpu_id, flags, errno); assert(pd); assert(context); assert(qp_attr); - if (flags & ~(GDS_CREATE_QP_WQ_ON_GPU|GDS_CREATE_QP_TX_CQ_ON_GPU|GDS_CREATE_QP_RX_CQ_ON_GPU|GDS_CREATE_QP_WQ_DBREC_ON_GPU)) { + if (flags & ~(GDS_CREATE_QP_WQ_ON_GPU|GDS_CREATE_QP_TX_CQ_ON_GPU|GDS_CREATE_QP_RX_CQ_ON_GPU|GDS_CREATE_QP_WQ_DBREC_ON_GPU|GDS_CREATE_QP_FLUSHER|GDS_CREATE_QP_GPU_INVALIDATE_RX_CQ|GDS_CREATE_QP_GPU_INVALIDATE_TX_CQ)) { gds_err("invalid flags"); return NULL; } @@ -1398,23 +1413,46 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds gds_err("error %d while creating TX CQ, old_errno=%d\n", ret, old_errno); goto err; } + qp_attr->send_cq = tx_cq; + gds_dbg("created send_cq=%p\n", qp_attr->send_cq); + if(!is_qp_flusher) + { + gds_dbg("creating RX CQ\n"); + rx_cq = gds_create_cq(context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, gpu_id, + (flags & GDS_CREATE_QP_RX_CQ_ON_GPU) ? 
+ GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT); + if (!rx_cq) { + ret = errno; + gds_err("error %d while creating RX CQ\n", ret); + goto err_free_tx_cq; + } - gds_dbg("creating RX CQ\n"); - rx_cq = gds_create_cq(context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, gpu_id, - (flags & GDS_CREATE_QP_RX_CQ_ON_GPU) ? - GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT); - if (!rx_cq) { - ret = errno; - gds_err("error %d while creating RX CQ\n", ret); - goto err_free_tx_cq; + // peer registration + qp_attr->recv_cq = rx_cq; } + //The flusher doesn't actually need CQs + else qp_attr->recv_cq = tx_cq; + + gds_dbg("created recv_cq=%p\n", qp_attr->recv_cq); - // peer registration - qp_attr->send_cq = tx_cq; - qp_attr->recv_cq = rx_cq; qp_attr->pd = pd; qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_PD; + // disable overflow checks in ibv_poll_cq(), as GPU might invalidate + // the CQE without updating the tracking variables + if (flags & GDS_CREATE_QP_GPU_INVALIDATE_RX_CQ) { + gds_warn("IGNORE_RQ_OVERFLOW\n"); + qp_attr->exp_create_flags |= IBV_EXP_QP_CREATE_IGNORE_RQ_OVERFLOW; + qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_CREATE_FLAGS; + } + + if (flags & GDS_CREATE_QP_GPU_INVALIDATE_TX_CQ) { + gds_warn("IGNORE_SQ_OVERFLOW\n"); + qp_attr->exp_create_flags |= IBV_EXP_QP_CREATE_IGNORE_SQ_OVERFLOW; + qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_CREATE_FLAGS; + } + + gds_dbg("before gds_register_peer_ex\n"); ret = gds_register_peer_ex(context, gpu_id, &peer, &peer_attr); if (ret) { @@ -1441,20 +1479,34 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_PEER_DIRECT; qp_attr->peer_direct_attrs = peer_attr; + gds_dbg("Comp Mask: %x, exp_create_flags: %x\n", qp_attr->comp_mask, qp_attr->exp_create_flags); qp = ibv_exp_create_qp(context, qp_attr); if (!qp) { ret = EINVAL; - gds_err("error in ibv_exp_create_qp\n"); + gds_err("error in ibv_exp_create_qp, errno: %d, %s\n", errno, strerror(errno)); goto err_free_cqs; } + if(!is_qp_flusher) + gds_warn("created gds_qp=%p\n", gqp); + else + gds_warn("created flusher gds_qp=%p\n", gqp); + gqp->qp = qp; gqp->send_cq.cq = qp->send_cq; gqp->send_cq.curr_offset = 0; gqp->recv_cq.cq = qp->recv_cq; gqp->recv_cq.curr_offset = 0; - gds_dbg("created gds_qp=%p\n", gqp); + if(!is_qp_flusher) + { + if(gds_flusher_init(pd, context, gpu_id)) + { + ret = EINVAL; + gds_err("error in gds_flusher_init\n"); + goto err_free_qp; + } + } return gqp; @@ -1513,6 +1565,8 @@ int gds_destroy_qp(struct gds_qp *qp) free(qp); + gds_flusher_destroy(); + return retcode; }
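Usage note (not part of the patch): a minimal sketch of how an application might opt into the flusher path added here. Only gds_create_qp(), gds_qp_init_attr_t, the GDS_CREATE_QP_* flags and the GDS_FLUSHER_TYPE environment variable (0 = none, 1 = GPU native flusher, 2 = CPU thread, 3 = NIC RDMA PUT) come from this change; the helper name, include paths and queue depths are illustrative assumptions, and pd/context/gpu_id stand for whatever the application has already obtained from verbs and CUDA.

    #include <stdlib.h>           /* setenv */
    #include <string.h>           /* memset */
    #include <infiniband/verbs.h>
    #include <gdsync/core.h>      /* assumed public header declaring gds_create_qp() */

    static struct gds_qp *create_qp_with_flusher(struct ibv_pd *pd,
                                                 struct ibv_context *context,
                                                 int gpu_id)
    {
            gds_qp_init_attr_t qp_attr;

            /* Select the NIC loopback flusher (GDS_FLUSHER_NIC == 3); the value is
             * read by gds_flusher_get_envars() when the first QP is created. */
            setenv("GDS_FLUSHER_TYPE", "3", 1);

            memset(&qp_attr, 0, sizeof(qp_attr));
            qp_attr.cap.max_send_wr  = 128;   /* illustrative queue depths */
            qp_attr.cap.max_recv_wr  = 128;
            qp_attr.cap.max_send_sge = 1;
            qp_attr.cap.max_recv_sge = 1;
            qp_attr.qp_type          = IBV_QPT_RC;

            /* A regular application QP: GDS_CREATE_QP_FLUSHER is not set, so
             * gds_create_qp() calls gds_flusher_init() internally, which creates
             * the internal loopback QP and pinned buffers on first use. */
            return gds_create_qp(pd, context, &qp_attr, gpu_id, GDS_CREATE_QP_DEFAULT);
    }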