Commit 1435ee36 authored by Shuhei Matsumoto's avatar Shuhei Matsumoto Committed by Tomasz Zawadzki
Browse files

lib/iscsi: Make the data_in_cnt only used for subread tasks.



Purpose: Do not let the primary task do the I/O if it is
splitted into subtasks. This will make the code simplier,
when all the sub I/Os are finished, we can free the
primary task.

Update the corresponding unit test also.

As a result of this change, when read I/O is split into subtasks,
the primary task uses only some of its data. Hence separate
iscsi_pdu_payload_op_scsi_read() into split and non-split
case explicitly.

Signed-off-by: default avatarZiye Yang <ziye.yang@intel.com>
Change-Id: I9bbe4b8dd92a2996f35ad810b33676e34670c77e
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/473532


Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
parent eb7e1f7e
Loading
Loading
Loading
Loading
+24 −40
Original line number Diff line number Diff line
@@ -304,35 +304,15 @@ error_return:
void
spdk_iscsi_conn_free_pdu(struct spdk_iscsi_conn *conn, struct spdk_iscsi_pdu *pdu)
{
	struct spdk_iscsi_task *primary;

	if (pdu->task) {
		primary = spdk_iscsi_task_get_primary(pdu->task);
		if (pdu->bhs.opcode == ISCSI_OP_SCSI_DATAIN) {
			if (pdu->task->scsi.offset > 0) {
				assert(conn->data_in_cnt > 0);
				conn->data_in_cnt--;
				if (pdu->bhs.flags & ISCSI_DATAIN_STATUS) {
					/* Free the primary task after the last subtask done */
			if (pdu->task != spdk_iscsi_task_get_primary(pdu->task)) {
				assert(conn->data_in_cnt > 0);
				conn->data_in_cnt--;
					spdk_iscsi_task_put(primary);
				}
				spdk_iscsi_conn_handle_queued_datain_tasks(conn);
			}
		} else if (pdu->bhs.opcode == ISCSI_OP_SCSI_RSP &&
			   pdu->task->scsi.status != SPDK_SCSI_STATUS_GOOD) {
			if (pdu->task->scsi.offset > 0) {
				if (spdk_iscsi_task_is_read(primary)) {
					if (primary->bytes_completed == primary->scsi.transfer_len) {
						/* Free the primary task after the last subtask done */
						assert(conn->data_in_cnt > 0);
						conn->data_in_cnt--;
						spdk_iscsi_task_put(primary);
					}
				}
			}
		}

		spdk_iscsi_task_put(pdu->task);
	}
	spdk_put_pdu(pdu);
@@ -1035,6 +1015,10 @@ process_completed_read_subtask_list(struct spdk_iscsi_conn *conn,
			break;
		}
	}

	if (primary->bytes_completed == primary->scsi.transfer_len) {
		spdk_iscsi_task_put(primary);
	}
}

static void
@@ -1063,8 +1047,14 @@ process_read_task_completion(struct spdk_iscsi_conn *conn,
		iscsi_task_copy_from_rsp_scsi_status(&task->scsi, primary);
	}

	if ((task != primary) &&
	    (task->scsi.offset != primary->bytes_completed)) {
	if (task == primary) {
		primary->bytes_completed = task->scsi.length;
		/* For non split read I/O */
		assert(primary->bytes_completed == task->scsi.transfer_len);
		spdk_iscsi_task_response(conn, task);
		spdk_iscsi_task_put(task);
	} else {
		if (task->scsi.offset != primary->bytes_completed) {
			TAILQ_FOREACH(tmp, &primary->subtask_list, subtask_link) {
				if (task->scsi.offset < tmp->scsi.offset) {
					TAILQ_INSERT_BEFORE(tmp, task, subtask_link);
@@ -1073,17 +1063,11 @@ process_read_task_completion(struct spdk_iscsi_conn *conn,
			}

			TAILQ_INSERT_TAIL(&primary->subtask_list, task, subtask_link);
		return;
		} else {
			TAILQ_INSERT_HEAD(&primary->subtask_list, task, subtask_link);
			process_completed_read_subtask_list(conn, primary);
		}

	primary->bytes_completed += task->scsi.length;
	spdk_iscsi_task_response(conn, task);

	if ((task != primary) ||
	    (task->scsi.transfer_len == task->scsi.length)) {
		spdk_iscsi_task_put(task);
	}
	process_completed_read_subtask_list(conn, primary);
}

static void
+18 −45
Original line number Diff line number Diff line
@@ -3039,7 +3039,6 @@ iscsi_transfer_in(struct spdk_iscsi_conn *conn, struct spdk_iscsi_task *task)
			assert(conn->data_in_cnt > 0);
			conn->data_in_cnt--;
		}

		return 0;
	}

@@ -3253,19 +3252,6 @@ int spdk_iscsi_conn_handle_queued_datain_tasks(struct spdk_iscsi_conn *conn)
	       conn->data_in_cnt < MAX_LARGE_DATAIN_PER_CONNECTION) {
		task = TAILQ_FIRST(&conn->queued_datain_tasks);
		assert(task->current_datain_offset <= task->scsi.transfer_len);

		if (task->current_datain_offset == 0) {
			if (spdk_scsi_dev_get_lun(conn->dev, task->lun_id) == NULL) {
				TAILQ_REMOVE(&conn->queued_datain_tasks, task, link);
				spdk_scsi_task_process_null_lun(&task->scsi);
				spdk_iscsi_task_cpl(&task->scsi);
				return 0;
			}
			task->current_datain_offset = task->scsi.length;
			conn->data_in_cnt++;
			iscsi_queue_task(conn, task);
			continue;
		}
		if (task->current_datain_offset < task->scsi.transfer_len) {
			struct spdk_iscsi_task *subtask;
			uint32_t remaining_size = 0;
@@ -3274,22 +3260,22 @@ int spdk_iscsi_conn_handle_queued_datain_tasks(struct spdk_iscsi_conn *conn)
			subtask = spdk_iscsi_task_get(conn, task, spdk_iscsi_task_cpl);
			assert(subtask != NULL);
			subtask->scsi.offset = task->current_datain_offset;
			subtask->scsi.length = DMIN32(SPDK_BDEV_LARGE_BUF_MAX_SIZE, remaining_size);
			spdk_scsi_task_set_data(&subtask->scsi, NULL, 0);
			task->current_datain_offset += subtask->scsi.length;
			conn->data_in_cnt++;

			if (spdk_scsi_dev_get_lun(conn->dev, task->lun_id) == NULL) {
				/* Remove the primary task from the list if this is the last subtask */
				if (task->current_datain_offset == task->scsi.transfer_len) {
				/* Stop submitting split read I/Os for remaining data. */
				TAILQ_REMOVE(&conn->queued_datain_tasks, task, link);
				}
				subtask->scsi.transfer_len = subtask->scsi.length;
				task->current_datain_offset += remaining_size;
				assert(task->current_datain_offset == task->scsi.transfer_len);
				subtask->scsi.transfer_len = remaining_size;
				spdk_scsi_task_process_null_lun(&subtask->scsi);
				spdk_iscsi_task_cpl(&subtask->scsi);
				return 0;
			}

			subtask->scsi.length = DMIN32(SPDK_BDEV_LARGE_BUF_MAX_SIZE, remaining_size);
			task->current_datain_offset += subtask->scsi.length;
			iscsi_queue_task(conn, subtask);
		}
		if (task->current_datain_offset == task->scsi.transfer_len) {
@@ -3302,26 +3288,22 @@ int spdk_iscsi_conn_handle_queued_datain_tasks(struct spdk_iscsi_conn *conn)
static int
iscsi_pdu_payload_op_scsi_read(struct spdk_iscsi_conn *conn, struct spdk_iscsi_task *task)
{
	int32_t remaining_size;

	TAILQ_INIT(&task->subtask_list);
	if (task->scsi.transfer_len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
		task->parent = NULL;
		task->scsi.offset = 0;
		task->scsi.length = DMIN32(SPDK_BDEV_LARGE_BUF_MAX_SIZE, task->scsi.transfer_len);
		spdk_scsi_task_set_data(&task->scsi, NULL, 0);

	remaining_size = task->scsi.transfer_len - task->scsi.length;
	task->current_datain_offset = 0;

	if (remaining_size == 0) {
		iscsi_queue_task(conn, task);
		return 0;
	}

	} else {
		TAILQ_INIT(&task->subtask_list);
		task->current_datain_offset = 0;
		TAILQ_INSERT_TAIL(&conn->queued_datain_tasks, task, link);

		return spdk_iscsi_conn_handle_queued_datain_tasks(conn);
	}
}

static int
iscsi_pdu_payload_op_scsi_write(struct spdk_iscsi_conn *conn, struct spdk_iscsi_task *task)
@@ -3613,15 +3595,6 @@ _iscsi_conn_abort_queued_datain_task(struct spdk_iscsi_conn *conn,

	while (conn->data_in_cnt < MAX_LARGE_DATAIN_PER_CONNECTION) {
		assert(task->current_datain_offset <= task->scsi.transfer_len);

		/* If no IO is submitted yet, just abort the primary task. */
		if (task->current_datain_offset == 0) {
			TAILQ_REMOVE(&conn->queued_datain_tasks, task, link);
			spdk_scsi_task_process_abort(&task->scsi);
			spdk_iscsi_task_cpl(&task->scsi);
			return 0;
		}

		/* If any IO is submitted already, abort all subtasks by repetition. */
		if (task->current_datain_offset < task->scsi.transfer_len) {
			remaining_size = task->scsi.transfer_len - task->current_datain_offset;
+21 −0
Original line number Diff line number Diff line
@@ -34,6 +34,22 @@ spdk_iscsi_task_get(struct spdk_iscsi_conn *conn,
	if (!task) {
		return NULL;
	}

	task->conn = conn;
	task->scsi.cpl_fn = cpl_fn;
	if (parent) {
		parent->scsi.ref++;
		task->parent = parent;
		task->tag = parent->tag;
		task->lun_id = parent->lun_id;
		task->scsi.dxfer_dir = parent->scsi.dxfer_dir;
		task->scsi.transfer_len = parent->scsi.transfer_len;
		task->scsi.lun = parent->scsi.lun;
		task->scsi.cdb = parent->scsi.cdb;
		task->scsi.target_port = parent->scsi.target_port;
		task->scsi.initiator_port = parent->scsi.initiator_port;
	}

	task->scsi.iovs = &task->scsi.iov;
	return task;
}
@@ -141,6 +157,11 @@ spdk_iscsi_task_cpl(struct spdk_scsi_task *scsi_task)

	if (scsi_task != NULL) {
		iscsi_task = spdk_iscsi_task_from_scsi_task(scsi_task);
		if (iscsi_task->parent && (iscsi_task->scsi.dxfer_dir == SPDK_SCSI_DIR_FROM_DEV)) {
			assert(iscsi_task->conn->data_in_cnt > 0);
			iscsi_task->conn->data_in_cnt--;
		}

		free(iscsi_task);
	}
}
+41 −60
Original line number Diff line number Diff line
@@ -197,49 +197,27 @@ ut_conn_task_get(struct spdk_iscsi_task *parent)
}

static void
ut_conn_create_read_tasks(int transfer_len)
ut_conn_create_read_tasks(struct spdk_iscsi_task *primary)
{
	struct spdk_iscsi_task *task, *subtask;
	struct spdk_iscsi_task *subtask;
	int32_t remaining_size = 0;

	task = ut_conn_task_get(NULL);

	TAILQ_INIT(&task->subtask_list);
	task->scsi.transfer_len = transfer_len;
	task->scsi.offset = 0;
	task->scsi.length = DMIN32(SPDK_BDEV_LARGE_BUF_MAX_SIZE, task->scsi.transfer_len);
	task->scsi.status = SPDK_SCSI_STATUS_GOOD;

	remaining_size = task->scsi.transfer_len - task->scsi.length;
	task->current_datain_offset = 0;

	if (remaining_size == 0) {
		TAILQ_INSERT_TAIL(&g_ut_read_tasks, task, link);
		return;
	}

	while (1) {
		if (task->current_datain_offset == 0) {
			task->current_datain_offset = task->scsi.length;
			TAILQ_INSERT_TAIL(&g_ut_read_tasks, task, link);
			continue;
		}
		if (primary->current_datain_offset < primary->scsi.transfer_len) {
			remaining_size = primary->scsi.transfer_len - primary->current_datain_offset;

		if (task->current_datain_offset < task->scsi.transfer_len) {
			remaining_size = task->scsi.transfer_len - task->current_datain_offset;
			subtask = ut_conn_task_get(primary);

			subtask = ut_conn_task_get(task);

			subtask->scsi.offset = task->current_datain_offset;
			subtask->scsi.offset = primary->current_datain_offset;
			subtask->scsi.length = DMIN32(SPDK_BDEV_LARGE_BUF_MAX_SIZE, remaining_size);
			subtask->scsi.status = SPDK_SCSI_STATUS_GOOD;

			task->current_datain_offset += subtask->scsi.length;
			primary->current_datain_offset += subtask->scsi.length;

			TAILQ_INSERT_TAIL(&g_ut_read_tasks, subtask, link);
		}

		if (task->current_datain_offset == task->scsi.transfer_len) {
		if (primary->current_datain_offset == primary->scsi.transfer_len) {
			break;
		}
	}
@@ -248,51 +226,55 @@ ut_conn_create_read_tasks(int transfer_len)
static void
read_task_split_in_order_case(void)
{
	struct spdk_iscsi_task *primary, *task, *tmp;
	struct spdk_iscsi_task primary = {};
	struct spdk_iscsi_task *task, *tmp;

	primary.scsi.transfer_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE * 8;
	TAILQ_INIT(&primary.subtask_list);
	primary.current_datain_offset = 0;
	primary.bytes_completed = 0;

	ut_conn_create_read_tasks(SPDK_BDEV_LARGE_BUF_MAX_SIZE * 8);
	ut_conn_create_read_tasks(&primary);
	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&g_ut_read_tasks));

	TAILQ_FOREACH(task, &g_ut_read_tasks, link) {
		primary = spdk_iscsi_task_get_primary(task);
		process_read_task_completion(NULL, task, primary);
		CU_ASSERT(&primary == spdk_iscsi_task_get_primary(task));
		process_read_task_completion(NULL, task, &primary);
	}

	primary = TAILQ_FIRST(&g_ut_read_tasks);
	SPDK_CU_ASSERT_FATAL(primary != NULL);

	CU_ASSERT(primary->bytes_completed == primary->scsi.transfer_len);
	CU_ASSERT(primary.bytes_completed == primary.scsi.transfer_len);

	TAILQ_FOREACH_SAFE(task, &g_ut_read_tasks, link, tmp) {
		TAILQ_REMOVE(&g_ut_read_tasks, task, link);
		free(task);
	}

	CU_ASSERT(TAILQ_EMPTY(&g_ut_read_tasks));
}

static void
read_task_split_reverse_order_case(void)
{
	struct spdk_iscsi_task *primary, *task, *tmp;
	struct spdk_iscsi_task primary = {};
	struct spdk_iscsi_task *task, *tmp;

	primary.scsi.transfer_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE * 8;
	TAILQ_INIT(&primary.subtask_list);
	primary.current_datain_offset = 0;
	primary.bytes_completed = 0;

	ut_conn_create_read_tasks(SPDK_BDEV_LARGE_BUF_MAX_SIZE * 8);
	ut_conn_create_read_tasks(&primary);
	SPDK_CU_ASSERT_FATAL(!TAILQ_EMPTY(&g_ut_read_tasks));

	TAILQ_FOREACH_REVERSE(task, &g_ut_read_tasks, read_tasks_head, link) {
		primary = spdk_iscsi_task_get_primary(task);
		process_read_task_completion(NULL, task, primary);
		CU_ASSERT(&primary == spdk_iscsi_task_get_primary(task));
		process_read_task_completion(NULL, task, &primary);
	}

	primary = TAILQ_FIRST(&g_ut_read_tasks);
	SPDK_CU_ASSERT_FATAL(primary != NULL);

	CU_ASSERT(primary->bytes_completed == primary->scsi.transfer_len);
	CU_ASSERT(primary.bytes_completed == primary.scsi.transfer_len);

	TAILQ_FOREACH_SAFE(task, &g_ut_read_tasks, link, tmp) {
		TAILQ_REMOVE(&g_ut_read_tasks, task, link);
		free(task);
	}

	CU_ASSERT(TAILQ_EMPTY(&g_ut_read_tasks));
}

static void
@@ -301,38 +283,37 @@ propagate_scsi_error_status_for_split_read_tasks(void)
	struct spdk_iscsi_task primary, task1, task2, task3, task4, task5, task6;

	memset(&primary, 0, sizeof(struct spdk_iscsi_task));
	primary.scsi.length = 512;
	primary.scsi.status = SPDK_SCSI_STATUS_GOOD;
	primary.scsi.transfer_len = 512 * 6;
	primary.rsp_scsi_status = SPDK_SCSI_STATUS_GOOD;
	TAILQ_INIT(&primary.subtask_list);

	memset(&task1, 0, sizeof(struct spdk_iscsi_task));
	task1.scsi.offset = 512;
	task1.scsi.offset = 0;
	task1.scsi.length = 512;
	task1.scsi.status = SPDK_SCSI_STATUS_GOOD;

	memset(&task2, 0, sizeof(struct spdk_iscsi_task));
	task2.scsi.offset = 512 * 2;
	task2.scsi.offset = 512;
	task2.scsi.length = 512;
	task2.scsi.status = SPDK_SCSI_STATUS_CHECK_CONDITION;

	memset(&task3, 0, sizeof(struct spdk_iscsi_task));
	task3.scsi.offset = 512 * 3;
	task3.scsi.offset = 512 * 2;
	task3.scsi.length = 512;
	task3.scsi.status = SPDK_SCSI_STATUS_GOOD;

	memset(&task4, 0, sizeof(struct spdk_iscsi_task));
	task4.scsi.offset = 512 * 4;
	task4.scsi.offset = 512 * 3;
	task4.scsi.length = 512;
	task4.scsi.status = SPDK_SCSI_STATUS_GOOD;

	memset(&task5, 0, sizeof(struct spdk_iscsi_task));
	task5.scsi.offset = 512 * 5;
	task5.scsi.offset = 512 * 4;
	task5.scsi.length = 512;
	task5.scsi.status = SPDK_SCSI_STATUS_GOOD;

	memset(&task6, 0, sizeof(struct spdk_iscsi_task));
	task6.scsi.offset = 512 * 6;
	task6.scsi.offset = 512 * 5;
	task6.scsi.length = 512;
	task6.scsi.status = SPDK_SCSI_STATUS_GOOD;

@@ -344,17 +325,17 @@ propagate_scsi_error_status_for_split_read_tasks(void)
	process_read_task_completion(NULL, &task3, &primary);
	process_read_task_completion(NULL, &task2, &primary);
	process_read_task_completion(NULL, &task1, &primary);
	process_read_task_completion(NULL, &primary, &primary);
	process_read_task_completion(NULL, &task5, &primary);
	process_read_task_completion(NULL, &task6, &primary);

	CU_ASSERT(primary.scsi.status == SPDK_SCSI_STATUS_CHECK_CONDITION);
	CU_ASSERT(primary.rsp_scsi_status == SPDK_SCSI_STATUS_CHECK_CONDITION);
	CU_ASSERT(task1.scsi.status == SPDK_SCSI_STATUS_CHECK_CONDITION);
	CU_ASSERT(task2.scsi.status == SPDK_SCSI_STATUS_CHECK_CONDITION);
	CU_ASSERT(task3.scsi.status == SPDK_SCSI_STATUS_CHECK_CONDITION);
	CU_ASSERT(task4.scsi.status == SPDK_SCSI_STATUS_CHECK_CONDITION);
	CU_ASSERT(task5.scsi.status == SPDK_SCSI_STATUS_CHECK_CONDITION);
	CU_ASSERT(task6.scsi.status == SPDK_SCSI_STATUS_CHECK_CONDITION);
	CU_ASSERT(primary.bytes_completed == primary.scsi.transfer_len);
	CU_ASSERT(TAILQ_EMPTY(&primary.subtask_list));
}

+39 −25
Original line number Diff line number Diff line
@@ -1104,73 +1104,79 @@ clear_all_transfer_tasks_test(void)
static void
abort_queued_datain_task_test(void)
{
	struct spdk_iscsi_conn conn;
	struct spdk_iscsi_conn conn = {};
	struct spdk_iscsi_task *task, *task2, *task3;
	int rc;

	TAILQ_INIT(&conn.queued_datain_tasks);

	/* Case1: Queue one task, and this task is not executed */
	task = spdk_iscsi_task_get(&conn, NULL, NULL);
	SPDK_CU_ASSERT_FATAL(task != NULL);
	TAILQ_INSERT_TAIL(&conn.queued_datain_tasks, task, link);
	task->scsi.dxfer_dir = SPDK_SCSI_DIR_FROM_DEV;

	/* Slot of data in tasks are full */
	/* No slots for sub read tasks */
	conn.data_in_cnt = MAX_LARGE_DATAIN_PER_CONNECTION;
	task->scsi.transfer_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE * 3;
	task->scsi.offset = 0;
	TAILQ_INSERT_TAIL(&conn.queued_datain_tasks, task, link);

	rc = _iscsi_conn_abort_queued_datain_task(&conn, task);
	CU_ASSERT(rc != 0);
	CU_ASSERT(!TAILQ_EMPTY(&conn.queued_datain_tasks));
	assert(conn.data_in_cnt == MAX_LARGE_DATAIN_PER_CONNECTION);

	/* Only one slot remains and no subtasks are submitted yet. */
	conn.data_in_cnt--;
	task->current_datain_offset = 0;

	/* havs slots for sub read tasks */
	conn.data_in_cnt = 0;
	rc = _iscsi_conn_abort_queued_datain_task(&conn, task);
	CU_ASSERT(rc == 0);
	CU_ASSERT(TAILQ_EMPTY(&conn.queued_datain_tasks));
	assert(conn.data_in_cnt == 0);

	task = spdk_iscsi_task_get(&conn, NULL, NULL);
	SPDK_CU_ASSERT_FATAL(task != NULL);
	TAILQ_INSERT_TAIL(&conn.queued_datain_tasks, task, link);

	/* Only one slot remains and a subtask is submitted. */
	/* Case2: Queue one task, and this task is partially executed,
	 * and the slot of sub read tasks are full
	 */
	conn.data_in_cnt = MAX_LARGE_DATAIN_PER_CONNECTION;
	task->scsi.transfer_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE * 3;
	task->current_datain_offset = SPDK_BDEV_LARGE_BUF_MAX_SIZE;
	TAILQ_INSERT_TAIL(&conn.queued_datain_tasks, task, link);

	rc = _iscsi_conn_abort_queued_datain_task(&conn, task);
	CU_ASSERT(rc != 0);
	CU_ASSERT(task->current_datain_offset == SPDK_BDEV_LARGE_BUF_MAX_SIZE * 2);
	CU_ASSERT(conn.data_in_cnt == MAX_LARGE_DATAIN_PER_CONNECTION);

	/* Additional one slot becomes vacant. */
	conn.data_in_cnt--;

	/* Case3: Queue one task, and this task is partially executed,
	 * and the slot of sub read tasks is not full.
	 */
	conn.data_in_cnt = MAX_LARGE_DATAIN_PER_CONNECTION - 2;
	rc = _iscsi_conn_abort_queued_datain_task(&conn, task);
	CU_ASSERT(rc == 0);
	assert(conn.data_in_cnt == MAX_LARGE_DATAIN_PER_CONNECTION - 2);
	CU_ASSERT(TAILQ_EMPTY(&conn.queued_datain_tasks));

	spdk_iscsi_task_cpl(&task->scsi);
	/* Case4: Queue three tasks and abort each task sequentially */
	conn.data_in_cnt = 0;

	/* Queue three data in tasks and abort each task sequentially */
	task = spdk_iscsi_task_get(&conn, NULL, NULL);
	SPDK_CU_ASSERT_FATAL(task != NULL);
	task->tag = 1;
	task->scsi.transfer_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE * 3;
	task->current_datain_offset = 0;
	TAILQ_INSERT_TAIL(&conn.queued_datain_tasks, task, link);

	task2 = spdk_iscsi_task_get(&conn, NULL, NULL);
	SPDK_CU_ASSERT_FATAL(task2 != NULL);
	task2->tag = 2;
	task2->current_datain_offset = 0;
	task2->scsi.dxfer_dir = SPDK_SCSI_DIR_FROM_DEV;
	task2->scsi.transfer_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE * 4;
	task2->current_datain_offset = SPDK_BDEV_LARGE_BUF_MAX_SIZE;
	TAILQ_INSERT_TAIL(&conn.queued_datain_tasks, task2, link);

	task3 = spdk_iscsi_task_get(&conn, NULL, NULL);
	SPDK_CU_ASSERT_FATAL(task3 != NULL);
	task3->tag = 3;
	task3->current_datain_offset = 0;
	task3->scsi.dxfer_dir = SPDK_SCSI_DIR_FROM_DEV;
	task3->scsi.transfer_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE * 5;
	task3->current_datain_offset = SPDK_BDEV_LARGE_BUF_MAX_SIZE * 2;
	TAILQ_INSERT_TAIL(&conn.queued_datain_tasks, task3, link);

	conn.data_in_cnt--;

	rc = iscsi_conn_abort_queued_datain_task(&conn, 1);
	CU_ASSERT(rc == 0);

@@ -1181,6 +1187,11 @@ abort_queued_datain_task_test(void)
	CU_ASSERT(rc == 0);

	CU_ASSERT(TAILQ_EMPTY(&conn.queued_datain_tasks));
	assert(conn.data_in_cnt == 0);

	spdk_iscsi_task_cpl(&task->scsi);
	spdk_iscsi_task_cpl(&task2->scsi);
	spdk_iscsi_task_cpl(&task3->scsi);
}

static bool
@@ -1306,6 +1317,7 @@ abort_queued_datain_tasks_test(void)
	CU_ASSERT(datain_task_is_queued(&conn, task4));
	CU_ASSERT(datain_task_is_queued(&conn, task5));
	CU_ASSERT(datain_task_is_queued(&conn, task6));
	spdk_iscsi_task_cpl(&task1->scsi);

	rc = iscsi_conn_abort_queued_datain_tasks(&conn, &lun2, mgmt_pdu2);
	CU_ASSERT(rc == 0);
@@ -1314,6 +1326,8 @@ abort_queued_datain_tasks_test(void)
	CU_ASSERT(!datain_task_is_queued(&conn, task4));
	CU_ASSERT(datain_task_is_queued(&conn, task5));
	CU_ASSERT(datain_task_is_queued(&conn, task6));
	spdk_iscsi_task_cpl(&task2->scsi);
	spdk_iscsi_task_cpl(&task4->scsi);

	TAILQ_FOREACH_SAFE(task, &conn.queued_datain_tasks, link, tmp) {
		TAILQ_REMOVE(&conn.queued_datain_tasks, task, link);