"""GraphBolt OnDiskDataset."""importbisectimportjsonimportosimportshutilimporttextwrapfromcopyimportdeepcopyfromtypingimportDict,List,Unionimportnumpyasnpimporttorchimportyamlfrom..baseimportetype_str_to_tuple,ORIGINAL_EDGE_IDfrom..datasetimportDataset,Taskfrom..internalimport(calculate_dir_hash,check_dataset_change,copy_or_convert_data,read_data,read_edges,)from..internal_utilsimport(download,extract_archive,gb_warning,get_attributes,)from..itemsetimportHeteroItemSet,ItemSetfrom..sampling_graphimportSamplingGraphfrom.fused_csc_sampling_graphimport(fused_csc_sampling_graph,FusedCSCSamplingGraph,)from.ondisk_metadataimport(OnDiskGraphTopology,OnDiskMetaData,OnDiskTaskData,OnDiskTVTSet,)from.torch_based_feature_storeimportTorchBasedFeatureStore__all__=["OnDiskDataset","preprocess_ondisk_dataset","BuiltinDataset"]NAMES_INDICATING_NODE_IDS=["seeds",]def_graph_data_to_fused_csc_sampling_graph(dataset_dir:str,graph_data:Dict,include_original_edge_id:bool,auto_cast_to_optimal_dtype:bool,)->FusedCSCSamplingGraph:"""Convert the raw graph data into FusedCSCSamplingGraph. Parameters ---------- dataset_dir : str The path to the dataset directory. graph_data : Dict The raw data read from yaml file. include_original_edge_id : bool Whether to include the original edge id in the FusedCSCSamplingGraph. auto_cast_to_optimal_dtype: bool, optional Casts the dtypes of tensors in the dataset into smallest possible dtypes for reduced storage requirements and potentially increased performance. Returns ------- sampling_graph : FusedCSCSamplingGraph The FusedCSCSamplingGraph constructed from the raw data. """from...sparseimportspmatrixis_homogeneous=(len(graph_data["nodes"])==1andlen(graph_data["edges"])==1and"type"notingraph_data["nodes"][0]and"type"notingraph_data["edges"][0])ifis_homogeneous:# Homogeneous graph.edge_fmt=graph_data["edges"][0]["format"]edge_path=graph_data["edges"][0]["path"]src,dst=read_edges(dataset_dir,edge_fmt,edge_path)num_nodes=graph_data["nodes"][0]["num"]num_edges=len(src)coo_tensor=torch.tensor(np.array([src,dst]))sparse_matrix=spmatrix(coo_tensor,shape=(num_nodes,num_nodes))delcoo_tensorindptr,indices,edge_ids=sparse_matrix.csc()delsparse_matrixifauto_cast_to_optimal_dtype:ifnum_nodes<=torch.iinfo(torch.int32).max:indices=indices.to(torch.int32)ifnum_edges<=torch.iinfo(torch.int32).max:indptr=indptr.to(torch.int32)edge_ids=edge_ids.to(torch.int32)node_type_offset=Nonetype_per_edge=Nonenode_type_to_id=Noneedge_type_to_id=Nonenode_attributes={}edge_attributes={}ifinclude_original_edge_id:edge_attributes[ORIGINAL_EDGE_ID]=edge_idselse:# Heterogeneous graph.# Sort graph_data by ntype/etype lexicographically to ensure ordering.graph_data["nodes"].sort(key=lambdax:x["type"])graph_data["edges"].sort(key=lambdax:x["type"])# Construct node_type_offset and node_type_to_id.node_type_offset=[0]node_type_to_id={}forntype_id,node_infoinenumerate(graph_data["nodes"]):node_type_to_id[node_info["type"]]=ntype_idnode_type_offset.append(node_type_offset[-1]+node_info["num"])total_num_nodes=node_type_offset[-1]# Construct edge_type_offset, edge_type_to_id and 
        edge_type_offset = [0]
        edge_type_to_id = {}
        coo_src_list = []
        coo_dst_list = []
        coo_etype_list = []
        for etype_id, edge_info in enumerate(graph_data["edges"]):
            edge_type_to_id[edge_info["type"]] = etype_id
            edge_fmt = edge_info["format"]
            edge_path = edge_info["path"]
            src, dst = read_edges(dataset_dir, edge_fmt, edge_path)
            edge_type_offset.append(edge_type_offset[-1] + len(src))
            src_type, _, dst_type = etype_str_to_tuple(edge_info["type"])
            src += node_type_offset[node_type_to_id[src_type]]
            dst += node_type_offset[node_type_to_id[dst_type]]
            coo_src_list.append(torch.tensor(src))
            coo_dst_list.append(torch.tensor(dst))
            coo_etype_list.append(torch.full((len(src),), etype_id))
        total_num_edges = edge_type_offset[-1]

        coo_src = torch.cat(coo_src_list)
        del coo_src_list
        coo_dst = torch.cat(coo_dst_list)
        del coo_dst_list
        if auto_cast_to_optimal_dtype:
            dtypes = [torch.uint8, torch.int16, torch.int32, torch.int64]
            dtype_maxes = [torch.iinfo(dtype).max for dtype in dtypes]
            dtype_id = bisect.bisect_left(
                dtype_maxes, len(edge_type_to_id) - 1
            )
            etype_dtype = dtypes[dtype_id]
            coo_etype_list = [
                tensor.to(etype_dtype) for tensor in coo_etype_list
            ]
        coo_etype = torch.cat(coo_etype_list)
        del coo_etype_list

        sparse_matrix = spmatrix(
            indices=torch.stack((coo_src, coo_dst), dim=0),
            shape=(total_num_nodes, total_num_nodes),
        )
        del coo_src, coo_dst
        indptr, indices, edge_ids = sparse_matrix.csc()
        del sparse_matrix
        if auto_cast_to_optimal_dtype:
            if total_num_nodes <= torch.iinfo(torch.int32).max:
                indices = indices.to(torch.int32)
            if total_num_edges <= torch.iinfo(torch.int32).max:
                indptr = indptr.to(torch.int32)
                edge_ids = edge_ids.to(torch.int32)
        node_type_offset = torch.tensor(node_type_offset, dtype=indices.dtype)
        type_per_edge = torch.index_select(coo_etype, dim=0, index=edge_ids)
        del coo_etype
        node_attributes = {}
        edge_attributes = {}
        if include_original_edge_id:
            # If uint8 or int16 was chosen above for etypes, we cast to int.
            temp_etypes = (
                type_per_edge.int()
                if type_per_edge.element_size() < 4
                else type_per_edge
            )
            edge_ids -= torch.index_select(
                torch.tensor(edge_type_offset, dtype=edge_ids.dtype),
                dim=0,
                index=temp_etypes,
            )
            del temp_etypes
            edge_attributes[ORIGINAL_EDGE_ID] = edge_ids

    # Load the sampling related node/edge features and add them to
    # the sampling-graph.
    if graph_data.get("feature_data", None):
        if is_homogeneous:
            # Homogeneous graph.
            for graph_feature in graph_data["feature_data"]:
                in_memory = (
                    True
                    if "in_memory" not in graph_feature
                    else graph_feature["in_memory"]
                )
                if graph_feature["domain"] == "node":
                    node_data = read_data(
                        os.path.join(dataset_dir, graph_feature["path"]),
                        graph_feature["format"],
                        in_memory=in_memory,
                    )
                    assert node_data.shape[0] == num_nodes
                    node_attributes[graph_feature["name"]] = node_data
                elif graph_feature["domain"] == "edge":
                    edge_data = read_data(
                        os.path.join(dataset_dir, graph_feature["path"]),
                        graph_feature["format"],
                        in_memory=in_memory,
                    )
                    assert edge_data.shape[0] == num_edges
                    edge_attributes[graph_feature["name"]] = edge_data
        else:
            # Heterogeneous graph.
            node_feature_collector = {}
            edge_feature_collector = {}
            for graph_feature in graph_data["feature_data"]:
                in_memory = (
                    True
                    if "in_memory" not in graph_feature
                    else graph_feature["in_memory"]
                )
                if graph_feature["domain"] == "node":
                    node_data = read_data(
                        os.path.join(dataset_dir, graph_feature["path"]),
                        graph_feature["format"],
                        in_memory=in_memory,
                    )
                    if graph_feature["name"] not in node_feature_collector:
                        node_feature_collector[graph_feature["name"]] = {}
                    node_feature_collector[graph_feature["name"]][
                        graph_feature["type"]
                    ] = node_data
                elif graph_feature["domain"] == "edge":
                    edge_data = read_data(
                        os.path.join(dataset_dir, graph_feature["path"]),
                        graph_feature["format"],
                        in_memory=in_memory,
                    )
                    if graph_feature["name"] not in edge_feature_collector:
                        edge_feature_collector[graph_feature["name"]] = {}
                    edge_feature_collector[graph_feature["name"]][
                        graph_feature["type"]
                    ] = edge_data

            # For heterogeneous graphs, a node/edge feature must cover all
            # node/edge types.
            all_node_types = set(node_type_to_id.keys())
            for feat_name, feat_data in node_feature_collector.items():
                existing_node_type = set(feat_data.keys())
                assert all_node_types == existing_node_type, (
                    f"Node feature {feat_name} does not cover all node types. "
                    f"Existing types: {existing_node_type}. "
                    f"Expected types: {all_node_types}."
                )
            all_edge_types = set(edge_type_to_id.keys())
            for feat_name, feat_data in edge_feature_collector.items():
                existing_edge_type = set(feat_data.keys())
                assert all_edge_types == existing_edge_type, (
                    f"Edge feature {feat_name} does not cover all edge types. "
                    f"Existing types: {existing_edge_type}. "
                    f"Expected types: {all_edge_types}."
                )

            for feat_name, feat_data in node_feature_collector.items():
                _feat = next(iter(feat_data.values()))
                feat_tensor = torch.empty(
                    ([total_num_nodes] + list(_feat.shape[1:])),
                    dtype=_feat.dtype,
                )
                for ntype, feat in feat_data.items():
                    feat_tensor[
                        node_type_offset[
                            node_type_to_id[ntype]
                        ] : node_type_offset[node_type_to_id[ntype] + 1]
                    ] = feat
                node_attributes[feat_name] = feat_tensor
            del node_feature_collector
            for feat_name, feat_data in edge_feature_collector.items():
                _feat = next(iter(feat_data.values()))
                feat_tensor = torch.empty(
                    ([total_num_edges] + list(_feat.shape[1:])),
                    dtype=_feat.dtype,
                )
                for etype, feat in feat_data.items():
                    feat_tensor[
                        edge_type_offset[
                            edge_type_to_id[etype]
                        ] : edge_type_offset[edge_type_to_id[etype] + 1]
                    ] = feat
                edge_attributes[feat_name] = feat_tensor
            del edge_feature_collector

    if not bool(node_attributes):
        node_attributes = None
    if not bool(edge_attributes):
        edge_attributes = None

    # Construct the FusedCSCSamplingGraph.
    return fused_csc_sampling_graph(
        csc_indptr=indptr,
        indices=indices,
        node_type_offset=node_type_offset,
        type_per_edge=type_per_edge,
        node_type_to_id=node_type_to_id,
        edge_type_to_id=edge_type_to_id,
        node_attributes=node_attributes,
        edge_attributes=edge_attributes,
    )


def preprocess_ondisk_dataset(
    dataset_dir: str,
    include_original_edge_id: bool = False,
    force_preprocess: bool = None,
    auto_cast_to_optimal_dtype: bool = True,
) -> str:
    """Preprocess the on-disk dataset. Parse the input config file,
    load the data, and save the data in the format that GraphBolt supports.

    Parameters
    ----------
    dataset_dir : str
        The path to the dataset directory.
    include_original_edge_id : bool, optional
        Whether to include the original edge id in the FusedCSCSamplingGraph.
    force_preprocess : bool, optional
        Whether to force reload the ondisk dataset.
    auto_cast_to_optimal_dtype : bool, optional
        Casts the dtypes of tensors in the dataset into smallest possible
        dtypes for reduced storage requirements and potentially increased
        performance. Default is True.

    Returns
    -------
    output_config_path : str
        The path to the output config file.
"""# Check if the dataset path is valid.ifnotos.path.exists(dataset_dir):raiseRuntimeError(f"Invalid dataset path: {dataset_dir}")# Check if the dataset_dir is a directory.ifnotos.path.isdir(dataset_dir):raiseRuntimeError(f"The dataset must be a directory. But got {dataset_dir}")# 0. Check if the dataset is already preprocessed.processed_dir_prefix="preprocessed"preprocess_metadata_path=os.path.join(processed_dir_prefix,"metadata.yaml")ifos.path.exists(os.path.join(dataset_dir,preprocess_metadata_path)):ifforce_preprocessisNone:withopen(os.path.join(dataset_dir,preprocess_metadata_path),"r")asf:preprocess_config=yaml.safe_load(f)if(preprocess_config.get("include_original_edge_id",None)==include_original_edge_id):force_preprocess=check_dataset_change(dataset_dir,processed_dir_prefix)else:force_preprocess=Trueifforce_preprocess:shutil.rmtree(os.path.join(dataset_dir,processed_dir_prefix))print("The on-disk dataset is re-preprocessing, so the existing "+"preprocessed dataset has been removed.")else:print("The dataset is already preprocessed.")returnos.path.join(dataset_dir,preprocess_metadata_path)print("Start to preprocess the on-disk dataset.")# Check if the metadata.yaml exists.metadata_file_path=os.path.join(dataset_dir,"metadata.yaml")ifnotos.path.exists(metadata_file_path):raiseRuntimeError("metadata.yaml does not exist.")# Read the input config.withopen(metadata_file_path,"r")asf:input_config=yaml.safe_load(f)# 1. Make `processed_dir_abs` directory if it does not exist.os.makedirs(os.path.join(dataset_dir,processed_dir_prefix),exist_ok=True)output_config=deepcopy(input_config)# 2. Load the data and create a FusedCSCSamplingGraph.if"graph"notininput_config:raiseRuntimeError("Invalid config: does not contain graph field.")sampling_graph=_graph_data_to_fused_csc_sampling_graph(dataset_dir,input_config["graph"],include_original_edge_id,auto_cast_to_optimal_dtype,)# 3. Record value of include_original_edge_id.output_config["include_original_edge_id"]=include_original_edge_id# 4. Save the FusedCSCSamplingGraph and modify the output_config.output_config["graph_topology"]={}output_config["graph_topology"]["type"]="FusedCSCSamplingGraph"output_config["graph_topology"]["path"]=os.path.join(processed_dir_prefix,"fused_csc_sampling_graph.pt")node_ids_within_int32=(sampling_graph.indices.dtype==torch.int32andauto_cast_to_optimal_dtype)torch.save(sampling_graph,os.path.join(dataset_dir,output_config["graph_topology"]["path"],),)delsampling_graphdeloutput_config["graph"]# 5. Load the node/edge features and do necessary conversion.ifinput_config.get("feature_data",None):has_edge_feature_data=Falseforfeature,out_featureinzip(input_config["feature_data"],output_config["feature_data"]):# Always save the feature in numpy format.out_feature["format"]="numpy"out_feature["path"]=os.path.join(processed_dir_prefix,feature["path"].replace("pt","npy"))in_memory=(Trueif"in_memory"notinfeatureelsefeature["in_memory"])ifnothas_edge_feature_dataandfeature["domain"]=="edge":has_edge_feature_data=Truecopy_or_convert_data(os.path.join(dataset_dir,feature["path"]),os.path.join(dataset_dir,out_feature["path"]),feature["format"],output_format=out_feature["format"],in_memory=in_memory,is_feature=True,)ifhas_edge_feature_dataandnotinclude_original_edge_id:gb_warning("Edge feature is stored, but edge IDs are not saved.")# 6. 
    if input_config.get("tasks", None):
        for input_task, output_task in zip(
            input_config["tasks"], output_config["tasks"]
        ):
            for set_name in ["train_set", "validation_set", "test_set"]:
                if set_name not in input_task:
                    continue
                for input_set_per_type, output_set_per_type in zip(
                    input_task[set_name], output_task[set_name]
                ):
                    for input_data, output_data in zip(
                        input_set_per_type["data"],
                        output_set_per_type["data"],
                    ):
                        # Always save the feature in numpy format.
                        output_data["format"] = "numpy"
                        output_data["path"] = os.path.join(
                            processed_dir_prefix,
                            input_data["path"].replace("pt", "npy"),
                        )
                        name = (
                            input_data["name"]
                            if "name" in input_data
                            else None
                        )
                        copy_or_convert_data(
                            os.path.join(dataset_dir, input_data["path"]),
                            os.path.join(dataset_dir, output_data["path"]),
                            input_data["format"],
                            output_data["format"],
                            within_int32=node_ids_within_int32
                            and name in NAMES_INDICATING_NODE_IDS,
                        )

    # 7. Save the output_config.
    output_config_path = os.path.join(dataset_dir, preprocess_metadata_path)
    with open(output_config_path, "w") as f:
        yaml.dump(output_config, f)
    print("Finish preprocessing the on-disk dataset.")

    # 8. Calculate and save the hash value of the dataset directory.
    hash_value_file = "dataset_hash_value.txt"
    hash_value_file_path = os.path.join(
        dataset_dir, processed_dir_prefix, hash_value_file
    )
    if os.path.exists(hash_value_file_path):
        os.remove(hash_value_file_path)
    dir_hash = calculate_dir_hash(dataset_dir)
    with open(hash_value_file_path, "w") as f:
        f.write(json.dumps(dir_hash, indent=4))

    # 9. Return the absolute path of the preprocessing yaml file.
    return output_config_path


class OnDiskTask:
    """An on-disk task.

    An on-disk task is for ``OnDiskDataset``. It contains the metadata and the
    train/val/test sets.
    """

    def __init__(
        self,
        metadata: Dict,
        train_set: Union[ItemSet, HeteroItemSet],
        validation_set: Union[ItemSet, HeteroItemSet],
        test_set: Union[ItemSet, HeteroItemSet],
    ):
        """Initialize a task.

        Parameters
        ----------
        metadata : Dict
            Metadata.
        train_set : Union[ItemSet, HeteroItemSet]
            Training set.
        validation_set : Union[ItemSet, HeteroItemSet]
            Validation set.
        test_set : Union[ItemSet, HeteroItemSet]
            Test set.
        """
        self._metadata = metadata
        self._train_set = train_set
        self._validation_set = validation_set
        self._test_set = test_set

    @property
    def metadata(self) -> Dict:
        """Return the task metadata."""
        return self._metadata

    @property
    def train_set(self) -> Union[ItemSet, HeteroItemSet]:
        """Return the training set."""
        return self._train_set

    @property
    def validation_set(self) -> Union[ItemSet, HeteroItemSet]:
        """Return the validation set."""
        return self._validation_set

    @property
    def test_set(self) -> Union[ItemSet, HeteroItemSet]:
        """Return the test set."""
        return self._test_set

    def __repr__(self) -> str:
        ret = "{Classname}({attributes})"
        attributes_str = ""
        attributes = get_attributes(self)
        attributes.reverse()
        for attribute in attributes:
            if attribute[0] == "_":
                continue
            value = getattr(self, attribute)
            attributes_str += f"{attribute}={value},\n"
        attributes_str = textwrap.indent(
            attributes_str, " " * len("OnDiskTask(")
        ).strip()
        return ret.format(
            Classname=self.__class__.__name__, attributes=attributes_str
        )
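

# NOTE: A minimal usage sketch for ``preprocess_ondisk_dataset`` (kept as a
# comment so it is not executed on import). The directory "path/to/dataset" is
# a placeholder; it is expected to contain a ``metadata.yaml`` describing the
# graph, features and tasks as documented in ``OnDiskDataset`` below.
#
#     yaml_path = preprocess_ondisk_dataset(
#         "path/to/dataset", include_original_edge_id=True
#     )
#     # The returned path points at ``preprocessed/metadata.yaml`` inside the
#     # dataset directory. Re-running the call reuses the preprocessed data
#     # unless ``force_preprocess=True`` or the directory hash has changed.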


class OnDiskDataset(Dataset):
    """An on-disk dataset which reads graph topology, feature data and
    Train/Validation/Test set from disk.

    Due to limited resources, the data which are too large to fit into RAM
    will remain on disk while others reside in RAM once ``OnDiskDataset`` is
    initialized. This behavior can be controlled by the user via the
    ``in_memory`` field in the YAML file. All paths in the YAML file are
    relative paths to the dataset directory.

    A full example of YAML file is as follows:

    .. code-block:: yaml

        dataset_name: graphbolt_test
        graph:
          nodes:
            - type: paper # could be omitted for homogeneous graph.
              num: 1000
            - type: author
              num: 1000
          edges:
            - type: author:writes:paper # could be omitted for homogeneous graph.
              format: csv # Can be csv only.
              path: edge_data/author-writes-paper.csv
            - type: paper:cites:paper
              format: csv
              path: edge_data/paper-cites-paper.csv
        feature_data:
          - domain: node
            type: paper # could be omitted for homogeneous graph.
            name: feat
            format: numpy
            in_memory: false # If not specified, default to true.
            path: node_data/paper-feat.npy
          - domain: edge
            type: "author:writes:paper"
            name: feat
            format: numpy
            in_memory: false
            path: edge_data/author-writes-paper-feat.npy
        tasks:
          - name: "edge_classification"
            num_classes: 10
            train_set:
              - type: paper # could be omitted for homogeneous graph.
                data: # multiple data sources could be specified.
                  - name: seeds
                    format: numpy # Can be numpy or torch.
                    in_memory: true # If not specified, default to true.
                    path: set/paper-train-seeds.npy
                  - name: labels
                    format: numpy
                    path: set/paper-train-labels.npy
            validation_set:
              - type: paper
                data:
                  - name: seeds
                    format: numpy
                    path: set/paper-validation-seeds.npy
                  - name: labels
                    format: numpy
                    path: set/paper-validation-labels.npy
            test_set:
              - type: paper
                data:
                  - name: seeds
                    format: numpy
                    path: set/paper-test-seeds.npy
                  - name: labels
                    format: numpy
                    path: set/paper-test-labels.npy

    Parameters
    ----------
    path : str
        The YAML file path.
    include_original_edge_id : bool, optional
        Whether to include the original edge id in the FusedCSCSamplingGraph.
    force_preprocess : bool, optional
        Whether to force reload the ondisk dataset.
    auto_cast_to_optimal_dtype : bool, optional
        Casts the dtypes of tensors in the dataset into smallest possible
        dtypes for reduced storage requirements and potentially increased
        performance. Default is True.
    """

    def __init__(
        self,
        path: str,
        include_original_edge_id: bool = False,
        force_preprocess: bool = None,
        auto_cast_to_optimal_dtype: bool = True,
    ) -> None:
        # Always call the preprocess function first. If already preprocessed,
        # the function will return the original path directly.
        self._dataset_dir = path
        yaml_path = preprocess_ondisk_dataset(
            path,
            include_original_edge_id,
            force_preprocess,
            auto_cast_to_optimal_dtype,
        )
        with open(yaml_path) as f:
            self._yaml_data = yaml.load(f, Loader=yaml.loader.SafeLoader)
        self._loaded = False

    def _convert_yaml_path_to_absolute_path(self):
        """Convert the path in YAML file to absolute path."""
        if "graph_topology" in self._yaml_data:
            self._yaml_data["graph_topology"]["path"] = os.path.join(
                self._dataset_dir, self._yaml_data["graph_topology"]["path"]
            )
        if "feature_data" in self._yaml_data:
            for feature in self._yaml_data["feature_data"]:
                feature["path"] = os.path.join(
                    self._dataset_dir, feature["path"]
                )
        if "tasks" in self._yaml_data:
            for task in self._yaml_data["tasks"]:
                for set_name in ["train_set", "validation_set", "test_set"]:
                    if set_name not in task:
                        continue
                    for set_per_type in task[set_name]:
                        for data in set_per_type["data"]:
                            data["path"] = os.path.join(
                                self._dataset_dir, data["path"]
                            )
    def load(self, tasks: List[str] = None):
        """Load the dataset.

        Parameters
        ----------
        tasks : List[str] = None
            The name of the tasks to be loaded. For a single task, both a
            string and a List[str] are accepted. For multiple tasks, only
            List[str] is acceptable.

        Examples
        --------
        1. Loading via single task name "node_classification".

        >>> dataset = gb.OnDiskDataset(base_dir).load(
        ...     tasks="node_classification")
        >>> len(dataset.tasks)
        1
        >>> dataset.tasks[0].metadata["name"]
        "node_classification"

        2. Loading via single task name ["node_classification"].

        >>> dataset = gb.OnDiskDataset(base_dir).load(
        ...     tasks=["node_classification"])
        >>> len(dataset.tasks)
        1
        >>> dataset.tasks[0].metadata["name"]
        "node_classification"

        3. Loading via multiple task names ["node_classification",
        "link_prediction"].

        >>> dataset = gb.OnDiskDataset(base_dir).load(
        ...     tasks=["node_classification", "link_prediction"])
        >>> len(dataset.tasks)
        2
        >>> dataset.tasks[0].metadata["name"]
        "node_classification"
        >>> dataset.tasks[1].metadata["name"]
        "link_prediction"
        """
        self._convert_yaml_path_to_absolute_path()
        self._meta = OnDiskMetaData(**self._yaml_data)
        self._dataset_name = self._meta.dataset_name
        self._graph = self._load_graph(self._meta.graph_topology)
        self._feature = TorchBasedFeatureStore(self._meta.feature_data)
        self._tasks = self._init_tasks(self._meta.tasks, tasks)
        self._all_nodes_set = self._init_all_nodes_set(self._graph)
        self._loaded = True
        return self
    @property
    def yaml_data(self) -> Dict:
        """Return the YAML data."""
        return self._yaml_data

    @property
    def tasks(self) -> List[Task]:
        """Return the tasks."""
        self._check_loaded()
        return self._tasks

    @property
    def graph(self) -> SamplingGraph:
        """Return the graph."""
        self._check_loaded()
        return self._graph

    @property
    def feature(self) -> TorchBasedFeatureStore:
        """Return the feature."""
        self._check_loaded()
        return self._feature

    @property
    def dataset_name(self) -> str:
        """Return the dataset name."""
        self._check_loaded()
        return self._dataset_name

    @property
    def all_nodes_set(self) -> Union[ItemSet, HeteroItemSet]:
        """Return the itemset containing all nodes."""
        self._check_loaded()
        return self._all_nodes_set

    def _init_tasks(
        self, tasks: List[OnDiskTaskData], selected_tasks: List[str]
    ) -> List[OnDiskTask]:
        """Initialize the tasks."""
        if isinstance(selected_tasks, str):
            selected_tasks = [selected_tasks]
        if selected_tasks and not isinstance(selected_tasks, list):
            raise TypeError(
                f"The type of selected_task should be list, but got "
                f"{type(selected_tasks)}"
            )
        ret = []
        if tasks is None:
            return ret
        task_names = set()
        for task in tasks:
            task_name = task.extra_fields.get("name", None)
            if selected_tasks is None or task_name in selected_tasks:
                ret.append(
                    OnDiskTask(
                        task.extra_fields,
                        self._init_tvt_set(task.train_set),
                        self._init_tvt_set(task.validation_set),
                        self._init_tvt_set(task.test_set),
                    )
                )
                if selected_tasks:
                    task_names.add(task_name)
        if selected_tasks:
            not_found_tasks = set(selected_tasks) - task_names
            if len(not_found_tasks):
                gb_warning(
                    f"Below tasks are not found in YAML: {not_found_tasks}. "
                    f"Skipped."
                )
        return ret

    def _check_loaded(self):
        assert self._loaded, (
            "Please ensure that you have called the OnDiskDataset.load() method"
            + " to properly load the data."
        )

    def _load_graph(
        self, graph_topology: OnDiskGraphTopology
    ) -> FusedCSCSamplingGraph:
        """Load the graph topology."""
        if graph_topology is None:
            return None
        if graph_topology.type == "FusedCSCSamplingGraph":
            return torch.load(graph_topology.path)
        raise NotImplementedError(
            f"Graph topology type {graph_topology.type} is not supported."
        )

    def _init_tvt_set(
        self, tvt_set: List[OnDiskTVTSet]
    ) -> Union[ItemSet, HeteroItemSet]:
        """Initialize the TVT set."""
        ret = None
        if (tvt_set is None) or (len(tvt_set) == 0):
            return ret
        if tvt_set[0].type is None:
            assert (
                len(tvt_set) == 1
            ), "Only one TVT set is allowed if type is not specified."
            ret = ItemSet(
                tuple(
                    read_data(data.path, data.format, data.in_memory)
                    for data in tvt_set[0].data
                ),
                names=tuple(data.name for data in tvt_set[0].data),
            )
        else:
            itemsets = {}
            for tvt in tvt_set:
                itemsets[tvt.type] = ItemSet(
                    tuple(
                        read_data(data.path, data.format, data.in_memory)
                        for data in tvt.data
                    ),
                    names=tuple(data.name for data in tvt.data),
                )
            ret = HeteroItemSet(itemsets)
        return ret

    def _init_all_nodes_set(self, graph) -> Union[ItemSet, HeteroItemSet]:
        if graph is None:
            gb_warning(
                "`all_nodes_set` is returned as None, since graph is None."
            )
            return None
        num_nodes = graph.num_nodes
        dtype = graph.indices.dtype
        if isinstance(num_nodes, int):
            return ItemSet(
                torch.tensor(num_nodes, dtype=dtype),
                names="seeds",
            )
        else:
            data = {
                node_type: ItemSet(
                    torch.tensor(num_node, dtype=dtype),
                    names="seeds",
                )
                for node_type, num_node in num_nodes.items()
            }
            return HeteroItemSet(data)
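

# NOTE: A minimal end-to-end sketch of ``OnDiskDataset`` (comment only, not
# executed on import). ``base_dir`` is a placeholder for a directory that
# contains a ``metadata.yaml`` as shown in the class docstring above.
#
#     dataset = OnDiskDataset(base_dir).load(tasks="edge_classification")
#     graph = dataset.graph        # FusedCSCSamplingGraph
#     feature = dataset.feature    # TorchBasedFeatureStore
#     train_set = dataset.tasks[0].train_set
#     all_nodes = dataset.all_nodes_set
#
# Accessing ``graph``, ``feature``, ``tasks`` and similar properties before
# calling ``load()`` raises an assertion error from ``_check_loaded``.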


class BuiltinDataset(OnDiskDataset):
    """A utility class to download built-in dataset from AWS S3 and load it as
    :class:`OnDiskDataset`.

    Available built-in datasets include:

    **cora**
        The cora dataset is a homogeneous citation network dataset, which is
        designed for the node classification task.

    **ogbn-mag**
        The ogbn-mag dataset is a heterogeneous network composed of a subset
        of the Microsoft Academic Graph (MAG). See more details in
        `ogbn-mag <https://ogb.stanford.edu/docs/nodeprop/#ogbn-mag>`_.

        .. note::
            Reverse edges are added to the original graph and duplicated
            edges are removed.

    **ogbl-citation2**
        The ogbl-citation2 dataset is a directed graph, representing the
        citation network between a subset of papers extracted from MAG. See
        more details in
        `ogbl-citation2 <https://ogb.stanford.edu/docs/linkprop/#ogbl-citation2>`_.

        .. note::
            Reverse edges are added to the original graph and duplicated
            edges are removed.

    **ogbn-arxiv**
        The ogbn-arxiv dataset is a directed graph, representing the citation
        network between all Computer Science (CS) arXiv papers indexed by MAG.
        See more details in
        `ogbn-arxiv <https://ogb.stanford.edu/docs/nodeprop/#ogbn-arxiv>`_.

        .. note::
            Reverse edges are added to the original graph and duplicated
            edges are removed.

    **ogbn-papers100M**
        The ogbn-papers100M dataset is a directed graph, representing the
        citation network of papers indexed by MAG. See more details in
        `ogbn-papers100M <https://ogb.stanford.edu/docs/nodeprop/#ogbn-papers100M>`_.

        .. note::
            Reverse edges are added to the original graph and duplicated
            edges are removed.

    **ogbn-products**
        The ogbn-products dataset is an undirected and unweighted graph,
        representing an Amazon product co-purchasing network. See more details
        in `ogbn-products <https://ogb.stanford.edu/docs/nodeprop/#ogbn-products>`_.

        .. note::
            Reverse edges are added to the original graph. Node features are
            stored as float32.

    **ogb-lsc-mag240m**
        The ogb-lsc-mag240m dataset is a heterogeneous academic graph
        extracted from the Microsoft Academic Graph (MAG). See more details in
        `ogb-lsc-mag240m <https://ogb.stanford.edu/docs/lsc/mag240m/>`_.

        .. note::
            Reverse edges are added to the original graph.

    **igb-hom and igb-hom-[tiny|small|medium|large]**
        The igb-hom-[tiny|small|medium|large] and igb-hom datasets are
        homogeneous citation networks, which are designed for developers to
        train and evaluate GNN models with high fidelity. See more details in
        `igb-hom-[tiny|small|medium|large] <https://github.com/IllinoisGraphBenchmark/IGB-Datasets>`_.

        .. note::
            Self edges are added to the original graph. Node features are
            stored as float32.

    **igb-het-[tiny|small|medium]**
        The igb-het-[tiny|small|medium] datasets are heterogeneous citation
        networks, which are designed for developers to train and evaluate GNN
        models with high fidelity. See more details in
        `igb-het-[tiny|small|medium] <https://github.com/IllinoisGraphBenchmark/IGB-Datasets>`_.

        .. note::
            Four reverse edge types are added to the original graph. Node
            features are stored as float32.

    Parameters
    ----------
    name : str
        The name of the builtin dataset.
    root : str, optional
        The root directory of the dataset. Defaults to ``datasets``.
"""# For dataset that is smaller than 30GB, we use the base url.# Otherwise, we use the accelerated url._base_url="https://data.dgl.ai/dataset/graphbolt/"_accelerated_url=("https://dgl-data.s3-accelerate.amazonaws.com/dataset/graphbolt/")_datasets=["cora","cora-seeds","ogbn-mag","ogbn-mag-seeds","ogbl-citation2","ogbl-citation2-seeds","ogbn-products","ogbn-products-seeds","ogbn-arxiv","ogbn-arxiv-seeds","igb-hom-tiny","igb-hom-tiny-seeds","igb-hom-small","igb-hom-small-seeds","igb-het-tiny","igb-het-tiny-seeds","igb-het-small","igb-het-small-seeds",]_large_datasets=["ogb-lsc-mag240m","ogb-lsc-mag240m-seeds","ogbn-papers100M","ogbn-papers100M-seeds","igb-hom-medium","igb-hom-medium-seeds","igb-hom-large","igb-hom-large-seeds","igb-hom","igb-hom-seeds","igb-het-medium","igb-het-medium-seeds",]_all_datasets=_datasets+_large_datasetsdef__init__(self,name:str,root:str="datasets")->OnDiskDataset:# For user using DGL 2.2 or later version, we prefer them to use# datasets with `seeds` suffix. This hack should be removed, when the# datasets with `seeds` suffix have covered previous ones.if"seeds"notinname:name+="-seeds"dataset_dir=os.path.join(root,name)ifnotos.path.exists(dataset_dir):ifnamenotinself._all_datasets:raiseRuntimeError(f"Dataset {name} is not available. Available datasets are "f"{self._all_datasets}.")url=(self._accelerated_urlifnameinself._large_datasetselseself._base_url)url+=name+".zip"os.makedirs(root,exist_ok=True)zip_file_path=os.path.join(root,name+".zip")download(url,path=zip_file_path)extract_archive(zip_file_path,root,overwrite=True)os.remove(zip_file_path)super().__init__(dataset_dir,force_preprocess=False)