"""GraphBolt OnDiskDataset."""importosfromcopyimportdeepcopyfromtypingimportDict,List,Unionimportpandasaspdimporttorchimportyamlimportdglfrom...baseimportdgl_warningfrom...data.utilsimportdownload,extract_archivefrom..baseimportetype_str_to_tuplefrom..datasetimportDataset,Taskfrom..internalimportcopy_or_convert_data,get_attributes,read_datafrom..itemsetimportItemSet,ItemSetDictfrom..sampling_graphimportSamplingGraphfrom.fused_csc_sampling_graphimportfrom_dglgraph,FusedCSCSamplingGraphfrom.ondisk_metadataimport(OnDiskGraphTopology,OnDiskMetaData,OnDiskTaskData,OnDiskTVTSet,)from.torch_based_feature_storeimportTorchBasedFeatureStore__all__=["OnDiskDataset","preprocess_ondisk_dataset","BuiltinDataset"]defpreprocess_ondisk_dataset(dataset_dir:str,include_original_edge_id:bool=False)->str:"""Preprocess the on-disk dataset. Parse the input config file, load the data, and save the data in the format that GraphBolt supports. Parameters ---------- dataset_dir : str The path to the dataset directory. include_original_edge_id : bool, optional Whether to include the original edge id in the FusedCSCSamplingGraph. Returns ------- output_config_path : str The path to the output config file. """# Check if the dataset path is valid.ifnotos.path.exists(dataset_dir):raiseRuntimeError(f"Invalid dataset path: {dataset_dir}")# Check if the dataset_dir is a directory.ifnotos.path.isdir(dataset_dir):raiseRuntimeError(f"The dataset must be a directory. But got {dataset_dir}")# 0. Check if the dataset is already preprocessed.preprocess_metadata_path=os.path.join("preprocessed","metadata.yaml")ifos.path.exists(os.path.join(dataset_dir,preprocess_metadata_path)):print("The dataset is already preprocessed.")returnos.path.join(dataset_dir,preprocess_metadata_path)print("Start to preprocess the on-disk dataset.")processed_dir_prefix="preprocessed"# Check if the metadata.yaml exists.metadata_file_path=os.path.join(dataset_dir,"metadata.yaml")ifnotos.path.exists(metadata_file_path):raiseRuntimeError("metadata.yaml does not exist.")# Read the input config.withopen(metadata_file_path,"r")asf:input_config=yaml.safe_load(f)# 1. Make `processed_dir_abs` directory if it does not exist.os.makedirs(os.path.join(dataset_dir,processed_dir_prefix),exist_ok=True)output_config=deepcopy(input_config)# 2. Load the edge data and create a DGLGraph.if"graph"notininput_config:raiseRuntimeError("Invalid config: does not contain graph field.")is_homogeneous="type"notininput_config["graph"]["nodes"][0]ifis_homogeneous:# Homogeneous graph.num_nodes=input_config["graph"]["nodes"][0]["num"]edge_data=pd.read_csv(os.path.join(dataset_dir,input_config["graph"]["edges"][0]["path"]),names=["src","dst"],)src,dst=edge_data["src"].to_numpy(),edge_data["dst"].to_numpy()g=dgl.graph((src,dst),num_nodes=num_nodes)else:# Heterogeneous graph.# Construct the num nodes dict.num_nodes_dict={}fornode_infoininput_config["graph"]["nodes"]:num_nodes_dict[node_info["type"]]=node_info["num"]# Construct the data dict.data_dict={}foredge_infoininput_config["graph"]["edges"]:edge_data=pd.read_csv(os.path.join(dataset_dir,edge_info["path"]),names=["src","dst"],)src=torch.tensor(edge_data["src"])dst=torch.tensor(edge_data["dst"])data_dict[etype_str_to_tuple(edge_info["type"])]=(src,dst)# Construct the heterograph.g=dgl.heterograph(data_dict,num_nodes_dict)# 3. 

    # 3. Load the sampling related node/edge features and add them to
    # the sampling-graph.
    if input_config["graph"].get("feature_data", None):
        for graph_feature in input_config["graph"]["feature_data"]:
            in_memory = (
                True
                if "in_memory" not in graph_feature
                else graph_feature["in_memory"]
            )
            if graph_feature["domain"] == "node":
                node_data = read_data(
                    os.path.join(dataset_dir, graph_feature["path"]),
                    graph_feature["format"],
                    in_memory=in_memory,
                )
                g.ndata[graph_feature["name"]] = node_data
            if graph_feature["domain"] == "edge":
                edge_data = read_data(
                    os.path.join(dataset_dir, graph_feature["path"]),
                    graph_feature["format"],
                    in_memory=in_memory,
                )
                g.edata[graph_feature["name"]] = edge_data

    # 4. Convert the DGLGraph to a FusedCSCSamplingGraph.
    fused_csc_sampling_graph = from_dglgraph(
        g, is_homogeneous, include_original_edge_id
    )

    # 5. Save the FusedCSCSamplingGraph and modify the output_config.
    output_config["graph_topology"] = {}
    output_config["graph_topology"]["type"] = "FusedCSCSamplingGraph"
    output_config["graph_topology"]["path"] = os.path.join(
        processed_dir_prefix, "fused_csc_sampling_graph.pt"
    )
    torch.save(
        fused_csc_sampling_graph,
        os.path.join(
            dataset_dir,
            output_config["graph_topology"]["path"],
        ),
    )
    del output_config["graph"]

    # 6. Load the node/edge features and do necessary conversion.
    if input_config.get("feature_data", None):
        for feature, out_feature in zip(
            input_config["feature_data"], output_config["feature_data"]
        ):
            # Always save the feature in numpy format.
            out_feature["format"] = "numpy"
            out_feature["path"] = os.path.join(
                processed_dir_prefix, feature["path"].replace("pt", "npy")
            )
            in_memory = (
                True if "in_memory" not in feature else feature["in_memory"]
            )
            copy_or_convert_data(
                os.path.join(dataset_dir, feature["path"]),
                os.path.join(dataset_dir, out_feature["path"]),
                feature["format"],
                output_format=out_feature["format"],
                in_memory=in_memory,
                is_feature=True,
            )

    # 7. Save tasks and train/val/test split according to the output_config.
    if input_config.get("tasks", None):
        for input_task, output_task in zip(
            input_config["tasks"], output_config["tasks"]
        ):
            for set_name in ["train_set", "validation_set", "test_set"]:
                if set_name not in input_task:
                    continue
                for input_set_per_type, output_set_per_type in zip(
                    input_task[set_name], output_task[set_name]
                ):
                    for input_data, output_data in zip(
                        input_set_per_type["data"],
                        output_set_per_type["data"],
                    ):
                        # Always save the feature in numpy format.
                        output_data["format"] = "numpy"
                        output_data["path"] = os.path.join(
                            processed_dir_prefix,
                            input_data["path"].replace("pt", "npy"),
                        )
                        copy_or_convert_data(
                            os.path.join(dataset_dir, input_data["path"]),
                            os.path.join(dataset_dir, output_data["path"]),
                            input_data["format"],
                            output_data["format"],
                        )

    # 8. Save the output_config.
    output_config_path = os.path.join(dataset_dir, preprocess_metadata_path)
    with open(output_config_path, "w") as f:
        yaml.dump(output_config, f)
    print("Finish preprocessing the on-disk dataset.")

    # 9. Return the absolute path of the preprocessing yaml file.
    return output_config_path


class OnDiskTask:
    """An on-disk task.

    An on-disk task is for ``OnDiskDataset``. It contains the metadata and
    the train/val/test sets.
    """

    def __init__(
        self,
        metadata: Dict,
        train_set: Union[ItemSet, ItemSetDict],
        validation_set: Union[ItemSet, ItemSetDict],
        test_set: Union[ItemSet, ItemSetDict],
    ):
        """Initialize a task.

        Parameters
        ----------
        metadata : Dict
            Metadata.
        train_set : Union[ItemSet, ItemSetDict]
            Training set.
        validation_set : Union[ItemSet, ItemSetDict]
            Validation set.
        test_set : Union[ItemSet, ItemSetDict]
            Test set.
        """
"""self._metadata=metadataself._train_set=train_setself._validation_set=validation_setself._test_set=test_set@propertydefmetadata(self)->Dict:"""Return the task metadata."""returnself._metadata@propertydeftrain_set(self)->Union[ItemSet,ItemSetDict]:"""Return the training set."""returnself._train_set@propertydefvalidation_set(self)->Union[ItemSet,ItemSetDict]:"""Return the validation set."""returnself._validation_set@propertydeftest_set(self)->Union[ItemSet,ItemSetDict]:"""Return the test set."""returnself._test_setdef__repr__(self)->str:return_ondisk_task_str(self)


class OnDiskDataset(Dataset):
    """An on-disk dataset which reads graph topology, feature data and
    Train/Validation/Test set from disk.

    Due to limited resources, the data which are too large to fit into RAM
    will remain on disk while others reside in RAM once ``OnDiskDataset``
    is initialized. This behavior can be controlled by the user via the
    ``in_memory`` field in the YAML file. All paths in the YAML file are
    relative to the dataset directory.

    A full example of the YAML file is as follows:

    .. code-block:: yaml

        dataset_name: graphbolt_test
        graph:
          nodes:
            - type: paper # could be omitted for homogeneous graph.
              num: 1000
            - type: author
              num: 1000
          edges:
            - type: author:writes:paper # could be omitted for homogeneous graph.
              format: csv # Can be csv only.
              path: edge_data/author-writes-paper.csv
            - type: paper:cites:paper
              format: csv
              path: edge_data/paper-cites-paper.csv
        feature_data:
          - domain: node
            type: paper # could be omitted for homogeneous graph.
            name: feat
            format: numpy
            in_memory: false # If not specified, default to true.
            path: node_data/paper-feat.npy
          - domain: edge
            type: "author:writes:paper"
            name: feat
            format: numpy
            in_memory: false
            path: edge_data/author-writes-paper-feat.npy
        tasks:
          - name: "edge_classification"
            num_classes: 10
            train_set:
              - type: paper # could be omitted for homogeneous graph.
                data: # multiple data sources could be specified.
                  - name: node_pairs
                    format: numpy # Can be numpy or torch.
                    in_memory: true # If not specified, default to true.
                    path: set/paper-train-node_pairs.npy
                  - name: labels
                    format: numpy
                    path: set/paper-train-labels.npy
            validation_set:
              - type: paper
                data:
                  - name: node_pairs
                    format: numpy
                    path: set/paper-validation-node_pairs.npy
                  - name: labels
                    format: numpy
                    path: set/paper-validation-labels.npy
            test_set:
              - type: paper
                data:
                  - name: node_pairs
                    format: numpy
                    path: set/paper-test-node_pairs.npy
                  - name: labels
                    format: numpy
                    path: set/paper-test-labels.npy

    Parameters
    ----------
    path: str
        The YAML file path.
    include_original_edge_id: bool, optional
        Whether to include the original edge id in the FusedCSCSamplingGraph.
    """

    def __init__(
        self, path: str, include_original_edge_id: bool = False
    ) -> None:
        # Always call the preprocess function first. If already preprocessed,
        # the function will return the original path directly.
        self._dataset_dir = path
        yaml_path = preprocess_ondisk_dataset(path, include_original_edge_id)
        with open(yaml_path) as f:
            self._yaml_data = yaml.load(f, Loader=yaml.loader.SafeLoader)
        self._loaded = False

    def _convert_yaml_path_to_absolute_path(self):
        """Convert the paths in the YAML file to absolute paths."""
        if "graph_topology" in self._yaml_data:
            self._yaml_data["graph_topology"]["path"] = os.path.join(
                self._dataset_dir, self._yaml_data["graph_topology"]["path"]
            )
        if "feature_data" in self._yaml_data:
            for feature in self._yaml_data["feature_data"]:
                feature["path"] = os.path.join(
                    self._dataset_dir, feature["path"]
                )
        if "tasks" in self._yaml_data:
            for task in self._yaml_data["tasks"]:
                for set_name in ["train_set", "validation_set", "test_set"]:
                    if set_name not in task:
                        continue
                    for set_per_type in task[set_name]:
                        for data in set_per_type["data"]:
                            data["path"] = os.path.join(
                                self._dataset_dir, data["path"]
                            )
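
    # For illustration (hypothetical values): with ``self._dataset_dir`` set
    # to ``datasets/my_dataset``, a relative YAML entry such as
    # ``path: preprocessed/fused_csc_sampling_graph.pt`` is rewritten above
    # to ``datasets/my_dataset/preprocessed/fused_csc_sampling_graph.pt``.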

    def load(self):
        """Load the dataset."""
        self._convert_yaml_path_to_absolute_path()
        self._meta = OnDiskMetaData(**self._yaml_data)
        self._dataset_name = self._meta.dataset_name
        self._graph = self._load_graph(self._meta.graph_topology)
        self._feature = TorchBasedFeatureStore(self._meta.feature_data)
        self._tasks = self._init_tasks(self._meta.tasks)
        self._all_nodes_set = self._init_all_nodes_set(self._graph)
        self._loaded = True
        return self
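
    # A minimal usage sketch (hypothetical path). ``load()`` must be called
    # before accessing the properties below; otherwise ``_check_loaded``
    # raises an ``AssertionError``:
    #
    #     dataset = OnDiskDataset("datasets/my_dataset").load()
    #     graph = dataset.graph
    #     train_set = dataset.tasks[0].train_set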

    @property
    def yaml_data(self) -> Dict:
        """Return the YAML data."""
        return self._yaml_data

    @property
    def tasks(self) -> List[Task]:
        """Return the tasks."""
        self._check_loaded()
        return self._tasks

    @property
    def graph(self) -> SamplingGraph:
        """Return the graph."""
        self._check_loaded()
        return self._graph

    @property
    def feature(self) -> TorchBasedFeatureStore:
        """Return the feature."""
        self._check_loaded()
        return self._feature

    @property
    def dataset_name(self) -> str:
        """Return the dataset name."""
        self._check_loaded()
        return self._dataset_name

    @property
    def all_nodes_set(self) -> Union[ItemSet, ItemSetDict]:
        """Return the itemset containing all nodes."""
        self._check_loaded()
        return self._all_nodes_set

    def _init_tasks(self, tasks: List[OnDiskTaskData]) -> List[OnDiskTask]:
        """Initialize the tasks."""
        ret = []
        if tasks is None:
            return ret
        for task in tasks:
            ret.append(
                OnDiskTask(
                    task.extra_fields,
                    self._init_tvt_set(task.train_set),
                    self._init_tvt_set(task.validation_set),
                    self._init_tvt_set(task.test_set),
                )
            )
        return ret

    def _check_loaded(self):
        assert self._loaded, (
            "Please ensure that you have called the OnDiskDataset.load() "
            "method to properly load the data."
        )

    def _load_graph(
        self, graph_topology: OnDiskGraphTopology
    ) -> FusedCSCSamplingGraph:
        """Load the graph topology."""
        if graph_topology is None:
            return None
        if graph_topology.type == "FusedCSCSamplingGraph":
            return torch.load(graph_topology.path)
        raise NotImplementedError(
            f"Graph topology type {graph_topology.type} is not supported."
        )

    def _init_tvt_set(
        self, tvt_set: List[OnDiskTVTSet]
    ) -> Union[ItemSet, ItemSetDict]:
        """Initialize the TVT set."""
        ret = None
        if (tvt_set is None) or (len(tvt_set) == 0):
            return ret
        if tvt_set[0].type is None:
            assert (
                len(tvt_set) == 1
            ), "Only one TVT set is allowed if type is not specified."
            ret = ItemSet(
                tuple(
                    read_data(data.path, data.format, data.in_memory)
                    for data in tvt_set[0].data
                ),
                names=tuple(data.name for data in tvt_set[0].data),
            )
        else:
            data = {}
            for tvt in tvt_set:
                data[tvt.type] = ItemSet(
                    tuple(
                        read_data(data.path, data.format, data.in_memory)
                        for data in tvt.data
                    ),
                    names=tuple(data.name for data in tvt.data),
                )
            ret = ItemSetDict(data)
        return ret

    def _init_all_nodes_set(self, graph) -> Union[ItemSet, ItemSetDict]:
        if graph is None:
            dgl_warning(
                "`all_node_set` is returned as None, since graph is None."
            )
            return None
        num_nodes = graph.num_nodes
        if isinstance(num_nodes, int):
            return ItemSet(num_nodes, names="seed_nodes")
        else:
            data = {
                node_type: ItemSet(num_node, names="seed_nodes")
                for node_type, num_node in num_nodes.items()
            }
            return ItemSetDict(data)
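
# Note on ``OnDiskDataset._init_all_nodes_set``: for a homogeneous graph,
# ``graph.num_nodes`` is an ``int`` and a single ``ItemSet`` is returned; for
# a heterogeneous graph it is a dict mapping node type to node count, e.g.
# (hypothetical numbers) ``{"paper": 1000, "author": 1000}``, which yields an
# ``ItemSetDict`` with one ``ItemSet`` per node type, each named "seed_nodes".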


class BuiltinDataset(OnDiskDataset):
    """A utility class to download a built-in dataset from AWS S3 and load
    it as a :class:`OnDiskDataset`.

    Available built-in datasets include:

    **cora**
        The cora dataset is a homogeneous citation network dataset, which is
        designed for the node classification task.

    **ogbn-mag**
        The ogbn-mag dataset is a heterogeneous network composed of a subset
        of the Microsoft Academic Graph (MAG). See more details in
        `ogbn-mag <https://ogb.stanford.edu/docs/nodeprop/#ogbn-mag>`_.

        .. note::
            Reverse edges are added to the original graph and duplicated
            edges are removed.

    **ogbl-citation2**
        The ogbl-citation2 dataset is a directed graph, representing the
        citation network between a subset of papers extracted from MAG.
        See more details in
        `ogbl-citation2 <https://ogb.stanford.edu/docs/linkprop/#ogbl-citation2>`_.

        .. note::
            Reverse edges are added to the original graph and duplicated
            edges are removed.

    **ogbn-arxiv**
        The ogbn-arxiv dataset is a directed graph, representing the citation
        network between all Computer Science (CS) arXiv papers indexed by
        MAG. See more details in
        `ogbn-arxiv <https://ogb.stanford.edu/docs/nodeprop/#ogbn-arxiv>`_.

        .. note::
            Reverse edges are added to the original graph and duplicated
            edges are removed.

    **ogbn-products**
        The ogbn-products dataset is an undirected and unweighted graph,
        representing an Amazon product co-purchasing network. See more
        details in
        `ogbn-products <https://ogb.stanford.edu/docs/nodeprop/#ogbn-products>`_.

        .. note::
            Reverse edges are added to the original graph.
            Node features are stored as float32.

    **ogb-lsc-mag240m**
        The ogb-lsc-mag240m dataset is a heterogeneous academic graph
        extracted from the Microsoft Academic Graph (MAG). See more details
        in `ogb-lsc-mag240m <https://ogb.stanford.edu/docs/lsc/mag240m/>`_.

        .. note::
            Reverse edges are added to the original graph.

    Parameters
    ----------
    name : str
        The name of the builtin dataset.
    root : str, optional
        The root directory of the dataset. Defaults to ``datasets``.
    """

    # For datasets smaller than 30GB, we use the base URL.
    # Otherwise, we use the accelerated URL.
    _base_url = "https://data.dgl.ai/dataset/graphbolt/"
    _accelerated_url = (
        "https://dgl-data.s3-accelerate.amazonaws.com/dataset/graphbolt/"
    )
    _datasets = [
        "cora",
        "ogbn-mag",
        "ogbl-citation2",
        "ogbn-products",
        "ogbn-arxiv",
    ]
    _large_datasets = ["ogb-lsc-mag240m"]
    _all_datasets = _datasets + _large_datasets

    def __init__(self, name: str, root: str = "datasets") -> None:
        dataset_dir = os.path.join(root, name)
        if not os.path.exists(dataset_dir):
            if name not in self._all_datasets:
                raise RuntimeError(
                    f"Dataset {name} is not available. Available datasets "
                    f"are {self._all_datasets}."
                )
            url = (
                self._accelerated_url
                if name in self._large_datasets
                else self._base_url
            )
            url += name + ".zip"
            os.makedirs(root, exist_ok=True)
            zip_file_path = os.path.join(root, name + ".zip")
            download(url, path=zip_file_path)
            extract_archive(zip_file_path, root, overwrite=True)
            os.remove(zip_file_path)
        super().__init__(dataset_dir)
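
# Example usage of ``BuiltinDataset`` (a minimal sketch): the archive is
# downloaded from S3 and extracted on first use, then cached under ``root``:
#
#     dataset = BuiltinDataset("ogbn-arxiv", root="datasets").load()
#     # Prints the ``dataset_name`` field from the downloaded metadata.yaml.
#     print(dataset.dataset_name)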