Commit 158d15b7 authored by Ben Walker's avatar Ben Walker Committed by Jim Harris
Browse files

util/pipe: Add spdk_pipe_group



Pool the data buffers backing the pipe. When a pipe reaches empty state,
it puts the data buffer into the group's pool. If a pipe needs a data
buffer, it takes one from the pool. Since the pool is a stack, a small
number of data buffers tend to be re-used very frequently.

Change-Id: Ic660a326ce33f2ba99820708171597370414a750
Signed-off-by: default avatarBen Walker <benjamin.walker@intel.com>
Signed-off-by: default avatarJacek Kalwas <jacek.kalwas@intel.com>
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/16992


Tested-by: default avatarSPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: default avatarAleksey Marchuk <alexeymar@nvidia.com>
Reviewed-by: default avatarJim Harris <jim.harris@samsung.com>
Community-CI: Mellanox Build Bot
parent 94123731
Loading
Loading
Loading
Loading
+40 −0
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@
#include "spdk/stdinc.h"

struct spdk_pipe;
struct spdk_pipe_group;

/**
 * Construct a pipe around the given memory buffer. The pipe treats the memory
@@ -118,4 +119,43 @@ int spdk_pipe_reader_get_buffer(struct spdk_pipe *pipe, uint32_t sz, struct iove
 */
int spdk_pipe_reader_advance(struct spdk_pipe *pipe, uint32_t count);

/**
 * Constructs a pipe group.
 *
 * \return spdk_pipe_group. The new pipe group.
 */
struct spdk_pipe_group *spdk_pipe_group_create(void);

/**
 * Destroys the pipe group.
 *
 * \param group The pipe group to operate on.
 */
void spdk_pipe_group_destroy(struct spdk_pipe_group *group);

/**
 * Adds the pipe to the group.
 *
 * When a pipe reaches empty state, it puts the data buffer into
 * the group's pool. If a pipe needs a data buffer, it takes one
 * from the pool. Since the pool is a stack, a small number of
 * data buffers tend to be re-used very frequently.
 *
 * \param group The pipe group to operate on.
 * \param pipe The pipe to be added.
 *
 * \return On error, a negated errno. On success, 0.
 */
int spdk_pipe_group_add(struct spdk_pipe_group *group, struct spdk_pipe *pipe);

/**
 * Removes the pipe to the group.
 *
 * \param group The pipe group to operate on.
 * \param pipe The pipe to be removed.
 *
 * \return On error, a negated errno. On success, 0.
 */
int spdk_pipe_group_remove(struct spdk_pipe_group *group, struct spdk_pipe *pipe);

#endif
+118 −0
Original line number Diff line number Diff line
@@ -5,6 +5,17 @@

#include "spdk/pipe.h"
#include "spdk/util.h"
#include "spdk/queue.h"
#include "spdk/log.h"

struct spdk_pipe_buf {
	SLIST_ENTRY(spdk_pipe_buf)	link;
	uint32_t			sz;
};

struct spdk_pipe_group {
	SLIST_HEAD(, spdk_pipe_buf) bufs;
};

struct spdk_pipe {
	uint8_t	*buf;
@@ -13,6 +24,8 @@ struct spdk_pipe {
	uint32_t write;
	uint32_t read;
	bool full;

	struct spdk_pipe_group *group;
};

struct spdk_pipe *
@@ -40,11 +53,40 @@ spdk_pipe_destroy(struct spdk_pipe *pipe)
		return NULL;
	}

	if (pipe->group) {
		spdk_pipe_group_remove(pipe->group, pipe);
	}

	buf = pipe->buf;
	free(pipe);
	return buf;
}

static void
pipe_alloc_buf_from_group(struct spdk_pipe *pipe)
{
	struct spdk_pipe_buf *buf;
	struct spdk_pipe_group *group;

	assert(pipe->group != NULL);
	group = pipe->group;

	/* We have to pick a buffer that's the correct size. It's almost always
	 * the first one. */
	buf = SLIST_FIRST(&group->bufs);
	while (buf != NULL) {
		if (buf->sz == pipe->sz) {
			/* TODO: Could track the previous and do an SLIST_REMOVE_AFTER */
			SLIST_REMOVE(&pipe->group->bufs, buf, spdk_pipe_buf, link);
			pipe->buf = (void *)buf;
			return;
		}
		buf = SLIST_NEXT(buf, link);
	}
	/* Should never get here. */
	assert(false);
}

int
spdk_pipe_writer_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struct iovec *iovs)
{
@@ -61,6 +103,10 @@ spdk_pipe_writer_get_buffer(struct spdk_pipe *pipe, uint32_t requested_sz, struc
		return 0;
	}

	if (pipe->buf == NULL) {
		pipe_alloc_buf_from_group(pipe);
	}

	if (read <= write) {
		sz = spdk_min(requested_sz, pipe->sz - write);

@@ -243,9 +289,81 @@ spdk_pipe_reader_advance(struct spdk_pipe *pipe, uint32_t requested_sz)
		 * both pointers back to the beginning of the pipe. */
		read = 0;
		pipe->write = 0;

		/* Additionally, release the buffer to the shared pool */
		if (pipe->group) {
			struct spdk_pipe_buf *buf = (struct spdk_pipe_buf *)pipe->buf;
			buf->sz = pipe->sz;
			SLIST_INSERT_HEAD(&pipe->group->bufs, buf, link);
			pipe->buf = NULL;
		}
	}

	pipe->read = read;

	return 0;
}

struct spdk_pipe_group *
spdk_pipe_group_create(void)
{
	struct spdk_pipe_group *group;

	group = calloc(1, sizeof(*group));
	if (!group) {
		return NULL;
	}

	SLIST_INIT(&group->bufs);

	return group;
}

void
spdk_pipe_group_destroy(struct spdk_pipe_group *group)
{
	if (!SLIST_EMPTY(&group->bufs)) {
		SPDK_ERRLOG("Destroying a pipe group that still has buffers!\n");
		assert(false);
	}

	free(group);
}

int
spdk_pipe_group_add(struct spdk_pipe_group *group, struct spdk_pipe *pipe)
{
	struct spdk_pipe_buf *buf;

	assert(pipe->group == NULL);

	pipe->group = group;
	if (pipe->read != pipe->write || pipe->full) {
		/* Pipe currently has valid data, so keep the buffer attached
		 * to the pipe for now.  We can move it to the group's SLIST
		 * later when it gets emptied.
		 */
		return 0;
	}

	buf = (struct spdk_pipe_buf *)pipe->buf;
	buf->sz = pipe->sz;
	SLIST_INSERT_HEAD(&group->bufs, buf, link);
	pipe->buf = NULL;
	return 0;
}

int
spdk_pipe_group_remove(struct spdk_pipe_group *group, struct spdk_pipe *pipe)
{
	assert(pipe->group == group);

	if (pipe->buf == NULL) {
		/* Associate a buffer with the pipe before returning. */
		pipe_alloc_buf_from_group(pipe);
		assert(pipe->buf != NULL);
	}

	pipe->group = NULL;
	return 0;
}
+4 −0
Original line number Diff line number Diff line
@@ -108,6 +108,10 @@
	spdk_pipe_reader_bytes_available;
	spdk_pipe_reader_get_buffer;
	spdk_pipe_reader_advance;
	spdk_pipe_group_create;
	spdk_pipe_group_destroy;
	spdk_pipe_group_add;
	spdk_pipe_group_remove;

	# public functions in string.h
	spdk_sprintf_alloc;
+23 −0
Original line number Diff line number Diff line
@@ -65,6 +65,7 @@ struct spdk_posix_sock_group_impl {
	int				fd;
	struct spdk_has_data_list	socks_with_data;
	int				placement_id;
	struct spdk_pipe_group		*pipe_group;
};

static struct spdk_sock_impl_opts g_posix_impl_opts = {
@@ -359,6 +360,13 @@ posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
	sock->recv_buf_sz = sz;
	sock->recv_pipe = new_pipe;

	if (sock->base.group_impl) {
		struct spdk_posix_sock_group_impl *group;

		group = __posix_group_impl(sock->base.group_impl);
		spdk_pipe_group_add(group->pipe_group, sock->recv_pipe);
	}

	return 0;
}

@@ -1790,6 +1798,14 @@ _sock_group_impl_create(uint32_t enable_placement_id)
		return NULL;
	}

	group_impl->pipe_group = spdk_pipe_group_create();
	if (group_impl->pipe_group == NULL) {
		SPDK_ERRLOG("pipe_group allocation failed\n");
		free(group_impl);
		close(fd);
		return NULL;
	}

	group_impl->fd = fd;
	TAILQ_INIT(&group_impl->socks_with_data);
	group_impl->placement_id = -1;
@@ -1902,6 +1918,9 @@ posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_
		sock->pipe_has_data = true;
		sock->socket_has_data = false;
		TAILQ_INSERT_TAIL(&group->socks_with_data, sock, link);
	} else if (sock->recv_pipe != NULL) {
		rc = spdk_pipe_group_add(group->pipe_group, sock->recv_pipe);
		assert(rc == 0);
	}

	if (_sock->impl_opts.enable_placement_id == PLACEMENT_MARK) {
@@ -1928,6 +1947,9 @@ posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct sp
		TAILQ_REMOVE(&group->socks_with_data, sock, link);
		sock->pipe_has_data = false;
		sock->socket_has_data = false;
	} else if (sock->recv_pipe != NULL) {
		rc = spdk_pipe_group_remove(group->pipe_group, sock->recv_pipe);
		assert(rc == 0);
	}

	if (sock->placement_id != -1) {
@@ -2150,6 +2172,7 @@ _sock_group_impl_close(struct spdk_sock_group_impl *_group, uint32_t enable_plac
		spdk_sock_map_release(&g_map, spdk_env_get_current_core());
	}

	spdk_pipe_group_destroy(group->pipe_group);
	rc = close(group->fd);
	free(group);
	return rc;