Commit 50d58ff3 authored by Artur Paszkiewicz, committed by Tomasz Zawadzki
Browse files

module/raid: rebuild support



Add support for running a background "process" for performing the
rebuild I/O. In the future, different types of processes can be added,
like resync or migration.

The process performs its work inside a "window" - a range of LBAs of
predefined maximum size. The basic flow of the process is:
- lock (quiesce) the range
- submit process requests, handled by the raid module implementation
- completion of process requests
- update io channels - move the processed range offset
- unlock (unquiesce) the range
- repeat until all blocks are processed or the process is stopped

Change-Id: I6a89bf06791683cac2e178211efe7b66a34394eb
Signed-off-by: Artur Paszkiewicz <artur.paszkiewicz@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/18739


Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Community-CI: Mellanox Build Bot
Reviewed-by: Jim Harris <jim.harris@samsung.com>
Reviewed-by: Konrad Sztyber <konrad.sztyber@intel.com>
Reviewed-by: LiYankun <845245370@qq.com>
parent 0ea6e36a
Loading
Loading
Loading
Loading
+748 −14
Original line number Diff line number Diff line
@@ -13,6 +13,10 @@
#include "spdk/json.h"
#include "spdk/likely.h"

/* Sentinel stored in a channel's process offset when no background process is active. */
#define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
/*
 * Value used to derive the process window size (it is divided by the bdev block
 * length when a process is allocated). The expansion is parenthesized so that the
 * macro is a single operand in any expression; without the parentheses,
 * "x / RAID_BDEV_PROCESS_WINDOW_SIZE" would expand to "x / 1024 * 1024".
 */
#define RAID_BDEV_PROCESS_WINDOW_SIZE	(1024 * 1024)
/* Number of process requests preallocated for each background process. */
#define RAID_BDEV_PROCESS_MAX_QD	16

static bool g_shutdown_started = false;

/* List of all raid bdevs */
@@ -30,6 +34,36 @@ struct raid_bdev_io_channel {

	/* Private raid module IO channel */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		uint64_t offset;
		struct spdk_io_channel *target_ch;
	} process;
};

/*
 * States of a background process. The numeric order is meaningful: code in this
 * file checks "state >= RAID_PROCESS_STATE_STOPPING" to detect a process that is
 * already stopping or stopped, so new states must keep that ordering intact.
 */
enum raid_bdev_process_state {
	/* Initial value of a freshly (zero-)allocated process */
	RAID_PROCESS_STATE_INIT = 0,
	/* Set once the process thread has obtained its raid io channel */
	RAID_PROCESS_STATE_RUNNING = 1,
	/* Finish was requested; the process is winding down */
	RAID_PROCESS_STATE_STOPPING = 2,
	/* Channels were updated and the process io channel was released */
	RAID_PROCESS_STATE_STOPPED = 3,
};

/* Context of a background process (e.g. rebuild) running on a raid bdev. */
struct raid_bdev_process {
	/* The raid bdev the process operates on */
	struct raid_bdev		*raid_bdev;
	/* Kind of process, e.g. RAID_PROCESS_REBUILD */
	enum raid_process_type		type;
	/* Current lifecycle state */
	enum raid_bdev_process_state	state;
	/* Thread the process runs on; assigned in raid_bdev_process_thread_init() */
	struct spdk_thread		*thread;
	/* The process' own raid io channel context, obtained on the process thread */
	struct raid_bdev_io_channel	*raid_ch;
	/* Idle process requests; removed on successful submit, re-inserted on completion */
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	/* Upper bound (in blocks) of a window; also the length of the quiesced LBA range */
	uint64_t			max_window_size;
	/* Number of blocks actually submitted for the current window */
	uint64_t			window_size;
	/* Blocks of the current window that were submitted but have not completed yet */
	uint64_t			window_remaining;
	/* Non-zero status recorded from a failed submit or completion in the current window */
	int				window_status;
	/* First block of the current window */
	uint64_t			window_offset;
	/* True while the window LBA range is quiesced */
	bool				window_range_locked;
	/* Base bdev the process writes to (the rebuild target) */
	struct raid_base_bdev_info	*target;
	/* Overall result; the first non-zero status passed to raid_bdev_process_finish() */
	int				status;
};

static struct raid_bdev_module *
@@ -78,6 +112,36 @@ static int raid_bdev_init(void);
static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
				      raid_bdev_destruct_cb cb_fn, void *cb_arg);

/*
 * Reset the background process data of a raid io channel: invalidate the
 * process offset and release the target io channel if one is held.
 */
static void
raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
{
	struct spdk_io_channel *target_ch = raid_ch->process.target_ch;

	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	if (target_ch == NULL) {
		return;
	}

	spdk_put_io_channel(target_ch);
	raid_ch->process.target_ch = NULL;
}

/*
 * Prepare a raid io channel for a running background process: record the
 * current window offset and get an io channel for the process target.
 *
 * Returns 0 on success or -ENOMEM if the target io channel could not be
 * obtained, in which case the channel's process data is reset.
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct spdk_io_channel *target_ch;

	raid_ch->process.offset = process->window_offset;

	/* Other process types (e.g. data scrubbing or strip size migration) may not
	 * need a target bdev in the future. For now a target is always expected. */
	assert(process->target != NULL);

	target_ch = spdk_bdev_get_io_channel(process->target->desc);
	raid_ch->process.target_ch = target_ch;
	if (target_ch != NULL) {
		return 0;
	}

	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}

/*
 * brief:
 * raid_bdev_create_cb function is a cb function for raid bdev which creates the
@@ -95,14 +159,13 @@ raid_bdev_create_cb(void *io_device, void *ctx_buf)
	struct raid_bdev            *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;
	int ret = 0;
	int ret = -ENOMEM;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);

	assert(raid_bdev != NULL);
	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);


	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
	if (!raid_ch->base_channel) {
		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
@@ -115,37 +178,53 @@ raid_bdev_create_cb(void *io_device, void *ctx_buf)
		 * Get the spdk_io_channel for all the base bdevs. This is used during
		 * split logic to send the respective child bdev ios to respective base
		 * bdev io channel.
		 * Skip missing base bdevs and the process target, which should also be treated as
		 * missing until the process completes.
		 */
		if (raid_bdev->base_bdev_info[i].desc == NULL) {
		if (raid_bdev->base_bdev_info[i].desc == NULL ||
		    (raid_bdev->process != NULL && raid_bdev->process->target == &raid_bdev->base_bdev_info[i])) {
			continue;
		}
		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
						   raid_bdev->base_bdev_info[i].desc);
		if (!raid_ch->base_channel[i]) {
			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
			ret = -ENOMEM;
			break;
			goto err;
		}
	}

	if (raid_bdev->process != NULL) {
		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
		if (ret != 0) {
			SPDK_ERRLOG("Failed to setup process io channel\n");
			goto err;
		}
	} else {
		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
	}
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (!ret && raid_bdev->module->get_io_channel) {
	if (raid_bdev->module->get_io_channel) {
		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
		if (!raid_ch->module_channel) {
			SPDK_ERRLOG("Unable to create io channel for raid module\n");
			ret = -ENOMEM;
			goto err_unlocked;
		}
	}

	if (ret) {
	return 0;
err:
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
err_unlocked:
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	free(raid_ch->base_channel);
		raid_ch->base_channel = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	return ret;
}

@@ -183,6 +262,8 @@ raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
	}
	free(raid_ch->base_channel);
	raid_ch->base_channel = NULL;

	raid_bdev_ch_process_cleanup(raid_ch);
}

/*
@@ -674,6 +755,20 @@ raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ct
	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
				     raid_bdev->num_base_bdevs_operational);
	if (raid_bdev->process) {
		struct raid_bdev_process *process = raid_bdev->process;
		uint64_t offset = process->window_offset;

		spdk_json_write_named_object_begin(w, "process");
		spdk_json_write_name(w, "type");
		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
		spdk_json_write_named_string(w, "target", process->target->name);
		spdk_json_write_named_object_begin(w, "progress");
		spdk_json_write_named_uint64(w, "blocks", offset);
		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_name(w, "base_bdevs_list");
	spdk_json_write_array_begin(w);
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
@@ -862,6 +957,12 @@ const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_MAX]		= NULL
};

/*
 * Human-readable names of the process types, indexed by enum raid_process_type.
 * Looked up by raid_bdev_process_to_str(); the RAID_PROCESS_MAX slot is a NULL
 * terminator and is never returned by that function.
 */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};

/* We have to use the typedef in the function declaration to appease astyle. */
typedef enum raid_level raid_level_t;
typedef enum raid_bdev_state raid_bdev_state_t;
@@ -922,6 +1023,16 @@ raid_bdev_state_to_str(enum raid_bdev_state state)
	return g_raid_state_names[state];
}

/*
 * Convert a process type to its name.
 * Returns an empty string for values at or beyond RAID_PROCESS_MAX.
 */
const char *
raid_bdev_process_to_str(enum raid_process_type value)
{
	return (value < RAID_PROCESS_MAX) ? g_raid_process_type_names[value] : "";
}

/*
 * brief:
 * raid_bdev_fini_start is called when bdev layer is starting the
@@ -1733,12 +1844,627 @@ raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void
	}
}

/*
 * Completion callback of the superblock write issued after a background
 * process finished. A failure is only logged.
 */
static void
raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	if (status == 0) {
		return;
	}

	SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
		    raid_bdev->bdev.name, spdk_strerror(-status));
}

/*
 * Update the in-memory superblock after a process finished and write it out.
 *
 * Every superblock entry that is not yet marked as configured, and whose slot
 * refers to a configured base bdev, is marked configured and gets that base
 * bdev's uuid. ctx is the raid bdev.
 */
static void
raid_bdev_process_finish_write_sb(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_bdev_superblock *sb = raid_bdev->sb;
	uint8_t idx;

	for (idx = 0; idx < sb->base_bdevs_size; idx++) {
		struct raid_bdev_sb_base_bdev *sb_entry = &sb->base_bdevs[idx];
		struct raid_base_bdev_info *info;

		/* Skip entries that are already up to date or point outside the array */
		if (sb_entry->state == RAID_SB_BASE_BDEV_CONFIGURED ||
		    sb_entry->slot >= raid_bdev->num_base_bdevs) {
			continue;
		}

		info = &raid_bdev->base_bdev_info[sb_entry->slot];
		if (!info->is_configured) {
			continue;
		}

		sb_entry->state = RAID_SB_BASE_BDEV_CONFIGURED;
		spdk_uuid_copy(&sb_entry->uuid, &info->uuid);
	}

	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
}

static void raid_bdev_process_free(struct raid_bdev_process *process);

/*
 * Last step of finishing a process: free the process (ctx) and exit the
 * thread this function is running on.
 */
static void
_raid_bdev_process_finish_done(void *ctx)
{
	raid_bdev_process_free(ctx);

	spdk_thread_exit(spdk_get_thread());
}

/*
 * Callback invoked after the process target base bdev removal was attempted.
 * A failure is logged; either way the final cleanup is scheduled on the
 * process thread.
 */
static void
raid_bdev_process_finish_target_removed(void *ctx, int status)
{
	struct raid_bdev_process *proc = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
	}

	spdk_thread_send_msg(proc->thread, _raid_bdev_process_finish_done, proc);
}

/*
 * Callback invoked after the raid bdev was unquiesced at the end of a process.
 *
 * If the process failed and its target still has a descriptor and is not
 * already scheduled for removal, the target is removed first. Otherwise the
 * final cleanup is scheduled on the process thread right away.
 */
static void
raid_bdev_process_finish_unquiesced(void *ctx, int status)
{
	struct raid_bdev_process *proc = ctx;
	bool remove_target;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
	}

	remove_target = proc->status != 0 &&
			proc->target->desc != NULL &&
			!proc->target->remove_scheduled;

	if (remove_target) {
		_raid_bdev_remove_base_bdev(proc->target, raid_bdev_process_finish_target_removed, proc);
	} else {
		spdk_thread_send_msg(proc->thread, _raid_bdev_process_finish_done, proc);
	}
}

/*
 * Unquiesce the raid bdev at the end of a process. If the request cannot be
 * started, the completion callback is invoked directly with the error.
 */
static void
raid_bdev_process_finish_unquiesce(void *ctx)
{
	struct raid_bdev_process *proc = ctx;
	int rc = spdk_bdev_unquiesce(&proc->raid_bdev->bdev, &g_raid_if,
				     raid_bdev_process_finish_unquiesced, proc);

	if (rc == 0) {
		return;
	}

	raid_bdev_process_finish_unquiesced(proc, rc);
}

/*
 * Invoked on the process thread after all raid io channels were updated for
 * the finished process. Releases the process' io channel, marks the process
 * as stopped, logs the result and hands over to the app thread: first the
 * superblock update (only on success and if a superblock exists), then the
 * unquiesce of the raid bdev.
 */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *proc = ctx;
	struct raid_bdev *raid_bdev = proc->raid_bdev;

	if (proc->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(proc->raid_ch));
	}

	proc->state = RAID_PROCESS_STATE_STOPPED;

	if (proc->status != 0) {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(proc->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-proc->status));
	} else {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(proc->type),
			       raid_bdev->bdev.name);
		if (raid_bdev->sb != NULL) {
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     proc);
}

/*
 * Completion of the per-channel finish iteration: continue finishing on the
 * process thread. The iteration status is not examined.
 */
static void
__raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);

	spdk_thread_send_msg(proc->thread, raid_bdev_process_finish_done, proc);
}

/*
 * Per-channel step of finishing a process. On success the target io channel
 * is moved into the channel's base_channel slot of the target; afterwards the
 * channel's process data is reset.
 */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	if (proc->status == 0) {
		uint8_t target_slot = raid_bdev_base_bdev_slot(proc->target);

		/* Hand the channel over so that cleanup below does not release it */
		raid_ch->base_channel[target_slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}

/*
 * Callback invoked after the raid bdev was quiesced for finishing a process.
 * On success the process is detached from the raid bdev and every raid io
 * channel is updated. On failure the error is logged and nothing else is done.
 */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *proc = ctx;
	struct raid_bdev *raid_bdev = proc->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		return;
	}

	raid_bdev->process = NULL;
	spdk_for_each_channel(raid_bdev, raid_bdev_channel_process_finish, proc,
			      __raid_bdev_process_finish);
}

/*
 * Begin finishing a process by quiescing the whole raid bdev. If the quiesce
 * request cannot be started, the completion callback is invoked directly with
 * the error.
 */
static void
_raid_bdev_process_finish(void *ctx)
{
	struct raid_bdev_process *proc = ctx;
	int rc = spdk_bdev_quiesce(&proc->raid_bdev->bdev, &g_raid_if,
				   raid_bdev_process_finish_quiesced, proc);

	if (rc == 0) {
		return;
	}

	raid_bdev_process_finish_quiesced(ctx, rc);
}

/* Schedule the finish sequence of the process on the app thread. */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	struct spdk_thread *app_thread = spdk_thread_get_app_thread();

	spdk_thread_send_msg(app_thread, _raid_bdev_process_finish, process);
}

static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
static void raid_bdev_process_thread_run(struct raid_bdev_process *process);

/*
 * Request the process to finish with the given status. Must be called on the
 * process thread.
 *
 * Only the first non-zero status is kept in process->status. If the process
 * is already stopping or stopped, nothing more is done. Otherwise the state
 * changes to STOPPING and the process continues by unlocking the window range
 * if it is locked, or by running the process thread step if it is not.
 */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	if (!process->window_range_locked) {
		raid_bdev_process_thread_run(process);
	} else {
		raid_bdev_process_unlock_window_range(process);
	}
}

/*
 * Callback invoked after the window LBA range was unquiesced. On success the
 * window advances by the number of blocks processed and the process thread
 * step runs again. On failure the process is asked to finish with the error.
 */
static void
raid_bdev_process_window_range_unlocked(void *ctx, int status)
{
	struct raid_bdev_process *proc = ctx;

	if (status == 0) {
		proc->window_range_locked = false;
		proc->window_offset += proc->window_size;

		raid_bdev_process_thread_run(proc);
		return;
	}

	SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
	raid_bdev_process_finish(proc, status);
}

/*
 * Unquiesce the LBA range of the current window. The range must be locked.
 * If the request cannot be started, the completion callback is invoked
 * directly with the error.
 */
static void
raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
{
	struct spdk_bdev *bdev = &process->raid_bdev->bdev;
	int rc;

	assert(process->window_range_locked == true);

	rc = spdk_bdev_unquiesce_range(bdev, &g_raid_if,
				       process->window_offset, process->max_window_size,
				       raid_bdev_process_window_range_unlocked, process);
	if (rc == 0) {
		return;
	}

	raid_bdev_process_window_range_unlocked(process, rc);
}

/*
 * Completion of the per-channel offset update: unlock the window range.
 * The iteration status is not examined.
 */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	raid_bdev_process_unlock_window_range(spdk_io_channel_iter_get_ctx(i));
}

/*
 * Per-channel step executed after a window completed: move the channel's
 * process offset to the end of the window that was just processed.
 */
static void
raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));
	uint64_t window_end = proc->window_offset + proc->window_size;

	raid_ch->process.offset = window_end;

	spdk_for_each_channel_continue(i, 0);
}

/*
 * Complete a process request. Must be called on the process thread.
 *
 * The request goes back to the list of idle requests and its blocks are
 * subtracted from the outstanding count of the window. A non-zero status is
 * recorded as the window status. When the last outstanding request of the
 * window completes, the process either finishes with the recorded error or
 * updates the process offset on all raid io channels.
 */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *proc = process_req->process;

	TAILQ_INSERT_TAIL(&proc->requests, process_req, link);

	assert(spdk_get_thread() == proc->thread);
	assert(proc->window_remaining >= process_req->num_blocks);

	if (status != 0) {
		proc->window_status = status;
	}

	proc->window_remaining -= process_req->num_blocks;
	if (proc->window_remaining != 0) {
		/* Other requests of this window are still in flight */
		return;
	}

	if (proc->window_status != 0) {
		raid_bdev_process_finish(proc, proc->window_status);
		return;
	}

	spdk_for_each_channel(proc->raid_bdev, raid_bdev_process_channel_update, proc,
			      raid_bdev_process_channels_update_done);
}

/*
 * Submit one process request for the given block range using the first idle
 * request of the process.
 *
 * Returns the number of blocks accepted by the raid module (> 0), 0 if no
 * idle request is available or the module accepted nothing, or a negative
 * errno, which is also recorded as the window status.
 */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *req = TAILQ_FIRST(&process->requests);
	int rc;

	if (req == NULL) {
		/* All requests are in flight, so some blocks must be outstanding */
		assert(process->window_remaining > 0);
		return 0;
	}

	req->target = process->target;
	req->target_ch = process->raid_ch->process.target_ch;
	req->offset_blocks = offset_blocks;
	req->num_blocks = num_blocks;
	req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	rc = raid_bdev->module->submit_process_request(req, process->raid_ch);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-rc));
		process->window_status = rc;
		return rc;
	}
	if (rc == 0) {
		return 0;
	}

	/* The module may have accepted fewer blocks than requested */
	req->num_blocks = rc;
	TAILQ_REMOVE(&process->requests, req, link);

	return rc;
}

/*
 * Submit process requests for the current window, starting at the window
 * offset and going up to max_window_size blocks or the end of the bdev.
 * Submission stops early when a submit returns zero or an error.
 *
 * If any blocks are outstanding afterwards, their count becomes the window
 * size. Otherwise the process is finished with the window status.
 */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	const uint64_t window_end = spdk_min(process->window_offset + process->max_window_size,
					     raid_bdev->bdev.blockcnt);
	uint64_t pos;
	int submitted;

	for (pos = process->window_offset; pos < window_end; pos += submitted) {
		submitted = raid_bdev_submit_process_request(process, pos, window_end - pos);
		if (submitted <= 0) {
			break;
		}

		process->window_remaining += submitted;
	}

	if (process->window_remaining == 0) {
		raid_bdev_process_finish(process, process->window_status);
		return;
	}

	process->window_size = process->window_remaining;
}

/*
 * Callback invoked after the window LBA range was quiesced. On failure the
 * process is asked to finish. If the process started stopping in the
 * meantime, the range is unlocked again. Otherwise the requests for the
 * window are submitted.
 */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *proc = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(proc, status);
		return;
	}

	proc->window_range_locked = true;

	if (proc->state != RAID_PROCESS_STATE_STOPPING) {
		_raid_bdev_process_thread_run(proc);
	} else {
		raid_bdev_process_unlock_window_range(proc);
	}
}

/*
 * One step of the process loop. Must be called on the process thread with no
 * outstanding requests and the window range unlocked.
 *
 * - If the process is stopping, the finish sequence is started.
 * - If the window offset reached the end of the bdev, the process finishes
 *   successfully.
 * - Otherwise the window is clamped to the remaining blocks and its LBA range
 *   is quiesced; the work continues in the lock callback.
 */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (raid_bdev->bdev.blockcnt == process->window_offset) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Never let the window extend past the end of the bdev */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc == 0) {
		return;
	}

	raid_bdev_process_window_range_locked(process, rc);
}

/*
 * First message executed on the newly created process thread. Records the
 * thread, gets a raid io channel for the process and starts the process loop.
 * If the channel cannot be obtained, the process is finished with -ENOMEM.
 */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *proc = ctx;
	struct raid_bdev *raid_bdev = proc->raid_bdev;
	struct spdk_io_channel *io_ch;

	proc->thread = spdk_get_thread();

	io_ch = spdk_get_io_channel(raid_bdev);
	if (io_ch == NULL) {
		proc->status = -ENOMEM;
		raid_bdev_process_do_finish(proc);
		return;
	}

	proc->raid_ch = spdk_io_channel_get_ctx(io_ch);
	proc->state = RAID_PROCESS_STATE_RUNNING;

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(proc->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(proc);
}

/*
 * Completion of the per-channel abort iteration after a process failed to
 * start: remove the target base bdev and free the process.
 */
static void
raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);

	_raid_bdev_remove_base_bdev(proc->target, NULL, NULL);
	raid_bdev_process_free(proc);

	/* TODO: update sb */
}

/* Per-channel step of aborting a process start: reset the channel's process data. */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}

/*
 * Completion of the per-channel setup iteration when starting a process.
 *
 * On success a dedicated thread named "<bdev name>_<process type>" is created,
 * the process is attached to the raid bdev and its initialization is sent to
 * the new thread. If the setup failed or the thread cannot be created, the
 * per-channel setup is rolled back on all channels.
 */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = proc->raid_bdev;
	struct spdk_thread *proc_thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(proc->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto abort_start;
	}

	/* TODO: we may need to abort if a base bdev was removed before we got here */

	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(proc->type));

	proc_thread = spdk_thread_create(thread_name, NULL);
	if (proc_thread != NULL) {
		raid_bdev->process = proc;

		spdk_thread_send_msg(proc_thread, raid_bdev_process_thread_init, proc);

		return;
	}

	SPDK_ERRLOG("Failed to create %s thread for %s\n",
		    raid_bdev_process_to_str(proc->type), raid_bdev->bdev.name);

abort_start:
	spdk_for_each_channel(proc->raid_bdev, raid_bdev_channel_abort_start_process, proc,
			      raid_bdev_channels_abort_start_process_done);
}

/*
 * Per-channel step of starting a process: set up the channel's process data
 * and pass the result on as the iteration status.
 */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	spdk_for_each_channel_continue(i, raid_bdev_ch_process_setup(raid_ch, proc));
}

/*
 * Start a background process by setting up its data on every raid io channel.
 * The raid module must implement submit_process_request.
 */
static void
raid_bdev_process_start(struct raid_bdev_process *process)
{
	assert(process->raid_bdev->module->submit_process_request != NULL);

	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_start_process, process,
			      raid_bdev_channels_start_process_done);
}

/* Release a process request together with its data and metadata buffers. */
static void
raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
{
	void *data_buf = process_req->iov.iov_base;
	void *md_buf = process_req->md_buf;

	spdk_dma_free(data_buf);
	spdk_dma_free(md_buf);
	free(process_req);
}

/*
 * Allocate a process request with a data buffer sized for a full window
 * (max_window_size blocks) and, if the raid bdev keeps metadata separately,
 * a metadata buffer for the same number of blocks.
 *
 * Returns the new request or NULL if any allocation fails.
 */
static struct raid_bdev_process_request *
raid_bdev_process_alloc_request(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *req = calloc(1, sizeof(*req));

	if (req == NULL) {
		return NULL;
	}

	req->process = process;
	req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
	req->iov.iov_base = spdk_dma_malloc(req->iov.iov_len, 4096, 0);
	if (req->iov.iov_base == NULL) {
		free(req);
		return NULL;
	}

	if (!spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
		return req;
	}

	req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
	if (req->md_buf != NULL) {
		return req;
	}

	raid_bdev_process_request_free(req);
	return NULL;
}

/* Free a process along with every request still on its list of idle requests. */
static void
raid_bdev_process_free(struct raid_bdev_process *process)
{
	while (!TAILQ_EMPTY(&process->requests)) {
		struct raid_bdev_process_request *req = TAILQ_FIRST(&process->requests);

		TAILQ_REMOVE(&process->requests, req, link);
		raid_bdev_process_request_free(req);
	}

	free(process);
}

/*
 * Allocate a process of the given type for a raid bdev and target base bdev.
 *
 * The maximum window size (in blocks) is RAID_BDEV_PROCESS_WINDOW_SIZE divided
 * by the block length, but not less than the bdev's write unit size.
 * RAID_BDEV_PROCESS_MAX_QD requests are preallocated.
 *
 * Returns the new process or NULL if any allocation fails.
 */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *proc = calloc(1, sizeof(*proc));
	int qd;

	if (proc == NULL) {
		return NULL;
	}

	proc->raid_bdev = raid_bdev;
	proc->type = type;
	proc->target = target;
	proc->max_window_size = spdk_max(RAID_BDEV_PROCESS_WINDOW_SIZE / raid_bdev->bdev.blocklen,
					 raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&proc->requests);

	for (qd = 0; qd < RAID_BDEV_PROCESS_MAX_QD; qd++) {
		struct raid_bdev_process_request *req = raid_bdev_process_alloc_request(proc);

		if (req == NULL) {
			/* Also frees the requests allocated so far */
			raid_bdev_process_free(proc);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&proc->requests, req, link);
	}

	return proc;
}

/*
 * Allocate and start a rebuild process for the given target base bdev.
 * Must be called on the app thread.
 *
 * Returns 0 if the process was started or -ENOMEM if it could not be allocated.
 */
static int
raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *rebuild;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	rebuild = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
	if (rebuild != NULL) {
		raid_bdev_process_start(rebuild);
		return 0;
	}

	return -ENOMEM;
}

static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int rc;

	/* TODO: defer if rebuild in progress on another base bdev */
	assert(raid_bdev->process == NULL);

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
@@ -1757,6 +2483,14 @@ raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		}
	} else if (raid_bdev->num_base_bdevs_discovered > raid_bdev->num_base_bdevs_operational) {
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	}
}

+33 −3

File changed.

Preview size limit exceeded, changes collapsed.

+121 −8

File changed.

Preview size limit exceeded, changes collapsed.