diff --git a/example/levelset/lsf_evolve_fem_without_reinit.py b/example/levelset/lsf_evolve_fem_without_reinit.py new file mode 100644 index 000000000..e083d72c6 --- /dev/null +++ b/example/levelset/lsf_evolve_fem_without_reinit.py @@ -0,0 +1 @@ +"exp" diff --git a/fealpy/levelset/ls_fem_solver.py b/fealpy/levelset/ls_fem_solver.py index ce0eaa20d..ab9232bba 100644 --- a/fealpy/levelset/ls_fem_solver.py +++ b/fealpy/levelset/ls_fem_solver.py @@ -15,25 +15,61 @@ class LSFEMSolver(LSSolver): + """ + A finite element solver for the level set evolution equation, which tracks + the evolution of an interface driven by a velocity field. It discretizes + the transport equation using Crank-Nicolson method in time and finite + element method in space. + """ def __init__(self, space, u=None): + """ + Initialize the finite element solver for the level set evolution. + + Parameters: + - space : The finite element space over which the system is defined. + - u : The velocity field driving the interface evolution. It should be a vector + function defined on the mesh nodes. + + The bilinear forms for the mass and convection are assembled here. The + mass matrix M represents the mass term in the equation, while the + convection matrix C represents the velocity field's convection effect. + """ self.space = space + + # Assemble the mass matrix using the ScalarMassIntegrator which represents the L2 inner product of the finite element functions. bform = BilinearForm(space) bform.add_domain_integrator(ScalarMassIntegrator()) - self.M = bform.assembly() # TODO: 实现快速组装方法 + self.M = bform.assembly() # TODO:Implement a fast assembly method self.u = u + + # Assemble the convection matrix only if a velocity field is provided. if u is not None: bform = BilinearForm(space) + + # The convection integrator accounts for the transport effect of the velocity field on the level set function. bform.add_domain_integrator(ScalarConvectionIntegrator(c = u)) - self.C = bform.assembly() # TODO:实现快速组装方法 + self.C = bform.assembly() # TODO:Implement a fast assembly method def solve(self, phi0, dt, u=None, tol=1e-8): - ''' - PDE_solve - ''' + """ + Solve the level set evolution equation for one time step using the + provided initial condition and velocity field. + + Parameters: + - phi0 : The initial condition for the level set function. + - dt : Time step size for the evolution. + - u : (Optional) Updated velocity field for the evolution. + - tol : Tolerance for the linear system solver. + + The function solves for phi^{n+1} given phi^n (phi0) using the + discretized Crank-Nicolson scheme. It returns the updated level set + function after one time step. + """ space = self.space M = self.M + # Use the provided velocity field u for this time step if given, otherwise use the previously stored velocity field. if u is None: C = self.C if C is None: @@ -42,38 +78,70 @@ def solve(self, phi0, dt, u=None, tol=1e-8): bform = BilinearForm(space) bform.add_domain_integrator(ScalarConvectionIntegrator(c = u)) C = bform.assembly() + + # The system matrix A is composed of the mass matrix and the convection matrix. + # It represents the Crank-Nicolson discretization of the PDE. A = M + (dt/2) * C + + # The right-hand side vector b for the linear system includes the effect of the previous time step's level set function and the convection. b = M @ phi0 - (dt/2) * C @ phi0 + # Solve the linear system to find the updated level set function. phi0 = self.solve_system(A, b, tol = tol) return phi0 + def reinit(self, phi0, dt = 0.0001, eps = 5e-6, nt = 4, alpha = None): ''' - PDE_reinit + Reinitialize the level set function to a signed distance function using the PDE-based reinitialization approach. + + This function solves a reinitialization equation to stabilize the level set + function without moving the interface. The equation introduces artificial diffusion + to stabilize the solution process. + + Parameters: + - phi0: The level set function to be reinitialized. + - dt: The timestep size for the pseudo-time evolution. + - eps: A small positive value to avoid division by zero and to smooth the transition across the interface. + - nt: The number of pseudo-time steps to perform. + - alpha: The artificial diffusion coefficient, which is auto-calculated based on the cell scale if not provided. + + The reinitialization equation is discretized in time using a forward Euler method. + The weak form of the reinitialization equation is assembled and solved iteratively + until a stable state is reached. + + Returns: + - phi1: The reinitialized level set function as a signed distance function. ''' space = self.space mesh = space.mesh + # Calculate the maximum cell scale which is used to determine the artificial diffusion coefficient. cellscale = np.max(mesh.entity_measure('cell')) if alpha is None: alpha = 0.0625*cellscale + # Initialize the solution function and set its values to the initial level set function. phi1 = space.function() phi1[:] = phi0 phi2 = space.function() + # Assemble the mass matrix M. M = self.M + # Assemble the stiffness matrix S using ScalarDiffusionIntegrator for artificial diffusion. bform = BilinearForm(space) bform.add_domain_integrator(ScalarDiffusionIntegrator()) S = bform.assembly() - eold = 0 + # Initialize the old error to zero. + eold = 0 + # Iterate for a fixed number of pseudo-time steps or until the error is below a threshold. for _ in range(nt): + # Define the function f for the right-hand side of the reinitialization equation. @barycentric def f(bcs, index): grad = phi1.grad_value(bcs) @@ -81,14 +149,22 @@ def f(bcs, index): val *= np.sign(phi0(bcs)) return val + # Assemble the linear form for the right-hand side of the equation. lform = LinearForm(space) lform.add_domain_integrator( ScalarSourceIntegrator(f = f) ) b0 = lform.assembly() + + # Compute the right-hand side vector for the linear system. b = M @ phi1 + dt * b0 - dt * alpha * (S @ phi1) + # Solve the linear system to update the level set function. phi2[:] = spsolve(M, b) + + # Calculate the error between the new and old level set function. error = space.mesh.error(phi2, phi1) - print("重置:", error) + print("Reinitialization error:", error) + + # If the error starts increasing or is below the threshold, break the loop. if eold < error and error< eps : break else: diff --git a/fealpy/levelset/ls_solver.py b/fealpy/levelset/ls_solver.py index 2766fc9ef..d3610b5a7 100644 --- a/fealpy/levelset/ls_solver.py +++ b/fealpy/levelset/ls_solver.py @@ -16,18 +16,17 @@ def __call__(self, rk=None): class LSSolver(): - def __init__(self, space, phi0, u, output_dir): + def __init__(self, space, phi0, u): self.space = space self.phi0 = phi0 self.u = u - self.output_dir = output_dir - def output(self, timestep): + def output(self, timestep, output_dir, filename_prefix): mesh = self.space.mesh - if self.output_dir != 'None': + if output_dir != 'None': mesh.nodedata['phi'] = self.phi0 mesh.nodedata['velocity'] = self.u - fname = os.path.join(self.output_dir, f'test_{timestep:010}.vtu') + fname = os.path.join(output_dir, f'{filename_prefix}_{timestep:010}.vtu') mesh.to_vtk(fname=fname) def check_gradient_norm(self, phi): @@ -60,7 +59,7 @@ def check_gradient_norm(self, phi): def solve_system(self, A, b, tol=1e-8): counter = IterationCounter(disp = False) - result, info = lgmres(A, b, tol = tol, callback = counter) + result, info = lgmres(A, b, tol = tol, atol = tol, callback = counter) # print("Convergence info:", info) # print("Number of iteration of gmres:", counter.niter) return result diff --git a/test/levelset/test_ls_fem_solver.py b/test/levelset/test_ls_fem_solver.py new file mode 100644 index 000000000..57568514c --- /dev/null +++ b/test/levelset/test_ls_fem_solver.py @@ -0,0 +1,126 @@ +import pytest + +from fealpy.functionspace import LagrangeFESpace +from fealpy.mesh.triangle_mesh import TriangleMesh +from fealpy.decorator import cartesian +from fealpy.levelset.ls_fem_solver import LSFEMSolver +from fealpy.levelset.ls_fem_solver import LSSolver + +import numpy as np + + +# A fixture for setting up the LSSolver environment with a predefined mesh and space. +@pytest.fixture +def ls_fem_solver_setup(): + # Create a mesh in a unit square domain with specified number of divisions along x and y. + mesh = TriangleMesh.from_box([0, 1, 0, 1], nx=100, ny=100) + # Define a Lagrange finite element space of first order on the mesh. + space = LagrangeFESpace(mesh, p=1) + + # Define a velocity field function for testing. + @cartesian + def velocity_field(p): + x = p[..., 0] + y = p[..., 1] + u = np.zeros(p.shape) + u[..., 0] = np.sin(np.pi * x) ** 2 * np.sin(2 * np.pi * y) + u[..., 1] = -np.sin(np.pi * y) ** 2 * np.sin(2 * np.pi * x) + return u + + # Define a circle function representing the level set function for testing. + @cartesian + def circle(p): + x = p[..., 0] + y = p[..., 1] + return np.sqrt((x - 0.5) ** 2 + (y - 0.75) ** 2) - 0.15 + + # Interpolate the functions onto the finite element space. + phi0 = space.interpolate(circle) + u = space.interpolate(velocity_field, dim=2) + + return space, u, phi0 + + +# Test the initialization of the LSFEMSolver class. +def test_LSFEMSolver_init_without_u(ls_fem_solver_setup): + space, _, _ = ls_fem_solver_setup + + solver_without_u = LSFEMSolver(space) + + # Check that the mass matrix M is initialized. + assert solver_without_u.M is not None, "Mass matrix M should be initialized." + + # Check that the space attribute matches the provided finite element space. + assert solver_without_u.space is space, "The space attribute should match the provided finite element space." + + # Check that the velocity field u is None when not provided during initialization. + assert solver_without_u.u is None, "Velocity field u should be None when not provided during initialization." + + # Check that the convection matrix C does not exist or is None when u is not provided. + assert not hasattr(solver_without_u, 'C') or solver_without_u.C is None, "Convection matrix C should not exist or be None when velocity u is not provided." + +def test_LSFEMSolver_init_with_u(ls_fem_solver_setup): + space, u, _ = ls_fem_solver_setup + + # Instantiate the solver with the velocity field. + solver_with_u = LSFEMSolver(space, u=u) + + # Check that the velocity field u is not None when provided during initialization. + assert solver_with_u.u is not None, "Velocity field u should not be None when provided during initialization." + # Check that the convection matrix C is initialized when u is provided. + assert solver_with_u.C is not None, "Convection matrix C should be initialized when velocity u is provided." + + +def test_LSFEMSolver_solve(ls_fem_solver_setup): + space, u, phi0 = ls_fem_solver_setup + + dt = 0.01 # A small time step + + # Instantiate the solver with the velocity field. + solver_with_u = LSFEMSolver(space, u=u) + + # Perform one step of the level set evolution. + phi1 = solver_with_u.solve(phi0, dt, u=u) + + # Check if the result is a numpy array, which it should be after solving. + assert isinstance(phi1, np.ndarray), "The result of the solve method should be a numpy array." + + +def test_LSFEMSolver_reinit(ls_fem_solver_setup): + space, u, _ = ls_fem_solver_setup + + # 提供一个非符号距离函数 phi0 + @cartesian + def non_sdf(p): + x = p[..., 0] + y = p[..., 1] + return (x - 0.5)**2 + (y - 0.5)**2 # 一个非符号距离函数的平方形式 + + phi0 = space.interpolate(non_sdf) + print("phi0:", phi0) + + # Instantiate the solver with the velocity field. + solver_with_u = LSFEMSolver(space, u=u) + + solver = LSSolver(space, phi0, u) + + diff_avg, diff_max = solver.check_gradient_norm(phi = phi0) + print("diff_avg_0:", diff_avg) + print("diff_max_0:", diff_max) + + + # 执行重置化 + phi1 = solver_with_u.reinit(phi0) + print("phi1:", phi1) + + # Call the check_gradient_norm method which calculates the average and maximum difference from 1. + diff_avg, diff_max = solver.check_gradient_norm(phi = phi1) + print("diff_avg_1:", diff_avg) + print("diff_max_1:", diff_max) + + # Assert that the average and maximum gradient norm differences should be close to 0. + # This means that the gradient norm is close to 1 as expected for a signed distance function. + assert np.isclose(diff_avg, 0, atol=1e-2), "The average difference should be close to 0." + assert np.isclose(diff_max, 0, atol=1e-2), "The maximum difference should be close to 0." + + diff --git a/test/levelset/test_ls_solver.py b/test/levelset/test_ls_solver.py new file mode 100644 index 000000000..86b4dc16f --- /dev/null +++ b/test/levelset/test_ls_solver.py @@ -0,0 +1,128 @@ +import pytest +from fealpy.functionspace import LagrangeFESpace +from fealpy.mesh.triangle_mesh import TriangleMesh +from fealpy.decorator import cartesian +from fealpy.levelset.ls_solver import IterationCounter, LSSolver +import numpy as np + + +# A fixture for setting up the LSSolver environment with a predefined mesh and space. +@pytest.fixture +def ls_solver_setup(): + # Create a mesh in a unit square domain with specified number of divisions along x and y. + mesh = TriangleMesh.from_box([0, 1, 0, 1], nx=100, ny=100) + # Define a Lagrange finite element space of first order on the mesh. + space = LagrangeFESpace(mesh, p=1) + + # Define a velocity field function for testing. + @cartesian + def velocity_field(p): + x = p[..., 0] + y = p[..., 1] + u = np.zeros(p.shape) + u[..., 0] = np.sin(np.pi * x) ** 2 * np.sin(2 * np.pi * y) + u[..., 1] = -np.sin(np.pi * y) ** 2 * np.sin(2 * np.pi * x) + return u + + # Define a circle function representing the level set function for testing. + @cartesian + def circle(p): + x = p[..., 0] + y = p[..., 1] + return np.sqrt((x - 0.5) ** 2 + (y - 0.75) ** 2) - 0.15 + + # Interpolate the functions onto the finite element space. + phi0 = space.interpolate(circle) + u = space.interpolate(velocity_field, dim=2) + + # Instantiate the solver with the space and interpolated functions. + solver = LSSolver(space, phi0, u) + + return solver, phi0, u + + +# Test the IterationCounter class. +def test_IterationCounter(): + # Initialize the IterationCounter with display set to False. + counter = IterationCounter(disp = False) + + # Initially, the iteration count should be zero. + assert counter.niter == 0 + + # After calling the counter, the iteration count should increment. + counter() + assert counter.niter == 1 + + +# Test the initialization of the LSSolver class. +def test_LSSolver_init(ls_solver_setup): + # Unpack the solver, phi0 and u from the setup fixture. + solver, phi0, u = ls_solver_setup + + # Check if the solver's phi0 and u attributes match the ones generated in the fixture. + assert solver.phi0 is phi0 + assert solver.u is u + + +# Test the output method of the LSSolver class. +def test_LSSolver_output(ls_solver_setup, tmp_path): + # Unpack the solver from the setup fixture. + solver, _, _, = ls_solver_setup + + # Define a directory for output using pytest's temporary path fixture. + output_dir = str(tmp_path) + filename_prefix = "test_output" + timestep = 0 + + # Call the output method which should generate a file. + solver.output(timestep, output_dir, filename_prefix) + + # Check if the file was created with the expected name. + expected_file_path = tmp_path / f"{filename_prefix}_{timestep:010}.vtu" + assert expected_file_path.is_file() + + +# Test the check_gradient_norm method of the LSSolver class. +def test_check_gradient_norm(ls_solver_setup): + # Unpack the solver from the setup fixture. + solver, _, _, = ls_solver_setup + + # Define a signed distance function for a circle for testing the gradient norm. + @cartesian + def signed_distance_circle(p): + x = p[..., 0] + y = p[..., 1] + return np.sqrt(x**2 + y**2) - 1 + + # Interpolate the function onto the finite element space. + space = solver.space + phi = space.interpolate(signed_distance_circle) + + # Call the check_gradient_norm method which calculates the average and maximum difference from 1. + diff_avg, diff_max = solver.check_gradient_norm(phi) + print("diff_avg:", diff_avg) + print("diff_max:", diff_max) + + # Assert that the average and maximum gradient norm differences should be close to 0. + # This means that the gradient norm is close to 1 as expected for a signed distance function. + assert np.isclose(diff_avg, 0, atol=1e-6), "The average difference should be close to 0." + assert np.isclose(diff_max, 0, atol=1e-6), "The maximum difference should be close to 0." + + +# Test the solve_system method of the LSSolver class. +def test_LSSolver_solve_system(ls_solver_setup): + # Unpack the solver from the setup fixture. + solver, _, _, = ls_solver_setup + + from scipy.sparse import csr_matrix + + # Create a simple linear system with an identity matrix and a random vector b. + size = 10 + A = csr_matrix(np.eye(size)) + b = np.random.rand(size) + + # Call the solve_system method which should solve the linear system A*x = b. + result = solver.solve_system(A, b) + + # Assert that the result should be close to b since A is an identity matrix. + assert np.allclose(result, b), "The solution should be close to b."