Source code for kedgeswap.Stat

# This file is part of K-edge-swap.
#
#    K-edge-swap is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
#
#    K-edge-swap is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License along with K-edge-swap. If not, see <https://www.gnu.org/licenses/>. 

import os
import time
import scipy
import argparse
import numpy as np

from scipy.stats import kstest
from arch.unitroot import DFGLS
from kedgeswap.Graph import Graph
from progressbar import ProgressBar
from joblib import Parallel, delayed
from kedgeswap.MarkovChain import MarkovChain

[docs]class Stat(): """ Class to compute statistics on a Markov Chain object. This class implements methods to estimate the Markov Chain's sampling gap, and to follow its convergence using the DFGLS test. Attributes: ----------- mc: MarkovChain object The MarkovChain object on which we follow the convergence. eta: float The sampling gap used for the Markov Chain. The sampling gap gives a number of steps to make on the Markov Chain to obtain two uncorrelated graphs. turbo: bool Enable to make a fast but unverified estimation of the sampling gap. verbose: bool Enable to add information to the logs """ def __init__(self, mc, eta=None, turbo=False, verbose=False, njobs=1): #self.use_ks = use_ks self.mc = mc # markov chain self.eta = eta self.turbo = turbo self.verbose = verbose self.njobs = njobs
[docs] @staticmethod def CheckAutocorrLag1(S_T, alpha): """ Check the autocorrelation with lag 1 of a time serie. Parameters ---------- S_T: list(float) List of assortativity(/number of triangles) values to test autocorrelation alpha: float Significance level of the test (usually fixed to 0.04). """ T = len(S_T) tau = 10 # lag at which the sample autocorr is calculated mean_st = np.mean(S_T) var_st = np.var(S_T) R_1 = 1/T * sum( np.multiply(S_T[:-1] -mean_st, S_T[1:] - mean_st)) R_0 = 1/T * sum( np.multiply(S_T -mean_st, S_T - mean_st)) #a_i = R_1/R_0 a = R_1/R_0 #a = np.correlate(S_T-mean_st, S_T-mean_st, mode='full')[T] # mode=full: convolution over each point of overlap - take value at T to get lag=1 #a = a/ (var_st * len(S_T)) mu = -1/T sigma_2 = (T**4 - 4 * T**3 + 4 * T - 4) / ((T+1)* T**2 * (T-1)**2) A = (a - mu)/np.sqrt(sigma_2) z = scipy.stats.norm.ppf(1-alpha) if A > z: return 1 else: return 0
[docs] def guesstimate_sampling_gap(self, graph, gamma): """ Sampling gap estimation is long, this function gives an empirical estimation of the sampling gap. Measure the acceptation rate A of the Markov Chain, and fix the sampling gap as 10*(1/A) * M, where M is the number of edges of the network. This estimation was fixed empirically to overestimate the sampling gap we measure using the estimation from Dutta, U. (2022). """ # run a short burn in N_swap = 5 * self.mc.graph.M # burn in burn_in = MarkovChain(graph, N_swap, gamma, use_jd=self.mc.use_jd, use_triangles=self.mc.use_triangles, use_assortativity=self.mc.use_assortativity, use_mutualdiades=self.mc.use_mutualdiades, verbose=self.mc.verbose, keep_record=False, log_dir=None) burn_in.run() # estimate the acceptation rate of the Markov Chain burn_in_rate = burn_in.accept_rate / (burn_in.accept_rate + burn_in.refusal_rate) # take a large estimate to overestimate minimum sampling gap value eta = 10/burn_in_rate * graph.M #print(f'guesstimation: burn in rate {burn_in_rate}, using eta {eta}') return eta
#def linear_estimate_sampling_gap(self, graph, gamma): # """ Estimate the sampling gap for the MCMC, following algorithm 1 (and using the same values) of # Dutta, U. (2022). Sampling random graphs with specified degree sequences # """ # # run 1 Markov chain to get the S_T timeserie - used to parallelize. # def run_chain(c): # S_T = [] # n_swap = int(np.round(eta)) # for t in range(T): # mc[c].run(n_swap) # if mc[c].use_assortativity: # S_T.append(mc[c].assortativity) # else: # S_T.append(len(mc[c].triangles2edges)) # return S_T # N_swap = 1000 * self.mc.graph.M # burn in # C = 10 # T = 500 # #S_T = [] # list of degree assortativity of size T # u = 1 # lower bound on number of mcmc chains that have significant lag-1 autocorrelation # mc = [] # list of C MCMC # alpha = 0.04 # significance level for each test # if self.verbose: # print(f'estimation parameters: N_swap {N_swap}, C {C}, T {T}, u {u}, alpha {alpha}') # print(f'burn in...') # #for c in range(C): # # if self.verbose: # # print(f'MCMC {c}/{C}') # # mc.append(MarkovChain(graph, N_swap, gamma)) # # mc[c].run() # burn_in = MarkovChain(graph, N_swap, gamma, use_jd=self.mc.use_jd, use_triangles=self.mc.use_triangles, # use_assortativity=self.mc.use_assortativity, # verbose=self.mc.verbose, keep_record=False, log_dir=None) # burn_in.run() # #burn_in_rate = burn_in.accept_rate / (burn_in.accept_rate + burn_in.refusal_rate) # # Measure sampling gap # if self.verbose: # print(f'measuring sampling gap...') # # first eta is estimated from the accept/refuse rate of the burn in # # then dichotomic search to get better eta # #eta = 10 * self.mc.graph.M # #eta = 1/burn_in_rate * self.mc.graph.M # eta=0 # d_eta = C # #prev_d_eta = C # #prev_eta = eta # #tuned = False # while (d_eta > u): # eta += 0.05 * graph.M # if self.verbose: # print(f'considering eta {eta}...') # d_eta = 0 # for c in range(C): # S_T = [] # if d_eta > u: # continue # if len(mc) <= c: # mc.append(MarkovChain(burn_in.graph.copy(), N_swap, gamma, use_jd=self.mc.use_jd, use_triangles=self.mc.use_triangles, use_assortativity=self.mc.use_assortativity, verbose=self.mc.verbose, keep_record=False, log_dir=None)) # else: # mc[c] = MarkovChain(burn_in.graph.copy(), N_swap, gamma, use_jd=self.mc.use_jd, use_triangles=self.mc.use_triangles, use_assortativity=self.mc.use_assortativity, verbose=self.mc.verbose, keep_record=False, log_dir=None) # #mc[c].run() # S_Ts = Parallel(n_jobs=5)(delayed(run_chain)(c) for c in range(C)) # for c in range(C): # d_c = self.CheckAutocorrLag1(S_Ts[c], alpha) # d_eta += d_c # if self.verbose: # print(f'for eta={eta}: d_eta={d_eta}, u={u}') # #if d_eta <= u: # # prev_eta = eta # # prev_d_eta = d_eta # # eta = eta/2 # # #tuned = True # #elif d_eta > u and prev_d_eta <= u: # # prev_d_eta = d_eta # # tuned = True # # eta = prev_eta # #elif d_eta > u and prev_d_eta > u: # # prev_d_eta = d_eta # # prev_eta = eta # # eta = 2 * eta # return eta
[docs] def estimate_sampling_gap(self, graph, gamma): """ Estimate the sampling gap for the MCMC, following algorithm 1 (and using the same values) of Dutta, U. (2022). Sampling random graphs with specified degree sequences """ # run 1 Markov chain to get the S_T timeserie - used to parallelize. def run_chain(c): S_T = [] n_swap = int(np.round(eta)) for t in range(T): mc[c].run(n_swap) if mc[c].use_assortativity: S_T.append(mc[c].assortativity) else: S_T.append(len(mc[c].triangles2edges)) return S_T N_swap = 1000 * self.mc.graph.M # burn in C = 10 T = 500 #S_T = [] # list of degree assortativity of size T u = 1 # lower bound on number of mcmc chains that have significant lag-1 autocorrelation mc = [] # list of C MCMC alpha = 0.04 # significance level for each test if self.verbose: print(f'eta estimation parameters: N_swap {N_swap}, C {C}, T {T}, u {u}, alpha {alpha}') print(f'burn in...') #for c in range(C): # if self.verbose: # print(f'MCMC {c}/{C}') # mc.append(MarkovChain(graph, N_swap, gamma)) # mc[c].run() # run long burn in to reach convergence of the markov chain burn_in = MarkovChain(graph, N_swap, gamma, use_jd=self.mc.use_jd, use_triangles=self.mc.use_triangles, use_assortativity=self.mc.use_assortativity, use_mutualdiades=self.mc.use_mutualdiades, verbose=self.mc.verbose, keep_record=False, log_dir=None) burn_in.run() # estimate the acceptation rate of the markov chain burn_in_rate = burn_in.accept_rate / (burn_in.accept_rate + burn_in.refusal_rate) if self.verbose: print('Burn In : acceptation/refusals by k') print(burn_in.accept_rate_byk) print(burn_in.refusal_rate_byk) # Measure sampling gap if self.verbose: print(f'measuring sampling gap...') # first eta is estimated from the accept/refuse rate of the burn in # then dichotomic search to get better eta #eta = 10 * self.mc.graph.M eta = 1/burn_in_rate * self.mc.graph.M d_eta = C prev_d_eta = C prev_eta = eta tuned = False while (not tuned): if self.verbose: print(f'considering eta {eta}...') d_eta = 0 for c in range(C): S_T = [] if d_eta > u: continue if len(mc) <= c: mc.append(MarkovChain(burn_in.graph.copy(), N_swap, gamma, use_jd=self.mc.use_jd, use_triangles=self.mc.use_triangles, use_assortativity=self.mc.use_assortativity, use_mutualdiades=self.mc.use_mutualdiades, verbose=self.mc.verbose, keep_record=False, log_dir=None)) else: mc[c] = MarkovChain(burn_in.graph.copy(), N_swap, gamma, use_jd=self.mc.use_jd, use_triangles=self.mc.use_triangles, use_assortativity=self.mc.use_assortativity, use_mutualdiades=self.mc.use_mutualdiades, verbose=self.mc.verbose, keep_record=False, log_dir=None) #mc[c].run() S_Ts = Parallel(n_jobs=self.njobs)(delayed(run_chain)(c) for c in range(C)) for c in range(C): d_c = self.CheckAutocorrLag1(S_Ts[c], alpha) d_eta += d_c #if self.verbose: # print(f'for eta={eta}: d_eta={d_eta}, u={u}') # check if eta value is accepted - if a most u chains show no correlation # on the S_T timeserie with lag 1, the value of eta is considered valid. if d_eta <= u: if self.verbose: print('eta {eta} accepted (d_eta={d_eta} <= u={u})') prev_eta = eta prev_d_eta = d_eta if prev_eta == eta/2: # don't check eta/2 again tuned = True else: print('trying eta=eta/2...') eta = eta/2 #tuned = True elif d_eta > u and prev_d_eta <= u: prev_d_eta = d_eta tuned = True if self.verbose: print('eta {eta} refused (d_eta={d_eta} <= u={u}), using eta={prev_eta}.') eta = prev_eta elif d_eta > u and prev_d_eta > u: prev_d_eta = d_eta prev_eta = eta eta = 2 * eta if self.verbose: print('eta {prev_eta} refused (d_eta={d_eta} <= u={u}), trying eta={eta}.') return eta
[docs] def run_dfgls(self, output): """ If no sampling gap eta specified, run estimation of eta. Run Markov Chain for eta steps, retrieve list of assortativity values (or number of triangles) and estimate the convergence of this time serie, to decide if the Markov Chain is converged. """ # Get sampling gap value if self.eta is None: t0 = time.time() if self.turbo: # turbo estimation eta = self.guesstimate_sampling_gap(self.mc.graph, self.mc.gamma) else: # dichotomic estimation eta = self.estimate_sampling_gap(self.mc.graph, self.mc.gamma) self.eta = eta t1 = time.time() eta_time = t1 - t0 #print(f'eta estimation {t1 - t0} seconds') else: # use eta given in input eta = self.eta eta_time = 0 # Run the markov chain, and check its convergence using the DFGLS test has_converged = False if self.verbose: print('running markov chain and checking convergence...') t0 = time.time() while (not has_converged): window = self.mc.run(int(np.round(eta))) test = DFGLS(window) try: # in some very rare cases, usually when very few swaps are accepted, the DFGLS # doesn't have enough different values to run, so it raises an error. Catch that error and # run the markov chain some more. if self.verbose: print(f'test statistic : {test.stat},\n Markov chain is stationnary if test statistic < {test.critical_values["1%"]}') #print(test.summary) if test.stat < test.critical_values["1%"]: has_converged = True self.mc.graph.to_ssv(output) except: print("Warning, dfgls doesn't have enough unique observations to compute.") continue # some verbose output if self.verbose: print('Convergence : acceptation by k') for k in self.mc.accept_rate_byk: print(f'({k}: {self.mc.accept_rate_byk[k]})', end=', ') print('\nConvergence : refusal by k') for k in self.mc.refusal_rate_byk: print(f'({k}: {self.mc.refusal_rate_byk[k]})', end=', ') print('') t1 = time.time() conv_time = t1 - t0 print(f'eta estimation took {eta_time} seconds') print(f'convergence took {conv_time} seconds')