Commit 16daee89 authored by Yankun Li's avatar Yankun Li Committed by Konrad Sztyber
Browse files

lib/reduce: Add cache that holds chunks and blocks



Selecting an available chunk or backing block from the bit array
(such as allocated_chunk_maps or allocated_backing_io_units)
always starts from the beginning. This is inefficient, especially
with large disk spaces, leading to high CPU consumption and degraded
I/O performance.

This patch introduces free_chunks_queue and free_backing_blocks_queue
queues, which temporarily cache reclaimed chunks and backing blocks.
If the cache is empty, look for a free chunk or backing block starting
from find_chunk/block_offset in the bit array instead of starting from 0.

Change-Id: Ie4c892122063cb4b5d649e13880fbafe34ec2c3d
Signed-off-by: default avatarYankun Li <845245370@qq.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/24745


Reviewed-by: default avatarJim Harris <jim.harris@samsung.com>
Community-CI: Broadcom CI <spdk-ci.pdl@broadcom.com>
Community-CI: Mellanox Build Bot
Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarKonrad Sztyber <konrad.sztyber@intel.com>
parent 01b28622
Loading
Loading
Loading
Loading
+68 −0
Original line number Diff line number Diff line
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#ifndef __REDUCE_STACK_H_
#define __REDUCE_STACK_H_

#include "spdk/stdinc.h"

#define REDUCE_QUEUE_CAPACITY_SIZE 32

struct reduce_queue {
	uint64_t items[REDUCE_QUEUE_CAPACITY_SIZE];
	uint32_t head;
	uint32_t tail;
};

static inline void
queue_init(struct reduce_queue *queue)
{
	queue->head = queue->tail = 0;
}

static inline bool
queue_empty(struct reduce_queue *queue)
{
	return queue->head == queue->tail;
}

static inline bool
queue_full(struct reduce_queue *queue)
{
	return (queue->head == ((queue->tail + 1) % REDUCE_QUEUE_CAPACITY_SIZE));
}

static inline bool
queue_enqueue(struct reduce_queue *queue, uint64_t value)
{
	if (queue_full(queue)) {
		return false;
	}

	queue->items[queue->tail] = value;
	queue->tail = (queue->tail + 1) % REDUCE_QUEUE_CAPACITY_SIZE;
	return true;
}

static inline bool
queue_dequeue(struct reduce_queue *queue, uint64_t *value)
{
	if (queue_empty(queue)) {
		return false;
	}

	*value = queue->items[queue->head];
	queue->head = (queue->head + 1) % REDUCE_QUEUE_CAPACITY_SIZE;
	return true;
}

static inline uint32_t
queue_size(struct reduce_queue *queue)
{
	return (queue->tail + REDUCE_QUEUE_CAPACITY_SIZE - queue->head) % REDUCE_QUEUE_CAPACITY_SIZE;
}

#endif /* __REDUCE_STACK_H_ */
+50 −8
Original line number Diff line number Diff line
@@ -6,6 +6,8 @@

#include "spdk/stdinc.h"

#include "queue_internal.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
@@ -126,7 +128,15 @@ struct spdk_reduce_vol {
	uint64_t				*pm_chunk_maps;

	struct spdk_bit_array			*allocated_chunk_maps;
	/* The starting position when looking for a block from allocated_chunk_maps */
	uint64_t				find_chunk_offset;
	/* Cache free chunks to speed up lookup of free chunk. */
	struct reduce_queue			free_chunks_queue;
	struct spdk_bit_array			*allocated_backing_io_units;
	/* The starting position when looking for a block from allocated_backing_io_units */
	uint64_t				find_block_offset;
	/* Cache free blocks for backing bdev to speed up lookup of free backing blocks. */
	struct reduce_queue			free_backing_blocks_queue;

	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
@@ -554,8 +564,10 @@ _allocate_bit_arrays(struct spdk_reduce_vol *vol)

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	vol->find_chunk_offset = 0;
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
	vol->find_block_offset = 0;

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
@@ -631,6 +643,8 @@ spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);
	queue_init(&vol->free_chunks_queue);
	queue_init(&vol->free_backing_blocks_queue);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
@@ -873,6 +887,8 @@ spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);
	queue_init(&vol->free_chunks_queue);
	queue_init(&vol->free_backing_blocks_queue);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
@@ -1101,18 +1117,29 @@ static void
_reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	struct spdk_reduce_chunk_map *chunk;
	uint64_t index;
	bool success;
	uint32_t i;

	chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		if (chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
		index = chunk->io_unit_index[i];
		if (index == REDUCE_EMPTY_MAP_ENTRY) {
			break;
		}
		assert(spdk_bit_array_get(vol->allocated_backing_io_units,
					  chunk->io_unit_index[i]) == true);
		spdk_bit_array_clear(vol->allocated_backing_io_units, chunk->io_unit_index[i]);
					  index) == true);
		spdk_bit_array_clear(vol->allocated_backing_io_units, index);
		success = queue_enqueue(&vol->free_backing_blocks_queue, index);
		if (!success && index < vol->find_block_offset) {
			vol->find_block_offset = index;
		}
		chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
	}
	success = queue_enqueue(&vol->free_chunks_queue, chunk_map_index);
	if (!success && chunk_map_index < vol->find_chunk_offset) {
		vol->find_chunk_offset = chunk_map_index;
	}
	spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index);
}

@@ -1299,11 +1326,19 @@ _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn n
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;
	uint64_t chunk_offset, remainder, total_len = 0;
	uint64_t chunk_offset, remainder, free_index, total_len = 0;
	uint8_t *buf;
	bool success;
	int j;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
	success = queue_dequeue(&vol->free_chunks_queue, &free_index);
	if (success) {
		req->chunk_map_index = free_index;
	} else {
		req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps,
				       vol->find_chunk_offset);
		vol->find_chunk_offset = req->chunk_map_index + 1;
	}

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
@@ -1347,7 +1382,14 @@ _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn n
	}

	for (i = 0; i < req->num_io_units; i++) {
		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		success = queue_dequeue(&vol->free_backing_blocks_queue, &free_index);
		if (success) {
			req->chunk->io_unit_index[i] = free_index;
		} else {
			req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units,
						       vol->find_block_offset);
			vol->find_block_offset = req->chunk->io_unit_index[i] + 1;
		}
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
+1 −1
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@
SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../../..)
include $(SPDK_ROOT_DIR)/mk/spdk.common.mk

DIRS-y = reduce.c
DIRS-y = reduce.c queue_internal.h

.PHONY: all clean $(DIRS-y)

+10 −0
Original line number Diff line number Diff line
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2016 Intel Corporation.
#  All rights reserved.
#

SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../../../..)

TEST_FILE = queue_internal_ut.c

include $(SPDK_ROOT_DIR)/mk/spdk.unittest.mk
+99 −0
Original line number Diff line number Diff line
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk_internal/cunit.h"

#include "reduce/queue_internal.h"

static void
test_queue_create(void)
{
	struct reduce_queue queue;

	queue_init(&queue);
	CU_ASSERT(queue_empty(&queue) == true);
	CU_ASSERT(queue_full(&queue) == false);
	CU_ASSERT(queue_size(&queue) == 0);
}

static void
test_queue_enqueue_dequeue(void)
{
	int64_t value;
	struct reduce_queue queue;

	queue_init(&queue);

	CU_ASSERT(queue_enqueue(&queue, 10) == true);
	CU_ASSERT(queue_enqueue(&queue, 20) == true);
	CU_ASSERT(queue_enqueue(&queue, 30) == true);
	CU_ASSERT(queue_size(&queue) == 3);

	CU_ASSERT(queue_dequeue(&queue, &value) == true);
	CU_ASSERT(value == 10);
	CU_ASSERT(queue_size(&queue) == 2);

	CU_ASSERT(queue_dequeue(&queue, &value) == true);
	CU_ASSERT(value == 20);
	CU_ASSERT(queue_size(&queue) == 1);
}

static void
test_queue_full(void)
{
	int i;
	struct reduce_queue queue;

	queue_init(&queue);

	for (i = 1; i < REDUCE_QUEUE_CAPACITY_SIZE; i++) {
		CU_ASSERT(queue_enqueue(&queue, i) == true);
	}
	CU_ASSERT(queue_full(&queue) == true);

	CU_ASSERT(queue_enqueue(&queue, 40) == false);  /* Queue is full */
}

static void
test_queue_empty(void)
{
	int64_t value;

	struct reduce_queue queue;

	queue_init(&queue);

	CU_ASSERT(queue_empty(&queue) == true);
	CU_ASSERT(queue_enqueue(&queue, 10) == true);
	CU_ASSERT(queue_empty(&queue) == false);

	CU_ASSERT(queue_dequeue(&queue, &value) == true);
	CU_ASSERT(queue_empty(&queue) == true);
}

int
main(int argc, char **argv)
{
	CU_pSuite	suite = NULL;
	unsigned int	num_failures;

	CU_initialize_registry();

	suite = CU_add_suite("reduce_queue", NULL, NULL);

	CU_ADD_TEST(suite, test_queue_create);
	CU_ADD_TEST(suite, test_queue_enqueue_dequeue);
	CU_ADD_TEST(suite, test_queue_empty);
	CU_ADD_TEST(suite, test_queue_create);
	CU_ADD_TEST(suite, test_queue_full);

	num_failures = spdk_ut_run_tests(argc, argv, NULL);
	CU_cleanup_registry();

	return num_failures;
}