Source code for darts.models.darts_model

from math import fabs
import pickle
import os
import numpy as np

from darts.engines import timer_node, sim_params, value_vector, index_vector, op_vector, ms_well_vector
from darts.engines import print_build_info as engines_pbi
from darts.discretizer import print_build_info as discretizer_pbi
from darts.print_build_info import print_build_info as package_pbi


[docs] class DartsModel: """ This is a base class for creating a model in DARTS. A model is composed of a :class:`darts.models.Reservoir` object and a `darts.physics.Physics` object. Initialization and communication between these two objects takes place through the Model object :ivar reservoir: Reservoir object :type reservoir: :class:`ReservoirBase` :ivar physics: Physics object :type physics: :class:`PhysicsBase` """
[docs] def __init__(self): """" Initialize DartsModel class. :ivar timer: Timer object :type timer: :class:`darts.engines.timer_node` :ivar params: Object to set simulation parameters :type params: :class:`darts.engines.sim_params` """ # print out build information engines_pbi() discretizer_pbi() package_pbi() self.timer = timer_node() # Create time_node object for time record self.timer.start() # Start time record self.timer.node["simulation"] = timer_node() # Create timer.node called "simulation" to record simulation time self.timer.node["newton update"] = timer_node() self.timer.node[ "initialization"] = timer_node() # Create timer.node called "initialization" to record initialization time self.timer.node["initialization"].start() # Start recording "initialization" time self.params = sim_params() # Create sim_params object to set simulation parameters self.timer.node["initialization"].stop() # Stop recording "initialization" time
[docs] def init(self): """ Function to initialize the model, which includes: - initialize well (perforation) position - initialize well rate parameters - initialize reservoir initial conditions - initialize well control settings - define list of operator interpolators for accumulation-flux regions and wells - initialize engine """ self.reservoir.init_wells() self.physics.init_wells(self.reservoir.wells) self.set_initial_conditions() self.set_boundary_conditions() self.set_op_list() self.reset()
[docs] def reset(self): """ Function to initialize the engine by calling 'physics.engine.init()' method. """ self.physics.engine.init(self.reservoir.mesh, ms_well_vector(self.reservoir.wells), op_vector(self.op_list), self.params, self.timer.node["simulation"])
[docs] def set_physics(self): """ Function to define properties and regions and initialize :class:`Physics` object. This function is virtual in DartsModel, needs to be defined in child Model. """ pass
[docs] def set_wells(self): """ Function to define wells and initialize :class:`Reservoir` object. This function is virtual in DartsModel, needs to be defined in child Model. """ pass
[docs] def set_initial_conditions(self): """ Function to set initial conditions. Passes initial conditions to :class:`Physics` object. This function is virtual in DartsModel, needs to be defined in child Model. """ pass
[docs] def set_boundary_conditions(self): """ Function to set boundary conditions. Passes boundary conditions to :class:`Physics` object and wells. This function is virtual in DartsModel, needs to be defined in child Model. """ pass
[docs] def set_op_list(self): """ Function to define list of operator interpolators for accumulation-flux regions and wells. Operator list is in order [acc_flux_itor[0], ..., acc_flux_itor[n-1], acc_flux_w_itor] """ if type(self.physics.acc_flux_itor) == dict: self.op_list = [acc_flux_itor for acc_flux_itor in self.physics.acc_flux_itor.values()] + [self.physics.acc_flux_w_itor] self.op_num = np.array(self.reservoir.mesh.op_num, copy=False) # self.op_num[self.reservoir.nb:] = len(self.op_list) - 1 self.op_num[self.reservoir.mesh.n_res_blocks:] = len(self.op_list) - 1 else: # for backward compatibility self.op_list = [self.physics.acc_flux_itor]
[docs] def set_sim_params(self, first_ts: float = None, mult_ts: float = None, max_ts: float = None, runtime: float = 1000, tol_newton: float = None, tol_linear: float = None, it_newton: int = None, it_linear: int = None, newton_type=None, newton_params=None): """ Function to set simulation parameters. :param first_ts: First timestep :type first_ts: float :param mult_ts: Timestep multiplier :type mult_ts: float :param max_ts: Maximum timestep :type max_ts: float :param runtime: Total runtime in days, default is 1000 :type runtime: float :param tol_newton: Tolerance for Newton iterations :type tol_newton: float :param tol_linear: Tolerance for linear iterations :type tol_linear: float :param it_newton: Maximum number of Newton iterations :type it_newton: int :param it_linear: Maximum number of linear iterations :type it_linear: int :param newton_type: :param newton_params: """ self.params.first_ts = first_ts if first_ts is not None else self.params.first_ts self.params.mult_ts = mult_ts if mult_ts is not None else self.params.mult_ts self.params.max_ts = max_ts if max_ts is not None else self.params.max_ts self.runtime = runtime # Newton tolerance is relatively high because of L2-norm for residual and well segments self.params.tolerance_newton = tol_newton if tol_newton is not None else self.params.tolerance_newton self.params.tolerance_linear = tol_linear if tol_linear is not None else self.params.tolerance_linear self.params.max_i_newton = it_newton if it_newton is not None else self.params.max_i_newton self.params.max_i_linear = it_linear if it_linear is not None else self.params.max_i_linear self.params.newton_type = newton_type if newton_type is not None else self.params.newton_type self.params.newton_params = newton_params if newton_params is not None else self.params.newton_params
[docs] def run(self, days: float = None): runtime = days if days is not None else self.runtime self.physics.engine.run(runtime)
[docs] def run_python(self, days: float, restart_dt: float = 0, timestep_python: bool = False): mult_dt = self.params.mult_ts max_dt = self.params.max_ts self.e = self.physics.engine # get current engine time t = self.e.t # same logic as in engine.run if fabs(t) < 1e-15: dt = self.params.first_ts elif restart_dt > 0: dt = restart_dt else: dt = self.params.max_ts # evaluate end time runtime = t + days ts = 0 while t < runtime: if timestep_python: converged = self.e.run_timestep(dt, t) else: converged = self.run_timestep_python(dt, t) if converged: t += dt ts = ts + 1 print("# %d \tT = %3g\tDT = %2g\tNI = %d\tLI=%d" % (ts, t, dt, self.e.n_newton_last_dt, self.e.n_linear_last_dt)) dt *= mult_dt if dt > max_dt: dt = max_dt if t + dt > runtime: dt = runtime - t else: dt /= mult_dt print("Cut timestep to %2.3f" % dt) if dt < 1e-8: break # update current engine time self.e.t = runtime print("TS = %d(%d), NI = %d(%d), LI = %d(%d)" % (self.e.stat.n_timesteps_total, self.e.stat.n_timesteps_wasted, self.e.stat.n_newton_total, self.e.stat.n_newton_wasted, self.e.stat.n_linear_total, self.e.stat.n_linear_wasted))
[docs] def run_timestep_python(self, dt, t): max_newt = self.params.max_i_newton max_residual = np.zeros(max_newt + 1) self.e.n_linear_last_dt = 0 well_tolerance_coefficient = 1e2 self.timer.node['simulation'].start() for i in range(max_newt+1): self.e.run_single_newton_iteration(dt) self.e.newton_residual_last_dt = self.e.calc_newton_residual() max_residual[i] = self.e.newton_residual_last_dt counter = 0 for j in range(i): if abs(max_residual[i] - max_residual[j])/max_residual[i] < 1e-3: counter += 1 if counter > 2: print("Stationary point detected!") break self.e.well_residual_last_dt = self.e.calc_well_residual() self.e.n_newton_last_dt = i # check tolerance if it converges if ((self.e.newton_residual_last_dt < self.params.tolerance_newton and self.e.well_residual_last_dt < well_tolerance_coefficient * self.params.tolerance_newton) or self.e.n_newton_last_dt == self.params.max_i_newton): if i > 0: # min_i_newton break r_code = self.e.solve_linear_equation() self.timer.node["newton update"].start() self.e.apply_newton_update(dt) self.timer.node["newton update"].stop() # End of newton loop converged = self.e.post_newtonloop(dt, t) self.timer.node['simulation'].stop() return converged
[docs] def output_properties(self): """ Function to return array of properties. Primary variables (vars) are obtained from engine, secondary variables (props) are interpolated by property_itor. :returns: property_array :rtype: np.ndarray """ # Initialize property_array n_vars = self.physics.n_vars n_props = self.physics.n_props tot_props = n_vars + n_props property_array = np.zeros((self.reservoir.nb, tot_props)) # Obtain primary variables from engine for j in range(n_vars): property_array[:, j] = self.physics.engine.X[j:self.reservoir.nb * n_vars:n_vars] # If it has been defined, interpolate secondary variables in property_itor, if self.physics.property_operators is not None: values = value_vector(np.zeros(self.physics.n_ops)) for i in range(self.reservoir.nb): state = [] for j in range(n_vars): state.append(property_array[i, j]) state = value_vector(np.asarray(state)) self.physics.property_itor.evaluate(state, values) for j in range(n_props): property_array[i, j + n_vars] = values[j] return property_array
[docs] def export_vtk(self, file_name: str = 'data', local_cell_data: dict = {}, global_cell_data: dict = {}, vars_data_dtype: type = np.float32, export_grid_data: bool = True): """ Function to export results at timestamp t into `.vtk` format. :param file_name: Name to save .vtk file :type file_name: str :param local_cell_data: Local cell data (active cells) :type local_cell_data: dict :param global_cell_data: Global cell data (all cells including actnum) :type global_cell_data: dict :param vars_data_dtype: :type vars_data_dtype: type :param export_grid_data: :type export_grid_data: bool """ # get current engine time t = self.physics.engine.t nb = self.reservoir.mesh.n_res_blocks nv = self.physics.n_vars X = np.array(self.physics.engine.X, copy=False) for v in range(nv): local_cell_data[self.physics.vars[v]] = X[v:nb * nv:nv].astype(vars_data_dtype) self.reservoir.export_vtk(file_name, t, local_cell_data, global_cell_data, export_grid_data)
[docs] def load_restart_data(self, filename: str = 'restart.pkl'): """ Function to load data from previous simulation and uses them for following simulation. :param filename: restart_data filename :type filename: str """ if os.path.exists(filename): with open(filename, "rb") as fp: data = pickle.load(fp) days, X, arr_n = data self.physics.engine.t = days self.physics.engine.X = value_vector(X) self.physics.engine.Xn = value_vector(X) self.physics.engine.op_vals_arr_n = value_vector(arr_n)
[docs] def save_restart_data(self, filename: str = 'restart.pkl'): """ Function to save the simulation data for restart usage. :param filename: Name of the file where restart_data stores. :type filename: str """ t = np.copy(self.physics.engine.t) X = np.copy(self.physics.engine.X) arr_n = np.copy(self.physics.engine.op_vals_arr_n) data = [t, X, arr_n] with open(filename, "wb") as fp: pickle.dump(data, fp, 4)
[docs] def print_timers(self): """ Function to print the time information, including total time elapsed, time consumption at different stages of the simulation, etc.. """ print(self.timer.print("", ""))
[docs] def print_stat(self): """ Function to print the statistics information, including total timesteps, Newton iteration, linear iteration, etc.. """ self.physics.engine.print_stat()
# destructor to force to destroy all created C objects and free memory def __del__(self): for name in list(vars(self).keys()): delattr(self, name)