"""Defines a vectorial field over a grid.
.. codeauthor:: David Zwicker <david.zwicker@ds.mpg.de>
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, overload
import numpy as np
from ..grids.base import DimensionError, GridBase
from ..grids.cartesian import CartesianGrid
from ..tools.docstrings import fill_in_docstring
from ..tools.misc import get_common_dtype
from ..tools.plotting import PlotReference, plot_on_figure
from .datafield_base import DataFieldBase
from .scalar import ScalarField
if TYPE_CHECKING:
from collections.abc import Callable, Sequence
from numpy.typing import DTypeLike
from ..backends import BackendBase
from ..grids.boundaries.axes import BoundariesData
from ..tools.typing import (
BinaryOperatorImplType,
Number,
NumberOrArray,
)
from .tensorial import Tensor2Field
[docs]
class VectorField(DataFieldBase):
"""Vector field discretized on a grid.
Warning:
Components of the vector field are given in the local basis. While the local
basis is identical to the global basis in Cartesian coordinates, the local basis
depends on position in curvilinear coordinate systems. Moreover, the field
always contains all components, even if the underlying grid assumes symmetries.
"""
rank = 1
[docs]
@classmethod
def from_scalars(
cls,
fields: list[ScalarField],
*,
label: str | None = None,
dtype: DTypeLike | None = None,
) -> VectorField:
"""Create a vector field from a list of ScalarFields.
Note that the data of the scalar fields is copied in the process
Args:
fields (list):
The list of (compatible) scalar fields
label (str, optional):
Name of the returned field
dtype (numpy dtype):
The data type of the field. If omitted, it will be determined from
`data` automatically.
Returns:
:class:`VectorField`: the resulting vector field
"""
grid = fields[0].grid
if grid.dim != len(fields):
msg = (
f"Grid dimension and number of scalar fields differ ({grid.dim} != "
f"{len(fields)})"
)
raise DimensionError(msg)
data = []
for field in fields:
if not field.grid.compatible_with(grid):
msg = "Grids are incompatible"
raise ValueError(msg)
data.append(field.data)
return cls(grid, data, label=label, dtype=dtype)
[docs]
@classmethod
@fill_in_docstring
def from_expression(
cls,
grid: GridBase,
expressions: Sequence[str],
*,
user_funcs: dict[str, Callable] | None = None,
consts: dict[str, NumberOrArray] | None = None,
label: str | None = None,
dtype: DTypeLike | None = None,
) -> VectorField:
"""Create a vector field on a grid from given expressions.
Warning:
{WARNING_EXEC}
Args:
grid (:class:`~pde.grids.base.GridBase`):
Grid defining the space on which this field is defined
expressions (list of str):
A list of mathematical expression, one for each component of the vector
field. The expressions determine the values as a function of the
position on the grid. The expressions may contain standard mathematical
functions and they may depend on the axes labels of the grid.
More information can be found in the
:ref:`expression documentation <documentation-expressions>`.
user_funcs (dict, optional):
A dictionary with user defined functions that can be used in the
expression
consts (dict, optional):
A dictionary with user defined constants that can be used in the
expression. The values of these constants should either be numbers or
:class:`~numpy.ndarray`.
label (str, optional):
Name of the field
dtype (numpy dtype):
The data type of the field. If omitted, it will be determined from
`data` automatically.
"""
from ..tools.expressions import ScalarExpression
if isinstance(expressions, str) or len(expressions) != grid.dim:
axes_names = grid.axes + grid.axes_symmetric
msg = f"Expected {grid.dim} expressions for the coordinates {axes_names}."
raise DimensionError(msg)
if any("cartesian" in str(expression) for expression in expressions):
# support Cartesian coordinates via a special constant
if consts is None:
consts = {}
if "cartesian" not in consts:
coords_cart = grid.point_to_cartesian(grid.cell_coords)
consts["cartesian"] = np.moveaxis(coords_cart, -1, 0)
assert "cartesian" in consts
# obtain the coordinates of the grid points
points = [grid.cell_coords[..., i] for i in range(grid.num_axes)]
# evaluate all vector components at all points
data = []
for expression in expressions:
expr = ScalarExpression(
expression=expression,
signature=grid.axes,
user_funcs=user_funcs,
consts=consts,
repl=grid.c._axes_alt_repl,
allow_indexed=True,
)
values = np.broadcast_to(expr(*points), grid.shape)
data.append(values)
# create vector field from the data
return cls(grid=grid, data=data, label=label, dtype=dtype)
def __getitem__(self, key: int | str) -> ScalarField:
"""Extract a component of the VectorField."""
axis = self.grid.get_axis_index(key)
comp_name = self.grid.c.axes[axis]
if self.label:
label = self.label + f"_{comp_name}"
else:
label = f"{comp_name} component"
return ScalarField(
self.grid, data=self._data_full[axis], label=label, with_ghost_cells=True
)
def __setitem__(self, key: int | str, value: NumberOrArray | ScalarField):
"""Set a component of the VectorField."""
idx = self.grid.get_axis_index(key)
if isinstance(value, ScalarField):
self.grid.assert_grid_compatible(value.grid)
self.data[idx] = value.data
else:
self.data[idx] = value
@overload
def dot(
self,
other: VectorField,
out: ScalarField | None = ...,
*,
conjugate: bool = ...,
label: str = ...,
) -> ScalarField: ...
@overload
def dot(
self,
other: Tensor2Field,
out: VectorField | None = ...,
*,
conjugate: bool = ...,
label: str = ...,
) -> VectorField: ...
[docs]
def dot(
self,
other: VectorField | Tensor2Field,
out: ScalarField | VectorField | None = None,
*,
conjugate: bool = True,
label: str = "dot product",
) -> ScalarField | VectorField:
"""Calculate the dot product involving a vector field.
This supports the dot product between two vectors fields as well as the
product between a vector and a tensor. The resulting fields will be a
scalar or vector, respectively.
Args:
other (VectorField or Tensor2Field):
the second field
out (ScalarField or VectorField, optional):
Optional field to which the result is written.
conjugate (bool):
Whether to use the complex conjugate for the second operand
label (str, optional):
Name of the returned field
Returns:
:class:`~pde.fields.scalar.ScalarField` or
:class:`~pde.fields.vectorial.VectorField`: result of applying the operator
"""
from .tensorial import Tensor2Field
# check input
self.grid.assert_grid_compatible(other.grid)
if isinstance(other, VectorField):
result_type = ScalarField
elif isinstance(other, Tensor2Field):
result_type = VectorField # type: ignore
else:
msg = "Second term must be a vector or tensor field"
raise TypeError(msg)
if out is None:
out = result_type(self.grid, dtype=get_common_dtype(self, other))
else:
if not isinstance(out, result_type):
msg = f"`out` must be of type `{result_type}`"
raise TypeError(msg)
self.grid.assert_grid_compatible(out.grid)
# calculate the result
other_data = other.data.conj() if conjugate else other.data
np.einsum("i...,i...->...", self.data, other_data, out=out.data)
if label is not None:
out.label = label
return out
__matmul__ = dot # support python @-syntax for matrix multiplication
[docs]
def outer_product(
self,
other: VectorField,
out: Tensor2Field | None = None,
*,
label: str | None = None,
) -> Tensor2Field:
"""Calculate the outer product of this vector field with another.
Args:
other (:class:`~pde.fields.vectorial.VectorField`):
The second vector field
out (:class:`~pde.fields.tensorial.Tensor2Field`, optional):
Optional tensorial field to which the result is written.
label (str, optional):
Name of the returned field
Returns:
:class:`~pde.fields.tensorial.Tensor2Field`: result of the operation
"""
from .tensorial import Tensor2Field
self.assert_field_compatible(other)
if out is None:
out = Tensor2Field(self.grid)
else:
self.grid.assert_grid_compatible(out.grid)
# calculate the result
np.einsum("i...,j...->ij...", self.data, other.data, out=out.data)
if label is not None:
out.label = label
return out
[docs]
def make_outer_prod_operator(
self, backend: str | BackendBase = "numba"
) -> BinaryOperatorImplType:
"""Return operator calculating the outer product of two vector fields.
Warning:
This function does not check types or dimensions.
Args:
backend (str):
The backend (e.g., 'numba' or 'numba') used for this operator.
Returns:
function that takes two instance of :class:`~numpy.ndarray`, which contain
the discretized data of the two operands. An optional third argument can
specify the output array to which the result is written.
"""
from ..backends import get_backend
return get_backend(backend).make_outer_prod_operator(self)
[docs]
@fill_in_docstring
def divergence(
self, bc: BoundariesData | None, out: ScalarField | None = None, **kwargs
) -> ScalarField:
"""Apply divergence operator and return result as a field.
Args:
bc:
The boundary conditions applied to the field.
{ARG_BOUNDARIES_OPTIONAL}
out (ScalarField, optional):
Optional scalar field to which the result is written.
**kwargs:
Additional arguments affecting how the operator behaves.
Returns:
:class:`~pde.fields.scalar.ScalarField`: Divergence of the field
"""
return self.apply_operator("divergence", bc=bc, out=out, **kwargs) # type: ignore
[docs]
@fill_in_docstring
def gradient(
self,
bc: BoundariesData | None,
out: Tensor2Field | None = None,
**kwargs,
) -> Tensor2Field:
r"""Apply vector gradient operator and return result as a field.
The vector gradient field is a tensor field :math:`t_{\alpha\beta}` that
specifies the derivatives of the vector field :math:`v_\alpha` with respect to
all coordinates :math:`x_\beta`.
Args:
bc:
The boundary conditions applied to the field. Boundary conditions need
to determine all components of the vector field.
{ARG_BOUNDARIES_OPTIONAL}
out (VectorField, optional):
Optional vector field to which the result is written.
**kwargs:
Additional arguments affecting how the operator behaves.
Returns:
:class:`~pde.fields.tensorial.Tensor2Field`: Gradient of the field
"""
return self.apply_operator("vector_gradient", bc=bc, out=out, **kwargs) # type: ignore
[docs]
@fill_in_docstring
def laplace(
self, bc: BoundariesData | None, out: VectorField | None = None, **kwargs
) -> VectorField:
r"""Apply vector Laplace operator and return result as a field.
The vector Laplacian is a vector field :math:`L_\alpha` containing the second
derivatives of the vector field :math:`v_\alpha` with respect to the coordinates
:math:`x_\beta`:
.. math::
L_\alpha = \sum_\beta
\frac{\partial^2 v_\alpha}{\partial x_\beta \partial x_\beta}
Args:
bc:
The boundary conditions applied to the field.
{ARG_BOUNDARIES_OPTIONAL}
out (VectorField, optional):
Optional vector field to which the result is written.
**kwargs:
Additional arguments affecting how the operator behaves.
Returns:
:class:`~pde.fields.vectorial.VectorField`: Laplacian of the field
"""
return self.apply_operator("vector_laplace", bc=bc, out=out, **kwargs) # type: ignore
[docs]
def to_scalar(
self,
scalar: str | int = "auto",
*,
label: str | None = "scalar `{scalar}`",
) -> ScalarField:
"""Return scalar variant of the field.
Args:
scalar (str):
Choose the method to use. Possible choices are `norm`, `max`, `min`,
`squared_sum`, `norm_squared`, or an integer specifying which component
is returned (indexing starts at `0`). The default value `auto` picks the
method automatically: The first (and only) component is returned for
real fields on one-dimensional spaces, while the norm of the vector is
returned otherwise.
label (str, optional):
Name of the returned field
Returns:
:class:`pde.fields.scalar.ScalarField`:
The scalar field after applying the operation
"""
if scalar == "auto":
if self.grid.dim > 1 or np.iscomplexobj(self.data):
scalar = "norm"
else:
scalar = 0 # return the field unchanged
if isinstance(scalar, int):
data = self.data[scalar]
elif scalar == "norm":
data = np.linalg.norm(self.data, axis=0)
elif scalar == "max":
data = np.max(self.data, axis=0)
elif scalar == "min":
data = np.min(self.data, axis=0)
elif scalar == "squared_sum":
data = np.sum(self.data**2, axis=0)
elif scalar == "norm_squared":
data = np.sum(self.data * self.data.conj(), axis=0)
else:
msg = f"Unknown method `{scalar}` for `to_scalar`"
raise ValueError(msg)
if label is not None:
label = label.format(scalar=scalar)
return ScalarField(self.grid, data, label=label)
[docs]
def get_vector_data(
self, transpose: bool = False, max_points: int | None = None, **kwargs
) -> dict[str, Any]:
r"""Return data for a vector plot of the field.
Args:
transpose (bool):
Determines whether the transpose of the data should be plotted.
max_points (int):
The maximal number of points that is used along each axis. This
option can be used to sub-sample the data.
**kwargs:
Additional parameters forwarded to `grid.get_image_data`
Returns:
dict: Information useful for plotting an vector field
"""
if self.is_complex:
self._logger.warning("Only the real part of complex data is shown")
# extract the image data
data = self.grid.get_vector_data(self.data.real, **kwargs)
data["title"] = self.label
# transpose the data if requested
if transpose:
data["x"], data["y"] = data["y"], data["x"]
data["data_x"], data["data_y"] = data["data_y"].T, data["data_x"].T
data["label_x"], data["label_y"] = data["label_y"], data["label_x"]
data["extent"] = data["extent"][2:] + data["extent"][:2]
# reduce the sampling of the vector points
if max_points is not None:
shape = data["data_x"].shape
for axis, size in enumerate(shape):
if size > max_points:
# sub-sample the data
idx_f = np.linspace(0, size - 1, max_points)
idx_i = np.round(idx_f).astype(int)
data["data_x"] = np.take(data["data_x"], idx_i, axis=axis)
data["data_y"] = np.take(data["data_y"], idx_i, axis=axis)
if axis == 0:
data["x"] = data["x"][idx_i]
elif axis == 1:
data["y"] = data["y"][idx_i]
else:
msg = "Only supports 2d grids"
raise RuntimeError(msg)
data["shape"] = data["data_x"].shape
data["size"] = data["data_x"].size
return data
[docs]
@fill_in_docstring
def interpolate_to_grid(
self: VectorField,
grid: GridBase,
*,
bc: BoundariesData | None = None,
fill: Number | None = None,
label: str | None = None,
) -> VectorField:
"""Interpolate the data of this vector field to another grid.
Args:
grid (:class:`~pde.grids.base.GridBase`):
The grid of the new field onto which the current field is interpolated.
bc:
The boundary conditions applied to the field, which affects values close
to the boundary. If omitted, the argument `fill` is used to determine
values outside the domain.
{ARG_BOUNDARIES_OPTIONAL}
fill (Number, optional):
Determines how values out of bounds are handled. If `None`, a
`ValueError` is raised when out-of-bounds points are requested.
Otherwise, the given value is returned.
label (str, optional):
Name of the returned field
Returns:
Field of the same rank as the current one.
"""
if self.grid.dim != grid.dim:
msg = f"Incompatible grid dimensions ({self.grid.dim:d} != {grid.dim:d})"
raise DimensionError(msg)
# determine the points at which data needs to be calculated
if (
self.grid.num_axes == grid.num_axes
and self.grid.__class__ is grid.__class__
) or ( # UnitGrid and CartesianGrid should be treated the same here
isinstance(self.grid, CartesianGrid) and isinstance(grid, CartesianGrid)
):
# convert within the same grid class
points = grid.cell_coords
# vectors are already given in the correct basis
data = self.interpolate(points, bc=bc, fill=fill)
elif isinstance(grid, CartesianGrid):
# convert Cartesian coordinates to coordinates in current grid
points = self.grid.c.pos_from_cart(grid.cell_coords)
points_grid_sym = self.grid._coords_symmetric(points)
# interpolate the data to the grid; this gives the vector in the grid basis
data_grid = self.interpolate(points_grid_sym, bc=bc, fill=fill)
# convert the vector to the cartesian basis
data = self.grid._vector_to_cartesian(points, data_grid)
else:
# this type of interpolation is not supported
grid_in = self.grid.__class__.__name__
grid_out = grid.__class__.__name__
msg = f"Can't interpolate from {grid_in} to {grid_out}"
raise NotImplementedError(msg)
return self.__class__(grid, data, label=label)
def _update_plot_components(self, reference: list[PlotReference]) -> None:
"""Update a component plot with the current field values.
Args:
reference (list of :class:`PlotReference`):
All references of the plot to update
"""
for i in range(self.grid.dim):
self[i]._update_plot(reference[i])
[docs]
@plot_on_figure(update_method="_update_plot_components")
def plot_components(
self,
kind: str = "auto",
fig=None,
**kwargs,
) -> list[PlotReference]:
r"""Visualize all the components of this vector field.
Args:
kind (str or list of str):
Determines the kind of the visualizations. Supported values are `image`
or `line`. Alternatively, `auto` determines the best visualization based
on the grid.
{PLOT_ARGS}
**kwargs:
All additional keyword arguments are forwarded to the actual plotting
function of all subplots.
Returns:
list of :class:`PlotReference`: Instances that contain information
to update all the plots with new data later.
"""
# create all the subpanels
dim = self.grid.dim
axs = fig.subplots(nrows=1, ncols=dim, squeeze=False)
# plot all the elements onto the respective axes
kwargs.setdefault("action", "none")
kwargs["kind"] = kind
comps = self.grid.axes + self.grid.axes_symmetric
references = [
self[i].plot(
ax=axs[0][i],
title=f"{comps[i]} Component",
**kwargs,
)
for i in range(dim)
]
# return the references for all subplots
return references
def _get_napari_layer_data( # type: ignore
self, max_points: int | None = None, args: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Returns data for plotting on a single napari layer.
Args:
max_points (int):
The maximal number of points that is used along each axis. This option
can be used to subsample the data.
args (dict):
Additional arguments returned in the result, which affect how the layer
is shown.
Returns:
dict: all the information necessary to plot this field
"""
result = {} if args is None else args.copy()
# extract the vector components in the format required by napari
data = self.get_vector_data(max_points=max_points)
vectors = np.empty((data["size"], 2, 2))
xs, ys = np.meshgrid(data["x"], data["y"], indexing="ij")
vectors[:, 0, 0] = xs.flat
vectors[:, 0, 1] = ys.flat
vectors[:, 1, 0] = data["data_x"].flat
vectors[:, 1, 1] = data["data_y"].flat
result["type"] = "vectors"
result["data"] = vectors
return result