Commit d5d6efd8 authored by Milosz Linkiewicz, committed by Tomasz Zawadzki
Browse files

sma: nvmf/vfiouser device manager implementation



This patch utilizes the generic SMA implementation by adding a vfiouser
device manager. It allows exposing virtualized block devices to
QEMU instances or other arbitrary processes.

Max device capacity depends on the available `pci-bridge` devices:
```yaml
devices:
  - name: 'vfiouser'
    params:
      buses:
        - name: 'pci.spdk.0'
          count: 32
        - name: 'pci.spdk.1'
          count: 32
      qmp_addr: 127.0.0.1
      qmp_port: 9090
```

Signed-off-by: Milosz Linkiewicz <milosz.linkiewicz@intel.com>
Signed-off-by: Sebastian Brzezinka <sebastian.brzezinka@intel.com>
Change-Id: I5ab43f4b877c371fa16a4daf4212ac2686991bd4
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/13004


Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Konrad Sztyber <konrad.sztyber@intel.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
parent bae771fc
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -9,3 +9,4 @@ from .device import DeviceException # noqa
from .device import DeviceManager           # noqa
from .device import NvmfTcpDeviceManager    # noqa
from .device import VhostBlkDeviceManager   # noqa
from .device import NvmfVfioDeviceManager   # noqa
+1 −0
Original line number Diff line number Diff line
@@ -2,3 +2,4 @@ from .device import DeviceException
from .device import DeviceManager
from .nvmf_tcp import NvmfTcpDeviceManager
from .vhost_blk import VhostBlkDeviceManager
from .nvmf_vfiouser import NvmfVfioDeviceManager
+275 −0
Original line number Diff line number Diff line
import os
import grpc
import logging
from ..common import format_volume_id
from socket import AddressFamily
from spdk.rpc.client import JSONRPCException
import shutil
from .device import DeviceManager, DeviceException
from google.protobuf import wrappers_pb2 as wrap
from ..qmp import QMPClient, QMPError
from ..proto import sma_pb2
from contextlib import contextmanager

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


class NvmfVfioDeviceManager(DeviceManager):
    """Device manager exposing NVMe/vfiouser subsystems to a QEMU instance.

    For each device an NVMe-oF subsystem with a `vfiouser` listener is
    created through the SPDK JSON-RPC client, and a matching `vfio-user-pci`
    device is hot-plugged into QEMU over QMP.  Volumes are attached to a
    device as namespaces of its subsystem.
    """

    def __init__(self, client):
        """Initialize the manager.

        Args:
            client: forwarded to `DeviceManager.__init__`; used via
                `self._client()` to obtain an SPDK JSON-RPC client.
        """
        # NOTE(review): 'vfiouser' / 'nvme' are presumably stored by the base
        # class as `self.name` / `self.protocol` (both are read below) -
        # confirm against DeviceManager.
        super().__init__('vfiouser', 'nvme', client)

    def init(self, config):
        """Validate the configuration and make sure the transport exists.

        Recognized `config` keys:
            buses: list of `{'name': str, 'count': int}` PCI bridges
                (`count` defaults to 32).
            qmp_addr / qmp_port: address of the QMP server
                (`qmp_addr` defaults to '127.0.0.1').
            sock_path: parent directory for per-device socket directories
                (defaults to '/var/tmp/').

        Raises:
            DeviceException: INVALID_ARGUMENT on a malformed `buses` entry,
                INTERNAL if the vfiouser transport cannot be created.
        """
        self._buses = config.get('buses', [])
        try:
            names = [bus['name'] for bus in self._buses]
        except (KeyError, TypeError):
            # TypeError covers entries that are not mappings at all.
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'PCI bridge name is missing')
        if len(names) != len(set(names)):
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Duplicate PCI bridge names')
        for bus in self._buses:
            count = bus.get('count', 32)
            # A non-integer count would later break the physical_id -> slot
            # arithmetic, so reject it up front instead of failing there.
            if not isinstance(count, int) or count < 0:
                raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                      'Incorrect PCI bridge count')
            bus['count'] = count
        self._qmp_addr = (config.get('qmp_addr', '127.0.0.1'), config.get('qmp_port'))
        self._sock_path = config.get('sock_path', '/var/tmp/')
        self._prefix = f'{self.protocol}'
        if not self._create_transport():
            raise DeviceException(grpc.StatusCode.INTERNAL,
                                  'NVMe/vfiouser transport is unavailable')

    def _create_transport(self):
        """Ensure the vfiouser transport exists, creating it if necessary.

        Returns a truthy value on success and False if any RPC failed.
        """
        try:
            with self._client() as client:
                transports = client.call('nvmf_get_transports')
                if any(t['trtype'].lower() == 'vfiouser' for t in transports):
                    return True
                return client.call('nvmf_create_transport', {'trtype': 'vfiouser'})
        except JSONRPCException:
            log.error('Transport query NVMe/vfiouser failed')
            return False

    @contextmanager
    def _client_wrap(self):
        """Yield an SPDK client, mapping JSONRPCException to DeviceException.

        Any JSONRPCException escaping the `with` body (not only connection
        errors) is reported as an INTERNAL DeviceException.
        """
        try:
            with self._client() as client:
                yield client
        except JSONRPCException:
            raise DeviceException(grpc.StatusCode.INTERNAL,
                                  'Failed to connect to SPDK application')

    def _get_subsys(self, client, nqn):
        """Return the subsystem description for `nqn`, or None if unavailable.

        None is returned both when the RPC fails and when it yields an empty
        list, so callers can uniformly test `is None` (or truthiness).
        """
        try:
            subsystems = client.call('nvmf_get_subsystems', {'nqn': nqn})
        except JSONRPCException:
            return None
        return subsystems[0] if subsystems else None

    def _create_subsystem(self, client, subnqn):
        """Create subsystem `subnqn` unless it already exists.

        Returns a truthy value on success, False on RPC failure.
        """
        try:
            if self._get_subsys(client, subnqn) is not None:
                return True
            return client.call('nvmf_create_subsystem', {'nqn': subnqn, 'allow_any_host': True})
        except JSONRPCException:
            log.error('Failed to create subsystem')
        return False

    def _delete_subsystem(self, client, subnqn):
        """Delete subsystem `subnqn` if it exists.

        Returns a truthy value on success (including "already gone"),
        False on RPC failure.
        """
        try:
            if self._get_subsys(client, subnqn) is None:
                return True
            return client.call('nvmf_delete_subsystem', {'nqn': subnqn})
        except JSONRPCException:
            log.error('Failed to delete subsystem')
        return False

    def _subsystem_add_listener(self, client, subnqn, addr):
        """Add a vfiouser listener at `addr` to `subnqn`.

        Returns the RPC result on success, False on RPC failure.
        """
        try:
            return client.call('nvmf_subsystem_add_listener',
                               {'nqn': subnqn,
                                'listen_address': {
                                    'trtype': 'vfiouser',
                                    'traddr': addr}})
        except JSONRPCException:
            log.error('Failed to add listener')
        return False

    def _create_socket_dir(self, dev_id):
        """(Re)create an empty socket directory for `dev_id` and return it.

        An existing directory of the same name is removed first.

        Raises:
            DeviceException: INTERNAL if the directory cannot be prepared.
        """
        # Computed outside the try block so the error message can always
        # reference it.
        path = os.path.join(self._sock_path, dev_id)
        try:
            if os.path.exists(path):
                shutil.rmtree(path)
            os.makedirs(path)
        except OSError:
            raise DeviceException(grpc.StatusCode.INTERNAL, f'Socket path error {path}')
        if not os.path.isdir(path):
            # Never hand None to the listener RPC; report the failure here.
            raise DeviceException(grpc.StatusCode.INTERNAL, f'Socket path error {path}')
        return path

    def _find_pcidev(self, qclient, name):
        """Return the QMP PCI device entry whose `qdev_id` is `name`.

        Bridges are searched recursively.  Returns None if the device is not
        found or if the `query_pci` command fails.
        """
        def rsearch(devices, name):
            for dev in devices:
                if dev.get('qdev_id') == name:
                    return dev
                bridge = dev.get('pci_bridge')
                if bridge is not None:
                    # Only stop when the device was actually found behind
                    # this bridge; otherwise keep scanning the siblings.
                    found = rsearch(bridge.get('devices', []), name)
                    if found is not None:
                        return found
            return None

        try:
            buses = qclient.query_pci()['return']
        except QMPError:
            # NOTE(review): a failed query is indistinguishable from
            # "device not present" for the callers.
            return None
        for bus in buses:
            found = rsearch(bus['devices'], name)
            if found is not None:
                return found
        return None

    def _qmp_add_device(self, phid, dev_id):
        """Hot-plug a `vfio-user-pci` device named `dev_id` into QEMU.

        `phid` is mapped to a (bus, slot) pair by walking the configured
        buses in order and subtracting each bus's `count` until the
        remainder fits.  Nothing is added if the device already exists.

        Returns True on success, False on a QMP error.

        Raises:
            DeviceException: INVALID_ARGUMENT if `phid` exceeds the total
                capacity of the configured buses.
        """
        # Find a bus that the physical_id maps to
        for bus in self._buses:
            if phid >= bus['count']:
                phid = phid - bus['count']
            else:
                break
        else:
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT, 'Invalid physical_id')
        try:
            with QMPClient(self._qmp_addr, AddressFamily.AF_INET) as qclient:
                if self._find_pcidev(qclient, dev_id) is None:
                    qclient.device_add({'driver': 'vfio-user-pci',
                                        'x-enable-migration': 'on',
                                        'socket': os.path.join(self._sock_path, dev_id, 'cntrl'),
                                        'bus': bus.get('name'),
                                        'addr': hex(phid),
                                        'id': dev_id})
            return True
        except QMPError:
            log.error('QMP: Failed to add device')
        return False

    def _create_device(self, physical_id):
        """Create subsystem, listener and QEMU device for `physical_id`.

        Returns the NQN of the subsystem backing the device.

        Raises:
            DeviceException: if any step fails; the subsystem created for
                the device is deleted again before the error propagates.
        """
        with self._client_wrap() as client:
            dev_id = f'{self.name}-{physical_id}'
            subnqn = f'nqn.2016-06.io.spdk:{dev_id}'
            if not self._create_subsystem(client, subnqn):
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to create the NVMe/vfiouser subsystem')
            try:
                sock_dir = self._create_socket_dir(dev_id)
                if not self._subsystem_add_listener(client, subnqn, sock_dir):
                    raise DeviceException(grpc.StatusCode.INTERNAL,
                                          'Failed to add the NVMe/vfiouser listener')
                if not self._qmp_add_device(physical_id, dev_id):
                    raise DeviceException(grpc.StatusCode.INTERNAL,
                                          'Failed to create NVMe/vfiouser device')
            except DeviceException:
                # Roll back on every failure path, including errors raised
                # by the helpers themselves (socket dir, invalid physical_id).
                self._delete_subsystem(client, subnqn)
                raise
            return subnqn

    def create_device(self, request):
        """Handle a CreateDevice request and return the device handle.

        Raises:
            DeviceException: INVALID_ARGUMENT if `virtual_id` is non-zero,
                or whatever `_create_device` raises.
        """
        if request.nvme.virtual_id != 0:
            raise DeviceException(grpc.StatusCode.INVALID_ARGUMENT,
                                  'Unsupported virtual_id value')
        subnqn = self._create_device(request.nvme.physical_id)
        return sma_pb2.CreateDeviceResponse(handle=f'{self._prefix}:{subnqn}')

    def _qmp_delete_device(self, dev_id):
        """Remove device `dev_id` from QEMU if it is present.

        Returns True on success, False on a QMP error.
        """
        try:
            with QMPClient(self._qmp_addr, AddressFamily.AF_INET) as qclient:
                if self._find_pcidev(qclient, dev_id) is not None:
                    qclient.device_del({'id': dev_id})
            return True
        except QMPError:
            log.error('QMP: Failed to delete device')
        return False

    def delete_device(self, request):
        """Handle a DeleteDevice request.

        Deletes the subsystem, unplugs the QEMU device and removes the
        device's socket directory.

        Raises:
            DeviceException: INTERNAL if any of the steps fails.
        """
        # The handle has the form '<prefix>:nqn.2016-06.io.spdk:<dev_id>'.
        nqn = request.handle[len(f'{self._prefix}:'):]
        dev_id = nqn[len('nqn.2016-06.io.spdk:'):]
        with self._client_wrap() as client:
            if not self._delete_subsystem(client, nqn):
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to delete NVMe/vfiouser device')
            if not self._qmp_delete_device(dev_id):
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to delete NVMe/vfiouser device')
        path = os.path.join(self._sock_path, dev_id)
        try:
            if os.path.exists(path):
                shutil.rmtree(path)
        except OSError:
            raise DeviceException(grpc.StatusCode.INTERNAL, f'Socket path error {path}')

    def _get_bdev(self, client, guid):
        """Return the bdev description for `guid`, or None if not found."""
        try:
            bdevs = client.call('bdev_get_bdevs', {'name': guid})
        except JSONRPCException:
            log.error('Failed to find bdev')
            return None
        # Guard against an empty result instead of raising IndexError.
        return bdevs[0] if bdevs else None

    def _get_ns(self, bdev, subsystem):
        """Return the namespace of `subsystem` backed by `bdev`, or None."""
        for ns in subsystem.get('namespaces', []):
            if ns['name'] == bdev['name']:
                return ns
        return None

    def _subsystem_add_ns(self, client, bdev, subsystem, subnqn):
        """Add `bdev` as a namespace of `subnqn` unless already attached.

        Returns a truthy value on success, False on RPC failure.
        """
        try:
            if self._get_ns(bdev, subsystem) is not None:
                return True
            return client.call('nvmf_subsystem_add_ns',
                               {'nqn': subnqn,
                                'namespace': {
                                    'bdev_name': bdev['name']}})
        except JSONRPCException:
            log.error('Failed to add ns')
        return False

    def attach_volume(self, request):
        """Handle an AttachVolume request.

        Raises:
            DeviceException: NOT_FOUND if the volume or the device cannot be
                found, INTERNAL if adding the namespace fails.
        """
        nqn = request.device_handle[len(f'{self._prefix}:'):]
        volume_id = format_volume_id(request.volume.volume_id)
        with self._client_wrap() as client:
            bdev = self._get_bdev(client, volume_id)
            if bdev is None:
                raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                      'Invalid volume GUID')
            subsys = self._get_subsys(client, nqn)
            if subsys is None:
                raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                      'Invalid device handle')
            if not self._subsystem_add_ns(client, bdev, subsys, nqn):
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to attach volume')

    def _subsystem_remove_ns(self, client, bdev, subsystem, subnqn):
        """Remove the namespace backed by `bdev` from `subnqn`, if any.

        Returns a truthy value on success (including "not attached"),
        False on RPC failure.
        """
        try:
            ns = self._get_ns(bdev, subsystem)
            if ns is None:
                return True
            return client.call('nvmf_subsystem_remove_ns',
                               {'nqn': subnqn, 'nsid': ns['nsid']})
        except JSONRPCException:
            log.error('Failed to remove ns')
        return False

    def detach_volume(self, request):
        """Handle a DetachVolume request.

        Raises:
            DeviceException: NOT_FOUND if the volume or the device cannot be
                found, INTERNAL if removing the namespace fails.
        """
        nqn = request.device_handle[len(f'{self._prefix}:'):]
        volume_id = format_volume_id(request.volume_id)
        with self._client_wrap() as client:
            bdev = self._get_bdev(client, volume_id)
            if bdev is None:
                raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                      'Invalid volume GUID')
            subsys = self._get_subsys(client, nqn)
            if subsys is None:
                raise DeviceException(grpc.StatusCode.NOT_FOUND,
                                      'Invalid device handle')
            if not self._subsystem_remove_ns(client, bdev, subsys, nqn):
                raise DeviceException(grpc.StatusCode.INTERNAL,
                                      'Failed to detach volume')

    def owns_device(self, id):
        """Return True if the device handle `id` starts with this manager's prefix."""
        return id.startswith(self._prefix)
+2 −1
Original line number Diff line number Diff line
@@ -120,7 +120,8 @@ if __name__ == '__main__':

    agent = sma.StorageManagementAgent(config, client)

    devices = [sma.NvmfTcpDeviceManager(client), sma.VhostBlkDeviceManager(client)]
    devices = [sma.NvmfTcpDeviceManager(client), sma.VhostBlkDeviceManager(client),
               sma.NvmfVfioDeviceManager(client)]
    devices += load_plugins(config.get('plugins') or [], client)
    devices += load_plugins(filter(None, os.environ.get('SMA_PLUGINS', '').split(':')),
                            client)