HEX
Server: nginx/1.22.0
System: Linux iZuf6jdxbygmf6cco977lcZ 5.10.84-10.4.al8.x86_64 #1 SMP Tue Apr 12 12:31:07 CST 2022 x86_64
User: root (0)
PHP: 7.4.29
Disabled: passthru,exec,system,chroot,chgrp,chown,shell_exec,proc_open,proc_get_status,ini_alter,ini_restore,dl,readlink,symlink,popepassthru,stream_socket_server,fsocket,popen
Upload Files
File: //lib/python3.6/site-packages/pyudev/discover.py
# -*- coding: utf-8 -*-
# Copyright (C) 2015 mulhern <amulhern@redhat.com>

# This library is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation; either version 2.1 of the License, or (at your
# option) any later version.

# This library is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
# for more details.

# You should have received a copy of the GNU Lesser General Public License
# along with this library; if not, write to the Free Software Foundation,
# Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA


"""
    pyudev.discover
    ===============

    Tools to discover a device given limited information.

    .. moduleauthor::  mulhern <amulhern@redhat.com>
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import abc
import functools
import os
import re
import six

from pyudev.device import Devices
from pyudev.device import DeviceNotFoundError


def wrap_exception(func):
    """
    Allow Device discovery methods to return None instead of raising an
    exception.
    """

    @functools.wraps(func)
    def the_func(*args, **kwargs):
        """
        Returns result of calling ``func`` on ``args``, ``kwargs``.
        Returns None if ``func`` raises :exc:`DeviceNotFoundError`.
        """
        try:
            return func(*args, **kwargs)
        except DeviceNotFoundError:
            return None

    return the_func

@six.add_metaclass(abc.ABCMeta)
class Hypothesis(object):
    """
    Represents a hypothesis about the meaning of the device identifier.
    """

    @classmethod
    @abc.abstractmethod
    def match(cls, value): # pragma: no cover
        """
        Match the given string according to the hypothesis.

        The purpose of this method is to obtain a value corresponding to
        ``value`` if that is possible. It may use a regular expression, but
        in general it should just return ``value`` and let the lookup method
        sort out the rest.

        :param str value: the string to inspect
        :returns: the matched thing or None if unmatched
        :rtype: the type of lookup's key parameter or NoneType
        """
        raise NotImplementedError()

    @classmethod
    @abc.abstractmethod
    def lookup(cls, context, key): # pragma: no cover
        """
        Lookup the given string according to the hypothesis.

        :param Context context: the pyudev context
        :param key: a key with which to lookup the device
        :type key: the type of match's return value if not None
        :returns: a list of Devices obtained
        :rtype: frozenset of :class:`Device`
        """
        raise NotImplementedError()

    @classmethod
    def setup(cls, context):
        """
        A potentially expensive method that may allow an :class:`Hypothesis`
        to find devices more rapidly or to find a device that it would
        otherwise miss.

        :param Context context: the pyudev context
        """
        pass

    @classmethod
    def get_devices(cls, context, value):
        """
        Get any devices that may correspond to the given string.

        :param Context context: the pyudev context
        :param str value: the value to look for
        :returns: a list of devices obtained
        :rtype: set of :class:`Device`
        """
        key = cls.match(value)
        return cls.lookup(context, key) if key is not None else frozenset()


class DeviceNumberHypothesis(Hypothesis):
    """
    Represents the hypothesis that the device is a device number.

    The device may be separated into major/minor number or a composite number.
    """

    @classmethod
    def _match_major_minor(cls, value):
        """
        Match the number under the assumption that it is a major,minor pair.

        :param str value: value to match
        :returns: the device number or None
        :rtype: int or NoneType
        """
        major_minor_re = re.compile(
           r'^(?P<major>\d+)(\D+)(?P<minor>\d+)$'
        )
        match = major_minor_re.match(value)
        return match and os.makedev(
           int(match.group('major')),
           int(match.group('minor'))
        )

    @classmethod
    def _match_number(cls, value):
        """
        Match the number under the assumption that it is a single number.

        :param str value: value to match
        :returns: the device number or None
        :rtype: int or NoneType
        """
        number_re = re.compile(r'^(?P<number>\d+)$')
        match = number_re.match(value)
        return match and int(match.group('number'))

    @classmethod
    def match(cls, value):
        """
        Match the number under the assumption that it is a device number.

        :returns: the device number or None
        :rtype: int or NoneType
        """
        return cls._match_major_minor(value) or cls._match_number(value)

    @classmethod
    def find_subsystems(cls, context):
        """
        Find subsystems in /sys/dev.

        :param Context context: the context
        :returns: a lis of available subsystems
        :rtype: list of str
        """
        sys_path = context.sys_path
        return os.listdir(os.path.join(sys_path, 'dev'))

    @classmethod
    def lookup(cls, context, key):
        """
        Lookup by the device number.

        :param Context context: the context
        :param int key: the device number
        :returns: a list of matching devices
        :rtype: frozenset of :class:`Device`
        """
        func = wrap_exception(Devices.from_device_number)
        res = (func(context, s, key) for s in cls.find_subsystems(context))
        return frozenset(r for r in res if r is not None)


class DevicePathHypothesis(Hypothesis):
    """
    Discover the device assuming the identifier is a device path.
    """

    @classmethod
    def match(cls, value):
        """
        Match ``value`` under the assumption that it is a device path.

        :returns: the device path or None
        :rtype: str or NoneType
        """
        return value

    @classmethod
    def lookup(cls, context, key):
        """
        Lookup by the path.

        :param Context context: the context
        :param str key: the device path
        :returns: a list of matching devices
        :rtype: frozenset of :class:`Device`
        """
        res = wrap_exception(Devices.from_path)(context, key)
        return frozenset((res,)) if res is not None else frozenset()


class DeviceNameHypothesis(Hypothesis):
    """
    Discover the device assuming the input is a device name.

    Try every available subsystem.
    """

    @classmethod
    def find_subsystems(cls, context):
        """
        Find all subsystems in sysfs.

        :param Context context: the context
        :rtype: frozenset
        :returns: subsystems in sysfs
        """
        sys_path = context.sys_path
        dirnames = ('bus', 'class', 'subsystem')
        absnames = (os.path.join(sys_path, name) for name in dirnames)
        realnames = (d for d in absnames if os.path.isdir(d))
        return frozenset(n for d in realnames for n in os.listdir(d))

    @classmethod
    def match(cls, value):
        """
        Match ``value`` under the assumption that it is a device name.

        :returns: the device path or None
        :rtype: str or NoneType
        """
        return value

    @classmethod
    def lookup(cls, context, key):
        """
        Lookup by the path.

        :param Context context: the context
        :param str key: the device path
        :returns: a list of matching devices
        :rtype: frozenset of :class:`Device`
        """
        func = wrap_exception(Devices.from_name)
        res = (func(context, s, key) for s in cls.find_subsystems(context))
        return frozenset(r for r in res if r is not None)


class DeviceFileHypothesis(Hypothesis):
    """
    Discover the device assuming the value is some portion of a device file.

    The device file may be a link to a device node.
    """

    _LINK_DIRS = [
       '/dev',
       '/dev/disk/by-id',
       '/dev/disk/by-label',
       '/dev/disk/by-partlabel',
       '/dev/disk/by-partuuid',
       '/dev/disk/by-path',
       '/dev/disk/by-uuid',
       '/dev/input/by-path',
       '/dev/mapper',
       '/dev/md',
       '/dev/vg'
    ]

    @classmethod
    def get_link_dirs(cls, context):
        """
        Get all directories that may contain links to device nodes.

        This method checks the device links of every device, so it is very
        expensive.

        :param Context context: the context
        :returns: a sorted list of directories that contain device links
        :rtype: list
        """
        devices = context.list_devices()
        devices_with_links = (d for d in devices if list(d.device_links))
        links = (l for d in devices_with_links for l in d.device_links)
        return sorted(set(os.path.dirname(l) for l in links))

    @classmethod
    def setup(cls, context):
        """
        Set the link directories to be used when discovering by file.

        Uses `get_link_dirs`, so is as expensive as it is.

        :param Context context: the context
        """
        cls._LINK_DIRS = cls.get_link_dirs(context)

    @classmethod
    def match(cls, value):
        return value

    @classmethod
    def lookup(cls, context, key):
        """
        Lookup the device under the assumption that the key is part of
        the name of a device file.

        :param Context context: the context
        :param str key: a portion of the device file name

        It is assumed that either it is the whole name of the device file
        or it is the basename.

        A device file may be a device node or a device link.
        """
        func = wrap_exception(Devices.from_device_file)
        if '/' in key:
            device = func(context, key)
            return frozenset((device,)) if device is not None else frozenset()
        else:
            files = (os.path.join(ld, key) for ld in cls._LINK_DIRS)
            devices = (func(context, f) for f in files)
            return frozenset(d for d in devices if d is not None)


class Discovery(object):
    # pylint: disable=too-few-public-methods
    """
    Provides discovery methods for devices.
    """

    _HYPOTHESES = [
       DeviceFileHypothesis,
       DeviceNameHypothesis,
       DeviceNumberHypothesis,
       DevicePathHypothesis
    ]

    def __init__(self):
        self._hypotheses = self._HYPOTHESES

    def setup(self, context):
        """
        Set up individual hypotheses.

        May be an expensive call.

        :param Context context: the context
        """
        for hyp in self._hypotheses:
            hyp.setup(context)

    def get_devices(self, context, value):
        """
        Get the devices corresponding to value.

        :param Context context: the context
        :param str value: some identifier of the device
        :returns: a list of corresponding devices
        :rtype: frozenset of :class:`Device`
        """
        return frozenset(
           d for h in self._hypotheses for d in h.get_devices(context, value)
        )