import numpy as np
import matplotlib.pyplot as plt
from astropy.convolution import Gaussian1DKernel, convolve
from scipy.optimize import curve_fit
from astropy.coordinates import SkyCoord
from scipy.signal import find_peaks, peak_widths
import itertools
import scipy.stats as stats
from scipy.signal import savgol_filter
from itertools import product
from gaussFitSpec import fit_spectrum
from gaussFitSpec.fitting import gaussian, multi_gaussian
from .spectra_decomposing_io import (
load_six_column_spectrum,
validate_absorption_input,
write_table_outputs,
)
from .spectra_decomposing_plotting import create_legacy_axes, plot_fit_panels
from .spectra_decomposing_utils import align_spectra_grids, filter_positive_error_rows
class _GaussianFitSpecAdapter:
"""Thin compatibility bridge from legacy code to ``gaussFitSpec``."""
def __init__(self, x, y, y_err):
self.x = x
self.y = y
self.y_err = y_err
self.CF_limit = 0.97
self.x_peak = []
self.fit_mode = "BIC"
self.nGaussianMax = 8
self.bic_weight = 10
self.num_cold = 0
self.last_result = None
self.fit_label = "absorption"
self.verbose = True
self.initial_center_window = None
self.filter_components = False
@staticmethod
def gaussian_func(x, *params):
"""Evaluate one Gaussian component using ``gaussFitSpec``."""
return gaussian(x, *params)
def gaussian_func_multi(self, x, *gausf):
"""Evaluate summed Gaussian components using ``gaussFitSpec``."""
return multi_gaussian(x, *gausf)
def fitting(self):
"""Fit with ``gaussFitSpec.fit_spectrum`` and return legacy arrays."""
method = "f_test" if self.fit_mode.lower() in {"f-test", "f_test"} else "bic"
kwargs = {
"name": self.fit_label,
"method": method,
"max_components": int(self.nGaussianMax),
"bic_weight": float(self.bic_weight),
"positive_amplitudes": True,
"filter_components": bool(self.filter_components),
"verbose": bool(self.verbose),
}
if len(self.x_peak) > 0:
kwargs["initial_centers"] = np.asarray(list(self.x_peak), dtype=float)
kwargs["initial_center_window"] = self.initial_center_window
elif self.num_cold > 0:
kwargs["fixed_n_components"] = int(self.num_cold)
result = fit_spectrum(self.x, self.y, self.y_err, **kwargs)
self.last_result = result
popt = np.asarray(result.parameters, dtype=float)
covariance = result.covariance
if covariance is None or np.ndim(covariance) != 2:
pcov = np.full_like(popt, np.nan, dtype=float)
else:
diagonal = np.diag(covariance)
pcov = np.where(diagonal >= 0, np.sqrt(diagonal), np.nan)
return popt, pcov
[docs]
class SpectraDecomposing:
"""Legacy radiative-transfer decomposition workflow."""
load_six_column_spectrum = staticmethod(load_six_column_spectrum)
create_legacy_axes = staticmethod(create_legacy_axes)
def __init__(self, x,y,y_err,xemi,yemi,yemi_err):
self.x = x
self.y = y
self.y_err = y_err
self.xemi = xemi
self.yemi = yemi
self.yemi_err = yemi_err
self.CF_limit=0.97
self.Tsmin=2.73
self.F=[0,0.5,1]
self.Tsky=2.73 #CMB
self.v_shift=0.00001 # mannually adjust shift
self.nGaussian=0
self.nGaussianMax=8
self.bic_weight=20
self.bic_max_value=10000
self.num_cold=0 # number of cold components assigned, if zero, means no assignment and fitting is fully automatic.
# self.bic_weight=20
self.peak_abs=[]
self.peak_emi=[]
self.fit_mode='BIC'
self.ax=None
self.savecsv=False
self.name='source'
self.datapath='./'
self.renew=False
self.align_data=False
self.max_auto_warm_components=None
self.absorption_center_window=5.0
def _prepare_inputs(self):
"""Filter invalid rows, validate absorption format, and align grids if needed."""
x, y, yerr = filter_positive_error_rows(self.x, np.copy(self.y), self.y_err)
xemi, yemi, yemi_err = filter_positive_error_rows(self.xemi, self.yemi, self.yemi_err)
validate_absorption_input(y, yerr)
if self.align_data:
x, y, yerr, xemi, yemi, yemi_err = align_spectra_grids(x, y, yerr, xemi, yemi, yemi_err)
return x, y, yerr, xemi, yemi, yemi_err
[docs]
@staticmethod
def parse_coords(coord_string):
"""
Convert a coordinate string in the format 'JHHMMSSÂħDDMMSS' to RA and Dec.
Parameters:
coord_string (str): The coordinate string, e.g., 'J053344-721624'.
Returns:
tuple: RA (in decimal degrees), Dec (in decimal degrees)
"""
# Add colons between the components to make it parseable
ra = coord_string[1:3] + 'h' + coord_string[3:5] + 'm' + coord_string[5:7] + 's'
dec = coord_string[7:10] + 'd' + coord_string[10:12] + 'm' + coord_string[12:] + 's'
# Create a SkyCoord object with formatted RA and Dec
coords = SkyCoord(ra + ' ' + dec, frame='icrs')
# Get RA and Dec in decimal degrees
return coords.ra.degree, coords.dec.degree
[docs]
@staticmethod
def calculate_noise(y,yerr,n=1):
ye=n*yerr-y
p=np.argwhere(ye>0).flatten()
ye_n=yerr[p]
return np.nanmean(ye_n)
[docs]
@staticmethod
def calculate_fwhm(wavelength, intensity):
half_max = np.max(intensity) / 2.0
above_half_max = intensity >= half_max
half_max_wavelengths = wavelength[above_half_max]
fwhm = np.max(half_max_wavelengths) - np.min(half_max_wavelengths)
return fwhm
[docs]
@staticmethod
def sigma1_data(x,rang=30):
err=[]
for i in range(len(x)):
if i<rang:
st=0
ed=i+rang
elif i>(len(x)-rang):
st=i-rang
ed=len(x)-1
else:
ed=i+rang
st=i-rang
err.append(np.std(x[st:ed]))
return np.array(err)
[docs]
def gaussian_func_multi(self,x, *gausf):
"""Evaluate the sum of Gaussian components via ``gaussFitSpec``."""
return multi_gaussian(x, *gausf)
[docs]
@staticmethod
def gaussian_func(x, *params):
"""Evaluate one Gaussian component via ``gaussFitSpec``."""
return gaussian(x, *params)
[docs]
@staticmethod
def F_test(x,y_0, y_fit1, y_fit2, sigma_rms, x_1, x_2):
'''
x_1,x_2: degree of freedom
'''
chi_square1=np.sum((y_0-y_fit1)**2/sigma_rms)/x_1
chi_square2=np.sum((y_0-y_fit2)**2/sigma_rms)/x_2
F=chi_square1/chi_square2*x_2/x_1
CF=stats.f.cdf(F, x_1, x_2)
return CF
[docs]
@staticmethod
def fwhm_x_range(x, y):
peaks, _ = find_peaks(y)
if len(peaks) == 0:
return np.nan # No peaks found
results_half = peak_widths(y, peaks, rel_height=0.3)
left_ips = results_half[2]
right_ips = results_half[3]
x_left = x[left_ips.astype(int)]
x_right = x[right_ips.astype(int)]
x_min = np.min(x_left)
x_max = np.max(x_right)
return x_max - x_min
[docs]
def Gaussian_fit(self):
"""Run the full absorption plus emission decomposition workflow."""
x, y, yerr, xemi, yemi, yemi_err = self._prepare_inputs()
peak_emi=self.peak_emi
F=self.F
Tsmin=self.Tsmin
v_sh=self.v_shift
yemi_selferr=yemi-savgol_filter(yemi, window_length=51, polyorder=3)
#y is 1-e^(-tau)
bic_limit=-1000
ynew=np.copy(y)
g = Gaussian1DKernel(stddev=0.5)
ynew = convolve(ynew, g)
print('start absorption fitting')
gf=_GaussianFitSpecAdapter(x,ynew,yerr)
gf.fit_label="smoothed"
gf.x_peak=self.peak_abs
gf.fit_mode=self.fit_mode
gf.bic_weight=self.bic_weight
gf.num_cold=self.num_cold
gf.initial_center_window=self.absorption_center_window if len(self.peak_abs) > 0 else None
gf.filter_components=len(self.peak_abs) > 0
popt_,pcov_=gf.fitting()
#print(popt_[1::3])
if y.max()>=1.:
print('Satuated')
p=np.argwhere(y>=0.99).flatten()
y[p]=y[p]/y.max()*0.99
#y=y/y.max()-np.exp(-3)
y=-np.log(1-y)
gf=_GaussianFitSpecAdapter(x,y,yerr)
gf.fit_label="tau"
gf.x_peak=popt_[1::3]
gf.fit_mode=self.fit_mode
gf.bic_weight=self.bic_weight
gf.num_cold=self.num_cold
gf.initial_center_window=self.absorption_center_window
gf.filter_components=True
popt,pcov=gf.fitting()
#popt[1]=popt[1]-3
popt_ori=np.copy(popt)
print('Absorption fitting finished')
ncold=int(len(popt)/3)
p0=np.zeros(ncold*2)+self.Tsmin
for _ in range(ncold):
p0[_]=popt[3*_+1]
Ts_low_lim=[]
for i in range(ncold):
j=i*3
_popt=popt[j:j+3]
p=np.argmin(abs(_popt[1]-xemi))
Ts_low_lim.append(yemi_err[p]/(1-np.exp(-y[p])))
#print('Ts_low limit', Ts_low_lim)
#print(x,xemi)
if len(peak_emi)>0:
#print(xemi[peak_emi],peak_emi)
nwarm=len(peak_emi)
for i in range(len(peak_emi)):
p0=np.append(p0,[np.max(yemi), peak_emi[i], 1])
auto=False
else:
p=np.argmax(yemi)
nwarm=1
p0=np.append(p0,[np.max(yemi), xemi[p], 1])
auto=True
print('start emission fitting')
def loop(p0,ncold, xemi, yemi,yemi_err,popt,F=F,nwarm=nwarm,Tsmin=Tsmin):
#print(p0)
popt2_=[]
res=[]
wf=[]
sigma_Tsf,all_Tsf=[],[]
funTexp=[]
order=[]
fit_err=[]
v_shift=[]
lowbound=np.array([0. for _ in range(len(p0))])
#lowbound=np.array([-np.inf for _ in range(len(p0))])
for _ in range(ncold):
lowbound[_]=p0[_]-v_sh
_=2*ncold
lowbound[ncold:_]=Tsmin
for _ in range(nwarm):
lowbound[2*ncold+_*3+1]=xemi.min()
ind=np.argmin(np.abs(x - p0[2*ncold+_*3+1]))
lowbound[2*ncold+_*3]=np.mean(yemi_err[max(ind-20,0):min(ind+20,len(xemi)-1)])
#lowbound[2*ncold+_*3]=np.max(yemi_err)*0.5
p0[2*ncold+_*3]=np.mean(yemi_err[max(ind-20,0):min(ind+20,len(xemi)-1)])*1+1
#p0[2*ncold+_*3]=np.max(yemi_err)*0.5+1
#lowbound[2*ncold+_*3]=calculate_noise(yemi,yemi_err,n=3)*2
highbound=np.array([np.inf for _ in range(len(p0))])
for _ in range(ncold):
highbound[_]=p0[_]+v_sh
for _ in range(ncold):
__=_*3
_popt=popt[__:__+3]
fwhm_=2.35482*_popt[2]
max_y2 = np.max(yemi[(xemi >= xemi[np.argmax(y)] - v_sh) & (xemi <= xemi[np.argmax(y)] + v_sh)])
_1=max_y2/(1-np.exp(-_popt[0]))-Tsmin*(np.exp(-_popt[0])-1)
cons=min(_1+_1*0.3,21.866*fwhm_**2)
if cons>Tsmin:
highbound[ncold+_]=cons
else:
highbound[ncold+_]=Tsmin+100
for _ in range(nwarm):
highbound[2*ncold+_*3+1]=xemi.max()
highbound[2*ncold+_*3]=np.max(yemi)+ 3*np.max(yemi_err)
highbound[2*ncold+_*3+2]=max(np.ptp(xemi[yemi > 4 * yemi_err]) if np.any(yemi > 4 * yemi_err) else 0,
self.fwhm_x_range(xemi,yemi))
mask = (p0 > highbound) | (p0 < lowbound)
p0[mask] = (highbound[mask] + lowbound[mask]) / 2
# print(highbound,lowbound,p0)
values = np.arange(0, ncold)
# print(p0,lowbound,highbound)
CNMsequences = np.array(list(itertools.permutations(values, ncold)))
Fsequences = np.array(list(itertools.product(F, repeat=nwarm)))
for cn in range(len(CNMsequences)):
for i in range(len(Fsequences)):
def T_exp(x,*pa):
_vp=pa[:ncold]
_=2*ncold
Ts=pa[ncold:_]
para=pa[_:]
T_CNM = np.zeros_like(x)
CN_=CNMsequences[cn]
for j in range(len(CN_)):
index=CN_[j]*3
if j==0:
tau_=0
else:
tau_=np.zeros_like(x)
for l in range(j):
index2=CN_[l]*3
#tau_+=np.log(1-gaussian_func(x,*popt[index2:index2+3]))
tau_+=self.gaussian_func(x,*popt[index2:index2+3])
#T_CNM+=gaussian_func(x,*popt[index:index+3])*Ts[CN_[j]]*np.exp(tau_)
_popt_=popt[index:index+3]
_popt_[1]=_vp[CN_[j]]
#print(_popt_,x,gaussian_func(x,*_popt_))
T_CNM+=(1-np.exp(-self.gaussian_func(x,*_popt_)))*(Ts[CN_[j]])*np.exp(-tau_)
T_WNM = np.zeros_like(x)
F_=Fsequences[i]
for m in range(len(F_)):
i_=m*3
#T_WNM+=(F_[m] + (1 - F_[m]) *(1-gaussian_func_multi(x,*popt))) * gaussian_func(x, *para[i_:i_+3])
T_WNM+=(F_[m] + (1 - F_[m]) *np.exp(-self.gaussian_func_multi(x,*popt))) * self.gaussian_func(x, *para[i_:i_+3])
#print(np.exp(-gaussian_func_multi(x,*popt)))
T_WNM=T_WNM+self.Tsky*(np.exp(-self.gaussian_func_multi(x,*popt))-1)
return T_CNM+T_WNM
try:
pop_, pcov = curve_fit(T_exp, xemi, yemi,p0=p0,bounds=(lowbound,highbound),maxfev=120000)
except RuntimeError as e:
print(f"Fit failed: {e}")
pcov_=np.sqrt(np.diag(pcov))
# print(pcov_)
popt2_.append(pop_)
funTexp.append(T_exp(xemi, *pop_))
wf.append(1/np.std((yemi-T_exp(xemi, *pop_)))**2)
v_ori=np.array([popt_ori[3*_+1] for _ in range(ncold)])
v_shift.append(pop_[:ncold]-v_ori)
_=2*ncold
sigma_Tsf.append(pcov_[ncold:_])
all_Tsf.append(pop_[ncold:_])
order.append(CNMsequences[cn])
fit_err.append(pcov_)
residuals=yemi-T_exp(xemi, *pop_)
chi2 = np.nansum((residuals / yemi_err) ** 2)
k = len(pop_)
n = len(yemi)
#print(k,n,chi2,yemi_err.min())
bic = k * np.log(n) + chi2
_pc=pcov_
#print('pcov',_pc)
count = np.sum(np.array(_pc)>50)
p=np.argwhere(_pc>50).flatten()
_b=bic/3+np.sum(_pc[p]/10)
bic+=_b*count
#if len(p)>0:
# print('large error bars:',_pc[p])
# print('bic',bic)
#bic +=np.sum(_pc[p]/2)
count = np.sum(np.array(_pc)/np.array(pop_)>0.5)
# if count>0:
# print(bic,count)
_b=bic/2
bic+=_b*count
a=pop_[_:]
#count = np.sum(a[0::3]<self.calculate_noise(yemi,yemi_err)*3)
count = np.sum(a[0::3]<max(np.max(yemi_err),self.calculate_noise(yemi,yemi_err)*3))
#print(a[0::3],calculate_noise(yemi,yemi_err),count)
#print(count)
#if count>0:
# print(count)
_b=20
bic +=_b* count
#print(k,n,chi2,bic)
#print(pcov_,bic)
res.append(bic)
# print(i,cn)
#res.append(np.sqrt(np.sum((yemi-T_exp(xemi, *pop_))**2)))
#print(np.array(res),p0)
return (res,popt2_,funTexp,Fsequences,wf,np.array(sigma_Tsf),
np.array(all_Tsf),np.array(order),np.array(fit_err),np.array(v_shift))
if auto:
if self.fit_mode=='F_test':
CF=1
p0_1=p0
n_=1
res1p=1000
resye=np.sqrt(np.sum(yemi_selferr**2))
#print('1,',CF,C_Flim, res1p,resye)
print(resye)
amp2=np.max(yemi)
restd2=np.std(yemi_selferr)
while CF>self.C_Flim and n_<self.nGaussianMax:
# while CF>C_Flim and res1p>resye and n_<7:
#while CF>C_Flim and amp2>2*restd2 and n_<7:
res1,popt2_1,funTexp1,Fsequences1,wf1,sigma_Tsf1,all_Tsf1,order1,fit_err1,v_shift1=loop(p0_1,ncold, xemi,
yemi,
yemi_err,popt,nwarm=nwarm)
_=2*ncold
Ts_1 = [popt2_1[i][ncold:_] for i in range(len(popt2_1))]
score_1=[]
for j in range(len(res1)):
score_=0
score_+=res1[j]
for i in range(ncold):
if Ts_1[j][i]>10:
Ts_score=-res1[j]/100
else:
Ts_score=0
score_+=Ts_score
score_1.append(score_)
p=np.argmin(res1)
res1p=res1[p]
funT1=funTexp1[p]
g = Gaussian1DKernel(stddev=3)
_res=yemi-funT1
_res= convolve(_res, g)
p_=np.argmax(_res)
p0_2=np.append(p0_1,[1, xemi[p_], 1])
#print('p:',p0_2)
nwarm=nwarm+1
res2,popt2_2,funTexp2,Fsequences2,wf2,sigma_Tsf2,all_Tsf2,order2,fit_err1,v_shift1=loop(p0_2,ncold, xemi, yemi,
yemi_err,popt,nwarm=nwarm)
_=2*ncold
Ts_2 = [popt2_2[i][ncold:_] for i in range(len(popt2_2))]
score_2=[]
for j in range(len(res2)):
score_=0
score_+=res2[j]
for i in range(ncold):
if Ts_2[j][i]>10:
Ts_score=-res2[j]/100
else:
Ts_score=0
score_+=Ts_score
score_2.append(score_)
p=np.argmin(res2)
funT2=funTexp2[p]
CF=self.F_test(xemi,yemi, funT1, funT2, yemi_err, len(xemi)-len(p0_1), len(xemi)-len(p0_2))
n_+=1
p0_1=p0_2
restd2=np.sqrt(res2[p]**2/(len(yemi-1)))
amp2 = min(popt2_2[p][ncold + i * 3] for i in range(nwarm))
print(CF,n_,res1p)
res,popt2_,funTexp,Fsequences=res1,popt2_1,funTexp1,Fsequences1
wf,sigma_Tsf,all_Tsf=wf1,sigma_Tsf1,all_Tsf1
order=order1
fit_err=fit_err1
v_shift=v_shift1
elif self.fit_mode=='BIC':
print('Fitting mode: BIC')
def fit_and_calculate_bic(x, y, y_error):
p0_1=np.zeros(ncold*2)+Tsmin
for _ in range(ncold):
p0_1[_]=popt[3*_+1]
pe, _ =find_peaks(y, height=np.max(y)/7, distance=5)
#print('_pe',x[pe])
index=np.argsort(y[pe])[::-1]
pe=pe[index]
_pe = min(6, len(pe))
#print('peak',len(pe),x[pe],y[pe])
res1,popt2_1,funTexp1,Fsequences1,wf1,sigma_Tsf1,all_Tsf1,order1,fit_err1,v_shift1=loop(p0_1,ncold, x,
y,y_error,popt,nwarm=0)
_=2*ncold
Ts_1 = [popt2_1[i][ncold:_] for i in range(len(popt2_1))]
score_1 = [ res1[j] + sum(-5 if Ts_1[j][i] > 10 else 0 for i in range(ncold))
for j in range(len(res1))]
p=np.argmin(score_1)
_bbic=score_1[p]
best_bic=score_1[p]
best_mean_score=np.mean(score_1)
_mmean_score=np.mean(score_1)
b_pos=-1
print('BIC ',_bbic,'Mean_score ', _mmean_score,'nemi=',0)
improving = True
nwarm=1
best_p0_1 = np.copy(p0_1)
best_nwarm = 0
y_res=y
num=0
lim=1
#while improving or num<=lim:
bic_limit=500
while improving and (
self.max_auto_warm_components is None
or nwarm <= self.max_auto_warm_components
):
#print(improving, num,lim)
_bbic=self.bic_max_value
_mmean_score=self.bic_max_value
try:
#print('_pe',x[pe])
for pos in range(_pe):
_pp=pe[pos]
_p0_1=np.append(p0_1,[np.max(y_error), x[_pp], 1])
#print('_p0_1',_p0_1)
res1,popt2_1,funTexp1,Fsequences1,wf1,sigma_Tsf1,all_Tsf1,order1,fit_err1,v_shift1=loop(_p0_1,
ncold, x,
y,y_error,popt,nwarm=nwarm)
_=2*ncold
Ts_1 = [popt2_1[i][ncold:_] for i in range(len(popt2_1))]
score_1 = [ res1[j] + sum(-10 if Ts_1[j][i] > 10 else 0 for i in range(ncold))
for j in range(len(res1))]
#print('score_1',np.array(score_1))
# identify large mean bic but small minimum bic
score_1=np.array(score_1)
p=np.argwhere(score_1<self.bic_max_value).flatten()
if len(p)>0:
score_1=score_1[p]
else:
print('too large BIC value')
# print('sc',score_1)
#if np.mean(score_1)-np.min(score_1)>100 and np.min(score_1)<best_bic:
if np.min(score_1)<best_bic and np.mean(score_1)-np.min(score_1)>100:
#print('True')
#print('>300')
#print(np.min(score_1),np.max(score_1),np.mean(score_1))
bic_limit=best_mean_score+40
_p=np.argwhere(score_1<bic_limit).flatten()
score_1=score_1[_p]
#print(np.min(score_1),np.max(score_1),np.mean(score_1))
#print(score_1[score_1<np.mean(score_1)])
#print('sc',score_1)
p=np.argmin(score_1)
_bic=score_1[p]
#print('_bic',_bic)
_mean_score=np.mean(score_1)
#if _bic-_bbic<.5 and _mean_score-_mmean_score<10:
#print(_bic-_bbic,_mean_score-_mmean_score)
if _bic-_bbic<.1 and _mean_score-_mmean_score<10:
#print('BIC ',_bic, 'Mean_score ', _mean_score)
#if np.sum(fit_err1/popt2_1>0.5)>0:
# print('BIC ',_bic, 'Mean_score ', _mean_score,'fit_err',fit_err1)
_bbic = _bic
b_pos=pos
_mmean_score=_mean_score
funT1=funTexp1[p]
g = Gaussian1DKernel(stddev=2)
_res=yemi-funT1
y_res= convolve(_res, g)
except RuntimeError as e:
print(f"Fit failed for Gaussians: {e}")
nwarm+=1
p0_1=np.append(p0_1,[np.max(y_error), x[pe[b_pos]], 1])
bic=_bbic
mean_score=_mmean_score
pe0, _ =find_peaks(y_res, height=np.max(y_res)/5, distance=5)
pe=np.append(pe,pe0)
pe=np.unique(pe)
index=np.argsort(y[pe])[::-1]
pe=pe[index]
_pe = min(7, len(pe))
b_pos=-1
print('BIC ',bic, 'Mean_score ', mean_score,'nemi=',nwarm-1)
#print('bestBIC ',best_bic, 'Mean_score ', best_mean_score)
if (bic-best_bic)<.1 and (mean_score-best_mean_score)<10:
# a=np.array(fit_err1)
# b=np.array(popt2_1)
#print('BIC ',_bic, 'Mean_score ', _mean_score,'fit_err',a/b)
#if bic< best_bic+.1:
best_bic = bic
best_mean_score=mean_score
best_p0_1 = np.copy(p0_1)
best_nwarm = nwarm - 1
if not improving:
num=0
lim=0
else:
# If the BIC did not improve, stop fitting additional Gaussians
num+=1
# if num>=lim and lim<2:
# lim+=1
if best_bic<700:
improving = False
# print(improving)
#print(p0_1)
# print(p0_1,nwarm)
#back=-3*(lim+1)
final_p0_1 = best_p0_1
final_nwarm = best_nwarm
if final_nwarm == 0 and len(p0_1) > 2 * ncold:
final_p0_1 = p0_1[:-3]
final_nwarm = max(nwarm - 2, 0)
back=-3
# print(back,p0_1[:back])
# res1,popt2_1,funTexp1,Fsequences1,wf1,sigma_Tsf1,all_Tsf1,order1,fit_err1,v_shift1=loop(p0_1[:back],ncold, x,
# y,y_error,popt,nwarm=nwarm-2-lim)
res1,popt2_1,funTexp1,Fsequences1,wf1,sigma_Tsf1,all_Tsf1,order1,fit_err1,v_shift1=loop(final_p0_1,ncold, x,
y,y_error,popt,nwarm=final_nwarm)
# print('res1',np.array(res1))
# print(res1)
return res1,popt2_1,funTexp1,Fsequences1,wf1,sigma_Tsf1,all_Tsf1,order1,fit_err1,v_shift1,bic_limit
res,popt2_,funTexp,Fsequences,wf,sigma_Tsf,all_Tsf,order,fit_err,v_shift,bic_limit=fit_and_calculate_bic(xemi, yemi, yemi_err)
else:
print('Fitting mode: mannual')
resye=np.sqrt(np.sum(yemi_selferr**2))
#print(resye)
p0_=p0[2*ncold:]
t0=p0[:2*ncold]
num = len(p0_) // 3
#modifications = [
# [p0_[i * 3 + 1] - 4, p0_[i * 3 + 1] + 4] for i in range(num)
#]
modifications = [
[p0[i * 3 + 1] - 4,
p0[i * 3 + 1],
p0[i * 3 + 1] + 4] for i in range(num)
]
# Generate all combinations of the modified second elements
all_combinations = list(product(*modifications))
original_p0_combination = tuple(p0_[i * 3 + 1] for i in range(num))
all_combinations.append(original_p0_combination)
# print(all_combinations)
# Evaluate all combinations to find the minimum BIC
results = []
for combination in all_combinations:
new_p0 = p0_.copy()
for i in range(num):
new_p0[i * 3 + 1] = combination[i] # Update second elements
new_p0=np.concatenate((t0, new_p0))
results.append(loop(new_p0,ncold, xemi, yemi,yemi_err,popt))
res,popt2_,funTexp,Fsequences,wf,sigma_Tsf,all_Tsf,order,fit_err,v_shift= min(results, key=lambda r: r[0])
#print('BIC=',bic)
#res,popt2_,funTexp,Fsequences,wf,sigma_Tsf,all_Tsf,order,fit_err,v_shift=loop(p0,ncold, xemi, yemi,yemi_err,popt)
_=2*ncold
Ts_1 = [popt2_[i][ncold:_] for i in range(len(popt2_))]
#print(Ts_1)
score_1=[]
for j in range(len(res)):
score_=0
score_+=res[j]
for i in range(ncold):
if Ts_1[j][i]>10:
Ts_score=-10
else:
Ts_score=0
score_+=Ts_score
score_1.append(score_)
res=np.array(res)
score_1,popt2_,funTexp,Fsequences,wf,sigma_Tsf,all_Tsf=(np.array(score_1),np.array(popt2_),
np.array(funTexp),np.array(Fsequences),
np.array(wf),np.array(sigma_Tsf),np.array(all_Tsf))
#print(score_1.shape,popt2_.shape,funTexp.shape,Fsequences.shape,wf.shape,sigma_Tsf.shape,all_Tsf.shape)
p=np.argwhere(score_1<self.bic_max_value).flatten()
if len(p)>0:
score_1,popt2_,funTexp,wf,sigma_Tsf,all_Tsf,order,fit_err,v_shift,res=score_1[p],popt2_[p],funTexp[p],wf[p],sigma_Tsf[p],all_Tsf[p],order[p],fit_err[p],v_shift[p],res[p]
if len(score_1)>3:
if np.mean(score_1)-np.min(score_1)>100 and bic_limit>0:
_p=np.argwhere(score_1<(bic_limit)).flatten()
else:
_p=np.argwhere(score_1<(np.mean(score_1)+3*np.std(score_1))).flatten()
#print(score_1,_p,np.mean(score_1)+np.std(score_1))
score_1,popt2_,funTexp,wf,sigma_Tsf,all_Tsf,order,fit_err,v_shift,res=(score_1[_p],popt2_[_p],funTexp[_p],wf[_p],sigma_Tsf[_p],
all_Tsf[_p],order[_p],fit_err[_p],v_shift[_p],res[_p])
#print(score_1)
#print(all_Tsf.shape)
#for i in range(ncold):
# ___p=np.argwhere(all_Tsf[:,i]>3.77).flatten()
# print('Ts>3.77', ___p.shape)
# if len(___p)>0:
# score_1,popt2_,funTexp,wf,sigma_Tsf,all_Tsf,order,fit_err,v_shift,res=(score_1[___p],popt2_[___p],funTexp[___p],
# wf[___p],sigma_Tsf[___p],all_Tsf[___p],
# order[___p],fit_err[___p],v_shift[___p],res[___p])
# else:
# ___p=np.argwhere(all_Tsf[:,i]>3.77).flatten()
# print('Ts>3.77', ___p.shape)
# if len(___p)>0:
# score_1,popt2_,funTexp,wf,sigma_Tsf,all_Tsf,order,fit_err,v_shift,res=(score_1[___p],popt2_[___p],funTexp[___p],
# wf[___p],sigma_Tsf[___p],all_Tsf[___p],
# order[___p],fit_err[___p],v_shift[___p],res[___p])
# else:
# ___p=np.argwhere(all_Tsf[:,i]>0).flatten()
# print('Ts>0', ___p.shape)
# if len(___p)>0:
# score_1,popt2_,funTexp,wf,sigma_Tsf,all_Tsf,order,fit_err,v_shift,res=(score_1[___p],popt2_[___p],funTexp[___p],
# wf[___p],sigma_Tsf[___p],all_Tsf[___p],
# order[___p],fit_err[___p],v_shift[___p],res[___p])
#print('Ts>10 num', ___p.shape)
p=np.argmin(score_1)
print('final BIC ', score_1[p],'mean score ', np.mean(score_1) ,'nemi=',len(popt2_[p][2*ncold:])//3)
#print(score_1[p])
print('velocity shift',v_shift[p])
# print('all_Tsf',all_Tsf)
mean_Ts,sigma_meanTsf=[],[]
for i in range(ncold):
if len(all_Tsf[:,i])>1:
m_=np.sum(wf*all_Tsf[:,i])/np.sum(wf)
mean_Ts.append(m_)
sigma_meanTsf.append(np.sqrt(np.sum(wf*(all_Tsf[:,i]-m_)**2+wf*sigma_Tsf[:,i]**2)/np.sum(wf)*len(wf)/(len(wf)-1)))
else:
mean_Ts.append(float(np.ravel(all_Tsf[:, i])[0]))
sigma_meanTsf.append(float(np.ravel(sigma_Tsf[:, i])[0]))
v_shi=v_shift[p]
popt2=popt2_[p]
_=2*ncold
Ts=popt2[ncold:_]
Ts_err=sigma_Tsf[p]
gausf=popt2[_:]
funT=funTexp[p]
_F=Fsequences[p%len(Fsequences)]
Tfit_err=self.sigma1_data(yemi-funT)
Or=order[p]
fit_e=fit_err[p]
fit_e=fit_e[_:]
#print('fiterr',fit_err)
#if self.selfabs:
# p1=np.argwhere(y==y.max()).flatten()
# a_=abs(xemi-x[p1])
# p=np.argmin(a_)
# Ton=yemi[p]
# Toff=funT[p]
# p2=[]
# for i in range(int(len(gausf)/3)):
# p2.append((i+1)*3-2)
# p3=np.argmin(abs(gausf[p2]-xemi[p]))
# Ts=(Ton-Toff)/y[p1]+gausf[(p3+1)*3-3]
return popt_ori,pcov,popt2,Ts,Ts_err,gausf,funT,Tfit_err,Or,fit_e,mean_Ts,sigma_meanTsf,nwarm,v_shi,_F
[docs]
def fit_and_plot(self):
"""Fit the spectra and draw the legacy four-panel diagnostic figure."""
if self.ax is None:
_, self.ax = create_legacy_axes()
x, y, yerr, xemi, yemi, yemi_err = self._prepare_inputs()
name = self.name
fit_output = self.Gaussian_fit()
(
popt,
pcov,
popt2,
Ts,
Ts_err,
gausf,
funT,
Tfit_err,
Or,
fit_e,
mean_Ts,
sigma_meanTsf,
nwarm,
v_shift,
F_values,
) = fit_output
popt_ori = np.copy(popt)
if self.savecsv:
write_table_outputs(
self,
name=name,
popt=popt,
pcov=pcov,
gausf=gausf,
funT=funT,
xemi=xemi,
Or=Or,
fit_e=fit_e,
mean_Ts=mean_Ts,
sigma_meanTsf=sigma_meanTsf,
v_shift=v_shift,
F_values=F_values,
)
plot_fit_panels(
self,
ax=self.ax,
x=x,
y=y,
yerr=yerr,
xemi=xemi,
yemi=yemi,
yemi_err=yemi_err,
name=name,
popt_ori=popt_ori,
popt=popt,
Ts=Ts,
gausf=gausf,
funT=funT,
mean_Ts=mean_Ts,
sigma_meanTsf=sigma_meanTsf,
nwarm=nwarm,
v_shift=v_shift,
F_values=F_values,
)
return fit_output