Commit 8b0a4a3c authored by Dariusz Stojaczyk's avatar Dariusz Stojaczyk Committed by Daniel Verkamp
Browse files

bdev_virtio: implement multiqueue



Virtqueues now have to be "acquired"
by a logical CPU core in order to
perform any traffic on them. The
acquire mechanism is thread-safe to
prevent two reactors accessing the
same virtqueue at the same time.

For now a single virtqueue
may be used by only one io_channel.
Support for shared virtqueues will
be implemented in the future.

Added a new param "Queues" to the
virtio config file for VirtioUser
bdevs. VirtioPci will use the
maximum number of available queues
negotiated during QEMU startup.

Change-Id: I3fd4b9d8c470f26ca9b84838b3c64de6f9e48300
Signed-off-by: default avatarDariusz Stojaczyk <dariuszx.stojaczyk@intel.com>
Reviewed-on: https://review.gerrithub.io/377337


Reviewed-by: default avatarJim Harris <james.r.harris@intel.com>
Tested-by: default avatarSPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: default avatarDaniel Verkamp <daniel.verkamp@intel.com>
parent 14db5b66
Loading
Loading
Loading
Loading
+56 −19
Original line number Diff line number Diff line
@@ -70,7 +70,9 @@ struct virtio_scsi_io_ctx {

struct virtio_scsi_scan_base {
	struct virtio_dev		*vdev;
	struct spdk_bdev_poller		*scan_poller;

	/** Virtqueue used for the scan I/O. */
	struct virtqueue		*vq;

	/* Currently queried target */
	unsigned			target;
@@ -94,7 +96,9 @@ struct virtio_scsi_disk {

struct bdev_virtio_io_channel {
	struct virtio_dev	*vdev;
	struct spdk_bdev_poller	*poller;

	/** Virtqueue exclusively assigned to this channel. */
	struct virtqueue	*vq;
};

static void scan_target(struct virtio_scsi_scan_base *base);
@@ -144,6 +148,7 @@ bdev_virtio_rw(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
	struct virtio_scsi_disk *disk = SPDK_CONTAINEROF(bdev_io->bdev, struct virtio_scsi_disk, bdev);
	struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io);
	struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base;
	struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch);

	vreq->iov = bdev_io->u.bdev.iovs;
	vreq->iovcnt = bdev_io->u.bdev.iovcnt;
@@ -158,15 +163,15 @@ bdev_virtio_rw(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
		to_be16(&req->cdb[7], bdev_io->u.bdev.num_blocks);
	}

	virtio_xmit_pkts(disk->vdev->vqs[2], vreq);
	virtio_xmit_pkts(virtio_channel->vq, vreq);
}

static void
bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct virtio_scsi_disk *disk = SPDK_CONTAINEROF(bdev_io->bdev, struct virtio_scsi_disk, bdev);
	struct virtio_req *vreq = bdev_virtio_init_io_vreq(ch, bdev_io);
	struct virtio_scsi_cmd_req *req = vreq->iov_req.iov_base;
	struct bdev_virtio_io_channel *virtio_channel = spdk_io_channel_get_ctx(ch);
	struct spdk_scsi_unmap_bdesc *desc, *first_desc;
	uint8_t *buf;
	uint64_t offset_blocks, num_blocks;
@@ -206,7 +211,7 @@ bdev_virtio_unmap(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
	to_be16(&buf[2], cmd_len - 8); /* length of block descriptors */
	memset(&buf[4], 0, 4); /* reserved */

	virtio_xmit_pkts(disk->vdev->vqs[2], vreq);
	virtio_xmit_pkts(virtio_channel->vq, vreq);
}

static int _bdev_virtio_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
@@ -331,7 +336,7 @@ bdev_virtio_poll(void *arg)
	struct virtio_req *req[32];
	uint16_t i, cnt;

	cnt = virtio_recv_pkts(ch->vdev->vqs[2], req, SPDK_COUNTOF(req));
	cnt = virtio_recv_pkts(ch->vq, req, SPDK_COUNTOF(req));
	for (i = 0; i < cnt; ++i) {
		bdev_virtio_io_cpl(req[i]);
	}
@@ -340,12 +345,26 @@ bdev_virtio_poll(void *arg)
static int
bdev_virtio_create_cb(void *io_device, void *ctx_buf)
{
	struct virtio_dev **vdev = io_device;
	struct virtio_dev **vdev_ptr = io_device;
	struct virtio_dev *vdev = *vdev_ptr;
	struct bdev_virtio_io_channel *ch = ctx_buf;
	struct virtqueue *vq;
	int32_t queue_idx;

	queue_idx = virtio_dev_find_and_acquire_queue(vdev, 2);
	if (queue_idx < 0) {
		SPDK_ERRLOG("Couldn't get an unused queue for the io_channel.\n");
		return queue_idx;
	}

	vq = vdev->vqs[queue_idx];

	ch->vdev = vdev;
	ch->vq = vq;

	spdk_bdev_poller_start(&vq->poller, bdev_virtio_poll, ch,
			       vq->owner_lcore, 0);

	ch->vdev = *vdev;
	spdk_bdev_poller_start(&ch->poller, bdev_virtio_poll, ch,
			       spdk_env_get_current_core(), 0);
	return 0;
}

@@ -353,8 +372,11 @@ static void
bdev_virtio_destroy_cb(void *io_device, void *ctx_buf)
{
	struct bdev_virtio_io_channel *io_channel = ctx_buf;
	struct virtio_dev *vdev = io_channel->vdev;
	struct virtqueue *vq = io_channel->vq;

	spdk_bdev_poller_stop(&io_channel->poller);
	spdk_bdev_poller_stop(&vq->poller);
	virtio_dev_release_queue(vdev, vq->vq_queue_index);
}

static void
@@ -368,7 +390,8 @@ scan_target_finish(struct virtio_scsi_scan_base *base)
		return;
	}

	spdk_bdev_poller_stop(&base->scan_poller);
	spdk_bdev_poller_stop(&base->vq->poller);
	virtio_dev_release_queue(base->vdev, base->vq->vq_queue_index);

	while ((disk = TAILQ_FIRST(&base->found_disks))) {
		TAILQ_REMOVE(&base->found_disks, disk, link);
@@ -400,7 +423,7 @@ send_read_cap_10(struct virtio_scsi_scan_base *base, uint8_t target_id, struct v
	iov[0].iov_len = 8;
	req->cdb[0] = SPDK_SBC_READ_CAPACITY_10;

	virtio_xmit_pkts(base->vdev->vqs[2], vreq);
	virtio_xmit_pkts(base->vq, vreq);
}

static void
@@ -418,7 +441,7 @@ send_read_cap_16(struct virtio_scsi_scan_base *base, uint8_t target_id, struct v
	req->cdb[1] = SPDK_SBC_SAI_READ_CAPACITY_16;
	to_be32(&req->cdb[10], iov[0].iov_len);

	virtio_xmit_pkts(base->vdev->vqs[2], vreq);
	virtio_xmit_pkts(base->vq, vreq);
}

static int
@@ -556,7 +579,7 @@ bdev_scan_poll(void *arg)
	struct virtio_req *req;
	uint16_t cnt;

	cnt = virtio_recv_pkts(base->vdev->vqs[2], &req, 1);
	cnt = virtio_recv_pkts(base->vq, &req, 1);
	if (cnt > 0) {
		process_scan_resp(base, req);
	}
@@ -596,7 +619,7 @@ scan_target(struct virtio_scsi_scan_base *base)
	cdb->opcode = SPDK_SPC_INQUIRY;
	cdb->alloc_len[1] = 255;

	virtio_xmit_pkts(base->vdev->vqs[2], vreq);
	virtio_xmit_pkts(base->vq, vreq);
}

static int
@@ -606,6 +629,7 @@ bdev_virtio_process_config(void)
	struct virtio_dev *vdev = NULL;
	char *path;
	unsigned vdev_num;
	int num_queues;
	bool enable_pci;
	int rc = 0;

@@ -628,7 +652,12 @@ bdev_virtio_process_config(void)
			goto out;
		}

		vdev = virtio_user_dev_init(path, 512);
		num_queues = spdk_conf_section_get_intval(sp, "Queues");
		if (num_queues < 1) {
			num_queues = 1;
		}

		vdev = virtio_user_dev_init(path, num_queues + 2, 512);
		if (vdev == NULL) {
			rc = -1;
			goto out;
@@ -659,6 +688,7 @@ bdev_virtio_initialize(void)
{
	struct virtio_scsi_scan_base *base;
	struct virtio_dev *vdev = NULL;
	struct virtqueue *vq;
	int rc = 0;

	rc = bdev_virtio_process_config();
@@ -692,9 +722,16 @@ bdev_virtio_initialize(void)
		base->vdev = vdev;
		TAILQ_INIT(&base->found_disks);

		spdk_bdev_poller_start(&base->scan_poller, bdev_scan_poll, base,
				       spdk_env_get_current_core(), 0);
		rc = virtio_dev_acquire_queue(vdev, 2);
		if (rc != 0) {
			SPDK_ERRLOG("Couldn't acquire requestq for the target scan.\n");
			goto out;
		}

		vq = vdev->vqs[2];
		base->vq = vq;
		spdk_bdev_poller_start(&vq->poller, bdev_scan_poll, base,
				       vq->owner_lcore, 0);
		scan_target(base);
	}

+77 −7
Original line number Diff line number Diff line
@@ -165,6 +165,9 @@ virtio_init_queue(struct virtio_dev *dev, uint16_t vtpci_queue_idx)

	vq->mz = mz;

	vq->owner_lcore = SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED;
	vq->poller = NULL;

	if (vtpci_ops(dev)->setup_queue(dev, vq) < 0) {
		SPDK_ERRLOG("setup_queue failed\n");
		return -EINVAL;
@@ -276,13 +279,6 @@ virtio_dev_init(struct virtio_dev *dev, uint64_t req_features)
	if (virtio_negotiate_features(dev, req_features) < 0)
		return -1;

	/* FIXME
	 * Hardcode num_queues to 3 until we add proper
	 * mutli-queue support. This value should be limited
	 * by number of cores assigned to SPDK
	 */
	dev->max_queues = 3;

	ret = virtio_alloc_queues(dev);
	if (ret < 0)
		return ret;
@@ -532,4 +528,78 @@ virtio_xmit_pkts(struct virtqueue *vq, struct virtio_req *req)
	return 1;
}

int
virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index)
{
	struct virtqueue *queue;
	int rc = -1;

	/* max_queues is fixed after device init, so this bound check
	 * is safe without taking the mutex. */
	if (index >= vdev->max_queues) {
		SPDK_ERRLOG("requested vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
			    index, vdev->max_queues);
		return rc;
	}

	pthread_mutex_lock(&vdev->mutex);
	queue = vdev->vqs[index];
	/* Claim the queue only if it exists and no lcore owns it yet. */
	if (queue != NULL && queue->owner_lcore == SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED) {
		assert(queue->poller == NULL);
		queue->owner_lcore = spdk_env_get_current_core();
		rc = 0;
	}
	pthread_mutex_unlock(&vdev->mutex);

	return rc;
}

int32_t
virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index)
{
	int32_t acquired_idx = -1;
	uint16_t idx;

	pthread_mutex_lock(&vdev->mutex);
	/* Scan [start_index, max_queues) for the first unowned virtqueue. */
	for (idx = start_index; idx < vdev->max_queues; ++idx) {
		struct virtqueue *candidate = vdev->vqs[idx];

		if (candidate == NULL ||
		    candidate->owner_lcore != SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED) {
			continue;
		}

		/* Bind the queue to the calling lcore. */
		assert(candidate->poller == NULL);
		candidate->owner_lcore = spdk_env_get_current_core();
		acquired_idx = idx;
		break;
	}
	pthread_mutex_unlock(&vdev->mutex);

	if (acquired_idx < 0) {
		SPDK_ERRLOG("no more unused virtio queues with idx >= %"PRIu16".\n", start_index);
	}

	return acquired_idx;
}

void
virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index)
{
	struct virtqueue *vq = NULL;

	if (index >= vdev->max_queues) {
		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
			    index, vdev->max_queues);
		return;
	}

	pthread_mutex_lock(&vdev->mutex);
	vq = vdev->vqs[index];
	if (vq == NULL) {
		SPDK_ERRLOG("virtqueue at index %"PRIu16" is not initialized.\n", index);
		/* BUGFIX: the mutex must be released on this error path too;
		 * returning while holding it would deadlock any subsequent
		 * acquire/release on this device. */
		pthread_mutex_unlock(&vdev->mutex);
		return;
	}

	/* The caller must stop the poller and must be the owning lcore. */
	assert(vq->poller == NULL);
	assert(vq->owner_lcore == spdk_env_get_current_core());
	vq->owner_lcore = SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED;
	pthread_mutex_unlock(&vdev->mutex);
}

SPDK_LOG_REGISTER_TRACE_FLAG("virtio_dev", SPDK_TRACE_VIRTIO_DEV)
+50 −0
Original line number Diff line number Diff line
@@ -67,6 +67,11 @@
 */
#define VQ_RING_DESC_CHAIN_END 32768

/* This is a work-around for fio-plugin bug, where each
 * fio job thread returns local lcore id = -1
 */
#define SPDK_VIRTIO_QUEUE_LCORE_ID_UNUSED (UINT32_MAX - 1)

struct virtio_dev {
	struct virtqueue **vqs;
	uint16_t	started;
@@ -85,6 +90,9 @@ struct virtio_dev {
	/** Modern/legacy virtio device flag. */
	uint8_t		modern;

	/** Mutex for asynchronous virtqueue-changing operations. */
	pthread_mutex_t	mutex;

	TAILQ_ENTRY(virtio_dev) tailq;
};

@@ -128,6 +136,12 @@ struct virtqueue {
	uint16_t  vq_queue_index;   /**< PCI queue index */
	uint16_t  *notify_addr;

	/** Logical CPU ID that's polling this queue. */
	uint32_t owner_lcore;

	/** Response poller. */
	struct spdk_bdev_poller	*poller;

	struct vq_desc_extra vq_descx[0];
};

@@ -212,4 +226,40 @@ virtqueue_kick_prepare(struct virtqueue *vq)
	return !(vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY);
}

/**
 * Bind a virtqueue with given index to the current CPU core.
 *
 * This function is thread-safe.
 *
 * \param vdev vhost device
 * \param index virtqueue index
 * \return 0 on success, -1 in case a virtqueue with given index either
 * does not exist or is already acquired.
 */
int virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index);

/**
 * Look for unused queue and bind it to the current CPU core.  This will
 * scan the queues in range from *start_index* (inclusive) up to
 * vdev->max_queues (exclusive).
 *
 * This function is thread-safe.
 *
 * \param vdev vhost device
 * \param start_index virtqueue index to start looking from
 * \return index of acquired queue or -1 in case no unused queue in given range
 * has been found
 */
int32_t virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index);

/**
 * Release previously acquired queue.
 *
 * This function must be called from the thread that acquired the queue.
 *
 * \param vdev vhost device
 * \param index index of virtqueue to release
 */
void virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index);

#endif /* _VIRTIO_DEV_H_ */
+1 −0
Original line number Diff line number Diff line
@@ -714,6 +714,7 @@ vtpci_init(struct virtio_dev *vdev, const struct virtio_pci_ops *ops)
	}

	vdev->id = vdev_num;
	pthread_mutex_init(&vdev->mutex, NULL);
	g_virtio_driver.internal[vdev_num].vtpci_ops = ops;

	return 0;
+5 −4
Original line number Diff line number Diff line
@@ -174,7 +174,7 @@ virtio_user_dev_setup(struct virtio_user_dev *dev)
}

struct virtio_dev *
virtio_user_dev_init(char *path, int queue_size)
virtio_user_dev_init(char *path, uint16_t requested_queues, uint32_t queue_size)
{
	struct virtio_dev *vdev;
	struct virtio_user_dev *dev;
@@ -206,12 +206,13 @@ virtio_user_dev_init(char *path, int queue_size)
		goto err;
	}

	if (max_queues >= VIRTIO_MAX_VIRTQUEUES) {
		SPDK_ERRLOG("invalid get_queue_num value: %"PRIu64"\n", max_queues);
	if (requested_queues > max_queues) {
		SPDK_ERRLOG("requested %"PRIu16" queues but only %"PRIu64" available\n",
			     requested_queues, max_queues);
		goto err;
	}

	vdev->max_queues = max_queues;
	vdev->max_queues = requested_queues;

	if (dev->ops->send_request(dev, VHOST_USER_SET_OWNER, NULL) < 0) {
		SPDK_ERRLOG("set_owner fails: %s\n", strerror(errno));
Loading