...
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
# **************************** LICENSE START ***********************************
#
# Copyright 2022 ECMWF. This software is distributed under the terms
# of the Apache License version 2.0. In applying this license, ECMWF does not
# waive the privileges and immunities granted to it by virtue of its status as
# an Intergovernmental Organization or submit itself to any jurisdiction.
#
# ***************************** LICENSE END ************************************
import cfgrib
import xarray as xr
import numpy as np
from eccodes import *
import matplotlib.pyplot as plt
import argparse
import sys
import os
def parse_args():
''' Parse program arguments using ArgumentParser'''
parser = argparse.ArgumentParser(description ="Python tool to calculate the model level variable at a given pressure level and write data to a netCDF file")
parser.add_argument('-p', '--pressure', required=True, nargs='+',type=float,
help='Pressure levels (Pa) to calculate the variable')
parser.add_argument('-o', '--output', required=False, help='name of the output file (default "output.nc"')
parser.add_argument('-i', '--input', required=True, metavar='input.grib', type=str,
help=('grib file with required variable(s) on model level and surface pressure fields',
'the model levels'))
args = parser.parse_args()
if not args.output:
args.output = 'output.nc'
return args
def get_input_variable_list(fin):
f = open(fin)
var_list = []
while 1:
gid = codes_grib_new_from_file(f)
if gid is None:
break
keys = ('dataDate', 'dataTime', 'shortName')
for key in keys:
if key == 'shortName':
var_list.append(codes_get(gid, key))
codes_release(gid)
var_list_unique = list(set(var_list))
f.close()
if 'lnsp' not in var_list_unique:
print("Error - lnsp variable missing from input file -exiting")
sys.exit()
if len(var_list_unique) < 2:
print("Error - Data variable missing from input file -exiting")
sys.exit()
return var_list_unique
def check_requested_levels(plevs):
check_lev = True
if len(plevs) > 1:
error_msg = "Error - only specify 1 input pressure level to interpolate to"
else:
for lev in plevs:
if lev < 0 or lev > 110000 :
check_lev = False
error_msg = "Error - negative values and large positive values for pressure are not allowed -exiting"
if check_lev == False:
print(error_msg)
sys.exit()
return check_lev
def check_in_range(data_array,requested_levels):
amin = data_array.minimum()
amax = data_array.maximum()
print("min max ",amin,amax)
def vertical_interpolate(vcoord_data, interp_var, interp_levels):
"""A function to interpolate sounding data from each station to
every millibar. Assumes a log-linear relationship.
Input
-----
vcoord_data : A 1D array of vertical level values (e.g., pressure from a radiosonde)
interp_var : A 1D array of the variable to be interpolated to all pressure levels
vcoord_interp_levels : A 1D array containing veritcal levels to interpolate to
Return
------
interp_data : A 1D array that contains the interpolated variable on the interp_levels
"""
global interp_data
l_count = 0
for l in interp_levels:
if l < np.min(vcoord_data) or l > np.max(vcoord_data):
interp_data[l_count] = np.NAN
# Make vertical coordinate data and grid level log variables
lnp = np.log(vcoord_data)
lnp_intervals = [np.log(x) for x in interp_levels]
# Use numpy to interpolate from observed levels to grid levels
interp_data = np.interp(lnp_intervals, lnp, interp_var)
return interp_data[0]
def calculate_pressure_on_model_levels(ds_var,ds_lnsp):
# Get the number of model levels in the input variable
nlevs=ds_var.sizes['hybrid']
# Get the a and b coefficients from the pv array to calculate the model level pressure
pv_coeff = np.array(ds_var.GRIB_pv)
pv_coeff=pv_coeff.reshape(2,nlevs+1)
a_coeff=pv_coeff[0,:]
b_coeff=pv_coeff[1,:]
# get the surface pressure in hPa
sp = np.exp(ds_lnsp)
p_half=[]
for i in range(len(a_coeff)):
p_half.append(a_coeff[i] + b_coeff[i] * sp)
p_ml=[]
for hybrid in range(len(p_half) - 1):
p_ml.append((p_half[hybrid + 1] + p_half[hybrid]) / 2.0)
ds_p_ml = xr.concat(p_ml, 'hybrid')
return ds_p_ml
def plot_profile(var_ml,press_ml, var_int_press,var_int_plevs,tstep,lat,lon):
var_v= var_ml.sel(time = var_ml.time[tstep],longitude=lon, latitude=lat, method='nearest')
var_v_values = var_v.values
var_p= press_ml.sel(time = var_ml.time[tstep],longitude=lon, latitude=lat, method='nearest')
var_p_values = var_p.values
var_ip= var_int_press.sel(time = var_ml.time[tstep],longitude=lon, latitude=lat, method='nearest')
var_ip_values = var_ip.values
var_ip_p = var_ip.pressure
var_ip_p_values = var_ip_p.values
plt.axis([min(var_v_values), max(var_v_values), max(var_p_values), min(var_p_values)])
plt.plot(var_v_values,var_p_values, 'o', color = 'black')
plt.plot(var_ip_values,var_ip_p_values,'o', color = 'red')
plt.show()
return
def calculate_interpolated_pressure_field(data_var_on_ml, data_p_on_ml,plevs):
nlevs = len(data_var_on_ml.hybrid)
p_array = np.stack(data_p_on_ml, axis=2).flatten()
# Flatten the data array to enable faster processing
var_array = np.stack(data_var_on_ml, axis=2).flatten()
no_grid_points = int(len(var_array)/nlevs)
interpolated_var = np.empty((len(plevs), no_grid_points))
ds_shape = data_var_on_ml.shape
nlats_values = data_var_on_ml.coords['latitude']
nlons_values = data_var_on_ml.coords['longitude']
nlats = len(nlats_values)
nlons = len(nlons_values)
# Iterate over the data, selecting one vertical profile at a time
count = 0
profile_count = 0
interpolated_values=[]
for point in range(no_grid_points):
offset = count*nlevs
var_profile = var_array[offset:offset+nlevs]
p_profile = p_array[offset:offset+nlevs]
interpolated_values.append(vertical_interpolate(p_profile, var_profile, plevs))
profile_count += len(p_profile)
count = count + 1
interpolated_field=np.asarray(interpolated_values).reshape(len(plevs),nlats,nlons)
return interpolated_field
def check_data_cube(dc):
checks = True
for var_name in dc.variables:
if var_name in ['time','step','hybrid','latitude','longitude','valid_time']:
continue
if var_name == 'lnsp':
lnsp_dims = ['time','latitude','longitude']
if all(value in lnsp_dims for value in dc.variables[var_name].dims):
continue
else:
print("Not all required lnsp dimensions found -exiting ", dc.variables[var_name].dims)
checks = False
else:
var_dims = ['time','hybrid','latitude','longitude']
if all(value in var_dims for value in dc.variables[var_name].dims):
continue
else:
print("Not all required variable dimensions found -exiting ",dc.variables[var_name].dims)
checks = False
continue
return checks
def main():
'''Main function'''
print("-p <pressure level (Pa) > -o <output_file> -i <input grib file>")
print("e.g. to process a grib file containing 6 hours of lnsp and temperature data to the 500 hPa level:")
print("python3 script.py -o output_press.nc -p 50000 -i output_00_06_130_152_1x1.grib`n")
args = parse_args()
print('Arguments: %s' % ", ".join(
['%s: %s' % (k, v) for k, v in vars(args).items()]))
plevels = args.pressure
plevels.sort(reverse = True)
check_requested_levels(plevels)
input_fname = args.input
output_fname = args.output
if not os.path.isfile(input_fname):
print("Input file does not exist - exiting")
sys.exit()
variable_list = get_input_variable_list(input_fname)
# Create a data object to hold the input and derived data
data_cube = xr.merge(cfgrib.open_datasets(input_fname, backend_kwargs={'read_keys': ['pv']}), combine_attrs='override')
if not check_data_cube(data_cube):
sys.exit()
# Get the ln surface pressure
lnsp = data_cube['lnsp']
for var in variable_list:
if var == 'lnsp':
continue
else:
data_cube['pml']=data_cube[var].copy()
break
for var in variable_list:
if var == 'lnsp' :
continue
data_pressure_on_model_levels_list =[]
for time_step in range(len(data_cube[var].time)):
data_slice_var=data_cube[var][time_step,:,:,:]
data_slice_lnsp=data_cube['lnsp'][time_step,:,:]
# Get the pressure field on model levels for each timestep
data_cube['pml'][time_step,:,:,:] = calculate_pressure_on_model_levels(data_slice_var,data_slice_lnsp)
data_cube['pml'].attrs = {'units' : 'Pa','long_name':'pressure','standard_name':'air_pressure','positive':'down'}
all_interpolated_var_fields_list = []
for var in variable_list:
if var == 'lnsp' or var == 'pml':
continue
interpolated_var_field = data_cube[var].copy()
interpolated_var_field = interpolated_var_field[:,0:len(plevels),:,:]
interpolated_var_field = interpolated_var_field.rename({'hybrid':'pressure'})
interpolated_var_field['pressure'] = plevels
for time_step in range(len(data_cube[var].time)):
var_on_ml = data_cube[var][time_step,:,:,:]
p_on_ml = data_cube['pml'][time_step,:,:,:]
interpolated_var_field[time_step,:,:,:] = calculate_interpolated_pressure_field(var_on_ml,p_on_ml,plevels)
all_interpolated_var_fields_list.append(interpolated_var_field)
all_interpolated_var_fields = xr.merge(all_interpolated_var_fields_list)
all_interpolated_var_fields['pressure'].attrs = {'units' : 'Pa','long_name':'pressure','standard_name':'air_pressure','positive':'down'}
all_interpolated_var_fields.to_netcdf(output_fname)
# Write interpolated data variable to output filename
PLOT_DATA = False
if PLOT_DATA:
latitude = 45.0
longitude = 0
tstep =0
plot_profile(data_cube[var],data_cube['pml'],interpolated_var_field,plevels,tstep,latitude,longitude)
print("Finished interpolation of variables to pressure level")
if __name__ == '__main__':
main()
|
...