diff --git a/neet/boolean/modularity.py b/neet/boolean/modularity.py new file mode 100644 index 00000000..d744230d --- /dev/null +++ b/neet/boolean/modularity.py @@ -0,0 +1,1270 @@ +# from Doug Moore 12.18.2018 +# +# modified by Bryan Daniels + +import numpy as np +import networkx as nx +import pyinform as pi + +from itertools import chain,combinations +import copy + +#from neet.interfaces import is_network, is_fixed_sized +from neet.network import Network +from neet.synchronous import Landscape + +def attractors_brute_force(net, size=None, subgraph=None): + if not isinstance(net, Network): + raise TypeError("net must be a network or a networkx DiGraph") + + if size is None: + size = net.size + + pin = [ n for n in range(size) if n not in subgraph ] + ls = Landscape(net,pin=pin) + return ls.attractors + +def greatest_predecessors(dag, n): + pred = list(dag.predecessors(n)) + N = len(pred) + greatest = [] + for i in range(N): + is_greatest = True + for j in range(N): + if i != j and nx.has_path(dag, pred[i], pred[j]): + is_greatest = False + break + if is_greatest: + greatest.append(pred[i]) + + return greatest + + +# modified from https://www.kkhaydarov.com/greatest-common-divisor-python/ +def gcd(a,b): + + if a == 0 and b == 0: + return 0 + + while b != 0: + new_a = b + new_b = a % b + + a = new_a + b = new_b + + return a + +# from https://www.w3resource.com/python-exercises/python-basic-exercise-32.php +def lcm(x, y): + if x > y: + z = x + else: + z = y + + while(True): + if((z % x == 0) and (z % y == 0)): + lcm = z + break + z += 1 + + return lcm + +def _merge(nodes_module_1, attractor_module_1, attractor_module_2, overlap): + """ + Merge two attractors from different modules. + + attractor_module_1 and attractor_module_2 should each be decoded attractors + (numpy arrays of decoded states). + + Returns a list of attractors as a numpy array. + Each attractor has shape (#timesteps)x(net.size) + """ + size = len(attractor_module_1[0]) + # check for easier cases + # (below code also works in these cases, but it is slower) + if len(attractor_module_1) == 1 and len(attractor_module_2) == 1: + if np.all( attractor_module_1[0,overlap] == attractor_module_2[0,overlap] ): + result = np.empty([1,1,size]) + result[0,0,:] = attractor_module_2[0,:] + result[0,0,nodes_module_1] = attractor_module_1[0,nodes_module_1] + return result + else: + return [] + else: + # BCD 12.21.2018 deal with attractors of differing length + len1 = len(attractor_module_1) + len2 = len(attractor_module_2) + len_joint = lcm(len1,len2) + num_joint = gcd(len1,len2) + #if len1 > 1 or len2 > 1: + # print "len1 =",len1,"len2 =",len2,"len_joint =",len_joint,"num_joint =",num_joint + result = np.empty([num_joint,len_joint,size]) + matched_attractor_count = 0 # not all potential attractors will match on overlap + #joint1 = np.tile(attractor_module_1,len_joint/len1) + #joint2 = np.tile(attractor_module_2,len_joint/len2) + # loop over relative phase + for attractor_index in range(num_joint): + match = False + #joint2_rolled = np.roll(joint2,attractor_index,axis=0) + # loop over timesteps within the resulting merged attractor + for time_index in range(len_joint): + #state_module_1 = joint1[time_index] + #state_module_2 = joint2_rolled[time_index] + state_module_1 = attractor_module_1[time_index%len1] + state_module_2 = attractor_module_2[(time_index+attractor_index)%len2] + # BCD this may be overly conservative. 
we might be able to get + # away with checking only one state in the joint attractor + if np.all( state_module_1[overlap] == state_module_2[overlap] ): + match = True + result[matched_attractor_count,time_index,:] = state_module_2[:] + result[matched_attractor_count,time_index,nodes_module_1] = \ + state_module_1[nodes_module_1] + if match: + matched_attractor_count += 1 + return result[:matched_attractor_count] + + +def direct_sum(modules, attrsList, cksList=None, dynamic=False): + """ + Returns a list of decoded attractors (and optionally a corresponding list + of control kernels with the same length). Each attractor is a numpy array of + decoded states. + + modules : List of lists of nodes in each module. + Length #modules. Each module has a varying number + of node indices. + attrsList : List of lists of attractors, one for each module. + Each attractor is a numpy array of decoded states. + Length #modules. Each attractor list has a varying + number of attractors, and each attractor has a varying + length. + cksList (None) : (optional) List of lists of sets of control kernel nodes. + If given, the function will also return a list of sets + of control kernel nodes corresponding to each returned + attractor. + Length #modules. Each list of sets of control kernel + nodes has length equal to the number of attractors for + the corresponding module. Each set of control kernel + nodes has a varying length. + dynamic (False) : If False, compute 'static' control kernels. + If True, compute 'dynamic' control kernels + (dynamic control kernels not yet supported). + """ + assert(len(modules)==len(attrsList)) + if cksList is None: + return_cks = False + cksList = [ [ set() for attr in attrs ] for attrs in attrsList ] + else: + return_cks = True + + if len(modules) == 0: + result = [] + result_cks = [] + elif len(modules) == 1: + result = attrsList[0] + result_cks = cksList[0] + else: + result = [] + result_cks = [] + attractors_of_remaining,cks_of_remaining = \ + direct_sum(modules[1:], attrsList[1:], cksList=cksList[1:],dynamic=dynamic) + assert( len(attrsList[0]) == len(cksList[0]) ) + for attractor,ck in zip(attrsList[0],cksList[0]): + subresult = [] + subresult_cks = [] + for attractor_of_remaining,ck_of_remaining \ + in zip(attractors_of_remaining,cks_of_remaining): + allcomponentnodes = set.union(*modules[1:]) + overlap = list(set.intersection(allcomponentnodes,modules[0])) + merged = _merge(list(modules[0]), attractor, attractor_of_remaining, overlap) + + # () merge control kernels + if dynamic: + raise Exception("Dynamic control kernels not yet supported") + # 3.29.2019 BCD if multiple valid attractors vary only in their + # attractor_index (relative phase), then these attractors + # do not have a static control kernel (I think). + # This corresponds to len(merged) > 1. + if len(merged) > 1: + ck_merged = None + else: + # BCD 2.16.2019 I'm not completely convinced of the following line + ck_merged = merge_cks(ck,ck_of_remaining) + + if len(merged) > 0: + subresult += list(merged) + subresult_cks += [ ck_merged for m in merged ] + if len(subresult) > 0: + result += subresult + result_cks += subresult_cks + if [] in result: # sanity check + raise Exception + + if return_cks: + assert(len(result)==len(result_cks)) # sanity check + return result,result_cks + else: + return result + +def merge_cks(ck1,ck2): + """ + Return union of two sets of control kernels. + + If either set is None, return None. This handles cases for which + a module does not have a valid control kernel. 
In this case there + is also no valid control kernel for the larger network. + """ + if (ck1 == None) or (ck2 == None): + return None + else: + return set.union(ck1,ck2) + + +# 12.19.2018 BCD +def remove_duplicates(attractors): + # not sure the best way to do this. for now, just remove if they share the + # same first state + first_states = [ a[0] for a in attractors ] + attractor_dict = dict(zip(first_states,attractors)) + return attractor_dict.values() + +# 12.19.2018 BCD +def all_ancestors(dag,module_number): + """ + Return all ancestor modules of a given module + """ + ancestors = nx.shortest_path(dag.reverse(),module_number).keys() + return ancestors + +def leaves(dag): + """ + Find modules with zero out-degree. + """ + lst = [] + for n in dag.nodes: + if dag.out_degree(n) == 0: + lst.append(n) + return lst + +def network_modules(net): + """ + (Not actually used in attractors function at the moment---convenient + to see whether we expect modularity to speed things up significantly.) + """ + if not isinstance(net,Network): + raise TypeError("net must be a network or a networkx DiGraph") + + g = net.to_networkx_graph() + + modules = list(nx.strongly_connected_components(g)) + return modules + +# taken from https://docs.python.org/3/library/itertools.html#recipes +def powerset(iterable): + "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)" + s = list(iterable) + return chain.from_iterable(combinations(s, r) for r in range(len(s)+1)) + +def pinning_produces_desired_attractor(net,pin,dynamic_pin_states,desired_attractor): + """ + Tests whether a specific pinning (generally dynamic) produces a given + desired attractor. + + pin : List of nodes to be pinned + dynamic_pin_states : Decoded binary states to which nodes are pinned. Should have + shape (# timesteps)x(# of pinned nodes) + desired_attractor : List of encoded states representing the desired attractor + """ + + # find attractors with subset pinned + subset_ls = Landscape(net,pin=pin,dynamic_pin=dynamic_pin_states) + + if len(subset_ls.attractors) == 1: + # is the single attractor we found the one we want? + pinned_attractor = subset_ls.attractors[0] + return attractors_equivalent(pinned_attractor,desired_attractor) + else: + return False + +def _combine_external_and_internal_pin(external_pin,external_pin_states, + internal_pin,internal_pin_state): + """ + Combine a specific (dynamical) pinning of external input nodes with + a specific (static) pinning of a subset of internal nodes. + + external_pin : List or set of pinned nodes considered external + external_pin_states : Decoded binary states to which external nodes are + _dynamically_ pinned. + Should have shape (# timesteps)x(total # of nodes in net) + internal_pin : List or set of pinned nodes considered internal + internal_pin_state : Decoded binary state to which internal nodes are + _statically_ pinned. + Should have shape (total # of nodes in net) + + Returns: + pin : sorted list of pinned nodes + dynamic_pin_states : list with shape (# timesteps)x(total # of pinned nodes) + + Used in module_control_kernel. 
+ """ + # We first pin the external nodes + pin = copy.deepcopy(list(external_pin)) + dynamic_pin_states = copy.deepcopy(external_pin_states) + + # now pin internal nodes with "static" pinning + for node in internal_pin: + pin += [node] + for dynamic_pin_state in dynamic_pin_states: + dynamic_pin_state[node] = internal_pin_state[node] + + dynamic_pin_states = [ [ state[i] for i in sorted(pin) ] for state in dynamic_pin_states ] + + return sorted(pin),dynamic_pin_states + + +# POTENTIAL FOR SPEEDUP: it is possible to have redundant +# cases where the subset nodes have the same +# values in distinct attractors; we could +# potentially save the landscape results and +# reuse them if we find another case where +# the pinning is equivalent. +def module_control_kernel(net,module,input,attractors_given_input, + require_inputs=True,dynamic=False,verbose=False): + """ + Find control kernel for a given module with a given input. The control + kernel is defined (here) with respect to each attractor. It is + the smallest subset of nodes that must be pinned + in order to controllably reach that attractor in the unpinned network. + + Note that the control kernel is not necessarily unique. + + module : List of node indices + input : List of decoded states representing the input attractor + attractors_given_input : List of decoded attractors. Attractors can have + varying lengths. + require_inputs (True) : If True, automatically include input nodes as control kernel + nodes. This can be faster than searching through all + possibilities, and input nodes will always be control kernel + nodes as currently defined. + dynamic (False) : If False, finds "static" control kernel, meaning we + restrict pinning of control kernel nodes to be + constant in time. + If True, finds "dynamic" control kernel, meaning + control kernel nodes can be dynamically pinned. + (dynamic control kernel is not currently supported) + + Returns a list of sets of node indices, one for each attractor. 
+ + See also: + sampled_control_kernel, which includes options for an "iterative" approach + and modifying the "phenotype" + """ + if dynamic: + raise Exception("Dynamic control kernel is not currently supported.") + + if len(attractors_given_input) == 1: + return [set()] + elif len(attractors_given_input) == 0: + raise Exception + + encode = net.state_space().encode + + external_pin = [ n for n in range(net.size) if n not in module ] + + num_attractors = len(attractors_given_input) + + # we'll find a control kernel for each attractor + ck_list = [ -1 for i in range(num_attractors) ] + + if require_inputs: + # automatically include input nodes + required_nodes = tuple([ i for i in tuple(input_nodes(net)) if i in module ]) + else: + required_nodes = () + + # set up generators of distinguishing node sets + # (all distinguishing nodes should be in the current module by + # definition, so only search over these as "possible_nodes") + distinguishing_nodes_gen_list = distinguishing_nodes_from_attractors( + attractors_given_input, + possible_nodes=module, + required_nodes=required_nodes) + + # loop over attractors + for attractor_index,desired_attractor in enumerate(attractors_given_input): + + distinguishing_nodes_gen = distinguishing_nodes_gen_list[attractor_index] + + # only nodes that are constant over the attractor can be control nodes + const_nodes = list( np.where(np.var(desired_attractor,axis=0)==0)[0] ) + potential_control_nodes = [ i for i in const_nodes if i in module ] + + # if the attractor is a cycle, check whether it is controllable + # when pinning _all_ constant nodes + # (this is relatively fast to check and prevents worthless exploration + # when there is no control kernel) + if len(desired_attractor) > 1: + subset = potential_control_nodes + # combine dynamic external input and static internal pinning + # Pin the subset to the first value in the desired attractor. + pin,dynamic_pin_states = _combine_external_and_internal_pin( + external_pin,input, + subset,desired_attractor[0]) + desired_attractor_encoded = [ encode(state) for state in desired_attractor ] + cycle_controllable = pinning_produces_desired_attractor(net, + pin,dynamic_pin_states,desired_attractor_encoded) + if not cycle_controllable: + potential_control_nodes = [] + ck_list[attractor_index] = None + # end cycle control kernel check + + # loop over sets of distinguishing nodes of increasing size + # until we have found a control kernel + while(ck_list[attractor_index] == -1): + subset = distinguishing_nodes_gen.__next__() + if len(subset) > 0: + # combine dynamic external input and static internal pinning + # Pin the subset to the first value in the desired attractor. + pin,dynamic_pin_states = _combine_external_and_internal_pin( + external_pin,input, + subset,desired_attractor[0]) + desired_attractor_encoded = [ encode(state) for state in desired_attractor ] + control_success = pinning_produces_desired_attractor(net, + pin,dynamic_pin_states,desired_attractor_encoded) + if control_success: + ck_list[attractor_index] = subset + # end loop over subsets + # end loop over attractors + + return ck_list + +def modularize(net): + """ + Returns: modules, dag + where `modules' is a list of length equal to the number of modules, with each + element a list of node indices belonging to that module; + and `dag' is a directed acyclic graph in networkx format reperesenting dependencies + among modules. 
+ """ + g = net.to_networkx_graph() + modules = list(nx.strongly_connected_components(g)) + dag = nx.condensation(g) + return modules, dag + +def attractors(net, verbose=False, retall=False, find_control_kernel=False, + encoded=True): + """ + encoded (True) : If True, states in returned attractors are encoded + as integers. + If False, states are not encoded (binary vectors). + """ + if not isinstance(net, Network): + raise TypeError("net must be a network or a networkx DiGraph") + + size = net.size + modules,dag = modularize(net) + dag_list = list(nx.topological_sort(dag)) + + # attractors and control_kernels are indexed by module number + attractors = {} + control_nodes = {} + delta_control_nodes = {} + basin_entropies = {} + + if verbose: print("Modules =",modules) + + decode,encode = net.state_space().decode, net.state_space().encode + decode_attractors = \ + lambda atts: [ np.array([decode(state) for state in att]) for att in atts ] + encode_attractors = \ + lambda atts: [ [ encode(state) for state in att ] for att in atts ] + + ancestors_dict = {} + + for module_number in dag_list: + if verbose: + print + print("Module number",module_number) + parents = greatest_predecessors(dag, module_number) + + ancestors_dict[module_number] = set.union( set.union(*[modules[m] for m in all_ancestors(dag,module_number)]), modules[module_number] ) + + nodes = modules[module_number] + # all nodes not in the module will be pinned + pin = [ n for n in range(size) if n not in nodes ] + if verbose: + if hasattr(net,'names'): + print("nodes in the module:",[net.names[n] for n in nodes]) + else: + print("nodes in the module:",nodes) + + # first, find inputs from ancestors (if any). + # we store these in 'inputs', a list of numpy arrays of decoded states that below + # will be used to (dynamically) set the values of nodes not in the current + # module. 
+ # Each element of inputs is a numpy array with shape (#timesteps)x(net.size) + # (#timesteps may vary across inputs) + if len(parents) == 0: # the module has no ancestors + # dynamically pin all other nodes to zero + inputs = [np.zeros([1,net.size])] + input_cks = [set()] + else: # the module does have ancestors + # want list of each parent's inclusive ancestors + parent_ancestors = [ set.union(*[modules[pm] for pm in all_ancestors(dag,p)]) for p in parents ] + if verbose: + print("parent_ancestors =",parent_ancestors) + + parent_modules = [ modules[p] for p in parents ] + parent_attractors = [ attractors[p] for p in parents ] + if find_control_kernel: + parent_cks = [ control_nodes[p] for p in parents ] + else: + parent_cks = None + if verbose: + print("parent_modules =",parent_modules) + print("parent_attractors =",parent_attractors) + inputs = direct_sum(parent_ancestors, parent_attractors, cksList=parent_cks) + if find_control_kernel: + inputs,input_cks = inputs + if verbose: + print("inputs =",inputs) + if find_control_kernel: + print("input control nodes =",input_cks) + + # now find attractors given each input + attractor_list = [] + control_nodes_list = [] + new_control_nodes_list = [] + basin_entropies_list = [] + # we will add found attractors to a growing list---at the end the + # list will include all attractors of the module over all possible inputs + for input_index,input in enumerate(inputs): + # dynamically pin parent nodes to attractor values + # (input has shape (#timepoints)x(net.size) + dynamic_pin = input[:,pin] + ls = Landscape(net,pin=pin,dynamic_pin=dynamic_pin) + decoded_attractors = decode_attractors(ls.attractors) + attractor_list.extend( decoded_attractors ) + if verbose: + print("input_states =",input_states) + print("pin =",pin) + print("dynamic_pin =",dynamic_pin) + + if find_control_kernel: + # find control nodes of this module given this input. 
+ # new_control_nodes has length equal to that of ls.attractors + new_control_nodes = module_control_kernel(net,nodes,input,decoded_attractors) + new_control_nodes_list.extend(new_control_nodes) + all_control_nodes = [ merge_cks(input_cks[input_index],cks) \ + for cks in new_control_nodes ] # <-- I'm not sure I need to do this + control_nodes_list.extend(all_control_nodes) + basin_entropies_list.extend([ ls.basin_entropy() for att in ls.attractors]) + if verbose: + print("attractors given input =",ls.attractors) + if hasattr(net,'names'): + print("new control nodes =",[ [ net.names[i] for i in ck] for ck in new_control_nodes]) + else: + print("new control nodes =",new_control_nodes) + + attractors[module_number] = attractor_list #remove_duplicates(attractor_list) + control_nodes[module_number] = control_nodes_list + delta_control_nodes[module_number] = new_control_nodes_list + basin_entropies[module_number] = basin_entropies_list + + leaf_module_indices = leaves(dag) + leaf_ancestors = [ ancestors_dict[m] for m in leaf_module_indices ] + leaf_attractors = [ attractors[m] for m in leaf_module_indices ] + if find_control_kernel: + leaf_control_nodes = [ control_nodes[m] for m in leaf_module_indices ] + else: + leaf_control_nodes = None + a = direct_sum(leaf_ancestors, leaf_attractors, cksList=leaf_control_nodes) + if find_control_kernel: + a,control_kernel_list = a + + if encoded: + a = encode_attractors(a) + + if retall or find_control_kernel: + outdict = {'attractors':attractors, + 'ancestors_dict':ancestors_dict, + 'modules':modules, + } + if find_control_kernel: + outdict.update({'control_kernels':control_kernel_list, + 'basin_entropies':basin_entropies, + 'delta_control_nodes':delta_control_nodes}) + return a,outdict + else: + return a + + + +# 5.21.2019 sampling stuff below ---------------------------------------------- + +# see also https://stackoverflow.com/questions/2150108/efficient-way-to-shift-a-list-in-python +def rotate(lst, n): + return lst[n:] + lst[:n] + +def random_state(n): + return np.random.randint(0,2,n) + +def set_pin_state(state,pin,pin_state): + for pin_idx,pin_val in zip(pin,pin_state): + state[pin_idx] = pin_val + return state + +def attractor_from_initial_state(net,state,pin=[],pin_state=[]): + encode = net.state_space().encode + decode = net.state_space().decode + if len(pin) > 0: + state = set_pin_state(state,pin,pin_state) + state_list = [encode(state),] + new_state = -1 + while new_state not in state_list[:-1]: + decoded_new_state = net.update(decode(state_list[-1])) + if len(pin) > 0: + decoded_new_state = set_pin_state(decoded_new_state,pin,pin_state) + new_state = encode(decoded_new_state) + state_list.append(new_state) + #print state_list + att = state_list[state_list.index(new_state)+1:] + # ********* DEBUG + #assert(len(pin)==len(pin_state)) + #consistent = np.all([ decode(att[0])[i] == j for i,j in zip(pin,pin_state) ]) + #if not consistent: + # print("pin = {}".format(pin)) + # print("pin_state = {}".format(pin_state)) + # print("att = {}".format(att)) + # print("decode(att[0]) = {}".format(decode(att[0]))) + # assert(consistent) + # ********** END DEBUG + return att + +def input_nodes(net): + """ + Returns a list of node indices corresponding to input nodes + (those that have in-degree 1 consisting of a single self-loop.) 
+ """ + return np.where([ len(net.neighbors_in(i))==1 and (i in net.neighbors_in(i)) for i in range(net.size) ])[0] + +def phenotype_projection(attractor,hidden_indices,net): + """ + Project encoded state + """ + encode,decode = net.state_space().encode,net.state_space().decode + # decode + att_decoded = [ decode(state) for state in attractor ] + # project + att_proj_decoded = copy.deepcopy(att_decoded) + for idx in hidden_indices: + for state in att_proj_decoded: + state[idx] = 0 + # encode + att_proj = [ encode(state) for state in att_proj_decoded ] + return att_proj + +def _standardize_attractor(att): + """ + Return the standardized form of an encoded attractor. + + The standardized form begins in the state with the smallest encoded value. + """ + list_att = list(att) + # rotate att to standard form + att_ID = attractor_ID(list_att) + att_ID_index = list_att.index(att_ID) + rotated_att = rotate(list_att,att_ID_index) + return rotated_att + +def attractors_equivalent(att1,att2): + """ + Determine whether two encoded attractors are equivalent. + + Cyclic attractors are considered equivalent if they contain the same states in the + same order. For instance, [2,3,1] is equivalent to [1,2,3] (but not [3,2,1]). + """ + if len(att1) != len(att2): + return False + else: + att1standard = _standardize_attractor(att1) + att2standard = _standardize_attractor(att2) + for i in range(len(att1)): + if att1standard[i] != att2standard[i]: + return False + return True + +def atts_and_cks_equivalent(atts1,cks1,atts2,cks2,ck_size_only=False): + """ + Check whether two sets of attractors and corresponding control kernels + are equivalent. + + (Attractor,control kernel) pairs can be listed in any order. + + ck_size_only (False) : If True, only the size of the control kernel must + match for each attractor. + If False, the exact nodes of the control kernel must + match for each attractor. + """ + if (len(atts1) != len(cks1)) or (len(atts2) != len(cks2)): + raise ValueError + if len(atts1) != len(atts2): + return False + else: + atts2IDs = [attractor_ID(att2) for att2 in atts2] + for att1,ck1 in zip(atts1,cks1): + att1ID = attractor_ID(att1) + # check that attractor with same ID exists + if att1ID not in atts2IDs: + return False + idx2 = atts2IDs.index(att1ID) + # check that attractors are equivalent + if not attractors_equivalent(att1,atts2[idx2]): + return False + if ck_size_only: + # check that size of control kernels are equal + if ((ck1 == None) and (cks2[idx2] != None)) or \ + ((ck1 != None) and (cks2[idx2] == None)): + return False + elif (ck1 == None) and (cks2[idx2] == None): + pass + elif len(ck1) != len(cks2[idx2]): + return False + else: + # check that control kernels are equivalent + if ck1 != cks2[idx2]: + return False + return True + +def sampled_attractors(net,numsamples=10000,seed=123,pin=[],pin_state=[],phenotype_list=None, + return_unprojected=False,return_counts=False,desired_attractor=None): + """ + Return unique attractors found via sampling. Attractor states are defined by "phenotype" + nodes. + + Values of hidden nodes in returned attractors are set to zero. To get sampled + values of hidden nodes, use 'return_unprojected'. + + pin ([]) : List of indices of pinned nodes + pin_state ([]) : List of values for pinned nodes (same length as 'pin') + phenotype_list (None) : List of indices of visible nodes. If None, default + to all nodes. + return_unprojected (False): If True, also return dictionary mapping encoded projected + states to sets of all seen decoded unprojected states. 
+ return_counts (False) : If True, also return list of counts giving the number of + times each attractor was seen. + desired_attractor (None) : If given an attractor (a set or list of encoded states), + instead of returning all attractors, the function will return + True if the network has a single attractor phenotype that + matches the given attractor on phenotype nodes, and False + otherwise (in which case the run time can be much faster than + finding all attractors). + + (Note: I'm not sure if this is all consistent when there are nontrivial dynamics + in hidden nodes---e.g. it's possible that fixed point phenotypes could result from + limit cycles in hidden states) + """ + # 5.21.2019 TO DO: Encapsulate "rotate attractor to standard form" + + if len(pin) != len(pin_state): + raise Exception + if type(pin) != list or type(pin_state) != list: + raise TypeError("pin and pin_state must be lists") + + # deal with phenotype + if phenotype_list is None: + phenotype_list = range(net.size) + else: + # check phenotype_list is consistent + for idx in phenotype_list: + if idx not in range(net.size): + raise Exception("phenotype_list not consistent with given net") + non_phenotype_list = [ i for i in range(net.size) if i not in phenotype_list ] + + # standardize desired_attractor if given + if desired_attractor is not None: + # project desired_attractor to given phenotype + desired_attractor = phenotype_projection(desired_attractor,non_phenotype_list,net) + # rotate desired_attractor to standard form + att_ID = attractor_ID(list(desired_attractor)) + att_ID_index = desired_attractor.index(att_ID) + desired_attractor = rotate(list(desired_attractor),att_ID_index) + + encode,decode = net.state_space().encode,net.state_space().decode + np.random.seed(seed) + all_attractors,all_attractors_IDs = [],[] + if return_unprojected: unprojected_dict = {} + for i in range(numsamples): + att = attractor_from_initial_state(net,random_state(net.size),pin=pin,pin_state=pin_state) + #all_attractors.append( att ) + + # do phenotype projection: set all non-phenotype nodes to a constant (0) + att_proj = phenotype_projection(att,non_phenotype_list,net) + att_ID = attractor_ID(att_proj) # use minimum projected state as ID + + if desired_attractor is None: + all_attractors_IDs.append( att_ID ) + all_attractors.append( att_proj ) + else: # just determine whether all sampled initial states go to the desired attractor + # rotate attractor so that ID state is first + att_ID_index = att_proj.index(att_ID) + att_proj = rotate(att_proj,att_ID_index) + if desired_attractor != att_proj: + return False + + if return_unprojected: + att_decoded = [ decode(state) for state in att ] + # rotate attractor so that ID state is first + att_ID_index = att_proj.index(att_ID) + att_decoded = rotate(att_decoded,att_ID_index) + # convert to tuple form + tuple_att_decoded = tuple([ tuple(state) for state in att_decoded]) + + # add attractor to dictionary mapping ID to rotated, decoded, unprojected states + if att_ID in unprojected_dict: + unprojected_dict[att_ID].add( tuple_att_decoded ) + else: + unprojected_dict[att_ID] = set([ tuple_att_decoded ]) + + if desired_attractor is not None: + # if we've made it here, we've successfully tested all starting points + return True + unique_IDs,unique_attractor_indices,unique_counts = \ + np.unique(all_attractors_IDs,return_index=True,return_counts=True) + unique_atts = [ all_attractors[idx] for idx in unique_attractor_indices ] + + if return_unprojected and return_counts: + return 
unique_atts,unprojected_dict,unique_counts + elif return_unprojected: + return unique_atts,unprojected_dict + elif return_counts: + return unique_atts,unique_counts + else: + return unique_atts + #unique_IDs = np.unique(all_attractors_IDs) + #return unique_IDs + +def constant_nodes(att,net=None): + """ + Return indices of nodes that are constant over time in the given attractor. + + net (None) : If paseed a network, states in the attractor will be + decoded using net.state_space().decode. + If None, attractor is assumed to be decoded. + """ + if net is not None: + decode = net.state_space().decode + decoded_att = np.array([ decode(state) for state in att ]) + else: + decoded_att = att + return list( np.where(np.var(decoded_att,axis=0)==0)[0] ) + +def attractor_ID(att): + """ + Takes a single encoded attractor. + """ + return min(att) + +# **************************************************************************************************************** +# NOTE: This function is very similar to modularity.module_control_kernel, and should eventually be merged with it +# **************************************************************************************************************** +def sampled_control_kernel(net,numsamples=10000,phenotype='all',dynamic=False,seed=123, + verbose=False,phenotype_only=False,require_inputs=False, + iterative=False,iterative_rounds_out=True, + _iterative_pin=[],_iterative_desired_att=None,_iterative_rounds=[]): + """ + phenotype ('all') : A list of node indices defining the phenotype, or a string: + 'internal' -> all non-input nodes + 'all' -> all nodes + phenotype_only (False) : If True, restrict control kernel nodes to only those nodes + defining the phenotype. If False, allow any node to be in the + control kernel. + require_inputs (False) : If True, always include input nodes as control kernel nodes. + This can be faster than searching through all possibilities, + and input nodes will always be control kernel nodes in the + limit of large numsamples. + iterative (False) : If True, use iterative procedure that starts by pinning a + minimal set of distinguishing nodes, then pinning more + distinguishing nodes if additional attractors remain. + This is not guaranteed to find a minimally-sized control + set in general. This mode returns both the list of + controlling sets and a list of the number of iterative + rounds required for each attractor. + If False, use the default method that pins distinguishing sets + of increasing size in the original network. + iterative_rounds_out (True) : If True and iterative=True, also return a list of + list of the number of attractors remaining at each + iterative step for each attractor. 
+ _iterative_pin ([]) : (Used internally when iterative = True) + _iterative_desired_att (None): (Used internally when iterative = True) + _iterative_rounds ([]) : (Used internally when iterative = True) + """ + decode,encode = net.state_space().decode,net.state_space().encode + + # set phenotype as list of node indices + if np.isreal(phenotype[0]): + phenotype_nodes = phenotype + elif phenotype == 'internal': + phenotype_nodes = [ i for i in range(net.size) if i not in input_nodes(net) ] + elif phenotype == 'all': + phenotype_nodes = list(range(net.size)) + else: + raise Exception("Unrecognized phenotype") + + if iterative: + if phenotype_nodes != list(range(net.size)): + raise Exception("iterative = True not yet supported with hidden nodes") + + # set up _iterative_pin_state + _iterative_pin = sorted(_iterative_pin) + if iterative and _iterative_desired_att is not None: + _iterative_pin_state = [ decode(_iterative_desired_att[0])[j] for j in _iterative_pin ] + else: + _iterative_pin_state = [] + + # find original attractors + original_sampled_attractors,full_attractor_dict = sampled_attractors(net, + numsamples=numsamples, + seed=seed,phenotype_list=phenotype_nodes, + return_unprojected=True, + pin=_iterative_pin,pin_state=_iterative_pin_state) + original_sampled_attractor_IDs = [ attractor_ID(att) for att in original_sampled_attractors ] + if _iterative_desired_att is not None: + if attractor_ID(_iterative_desired_att) not in original_sampled_attractor_IDs: + print("sampled_control_kernel WARNING: _iterative_desired_att was not found in the pinned sampled attractors. We will add it manually in order to continue with the calculation. You may need to increase numsamples for consistent results.") + original_sampled_attractors.append(_iterative_desired_att) + full_attractor_dict[attractor_ID(_iterative_desired_att)] = [[decode(state) for state in _iterative_desired_att]] + original_sampled_attractors_decoded = \ + [ [ decode(state) for state in att ] for att in original_sampled_attractors ] + if verbose and not (iterative and _iterative_desired_att is not None): + print("sampled_control_kernel: number of attractors = {}".format( + len(original_sampled_attractors))) + + # we'll find a control kernel for each attractor + ck_list = [ -1 for i in range(len(original_sampled_attractors)) ] + # we'll also keep track of how many iterative rounds we need in the iterative case + iterative_rounds_list = [ -1 for i in range(len(original_sampled_attractors)) ] + + # define the set of potential control nodes + if phenotype_only: + possible_nodes = phenotype_nodes + else: + # we order potential nodes to check hidden ones [currently external] first + # so they are preferred + hidden_nodes = [ i for i in range(net.size) if i not in phenotype_nodes ] + possible_nodes = hidden_nodes + phenotype_nodes + + if require_inputs: + # automatically include input nodes + required_nodes = tuple(input_nodes(net)) + else: + required_nodes = () + + # set up generators of distinguishing node sets + distinguishing_nodes_gen_list = distinguishing_nodes_from_attractors( + original_sampled_attractors_decoded, + possible_nodes=possible_nodes, + required_nodes=required_nodes) + + # loop over phenotype attractors + if iterative and _iterative_desired_att is not None: + assert(attractor_ID(_iterative_desired_att) in original_sampled_attractor_IDs) + attractor_IDs_to_analyze = [ attractor_ID(_iterative_desired_att) ] + else: + attractor_IDs_to_analyze = [ attractor_ID(att) for att in original_sampled_attractors ] + for 
attractor_index in range(len(original_sampled_attractors)): + maxSizeTested = -1 + + desired_attractor = original_sampled_attractors[attractor_index] + if attractor_ID(desired_attractor) in attractor_IDs_to_analyze: + distinguishing_nodes_gen = distinguishing_nodes_gen_list[attractor_index] + + full_attractor_list = list( full_attractor_dict[attractor_ID(desired_attractor)] ) + + # if the attractor is a cycle, check whether it is controllable + # when pinning _all_ constant nodes (this is relatively fast to check + # and prevents worthless exploration when there is no control kernel) + if len(desired_attractor) > 1: + full_attractor_index = 0 + cycle_controllable = False + # only nodes that are constant over the attractor can be control nodes + possible_control_nodes = \ + [ i for i in constant_nodes(desired_attractor,net) if i in possible_nodes ] + while(not cycle_controllable and full_attractor_index < len(full_attractor_list)): + # get full attractor that will be tried for pinning + desired_full_attractor = full_attractor_list[full_attractor_index] + subset = possible_control_nodes + + # for now there are no external nodes here + # (only nodes that we pinned already in previous iterations when iterative=True) + external_pin = _iterative_pin #[] + external_pin_states = [list(desired_full_attractor[0])] + pin,dynamic_pin_states = _combine_external_and_internal_pin( + external_pin,external_pin_states, + subset,desired_full_attractor[0]) + # for now we only have static pinning + static_pin_state = dynamic_pin_states[0] + + cycle_controllable = sampled_attractors(net,numsamples=numsamples, + seed=seed,phenotype_list=phenotype_nodes, + pin=pin,pin_state=static_pin_state, + desired_attractor=desired_attractor) + full_attractor_index += 1 + if not cycle_controllable: + potential_control_nodes = [] + ck_list[attractor_index] = None + iterative_rounds_list[attractor_index] = 0 + # end cycle control check + + if verbose: + print(" Searching for control of attractor ID",attractor_ID(desired_attractor)) + + if iterative: + subset_list,num_atts_pinned_list = [],[] + + # loop over sets of distinguishing nodes of increasing size + while(ck_list[attractor_index] == -1): + subset = distinguishing_nodes_gen.__next__() + + if len(subset) > maxSizeTested: + maxSizeTested = len(subset) + if iterative and len(subset_list) > 0: + # then we've checked all distinguishing node sets of minimal size + # and we haven't yet found a control kernel, so we need to iterate + best_dn_set_index = np.argmin(num_atts_pinned_list) + iterative_pin = set(_iterative_pin).union(subset_list[best_dn_set_index]) + num_remaining = num_atts_pinned_list[best_dn_set_index] + if verbose: + print(" Minimum pinned number of attractors = {}. 
Iterating after pinning {}...".format(num_remaining,iterative_pin)) + pinned_ck,rounds = sampled_control_kernel(net, + numsamples=numsamples, + phenotype=phenotype, + dynamic=dynamic, + seed=seed, + verbose=verbose, + phenotype_only=phenotype_only, + require_inputs=require_inputs, + iterative=True, + _iterative_pin=iterative_pin, + _iterative_desired_att=desired_attractor, + _iterative_rounds=_iterative_rounds+[num_remaining,]) + assert(len(pinned_ck)==1) + assert(len(rounds)==1) + ck_list[attractor_index] = set.union(iterative_pin,pinned_ck[0]) + iterative_rounds_list[attractor_index] = rounds[0] + elif verbose: + print(" Testing distinguishing node sets of size",maxSizeTested) + if verbose and (ck_list[attractor_index] == -1): + print(" Trying pinning {}".format(subset)) + full_attractor_index = 0 + # have we found the control kernel yet for this attractor? + # XXX BCD 8.9.2019 looking back at this code now---is there a possibility that one + # of the settings of hidden nodes (corresponding to one of the + # 'desired_full_attractor's) has a smaller control kernel, but we + # don't keep looking for it once we've found a control kernel for + # one of the settings of hidden nodes? + while(ck_list[attractor_index] == -1 and full_attractor_index < len(full_attractor_list)): + # get full attractor that will be tried for pinning + desired_full_attractor = full_attractor_list[full_attractor_index] + + # for now there are no external nodes here + # (only nodes that we pinned already in previous iterations when iterative=True) + external_pin = _iterative_pin #[] + external_pin_states = [list(desired_full_attractor[0])] + pin,dynamic_pin_states = _combine_external_and_internal_pin( + external_pin,external_pin_states, + subset,desired_full_attractor[0]) + # for now we only have static pinning + static_pin_state = dynamic_pin_states[0] + + if iterative: # we need to count the number of attractors + atts_pinned = sampled_attractors(net,numsamples=numsamples, + seed=seed,phenotype_list=phenotype_nodes, + pin=pin,pin_state=static_pin_state) + num_atts_pinned = len(atts_pinned) + if num_atts_pinned == 1: + control_success = True + else: + control_success = False + # keep track of the number of attractors we find + # for each distinguishing node set of minimum size + subset_list.append(subset) + num_atts_pinned_list.append(num_atts_pinned) + + else: # 'guaranteed' method: we can simply stop if we find more than one attractor + control_success = sampled_attractors(net,numsamples=numsamples, + seed=seed,phenotype_list=phenotype_nodes, + pin=pin,pin_state=static_pin_state, + desired_attractor=desired_attractor) + if control_success: + if verbose: + print(" Successful control of attractor ID", + attractor_ID(desired_attractor),"with nodes",subset) + ck_list[attractor_index] = set(subset) + iterative_rounds_list[attractor_index] = _iterative_rounds + [1,] + + # move to next full_attractor given this phenotype + full_attractor_index += 1 + # end loop over full_attractors + # end loop over subsets + # end loop over phenotype attractors + + # remove any remaining ck placeholders for attractors we didn't analyze + # (in the 'iterative' option there are cases when we only analyze a single attractor) + ck_list_reduced = [ ck for ck in ck_list if ck != -1 ] + iterative_rounds_list_reduced = [ r for r in iterative_rounds_list if r != -1 ] + assert(len(ck_list_reduced) == len(attractor_IDs_to_analyze)) + assert(len(iterative_rounds_list_reduced) == len(attractor_IDs_to_analyze)) + + if iterative and 
iterative_rounds_out: + return ck_list_reduced,iterative_rounds_list_reduced + else: + return ck_list_reduced + + +# 5.28.2019 branched from neet.synchronous.basin_entropy +def sampled_basin_entropy(net,numsamples=10000,seed=123,pin=[],pin_state=[], + phenotype_list=None,base=2.0): + """ + Estimate the basin entropy of the landscape [Krawitz2007]_. + + :param base: the base of the logarithm + :type base: a number or ``None`` + :return: the estimated basin entropy of the landscape of type ``float`` + """ + _,basin_sizes = sampled_attractors(net,numsamples=numsamples,seed=seed, + pin=pin,pin_state=pin_state, + phenotype_list=phenotype_list, + return_counts=True) + dist = pi.Dist(basin_sizes) + return pi.shannon.entropy(dist, b=base) + + +# 8.9.2019 +def sampled_distinguishing_nodes(net,numsamples=10000,phenotype='all', + seed=123,verbose=False,**kwargs): + decode,encode = net.state_space().decode,net.state_space().encode + + # set phenotype as list of node indices + if np.isreal(phenotype[0]): + phenotype_nodes = phenotype + elif phenotype == 'internal': + phenotype_nodes = [ i for i in range(net.size) if i not in input_nodes(net) ] + elif phenotype == 'all': + phenotype_nodes = list(range(net.size)) + else: + raise Exception("Unrecognized phenotype") + + original_sampled_attractors = sampled_attractors(net,numsamples=numsamples, + seed=seed,phenotype_list=phenotype_nodes) + original_sampled_attractors_decoded = \ + [ [ decode(state) for state in att ] for att in original_sampled_attractors ] + if verbose: + print("sampled_distinguishing_nodes: original attractors:",original_sampled_attractors) + + return distinguishing_nodes_from_attractors(original_sampled_attractors_decoded, + **kwargs) + +def distinguishing_nodes_from_attractors(attractors,possible_nodes=None,required_nodes=()): + """ + Given a list of decoded attractors, return for each attractor a generator + that produces sets of distinguishing nodes of increasing size. + + We require distinguishing nodes here to be constant. + + required_nodes (()) : A tuple of node indices that will always be + included as distinguishing nodes. + """ + + dn_list = [] + + attractors_mean = np.array( [ np.mean(att,axis=0) for att in attractors ] ) + + for attractor_index,desired_attractor in enumerate(attractors): + dn_gen = distinguishing_nodes_from_attractor(attractors,attractor_index, + possible_nodes,required_nodes=required_nodes, + attractors_mean=attractors_mean) + dn_list.append(dn_gen) + + return dn_list + +def distinguishing_nodes_from_attractor(attractors,attractor_index,possible_nodes=None, + required_nodes=(),attractors_mean=None,sets_to_test_first=[]): + """ + Given a list of decoded attractors and index of a particular attractor, return + a distinguishing nodes generator for that attractor, which produces sets of + distinguishing nodes of increasing size. + + We require distinguishing nodes here to be constant. + + required_nodes (()) : A tuple of node indices that will always be + included as distinguishing nodes. + attractors_mean (None) : To increase speed when running for a large number + of individual attractor_index values and the same + attractors, optionally pass + attractors_mean = + np.array( [ np.mean(att,axis=0) for att in attractors ] ) + sets_to_test_first ([]) : If given, test the given sets _before_ looping over + sets of increasing size. Note this will not generally + produce distinguishing node sets of minimal size. + Any required_nodes are not forced to be included + for these sets. 
+ """ + try: + N = len(attractors[0][0]) + except TypeError: + raise TypeError("Unrecognized form of attractors list") + if type(required_nodes) is not tuple: + raise TypeError("required_nodes must be a tuple") + if possible_nodes is None: + possible_nodes = range(N) + + # 8.9.2019 take mean over states in each attractor. then values that are not 0 or 1 + # are not constant over the attractor + if attractors_mean is None: + attractors_mean = np.array( [ np.mean(att,axis=0) for att in attractors ] ) + + desired_attractor = attractors[attractor_index] + + # find which visible nodes are constant for this attractor + # (and thus potential distinguishing nodes) + constant_possible_nodes = [ i for i in constant_nodes(desired_attractor) \ + if i in possible_nodes ] + desired_attractor_mean = np.mean(desired_attractor,axis=0) + + # don't include required nodes (these will automatically be included below) + constant_possible_nodes = [ n for n in constant_possible_nodes if (n not in required_nodes) ] + + # potential future speedup here if we intelligently deal with nodes that + # are constant over the attractors. note that we still need to include these + # as potential distinguishing nodes, but they will never constitute + # distinguishing node sets by themselves. + #constant_across_attractors = constant_nodes([ att[0] for att in attractors ]) + + for subset in sets_to_test_first: + subset_with_required = subset # + required_nodes + # determine whether this set is a distinguishing set + distinguished_atts_union = np.array([]) + for subset_node in subset_with_required: + distinguished_atts = np.where(attractors_mean[:,subset_node] != \ + desired_attractor_mean[subset_node])[0] + distinguished_atts_union = \ + np.union1d(distinguished_atts_union,distinguished_atts) + if len(distinguished_atts_union) == len(attractors) - 1: + yield set(subset_with_required) + + # loop over subsets of increasing size + sets_to_test = powerset( constant_possible_nodes ) + for subset in sets_to_test: + subset_with_required = subset + required_nodes + # determine whether this set is a distinguishing set + distinguished_atts_union = np.array([]) + for subset_node in subset_with_required: + distinguished_atts = np.where(attractors_mean[:,subset_node] != \ + desired_attractor_mean[subset_node])[0] + distinguished_atts_union = \ + np.union1d(distinguished_atts_union,distinguished_atts) + if len(distinguished_atts_union) == len(attractors) - 1: + yield set(subset_with_required) + diff --git a/neet/boolean/randomnet.py b/neet/boolean/randomnet.py new file mode 100644 index 00000000..d0863260 --- /dev/null +++ b/neet/boolean/randomnet.py @@ -0,0 +1,500 @@ +""" +.. currentmodule:: neet.boolean.randomnet + +.. 
testsetup:: randomnet + + import numpy as np + from neet.boolean.examples import * + from neet.boolean.randomnet import * + def neighbors_in(net): + return [net.neighbors_in(node) for node in range(net.size)] + def neighbors_out(net): + return [net.neighbors_out(node) for node in range(net.size)] + def in_degree(net): + return [len(net.neighbors_in(node)) for node in range(net.size)] + def out_degree(net): + return [len(net.neighbors_out(node)) for node in range(net.size)] + def mean_degree(net): + return np.mean(out_degree(net)) + +Random networks +=============== +""" +import random +import numpy as np +from neet.sensitivity import canalizing_nodes +from .logicnetwork import LogicNetwork +from .wtnetwork import WTNetwork +import networkx as nx + +def RBN_Kim(network_size,avg_deg,prob_inh,type=1,seed=None): + """ + Create a random network designed to mimic those in Kim et al. 2013. + + network_size : Number of nodes + avg_deg : Average degree of nodes (used to set + p = avg_deg / network_size) + prob_inh : Probability that a given edge is inhibitory + """ + if type != 1: + raise NotImplementedError + + p = avg_deg / network_size + g = nx.gnp_random_graph(network_size,p,seed=seed,directed=True) + adj = nx.to_numpy_array(g) + + # change to inhibitory with probability prob_inh + # (probably a terribly inefficient way of doing this) + if seed is not None: + np.random.seed(seed+1) + for i in range(network_size): + for j in range(network_size): + if adj[i,j] == 1: + if np.random.random() < prob_inh: + adj[i,j] = -1 + + return WTNetwork(adj) + + +def RBN(network_size, k, p, self_loops=False): + """ + Given a network with $n$ nodes, the dynamic state of each node $i$ + at time $t+1$ depends on $k$ randomly selected input nodes + $i_1,\dots,i_k$ at time $t$. + + There are $2^k$ possible configurations of + $[ x_{i_1}(t),\dots,x_{i_k}(t) ]$, and each one of them + coresponds to $x_i(t+1)=1$ with probability $p$. + """ + connectivity,bias = k,p + new_table = [] + for output_label in range(network_size): + one_states = [] + inputs = _RBN_input_labels(output_label, network_size, connectivity, + self_loops=self_loops) + inputs.sort() + pattern = _RBN_output_pattern(connectivity,bias) + for j in range(len(pattern)): + if pattern[j]==1: + one_states.append(list(_label_to_state(j,connectivity))) + new_table.append([inputs,one_states]) + return LogicNetwork(new_table) + +def _RBN_input_labels(output_label, network_size, connectivity, self_loops=False): + labels = range(network_size) + if not self_loops: + labels.remove(output_label) + random.shuffle(labels) + return labels[:connectivity] + +def _RBN_output_pattern(connectivity,bias): + pattern = [] + for i in range(pow(2,connectivity)): + if random.uniform(0,1) < bias: + pattern.append(1) + else: + pattern.append(0) + return pattern + +def _label_to_state(label, digits): + return np.array(map(int,list(format(label,'0'+str(digits)+'b')))) + +def random_logic(logic_net, p=0.5, connections='fixed-structure', + fix_external=False, make_irreducible=False, + fix_canalizing=False): + """ + Return a ``LogicNetwork`` from an input ``LogicNetwork`` with a random + logic table. + + ``connections`` decides how a node in the random network is connected to + other nodes: + + ``'fixed-structure'`` + the random network has the same connections as the input network. + + ``'fixed-in-degree'`` + the number of connections to a node is the same as the input + network, but the connections are randomly selected. 
+ + ``'fixed-mean-degree'`` + the total number of edges is conserved, but edges are placed + randomly between nodes. + + ``'free'`` + only the number of nodes is conserved, with the number of + connections to a node chosen uniformly between 1 and the total + number of nodes. + + ``p`` is the probability of a state of the connected nodes being present in + the activation table. It is also equavolent to the probability of any node + being activated. If ``p`` is a single number, it applies to all nodes. + Otherwise ``p`` must be a sequence of numbers that match in size with the + input network. + + .. doctest:: randomnet + + >>> net = random_logic(myeloid, connections='fixed-structure') + >>> neighbors_in(net) == neighbors_in(myeloid) + True + >>> neighbors_out(net) == neighbors_out(myeloid) + True + + .. doctest:: randomnet + + >>> net = random_logic(myeloid, connections='fixed-in-degree') + >>> in_degree(net) == in_degree(myeloid) + True + >>> out_degree(net) == out_degree(myeloid) + False + + .. doctest:: randomnet + + >>> net = random_logic(myeloid, connections='fixed-mean-degree') + >>> mean_degree(net) == mean_degree(myeloid) + True + + :param logic_net: a :class:`neet.boolean.LogicNetwork` + :param p: probability that a state is present in the activation table + :type p: number or sequence + :param connections: ``'fixed-structure'``, ``'fixed-in-degree'``, + ``'fixed-mean-degree'``, or ``'free'`` + :type connections: str + :returns: a random :class:`neet.boolean.LogicNetwork` + """ + if not isinstance(logic_net, LogicNetwork): + raise ValueError('object must be a LogicNetwork') + + if isinstance(p, (int, float)): + ps = [p] * logic_net.size + elif len(p) != logic_net.size: + raise ValueError("p's length must match with network size") + else: + ps = p + + random_styles = {'fixed-structure': _random_logic_fixed_connections, + 'fixed-in-degree': _random_logic_shuffled_connections, + 'fixed-mean-degree': _random_logic_fixed_num_edges, + 'free': _random_logic_free_connections} + + try: + return random_styles[connections](logic_net, ps, fix_external, + make_irreducible, fix_canalizing) + except KeyError: + msg = "connections must be 'fixed', 'fixed-in-degree'," \ + "'fixed-mean-degree', or 'free'" + raise ValueError(msg) + + +def random_binary_states(k, p): + """ + Return a set of binary states. Each state has length `k` and the number + of states is `k * p` (or chosen to produce `k * p` on average if `n * p` + is not an integer). + """ + integer, decimal = divmod(2**k * p, 1) + num_states = int(integer + np.random.choice(2, p=[1 - decimal, decimal])) + state_idxs = np.random.choice(2 ** k, num_states, replace=False) + + return set('{0:0{1}b}'.format(idx, k) for idx in state_idxs) + + +def random_canalizing_binary_states(k, p): + """ + Return a set of binary states that, when considered as a set of + activating conditions, represents a canalizing function. + + Designed to sample each possible canalized function with equal + probability. + + Each state has length `k` and the number of states is set in the + same way as `random_binary_states`. 
+ """ + integer, decimal = divmod(2**k * p, 1) + num_states = int(integer + np.random.choice(2, p=[1 - decimal, decimal])) + + # calculate values specifying which input is canalizing and how + canalizing_input = np.random.choice(k) + canalizing_value = np.random.choice(2) + if num_states > 2**(k - 1): + canalized_value = 1 + elif num_states < 2**(k - 1): + canalized_value = 0 + elif num_states == 2**(k - 1): + canalized_value = np.random.choice(2) + + fixed_states = _all_states_with_one_node_fixed( + k, canalizing_input, canalizing_value) + other_states = np.lib.arraysetops.setxor1d(np.arange(2**k), + fixed_states, + assume_unique=True) + if canalized_value == 1: + # include all fixed_states as activating conditions + state_idxs = np.random.choice(other_states, + num_states - len(fixed_states), + replace=False) + state_idxs = np.concatenate((state_idxs, np.array(fixed_states))) + elif canalized_value == 0: + # include none of fixed_states as activating conditions + state_idxs = np.random.choice(other_states, num_states, replace=False) + + return set('{0:0{1}b}'.format(idx, k) for idx in state_idxs) + + +def _all_states_with_one_node_fixed(k, fixed_index, fixed_value, max_k=20): + """ + (Should have length 2**(k-1).) + """ + if k > max_k: + raise Exception("k > max_k") + # there may be a more efficient way to do this... + return [idx for idx in range(2**k) + if '{0:0{1}b}'.format(idx, k)[fixed_index] == str(fixed_value)] + + +def _external_nodes(logic_net): + externals = set() + for idx, row in enumerate(logic_net.table): + if row[0] == (idx, ) and row[1] == {'1'}: + externals.add(idx) + return externals + +# stolen from grn-survey.generate_variants + + +def _fake_connections(net): + fakes = [] + for idx in range(net.size): + for neighbor_in in net.neighbors_in(idx): + if not net.is_dependent(idx, neighbor_in): + fakes.append((idx, neighbor_in)) + return fakes + + +def _logic_table_row_is_irreducible(row, i, size): + table = [((), set()) for j in range(size)] + table[i] = row + net = LogicNetwork(table) + return len(_fake_connections(net)) == 0 + + +def _logic_table_row_is_canalizing(row, i, size): + table = [((), set()) for j in range(size)] + table[i] = row + net = LogicNetwork(table) + return i in canalizing_nodes(net) + + +def _random_logic_fixed_connections(logic_net, ps, fix_external=False, + make_irreducible=False, + fix_canalizing=False, + give_up_number=1000): + """ + Return a `LogicNetwork` from an input `LogicNetwork` with a random logic + table. + + Connections in the returned network are the same as those of the input. 
+
+    :param logic_net: a :class:`LogicNetwork`
+    :param ps: probability that a state is present in the activation table
+    :returns: a random :class:`LogicNetwork`
+    """
+    if not isinstance(logic_net, LogicNetwork):
+        raise ValueError('object must be a LogicNetwork')
+
+    # preserve external nodes only when requested (matches the other styles)
+    externals = _external_nodes(logic_net) if fix_external else set()
+
+    new_table = []
+    for i, row in enumerate(logic_net.table):
+        indices = row[0]
+        if i in externals:
+            conditions = row[1]
+        else:
+            if fix_canalizing:
+                original_canalizing = _logic_table_row_is_canalizing(
+                    row, i, logic_net.size)
+            keep_trying = True
+            number_tried = 0
+            while keep_trying and (number_tried < give_up_number):
+                if fix_canalizing and original_canalizing:
+                    conditions = random_canalizing_binary_states(
+                        len(indices), ps[i])
+                else:
+                    conditions = random_binary_states(len(indices), ps[i])
+
+                number_tried += 1
+                keep_trying = False
+                if make_irreducible:
+                    node_irreducible = _logic_table_row_is_irreducible(
+                        (indices, conditions), i, logic_net.size)
+                    keep_trying = not node_irreducible
+                if (not keep_trying) and fix_canalizing:
+                    node_canalizing = _logic_table_row_is_canalizing(
+                        (indices, conditions), i, logic_net.size)
+                    keep_trying = node_canalizing != original_canalizing
+            if keep_trying:  # no candidate satisfied the constraints
+                msg = "No function out of " + str(give_up_number) + \
+                      " tried satisfied constraints"
+                raise Exception(msg)
+
+        new_table.append((indices, conditions))
+
+    return LogicNetwork(new_table, logic_net.names)
+
+
+def _random_logic_shuffled_connections(logic_net, ps, fix_external=False,
+                                       make_irreducible=False,
+                                       fix_canalizing=False,
+                                       give_up_number=1000):
+    """
+    Return a `LogicNetwork` from an input `LogicNetwork` with a random logic
+    table.
+
+    The number of connections to a node is the same as in the input network,
+    but the connections are randomly selected.
+
+    :param logic_net: a :class:`LogicNetwork`
+    :param ps: probability that a state is present in the activation table
+    :returns: a random :class:`LogicNetwork`
+    """
+    if not isinstance(logic_net, LogicNetwork):
+        raise ValueError('object must be a LogicNetwork')
+
+    externals = _external_nodes(logic_net) if fix_external else set()
+
+    new_table = []
+    for i, row in enumerate(logic_net.table):
+        if i in externals:
+            indices, conditions = row
+        else:
+            if fix_canalizing:
+                original_canalizing = _logic_table_row_is_canalizing(
+                    row, i, logic_net.size)
+            n_indices = len(row[0])
+            keep_trying = True
+            number_tried = 0
+            while keep_trying and (number_tried < give_up_number):
+                indices = tuple(sorted(random.sample(
+                    range(logic_net.size), k=n_indices)))
+
+                if fix_canalizing and original_canalizing:
+                    conditions = random_canalizing_binary_states(
+                        n_indices, ps[i])
+                else:
+                    conditions = random_binary_states(n_indices, ps[i])
+
+                number_tried += 1
+                keep_trying = False
+                if make_irreducible:
+                    node_irreducible = _logic_table_row_is_irreducible(
+                        (indices, conditions), i, logic_net.size)
+                    keep_trying = not node_irreducible
+                if (not keep_trying) and fix_canalizing:
+                    node_canalizing = _logic_table_row_is_canalizing(
+                        (indices, conditions), i, logic_net.size)
+                    keep_trying = node_canalizing != original_canalizing
+            if keep_trying:  # no candidate satisfied the constraints
+                msg = "No function out of " + str(give_up_number) + \
+                      " tried satisfied constraints"
+                raise Exception(msg)
+
+        new_table.append((indices, conditions))
+
+    return LogicNetwork(new_table, logic_net.names)
+
+
+def _random_logic_free_connections(logic_net, ps, fix_external=False,
+                                   make_irreducible=False,
+                                   fix_canalizing=False,
+                                   give_up_number=1000):
+    """
+    Return a `LogicNetwork` from an input `LogicNetwork` with a random logic
+    table.
+
+    All possible connections within the network are considered in the random
+    process.
+
+    :param logic_net: a :class:`LogicNetwork`
+    :param ps: probability that a state is present in the activation table
+    :returns: a random :class:`LogicNetwork`
+    """
+    if not isinstance(logic_net, LogicNetwork):
+        raise ValueError('object must be a LogicNetwork')
+
+    # the extra keyword arguments are accepted so that this function has the
+    # same call signature as the other connection styles, but the
+    # corresponding options are not implemented here
+    if fix_external or make_irreducible or fix_canalizing:
+        raise NotImplementedError("fix_external, make_irreducible, and "
+                                  "fix_canalizing are not implemented for "
+                                  "connections='free'")
+
+    new_table = []
+    for i in range(logic_net.size):
+        n_indices = random.randint(1, logic_net.size)
+        indices = tuple(sorted(random.sample(
+            range(logic_net.size), k=n_indices)))
+
+        conditions = random_binary_states(n_indices, ps[i])
+
+        new_table.append((indices, conditions))
+
+    return LogicNetwork(new_table, logic_net.names)
+
+
+def _random_logic_fixed_num_edges(logic_net, ps, fix_external=False,
+                                  make_irreducible=False,
+                                  fix_canalizing=False,
+                                  give_up_number=1000):
+    """
+    Return a new network with the same total number of edges as the input,
+    with edges placed between randomly chosen nodes and random corresponding
+    Boolean rules.
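+
+    The partition of edges among nodes (see the implementation below) gives
+    every non-external node at least one input and retries whenever a node
+    would end up with more inputs than there are nodes. A sketch of the
+    idea, with hypothetical ``n_nodes`` internal nodes and ``n_edges`` total
+    edges::
+
+        # each node id appears n_nodes times in `options`, so a node can
+        # receive at most n_nodes of the remaining edges
+        options = [i // n_nodes for i in range(n_nodes * n_nodes)]
+        extra = np.random.choice(options, n_edges - n_nodes, replace=False)
+        # ...then every node gets one guaranteed input plus its extras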
+ """ + if fix_canalizing: + raise NotImplementedError("fix_canalizing=True not yet implemented") + + num_edges = sum(len(logic_net.neighbors_in(i)) + for i in range(logic_net.size)) + + externals = _external_nodes(logic_net) if fix_external else set() + + num_edges -= len(externals) + + internals = [idx for idx in range(logic_net.size) if idx not in externals] + num_internal_connections = np.zeros(len(internals)) + + # partition edges among nodes + keep_trying = True + number_tried = 0 + while keep_trying and (number_tried < give_up_number): + rng = range(len(internals) * logic_net.size) + options = [i // logic_net.size for i in rng] + sample = np.random.choice(options, num_edges - len(internals), + replace=False) + idxs, counts = np.unique(sample, return_counts=True) + + num_internal_connections[idxs] = counts + num_internal_connections += 1 + + number_tried += 1 + # we need to check that there is no node that will want + # more connections than there are nodes + if max(num_internal_connections) <= logic_net.size: + keep_trying = False + if number_tried >= give_up_number: + msg = "No partition out of " + str(give_up_number) + " tried satisfied constraints" + raise Exception(msg) + + new_table = [()] * logic_net.size + for internal, num in zip(internals, num_internal_connections): + keep_trying = True + number_tried = 0 + while keep_trying and (number_tried < give_up_number): + in_indices = tuple(np.random.choice( + logic_net.size, int(num), replace=False)) + conditions = random_binary_states(len(in_indices), ps[internal]) + new_table[internal] = (in_indices, conditions) + + number_tried += 1 + if make_irreducible: + node_irreducible = _logic_table_row_is_irreducible( + (in_indices, conditions), internal, logic_net.size) + keep_trying = not node_irreducible + else: + keep_trying = False + if number_tried >= give_up_number: + msg = "No function out of " + str(give_up_number) + " tried satisfied constraints" + raise Exception(msg) + + for external in externals: + new_table[external] = logic_net.table[external] + + return LogicNetwork(new_table, logic_net.names) diff --git a/neet/synchronous.py b/neet/synchronous.py new file mode 100644 index 00000000..77c0849f --- /dev/null +++ b/neet/synchronous.py @@ -0,0 +1,731 @@ +""" +.. currentmodule:: neet.synchronous + +.. testsetup:: synchronous + + from math import e + from neet.automata import ECA + from neet.boolean import WTNetwork + from neet.boolean.examples import s_pombe + from neet.synchronous import * + +Synchronous Network Analysis +============================ + +The disadvantage of these functions is that they cannot share any +information. If you wish to compute a number of them, then many values will be +repeatedly computed. For this reason, we created the :class:`Landscape` class +to simultaneously compute and several of these properties. This provides much +better performance overall. + +API Documentation +----------------- +""" +import networkx as nx +import numpy as np +import pyinform as pi +import copy +from .statespace import StateSpace +from .network import Network + + +class Landscape(StateSpace): + """ + The ``Landscape`` class represents the structure and topology of the + "landscape" of state transitions. That is, it is the state space + together with information about state transitions and the topology of + the state transition graph. + """ + + def __init__(self, net, index=None, pin=None, values=None, + dynamic_pin=None): + """ + Construct the landscape for a network. + + .. rubric:: Examples + + .. 
doctest:: synchronous + + >>> Landscape(s_pombe) + + >>> Landscape(ECA(30, 5)) + + + :param net: the network + :param index: the index to update (or None) + :param pin: the indices to pin during update (or None); see also + dynamic_pin + :param values: a dictionary of index-value pairs to set after update + :param dynamic_pin: a list of lists of pinned values over time; + shape = (# timesteps)x(# pinned nodes) + :raises TypeError: if ``net`` is not a network + :raises ValueError: if ``dynamic_pin`` has incorrect shape + """ + + if not isinstance(net, Network): + raise TypeError("net is not a network") + else: + state_space = net.state_space() + + if dynamic_pin is not None: + if pin is None: + raise ValueError("pin must be specified to use dynamic_pin") + elif len(dynamic_pin[0]) != len(pin): + raise ValueError("each element of dynamic_pin must have the same length as pin") + + super(Landscape, self).__init__(state_space.shape) + + self.__net = net + self.__index = index + self.__pin = pin + self.__values = values + self.__dynamic_pin = dynamic_pin + + self.__expounded = False + self.__graph = None + + self.__setup() + + @property + def network(self): + """ + The landscape's dynamical network + + .. rubric:: Examples + + .. doctest:: synchronous + + >>> landscape = Landscape(s_pombe) + >>> landscape.network + + """ + return self.__net + + @property + def size(self): + """ + The number of nodes in the landscape's dynamical network + + .. rubric:: Examples + + .. doctest:: synchronous + + >>> landscape = Landscape(s_pombe) + >>> landscape.size + 9 + """ + return self.ndim + + @property + def transitions(self): + """ + The state transitions array + + The transitions array is an array, indexed by states, whose + values are the subsequent state of the indexing state. + + .. rubric:: Examples + + .. doctest:: synchronous + + >>> landscape = Landscape(s_pombe) + >>> landscape.transitions + array([ 2, 2, 130, 130, 4, 0, 128, 128, 8, 0, 128, 128, 12, + 0, 128, 128, 256, 256, 384, 384, 260, 256, 384, 384, 264, 256, + ... + 208, 208, 336, 336, 464, 464, 340, 336, 464, 464, 344, 336, 464, + 464, 348, 336, 464, 464]) + """ + return self.__transitions + + @property + def attractors(self): + """ + The array of attractor cycles. + + Each element of the array is an array of states in an attractor + cycle. + + .. rubric:: Examples + + .. doctest:: synchronous + + >>> landscape = Landscape(s_pombe) + >>> landscape.attractors + array([array([76]), array([4]), array([8]), array([12]), + array([144, 110, 384]), array([68]), array([72]), array([132]), + array([136]), array([140]), array([196]), array([200]), + array([204])], dtype=object) + """ + if not self.__expounded: + self.__expound() + return self.__attractors + + @property + def basins(self): + """ + The array of basin numbers, indexed by states. + + .. rubric:: Examples + + .. doctest:: synchronous + + >>> landscape = Landscape(s_pombe) + >>> landscape.basins + array([ 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 0, + 0, 4, 4, 0, 0, 4, 4, 0, 0, 4, 4, 0, 0, 4, 4, 4, 4, + ... + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0]) + """ + if not self.__expounded: + self.__expound() + return self.__basins + + @property + def basin_sizes(self): + """ + The basin sizes as an array indexed by the basin number. + + .. rubric:: Examples + + .. 
doctest:: synchronous
+
+            >>> landscape = Landscape(s_pombe)
+            >>> landscape.basin_sizes
+            array([378, 2, 2, 2, 104, 6, 6, 2, 2, 2, 2, 2, 2])
+        """
+        if not self.__expounded:
+            self.__expound()
+        return self.__basin_sizes
+
+    @property
+    def in_degrees(self):
+        """
+        The in-degree of each state as an array.
+
+        .. rubric:: Examples
+
+        .. doctest:: synchronous
+
+            >>> landscape = Landscape(s_pombe)
+            >>> landscape.in_degrees
+            array([ 6, 0, 4, 0, 2, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 12,
+                    0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                    ...
+                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                    0, 0])
+        """
+        if not self.__expounded:
+            self.__expound()
+        return self.__in_degrees
+
+    @property
+    def heights(self):
+        """
+        The height of each state as an array.
+
+        The *height* of a state is the number of time steps from that
+        state to a state in its attractor cycle.
+
+        .. rubric:: Examples
+
+        .. doctest:: synchronous
+
+            >>> landscape = Landscape(s_pombe)
+            >>> landscape.heights
+            array([7, 7, 6, 6, 0, 8, 6, 6, 0, 8, 6, 6, 0, 8, 6, 6, 8, 8, 1, 1, 2, 8,
+                   1, 1, 2, 8, 1, 1, 2, 8, 1, 1, 2, 2, 2, 2, 9, 9, 1, 1, 9, 9, 1, 1,
+                   ...
+                   3, 9, 9, 9, 3, 9, 9, 9, 3, 9, 9, 9, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
+                   3, 3, 3, 3, 3, 3])
+        """
+        if not self.__expounded:
+            self.__expound()
+        return self.__heights
+
+    @property
+    def attractor_lengths(self):
+        """
+        The length of each attractor cycle as an array, indexed by the
+        attractor.
+
+        .. rubric:: Examples
+
+        .. doctest:: synchronous
+
+            >>> landscape = Landscape(s_pombe)
+            >>> landscape.attractor_lengths
+            array([1, 1, 1, 1, 3, 1, 1, 1, 1, 1, 1, 1, 1])
+        """
+        if not self.__expounded:
+            self.__expound()
+        return self.__attractor_lengths
+
+    @property
+    def recurrence_times(self):
+        """
+        The recurrence time of each state as an array.
+
+        The *recurrence time* is the number of time steps from that
+        state until a state is repeated.
+
+        .. rubric:: Examples
+
+        .. doctest:: synchronous
+
+            >>> landscape = Landscape(s_pombe)
+            >>> landscape.recurrence_times
+            array([7, 7, 6, 6, 0, 8, 6, 6, 0, 8, 6, 6, 0, 8, 6, 6, 8, 8, 3, 3, 2, 8,
+                   3, 3, 2, 8, 3, 3, 2, 8, 3, 3, 4, 4, 4, 4, 9, 9, 3, 3, 9, 9, 3, 3,
+                   ...
+                   3, 9, 9, 9, 3, 9, 9, 9, 3, 9, 9, 9, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
+                   3, 3, 3, 3, 3, 3])
+        """
+        if not self.__expounded:
+            self.__expound()
+        return self.__recurrence_times
+
+    @property
+    def graph(self):
+        """
+        The state transitions graph of the landscape as a
+        ``networkx.DiGraph``.
+
+        .. rubric:: Examples
+
+        .. doctest:: synchronous
+
+            >>> landscape = Landscape(s_pombe)
+            >>> landscape.graph
+
+        """
+        if self.__graph is None:
+            self.__graph = nx.DiGraph(list(enumerate(self.__transitions)))
+        return self.__graph
+
+    def __setup(self):
+        """
+        This function performs all of the initialization-time setup of
+        the ``Landscape`` object. At present this is limited to
+        computing the state transitions array, but subsequent versions
+        may expand the work that ``__setup`` does.
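+
+        When ``dynamic_pin`` is given, the pinned nodes are driven through a
+        repeating sequence of values rather than held constant. A sketch of
+        the bookkeeping, for a hypothetical network in which node 0 is
+        pinned to the period-2 sequence 1, 0::
+
+            pin = [0]
+            dynamic_pin = [[1], [0]]  # shape (# timesteps) x (# pinned nodes)
+            # transitions are then recorded after len(dynamic_pin) updates,
+            # over a limited state space that does not iterate pinned nodes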
+ """ + update = self.__net._unsafe_update + encode = self._unsafe_encode + + if self.__dynamic_pin is None: + transitions = np.empty(self.volume, dtype=np.int) + for i, state in enumerate(self): + transitions[i] = encode(update(state, + index=self.__index, + pin=self.__pin, + values=self.__values)) + else: # update multiple times using dynamic pin for inputs + + # we will construct a special limited state space that does not + # iterate over fixed nodes + limited_shape = self.shape[:] + for fixed_node_index in self.__pin: + limited_shape[fixed_node_index] = 1 + self.limited_state_space = StateSpace(limited_shape) + + # we will keep track of both the "transitions" (starting points + # to endpoints after ell timesteps, where ell is the length of + # the dynamic pin) and "dynamic_paths" (all steps along the way). + # note that transitions are encoded using the limited state space, + # while dynamic paths are encoded using the full state space. + transitions = np.empty(self.limited_state_space.volume, dtype=np.object) + dynamic_paths = np.empty((self.limited_state_space.volume, + len(self.__dynamic_pin)), + dtype=np.object) + for i,state in enumerate(self.limited_state_space): + current_state = copy.copy(state) + ell = len(self.__dynamic_pin) + # set first pinned values + for k,pinned_value in enumerate(self.__dynamic_pin[-1]): + current_state[self.__pin[k]] = pinned_value + for j in range(ell): + # update with those nodes pinned + current_state = update(current_state, + index=self.__index, + pin=self.__pin, + values=self.__values) + # set to next pinned values + for k,pinned_value in enumerate(self.__dynamic_pin[j%ell]): + current_state[self.__pin[k]] = pinned_value + # record result + dynamic_paths[i,j] = encode(current_state) + # reset pinned values to zero to encode correctly using limited state space + for pinned_node in self.__pin: + current_state[pinned_node] = 0 + transitions[i] = self.limited_state_space._unsafe_encode(current_state) + + self.__transitions = transitions + if self.__dynamic_pin is None: + self.__dynamic_paths = transitions.reshape(len(transitions),1) + else: + self.__dynamic_paths = dynamic_paths + self.dynamic_paths = self.__dynamic_paths # THERE'S PROBABLY A BETTER WAY + + def __expound(self): + """ + This function performs the bulk of the calculations that the + ``Landscape`` is concerned with. Most of the properties in this + class are computed by this function whenever *any one* of them + is requested and the results are cached. The advantage of this + is that it saves computation time; why traverse the state space + for every property call when you can do it all at once. The + downside is that the cached results may use a good bit more + memory. This is a trade-off that we are willing to make for now. 
+
+        The properties that are computed by this function include:
+        - :attr:`attractors`
+        - :attr:`basins`
+        - :attr:`basin_sizes`
+        - :attr:`in_degrees`
+        - :attr:`heights`
+        - :attr:`attractor_lengths`
+        - :attr:`recurrence_times`
+        """
+
+        if self.__dynamic_pin is None:
+            volume = self.volume
+        else:
+            # With a dynamic pin, the volume of state space to be
+            # explored is limited
+            volume = self.limited_state_space.volume
+
+        # Get the state transitions
+        trans = self.__transitions
+        # Create an array to store whether a given state has been visited
+        visited = np.zeros(volume, dtype=np.bool)
+        # Create an array to store which attractor basin each state is in
+        basins = np.full(volume, -1, dtype=np.int)
+        # Create an array to store the in-degree of each state
+        in_degrees = np.zeros(volume, dtype=np.int)
+        # Create an array to store the height of each state
+        heights = np.zeros(volume, dtype=np.int)
+        # Create an array to store the recurrence time of each state
+        recurrence_times = np.zeros(volume, dtype=np.int)
+        # Create a counter to keep track of how many basins have been visited
+        basin_number = 0
+        # Create a list of basin sizes
+        basin_sizes = []
+        # Create a list of attractor cycles
+        attractors = []
+        # Create a list of attractor lengths
+        attractor_lengths = []
+
+        # Start at state 0
+        initial_state = 0
+        # While the initial state is a state of the system
+        while initial_state < len(trans):
+            # Create a stack to store the states visited so far
+            state_stack = []
+            # Create an array to store the states in the attractor cycle
+            cycle = []
+            # Create a flag to signify whether the current state is part of
+            # the cycle
+            in_cycle = False
+            # Set the current state to the initial state
+            state = initial_state
+            # Set the terminus and next_state variables to the next state
+            terminus = next_state = trans[state]
+            # Set the visited flag of the current state
+            visited[state] = True
+            # Increment in-degree
+            in_degrees[next_state] += 1
+            # While the next state hasn't been visited
+            while not visited[next_state]:
+                # Push the current state onto the stack
+                state_stack.append(state)
+                # Set the current state to the next state
+                state = next_state
+                # Update the terminus and next_state variables
+                terminus = next_state = trans[state]
+                # Update the visited flag for the current state
+                visited[state] = True
+                # Increment in-degree
+                in_degrees[next_state] += 1
+
+            # If the next state hasn't been assigned a basin yet
+            if basins[next_state] == -1:
+                # Set the current basin to the basin number
+                basin = basin_number
+                # Increment the basin number
+                basin_number += 1
+                # Add a new basin size
+                basin_sizes.append(0)
+                # Add a new attractor length
+                attractor_lengths.append(1)
+                # Add the current state to the attractor cycle
+                cycle.append(state)
+                # Set the current state's recurrence time
+                recurrence_times[state] = 0
+                # We're still in the cycle until the current state is equal
+                # to the terminus
+                in_cycle = (terminus != state)
+            else:
+                # Set the current basin to the basin of next_state
+                basin = basins[next_state]
+                # Set the state's height to one greater than the next state's
+                heights[state] = heights[next_state] + 1
+                # Set the state's recurrence time to one greater than the
+                # next state's
+                recurrence_times[state] = recurrence_times[next_state] + 1
+
+            # Set the basin of the current state
+            basins[state] = basin
+            # Increment the basin size
+            basin_sizes[basin] += 1
+
+            # While we still have states on the stack
+            while len(state_stack) != 0:
+                # Save the current state as the next state
+                next_state = state
+                # Pop the current state off of the top of the stack
+                state = state_stack.pop()
+                # Set the basin of the current state
+                basins[state] = basin
+                # Increment the basin size
+                basin_sizes[basin] += 1
+                # If we're still in the cycle
+                if in_cycle:
+                    # Add the current state to the attractor cycle
+                    cycle.append(state)
+                    # Increment the current attractor length
+                    attractor_lengths[basin] += 1
+                    # We're still in the cycle until the current state is
+                    # equal to the terminus
+                    in_cycle = (terminus != state)
+                    # Set the cycle states' recurrence times
+                    if not in_cycle:
+                        for cycle_state in cycle:
+                            rec_time = attractor_lengths[basin] - 1
+                            recurrence_times[cycle_state] = rec_time
+                else:
+                    # Set the state's height to one greater than the next
+                    # state's
+                    heights[state] = heights[next_state] + 1
+                    # Set the state's recurrence time to one greater than the
+                    # next state's
+                    recurrence_times[state] = recurrence_times[next_state] + 1
+
+            # Find the next unvisited initial state
+            while initial_state < len(visited) and visited[initial_state]:
+                initial_state += 1
+
+            # If the cycle isn't empty, append it to the attractors list
+            if len(cycle) != 0:
+                attractors.append(np.asarray(cycle, dtype=np.int))
+
+        # BCD 12.19.2018 the attractor cycles are collected in reverse
+        # order ("drawkcab"), so reverse each of them here
+        attractors = [ a[::-1] for a in attractors ]
+
+        self.__basins = basins
+        self.__basin_sizes = np.asarray(basin_sizes)
+        if self.__dynamic_pin is None:
+            self.__attractors = np.asarray(attractors)
+        else:
+            # each attractor state needs to be expanded to length ell paths
+            attractor_paths = [ np.concatenate([ self.dynamic_paths[state] \
+                                                 for state in a]) \
+                                for a in attractors ]
+            self.__attractors = np.asarray(attractor_paths)
+        self.__attractor_lengths = np.asarray(attractor_lengths)
+        self.__in_degrees = in_degrees
+        self.__heights = heights
+        self.__recurrence_times = np.asarray(recurrence_times)
+        self.__expounded = True
+
+    def trajectory(self, init, timesteps=None, encode=None):
+        """
+        Compute the trajectory of a state.
+
+        This method computes a trajectory from ``init`` to the last state
+        before the trajectory begins to repeat. If ``timesteps`` is
+        provided, then the trajectory will have a length of ``timesteps
+        + 1`` regardless of repeated states. The ``encode`` argument
+        forces the states in the trajectory to be either encoded or not.
+        When ``encode is None``, whether or not the states of the
+        trajectory are encoded is determined by whether or not the
+        initial state (``init``) is provided in encoded form.
+
+        Note that when ``timesteps is None``, the length of the
+        resulting trajectory should be one greater than the recurrence
+        time of the state.
+
+        .. rubric:: Examples
+
+        .. doctest:: synchronous
+
+            >>> landscape = Landscape(s_pombe)
+            >>> landscape.trajectory([1,0,0,1,0,1,1,0,1])
+            [[1, 0, 0, 1, 0, 1, 1, 0, 1], ... [0, 0, 1, 1, 0, 0, 1, 0, 0]]
+
+            >>> landscape.trajectory([1,0,0,1,0,1,1,0,1], encode=True)
+            [361, 80, 320, 78, 128, 162, 178, 400, 332, 76]
+
+            >>> landscape.trajectory(361)
+            [361, 80, 320, 78, 128, 162, 178, 400, 332, 76]
+
+            >>> landscape.trajectory(361, encode=False)
+            [[1, 0, 0, 1, 0, 1, 1, 0, 1], ... [0, 0, 1, 1, 0, 0, 1, 0, 0]]
+
+            >>> landscape.trajectory(361, timesteps=5)
+            [361, 80, 320, 78, 128, 162]
+
+            >>> landscape.trajectory(361, timesteps=10)
+            [361, 80, 320, 78, 128, 162, 178, 400, 332, 76, 76]
+
+        :param init: the initial state
+        :type init: ``int`` or an iterable
+        :param timesteps: the number of time steps to include in the trajectory
+        :type timesteps: ``int`` or ``None``
+        :param encode: whether to encode the states in the trajectory
+        :type encode: ``bool`` or ``None``
+        :return: a list whose elements are subsequent states of the trajectory
+
+        :raises ValueError: if ``init`` is an empty array
+        :raises ValueError: if ``timesteps`` is less than :math:`1`
+        """
+        decoded = isinstance(init, list) or isinstance(init, np.ndarray)
+
+        if decoded:
+            if len(init) == 0:
+                raise ValueError("initial state cannot be empty")
+            elif encode is None:
+                encode = False
+            init = self.encode(init)
+        elif encode is None:
+            encode = True
+
+        trans = self.__transitions
+        if timesteps is not None:
+            if timesteps < 1:
+                raise ValueError("number of steps must be positive, non-zero")
+
+            path = [init] * (timesteps + 1)
+            for i in range(1, len(path)):
+                path[i] = trans[path[i - 1]]
+        else:
+            path = [init]
+            state = trans[init]
+            while state not in path:
+                path.append(state)
+                state = trans[state]
+
+        if not encode:
+            decode = self.decode
+            path = [decode(state) for state in path]
+
+        return path
+
+    def timeseries(self, timesteps):
+        """
+        Compute the full time series of the landscape.
+
+        This method computes a 3-dimensional array whose elements are the
+        states of each node in the network. The dimensions of the array
+        are indexed by, in order, the node, the initial state and the
+        time step.
+
+        .. rubric:: Examples
+
+        .. doctest:: synchronous
+
+            >>> landscape = Landscape(s_pombe)
+            >>> landscape.timeseries(5)
+            array([[[0, 0, 0, 0, 0, 0],
+                    [1, 0, 0, 0, 0, 0],
+                    [0, 0, 0, 0, 0, 0],
+                    ...,
+                    [1, 0, 0, 0, 0, 0],
+                    [0, 0, 0, 0, 0, 0],
+                    [1, 0, 0, 0, 0, 0]],
+
+                   [[0, 1, 1, 1, 1, 0],
+                    [0, 1, 1, 1, 1, 0],
+                    [1, 1, 1, 1, 0, 0],
+                    ...,
+                    [0, 0, 0, 0, 0, 0],
+                    [1, 0, 0, 0, 0, 0],
+                    [1, 0, 0, 0, 0, 0]],
+
+                   ...
+
+                   [[0, 0, 1, 1, 1, 1],
+                    [0, 0, 1, 1, 1, 1],
+                    [0, 1, 1, 1, 1, 0],
+                    ...,
+                    [1, 0, 0, 0, 0, 0],
+                    [1, 1, 0, 0, 0, 0],
+                    [1, 1, 0, 0, 0, 0]],
+
+                   [[0, 0, 0, 0, 0, 1],
+                    [0, 0, 0, 0, 0, 1],
+                    [0, 0, 0, 0, 1, 1],
+                    ...,
+                    [1, 1, 1, 0, 0, 0],
+                    [1, 1, 1, 0, 0, 0],
+                    [1, 1, 1, 0, 0, 0]]])
+
+        :param timesteps: the number of timesteps to evolve the system
+        :type timesteps: ``int``
+        :return: a 3-D array of node states
+
+        :raises ValueError: if ``timesteps`` is less than :math:`1`
+        """
+        if timesteps < 1:
+            raise ValueError("number of steps must be positive, non-zero")
+
+        trans = self.__transitions
+        decode = self.decode
+        decoded_trans = [decode(state) for state in trans]
+
+        shape = (self.ndim, self.volume, timesteps + 1)
+        series = np.empty(shape, dtype=np.int)
+
+        for index, init in enumerate(self):
+            k = index
+            series[:, index, 0] = init[:]
+            for time in range(1, timesteps + 1):
+                series[:, index, time] = decoded_trans[k][:]
+                k = trans[k]
+
+        return series
+
+    def basin_entropy(self, base=2.0):
+        """
+        Compute the basin entropy of the landscape [Krawitz2007]_.
+
+        This method computes the Shannon entropy of the distribution of
+        basin sizes, using the logarithm base given by the ``base``
+        keyword argument (:math:`2` by default). Choosing the base equal
+        to the number of basins normalizes the result to
+        :math:`0 \\leq h \\leq 1`.
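+        Concretely, with basin sizes :math:`n_i` and :math:`N = \\sum_i n_i`,
+        this is :math:`h = -\\sum_i (n_i / N) \\log_b (n_i / N)`.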
+
+        .. rubric:: Examples
+
+        .. doctest:: synchronous
+
+            >>> landscape = Landscape(s_pombe)
+            >>> landscape.basin_entropy()
+            1.221888833884975
+            >>> landscape.basin_entropy(base=2)
+            1.221888833884975
+            >>> landscape.basin_entropy(base=10)
+            0.36782519036626105
+            >>> landscape.basin_entropy(base=e)
+            0.8469488001650497
+
+        :param base: the base of the logarithm
+        :type base: a number or ``None``
+        :return: the basin entropy of the landscape of type ``float``
+        """
+        if not self.__expounded:
+            self.__expound()
+        dist = pi.Dist(self.__basin_sizes)
+        return pi.shannon.entropy(dist, b=base)
diff --git a/test/boolean/test_modularity.py b/test/boolean/test_modularity.py
new file mode 100644
index 00000000..515039ac
--- /dev/null
+++ b/test/boolean/test_modularity.py
@@ -0,0 +1,264 @@
+import unittest
+from neet.boolean import modularity as md
+
+from neet.boolean.examples import s_pombe, s_cerevisiae
+from neet.boolean import LogicNetwork
+
+TEST_CELL_COLLECTIVE = True
+
+if TEST_CELL_COLLECTIVE:
+    import datadex
+    from load_cell_collective_nets import loadCellCollectiveNets
+
+def atts_and_cks_modular_and_sampled(net,numsamples=100,seed=123,iterative=False):
+    """
+    Calculate attractors and control kernels using both the exact "modular"
+    approach and the "sampled" approach.
+
+    Returns: atts, cks, sampled_atts, sampled_cks
+    """
+    # calculate using modular code
+    atts,outdict = md.attractors(net,find_control_kernel=True)
+    cks = outdict['control_kernels']
+
+    # calculate using sampling code
+    sampled_atts = md.sampled_attractors(net,numsamples=numsamples,seed=seed)
+    sampled_cks = md.sampled_control_kernel(net,numsamples=numsamples,
+                                            phenotype='all',seed=seed,
+                                            iterative=iterative,
+                                            iterative_rounds_out=False)
+
+    return atts,cks,sampled_atts,sampled_cks
+
+class TestModularityHelpers(unittest.TestCase):
+
+    def test_attractors_equivalent(self):
+        a1 = [1,2,3,4]
+        a2 = [2,3,4,1]
+        a3 = [4,1,2,3]
+        a4 = [3,2,1,4]
+        a5 = [1,2,3,8]
+        a6 = [1,2,3]
+        self.assertTrue(md.attractors_equivalent(a1,a1))
+        self.assertTrue(md.attractors_equivalent(a1,a2))
+        self.assertTrue(md.attractors_equivalent(a1,a3))
+        self.assertTrue(md.attractors_equivalent(a2,a3))
+        self.assertFalse(md.attractors_equivalent(a1,a4))
+        self.assertFalse(md.attractors_equivalent(a1,a5))
+        self.assertFalse(md.attractors_equivalent(a1,a6))
+
+    def test_atts_and_cks_equivalent(self):
+        atts1 = [[1,2,3,4],[5,6,7],[8,]]
+        cks1 = [{0,1,2},{0,},None]
+        atts2 = [[6,7,5],[3,4,1,2],[8,]]
+        cks2 = [{0,},{1,0,2},None]
+        atts3 = [[6,7,5],[3,4,1,2],[8,]]
+        cks3 = [{1,0,2},{0,},None]
+        atts4 = [[1,2,3,4],[5,6,7]]
+        cks4 = [{0,1,2},{0,}]
+        atts5 = [[1,2,3,4],[8,],[5,6,7]]
+        cks5 = [{10,11,12},None,{13,}]
+        atts6 = [[1,2,3,4],[8,],[5,6,7]]
+        cks6 = [{10,11,12},{0,},{13,}]
+        self.assertTrue(md.atts_and_cks_equivalent(atts1,cks1,atts1,cks1))
+        self.assertTrue(md.atts_and_cks_equivalent(atts1,cks1,atts2,cks2))
+        self.assertFalse(md.atts_and_cks_equivalent(atts1,cks1,atts3,cks3))
+        self.assertFalse(md.atts_and_cks_equivalent(atts1,cks1,atts4,cks4))
+        self.assertFalse(md.atts_and_cks_equivalent(atts1,cks1,atts5,cks5))
+        self.assertTrue(md.atts_and_cks_equivalent(atts1,cks1,atts5,cks5,
+                                                   ck_size_only=True))
+        self.assertFalse(md.atts_and_cks_equivalent(atts1,cks1,atts6,cks6,
+                                                    ck_size_only=True))
+
+    def test_distinguishing_nodes(self):
+        attractors = [[[1,0,0]],
+                      [[0,1,0]],
+                      [[0,0,1]],
+                      [[0,0,0]]]
+        correct_dn_list = [ [{0,},{0,1},{0,2},{0,1,2}],
+                            [{1,},{0,1},{1,2},{0,1,2}],
[{2,},{0,2},{1,2},{0,1,2}], + [{0,1,2}] ] + dn_gen_list = md.distinguishing_nodes_from_attractors(attractors) + self.assertTrue(len(dn_gen_list) == len(attractors)) + for dn_gen,correct_dn in zip(dn_gen_list,correct_dn_list): + dn = [ d for d in dn_gen ] + self.assertTrue(len(dn) == len(correct_dn)) + for d in dn: + self.assertTrue(d in correct_dn) + + +class TestModularity(unittest.TestCase): + + def test_attractors_s_cerevisiae(self): + atts = md.attractors(s_cerevisiae) + known_fixed_points = [0,272,12,16,256,274,258] # depends on neet encoding... + self.assertEqual(len(known_fixed_points),len(atts)) + for fp in known_fixed_points: + self.assertTrue(md.np.array([fp]) in atts) + + def test_control_kernel_s_pombe(self): + test_attractor = md.np.array([76]) # note this depends on neet encoding... + atts,outdict = md.attractors(s_pombe,find_control_kernel=True) + test_index = atts.index(test_attractor) + self.assertEqual({2,3,6,7},outdict['control_kernels'][test_index]) + + def test_control_kernel_s_cerevisiae(self): + test_attractor = md.np.array([272]) # note this depends on neet encoding... + atts,outdict = md.attractors(s_cerevisiae,find_control_kernel=True) + test_index = atts.index(test_attractor) + self.assertEqual({1,2,4,8},outdict['control_kernels'][test_index]) + + def test_control_kernel_cycle(self): + table = [((0,),{'1',}), + ((0,1,),{'01','10'})] + net = LogicNetwork(table) + + atts,cks,sampled_atts,sampled_cks = atts_and_cks_modular_and_sampled(net) + + # note that encoded attractors depend on the neet encoding + correct_atts = [ [0,], [2,], [1,3] ] + correct_cks = [ {0,1}, {0,1}, {0,} ] + + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + atts,cks)) + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + sampled_atts,sampled_cks)) + + def test_control_simple_switcher(self): + table = [((0,),{'0',})] + net = LogicNetwork(table) + + atts,cks,sampled_atts,sampled_cks = atts_and_cks_modular_and_sampled(net) + + # note that encoded attractors depend on the neet encoding + correct_atts = [ [0,1], ] + correct_cks = [ set(), ] + + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + atts,cks)) + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + sampled_atts,sampled_cks)) + + def test_control_simple_uncontrollable_cycle(self): + table = [((1,),{'0',}), + ((0,),{'0',})] + net = LogicNetwork(table) + + atts,cks,sampled_atts,sampled_cks = atts_and_cks_modular_and_sampled(net) + + # note that encoded attractors depend on the neet encoding + correct_atts = [ [1,], [2,], [0,3] ] + correct_cks = [ {0,}, {0,}, None ] + + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + atts,cks, + ck_size_only=True)) + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + sampled_atts,sampled_cks, + ck_size_only=True)) + + def test_control_simple_asynchronous_switcher(self): + table = [((0,),{'0',}), + ((1,),{'0',})] + net = LogicNetwork(table) + + atts,cks,sampled_atts,sampled_cks = atts_and_cks_modular_and_sampled(net) + + # note that encoded attractors depend on the neet encoding + correct_atts = [ [1,2], [0,3] ] + correct_cks = [ None, None ] + + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + atts,cks)) + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + sampled_atts,sampled_cks)) + + def test_control_tricky_1(self): + """ + "Counterexample" case from notes 4/22/2020. 
For this case, + the iterative and non-iterative methods give different results, + because the smallest distinguishing node set is not in the + final control kernel for the 0 attractor. + """ + table = [((0,1,2),{'001','010','011','101','110'}), + ((0,1,2),{'010','011','110'}), + ((0,1,2),{'001','011','101'})] + net = LogicNetwork(table) + + atts,cks,sampled_atts,sampled_cks = atts_and_cks_modular_and_sampled(net) + + # note that encoded attractors depend on the neet encoding + correct_atts = [ [0], [3], [5] ] + correct_cks = [ {1,2}, {1}, {2} ] + + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + atts,cks)) + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + sampled_atts,sampled_cks)) + + # also check that sampled_control_kernel with iterative=True + # fails in the expected way + expected_cks_iterative = [ {0,1,2}, {1}, {2} ] + _,_,sampled_atts_iterative,sampled_cks_iterative = \ + atts_and_cks_modular_and_sampled(net,iterative=True) + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,expected_cks_iterative, + sampled_atts_iterative, + sampled_cks_iterative)) + + # also check the "iterative rounds" functionality + _,iterative_rounds = md.sampled_control_kernel(net,numsamples=100, + phenotype='all',seed=123, + iterative=True, + iterative_rounds_out=True) + self.assertTrue([4,1] in iterative_rounds) + self.assertTrue([1] in iterative_rounds) + + + def test_control_tricky_2(self): + """ + Example case from notes 5/8/2020. For this case, the control + kernel includes nodes that have constant values over the + original attractors. + """ + table = [((0,1,2),{'001','010','011','100'}), + ((0,1),{'01'}), + ((0,2),{'01'})] + net = LogicNetwork(table) + + atts,cks,sampled_atts,sampled_cks = atts_and_cks_modular_and_sampled(net) + + # note that encoded attractors depend on the neet encoding + correct_atts = [ [0], [1] ] + correct_cks = [ {0,1,2}, {0} ] + + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + atts,cks)) + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + sampled_atts,sampled_cks)) + + def test_control_iron(self): + if TEST_CELL_COLLECTIVE: + cc_iron_index = 37 + cc_iron_name = 'Iron Acquisition And Oxidative Stress Response In Aspergillus Fumigatus.' 
+ net = loadCellCollectiveNets(cc_iron_index)[cc_iron_name] + + atts,cks,sampled_atts,sampled_cks = atts_and_cks_modular_and_sampled(net) + + # note that encoded attractors depend on the neet encoding + correct_atts = [[462114, 470306, 486435, 97959, 581343, + 562060, 920586, 69030, 396546], + [3084075, 3092267, 3108395, 2752175, 2678495, + 2659212, 3017738, 2166183, 2494211], + [1684397, 1986139], + [3781549, 4083291]] + correct_cks = [{20, 21}, {20, 21}, {20, 21}, {20, 21}] + + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + atts,cks)) + self.assertTrue(md.atts_and_cks_equivalent(correct_atts,correct_cks, + sampled_atts,sampled_cks)) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/boolean/test_wtnetwork.py b/test/boolean/test_wtnetwork.py index e0b86ff0..8acf1f96 100644 --- a/test/boolean/test_wtnetwork.py +++ b/test/boolean/test_wtnetwork.py @@ -633,10 +633,10 @@ def test_neighbors_in_split_threshold(self): self.assertEqual(net.neighbors_in(2), set([0, 1, 2, 5, 8])) with self.assertRaises(IndexError): - self.assertEqual(net.neighbors_in(2.0)) + net.neighbors_in(2.0) with self.assertRaises(IndexError): - self.assertEqual(net.neighbors_in('2')) + net.neighbors_in('2') def test_neighbors_out_split_threshold(self): @@ -672,10 +672,10 @@ def test_neighbors_in_positive_threshold(self): self.assertEqual(net.neighbors_in(2), set([0, 1, 5, 8])) with self.assertRaises(IndexError): - self.assertEqual(net.neighbors_in(2.0)) + net.neighbors_in(2.0) with self.assertRaises(IndexError): - self.assertEqual(net.neighbors_in('2')) + net.neighbors_in('2') def test_neighbors_out_negative_threshold(self): diff --git a/test/test_randomnet.py b/test/test_randomnet.py new file mode 100644 index 00000000..1de2b5b2 --- /dev/null +++ b/test/test_randomnet.py @@ -0,0 +1,64 @@ +import unittest +import numpy as np +from neet.boolean.examples import mouse_cortical_7B +from neet.boolean.randomnet import (random_logic, random_binary_states,RBN,) + +TESTSEED = 314159 + + +class TestRandomnet(unittest.TestCase): + def test_random_logic_invalid_p(self): + """ + ``random_logic`` should raise a value error if ``p`` is an + incorrect size + """ + with self.assertRaises(ValueError): + net = mouse_cortical_7B + random_logic(net, p=np.ones(net.size + 1)) + + def test_random_binary_states(self): + self.assertEqual(8, len(random_binary_states(4, 0.5))) + self.assertTrue(len(random_binary_states(3, 0.4)) in (3, 4)) + + def test_random_logic_fixed_structure(self): + net = mouse_cortical_7B + np.random.seed(TESTSEED) + randnet = random_logic(net, connections='fixed-structure') + # fixed-structure should preserve all neighbors + for i in range(net.size): + self.assertEqual(net.neighbors_in(i), randnet.neighbors_in(i)) + + def test_random_logic_fixed_in_degree(self): + net = mouse_cortical_7B + np.random.seed(TESTSEED) + randnet = random_logic(net, connections='fixed-in-degree') + # fixed-in-degree should preserve each node's in degree + for i in range(net.size): + self.assertEqual(len(net.neighbors_in(i)), + len(randnet.neighbors_in(i))) + + def test_random_logic_fixed_mean_degree(self): + net = mouse_cortical_7B + np.random.seed(TESTSEED) + randnet = random_logic(net, connections='fixed-mean-degree') + # fixed-mean-degree should preserve the total number of edges + numedges = np.sum([len(net.neighbors_in(i)) for i in range(net.size)]) + randnumedges = np.sum([len(randnet.neighbors_in(i)) + for i in range(randnet.size)]) + self.assertEqual(numedges, randnumedges) + + def test_RBN(self): + N = 
10 + k = 2 + p = 0.5 + np.random.seed(TESTSEED) + net = RBN(N,k,p) + # the network should have size N + self.assertEqual(N,net.size) + # every node should have k incoming edges + edges_per_node = [len(net.neighbors_in(i)) for i in range(net.size)] + self.assertEqual(edges_per_node,[k for i in range(net.size)]) + + + +
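+
+
+# allow running this test file directly, as in test_modularity.py above
+if __name__ == "__main__":
+    unittest.main()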