### −∗− mode : python ; −∗−
# @file NetworkEpiModel.py
# @author Bruno Goncalves
######################################################
from typing import Union, Optional
import networkx as nx
import numpy as np
from numpy import linalg
from numpy import random
import pandas as pd
import matplotlib.pyplot as plt
from .EpiModel import EpiModel
from collections import Counter
from . import utils
[docs]
class NetworkEpiModel(EpiModel):
def __init__(self, network:nx.Graph, compartments: Optional[str]=None):
super(NetworkEpiModel, self).__init__(compartments)
self.network = network
self.kavg_ = 2 * network.number_of_edges() / network.number_of_nodes()
self.spontaneous = {}
self.interactions = {}
self.params = utils.Parameters()
[docs]
def integrate(self, timesteps, **kwargs):
raise NotImplementedError("Network Models don't support numerical integration")
[docs]
def add_interaction(
self, source: str, target: str, agent: str, rate:Optional[float] = None, rescale: bool = False, **rates
) -> None:
if rescale:
if rate:
rate /= self.kavg_
else:
rates_names = list(rates.keys())
rates[rates_names[0]] /= self.kavg_
super(NetworkEpiModel, self).add_interaction(
source, target, agent=agent, rate=rate, **rates
)
if rate is not None:
count = len(self.params) + 1
rate_key = "rate" + str(count)
else:
self.params.define_parameters(**rates)
rates = list(rates.keys())
rate_key = rates[0]
if source not in self.interactions:
self.interactions[source] = {}
if target not in self.interactions[source]:
self.interactions[source] = {}
self.interactions[source][agent] = {"target": target, "rate": rate_key}
[docs]
def add_spontaneous(self, source: str, target: str, rate: Optional[float], **rates):
super(NetworkEpiModel, self).add_spontaneous(source, target, rate=rate, **rates)
if rate is not None:
count = len(self.params) + 1
rate_key = "rate" + str(count)
else:
self.params.define_parameters(**rates)
rates = list(rates.keys())
rate_key = rates[0]
if source not in self.spontaneous:
self.spontaneous[source] = {}
if target not in self.spontaneous[source]:
self.spontaneous[source] = {}
self.spontaneous[source][target] = rate_key
[docs]
def simulate(self, timesteps: int, seeds, **kwargs) -> None:
"""Stochastically simulate the epidemic model"""
pos = {comp: i for i, comp in enumerate(self.transitions.nodes())}
N = self.network.number_of_nodes()
population = np.zeros((timesteps, N), dtype="str")
comps = list(self.transitions.nodes)
time = np.arange(1, timesteps, 1, dtype="int")
susceptible = self._get_susceptible().pop()
active_nodes = set()
current_active = set()
active_states = self._get_active()
for node in range(N):
if node in seeds:
population[0, node] = seeds[node]
active_nodes.add(node)
else:
population[0, node] = susceptible
infections = self._get_infections()
for t in time:
population[t] = np.copy(population[t - 1])
if len(active_nodes) == 0:
continue
current_active = list(active_nodes)
self.rng.shuffle(current_active)
for node_i in current_active:
state_i = population[t - 1, node_i]
if state_i in infections:
# contact each neighbour to see if we infect them
NN = list(self.network.neighbors(node_i))
self.rng.shuffle(NN)
for node_j in NN:
state_j = population[t - 1, node_j]
if state_j in infections[state_i]:
prob = self.rng.random()
rate = self.params[infections[state_i][state_j]["rate"]]
if prob < rate:
new_state = infections[state_i][state_j]["target"]
population[t, node_j] = new_state
if new_state in active_states:
active_nodes.add(node_j)
if state_i in self.spontaneous:
n_trans = len(self.spontaneous[state_i])
prob = np.zeros(len(pos))
for target in self.spontaneous[state_i]:
rate = self.params[self.spontaneous[state_i]["target"]]
prob[pos[target]] = rate
prob[pos[state_i]] = 1 - np.sum(prob)
new_state = comps[np.argmax(random.multinomial(1, prob))]
if new_state != state_i:
population[t, node_i] = new_state
active_nodes.add(node_i)
if new_state not in active_states:
active_nodes.remove(node_i)
continue
self.population_ = pd.DataFrame(population)
self.values_ = (
pd.DataFrame.from_records(
self.population_.apply(lambda x: Counter(x), axis=1)
)
.fillna(0)
.astype("int")
)
[docs]
def R0(self):
if "R" not in set(self.transitions.nodes):
return None
return np.round(super(NetworkEpiModel, self).R0() * self.kavg_, 2)