Commit 85ee8897 authored by Daniel Verkamp's avatar Daniel Verkamp Committed by Benjamin Walker
Browse files

nvmf: move SGL processing to a generic function



Create spdk_nvmf_request_prep_data(), which handles SGL processing and
data transfer for all command types, and spdk_nvmf_request_exec(), which
executes a command after data transfer has completed.

Change-Id: I51c2196260dd0686db8acca4d8f7c93e17618c2f
Signed-off-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
parent a9282085
Loading
Loading
Loading
Loading
+183 −268
Original line number Diff line number Diff line
@@ -398,26 +398,6 @@ spdk_nvmf_send_response(struct spdk_nvmf_conn *conn, struct nvmf_request *req)
	return nvmf_post_rdma_send(conn, req->fabric_tx_ctx);
}

static int
nvmf_io_cmd_continue(struct spdk_nvmf_conn *conn, struct nvmf_request *req)
{
	int ret;

	/* send to NVMf library for backend NVMe processing */
	ret = nvmf_process_io_cmd(req);
	if (ret) {
		/* library failed the request and should have
		   Updated the response */
		SPDK_TRACELOG(SPDK_TRACE_DEBUG, " send nvme io cmd capsule error response\n");
		ret = spdk_nvmf_send_response(conn, req);
		if (ret) {
			SPDK_ERRLOG("Unable to send aq qp tx descriptor\n");
			return -1;
		}
	}
	return 0;
}

static void
nvmf_process_async_completion(struct nvmf_request *req)
{
@@ -562,103 +542,10 @@ static void nvmf_trace_command(struct spdk_nvmf_capsule_cmd *cap_hdr, enum conn_

static int
nvmf_process_io_command(struct spdk_nvmf_conn *conn,
			struct nvme_qp_tx_desc *tx_desc)
			struct nvmf_request *req)
{
	struct nvme_qp_rx_desc *rx_desc = tx_desc->rx_desc;
	struct nvmf_request *req;
	struct spdk_nvme_sgl_descriptor *sgl;
	struct spdk_nvmf_keyed_sgl_descriptor *keyed_sgl;
	struct spdk_nvme_cmd *cmd;
	enum spdk_nvme_data_transfer xfer;
	int	ret;

	req = &tx_desc->req_state;
	cmd = &req->cmd->nvme_cmd;
	sgl = (struct spdk_nvme_sgl_descriptor *)&cmd->dptr.sgl1;
	keyed_sgl = (struct spdk_nvmf_keyed_sgl_descriptor *)sgl;

	xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
	if (xfer != SPDK_NVME_DATA_NONE) {
		/*
		  NVMf does support in-capsule data for write comamnds.  If caller indicates SGL,
		  verify the SGL for in-capsule or RDMA read/write use and prepare
		  data buffer reference and length for the NVMf library.
		*/
		/* TBD: add code to handle I/O larger than default bb size */
		if (sgl->type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
		    (sgl->type_specific == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
		     sgl->type_specific == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
			if (keyed_sgl->key == 0) {
				SPDK_ERRLOG("Host did not specify SGL key!\n");
				goto command_fail;
			}

			if (keyed_sgl->length > rx_desc->bb_sgl.length) {
				SPDK_ERRLOG("SGL length 0x%x exceeds BB length 0x%x\n",
					    (uint32_t)keyed_sgl->length, rx_desc->bb_sgl.length);
				goto command_fail;
			}

			req->data = rx_desc->bb;
			req->remote_addr = keyed_sgl->address;
			req->rkey = keyed_sgl->key;
			req->length = keyed_sgl->length;
		}  else if (sgl->type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
			    sgl->type_specific == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
			uint64_t offset = sgl->address;
			uint32_t max_len = rx_desc->bb_sgl.length;

			if (offset > max_len) {
				SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
					    offset, max_len);
				goto command_fail;
			}
			max_len -= (uint32_t)offset;

			if (sgl->length > max_len) {
				SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
					    sgl->length, max_len);
				goto command_fail;
			}

			req->data = rx_desc->bb + offset;
			req->length = sgl->length;
		} else {
			SPDK_ERRLOG("Invalid NVMf I/O Command SGL:  Type %2x, Subtype %2x\n",
				    sgl->type, sgl->type_specific);
			goto command_fail;
		}

		if (req->length == 0) {
			xfer = SPDK_NVME_DATA_NONE;
		}

		req->xfer = xfer;

		/* for any I/O that requires rdma data to be
		   pulled into target BB before processing by
		   the backend NVMe device
		*/
		if (xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
			if (sgl->type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK) {
				SPDK_TRACELOG(SPDK_TRACE_RDMA, "	Issuing RDMA Read to get host data\n");
				/* data to be copied from remote host via memory RDMA */

				/* temporarily adjust SGE to only copy what the host is prepared to send. */
				rx_desc->bb_sgl.length = req->length;

				ret = nvmf_post_rdma_read(tx_desc->conn, tx_desc);
				if (ret) {
					SPDK_ERRLOG("Unable to post rdma read tx descriptor\n");
					goto command_fail;
				}
				/* Need to wait for RDMA completion indication where
				it will continue I/O operation */
				return 0;
			}
		}
	}

	/* send to NVMf library for backend NVMe processing */
	ret = nvmf_process_io_cmd(req);
	if (ret) {
@@ -668,54 +555,19 @@ nvmf_process_io_command(struct spdk_nvmf_conn *conn,
		ret = spdk_nvmf_send_response(conn, req);
		if (ret) {
			SPDK_ERRLOG("Unable to send aq qp tx descriptor\n");
			goto command_fail;
			return -1;
		}
	}

	return 0;

command_fail:
	return -1;
}

static int
nvmf_process_admin_command(struct spdk_nvmf_conn *conn,
			   struct nvme_qp_tx_desc *tx_desc)
			   struct nvmf_request *req)
{
	struct nvme_qp_rx_desc *rx_desc = tx_desc->rx_desc;
	struct nvmf_request *req;
	struct spdk_nvme_cmd *cmd;
	struct spdk_nvme_sgl_descriptor *sgl;
	struct spdk_nvmf_keyed_sgl_descriptor *keyed_sgl;
	int	ret;

	req = &tx_desc->req_state;
	cmd = &req->cmd->nvme_cmd;
	sgl = (struct spdk_nvme_sgl_descriptor *)&cmd->dptr.sgl1;
	keyed_sgl = (struct spdk_nvmf_keyed_sgl_descriptor *)sgl;

	/*
	  NVMf does not support in-capsule data for admin command or response capsules.
	  If caller indicates SGL for return RDMA data, verify the SGL and prepare
	  data buffer reference and length for the NVMf library.  Only keyed type
	  SGLs are supported for return data
	 */
	if (sgl->type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
	    (sgl->type_specific == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
	     sgl->type_specific == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
		req->data = rx_desc->bb;
		req->remote_addr = keyed_sgl->address;
		req->rkey = keyed_sgl->key;
		req->length = keyed_sgl->length;
		if (req->length) {
			req->xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
		}
	}

	SPDK_TRACELOG(SPDK_TRACE_DEBUG, "	tx_desc %p: req_state %p, rsp %p, addr %p\n",
		      tx_desc, req, (void *)req->rsp, (void *)tx_desc->send_sgl.addr);

	/* send to NVMf library for backend NVMe processing */
	ret = nvmf_process_admin_cmd(req);
	if (ret) {
		/* library failed the request and should have
@@ -724,14 +576,11 @@ nvmf_process_admin_command(struct spdk_nvmf_conn *conn,
		ret = spdk_nvmf_send_response(conn, req);
		if (ret) {
			SPDK_ERRLOG("Unable to send aq qp tx descriptor\n");
			goto command_fail;
			return -1;
		}
	}

	return 0;

command_fail:
	return -1;
}

static void
@@ -772,7 +621,7 @@ nvmf_init_conn_properites(struct spdk_nvmf_conn *conn,
}

static int
nvmf_connect_continue(struct spdk_nvmf_conn *conn,
nvmf_process_connect(struct spdk_nvmf_conn *conn,
		     struct nvmf_request *req)
{
	struct spdk_nvmf_fabric_connect_cmd *connect;
@@ -781,11 +630,22 @@ nvmf_connect_continue(struct spdk_nvmf_conn *conn,
	struct nvmf_session *session;
	int ret;

	if (req->length < sizeof(struct spdk_nvmf_fabric_connect_data)) {
		SPDK_ERRLOG("Connect command data length 0x%x too small\n", req->length);
		return -1;
	}

	connect = &req->cmd->connect_cmd;
	connect_data = (struct spdk_nvmf_fabric_connect_data *)req->data;

	RTE_VERIFY(connect_data != NULL);

	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** Connect Capsule *** %p\n", connect);
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** cid              = %x ***\n", connect->cid);
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** recfmt           = %x ***\n", connect->recfmt);
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** qid              = %x ***\n", connect->qid);
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** sqsize           = %x ***\n", connect->sqsize);

	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** Connect Capsule Data *** %p\n", connect_data);
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** cntlid  = %x ***\n", connect_data->cntlid);
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** hostid = %04x%04x-%04x-%04x-%04x-%04x%04x%04x ***\n",
@@ -830,95 +690,162 @@ nvmf_connect_continue(struct spdk_nvmf_conn *conn,
}

static int
nvmf_process_connect(struct spdk_nvmf_conn *conn,
		     struct nvme_qp_tx_desc *tx_desc)
nvmf_process_fabrics_command(struct spdk_nvmf_conn *conn, struct nvmf_request *req)
{
	struct spdk_nvmf_fabric_connect_cmd *connect;
	struct nvmf_request *req = &tx_desc->req_state;
	struct nvme_qp_rx_desc *rx_desc = tx_desc->rx_desc;
	union sgl_shift *sgl;
	int	ret;
	struct spdk_nvmf_capsule_cmd *cap_hdr;

	connect = &req->cmd->connect_cmd;
	sgl = (union sgl_shift *)&connect->sgl1;
	cap_hdr = &req->cmd->nvmf_cmd;

	/* debug - display the connect capsule */
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** Connect Capsule *** %p\n", connect);
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** cid              = %x ***\n", connect->cid);
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** recfmt           = %x ***\n", connect->recfmt);
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** qid              = %x ***\n", connect->qid);
	SPDK_TRACELOG(SPDK_TRACE_NVMF, "    *** sqsize           = %x ***\n", connect->sqsize);
	switch (cap_hdr->fctype) {
	case SPDK_NVMF_FABRIC_COMMAND_PROPERTY_SET:
		return nvmf_process_property_set(conn, req);
	case SPDK_NVMF_FABRIC_COMMAND_PROPERTY_GET:
		return nvmf_process_property_get(conn, req);
	case SPDK_NVMF_FABRIC_COMMAND_CONNECT:
		return nvmf_process_connect(conn, req);
	default:
		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "recv capsule header type invalid [%x]!\n",
			      cap_hdr->fctype);
		return 1; /* skip, do nothing */
	}
}

	if (sgl->nvmf_sgl.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
	    sgl->nvmf_sgl.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
/*
		  Extended data was passed by initiator to target via in-capsule
		  data and not via RDMA SGL xfer.  So extended data resides in
		  the rx message buffer
 * Prepare the nvmf_request data and length fields.
 *
 * A data transfer will be initiated if required by the request.
 *
 * \return 1 on success with data immediately available (in-capsule data or controller to host),
 *         0 if host to controller transfer was initiated (command will be issued pending completion
 *         of transfer), or negative on error.
 */
		SPDK_TRACELOG(SPDK_TRACE_NVMF, "	Using In-Capsule connect data\n");
		if (rx_desc->recv_bc < (sizeof(struct spdk_nvmf_fabric_connect_cmd) +
					sizeof(struct spdk_nvmf_fabric_connect_data))) {
			SPDK_ERRLOG("insufficient in-capsule data to satisfy connect!\n");
			goto connect_fail;
static int
spdk_nvmf_request_prep_data(struct nvmf_request *req)
{
	struct nvme_qp_tx_desc *tx_desc = req->fabric_tx_ctx;
	struct nvme_qp_rx_desc *rx_desc = tx_desc->rx_desc;
	struct spdk_nvmf_conn *conn = tx_desc->conn;
	struct spdk_nvme_cmd *cmd = &req->cmd->nvme_cmd;
	enum spdk_nvme_data_transfer xfer;
	int ret;

	if (cmd->opc == SPDK_NVMF_FABRIC_OPCODE) {
		xfer = spdk_nvme_opc_get_data_transfer(req->cmd->nvmf_cmd.fctype);
	} else {
		xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
	}

	if (xfer != SPDK_NVME_DATA_NONE) {
		struct spdk_nvme_sgl_descriptor *sgl = (struct spdk_nvme_sgl_descriptor *)&cmd->dptr.sgl1;
		struct spdk_nvmf_keyed_sgl_descriptor *keyed_sgl = (struct spdk_nvmf_keyed_sgl_descriptor *)sgl;

		if (sgl->type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
		    (sgl->type_specific == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
		     sgl->type_specific == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
			SPDK_TRACELOG(SPDK_TRACE_RDMA, "Keyed data block: raddr 0x%" PRIx64 ", rkey 0x%x, length 0x%x\n",
				      keyed_sgl->address, keyed_sgl->key, keyed_sgl->length);

			if (keyed_sgl->length > rx_desc->bb_sgl.length) {
				SPDK_ERRLOG("SGL length 0x%x exceeds BB length 0x%x\n",
					    (uint32_t)keyed_sgl->length, rx_desc->bb_sgl.length);
				return -1;
			}

			req->data = rx_desc->bb;
		req->length = sgl->nvmf_sgl.length;
		return nvmf_connect_continue(conn, req);
	} else if (sgl->nvmf_sgl.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
		   (sgl->nvmf_sgl.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
		    sgl->nvmf_sgl.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
		/* setup a new SQE that uses local bounce buffer */
		req->data = rx_desc->bb;
		req->remote_addr = sgl->nvmf_sgl.address;
		req->rkey = sgl->nvmf_sgl.key;
		req->length = sgl->nvmf_sgl.length;
		req->xfer = SPDK_NVME_DATA_HOST_TO_CONTROLLER;
			req->remote_addr = keyed_sgl->address;
			req->rkey = keyed_sgl->key;
			req->length = keyed_sgl->length;
		} else if (sgl->type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
			   sgl->type_specific == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
			uint64_t offset = sgl->address;
			uint32_t max_len = rx_desc->bb_sgl.length;

		SPDK_TRACELOG(SPDK_TRACE_RDMA, "	Issuing RDMA Read to get host connect data\n");
		/* data to be copied from host via memory RDMA */
			SPDK_TRACELOG(SPDK_TRACE_RDMA, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
				      offset, sgl->length);

			if (conn->type == CONN_TYPE_AQ) {
				SPDK_ERRLOG("In-capsule data not allowed for admin queue\n");
				return -1;
			}

			if (offset > max_len) {
				SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
					    offset, max_len);
				return -1;
			}
			max_len -= (uint32_t)offset;

			if (sgl->length > max_len) {
				SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
					    sgl->length, max_len);
				return -1;
			}

			req->data = rx_desc->bb + offset;
			req->length = sgl->length;
		} else {
			SPDK_ERRLOG("Invalid NVMf I/O Command SGL:  Type 0x%x, Subtype 0x%x\n",
				    sgl->type, sgl->type_specific);
			return -1;
		}

		if (req->length == 0) {
			xfer = SPDK_NVME_DATA_NONE;
			req->data = NULL;
		}

		req->xfer = xfer;

		/*
		 * For any I/O that requires data to be
		 * pulled into target BB before processing by
		 * the backend NVMe device
		 */
		if (xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
			if (sgl->type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK) {
				SPDK_TRACELOG(SPDK_TRACE_RDMA, "Issuing RDMA Read to get host data\n");

				/* temporarily adjust SGE to only copy what the host is prepared to send. */
				rx_desc->bb_sgl.length = req->length;

		ret = nvmf_post_rdma_read(tx_desc->conn, tx_desc);
				ret = nvmf_post_rdma_read(conn, tx_desc);
				if (ret) {
					SPDK_ERRLOG("Unable to post rdma read tx descriptor\n");
			goto connect_fail;
					return -1;
				}
		/* Need to wait for RDMA completion indication where
		   it will continue connect operation */

				/* Wait for transfer to complete before executing command. */
				return 1;
			}
		}
	}

	if (xfer == SPDK_NVME_DATA_NONE) {
		SPDK_TRACELOG(SPDK_TRACE_RDMA, "No data to transfer\n");
		RTE_VERIFY(req->data == NULL);
		RTE_VERIFY(req->length == 0);
	} else {
		SPDK_ERRLOG("Invalid NVMf Connect SGL:  Type %2x, Subtype %2x\n",
			    sgl->nvmf_sgl.type, sgl->nvmf_sgl.subtype);
		goto connect_fail;
		RTE_VERIFY(req->data != NULL);
		RTE_VERIFY(req->length != 0);
		SPDK_TRACELOG(SPDK_TRACE_RDMA, "%s data ready\n",
			      xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER ? "Host to Controller" :
			      "Controller to Host");
	}
	return 0;

connect_fail:
	return -1;
	return 0;
}

static int
nvmf_process_fabrics_command(struct spdk_nvmf_conn *conn, struct nvme_qp_tx_desc *tx_desc)
spdk_nvmf_request_exec(struct spdk_nvmf_conn *conn, struct nvmf_request *req)
{
	struct nvmf_request *req = &tx_desc->req_state;
	struct nvme_qp_rx_desc *rx_desc = tx_desc->rx_desc;
	struct spdk_nvmf_capsule_cmd *cap_hdr;

	cap_hdr = (struct spdk_nvmf_capsule_cmd *)&rx_desc->msg_buf;
	struct spdk_nvme_cmd *cmd = &req->cmd->nvme_cmd;

	switch (cap_hdr->fctype) {
	case SPDK_NVMF_FABRIC_COMMAND_PROPERTY_SET:
		return nvmf_process_property_set(conn, req);
	case SPDK_NVMF_FABRIC_COMMAND_PROPERTY_GET:
		return nvmf_process_property_get(conn, req);
	case SPDK_NVMF_FABRIC_COMMAND_CONNECT:
		return nvmf_process_connect(conn, tx_desc);
	default:
		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "recv capsule header type invalid [%x]!\n",
			      cap_hdr->fctype);
		return 1; /* skip, do nothing */
	if (cmd->opc == SPDK_NVMF_FABRIC_OPCODE) {
		return nvmf_process_fabrics_command(conn, req);
	} else if (conn->type == CONN_TYPE_AQ) {
		return nvmf_process_admin_command(conn, req);
	} else {
		return nvmf_process_io_command(conn, req);
	}
}

@@ -998,29 +925,33 @@ static int nvmf_recv(struct spdk_nvmf_conn *conn, struct ibv_wc *wc)

	nvmf_trace_command(cap_hdr, conn->type);

	if (cap_hdr->opcode == SPDK_NVMF_FABRIC_OPCODE) {
		ret = nvmf_process_fabrics_command(conn, tx_desc);
	} else if (conn->type == CONN_TYPE_AQ) {
		ret = nvmf_process_admin_command(conn, tx_desc);
	} else {
		ret = nvmf_process_io_command(conn, tx_desc);
	ret = spdk_nvmf_request_prep_data(req);
	if (ret < 0) {
		SPDK_ERRLOG("prep_data failed\n");
		goto recv_error;
	}

	if (ret == 0) {
		/* Data is available now; execute command immediately. */
		ret = spdk_nvmf_request_exec(conn, req);
		if (ret < 0) {
			SPDK_ERRLOG("Command execution failed\n");
			goto recv_error;
		}

	/* re-post rx_desc and re-queue tx_desc here,
	   there is not a delayed posting because of
	   command processing.
	 */
		if (ret == 1) {
			/*
			 * Immediate completion.
			 * Re-post rx_desc and re-queue tx_desc here,
			 * there is not a delayed posting because of
			 * command processing.
			 */
			tx_desc->rx_desc = NULL;
			nvmf_deactive_tx_desc(tx_desc);
		tx_desc = NULL;
			if (nvmf_post_rdma_recv(conn, rx_desc)) {
				SPDK_ERRLOG("Unable to re-post aq rx descriptor\n");
			goto recv_error;
				return -1;
			}
		}
	}

@@ -1096,27 +1027,11 @@ static int nvmf_cq_event_handler(struct spdk_nvmf_conn *conn)
			tx_desc = (struct nvme_qp_tx_desc *)wc.wr_id;
			spdk_trace_record(TRACE_RDMA_READ_COMPLETE, 0, 0, (uint64_t)tx_desc->rx_desc, 0);
			req = &tx_desc->req_state;
			if (req->cmd->nvme_cmd.opc == SPDK_NVMF_FABRIC_OPCODE) {
				switch (req->cmd->nvmf_cmd.fctype) {
				case SPDK_NVMF_FABRIC_COMMAND_CONNECT:
					if (nvmf_connect_continue(conn, req)) {
						SPDK_ERRLOG("nvmf_connect_continue() failed\n");
						goto handler_error;
					}
					break;
				default:
					SPDK_ERRLOG("Unhandled fctype 0x%02X\n", req->cmd->nvmf_cmd.fctype);
					return -1;
				}
			} else if (conn->type == CONN_TYPE_AQ) {
				rte_panic("AQ continuation not implemented yet\n");
			} else {
				rc = nvmf_io_cmd_continue(conn, req);
			rc = spdk_nvmf_request_exec(conn, req);
			if (rc) {
					SPDK_ERRLOG("error from io cmd continue\n");
				SPDK_ERRLOG("request_exec error %d after RDMA Read completion\n", rc);
				goto handler_error;
			}
			}

			/*
			 * Check for any pending rdma_reads to start