# Source code for kedgeswap.Graph

# This file is part of K-edge-swap.
#
#    K-edge-swap is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
#
#    K-edge-swap is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License along with K-edge-swap. If not, see <https://www.gnu.org/licenses/>.
import os
#import ipdb

import numpy as np
import argparse

from pathlib import Path
from collections import defaultdict


class Graph:
    """Read an input graph and store it as adjacency lists.

    Attributes
    ----------
    N: int
        number of nodes
    M: int
        number of edges
    neighbors: dict(list)
        adjacency list of each node
    in_neighbors: dict(list)
        directed graphs only: for each node, its neighbors through
        "in-edges"
    out_neighbors: dict(list)
        directed graphs only: for each node, its neighbors through
        "out-edges"
    edges: dict()
        in an undirected graph:
            for each edge (u, v), the position of v in the adjacency
            list of u (both (u, v) and (v, u) are stored)
        in a directed graph:
            for each edge (u, v), a quadruplet
            (v_idx, u_idx, v_out_idx, u_in_idx), where
            v_idx is the position of v in neighbors[u],
            u_idx is the position of u in neighbors[v],
            v_out_idx is the position of v in out_neighbors[u],
            u_in_idx is the position of u in in_neighbors[v]
    unique_edges: list()
        one entry per edge; for undirected graphs the entry is stored
        as (u, v) with u < v
    directed: bool
        True if the graph is directed
    dataset_name: str or None
        stem of the last file read, None before any file is read
    """

    def __init__(self, directed=False):
        self.N = 0  # number of nodes
        self.M = 0  # number of edges

        # adjacency lists
        self.neighbors = defaultdict(list)
        self.in_neighbors = dict()
        self.out_neighbors = dict()

        # edges and the indexes of their endpoints in the adjacency lists
        self.edges = dict()
        self.unique_edges = list()
        self.directed = directed  # directed graph flag
        self.dataset_name = None

    def copy(self):
        """Return an independent copy of the graph.

        The adjacency lists themselves are duplicated (not only the
        dictionaries holding them), so modifying the copy in place never
        alters the original graph.

        Returns
        -------
        Graph
            a new graph holding the same nodes and edges
        """
        graph_copy = Graph(self.directed)
        graph_copy.N = self.N
        graph_copy.M = self.M
        graph_copy.neighbors = defaultdict(
            list, {node: list(adj) for node, adj in self.neighbors.items()})
        graph_copy.in_neighbors = {
            node: list(adj) for node, adj in self.in_neighbors.items()}
        graph_copy.out_neighbors = {
            node: list(adj) for node, adj in self.out_neighbors.items()}
        # values are ints or tuples (immutable): a shallow copy is enough
        graph_copy.edges = self.edges.copy()
        graph_copy.unique_edges = self.unique_edges.copy()
        graph_copy.dataset_name = self.dataset_name
        return graph_copy

    def _add_edge(self, node_in, node_out):
        """Insert the edge (node_in, node_out) in every data structure.

        Self loops are not checked here; callers filter them beforehand.

        Returns
        -------
        bool
            False when the edge was already stored (nothing is changed),
            True when it has been added.
        """
        if (node_in, node_out) in self.edges:
            # multi-edges are not supported: keep the first occurrence
            return False

        if self.directed:
            self.neighbors[node_in].append(node_out)
            self.neighbors[node_out].append(node_in)
            # in/out adjacency lists are useful for dyads
            for node in (node_in, node_out):
                self.out_neighbors.setdefault(node, [])
                self.in_neighbors.setdefault(node, [])
            self.out_neighbors[node_in].append(node_out)
            self.in_neighbors[node_out].append(node_in)
            # for directed graphs, keep indexes in all adjacency lists
            self.edges[(node_in, node_out)] = (
                len(self.neighbors[node_in]) - 1,
                len(self.neighbors[node_out]) - 1,
                len(self.out_neighbors[node_in]) - 1,
                len(self.in_neighbors[node_out]) - 1)
            self.unique_edges.append((node_in, node_out))
        else:
            # a single canonical version (smallest id first) per edge
            if node_in < node_out:
                self.unique_edges.append((node_in, node_out))
            else:
                self.unique_edges.append((node_out, node_in))
            self.neighbors[node_in].append(node_out)
            self.edges[(node_in, node_out)] = len(self.neighbors[node_in]) - 1
            self.neighbors[node_out].append(node_in)
            self.edges[(node_out, node_in)] = len(self.neighbors[node_out]) - 1
        return True

    def read_ssv(self, in_file):
        """Read space separated values.

        Each line holds an edge, fields being separated by any run of
        spaces or tabulations, e.g.::

            0 1
            3 2
            2 4

        The first column is the source node and the second column is the
        destination node. Additional columns (e.g. weights or timestamps
        of the konect format) are ignored, as well as empty lines and
        lines starting with '%'.

        When self.directed is True the graph is considered directed and
        edges are stored as written in the file, else they are stored as
        (src, dest) with src < dest in unique_edges.
        Self loops and repeated edges are skipped.

        Parameters
        ----------
        in_file: str
            path to the input file

        Raises
        ------
        ValueError
            if a line has fewer than two columns or a node id is not an
            integer
        """
        self.dataset_name = Path(in_file).stem
        with open(in_file, 'r') as fin:
            for line in fin:
                if line.startswith('%'):
                    # skip header lines of konect format
                    continue
                fields = line.split()
                if not fields:
                    # skip empty lines
                    continue
                if len(fields) < 2:
                    raise ValueError(
                        f"expected at least two columns, got: {line!r}")
                node_in, node_out = int(fields[0]), int(fields[1])
                if node_in == node_out:
                    # self loops are silently dropped
                    continue
                self._add_edge(node_in, node_out)

        self.M = len(self.unique_edges)
        self.N = len(self.neighbors)

    def read_ael(self, in_file):
        """Read an adjacency edge list (ael) file.

        Line number i (starting from 0) holds the whitespace separated
        neighbors of node i; an empty line denotes a node without
        neighbor. In a directed graph the neighbors listed on line i are
        the destinations of the out-edges of node i.

        Repeated neighbors on a line are removed with a warning (first
        occurrences are kept, in order).

        After reading, self.N is the index of the last line of the file;
        it is left untouched when the file is empty.
        NOTE(review): this differs from read_ssv, which sets N to the
        number of nodes having at least one edge -- confirm which
        convention the callers expect.

        Parameters
        ----------
        in_file: str
            path to the input file

        Raises
        ------
        ValueError
            if a node lists itself as a neighbor or an id is not an
            integer
        """
        self.dataset_name = Path(in_file).stem
        last_index = None  # stays None when the file has no line at all
        with open(in_file, 'r') as fin:
            for node_in, line in enumerate(fin):
                last_index = node_in
                fields = line.split()
                if not fields:
                    # skip empty lines (node without neighbor)
                    continue
                neighbor_list = [int(node_out) for node_out in fields]

                # check self loop
                if node_in in neighbor_list:
                    raise ValueError("self loop detected")

                # check no repetition
                if len(set(neighbor_list)) != len(neighbor_list):
                    print('warning : removing duplicate values')
                    # dict.fromkeys keeps the order of first occurrences
                    neighbor_list = list(dict.fromkeys(neighbor_list))

                # add to graph
                for node_out in neighbor_list:
                    self._add_edge(node_in, node_out)

        self.M = len(self.unique_edges)
        if last_index is not None:
            self.N = last_index

    def to_ael(self, output):
        """Write the graph as an adjacency edge list (ael) file.

        Line number i holds the neighbors of node i separated by spaces
        (the out-neighbors for a directed graph), which is the layout
        read_ael expects. Enough lines are written to cover both
        self.N + 1 nodes and the largest node id found in the edges.

        Parameters
        ----------
        output: str
            path to the output file
        """
        to_write = defaultdict(list)
        max_id = -1
        # undirected graphs store both (u, v) and (v, u) in self.edges, so
        # appending only the second endpoint lists each neighbor once
        for node_in, node_out in self.edges:
            to_write[node_in].append(str(node_out))
            max_id = max(max_id, node_in, node_out)

        n_lines = max(self.N + 1, max_id + 1)
        with open(output, 'w') as fout:
            for node in range(n_lines):
                fout.write(' '.join(to_write.get(node, ())) + '\n')

    def to_ssv(self, output):
        """Write one "source destination" line per edge of unique_edges.

        Parameters
        ----------
        output: str
            path to the output file
        """
        with open(output, 'w') as fout:
            for node_in, node_out in self.unique_edges:
                fout.write(f'{node_in} {node_out}\n')