From 71ec1fe8c8486f005efb0215ca4e1499b5ec012d Mon Sep 17 00:00:00 2001 From: e-ago Date: Tue, 16 Jan 2018 18:40:07 +0100 Subject: [PATCH 1/7] Initial commit flusher --- Makefile.am | 2 +- include/gdsync/core.h | 13 +- src/apis.cpp | 6 +- src/flusher.cpp | 892 ++++++++++++++++++++++++++++++++++++++++++ src/flusher.hpp | 124 ++++++ src/gdsync.cpp | 87 +++- 6 files changed, 1097 insertions(+), 27 deletions(-) create mode 100644 src/flusher.cpp create mode 100644 src/flusher.hpp diff --git a/Makefile.am b/Makefile.am index d91c363..9e2fd21 100644 --- a/Makefile.am +++ b/Makefile.am @@ -21,7 +21,7 @@ libgdsyncincludedir = $(includedir)/gdsync libgdsyncinclude_HEADERS = include/gdsync/core.h include/gdsync/device.cuh include/gdsync/mlx5.h include/gdsync/tools.h src_libgdsync_la_CFLAGS = $(AM_CFLAGS) -src_libgdsync_la_SOURCES = src/gdsync.cpp src/memmgr.cpp src/mem.cpp src/objs.cpp src/apis.cpp src/mlx5.cpp include/gdsync.h +src_libgdsync_la_SOURCES = src/gdsync.cpp src/memmgr.cpp src/mem.cpp src/objs.cpp src/apis.cpp src/mlx5.cpp src/flusher.cpp include/gdsync.h src_libgdsync_la_LDFLAGS = -version-info 2:0:1 noinst_HEADERS = src/mem.hpp src/memmgr.hpp src/objs.hpp src/rangeset.hpp src/utils.hpp src/archutils.h src/mlnxutils.h diff --git a/include/gdsync/core.h b/include/gdsync/core.h index ef501c0..932da03 100644 --- a/include/gdsync/core.h +++ b/include/gdsync/core.h @@ -47,11 +47,14 @@ typedef enum gds_param { int gds_query_param(gds_param_t param, int *value); enum gds_create_qp_flags { - GDS_CREATE_QP_DEFAULT = 0, - GDS_CREATE_QP_WQ_ON_GPU = 1<<0, - GDS_CREATE_QP_TX_CQ_ON_GPU = 1<<1, - GDS_CREATE_QP_RX_CQ_ON_GPU = 1<<2, - GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5, + GDS_CREATE_QP_DEFAULT = 0, + GDS_CREATE_QP_WQ_ON_GPU = 1<<0, + GDS_CREATE_QP_TX_CQ_ON_GPU = 1<<1, + GDS_CREATE_QP_RX_CQ_ON_GPU = 1<<2, + GDS_CREATE_QP_GPU_INVALIDATE_TX_CQ = 1 << 3, + GDS_CREATE_QP_GPU_INVALIDATE_RX_CQ = 1 << 4, + GDS_CREATE_QP_WQ_DBREC_ON_GPU = 1<<5, + GDS_CREATE_QP_FLUSHER = 1<<6, }; typedef struct ibv_exp_qp_init_attr gds_qp_init_attr_t; diff --git a/src/apis.cpp b/src/apis.cpp index 0f29f2f..bcf80c8 100644 --- a/src/apis.cpp +++ b/src/apis.cpp @@ -49,7 +49,7 @@ #include "utils.hpp" #include "memmgr.hpp" //#include "mem.hpp" - +#include "flusher.hpp" //----------------------------------------------------------------------------- @@ -622,7 +622,9 @@ int gds_stream_post_descriptors(CUstream stream, size_t n_descs, gds_descriptor_ // move flush to last wait in the whole batch if (n_waits && no_network_descs_after_entry(n_descs, descs, last_wait)) { gds_dbg("optimizing FLUSH to last wait i=%zu\n", last_wait); - move_flush = true; + //If GPU doesn't support native flusher + n_mem_ops += gds_flusher_count_op(); + gds_dbg("optimizing FLUSH to last wait i=%zu, n_mem_ops: %d, flusher ops: %d\n", last_wait, n_mem_ops, gds_flusher_count_op()); } // alternatively, remove flush for wait is next op is a wait too diff --git a/src/flusher.cpp b/src/flusher.cpp new file mode 100644 index 0000000..c8956eb --- /dev/null +++ b/src/flusher.cpp @@ -0,0 +1,892 @@ +/* Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of NVIDIA CORPORATION nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "flusher.hpp" + +struct flusher_qp_info * flusher_qp=NULL; + +//-------------------------------- STATIC ------------------------------------ +static int * flsign_h; +static gds_flusher_buf flread_d; +static gds_flusher_buf flack_d; +static int flusher_value=0; +static pthread_t flusher_thread; +static int gds_flusher_service = -1; +static int gds_gpu_has_flusher=-1; +static int local_gpu_id=0; + +static const char * flusher_int_to_str(int flusher_int) +{ + if(flusher_int == GDS_FLUSHER_TYPE_CPU) + return "CPU Thread"; + else if(flusher_int == GDS_FLUSHER_TYPE_NIC) + return "NIC RDMA PUT"; + else + return "Unknown"; +} + +static inline int gds_flusher_service_active() { + if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU || gds_flusher_service == GDS_FLUSHER_TYPE_NIC) + return 1; + else + return 0; +} +#define CHECK_FLUSHER_SERVICE() \ + if(!gds_flusher_service_active()) \ + { \ + gds_dbg("Flusher service not active (%d)\n", gds_flusher_service); \ + goto out; \ + } + +#define ROUND_TO(V,PS) ((((V) + (PS) - 1)/(PS)) * (PS)) + +static int gds_flusher_pin_buffer(gds_flusher_buf * fl_mem, size_t req_size, int type_mem) +{ + int ret=0; + CUcontext gpu_ctx; + CUdevice gpu_device; + size_t size = ROUND_TO(req_size, GDS_GPU_PAGE_SIZE); + + gds_dbg("GPU%u: malloc req_size=%zu size=%zu\n", local_gpu_id, req_size, size); + + if (!fl_mem) { + gds_err("invalid params\n"); + return EINVAL; + } + + // NOTE: gpu_id's primary context is assumed to be the right one + // breaks horribly with multiple contexts + int num_gpus; + do { + CUresult err = cuDeviceGetCount(&num_gpus); + if (CUDA_SUCCESS == err) { + break; + } else if (CUDA_ERROR_NOT_INITIALIZED == err) { + gds_err("CUDA error %d in cuDeviceGetCount, calling cuInit\n", err); + CUCHECK(cuInit(0)); + // try again + continue; + } else { + gds_err("CUDA error %d in cuDeviceGetCount, returning EIO\n", err); + return EIO; + } + } while(0); + gds_dbg("num_gpus=%d\n", num_gpus); + if (local_gpu_id >= num_gpus) { + gds_err("invalid num_GPUs=%d while requesting GPU id %d\n", num_gpus, local_gpu_id); + return EINVAL; + } + + CUCHECK(cuDeviceGet(&gpu_device, local_gpu_id)); + gds_dbg("local_gpu_id=%d gpu_device=%d\n", local_gpu_id, gpu_device); + // TODO: check for existing context before switching to the interop one + CUCHECK(cuDevicePrimaryCtxRetain(&gpu_ctx, 
gpu_device)); + CUCHECK(cuCtxPushCurrent(gpu_ctx)); + assert(gpu_ctx != NULL); + + gds_mem_desc_t *desc = (gds_mem_desc_t *)calloc(1, sizeof(gds_mem_desc_t)); + if (!desc) { + gds_err("error while allocating mem desc\n"); + ret = ENOMEM; + goto out; + } + + ret = gds_alloc_mapped_memory(desc, size, type_mem); + if (ret) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + + fl_mem->buf_h = desc->h_ptr; + fl_mem->buf_d = desc->d_ptr; + fl_mem->desc = desc; + + out: + if (ret) + free(desc); // desc can be NULL + + CUCHECK(cuCtxPopCurrent(NULL)); + CUCHECK(cuDevicePrimaryCtxRelease(gpu_device)); + + return ret; +} + +static int gds_flusher_free_pinned_buffer(gds_flusher_buf * fl_mem) +{ + int ret = 0; + CUcontext gpu_ctx; + CUdevice gpu_device; + + gds_dbg("GPU%u: mfree\n", local_gpu_id); + + if (!fl_mem->desc) { + gds_err("invalid handle\n"); + return EINVAL; + } + + if (!fl_mem->buf_h) { + gds_err("invalid host_addr\n"); + return EINVAL; + } + + // NOTE: gpu_id's primary context is assumed to be the right one + // breaks horribly with multiple contexts + + CUCHECK(cuDeviceGet(&gpu_device, local_gpu_id)); + CUCHECK(cuDevicePrimaryCtxRetain(&gpu_ctx, gpu_device)); + CUCHECK(cuCtxPushCurrent(gpu_ctx)); + assert(gpu_ctx != NULL); + + gds_mem_desc_t *desc = (gds_mem_desc_t *)fl_mem->desc; + ret = gds_free_mapped_memory(desc); + if (ret) { + gds_err("error %d while freeing mapped GPU buffers\n", ret); + } + free(desc); + + CUCHECK(cuCtxPopCurrent(NULL)); + CUCHECK(cuDevicePrimaryCtxRelease(gpu_device)); + + return ret; +} + +static int gds_flusher_create_qp() +{ + struct ibv_port_attr port_attr; + int qp_flags=0; + int attr_flags=0; + int ret=0; + struct ibv_ah_attr ib_ah_attr; + struct ibv_qp_attr attr; + gds_qp_init_attr_t qp_init_attr; + + qp_flags |= GDS_CREATE_QP_FLUSHER; + //qp_flags |= GDS_CREATE_QP_GPU_INVALIDATE_RX_CQ; + qp_flags |= GDS_CREATE_QP_GPU_INVALIDATE_TX_CQ; + + // -------------------------------------------- + + memset(&qp_init_attr, 0, sizeof(gds_qp_init_attr_t)); + + qp_init_attr.send_cq = 0; + qp_init_attr.recv_cq = 0; + qp_init_attr.cap.max_send_wr = 512; + qp_init_attr.cap.max_recv_wr = 512; + qp_init_attr.cap.max_send_sge = 1; + qp_init_attr.cap.max_recv_sge = 1; + qp_init_attr.cap.max_inline_data = 0; //The flusher must not inline data!! + qp_init_attr.qp_type = IBV_QPT_RC; //UD & RDMA_WRITE are not compatible! 
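+        // NOTE: this RC QP is looped back onto itself: RTR below sets
+        // dest_qp_num/dlid to the QP's own qpn/lid, so the RDMA_WRITE it
+        // posts (read word -> ack word) goes through the local HCA. The
+        // HCA's DMA read of the source buffer is ordered after any NIC
+        // writes to GPU memory still in flight, which is what implements
+        // the flush.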
+
+        flusher_qp->loopback_qp = gds_create_qp(flusher_qp->pd, flusher_qp->context, &qp_init_attr, flusher_qp->gpu_id, qp_flags);
+        if (!flusher_qp->loopback_qp) {
+                gds_err("gds_create_qp returned NULL\n");
+                ret=EINVAL;
+                goto out;
+        }
+
+        // --------------------------------------------
+
+        memset(&attr, 0, sizeof(struct ibv_qp_attr));
+        attr.qp_state = IBV_QPS_INIT;
+        attr.pkey_index = 0;
+        attr.port_num = flusher_qp->ib_port;
+        attr.qkey = GDS_FLUSHER_QKEY;
+        attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE;
+
+        attr_flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
+
+        if (ibv_modify_qp(flusher_qp->loopback_qp->qp, &attr, attr_flags)) {
+                gds_err("Failed to modify QP to INIT\n");
+                goto clean_qp;
+        }
+
+        if(ibv_query_port(flusher_qp->context, flusher_qp->ib_port, &port_attr))
+        {
+                gds_err("Failed to query port %d\n", flusher_qp->ib_port);
+                goto clean_qp;
+        }
+
+        flusher_qp->lid=port_attr.lid;
+        flusher_qp->qpn=flusher_qp->loopback_qp->qp->qp_num;
+        flusher_qp->psn=0; //lrand48() & 0xffffff;
+
+        // --------------------------------------------
+
+        memset(&attr, 0, sizeof(struct ibv_qp_attr));
+        attr.qp_state = IBV_QPS_RTR;
+        attr.path_mtu = port_attr.active_mtu;
+        attr.dest_qp_num = flusher_qp->qpn;
+        attr.rq_psn = flusher_qp->psn;
+        attr.ah_attr.dlid = flusher_qp->lid;
+        attr.max_dest_rd_atomic = 1;
+        attr.min_rnr_timer = 12;
+        attr.ah_attr.is_global = 0;
+        attr.ah_attr.sl = 0;
+        attr.ah_attr.src_path_bits = 0;
+        attr.ah_attr.port_num = flusher_qp->ib_port;
+        attr_flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER | IBV_QP_MAX_DEST_RD_ATOMIC;
+
+        if (ibv_modify_qp(flusher_qp->loopback_qp->qp, &attr, attr_flags)) {
+                gds_err("Failed to modify QP to RTR\n");
+                goto clean_qp;
+        }
+
+        // --------------------------------------------
+
+        memset(&attr, 0, sizeof(struct ibv_qp_attr));
+        attr.qp_state = IBV_QPS_RTS;
+        attr.sq_psn = 0;
+        attr.timeout = 20;
+        attr.retry_cnt = 7;
+        attr.rnr_retry = 7;
+        attr.max_rd_atomic = 1;
+        attr_flags = IBV_QP_STATE | IBV_QP_SQ_PSN | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_MAX_QP_RD_ATOMIC;
+
+        if (ibv_modify_qp(flusher_qp->loopback_qp->qp, &attr, attr_flags)) {
+                gds_err("Failed to modify QP to RTS\n");
+                goto clean_qp;
+        }
+
+    out:
+        return ret;
+
+    clean_qp:
+        gds_destroy_qp(flusher_qp->loopback_qp);
+        return EINVAL;
+
+}
+
+static int gds_flusher_prepare_put_dmem(gds_send_request_t * send_info, struct gds_flusher_buf * src_fbuf, struct gds_flusher_buf * dst_fbuf)
+{
+        int ret=0;
+        struct ibv_sge list;
+        gds_send_wr p_ewr;
+        gds_send_wr *bad_ewr;
+
+        assert(send_info);
+        assert(src_fbuf);
+        assert(dst_fbuf);
+
+        //send read word
+        memset(&list, 0, sizeof(struct ibv_sge));
+        list.addr = (uintptr_t) (src_fbuf->buf_d);
+        list.length = src_fbuf->size;
+        list.lkey = src_fbuf->mr->lkey;
+
+        memset(&p_ewr, 0, sizeof(gds_send_wr));
+        p_ewr.next = NULL;
+        p_ewr.exp_send_flags = 0; //IBV_EXP_SEND_SIGNALED;
+        p_ewr.exp_opcode = IBV_EXP_WR_RDMA_WRITE;
+        p_ewr.wr_id = 1;
+        p_ewr.num_sge = 1;
+        p_ewr.sg_list = &list;
+        p_ewr.wr.rdma.remote_addr = dst_fbuf->buf_d;
+        p_ewr.wr.rdma.rkey = dst_fbuf->mr->rkey;
+
+        ret = gds_prepare_send(flusher_qp->loopback_qp, &p_ewr, &bad_ewr, send_info);
+        if (ret) {
+                gds_err("error %d in gds_prepare_send\n", ret);
+                goto out;
+        }
+
+    out:
+        return ret;
+}
+//------------------------------- COMMON ------------------------------------
+int gds_gpu_flusher_env()
+{
+        if (-1 == 
gds_gpu_has_flusher) { + const char *env = getenv("GDS_GPU_HAS_FLUSHER"); + if (env) + gds_gpu_has_flusher = !!atoi(env); + else + gds_gpu_has_flusher = 0; + + gds_warn("GDS_GPU_HAS_FLUSHER=%d\n", gds_gpu_has_flusher); + } + return gds_gpu_has_flusher; +} + +int gds_service_flusher_env() +{ + if (-1 == gds_flusher_service) { + const char *env = getenv("GDS_FLUSHER_SERVICE"); + if (env) + { + gds_flusher_service = atoi(env); + if(gds_flusher_service != GDS_FLUSHER_TYPE_CPU && gds_flusher_service != GDS_FLUSHER_TYPE_NIC) + { + //gds_err("Erroneous value GDS_FLUSHER_SERVICE=%d. Service not activated\n", gds_flusher_service); + gds_flusher_service=0; + } + } + else + gds_flusher_service = 0; + + gds_warn("GDS_FLUSHER_SERVICE=%d\n", gds_flusher_service); + } + return gds_flusher_service; +} + +int gds_flusher_check_envs() +{ + gds_gpu_flusher_env(); + + if(gds_gpu_has_flusher == 1) + { + gds_flusher_service=0; + gds_warn("Using GPU native flusher\n"); + } + else + { + gds_service_flusher_env(); + if(gds_flusher_service == 0) + gds_warn("No flusher service nor GPU native flusher\n"); + else + gds_warn("Using flusher service '%s' (%d)\n", flusher_int_to_str(gds_flusher_service), gds_flusher_service); + } +} + +int gds_flusher_init(struct ibv_pd *pd, struct ibv_context *context, int gpu_id) +{ + int ret = 0; + unsigned int flag = 1; + + gds_flusher_check_envs(); + CHECK_FLUSHER_SERVICE(); + + local_gpu_id=gpu_id; + + flread_d.size=1*sizeof(int); + flack_d.size=1*sizeof(int); + + gds_dbg("gds_flusher_service=%d\n", gds_flusher_service); + + if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU) + { + // ------------------ READ WORD ------------------ + ret = gds_flusher_pin_buffer(&flread_d, flread_d.size, GDS_MEMORY_GPU); + if (ret) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + CUCHECK(cuMemsetD32(flread_d.buf_d, 0, 1)); + gds_dbg("ReadWord pinned. size: %d buf_d=%p buf_h=%p\n", flread_d.size, flread_d.buf_d, flread_d.buf_h); + + // ------------------ ACK WORD ------------------ + ret = gds_flusher_pin_buffer(&flack_d, flack_d.size, GDS_MEMORY_GPU); + if (ret) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + CUCHECK(cuMemsetD32(flack_d.buf_d, 0, 1)); + gds_dbg("Ackword pinned. size: %d buf_d=%p buf_h=%p\n", flack_d.size, flack_d.buf_d, flack_d.buf_h); + + // ------------------ SIGNAL WORD ------------------ + CUCHECK(cuMemAllocHost((void**)&flsign_h, 1*sizeof(int))); + memset(flsign_h, 0, sizeof(int)); + gds_dbg("SignalWord on Host Mem %p\n", flsign_h); + + // ------------------ THREAD ------------------ + gds_flusher_start_thread(&flusher_thread); + } + else if(gds_flusher_service == GDS_FLUSHER_TYPE_NIC) + { + // ------------------ READ WORD ------------------ + ret = gds_flusher_pin_buffer(&flread_d, flread_d.size, GDS_MEMORY_GPU); + if (ret) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + CUCHECK(cuMemsetD32(flread_d.buf_d, 0, 1)); + gds_dbg("ReadWord pinned. size: %d buf_d=%p buf_h=%p\n", flread_d.size, flread_d.buf_d, flread_d.buf_h); + + // ------------------ ACK WORD ------------------ + ret = gds_flusher_pin_buffer(&flack_d, flack_d.size, GDS_MEMORY_GPU); + if (ret) { + gds_err("error %d while allocating mapped GPU buffers\n", ret); + goto out; + } + CUCHECK(cuMemsetD32(flack_d.buf_d, 0, 1)); + gds_dbg("Ackword pinned. 
size: %d buf_d=%p buf_h=%p\n", flack_d.size, flack_d.buf_d, flack_d.buf_h);
+
+                if(flusher_qp == NULL)
+                {
+                        flusher_qp = (struct flusher_qp_info *) calloc (1, sizeof(struct flusher_qp_info));
+                        if(!flusher_qp) {
+                                gds_err("error while allocating flusher QP info\n"); ret=ENOMEM;
+                                goto out;
+                        }
+
+                        flusher_qp->gpu_id = local_gpu_id;
+                        flusher_qp->ib_port=GDS_FLUSHER_PORT;
+                        flusher_qp->context=context;
+                        flusher_qp->pd=pd;
+
+                        ret = gds_flusher_create_qp();
+                        if (ret) {
+                                gds_err("error %d gds_flusher_create_qp\n", ret);
+                                goto out;
+                        }
+
+                        flread_d.mr = ibv_reg_mr(flusher_qp->pd, (void*)flread_d.buf_d, flread_d.size, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE);
+                        if (!flread_d.mr) {
+                                gds_err("Couldn't register MR\n");
+                                ret=EINVAL;
+                                goto out;
+                        }
+
+                        gds_dbg("flread_d ibv_reg_mr addr:%p size:%zu flags=0x%08x, reg=%p lkey=%x, rkey=%x\n",
+                                flread_d.buf_d, flread_d.size, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE, flread_d.mr, flread_d.mr->lkey, flread_d.mr->rkey);
+
+                        flack_d.mr = ibv_reg_mr(flusher_qp->pd, (void*)flack_d.buf_d, flack_d.size, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE);
+                        if (!flack_d.mr) {
+                                gds_err("Couldn't register MR\n");
+                                ret=EINVAL;
+                                goto out;
+                        }
+
+                        gds_dbg("flack_d ibv_reg_mr addr:%p size:%zu flags=0x%08x, reg=%p lkey=%x, rkey=%x\n",
+                                flack_d.buf_d, flack_d.size, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE, flack_d.mr, flack_d.mr->lkey, flack_d.mr->rkey);
+
+                }
+        }
+
+        if(!ret)
+                gds_warn("Flusher initialized\n");
+
+    out:
+        return ret;
+}
+
+int gds_flusher_destroy()
+{
+        int ret = 0;
+        unsigned int flag = 1;
+
+        CHECK_FLUSHER_SERVICE();
+
+        gds_dbg("gds_flusher_service=%d\n", gds_flusher_service);
+
+        if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU)
+        {
+                ret=gds_flusher_stop_thread(flusher_thread);
+                if(ret)
+                {
+                        gds_err("gds_flusher_stop_thread error %d\n", ret);
+                        goto out;
+                }
+
+                ret=gds_flusher_free_pinned_buffer(&(flread_d));
+                if (ret) {
+                        gds_err("error %d while freeing mapped GPU buffers\n", ret);
+                        goto out;
+                }
+
+                ret=gds_flusher_free_pinned_buffer(&(flack_d));
+                if (ret) {
+                        gds_err("error %d while freeing mapped GPU buffers\n", ret);
+                        goto out;
+                }
+
+                CUCHECK(cuMemFreeHost(flsign_h));
+
+                gds_dbg("Device words unpinned\n");
+        }
+        else if(gds_flusher_service == GDS_FLUSHER_TYPE_NIC)
+        {
+                if(!flusher_qp) {
+                        gds_err("error !flusher_qp\n");
+                        ret=EINVAL;
+                        goto out;
+                }
+
+                if(!(flusher_qp->loopback_qp)) {
+                        gds_err("error !loopback_qp\n");
+                        ret=EINVAL;
+                        goto out;
+                }
+
+                ret = ibv_destroy_qp(flusher_qp->loopback_qp->qp);
+                if (ret) {
+                        gds_err("error %d in destroy_qp\n", ret);
+                        goto out;
+                }
+
+                assert(flusher_qp->loopback_qp->send_cq.cq);
+                ret = ibv_destroy_cq(flusher_qp->loopback_qp->send_cq.cq);
+                if (ret) {
+                        gds_err("error %d in destroy_cq send_cq\n", ret);
+                        goto out;
+                }
+                //send_cq == recv_cq
+
+                if(flread_d.mr) {
+                        ret = ibv_dereg_mr(flread_d.mr);
+                        if (ret) {
+                                gds_err("error %d in ibv_dereg_mr\n", ret);
+                                goto out;
+                        }
+                }
+
+                if(flack_d.mr) {
+                        ret = ibv_dereg_mr(flack_d.mr);
+                        if (ret) {
+                                gds_err("error %d in ibv_dereg_mr\n", ret);
+                                goto out;
+                        }
+                }
+
+                free(flusher_qp->loopback_qp);
+                free(flusher_qp);
+                flusher_qp=NULL;
+
+                gds_dbg("Flusher QP destroyed\n");
+
+        }
+
+        if(!ret) gds_warn("Flusher destroyed\n");
+
+    out:
+        return ret;
+}
+
+int gds_flusher_count_op()
+{
+        if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU)
+                return GDS_FLUSHER_OP_CPU;
+
+        if(gds_flusher_service == GDS_FLUSHER_TYPE_NIC)
+                return GDS_FLUSHER_OP_NIC;
+
+        return 0;
+}
+
+void gds_flusher_set_flag(int * flags)
+{
+        //Enable GPU flusher if GPU 
has internal flusher (flusher service ignored) + if(gds_gpu_has_flusher == 1) + (*flags) |= GDS_WAIT_POST_FLUSH; + + gds_dbg("flags=%x\n", (*flags)); +} + +//Not actually used for now! +int gds_flusher_post_stream(CUstream stream) +{ + gds_descriptor_t desc[3]; + gds_send_request_t send_info; + int k = 0, ret = 0; + + CHECK_FLUSHER_SERVICE(); + + if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU) + { + //write32 signal + desc[k].tag = GDS_TAG_WRITE_VALUE32; + desc[k].write32.ptr = (uint32_t *) flsign_h; + desc[k].write32.value = flusher_value+1; + desc[k].write32.flags = GDS_MEMORY_HOST; + ++k; + + //wait32 ackword + desc[k].tag = GDS_TAG_WAIT_VALUE32; + desc[k].wait32.ptr = (uint32_t *)flack_d.buf_d; + desc[k].wait32.value = flusher_value+1; + desc[k].wait32.cond_flags = GDS_WAIT_COND_EQ; + desc[k].wait32.flags = GDS_MEMORY_GPU; + ++k; + } + else + { + //flusher NIC + //write read word + desc[k].tag = GDS_TAG_WRITE_VALUE32; + desc[k].write32.ptr = (uint32_t *) flread_d.buf_d; + desc[k].write32.value = flusher_value+1; + desc[k].write32.flags = GDS_MEMORY_GPU; + ++k; + //write order respected? + + + ret=gds_flusher_prepare_put_dmem(&send_info, &flread_d, &flack_d); + if(ret) + { + gds_err("gds_flusher_prepare_put_dmem, err: %d\n", ret); + goto out; + } + + desc[k].tag = GDS_TAG_SEND; + desc[k].send = &send_info; + ++k; + + //wait32 ackword + desc[k].tag = GDS_TAG_WAIT_VALUE32; + desc[k].wait32.ptr = (uint32_t *)flack_d.buf_d; + desc[k].wait32.value = flusher_value+1; + desc[k].wait32.cond_flags = GDS_WAIT_COND_EQ; + desc[k].wait32.flags = GDS_MEMORY_GPU; + ++k; + } + + ret = gds_stream_post_descriptors(stream, k, desc, 0); + if (ret) + { + gds_err("gds_stream_post_descriptors, err: %d\n", ret); + return EINVAL; + } + //not multithread safe! + flusher_value++; + + out: + return ret; +} + +int gds_flusher_add_ops(CUstreamBatchMemOpParams *params, int &idx) +{ + int ret=0, tmp_idx=0; + gds_send_request_t send_info; + + CHECK_FLUSHER_SERVICE(); + + if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU) + { + gds_dbg("gds_fill_poke flsign_h=%p, flusher_value+1=%d, idx=%d\n", flsign_h, flusher_value+1, idx); + ret = gds_fill_poke(params+idx, (uint32_t*)flsign_h, flusher_value+1, GDS_MEMORY_GPU); + if(ret) + { + gds_err("gds_fill_poke error %d\n", ret); + goto out; + } + ++idx; + + gds_dbg("gds_fill_poll flack_d.buf_d=%p, flusher_value+1=%d, idx=%d\n", flack_d.buf_d, flusher_value+1, idx); + ret = gds_fill_poll(params+idx, (uint32_t*)flack_d.buf_d, flusher_value+1, GDS_WAIT_COND_EQ, GDS_MEMORY_GPU); + if(ret) + { + gds_err("gds_fill_poll error %d\n", ret); + goto out; + } + ++idx; + } + else if(gds_flusher_service == GDS_FLUSHER_TYPE_NIC) + { + ret = gds_fill_poke(params+idx, (uint32_t*)flread_d.buf_d, flusher_value+1, GDS_MEMORY_GPU); + if(ret) + { + gds_err("gds_fill_poke error %d\n", ret); + goto out; + } + gds_dbg("gds_fill_poke done flread_d.buf_d=%p, flusher_value+1=%d, idx=%d\n", flread_d.buf_d, flusher_value+1, idx); + + ++idx; + + ret=gds_flusher_prepare_put_dmem(&send_info, &flread_d, &flack_d); + if(ret) + { + gds_err("gds_flusher_prepare_put_dmem, err: %d\n", ret); + goto out; + } + + ret = gds_post_ops(send_info.commit.entries, send_info.commit.storage, params, idx); + if (ret) { + gds_err("error %d in gds_post_ops\n", ret); + goto out; + } + gds_dbg("gds_post_ops send_info.commit.entries=%p, flusher_value+1=%d, idx=%d\n", send_info.commit.entries, flusher_value+1, idx); + + ret = gds_fill_poll(params+idx, (uint32_t*)flack_d.buf_d, flusher_value+1, GDS_WAIT_COND_EQ, GDS_MEMORY_GPU); + if(ret) 
+                {
+                        gds_err("gds_fill_poll error %d\n", ret);
+                        goto out;
+                }
+                gds_dbg("gds_fill_poll flack_d.buf_d=%p, flusher_value+1=%d, idx=%d\n", flack_d.buf_d, flusher_value+1, idx);
+                ++idx;
+
+        }
+
+        gds_dbg("Final idx=%d\n", idx);
+
+        ++flusher_value;
+
+    out:
+        return ret;
+
+}
+
+int gds_flusher_start_thread(pthread_t *fThread) //, threadFunc, void *arg)
+{
+        int ret=0;
+
+        CHECK_FLUSHER_SERVICE();
+
+        if(fThread == NULL)
+        {
+                gds_err("invalid input\n");
+                return EINVAL;
+        }
+
+        gds_dbg("Create Thread\n");
+
+        if((ret = pthread_create(fThread, NULL, gds_flusher_func_thread, NULL)) != 0) {
+                gds_err("pthread_create, err: %d\n", ret);
+                return EINVAL;
+        }
+
+    out:
+        return ret;
+}
+
+int gds_flusher_stop_thread(pthread_t fThread)
+{
+        int ret=0;
+        void * tret;
+
+        CHECK_FLUSHER_SERVICE();
+
+        ret=pthread_cancel(fThread);
+        if(ret)
+        {
+                gds_err("pthread_cancel, ret: %d\n", ret);
+                return EINVAL;
+        }
+
+        ret=pthread_join(fThread, &tret);
+        if(ret)
+        {
+                gds_err("pthread_join, ret: %d, thread ret: %ld\n", ret, (long)tret);
+                return EINVAL;
+        }
+
+    out:
+        return ret;
+}
+
+#if 0
+#include <sys/time.h>
+
+#define TIMER_DEF(n) struct timeval temp_1_##n={0,0}, temp_2_##n={0,0}
+#define TIMER_START(n) gettimeofday(&temp_1_##n, (struct timezone*)0)
+#define TIMER_STOP(n) gettimeofday(&temp_2_##n, (struct timezone*)0)
+#define TIMER_ELAPSED(n) ((temp_2_##n.tv_sec-temp_1_##n.tv_sec)*1.e6+(temp_2_##n.tv_usec-temp_1_##n.tv_usec))
+#endif
+
+//-------------------------------- NVTX -----------------------------------------
+#include "nvToolsExt.h"
+const uint32_t colors[] = { 0x0000ff00, 0x000000ff, 0x00ffff00, 0x00ff00ff, 0x0000ffff, 0x00ff0000, 0x00ffffff };
+const int num_colors = sizeof(colors)/sizeof(uint32_t);
+
+#define PUSH_RANGE(name,cid) { \
+        int color_id = cid; \
+        color_id = color_id%num_colors;\
+        nvtxEventAttributes_t eventAttrib = {0}; \
+        eventAttrib.version = NVTX_VERSION; \
+        eventAttrib.size = NVTX_EVENT_ATTRIB_STRUCT_SIZE; \
+        eventAttrib.colorType = NVTX_COLOR_ARGB; \
+        eventAttrib.color = colors[color_id]; \
+        eventAttrib.messageType = NVTX_MESSAGE_TYPE_ASCII; \
+        eventAttrib.message.ascii = name; \
+        nvtxRangePushEx(&eventAttrib); \
+}
+#define POP_RANGE nvtxRangePop();
+//-------------------------------------------------------------------------------
+
+typedef int64_t gds_us_t;
+static inline gds_us_t gds_get_time_us()
+{
+        struct timespec ts;
+        int ret = clock_gettime(CLOCK_MONOTONIC, &ts);
+        if (ret) {
+                fprintf(stderr, "error in gettime %d/%s\n", errno, strerror(errno));
+                exit(EXIT_FAILURE);
+        }
+        return (gds_us_t)ts.tv_sec * 1000 * 1000 + (gds_us_t)ts.tv_nsec / 1000;
+}
+
+void * gds_flusher_func_thread(void * arg)
+{
+        int last_value=0;
+        int tmp=0;
+        gds_us_t start, end;
+        gds_us_t delta1;
+        gds_us_t delta2;
+        gds_us_t delta3;
+        gds_us_t delta4;
+        int local_rank = getenv("OMPI_COMM_WORLD_LOCAL_RANK") ? atoi(getenv("OMPI_COMM_WORLD_LOCAL_RANK")) : 0;
+
+        //Should be the default setting
+        //pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
+
+        while(1) //avoid mutex for kill condition
+        {
+                gds_dbg("Thread waiting on flsign_h=%p, current last_value=%d\n", flsign_h, last_value);
+
+                PUSH_RANGE("THREAD", 1);
+                //start = gds_get_time_us();
+                while( (ACCESS_ONCE( *flsign_h )) <= last_value ) { pthread_testcancel(); }
+                //end = gds_get_time_us();
+                //delta1 = end - start;
+
+                //start = gds_get_time_us();
+                last_value = ACCESS_ONCE( *flsign_h );
+                rmb();
+                //end = gds_get_time_us();
+                //delta2 = end - start;
+
+                gds_dbg("Thread last_value=%d\n", last_value);
+
+                //start = gds_get_time_us();
+                tmp = ACCESS_ONCE ( *((int*)flread_d.buf_h) ); 
//Should be always 0! + wmb(); + //end = gds_get_time_us(); + //delta3 = end - start; + + //start = gds_get_time_us(); + //gds_dbg("Thread tmp=%d after wmb\n", tmp); + ACCESS_ONCE ( *((int*)flack_d.buf_h) ) = last_value; + wmb(); + //end = gds_get_time_us(); + //delta4 = end - start; + + POP_RANGE; + //if(local_rank == 0) + // gds_warn("thread --> last_value: %d, while polling time: %.2f us, sign read time: %.2f us, read_d time: %.2f us, write ack time: %.2f us\n", last_value, (double)delta1, (double)delta2, (double)delta3, (double)delta4); + } + + return NULL; +} + +//----------------------------------------------------------------------------- +/* + * Local variables: + * c-indent-level: 8 + * c-basic-offset: 8 + * tab-width: 8 + * indent-tabs-mode: nil + * End: + */ diff --git a/src/flusher.hpp b/src/flusher.hpp new file mode 100644 index 0000000..b9e10cf --- /dev/null +++ b/src/flusher.hpp @@ -0,0 +1,124 @@ +/* Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of NVIDIA CORPORATION nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#if HAVE_CONFIG_H +# include +#endif /* HAVE_CONFIG_H */ + +#include +#include +#include +#include + +#include +#include +#include +using namespace std; + +#include +//#include +#include +#include + +#include +#include +#include + +#include "gdsync.h" +#include "gdsync/tools.h" +#include "objs.hpp" +#include "utils.hpp" +#include "memmgr.hpp" +#include "archutils.h" + +#define GDS_FLUSHER_TYPE_CPU 1 +#define GDS_FLUSHER_TYPE_NIC 2 + +#define GDS_FLUSHER_OP_CPU 2 +#define GDS_FLUSHER_OP_NIC 5 + +#define GDS_FLUSHER_PORT 1 +#define GDS_FLUSHER_QKEY 0 //0x11111111 + +#define CUDA_CHECK(stmt) \ +do { \ + cudaError_t result = (stmt); \ + if (cudaSuccess != result) { \ + fprintf(stderr, "[%s] [%d] cuda failed with %s \n", \ + __FILE__, __LINE__, cudaGetErrorString(result));\ + exit(EXIT_FAILURE); \ + } \ + assert(cudaSuccess == result); \ +} while (0) + + +typedef struct gds_flusher_buf { + CUdeviceptr buf_d; + void * buf_h; + int size; + gdr_mh_t mh; + gds_mem_desc_t *desc; + struct ibv_mr * mr; +} gds_flusher_buf; +typedef gds_flusher_buf * gds_flusher_buf_t; + +typedef struct flusher_qp_info { + struct gds_qp *loopback_qp; + struct ibv_pd *pd; + struct ibv_context *context; + int gpu_id; + struct ibv_ah * ah; + char gid_string[INET6_ADDRSTRLEN]; + union ibv_gid gid_bin; + int lid; + int qpn; + int psn; + int ib_port; +} flusher_qp_info; +typedef flusher_qp_info * flusher_qp_info_t; + +//----------------------------------------------------------------------------- +int gds_gpu_flusher_env(); +int gds_flusher_init(struct ibv_pd *pd, struct ibv_context *context, int gpu_id); +int gds_flusher_destroy(); +int gds_flusher_count_op(); +void gds_flusher_set_flag(int * flags); +int gds_flusher_post_stream(CUstream stream); +int gds_flusher_add_ops(CUstreamBatchMemOpParams *params, int &idx); + +int gds_flusher_start_thread(pthread_t *fThread); +int gds_flusher_stop_thread(pthread_t fThread); +void * gds_flusher_func_thread(void *); +//----------------------------------------------------------------------------- +/* + * Local variables: + * c-indent-level: 8 + * c-basic-offset: 8 + * tab-width: 8 + * indent-tabs-mode: nil + * End: + */ diff --git a/src/gdsync.cpp b/src/gdsync.cpp index a52319e..68538bc 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -94,6 +94,7 @@ int gds_dbg_enabled() // TODO: use corret value // TODO: make it dependent upon the particular GPU const size_t GDS_GPU_MAX_INLINE_SIZE = 256; +const size_t GDS_MAX_BATCH_OPS = 256; //----------------------------------------------------------------------------- @@ -208,7 +209,7 @@ void gds_dump_param(CUstreamBatchMemOpParams *param) case CU_STREAM_MEM_OP_WAIT_VALUE_32: gds_info("WAIT32 addr:%p alias:%p value:%08x flags:%08x\n", (void*)param->waitValue.address, - (void*)param->writeValue.alias, + (void*)param->waitValue.alias, param->waitValue.value, param->waitValue.flags); break; @@ -478,7 +479,7 @@ int gds_stream_batch_ops(CUstream stream, int nops, CUstreamBatchMemOpParams *pa #endif gds_dbg("nops=%d flags=%08x\n", nops, cuflags); - if (nops > 256) { + if (nops > GDS_MAX_BATCH_OPS) { gds_warn("batch size might be too big, stream=%p nops=%d params=%p flags=%08x\n", stream, nops, params, flags); //return EINVAL; } @@ -561,7 +562,7 @@ int gds_post_ops(size_t n_ops, struct peer_op_wr *op, CUstreamBatchMemOpParams * for (; op && n < n_ops; op = op->next, ++n) { //int flags = 0; - gds_dbg("op[%zu] type:%08x\n", n, op->type); + gds_dbg("op[%zu] type:%08x, idx: %d\n", n, op->type, idx); switch(op->type) { case IBV_EXP_PEER_OP_FENCE: { 
gds_dbg("OP_FENCE: fence_flags=%"PRIu64"\n", op->wr.fence.fence_flags); @@ -732,7 +733,7 @@ int gds_post_ops(size_t n_ops, struct peer_op_wr *op, CUstreamBatchMemOpParams * // TODO: properly handle a following fence instead of blidly flushing int flags = 0; if (!(post_flags & GDS_POST_OPS_DISCARD_WAIT_FLUSH)) - flags |= GDS_WAIT_POST_FLUSH; + gds_flusher_set_flag(&flags); gds_dbg("OP_WAIT_DWORD dev_ptr=%llx data=%"PRIx32"\n", dev_ptr, data); @@ -756,7 +757,16 @@ int gds_post_ops(size_t n_ops, struct peer_op_wr *op, CUstreamBatchMemOpParams * goto out; } retcode = gds_fill_poll(params+idx, dev_ptr, data, poll_cond, flags); - ++idx; + ++idx; + + retcode = gds_flusher_add_ops(params, idx); + if(retcode) + { + retcode = EINVAL; + gds_err("error in gds_flusher_add_ops func (idx=%d)\n", n, idx); + goto out; + } + break; } default: @@ -1378,7 +1388,7 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds assert(context); assert(qp_attr); - if (flags & ~(GDS_CREATE_QP_WQ_ON_GPU|GDS_CREATE_QP_TX_CQ_ON_GPU|GDS_CREATE_QP_RX_CQ_ON_GPU|GDS_CREATE_QP_WQ_DBREC_ON_GPU)) { + if (flags & ~(GDS_CREATE_QP_WQ_ON_GPU|GDS_CREATE_QP_TX_CQ_ON_GPU|GDS_CREATE_QP_RX_CQ_ON_GPU|GDS_CREATE_QP_WQ_DBREC_ON_GPU|GDS_CREATE_QP_FLUSHER|GDS_CREATE_QP_GPU_INVALIDATE_RX_CQ|GDS_CREATE_QP_GPU_INVALIDATE_TX_CQ)) { gds_err("invalid flags"); return NULL; } @@ -1398,23 +1408,46 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds gds_err("error %d while creating TX CQ, old_errno=%d\n", ret, old_errno); goto err; } + qp_attr->send_cq = tx_cq; + gds_dbg("created send_cq=%p\n", qp_attr->send_cq); + if(!(flags & GDS_CREATE_QP_FLUSHER)) + { + gds_dbg("creating RX CQ\n"); + rx_cq = gds_create_cq(context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, gpu_id, + (flags & GDS_CREATE_QP_RX_CQ_ON_GPU) ? + GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT); + if (!rx_cq) { + ret = errno; + gds_err("error %d while creating RX CQ\n", ret); + goto err_free_tx_cq; + } - gds_dbg("creating RX CQ\n"); - rx_cq = gds_create_cq(context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, gpu_id, - (flags & GDS_CREATE_QP_RX_CQ_ON_GPU) ? 
-                              GDS_ALLOC_CQ_ON_GPU : GDS_ALLOC_CQ_DEFAULT);
-        if (!rx_cq) {
-                ret = errno;
-                gds_err("error %d while creating RX CQ\n", ret);
-                goto err_free_tx_cq;
+                // peer registration
+                qp_attr->recv_cq = rx_cq;
         }
+        //The flusher doesn't actually need CQs
+        else qp_attr->recv_cq = tx_cq;
+
+        gds_dbg("created recv_cq=%p\n", qp_attr->recv_cq);
 
-        // peer registration
-        qp_attr->send_cq = tx_cq;
-        qp_attr->recv_cq = rx_cq;
         qp_attr->pd = pd;
         qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_PD;
 
+        // disable overflow checks in ibv_poll_cq(), as GPU might invalidate
+        // the CQE without updating the tracking variables
+        if (flags & GDS_CREATE_QP_GPU_INVALIDATE_RX_CQ) {
+                gds_warn("IGNORE_RQ_OVERFLOW\n");
+                qp_attr->exp_create_flags |= IBV_EXP_QP_CREATE_IGNORE_RQ_OVERFLOW;
+                qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_CREATE_FLAGS;
+        }
+
+        if (flags & GDS_CREATE_QP_GPU_INVALIDATE_TX_CQ) {
+                gds_warn("IGNORE_SQ_OVERFLOW\n");
+                qp_attr->exp_create_flags |= IBV_EXP_QP_CREATE_IGNORE_SQ_OVERFLOW;
+                qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_CREATE_FLAGS;
+        }
+
+
         gds_dbg("before gds_register_peer_ex\n");
         ret = gds_register_peer_ex(context, gpu_id, &peer, &peer_attr);
         if (ret) {
@@ -1441,20 +1474,34 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds
         qp_attr->comp_mask |= IBV_EXP_QP_INIT_ATTR_PEER_DIRECT;
         qp_attr->peer_direct_attrs = peer_attr;
 
+        gds_dbg("Comp Mask: %x, exp_create_flags: %x\n", qp_attr->comp_mask, qp_attr->exp_create_flags);
         qp = ibv_exp_create_qp(context, qp_attr);
         if (!qp)  {
                 ret = EINVAL;
-                gds_err("error in ibv_exp_create_qp\n");
+                gds_err("error in ibv_exp_create_qp, errno: %d, %s\n", errno, strerror(errno));
                 goto err_free_cqs;
         }
 
+        if(!(flags & GDS_CREATE_QP_FLUSHER))
+                gds_warn("created gds_qp=%p\n", gqp);
+        else
+                gds_warn("created flusher gds_qp=%p\n", gqp);
+
         gqp->qp = qp;
         gqp->send_cq.cq = qp->send_cq;
         gqp->send_cq.curr_offset = 0;
         gqp->recv_cq.cq = qp->recv_cq;
         gqp->recv_cq.curr_offset = 0;
 
-        gds_dbg("created gds_qp=%p\n", gqp);
+        if(!(flags & GDS_CREATE_QP_FLUSHER))
+        {
+                if(gds_flusher_init(pd, context, gpu_id))
+                {
+                        ret = EINVAL;
+                        gds_err("error in gds_flusher_init\n");
+                        goto err_free_qp;
+                }
+        }
 
         return gqp;
 
@@ -1513,6 +1560,8 @@ int gds_destroy_qp(struct gds_qp *qp)
 
         free(qp);
 
+        gds_flusher_destroy();
+
         return retcode;
 }
 

From 82b11e17eaa5d39b5047d75cd50347d8ccc0522e Mon Sep 17 00:00:00 2001
From: e-ago
Date: Tue, 16 Jan 2018 18:45:30 +0100
Subject: [PATCH 2/7] flusher include, nvtx removed

---
 src/flusher.cpp | 6 +++---
 src/gdsync.cpp  | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/flusher.cpp b/src/flusher.cpp
index c8956eb..112b671 100644
--- a/src/flusher.cpp
+++ b/src/flusher.cpp
@@ -794,7 +794,6 @@ int gds_flusher_stop_thread(pthread_t fThread)
 #define TIMER_START(n) gettimeofday(&temp_1_##n, (struct timezone*)0)
 #define TIMER_STOP(n) gettimeofday(&temp_2_##n, (struct timezone*)0)
 #define TIMER_ELAPSED(n) ((temp_2_##n.tv_sec-temp_1_##n.tv_sec)*1.e6+(temp_2_##n.tv_usec-temp_1_##n.tv_usec))
-#endif
 
 //-------------------------------- NVTX -----------------------------------------
 #include "nvToolsExt.h"
@@ -814,6 +813,7 @@ const int num_colors = sizeof(colors)/sizeof(uint32_t);
         nvtxRangePushEx(&eventAttrib); \
 }
 #define POP_RANGE nvtxRangePop();
+#endif
 //-------------------------------------------------------------------------------
 
 typedef int64_t gds_us_t;
@@ -846,7 +846,7 @@ void * gds_flusher_func_thread(void * arg)
         {
                 gds_dbg("Thread waiting on flsign_h=%p, current last_value=%d\n", flsign_h, last_value);
 
-                
PUSH_RANGE("THREAD", 1); + //PUSH_RANGE("THREAD", 1); //start = gds_get_time_us(); while( (ACCESS_ONCE( *flsign_h )) <= last_value ) { pthread_testcancel(); } //end = gds_get_time_us(); @@ -873,7 +873,7 @@ void * gds_flusher_func_thread(void * arg) //end = gds_get_time_us(); //delta4 = end - start; - POP_RANGE; + //POP_RANGE; //if(local_rank == 0) // gds_warn("thread --> last_value: %d, while polling time: %.2f us, sign read time: %.2f us, read_d time: %.2f us, write ack time: %.2f us\n", last_value, (double)delta1, (double)delta2, (double)delta3, (double)delta4); } diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 68538bc..44b46c3 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -42,7 +42,7 @@ #include "objs.hpp" #include "archutils.h" #include "mlnxutils.h" - +#include "flusher.hpp" //----------------------------------------------------------------------------- int gds_dbg_enabled() From 31229e23995292374d842f6262e46cc9c02afba4 Mon Sep 17 00:00:00 2001 From: e-ago Date: Wed, 17 Jan 2018 14:47:49 +0100 Subject: [PATCH 3/7] CU_STREAM_WAIT_VALUE_FLUSH no longer supported in CUDA 9.x. Closes #31 --- src/gdsync.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 44b46c3..0e88a21 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -414,8 +414,6 @@ static int gds_fill_poll(CUstreamBatchMemOpParams *param, CUdeviceptr ptr, uint3 assert(ptr); assert((((unsigned long)ptr) & 0x3) == 0); - bool need_flush = (flags & GDS_WAIT_POST_FLUSH) ? true : false; - param->operation = CU_STREAM_MEM_OP_WAIT_VALUE_32; param->waitValue.address = dev_ptr; param->waitValue.value = magic; @@ -437,8 +435,7 @@ static int gds_fill_poll(CUstreamBatchMemOpParams *param, CUdeviceptr ptr, uint3 retcode = EINVAL; goto out; } - if (need_flush) - param->waitValue.flags |= CU_STREAM_WAIT_VALUE_FLUSH; + gds_dbg("op=%d addr=%p value=%08x cond=%s flags=%08x\n", param->operation, (void*)param->waitValue.address, From 56a4e7afa8edfc00aac8031aa312d276309df3c5 Mon Sep 17 00:00:00 2001 From: e-ago Date: Wed, 17 Jan 2018 07:00:57 -0800 Subject: [PATCH 4/7] Commented dump wait params useful for debug --- src/gdsync.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 0e88a21..6b62593 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -763,6 +763,8 @@ int gds_post_ops(size_t n_ops, struct peer_op_wr *op, CUstreamBatchMemOpParams * gds_err("error in gds_flusher_add_ops func (idx=%d)\n", n, idx); goto out; } + //gds_warn("poll params\n"); + //gds_dump_params(idx, params); break; } From b00b350477b18f1d44d203a73f79bbe889fbe1d6 Mon Sep 17 00:00:00 2001 From: e-ago Date: Wed, 17 Jan 2018 16:21:41 +0100 Subject: [PATCH 5/7] CU_STREAM_WAIT_VALUE_FLUSH code restored commented as a reminder --- src/gdsync.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 6b62593..848ca66 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -414,6 +414,8 @@ static int gds_fill_poll(CUstreamBatchMemOpParams *param, CUdeviceptr ptr, uint3 assert(ptr); assert((((unsigned long)ptr) & 0x3) == 0); + //bool need_flush = (flags & GDS_WAIT_POST_FLUSH) ? 
true : false; + param->operation = CU_STREAM_MEM_OP_WAIT_VALUE_32; param->waitValue.address = dev_ptr; param->waitValue.value = magic; @@ -436,6 +438,9 @@ static int gds_fill_poll(CUstreamBatchMemOpParams *param, CUdeviceptr ptr, uint3 goto out; } + //No longer supported since CUDA 9.1 + //if (need_flush) param->waitValue.flags |= CU_STREAM_WAIT_VALUE_FLUSH; + gds_dbg("op=%d addr=%p value=%08x cond=%s flags=%08x\n", param->operation, (void*)param->waitValue.address, From f3c4cf1c8375b02b1f51cba619391c1edd9b8a62 Mon Sep 17 00:00:00 2001 From: e-ago Date: Mon, 5 Feb 2018 15:51:06 +0100 Subject: [PATCH 6/7] flusher env vars merged in a single one. flusher types enum instead of define. local bool variable during qp creation. if CUDA_VERSION >= 9020 then query CU_DEVICE_ATTRIBUTE_CAN_USE_WAIT_VALUE_FLUSH in case of native flusher --- src/apis.cpp | 4 +- src/flusher.cpp | 148 ++++++++++++++++++++++++++---------------------- src/flusher.hpp | 26 ++++----- src/gdsync.cpp | 17 +++--- 4 files changed, 102 insertions(+), 93 deletions(-) diff --git a/src/apis.cpp b/src/apis.cpp index bcf80c8..b727778 100644 --- a/src/apis.cpp +++ b/src/apis.cpp @@ -622,8 +622,8 @@ int gds_stream_post_descriptors(CUstream stream, size_t n_descs, gds_descriptor_ // move flush to last wait in the whole batch if (n_waits && no_network_descs_after_entry(n_descs, descs, last_wait)) { gds_dbg("optimizing FLUSH to last wait i=%zu\n", last_wait); - //If GPU doesn't support native flusher - n_mem_ops += gds_flusher_count_op(); + if( gds_use_native_flusher() ) move_flush=true; + else n_mem_ops += gds_flusher_count_op(); gds_dbg("optimizing FLUSH to last wait i=%zu, n_mem_ops: %d, flusher ops: %d\n", last_wait, n_mem_ops, gds_flusher_count_op()); } // alternatively, remove flush for wait is next op is a wait too diff --git a/src/flusher.cpp b/src/flusher.cpp index 112b671..b501ae6 100644 --- a/src/flusher.cpp +++ b/src/flusher.cpp @@ -35,35 +35,46 @@ static gds_flusher_buf flread_d; static gds_flusher_buf flack_d; static int flusher_value=0; static pthread_t flusher_thread; -static int gds_flusher_service = -1; -static int gds_gpu_has_flusher=-1; +static int gds_use_flusher = -1; static int local_gpu_id=0; static const char * flusher_int_to_str(int flusher_int) { - if(flusher_int == GDS_FLUSHER_TYPE_CPU) - return "CPU Thread"; - else if(flusher_int == GDS_FLUSHER_TYPE_NIC) + if(flusher_int == GDS_FLUSHER_NONE) + return "No flusher"; + else if(flusher_int == GDS_FLUSHER_NATIVE) + return "GPU native flusher"; + else if(flusher_int == GDS_FLUSHER_CPU) + return "CPU thread"; + else if(flusher_int == GDS_FLUSHER_NIC) return "NIC RDMA PUT"; else return "Unknown"; } -static inline int gds_flusher_service_active() { - if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU || gds_flusher_service == GDS_FLUSHER_TYPE_NIC) - return 1; +static inline bool gds_flusher_service_active() { + if(gds_use_flusher == GDS_FLUSHER_CPU || gds_use_flusher == GDS_FLUSHER_NIC) + return true; else - return 0; + return false; } #define CHECK_FLUSHER_SERVICE() \ - if(!gds_flusher_service_active()) \ + if(gds_flusher_service_active() == false) \ { \ - gds_dbg("Flusher service not active (%d)\n", gds_flusher_service); \ + gds_dbg("Flusher service not active (%d)\n", gds_use_flusher); \ goto out; \ } #define ROUND_TO(V,PS) ((((V) + (PS) - 1)/(PS)) * (PS)) +bool gds_use_native_flusher() +{ + if(gds_use_flusher == GDS_FLUSHER_NATIVE) + return true; + + return false; +} + static int gds_flusher_pin_buffer(gds_flusher_buf * fl_mem, size_t req_size, int type_mem) { int 
ret=0;
@@ -317,58 +328,70 @@ static int gds_flusher_prepare_put_dmem(gds_send_request_t * send_info, struct g
         return ret;
 }
 //------------------------------- COMMON ------------------------------------
-int gds_gpu_flusher_env()
+int gds_flusher_get_envars()
 {
-        if (-1 == gds_gpu_has_flusher) {
-                const char *env = getenv("GDS_GPU_HAS_FLUSHER");
-                if (env)
-                        gds_gpu_has_flusher = !!atoi(env);
-                else
-                        gds_gpu_has_flusher = 0;
-
-                gds_warn("GDS_GPU_HAS_FLUSHER=%d\n", gds_gpu_has_flusher);
-        }
-        return gds_gpu_has_flusher;
-}
-
-int gds_service_flusher_env()
-{
-        if (-1 == gds_flusher_service) {
-                const char *env = getenv("GDS_FLUSHER_SERVICE");
+        if (-1 == gds_use_flusher) {
+                const char *env = getenv("GDS_FLUSHER_TYPE");
                 if (env)
                 {
-                        gds_flusher_service = atoi(env);
-                        if(gds_flusher_service != GDS_FLUSHER_TYPE_CPU && gds_flusher_service != GDS_FLUSHER_TYPE_NIC)
+                        gds_use_flusher = atoi(env);
+                        if(
+                                gds_use_flusher != GDS_FLUSHER_NONE &&
+                                gds_use_flusher != GDS_FLUSHER_NATIVE &&
+                                gds_use_flusher != GDS_FLUSHER_CPU &&
+                                gds_use_flusher != GDS_FLUSHER_NIC
+                        )
                         {
-                                //gds_err("Erroneous value GDS_FLUSHER_SERVICE=%d. Service not activated\n", gds_flusher_service);
-                                gds_flusher_service=0;
+                                gds_err("Erroneous flusher type=%d (allowed values 0-%d)\n", gds_use_flusher, GDS_FLUSHER_NIC);
+                                gds_use_flusher=GDS_FLUSHER_NONE;
                         }
                 }
                 else
-                        gds_flusher_service = 0;
+                        gds_use_flusher = GDS_FLUSHER_NONE;
 
-                gds_warn("GDS_FLUSHER_SERVICE=%d\n", gds_flusher_service);
+                gds_warn("GDS_FLUSHER_TYPE=%d\n", gds_use_flusher);
         }
-        return gds_flusher_service;
+        return gds_use_flusher;
 }
 
-int gds_flusher_check_envs()
+int gds_flusher_setup()
 {
-        gds_gpu_flusher_env();
+        gds_flusher_get_envars();
 
-        if(gds_gpu_has_flusher == 1)
+        if(gds_use_flusher == GDS_FLUSHER_NATIVE)
         {
-                gds_flusher_service=0;
-                gds_warn("Using GPU native flusher\n");
+//At least CUDA 9.2
+#if (CUDA_VERSION >= 9020)
+
+                int attr = 0;
+                CUresult result = CUDA_SUCCESS;
+
+                result = cuDeviceGetAttribute(&attr, CU_DEVICE_ATTRIBUTE_CAN_USE_WAIT_VALUE_FLUSH, local_gpu_id);
+                if (CUDA_SUCCESS != result) {
+                        const char *err_str = NULL;
+                        cuGetErrorString(result, &err_str);
+                        gds_err("got CUDA result %d (%s) while querying CU_DEVICE_ATTRIBUTE_CAN_USE_WAIT_VALUE_FLUSH\n", result, err_str);
+                        exit(EXIT_FAILURE);
+                }
+                if(attr == 0)
+                {
+                        gds_warn("GPU #%d doesn't support native flusher\n", local_gpu_id);
+                        gds_use_flusher=GDS_FLUSHER_NONE;
+                }
+#endif
+
+                if(gds_use_flusher == GDS_FLUSHER_NATIVE)
+                        gds_warn("Using GPU native flusher\n");
         }
         else
         {
-                gds_service_flusher_env();
-                if(gds_flusher_service == 0)
+                if(gds_use_flusher == GDS_FLUSHER_NONE)
                         gds_warn("No flusher service nor GPU native flusher\n");
                 else
-                        gds_warn("Using flusher service '%s' (%d)\n", flusher_int_to_str(gds_flusher_service), gds_flusher_service);
+                        gds_warn("Using flusher service '%s' (%d)\n", flusher_int_to_str(gds_use_flusher), gds_use_flusher);
         }
+
+        return 0;
 }
 
@@ -376,17 +399,16 @@ int gds_flusher_init(struct ibv_pd *pd, struct ibv_context *context, int gpu_id)
         int ret = 0;
         unsigned int flag = 1;
 
-        gds_flusher_check_envs();
-        CHECK_FLUSHER_SERVICE();
-
         local_gpu_id=gpu_id;
+        gds_flusher_setup();
+        CHECK_FLUSHER_SERVICE();
 
         flread_d.size=1*sizeof(int);
         flack_d.size=1*sizeof(int);
 
-        gds_dbg("gds_flusher_service=%d\n", gds_flusher_service);
+        gds_dbg("gds_use_flusher=%d\n", gds_use_flusher);
 
-        if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU)
+        if(gds_use_flusher == GDS_FLUSHER_CPU)
         {
                 // ------------------ READ WORD ------------------ 
ret = gds_flusher_pin_buffer(&flread_d, flread_d.size, GDS_MEMORY_GPU); @@ -414,7 +436,7 @@ int gds_flusher_init(struct ibv_pd *pd, struct ibv_context *context, int gpu_id) // ------------------ THREAD ------------------ gds_flusher_start_thread(&flusher_thread); } - else if(gds_flusher_service == GDS_FLUSHER_TYPE_NIC) + else if(gds_use_flusher == GDS_FLUSHER_NIC) { // ------------------ READ WORD ------------------ ret = gds_flusher_pin_buffer(&flread_d, flread_d.size, GDS_MEMORY_GPU); @@ -490,9 +512,9 @@ int gds_flusher_destroy() CHECK_FLUSHER_SERVICE(); - gds_dbg("gds_flusher_service=%d\n", gds_flusher_service); + gds_dbg("gds_use_flusher=%d\n", gds_use_flusher); - if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU) + if(gds_use_flusher == GDS_FLUSHER_CPU) { ret=gds_flusher_stop_thread(flusher_thread); if(ret) @@ -517,7 +539,7 @@ int gds_flusher_destroy() gds_dbg("Device words unpinned\n"); } - else if(gds_flusher_service == GDS_FLUSHER_TYPE_NIC) + else if(gds_use_flusher == GDS_FLUSHER_NIC) { if(!flusher_qp) { gds_err("error !flusher_qp\n"); @@ -577,24 +599,16 @@ int gds_flusher_destroy() int gds_flusher_count_op() { - if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU) + if(gds_use_flusher == GDS_FLUSHER_CPU) return GDS_FLUSHER_OP_CPU; - if(gds_flusher_service == GDS_FLUSHER_TYPE_NIC) + if(gds_use_flusher == GDS_FLUSHER_NIC) return GDS_FLUSHER_OP_NIC; + //in case of native flusher or no flusher return 0; } -void gds_flusher_set_flag(int * flags) -{ - //Enable GPU flusher if GPU has internal flusher (flusher service ignored) - if(gds_gpu_has_flusher == 1) - (*flags) |= GDS_WAIT_POST_FLUSH; - - gds_dbg("flags=%x\n", (*flags)); -} - //Not actually used for now! int gds_flusher_post_stream(CUstream stream) { @@ -604,7 +618,7 @@ int gds_flusher_post_stream(CUstream stream) CHECK_FLUSHER_SERVICE(); - if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU) + if(gds_use_flusher == GDS_FLUSHER_CPU) { //write32 signal desc[k].tag = GDS_TAG_WRITE_VALUE32; @@ -673,7 +687,7 @@ int gds_flusher_add_ops(CUstreamBatchMemOpParams *params, int &idx) CHECK_FLUSHER_SERVICE(); - if(gds_flusher_service == GDS_FLUSHER_TYPE_CPU) + if(gds_use_flusher == GDS_FLUSHER_CPU) { gds_dbg("gds_fill_poke flsign_h=%p, flusher_value+1=%d, idx=%d\n", flsign_h, flusher_value+1, idx); ret = gds_fill_poke(params+idx, (uint32_t*)flsign_h, flusher_value+1, GDS_MEMORY_GPU); @@ -693,7 +707,7 @@ int gds_flusher_add_ops(CUstreamBatchMemOpParams *params, int &idx) } ++idx; } - else if(gds_flusher_service == GDS_FLUSHER_TYPE_NIC) + else if(gds_use_flusher == GDS_FLUSHER_NIC) { ret = gds_fill_poke(params+idx, (uint32_t*)flread_d.buf_d, flusher_value+1, GDS_MEMORY_GPU); if(ret) @@ -730,7 +744,7 @@ int gds_flusher_add_ops(CUstreamBatchMemOpParams *params, int &idx) } - gds_dbg("Final idx=%d\n", idx); + gds_dbg("Final idx=%d\n", idx); ++flusher_value; diff --git a/src/flusher.hpp b/src/flusher.hpp index b9e10cf..11f7966 100644 --- a/src/flusher.hpp +++ b/src/flusher.hpp @@ -55,27 +55,20 @@ using namespace std; #include "memmgr.hpp" #include "archutils.h" -#define GDS_FLUSHER_TYPE_CPU 1 -#define GDS_FLUSHER_TYPE_NIC 2 +typedef enum { + GDS_FLUSHER_NONE=0, + GDS_FLUSHER_NATIVE, + GDS_FLUSHER_CPU, + GDS_FLUSHER_NIC +} gds_flusher_type; +#define GDS_FLUSHER_OP_NATIVE 0 #define GDS_FLUSHER_OP_CPU 2 #define GDS_FLUSHER_OP_NIC 5 #define GDS_FLUSHER_PORT 1 #define GDS_FLUSHER_QKEY 0 //0x11111111 -#define CUDA_CHECK(stmt) \ -do { \ - cudaError_t result = (stmt); \ - if (cudaSuccess != result) { \ - fprintf(stderr, "[%s] [%d] cuda failed with %s \n", \ - 
__FILE__, __LINE__, cudaGetErrorString(result));\ - exit(EXIT_FAILURE); \ - } \ - assert(cudaSuccess == result); \ -} while (0) - - typedef struct gds_flusher_buf { CUdeviceptr buf_d; void * buf_h; @@ -102,11 +95,12 @@ typedef struct flusher_qp_info { typedef flusher_qp_info * flusher_qp_info_t; //----------------------------------------------------------------------------- -int gds_gpu_flusher_env(); +bool gds_use_native_flusher(); +int gds_flusher_setup(); +int gds_flusher_get_envars(); int gds_flusher_init(struct ibv_pd *pd, struct ibv_context *context, int gpu_id); int gds_flusher_destroy(); int gds_flusher_count_op(); -void gds_flusher_set_flag(int * flags); int gds_flusher_post_stream(CUstream stream); int gds_flusher_add_ops(CUstreamBatchMemOpParams *params, int &idx); diff --git a/src/gdsync.cpp b/src/gdsync.cpp index 848ca66..892a9a9 100644 --- a/src/gdsync.cpp +++ b/src/gdsync.cpp @@ -414,7 +414,7 @@ static int gds_fill_poll(CUstreamBatchMemOpParams *param, CUdeviceptr ptr, uint3 assert(ptr); assert((((unsigned long)ptr) & 0x3) == 0); - //bool need_flush = (flags & GDS_WAIT_POST_FLUSH) ? true : false; + bool need_flush = (flags & GDS_WAIT_POST_FLUSH) ? true : false; param->operation = CU_STREAM_MEM_OP_WAIT_VALUE_32; param->waitValue.address = dev_ptr; @@ -439,7 +439,7 @@ static int gds_fill_poll(CUstreamBatchMemOpParams *param, CUdeviceptr ptr, uint3 } //No longer supported since CUDA 9.1 - //if (need_flush) param->waitValue.flags |= CU_STREAM_WAIT_VALUE_FLUSH; + if (need_flush) param->waitValue.flags |= CU_STREAM_WAIT_VALUE_FLUSH; gds_dbg("op=%d addr=%p value=%08x cond=%s flags=%08x\n", param->operation, @@ -734,9 +734,9 @@ int gds_post_ops(size_t n_ops, struct peer_op_wr *op, CUstreamBatchMemOpParams * uint32_t data = op->wr.dword_va.data; // TODO: properly handle a following fence instead of blidly flushing int flags = 0; - if (!(post_flags & GDS_POST_OPS_DISCARD_WAIT_FLUSH)) - gds_flusher_set_flag(&flags); - + if (!(post_flags & GDS_POST_OPS_DISCARD_WAIT_FLUSH) && gds_use_native_flusher()) + flags |= GDS_WAIT_POST_FLUSH; + gds_dbg("OP_WAIT_DWORD dev_ptr=%llx data=%"PRIx32"\n", dev_ptr, data); switch(op->type) { @@ -1386,6 +1386,7 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds gds_peer *peer = NULL; gds_peer_attr *peer_attr = NULL; int old_errno = errno; + bool is_qp_flusher = (flags & GDS_CREATE_QP_FLUSHER); gds_dbg("pd=%p context=%p gpu_id=%d flags=%08x errno=%d\n", pd, context, gpu_id, flags, errno); assert(pd); @@ -1414,7 +1415,7 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds } qp_attr->send_cq = tx_cq; gds_dbg("created send_cq=%p\n", qp_attr->send_cq); - if(!(flags & GDS_CREATE_QP_FLUSHER)) + if(!is_qp_flusher) { gds_dbg("creating RX CQ\n"); rx_cq = gds_create_cq(context, qp_attr->cap.max_recv_wr, NULL, NULL, 0, gpu_id, @@ -1486,7 +1487,7 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds goto err_free_cqs; } - if(!(flags & GDS_CREATE_QP_FLUSHER)) + if(!is_qp_flusher) gds_warn("created gds_qp=%p\n", gqp); else gds_warn("created flusher gds_qp=%p\n", gqp); @@ -1497,7 +1498,7 @@ struct gds_qp *gds_create_qp(struct ibv_pd *pd, struct ibv_context *context, gds gqp->recv_cq.cq = qp->recv_cq; gqp->recv_cq.curr_offset = 0; - if(!(flags & GDS_CREATE_QP_FLUSHER)) + if(!is_qp_flusher) { if(gds_flusher_init(pd, context, gpu_id)) { From d0585375a1a2693aea3d268d3930adb437626c27 Mon Sep 17 00:00:00 2001 From: e-ago Date: Mon, 5 Feb 2018 16:38:51 +0100 Subject: [PATCH 7/7] enum 
gds_flusher_type renamed --- src/flusher.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flusher.hpp b/src/flusher.hpp index 11f7966..8dd6696 100644 --- a/src/flusher.hpp +++ b/src/flusher.hpp @@ -55,12 +55,12 @@ using namespace std; #include "memmgr.hpp" #include "archutils.h" -typedef enum { +typedef enum gds_flusher_type { GDS_FLUSHER_NONE=0, GDS_FLUSHER_NATIVE, GDS_FLUSHER_CPU, GDS_FLUSHER_NIC -} gds_flusher_type; +} gds_flusher_type_t; #define GDS_FLUSHER_OP_NATIVE 0 #define GDS_FLUSHER_OP_CPU 2
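
For readers tracing the CPU-thread flusher protocol introduced by this series (gds_fill_poke() of the signal word in gds_flusher_add_ops(), the polling loop in gds_flusher_func_thread(), and the stream's wait on the ack word), the following is a minimal host-only sketch of the same handshake. It is illustrative and not part of the patches: the GPU stream is played by the main thread, and C11 atomics stand in for ACCESS_ONCE plus the rmb()/wmb() barriers.

/*
 * Minimal sketch, assuming nothing beyond POSIX threads and C11 atomics:
 * "sig_word" plays the role of flsign_h, "ack_word" of flack_d and
 * "read_word" of flread_d; all names here are illustrative.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdatomic.h>

static atomic_int sig_word, ack_word;
static volatile int read_word; /* stand-in for the NIC-written GPU buffer */

static void *flusher_thread_fn(void *arg)
{
        int last_value = 0;
        for (int i = 0; i < 3; ++i) {
                while (atomic_load(&sig_word) <= last_value)
                        ; /* poll the signal word, as the CPU thread does */
                last_value = atomic_load(&sig_word);
                (void)read_word; /* reading the NIC-written buffer is what
                                    forces the flush in the real service */
                atomic_store(&ack_word, last_value); /* release the waiter */
        }
        return NULL;
}

int main(void)
{
        pthread_t t;
        pthread_create(&t, NULL, flusher_thread_fn, NULL);
        for (int v = 1; v <= 3; ++v) {
                atomic_store(&sig_word, v);         /* the gds_fill_poke() step */
                while (atomic_load(&ack_word) != v)
                        ;                           /* the gds_fill_poll() step */
                printf("flush %d acknowledged\n", v);
        }
        pthread_join(t, NULL);
        return 0;
}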