Source code for dgl.distributed.graph_partition_book
"""Define graph partition book."""importpicklefromabcimportABCimportnumpyasnpfrom..importbackendasF,utilsfrom.._ffi.ndarrayimportempty_shared_memfrom..baseimportDGLErrorfrom..ndarrayimportexist_shared_mem_arrayfrom..partitionimportNDArrayPartitionfrom.constantsimportDEFAULT_ETYPE,DEFAULT_NTYPEfrom.id_mapimportIdMapfrom.shared_mem_utilsimport(_get_edata_path,_get_ndata_path,_to_shared_mem,DTYPE_DICT,)CANONICAL_ETYPE_DELIMITER=":"def_etype_tuple_to_str(c_etype):"""Convert canonical etype from tuple to string. Examples -------- >>> c_etype = ('user', 'like', 'item') >>> c_etype_str = _etype_tuple_to_str(c_etype) >>> print(c_etype_str) 'user:like:item' """assertisinstance(c_etype,tuple)andlen(c_etype)==3,("Passed-in canonical etype should be in format of (str, str, str). "f"But got {c_etype}.")returnCANONICAL_ETYPE_DELIMITER.join(c_etype)def_etype_str_to_tuple(c_etype):"""Convert canonical etype from tuple to string. Examples -------- >>> c_etype_str = 'user:like:item' >>> c_etype = _etype_str_to_tuple(c_etype_str) >>> print(c_etype) ('user', 'like', 'item') """ret=tuple(c_etype.split(CANONICAL_ETYPE_DELIMITER))assertlen(ret)==3,("Passed-in canonical etype should be in format of 'str:str:str'. "f"But got {c_etype}.")returnretdef_move_metadata_to_shared_mem(graph_name,num_nodes,num_edges,part_id,num_partitions,node_map,edge_map,is_range_part,):"""Move all metadata of the partition book to the shared memory. These metadata will be used to construct graph partition book. Parameters ---------- graph_name : str The name of the graph num_nodes : int The total number of nodes num_edges : int The total number of edges part_id : int The partition ID. num_partitions : int The number of physical partitions generated for the graph. node_map : Tensor It stores the mapping information from node IDs to partitions. With range partitioning, the tensor stores the serialized result of partition ranges. edge_map : Tensor It stores the mapping information from edge IDs to partitions. With range partitioning, the tensor stores the serialized result of partition ranges. is_range_part : bool Indicate that we use a range partition. This is important for us to deserialize data in node_map and edge_map. Returns ------- (Tensor, Tensor, Tensor) The first tensor stores the serialized metadata, the second tensor stores the serialized node map and the third tensor stores the serialized edge map. All tensors are stored in shared memory. """meta=_to_shared_mem(F.tensor([int(is_range_part),num_nodes,num_edges,num_partitions,part_id,len(node_map),len(edge_map),]),_get_ndata_path(graph_name,"meta"),)node_map=_to_shared_mem(node_map,_get_ndata_path(graph_name,"node_map"))edge_map=_to_shared_mem(edge_map,_get_edata_path(graph_name,"edge_map"))returnmeta,node_map,edge_mapdef_get_shared_mem_metadata(graph_name):"""Get the metadata of the graph from shared memory. The server serializes the metadata of a graph and store them in shared memory. The client needs to deserialize the data in shared memory and get the metadata of the graph. Parameters ---------- graph_name : str The name of the graph. We can use the graph name to find the shared memory name. Returns ------- (bool, int, int, Tensor, Tensor) The first element indicates whether it is range partitioning; the second element is the partition ID; the third element is the number of partitions; the fourth element is the tensor that stores the serialized result of node maps; the fifth element is the tensor that stores the serialized result of edge maps. 
"""# The metadata has 7 elements: is_range_part, num_nodes, num_edges, num_partitions, part_id,# the length of node map and the length of the edge map.shape=(7,)dtype=F.int64dtype=DTYPE_DICT[dtype]data=empty_shared_mem(_get_ndata_path(graph_name,"meta"),False,shape,dtype)dlpack=data.to_dlpack()meta=F.asnumpy(F.zerocopy_from_dlpack(dlpack))(is_range_part,_,_,num_partitions,part_id,node_map_len,edge_map_len,)=meta# Load node mapdata=empty_shared_mem(_get_ndata_path(graph_name,"node_map"),False,(node_map_len,),dtype)dlpack=data.to_dlpack()node_map=F.zerocopy_from_dlpack(dlpack)# Load edge_mapdata=empty_shared_mem(_get_edata_path(graph_name,"edge_map"),False,(edge_map_len,),dtype)dlpack=data.to_dlpack()edge_map=F.zerocopy_from_dlpack(dlpack)returnis_range_part,part_id,num_partitions,node_map,edge_mapdefget_shared_mem_partition_book(graph_name):"""Get a graph partition book from shared memory. A graph partition book of a specific graph can be serialized to shared memory. We can reconstruct a graph partition book from shared memory. Parameters ---------- graph_name : str The name of the graph. Returns ------- GraphPartitionBook A graph partition book for a particular partition. """ifnotexist_shared_mem_array(_get_ndata_path(graph_name,"meta")):returnNone(is_range_part,part_id,num_parts,node_map_data,edge_map_data,)=_get_shared_mem_metadata(graph_name)ifis_range_part==1:# node ID ranges and edge ID ranges are stored in the order of node type IDs# and edge type IDs.node_map={}ntypes={}# node_map_data and edge_map_data were serialized with pickle and converted into# a list of bytes and then stored in a numpy array before being placed in shared# memory. To deserialize, we need to reverse the process.node_map_data=pickle.loads(bytes(F.asnumpy(node_map_data).tolist()))fori,(ntype,nid_range)inenumerate(node_map_data):ntypes[ntype]=inode_map[ntype]=nid_rangeedge_map={}etypes={}edge_map_data=pickle.loads(bytes(F.asnumpy(edge_map_data).tolist()))fori,(etype,eid_range)inenumerate(edge_map_data):etypes[etype]=iedge_map[etype]=eid_rangereturnRangePartitionBook(part_id,num_parts,node_map,edge_map,ntypes,etypes)else:raiseTypeError("Only RangePartitionBook is supported currently.")defget_node_partition_from_book(book,device):"""Get an NDArrayPartition of the nodes from a RangePartitionBook. Parameters ---------- book : RangePartitionBook The partition book to extract the node partition from. device : Device context object. The location to node partition is to be used. Returns ------- NDarrayPartition The NDArrayPartition object for the nodes in the graph. """assertisinstance(book,RangePartitionBook),("Can only convert ""RangePartitionBook to NDArrayPartition.")# create prefix-sum array on hostmax_node_ids=F.zerocopy_from_numpy(book._max_node_ids)cpu_range=F.cat([F.tensor([0],dtype=F.dtype(max_node_ids)),max_node_ids+1],dim=0)gpu_range=F.copy_to(cpu_range,ctx=device)# convert from numpyarray_size=int(F.as_scalar(cpu_range[-1]))num_parts=book.num_partitions()returnNDArrayPartition(array_size,num_parts,mode="range",part_ranges=gpu_range)
class GraphPartitionBook(ABC):
    """The base class of the graph partition book.

    For distributed training, a graph is partitioned into multiple parts and
    is loaded on multiple machines. The partition book contains all necessary
    information to locate nodes and edges in the cluster.

    The partition book contains various partition information, including

    * the number of partitions,
    * the partition ID that a node or edge belongs to,
    * the node IDs and the edge IDs that a partition has,
    * the local IDs of nodes and edges in a partition.

    Currently, ``RangePartitionBook`` is the only class that implements
    ``GraphPartitionBook``. It calculates the mapping between node/edge IDs
    and partition IDs based on some small metadata because nodes/edges have
    been relabeled so that IDs in the same partition fall in a contiguous ID
    range.

    A graph partition book is constructed automatically when a graph is
    partitioned. When a graph partition is loaded, a graph partition book is
    loaded as well. Please see
    :py:meth:`~dgl.distributed.partition.partition_graph`,
    :py:meth:`~dgl.distributed.partition.load_partition` and
    :py:meth:`~dgl.distributed.partition.load_partition_book` for more
    details.
    """
    def shared_memory(self, graph_name):
        """Move the partition book to shared memory.

        Parameters
        ----------
        graph_name : str
            The graph name. This name will be used to read the partition book
            from shared memory in another process.
        """
    def num_partitions(self):
        """Return the number of partitions.

        Returns
        -------
        int
            number of partitions
        """
    def metadata(self):
        """Return the partition meta data.

        The meta data includes:

        * The machine ID.
        * Number of nodes and edges of each partition.

        Examples
        --------
        >>> print(g.get_partition_book().metadata())
        >>> [{'machine_id' : 0, 'num_nodes' : 3000, 'num_edges' : 5000},
        ...  {'machine_id' : 1, 'num_nodes' : 2000, 'num_edges' : 4888},
        ...  ...]

        Returns
        -------
        list[dict[str, any]]
            Meta data of each partition.
        """
    def nid2partid(self, nids, ntype):
        """From global node IDs to partition IDs.

        Parameters
        ----------
        nids : tensor
            global node IDs
        ntype : str
            The node type

        Returns
        -------
        tensor
            partition IDs
        """
    def eid2partid(self, eids, etype):
        """From global edge IDs to partition IDs.

        Parameters
        ----------
        eids : tensor
            global edge IDs
        etype : str or (str, str, str)
            The edge type

        Returns
        -------
        tensor
            partition IDs
        """
    def partid2nids(self, partid, ntype):
        """From partition ID to global node IDs.

        Parameters
        ----------
        partid : int
            partition ID
        ntype : str
            The node type

        Returns
        -------
        tensor
            node IDs
        """
    def partid2eids(self, partid, etype):
        """From partition ID to global edge IDs.

        Parameters
        ----------
        partid : int
            partition ID
        etype : str or (str, str, str)
            The edge type

        Returns
        -------
        tensor
            edge IDs
        """
    def nid2localnid(self, nids, partid, ntype):
        """Get local node IDs within the given partition.

        Parameters
        ----------
        nids : tensor
            global node IDs
        partid : int
            partition ID
        ntype : str
            The node type

        Returns
        -------
        tensor
            local node IDs
        """
    def eid2localeid(self, eids, partid, etype):
        """Get the local edge IDs within the given partition.

        Parameters
        ----------
        eids : tensor
            global edge IDs
        partid : int
            partition ID
        etype : str or (str, str, str)
            The edge type

        Returns
        -------
        tensor
            local edge IDs
        """
    @property
    def partid(self):
        """Get the current partition ID.

        Returns
        -------
        int
            The partition ID of the current machine
        """

    @property
    def ntypes(self):
        """Get the list of node types."""

    @property
    def etypes(self):
        """Get the list of edge types."""

    @property
    def canonical_etypes(self):
        """Get the list of canonical edge types.

        Returns
        -------
        list[(str, str, str)]
            A list of canonical etypes
        """

    def to_canonical_etype(self, etype):
        """Convert an edge type to the corresponding canonical edge type.

        Parameters
        ----------
        etype : str or (str, str, str)
            The edge type

        Returns
        -------
        (str, str, str)
            The corresponding canonical edge type
        """

    @property
    def is_homogeneous(self):
        """Check whether the graph is homogeneous."""
        return not (len(self.etypes) > 1 or len(self.ntypes) > 1)
    def map_to_per_ntype(self, ids):
        """Map homogeneous node IDs to type-wise IDs and node types.

        Parameters
        ----------
        ids : tensor
            Homogeneous node IDs.

        Returns
        -------
        (tensor, tensor)
            node type IDs and type-wise node IDs.
        """
    def map_to_per_etype(self, ids):
        """Map homogeneous edge IDs to type-wise IDs and edge types.

        Parameters
        ----------
        ids : tensor
            Homogeneous edge IDs.

        Returns
        -------
        (tensor, tensor)
            edge type IDs and type-wise edge IDs.
        """
    def map_to_homo_nid(self, ids, ntype):
        """Map type-wise node IDs and type IDs to homogeneous node IDs.

        Parameters
        ----------
        ids : tensor
            Type-wise node IDs
        ntype : str
            node type

        Returns
        -------
        Tensor
            Homogeneous node IDs.
        """
    def map_to_homo_eid(self, ids, etype):
        """Map type-wise edge IDs and type IDs to homogeneous edge IDs.

        Parameters
        ----------
        ids : tensor
            Type-wise edge IDs
        etype : str or (str, str, str)
            The edge type

        Returns
        -------
        Tensor
            Homogeneous edge IDs.
        """
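

# --- Illustrative sketch (not part of the original module) -------------------
# A minimal usage sketch of the interface above, assuming ``gpb`` is a loaded
# partition book (e.g. from ``dgl.distributed.load_partition_book``) for a
# homogeneous graph, so the default node type applies, and ``nids`` is a
# tensor of global node IDs. The function name is hypothetical; the exact
# tensor types returned depend on the backend in use.
def _example_partition_book_lookups(gpb, nids):
    part_ids = gpb.nid2partid(nids)  # partition that owns each node
    local_nids = gpb.nid2localnid(nids, gpb.partid)  # IDs local to this partition
    ntype_ids, per_type_nids = gpb.map_to_per_ntype(nids)  # split by node type
    owned = gpb.partid2nids(gpb.partid)  # all global node IDs owned locally
    return part_ids, local_nids, ntype_ids, per_type_nids, owned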
class RangePartitionBook(GraphPartitionBook):
    """A partition book that supports more efficient storage of partition
    information.

    This partition book is used if the nodes and edges of a graph partition
    are assigned contiguous IDs. It uses a very small amount of memory to
    store the partition information.

    Parameters
    ----------
    part_id : int
        Partition ID of the current partition book.
    num_parts : int
        Number of total partitions.
    node_map : dict[str, Tensor]
        Global node ID ranges within partitions for each node type. The key
        is the node type name as a string. The value is a tensor of shape
        :math:`(K, 2)`, where :math:`K` is the number of partitions. Each row
        has two integers: the starting and the ending IDs for a particular
        node type in a partition. For example, all nodes of type ``"T"`` in
        partition ``i`` have IDs in the range ``node_map["T"][i][0]`` to
        ``node_map["T"][i][1]``.
    edge_map : dict[(str, str, str), Tensor]
        Global edge ID ranges within partitions for each edge type. The key
        is the canonical edge type. The value is a tensor of shape
        :math:`(K, 2)`, where :math:`K` is the number of partitions. Each row
        has two integers: the starting and the ending IDs for a particular
        edge type in a partition. For example, all edges of type ``"T"`` in
        partition ``i`` have IDs in the range ``edge_map["T"][i][0]`` to
        ``edge_map["T"][i][1]``.
    ntypes : dict[str, int]
        Map ntype strings to ntype IDs.
    etypes : dict[(str, str, str), int]
        Map canonical etypes to etype IDs.
    """

    def __init__(self, part_id, num_parts, node_map, edge_map, ntypes, etypes):
        assert part_id >= 0, "part_id cannot be a negative number."
        assert num_parts > 0, "num_parts must be greater than zero."
        self._partid = part_id
        self._num_partitions = num_parts
        self._ntypes = [None] * len(ntypes)
        self._etypes = [None] * len(etypes)
        self._canonical_etypes = [None] * len(etypes)
        # Map etypes to canonical ones.
        self._etype2canonical = {}
        for ntype in ntypes:
            ntype_id = ntypes[ntype]
            self._ntypes[ntype_id] = ntype
        assert all(
            ntype is not None for ntype in self._ntypes
        ), "The node types have invalid IDs."
        for c_etype, etype_id in etypes.items():
            assert isinstance(c_etype, tuple) and len(c_etype) == 3, (
                "Expect canonical edge type in a triplet of string, but got "
                f"{c_etype}."
            )
            etype = c_etype[1]
            self._etypes[etype_id] = etype
            self._canonical_etypes[etype_id] = c_etype
            if etype in self._etype2canonical:
                # If one etype maps to multiple canonical etypes, an empty
                # tuple is used to indicate such ambiguity caused by the
                # etype. See more details in self.to_canonical_etype().
                self._etype2canonical[etype] = tuple()
            else:
                self._etype2canonical[etype] = c_etype
        assert all(
            etype is not None for etype in self._etypes
        ), "The edge types have invalid IDs."

        # This stores the node ID ranges for each node type in each
        # partition. The key is the node type, the value is a NumPy matrix
        # with two columns, in which each row indicates the start and the end
        # of the node ID range in a partition.
        # The node IDs are global node IDs in the homogeneous representation.
        self._typed_nid_range = {}
        # This stores the node ID map for per-node-type IDs in each
        # partition. The key is the node type, the value is a NumPy vector
        # which indicates the last node ID in a partition.
        self._typed_max_node_ids = {}
        max_node_map = np.zeros((num_parts,), dtype=np.int64)
        for key in node_map:
            assert key in ntypes, "Unexpected ntype: {}.".format(key)
            if not isinstance(node_map[key], np.ndarray):
                node_map[key] = F.asnumpy(node_map[key])
            assert node_map[key].shape == (num_parts, 2)
            self._typed_nid_range[key] = node_map[key]
            # This is used for per-node-type lookup.
            self._typed_max_node_ids[key] = np.cumsum(
                self._typed_nid_range[key][:, 1]
                - self._typed_nid_range[key][:, 0]
            )
            # This is used for homogeneous node ID lookup.
            max_node_map = np.maximum(
                self._typed_nid_range[key][:, 1], max_node_map
            )
        # This is a vector that indicates the last node ID in each partition.
        # The ID is the global ID in the homogeneous representation.
        self._max_node_ids = max_node_map

        # Similar to _typed_nid_range.
        self._typed_eid_range = {}
        # Similar to _typed_max_node_ids.
        self._typed_max_edge_ids = {}
        max_edge_map = np.zeros((num_parts,), dtype=np.int64)
        for key in edge_map:
            assert key in etypes, "Unexpected etype: {}.".format(key)
            if not isinstance(edge_map[key], np.ndarray):
                edge_map[key] = F.asnumpy(edge_map[key])
            assert edge_map[key].shape == (num_parts, 2)
            self._typed_eid_range[key] = edge_map[key]
            # This is used for per-edge-type lookup.
            self._typed_max_edge_ids[key] = np.cumsum(
                self._typed_eid_range[key][:, 1]
                - self._typed_eid_range[key][:, 0]
            )
            # This is used for homogeneous edge ID lookup.
            max_edge_map = np.maximum(
                self._typed_eid_range[key][:, 1], max_edge_map
            )
        # Similar to _max_node_ids.
        self._max_edge_ids = max_edge_map

        # These two are map functions that map node/edge IDs to node/edge
        # type IDs.
        self._nid_map = IdMap(self._typed_nid_range)
        self._eid_map = IdMap(self._typed_eid_range)

        # Local node/edge type offsets that map the local homogenized
        # node/edge IDs to local heterogenized node/edge IDs.
        # One can do the mapping by binary search on these arrays.
        self._local_ntype_offset = np.cumsum(
            [0]
            + [
                v[self._partid, 1] - v[self._partid, 0]
                for v in self._typed_nid_range.values()
            ]
        ).tolist()
        self._local_etype_offset = np.cumsum(
            [0]
            + [
                v[self._partid, 1] - v[self._partid, 0]
                for v in self._typed_eid_range.values()
            ]
        ).tolist()

        # Get the metadata of the partition book.
        self._partition_meta_data = []
        for partid in range(self._num_partitions):
            nrange_start = max_node_map[partid - 1] if partid > 0 else 0
            nrange_end = max_node_map[partid]
            num_nodes = nrange_end - nrange_start

            erange_start = max_edge_map[partid - 1] if partid > 0 else 0
            erange_end = max_edge_map[partid]
            num_edges = erange_end - erange_start

            part_info = {}
            part_info["machine_id"] = partid
            part_info["num_nodes"] = int(num_nodes)
            part_info["num_edges"] = int(num_edges)
            self._partition_meta_data.append(part_info)

    def shared_memory(self, graph_name):
        """Move data to shared memory."""
        # We need to store the nid ranges and eid ranges of different types
        # in the order defined by type IDs.
        nid_range = [None] * len(self.ntypes)
        for i, ntype in enumerate(self.ntypes):
            nid_range[i] = (ntype, self._typed_nid_range[ntype])
        nid_range_pickle = list(pickle.dumps(nid_range))

        eid_range = [None] * len(self.canonical_etypes)
        for i, etype in enumerate(self.canonical_etypes):
            eid_range[i] = (etype, self._typed_eid_range[etype])
        eid_range_pickle = list(pickle.dumps(eid_range))

        self._meta = _move_metadata_to_shared_mem(
            graph_name,
            0,  # We don't need to provide the number of nodes.
            0,  # We don't need to provide the number of edges.
            self._partid,
            self._num_partitions,
            F.tensor(nid_range_pickle),
            F.tensor(eid_range_pickle),
            True,
        )

    def num_partitions(self):
        """Return the number of partitions."""
        return self._num_partitions

    def _num_nodes(self, ntype=DEFAULT_NTYPE):
        """The total number of nodes."""
        if ntype == DEFAULT_NTYPE:
            return int(self._max_node_ids[-1])
        else:
            return int(self._typed_max_node_ids[ntype][-1])

    def _num_edges(self, etype=DEFAULT_ETYPE):
        """The total number of edges."""
        if etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]):
            return int(self._max_edge_ids[-1])
        else:
            c_etype = self.to_canonical_etype(etype)
            return int(self._typed_max_edge_ids[c_etype][-1])

    def metadata(self):
        """Return the partition meta data."""
        return self._partition_meta_data

    def map_to_per_ntype(self, ids):
        """Map global homogeneous node IDs to node type IDs.

        Returns
            type_ids, per_type_ids
        """
        return self._nid_map(ids)

    def map_to_per_etype(self, ids):
        """Map global homogeneous edge IDs to edge type IDs.
        Returns
            type_ids, per_type_ids
        """
        return self._eid_map(ids)

    def map_to_homo_nid(self, ids, ntype):
        """Map per-node-type IDs to global node IDs in the homogeneous
        format."""
        ids = utils.toindex(ids).tousertensor()
        partids = self.nid2partid(ids, ntype)
        typed_max_nids = F.zerocopy_from_numpy(self._typed_max_node_ids[ntype])
        end_diff = F.gather_row(typed_max_nids, partids) - ids
        typed_nid_range = F.zerocopy_from_numpy(
            self._typed_nid_range[ntype][:, 1]
        )
        return F.gather_row(typed_nid_range, partids) - end_diff

    def map_to_homo_eid(self, ids, etype):
        """Map per-edge-type IDs to global edge IDs in the homogeneous
        format."""
        ids = utils.toindex(ids).tousertensor()
        c_etype = self.to_canonical_etype(etype)
        partids = self.eid2partid(ids, c_etype)
        typed_max_eids = F.zerocopy_from_numpy(
            self._typed_max_edge_ids[c_etype]
        )
        end_diff = F.gather_row(typed_max_eids, partids) - ids
        typed_eid_range = F.zerocopy_from_numpy(
            self._typed_eid_range[c_etype][:, 1]
        )
        return F.gather_row(typed_eid_range, partids) - end_diff

    def nid2partid(self, nids, ntype=DEFAULT_NTYPE):
        """From global node IDs to partition IDs."""
        nids = utils.toindex(nids)
        if ntype == DEFAULT_NTYPE:
            ret = np.searchsorted(
                self._max_node_ids, nids.tonumpy(), side="right"
            )
        else:
            ret = np.searchsorted(
                self._typed_max_node_ids[ntype], nids.tonumpy(), side="right"
            )
        ret = utils.toindex(ret)
        return ret.tousertensor()

    def eid2partid(self, eids, etype=DEFAULT_ETYPE):
        """From global edge IDs to partition IDs."""
        eids = utils.toindex(eids)
        if etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]):
            ret = np.searchsorted(
                self._max_edge_ids, eids.tonumpy(), side="right"
            )
        else:
            c_etype = self.to_canonical_etype(etype)
            ret = np.searchsorted(
                self._typed_max_edge_ids[c_etype],
                eids.tonumpy(),
                side="right",
            )
        ret = utils.toindex(ret)
        return ret.tousertensor()

    def partid2nids(self, partid, ntype=DEFAULT_NTYPE):
        """From partition ID to global node IDs."""
        # TODO: do we need to cache it?
        if ntype == DEFAULT_NTYPE:
            start = self._max_node_ids[partid - 1] if partid > 0 else 0
            end = self._max_node_ids[partid]
            return F.arange(start, end)
        else:
            start = (
                self._typed_max_node_ids[ntype][partid - 1]
                if partid > 0
                else 0
            )
            end = self._typed_max_node_ids[ntype][partid]
            return F.arange(start, end)

    def partid2eids(self, partid, etype=DEFAULT_ETYPE):
        """From partition ID to global edge IDs."""
        # TODO: do we need to cache it?
        if etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]):
            start = self._max_edge_ids[partid - 1] if partid > 0 else 0
            end = self._max_edge_ids[partid]
            return F.arange(start, end)
        else:
            c_etype = self.to_canonical_etype(etype)
            start = (
                self._typed_max_edge_ids[c_etype][partid - 1]
                if partid > 0
                else 0
            )
            end = self._typed_max_edge_ids[c_etype][partid]
            return F.arange(start, end)

    def nid2localnid(self, nids, partid, ntype=DEFAULT_NTYPE):
        """Get local node IDs within the given partition."""
        if partid != self._partid:
            raise RuntimeError(
                "Now RangePartitionBook does not support "
                "getting remote tensor of nid2localnid."
            )
        nids = utils.toindex(nids)
        nids = nids.tousertensor()
        if ntype == DEFAULT_NTYPE:
            start = self._max_node_ids[partid - 1] if partid > 0 else 0
        else:
            start = (
                self._typed_max_node_ids[ntype][partid - 1]
                if partid > 0
                else 0
            )
        return nids - int(start)

    def eid2localeid(self, eids, partid, etype=DEFAULT_ETYPE):
        """Get the local edge IDs within the given partition."""
        if partid != self._partid:
            raise RuntimeError(
                "Now RangePartitionBook does not support "
                "getting remote tensor of eid2localeid."
            )
        eids = utils.toindex(eids)
        eids = eids.tousertensor()
        if etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]):
            start = self._max_edge_ids[partid - 1] if partid > 0 else 0
        else:
            c_etype = self.to_canonical_etype(etype)
            start = (
                self._typed_max_edge_ids[c_etype][partid - 1]
                if partid > 0
                else 0
            )
        return eids - int(start)

    @property
    def partid(self):
        """Get the current partition ID."""
        return self._partid

    @property
    def ntypes(self):
        """Get the list of node types."""
the list of node types"""returnself._ntypes@propertydefetypes(self):"""Get the list of edge types"""returnself._etypes@propertydefcanonical_etypes(self):"""Get the list of canonical edge types Returns ------- list[(str, str, str)] or list[None] A list of canonical etypes. If keys of ``edge_map`` and ``etypes`` are strings, a list of ``None`` is returned as canonical etypes are not available. """returnself._canonical_etypes@propertydeflocal_ntype_offset(self):"""Get the node type offset array of the local partition. The i-th element indicates the starting position of the i-th node type. """returnself._local_ntype_offset@propertydeflocal_etype_offset(self):"""Get the edge type offset array of the local partition. The i-th element indicates the starting position of the i-th edge type. """returnself._local_etype_offsetdefto_canonical_etype(self,etype):"""Convert an edge type to the corresponding canonical edge type. Parameters ---------- etype : str or (str, str, str) The edge type Returns ------- (str, str, str) The corresponding canonical edge type """ifisinstance(etype,tuple):ifetypenotinself.canonical_etypes:raiseDGLError('Edge type "{}" does not exist.'.format(etype))returnetyperet=self._etype2canonical.get(etype,None)ifretisNone:raiseDGLError('Edge type "{}" does not exist.'.format(etype))iflen(ret)==0:raiseDGLError('Edge type "%s" is ambiguous. Please use canonical edge type '"in the form of (srctype, etype, dsttype)"%etype)returnretNODE_PART_POLICY="node"EDGE_PART_POLICY="edge"POLICY_DELIMITER="~"
class PartitionPolicy(object):
    """This defines a partition policy for a distributed tensor or
    distributed embedding.

    When DGL shards tensors and stores them in a cluster of machines, it
    requires partition policies that map rows of the tensors to machines in
    the cluster.

    Although an arbitrary partition policy can be defined, DGL currently
    supports two partition policies for mapping nodes and edges to machines.
    To define a partition policy from a graph partition book, users need to
    specify the policy name ('node' or 'edge').

    Parameters
    ----------
    policy_str : str
        Partition policy name, e.g., 'edge~_N:_E:_N' or 'node~_N'.
    partition_book : GraphPartitionBook
        A graph partition book
    """

    def __init__(self, policy_str, partition_book):
        assert policy_str.startswith(
            NODE_PART_POLICY
        ) or policy_str.startswith(EDGE_PART_POLICY), (
            f"policy_str must start with {NODE_PART_POLICY} or "
            f"{EDGE_PART_POLICY}, but got {policy_str}."
        )
        if NODE_PART_POLICY == policy_str:
            policy_str = NODE_PART_POLICY + POLICY_DELIMITER + DEFAULT_NTYPE
        if EDGE_PART_POLICY == policy_str:
            policy_str = EDGE_PART_POLICY + POLICY_DELIMITER + DEFAULT_ETYPE[1]
        self._policy_str = policy_str
        self._part_id = partition_book.partid
        self._partition_book = partition_book
        part_policy, self._type_name = policy_str.split(POLICY_DELIMITER, 1)
        if part_policy == EDGE_PART_POLICY:
            self._type_name = _etype_str_to_tuple(self._type_name)
        self._is_node = self.policy_str.startswith(NODE_PART_POLICY)

    @property
    def policy_str(self):
        """Get the policy name.

        Returns
        -------
        str
            The name of the partition policy.
        """
        return self._policy_str

    @property
    def type_name(self):
        """Get the type name: ntype or etype.

        Returns
        -------
        str or (str, str, str)
            The ntype or etype.
        """
        return self._type_name

    @property
    def part_id(self):
        """Get the partition ID.

        Returns
        -------
        int
            The partition ID
        """
        return self._part_id

    @property
    def partition_book(self):
        """Get the partition book.

        Returns
        -------
        GraphPartitionBook
            The graph partition book
        """
        return self._partition_book

    @property
    def is_node(self):
        """Indicate whether the policy is for nodes or edges.

        Returns
        -------
        bool
            node or edge
        """
        return self._is_node

    def get_data_name(self, name):
        """Get HeteroDataName."""
        return HeteroDataName(self.is_node, self.type_name, name)
    def to_local(self, id_tensor):
        """Map global IDs to local IDs.

        Parameters
        ----------
        id_tensor : tensor
            Global ID tensor

        Returns
        -------
        tensor
            Local ID tensor
        """
        if self.is_node:
            return self._partition_book.nid2localnid(
                id_tensor, self._part_id, self.type_name
            )
        else:
            return self._partition_book.eid2localeid(
                id_tensor, self._part_id, self.type_name
            )
    def to_partid(self, id_tensor):
        """Map global IDs to partition IDs.

        Parameters
        ----------
        id_tensor : tensor
            Global ID tensor

        Returns
        -------
        tensor
            partition IDs
        """
        if self.is_node:
            return self._partition_book.nid2partid(id_tensor, self.type_name)
        else:
            return self._partition_book.eid2partid(id_tensor, self.type_name)
    def get_part_size(self):
        """Get the data size of the current partition.

        Returns
        -------
        int
            data size
        """
        if self.is_node:
            return len(
                self._partition_book.partid2nids(self._part_id, self.type_name)
            )
        else:
            return len(
                self._partition_book.partid2eids(self._part_id, self.type_name)
            )
    def get_size(self):
        """Get the full size of the data.

        Returns
        -------
        int
            data size
        """
        if self.is_node:
            return self._partition_book._num_nodes(self.type_name)
        else:
            return self._partition_book._num_edges(self.type_name)
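

# --- Illustrative sketch (not part of the original module) -------------------
# How a policy string is assembled and interpreted, assuming ``gpb`` is a
# loaded partition book for a homogeneous graph. ``"node~_N"`` shards rows by
# node ID; the edge policy embeds the canonical etype as ``"edge~_N:_E:_N"``.
# The function name is hypothetical.
def _example_partition_policies(gpb):
    node_policy = PartitionPolicy(
        NODE_PART_POLICY + POLICY_DELIMITER + DEFAULT_NTYPE, gpb
    )
    edge_policy = PartitionPolicy(
        EDGE_PART_POLICY + POLICY_DELIMITER + _etype_tuple_to_str(DEFAULT_ETYPE),
        gpb,
    )
    # Rows keyed by global node ID are mapped to machines with ``to_partid``;
    # ``get_part_size`` reports how many rows the local partition holds.
    return node_policy.to_partid(np.array([0, 1, 2])), edge_policy.get_part_size()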
class NodePartitionPolicy(PartitionPolicy):
    """Partition policy for nodes."""

    def __init__(self, partition_book, ntype=DEFAULT_NTYPE):
        super(NodePartitionPolicy, self).__init__(
            NODE_PART_POLICY + POLICY_DELIMITER + ntype, partition_book
        )


class EdgePartitionPolicy(PartitionPolicy):
    """Partition policy for edges."""

    def __init__(self, partition_book, etype=DEFAULT_ETYPE):
        assert (
            isinstance(etype, tuple) and len(etype) == 3
        ), f"Expect canonical edge type in a triplet of string, but got {etype}."
        super(EdgePartitionPolicy, self).__init__(
            EDGE_PART_POLICY + POLICY_DELIMITER + _etype_tuple_to_str(etype),
            partition_book,
        )


class HeteroDataName(object):
    """The data name in a heterogeneous graph.

    A unique data name has three components:

    * whether it is node data or edge data,
    * the node/edge type,
    * the name of the data.

    Parameters
    ----------
    is_node : bool
        Indicate whether it's node data or edge data.
    entity_type : str or (str, str, str)
        The type of the node/edge.
    data_name : str
        The name of the data.
    """

    def __init__(self, is_node, entity_type, data_name):
        self._policy = NODE_PART_POLICY if is_node else EDGE_PART_POLICY
        if not is_node:
            assert isinstance(entity_type, tuple) and len(entity_type) == 3, (
                "Expect canonical edge type in a triplet of string, but got "
                f"{entity_type}."
            )
        self._entity_type = entity_type
        self.data_name = data_name

    @property
    def policy_str(self):
        """Concatenate the policy and the entity type into a string."""
        entity_type = self.get_type()
        if self.is_edge():
            entity_type = _etype_tuple_to_str(entity_type)
        return self._policy + POLICY_DELIMITER + entity_type

    def is_node(self):
        """Is this the name of node data."""
        return self._policy == NODE_PART_POLICY

    def is_edge(self):
        """Is this the name of edge data."""
        return self._policy == EDGE_PART_POLICY

    def get_type(self):
        """The type of the node/edge.

        This is only meaningful in a heterogeneous graph. In a homogeneous
        graph, the type is '_N' for a node and '_N:_E:_N' for an edge.
        """
        return self._entity_type

    def get_name(self):
        """The name of the data."""
        return self.data_name

    def __str__(self):
        """The full name of the data.

        The full name is used as the key in the KVStore.
        """
        return self.policy_str + POLICY_DELIMITER + self.data_name


def parse_hetero_data_name(name):
    """Parse the data name and create a HeteroDataName.

    The data name has a specialized format. We can parse the name to
    determine whether it's node data or edge data, the node/edge type and its
    actual name. The data name has three fields separated by
    ``POLICY_DELIMITER`` ('~').

    Parameters
    ----------
    name : str
        The data name

    Returns
    -------
    HeteroDataName
    """
    names = name.split(POLICY_DELIMITER)
    assert len(names) == 3, "{} is not a valid heterograph data name".format(
        name
    )
    assert names[0] in (
        NODE_PART_POLICY,
        EDGE_PART_POLICY,
    ), "{} is not a valid heterograph data name".format(name)
    is_node = names[0] == NODE_PART_POLICY
    entity_type = names[1]
    if not is_node:
        entity_type = _etype_str_to_tuple(entity_type)
    return HeteroDataName(is_node, entity_type, names[2])
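

# --- Illustrative sketch (not part of the original module) -------------------
# The data-name round trip used by the KVStore: ``str(HeteroDataName(...))``
# renders as ``"<policy>~<type>~<name>"`` and ``parse_hetero_data_name``
# recovers the components. The function name and the example etype/name are
# made up for illustration.
def _example_hetero_data_name_round_trip():
    name = HeteroDataName(False, ("user", "like", "item"), "feat")
    full_name = str(name)  # "edge~user:like:item~feat"
    parsed = parse_hetero_data_name(full_name)
    assert parsed.is_edge()
    assert parsed.get_type() == ("user", "like", "item")
    assert parsed.get_name() == "feat"
    return full_name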