Commit 42109157 authored by Tomasz Zawadzki's avatar Tomasz Zawadzki
Browse files

lib/blob: add starting cluster index to extent page



Size of a blob (thus size of clusters array in mutable data)
is known from extent table descriptor.
Extent pages were read sequentially in order they were
placed in extent table. This meant that cluster
array could have been filled up from beginning to end.
Yet reading extent pages in any other order,
would result in incorrect placement of clusters.

This patch adds first cluster index that is contained within
each extent page. This will allow to read/write
multiple extent pages in parallel, since
we will know where in clusters array to put the cluster idxs.

Signed-off-by: default avatarTomasz Zawadzki <tomasz.zawadzki@intel.com>
Change-Id: Ib6b9332111cd93f990d057dc60624152907dd87f
Reviewed-on: https://review.gerrithub.io/c/spdk/spdk/+/482701


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarBen Walker <benjamin.walker@intel.com>
Reviewed-by: default avatarPaul Luse <paul.e.luse@intel.com>
Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
parent c85a3d10
Loading
Loading
Loading
Loading
+32 −12
Original line number Diff line number Diff line
@@ -653,6 +653,7 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl
			struct spdk_blob_md_descriptor_extent_page	*desc_extent;
			unsigned int					i;
			unsigned int					cluster_count = blob->active.num_clusters;
			size_t						cluster_idx_length;

			if (blob->extent_rle_found) {
				/* This means that Extent RLE is present in MD,
@@ -661,13 +662,14 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl
			}

			desc_extent = (struct spdk_blob_md_descriptor_extent_page *)desc;
			cluster_idx_length = desc_extent->length - sizeof(desc_extent->start_cluster_idx);

			if (desc_extent->length == 0 ||
			    (desc_extent->length % sizeof(desc_extent->cluster_idx[0]) != 0)) {
			if (desc_extent->length <= sizeof(desc_extent->start_cluster_idx) ||
			    (cluster_idx_length % sizeof(desc_extent->cluster_idx[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->cluster_idx[0]); i++) {
			for (i = 0; i < cluster_idx_length / sizeof(desc_extent->cluster_idx[0]); i++) {
				if (desc_extent->cluster_idx[i] != 0) {
					if (!spdk_bit_array_get(blob->bs->used_clusters, desc_extent->cluster_idx[i])) {
						return -EINVAL;
@@ -679,6 +681,14 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl
			if (cluster_count == 0) {
				return -EINVAL;
			}

			/* When reading extent pages sequentially starting cluster idx should match
			 * current size of a blob.
			 * If changed to batch reading, this check shall be removed. */
			if (desc_extent->start_cluster_idx != blob->active.num_clusters) {
				return -EINVAL;
			}

			tmp = realloc(blob->active.clusters, cluster_count * sizeof(*blob->active.clusters));
			if (tmp == NULL) {
				return -ENOMEM;
@@ -686,7 +696,7 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->cluster_idx[0]); i++) {
			for (i = 0; i < cluster_idx_length / sizeof(desc_extent->cluster_idx[0]); i++) {
				if (desc_extent->cluster_idx[i] != 0) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->cluster_idx[i]);
@@ -696,6 +706,7 @@ _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *bl
					return -EINVAL;
				}
			}
			assert(desc_extent->start_cluster_idx + cluster_count == blob->active.num_clusters);
			assert(blob->remaining_clusters_in_et >= cluster_count);
			blob->remaining_clusters_in_et -= cluster_count;
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
@@ -1059,21 +1070,24 @@ _spdk_blob_serialize_extent_page(const struct spdk_blob *blob,
	struct spdk_blob_md_descriptor_extent_page *desc_extent;
	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster;
	uint64_t start_cluster = (cluster / SPDK_EXTENTS_PER_EP) * SPDK_EXTENTS_PER_EP;
	uint64_t end_cluster = spdk_min(start_cluster + SPDK_EXTENTS_PER_EP, blob->active.num_clusters);
	uint64_t start_cluster_idx = (cluster / SPDK_EXTENTS_PER_EP) * SPDK_EXTENTS_PER_EP;

	desc_extent = (struct spdk_blob_md_descriptor_extent_page *) page->descriptors;
	desc_extent->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	desc_extent->start_cluster_idx = start_cluster_idx;
	extent_idx = 0;
	for (i = start_cluster; i < end_cluster; i++) {
	for (i = start_cluster_idx; i < blob->active.num_clusters; i++) {
		lba = blob->active.clusters[i];
		desc_extent->cluster_idx[extent_idx++] = lba / lba_per_cluster;
		if (extent_idx >= SPDK_EXTENTS_PER_EP) {
			break;
		}

	desc_extent->length = sizeof(desc_extent->cluster_idx[0]) * extent_idx;
	}
	desc_extent->length = sizeof(desc_extent->start_cluster_idx) +
			      sizeof(desc_extent->cluster_idx[0]) * extent_idx;
}

static void
@@ -3621,21 +3635,27 @@ _spdk_bs_load_replay_md_parse_page(struct spdk_bs_load_ctx *ctx)
			uint32_t					i;
			uint32_t					cluster_count = 0;
			uint32_t					cluster_idx;
			size_t						cluster_idx_length;

			desc_extent = (struct spdk_blob_md_descriptor_extent_page *)desc;
			cluster_idx_length = desc_extent->length - sizeof(desc_extent->start_cluster_idx);

			if (desc_extent->length == 0 ||
			    (desc_extent->length % sizeof(desc_extent->cluster_idx[0]) != 0)) {
			if (desc_extent->length <= sizeof(desc_extent->start_cluster_idx) ||
			    (cluster_idx_length % sizeof(desc_extent->cluster_idx[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->cluster_idx[0]); i++) {
			for (i = 0; i < cluster_idx_length / sizeof(desc_extent->cluster_idx[0]); i++) {
				cluster_idx = desc_extent->cluster_idx[i];
				/*
				 * cluster_idx = 0 means an unallocated cluster - don't mark that
				 * in the used cluster map.
				 */
				if (cluster_idx != 0) {
					if (cluster_idx < desc_extent->start_cluster_idx &&
					    cluster_idx >= desc_extent->start_cluster_idx + cluster_count) {
						return -EINVAL;
					}
					spdk_bit_array_set(bs->used_clusters, cluster_idx);
					if (bs->num_free_clusters == 0) {
						return -ENOSPC;
+3 −1
Original line number Diff line number Diff line
@@ -323,7 +323,9 @@ struct spdk_blob_md_descriptor_extent_page {
	uint8_t		type;
	uint32_t	length;

	/* TODO: add indicator for ranges of clusters in this EP */
	/* First cluster index in this extent page */
	uint32_t	start_cluster_idx;

	uint32_t        cluster_idx[0];
};