Source code for sircuitenum.enumeration

__doc__ = "enumeration.py: Contains the core functionality for enumerating circuits"
__author__ = "Eli Weissler, Mohit Bhat"
__version__ = "0.1.0"
__all__ = ["generate_all_circuits", "generate_graphs_node", "trim_graph_node", "gen_hamiltonian", "find_equiv_cir_series", "find_unique_ground_placements", "num_possible_circuits"]

import sqlite3
import itertools
import functools
import traceback
import contextlib
from pathlib import Path
from typing import Union
from multiprocessing import Pool
from multiprocessing import set_start_method
try:
    set_start_method("fork")
except:
    print("Multiprocessing fork not available on your system.\
           More than one worker is not supported for enumeration \
           with custom elements.")

import sympy as sym
import networkx as nx
import numpy as np
import pandas as pd
from sympy.parsing.latex import parse_latex
from tqdm import tqdm
from func_timeout import func_timeout, FunctionTimedOut

from sircuitenum import utils
from sircuitenum import reduction as red
from sircuitenum import qpackage_interface as pi
from sircuitenum import quantize

# -------------------------------------------------------------------
# Functions
# -------------------------------------------------------------------
[docs]def num_possible_circuits(base: int, n_nodes: int, quiet: bool = True) -> int: """ Estimate the number of possible circuits for a given number of edges and vertices. This function calculates the number of possible circuits for a graph with `n_nodes` vertices and `base` possible edge types. The estimate may be an overestimation. Parameters ---------- base : int, optional The number of possible edge types. Defaults to ``7``, corresponding to: ``J, C, L, JL, CL, JC, JCL``. n_nodes : int The number of vertices (nodes) in the graph. quiet : bool, optional If ``False``, prints the estimated number of circuits. Defaults to ``True``. Returns ------- int The estimated number of possible circuits. """ all_graphs = utils.get_basegraphs(n_nodes) n_circuits = 0 for graph in all_graphs: n_circuits += base**len(graph.edges) if not quiet: print("With " + str(base) + " elements and " + str(n_nodes) + " nodes there are " + str(n_circuits) + " possible circuits") return n_circuits
def generate_for_specific_graph(base: int, graph: nx.Graph, graph_index: int, cursor_obj=None, return_vals: bool = False): """Generates all circuits derived from a given graph Args: base (int): The number of possible edges. By default this is 7: (i.e., J, C, I, JI, CI, JC, JCI) graph (nx Graph) : base graph to generate circuits for graph index (int): the index of the graph for the written circuit within the file for the number of nodes n_nodes (int): Number of nodes in circuit cursor_obj: sqllite cursor object pointing to the desired database. return_vals (bool): return the circuits as a dataframe """ n_nodes = len(graph.nodes) if cursor_obj is None and return_vals is False: raise ValueError("Graphs are generating but neither \ being returned nor saved") edges = graph.edges n_edges = len(edges) if return_vals: data = [] num_configs = base**n_edges for i, circuit in enumerate(itertools.product(utils.ENUM_PARAMS["CHAR_LIST"][:base], repeat=n_edges)): c_dict = utils.circuit_entry_dict(circuit, graph_index, n_nodes, i, base) # Commit for the last one in the set if cursor_obj is not None: utils.write_circuit(cursor_obj, c_dict, to_commit=i == (num_configs-1)) if return_vals: data.append(c_dict) if return_vals: return pd.DataFrame(data) def delete_table(db_file: str, n_nodes: int): """Deletes table in sql database Args: n_nodes (int): Number of nodes for table db_file (str): sql database to delete table from """ connection_obj = sqlite3.connect(db_file) cursor_obj = connection_obj.cursor() table_name = 'CIRCUITS_' + str(n_nodes) + '_NODES' cursor_obj.execute("DROP TABLE IF EXISTS {table}".format(table=table_name)) connection_obj.commit() connection_obj.close() return
[docs]def find_unique_ground_placements(circuit: list, edges: list) -> tuple[int]: """ Uses component graph isomorphism to determine the unique ground node placements for a given circuit. Parameters ---------- circuit : list of list of str A list representing the elements of the desired circuit. Example: ``[["J"], ["L", "J"], ["C"]]``. edges : list of tuple of int A list of edge connections for the desired circuit. Example: ``[(0,1), (0,2), (1,2)]``. Returns ------- tuple of int A tuple containing integers representing the unique ground node placements for the given circuit. """ unique_nodes = [] unique_graphs = [] for gnd in range(utils.get_num_nodes(edges)): test = red.convert_circuit_to_component_graph(circuit, edges, ground_nodes=[gnd]) isomorphic_in_set = False for ref in unique_graphs: if nx.is_isomorphic(test, ref, node_match=red.colors_match): isomorphic_in_set = True break if not isomorphic_in_set: unique_graphs.append(test) unique_nodes.append(gnd) return tuple(unique_nodes)
def expand_ground_node(df: pd.DataFrame): """ Create new entries in the dataframe for unique placements of ground nodes Args: df (pd.DataFrame): circuit dataframe Returns: pd.DataFrame: dataframe with an entry for each ground node placement. """ new_df = [] df["ground_node"] = -1 for i in tqdm(range(df.shape[0])): row = df.iloc[[i]].copy() circuit, edges = row["circuit"].iloc[0], row["edges"].iloc[0] for gnd in find_unique_ground_placements(circuit, edges): new_row = row.copy() new_row["ground_node"] = gnd new_df.append(new_row) return pd.concat(new_df) def has_dangling_edges(circuit: list, edges: list): """ Determines whether a circuit has a dangling edge, i.e. a single branch through which current cannot flow Args: circuit (list): a list of element labels for the desired circuit e.g. [["J"],["L", "J"], ["C"]] edges (list): a list of edge connections for the desired circuit e.g. [(0,1), (0,2), (1,2)] Returns: True if there is a dangling edge, false if not """ deg = utils.circuit_degree(circuit, edges) if all(d > 1 for d in deg): return False else: return True def remove_dangling_edges(df: pd.DataFrame): """ Removes edges that cannot have current flowing through them after placing a ground node Args: df (pd.DataFrame): circuit dataframe Returns: pd.DataFrame: dataframe with all circuits that have dangling edges removed. """ ind_to_keep = [] for i in range(df.shape[0]): row = df.iloc[i] if not has_dangling_edges(row["circuit"], row["edges"]): ind_to_keep.append(i) return df.iloc[ind_to_keep].copy()
[docs]def find_equiv_cir_series(db_file: str, circuit: list, edges: list) -> str: """ Searches the database for circuits that are equivalent to the given one, up to a reduction of series linear circuit elements. Parameters ---------- db_file : str Path to the SQLite database file that has been preprocessed for the given number of nodes. circuit : list of list of str A list representing the circuit elements. Example: ``[["J"], ["L", "J"], ["C"]]``. edges : list of tuple of int A list of edge connections that define the circuit's connectivity. Example: ``[(0,1), (0,2), (1,2)]``. Returns ------- str The unique key of the equivalent circuit found in the non-isomorphic set. Returns "" if no equivalent circuit is found. """ # What does it look like with series elems removed c2, e2 = red.remove_series_elems(circuit, edges) equiv = utils.find_circuit_in_db(db_file, c2, e2) if equiv.empty: return "" # Return the equivalent circuit if equiv.iloc[0]['equiv_circuit'] == "": return equiv.iloc[0]['unique_key'] else: return equiv.iloc[0]['equiv_circuit']
[docs]def generate_graphs_node(db_file: str, n_nodes: int, base: int, return_vals: bool = False) -> Union[pd.DataFrame, None]: """ Generate circuits for all graphs with a given number of nodes and store them in an SQL database. This function generates circuits for all possible graphs with `n_nodes` nodes and stores them in a table within the specified SQL database. The table is labeled as ``CIRCUITS_<n_nodes>_NODES``. Parameters ---------- n_nodes : int The number of nodes for which circuits will be generated and stored. base : int, optional The number of possible edge types. Defaults to ``7``, corresponding to: ``J, C, L, JL, CL, JC, JCL``. db_file : str Path to the SQL database file where the circuits will be stored. return_vals : bool, optional If ``True``, returns the generated circuits as a Pandas DataFrame. Defaults to ``False``. Returns ------- pandas.DataFrame or None If `return_vals` is ``True``, returns a DataFrame containing the generated circuits. Otherwise, returns ``None``. """ # Initialize table if db_file is not None: if Path(db_file).exists(): delete_table(db_file, n_nodes) table_name = 'CIRCUITS_' + str(n_nodes) + '_NODES' connection_obj = sqlite3.connect(db_file) cursor_obj = connection_obj.cursor() sql_str = f"CREATE TABLE {table_name} (circuit, graph_index int, edge_counts, \ unique_key, n_nodes int, base int, no_series int, \ filter int, in_non_iso_set int, \ equiv_circuit, " sql_str += "PRIMARY KEY(unique_key))" cursor_obj.execute(sql_str) connection_obj.commit() else: cursor_obj = None all_graphs = utils.get_basegraphs(n_nodes) data = [] for graph_index, G in tqdm(enumerate(all_graphs), total=len(all_graphs)): data.append(generate_for_specific_graph(base, G, graph_index, cursor_obj, return_vals)) if cursor_obj is not None: connection_obj.close() if return_vals: return pd.concat(data)
[docs]def trim_graph_node(db_file: str, n_nodes: int, base: int = None, n_workers: int = 1) -> None: """ Mark circuits in the database based on Josephson junctions, series linear components, and non-isomorphism. This function updates the database to indicate whether each circuit contains Josephson junctions (JJs), series linear components, and belongs to a non-isomorphic set of circuits. If a circuit is not in the non-isomorphic set, an equivalent circuit that is in the set is recorded. All three conditions must be met for inclusion in the final set. Parameters ---------- db_file : str Path to the SQL database file where circuits are stored. n_nodes : int The number of nodes to consider. base : int, optional The number of possible edge types. Defaults to ``7``, corresponding to: ``J, C, L, JL, CL, JC, JCL``. n_workers : int, optional The number of workers to use for processing. Defaults to ``1``. """ if base is None: base = len(utils.ENUM_PARAMS["CHAR_TO_COMBINATION"]) # Get the max number of edges # from fully connected graph all_graphs = utils.get_basegraphs(n_nodes) n_edges_in_graph = [len(g.edges) for g in all_graphs] n_graphs = len(all_graphs) max_edges = max(n_edges_in_graph) # Loop through all possible numbers of each component # For all unique base graphs and create non-isomorphic # Sets within these slices print("Trimming graphs with no jj's, linear elements in series", "and reducing isomorphic graphs...") with sqlite3.connect(db_file) as con: cur = con.cursor() table_name = 'CIRCUITS_' + str(n_nodes) + '_NODES' sql_query = f"SELECT DISTINCT edge_counts FROM {table_name}" counts_to_consider = [x[0] for x in cur.execute(sql_query).fetchall()] args = [] for counts_str in counts_to_consider: n_edges = sum(int(x) for x in counts_str.split(",")) for graph_index in range(n_graphs): # Skip entries without the right number of edges # in edge counts if n_edges != n_edges_in_graph[graph_index]: continue else: filter_str = f"WHERE edge_counts = '{counts_str}'\ AND graph_index = {graph_index}" args.append((filter_str, db_file, n_nodes, utils.ENUM_PARAMS["CHAR_TO_COMBINATION"])) # Shuffle to spread out longer cases for more accurate time # estimates and better parallel performance np.random.shuffle(args) if n_workers > 1: pool = Pool(processes=n_workers) for _ in tqdm(pool.imap_unordered(reduce_individual_set_, args), total=sum(1 for _ in args)): pass else: for arg_set in tqdm(args): reduce_individual_set_(arg_set)
def reduce_individual_set_(args: tuple): """ Parallel helper function for calling full reduction on groups defined by the specified sql filter string. Intended to split by number of each circuit element. Args: args (tuple): filter_str, db_file, n_nodes, mapping Raises: ValueError: when an empty df is encountered Returns: None, updates dataframe specified by db_file """ # print(args) filter_str = args[0] db_file = args[1] n_nodes = args[2] mapping = args[3] df = utils.get_circuit_data_batch(db_file, n_nodes, char_mapping=mapping, filter_str=filter_str) if df.empty: print('-------------------------------') print(utils.get_circuit_data_batch(db_file, n_nodes)) print("Filter String:", filter_str) raise ValueError("Empty Dataframe when there shouldn't be") # Mark up the set red.full_reduction(df) # Find equivalent circuits for the series reduced circuits equiv_cir = df['equiv_circuit'].values yes_series = np.logical_not(df['no_series'].values) for i in range(df.shape[0]): if yes_series[i]: row = df.iloc[i] equiv_cir[i] = find_equiv_cir_series(db_file, row['circuit'], row['edges'] ) # Update the table to_update = ["no_series", "filter", "in_non_iso_set", "equiv_circuit"] str_cols = ["equiv_circuit"] utils.update_db_from_df(db_file, df, to_update, str_cols) def add_hamiltonians_to_table(db_file: str, n_nodes: int, n_workers: int = 4, resume: bool = False): """ Adds hamiltonians to the specified db file Args: db_file (str): database file n_nodes (int): number of nodes to add for n_workers (int): parallelize the Hamiltonian generation to this many processes. resume (bool, optional): whether to resume a previously started run. this only grabs rows that don't have Hamiltonians yet. Raises: ValueError: if multiple circuits with the same unique key exist Returns: None """ # Add new columns if not resuming with sqlite3.connect(db_file) as con: cur = con.cursor() table_name = 'CIRCUITS_' + str(n_nodes) + '_NODES' if not resume: new_cols = ["n_periodic", "n_extended", "n_harmonic", "periodic", "extended", "harmonic"] new_cols += gen_func_combos_(n_nodes-1).keys() new_cols += [x+"_sym" for x in new_cols] new_cols = ["H", "H_sym", "coord_transform", "H_class", "H_class_sym", "nonlinearity_counts", "nonlinearity_counts_sym", "H_group", "H_group_sym"] + new_cols for col in new_cols: sql_str = f"ALTER TABLE {table_name}\n" if "n_" in col or "cos" in col or "sin" in col: sql_str += f"ADD {col} int DEFAULT 0" else: sql_str += f"ADD {col}" cur.execute(sql_str) con.commit() sql_query = f"SELECT DISTINCT unique_key\ FROM {table_name}\ WHERE in_non_iso_set LIKE 1\ AND filter LIKE 1" unique_keys_all = [x[0] for x in cur.execute(sql_query).fetchall()] n_total = len(unique_keys_all) # If we're resuming filter out those without H_class made if resume: sql_query += " AND H_class is null" unique_keys = [x[0] for x in cur.execute(sql_query).fetchall()] else: unique_keys = unique_keys_all # Randmize order because difficult ones tend to be near each other # This will give more accurate time estimates and spread parallel better np.random.shuffle(unique_keys) # Go through all the circuits and update rows with info args = list(zip(unique_keys, [db_file]*len(unique_keys))) if n_workers > 1: pool = Pool(processes=n_workers) for _ in tqdm(pool.imap_unordered(timed_out_, args), total=n_total, initial=n_total-len(unique_keys)): pass else: for arg_set in tqdm(args): gen_ham_row_(arg_set[0], arg_set[1]) # Sometimes it doesn't work and hangs :( # 60 Minute Timeout def timed_out_(args): timeout_min = 60 try: return func_timeout(60*timeout_min, gen_ham_row_, args) except FunctionTimedOut: print(f"Could not complete {args[0]} ({timeout_min} min timout)") except Exception as e: raise e def gen_ham_row_(uid: str, db_file: str): """ Helper function to generate the Hamiltonian for the given uid in the given db file. Args: uid (str): circuit unique key db_file (str): database file Raises: ValueError: Error with circuit database kbi: Keyboard interrupt """ # Load the graphs with the specified edges counts and graph index filter_str = f"WHERE unique_key LIKE '{uid}'" n_nodes = int(uid[1]) df = utils.get_circuit_data_batch(db_file, n_nodes, filter_str=filter_str) if df.shape[0] > 1: raise ValueError("Multiple Circuits on Unique Key") entry = df.iloc[0] # Generate the Hamiltonian try: H, trans, H_class, all_combos = gen_hamiltonian(entry.circuit, entry.edges, symmetric=False, return_combos=True) H_str = refine_latex(sym.latex(H)) info = categorize_hamiltonian(H) # Symmetrize the Hamiltonian C = sym.Symbol("C", positive=True, real=True) EJ = sym.Symbol("E_{J}", positive=True, real=True) CJ = sym.Symbol("C_{J}", positive=True, real=True) L = sym.Symbol("L", positive=True, real=True) H_sym = H.copy() for s in H.free_symbols: if "C_" in str(s) and "J" not in str(s): H_sym = H_sym.subs(s, C) elif "C_" in str(s) and "J" in str(s): H_sym = H_sym.subs(s, CJ) elif "L_" in str(s): H_sym = H_sym.subs(s, L) elif "E_{J" in str(s): H_sym = H_sym.subs(s, EJ) # Zero out terms to get the H_class H_class_sym = utils._remove_coeff(H_sym, all_combos) H_sym_str = refine_latex(sym.latex(H_sym)) info_sym = categorize_hamiltonian(H_sym) except KeyboardInterrupt as kbi: raise kbi except Exception as exc: print("-------------------------------------------") print("Unable to Generate Hamiltonian for:", uid) print(traceback.format_exc()) print(exc) print("-------------------------------------------") return # Set values to_update = ["H", "H_sym", "coord_transform", "H_class", "H_class_sym", "nonlinearity_counts", "nonlinearity_counts_sym", "H_group", "H_group_sym"] df.at[uid, "H"] = H_str df.at[uid, "H_sym"] = H_sym_str df.at[uid, "coord_transform"] = str(trans) df.at[uid, "H_class"] = refine_latex(sym.latex(H_class)) df.at[uid, "H_class_sym"] = refine_latex(sym.latex(H_class_sym)) for col in info: if col in df.columns: df.at[uid, col] = info[col] to_update.append(col) for col in info_sym: if col+"_sym" in df.columns: df.at[uid, col+"_sym"] = info_sym[col] to_update.append(col+"_sym") # Add nonlinearity counts nonlinearity_cols = [x for x in df.columns if "sin_" in x or "cos_" in x] nonlinearity_cols_sym = [x for x in nonlinearity_cols if "_sym" in x] nonlinearity_cols = [x for x in nonlinearity_cols if "_sym" not in x] nonlinearity_counts = "".join([str(int(x)) for x in df[nonlinearity_cols].loc[uid].values]) nonlinearity_counts_sym = "".join([str(int(x)) for x in df[nonlinearity_cols_sym].loc[uid].values]) df.at[uid, "nonlinearity_counts"] = nonlinearity_counts df.at[uid, "nonlinearity_counts_sym"] = nonlinearity_counts_sym # Update value in database utils.update_db_from_df(db_file, df, to_update, str_cols=["H", "H_sym", "periodic", "extended", "harmonic", "coord_transform", "periodic_sym", "extended_sym", "harmonic_sym", "H_class", "H_class_sym", "nonlinearity_counts", "nonlinearity_counts_sym", "H_group", "H_group_sym"])
[docs]def gen_hamiltonian(circuit: list, edges: list, symmetric: bool = False, cob: sym.Matrix = None, var_class: dict = None, return_combos: bool = False, basis_completion: str = "heuristic") -> tuple: """ Generate a SymPy Hamiltonian for the specified circuit. This function uses `scqubits` to determine an appropriate variable transformation. .. note:: External fluxes and charges are not currently supported. Parameters ---------- circuit : list A list of element labels defining the desired circuit. Example: ``[["J"], ["L", "J"], ["C"]]``. edges : list A list of edge connections defining the circuit topology. Example: ``[(0,1), (0,2), (1,2)]``. symmetric : bool, optional Whether to set all capacitances, inductances, and Josephson energies equal. This may result in the loss of some terms. Default is ``False``. cob : sympy.Matrix, optional An optional variable transformation matrix. If not provided, the Z transformation matrix from `scqubits` is used. var_class : dict, optional Required if providing a custom variable transformation. Should be a dictionary similar to scqubits's `var_categories`, with keys "free", "frozen", "periodic", and "extended" to classify the variables. return_combos : bool, optional Whether to return the combinations of variables present. Default is False. basis_completion : str, optional Basis completion option for `scqubits`. Returns ------- A tuple containing: - **hamiltonian: sympy.Add** Symbolic Hamiltonian where periodic modes are labeled by `n` and extended variables by `q`. - **transformation_matrix: numpy.ndarray** Coordinate transformation matrix (expressing new variables in terms of node variables). - **hamiltonian_class: sympy.Add** Hamiltonian with all constants removed. - optionally combinations of variables present """ elems = { 'C': {'default_unit': 'GHz', 'default_value': 0.2}, 'L': {'default_unit': 'GHz', 'default_value': 1.0}, 'J': {'default_unit': 'GHz', 'default_value': 15.0}, 'CJ': {'default_unit': 'GHz', 'default_value': 500.0} } params = utils.gen_param_dict(circuit, edges, elems) if not symmetric: # Set random values to avoid unintentionally # deleting terms for edge, comp in params: if comp in ["C", "L"]: param_range = (0.1, 1) else: param_range = (1, 20) params[(edge, comp)] = (np.random.uniform(*param_range), "GHz") if cob is None: # Use scQubits to get a transformation Matrix obj = pi.to_SCqubits(circuit, edges, params=params, sym_cir=True, initiate_sym_calc=False, basis_completion=basis_completion) # Get symbolic Hamiltonian and add final free mode # as a given cob, var_class = obj.variable_transformation_matrix() var_class["free"] += [utils.get_num_nodes(edges)] elif var_class is None: raise ValueError("Must include variable classification with cob matrix") # Un-symmetrize the edges if that's what's requested if not symmetric: new_circuit = [] counts = {"J": 0, "L": 0, "C": 0} for elems in circuit: new_elems = [] for elem in elems: new_elems.append(f"{elem}_{counts[elem] + 1}") counts[elem] += 1 new_circuit.append(new_elems) circuit = new_circuit H, H_class, all_combos = quantize.quantize_circuit(circuit, edges, cob=sym.Matrix(cob), **var_class, return_H_class=True, return_combos=True, collect_phase=True) to_return = (H, cob, H_class) if return_combos: to_return = to_return + (all_combos,) return to_return
def assign_H_groups(db_file: str, n_nodes: int, n_workers: int = 1, resume: bool = False) -> None: """ Assigns Hamiltonians in the database into groups based on the functional form of the linear and nonlinear parts of their hamiltonians Args: db_file (str): path to database file n_nodes (int): number of nodes to examine n_workers (int, optional): Number of workers to use. Defaults to 1. """ # Figure out where to start if resuming if resume: table_name = 'CIRCUITS_' + str(n_nodes) + '_NODES' columns = utils.list_all_columns(db_file, table_name) H_group_started = "H_group" in columns H_group_sym_started = "H_group_sym" in columns else: H_group_started = False H_group_sym_started = False # Get the unique nonlinearity counts strings with sqlite3.connect(db_file) as con: cur = con.cursor() table_name = 'CIRCUITS_' + str(n_nodes) + '_NODES' sql_str = f"SELECT DISTINCT nonlinearity_counts \ FROM {table_name}\ WHERE in_non_iso_set LIKE 1\ AND filter LIKE 1" unique_counts = [x for x in cur.execute(sql_str).fetchall()] n_counts = len(unique_counts) sql_str_sym = f"SELECT DISTINCT nonlinearity_counts_sym \ FROM {table_name}\ WHERE in_non_iso_set LIKE 1\ AND filter LIKE 1" unique_counts_sym = [x for x in cur.execute(sql_str_sym).fetchall()] n_counts_sym = len(unique_counts) if resume: if H_group_sym_started: sql_str_sym = sql_str[:] sql_str_sym += " AND H_group_sym is null" unique_counts_sym = [x for x in cur.execute(sql_str_sym).fetchall()] elif H_group_started: sql_str += " AND H_group is null" unique_counts = [x for x in cur.execute(sql_str).fetchall()] print("Total Groups:", n_counts) # Filter out none values from circuits that timed out in # quantization unique_counts = [x[0] for x in unique_counts if x is not None] unique_counts_sym = [x[0] for x in unique_counts_sym if x is not None] # Shuffle for accurate runtime estimates np.random.shuffle(unique_counts) np.random.shuffle(unique_counts_sym) # Make the pool if we're parallel if n_workers > 1: pool = Pool(processes=n_workers) # Do non-symmetric first # args are db_file, n_nodes, nl_cnt, symmetric, mapping n_entries = len(unique_counts) if not H_group_sym_started: print("Full Hamiltonians...") args = list(zip([db_file]*n_entries, [n_nodes]*n_entries, unique_counts, [False]*n_entries, [utils.ENUM_PARAMS["CHAR_TO_COMBINATION"]]*n_entries)) if n_workers > 1: for _ in tqdm(pool.imap_unordered(unique_hams_for_count_, args), total=n_counts, initial=n_counts-n_entries): pass else: for arg_set in args: unique_hams_for_count_(arg_set) # Now do symmetric print("Symmetric Hamiltonians...") n_entries = len(unique_counts_sym) args = list(zip([db_file]*n_entries, [n_nodes]*n_entries, unique_counts_sym, [True]*n_entries, [utils.ENUM_PARAMS["CHAR_TO_COMBINATION"]]*n_entries)) if n_workers > 1: for _ in tqdm(pool.imap_unordered(unique_hams_for_count_, args), total=n_counts_sym, initial=n_counts_sym-n_entries): pass else: for arg_set in args: unique_hams_for_count_(arg_set) def unique_hams(hams: list[str], group_base: str = "", normalize_sign: bool = True): """ Identifies Hamiltonians in a set that differ by relabelling variables. Assigns each entry to a group. Args: hams (list[str]): list of Hamiltonians, loaded from the database group_base (str, optional): optional prefix for group name. Defaults to "". normalize_sign (bool, optional): Whether to make all terms positive in H_class. Defaults to True. Returns: list[str]: list of unique hamiltonian strings list[str]: group labels for each entry """ reduced = [] groups = [] # Examine every row in the set group_n = 1 for l_str in hams: is_dup = False # Make the string parsable by sympy l_str = l_str.replace("\\hat{" + quantize.EXTENDED_CHARGE + "}", "Q") l_str = l_str.replace("\\hat{" + quantize.PERIODIC_CHARGE + "}", "n") l_str = l_str.replace("\\hat{" + quantize.EXTENDED_PHASE + "}", "F") l_str = l_str.replace("\\hat{" + quantize.PERIODIC_PHASE + "}", "p") H_base = parse_latex(l_str) # Get the Phase and Charge terms q_list = [q for q in H_base.free_symbols if "Q" in str(q)] n_list = [q for q in H_base.free_symbols if "n" in str(q)] f_list = [q for q in H_base.free_symbols if "F" in str(q)] p_list = [q for q in H_base.free_symbols if "p" in str(q)] # Alternative A, B, C terms q_list_alt = np.array([sym.Symbol(f"Q_{chr(ord('A') + int(n))}") for n in range(len(q_list))]) n_list_alt = np.array([sym.Symbol(f"n_{chr(ord('A') + int(n))}") for n in range(len(n_list))]) f_list_alt = np.array([sym.Symbol(f"F_{chr(ord('A') + int(n))}") for n in range(len(f_list))]) p_list_alt = np.array([sym.Symbol(f"p_{chr(ord('A') + int(n))}") for n in range(len(p_list))]) # Possible assignments of 1, 2, 3 -> A, B, C q_ass = itertools.permutations(range(len(q_list)), len(q_list)) n_ass = itertools.permutations(range(len(n_list)), len(n_list)) f_ass = itertools.permutations(range(len(f_list)), len(f_list)) p_ass = itertools.permutations(range(len(p_list)), len(p_list)) # Try every permutation of A, B, C -> 1, 2, 3 for combo in itertools.product(q_ass, n_ass, f_ass, p_ass): # Make it a list for indexing combo = [list(x) for x in combo] # Test out the specific permutation H_test = H_base.copy() # Replace 1, 2, 3 with A, B, C if combo[0]: for x1, x2 in zip(q_list, q_list_alt[combo[0]]): H_test = H_test.subs(x1, x2) if combo[1]: for x1, x2 in zip(n_list, n_list_alt[combo[1]]): H_test = H_test.subs(x1, x2) if combo[2]: for x1, x2 in zip(f_list, f_list_alt[combo[2]]): H_test = H_test.subs(x1, x2) if combo[3]: for x1, x2 in zip(p_list, p_list_alt[combo[3]]): H_test = H_test.subs(x1, x2) # Check if this permutation is in the reduced set already for i, H_ref in enumerate(reduced): if H_ref is None: continue if H_test - H_ref == 0: is_dup = True dup_group = groups[i] break if is_dup: break if is_dup: reduced.append(None) groups.append(dup_group) if not is_dup: reduced.append(H_test) groups.append(group_base + f"_{group_n}") group_n += 1 return reduced, groups def unique_hams_in_df(df: pd.DataFrame, symmetric: bool, normalize_sign: bool = True): """ Marks unique Hamiltonian classes by creating the H_group column. Meant to be provided a dataframe containing values for a single nonlinearity counts. Catches entries with the same H_class and those that differ by renumbering variables. Args: df (pd.DataFrame): dataframe containing circuit entries for a single value of nonlinearity_counts symmetric (bool): whether to examine the "_sym" columns or not. normalize_sign (bool, optional): Whether to make all terms positive in H_class. Defaults to True. Returns: group_col_name: name of column added to input df """ if symmetric: l_str_vec = df["H_class_sym"].values nl_cnt = df["nonlinearity_counts_sym"].iloc[0] else: l_str_vec = df["H_class"].values nl_cnt = df["nonlinearity_counts"].iloc[0] if normalize_sign: l_str_vec = [x.replace("-", "+") for x in l_str_vec] # Catch the obviously same ones, i.e. # the H_str is the exact same unique_str, index, inv = np.unique(l_str_vec, return_index=True, return_inverse=True) # Catch the ones with variables labeled differently reduced, groups = unique_hams(unique_str, group_base=nl_cnt, normalize_sign=normalize_sign) groups = np.array(groups) group_col = groups[inv] if symmetric: group_col_name = "H_group_sym" df[group_col_name] = group_col else: group_col_name = "H_group" df[group_col_name] = group_col return group_col_name def unique_hams_for_count_(args): db_file, n_nodes, nl_cnt, symmetric, mapping = args if symmetric: col_name = "nonlinearity_counts_sym" else: col_name = "nonlinearity_counts" filter_str = f"WHERE {col_name} LIKE '{nl_cnt}'" df = utils.get_circuit_data_batch(db_file, n_nodes, char_mapping=mapping, filter_str=filter_str) # if df.shape[0] == 0: # raise ValueError("No entries found for nl count") group_col_name = unique_hams_in_df(df, symmetric) utils.update_db_from_df(db_file, df, to_update=[group_col_name], str_cols=[group_col_name] ) return def gen_func_combos_(n_modes: int) -> dict: """ Helper function that generates all combinations of sin/cos given a maximum power Args: n_modes (int): Number of modes in the circuit (i.e. max power) Returns: dict: dictionary with combos as keys and 0 as values """ info = {} for n in range(1, n_modes+1): for type_combo in itertools.product(["p", "e"], repeat=n): for func_combo in itertools.product(["cos", "sin"], repeat=n): # Count the functions present counts = {} for combo in zip(func_combo, type_combo): if combo in counts: counts[combo] += 1 else: counts[combo] = 1 # Add the field in the dictionary combos = [] for combo in counts: combos += [combo]*counts[combo] # Sort alphabetically for consistency order = np.sort(["_".join(x) for x in combos]) info_str = "_".join(order) info[info_str] = 0 return info def categorize_hamiltonian(H: sym.core.Add): """ Categorizes a Hamiltonian according to the nonlinearities present. Assumes frozen and free modes have already been removed. Args: H (sympy.core.Add): sympy Hamiltonian, generated from gen_hamiltonian Returns: info: Dictionary counting the nonlinearities present. Considers every possibility of cos/sin and extended/periodic variables. Should be 4 choices with one modes, 10 choices with two modes, and 20 with three modes. """ # Expand H to make searching easier H_test = sym.expand(H) # List of variable types theta_list = [th for th in H.free_symbols if (quantize.PERIODIC_PHASE in str(th) or quantize.EXTENDED_PHASE in str(th) or quantize.NODE_PHASE in str(th)) and quantize.EXT_PHASE not in str(th)] # Information about the hamiltonian n_modes = len(theta_list) info = {"n_modes": n_modes, "periodic": [], "extended": [], "harmonic": []} # Categorize Modes: types = {} funcs = set() [[funcs.add(f) for f in x.atoms(sym.Function)] for x in H_test.atoms(sym.Mul)] for th in theta_list: mode_num = "".join([x for x in str(th) if x.isnumeric()]) if quantize.PERIODIC_PHASE in str(th): info["periodic"].append(mode_num) types[str(th)] = "p" elif sym.cos(th) in funcs or sym.sin(th) in funcs: info["extended"].append(mode_num) types[str(th)] = "e" else: info["harmonic"].append(mode_num) types[str(th)] = "h" # Sort mode list for var_type in ["periodic", "extended", "harmonic"]: info[var_type] = sorted(info[var_type]) # Add counts info["n_periodic"] = len(info["periodic"]) info["n_extended"] = len(info["extended"]) info["n_harmonic"] = len(info["harmonic"]) # Products of sin/cos up to n_modes info.update(gen_func_combos_(n_modes)) # Count the nonlinear terms funcs = set(functools.reduce(lambda x, y: x*y, list(x.atoms(sym.Function))) for x in H_test.atoms(sym.Mul) if len(x.atoms(sym.Function)) > 0) # n = number of nonlinear terms for n in range(1, n_modes+1): # th_combo = list of variables for th_combo in itertools.product(theta_list, repeat=n): # Variable types th_types = [types[str(th)] for th in th_combo] # All different combinations of sin's and cos # of the two thetas for bar in range(n+1): term = 1 # Cos terms for i in range(bar): term *= sym.cos(th_combo[i]) # Sin Terms for i in range(bar, n): term *= sym.sin(th_combo[i]) # Check if term was present if term in funcs: info_str = ["_".join(x) for x in zip(["cos"]*bar, th_types[:bar])] info_str += ["_".join(x) for x in zip(["sin"]*(n-bar), th_types[bar:])] info_str = "_".join(np.sort(info_str)) info[info_str] += 1 funcs.remove(term) return info def refine_latex(latex_str): """ Adds hats to operators, and removes cdots before parenthesis Args: latex_str (str): string of the latex math Returns: str: copy of the latex_str with the modifications done """ latex_str = latex_str.replace(r"\cdot \left(", r"\left(") return latex_str def generate_and_trim(n_nodes: int, db_file: str = "circuits.db", base: int = None, n_workers: int = 1, resume: bool = False): """ Generates circuits for all graphs for a given number of nodes Then trims identical circuits from database. Stores circuits in sql database Args: n_nodes (int): Number of nodes for table db_file (str): sql database to store data in base (int): The number of possible edges. By default this is 7: (i.e., J, C, I, JI, CI, JC, JCI) n_workers (int): The number of workers to use. Default 1. resume (bool): Resuming a run or not """ if base is None: base = len(utils.ENUM_PARAMS["CHAR_TO_COMBINATION"]) # Check if Hamiltonians have started or not if resume: table_name = f'CIRCUITS_{n_nodes}_NODES' columns = utils.list_all_columns(db_file, table_name) H_started = "H_class" in columns H_group_started = "H_group" in columns H_group_sym_started = "H_group_sym" in columns if H_started and not H_group_started: print("---------------------------------------") print("Resuming at Hamiltonian Phase") print("---------------------------------------") elif H_started and H_group_started and not H_group_sym_started: print("---------------------------------------") print("Resuming at Full H Group") print("---------------------------------------") elif H_started and H_group_started and H_group_sym_started: print("---------------------------------------") print("Resuming at Symmetric H Group") print("---------------------------------------") # Pre-Hamiltonian Steps are Fast if (not resume) or (not H_started): print("----------------------------------------") print('Starting generating ' + str(n_nodes) + ' node circuits.') generate_graphs_node(db_file, n_nodes, base) print("Circuits Generated for " + str(n_nodes) + " node circuits.") print("Now Trimming.") trim_graph_node(db_file=db_file, n_nodes=n_nodes, base=base, n_workers=n_workers) print("Finished trimming " + str(n_nodes) + " node circuits.") # Hamiltonian is the slow part if (not resume) or (not H_group_started): print("Appending Hamiltonians to " + str(n_nodes) + " node circuits.") add_hamiltonians_to_table(db_file=db_file, n_nodes=n_nodes, n_workers=n_workers, resume=resume) # print("Categorizing Linear Portion of Hamiltonians for " + str(n_nodes) + " node circuits.") # assign_H_groups(db_file=db_file, n_nodes=n_nodes, n_workers=n_workers, # resume=resume) # print("Categorizing Non-Linear Portion of Hamiltonians for " + str(n_nodes) + " node circuits.") # Max 10 workers because this is fast and db conflicts assign_H_groups(db_file=db_file, n_nodes=n_nodes, n_workers=n_workers, resume=False) return True
[docs]def generate_all_circuits(db_file: str = "circuits.db", n_nodes_start: int = 2, n_nodes_stop: int = 4, base: int = None, n_workers: int = 1, resume: bool = False, quiet: bool = True) -> None: """ Generate all circuits with node counts between `n_nodes_start` and `n_nodes_stop`. This function generates circuits with varying numbers of nodes, removes duplicate circuits, and stores both the full set and the deduplicated set in an SQL database. Parameters ---------- file : str Path to the SQL database file where the generated circuits will be stored. n_nodes_start : int Minimum number of nodes to generate circuits for. n_nodes_stop : int Maximum number of nodes to generate circuits for. base : int, optional The number of possible edge types. Defaults to ``7``, corresponding to: ``J, C, L, JL, CL, JC, JCL``. n_workers : int, optional The number of workers to use for circuit generation. Defaults to ``1``. """ if base is None: base = len(utils.ENUM_PARAMS["CHAR_TO_COMBINATION"]) if not quiet: print("---------------------------------------") print("---------------------------------------") print("Starting Circuit Enumeration") print("db_file:", db_file) print("n_nodes_start:", n_nodes_start) print("n_nodes_stop:", n_nodes_stop) print("base:", base) print("n_workers:", n_workers) print("resume:", resume) print("---------------------------------------") print("---------------------------------------") # Determine number of nodes to start at if resume: tables = utils.list_all_tables(db_file) if f'CIRCUITS_{n_nodes_stop}_NODES' in tables: n_nodes_start = n_nodes_stop else: for n in range(n_nodes_start, n_nodes_stop+1): if f'CIRCUITS_{n}_NODES' not in tables: n_nodes_start = n - 1 if not quiet: print("---------------------------------------") print("Resuming enumeration at", n_nodes_start, "nodes") print("---------------------------------------") for n in range(n_nodes_start, n_nodes_stop+1): if not quiet: tqdm.__init__ = functools.partialmethod(tqdm.__init__, disable=False) generate_and_trim(n, db_file=db_file, base=base, n_workers=n_workers, resume=resume) else: with contextlib.redirect_stdout(None): tqdm.__init__ = functools.partialmethod(tqdm.__init__, disable=True) generate_and_trim(n, db_file=db_file, base=base, n_workers=n_workers, resume=resume)