Source code for dgl.graphbolt.impl.gpu_cached_feature

"""GPU cached feature for GraphBolt."""
from typing import Dict, Union

import torch

from ..feature_store import (
    bytes_to_number_of_items,
    Feature,
    FeatureKey,
    wrap_with_cached_feature,
)

from .gpu_feature_cache import GPUFeatureCache

__all__ = ["GPUCachedFeature", "gpu_cached_feature"]


class GPUCachedFeature(Feature):
    r"""GPU cached feature wrapping a fallback feature. It uses the least
    recently used (LRU) algorithm as the cache eviction policy. Use
    `gpu_cached_feature` to construct an instance of this class.

    Places the GPU cache on torch.cuda.current_device().

    Parameters
    ----------
    fallback_feature : Feature
        The fallback feature.
    cache : GPUFeatureCache
        A GPUFeatureCache instance to serve as the cache backend.
    offset : int, optional
        The offset value to add to the given ids before using the cache. This
        parameter is useful if multiple `GPUCachedFeature`s are sharing a
        single GPUFeatureCache object.

    Examples
    --------
    >>> import torch
    >>> from dgl import graphbolt as gb
    >>> torch_feat = torch.arange(10).reshape(2, -1).to("cuda")
    >>> cache_size = 5
    >>> fallback_feature = gb.TorchBasedFeature(torch_feat)
    >>> feature = gb.gpu_cached_feature(fallback_feature, cache_size)
    >>> feature.read()
    tensor([[0, 1, 2, 3, 4],
            [5, 6, 7, 8, 9]], device='cuda:0')
    >>> feature.read(torch.tensor([0]).to("cuda"))
    tensor([[0, 1, 2, 3, 4]], device='cuda:0')
    >>> feature.update(torch.tensor([[1 for _ in range(5)]]).to("cuda"),
    ...                torch.tensor([1]).to("cuda"))
    >>> feature.read(torch.tensor([0, 1]).to("cuda"))
    tensor([[0, 1, 2, 3, 4],
            [1, 1, 1, 1, 1]], device='cuda:0')
    >>> feature.size()
    torch.Size([5])
    """

    _cache_type = GPUFeatureCache

    def __init__(
        self,
        fallback_feature: Feature,
        cache: GPUFeatureCache,
        offset: int = 0,
    ):
        super(GPUCachedFeature, self).__init__()
        assert isinstance(fallback_feature, Feature), (
            f"The fallback_feature must be an instance of Feature, but got "
            f"{type(fallback_feature)}."
        )
        self._fallback_feature = fallback_feature
        self._feature = cache
        self._offset = offset
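    # Sharing sketch (illustrative, not part of the class): the ``offset``
    # parameter lets several GPUCachedFeature instances share one
    # GPUFeatureCache, assuming the features have the same row shape and dtype
    # and are assigned disjoint key ranges. The capacity and offset choices
    # below are assumptions for illustration only.
    #
    #   cache = GPUFeatureCache((capacity,) + feat_a.shape[1:], feat_a.dtype)
    #   feature_a = GPUCachedFeature(
    #       gb.TorchBasedFeature(feat_a), cache, offset=0
    #   )
    #   feature_b = GPUCachedFeature(
    #       gb.TorchBasedFeature(feat_b), cache, offset=feat_a.shape[0]
    #   )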
    def read(self, ids: torch.Tensor = None):
        """Read the feature by index.

        The returned tensor is always in GPU memory, no matter whether the
        fallback feature is in memory or on disk.

        Parameters
        ----------
        ids : torch.Tensor, optional
            The index of the feature. If specified, only the specified indices
            of the feature are read. If None, the entire feature is returned.

        Returns
        -------
        torch.Tensor
            The read feature.
        """
        if ids is None:
            return self._fallback_feature.read()
        values, missing_index, missing_keys = self._feature.query(
            ids if self._offset == 0 else ids + self._offset
        )
        missing_values = self._fallback_feature.read(
            missing_keys if self._offset == 0 else missing_keys - self._offset
        )
        values[missing_index] = missing_values
        self._feature.replace(missing_keys, missing_values)
        return values
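    # Reading sketch: the cache is consulted first and only the missing rows go
    # to the fallback feature. A minimal illustration, assuming the fallback
    # tensor is kept in pinned host memory so misses can be gathered from the
    # GPU; the sizes are illustrative only.
    #
    #   host_feat = torch.randn(1000, 16).pin_memory()
    #   feature = gb.gpu_cached_feature(
    #       gb.TorchBasedFeature(host_feat), max_cache_size_in_bytes=64 * 16 * 4
    #   )
    #   ids = torch.arange(32, device="cuda")
    #   out = feature.read(ids)   # First read misses and fills the cache.
    #   out = feature.read(ids)   # Repeated read is served from the cache.
    #   print(feature.miss_rate)  # Fraction of cache misses since creation.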
    def read_async(self, ids: torch.Tensor):
        r"""Read the feature by index asynchronously.

        Parameters
        ----------
        ids : torch.Tensor
            The index of the feature. Only the specified indices of the
            feature are read.

        Returns
        -------
        A generator object.
            The returned generator object returns a future on
            ``read_async_num_stages(ids.device)``\ th invocation. The return
            result can be accessed by calling ``.wait()`` on the returned
            future object. It is undefined behavior to call ``.wait()`` more
            than once.

        Examples
        --------
        >>> import dgl.graphbolt as gb
        >>> feature = gb.Feature(...)
        >>> ids = torch.tensor([0, 2])
        >>> for stage, future in enumerate(feature.read_async(ids)):
        ...     pass
        >>> assert stage + 1 == feature.read_async_num_stages(ids.device)
        >>> result = future.wait()  # result contains the read values.
        """
        future = self._feature.query(
            ids if self._offset == 0 else ids + self._offset, async_op=True
        )

        yield

        values, missing_index, missing_keys = future.wait()

        fallback_reader = self._fallback_feature.read_async(
            missing_keys if self._offset == 0 else missing_keys - self._offset
        )
        fallback_num_stages = self._fallback_feature.read_async_num_stages(
            missing_keys.device
        )
        for i in range(fallback_num_stages):
            missing_values_future = next(fallback_reader, None)
            if i < fallback_num_stages - 1:
                yield  # fallback feature stages.

        class _Waiter:
            def __init__(
                self,
                feature,
                values,
                missing_index,
                missing_keys,
                missing_values_future,
            ):
                self.feature = feature
                self.values = values
                self.missing_index = missing_index
                self.missing_keys = missing_keys
                self.missing_values_future = missing_values_future

            def wait(self):
                """Returns the stored value when invoked."""
                missing_values = self.missing_values_future.wait()
                self.feature.replace(self.missing_keys, missing_values)
                self.values[self.missing_index] = missing_values
                values = self.values
                # Ensure there is no memory leak.
                self.feature = self.values = self.missing_index = None
                self.missing_keys = self.missing_values_future = None
                return values

        yield _Waiter(
            self._feature,
            values,
            missing_index,
            missing_keys,
            missing_values_future,
        )
    def read_async_num_stages(self, ids_device: torch.device):
        """The number of stages of the read_async operation. See read_async
        function for directions on its use. This function is required to
        return the number of yield operations when read_async is used with a
        tensor residing on ids_device.

        Parameters
        ----------
        ids_device : torch.device
            The device of the ids parameter passed into read_async.

        Returns
        -------
        int
            The number of stages of the read_async operation.
        """
        assert ids_device.type == "cuda"
        return 1 + self._fallback_feature.read_async_num_stages(ids_device)
    def size(self):
        """Get the size of the feature.

        Returns
        -------
        torch.Size
            The size of the feature.
        """
        return self._fallback_feature.size()
    def count(self):
        """Get the count of the feature.

        Returns
        -------
        int
            The count of the feature.
        """
        return self._fallback_feature.count()
    def update(self, value: torch.Tensor, ids: torch.Tensor = None):
        """Update the feature.

        Parameters
        ----------
        value : torch.Tensor
            The updated value of the feature.
        ids : torch.Tensor, optional
            The indices of the feature to update. If specified, only the
            specified indices of the feature will be updated. For the feature,
            the `ids[i]` row is updated to `value[i]`. So the indices and
            value must have the same length. If None, the entire feature will
            be updated.
        """
        if ids is None:
            feat0 = value[:1]
            self._fallback_feature.update(value)
            cache_size = min(
                bytes_to_number_of_items(self.cache_size_in_bytes, feat0),
                value.shape[0],
            )
            self._feature = None  # Destroy the existing cache first.
            self._feature = self._cache_type(
                (cache_size,) + feat0.shape[1:], feat0.dtype
            )
        else:
            self._fallback_feature.update(value, ids)
            self._feature.replace(ids, value)
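    # Update sketch: a full update (ids=None) writes through to the fallback
    # and rebuilds the cache for the new row shape and dtype, while an indexed
    # update also refreshes any cached rows in place. The tensor names and
    # indices below are illustrative only.
    #
    #   feature.update(new_feat)  # Whole feature; the cache is re-created.
    #   feature.update(new_rows, torch.tensor([3, 7], device="cuda"))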
    @property
    def cache_size_in_bytes(self):
        """Return the size taken by the cache in bytes."""
        return self._feature.max_size_in_bytes

    @property
    def miss_rate(self):
        """Returns the cache miss rate since creation."""
        return self._feature.miss_rate
def gpu_cached_feature(
    fallback_features: Union[Feature, Dict[FeatureKey, Feature]],
    max_cache_size_in_bytes: int,
) -> Union[GPUCachedFeature, Dict[FeatureKey, GPUCachedFeature]]:
    r"""GPU cached feature wrapping a fallback feature. It uses the least
    recently used (LRU) algorithm as the cache eviction policy.

    Places the GPU cache on torch.cuda.current_device().

    Parameters
    ----------
    fallback_features : Union[Feature, Dict[FeatureKey, Feature]]
        The fallback feature(s).
    max_cache_size_in_bytes : int
        The capacity of the GPU cache in bytes.

    Returns
    -------
    Union[GPUCachedFeature, Dict[FeatureKey, GPUCachedFeature]]
        The feature(s) wrapped with GPUCachedFeature.
    """
    return wrap_with_cached_feature(
        GPUCachedFeature, fallback_features, max_cache_size_in_bytes
    )
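# Factory sketch: wrapping a dictionary of fallback features in one call
# returns a dictionary of GPUCachedFeature objects under the same keys. The
# keys below follow a (domain, type, name) FeatureKey convention and, together
# with the tensor names and cache budget, are assumptions for illustration.
#
#   features = {
#       ("node", "paper", "feat"): gb.TorchBasedFeature(paper_feat),
#       ("node", "author", "feat"): gb.TorchBasedFeature(author_feat),
#   }
#   cached = gb.gpu_cached_feature(features, max_cache_size_in_bytes=2**28)
#   paper_cached = cached[("node", "paper", "feat")]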