Link Prediction
This tutorial will show how to train a multi-layer GraphSAGE for link prediction on CoraGraphDataset. The dataset contains 2708 nodes and 10556 edges.
By the end of this tutorial, you will be able to
Train a GNN model for link prediction on a target device with DGL's neighbor sampling components.
Install DGL package
[1]:
# Install required packages.
import os
import torch
import numpy as np
os.environ['TORCH'] = torch.__version__
os.environ['DGLBACKEND'] = "pytorch"
# Install the CPU version. If you want to install CUDA version, please
# refer to https://www.dgl.ai/pages/start.html.
device = torch.device("cpu")
!pip install --pre dgl -f https://data.dgl.ai/wheels-test/repo.html
try:
    import dgl
    import dgl.graphbolt as gb
    installed = True
except ImportError as error:
    installed = False
    print(error)
print("DGL installed!" if installed else "DGL not found!")
Looking in links: https://data.dgl.ai/wheels-test/repo.html
Requirement already satisfied: dgl in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (2.2a240410)
Requirement already satisfied: numpy>=1.14.0 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from dgl) (1.26.4)
Requirement already satisfied: scipy>=1.1.0 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from dgl) (1.14.0)
Requirement already satisfied: networkx>=2.1 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from dgl) (3.3)
Requirement already satisfied: requests>=2.19.0 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from dgl) (2.32.3)
Requirement already satisfied: tqdm in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from dgl) (4.66.5)
Requirement already satisfied: psutil>=5.8.0 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from dgl) (6.0.0)
Requirement already satisfied: torchdata>=0.5.0 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from dgl) (0.7.1)
Requirement already satisfied: pandas in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from dgl) (2.2.2)
Requirement already satisfied: charset-normalizer<4,>=2 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from requests>=2.19.0->dgl) (3.3.2)
Requirement already satisfied: idna<4,>=2.5 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from requests>=2.19.0->dgl) (3.8)
Requirement already satisfied: urllib3<3,>=1.21.1 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from requests>=2.19.0->dgl) (2.2.2)
Requirement already satisfied: certifi>=2017.4.17 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from requests>=2.19.0->dgl) (2024.7.4)
Requirement already satisfied: torch>=2 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from torchdata>=0.5.0->dgl) (2.4.0+cpu)
Requirement already satisfied: python-dateutil>=2.8.2 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from pandas->dgl) (2.9.0.post0)
Requirement already satisfied: pytz>=2020.1 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from pandas->dgl) (2024.1)
Requirement already satisfied: tzdata>=2022.7 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from pandas->dgl) (2024.1)
Requirement already satisfied: six>=1.5 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from python-dateutil>=2.8.2->pandas->dgl) (1.16.0)
Requirement already satisfied: filelock in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from torch>=2->torchdata>=0.5.0->dgl) (3.15.4)
Requirement already satisfied: typing-extensions>=4.8.0 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from torch>=2->torchdata>=0.5.0->dgl) (4.12.2)
Requirement already satisfied: sympy in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from torch>=2->torchdata>=0.5.0->dgl) (1.13.2)
Requirement already satisfied: jinja2 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from torch>=2->torchdata>=0.5.0->dgl) (3.1.4)
Requirement already satisfied: fsspec in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from torch>=2->torchdata>=0.5.0->dgl) (2024.6.1)
Requirement already satisfied: MarkupSafe>=2.0 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from jinja2->torch>=2->torchdata>=0.5.0->dgl) (2.1.5)
Requirement already satisfied: mpmath<1.4,>=1.1.0 in /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages (from sympy->torch>=2->torchdata>=0.5.0->dgl) (1.3.0)
DGL installed!
Loading Dataset
cora is already prepared as a BuiltinDataset in GraphBolt.
[2]:
dataset = gb.BuiltinDataset("cora").load()
Downloading datasets/cora.zip from https://data.dgl.ai/dataset/graphbolt/cora.zip...
datasets/cora.zip: 100%|██████████████████████████████████████| 240k/240k [00:00<00:00, 9.97MB/s]
Extracting file to datasets
Start to preprocess the on-disk dataset.
Finish preprocessing the on-disk dataset.
/home/ubuntu/regression_test/dgl/python/dgl/graphbolt/impl/ondisk_dataset.py:467: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.
return torch.load(graph_topology.path)
The dataset consists of a graph, features, and tasks. You can get the training, validation, and test sets from the tasks. The seeds and their corresponding labels are already stored in each of these sets. This dataset contains two tasks, one for node classification and the other for link prediction. We will use the link prediction task.
[3]:
graph = dataset.graph
feature = dataset.feature
train_set = dataset.tasks[1].train_set
test_set = dataset.tasks[1].test_set
task_name = dataset.tasks[1].metadata["name"]
print(f"Task: {task_name}.")
Task: link_prediction.
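The cell above selects the link prediction task by position (index 1). As a minimal sketch, the task could instead be looked up by name, which may be more robust if the task order ever changes. Note that find_task is a hypothetical helper and not part of DGL. It only assumes that each task exposes a metadata dict with a "name" key, as used above. The stand-in task objects and the name "node_classification" are assumptions made so the sketch runs without DGL.

```python
from types import SimpleNamespace


def find_task(tasks, name):
    """Return the first task whose metadata["name"] equals `name`.

    Hypothetical helper for illustration; not a DGL API.
    """
    for task in tasks:
        if task.metadata.get("name") == name:
            return task
    raise KeyError(f"no task named {name!r}")


# Stand-ins for dataset.tasks (assumed shape: objects with a `metadata` dict).
tasks = [
    SimpleNamespace(metadata={"name": "node_classification"}),
    SimpleNamespace(metadata={"name": "link_prediction"}),
]
task = find_task(tasks, "link_prediction")
print(f"Task: {task.metadata['name']}.")
```

With a real dataset, the same helper would be called as find_task(dataset.tasks, "link_prediction"), assuming the tasks behave like the stand-ins above.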
Defining Neighbor Sampler and Data Loader in DGL
Unlike the full-graph link prediction tutorial, a common practice for training a GNN on large graphs is to iterate over the edges in minibatches, since computing the probability of all edges at once is usually infeasible. For each minibatch of edges, you compute the output representations of their incident nodes using neighbor sampling and a GNN, in the same fashion as in the node classification tutorial.
To perform link prediction, you need to specify a negative sampler. DGL provides built-in negative samplers such as dgl.graphbolt.UniformNegativeSampler. This tutorial uniformly draws 5 negative examples per positive example.
Except for the negative sampler, the rest of the code is identical to the node classification tutorial.
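To make the idea of uniform negative sampling concrete, here is a minimal plain-Python sketch. It is an illustration of one common scheme (keep the source node, draw the destination uniformly at random) and is not DGL's implementation. The function name sample_uniform_negatives, the made-up edges, and the fixed random seed are all assumptions for this example.

```python
import random


def sample_uniform_negatives(pos_edges, num_nodes, negative_ratio, seed=0):
    """Pair every positive edge with `negative_ratio` random negatives.

    Illustration only: destinations are drawn uniformly from all nodes,
    so a sampled pair may occasionally coincide with a real edge.
    """
    rng = random.Random(seed)
    pairs, labels = [], []
    for src, dst in pos_edges:
        # The observed edge is a positive example.
        pairs.append((src, dst))
        labels.append(1.0)
        # Keep the source and draw random destinations as negatives.
        for _ in range(negative_ratio):
            pairs.append((src, rng.randrange(num_nodes)))
            labels.append(0.0)
    return pairs, labels


pos_edges = [(0, 633), (1, 2), (2, 332)]  # made-up edges for illustration
pairs, labels = sample_uniform_negatives(
    pos_edges, num_nodes=2708, negative_ratio=5
)
print(len(pairs), sum(labels))
```

Each positive edge contributes one positive and 5 negative pairs, so a full minibatch of 256 positive edges yields 256 * 6 = 1536 labeled pairs under this scheme.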
[4]:
from functools import partial
datapipe = gb.ItemSampler(train_set, batch_size=256, shuffle=True)
datapipe = datapipe.sample_uniform_negative(graph, 5)
datapipe = datapipe.sample_neighbor(graph, [5, 5, 5])
datapipe = datapipe.transform(partial(gb.exclude_seed_edges, include_reverse_edges=True))
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.copy_to(device)
train_dataloader = gb.DataLoader(datapipe)
You can peek at one minibatch from train_dataloader to see what it yields.
[5]:
data = next(iter(train_dataloader))
print(f"MiniBatch: {data}")
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[5], line 1
----> 1 data = next(iter(train_dataloader))
2 print(f"MiniBatch: {data}")
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:630, in _BaseDataLoaderIter.__next__(self)
627 if self._sampler_iter is None:
628 # TODO(https://github.com/pytorch/pytorch/issues/76750)
629 self._reset() # type: ignore[call-arg]
--> 630 data = self._next_data()
631 self._num_yielded += 1
632 if self._dataset_kind == _DatasetKind.Iterable and \
633 self._IterableDataset_len_called is not None and \
634 self._num_yielded > self._IterableDataset_len_called:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:673, in _SingleProcessDataLoaderIter._next_data(self)
671 def _next_data(self):
672 index = self._next_index() # may raise StopIteration
--> 673 data = self._dataset_fetcher.fetch(index) # may raise StopIteration
674 if self._pin_memory:
675 data = _utils.pin_memory.pin_memory(data, self._pin_memory_device)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py:42, in _IterableDatasetFetcher.fetch(self, possibly_batched_index)
40 raise StopIteration
41 else:
---> 42 data = next(self.dataset_iter)
43 return self.collate_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:151, in hook_iterator.<locals>.IteratorDecorator.__next__(self)
149 return self._get_next()
150 else: # Decided against using `contextlib.nullcontext` for performance reasons
--> 151 return self._get_next()
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:139, in hook_iterator.<locals>.IteratorDecorator._get_next(self)
137 """Return next with logic related to iterator validity, profiler, and incrementation of samples yielded."""
138 _check_iterator_valid(self.datapipe, self.iterator_id)
--> 139 result = next(self.iterator)
140 if not self.self_and_has_next_method:
141 self.datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:223, in hook_iterator.<locals>.wrap_next(*args, **kwargs)
221 result = next_func(*args, **kwargs)
222 else:
--> 223 result = next_func(*args, **kwargs)
224 datapipe._number_of_samples_yielded += 1
225 return result
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/datapipe.py:383, in _IterDataPipeSerializationWrapper.__next__(self)
381 def __next__(self) -> T_co: # type: ignore[type-var]
382 assert self._datapipe_iter is not None
--> 383 return next(self._datapipe_iter)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File ~/regression_test/dgl/python/dgl/graphbolt/base.py:162, in CopyTo.__iter__(self)
161 def __iter__(self):
--> 162 for data in self.datapipe:
163 data = recursive_apply(data, apply_to, self.device)
164 if self.extra_attrs is not None:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py:103, in PrefetcherIterDataPipe.__iter__(self)
101 if isinstance(data, (StopIteration, communication.iter.TerminateRequired)):
102 break
--> 103 raise data
104 yield data
105 else:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py:73, in PrefetcherIterDataPipe.thread_worker(prefetch_data)
71 if len(prefetch_data.prefetch_buffer) < prefetch_data.buffer_size:
72 try:
---> 73 item = next(itr)
74 prefetch_data.prefetch_buffer.append(item)
75 except Exception as e:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:125, in MapperIterDataPipe.__iter__(self)
124 def __iter__(self) -> Iterator[T_co]:
--> 125 for data in self.datapipe:
126 yield self._apply_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File ~/regression_test/dgl/python/dgl/graphbolt/dataloader.py:64, in MultiprocessingWrapper.__iter__(self)
63 def __iter__(self):
---> 64 yield from self.dataloader
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:630, in _BaseDataLoaderIter.__next__(self)
627 if self._sampler_iter is None:
628 # TODO(https://github.com/pytorch/pytorch/issues/76750)
629 self._reset() # type: ignore[call-arg]
--> 630 data = self._next_data()
631 self._num_yielded += 1
632 if self._dataset_kind == _DatasetKind.Iterable and \
633 self._IterableDataset_len_called is not None and \
634 self._num_yielded > self._IterableDataset_len_called:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:673, in _SingleProcessDataLoaderIter._next_data(self)
671 def _next_data(self):
672 index = self._next_index() # may raise StopIteration
--> 673 data = self._dataset_fetcher.fetch(index) # may raise StopIteration
674 if self._pin_memory:
675 data = _utils.pin_memory.pin_memory(data, self._pin_memory_device)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py:42, in _IterableDatasetFetcher.fetch(self, possibly_batched_index)
40 raise StopIteration
41 else:
---> 42 data = next(self.dataset_iter)
43 return self.collate_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:151, in hook_iterator.<locals>.IteratorDecorator.__next__(self)
149 return self._get_next()
150 else: # Decided against using `contextlib.nullcontext` for performance reasons
--> 151 return self._get_next()
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:139, in hook_iterator.<locals>.IteratorDecorator._get_next(self)
137 """Return next with logic related to iterator validity, profiler, and incrementation of samples yielded."""
138 _check_iterator_valid(self.datapipe, self.iterator_id)
--> 139 result = next(self.iterator)
140 if not self.self_and_has_next_method:
141 self.datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:223, in hook_iterator.<locals>.wrap_next(*args, **kwargs)
221 result = next_func(*args, **kwargs)
222 else:
--> 223 result = next_func(*args, **kwargs)
224 datapipe._number_of_samples_yielded += 1
225 return result
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/datapipe.py:383, in _IterDataPipeSerializationWrapper.__next__(self)
381 def __next__(self) -> T_co: # type: ignore[type-var]
382 assert self._datapipe_iter is not None
--> 383 return next(self._datapipe_iter)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:125, in MapperIterDataPipe.__iter__(self)
124 def __iter__(self) -> Iterator[T_co]:
--> 125 for data in self.datapipe:
126 yield self._apply_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:126, in MapperIterDataPipe.__iter__(self)
124 def __iter__(self) -> Iterator[T_co]:
125 for data in self.datapipe:
--> 126 yield self._apply_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:91, in MapperIterDataPipe._apply_fn(self, data)
89 def _apply_fn(self, data):
90 if self.input_col is None and self.output_col is None:
---> 91 return self.fn(data)
93 if self.input_col is None:
94 res = self.fn(data)
File ~/regression_test/dgl/python/dgl/graphbolt/minibatch_transformer.py:38, in MiniBatchTransformer._transformer(self, minibatch)
37 def _transformer(self, minibatch):
---> 38 minibatch = self.transformer(minibatch)
39 assert isinstance(
40 minibatch, (MiniBatch,)
41 ), "The transformer output should be an instance of MiniBatch"
42 return minibatch
File ~/regression_test/dgl/python/dgl/graphbolt/subgraph_sampler.py:46, in SubgraphSampler._sample(self, minibatch)
39 def _sample(self, minibatch):
40 if minibatch.node_pairs is not None:
41 (
42 seeds,
43 minibatch.compacted_node_pairs,
44 minibatch.compacted_negative_srcs,
45 minibatch.compacted_negative_dsts,
---> 46 ) = self._node_pairs_preprocess(minibatch)
47 elif minibatch.seed_nodes is not None:
48 seeds = minibatch.seed_nodes
File ~/regression_test/dgl/python/dgl/graphbolt/subgraph_sampler.py:122, in SubgraphSampler._node_pairs_preprocess(self, minibatch)
120 nodes.append(neg_dst.view(-1))
121 # Unique and compact the collected nodes.
--> 122 seeds, compacted = unique_and_compact(nodes)
123 # Map back in same order as collect.
124 compacted_node_pairs = tuple(compacted[:2])
File ~/regression_test/dgl/python/dgl/graphbolt/internal/sample_utils.py:61, in unique_and_compact(nodes)
59 return unique, compacted
60 else:
---> 61 return unique_and_compact_per_type(nodes)
File ~/regression_test/dgl/python/dgl/graphbolt/internal/sample_utils.py:47, in unique_and_compact.<locals>.unique_and_compact_per_type(nodes)
45 nodes = torch.cat(nodes)
46 empty_tensor = nodes.new_empty(0)
---> 47 unique, compacted, _ = torch.ops.graphbolt.unique_and_compact(
48 nodes, empty_tensor, empty_tensor
49 )
50 compacted = compacted.split(nums)
51 return unique, list(compacted)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/_ops.py:1061, in OpOverloadPacket.__call__(self_, *args, **kwargs)
1059 if self_._has_torchbind_op_overload and _must_dispatch_in_python(args, kwargs):
1060 return _call_overload_packet_from_python(self_, args, kwargs)
-> 1061 return self_._op(*args, **(kwargs or {}))
RuntimeError: graphbolt::unique_and_compact() is missing value for argument '_3'. Declaration: graphbolt::unique_and_compact(Tensor _0, Tensor _1, Tensor _2, int _3, int _4) -> (Tensor _0, Tensor _1, Tensor _2, Tensor _3)
This exception is thrown by __iter__ of NeighborSampler(datapipe=UniformNegativeSampler, deduplicate=True, fanouts=[tensor([5]), tensor([5]), tensor([5])], graph=FusedCSCSamplingGraph(csc_indptr=tensor([ 0, 3, 6, ..., 10548, 10552, 10556]),
indices=tensor([ 633, 1862, 2582, ..., 598, 1473, 2706]),
num_nodes=2708, num_edges=10556, node_attributes={}, edge_attributes={}), prob_name=None, replace=False)
Defining Model for Node Representation
Let's consider training a 2-layer GraphSAGE with neighbor sampling. The model can be written as follows:
[6]:
import dgl.nn as dglnn
import torch.nn as nn
import torch.nn.functional as F
class SAGE(nn.Module):
    def __init__(self, in_size, hidden_size):
        super().__init__()
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_size, hidden_size, "mean"))
        self.layers.append(dglnn.SAGEConv(hidden_size, hidden_size, "mean"))
        self.hidden_size = hidden_size
        self.predictor = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1),
        )
    def forward(self, blocks, x):
        hidden_x = x
        for layer_idx, (layer, block) in enumerate(zip(self.layers, blocks)):
            hidden_x = layer(block, hidden_x)
            is_last_layer = layer_idx == len(self.layers) - 1
            if not is_last_layer:
                hidden_x = F.relu(hidden_x)
        return hidden_x
Defining Training Loop
The following initializes the model and defines the optimizer.
[7]:
in_size = feature.size("node", None, "feat")[0]
model = SAGE(in_size, 128).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
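The training loop in this section compares each pair's raw score (logit) with its 0/1 label using F.binary_cross_entropy_with_logits. As a rough sketch of what that loss computes with its default mean reduction, the plain-Python function below evaluates the same quantity in a numerically stable form. The function name bce_with_logits and the sample values are assumptions for illustration only.

```python
import math


def bce_with_logits(logits, labels):
    """Mean binary cross-entropy over raw scores and 0/1 labels."""
    total = 0.0
    for x, y in zip(logits, labels):
        # Stable form of -[y*log(sigmoid(x)) + (1-y)*log(1-sigmoid(x))].
        total += max(x, 0.0) - x * y + math.log1p(math.exp(-abs(x)))
    return total / len(logits)


# A score of 0 means "undecided": the loss equals log(2) for either label.
uninformed = bce_with_logits([0.0, 0.0], [1.0, 0.0])
# Confident, correct scores drive the loss toward 0.
confident = bce_with_logits([10.0, -10.0], [1.0, 0.0])
print(uninformed, confident)
```

A loss near log(2), about 0.693, therefore suggests the model is not yet separating positive from negative pairs.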
The following is the training loop for link prediction and evaluation.
[8]:
import tqdm
for epoch in range(3):
    model.train()
    total_loss = 0
    for step, data in tqdm.tqdm(enumerate(train_dataloader)):
        # Get node pairs with labels for loss calculation.
        compacted_pairs, labels = data.node_pairs_with_labels
        node_feature = data.node_features["feat"]
        # Convert sampled subgraphs to DGL blocks.
        blocks = data.blocks
        # Get the embeddings of the input nodes.
        y = model(blocks, node_feature)
        logits = model.predictor(
            y[compacted_pairs[0]] * y[compacted_pairs[1]]
        ).squeeze()
        # Compute loss.
        loss = F.binary_cross_entropy_with_logits(logits, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch:03d} | Loss {total_loss / (step + 1):.3f}")
0it [00:00, ?it/s]
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[8], line 5
3 model.train()
4 total_loss = 0
----> 5 for step, data in tqdm.tqdm(enumerate(train_dataloader)):
6 # Get node pairs with labels for loss calculation.
7 compacted_pairs, labels = data.node_pairs_with_labels
8 node_feature = data.node_features["feat"]
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/tqdm/std.py:1181, in tqdm.__iter__(self)
1178 time = self._time
1180 try:
-> 1181 for obj in iterable:
1182 yield obj
1183 # Update and possibly print the progressbar.
1184 # Note: does not call self.update(1) for speed optimisation.
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:630, in _BaseDataLoaderIter.__next__(self)
627 if self._sampler_iter is None:
628 # TODO(https://github.com/pytorch/pytorch/issues/76750)
629 self._reset() # type: ignore[call-arg]
--> 630 data = self._next_data()
631 self._num_yielded += 1
632 if self._dataset_kind == _DatasetKind.Iterable and \
633 self._IterableDataset_len_called is not None and \
634 self._num_yielded > self._IterableDataset_len_called:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:673, in _SingleProcessDataLoaderIter._next_data(self)
671 def _next_data(self):
672 index = self._next_index() # may raise StopIteration
--> 673 data = self._dataset_fetcher.fetch(index) # may raise StopIteration
674 if self._pin_memory:
675 data = _utils.pin_memory.pin_memory(data, self._pin_memory_device)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py:42, in _IterableDatasetFetcher.fetch(self, possibly_batched_index)
40 raise StopIteration
41 else:
---> 42 data = next(self.dataset_iter)
43 return self.collate_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:151, in hook_iterator.<locals>.IteratorDecorator.__next__(self)
149 return self._get_next()
150 else: # Decided against using `contextlib.nullcontext` for performance reasons
--> 151 return self._get_next()
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:139, in hook_iterator.<locals>.IteratorDecorator._get_next(self)
137 """Return next with logic related to iterator validity, profiler, and incrementation of samples yielded."""
138 _check_iterator_valid(self.datapipe, self.iterator_id)
--> 139 result = next(self.iterator)
140 if not self.self_and_has_next_method:
141 self.datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:223, in hook_iterator.<locals>.wrap_next(*args, **kwargs)
221 result = next_func(*args, **kwargs)
222 else:
--> 223 result = next_func(*args, **kwargs)
224 datapipe._number_of_samples_yielded += 1
225 return result
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/datapipe.py:383, in _IterDataPipeSerializationWrapper.__next__(self)
381 def __next__(self) -> T_co: # type: ignore[type-var]
382 assert self._datapipe_iter is not None
--> 383 return next(self._datapipe_iter)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File ~/regression_test/dgl/python/dgl/graphbolt/base.py:162, in CopyTo.__iter__(self)
161 def __iter__(self):
--> 162 for data in self.datapipe:
163 data = recursive_apply(data, apply_to, self.device)
164 if self.extra_attrs is not None:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py:103, in PrefetcherIterDataPipe.__iter__(self)
101 if isinstance(data, (StopIteration, communication.iter.TerminateRequired)):
102 break
--> 103 raise data
104 yield data
105 else:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py:73, in PrefetcherIterDataPipe.thread_worker(prefetch_data)
71 if len(prefetch_data.prefetch_buffer) < prefetch_data.buffer_size:
72 try:
---> 73 item = next(itr)
74 prefetch_data.prefetch_buffer.append(item)
75 except Exception as e:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:125, in MapperIterDataPipe.__iter__(self)
124 def __iter__(self) -> Iterator[T_co]:
--> 125 for data in self.datapipe:
126 yield self._apply_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File ~/regression_test/dgl/python/dgl/graphbolt/dataloader.py:64, in MultiprocessingWrapper.__iter__(self)
63 def __iter__(self):
---> 64 yield from self.dataloader
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:630, in _BaseDataLoaderIter.__next__(self)
627 if self._sampler_iter is None:
628 # TODO(https://github.com/pytorch/pytorch/issues/76750)
629 self._reset() # type: ignore[call-arg]
--> 630 data = self._next_data()
631 self._num_yielded += 1
632 if self._dataset_kind == _DatasetKind.Iterable and \
633 self._IterableDataset_len_called is not None and \
634 self._num_yielded > self._IterableDataset_len_called:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:673, in _SingleProcessDataLoaderIter._next_data(self)
671 def _next_data(self):
672 index = self._next_index() # may raise StopIteration
--> 673 data = self._dataset_fetcher.fetch(index) # may raise StopIteration
674 if self._pin_memory:
675 data = _utils.pin_memory.pin_memory(data, self._pin_memory_device)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py:42, in _IterableDatasetFetcher.fetch(self, possibly_batched_index)
40 raise StopIteration
41 else:
---> 42 data = next(self.dataset_iter)
43 return self.collate_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:151, in hook_iterator.<locals>.IteratorDecorator.__next__(self)
149 return self._get_next()
150 else: # Decided against using `contextlib.nullcontext` for performance reasons
--> 151 return self._get_next()
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:139, in hook_iterator.<locals>.IteratorDecorator._get_next(self)
137 """Return next with logic related to iterator validity, profiler, and incrementation of samples yielded."""
138 _check_iterator_valid(self.datapipe, self.iterator_id)
--> 139 result = next(self.iterator)
140 if not self.self_and_has_next_method:
141 self.datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:223, in hook_iterator.<locals>.wrap_next(*args, **kwargs)
221 result = next_func(*args, **kwargs)
222 else:
--> 223 result = next_func(*args, **kwargs)
224 datapipe._number_of_samples_yielded += 1
225 return result
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/datapipe.py:383, in _IterDataPipeSerializationWrapper.__next__(self)
381 def __next__(self) -> T_co: # type: ignore[type-var]
382 assert self._datapipe_iter is not None
--> 383 return next(self._datapipe_iter)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:125, in MapperIterDataPipe.__iter__(self)
124 def __iter__(self) -> Iterator[T_co]:
--> 125 for data in self.datapipe:
126 yield self._apply_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:126, in MapperIterDataPipe.__iter__(self)
124 def __iter__(self) -> Iterator[T_co]:
125 for data in self.datapipe:
--> 126 yield self._apply_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:91, in MapperIterDataPipe._apply_fn(self, data)
89 def _apply_fn(self, data):
90 if self.input_col is None and self.output_col is None:
---> 91 return self.fn(data)
93 if self.input_col is None:
94 res = self.fn(data)
File ~/regression_test/dgl/python/dgl/graphbolt/minibatch_transformer.py:38, in MiniBatchTransformer._transformer(self, minibatch)
37 def _transformer(self, minibatch):
---> 38 minibatch = self.transformer(minibatch)
39 assert isinstance(
40 minibatch, (MiniBatch,)
41 ), "The transformer output should be an instance of MiniBatch"
42 return minibatch
File ~/regression_test/dgl/python/dgl/graphbolt/subgraph_sampler.py:46, in SubgraphSampler._sample(self, minibatch)
39 def _sample(self, minibatch):
40 if minibatch.node_pairs is not None:
41 (
42 seeds,
43 minibatch.compacted_node_pairs,
44 minibatch.compacted_negative_srcs,
45 minibatch.compacted_negative_dsts,
---> 46 ) = self._node_pairs_preprocess(minibatch)
47 elif minibatch.seed_nodes is not None:
48 seeds = minibatch.seed_nodes
File ~/regression_test/dgl/python/dgl/graphbolt/subgraph_sampler.py:122, in SubgraphSampler._node_pairs_preprocess(self, minibatch)
120 nodes.append(neg_dst.view(-1))
121 # Unique and compact the collected nodes.
--> 122 seeds, compacted = unique_and_compact(nodes)
123 # Map back in same order as collect.
124 compacted_node_pairs = tuple(compacted[:2])
File ~/regression_test/dgl/python/dgl/graphbolt/internal/sample_utils.py:61, in unique_and_compact(nodes)
59 return unique, compacted
60 else:
---> 61 return unique_and_compact_per_type(nodes)
File ~/regression_test/dgl/python/dgl/graphbolt/internal/sample_utils.py:47, in unique_and_compact.<locals>.unique_and_compact_per_type(nodes)
45 nodes = torch.cat(nodes)
46 empty_tensor = nodes.new_empty(0)
---> 47 unique, compacted, _ = torch.ops.graphbolt.unique_and_compact(
48 nodes, empty_tensor, empty_tensor
49 )
50 compacted = compacted.split(nums)
51 return unique, list(compacted)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/_ops.py:1061, in OpOverloadPacket.__call__(self_, *args, **kwargs)
1059 if self_._has_torchbind_op_overload and _must_dispatch_in_python(args, kwargs):
1060 return _call_overload_packet_from_python(self_, args, kwargs)
-> 1061 return self_._op(*args, **(kwargs or {}))
RuntimeError: graphbolt::unique_and_compact() is missing value for argument '_3'. Declaration: graphbolt::unique_and_compact(Tensor _0, Tensor _1, Tensor _2, int _3, int _4) -> (Tensor _0, Tensor _1, Tensor _2, Tensor _3)
This exception is thrown by __iter__ of NeighborSampler(datapipe=UniformNegativeSampler, deduplicate=True, fanouts=[tensor([5]), tensor([5]), tensor([5])], graph=FusedCSCSamplingGraph(csc_indptr=tensor([ 0, 3, 6, ..., 10548, 10552, 10556]),
indices=tensor([ 633, 1862, 2582, ..., 598, 1473, 2706]),
num_nodes=2708, num_edges=10556, node_attributes={}, edge_attributes={}), prob_name=None, replace=False)
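The final `RuntimeError` above says the compiled `graphbolt::unique_and_compact` operator is declared with five arguments, while the Python wrapper in `sample_utils.py` passes three. One plausible reading (an assumption, not something the log confirms) is that the Python sources and the compiled library come from different DGL builds. A minimal sketch for checking which package versions are installed and where each would be imported from is shown below; `describe_package` is a hypothetical helper written for this illustration, not part of DGL.

```python
import importlib.metadata as metadata
import importlib.util


def describe_package(name):
    """Return (version, origin) for a top-level package.

    Either element is None when the information is unavailable.
    This helper is illustrative only and is not part of DGL.
    """
    try:
        # Version recorded by the installer (pip/conda) metadata.
        version = metadata.version(name)
    except metadata.PackageNotFoundError:
        version = None
    # Locate the module on the import path without importing it.
    spec = importlib.util.find_spec(name)
    origin = spec.origin if spec is not None else None
    return version, origin


for name in ("dgl", "torch", "torchdata"):
    version, origin = describe_package(name)
    print(f"{name}: version={version}, imported from={origin}")
```

If the reported origin of `dgl` points to a source checkout while the installed version refers to a separately installed wheel, the two may be out of sync; reinstalling so that both come from the same build is one thing to try.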
Evaluating Performance with Link Prediction
[9]:
model.eval()

datapipe = gb.ItemSampler(test_set, batch_size=256, shuffle=False)
# Since we need to use all neighborhoods for evaluation, we set the fanout
# to -1.
datapipe = datapipe.sample_neighbor(graph, [-1, -1])
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.copy_to(device)
eval_dataloader = gb.DataLoader(datapipe, num_workers=0)

logits = []
labels = []
for step, data in tqdm.tqdm(enumerate(eval_dataloader)):
    # Get node pairs with labels for loss calculation.
    compacted_pairs, label = data.node_pairs_with_labels

    # The features of sampled nodes.
    x = data.node_features["feat"]

    # Forward.
    y = model(data.blocks, x)
    # Score each pair from the element-wise product of its two node embeddings.
    logit = (
        model.predictor(y[compacted_pairs[0]] * y[compacted_pairs[1]])
        .squeeze()
        .detach()
    )

    logits.append(logit)
    labels.append(label)

logits = torch.cat(logits, dim=0)
labels = torch.cat(labels, dim=0)

# Compute the AUROC score.
from sklearn.metrics import roc_auc_score

auc = roc_auc_score(labels.cpu(), logits.cpu())
print("Link Prediction AUC:", auc)
0it [00:00, ?it/s]
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[9], line 13
11 logits = []
12 labels = []
---> 13 for step, data in tqdm.tqdm(enumerate(eval_dataloader)):
14 # Get node pairs with labels for loss calculation.
15 compacted_pairs, label = data.node_pairs_with_labels
17 # The features of sampled nodes.
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/tqdm/std.py:1181, in tqdm.__iter__(self)
1178 time = self._time
1180 try:
-> 1181 for obj in iterable:
1182 yield obj
1183 # Update and possibly print the progressbar.
1184 # Note: does not call self.update(1) for speed optimisation.
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:630, in _BaseDataLoaderIter.__next__(self)
627 if self._sampler_iter is None:
628 # TODO(https://github.com/pytorch/pytorch/issues/76750)
629 self._reset() # type: ignore[call-arg]
--> 630 data = self._next_data()
631 self._num_yielded += 1
632 if self._dataset_kind == _DatasetKind.Iterable and \
633 self._IterableDataset_len_called is not None and \
634 self._num_yielded > self._IterableDataset_len_called:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:673, in _SingleProcessDataLoaderIter._next_data(self)
671 def _next_data(self):
672 index = self._next_index() # may raise StopIteration
--> 673 data = self._dataset_fetcher.fetch(index) # may raise StopIteration
674 if self._pin_memory:
675 data = _utils.pin_memory.pin_memory(data, self._pin_memory_device)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py:42, in _IterableDatasetFetcher.fetch(self, possibly_batched_index)
40 raise StopIteration
41 else:
---> 42 data = next(self.dataset_iter)
43 return self.collate_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:151, in hook_iterator.<locals>.IteratorDecorator.__next__(self)
149 return self._get_next()
150 else: # Decided against using `contextlib.nullcontext` for performance reasons
--> 151 return self._get_next()
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:139, in hook_iterator.<locals>.IteratorDecorator._get_next(self)
137 """Return next with logic related to iterator validity, profiler, and incrementation of samples yielded."""
138 _check_iterator_valid(self.datapipe, self.iterator_id)
--> 139 result = next(self.iterator)
140 if not self.self_and_has_next_method:
141 self.datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:223, in hook_iterator.<locals>.wrap_next(*args, **kwargs)
221 result = next_func(*args, **kwargs)
222 else:
--> 223 result = next_func(*args, **kwargs)
224 datapipe._number_of_samples_yielded += 1
225 return result
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/datapipe.py:383, in _IterDataPipeSerializationWrapper.__next__(self)
381 def __next__(self) -> T_co: # type: ignore[type-var]
382 assert self._datapipe_iter is not None
--> 383 return next(self._datapipe_iter)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File ~/regression_test/dgl/python/dgl/graphbolt/base.py:162, in CopyTo.__iter__(self)
161 def __iter__(self):
--> 162 for data in self.datapipe:
163 data = recursive_apply(data, apply_to, self.device)
164 if self.extra_attrs is not None:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py:103, in PrefetcherIterDataPipe.__iter__(self)
101 if isinstance(data, (StopIteration, communication.iter.TerminateRequired)):
102 break
--> 103 raise data
104 yield data
105 else:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py:73, in PrefetcherIterDataPipe.thread_worker(prefetch_data)
71 if len(prefetch_data.prefetch_buffer) < prefetch_data.buffer_size:
72 try:
---> 73 item = next(itr)
74 prefetch_data.prefetch_buffer.append(item)
75 except Exception as e:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:125, in MapperIterDataPipe.__iter__(self)
124 def __iter__(self) -> Iterator[T_co]:
--> 125 for data in self.datapipe:
126 yield self._apply_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File ~/regression_test/dgl/python/dgl/graphbolt/dataloader.py:64, in MultiprocessingWrapper.__iter__(self)
63 def __iter__(self):
---> 64 yield from self.dataloader
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:630, in _BaseDataLoaderIter.__next__(self)
627 if self._sampler_iter is None:
628 # TODO(https://github.com/pytorch/pytorch/issues/76750)
629 self._reset() # type: ignore[call-arg]
--> 630 data = self._next_data()
631 self._num_yielded += 1
632 if self._dataset_kind == _DatasetKind.Iterable and \
633 self._IterableDataset_len_called is not None and \
634 self._num_yielded > self._IterableDataset_len_called:
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/dataloader.py:673, in _SingleProcessDataLoaderIter._next_data(self)
671 def _next_data(self):
672 index = self._next_index() # may raise StopIteration
--> 673 data = self._dataset_fetcher.fetch(index) # may raise StopIteration
674 if self._pin_memory:
675 data = _utils.pin_memory.pin_memory(data, self._pin_memory_device)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py:42, in _IterableDatasetFetcher.fetch(self, possibly_batched_index)
40 raise StopIteration
41 else:
---> 42 data = next(self.dataset_iter)
43 return self.collate_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:151, in hook_iterator.<locals>.IteratorDecorator.__next__(self)
149 return self._get_next()
150 else: # Decided against using `contextlib.nullcontext` for performance reasons
--> 151 return self._get_next()
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:139, in hook_iterator.<locals>.IteratorDecorator._get_next(self)
137 """Return next with logic related to iterator validity, profiler, and incrementation of samples yielded."""
138 _check_iterator_valid(self.datapipe, self.iterator_id)
--> 139 result = next(self.iterator)
140 if not self.self_and_has_next_method:
141 self.datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:223, in hook_iterator.<locals>.wrap_next(*args, **kwargs)
221 result = next_func(*args, **kwargs)
222 else:
--> 223 result = next_func(*args, **kwargs)
224 datapipe._number_of_samples_yielded += 1
225 return result
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/datapipe.py:383, in _IterDataPipeSerializationWrapper.__next__(self)
381 def __next__(self) -> T_co: # type: ignore[type-var]
382 assert self._datapipe_iter is not None
--> 383 return next(self._datapipe_iter)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py:180, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
178 response = gen.send(None)
179 else:
--> 180 response = gen.send(None)
182 while True:
183 datapipe._number_of_samples_yielded += 1
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:126, in MapperIterDataPipe.__iter__(self)
124 def __iter__(self) -> Iterator[T_co]:
125 for data in self.datapipe:
--> 126 yield self._apply_fn(data)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/callable.py:91, in MapperIterDataPipe._apply_fn(self, data)
89 def _apply_fn(self, data):
90 if self.input_col is None and self.output_col is None:
---> 91 return self.fn(data)
93 if self.input_col is None:
94 res = self.fn(data)
File ~/regression_test/dgl/python/dgl/graphbolt/minibatch_transformer.py:38, in MiniBatchTransformer._transformer(self, minibatch)
37 def _transformer(self, minibatch):
---> 38 minibatch = self.transformer(minibatch)
39 assert isinstance(
40 minibatch, (MiniBatch,)
41 ), "The transformer output should be an instance of MiniBatch"
42 return minibatch
File ~/regression_test/dgl/python/dgl/graphbolt/subgraph_sampler.py:46, in SubgraphSampler._sample(self, minibatch)
39 def _sample(self, minibatch):
40 if minibatch.node_pairs is not None:
41 (
42 seeds,
43 minibatch.compacted_node_pairs,
44 minibatch.compacted_negative_srcs,
45 minibatch.compacted_negative_dsts,
---> 46 ) = self._node_pairs_preprocess(minibatch)
47 elif minibatch.seed_nodes is not None:
48 seeds = minibatch.seed_nodes
File ~/regression_test/dgl/python/dgl/graphbolt/subgraph_sampler.py:122, in SubgraphSampler._node_pairs_preprocess(self, minibatch)
120 nodes.append(neg_dst.view(-1))
121 # Unique and compact the collected nodes.
--> 122 seeds, compacted = unique_and_compact(nodes)
123 # Map back in same order as collect.
124 compacted_node_pairs = tuple(compacted[:2])
File ~/regression_test/dgl/python/dgl/graphbolt/internal/sample_utils.py:61, in unique_and_compact(nodes)
59 return unique, compacted
60 else:
---> 61 return unique_and_compact_per_type(nodes)
File ~/regression_test/dgl/python/dgl/graphbolt/internal/sample_utils.py:47, in unique_and_compact.<locals>.unique_and_compact_per_type(nodes)
45 nodes = torch.cat(nodes)
46 empty_tensor = nodes.new_empty(0)
---> 47 unique, compacted, _ = torch.ops.graphbolt.unique_and_compact(
48 nodes, empty_tensor, empty_tensor
49 )
50 compacted = compacted.split(nums)
51 return unique, list(compacted)
File /opt/conda/envs/dgl-dev-cpu/lib/python3.10/site-packages/torch/_ops.py:1061, in OpOverloadPacket.__call__(self_, *args, **kwargs)
1059 if self_._has_torchbind_op_overload and _must_dispatch_in_python(args, kwargs):
1060 return _call_overload_packet_from_python(self_, args, kwargs)
-> 1061 return self_._op(*args, **(kwargs or {}))
RuntimeError: graphbolt::unique_and_compact() is missing value for argument '_3'. Declaration: graphbolt::unique_and_compact(Tensor _0, Tensor _1, Tensor _2, int _3, int _4) -> (Tensor _0, Tensor _1, Tensor _2, Tensor _3)
This exception is thrown by __iter__ of NeighborSampler(datapipe=ShardingFilterIterDataPipe, deduplicate=True, fanouts=[tensor([-1]), tensor([-1])], graph=FusedCSCSamplingGraph(csc_indptr=tensor([ 0, 3, 6, ..., 10548, 10552, 10556]),
indices=tensor([ 633, 1862, 2582, ..., 598, 1473, 2706]),
num_nodes=2708, num_edges=10556, node_attributes={}, edge_attributes={}), prob_name=None, replace=False)
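The evaluation cell scores each candidate pair and summarizes the result with AUROC. As a rough illustration of what that metric measures, the sketch below computes it from scratch as the fraction of (positive, negative) pairs in which the positive example receives the higher score, with ties counting as half. For binary labels this is the same quantity that `roc_auc_score` reports. The helper `pairwise_auroc` and the toy labels and scores are made up for this example and are not part of DGL or scikit-learn.

```python
def pairwise_auroc(labels, scores):
    """AUROC as the probability that a positive outranks a negative.

    Illustrative helper only; quadratic in the number of examples,
    so it is meant for small inputs, not for real evaluation runs.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUROC needs at least one positive and one negative")

    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0  # positive edge ranked above negative edge
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(pos) * len(neg))


# Toy data: 1 marks an existing edge, 0 a sampled non-edge.
toy_labels = [1, 1, 0, 0, 1, 0]
toy_scores = [2.0, 0.5, 0.1, 0.7, 1.2, -0.3]
print("Toy AUROC:", pairwise_auroc(toy_labels, toy_scores))
```

In the toy data, 8 of the 9 positive-negative pairs are ordered correctly; the only miss is the positive scored 0.5 against the negative scored 0.7. A value of 0.5 would correspond to random ranking and 1.0 to a perfect one.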
Conclusion
In this tutorial, you have learned how to train a multi-layer GraphSAGE for link prediction with neighbor sampling.