DiskBasedFeature

class dgl.graphbolt.DiskBasedFeature(path: str, metadata: Dict | None = None, num_threads=None)[source]

Bases: Feature

A wrapper for a disk-based feature.

Initializes a disk-based feature fetcher from a NumPy file. Note that you can use gb.numpy_save_aligned as a replacement for np.save to potentially get better performance.

Parameters:
  • path (string) – The path to the NumPy feature file. Note that the NumPy array must have more than one dimension.

  • metadata (Dict) – The metadata of the feature.

  • num_threads (int) – The number of threads driving io_uring queues.

Examples

>>> import torch
>>> import numpy as np
>>> from dgl import graphbolt as gb
>>> torch_feat = torch.arange(10).reshape(2, -1)
>>> pth = "path/to/feat.npy"
>>> np.save(pth, torch_feat)
>>> feature = gb.DiskBasedFeature(pth)
>>> feature.read(torch.tensor([0]))
tensor([[0, 1, 2, 3, 4]])
>>> feature.size()
torch.Size([5])
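The feature file must contain an array with more than one dimension (see the path parameter above). A minimal NumPy-only sketch of preparing such a file, using a hypothetical temporary path for illustration:

```python
import os
import tempfile

import numpy as np

# Hypothetical location; any writable path works.
path = os.path.join(tempfile.mkdtemp(), "feat.npy")

# A 1-D array can be reshaped to 2-D before saving so it satisfies
# the "more than one dimension" requirement.
feat = np.arange(10, dtype=np.float32).reshape(2, 5)
np.save(path, feat)

# The saved array round-trips with the expected shape and dtype.
loaded = np.load(path)
assert loaded.shape == (2, 5)
assert loaded.dtype == np.float32
```

gb.numpy_save_aligned can be substituted for np.save here to potentially speed up later disk reads.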
count()[source]

Get the count of the feature.

Returns:

The count of the feature.

Return type:

int

metadata()[source]

Get the metadata of the feature.

Returns:

The metadata of the feature.

Return type:

Dict

pin_memory_()[source]

Placeholder DiskBasedFeature pin_memory_ implementation. It is a no-op.

read(ids: Tensor | None = None)[source]

Read the feature by index. The returned tensor will be on CPU.

Parameters:

ids (torch.Tensor) – The index of the feature. Only the specified indices of the feature are read.

Returns:

The read feature.

Return type:

torch.Tensor

read_async(ids: Tensor)[source]

Read the feature by index asynchronously.

Parameters:

ids (torch.Tensor) – The index of the feature. Only the specified indices of the feature are read.

Returns:

The returned generator object yields a future on the read_async_num_stages(ids.device)-th invocation. The result can be accessed by calling .wait() on the returned future object. It is undefined behavior to call .wait() more than once.

Return type:

A generator object.

Examples

>>> import dgl.graphbolt as gb
>>> feature = gb.Feature(...)
>>> ids = torch.tensor([0, 2])
>>> for stage, future in enumerate(feature.read_async(ids)):
...     pass
>>> assert stage + 1 == feature.read_async_num_stages(ids.device)
>>> result = future.wait()  # result contains the read values.

read_async_num_stages(ids_device: device)[source]

The number of stages of the read_async operation. See read_async function for directions on its use. This function is required to return the number of yield operations when read_async is used with a tensor residing on ids_device.

Parameters:

ids_device (torch.device) – The device of the ids parameter passed into read_async.

Returns:

The number of stages of the read_async operation.

Return type:

int

read_into_memory() β†’ TorchBasedFeature[source]

Convert the disk-based feature to an in-memory torch-based feature.
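Conceptually, this resembles materializing a memory-mapped NumPy array into RAM; the sketch below is an analogy using plain NumPy, not the DGL implementation:

```python
import os
import tempfile

import numpy as np

path = os.path.join(tempfile.mkdtemp(), "feat.npy")
np.save(path, np.arange(10).reshape(2, 5))

# "Disk-based": pages are read lazily from the file on access.
on_disk = np.load(path, mmap_mode="r")

# "In-memory": copy everything into a regular ndarray in RAM.
in_memory = np.array(on_disk)

assert isinstance(on_disk, np.memmap)
assert not isinstance(in_memory, np.memmap)
```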

size()[source]

Get the size of the feature.

Returns:

The size of the feature.

Return type:

torch.Size

to(_)[source]

Placeholder DiskBasedFeature to implementation. It is a no-op.

update(value: Tensor, ids: Tensor | None = None)[source]

Disk-based feature does not support update for now.