"""
| MEAFS Fit Functions
| Matheus J. Castro
| Fit functions that are used, like the C library, the :math:`\\chi^2` calculation, bisection and more.
"""
from astropy.convolution import Gaussian1DKernel, convolve
from specutils.fitting import fit_generic_continuum
from specutils.spectra import Spectrum
from astropy import units as u
from pathlib import Path
import numpy as np
import subprocess
import warnings
import ctypes
import os
[docs]
def c_init(c_name):
"""
Function to initialize the C shared library
:param c_name: name of the shared C library.
:return: the initialized object.
"""
c_library = ctypes.CDLL("{}".format(c_name))
# Defining functions argument types
c_library.bisec.argtypes = [ctypes.POINTER(ctypes.c_float), ctypes.c_int, ctypes.c_float]
c_library.bisec.restype = ctypes.c_int
c_library.chi2.argtypes = [ctypes.POINTER(ctypes.c_float), ctypes.POINTER(ctypes.c_float), ctypes.c_int,
ctypes.POINTER(ctypes.c_float), ctypes.POINTER(ctypes.c_float), ctypes.c_int]
c_library.chi2.restype = ctypes.c_float
return c_library
[docs]
def chi2(spec1, spec2):
"""
Function to find the :math:`\\chi^2` of two arrays using the C library.
:param spec1: first array.
:param spec2: seccond array.
:return: the :math:`\\chi^2`.
"""
global c_lib
# Using C library
spec1x = spec1[0].tolist()
spec1y = spec1[1].tolist()
# noinspection PyCallingNonCallable,PyTypeChecker
spec1x = (ctypes.c_float * len(spec1x))(*spec1x)
# noinspection PyCallingNonCallable,PyTypeChecker
spec1y = (ctypes.c_float * len(spec1y))(*spec1y)
spec2x = spec2[0].tolist()
spec2y = spec2[1].tolist()
# noinspection PyCallingNonCallable,PyTypeChecker
spec2x = (ctypes.c_float * len(spec2x))(*spec2x)
# noinspection PyCallingNonCallable,PyTypeChecker
spec2y = (ctypes.c_float * len(spec2y))(*spec2y)
return c_lib.chi2(spec1x, spec1y, len(spec1x), spec2x, spec2y, len(spec2x))
# Using Python (slower)
# sp1_interpol = np.array([])
# sp2_interpol = np.array([])
#
# for i in spec2.iloc():
# pos = bisec(spec1, i[0])
# if pos != -1:
# x1 = spec1.iloc[pos][0]
# x2 = spec1.iloc[pos + 1][0]
# y1 = spec1.iloc[pos][1]
# y2 = spec1.iloc[pos + 1][1]
# x = i[0]
# y = y1 + (y2 - y1) / (x2 - x1) * (x - x1)
#
# sp1_interpol = np.append(sp1_interpol, y)
# sp2_interpol = np.append(sp2_interpol, i[1])
#
# return sum((sp1_interpol - sp2_interpol)**2 / sp2_interpol)
[docs]
def bisec(spec, lamb):
"""
Function to apply the bisection script to find a number position in an array.
:param spec: array to analyse.
:param lamb: value to look for.
:return: the position.
"""
# Uncomment the script bellow to use the C library (slower)
# global c_lib
# arr = spec[0].tolist()
# arr_c = (ctypes.c_float * len(arr))(*arr)
#
# index = c_lib.bisec(arr_c, len(arr), lamb)
#
# return index
# Bisection Script
if lamb < spec.iloc[0][0]:
return -1
elif lamb > spec.iloc[-1][0]:
return -1
elif lamb == spec.iloc[0][0]:
return 0
elif lamb == spec.iloc[-1][0]:
return len(spec) - 1
else:
gap = [0, len(spec) - 1]
while True:
new_gap = gap[0] + (gap[1] - gap[0]) // 2
if lamb < spec.iloc[new_gap][0]:
gap[1] = new_gap
else:
gap[0] = new_gap
if gap[0] + 1 == gap[1]:
return gap[0]
[docs]
def cut_spec(spc, lamb, cut_val=1.):
"""
Function to restrict the array in the desired range
:param spc: the array.
:param lamb: the central position.
:param cut_val: the range to be restricted.
:return: the truncated array.
"""
val0 = bisec(spc, lamb - cut_val)
val1 = bisec(spc, lamb + cut_val)+1
return spc.iloc[val0:val1]
[docs]
def line_boundaries(spec, lamb, threshold=0.98, contpars=None, iterac=10000):
"""
Function to retrieve the width position of an absorption line in a spectrum.
:param spec: 2D spectrum.
:param lamb: central wavelength of the line.
:param threshold: threshold in percentage of the continuum to define line limits.
:param contpars: the calibration values for the continuum method.
:param iterac: maximum number of iterations for the continuum method.
:return: min ax max position values in the array for the line.
"""
lamb_pos = bisec(spec, lamb)
cont_level = fit_continuum(spec, contpars=contpars, iterac=iterac)[0]
min_line = lamb_pos
max_line = lamb_pos
while True:
min_line -= 1
if spec.iloc[min_line][1] >= cont_level * threshold or min_line <= 0:
break
while True:
max_line += 1
if spec.iloc[max_line][1] >= cont_level * threshold or max_line >= len(spec)-1:
break
return min_line, max_line
[docs]
def spec_operations(spec, lamb_desloc=0., continuum=1., convol=0.):
"""
Function to apply lambda shift, continuum fit and convolution to the spectrum.
:param spec: the spectrum to be changed.
:param lamb_desloc: the shift in wavelength to be applied.
:param continuum: the continuum to be applied.
:param convol: the convolution in FWHM to be applied.
:return: the modified spectrum.
"""
spec[0] = spec[0] + lamb_desloc
spec[1] = spec[1] * continuum
if convol != 0:
try:
# Change the convolution parameter from FWHM to STD
# FWHM = 2 * sqrt[2 * ln(2)] * STD
convol = convol / (2*np.sqrt(2*np.log(2)))
# Apply the convolution using the Gaussian1DKernel function
g = Gaussian1DKernel(stddev=convol)
spec[1] = convolve(spec[1], g) # convolution between the spectrum and the function above
except ValueError:
pass
return spec
[docs]
def fit_continuum(spec, contpars=None, iterac=1000, method=0, contdisabled=False,
medianwindow=3, hardvalue=1.0):
"""
Fit the overall continuum in the entire spectrum.
:param spec: spectrum data.
:param contpars: the calibration values of the Sigma-clipping method.
:param iterac: maximum number of iterations.
:param method: method to fit the continuum: 0-Chebyshev; 1-Sigma-clipping; 2-Simple Average.
:param contdisabled: disable the continuum fit and use a fixed value.
:param medianwindow: median window for the Chebyshev method.
:param hardvalue: fixed value when not continuum fit.
:return: the mean and the standard deviation (continuum and errors).
"""
def sigma_clipping():
if contpars is None:
alpha = .5
eps = 20
else:
alpha = contpars[0]
eps = contpars[1]
eps = np.float16(10)**(-np.float16(eps))
new_spec = spec.iloc[:, 1].values.tolist()
median = np.median(new_spec)
std = np.std(new_spec)
std_old = std
count = 0
while True:
for i in reversed(range(len(new_spec))):
value = new_spec[i]
if value > median + alpha * std or value < median - alpha * std:
del new_spec[i]
median = np.median(new_spec)
std = np.std(new_spec)
count += 1
if count >= iterac:
print("Maximum Iterations for Continuum fit reached. iter_max = ", iterac)
break
elif (std_old - std) / std <= eps:
break
std_old = std
median = np.median(new_spec)
std = np.std(new_spec)
return median, std, np.zeros(len(spec)) + median
def chebyshev():
spec1d = Spectrum(spectral_axis=spec.iloc[:, 0].values.tolist() * u.AA,
flux=spec.iloc[:, 1].values.tolist() * u.dimensionless_unscaled)
with warnings.catch_warnings(): # Ignore warnings
warnings.simplefilter('ignore')
g1_fit = fit_generic_continuum(spec1d)
continuum_fitted = g1_fit(spec1d.spectral_axis)
return float(np.median(continuum_fitted)), 0, continuum_fitted
def simple_average():
new_spec = spec.iloc[:, 1].values.tolist()
mean = np.mean(new_spec)
std = np.std(new_spec)
return mean, std, np.zeros(len(spec)) + mean
if contdisabled:
return hardvalue, 0, np.zeros(len(spec)) + hardvalue
if method == 0:
cont, cont_err, func = sigma_clipping()
elif method == 1:
cont, cont_err, func = chebyshev()
else:
cont, cont_err, func = simple_average()
return cont, cont_err, func
# Compile C files
if not os.path.isfile(Path(os.path.dirname(__file__)).joinpath("bisec_interpol.so")):
print("C module not found. Compiling...")
currdir = os.getcwd()
os.chdir(Path(os.path.dirname(__file__)))
subprocess.run("gcc -Wall -pedantic -O3 bisec_interpol.c -o bisec_interpol.so -shared", shell=True)
os.chdir(currdir)
print("Done.")
c_lib = c_init(Path(os.path.dirname(__file__)).joinpath("bisec_interpol.so")) # initialize the C library