Commit 6716729c authored by Shuhei Matsumoto's avatar Shuhei Matsumoto Committed by Changpeng Liu
Browse files

dif: Create iovec array so that a space for metadata is left for each block



This patch adds an API spdk_dif_set_md_interleave_iovs().
This function is used to leave a space for metadata for each block
when the network socket reads data, or to make the network socket
ignore a space for metadata for each block when the network socket
writes data. This function removes the necessity of data copy in
the SPDK application during DIF insertion and strip.

Change-Id: I018efd77bdadcaec4679eea4dc1e404f155b0879
Signed-off-by: default avatarShuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/446216


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarChangpeng Liu <changpeng.liu@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-by: default avatarwuzhouhui <wuzhouhui@kingsoft.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent adc8da4a
Loading
Loading
Loading
Loading
+28 −0
Original line number Diff line number Diff line
@@ -257,4 +257,32 @@ int spdk_dix_verify(struct iovec *iovs, int iovcnt, struct iovec *md_iov,
int spdk_dix_inject_error(struct iovec *iovs, int iovcnt, struct iovec *md_iov,
			  uint32_t num_blocks, const struct spdk_dif_ctx *ctx,
			  uint32_t inject_flags, uint32_t *inject_offset);

/**
 * Setup iovec array to leave a space for metadata for each block.
 *
 * This function is used to leave a space for metadata for each block when
 * the network socket reads data, or to make the network socket ignore a
 * space for metadata for each block when the network socket writes data.
 * This function removes the necessity of data copy in the SPDK application
 * during DIF insertion and strip.
 *
 * \param iovs iovec array set by this function.
 * \param num_iovs Number of elements in the iovec array.
 * \param buf Buffer to create extended LBA payload.
 * \param buf_len Length of the buffer to create extended LBA payload.
 * \param data_offset Offset to store the next incoming data.
 * \param data_len Expected data length of the payload.
 * \param mapped_len Output parameter that will contain data length mapped by
 * the iovec array.
 * \param ctx DIF context.
 *
 * \return Number of used elements in the iovec array on success or negated
 * errno otherwise.
 */
int spdk_dif_set_md_interleave_iovs(struct iovec *iovs, int num_iovs,
				    uint8_t *buf, uint32_t buf_len,
				    uint32_t data_offset, uint32_t data_len,
				    uint32_t *mapped_len,
				    const struct spdk_dif_ctx *ctx);
#endif /* SPDK_DIF_H */
+73 −0
Original line number Diff line number Diff line
@@ -1297,3 +1297,76 @@ spdk_dix_inject_error(struct iovec *iovs, int iovcnt, struct iovec *md_iov,

	return 0;
}

int
spdk_dif_set_md_interleave_iovs(struct iovec *iovs, int num_iovs,
				uint8_t *buf, uint32_t buf_len,
				uint32_t data_offset, uint32_t data_len,
				uint32_t *_mapped_len,
				const struct spdk_dif_ctx *ctx)
{
	uint32_t data_block_size, head_unalign, mapped_len = 0;
	uint32_t num_blocks, offset_blocks;
	struct iovec *iov = iovs;
	int iovcnt = 0;

	if (iovs == NULL || num_iovs == 0) {
		return -EINVAL;
	}

	data_block_size = ctx->block_size - ctx->md_size;

	if ((data_len % data_block_size) != 0) {
		SPDK_ERRLOG("Data length must be a multiple of data block size\n");
		return -EINVAL;
	}

	if (data_offset >= data_len) {
		SPDK_ERRLOG("Data offset must be smaller than data length\n");
		return -ERANGE;
	}

	num_blocks = data_len / data_block_size;

	if (buf_len < num_blocks * ctx->block_size) {
		SPDK_ERRLOG("Buffer overflow will occur. Buffer size is %" PRIu32 " but"
			    " necessary size is %" PRIu32 "\n",
			    buf_len, num_blocks * ctx->block_size);
		return -ERANGE;
	}

	offset_blocks = data_offset / data_block_size;
	head_unalign = data_offset % data_block_size;

	buf += offset_blocks * ctx->block_size;

	if (head_unalign != 0) {
		buf += head_unalign;

		iov->iov_base = buf;
		iov->iov_len = data_block_size - head_unalign;
		mapped_len += data_block_size - head_unalign;
		iov++;
		iovcnt++;

		buf += ctx->block_size - head_unalign;
		offset_blocks++;
	}

	while (offset_blocks < num_blocks && iovcnt < num_iovs) {
		iov->iov_base = buf;
		iov->iov_len = data_block_size;
		mapped_len += data_block_size;
		iov++;
		iovcnt++;

		buf += ctx->block_size;
		offset_blocks++;
	}

	if (_mapped_len != NULL) {
		*_mapped_len = mapped_len;
	}

	return iovcnt;
}
+135 −1
Original line number Diff line number Diff line
@@ -136,6 +136,19 @@ _iov_free_buf(struct iovec *iov)
	free(iov->iov_base);
}

static void
_iov_set_buf(struct iovec *iov, uint8_t *buf, uint32_t buf_len)
{
	iov->iov_base = buf;
	iov->iov_len = buf_len;
}

static bool
_iov_check(struct iovec *iov, void *iov_base, uint32_t iov_len)
{
	return (iov->iov_base == iov_base && iov->iov_len == iov_len);
}

static void
_dif_generate_and_verify(struct iovec *iov,
			 uint32_t block_size, uint32_t md_size, bool dif_loc,
@@ -1349,6 +1362,126 @@ dix_sec_4096_md_128_inject_1_2_4_8_multi_iovs_split_test(void)
	_iov_free_buf(&md_iov);
}

static int
ut_readv(uint32_t read_base, uint32_t read_len, struct iovec *iovs, int iovcnt)
{
	int i;
	uint32_t j, offset;
	uint8_t *buf;

	offset = 0;
	for (i = 0; i < iovcnt; i++) {
		buf = iovs[i].iov_base;
		for (j = 0; j < iovs[i].iov_len; j++, offset++) {
			if (offset >= read_len) {
				return offset;
			}
			buf[j] = DATA_PATTERN(read_base + offset);
		}
	}

	return offset;
}

static void
set_md_interleave_iovs_test(void)
{
	struct spdk_dif_ctx ctx = {};
	struct spdk_dif_error err_blk = {};
	struct iovec iov1, iov2, dif_iovs[4];
	uint32_t dif_check_flags, mapped_len = 0, read_base = 0;
	uint8_t *buf1, *buf2;
	int rc;

	dif_check_flags = SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK |
			  SPDK_DIF_FLAGS_REFTAG_CHECK;

	rc = spdk_dif_ctx_init(&ctx, 4096 + 128, 128, true, false, SPDK_DIF_TYPE1,
			       dif_check_flags, 22, 0xFFFF, 0x22, GUARD_SEED);
	CU_ASSERT(rc == 0);

	/* The first data buffer:
	 * - Create iovec array to Leave a space for metadata for each block
	 * - Split vectored read and so creating iovec array is done before every vectored read.
	 */
	buf1 = calloc(1, (4096 + 128) * 4);
	SPDK_CU_ASSERT_FATAL(buf1 != NULL);
	_iov_set_buf(&iov1, buf1, (4096 + 128) * 4);

	rc = spdk_dif_set_md_interleave_iovs(dif_iovs, 4, buf1, (4096 + 128) * 4,
					     0, 4096 * 4, &mapped_len, &ctx);
	CU_ASSERT(rc == 4);
	CU_ASSERT(mapped_len == 4096 * 4);
	CU_ASSERT(_iov_check(&dif_iovs[0], buf1, 4096) == true);
	CU_ASSERT(_iov_check(&dif_iovs[1], buf1 + 4096 + 128, 4096) == true);
	CU_ASSERT(_iov_check(&dif_iovs[2], buf1 + (4096 + 128) * 2, 4096) == true);
	CU_ASSERT(_iov_check(&dif_iovs[3], buf1 + (4096 + 128) * 3, 4096) == true);

	read_base = ut_readv(0, 1024, dif_iovs, 4);
	CU_ASSERT(read_base == 1024);

	rc = spdk_dif_set_md_interleave_iovs(dif_iovs, 4, buf1, (4096 + 128) * 4,
					     read_base, 4096 * 4, &mapped_len, &ctx);
	CU_ASSERT(rc == 4);
	CU_ASSERT(mapped_len == 3072 + 4096 * 3);
	CU_ASSERT(_iov_check(&dif_iovs[0], buf1 + 1024, 3072) == true);
	CU_ASSERT(_iov_check(&dif_iovs[1], buf1 + 4096 + 128, 4096) == true);
	CU_ASSERT(_iov_check(&dif_iovs[2], buf1 + (4096 + 128) * 2, 4096) == true);
	CU_ASSERT(_iov_check(&dif_iovs[3], buf1 + (4096 + 128) * 3, 4096) == true);

	read_base += ut_readv(read_base, 3071, dif_iovs, 4);
	CU_ASSERT(read_base == 4095);

	rc = spdk_dif_set_md_interleave_iovs(dif_iovs, 4, buf1, (4096 + 128) * 4,
					     read_base, 4096 * 4, &mapped_len, &ctx);
	CU_ASSERT(rc == 4);
	CU_ASSERT(mapped_len == 1 + 4096 * 3);
	CU_ASSERT(_iov_check(&dif_iovs[0], buf1 + 4095, 1) == true);
	CU_ASSERT(_iov_check(&dif_iovs[1], buf1 + 4096 + 128, 4096) == true);
	CU_ASSERT(_iov_check(&dif_iovs[2], buf1 + (4096 + 128) * 2, 4096) == true);
	CU_ASSERT(_iov_check(&dif_iovs[3], buf1 + (4096 + 128) * 3, 4096) == true);

	read_base += ut_readv(read_base, 1 + 4096 * 2 + 512, dif_iovs, 4);
	CU_ASSERT(read_base == 4096 * 3 + 512);

	rc = spdk_dif_set_md_interleave_iovs(dif_iovs, 4, buf1, (4096 + 128) * 4,
					     read_base, 4096 * 4, &mapped_len, &ctx);
	CU_ASSERT(rc == 1);
	CU_ASSERT(mapped_len == 3584);
	CU_ASSERT(_iov_check(&dif_iovs[0], buf1 + (4096 + 128) * 3 + 512, 3584) == true);

	read_base += ut_readv(read_base, 3584, dif_iovs, 1);
	CU_ASSERT(read_base == 4096 * 4);

	rc = spdk_dif_generate(&iov1, 1, 4, &ctx);
	CU_ASSERT(rc == 0);

	/* The second data buffer:
	 * - Set data pattern with a space for metadata for each block.
	 */
	buf2 = calloc(1, (4096 + 128) * 4);
	SPDK_CU_ASSERT_FATAL(buf2 != NULL);
	_iov_set_buf(&iov2, buf2, (4096 + 128) * 4);

	rc = ut_data_pattern_generate(&iov2, 1, 4096 + 128, 128, 4);
	CU_ASSERT(rc == 0);
	rc = spdk_dif_generate(&iov2, 1, 4, &ctx);
	CU_ASSERT(rc == 0);

	rc = spdk_dif_verify(&iov1, 1, 4, &ctx, &err_blk);
	CU_ASSERT(rc == 0);

	rc = spdk_dif_verify(&iov2, 1, 4, &ctx, &err_blk);
	CU_ASSERT(rc == 0);

	/* Compare the first and the second data buffer by byte. */
	rc = memcmp(buf1, buf2, (4096 + 128) * 4);
	CU_ASSERT(rc == 0);

	free(buf1);
	free(buf2);
}

int
main(int argc, char **argv)
{
@@ -1432,7 +1565,8 @@ main(int argc, char **argv)
		CU_add_test(suite, "dix_sec_4096_md_128_inject_1_2_4_8_multi_iovs_test",
			    dix_sec_4096_md_128_inject_1_2_4_8_multi_iovs_test) == NULL ||
		CU_add_test(suite, "dix_sec_4096_md_128_inject_1_2_4_8_multi_iovs_split_test",
			    dix_sec_4096_md_128_inject_1_2_4_8_multi_iovs_split_test) == NULL
			    dix_sec_4096_md_128_inject_1_2_4_8_multi_iovs_split_test) == NULL ||
		CU_add_test(suite, "set_md_interleave_iovs_test", set_md_interleave_iovs_test) == NULL
	) {
		CU_cleanup_registry();
		return CU_get_error();