Source code for pygimli.physics.traveltime.TravelTimeManager

#!/usr/bin/env python

"""Class for managing first arrival travel time inversions."""
from pathlib import Path
import numpy as np

import pygimli as pg
from pygimli.frameworks import MeshMethodManager

from pygimli.utils import getSavePath
from . modelling import TravelTimeDijkstraModelling, FatrayDijkstraModelling
from . plotting import drawFirstPicks


[docs] class TravelTimeManager(MeshMethodManager): """Manager for refraction seismics (traveltime tomography). TODO Document main members and use default MethodManager interface e.g., self.inv, self.fop, self.paraDomain, self.mesh, self.data """
[docs] def __init__(self, data=None, **kwargs): """Create an instance of the Traveltime manager. Parameters ---------- data: :gimliapi:`GIMLI::DataContainer` | str You can initialize the Manager with data or give them a dataset when calling the inversion. """ self.useFatray = kwargs.pop("fatray", False) self.frequency = kwargs.pop("frequency", 100.) self.secNodes = kwargs.pop("secNodes", 2) super().__init__(data=data, **kwargs) self.inv.dataTrans = pg.trans.Trans()
@property def velocity(self): """Return velocity vector (the inversion model).""" # we can check here if there was an inversion run return self.fw.model # shouldn't it be the inverse?
[docs] def createForwardOperator(self, **kwargs): """Create default forward operator for Traveltime modelling. Your want your Manager use a special forward operator you can add them here on default Dijkstra is used. """ if self.useFatray: fop = FatrayDijkstraModelling(frequency=self.frequency, **kwargs) else: fop = TravelTimeDijkstraModelling(verbose=self.verbose) return fop
[docs] def load(self, fileName): """Load any supported data file.""" self.data = pg.physics.traveltime.load(fileName) return self.data
[docs] def createMeshMovedToMeshManager(self, data=None, **kwargs): """Create default inversion mesh. Inversion mesh for traveltime inversion does not need boundary region. Args ---- data: DataContainer Data container to read sensors from. Keyword Args ------------ Forwarded to `:py:func:pygimli.meshtools.createParaMesh` """ pg._error('do not use. will be removed soon') #25032025 data = data or self.data if not hasattr(data, 'sensors'): pg.critical('Please provide a data container for mesh generation') return pg.meshtools.createParaMesh(data.sensors(), paraBoundary=0, boundary=0, **kwargs)
[docs] def checkData(self, data): """Return data from container.""" if isinstance(data, pg.DataContainer): if not data.haveData('t'): pg.critical('DataContainer has no "t" values.') return data['t'] return data
[docs] def checkError(self, err, dataVals): """Return relative error.""" if isinstance(err, pg.DataContainer): if not err.haveData('err'): pg.error('DataContainer has no "err" values. Fallback to 3%') return np.ones(err.size()) * 0.03 return err['err'] / dataVals return err
[docs] def applyMesh(self, mesh, secNodes=None, ignoreRegionManager=False): """Apply mesh, i.e. set mesh in the forward operator class.""" if secNodes is None: secNodes = self.secNodes self.fop._refineSecNodes = secNodes if secNodes > 0 and ignoreRegionManager: mesh = self.fop.createRefinedFwdMesh(mesh) self.fop.setMesh(mesh, ignoreRegionManager=ignoreRegionManager) self.fop.jacobian().clear()
[docs] def simulate(self, mesh=None, scheme=None, slowness=None, vel=None, seed=None, secNodes=2, noiseLevel=0.0, noiseAbs=0.0, **kwargs): """Simulate traveltime measurements. Perform the forward task for a given mesh, a slowness distribution (per cell) and return data (traveltime) for a measurement scheme. Parameters ---------- mesh : :gimliapi:`GIMLI::Mesh` Mesh to calculate for or use the last known mesh. scheme: :gimliapi:`GIMLI::DataContainer` Data measurement scheme needs 's' for shot and 'g' for geophone data token. slowness : array(mesh.cellCount()) | array(N, mesh.cellCount()) Slowness distribution for the given mesh cells can be: * a single array of len mesh.cellCount() * a matrix of N slowness distributions of len mesh.cellCount() * a slowness map [[marker0, slow0], [marker1, slow1], ...] vel : array(mesh.cellCount()) | array(N, mesh.cellCount()) Velocity distribution for the mesh cells (overwrites slowness!). secNodes: int [2] Number of refinement nodes to increase accuracy of the forward calculation. noiseLevel: float [0.0] Add relative noise to the simulated data. noiseLevel*100 in % noiseAbs: float [0.0] Add absolute noise to the simulated data in ms. seed: int [None] Seed the random generator for the noise. Keyword Arguments ----------------- returnArray: [False] Return only the calculated times. verbose: [self.verbose] Overwrite verbose level. **kwargs Additional kwargs ... Returns ------- t : array(N, data.size()) | DataContainer The resulting simulated travel time values (returnArray=True) or DataContainer containing them in t field (returnArray=False). Either one column array or matrix in case of slowness matrix. """ verbose = kwargs.pop('verbose', self.verbose) fop = self.fop scheme = scheme or self.data fop.data = scheme fop.verbose = verbose if mesh is not None: self.applyMesh(mesh, secNodes=secNodes, ignoreRegionManager=True) if vel is not None: slowness = 1/vel if slowness is None: pg.critical("Need some slowness or velocity distribution for" " simulation.") if len(slowness) == self.fop.mesh().cellCount(): t = fop.response(slowness) if verbose: print('min/max t:', min(t), max(t)) else: print(self.fop.mesh()) print("slowness: ", slowness) pg.critical("Simulate called with wrong slowness array.") ret = pg.DataContainer(scheme) if noiseLevel > 0 or noiseAbs > 0: if not ret.allNonZero('err'): err = noiseAbs + t * noiseLevel ret['err'] = err pg.verbose("Absolute error estimates (min:max) {}:{}".format( min(ret['err']), max(ret['err']))) t += pg.randn(ret.size(), seed=seed) * ret('err') if kwargs.pop('returnArray', False) is True: return t ret['t'] = t return ret
[docs] def invert(self, data=None, useGradient=True, vTop=500, vBottom=5000, secNodes=None, **kwargs): """Invert data. Parameters ---------- data : pg.DataContainer() Data container with at least SensorIndices 's g' (shot/geophone) & data values 't' (traveltime in s) and 'err' (absolute error in s) useGradient: bool [True] Use gradient starting model typical for refraction cases. For crosshole tomography geometry you should set this to False for a non-gradient (e.g. homogeneous) starting model. vTop: float Top velocity for gradient starting model. vBottom: float Bottom velocity for gradient starting model. secNodes: int [2] Number of secondary nodes for accuracy of forward computation. Returns ------- model: Mapped (for paradomain) velocity model. Keyword Arguments ----------------- ** kwargs: Inversion related arguments: See :py:mod:`pygimli.frameworks.MeshMethodManager.invert` """ mesh = kwargs.pop('mesh', None) if secNodes is not None: self.secNodes = secNodes if 'limits' in kwargs and kwargs['limits'][0] > 1: tmp = kwargs['limits'][0] kwargs['limits'][0] = 1.0 / kwargs['limits'][1] kwargs['limits'][1] = 1.0 / tmp pg.verbose('Switching velocity limits to slowness limits.', kwargs['limits']) if useGradient: self.fop._useGradient = [vTop, vBottom] else: self.fop._useGradient = None ### invert return mapped models slowness = super().invert(data, mesh, **kwargs) velocity = 1.0 / slowness velocity.isParaModel = slowness.isParaModel self.fw.model = 1.0 / self.fw.model #C42 self.fw only hold non-mapped model # that needs to be compatible to self.fw.mesh return velocity
[docs] def showFit(self, axs=None, firstPicks=True, **kwargs): """Show data fit as first-break picks or apparent velocity.""" if firstPicks: kwargs.setdefault("linestyle", "None") ax, _ = self.showData(firstPicks=True, **kwargs) drawFirstPicks(ax, self.fop.data, self.inv.response, marker=None) else: super().showFit(axs=axs, **kwargs)
[docs] def getRayPaths(self, model=None): """Compute ray paths. If model is not specified, the last calculated Jacobian is used. Parameters ---------- model : array Velocity model for which to calculate and visualize ray paths (the default is model for last Jacobian calculation in self.velocity). Returns ------- list of two-column array holding x and y positions """ if model is not None: # if self.fop.jacobian().size() == 0 or model != self.model: self.fop.createJacobian(1/model) shots = self.fop.data.id("s") recei = self.fop.data.id("g") segs = [] nodes = self.fop._core.mesh().positions(withSecNodes=True) for s, g in zip(shots, recei, strict=False): wi = self.fop.way(s, g) points = nodes[wi] segs.append(np.column_stack((pg.x(points), pg.y(points)))) return segs
[docs] def drawRayPaths(self, ax, model=None, rayPaths=None, **kwargs): """Draw the the ray paths for model or last model. If model is not specified, the last calculated Jacobian is used. Parameters ---------- rayPaths : list of np.array x/y column array with ray point positions model : array Velocity model for which to calculate and visualize ray paths (the default is model for last Jacobian calculation in self.velocity). ax : matplotlib.axes object To draw the model and the path into. **kwargs : type Additional arguments passed to LineCollection (alpha, linewidths, color, linestyles). Returns ------- lc : matplotlib.LineCollection """ rayPaths = rayPaths or self.getRayPaths(model=model) _ = kwargs.setdefault("color", "w") _ = kwargs.setdefault("alpha", 0.5) _ = kwargs.setdefault("linewidths", 0.8) from matplotlib.collections import LineCollection lc = LineCollection(rayPaths, **kwargs) ax.add_collection(lc) return lc
[docs] def showRayPaths(self, model=None, ax=None, **kwargs): """Show the model with ray paths for given model. If not model specified, the last calculated Jacobian is taken. Parameters ---------- model : array Velocity model for which to calculate and visualize ray paths (the default is model for last Jacobian calculation in self.velocity). ax : matplotlib.axes object To draw the model and the path into. **kwargs : type forward to drawRayPaths Returns ------- ax : matplotlib.axes object cb : matplotlib.colorbar object (only if model is provided) Examples -------- >>> import pygimli as pg >>> from pygimli.physics import traveltime as tt >>> >>> x, y = 8, 6 >>> mesh = pg.createGrid(x, y) >>> data = tt.createRAData([(0, 0)] + [(x, i) for i in range(y)], ... shotDistance=y+1) >>> data["t"] = 1.0 >>> mgr = tt.Manager(data) >>> mgr.applyMesh(mesh, secNodes=10) >>> ax, cb = mgr.showRayPaths(showMesh=True, diam=0.1) """ if model is None: if self.fop.jacobian().size() == 0: self.fop.mesh() # initialize any meshs .. just to be sure is 1 model = pg.Vector(self.fop.regionManager().parameterCount(), 1.0) else: model = self.model ax, cbar = self.showModel(ax=ax, model=model, showMesh=kwargs.pop('showMesh', None), diam=kwargs.pop('diam', None)) self.drawRayPaths(ax, model=model, **kwargs) return ax, cbar
[docs] def rayCoverage(self): """Ray coverage, i.e. summed raypath lengths.""" J = self.fop.jacobian() return J.transMult(np.ones(J.rows()))
[docs] def createTraveltimefield(self, v=None, startPos=None, withSec=False): """Compute a single traveltime field.""" startPos = startPos or self.data.sensor(0) fop = self.fop mesh = fop.mesh() Di = fop.dijkstra slowPerCell = fop.createMappedModel(1/v, 1e16) Di.setGraph(fop._core.createGraph(slowPerCell)) Di.setStartNode(mesh.findNearestNode(startPos)) dist = Di.distances() if withSec: return dist else: return dist[:mesh.nodeCount()]
[docs] def standardizedCoverage(self): """Standardized coverage vector (0|1) using neighbor info.""" coverage = self.rayCoverage() C = self.fop.constraintsRef() return np.sign(np.absolute(C.transMult(C * coverage)))
[docs] def showCoverage(self, ax=None, name='coverage', **kwargs): """Show the ray coverage in log-scale.""" if ax is None: fig, ax = pg.plt.subplots() cov = self.rayCoverage() return pg.show(self.fop.paraDomain, pg.log10(cov+min(cov[cov > 0])*.5), ax=ax, coverage=self.standardizedCoverage(), **kwargs)
[docs] def saveResult(self, folder=None, size=(16, 10), verbose=False, **kwargs): """Save the results in a specified (or date-time derived) folder. Saved items are: * Resulting inversion model * Velocity vector * Coverage vector * Standardized coverage vector * Mesh (bms and vtk with results) Args ---- path: str[None] Path to save into. If not set the name is automatically created size: (float, float) (16,10) Figure size. Keyword Args ------------ Will be forwarded to showResults Returns ------- str: Name of the result path. """ subfolder = self.__class__.__name__ path = Path(getSavePath(folder, subfolder)) if verbose: pg.info(f'Saving refraction data to: {path}') np.savetxt(path / 'velocity.vector', self.velocity) np.savetxt(path / 'velocity-cov.vector', self.rayCoverage()) np.savetxt(path / 'velocity-scov.vector', self.standardizedCoverage()) m = pg.Mesh(self.paraDomain) m['Velocity'] = self.paraModel(self.velocity) m['Coverage'] = self.rayCoverage() m['S_Coverage'] = self.standardizedCoverage() m.exportVTK(str(path / 'velocity')) m.saveBinaryV2(str(path / 'velocity-pd')) self.fop.mesh().save(str(path / 'velocity-mesh')) np.savetxt(path / 'chi.txt', self.inv.chi2History) if m.dim() == 2: fig, ax = pg.plt.subplots() self.showResult(ax=ax, cov=self.standardizedCoverage(), **kwargs) fig.set_size_inches(size) fig.savefig(path / 'velocity.pdf', bbox_inches='tight') pg.plt.close(fig) return path