common module
This module contains some common functions for both ProfileSet and ProfileDynamics classes and the upcoming Space module.
add_grid_loc_coords(grid_gdf, location=None)
Add coordinate fields for the corners of each grid tile.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
grid_gdf | gpd.GeoDataFrame | The geodataframe storing the grid, returned by the grid_from_pts function. | required |
location | str | The location code associated with the grid. Defaults to None. | None |

Returns:

Type | Description |
---|---|
grid_gdf (gpd.GeoDataFrame) | The original grid, with UpperLeft X and Y (ulx, uly), UpperRight X and Y (urx, ury), LowerLeft X and Y (llx, lly) and LowerRight X and Y (lrx, lry) coordinate fields added. |
Source code in sandpyper\common.py
def add_grid_loc_coords(grid_gdf, location=None):
"""
Add coordinate fields for the corners of each grid tile.
Args:
grid_gdf (gpd.GeoDataFrame): The geodataframe storing the grid, returned by the grid_from_pts function.
location (str): The location code associated with the grid. Defaults to None.
Returns:
grid_gdf (gpd.GeoDataFrame): The original grid, with UpperLeft X and Y (ulx,uly), UpperRight X and Y (urx,ury), LowerLeft X and Y (llx,lly) and LowerRight X and Y (lrx,lry) coordinate fields added.
"""
if location is None:
location = np.nan
ulxs = []
urxs = []
lrxs = []
llxs = []
ulys = []
urys = []
lrys = []
llys = []
for grid in range(grid_gdf.shape[0]):
coords = grid_gdf.iloc[grid].geometry.exterior.coords.xy
# get upper-left, upper-right, lower-right and lower-left X coordinates.
ulx = coords[0][0]
urx = coords[0][1]
lrx = coords[0][2]
llx = coords[0][3]
# get upper-left, upper-right, lower-right and lower-left Y coordinates.
uly = coords[1][0]
ury = coords[1][1]
lry = coords[1][2]
lly = coords[1][3]
ulxs.append(ulx)
urxs.append(urx)
lrxs.append(lrx)
llxs.append(llx)
ulys.append(uly)
urys.append(ury)
lrys.append(lry)
llys.append(lly)
grid_gdf.loc[:, "ulx"] = ulxs
grid_gdf.loc[:, "urx"] = urxs
grid_gdf.loc[:, "lrx"] = lrxs
grid_gdf.loc[:, "llx"] = llxs
grid_gdf.loc[:, "uly"] = ulys
grid_gdf.loc[:, "ury"] = urys
grid_gdf.loc[:, "lry"] = lrys
grid_gdf.loc[:, "lly"] = llys
grid_gdf.loc[:, "location"] = location
return grid_gdf
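A minimal usage sketch (not part of the sandpyper docs): build a small two-tile grid and add the corner-coordinate fields. The grid, CRS and location code are illustrative, and the function is assumed to be importable from sandpyper.common.
```python
import geopandas as gpd
from shapely.geometry import box
from sandpyper.common import add_grid_loc_coords

# Two 100 m x 100 m rectangular tiles in a hypothetical projected CRS.
grid_gdf = gpd.GeoDataFrame(
    {"grid_id": [0, 1]},
    geometry=[box(0, 0, 100, 100), box(100, 0, 200, 100)],
    crs="EPSG:32754",
)

grid_with_corners = add_grid_loc_coords(grid_gdf, location="apo")
print(grid_with_corners[["ulx", "uly", "lrx", "lry", "location"]])
```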
arr2geotiff(array, transform, location, crs_dict_string, shape=(64, 64, 1), driver='GTiff', dtype=<class 'numpy.float32'>, save=None)
Transform an array into a Geotiff given its affine transform and location code.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
array | np.array | Array to be transformed. | required |
transform | tuple | Tuple with the affine transform parameters. | required |
location | str | Location code of the shoreline to convert. | required |
crs_dict_string | dict | Dictionary storing location codes as key and crs information as EPSG code (int). | required |
shape | tuple | Tuple of size 3 of the shape of the array. | (64, 64, 1) |
driver | str | Driver used by Fiona to save the file. | 'GTiff' |
dtype | data type object | Data type of the output raster. | numpy.float32 |
save | None, path | If a full path is provided (e.g. C:\my\new\image.tif), the file is saved to a geotiff image. If None (default), the geotiff is kept in memory but not written to disk. | None |

Returns:

Type | Description |
---|---|
mem_dataset (rasterio.io.MemoryFile) | Geotiff image saved into memory or optionally saved to a new raster. |
Source code in sandpyper\common.py
def arr2geotiff(
array,
transform,
location,
crs_dict_string,
shape=(64, 64, 1),
driver="GTiff",
dtype=np.float32,
save=None):
"""Transform an array into a Geotiff given its Shapely transform and location code.
Args:
array (array): array to be transformed.
transform (tuple): tuple with Shapely transform parameters.
location (str): Location code of the shoreline to convert.
crs_dict_string (dict): Dictionary storing location codes as key and crs information as EPSG code (int).
shape (tuple): Tuple of size 3 of the shape of the array. Default to (64,64,1)
driver ("GTiff"): Driver used by Fiona to save file.
dtype (data type object): Default to numpy.float32.
save (None,path). If a full path is provided, save the file to a geotiff image (C:\my\new\image.tif). If None (default), the geotiff is saved in the memory but not to the disk.
Returns:
mem_dataset (rasterio.io.MemoryFile): Geotiff image saved into memory or optionally saved to a new raster.
"""
with MemoryFile() as memfile:
mem_dataset = memfile.open(
driver="GTiff",
height=array.shape[0],
width=array.shape[1],
count=array.shape[2],
dtype=dtype,
transform=transform,
crs=crs_dict_string[location]
)
mem_dataset.write(array.reshape((shape[0], shape[1])), indexes=shape[2])
if save != None:
with ras.open(
save,
"w",
driver=driver,
height=array.shape[0],
width=array.shape[1],
count=array.shape[2],
dtype=dtype,
transform=transform,
crs=crs_dict_string[location]
) as dest:
if driver == "PNG":
dest.write(array.reshape((shape[0], shape[1])).astype(ras.uint16), indexes=1)
else:
dest.write(array.reshape((shape[0], shape[1])), indexes=shape[2])
return mem_dataset
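An illustrative sketch of a call (assumptions: the function is importable from sandpyper.common, the output path is hypothetical, and a rasterio CRS object is used as the crs_dict_string value even though the docs mention an EPSG integer):
```python
import numpy as np
from rasterio.crs import CRS
from rasterio.transform import from_origin
from sandpyper.common import arr2geotiff

array = np.random.rand(64, 64, 1).astype(np.float32)    # dummy 64x64 single-band array
transform = from_origin(300000.0, 5772000.0, 1.0, 1.0)  # upper-left corner and 1 m pixels
crs_dict_string = {"apo": CRS.from_epsg(32754)}          # hypothetical location code / CRS

# Keep the geotiff in memory and also write it to disk (hypothetical filename).
mem_dataset = arr2geotiff(array, transform, "apo", crs_dict_string, save="apo_tile.tif")
```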
check_overlaps_poly_label(label_corrections, profiles, crs)
Checks that overlapping areas of label correction polygons which target the same label_k in the same survey but assign different new classes do not contain points that would be affected by both polygons.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
label_corrections | gpd.GeoDataFrame | GeoDataFrame of the label correction polygons. | required |
profiles | gpd.GeoDataFrame | GeoDataFrame of the extracted elevation profiles. | required |
crs | dict, int | Either an EPSG code (int) or a dictionary. If dictionary, it must store location codes as keys and crs information as values, in dictionary form (example: {'init': 'epsg:4326'}). | required |
Source code in sandpyper\common.py
def check_overlaps_poly_label(label_corrections, profiles,crs):
"""
Function to check whether overlapping areas of label correction polygons targeting the same label_k in the same surveys but assigning different new classes do not contain points that would be affected by those polygons.
Args:
label_corrections (gpd.GeoDataFrame): GeodataFrame of the label correction polygons.
profiles (gpd.GeoDataFrame): Geodataframe of the extracted elevation profiles.
crs (dict, int): Either an EPSG code (int) or a dictionary. If dictionary, it must store location codes as keys and crs information as values, in dictionary form (example: {'init' :'epsg:4326'}).
"""
for loc in label_corrections.location.unique():
for raw_date in label_corrections.query(f"location=='{loc}'").raw_date.unique():
for target_label_k in label_corrections.query(f"location=='{loc}' and raw_date=={raw_date}").target_label_k.unique():
date_labelk_subset=label_corrections.query(f"location=='{loc}' and raw_date=={raw_date} and target_label_k=={int(target_label_k)}")
# if more than one polygons target the same label k, check if they overlap
if len(date_labelk_subset)>1:
# check if there are any one of them that overlaps
for i,z in comb(range(len(date_labelk_subset)),2):
intersection_gdf = overlay(date_labelk_subset.iloc[[i]], date_labelk_subset.iloc[[z]], how='intersection')
if not intersection_gdf.empty:
# check if the overlapping polygons have assigns different new_classes
if any(intersection_gdf.new_class_1 != intersection_gdf.new_class_2):
# if overlap areas assign different classes, check if this area contains points with label_k equal to both polygons target_label_k..
# if contains points, raise an error as it does not make sense and the polygons must be corrected
# by the user
pts=profiles.query(f"location=='{loc}' and raw_date=={raw_date} and label_k=={int(target_label_k)}")
if isinstance(pts.iloc[0]['coordinates'],Point):
pts_gdf=pts
elif isinstance(pts.iloc[0]['coordinates'],str):
pts['coordinates']=pts.coordinates.apply(coords_to_points)
if isinstance(crs, dict):
pts_gdf=gpd.GeoDataFrame(pts, geometry='coordinates', crs=crs[loc])
elif isinstance(crs, int):
crs_adhoc={'init': f'epsg:{crs}'}
pts_gdf=gpd.GeoDataFrame(pts, geometry='coordinates', crs=crs_adhoc)
else:
raise ValueError(f"profiles coordinates field must contain points coordinates either as Shapely Point geometry objects or as a string representing a Shapely Point geometry in well known text. Found {type(pts.iloc[0]['coordinates'])} type instead.")
fully_contains = [intersection_gdf.geometry.contains(mask_geom)[0] for mask_geom in pts_gdf.geometry]
if True in fully_contains:
idx_true=[i for i, x in enumerate(fully_contains) if x]
raise ValueError(f"There are {len(intersection_gdf)} points in the overlap area of two label correction polygons (location: {loc}, raw_date: {raw_date}, target_label_k = {target_label_k}) which assign two different classes: {intersection_gdf.loc[:,'new_class_1'][0], intersection_gdf.loc[:,'new_class_2'][0]}. This doesn't make sense, please correct your label correction polygons. You can have overlapping polygons which act on the same target label k, but if they overlap points with such target_label_k, then they MUST assign the same new class.")
print("Check label correction polygons overlap inconsistencies terminated successfully")
check_overlay(line_geometry, img_path)
Evaluates whether a line intersects the extent of a raster. Returns True if a valid intersection is found or False if not. In the case of MultiLine features, it evaluates whether any of the lines intersects the raster extent, which confirms that the CRSs of both geometries are correctly matched.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
line_geometry | Shapely LineString or MultiLineString | Geometry of the line to evaluate its overlay on the raster. | required |
img_path | str | Path to the geotiff to evaluate line overlay with. | required |

Returns:

Type | Description |
---|---|
bool (bool) | True if a valid match is found, False if the line does not intersect the raster. |
Source code in sandpyper\common.py
def check_overlay(line_geometry, img_path):
"""Evaluates whether a line intesects the extent of a raster.
Returns True if a valid intersection is found or False if not. In case of MultiLine features,
evaluate if any of the lines intersects with the raster extent,
which confirms that the CRS of both shapes geometries are correctly matched.
Args:
line_geometry (Shapely Line or MultiLinestring objects): geometry of line to evaluate its overlay on raster.
img_path (str): Path to the geotiff to evaluate line overlay with.
Returns:
bool (bool): True, if a valid match is found. False, if the line do not intersect the raster."""
# create a polygon with raster bounds
with ras.open(img_path, "r") as dataset:
ul = dataset.xy(0, 0)
ur = dataset.xy(0, dataset.shape[1])
lr = dataset.xy(dataset.shape[0], dataset.shape[1])
ll = dataset.xy(dataset.shape[0], 0)
ext_poly = gpd.GeoSeries(Polygon([ul, ur, lr, ll, ul]), crs=dataset.crs)
# get geom_type
if isinstance(line_geometry.geom_type, str):
geom_type = line_geometry.geom_type
elif isinstance(line_geometry.geom_type, pd.Series):
geom_type = line_geometry.geom_type[0]
if geom_type == "MultiLineString":
geom_in = list(line_geometry)[0]
if ext_poly[0].intersects(geom_in):
return True
else:
return False
elif geom_type == "LineString":
geom_in = line_geometry
if ext_poly[0].intersects(geom_in):
return True
else:
return False
else:
raise IOError("Shape is not a line.")
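A quick sketch (hypothetical raster path), assuming the function is importable from sandpyper.common:
```python
from shapely.geometry import LineString
from sandpyper.common import check_overlay

shoreline = LineString([(300000, 5772000), (300500, 5772100)])
overlaps = check_overlay(shoreline, "apo_20180601_ortho.tif")  # hypothetical geotiff
print(overlaps)  # True if the line falls within the raster extent
```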
compute_multitemporal(df, geometry_column='coordinates', date_field='survey_date', filter_class='sand')
From a dataframe containing the extracted points and a column specifying whether they are sand or non-sand, returns a multitemporal dataframe of sand-specific elevation changes for each pair of consecutive surveys (time periods).
Parameters:

Name | Type | Description | Default |
---|---|---|---|
df | pd.DataFrame | Dataframe of extracted points, holding survey date, elevation and (if classified) a pt_class column. | required |
geometry_column | str | Name of the column containing the geometry. | 'coordinates' |
date_field | str | The name of the column storing the survey date. | 'survey_date' |
filter_class | str, list | Name of the class or list of classes to be retained in the multitemporal computation. | 'sand' |

Returns:

Type | Description |
---|---|
multitemporal_df (pd.DataFrame) | A multitemporal dataframe of sand-specific elevation changes. |
Source code in sandpyper\common.py
def compute_multitemporal (df,
geometry_column="coordinates",
date_field='survey_date',
filter_class='sand'):
"""
From a dataframe containing the extracted points and a column specifying whether they are sand or non-sand, returns a multitemporal dataframe
with time-periods sand-specific elevation changes.
Args:
date_field (str): The name of the column storing the survey date.
geometry_column (str): Name of the column containing the geometry.
filter_classes (str,list): Name of the class of list of classes to be retained in the multitemporal computation.
Returns:
multiteporal_df (pd.DataFrame): A multitemporal dataframe of sand-specific elevation changes.
"""
if filter_class != None:
# check if pt_class in columns
if "pt_class" not in df.columns:
raise ValueError("The data is not classified as no 'pt_class' column is present. Please run the method cleanit on the ProfileDynamics object first.")
else:
if isinstance(filter_class, str):
filter_classes_in=[filter_class]
elif isinstance(filter_class, list):
filter_classes_in=filter_class
else:
raise ValueError(" If provided, class_filter must be either a string or a list of strings containing the classes to retain.")
print(f"Filter activated: only {filter_classes_in} points will be retained.")
else:
filter_classes_in=["no_filters_applied"]
pass
df["spatial_id"]=[create_spatial_id(df.iloc[i]) for i in range(df.shape[0])]
fusion_long=pd.DataFrame()
for location in df.location.unique():
print(f"working on {location}")
loc_data=df.query(f"location=='{location}'")
list_dates=loc_data.loc[:,date_field].unique()
list_dates.sort()
for i in tqdm(range(list_dates.shape[0])):
if i < list_dates.shape[0]-1:
date_pre=list_dates[i]
date_post=list_dates[i+1]
print(f"Calculating dt{i}, from {date_pre} to {date_post} in {location}.")
if filter_class != None:
df_pre=loc_data.query(f"{date_field} =={date_pre} & pt_class in {filter_classes_in}").dropna(subset=['z'])
df_post=loc_data.query(f"{date_field} =={date_post} & pt_class in {filter_classes_in}").dropna(subset=['z'])
else:
df_pre=loc_data.query(f"{date_field} =={date_pre}").dropna(subset=['z'])
df_post=loc_data.query(f"{date_field} =={date_post}").dropna(subset=['z'])
merged=pd.merge(df_pre,df_post, how='inner', on='spatial_id', validate="one_to_one",suffixes=('_pre','_post'))
merged["dh"]=merged.z_post.astype(float) - merged.z_pre.astype(float)
dict_short={"geometry":merged.filter(like=geometry_column).iloc[:,0],
"location":location,
"tr_id":merged.tr_id_pre,
"distance":merged.distance_pre,
"class_filter":'_'.join(filter_classes_in),
"dt": f"dt_{i}",
"date_pre":date_pre,
"date_post":date_post,
"z_pre":merged.z_pre.astype(float),
"z_post":merged.z_post.astype(float),
"dh":merged.dh}
short_df=pd.DataFrame(dict_short)
fusion_long=pd.concat([short_df,fusion_long],ignore_index=True)
print("done")
return fusion_long
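A minimal sketch with a hand-made classified-points table (two surveys of the same two points), assuming the function is importable from sandpyper.common:
```python
import pandas as pd
from sandpyper.common import compute_multitemporal

df = pd.DataFrame({
    "location": ["apo"] * 4,
    "survey_date": [20180601, 20180601, 20190601, 20190601],
    "tr_id": [0, 0, 0, 0],
    "distance": [0.0, 0.1, 0.0, 0.1],
    "coordinates": ["POINT (0 0)", "POINT (0 1)", "POINT (0 0)", "POINT (0 1)"],
    "z": [2.0, 2.1, 1.8, 2.3],
    "pt_class": ["sand"] * 4,
})

dh_df = compute_multitemporal(df, filter_class="sand")
print(dh_df[["dt", "date_pre", "date_post", "dh"]])  # one period (dt_0) with two dh values
```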
consecutive_ids(data, indices=True, limit=1)
Returns indices of consecutive tr_ids in groups. Convenient to create multi-line geometries in case of disconnected shorelines.
Source code in sandpyper\common.py
def consecutive_ids(data, indices=True, limit=1):
"""Returns indices of consecutive tr_ids in groups. Covenient to create multi-line geometries in case of disconnected shorelines."""
if bool(indices) == False:
return_i = 1 # return data values
else:
return_i = 0
groups = []
for k, g in groupby(enumerate(data), lambda ix: ix[0] - ix[1]):
groups.append(list(map(itemgetter(return_i), g)))
groups = [i for i in groups if len(i) > limit]
return groups
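A worked example: with indices=True (default), the positional indices of runs of consecutive values longer than limit are returned; with indices=False, the values themselves are returned.
```python
from sandpyper.common import consecutive_ids

tr_ids = [0, 1, 2, 5, 6, 7, 10]
print(consecutive_ids(tr_ids))                 # [[0, 1, 2], [3, 4, 5]]
print(consecutive_ids(tr_ids, indices=False))  # [[0, 1, 2], [5, 6, 7]]
```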
coords_to_points(string_of_coords)
Function to create Shapely Point geometries from strings representing Shapely Point geometries. Used when loading CSV with point geometries in string type.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
string_of_coords | str | The string version of a Shapely Point geometry. | required |

Returns:

Type | Description |
---|---|
pt_geom | Shapely Point geometry |
Source code in sandpyper\common.py
def coords_to_points(string_of_coords):
"""
Function to create Shapely Point geometries from strings representing Shapely Point geometries.
Used when loading CSV with point geometries in string type.
args:
string_of_coords (str): the string version of Shapely Point geometry
returns:
pt_geom : Shapely Point geometry
"""
num_ditis = re.findall("\\d+", string_of_coords)
try:
coord_x = float(num_ditis[0] + "." + num_ditis[1])
coord_y = float(num_ditis[2] + "." + num_ditis[3])
pt_geom = Point(coord_x, coord_y)
except BaseException:
print(
f"point creation failed! Assigning NaN. Check the format of the input string."
)
pt_geom = np.nan
return pt_geom
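A worked example, rebuilding a Shapely Point from the string stored in a CSV:
```python
from sandpyper.common import coords_to_points

pt = coords_to_points("POINT (300123.45 5772345.67)")
print(pt.x, pt.y)  # 300123.45 5772345.67
```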
corr_baseline_distance(dist, slope, z_tide)
Applies a simple geometric tidal correction to the points along a shoreline.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dist | float, pd.Series | Uncorrected distance or pd.Series of distances from the baseline. | required |
slope | float | Subaerial beach profile slope in percentage. | required |
z_tide | float | Water level or tidal height. | required |

Returns:

Type | Description |
---|---|
float, list | The corrected value or list of corrected values. |
Source code in sandpyper\common.py
def corr_baseline_distance(dist, slope, z_tide):
"""Applies a simple geometric tidal correction of the points along a shoreline
Args:
dist (float, pd.Series): Uncorrected distance or pd.Series of distances from the baseline.
slope (float): Subaerial beach profile slope in percentage.
z_tide (float): Water level or tidal height.
Returns:
float, list: The corrected value or list of corrected values.
"""
if isinstance(dist, (int, float, complex)):
tan_b = tan(radians(slope))
return dist + (z_tide / tan_b)
elif isinstance(dist, (pd.Series)):
tans_b = [tan(radians(i)) for i in slope]
corrs = [d + (z / sl) for d, sl, z in zip(dist, tans_b, z_tide)]
return corrs
else:
raise TypeError("Input must be either Pandas.Series or numeric (int,float).")
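A worked example with a single numeric distance (for pd.Series inputs, matching series of slopes and tide levels are expected):
```python
from sandpyper.common import corr_baseline_distance

corrected = corr_baseline_distance(dist=25.0, slope=4.0, z_tide=0.6)
print(corrected)  # dist + z_tide / tan(radians(slope)) ≈ 33.58
```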
correct_multi_detections(shore_pts, transects)
Corrects for multiple point detections on shoreline transects. Multiple detections occur when the shoreline bends considerably and a transect intersects it more than once.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
shore_pts | np.array | Detected shoreline points to correct. | required |
transects | pd.DataFrame | Transects to correct. | required |

Returns:

Type | Description |
---|---|
geometries (list) | List of geometries of the corrected points. |
Source code in sandpyper\common.py
def correct_multi_detections(shore_pts, transects):
"""Corrects for multiple points detections on shorelines transects. Multi detections occur when the shoreline bends considerably and the transects intersect it more than once.
Args:
shore_pts (np.array): array to be transformed.
transects (pd.DataFRame): transects to correct.
Returns:
geometries (list): List of geometries of correct points.
"""
geometries = []
for i in range(shore_pts.shape[0]):
pt_i = shore_pts.iloc[[i]]
if pt_i.qa.values[0] == 0:
geometries.append(np.nan)
else:
if pt_i.qa.values[0] != 1:
start_line = transects.query(
f"tr_id=='{pt_i.tr_id.values[0]}'"
).geometry.boundary.iloc[0][0]
min_idx = np.argmin(
[start_line.distance(pt) for pt in pt_i.geometry.values[0]]
)
geometries.append(pt_i.geometry.values[0][min_idx])
else:
geometries.append(pt_i.geometry.values[0])
return geometries
create_id(series, tr_id_field='tr_id', loc_field='location', dist_field='distance', random_state=42)
Function to create unique IDs from random permutations of integers and letters from the distance, tr_id, location, coordinates and survey_date fields of the rgb and z tables.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
series | pd.Series | Series having the selected fields. | required |
tr_id_field | str | Field name holding the transect ID. | 'tr_id' |
loc_field | str | Field name holding the location of the survey. | 'location' |
dist_field | str | Field name holding the distance from the start of the transect. | 'distance' |
random_state | int | Random seed. | 42 |

Returns:

Type | Description |
---|---|
ids (list) | A series of unique IDs. |
Source code in sandpyper\common.py
def create_id(
series,
tr_id_field="tr_id",
loc_field="location",
dist_field="distance",
random_state=42,
):
"""Function to create unique IDs from random permutations of integers and letters from the distance, tr_id, location, coordinates and survey_date fields of the rgb and z tables.
args:
series (pd.Series): series having the selected fields.
tr_id_field (str): Field name holding the transect ID (Default="tr_id").
loc_field (str): Field name holding the location of the survey (Default="location").
dist_field (str): Field name holding the distance from start of the transect (Default="distance").
random_state (int): Random seed.
returns:
ids (list): A series od unique IDs.
"""
dist_c = str(np.round(float(series.loc[dist_field]), 2))
tr_id_c = str(series.loc[tr_id_field])
loc_d = str(series.loc[loc_field])
if type(series.coordinates) != str:
coord_c = series.coordinates.wkt.split()[1][-3:]
else:
coord_c = str(series.coordinates.split()[1][-3:])
if type(series.survey_date) != str:
date_c = str(series.survey_date.date())
else:
date_c = str(series.survey_date)
ids_tmp = dist_c + "0" + tr_id_c + loc_d + coord_c + date_c
ids = ids_tmp.replace(".", "0").replace("-", "")
char_list = list(ids) # convert string inti list
random.Random(random_state).shuffle(
char_list,
) # shuffle the list
ids = "".join(char_list)
return ids
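A worked example on a hypothetical point record, assuming the function is importable from sandpyper.common:
```python
import pandas as pd
from shapely.geometry import Point
from sandpyper.common import create_id

row = pd.Series({
    "distance": 12.3,
    "tr_id": 5,
    "location": "apo",
    "coordinates": Point(300123.4, 5772345.6),
    "survey_date": "20180601",
})
print(create_id(row))  # shuffled alphanumeric ID, reproducible for a given random_state
```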
create_spatial_id(series, random_state=42)
Function to create IDs independent of the survey_date and related only to distance, tr_id and location. Equivalent to using the coordinates field.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
series | pd.Series | Series of the merged table. | required |
random_state | int | Random seed. | 42 |

Returns:

Type | Description |
---|---|
ids (list) | A series of unique spatial IDs. |
Source code in sandpyper\common.py
def create_spatial_id(series, random_state=42):
"""Function to create IDs indipended on the survey_date, but related to to distance, tr_id and location only. Equivalent to use coordinates field.
args:
series (pd.Series): series of merged table.
random_state (int): Random seed.
returns:
ids (list): A series od unique spatial IDs.
"""
# ID indipended on the survey_date, but only related to distance, tr_id
# and location. Useful ID, but equivalent to use coordinates field.
ids = (
str(np.round(float(series.distance), 2))
+ "0"
+ str(series.tr_id)
+ str(series.location)
)
ids = ids.replace(".", "0").replace("-", "")
char_list = list(ids) # convert string inti list
random.Random(random_state).shuffle(
char_list,
) # shuffle the list
ids = "".join(char_list)
return ids
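A worked example: the spatial ID depends only on distance, tr_id and location, so the same point sampled in different surveys receives the same ID.
```python
import pandas as pd
from sandpyper.common import create_spatial_id

row = pd.Series({"distance": 12.3, "tr_id": 5, "location": "apo"})
print(create_spatial_id(row))
```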
create_transects(baseline, sampling_step, tick_length, location, crs, side='both')
Creates a GeoDataFrame with transects normal to the baseline, with defined spacing and length.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
baseline | gpd.GeoDataFrame | Baseline geodataframe. | required |
sampling_step | int, float | Alongshore spacing of transects in the CRS reference unit. | required |
tick_length | int, float | Transect length. | required |
location | str | Location code. | required |
crs | dict | Coordinate reference system to georeference the transects. It must be in dictionary form. | required |
side | str | If "both", the transects will be centered on the baseline. If "left" or "right", transects will start from the baseline and extend to the left/right of it. | 'both' |

Returns:

Type | Description |
---|---|
gdf_transects (gpd.GeoDataFrame) | Geodataframe of transects. |
Source code in sandpyper\common.py
def create_transects(baseline, sampling_step, tick_length, location, crs, side="both"):
"""Creates a GeoDataFrame with transects normal to the baseline, with defined spacing and length.
args:
baseline (gdp.GeoDataFrame): baseline geodataframe.
sampling_step (int,float): alognshore spacing of transects in the CRS reference unit.
tick_length (int,float): transects length
location (str): location code
crs (dict): coordinate reference system to georeference the transects. It must be in dictionary form.
side (str): If "both", the transects will be centered on the baseline. If "left" or "right", transects will start from the baseline and extend to the left/right of it.
returns:
gdf_transects (gpd.GeoDataFrame): Geodataframe of transects.
"""
if side != "both":
tick_length = 2 * tick_length
else:
pass
if sampling_step == 0 or sampling_step >= baseline.length.values[0]:
raise ValueError(f"Sampling step provided ({sampling_step}) cannot be zero or equal or greater than the baseline length ({baseline.length.values[0]}).")
else:
try:
dists = np.arange(0, baseline.geometry.length[0], sampling_step)
except BaseException:
try:
dists = np.arange(0, baseline.geometry.length, sampling_step)
except BaseException:
dists = np.arange(0, baseline.geometry.length.values[0], sampling_step)
points_coords = []
try:
for j in [baseline.geometry.interpolate(i) for i in dists]:
points_coords.append((j.geometry.x[0], j.geometry.y[0]))
except BaseException:
for j in [baseline.geometry.interpolate(i) for i in dists]:
points_coords.append((j.geometry.x, j.geometry.y))
# create transects as Shapely linestrings
ticks = []
for num, pt in enumerate(points_coords, 1):
# start chainage 0
if num == 1:
angle = getAngle(pt, points_coords[num])
line_end_1 = getPoint1(pt, angle, tick_length / 2)
angle = getAngle([line_end_1.x, line_end_1.y], pt)
line_end_2 = getPoint2([line_end_1.x, line_end_1.y], angle, tick_length)
tick = LineString(
[(line_end_1.x, line_end_1.y), (line_end_2.x, line_end_2.y)]
)
## everything in between
if num < len(points_coords) - 1:
angle = getAngle(pt, points_coords[num])
line_end_1 = getPoint1(points_coords[num], angle, tick_length / 2)
angle = getAngle([line_end_1.x, line_end_1.y], points_coords[num])
line_end_2 = getPoint2([line_end_1.x, line_end_1.y], angle, tick_length)
tick = LineString(
[(line_end_1.x, line_end_1.y), (line_end_2.x, line_end_2.y)]
)
# end chainage
if num == len(points_coords):
angle = getAngle(points_coords[num - 2], pt)
line_end_1 = getPoint1(pt, angle, tick_length / 2)
angle = getAngle([line_end_1.x, line_end_1.y], pt)
line_end_2 = getPoint2([line_end_1.x, line_end_1.y], angle, tick_length)
tick = LineString(
[(line_end_1.x, line_end_1.y), (line_end_2.x, line_end_2.y)]
)
ticks.append(tick)
gdf_transects = gpd.GeoDataFrame(
{
"tr_id": range(len(ticks)),
"geometry": ticks,
"location": [location for i in range(len(ticks))],
},
crs=crs,
)
# clip the transects
if side == "both":
pass
else:
gdf_transects["geometry"] = gdf_transects.geometry.apply(
split_transects, **{"side": side}
)
return gdf_transects
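An illustrative call (hypothetical baseline file), assuming the function is importable from sandpyper.common and using the dictionary-form CRS the docs require:
```python
import geopandas as gpd
from sandpyper.common import create_transects

baseline = gpd.read_file("apo_baseline.gpkg")  # hypothetical baseline geopackage

transects = create_transects(
    baseline,
    sampling_step=10,   # one transect every 10 m alongshore
    tick_length=20,     # 20 m long transects, centred on the baseline
    location="apo",
    crs={"init": "epsg:32754"},
    side="both",
)
print(transects.head())
```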
cross_ref(dir_inputs, dirNameTrans, loc_search_dict, list_loc_codes, print_info=False)
Returns a dataframe with location, raw_date, filenames (paths) and CRS of each raster and associated transect files. Used to double-check.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dir_inputs | str, list | Path (or list of paths) of the directory containing the geotiff datasets (.tiff or .tif). | required |
dirNameTrans | str | Path of the directory containing the transects (geopackages, .gpkg). | required |
loc_search_dict | dict | Dictionary used to match filenames with the right location code. | required |
list_loc_codes | list | List of strings containing location codes. | required |
print_info | bool | If True, prints the count of datasets per location and the total. Default = False. | False |

Returns:

Type | Description |
---|---|
rasters_df (pd.DataFrame) | Dataframe with information about raster-transect file matches. |
Source code in sandpyper\common.py
def cross_ref(
dir_inputs, dirNameTrans, loc_search_dict, list_loc_codes, print_info=False
):
"""Returns a dataframe with location, raw_date, filenames (paths) and CRS of each raster and associated transect files. Used to double-check.
args:
dir_inputs (str): Path of the directory containing the geotiffs datasets (.tiff or .tif).
dirNameTrans (str): Path of the directory containing the transects (geopackages, .gpkg).
loc_search_dict (list): Dictionary used to match filename with right location code.
list_loc_codes (list): list of strings containing location codes.
print_info (bool): If True, prints count of datasets/location and total. Default = False.
returns:
rasters_df (pd.DataFRame): Dataframe and information about raster-transect files matches.
"""
ras_type_dict={0:"dsm",
1:"ortho"}
# DF transects
list_transects = glob.glob(rf"{dirNameTrans}/*.gpkg")
locs_transects = pd.DataFrame(
pd.Series(
[getLoc(trs, list_loc_codes) for trs in list_transects], name="location"
)
)
df_tmp_trd = pd.DataFrame(locs_transects, columns=["location"])
df_tmp_trd["filename_trs"] = list_transects
df_tmp_trd["crs_transect"] = df_tmp_trd.filename_trs.apply(getCrs_from_transect)
if isinstance(dir_inputs, list):
dirs=dir_inputs
list_rasters_check_dsm=[]
list_rasters_check_orthos=[]
else:
dirs=[dir_inputs]
rasters_df=pd.DataFrame()
for i,path_in in enumerate(dirs):
list_rasters = glob.glob(rf"{path_in}/*.ti*")
raster_types=[filepath_raster_type(i) for i in list_rasters]
if len(set(raster_types)) != 1:
raise ValueError(f"Mixed input types have been found in {ras_type_dict[i]} folder. Each folder has to contain either DSMs or orthos only.")
if isinstance(dir_inputs, list):
if i == 0:
list_rasters_check_dsm.append([extract_loc_date(os.path.split(i)[-1],loc_search_dict) for i in list_rasters])
else:
list_rasters_check_orthos.append([extract_loc_date(os.path.split(i)[-1],loc_search_dict) for i in list_rasters])
loc_date_labels_raster = [
extract_loc_date(file1, loc_search_dict=loc_search_dict)
for file1 in list_rasters
]
df_tmp_raster = pd.DataFrame(
loc_date_labels_raster, columns=["location", "raw_date"]
)
df_tmp_raster["filename_raster"] = list_rasters
df_tmp_raster["crs_raster"] = df_tmp_raster.filename_raster.apply(
getCrs_from_raster_path
)
df_tmp_raster["raster_type"]=raster_types
rasters_df=pd.concat([rasters_df,df_tmp_raster], ignore_index=True)
if isinstance(dir_inputs, list):
missing_dsms=set(*list_rasters_check_orthos).difference(set(*list_rasters_check_dsm))
missing_orthos=set(*list_rasters_check_dsm).difference(set(*list_rasters_check_orthos))
if len(missing_dsms) >0:
print(f"WARNING: {missing_dsms} missing or misnamed from the DSMs folder.\n")
else:
pass
if len(missing_orthos)>0:
print(f"WARNING: {missing_orthos} missing or misnamed from orthos folder.\n")
else:
pass
matched = pd.merge(rasters_df, df_tmp_trd, on="location", how="left").set_index(
["location"]
)
check_formatted=pd.merge(matched.query("raster_type=='ortho'").reset_index(),
matched.query("raster_type=='dsm'").reset_index(),
on=["location","raw_date"],suffixes=("_ortho","_dsm"))
check_formatted=check_formatted.loc[:,[ 'raw_date','location','filename_trs_ortho','crs_transect_dsm',
'filename_raster_dsm','filename_raster_ortho',
'crs_raster_dsm','crs_raster_ortho']]
if bool(print_info) is True:
counts=matched.groupby(["location","raster_type"]).count().reset_index()
for i,row in counts.iterrows():
print(f"{row['raster_type']} from {row['location']} = {row['raw_date']}\n")
print(f"\number OF DATASETS TO PROCESS: {counts.raw_date.sum()}")
return check_formatted
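An illustrative call (hypothetical folders and search strings):
```python
from sandpyper.common import cross_ref

loc_search_dict = {"apo": ["apo", "apollo"]}  # hypothetical location search strings

matched = cross_ref(
    dir_inputs=["C:/data/dsm", "C:/data/orthos"],  # DSM folder first, then orthos
    dirNameTrans="C:/data/transects",
    loc_search_dict=loc_search_dict,
    list_loc_codes=["apo"],
    print_info=True,
)
```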
dissolve_shores(gdf_shores, field='date')
Dissolves multi-part shorelines into one geometry per location-date. Uses GeoPandas.GeoDataFrame.dissolve method.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
gdf_shores | gpd.GeoDataFrame | The geodataframe storing the shoreline. | required |
field | str | The field to be used to dissolve shorelines. Default to "date". | 'date' |

Returns:

Type | Description |
---|---|
dissolved (gpd.GeoDataFrame) | GeoDataFrame with one geometry per location-date combination. |
Source code in sandpyper\common.py
def dissolve_shores(gdf_shores, field="date"):
"""Dissolves multi-part shorelines into one geometry per location-date. Uses GeoPandas.GeoDataFrame.dissolve method.
args:
gdf_shores (gpd.GeoDataFrame): The geodataframe storing the shoreline.
field (str): The field to be used to dissolve shorelines. Default to "date".
returns:
dissolved (gpd.GeoDataFrame): GeoDataFrame with one geometry per location-date combination.
"""
if len(gdf_shores.location.unique()) != 1:
multi_location = True
else:
multi_location = False
dissolved = pd.DataFrame()
if bool(multi_location):
for loc in gdf_shores.location.unique():
geom = (
gdf_shores.query(f"location=='{loc}'").dissolve(by=field).reset_index()
)
dissolved = pd.concat([dissolved, geom], ignore_index=True)
else:
gdf_shores["diss"] = 0
geom = gdf_shores.dissolve(by="diss").reset_index()
dissolved = pd.concat([dissolved, geom], ignore_index=True)
dissolved.drop("diss", axis=1, inplace=True)
return dissolved
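A quick sketch (hypothetical shoreline file) of dissolving multi-part shorelines per date:
```python
import geopandas as gpd
from sandpyper.common import dissolve_shores

shores = gpd.read_file("shorelines.gpkg")  # hypothetical file with 'location' and 'date' columns
dissolved = dissolve_shores(shores, field="date")
```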
error_from_gt(shorelines, groundtruths, crs_dict_string, location, sampling_step, tick_length, shore_geometry_field, gt_geometry_field, gt_date_field='raw_date', shore_date_field='raw_date', side='both', baseline_mode='dynamic', tidal_correct=None)
Compute shoreline errors from groundtruth references. You can use a fixed baseline shoreline or let the baseline be dynamic, meaning that a new set of transects will be created on each groundtruth shoreline.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
shorelines | gpd.GeoDataFrame | GeoDataFrame of shorelines to test. | required |
groundtruths | gpd.GeoDataFrame | GeoDataFrame of groundtruth shorelines. | required |
crs_dict_string | dict | Dictionary storing location codes as key and crs information as EPSG code (int). | required |
location | str | Location code string (e.g. "apo"). | required |
sampling_step | int, float | Alongshore distance separating each evaluation transect. | required |
tick_length | int, float | Length of transects. | required |
shore_geometry_field | str | Field where the geometry of the test shoreline is stored. | required |
gt_geometry_field | str | Field where the geometry of the groundtruth shoreline is stored. | required |
gt_date_field | str | Field where the survey dates are stored in the groundtruth dataset. | 'raw_date' |
shore_date_field | str | Field where the survey dates are stored in the to-correct dataset. | 'raw_date' |
side | str | Whether to create transects on the right, left or both sides. Default to "both". | 'both' |
baseline_mode | str | If "dynamic" (default), statistics will be computed from transects created from each groundtruth shoreline. If a path to a .gpkg is provided, those arbitrary location-specific baselines are used and the transects are fixed. | 'dynamic' |
tidal_correct | str, None | If str, apply tidal correction. | None |

Returns:

Type | Description |
---|---|
pd.DataFrame | Dataframe containing the distances from groundtruth at each timestep. |
Source code in sandpyper\common.py
def error_from_gt(
shorelines,
groundtruths,
crs_dict_string,
location,
sampling_step,
tick_length,
shore_geometry_field,
gt_geometry_field,
gt_date_field="raw_date",
shore_date_field="raw_date",
side="both",
baseline_mode="dynamic",
tidal_correct=None,
):
"""Compute shorelines errors from a groundtruth references. You can use a fixed baseline shoreline or let the baseline be dynamic, which means
that a new set of transects will be created on each groundtruth shoreline.
Args:
shorelines (gpd.GeoDataFrame): GeoDataFrame of shorelines to test.
groundtruths (gpd.GeoDataFrame): GeoDataFrame of shorelines to test.
crs_dict_string (dict): Dictionary storing location codes as key and crs information as EPSG code (int).
location (str): Strings of location code ("apo").
sampling_step (int, float): Alongshore distanace to separate each evaluation transect.
tick_length (int, float): Length of transects.
shore_geometry_field (str): Field where the geometry of the test shoreline is stored.
gt_geometry_field (str): Field where the geometry of the groundtruth shoreline is stored.
gt_date_field (str): Field where the survey dates are stored in the groundtruth dataset.
shore_date_field (str): Field where the survey dates are stored in the to-correct dataset.
side (str): Whether to create transect on the right, left or both sides. Default to "both".
baseline_mode (str): If "dynamic" (default), statistics will be computed from transects created from each groundtruth shoreline. If path to a .gpkg is provided, then use those arbitrary location specific baselines and transects will be fixed.
tidal_correct (str, None): If str, apply tidal correction.
Returns:
pd.DataFrame: Dataframe containing the distances from groundtruth at each timestep.
"""
crs = crs_dict_string[location]
if os.path.isfile(baseline_mode):
print("Fixed baseline mode selected.")
baseline_loc = gpd.read_file(baseline_mode)
if baseline_loc.crs != crs:
baseline_loc = baseline_loc.to_crs(crs)
else:
pass
transects = create_transects(
baseline_loc,
sampling_step=sampling_step,
tick_length=tick_length,
crs=crs,
location=location,
)
df_temp = pd.DataFrame.from_dict(
{
"geometry": [
Point(tr_geom.coords[0][0], tr_geom.coords[0][1])
for tr_geom in transects.geometry
],
"tr_id": transects.tr_id,
}
)
baseline_pts = gpd.GeoDataFrame(df_temp, geometry="geometry", crs=crs)
elif baseline_mode == "dynamic":
print("Dynamic baseline mode selected.")
else:
raise TypeError(
"Baseline mode must be either 'dynamic' or a valid path to a .gpkg."
)
shore_shift_df = pd.DataFrame()
# subset CS shorelines with a location
cs_shore_in = groundtruths.query(f"location=='{location}'")
# select all Sentinel-2 shorelines in that location
tests = shorelines.query(f"location=='{location}'")
# if cs_shore_in.crs != crs:
# cs_shore_in=cs_shore_in.to_crs(crs)
# else:
# pass
#
# if tests.crs != crs:
# tests=tests.to_crs(crs)
# else:
# pass
for i in range(cs_shore_in.shape[0]): # for all CS groundtruths in location
groundtruth = cs_shore_in.iloc[[i]] # select the current groundtruth
# get survey date # HARDCODED DATE FIELD! BAD
survey_date = groundtruth.loc[gt_date_field][0]
if survey_date in tests.raw_date.unique():
print(f"Working on {survey_date}...")
else:
print(
f"Groundtruth in date {survey_date} not matched with any shorelines date."
)
if baseline_mode == "dynamic":
# create transects and baselines pts dynamically from each groundtruth
# shoreline
transects = create_transects(
groundtruth,
sampling_step=sampling_step,
tick_length=tick_length,
crs=crs,
location=location,
)
# create a geodataframe of transects starting points to compute distance
# from
df = pd.DataFrame.from_dict(
{
"geometry": [
Point(tr_geom.coords[0][0], tr_geom.coords[0][1])
for tr_geom in transects.geometry
],
"tr_id": transects.tr_id,
}
)
baseline_pts = gpd.GeoDataFrame(df, geometry="geometry", crs=crs)
# extract groundtruth distance from baseline (should be half transect,
# i.e. transect centroid)
gt_pts = extract_shore_pts(transects, groundtruth)
transects.loc[:, "raw_date"] = survey_date
# list all the satellite shorelines that corresponds to the groundtruth
sat_shores_in = tests.query(f"raw_date=='{survey_date}'")
for j in range(sat_shores_in.shape[0]):
shore_sat = sat_shores_in.iloc[[j]]
new_transects = shore_shift(
transects=transects,
gt=groundtruth,
sat=shore_sat,
crs=crs,
baseline_pts=baseline_pts,
sat_from_baseline=True,
)
shore_sat.rename({"geometry": "geom_shoreline"}, axis=1, inplace=True)
merged = pd.merge(shore_sat, new_transects, on="raw_date")
merged.rename({"location_x": "location"}, axis=1, inplace=True)
shore_shift_df = pd.concat([shore_shift_df, merged], ignore_index=True)
return shore_shift_df
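An illustrative call with dynamically created transects (the two GeoDataFrames are assumed to be already loaded and in the location CRS):
```python
from sandpyper.common import error_from_gt

errors = error_from_gt(
    shorelines=sat_shorelines,      # GeoDataFrame of shorelines to test (not built here)
    groundtruths=gt_shorelines,     # GeoDataFrame of groundtruth shorelines (not built here)
    crs_dict_string={"apo": 32754},
    location="apo",
    sampling_step=5,
    tick_length=50,
    shore_geometry_field="geometry",
    gt_geometry_field="geometry",
    baseline_mode="dynamic",
)
```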
extract_from_folder(dataset_folder, transect_folder, tr_ids, list_loc_codes, mode, sampling_step, add_xy=False, add_slope=False, default_nan_values=-10000)
Wrapper to extract profiles from all rasters inside a folder.
Warning: The folders must contain the geotiffs and geopackages only.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dataset_folder | str | Path of the directory containing the datasets (geotiffs, .tiff). | required |
transect_folder | str | Path of the directory containing the transects (geopackages, .gpkg). | required |
tr_ids | str | If 'reset', a new incremental transect ID will be automatically assigned. If the name of a column in the transect files is provided, that column is used as transect IDs. | required |
list_loc_codes | list | List of strings containing location codes. | required |
mode | str | If 'dsm', extract from DSMs. If 'ortho', extract from orthophotos. | required |
sampling_step | float | Distance along-transect to sample points at, in meters. | required |
add_xy | bool | If True, adds extra columns with long and lat coordinates in the input CRS. | False |
add_slope | bool | If True, computes a slope raster in degrees (increased processing time) and extracts slope values across transects. | False |
default_nan_values | int | Value used for NoData in the raster format. In Pix4D, this is -10000 (Default). | -10000 |

Returns:

Type | Description |
---|---|
gdf (gpd.GeoDataFrame) | A geodataframe with survey and topographical or color information extracted. |
Source code in sandpyper\common.py
def extract_from_folder(
dataset_folder,
transect_folder,
tr_ids,
list_loc_codes,
mode,
sampling_step,
add_xy=False,
add_slope=False,
default_nan_values=-10000
):
"""Wrapper to extract profiles from all rasters inside a folder.
Warning: The folders must contain the geotiffs and geopackages only.
Args:
dataset_folder (str): Path of the directory containing the datasets (geotiffs, .tiff).
transect_folder (str): Path of the directory containing the transects (geopackages, .gpkg).
tr_ids (str): If 'reset', a new incremental transect_id will be automatically assigned. If the name of a column in the transect files is provided, use that column as transect IDs.
list_loc_codes (list): list of strings containing location codes.
mode (str): If 'dsm', extract from DSMs. If 'ortho', extracts from orthophotos.
sampling_step (float): Distance along-transect to sample points at. In meters.
add_xy (bool): If True, adds extra columns with long and lat coordinates in the input CRS.
add_slope (bool): If True, computes slope raster in degrees (increased processing time) and extract slope values across transects.
default_nan_values (int): Value used for NoData in the raster format. In Pix4D, this is -10000 (Default).
Returns:
gdf (gpd.GeoDataFrame): A geodataframe with survey and topographical or color information extracted.
"""
# Get a list of all the filenames and path
list_files = filter_filename_list(
getListOfFiles(dataset_folder), fmt=[".tif", ".tiff"]
)
dates = [getDate(dsm_in) for dsm_in in list_files]
# List all the transects datasets
if os.path.isdir(transect_folder):
list_trans = getListOfFiles(transect_folder)
elif os.path.isfile(transect_folder):
list_trans = getListOfFiles(transect_folder)
start = time.time()
# Set the sampling distance (step) for your profiles
gdf = pd.DataFrame()
counter = 0
if bool(add_slope):
warnings.warn(
"WARNING: add_terrain could increase processing time considerably for fine scale DSMs."
)
for dsm in tqdm(list_files):
with ras.open(dsm, 'r') as ds:
nan_values = ds.nodata
if nan_values:
pass
else:
nan_values=default_nan_values
date_string = getDate(dsm)
location = getLoc(dsm, list_loc_codes)
if bool(add_slope):
terr = rd.LoadGDAL(dsm, no_data=nan_values)
print(
f"Computing slope DSM in degrees in {location} at date: {date_string} . . ."
)
slope = rd.TerrainAttribute(terr, attrib="slope_degrees")
else:
slope = False
transect_file_input = [a for a in list_trans if location in a]
transect_file = gpd.read_file(transect_file_input[0])
tr_list = np.arange(0, transect_file.shape[0])
for i in tqdm(tr_list):
if mode == "dsm":
temp = get_profiles(
dsm=dsm,
transect_file=transect_file,
tr_ids=tr_ids,
transect_index=i,
step=sampling_step,
location=location,
date_string=date_string,
add_xy=add_xy,
add_terrain=slope,
)
elif mode == "ortho":
temp = get_profile_dn(
ortho=dsm,
transect_file=transect_file,
transect_index=i,
step=sampling_step,
location=location,
tr_ids=tr_ids,
date_string=date_string,
add_xy=add_xy,
)
gdf = pd.concat([temp, gdf], ignore_index=True)
counter += 1
if counter == len(list_files):
print("Extraction successful")
else:
print(f"There is something wrong with this dataset: {list_files[counter]}")
end = time.time()
timepassed = end - start
print(
f"Number of points extracted:{gdf.shape[0]}\nTime for processing={timepassed} seconds\nFirst 10 rows are printed below"
)
if mode == "dsm":
nan_out = np.count_nonzero(np.isnan(np.array(gdf.z).astype("f")))
nan_raster = np.count_nonzero(gdf.z == nan_values)
gdf.z.replace(-10000, np.nan, inplace=True)
elif mode == "ortho":
nan_out = np.count_nonzero(
np.isnan(np.array(gdf[["band1", "band2", "band3"]]).astype("f"))
)
nan_raster = np.count_nonzero(gdf.band1 == nan_values)
gdf.band1.replace(0.0, np.nan, inplace=True)
gdf.band2.replace(0.0, np.nan, inplace=True)
gdf.band3.replace(0.0, np.nan, inplace=True)
print(
f"Number of points outside the raster extents: {nan_out}\nThe extraction assigns NaN."
)
print(
f"Number of points in NoData areas within the raster extents: {nan_raster}\nThe extraction assigns NaN."
)
return gdf
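An illustrative call (hypothetical folders), extracting elevation profiles every 0.1 m along the transects from all DSMs in a folder:
```python
from sandpyper.common import extract_from_folder

profiles = extract_from_folder(
    dataset_folder="C:/data/dsm",         # hypothetical DSM folder
    transect_folder="C:/data/transects",  # hypothetical transect folder
    tr_ids="reset",
    list_loc_codes=["apo"],
    mode="dsm",
    sampling_step=0.1,
    add_xy=True,
)
```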
extract_loc_date(name, loc_search_dict, split_by='_')
Get the location code (e.g. wbl, por) and raw dates (e.g. 20180902) from filenames using the search dictionary. If no location is found using exact matches, a fuzzy word match is implemented, searching closest matches between locations in filenames and search candidates provided in the loc_search_dict dictionary.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | Full filename, of the type 'C:\jupyter\data_in_gcp\20180601_mar_gcps.csv'. | required |
loc_search_dict | dict | A dictionary where keys are the location codes and values are lists containing the expected full location strings (e.g. ["Warrnambool", "warrnambool", "warrny"]). | required |
split_by | str | The character used to split the name (default = '_'). | '_' |

Returns:

Type | Description |
---|---|
('location', raw_date) | Tuple with location and raw date. |
Source code in sandpyper\common.py
def extract_loc_date(name, loc_search_dict, split_by="_"):
"""
Get the location code (e.g. wbl, por) and raw dates (e.g. 20180902) from filenames using the search dictionary.
If no location is found using exact matches, a fuzzy word match is implemented, searching closest matches
between locations in filenames and search candidates provided in the loc_search_dict dictionary.
Args:
name (str): full filenames of the tipy 'C:\\jupyter\\data_in_gcp\\20180601_mar_gcps.csv').
loc_search_dict (dict): a dictionary where keys are the location codes and values are lists containing the expected full location string (["Warrnambool", "warrnambool","warrny"]).
split_by (str): the character used to split the name (default= '_').
Returns:
('location',raw_date) : tuple with location and raw date.
"""
try:
date=getDate(name)
except:
print("Proceeding with automated regular expression match")
date=find_date_string(name)
print(f"Date found: {date}")
names = set((os.path.split(name)[-1].split(split_by)))
locations_search_names=list(loc_search_dict.values())
locations_codes=list(loc_search_dict.keys())
for loc_code, raw_strings_loc in zip(locations_codes, locations_search_names): # loop trhough all possible lists of raw strings
raw_str_set = set(raw_strings_loc)
match = raw_str_set.intersection(names)
if len(match) >= 1:
location_code_found = loc_code
break
try:
return (location_code_found, date)
except:
# location not found. Let's implement fuzzy string match.
scores =[]
for i,search_loc in enumerate(locations_search_names):
for word in search_loc:
score=fuzz.ratio(word,names) # how close is each candidate word to the list of names which contain the location?
scores.append([score,i,word])
scores_arr=np.array(scores) # just to safely use np.argmax on a specified dimension
max_score_idx=scores_arr[:,:2].astype(int).argmax(0)[0] # returns the index of the maximum score in scores array
closest_loc_code_idx=scores[max_score_idx][1] # closest code found
closest_word=scores[max_score_idx][-1]
loc_code=locations_codes[closest_loc_code_idx]
print(f"Location not understood in {name}.\n\
Fuzzy word matching found {closest_word}, which corresponds to location code: {loc_code} ")
return (loc_code, date)
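A worked example (assuming the companion getDate helper recognises the 8-digit yyyymmdd token in the name):
```python
from sandpyper.common import extract_loc_date

loc_search_dict = {"wbl": ["wbl", "warrnambool", "warrny"]}
print(extract_loc_date("warrnambool_20180601_dsm.tif", loc_search_dict))
# expected: ('wbl', '20180601')
```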
filepath_raster_type(raster_path)
Returns 'dsm' if the path provided (raster_path) points to a single-band DSM, or 'ortho' if the raster has multiple bands.
Source code in sandpyper\common.py
def filepath_raster_type(raster_path):
"""Returns 'dsm' if the path provided (raster_path) is of a DSM (1 single band) or 'ortho' if has multiple bands."""
with ras.open(r"{}".format(raster_path)) as raster:
if raster.count==1:
return "dsm"
else:
return "ortho"
fill_gaps(data_in, y_heat_bottom_limit, spacing, bottom=True, y_heat_start=0)
Function to fill the pivoted table (returned from the prep_heatmap function) with missing across-shore distances, in order to align data on heatmaps. Empty rows (NaN) will be added on top (from 0 to the first valid distance) and, optionally, at the bottom of each transect (from the last valid distance to a specified seaward limit).
Warning
This function assumes along-transect distances to go from land to water, which is not what the profile distances originally represent.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
data_in | pd.DataFrame | Pivoted dataframe, where each column is a transect and each row is an across-shore distance. | required |
y_heat_bottom_limit | int | Lower boundary distance to extend all transects to. | required |
bottom | bool | If True (default), rows are extended seaward too, up to y_heat_bottom_limit. If False, only distances from 0 to the first valid values will be added. | True |
y_heat_start | int | Landward starting distance value (default=0). | 0 |
spacing | float | Sampling step (meters) used to extract points (default=0.1). | required |

Returns:

Type | Description |
---|---|
complete_df (pd.DataFrame) | Complete dataframe with extra rows of NaN added. |
Source code in sandpyper\common.py
def fill_gaps(data_in, y_heat_bottom_limit, spacing, bottom=True, y_heat_start=0):
"""Function to fill the pivoted table (returned from prep_heatmap function) with missing across-shore distances, due to align data on heatmaps. Empty rows (NaN) will be added on top (from 0 to the first valid distance) and, optionally on the bottom of each transect, (from the last valid distance to a specified seaward limit).
Warning:
This function assume along-transect distances to be going from land to water, which is not what the profiles distances represent originally.
Args:
data_in (pd.DataFrame): Pivoted dataframe, where each column is a transect and row is a along-shore distance.
y_heat_bottom_limit (int): Lower boundary distance to extend all transects.
bottom (bool): If True (default), rows are extended seaward too, up to y_heat_bottom_limit. If False, only distances from 0 to the first valid values will be added.
y_heat_start (int): Landward starting distance value (default=0)
spacing (float): Sampling step (meters) used to extract points (default=0.1)
Returns:
complete_df (pd.DataFrame): Complete dataframe with extra rows of NaN added.
"""
if y_heat_bottom_limit < data_in.index[-1]:
raise ValueError(f"y_heat_bottom_limit ({y_heat_bottom_limit}) cannot be lower than the maximum distance already present in the data ({data_in.index[-1]}).")
if bool(bottom):
bottom_fill_array = np.empty(
(
(
int(
np.round(y_heat_bottom_limit + spacing - data_in.index[-1], 1)
)
),
data_in.shape[1],
)
)
bottom_fill_array[:] = np.NaN
to_concat_after = pd.DataFrame(
data=bottom_fill_array,
index=np.arange(data_in.index[-1], y_heat_bottom_limit + spacing, spacing),
columns=data_in.columns,
)
else:
pass
before_fill_array = np.empty((int(data_in.index[0]), data_in.shape[1]))
before_fill_array[:] = np.NaN
to_concat_before = pd.DataFrame(
data=before_fill_array,
index=np.arange(y_heat_start, data_in.index[0], spacing),
columns=data_in.columns,
)
if bool(bottom):
return pd.concat([to_concat_before, data_in, to_concat_after.iloc[1:]])
else:
return pd.concat([to_concat_before, data_in])
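A worked sketch using a 1 m spacing for simplicity: pad a small pivoted transect table with NaN rows down to a common seaward limit so heatmaps share the same extent.
```python
import numpy as np
import pandas as pd
from sandpyper.common import fill_gaps

data_in = pd.DataFrame(
    np.random.rand(11, 3),
    index=np.arange(0, 11, 1),         # across-shore distances 0..10 m
    columns=["tr_0", "tr_1", "tr_2"],  # transect columns
)

complete_df = fill_gaps(data_in, y_heat_bottom_limit=15, spacing=1)
print(complete_df.shape)  # (16, 3): rows now span 0..15 m
```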
filter_filename_list(filenames_list, fmt=['.tif', '.tiff'])
It returns a list of only specific file formats from a list of filenames.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
filenames_list | list | List of filenames. | required |
fmt | list | List of formats to be filtered (DEFAULT = [".tif", ".tiff"]). | ['.tif', '.tiff'] |

Returns:

Type | Description |
---|---|
filtered_list (list) | A filtered list of filenames. |
Source code in sandpyper\common.py
def filter_filename_list(filenames_list, fmt=[".tif", ".tiff"]):
"""It returns a list of only specific file formats from a list of filenames.
Args:
filenames_list (list): list of filenames.
fmt (list): list of formats to be filtered (DEFAULT = [".tif",".tiff"])
Returns:
filtered_list (list): A filtered list of filenames.
"""
return [name for name in filenames_list if os.path.splitext(name)[1] in fmt]
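A worked example, keeping only GeoTIFF paths from a mixed list:
```python
from sandpyper.common import filter_filename_list

files = ["apo_20180601_dsm.tif", "notes.txt", "apo_20180601_ortho.tiff"]
print(filter_filename_list(files))  # ['apo_20180601_dsm.tif', 'apo_20180601_ortho.tiff']
```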
find_date_string(filename, list_months=['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sept', 'oct', 'nov', 'dec'], to_rawdate=True)
It finds the date in the filename and returns True if it is already in the Sandpyper format. If it is not formatted and a date is found, it returns a formatted version of it. For example, dates in raw filenames should be similar to "Seaspray_22_Oct_2020_GeoTIFF_DSM_GDA94_MGA_zone_55.tiff".
Parameters:

Name | Type | Description | Default |
---|---|---|---|
filename | str | Filename to test, of the type "Seaspray_22_Oct_2020_GeoTIFF_DSM_GDA94_MGA_zone_55.tiff". | required |
list_months | list | Expected denominations for the months. Default to ['jan', 'feb', 'mar', ..., 'dec']. | ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sept', 'oct', 'nov', 'dec'] |
to_rawdate | bool | True to format the found date into raw_date (20201022). False, return True if the date is found or False if not. | True |

Returns:

Type | Description |
---|---|
bool (bool, str) | True if it was already formatted or a new string formatted correctly. |
Source code in sandpyper\common.py
def find_date_string(
filename,
list_months=[
"jan",
"feb",
"mar",
"apr",
"may",
"jun",
"jul",
"aug",
"sept",
"oct",
"nov",
"dec",
],
to_rawdate=True,
):
"""It finds the date in the filename and returns True if it is already in the Sandpyper format. If it is not formatted and a date is found, it returns a formatted version of it. For example, dates in raw filenames should be similar to "Seaspray_22_Oct_2020_GeoTIFF_DSM_GDA94_MGA_zone_55.tiff".
Args:
filename (str): filename to test, of the type "Seaspray_22_Oct_2020_GeoTIFF_DSM_GDA94_MGA_zone_55.tiff".
list_months (list): expected denominations for the months. Default to ['jan','feb','mar',...,'dec'].
to_rawdate (bool): True to format the found date into raw_date (20201022). False, return True if the date is found or False if not.
Returns:
bool (bool, str): True if it was already formatted or a new string formatted correctly.
"""
re_list_months = "|".join(list_months)
regx = rf"_\d{{2}}_({re_list_months})_\d{{4}}_"
try:
group = re.search(regx, filename, re.IGNORECASE).group()
if to_rawdate == False:
return True
else:
group_split = group.split("_")
dt = datetime.datetime.strptime(
f"{group_split[1]}{group_split[2]}{group_split[3]}", "%d%b%Y"
)
return dt.strftime("%Y%m%d")
except:
return False
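A worked example, converting the date embedded in a raw filename to the yyyymmdd format used throughout sandpyper:
```python
from sandpyper.common import find_date_string

name = "Seaspray_22_Oct_2020_GeoTIFF_DSM_GDA94_MGA_zone_55.tiff"
print(find_date_string(name))  # '20201022'
```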
get_beachface_length(series)
Get across-shore beachface length from series of elevation change with distance as indices.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
series | pd.Series | Series of elevation change with distance as indices. | required |

Returns:

Type | Description |
---|---|
beachface_length (float) | Across-shore beachface length in meters. |
Source code in sandpyper\common.py
def get_beachface_length(series):
"""Get across-shore beachface length from series of elevation change with distance as indices.
Args:
series (pd.Series): series of elevation change with distance as indices.
Returns:
beachface_length (float): Across-shore beachface length in meters.
"""
min_dist, max_dist = series.first_valid_index(), series.last_valid_index()
across_shore_beachface_length = np.round(max_dist - min_dist, 1)
return across_shore_beachface_length
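A worked example: the beachface length is the distance between the first and last valid (non-NaN) values along a transect.
```python
import numpy as np
import pandas as pd
from sandpyper.common import get_beachface_length

dh = pd.Series([np.nan, 0.2, -0.1, 0.3, np.nan], index=[0.0, 10.0, 20.0, 30.0, 40.0])
print(get_beachface_length(dh))  # 20.0
```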
get_coastal_Markov(arr_markov, weights_dict, store_neg=True)
Compute BCDs from first-order transition matrices of dh magnitude classes (as states).
Parameters:

Name | Type | Description | Default |
---|---|---|---|
arr_markov | pd.DataFrame | First-order Markov transition matrix, with dh magnitude classes as index and columns. | required |
weights_dict | dict | Dictionary with keys: dh classes, values: weight (int). Especially useful for the e-BCDs magnitude trend (sign). | required |
store_neg | bool | If True (default), use the subtraction for diminishing trends. | True |

Returns:

Type | Description |
---|---|
(tuple) | Tuple containing: BCD (float), the actual BCD index; trend (float), the value of the index trend; sign (str), '-' or '+' for plotting purposes. |
Source code in sandpyper\common.py
def get_coastal_Markov(arr_markov, weights_dict, store_neg=True):
"""Compute BCDs from first-order transition matrices of dh magnitude classes (as states).
Args:
arr_markov (np.array): Numpy array of markov transition matrix.
weights_dict (dict): Dictionary with keys:dh classes, values: weight (int). Especially useful for the e-BCDs magnitude trend (sign).
store_neg (bool): If True (default), use the subtraction for diminishing trends.
Returns:
(tuple): Tuple containing:
BCD(float) the actual BCD indices
trend(float) value of the indices trend
sign(str) can be '-' or '+' for plotting purposes.
"""
combs = pd.Series(product(arr_markov.index, (arr_markov.columns)))
value_trans = 0
value_trend = 0
for state1, state2 in combs:
state_1_w = weights_dict[state1] # extract the weights
state_2_w = weights_dict[state2]
value = arr_markov.at[state1, state2]
if state_1_w > state_2_w: # check if the change is decr
if bool(store_neg):
weigth_adhoc_trend = state_1_w * (-(state_2_w))
value_trans += value
value_trend += value * weigth_adhoc_trend
elif state_1_w < state_2_w:
weigth_adhoc_trend = state_1_w * state_2_w
value_trans += value
value_trend += value * weigth_adhoc_trend
else:
weigth_adhoc_trend = state_1_w * state_2_w
value_trans += value
value_trend += value * weigth_adhoc_trend
if value_trend > 0:
sign = "+"
elif value_trend < 0:
sign = "-"
elif value_trend == 0:
sign = "0"
else:
sign = np.nan
return np.round(value_trans, 3), np.round(value_trend, 3), sign
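A small illustrative call. Note that, as the source above shows, the transition matrix is indexed by label (`.index`, `.columns`, `.at`), so a labelled pandas DataFrame is passed here; the class names and weights are made up for the example:

```python
import pandas as pd
from sandpyper.common import get_coastal_Markov

# toy 2-state transition matrix between dh magnitude classes
states = ["small_erosion", "small_deposition"]
trans = pd.DataFrame([[0.6, 0.4],
                      [0.3, 0.7]], index=states, columns=states)

# hypothetical weights: higher weight = larger magnitude class
weights = {"small_erosion": 1, "small_deposition": 2}

bcd, trend, sign = get_coastal_Markov(trans, weights, store_neg=True)
print(bcd, trend, sign)  # 2.0 3.6 +
```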
get_dn(x_coord, y_coord, raster, bands, transform)
¶
Returns the value of the raster at a specified location and band.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
x_coord | float | Projected X coordinates of pixel to extract value. | required |
y_coord | float | Projected Y coordinates of pixel to extract value. | required |
raster | rasterio open file | Open raster object, from rasterio.open(raster_filepath). | required |
bands | int | number of bands. | required |
transform | Shapely Affine obj | Geotransform of the raster. | required |

Returns:

Type | Description |
---|---|
px (int, float) | raster pixel value. |
Source code in sandpyper\common.py
def get_dn(x_coord, y_coord, raster, bands, transform):
"""Returns the value of the raster at a specified location and band.
Args:
x_coord (float): Projected X coordinates of pixel to extract value.
y_coord (float): Projected Y coordinates of pixel to extract value.
raster (rasterio open file): Open raster object, from rasterio.open(raster_filepath).
bands (int): number of bands.
transform (Shapely Affine obj): Geotransform of the raster.
Returns:
px (int, float): raster pixel value.
"""
# Let's create an empty list where we will store the elevation (z) from points
# With GDAL, we extract 4 components of the geotransform (gt) of our north-up image.
dn_val = []
row, col = rowcol(transform, x_coord, y_coord, round)
for j in range(1, 4): # we could iterate thru multiple bands
try:
data = raster.read(j, window=Window(col, row, 1, 1))
dn_val.append(data[0][0])
except BaseException:
dn_val.append(np.nan)
return dn_val
get_elevation(x_coord, y_coord, raster, bands, transform)
¶
Returns the value of the raster at a specified location and band.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
x_coord | float | Projected X coordinates of pixel to extract value. | required |
y_coord | float | Projected Y coordinates of pixel to extract value. | required |
raster | rasterio open file | Open raster object, from rasterio.open(raster_filepath). | required |
bands | int | number of bands. | required |
transform | Shapely Affine obj | Geotransform of the raster. | required |

Returns:

Type | Description |
---|---|
px (int,float) | raster pixel value. |
Source code in sandpyper\common.py
def get_elevation(x_coord, y_coord, raster, bands, transform):
"""
Returns the value of the raster at a specified location and band.
Args:
x_coord (float): Projected X coordinates of pixel to extract value.
y_coord (float): Projected Y coordinates of pixel to extract value.
raster (rasterio open file): Open raster object, from rasterio.open(raster_filepath).
bands (int): number of bands.
transform (Shapely Affine obj): Geotransform of the raster.
Returns:
px (int,float): raster pixel value.
"""
elevation = []
row, col = rowcol(transform, x_coord, y_coord, round)
for j in np.arange(bands): # we could iterate thru multiple bands
try:
data_z = raster.read(1, window=Window(col, row, 1, 1))
elevation.append(data_z[0][0])
except BaseException:
elevation.append(np.nan)
return elevation
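A hedged usage sketch; the DSM path and coordinates below are placeholders and must match an existing projected raster:

```python
import rasterio as ras
from sandpyper.common import get_elevation

dsm_path = r"C:\data\apo_20180912_dsm_ahd.tiff"  # hypothetical single-band DSM

with ras.open(dsm_path) as ds:
    z = get_elevation(307500.0, 5770000.0, ds, ds.count, ds.transform)

print(z[0])  # elevation at that location, or nan if the point falls outside the raster
```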
get_m3_m_location(data_in, dx, transect_spacing=20)
¶
Get alongshore net volumetric change in cubic meters per meter of beach.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
data_in | pd.Dataframe | Dataframe generated by prep_heatmap function. | required |
transect_spacing | int,float | Spacing between transects in meters. | 20 |
dx | int,float | The along-transect point spacing. | required |

Returns:

Type | Description |
---|---|
m3_m (float) | Cubic meters of change per meter of beach alongshore, at the site level. |
Source code in sandpyper\common.py
def get_m3_m_location(data_in, dx, transect_spacing=20):
"""
Get alongshore net volumetric change in cubic meters per meter of beach.
Args:
data_in (pd.Dataframe): Dataframe generated by prep_heatmap function.
transect_spacing (int,float): Spacing between transects in meters.
dx (int,float): The along-transect point spacing
Returns:
m3_m (float): Cubic meters of change per meter of beach alongshore, at the site level.
"""
# compute alongshore beachface length
along_beach = data_in.shape[1] * transect_spacing
tot_vol = sum(
(data_in.apply(interpol_integrate, axis=0, **{'dx':dx})) * transect_spacing
) # compute net volume change
return tot_vol / along_beach # return m3_m alongshore
get_opt_k(sil_df, sigma=1)
¶
Function to create a dictionary with the optimal number of clusters (k) for all surveys. It searches for the inflexion points where an additional cluster does not degrade the overall clustering performance. It uses a Gaussian-smoothed regression of k against mean silhouette scores to identify relative minima (first order) as possible inflexion values. When multiple relative minima are found, the smallest k is taken as optimal. When no relative minima are found, it searches for peaks in the second-order derivative of the smoothed curve. If multiple peaks are found, the mean k of those peak values is used as optimal.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sil_df | Pandas dataframe | Dataframe containing the mean silhouette score per k in each survey. | required |
sigma | int | Number of standard deviations to use in the Gaussian filter. Default is 1. | 1 |

Returns:

Type | Description |
---|---|
opt_k (dict) | Dictionary with optimal k for each survey. |
Source code in sandpyper\common.py
def get_opt_k(sil_df, sigma=1):
"""
Function to create a dictionary with optimal number of clusters (k) for all surveys.
It searches for the inflexion points where an additional cluster does not degrade the overall clustering performance.
It uses a Gaussian-smoothed regression of k against mean silhouette scores to identify relative minima (first order)
as possible inflexion values.
When multiple relative minima are found, the smallest k will be the optimal one.
When no relative minima are found, it searches for peaks in the second-order derivative of the smoothed curve.
If multiple peaks are found, the mean k of those peak values will be used as optimal.
Args:
sil_df (Pandas dataframe): Dataframe containing the mean silhouette score per k in each survey.
sigma (int): Number of standard deviations to use in the Gaussian filter. Default is 1.
Returns:
opt_k (dict): Dictionary with optimal k for each survey.
"""
si_arr = sil_df.groupby(["location", "raw_date"])["silhouette_mean"].apply(
np.array
)
k_arr = sil_df.groupby(["location", "raw_date"])["k"].apply(np.array)
dict_data = {"k_numbers": k_arr, "silhouette_mean": si_arr}
sil_group_df = pd.DataFrame(dict_data)
sil_group_df = sil_group_df.reset_index()
opt_k = dict()
for i in range(0, sil_group_df.shape[0]):
location = sil_group_df.iloc[i].location
survey_date_in = sil_group_df.iloc[i].raw_date
sub = sil_group_df.loc[i, ["k_numbers", "silhouette_mean"]]
# Passing a gaussian filter to smooth the curve for 1 std sigma
gauss = gaussian_filter(sub.silhouette_mean, sigma=sigma)
# obtaining relative minima for the smoothed line (gauss)
mina = sig.argrelmin(gauss, axis=0, order=1, mode="clip")
if len(mina[0]) == 0:
# if no relative minima are found, compute the second order accurate central differences in the interior points
# as optimal k
der = np.gradient(gauss)
peak = sig.find_peaks(der)
if (
len(peak) > 1
): # if multiple plateau values are found, obtain the mean k of those values
peak = int(np.mean(peak[0])) + 2
# +2: the peaks of mina values are 0-based index. As k started at 2, adding 2 returns k instead of index
opt_k[f"{location}_{survey_date_in}"] = peak
else:
opt_k[f"{location}_{survey_date_in}"] = peak[0][0] + 2
elif len(mina[0]) == 1:
opt_k[f"{location}_{survey_date_in}"] = mina[0][0] + 2
else:
# if multiple relative minimas are found, use the first one as optimal k
opt_k[f"{location}_{survey_date_in}"] = mina[0][0] + 2
return opt_k
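A toy, self-contained example with a single survey whose silhouette curve dips at k=5 (the values are invented for illustration):

```python
import pandas as pd
from sandpyper.common import get_opt_k

sil_df = pd.DataFrame({
    "location": ["leo"] * 8,
    "raw_date": [20180920] * 8,
    "k": list(range(2, 10)),
    "silhouette_mean": [0.70, 0.60, 0.50, 0.45, 0.55, 0.50, 0.44, 0.40],
})

print(get_opt_k(sil_df, sigma=1))  # {'leo_20180920': 5} for this toy curve
```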
get_profile_dn(ortho, transect_file, transect_index, tr_ids, step, location, date_string, add_xy=False)
¶
Returns a tidy GeoDataFrame of profile data, extracting raster information at a user-defined (step) meters gap along each transect.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
ortho | str | path to the DSM raster. | required |
transect_file | str | path to the transect file. | required |
transect_index | int | index of the transect to extract information from. | required |
tr_ids | str | 'reset' to use the transect index as the transect ID, or the name of an existing column of the transect file storing the transect IDs. | required |
step | int,float | sampling distance from one point to another in meters along the transect. | required |
location | str | location code | required |
date_string | str | raw format of the survey date (20180329) | required |
add_xy | bool | True to add X and Y coordinates fields. | False |

Returns:

Type | Description |
---|---|
gdf (gpd.GeoDataFrame) | Profile data extracted from the raster. |
Source code in sandpyper\common.py
def get_profile_dn(
ortho, transect_file,
transect_index, tr_ids,
step, location, date_string, add_xy=False
):
"""Returns a tidy GeoDataFrame of profile data, extracting raster information at a user-defined (step) meters gap along each transect.
Args:
ortho (str): path to the DSM raster.
transect_file (str): path to the transect file.
transect_index (int): index of the transect to extract information from.
step (int,float): sampling distance from one point to another in meters along the transect.
location (str): location code
date_string (str): raw format of the survey date (20180329)
add_xy (bool): True to add X and Y coordinates fields.
Returns:
gdf (gpd.GeoDataFrame) : Profile data extracted from the raster.
"""
ds = ras.open(ortho, "r")
bands = ds.count
transform = ds.transform
line = transect_file.loc[transect_index]
if tr_ids=='reset':
line_id=line.name
elif isinstance(tr_ids,str) and tr_ids in line.index:
line_id=line.loc[tr_ids]
else:
raise ValueError(f"'tr_ids' must be either 'reset' or the name of an existing column o the transect files. '{tr_ids}' was passed.")
length_m = line.geometry.length
x = []
y = []
dn = []
distance = []
for currentdistance in np.arange(0, int(length_m), step):
# creation of the point on the line
point = line.geometry.interpolate(currentdistance)
xp, yp = (
point.x,
point.y,
) # storing point xy coordinates into xp,xy objects, respectively
x.append(xp) # see below
y.append(
yp
) # append point coordinates to previously created and empty x,y lists
dn.append(get_dn(xp, yp, ds, bands, transform))
distance.append(currentdistance)
dn1 = pd.Series((v[0] for v in dn))
dn2 = pd.Series((v[1] for v in dn))
dn3 = pd.Series((v[2] for v in dn))
df = pd.DataFrame({"distance": distance, "band1": dn1, "band2": dn2, "band3": dn3})
df["coordinates"] = list(zip(x, y))
df["coordinates"] = df["coordinates"].apply(Point)
df["location"] = location
df["survey_date"] = pd.to_datetime(date_string, yearfirst=True, dayfirst=False, format="%Y%m%d")
df["raw_date"] = date_string
df["tr_id"] = int(line_id)
gdf_rgb = gpd.GeoDataFrame(df, geometry="coordinates")
# Last touch, the proj4 info (coordinate reference system) is gathered with
# Geopandas and applied to the newly created one.
gdf_rgb.crs = str(transect_file.crs)
# Let's create unique IDs from the coordinates values, so that the Ids
# follows the coordinates
gdf_rgb["point_id"] = [
create_id(gdf_rgb.iloc[i]) for i in range(0, gdf_rgb.shape[0])
]
if bool(add_xy):
# Adding long/lat fields
gdf_rgb["x"] = gdf_rgb.coordinates.x
gdf_rgb["y"] = gdf_rgb.coordinates.y
else:
pass
return gdf_rgb
get_profiles(dsm, transect_file, tr_ids, transect_index, step, location, date_string, add_xy=False, add_terrain=False)
¶
Returns a tidy GeoDataFrame of profile data, extracting raster information at a user-defined (step) meters gap along each transect.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dsm | str | path to the DSM raster. | required |
transect_file | str | path to the transect file. | required |
tr_ids | str | 'reset' to use the transect index as the transect ID, or the name of an existing column of the transect file storing the transect IDs. | required |
transect_index | int | index of the transect to extract information from. | required |
step | int,float | sampling distance from one point to another in meters along the transect. | required |
location | str | location code | required |
date_string | str | raw format of the survey date (20180329) | required |
add_xy | bool | True to add X and Y coordinates fields. | False |
add_terrain | bool | True to add slope in degrees. Default to False. | False |

Returns:

Type | Description |
---|---|
gdf (gpd.GeoDataFrame) | Profile data extracted from the raster. |
Source code in sandpyper\common.py
def get_profiles(
dsm,
transect_file,
tr_ids,
transect_index,
step,
location,
date_string,
add_xy=False,
add_terrain=False,
):
"""
Returns a tidy GeoDataFrame of profile data, extracting raster information
at a user-defined (step) meters gap along each transect.
Args:
dsm (str): path to the DSM raster.
transect_file (str): path to the transect file.
transect_index (int): index of the transect to extract information from.
step (int,float): sampling distance from one point to another in meters along the transect.
location (str): location code
date_string (str): raw format of the survey date (20180329)
add_xy (bool): True to add X and Y coordinates fields.
add_terrain (bool): True to add slope in degrees. Default to False.
Returns:
gdf (gpd.GeoDataFrame) : Profile data extracted from the raster.
"""
ds = ras.open(dsm, "r")
bands = ds.count # get raster bands. One, in a classic DEM
transform = ds.transform # get geotransform info
# index each transect and store it a "line" object
line = transect_file.loc[transect_index]
if tr_ids=='reset':
line_id=line.name
elif isinstance(tr_ids,str) and tr_ids in line.index:
line_id=line.loc[tr_ids]
else:
raise ValueError(f"'tr_ids' must be either 'reset' or the name of an existing column o the transect files. '{tr_ids}' was passed.")
length_m = line.geometry.length
# Creating empty lists of coordinates, elevations and distance (from start
# to end points along each transect lines)
x = []
y = []
z = []
slopes = []
# The "distance" object is and empty list which will contain the x variable
# which is the displacement from the shoreward end of the transects toward
# the foredunes.
distance = []
for currentdistance in np.arange(0, int(length_m), step):
# creation of the point on the line
point = line.geometry.interpolate(currentdistance)
xp, yp = (
point.x,
point.y,
) # storing point xy coordinates into xp,xy objects, respectively
x.append(xp) # see below
y.append(
yp
) # append point coordinates to previously created and empty x,y lists
# extraction of the elevation value from DSM
z.append(get_elevation(xp, yp, ds, bands, transform)[0])
if str(type(add_terrain)) == "<class 'richdem.rdarray'>":
slopes.append(get_terrain_info(xp, yp, add_terrain))
else:
pass
# append the distance value (currentdistance) to distance list
distance.append(currentdistance)
# Below, the empty lists tr_id_list and the date_list will be filled by strings
# containing the transect_id of every point as stored in the original dataset
# and a label with the date as set in the data setting section, after the input.
zs= pd.Series((elev for elev in z))
if str(type(add_terrain)) == "<class 'richdem.rdarray'>":
slopes_series = pd.Series((slope for slope in slopes))
df = pd.DataFrame({"distance": distance, "z": zs, "slope":slopes_series})
else:
df = pd.DataFrame({"distance": distance, "z": zs})
df["coordinates"] = list(zip(x, y))
df["coordinates"] = df["coordinates"].apply(Point)
df["location"] = location
df["survey_date"] = pd.to_datetime(date_string, yearfirst=True, dayfirst=False, format="%Y%m%d")
df["raw_date"] = date_string
df["tr_id"] = int(line_id)
gdf = gpd.GeoDataFrame(df, geometry="coordinates")
# The proj4 info (coordinate reference system) is gathered with
# Geopandas and applied to the newly created one.
gdf.crs = str(transect_file.crs)
# Transforming non-hashable Shapely coordinates to hashable strings and
# store them into a variable
# Let's create unique IDs from the coordinates values, so that the Ids
# follows the coordinates
gdf["point_id"] = [create_id(gdf.iloc[i]) for i in range(0, gdf.shape[0])]
if bool(add_xy):
# Adding long/lat fields
gdf["x"] = gdf.coordinates.x
gdf["y"] = gdf.coordinates.y
else:
pass
return gdf
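A hedged usage sketch (paths and location code are placeholders). Note that, as the source above shows, `transect_file` is indexed like a GeoDataFrame, so the transect geopackage is read with geopandas first:

```python
import geopandas as gpd
from sandpyper.common import get_profiles

dsm_path = r"C:\data\leo_20180606_dsm_ahd.tiff"           # hypothetical DSM
transects = gpd.read_file(r"C:\data\leo_transects.gpkg")  # hypothetical transect file

# elevation every 1 m along the first transect
profile = get_profiles(
    dsm=dsm_path,
    transect_file=transects,
    tr_ids="reset",      # use the GeoDataFrame index as transect ID
    transect_index=0,
    step=1,
    location="leo",
    date_string="20180606",
    add_xy=True,
)
print(profile[["tr_id", "distance", "z"]].head())
```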
get_rbcd_transect(df_labelled, loc_specs, reliable_action, dirNameTrans, labels_order, loc_codes, crs_dict_string)
¶
It computes the r-BCDs at the transect level, based on the timeseries of elevation change magnitudes across the beachface dataset (markov_tag dataframe).
Parameters:

Name | Type | Description | Default |
---|---|---|---|
df_labelled | pd.DataFrame | Pandas dataframe with dh magnitude labelled. | required |
loc_specs | dict | Dictionary where keys are the location codes and values are location-specific (inner) dictionaries where keys are 'thresh' and 'min_points' and values are the associated values, like loc_specs={'mar':{'thresh':6,'min_points':6}, 'leo':{'thresh':4,'min_points':20}}. | required |
reliable_action | str | Insert "drop" (default) to remove transects that have less than the specified number of non-nnn points (thresh) or "keep" to keep them. | required |
dirNameTrans | str | Path of the directory containing the transects (geopackages, .gpkg). | required |
labels_order | list | Order of labels (magnitude of change) to be preserved. | required |
loc_codes | list | List of strings containing location codes. | required |
crs_dict_string | dict | Dictionary storing location codes as key and crs information as values, in dictionary form. | required |

Returns:

Type | Description |
---|---|
rbcd_transects (gpd.GeoDataFrame) | A GeoDataFrame containing the steady-state distribution of each transect. |
Source code in sandpyper\common.py
def get_rbcd_transect(df_labelled, loc_specs, reliable_action, dirNameTrans, labels_order, loc_codes, crs_dict_string):
"""It computes the r-BCDs at the transect level, based on the timeseries of elevation change magnituteds across the beachface dataset (markov_tag dataframe).
Args:
df_labelled (pd.DataFrame): Pandas dataframe with dh magnitude labelled.
loc_specs (dict): Dictionary where keys are the location codes and values are location-specific (inner) dictionaries where keys are 'thresh' and 'min_points' and values are the associated values, like loc_specs={'mar':{'thresh':6,'min_points':6}, 'leo':{'thresh':4,'min_points':20}}.
reliable_action (str) : Insert "drop" (default) to remove transects that have less than the specified number of non-nnn points (thresh) or "keep" to keep them.
dirNameTrans (str): Path of the directory containing the transects (geopackages, .gpkg).
labels_order (list): Order of labels (magnitude of change) to be preserved.
loc_codes (list): List of strings containing location codes.
crs_dict_string (dict): Dictionary storing location codes as key and crs information as values, in dictionary form.
Returns:
rbcd_transects (gpd.GeoDataFrame): A GeoDataFrame containing the steady-state distribution of each transect.
"""
steady_state_tr = pd.DataFrame()
df_labelled["spatial_id"]=[spatial_id(geometry_in) for geometry_in in df_labelled.geometry]
for loc in tqdm(df_labelled.location.unique()):
data_loc = df_labelled.query(f"location=='{loc}'")
if loc not in loc_specs.keys():
print(f"No threshold and mi_points provided in loc_specs for {loc}. Using no filters.")
loc_thresh=0
loc_min_pts=0
else:
loc_thresh=loc_specs[loc]['thresh']
loc_min_pts=loc_specs[loc]['min_points']
for tr_id in data_loc.tr_id.unique():
data_tr = data_loc.query(f"tr_id=='{tr_id}'")
if data_tr.empty:
data_tr = data_loc.query(f"tr_id=={tr_id}")
data_tr["spatial_id"]=[spatial_id(geometry_in) for geometry_in in data_tr.geometry]
data_piv = data_tr.pivot(
values='markov_tag',
index="spatial_id",
columns='dt',
)
# identify the points that have less than the required number of transitions (thresh) of non nan states
valid_pts=((~data_piv.isnull()).sum(axis=1)>=loc_thresh).sum()
# has this transect a number of valid points above the specified min_pts parameter?
valid_transect= valid_pts >= loc_min_pts
# drop invalid points
data_piv.dropna(axis=0, thresh=loc_thresh, inplace=True)
# all the NaN will be named 'nnn'
data_piv.fillna("nnn", inplace=True)
n = data_piv.shape[0]
arr = np.array(data_piv)
m = Markov(arr)
try:
steady_state = m.steady_state
steady_state = pd.DataFrame(
m.steady_state, index=m.classes, columns=[tr_id]
)
steady_state.reset_index(inplace=True)
steady_state.rename({"index": "markov_tag"}, axis=1, inplace=True)
steady_state = steady_state.melt(
id_vars="markov_tag", value_name="p", var_name="tr_id"
)
steady_state["location"] = loc
steady_state["thresh"] = loc_thresh
steady_state["min_pts"] = loc_min_pts
steady_state["valid_pts"] = valid_pts
steady_state["reliable"] = valid_transect
steady_state_tr = pd.concat(
[steady_state, steady_state_tr], ignore_index=True
)
except BaseException:
print(f"tr_id {tr_id} has {n} valid points.")
null_df=pd.DataFrame({'markov_tag':df_labelled.markov_tag.unique(),
'p':[np.nan for i in df_labelled.markov_tag.unique()]})
null_df["tr_id"]=tr_id
null_df["location"]=loc
null_df["thresh"]=loc_thresh
null_df["min_pts"]=loc_min_pts
null_df["valid_pts"]=valid_pts
null_df["reliable"]=valid_transect
steady_state_tr = pd.concat(
[null_df, steady_state_tr], ignore_index=True
)
# what to do with unreliable transects?
if reliable_action =='drop':
steady_state_tr=steady_state_tr.query("reliable == True")
elif reliable_action =='keep':
pass
else:
raise ValueError("The parameter 'reliable_action' must be either 'drop' or 'keep'.")
ss_transects_idx = pd.DataFrame()
idx_matrix=len(labels_order)//2
for loc in steady_state_tr.location.unique():
sub = steady_state_tr.query(f"location=='{loc}'")
sub = sub.pivot(index="markov_tag", columns="tr_id", values=("p"))
if len(sub.index)!=len(labels_order):
raise ValueError(f" The following magnitude labels are missing from the steady state dataset: {set(sub.index).symmetric_difference(set(labels_order))}.")
else:
pass
sub = sub.loc[labels_order, :]
# Create erosion and deposition sub-matrix
erosion = sub.iloc[idx_matrix:, :].transpose()
erosion["erosion"] = erosion.sum(axis=1)
erosion=erosion.reset_index()[["tr_id","erosion"]]
deposition = sub.iloc[:-idx_matrix, :].transpose()
deposition["deposition"] = deposition.sum(axis=1)
deposition=deposition.reset_index()[["tr_id","deposition"]]
merged_erodepo=pd.merge(erosion, deposition)
merged_erodepo["residual"] = merged_erodepo.deposition - merged_erodepo.erosion
merged_erodepo["location"] = loc
to_plot = merged_erodepo.melt(
id_vars=["tr_id"], var_name="process", value_name="coastal_index"
)
to_plot["location"] = loc
path_trs=glob.glob(rf"{dirNameTrans}/{loc}*")[0]
transect_in = gpd.read_file(path_trs)
transect_in.columns= transect_in.columns.str.lower()
merged_erodepo["geometry"] = pd.merge(
merged_erodepo, transect_in[["tr_id","geometry"]], how="left", on="tr_id"
).geometry
ss_transects_idx_loc = gpd.GeoDataFrame(
merged_erodepo, geometry="geometry", crs=crs_dict_string[loc] )
ss_transects_idx=pd.concat([ss_transects_idx_loc,ss_transects_idx], ignore_index=True)
return ss_transects_idx
get_sil_location(merged_df, ks, feature_set, random_state=10)
¶
Function to obtain average Silhouette scores for a range of numbers of clusters (k) in all surveys. It uses KMeans as the clusterer and parallel processing for improved speed.
Warning
It might take up to 8h for tables of 6,000,000 observations.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
merged_df | pd.DataFrame | The clean and merged dataframe containing the features. | required |
ks | tuple | starting and ending number of clusters to run KMeans and compute SA on. | required |
feature_set | list | List of strings of features in the dataframe to use for clustering. | required |
random_state | int | Random seed used to make the randomisation deterministic. | 10 |

Returns:

Type | Description |
---|---|
sil_df (pd.DataFrame) | A dataframe containing average Silhouette scores for each survey, based on the provided feature set. |
Source code in sandpyper\common.py
def get_sil_location(merged_df, ks, feature_set, random_state=10):
"""
Function to obtain average Silhouette scores for a range of numbers of clusters (k) in all surveys.
It uses KMeans as the clusterer and parallel processing for improved speed.
Warning:
It might take up to 8h for tables of 6,000,000 observations.
Args:
merged_df (pd.DataFrame): The clean and merged dataframe containing the features.
ks (tuple): starting and ending number of clusters to run KMeans and compute SA on.
feature_set (list): List of strings of features in the dataframe to use for clustering.
random_state (int): Random seed used to make the randomisation deterministic.
Returns:
sil_df (pd.DataFrame): A dataframe containing average Silhouette scores for each survey, based on the provided feature set.
"""
# Creates the range of k to be used for Silhouette Analysis
k_rng = range(*ks)
# Get list of locations
list_locs = merged_df.location.unique()
# Setting up the estimator to scale and translate each feature
# individually to a 0 to 1 range.
scaler = MinMaxScaler()
location_series = []
dates_series = []
n_clusters_series = []
silhouette_avg_series = []
for location in tqdm(list_locs):
list_dates = merged_df.query(f"location=='{location}'").raw_date.unique()
for survey_date_in in tqdm(list_dates):
print(f"Working on : {location}, {survey_date_in}.")
data_in = merged_df.query(
f"location=='{location}' & raw_date == {survey_date_in}"
)
data_in = data_in[feature_set]
data_in.dropna(inplace=True)
for n_clusters in tqdm(k_rng):
minmax_scaled_df = scaler.fit_transform(data_in)
minmax_scaled_df = np.nan_to_num(minmax_scaled_df)
clusterer = KMeans(
n_clusters=n_clusters,
init="k-means++",
algorithm="elkan",
tol=0.0001,
random_state=random_state,
)
cluster_labels = clusterer.fit_predict(np.nan_to_num(minmax_scaled_df))
# The silhouette_score gives the average value for all the samples.
# This gives a perspective into the density and separation of the formed
# clusters
silhouette_avg = silhouette_score(minmax_scaled_df, cluster_labels)
print(
"For n_clusters =",
n_clusters,
"The average silhouette_score is :",
silhouette_avg,
)
location_series.append(location)
dates_series.append(survey_date_in)
n_clusters_series.append(n_clusters)
silhouette_avg_series.append(silhouette_avg)
items_dict = {
"location": pd.Series(data=location_series),
"raw_date": pd.Series(data=dates_series),
"k": pd.Series(data=n_clusters_series),
"silhouette_mean": pd.Series(data=silhouette_avg_series),
}
sil_df = pd.DataFrame(items_dict)
return sil_df
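A hedged sketch of how this function fits the clustering workflow together with get_opt_k and kmeans_sa (documented elsewhere on this page); the input table and feature names are placeholders:

```python
import pandas as pd
from sandpyper.common import get_sil_location, get_opt_k, kmeans_sa

# hypothetical cleaned and merged feature table; it must hold 'location', 'raw_date',
# 'point_id' and the feature columns used below
merged_df = pd.read_csv(r"C:\data\merged_features.csv")
feature_set = ["band1", "band2", "band3", "distance"]

# silhouette analysis for k = 2..14 on every survey (slow on large tables)
sil_df = get_sil_location(merged_df, ks=(2, 15), feature_set=feature_set, random_state=10)

opt_k = get_opt_k(sil_df, sigma=1)                                  # one optimal k per survey
labelled = kmeans_sa(merged_df, ks=opt_k, feature_set=feature_set)  # cluster with those k
```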
get_state_vol_table(sand_pts, lod, full_specs_table, dx, transect_spacing=20, outliers=False, sigma_n=3)
¶
Function to compute location-level altimetric beach change statistics from the multitemporal table. By default, only sand points beyond LoD are accounted for. Optionally, the LoD filter can be turned off. The table contains info on:

- monitoring period: location code, full name, period code, dates, number of days and valid points
- absolute altimetric change: total beachface rising, lowering and net change
- normalised altimetric change: meters of rising, lowering and net elevation change per valid survey point (MEC)
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sand_pts | Pandas dataframe | multitemporal table. | required |
lod | str, bool | if valid path to an LoD table, use the table. If a value is provided, use the value across all surveys. If False, do not apply LoD filter. All elevation change (dh) values within LoD will be set to zero. | required |
full_specs_table | False, path | Full path to the table with extended monitoring info. If False, monitoring period information is limited. | required |
dx | int,float | The along-transect point spacing. | required |
transect_spacing | int | Alongshore spacing of transects (m) | 20 |
outliers | bool | when True, use the specified number of standard deviations to exclude outliers. If False, retain all the points. | False |
sigma_n | int | number of standard deviations to use to exclude outliers (default=3). | 3 |

Returns:

Type | Description |
---|---|
volumetrics_location (pd.DataFrame) | A dataframe storing altimetric beach change info and other information for every period and location. |
Source code in sandpyper\common.py
def get_state_vol_table(
sand_pts, lod, full_specs_table, dx, transect_spacing=20, outliers=False, sigma_n=3
):
"""
Function to compute location-level altimetric beach change statistics from the multitemporal table.
By default, only sand points beyond LoD are accounted for. Optionally, LoD filter can be turned off.
The table contains info on:
- monitoring period: location code, full name, period code, dates, number of days and valid points
- absolute altimetric change: total beachface rising, lowering and net change
- normalised altimetric change: meters of rising, lowering and net elevation change per valid survey point (MEC)
Args:
sand_pts (Pandas dataframe): multitemporal table.
lod (str, bool): if valid path to an LoD table, use the table. If a value is provided, use the value across all surveys. If False, do not apply LoD filter. All elevation change (dh) values within LoD will be set to zero.
full_specs_table (False, path): Full path to the table with extended monitoring info. If False, monitoring period information are limited.
dx (int,float): The along-transect point spacing.
transect_spacing (int): Alongshore spacing of transects (m)
outliers (bool): when True, use the specified number of standard deviation to exclude outliers. If False, retain all the points.
sigma_n (int): number of standard deviation to use to exclude outliers (default=3).
Returns:
volumetrics_location (pd.DataFrame): A dataframe storing altimetric beach change info and other information for every period and location.
"""
if isinstance(full_specs_table, bool):
if bool(full_specs_table) == False:
skip_details = True
else:
print("Please provide the path to the specs table.")
elif isinstance(full_specs_table, str):
if os.path.isfile(full_specs_table):
table_details = pd.read_csv(full_specs_table)
skip_details = False
else:
raise TypeError("Not a valid path to the .csv file for the specs table.")
elif isinstance(full_specs_table, pd.DataFrame):
table_details = full_specs_table
skip_details = False
else:
raise TypeError("Not a valid path to the .csv file for the specs table.")
tr_df_full = pd.DataFrame()
locs = sand_pts.location.unique().tolist()
for loc in locs:
test_loc = sand_pts.query(f"location == '{loc}'")
for dt in test_loc.dt.unique():
test_dt = test_loc.query(f" dt =='{dt}'")
data_in = prep_heatmap(test_dt, lod=lod, outliers=outliers, sigma_n=sigma_n)
# compute erosion and deposition volumes at site level
data_in_erosion = data_in[data_in < 0]
data_in_deposition = data_in[data_in > 0]
if bool(skip_details):
pass
else:
specs = table_details.query(f"location=='{loc}' & dt=='{dt}'")
full_loc = specs.loc_full.values[0]
date_from = specs.date_pre.values[0]
date_to = specs.date_post.values[0]
n_days = specs.n_days.values[0]
beach_length = len(data_in.columns) * transect_spacing
n_obs_valid = (
data_in.count().sum()
) # sand only, within beachface, LoD filtered (default)
abs_in = data_in[data_in > 0].sum().sum() # total profiles rising
abs_out = data_in[data_in < 0].sum().sum() # total profiles lowering
abs_net_change = data_in.sum().sum() # net altimetric change
mec_m = (
abs_net_change / beach_length
) # meters of elevation change per meter of beach
norm_in = abs_in / n_obs_valid # meters of profile rising per valid point
norm_out = (
abs_out / n_obs_valid
) # meters of profile lowering per valid point
norm_net_change = abs_net_change / n_obs_valid # MEC
tot_vol_depo = (
data_in_deposition.apply(interpol_integrate, axis=0, **{'dx':dx}) * transect_spacing
).sum()
tot_vol_ero = (
data_in_erosion.apply(interpol_integrate, axis=0, **{'dx':dx}) * transect_spacing
).sum()
net_vol_change = tot_vol_depo + tot_vol_ero
location_m3_m = net_vol_change / beach_length
if bool(skip_details) == False:
df_dict = {
"location": loc,
"location_full": full_loc,
"dt": dt,
"date_from": date_from,
"date_to": date_to,
"n_days": n_days,
"abs_in": abs_in,
"abs_out": abs_out,
"abs_net_change": abs_net_change,
"mec_m": mec_m,
"norm_in": norm_in,
"norm_out": norm_out,
"norm_net_change": norm_net_change,
"tot_vol_depo": tot_vol_depo,
"tot_vol_ero": tot_vol_ero,
"net_vol_change": tot_vol_depo - abs(tot_vol_ero),
"location_m3_m": location_m3_m,
"n_obs_valid": n_obs_valid,
}
else:
df_dict = {
"location": loc,
"dt": dt,
"abs_in": abs_in,
"abs_out": abs_out,
"abs_net_change": abs_net_change,
"mec_m": mec_m,
"norm_in": norm_in,
"norm_out": norm_out,
"norm_net_change": norm_net_change,
"tot_vol_depo": tot_vol_depo,
"tot_vol_ero": tot_vol_ero,
"net_vol_change": tot_vol_depo - abs(tot_vol_ero),
"location_m3_m": location_m3_m,
"n_obs_valid": n_obs_valid,
}
df = pd.DataFrame(df_dict, index=[0])
tr_df_full = pd.concat([df, tr_df_full], ignore_index=True)
return tr_df_full
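A hedged usage sketch; the dh table path is a placeholder for the multitemporal elevation-change table produced earlier in the workflow, and a single LoD value of 0.05 m is assumed:

```python
import pandas as pd
from sandpyper.common import get_state_vol_table, get_transects_vol_table

sand_pts = pd.read_csv(r"C:\data\dh_table.csv")  # hypothetical multitemporal dh table

# site-level volumetrics with a 0.05 m LoD, 1 m point spacing and 20 m transect spacing
site_vol = get_state_vol_table(sand_pts, lod=0.05, full_specs_table=False,
                               dx=1, transect_spacing=20)

# transect-level companion table (documented below)
tr_vol = get_transects_vol_table(sand_pts, lod=0.05, dx=1, full_specs_table=False,
                                 transect_spacing=20)
```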
get_terrain_info(x_coord, y_coord, rdarray)
¶
Returns the value of the rdarray rasters.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
x_coord | float | Projected X coordinates of pixel to extract value. | required |
y_coord | float | Projected Y coordinates of pixel to extract value. | required |
rdarray | rdarray | rdarray dataset. | required |

Returns:

Type | Description |
---|---|
px (int, float) | rdarray pixel value. |
Source code in sandpyper\common.py
def get_terrain_info(x_coord, y_coord, rdarray):
"""
Returns the value of the rdarray rasters.
Args:
x_coord (float): Projected X coordinates of pixel to extract value.
y_coord (float): Projected Y coordinates of pixel to extract value.
rdarray (rdarray): rdarray dataset.
Returns:
px (int, float): rdarray pixel value.
"""
geotransform = rdarray.geotransform
xOrigin = geotransform[0] # top-left X
yOrigin = geotransform[3] # top-left y
pixelWidth = geotransform[1] # horizontal pixel resolution
pixelHeight = geotransform[5] # vertical pixel resolution
px = int((x_coord - xOrigin) / pixelWidth) # transform geographic to image coords
py = int((y_coord - yOrigin) / pixelHeight) # transform geographic to image coords
try:
return rdarray[py, px]
except BaseException:
return np.nan
get_transects_vol_table(sand_pts, lod, dx, full_specs_table, transect_spacing=20, outliers=False, sigma_n=3)
¶
Function to compute transect-level altimetric change statistics from the multitemporal table. By default, only sand points beyond LoD are accounted for. Optionally, the LoD filter can be turned off. The table contains info on:

- monitoring period: location code, full name, period code, transect ID, dates, number of days and valid points
- absolute altimetric change: total profile rising, lowering and net change
- normalised altimetric change: meters of rising, lowering and net elevation change per valid survey point (MEC)
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sand_pts | pd.DataFrame | Multitemporal dh table. | required |
lod | str, float, False | If valid path to an LoD table, use the table. If a value is provided, use the value across all surveys. If False, do not apply LoD filter. All elevation change (dh) values within LoD will be set to zero. | required |
dx | int,float | The along-transect point spacing. | required |
full_specs_table | str, pd.DataFrame, bool | Full path to the table with extended monitoring info or the table itself. If False, monitoring period information is limited. | required |
outliers | bool | when True, use the specified number of standard deviations to exclude outliers. If False, retain all the points. | False |
sigma_n | int | number of standard deviations to use to exclude outliers (default=3). | 3 |

Returns:

Type | Description |
---|---|
volumetrics_transects (pd.DataFrame) | A dataframe storing altimetric beach change info at the transect level, and other information for every period and location. |
Source code in sandpyper\common.py
def get_transects_vol_table(
sand_pts,
lod,
dx,
full_specs_table,
transect_spacing=20, # INTEGRATED
outliers=False,
sigma_n=3,
):
"""
Function to compute transect-level altimetric change statistics from the multitemporal table.
By default, only sand points beyond LoD are accounted for. Optionally, LoD filter can be turned off.
The table contains info on:
- monitoring period: location code, full name, period code, transect ID, dates, number of days and valid points
- absolute altimetric change: total profile rising, lowering and net change
- normalised altimetric change: meters of rising, lowering and net elevation change per valid survey point (MEC)
Args:
sand_pts (pd.DataFrame): Multitemporal dh table.
lod (str, float, False): If valid path to an LoD table, use the table. If a value is provided, use the value across all surveys. If False, do not apply LoD filter. All elevation change (dh) values within LoD will be set to zero.
dx (int,float): The along-transect point spacing.
full_specs_table (str, pd.DataFrame, bool): Full path to the table with extended monitoring info or the table itself. If False, monitoring period information is limited.
outliers: when True, use the specified number of standard deviation to exclude outliers. If False, retain all the points.
sigma_n (int): number of standard deviation to use to exclude outliers (default=3).
Returns:
volumetrics_transects (pd.DataFrame): A dataframe storing altimetric beach change info at the transect level, and other information for every period and location.
"""
transects_df_full = pd.DataFrame()
locs = sand_pts.location.unique().tolist()
if isinstance(full_specs_table, bool):
if bool(full_specs_table) == False:
skip_details = True
else:
print("Please provide the path to the specs table.")
elif isinstance(full_specs_table, str):
if os.path.isfile(full_specs_table):
table_details = pd.read_csv(full_specs_table)
skip_details = False
else:
raise TypeError("Not a valid path to the .csv file for the specs table.")
elif isinstance(full_specs_table, pd.DataFrame):
table_details = full_specs_table
skip_details = False
else:
raise TypeError("Not a valid path to the .csv file for the specs table.")
for loc in locs:
test_loc = sand_pts.query(f"location == '{loc}'")
for dt in test_loc.dt.unique():
test_dt = test_loc.query(f" dt =='{dt}'")
data_in = prep_heatmap(test_dt, lod, outliers=False, sigma_n=3)
tr_ids = sorted(data_in.columns.values)
if skip_details != True:
specs = table_details.query(f"location=='{loc}' & dt=='{dt}'")
full_loc = specs.loc_full.values[0]
date_from = specs.date_pre.values[0]
date_to = specs.date_post.values[0]
n_days = specs.n_days.values[0]
else:
pass
trs_volumes = data_in.apply(interpol_integrate, **{'dx':dx}, axis=0)
trs_volumes.name = "tot_vol_change"
beach_lengths = data_in.apply(get_beachface_length, axis=0)
tr_df = pd.DataFrame(trs_volumes)
# normalise the volume change computed in the transect by its cross-shore
# length
tr_df["m3_m"] = (
trs_volumes.values * transect_spacing
) / beach_lengths.values
tr_df = tr_df.reset_index()
tr_df.rename({"int_tr": "tr_id"}, axis=1, inplace=True)
tr_df["n_obs_valid"] = data_in.count().values
tr_df["abs_in"] = data_in[data_in > 0].sum().values
tr_df["abs_out"] = data_in[data_in < 0].sum().values
tr_df["abs_net_change"] = data_in.sum().values
tr_df["mec_m"] = tr_df.abs_net_change.values / beach_lengths.values
tr_df["norm_in"] = tr_df.abs_in.values / tr_df.n_obs_valid.values
tr_df["norm_out"] = tr_df.abs_out.values / tr_df.n_obs_valid.values
tr_df["norm_net_change"] = (
tr_df.abs_net_change.values / tr_df.n_obs_valid.values
)
tr_df["dt"] = dt
tr_df["location"] = loc
if skip_details != True:
tr_df["location_full"] = full_loc
tr_df["date_from"] = date_from
tr_df["date_to"] = date_to
tr_df["n_days"] = n_days
else:
pass
transects_df_full = pd.concat([tr_df, transects_df_full], ignore_index=True)
return transects_df_full
getAngle(pt1, pt2)
¶
Helper function to return the angle of two points (pt1 and pt2) coordinates in degrees. Source: http://wikicode.wikidot.com/get-angle-of-line-between-two-points
Source code in sandpyper\common.py
def getAngle(pt1, pt2):
"""Helper function to return the angle of two points (pt1 and pt2) coordinates in degrees.
Source: http://wikicode.wikidot.com/get-angle-of-line-between-two-points"""
x_diff = pt2[0] - pt1[0]
y_diff = pt2[1] - pt1[1]
return math.degrees(math.atan2(y_diff, x_diff))
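For example:

```python
from sandpyper.common import getAngle

print(getAngle((0, 0), (10, 10)))  # 45.0 degrees
```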
getCrs_from_raster_path(ras_path)
¶
Returns the EPSG code of the input raster (geotiff).
Parameters:

Name | Type | Description | Default |
---|---|---|---|
ras_path | str | Path of the raster. | required |

Returns:

Type | Description |
---|---|
epsg_code (int) | EPSG code of the input raster. |
Source code in sandpyper\common.py
def getCrs_from_raster_path(ras_path):
"""Returns the EPSG code of the input raster (geotiff).
Args:
ras_path (str): Path of the raster.
Returns:
epsg_code (int): EPSG code of the input raster.
"""
with ras.open(r"{}".format(ras_path)) as raster:
return raster.crs.to_epsg()
getCrs_from_transect(trs_path)
¶
Returns the EPSG code of the input transect file (geopackage).
Parameters:

Name | Type | Description | Default |
---|---|---|---|
trs_path | str | Path of the transect file. | required |

Returns:

Type | Description |
---|---|
epsg_code (int) | EPSG code of the input transect file. |
Source code in sandpyper\common.py
def getCrs_from_transect(trs_path):
"""Returns the EPSG code of the input transect file (geopackage).
Args:
trs_path (str): Path of the transect file.
Returns:
epsg_code (int): EPSG code of the input transect file.
"""
return gpd.read_file(trs_path).crs
getDate(filename)
¶
Returns the date in raw form (i.e 20180912) from already formatted filenames.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
filename | str | filename (i.e. apo_20180912_dsm_ahd.tiff). | required |

Returns:

Type | Description |
---|---|
str | raw date. |
Source code in sandpyper\common.py
def getDate(filename):
"""
Returns the date in raw form (i.e 20180912) from already formatted filenames.
Args:
filename (str): filename (i.e. apo_20180912_dsm_ahd.tiff).
Returns:
str : raw date.
"""
# get the date out of a file input
num_ditis = re.findall("\\d+", filename)
# now we need to convert each number into integer. int(string) converts string into integer
# we will map int() function onto all elements of numbers list
num_ditis = map(int, num_ditis)
try:
date_ = max(num_ditis)
if len(str(date_)) == 8:
return date_
else:
print(f"Unable to find correct date from input filename. Found: {date_}.")
except BaseException:
raise TypeError(print("No numbers in the input filename."))
return max(num_ditis)
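For example, on a filename already in the Sandpyper naming convention:

```python
from sandpyper.common import getDate

print(getDate("apo_20180912_dsm_ahd.tiff"))  # 20180912
```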
getListOfDate(list_dsm)
¶
Returns a list of raw dates (i.e. 20180912) from a list of formatted filenames.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
list_dsm | list | list of filenames of DSM or orthophoto datasets. | required |

Returns:

Type | Description |
---|---|
list | raw dates. |
Source code in sandpyper\common.py
def getListOfDate(list_dsm):
"""
Returns a list of raw dates (i.e. 20180912) from a list of formatted filenames.
Args:
list_dsm (list): list of filenames of DSM or orthophoto datasets.
Returns:
list : raw dates.
"""
dates = []
for i in list_dsm:
temp = getDate(i)
dates.append(temp)
return dates
getListOfFiles(dirName)
¶
Function to create a list of files from a folder path, including sub folders.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dirName | str | Path of the parent directory. | required |

Returns:

Type | Description |
---|---|
allFiles | list of full paths of all files found. |
Source code in sandpyper\common.py
def getListOfFiles(dirName):
"""
Function to create a list of files from a folder path, including sub folders.
Args:
dirName (str): Path of the parent directory.
Returns:
allFiles : list of full paths of all files found.
"""
# create a list of file and sub directories names in the given directory
listOfFile = os.listdir(dirName)
allFiles = list() # Iterate over all the entries
for entry in listOfFile:
fullPath = os.path.join(dirName, entry) # Create full path
if os.path.isdir(
fullPath
): # If entry is a directory then get the list of files in this directory
allFiles = allFiles + getListOfFiles(fullPath)
else:
allFiles.append(fullPath)
return allFiles
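A hedged sketch combining this helper with getListOfDate (documented above); the folder is a placeholder and is assumed to contain only correctly named DSM/orthophoto files, with no other digit sequences in the paths:

```python
from sandpyper.common import getListOfFiles, getListOfDate

dsm_paths = getListOfFiles(r"C:\data\dsm")  # recursive listing of a hypothetical folder
raw_dates = getListOfDate(dsm_paths)        # e.g. [20180912, 20190405, ...]
```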
getLoc(filename, list_loc_codes)
¶
Function that returns the location code from properly formatted (see documentation) filenames.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
filename | str | filename (i.e. apo_20180912_dsm_ahd.tiff). | required |
list_loc_codes | list | list of strings containing location codes. | required |

Returns:

Type | Description |
---|---|
str | location code. |
Source code in sandpyper\common.py
def getLoc(filename, list_loc_codes):
"""
Function that returns the location code from properly formatted (see documentation) filenames.
Args:
filename (str): filename (i.e. apo_20180912_dsm_ahd.tiff).
list_loc_codes (list): list of strings containing location codes.
Returns:
str : location codes.
"""
return next((x for x in list_loc_codes if x in filename), False)
getPoint1(pt, bearing, dist)
¶
Helper function to return the point coordinates at a determined distance (dist) and bearing from a starting point (pt).
Source code in sandpyper\common.py
def getPoint1(pt, bearing, dist):
"""Helper function to return the point coordinates at a determined distance (dist) and bearing from a starting point (pt)."""
angle = bearing + 90
bearing = math.radians(angle)
x = pt[0] + dist * math.cos(bearing)
y = pt[1] + dist * math.sin(bearing)
return Point(x, y)
grid_from_pts(pts_gdf, width, height, crs, offsets=(0, 0, 0, 0))
¶
Create a georeferenced grid of polygons from points along a line (shoreline). Used to extract tiles (image patches) from rasters.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
pts_gdf | gpd.GeoDataFrame | The geodataframe storing points along a shoreline. | required |
width | int,float | The width of each single tile of the grid, given in the CRS unit (use projected CRS). | required |
height | int,float | The height of each single tile of the grid, given in the CRS unit (use projected CRS). | required |
crs | str | Coordinate Reference System in the dictionary format (example: {'init' :'epsg:4326'}) | required |
offsets | tuple | Offsets in meters (needs projected CRS) from the bounds of the pts_gdf, in the form of (xmin, ymin, xmax, ymax). Default to (0,0,0,0). | (0, 0, 0, 0) |

Returns:

Type | Description |
---|---|
Grid (gpd.GeoDataFrame) | A GeoDataFrame storing polygon grids, with IDs and geometry columns. |
Source code in sandpyper\common.py
def grid_from_pts(pts_gdf, width, height, crs, offsets=(0, 0, 0, 0)):
"""
Create a georeferenced grid of polygons from points along a line (shoreline).
Used to extract tiles (image patches) from rasters.
Args:
pts_gdf (gpd.GeoDataFrame): The geodataframe storing points along a shoreline.
width (int, float): The width of each single tile of the grid, given in the CRS unit (use projected CRS).
height (int,float): The height of each single tile of the grid, given in the CRS unit (use projected CRS).
crs (str): Coordinate Reference System in the dictionary format (example: {'init' :'epsg:4326'})
offsets (tuple): Offsets in meters (needs projected CRS) from the bounds of the pts_gdf, in the form of (xmin, ymin, xmax, ymax). Default to (0,0,0,0).
Returns:
Grid (gpd.GeoDataFrame): A GeoDataFrame storing polygon grids, with IDs and geometry columns.
"""
xmin, ymin, xmax, ymax = tuple(map(operator.add, pts_gdf.total_bounds, offsets))
rows = int(np.ceil((ymax - ymin) / height))
cols = int(np.ceil((xmax - xmin) / width))
XleftOrigin = xmin
XrightOrigin = xmin + width
YtopOrigin = ymax
YbottomOrigin = ymax - height
polygons = []
for i in range(cols):
Ytop = YtopOrigin
Ybottom = YbottomOrigin
for j in range(rows):
polygons.append(
Polygon(
[
(XleftOrigin, Ytop),
(XrightOrigin, Ytop),
(XrightOrigin, Ybottom),
(XleftOrigin, Ybottom),
]
)
)
Ytop = Ytop - height
Ybottom = Ybottom - height
XleftOrigin = XleftOrigin + width
XrightOrigin = XrightOrigin + width
grid = gpd.GeoDataFrame(
{"grid_id": range(len(polygons)), "geometry": polygons}, crs=crs
)
return grid
grid_from_shore(shore, width, height, location_code, adj_order=1, crs='shore', shore_res=10, offsets=(0, 0, 0, 0), plot_it=True)
¶
Create a georeferenced grid of equal polygons (tiles) along a line (shoreline) and select those tiles that contain or are adjacent to the shoreline.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
shore | gpd.GeoDataFrame | The geodataframe storing the input line from where the grid will be created. | required |
width, height | int,float | The width and height of each single tile of the grid, given in the CRS unit (use projected CRS). | required |
location_code | str | The location code associated with the grid. | required |
adj_order | False, int | Contiguity order to subset grid cells adjacent to the shoreline. If False, only cells directly touching the shoreline will be extracted (Default=1). Note: the Pysal Queen method is used to compute neighbors. For more info: https://pysal.org/libpysal/generated/libpysal.weights.Queen.html#libpysal.weights.Queen | 1 |
crs | dict or 'shore' | If 'shore', use the same CRS of the input line. If dict, keys should be the location codes and values the crs in the dictionary format ('wbl' : {'init' :'epsg:32754'}). | 'shore' |
shore_res | int,float | the alongshore spacing of points plotted along the line in the CRS unit (default=10). It doesn't need to be a small value; it is only used to get the extent of the bounding box that encapsulates the shoreline before splitting it into a grid. | 10 |
offsets | tuple | Offsets in meters (needs projected CRS) from the bounds of the pts_gdf, in the form of (xmin, ymin, xmax, ymax). Default to (0,0,0,0). | (0, 0, 0, 0) |
plot_it | bool | plot the shoreline, the full grid and the selected tiles containing the line (in red). Default to True. | True |

Returns:

Type | Description |
---|---|
gpd.GeoDataFrame | Grid of tiles in the specified proximity of the shoreline. |
Source code in sandpyper\common.py
def grid_from_shore(
shore,
width,
height,
location_code,
adj_order=1,
crs="shore",
shore_res=10,
offsets=(0, 0, 0, 0),
plot_it=True,
):
"""
Create a georeferenced grid of equal polygons (tiles) along a line (shoreline) and select those tiles that contain or are adjacent to the shoreline.
Args:
shore (geodataframe): The geodataframe storing the input line from where the grid will be created.
width, height (int,float): The width and height of each single tile of the grid, given in the CRS unit (use projected CRS).
location_code (str): The location code associated with the grid.
adj_order (False, int): Contiguity order to subset grid cells adjacent to shoreline. If False, only cells
directly touching the shoreline will be extracted (Default=1). Note: The Pysal Queen method is used to compute neighbors.
For more info: https://pysal.org/libpysal/generated/libpysal.weights.Queen.html#libpysal.weights.Queen
crs (dict or 'shore'): If 'shore', use the same CRS of the input line. If dict, keys should be the location code and values the values crs in the dictionary format ('wbl' : {'init' :'epsg:32754'}).
shore_res (int,float): the alongshore spacing of points plotted along the line in the CRS unit (default=10). It doesn't need to be a small value; it is only used to get the extent of the bounding box that encapsulates the shoreline before splitting it into a grid.
offsets (tuple): Offsets in meters (needs projected CRS) from the bounds of the pts_gdf,
in the form of (xmin, ymin, xmax, ymax). Default to (0,0,0,0).
plot_it (bool): plot the shoreline, the full grid and the selected tiles containing the line (in red). Default to True.
Returns:
gpd.GeoDataFrame: Grid of tiles in the specified proximity of the shoreline.
"""
xs = []
ys = []
points = []
for distance in np.arange(
0, shore.length.values[0], shore_res
): # shore_res: meters alongshroe to get points from shore
pt = shore.interpolate(distance)
points.append(pt.values[0])
xs.append(pt.values[0].x)
ys.append(pt.values[0].y)
pts = [[x, y] for x, y in zip(xs, ys)]
pts = np.array(pts)
if isinstance(crs, dict):
crs_in = crs[location_code]
elif crs == "shore":
crs_in = shore.crs
points_gdf = gpd.GeoDataFrame(
{"local_id": range(len(points)), "geometry": points},
geometry="geometry",
crs=crs_in,
)
grid = grid_from_pts(points_gdf, width, height, crs=crs_in, offsets=offsets)
# select grid cells that contains shoreline points
shore_grids = grid[
grid.geometry.apply(lambda x: points_gdf.geometry.intersects(x).any())
]
if adj_order != False:
w = Queen.from_dataframe(grid, "geometry")
if adj_order >= 1:
print(f"Higher order ({adj_order}) selected.")
w = higher_order(w, adj_order)
df_adjc = w.to_adjlist()
# get the unique neighbors of all focal cells
qee_polys_ids = set(
df_adjc.query(f"focal in {list(shore_grids.grid_id)}").neighbor
)
# subset grid based on qee_polys_ids
shore_grids = grid.query(f"grid_id in {list(qee_polys_ids)}")
else:
pass
if bool(plot_it):
f, ax = plt.subplots(figsize=(10, 10))
grid.plot(color="white", edgecolor="black", ax=ax)
shore.plot(ax=ax, color="b")
shore_grids.geometry.boundary.plot(
color=None, edgecolor="r", linewidth=1, ax=ax
)
add_grid_loc_coords(shore_grids, location=location_code)
return shore_grids
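A hedged usage sketch; the shoreline geopackage is a placeholder and must contain a single line in a projected CRS:

```python
import geopandas as gpd
from sandpyper.common import grid_from_shore

shore = gpd.read_file(r"C:\data\leo_shoreline.gpkg")  # hypothetical shoreline

# 50 m x 50 m tiles touching the shoreline, plus their first-order neighbours
tiles = grid_from_shore(shore, width=50, height=50, location_code="leo",
                        adj_order=1, crs="shore", plot_it=True)
```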
images_to_dirs(images_folder, target_folder, op=None)
¶
Create one folder per image, named as the image. Optionally, move or copy each image into it.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
images_folder | str | path to the directory where the images are stored. | required |
target_folder | str | target path where the subfolders named as the images are created. | required |
op | None, "copy", "move" | Move or copy the images into the newly created folders. None creates empty subfolders. | None |
Source code in sandpyper\common.py
def images_to_dirs(images_folder, target_folder, op=None):
"""Create one folder per image named as the image. Optionally, move image into it.
Args:
images_folder (str): path to the directory where the images are stored.
target_folder (str): target path where create subfolders named as the images.
op (None, "copy", "move"): Move or copy the images into the newly created folders. None, creates empty subfolders.
"""
images = os.listdir(images_folder)
original_images_paths = [os.path.join(images_folder, image) for image in images]
ids = [os.path.splitext(image)[0] for image in images]
if op != None:
target_images_paths = [os.path.join(target_folder, id_in) for id_in in ids]
# create ID-folders in target_parent_folder, named as image names
starting_wd = os.getcwd() # store starting working directory
os.chdir(target_folder) # change working dir
if op == None:
for id_i in ids:
if not os.path.exists(os.path.join(target_folder, id_i)):
os.mkdir(id_i)
else:
print(f"{id_i} exists already. Skipping this one.")
elif op != None:
for id_i, source, destination in zip(
ids, original_images_paths, target_images_paths
):
if not os.path.exists(os.path.join(target_folder, id_i)):
os.mkdir(id_i)
if op == "move":
move(source, destination)
elif op == "copy":
copy(source, destination)
else:
raise ValueError(
"op parameter not set. 'move' to move images into newly created folders, 'copy' to copy them or None to only create folders."
)
else:
print(f"{id_i} exists already. {op} only.")
if op == "move":
move(source, destination)
elif op == "copy":
copy(source, destination)
else:
raise ValueError(
"op parameter not set. 'move' to move images into newly created folders, 'copy' to copy them or None to only create folders."
)
os.chdir(starting_wd) # returning to starting working dir
print(f"Successfully created {len(ids)} ID-folders in {target_folder} .")
interpol_integrate(series, dx)
¶
Linearly interpolate NaN values (non-sand) within the first and last valid points (from the swash to the landward end of each transect), and integrate the area below this interpolated profile, to obtain transect-specific estimates of volumetric change.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
series |
pd.Series |
Series of elevation change with distance as indices. |
required |
dx |
int,float |
The along-transect point spacing. |
required |
Returns:
Type | Description |
---|---|
integrated_area (float) |
Volumetric change in cubic meters. |
Source code in sandpyper\common.py
def interpol_integrate(series, dx):
"""
Linearly interpolate NaN values (non-sand) within the first and last valid points (from the swash to the landward end of each transect),
and integrate the area below this interpolated profile, to obtain transect-specific estimates of volumetric change.
Args:
series (pd.Series): Series of elevation change with distance as indices.
dx (int,float): The along-transect point spacing.
Returns:
integrated_area (float): Volumetric change in cubic meters.
"""
min_dist, max_dist = (
series.first_valid_index(),
series.last_valid_index(),
) # get distances of first and last sand points
interpol = series.loc[min_dist:max_dist].interpolate() # interpolate linearly
area_simps = simps(interpol.values, dx=dx) # integrate using Simpson's rule
return area_simps
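A small worked example, assuming a 1 m along-transect spacing; the two NaNs (non-sand points) are linearly interpolated before the Simpson integration:
import numpy as np
import pandas as pd

# elevation change (m) indexed by along-transect distance (m)
dh = pd.Series([0.2, np.nan, 0.4, np.nan, 0.2], index=[0, 1, 2, 3, 4])
area = interpol_integrate(dh, dx=1)  # NaNs filled to [0.2, 0.3, 0.4, 0.3, 0.2]
print(area)  # ≈ 1.2 (cross-sectional change integrated along the transect)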
kmeans_sa(merged_df, ks, feature_set, thresh_k=5, random_state=10)
¶
Function to use KMeans on all surveys with the optimal k obtained from the Silhouette Analysis. It uses KMeans as a clusterer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
merged_df |
Pandas dataframe |
The clean and merged dataframe containing the features. Must contain the columns point_id, location and survey_date, as well as the features listed in feature_set. |
required |
ks |
int, dict |
number of clusters (k) or dictionary containing the optimal k for each survey. See get_opt_k function. |
required |
feature_set |
list |
List of names of features in the dataframe to use for clustering. |
required |
thresh_k |
int |
Minimum k to be used. If a survey-specific optimal k is below this value, k is set to the average of all above-threshold ks. |
5 |
random_state |
int |
Random seed used to make the randomisation deterministic. |
10 |
Returns:
Type | Description |
---|---|
data_classified (pd.DataFrame) |
A dataframe containing the label_k column, with point_id, location, survey_date and the features used to cluster the data. |
Source code in sandpyper\common.py
def kmeans_sa(merged_df, ks, feature_set, thresh_k=5, random_state=10):
"""
Function to use KMeans on all surveys with the optimal k obtained from the Silhouette Analysis.
It uses KMeans as a clusterer.
Args:
merged_df (Pandas dataframe): The clean and merged dataframe containing the features. Must contain the columns point_id, location and survey_date, as well as the features listed in feature_set.
ks (int, dict): number of clusters (k) or dictionary containing the optimal k for each survey. See get_opt_k function.
feature_set (list): List of names of features in the dataframe to use for clustering.
thresh_k (int): Minimum k to be used. If a survey-specific optimal k is below this value, k is set to the average of all above-threshold ks.
random_state (int): Random seed used to make the randomisation deterministic.
Returns:
data_classified (pd.DataFrame): A dataframe containing the label_k column, with point_id, location, survey_date and the features used to cluster the data.
"""
merged_df.dropna(inplace=True)
list_locs = merged_df.location.unique()
scaler = MinMaxScaler()
data_classified = pd.DataFrame()
# Set a threshold k: if a survey-specific optimal k is lower than thresh_k,
# use the mean optimal k of the other surveys that are above the threshold.
# Compute the mean optimal k of the above-threshold ks.
if isinstance(ks, dict):
arr_k = np.array([i for i in ks.values() if i > thresh_k])
mean_threshold_k = int(np.round(np.mean(arr_k), 0))
else:
pass
for location in tqdm(list_locs):
list_dates = merged_df.query(f"location=='{location}'").raw_date.unique()
for survey_date_in in tqdm(list_dates):
data_in = merged_df.query(
f"location=='{location}'& raw_date == {survey_date_in}"
)
data_clean = data_in[feature_set].apply(pd.to_numeric)
if isinstance(ks, dict):
k = ks[f"{location}_{survey_date_in}"]
else:
k=ks
if k < thresh_k:
k = mean_threshold_k
else:
pass
minmax_scaled_df = scaler.fit_transform(np.nan_to_num(data_clean))
clusterer = KMeans(
n_clusters=k,
init="k-means++",
algorithm="elkan",
tol=0.0001,
random_state=random_state,
)
data_in["label_k"] = clusterer.fit_predict(minmax_scaled_df)
data_classified = pd.concat(
[data_in, data_classified], ignore_index=True
)
return data_classified
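A minimal call sketch. The dataframe merged_df, the feature names and the optimal-k dictionary below are placeholders (the latter would normally come from the get_opt_k function):
feature_set = ["band1", "band2", "band3", "distance"]   # illustrative feature names
opt_k = {"leo_20180606": 7, "leo_20180713": 4}           # survey-specific optimal k

labelled = kmeans_sa(
    merged_df=merged_df,   # must contain point_id, location, raw_date and the features
    ks=opt_k,              # or a single int to use the same k for every survey
    feature_set=feature_set,
    thresh_k=5,            # surveys whose optimal k is below 5 fall back to the mean k
)
labelled.label_k.value_counts()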
LISA_site_level(dh_df, crs_dict_string, mode, distance_value=None, decay=None, k_value=None, geometry_column='geometry')
¶
Performs Hot-Spot analysis using Local Moran's I as LISA for all the surveys. Please refer to PySAL package documentation for more info.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dh_df |
pd.DataFrame, str |
Pandas dataframe or local path of the timeseries files, as returned by the multitemporal extraction. |
required |
crs_dict_string |
dict |
Dictionary storing location codes as key and crs information as EPSG code (int). |
required |
geometry_column |
str |
field storing the geometry column. If in string form (as loaded from a csv), it will be converted to Point objects. Default='geometry'. |
'geometry' |
mode |
str |
If 'distance', compute the spatial weight matrix using a distance-band kernel with the threshold specified in the distance_value parameter. If 'knn', the spatial weight matrix uses the specified number (k_value parameter) of closest points to compute weights. If 'idw', Inverse Distance Weighting with the specified decay power (decay parameter) is used to compute weights. |
required |
distance_value |
int |
value in meters (crs must be projected) used as the distance band for neighbour definition in the distance weight matrix computation. |
None |
decay |
int |
power of decay to use with IDW. |
None |
k_value |
int |
number of closest points for neighbour definition in the distance weight matrix computation. |
None |
Returns:
Type | Description |
---|---|
lisa_df (pd.DataFrame) |
Dataframe with the FDR threshold, local Moran's I, p and z values and the quadrant in which each observation falls in a Moran's scatter plot. |
Source code in sandpyper\common.py
def LISA_site_level(
dh_df,
crs_dict_string,
mode,
distance_value=None,
decay=None,
k_value=None,
geometry_column='geometry'):
"""Performs Hot-Spot analysis using Local Moran's I as LISA for all the survey.
Please refer to PySAL package documentation for more info.
Args:
dh_df (pd.DataFrame, str): Pandas dataframe or local path of the timeseries files, as returned by the multitemporal extraction.
crs_dict_string (dict): Dictionary storing location codes as key and crs information as EPSG code (int).
geometry_column (str): field storing the geometry column. If in string form (as loaded from a csv), it will be converted to Point objects. Default='geometry'.
mode (str): If 'distance', compute the spatial weight matrix using a distance-band kernel with the threshold specified in the distance_value parameter.
If 'knn', the spatial weight matrix uses the specified number (k_value parameter) of closest points to compute weights.
If 'idw', Inverse Distance Weighting with the specified decay power (decay parameter) is used to compute weights.
distance_value (int): value in meters (crs must be projected) used as the distance band for neighbour definition in the distance weight matrix computation.
decay (int): power of decay to use with IDW.
k_value (int): number of closest points for neighbour definition in the distance weight matrix computation.
Returns:
lisa_df (pd.DataFrame): Dataframe with the FDR threshold, local Moran's I, p and z values and the quadrant in which each observation falls in a Moran's scatter plot.
"""
if isinstance(dh_df, str):
if os.path.isfile(dh_df):
df = pd.read_csv(dh_df)
else:
raise NameError ("The string provided in dh_df is not a valid path.")
elif isinstance(dh_df, pd.DataFrame):
df=dh_df
else:
raise NameError ("dh_df parameter must be either a valid path pointing to a multitemporal dataframe or the dataframe itself as a Pandas Dataframe.")
lisa_df = pd.DataFrame()
locs = df.location.unique() # obtain list of locations
# check whether a geometry type column is present
if not isinstance((df.loc[0,geometry_column]), Point):
df[geometry_column] = df[geometry_column].apply(wkt.loads)
else:
pass
for loc in tqdm(locs):
print(f"Working on {loc}")
df_in = df.query(f"location=='{loc}'") # subset a location
# create a GeoDataFrame with the right CRS
gdf = gpd.GeoDataFrame(df_in, geometry=geometry_column, crs=crs_dict_string[loc] )
dts = gdf.dt.unique() # obtain list of periods
for dt in tqdm(dts):
gdf_input = gdf.query(f"dt=='{dt}'") # subset a periods
gdf_input.dropna(axis=0, how="any", subset=["dh"], inplace=True)
# drop rows where dh is null, due to sand-only condition
if mode == "distance":
dist = distance_value
dist_mode = "distance_band"
decay = 0
dist_w = weights.DistanceBand.from_dataframe(
df=gdf_input, threshold=dist, binary=True
)
# create a binary spatial weight matrix with no IDW and specified
# distance
if mode == "idw":
dist = distance_value
dist_mode = "idw"
decay = decay
dist_w = weights.DistanceBand.from_dataframe(
df=gdf_input, threshold=dist, binary=False, alpha=decay
)
# create a non-binary spatial weight matrix with IDW decay and the specified
# distance
elif mode == "knn":
dist_mode = "k"
decay = 0
dist = k_value
dist_w = weights.distance.KNN.from_dataframe(df=gdf_input, k=int(k_value))
lisa_ = moran.Moran_Local(gdf_input.dh, dist_w, permutations=999)
fdr_lisa = fdr(lisa_.p_sim) # as k
# the False Discovery Rate threshold to use for significant cluster
gdf_input["lisa_fdr"] = fdr_lisa
# the quadrant of the Moran's scatter plot (Anselin 1995) in Pysal scheme
gdf_input["lisa_q"] = lisa_.q
gdf_input["lisa_I"] = lisa_.Is # the local Moran's Is
# the number of valid observations used
gdf_input["lisa_n_val_obs"] = lisa_.n
gdf_input["lisa_opt_dist"] = dist # the distance used
gdf_input["lisa_dist_mode"] = dist_mode # mode, k od distance
gdf_input["lisa_p_sim"] = lisa_.p_sim # permutations (999) based p-value
gdf_input["lisa_z_sim"] = lisa_.z_sim # permutations (999) based z-value
gdf_input["lisa_z"] = lisa_.z
# z-value of the original data I distribution (no permutation)
gdf_input["decay"] = decay
lisa_df = pd.concat([lisa_df, gdf_input], ignore_index=True)
return lisa_df
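A minimal call sketch; the CSV path, the location code and the 35 m distance band are placeholders:
crs_dict_string = {"leo": 32754}  # location code -> EPSG code (projected CRS)

lisa_df = LISA_site_level(
    dh_df="C:/outputs/multitemporal_table.csv",  # or pass the dataframe directly
    crs_dict_string=crs_dict_string,
    mode="distance",      # distance-band spatial weights
    distance_value=35,    # neighbours within 35 m
)
# one possible way to flag significant clusters: keep points whose pseudo p-value
# is at or below the FDR-corrected threshold computed for that survey
significant = lisa_df[lisa_df.lisa_p_sim <= lisa_df.lisa_fdr]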
plot_alongshore_change(sand_pts, mode, lod, full_specs_table, return_data=False, location_subset=['wbl'], dt_subset=['dt_0'], ax2_y_lims=(-8, 5), save=False, save_path='C:\\your\\preferred\\folder\\', dpi=300, img_type='.png', from_land=True, from_origin=True, add_orient=False, fig_size=(7.3, 3), font_scale=1, plots_spacing=0, bottom=False, y_heat_bottom_limit=80, transect_spacing=20, along_transect_sampling_step=1, heat_yticklabels_freq=5, heat_xticklabels_freq=5, outliers=False, sigma_n=3)
¶
Display and optionally save alongshore altimetric and volumetric beach change plots. A subset of locations and periods can be plotted. If an LoD filter is applied, white cells in the altimetric heatmap are values within the LoD, while grey cells are no-data or non-sand points. Optionally, the LoD filter can be turned off.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sand_pts |
Pandas dataframe |
multitemporal table. |
required |
mode |
str |
if 'subset', only a subset of locations and dts are plotted. If 'all', all periods and locations are plotted. |
required |
lod |
path, value, False |
if valid path to an LoD table, use the table. If a value is provided, use the value across all surveys. If False, do not apply LoD filter. All elevation change (dh) values within LoD will be set to zero. |
required |
full_specs_table |
False, path |
Full path to the table with extended monitoring info. If False, monitoring period information are limited. |
required |
location_subset |
list |
list of strings containing the location codes (e.g. wbl) to be plotted. |
['wbl'] |
dt_subset |
list |
list of strings containing the period codes (e.g. dt_0) to be plotted. |
['dt_0'] |
ax2_y_lims |
tuple |
limits of the y-axis of the alongshore volumetric change plot. Default is (-8,5). |
(-8, 5) |
save |
bool |
If True, saves the plots in the specified save_path. False is default. |
False |
save_path |
path |
Full path to a folder (e.g. C:\preferred\folder) where to save plots. |
'C:\\your\\preferred\\folder\\' |
dpi |
int |
Resolution in Dot Per Inch (DPI) to save the images. |
300 |
img_type |
str |
'.png','.pdf', '.ps', '.svg'. Format of the saved figures. |
'.png' |
from_land |
bool |
If True (default), cross-shore distances are transformed into landward distances, where 0 is the end of beachface. |
True |
from_origin |
bool |
If True (default), transect IDS are transformed in alongshore distance from origin (tr_id=0). It requires regularly spaced transects. |
True |
add_orient |
bool |
if True, an additional lineplot is added to the volumetric plot containing orientation info. It needs pre-computed orientations (tr_orient parameter) (TO UPDAte). False is default. |
False |
fig_size |
tuple |
Tuple of float to specify images size. Default is (7.3,3). |
(7.3, 3) |
font_scale |
float |
Scale of text. Default=1. |
1 |
plots_spacing |
float |
Vertical spacing of the heatmap and alongshore change plots. Default = 0. |
0 |
bottom |
bool |
If True, rows are extended seaward too, up to y_heat_bottom_limit. If False (default), only distances from 0 to the first valid values are added. |
False |
y_heat_bottom_limit |
int |
Lower boundary distance (seaward) to extend all transects to. |
80 |
transect_spacing |
float |
Alongshore spacing of transects (m). |
20 |
outliers |
bool |
when True, use the specified number of standard deviation to exclude outliers. If False, retain all the points. |
False |
sigma_n |
int |
number of standard deviation to use to exclude outliers (default=3). |
3 |
Source code in sandpyper\common.py
def plot_alongshore_change(
sand_pts,
mode,
lod,
full_specs_table,
return_data=False,
location_subset=["wbl"],
dt_subset=["dt_0"],
ax2_y_lims=(-8, 5),
save=False,
save_path="C:\\your\\preferred\\folder\\",
dpi=300,
img_type=".png",
from_land=True,
from_origin=True,
add_orient=False,
fig_size=(7.3, 3),
font_scale=1,
plots_spacing=0,
bottom=False,
y_heat_bottom_limit=80,
transect_spacing=20,
along_transect_sampling_step=1,
heat_yticklabels_freq=5,
heat_xticklabels_freq=5,
outliers=False,
sigma_n=3,
):
"""
Display and optionally save alongshore altimetric and volumetric beach change plots.
A subset of locations and periods can be plotted.
If an LoD filter is applied, white cells in the altimetric heatmap are values within the LoD, while grey cells are no-data or non-sand points.
Optionally, the LoD filter can be turned off.
Args:
sand_pts (Pandas dataframe): multitemporal table.
mode (str): if 'subset', only a subset of locations and dts are plotted. If 'all', all periods and locations are plotted.
lod (path, value, False): if valid path to an LoD table, use the table. If a value is provided, use the value across all surveys. If False, do not apply LoD filter. All elevation change (dh) values within LoD will be set to zero.
full_specs_table (False, path): Full path to the table with extended monitoring info. If False, monitoring period information are limited.
location_subset (list): list of strings containing the location codes (e.g. wbl) to be plotted.
dt_subset (list): list of strings containing the period codes (e.g. dt_0) to be plotted.
ax2_y_lims (tuple): limits of the y-axis of the alongshore volumetric change plot. Default is (-8,5).
save (bool): If True, saves the plots in the specified save_path. False is default.
save_path (path): Full path to a folder (e.g. C:\\preferred\\folder\\) where to save plots.
dpi (int): Resolution in Dot Per Inch (DPI) to save the images.
img_type (str): '.png','.pdf', '.ps', '.svg'. Format of the saved figures.
from_land (bool): If True (default), cross-shore distances are transformed into landward distances, where 0 is the end of beachface.
from_origin (bool): If True (default), transect IDS are transformed in alongshore distance from origin (tr_id=0). It requires regularly spaced transects.
add_orient (bool): if True, an additional lineplot is added to the volumetric plot containing orientation info. It needs pre-computed orientations (tr_orient parameter) (TO UPDAte). False is default.
fig_size (tuple): Tuple of float to specify images size. Default is (7.3,3).
font_scale (float): Scale of text. Default=1.
plots_spacing (float): Vertical spacing of the heatmap and alongshore change plots. Default = 0.
bottom (bool): If True, rows are extended seaward too, up to y_heat_bottom_limit. If False (default), only distances from 0 to the first valid values are added.
y_heat_bottom_limit (int): Lower boundary distance (seaward) to extend all transects to.
transect_spacing (float): Alongshore spacing of transects (m).
outliers (bool): when True, use the specified number of standard deviation to exclude outliers. If False, retain all the points.
sigma_n (int): number of standard deviation to use to exclude outliers (default=3).
"""
sb.set_context("paper", font_scale=font_scale, rc={"lines.linewidth": 0.8})
if isinstance(full_specs_table, bool):
if bool(full_specs_table) == False:
skip_details = True
else:
raise TypeError(
"Not a DataFrame, nor a valid path to the .csv file for the specs table."
)
elif isinstance(full_specs_table, str):
if os.path.isfile(full_specs_table):
table_details = pd.read_csv(full_specs_table)
skip_details = False
else:
TypeError(
"The path provided in full_spec_table is not a valid path to the .csv file."
)
elif isinstance(full_specs_table, pd.DataFrame):
table_details = full_specs_table
skip_details = False
else:
raise TypeError(
"Not a DataFrame, nor a valid path to the .csv file for the specs table."
)
land_limits = pd.DataFrame(
sand_pts.groupby(["location"]).distance.max()
).reset_index()
locations_to_analyse = sand_pts.location.unique()
if bool(from_origin):
xlabel = ""
else:
xlabel = "Transect ID"
if mode == "subset":
locations_to_analyse = location_subset
dt_to_analyse = dt_subset
elif mode == "all":
locations_to_analyse = sand_pts.location.unique()
dt_to_analyse = sand_pts.query(f"location == '{loc}'").dt.unique()
for loc in locations_to_analyse:
temp_loc = sand_pts.query(f"location == '{loc}'")
for dt in dt_to_analyse:
# subset the data
temp = sand_pts.query(f"location == '{loc}' and dt =='{dt}'")
# prepare axes and figure
f, (ax, ax2) = plt.subplots(
nrows=2,
figsize=fig_size,
sharex=True,
gridspec_kw={"hspace": plots_spacing},
)
# _____________data_preparation______________
# compute beach length based on number of transects
beach_length = len(temp.tr_id.unique()) * transect_spacing
# prepare the data to be suitable for the heatmap
data_in = prep_heatmap(temp, lod=lod, outliers=outliers, sigma_n=sigma_n)
if bool(skip_details):
full_loc = loc
lod_i = 0.05
else:
specs = table_details.query(f"location=='{loc}' & dt=='{dt}'")
full_loc = specs.loc_full.values[0]
date_from = specs.date_pre.values[0]
date_to = specs.date_post.values[0]
n_days = specs.n_days.values[0]
if isinstance(lod, pd.DataFrame):
lod_i = np.round(lod.query(f"location == '{loc}' & dt == '{dt}'").lod.values[0],2)
elif isinstance(lod, (float, int)):
lod_i = lod
else:
lod_i = 0.05
# FIGURE_______________________________________________________________________
#
axins = ax.inset_axes(bounds=[1.02, 0, 0.04, 1])
if bool(from_land):
land_lim = land_limits.query(f"location=='{loc}'").distance.values[0]
data_in["m_from_land"] = np.round(land_lim - data_in.index, 1)
data_in.set_index("m_from_land", inplace=True)
else:
pass
if bool(from_origin):
data_in.columns = data_in.columns.astype("int") * transect_spacing
else:
pass
data_in_filled = fill_gaps(data_in, y_heat_bottom_limit, bottom=bottom, spacing=along_transect_sampling_step)
print(f"Working on {loc} at {dt}")
# _______AX__________________________________________________________________
sb.heatmap(
data=data_in_filled,
xticklabels=heat_xticklabels_freq,
facecolor="w",
robust=True,
center=0,
ax=ax,
cbar_kws={"label": "Δh m AHD"},
cbar=True,
cbar_ax=axins,
cmap="seismic_r",
vmin=-0.8,
vmax=0.8,
)
# _________________________________BACKGROUND COLOR AND TRANSPARENCY_____
ax.patch.set_facecolor("grey")
ax.patch.set_alpha(0.4)
# _________________________________AXIS LABELS_____
ax.set_xlabel("")
ax.set_ylabel("Cross. distance (m)")
ax.set_title(f"")
# __________________________________SPINES,TICKS_AND_GRIDS_________________________________
ax.get_xaxis().set_visible(True)
ax.spines["bottom"].set_visible(True)
ax.spines["left"].set_visible(True)
ax.spines["right"].set_visible(True)
ax.spines["top"].set_visible(True)
ax.grid(b=True, which="minor", linewidth=0.3, ls="-")
ax.grid(b=True, which="major", linewidth=0.8, ls="-")
tmp_list_along_dists = np.arange(0, y_heat_bottom_limit + 5, heat_yticklabels_freq)
y_formatter_list_values = tmp_list_along_dists.astype("str").tolist()
y_formatter_list_values[-1] = ""
y_formatter_list_locators = [value * (1/along_transect_sampling_step) for value in tmp_list_along_dists]
y_formatter = FixedFormatter(y_formatter_list_values)
y_locator = FixedLocator(y_formatter_list_locators)
ax.yaxis.set_major_formatter(y_formatter)
ax.yaxis.set_major_locator(y_locator)
ax.yaxis.set_minor_locator(AutoMinorLocator(4))
ax.grid(which="minor", axis="y")
ax.set_ylim(y_heat_bottom_limit * (1/along_transect_sampling_step), 0)
ax.tick_params(axis="x", which="both", length=0)
# __AX2___________________________________________________________________
red_patch = mpatches.Patch(color="orange", label="Erosion")
blue_patch = mpatches.Patch(color="skyblue", label="Deposition")
trs_volumes = data_in.apply(interpol_integrate, axis=0, **{'dx':along_transect_sampling_step})
beach_lengths = data_in.apply(get_beachface_length, axis=0)
m3_m = (trs_volumes * transect_spacing) / beach_lengths
trs_volumes.reset_index().rename({"int_tr": "tr_id"}, axis=1, inplace=True)
trs_volumes.name = "dh"
tr_df = pd.DataFrame(trs_volumes)
tr_df.reset_index(inplace=True)
tr_df["net_volume_change_m3"] = tr_df.dh
tr_df["net_balance_m3_m"] = m3_m.values
tr_df.set_index("int_tr")
tr_df.reset_index(inplace=True)
sb.lineplot(data=tr_df, x="index", y="net_balance_m3_m", ax=ax2, color="k")
ax2.set_ylim(ax2_y_lims)
ax2.yaxis.set_minor_locator(AutoMinorLocator(4))
# FILLS________________________________________
line_x, line_y = ax2.lines[0].get_data()
ax2.fill_between(
line_x,
line_y,
where=(line_y > 0),
interpolate=True,
color="skyblue",
alpha=0.3,
)
ax2.fill_between(
line_x,
line_y,
where=(line_y < 0),
interpolate=True,
color="orange",
alpha=0.3,
)
ax2.grid(b=True, which="minor", linewidth=0.5, ls="-")
ax2.grid(b=True, which="major", linewidth=0.8, ls="-")
ax2.set_xlabel("Along. distance (m)")
ax2.set_ylabel("Net Δv (m³/m)")
ax2.axhline(0, color="k", ls="--", zorder=1)
ax2.set_title(f"")
ax2.spines["top"].set_visible(False)
ax2.spines["right"].set_visible(True)
ax2.spines["left"].set_visible(True)
ax2.spines["bottom"].set_visible(True)
ax2.grid(which="minor", axis="y")
if bool(skip_details) == False:
date_from_str = pd.to_datetime(str(date_from)).strftime("%d %b '%y")
date_to_str = pd.to_datetime(str(date_to)).strftime("%d %b '%y")
f.suptitle(
f"Beachface change in {full_loc} from {date_from_str} to {date_to_str} (LoD = {lod_i} m)"
)
else:
f.suptitle(f"Beachface change in {loc} for {dt} (LoD = {lod_i} m)")
axs = f.get_axes()
f.align_ylabels(axs[:-1])
ax.set_zorder(1)
ax2.set_zorder(0)
if bool(save):
savetxt = save_path + loc + "_" + dt + "_AlongChange" + img_type
f.savefig(savetxt, dpi=dpi, bbox_inches="tight")
else:
pass
plt.show()
if return_data:
return data_in_filled
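A minimal call sketch; ds stands for a multitemporal table already loaded in memory, and the location and period codes are placeholders:
plot_alongshore_change(
    sand_pts=ds,
    mode="subset",
    lod=0.05,                 # one LoD value (m) applied to every survey
    full_specs_table=False,   # keep the plot titles generic
    location_subset=["leo"],
    dt_subset=["dt_0"],
    transect_spacing=20,
    save=False,
)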
plot_sensitivity_rbcds_transects(df, location, x_ticks=[0, 2, 4, 6, 8], figsize=(7, 4), tresh_xlims=(0, 8), trans_ylims=(0, 3), sign_ylims=(0, 10))
¶
Plot both the number of valid transects retained (red) and the total sign changes (blue) as a function of the threshold used. This function creates a plot per min_points situation. The solid black line is the 95th percentile of the total valid transect retained while the dashed one is the 85th.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
pd.DataFrame |
Sensitivity dataframe resulting from using the function sensitivity_tr_rbcd(). |
required |
location |
str |
Location code of the location to plot. |
required |
x_ticks |
list |
List of x-axis ticks (thresholds). |
[0, 2, 4, 6, 8] |
figsize |
tuple |
Width and height (in inches) of the figure. |
(7, 4) |
tresh_xlims |
tuple |
Min x and max x of the thresholds x-axis. |
(0, 8) |
trans_ylims |
tuple |
Min y and max y of the transect main y-axis. |
(0, 3) |
sign_ylims |
tuple |
Min y and max y of the sign-changes secondary y-axis. |
(0, 10) |
Source code in sandpyper\common.py
def plot_sensitivity_rbcds_transects(df, location, x_ticks=[0,2,4,6,8],figsize=(7,4),
tresh_xlims=(0,8), trans_ylims=(0,3), sign_ylims=(0,10)):
"""Plot both the number of valid transects retained (red) and the total sign changes (blue) as a function of the threshold used. This function creates a plot per min_points situation. The solid black line is the 95th percentile of the total valid transect retained while the dashed one is the 85th.
Args:
df (pd.DataFrame): Sensitivity dataframe resulting from using the function sensitivity_tr_rbcd().
location (str): Location code of the location to plot.
x_ticks (list): List of x-axis ticks (thresholds).
figsize (tuple): Width and height (in inches) of the figure.
tresh_xlims (tuple): Min x and max x of the thresholds x-axis.
trans_ylims (tuple): Min y and max y of the transect main y-axis.
sign_ylims (tuple): Min y and max y of the sign-changes secondary y-axis.
"""
plt.rcParams['font.sans-serif'] = 'Arial'
plt.rcParams['font.family'] = 'sans-serif'
sb.set_context("paper", font_scale=1)
q_up_val=0.95
q_low_val=0.85
data_in=df.query(f"location == '{location}'")
list_minpts=data_in.min_pts.unique()
trs_res_ar=data_in.groupby(["tr_id","min_pts"])['residual'].apply(np.array).reset_index()
tot_trs=data_in.groupby(["thresh","min_pts"])['geometry'].count().reset_index()
tot_trs['trs_10']=tot_trs.geometry / 10
zero_crossings=pd.DataFrame([pd.Series({'tr_id':trs_res_ar.loc[i,'tr_id'],
'sign_change_thresh':np.where(np.diff(np.sign(trs_res_ar.iloc[i,-1])))[0][-1]+1,
'min_pts':trs_res_ar.loc[i,'min_pts']}) for i in range(trs_res_ar.shape[0]) if np.where(np.diff(np.sign(trs_res_ar.iloc[i,-1])))[0].shape[0] !=0])
tot_jumps=zero_crossings.groupby(["sign_change_thresh","min_pts"]).count().reset_index() # how many jumps per thresh and minpts
joined=pd.merge(tot_trs,tot_jumps, left_on=['thresh','min_pts'], right_on=['sign_change_thresh','min_pts'], how='left')
joined.rename({'geometry':'tot_trs',
'tr_id':'tot_jumps'}, axis=1, inplace=True)
for minpts in list_minpts:
f,ax=plt.subplots(figsize=figsize)
ax2=ax.twinx()
datain=joined.query(f"min_pts=={minpts}")
sb.lineplot(x="thresh", y="tot_jumps",ci=None,
data=datain,color='b',
alpha=.4,linewidth=3,
ax=ax2, label="sign changes")
sb.lineplot(data=datain,x='thresh',y='trs_10',
alpha=.4,color='r',linewidth=3,
ax=ax,label="transects * 10")
kde_x, kde_y = ax.lines[0].get_data()
kde_x2, kde_y2 = ax2.lines[0].get_data()
ax.fill_between(kde_x, kde_y,interpolate=True, color='r',alpha=0.5)
ax2.fill_between(kde_x2, kde_y2,interpolate=True,color='b',alpha=0.5)
ax.axhline((datain.tot_trs.fillna(0).max()*q_up_val)/10,c='k',ls='-',label='95%')
ax.axhline((datain.tot_trs.fillna(0).max()*q_low_val)/10,c='k',lw=2.5,ls='--',label='85%')
ax.set_ylabel('n. transects x 10', c='r')
ax.set_xlabel('t')
ax2.set_ylabel('sign changes', c='b')
ax2.set_ylim(sign_ylims)
ax.set_ylim(trans_ylims)
ax.set_xlim(tresh_xlims)
plt.tight_layout()
ax.get_legend().remove()
ax2.get_legend().remove()
plt.xticks(x_ticks)
ax.set_title(f"pt: {minpts}")
plt.tight_layout()
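A minimal call sketch; sensitivity_df stands for the output of sensitivity_tr_rbcd (documented further below) and the location code is a placeholder:
plot_sensitivity_rbcds_transects(
    df=sensitivity_df,
    location="leo",
    x_ticks=[0, 2, 4, 6, 8],
    trans_ylims=(0, 3),
    sign_ylims=(0, 10),
)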
plot_single_loc(df, loc_subset, figsize, colors_dict, linewidth, out_date_format, xlabel, ylabel, suptitle)
¶
Display Mean Elevation Change (mec, in m) and cumulative mec (since start of the monitoring), for a single location.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
Pandas dataframe |
location-level-volumetrics table obtained from get_state_vol_table function. |
required |
loc_subset |
list |
a list of location codes, in case multiple locations need to be plotted (not optimal). |
required |
figsize |
tuple |
Tuple of float to specify images size. |
required |
colors_dict |
dict |
Dictionary with keys=location code and values=color (in matplotlib specification). |
required |
linewidth |
float, int |
linewidths of the plot lines. |
required |
out_date_format |
str |
format of the dates plotted in the x axis (datetime format). |
required |
xlabel |
str |
labels for x axis. |
required |
ylabel |
str |
labels for y axis. |
required |
suptitle |
|
title of the plot. |
required |
Source code in sandpyper\common.py
def plot_single_loc(
df,
loc_subset,
figsize,
colors_dict,
linewidth,
out_date_format,
xlabel,
ylabel,
suptitle,
):
"""
Display Mean Elevation Change (mec, in m) and cumulative mec (since start of the monitoring), for a single location.
Args:
df (Pandas dataframe): location-level-volumetrics table obtained from get_state_vol_table function.
loc_subset (list): a list of location codes, in case multiple locations need to be plotted (not optimal).
figsize (tuple): Tuple of float to specify images size.
colors_dict (dict): Dictionary with keys=location code and values=color (in matplotlib specification).
linewidth (float, int): linewidths of the plot lines.
out_date_format (str): format of the dates plotted in the x axis (datetime format).
xlabel (str): labels for x axis.
ylabel (str): labels for y axis.
suptitle: title of the plot.
"""
f, ax = plt.subplots(figsize=figsize)
if isinstance(colors_dict, dict):
print("Color dictionary provided.")
color_mode = "dictionary"
elif colors_dict == None:
num_colors = len(loc_subset)
cmapa = plt.cm.get_cmap("tab20c")
cs = [cmapa(i) for i in np.linspace(0, 0.5, num_colors)]
color_mode = "auto"
else:
raise TypeError("Error in specifying color dictionary.")
dataset_in = df.query(f"location in {loc_subset}")
dataset_in["date_from_dt"] = pd.to_datetime(dataset_in.date_from, format="%Y%m%d", dayfirst=False)
dataset_in["date_to_dt"] = pd.to_datetime(dataset_in.date_to, format="%Y%m%d", dayfirst=False)
for i, location in enumerate(dataset_in.location.unique()):
dataset = dataset_in.query(f"location=='{location}'")
if color_mode == "dictionary":
color_i = colors_dict[location]
elif color_mode == "auto":
color_i = cs[i]
# Calculate the cumulative net volumetric change and mean elevation change from start of monitoring
dataset["cum"] = dataset.net_vol_change.cumsum()
dataset["cum_mec"] = dataset.norm_net_change.cumsum()
ax = sb.lineplot(
data=dataset,
x="date_to_dt",
color=color_i,
y="net_vol_change",
label=f"{location}: period",
lw=linewidth,
)
ax = sb.lineplot(
data=dataset,
x="date_to_dt",
color=color_i,
y="cum",
label=f"{location}: cumulative",
linestyle="--",
lw=linewidth,
)
ax = sb.scatterplot(
data=dataset, color=color_i, x="date_to_dt", y="net_vol_change"
)
ax = sb.scatterplot(data=dataset, color=color_i, x="date_to_dt", y="cum")
x_start = np.array([dataset.iloc[0].date_from_dt, dataset.iloc[0].date_to_dt])
y_start = np.array([0, dataset.iloc[0].cum])
ax.plot(x_start, y_start, c=color_i)
ax.scatter(dataset.iloc[0].date_from_dt, 0, c=color_i, marker="*")
# the zero horizontal line
ax.axhline(0, c="k", lw=0.5)
ax.set(
xticks=np.append(dataset_in.date_from_dt.values, dataset_in.date_to_dt.values[-1])
)
ax.xaxis.set_major_formatter(dates.DateFormatter(out_date_format))
plt.xticks(rotation=90)
# the plot x and y axis labels
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
# the title of the plot
f.suptitle(suptitle)
return ax
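A minimal call sketch; vol_table stands for the output of get_state_vol_table and the location code, colours and labels are placeholders:
ax = plot_single_loc(
    df=vol_table,
    loc_subset=["leo"],
    figsize=(8, 4),
    colors_dict={"leo": "teal"},
    linewidth=1.5,
    out_date_format="%b %y",
    xlabel="survey date",
    ylabel="net volume change (m³)",
    suptitle="Net and cumulative volumetric change",
)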
prep_heatmap(df, lod, outliers=False, sigma_n=3)
¶
Function to create a pivoted and filtered dataframe from the multitemporal table of a specific period-location combination (e.g. loc='pfa', dt='dt_3'). Elevation differences within LoD (uncertain) can be set to zero and outliers can be eliminated. Each column is a transect and each row is a distance value along the transect. Values are elevation differences.
Warning
This function is to be used on location-period specific slices of the multitemporal table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
Pandas dataframe |
Location-period specific subset (filtered for a location and a timeperiod) of multitemporal table. |
required |
lod |
path, value, False |
if a valid path to a Limit of Detection table is provided, use the table. If a value is provided, use the value across all surveys. If False, do not apply the LoD filter. All elevation changes within +- LoD will be set to zero. |
required |
outliers |
|
when True, use the specified number of standard deviation to exclude outliers. If False, retain all the points. |
False |
sigma_n |
int |
number of standard deviation to use to exclude outliers (default=3). |
3 |
Returns:
Type | Description |
---|---|
piv_df (pd.DataFrame) |
Pivoted and clean Pandas Dataframe, ready to be used to plot heatmaps and compute volumetrics. |
Source code in sandpyper\common.py
def prep_heatmap(df, lod, outliers=False, sigma_n=3):
"""
Function to create a pivoted and filtered dataframe from the multitemporal table of a specific period-location combination (e.g. loc='pfa', dt='dt_3').
Elevation differences within LoD (uncertain) can be set to zero and outliers can be eliminated.
Each column is a transect and each row is a distance value along transect. Values are elevation differences.
Warning:
This function is to be used on location-period specific slices of the multitemporal table.
Args:
df (Pandas dataframe): Location-period specific subset (filtered for a location and a timeperiod) of multitemporal table.
lod (path, value, False): if a valid path to a Limit of Detection table is provided, use the table. If a value is provided, use the value across all surveys. If False, do not apply the LoD filter. All elevation changes within +- LoD will be set to zero.
outliers: when True, use the specified number of standard deviation to exclude outliers. If False, retain all the points.
sigma_n (int): number of standard deviation to use to exclude outliers (default=3).
Returns:
piv_df (pd.DataFrame): Pivoted and clean Pandas Dataframe, ready to be used to plot heatmaps and compute volumetrics.
"""
# pivoting and sorting
# add an int version of tr_id to sort better
df["int_tr"] = df.tr_id.apply(lambda a: int(float(a)), convert_dtype=True)
df.rename({"distance": "dist"}, axis=1, inplace=True)
df.dist.astype(float)
df.sort_values(by=["int_tr", "dt", "dist"], inplace=True) # sort
df_piv = df.pivot("dist", "int_tr", "dh")
df_piv = df_piv.astype(float)
df_piv.sort_index(axis=1, inplace=True) # sort columns, tr_id
df_piv.sort_index(axis=0, ascending=False, inplace=True) # sort rows, distance
if bool(outliers):
# eliminating outliers
# find threshold for sigma outlier detection
thresh = sigma_n * np.nanstd(df_piv.astype(float).values)
df_piv = df_piv[df_piv < thresh] # select only the values below the threshold
# select only the negative values below the threshold
df_piv = df_piv[-df_piv < thresh]
else:
pass
if isinstance(lod, str):
if os.path.isfile(lod): # if a valid path, open the table and use it
# applying LoDs
lod_table = pd.read_csv(lod) # read in the lod table
loc = df.location.iloc[0]
dt = df.dt.iloc[0]
lod_i = np.round(lod_table.query(f"location == '{loc}' & dt == '{dt}'").lod.values[0],2)
# create condition (within LoD) to mask the dataframe
cond = (df_piv >= -lod_i) & (df_piv <= lod_i)
# replace the values that satisfied the condition (within LoD) with zeroes
df_piv2 = df_piv.mask(cond, 0)
df_piv2.set_index(df_piv2.index.astype(float), inplace=True)
return df_piv2.sort_index(ascending=False)
else:
raise NameError(
"Not a valid file or path.Please provide a valid path to the LoD table"
)
elif isinstance(
lod, (float, int)
): # if a numeric, use this value across all surveys
lod_i = float(lod)
cond = (df_piv >= -lod_i) & (df_piv <= lod_i)
df_piv2 = df_piv.mask(cond, 0)
df_piv2.set_index(df_piv2.index.astype(float), inplace=True)
return df_piv2.sort_index(ascending=False)
elif isinstance(lod, pd.DataFrame):
lod_table=lod
loc = df.location.iloc[0]
dt = df.dt.iloc[0]
lod_i = np.round(lod_table.query(f"location == '{loc}' & dt == '{dt}'").lod.values[0],2)
# create condition (within LoD) to mask the dataframe
cond = (df_piv >= -lod_i) & (df_piv <= lod_i)
# replace the values that satisfied the condition (within LoD) with zeroes
df_piv2 = df_piv.mask(cond, 0)
df_piv2.set_index(df_piv2.index.astype(float), inplace=True)
return df_piv2.sort_index(ascending=False)
else: # otherwise,don't use it
df_piv2 = df_piv.copy()
df_piv2.set_index(df_piv2.index.astype(float), inplace=True)
return df_piv2.sort_index(ascending=False)
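A minimal usage sketch; ds stands for the full multitemporal table and the location and period codes are placeholders. Note the warning above: the function expects a single location-period slice:
subset = ds.query("location == 'leo' and dt == 'dt_0'")
piv = prep_heatmap(subset, lod=0.05, outliers=True, sigma_n=3)
# rows: along-transect distances, columns: transect ids, values: elevation change (m)
piv.head()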
round_special(num, thr)
¶
It rounds the number (num) to the nearest multiple of the threshold (thr). Useful to space ticks in plots.
Source code in sandpyper\common.py
def round_special(num, thr):
"""It rounds the number (num) to its closest fraction of threshold (thr). Useful to space ticks in plots."""
return round(float(num) / thr) * thr
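For example:
round_special(7.3, 2.5)   # -> 7.5
round_special(11.2, 5)    # -> 10.0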
s2_to_rgb(imin, scaler_range=(0, 255), re_size=False, dtype=False)
¶
Transform image pixels into specified range. Used to obtain 8-bit RGB images.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
imin |
str |
path to the image to be transformed (opened with rasterio). |
required |
scaler_range |
tuple |
(min,max) tuple with minimum and maximum brightness values. Default is (0, 255). |
(0, 255) |
re_size |
False, tuple |
if a tuple of size 3 (rows, columns, bands) is provided, resize the transformed array to that shape. Default to False. |
False |
dtype |
False, dtype |
optional data type for the transformed array. False, keep the original dtype. |
False |
Returns:
Type | Description |
---|---|
img_array_rgb (np.array) |
Transformed array. |
Source code in sandpyper\common.py
def s2_to_rgb(imin, scaler_range=(0, 255), re_size=False, dtype=False):
"""Transform image pixels into specified range. Used to obtain 8-bit RGB images.
Args:
imin (str): path to the image to be transformed (opened with rasterio).
scaler_range (tuple): (min,max) tuple with minimum and maximum brightness values. Default is (0, 255).
re_size (False, tuple): if a tuple of size 3 (rows, columns, bands) is provided, resize the transformed array to that shape. Default to False.
dtype (False, dtype): optional data type for the transformed array. False, keep the original dtype.
Returns:
img_array_rgb (np.array): Transformed array.
"""
scaler = MinMaxScaler(scaler_range)
imo = ras.open(imin, "r")
if dtype != False:
im = imo.read(out_dtype=dtype)
else:
im = imo.read()
if isinstance(re_size, (tuple)):
im = resize(
im,
(re_size[0], re_size[1], re_size[2]),
mode="constant",
preserve_range=True,
) # resize image
else:
pass
if imo.count > 1:
im_rgb = np.stack((im[0], im[1], im[2]), axis=-1)
rgb_array_1 = im_rgb.reshape(-1, 1)
scaled_1 = scaler.fit_transform(rgb_array_1).astype(int)
img_array_rgb = scaled_1.reshape(im_rgb.shape)
else:
img_array_rgb = im
if isinstance(re_size, (tuple)):
img_array_rgb = resize(
im,
(re_size[0], re_size[1], re_size[2]),
mode="constant",
preserve_range=True,
) # resize image
return img_array_rgb
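A minimal usage sketch; the GeoTIFF path is a placeholder:
rgb = s2_to_rgb("tile_0001.tif", scaler_range=(0, 255))
rgb.shape  # (rows, cols, 3) for a multi-band image, scaled to 0-255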
sensitivity_tr_rbcd(ProfileDynamics, test_thresholds='max', test_min_pts=[0, 10, 2])
¶
Performs a sensitivity analysis of the transect-level r-BCD values with respect to both the min_points and thresh parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ProfileDynamics |
object |
The ProfileDynamics object to perform sensitivity analysis on. |
required |
test_thresholds |
str, list |
If a list is provided, the list must contain the starting thresh value, the last one and the interval between the two, just like numpy.arange function expects, like [0,10,2]. If 'max', then tests on all the available timeperiods. |
'max' |
test_min_pts |
list |
A list containing the starting min_pts value, the last one and the interval between the two, just like numpy.arange function expects, like [0,100,10]. |
[0, 10, 2] |
Returns:
Type | Description |
---|---|
ss_tr_big (pd.DataFrame) |
A dataframe storing the transect r-BCD values of all combinations of thresh and min_points. |
Source code in sandpyper\common.py
def sensitivity_tr_rbcd(ProfileDynamics,
test_thresholds='max',
test_min_pts=[0,10,2]):
"""Performs a sensitivity analysis of the transect-level r-BCDs values in respect to both the min_points and thresh parameters.
Args:
ProfileDynamics (object): The ProfileDynamics object to perform sensitivity analysis on.
test_thresholds (str, list): If a list is provided, the list must contain the starting thresh value, the last one and the interval between the two, just like numpy.arange function expects, like [0,10,2]. If 'max', then tests on all the available timeperiods.
test_min_pts (list): A list containing the starting min_pts value, the last one and the interval between the two, just like numpy.arange function expects, like [0,100,10].
Returns:
ss_tr_big (pd.DataFrame): A dataframe storing the transect r-BCD values of all combinations of thresh and min_points.
"""
df = ProfileDynamics.df_labelled
ss_tr_big=pd.DataFrame()
for loc in df.location.unique():
data_in=df.query(f"location=='{loc}'")
print(f"Working on {loc}.")
if test_thresholds=='max':
range_thresh=range(0,data_in.dt.unique().shape[0]+1)
else:
range_thresh=range(*test_thresholds)
if test_min_pts==None:
range_min_pts=range(0,20,2)
else:
range_min_pts=range(*test_min_pts)
combs = list(itertools.product(range_min_pts,range_thresh))
print(f"A total of {len(combs)} combinations of thresholds and min_pts will be computed.")
for i in tqdm(combs):
print(f"Working on threshold {i[1]} and min points {i[0]}.")
tmp_loc_specs_dict={loc:{'thresh':i[1],
'min_points':i[0]}}
try:
ss_transects_idx = get_rbcd_transect(df_labelled=data_in,
loc_specs=tmp_loc_specs_dict, reliable_action='drop',
dirNameTrans=ProfileDynamics.ProfileSet.dirNameTrans,
labels_order=ProfileDynamics.tags_order,
loc_codes=ProfileDynamics.ProfileSet.loc_codes,
crs_dict_string=ProfileDynamics.ProfileSet.crs_dict_string)
ss_transects_idx['thresh']=i[1]
ss_transects_idx['min_pts']=i[0]
ss_tr_big=pd.concat([ss_tr_big,ss_transects_idx], ignore_index=True)
except Exception:
    print(f"Error while computing threshold {i[1]} with min points {i[0]}. Skipping this combination.")
return ss_tr_big
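A minimal call sketch; D stands for an already-built ProfileDynamics object with labelled profiles:
sensitivity_df = sensitivity_tr_rbcd(
    D,
    test_thresholds="max",    # test every available number of periods
    test_min_pts=[0, 10, 2],  # min_points = 0, 2, 4, 6, 8
)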
shoreline_from_prediction(prediction, z, geotransform, min_vertices=2, shape=(64, 64))
¶
Obtain a georeferenced shoreline in a GeoDataFrame from a binary predicted water mask. Credits: adapted from Dr. Robbie Bishop-Taylor functions in Digital Earth Australia scripts, available at: https://github.com/GeoscienceAustralia/dea-notebooks/blob/develop/Scripts/dea_coastaltools.py
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prediction |
array |
array to be thresholded. |
required |
z |
int, float |
threshold value to separate land and water from the prediction array. |
required |
geotransform |
tuple |
Tuple with Shapely transform parameters. |
required |
min_vertices |
int |
Minimum number of vertices a segment has to have to be retained as a shoreline. Default to 2. |
2 |
shape |
data type object |
Shape of the array. Default to (64,64). |
(64, 64) |
Returns:
Type | Description |
---|---|
|
Geodataframe with georeferenced shoreline. |
Source code in sandpyper\common.py
def shoreline_from_prediction(
prediction, z, geotransform, min_vertices=2, shape=(64, 64)
):
"""Obtain a georeferenced shoreline in a GeoDataFrame from a binary predicted water mask.
Credits: adapted from Dr. Robbie Bishop-Taylor functions in Digital Earth Australia scripts, available at:
https://github.com/GeoscienceAustralia/dea-notebooks/blob/develop/Scripts/dea_coastaltools.py
Args:
prediction (array): array to be thresholded.
z (int, float): threshold value to separate land and water from the prediction array.
geotransform (tuple): tuple with Shapely transform parameters.
min_vertices (int): Minimum number of vertices a segment has to have to be retained as a shoreline. Default to 2.
shape (data type object): Shape of the array. Default to (64,64).
Returns:
Geodataframe with georeferenced shoreline.
"""
# get shoreline
shore_arr = contours_to_multiline(
prediction.reshape(shape), z, min_vertices=min_vertices
)
# create geoseries and geodataframe
shore_arr_geoseries = gpd.GeoSeries(shore_arr, name="geometry")
contours_gdf = gpd.GeoDataFrame(shore_arr_geoseries, geometry="geometry")
# georeference line using tile geotransform
contours_gdf["geometry"] = contours_gdf.affine_transform(shapely_affine)
return contours_gdf
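A minimal call sketch; the prediction array and the affine parameters below are placeholders (GeoSeries.affine_transform expects the six Shapely parameters [a, b, d, e, xoff, yoff]):
pred = model.predict(tile).squeeze()  # hypothetical 64x64 water-probability mask
transform = (10.0, 0.0, 0.0, -10.0, 300000.0, 5800000.0)  # illustrative affine parameters
shore_gdf = shoreline_from_prediction(pred, z=0.5, geotransform=transform,
                                      min_vertices=2, shape=(64, 64))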
split_transects(geom, side='left')
¶
Helper function to split a transect geometry along its centroid, retaining only their left (default) or right side.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
geom |
geometry |
geometry (shapely LineString) of the transect to split. |
required |
side |
str |
side to keep ('left' or 'right'). |
'left' |
Returns:
Type | Description |
---|---|
geometry |
New geometry split. |
Source code in sandpyper\common.py
def split_transects(geom, side="left"):
"""Helper function to split a transect geometry along its centroid, retaining only their left (default) or right side.
Args:
geom (geometry): geometry (shapely LineString) of the transect to split.
side (str): side to keep ('left' or 'right').
Returns:
geometry: New geometry split.
"""
side_dict = {"left": 0, "right": 1}
snapped = snap(geom, geom.centroid, 0.001)
result = split(snapped, geom.centroid)
return result[side_dict[side]]
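A small worked example (note that the geometry-collection indexing used in the function follows the Shapely 1.x behaviour):
from shapely.geometry import LineString

tr = LineString([(0, 0), (100, 0)])
split_transects(tr, side="left").wkt   # 'LINESTRING (0 0, 50 0)'
split_transects(tr, side="right").wkt  # 'LINESTRING (50 0, 100 0)'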
test_format(filename, loc_search_dict)
¶
It returns True if the filename matches the required format (regx) or False if it doesn't.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str |
filename to test, of the type "Seaspray_22_Oct_2020_GeoTIFF_DSM_GDA94_MGA_zone_55.tiff". |
required |
loc_search_dict |
dict |
a dictionary where keys are the location codes and values are lists containing the expected full location string (["Warrnambool", "warrnambool","warrny"]). |
required |
Returns:
Type | Description |
---|---|
bool (bool) |
True if the filename matches the required format (regx) or False if it doesn't. |
Source code in sandpyper\common.py
def test_format(filename, loc_search_dict):
"""
It returns True if the filename matches the required format (regx) or False if it doesn't.
Args:
filename (str): filename to test, of the type "Seaspray_22_Oct_2020_GeoTIFF_DSM_GDA94_MGA_zone_55.tiff".
loc_search_dict (dict): a dictionary where keys are the location codes and values are lists containing the expected full location string (["Warrnambool", "warrnambool","warrny"]).
Returns:
bool (bool): True if the filename matches the required format (regx) or False if it doesn't.
"""
re_list_loc = "|".join(loc_search_dict.keys())
regx = rf"\d{{8}}_({re_list_loc})_(ortho|dsm)\.(tiff|tif)"
try:
re.search(regx, filename).group()
return True
except:
return False
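For example, with illustrative location codes:
loc_search_dict = {"mar": ["Marengo", "marengo"], "leo": ["St. Leonards", "leonards"]}

test_format("20180606_mar_dsm.tif", loc_search_dict)                              # True
test_format("Seaspray_22_Oct_2020_DSM_GDA94_MGA_zone_55.tiff", loc_search_dict)   # False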
tidal_correction(shoreline, cs_shores, gdf, baseline_folder, crs_dict_string, limit_correction, mode, alongshore_resolution, slope_value='median', side='both', tick_length=200, subset_loc=None, limit_vertex=1, baseline_threshold='infer', replace_slope_outliers=True, save_trs_details=False, trs_details_folder=None, gdf_date_field='raw_date', distance_field='distance', date_field='raw_date', toe_field='toe')
¶
Simple tidal correction for input shorelines. It can automatically extract subaerial beachfaces and more.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
shoreline |
gpd.GeoDataFrame |
The geodataframe storing points along a shoreline. |
required |
cs_shores |
gpd.GeoDataFrame |
The geodataframe storing the groundtruth (e.g. UAV-derived) shorelines. |
required |
gdf |
gpd.GeoDataFrame |
The slope geodataframe (with location and raw_date fields) used to correct the shorelines. |
required |
baseline_folder |
str |
Path to the folder storing the baseline Geopackages (.gpkgs). |
required |
crs_dict_string |
dict |
Dictionary storing location codes as key and crs information as EPSG code (int). |
required |
limit_correction |
bool |
If True, only use beachface slopes to compute the statistic for tidal correction. When False, retain the full transects to compute the slope statistics to correct shorelines. When a slope value is provided, it automatically sets to False. Defaults to True. |
required |
mode |
str |
If 'sat', use satellite shorelines as seaward edge to classify beachfaces. If 'gt', use groundtruth shorelines instead. |
required |
alongshore_resolution |
str, float |
The alongshore spacing between transects, in the unit of measure of the location CRS. If 'infer', use the gdf file to detect the spacing with 10cm precision. If the transect spacing is less than 10cm, set the spacing manually. Note: it also smoothes the original line if this value is greater than the original line vertex spacing. |
required |
slope_value |
int,float,'mean','median','min','max' |
If a numeric value is provided (assumed to be in degrees), use it to correct the shorelines. If one of 'mean','median','min','max', use that statistic instead. Range, standard deviation and variance are also computed for analytical purposes, although they should not be used to correct shorelines. |
'median' |
side |
str, 'left', 'right', 'both' |
Whether to retain only the left, the right or both sides of the transects once created. Defaults to 'both'. |
'both' |
tick_length |
int, float |
Across-shore length of each transect in the unit of measure of the location CRS. |
200 |
limit_vertex |
int |
Sets the minimum number of consecutive transect ids to create one segment of the corrected shoreline. Defaults to 1. |
1 |
baseline_threshold |
"infer", float, None |
If a number is provided, it sets the minimum distance the original un-corrected and the tidal-corrected shorelines must be apart for the correction to be considered valid. If the distance between the original and the tidal-corrected shorelines at one point is less than this value, the original shoreline is retained; if it is above, the tidal-corrected value is retained. This avoids correcting areas where an artificial shoreline occurs (seawalls or harbours). If "infer", the Otsu method is used to find this threshold value. This option works where artificial shorelines are present. If None, do not use this option. Default to "infer". |
'infer' |
replace_slope_outliers |
bool |
If True (default), replace the values of the defined slope statistics (slope_value parameter) with its survey-level median. |
True |
save_trs_details |
bool |
True to save a .CSV file with transect-specific information. It defaults to False. |
False |
trs_details_folder |
str |
Folder where to save the transect details. Defaults to None. |
None |
gdf_date_field |
str |
Date field of the slope geodataframe used to correct the shoreline. Defaults to "raw_date". |
'raw_date' |
distance_field |
str |
Field storing the distance values. Default to "distance". |
'distance' |
date_field |
str |
Field storing the survey date values. Default to "raw_date". |
'raw_date' |
toe_field |
str |
Field storing the toe distances values. Default to "toe". |
'toe' |
Returns:
Type | Description |
---|---|
corr_shorelines (gpd.GeoDataFrame) |
GeoDataFrame containing both the original shorelines and the corrected ones, stored in two different geometry fields. |
Source code in sandpyper\common.py
def tidal_correction(
shoreline,
cs_shores,
gdf,
baseline_folder,
crs_dict_string,
limit_correction,
mode,
alongshore_resolution,
slope_value="median",
side="both",
tick_length=200,
subset_loc=None,
limit_vertex=1,
baseline_threshold="infer",
replace_slope_outliers=True,
save_trs_details=False,
trs_details_folder=None,
gdf_date_field="raw_date",
distance_field="distance",
date_field="raw_date", # of transect geodataframe
toe_field="toe",
):
"""
Simple tidal correction for input shorelines. It can automatically extract subaerial beachfaces and more.
Args:
shoreline (gpd.GeoDataFrame): The geodataframe storing points along a shoreline.
cs_shores (gpd.GeoDataFrame): The geodataframe storing the groundtruth (e.g. UAV-derived) shorelines.
gdf (gpd.GeoDataFrame): The slope geodataframe (with location and raw_date fields) used to correct the shorelines.
baseline_folder (str): Path to the folder storing the baseline Geopackages (.gpkgs).
crs_dict_string (dict): Dictionary storing location codes as key and crs information as EPSG code (int).
limit_correction (bool): If True, only use beachface slopes to compute the statistic for tidal correction. When False, retain the full transects to compute the slope statistics to correct shorelines. When a slope value is provided, it automatically sets to False. Defaults to True.
mode (str): If 'sat', use satellite shorelines as seaward edge to classify beachfaces. If 'gt', use groundtruth shorelines instead.
alongshore_resolution (str, float): The alongshore spacing between transects, in the unit of measure of the location CRS. If 'infer', use the gdf file to detect the spacing with 10cm precision. If the transect spacing is less than 10cm, set the spacing manually. Note: it also smoothes the original line if this value is greater than the original line vertex spacing.
slope_value (int,float,'mean','median','min','max'): If a numeric value is provided (assumed to be in degrees), use it to correct the shorelines. If one of 'mean','median','min','max', use that statistic instead. Range, standard deviation and variance are also computed for analytical purposes, although they should not be used to correct shorelines.
side (str, 'left', 'right', 'both'): Whether to retain only the left, the right or both sides of the transects once created. Defaults to 'both'.
tick_length (int, float): Across-shore length of each transect in the unit of measure of the location CRS.
subset_loc (list): List of strings of location codes to limit the correction to. Default to None.
limit_vertex (int): Sets the minimum number of consecutive transect ids to create one segment of the corrected shoreline. Defaults to 1.
baseline_threshold ("infer", float, None): If a number is provided, it sets the minimum distance the original un-corrected and the tidal-corrected shorelines must be in order to consider the correction valid. If the distance between the original the tidal-corrected shorelines at one point is less than this value, then the original shoreline is retained. If it is above, then the tidal-corrected value is retained. This is done to avoid to correct areas where an artificial shoreline occurs (seawalls or harbours). If "infer", then the Otsu method is used to find this threshold value. This option works where artificial shorelines are present. If None, do not use this option. Default to "infer".
replace_slope_outliers (bool): If True (default), replace the values of the defined slope statistics (slope_value parameter) with its survey-level median.
save_trs_details (bool): True to save a .CSV file with transect-specific information. It defaults to False.
trs_details_folder (str): Folder where to save the transect details. Defaults to None.
gdf_date_field (str): Date field of the slope geodataframe used to correct the shoreline. Defaults to "raw_date".
distance_field (str): Field storing the distance values. Default to "distance".
date_field (str): Field storing the survey date values. Default to "raw_date".
toe_field (str): Field storing the toe distances values. Default to "toe".
Returns:
corr_shorelines (gpd.GeoDataFrame): GeoDataFrame containing both the original shorelines and the corrected ones, stored in two different geometry fields.
"""
if isinstance(slope_value, (int, float)):
limit_correction = False
extract_slope = False
print(
f"User-defined slope value of {slope_value} degrees will be used to correct shorelines. No beachfaces extracted."
)
else:
limit_correction = True
extract_slope = True
if limit_correction:
limited = "limited"
else:
limited = "notlimited"
if "water_index" in shoreline.columns:
dataset_type = "uavs_hores"
else:
dataset_type = "satellite_shores"
grouping_fields = set(["water_index", "thr_type", "raw_date", "location"])
cols = set(shoreline.columns.values)
group_by_fields = list(cols.intersection(grouping_fields))
# __________________ TIDAL CORRECTION_____________________________________
big_final = pd.DataFrame()
list_locations = gdf.location.unique() # list the locations in the GDF file
for location in list_locations: # work iteratively on each location
print(f"working on {location}")
if isinstance(alongshore_resolution, (int, float)):
print(
f"Transect spacing set manually (alongshore resolution) = {alongshore_resolution} ."
)
elif alongshore_resolution == "infer":
alongshore_resolution = infer_along_trs_spacing(gdf)
print(
f"Transect spacing (alongshore resolution) inferred = {alongshore_resolution} ."
)
else:
raise NameError(
"Alongshore resolution must be either a float, int, or 'infer'."
)
# get dates of location
list_dates = gdf.query(f"location=='{location}'").raw_date.unique()
shores_to_corr = shoreline.query(
f"location=='{location}' & raw_date in @list_dates"
) # get shores to correct
gt_shores = cs_shores.query(
f"location=='{location}' & raw_date in @list_dates"
) # get groudtruths shores
crs = crs_dict_string[location]  # get crs of location
if shores_to_corr.crs != crs:
shores_to_corr = shores_to_corr.to_crs(crs)
else:
pass
if gt_shores.crs != crs:
gt_shores = gt_shores.to_crs(crs)
else:
pass
# read baseline as a GeoDataFrame
try:
baseline_location_path = glob.glob(f"{baseline_folder}/{location}*.gpkg")[0]
except BaseException:
raise NameError("Baseline file not found.")
baseline = gpd.read_file(baseline_location_path, crs=crs)
# create transects # same parameter as the slope extraction. TO DO: if
# exists, use existing data.
transects = create_transects(
baseline=baseline,
side=side,
sampling_step=alongshore_resolution,
tick_length=tick_length,
location=location,
crs=crs,
)
# 1)_____________________________________________CREATE THE BASELINE POINT
# create a geodataframe of transects starting points to compute distance from
df_tmp = pd.DataFrame.from_dict(
{
"geometry": [
Point(tr_geom.coords[0][0], tr_geom.coords[0][1])
for tr_geom in transects.geometry
],
"tr_id": transects.tr_id,
"location": location,
}
)
baseline_pts = gpd.GeoDataFrame(df_tmp, geometry="geometry", crs=crs)
# loop through all transects in each satellite shoreline to compute
# distances from the baseline points
print("Computing cross-shore distances of satellite shorelines.")
orig_shore_base_distances = pd.DataFrame()
for i in tqdm(range(shores_to_corr.shape[0])):
# if "water_index" column is present, we will need to group based on water
# indices and threshold types too
original_shore = shores_to_corr.iloc[[i]][[*group_by_fields, "geometry"]]
original_shore_pts = extract_shore_pts(
transects, original_shore, date_field="raw_date", crs=crs
)
matched = pd.merge(
original_shore_pts,
baseline_pts,
how="left",
on="tr_id",
suffixes=("_pt_on_shore", "_pt_on_base"),
)
dists = []
for shore_pt, base_pt in zip(
matched.geometry_pt_on_shore, matched.geometry_pt_on_base
):
try:
dists.append(shore_pt.distance(base_pt))
except BaseException:
dists.append(np.nan)
matched["sat_from_baseline"] = dists
matched["raw_date"] = original_shore.raw_date.values[0]
if "water_index" in shoreline.columns:
matched["water_index"] = original_shore.water_index.values[0]
matched["thr_type"] = original_shore.thr_type.values[0]
else:
pass
orig_shore_base_distances = pd.concat(
[
matched.dropna(subset=["sat_from_baseline"]),
orig_shore_base_distances,
],
ignore_index=True,
)
# update transects with the baseline shore distances info
orig_shore_base_distances = pd.merge(
orig_shore_base_distances,
transects[["tr_id", "geometry"]],
on="tr_id",
how="left",
)
shores_to_corr["raw_date"] = shores_to_corr.raw_date.astype(int)  # ensure dates are integers for later merges
# loop through all transects in each groundthruth shoreline to compute
# distances from the baseline points
print("Computing cross-shore distances of UAV shorelines.")
if mode == "gt": # we will use these points to limit the beachface extraction
gt_base_distances = pd.DataFrame()
for shoreline_i in tqdm(range(gt_shores.shape[0])):
shore_i = gt_shores.iloc[[shoreline_i]][
["location", "raw_date", "geometry"]
]
shore_pts_gt = extract_shore_pts(
transects, shore_i, date_field="raw_date"
)
matched = pd.merge(
shore_pts_gt,
baseline_pts,
how="left",
on="tr_id",
suffixes=("_gt_on_shore", "_gt_on_base"),
)
dists = []
for shore_pt, base_pt in zip(
matched.geometry_gt_on_shore, matched.geometry_gt_on_base
):
try:
dists.append(shore_pt.distance(base_pt))
except BaseException:
dists.append(np.nan)
matched["gt_from_baseline"] = dists
matched["raw_date"] = shore_i.raw_date.values[0]
gt_base_distances = pd.concat(
[matched.dropna(subset=["gt_from_baseline"]), gt_base_distances],
ignore_index=True,
)
gt_base_distances = pd.merge(
gt_base_distances,
transects[["tr_id", "geometry"]],
on="tr_id",
how="left",
)
else:
pass
# _______ EXTRACT BEACHFACE BASED ON SHORELINE POSITION AND DUNE TOE______
if bool(limit_correction):
print(f"Extracting dune toes from slope profiles.")
gdf_loc = gdf.query(f"location=='{location}'")
toes_cand = toes_candidates(gdf_loc, date_field="raw_date")
if mode == "gt":
baseline_distances_in = gt_base_distances
baseline_field = "gt_from_baseline"
txt = "UAV-derived"
base_dist_toes = pd.merge(
baseline_distances_in,
toes_cand.reset_index(),
on=["location", "raw_date", "tr_id"],
how="left",
)
else:
baseline_distances_in = orig_shore_base_distances
baseline_field = "sat_from_baseline"
txt = "satellite-derived"
base_dist_toes = pd.merge(
baseline_distances_in,
toes_cand.reset_index(),
on=["location", "raw_date", "tr_id"],
how="left",
)
toes = []
for t, d in zip(
base_dist_toes.loc[:, "toe_distances"],
base_dist_toes.loc[:, baseline_field],
):
try:
toes.append(t[np.where(t <= d)][-1])
except BaseException:
toes.append(np.nan)
base_dist_toes["toe"] = toes
print(
f"Classifying beachfaces as going from {txt} shorelines to dune toes."
)
# preprocessing
if mode == "gt":
tmp_lookup = (
base_dist_toes.groupby(["location", "raw_date", "tr_id"])[
[toe_field, baseline_field]
]
.first()
.reset_index()
)
tmp_merge = pd.merge(
tmp_lookup,
gdf_loc,
how="right",
on=["location", "raw_date", "tr_id"],
)
tmp_merge = tmp_merge.dropna(subset=[baseline_field])[
[
"distance",
baseline_field,
"location",
"raw_date",
"tr_id",
"toe",
"slope",
]
]
else:
tmp_lookup = (
base_dist_toes.groupby([*group_by_fields, "tr_id"])[
[toe_field, baseline_field]
]
.first()
.reset_index()
)
tmp_merge = pd.merge(
tmp_lookup,
gdf_loc,
how="right",
on=["location", "raw_date", "tr_id"],
)
tmp_merge = tmp_merge.dropna(subset=[baseline_field])[
[
"distance",
baseline_field,
"location",
"raw_date",
"tr_id",
"toe",
"slope",
]
]
# conditions: label each point "bf" (beachface) if it lies between the dune toe and the
# shoreline, "land" if landward of the toe, "water" if seaward of the shoreline
tmp_merge["beachface"] = [
"bf"
if tmp_merge.loc[i, distance_field] >= tmp_merge.loc[i, toe_field]
and tmp_merge.loc[i, distance_field] <= tmp_merge.loc[i, baseline_field]
else "land"
if tmp_merge.loc[i, distance_field] < tmp_merge.loc[i, toe_field]
else "water"
if tmp_merge.loc[i, distance_field] > tmp_merge.loc[i, toe_field]
else np.nan
for i in range(tmp_merge.shape[0])
]
else: # if we do not want to limit the correction to the beachface statistics, but we want to use the full transect stats
merged_tmp = pd.merge(
shores_to_corr,
orig_shore_base_distances,
on=[*group_by_fields],
how="right",
suffixes=("_shore", "_tr"),
)
if isinstance(slope_value, str):
if bool(limit_correction):
df_in = tmp_merge.query("beachface=='bf'")
else:
df_in = pd.merge(
merged_tmp.astype({"tr_id": int, "raw_date": int, "location": str}),
gdf_loc.astype({"tr_id": int, "raw_date": int, "location": str})[
["location", "raw_date", "tr_id", "slope"]
],
how="right",
on=["location", "raw_date", "tr_id"],
)
ops_dict = {
"mean": np.nanmean,
"median": np.nanmedian,
"min": np.min,
"max": np.max,
"range": np.ptp,
"std": np.nanstd,
"var": np.nanvar,
}
stats = pd.DataFrame()
for i in ops_dict.keys():
tmp = pd.Series(
df_in.groupby(["raw_date", "tr_id"]).slope.apply(ops_dict[i]),
name=f"{i}_beachface_slope",
)
stats = pd.concat([stats, tmp], axis=1)
stats = stats.set_index(
pd.MultiIndex.from_tuples(stats.index, names=("raw_date", "tr_id"))
)
stats["location"] = location
slope_field = [
stat for stat in stats.columns if stat.startswith(slope_value)
][0]
if bool(limit_correction):
print(f"Using {slope_field} of beachfaces to correct shorelines.")
else:
print(f"Using full transect {slope_field} to correct shorelines.")
stats = stats.reset_index()
stats["raw_date"] = [int(i) for i in stats.raw_date]
orig_shore_trs_stats = pd.merge(
orig_shore_base_distances,
stats,
on=["tr_id", "location", "raw_date"],
how="left",
)
if bool(replace_slope_outliers):
# create temp dataframes to store survey-level slope values stats and
# derive 3 sigmas thresholds per date
survey_slope_field_stat = (
stats.groupby(["raw_date"])[slope_field].describe().reset_index()
)
survey_slope_field_stat["sigmas_3"] = (
survey_slope_field_stat.loc[:, "50%"] * 3
)
orig_shore_trs_stats = pd.merge(
orig_shore_trs_stats,
survey_slope_field_stat[["raw_date", "sigmas_3", "50%"]],
on=["raw_date"],
how="left",
)
# add the field 'outlier' and mark slope_values exceeding 3 std.
orig_shore_trs_stats.loc[:, "outlier"] = np.where(
orig_shore_trs_stats[slope_field]
> orig_shore_trs_stats["sigmas_3"],
True,
False,
)
# replace outliers with median survey values.
orig_shore_trs_stats.loc[:, slope_field] = np.where(
orig_shore_trs_stats[slope_field]
> orig_shore_trs_stats["sigmas_3"],
orig_shore_trs_stats["50%"],
orig_shore_trs_stats[slope_field],
)
else:
pass
correction_series = orig_shore_trs_stats.loc[:, slope_field]
elif isinstance(slope_value, (int, float)):
print(f"Using user-provided slope of {slope_value} to correct shorelines.")
orig_shore_trs_stats = orig_shore_base_distances.assign(
fixed_slope=slope_value
)
correction_series = orig_shore_trs_stats.fixed_slope
# add tide height
orig_shore_trs_stat_tide = pd.merge(
orig_shore_trs_stats,
shores_to_corr[[*group_by_fields, "tide_height"]],
on=[*group_by_fields],
how="left",
)
orig_shore_trs_stat_tide["corr_dist"] = corr_baseline_distance(
orig_shore_trs_stat_tide.sat_from_baseline,
correction_series,
orig_shore_trs_stat_tide.tide_height,
)
orig_shore_trs_stat_tide["diff_corr"] = (
orig_shore_trs_stat_tide.sat_from_baseline
- orig_shore_trs_stat_tide.corr_dist
)
orig_shore_corr_dist_gdf = gpd.GeoDataFrame(
orig_shore_trs_stat_tide, geometry="geometry", crs=crs
)
# _______ Apply threshold to correction, below this threshold, it is not c
diffs = orig_shore_corr_dist_gdf.diff_corr.values
corrs = orig_shore_corr_dist_gdf.corr_dist.values
originals = orig_shore_corr_dist_gdf.sat_from_baseline.values
# whether to correct or not shorelines closer to this threshold
if baseline_threshold == "infer":
baseline_threshold_value = np.round(
threshold_multiotsu(
orig_shore_corr_dist_gdf.diff_corr.dropna(), classes=2
)[0],
2,
)
print(f"Inferred baseline_threshold of: {baseline_threshold_value} meters")
elif isinstance(baseline_threshold, (int, float)):
baseline_threshold_value = baseline_threshold
print(f" Baseline distance threshold of: {baseline_threshold_value} meters")
elif baseline_threshold is None:
baseline_threshold_value = -np.inf  # no thresholding: always keep the corrected distance
else:
raise ValueError(
"Baseline_threshold must be either 'infer' or a numeric value."
)
updated_corr_dists = [
c if d > baseline_threshold_value else o
for d, c, o in zip(diffs, corrs, originals)
]
orig_shore_corr_dist_gdf["updated_corr_dists"] = updated_corr_dists
if "slope_field" in locals():
a = orig_shore_corr_dist_gdf.dropna(subset=[slope_field])[
[
*group_by_fields,
"tr_id",
"sat_from_baseline",
"updated_corr_dists",
"geometry",
]
]
else:
a = orig_shore_corr_dist_gdf.dropna()[
[
*group_by_fields,
"tr_id",
"sat_from_baseline",
"updated_corr_dists",
"geometry",
]
]
a["updated_corr_dists"] = [
a.iloc[i].sat_from_baseline
if a.iloc[i].updated_corr_dists < 0
else a.iloc[i].updated_corr_dists
for i in range(a.shape[0])
]
b = a.groupby([*group_by_fields])["tr_id"].apply(np.array).reset_index()
b["updated_corr_dists"] = (
a.groupby([*group_by_fields])["updated_corr_dists"]
.apply(np.array)
.reset_index()["updated_corr_dists"]
)
b["tr_geometries"] = (
a.groupby([*group_by_fields])["geometry"]
.apply(np.array)
.reset_index()["geometry"]
)
# 5)_________________ Lines creation_________________________________________
lines = []
for i, row in b.iterrows():
groups = consecutive_ids(data=row.tr_id, limit=limit_vertex)
if len(groups) > 0:
lines_group = []
for j, group in enumerate(groups):
trs = row.tr_geometries[group]
dis = row.updated_corr_dists[group]
line_pts = []
for distance, transect in zip(dis, trs):
point = transect.interpolate(distance)
line_pts.append(point)
line = LineString(line_pts)
lines_group.append(line)
if len(lines_group) > 1:
lines_row = unary_union(lines_group)
else:
lines_row = lines_group[0]
lines.append(lines_row)
else:
print(
f"WARNING:\n\
{row.location}_{row.raw_date}_{row.water_index}_{row.thr_type} has {len(groups)} groups. Skipped."
)
lines.append(np.nan)
lines = gpd.geoseries.GeoSeries(lines, crs=crs)
b["corr_shore_geometry"] = lines.geometry
final = pd.merge(shores_to_corr, b, on=[*group_by_fields], how="left")
# 6)_________________ saving outputs________________________________________
if bool(save_trs_details):
trs_details_file_name = (
f"trsdetails_{location}_{slope_value}_{limited}_{mode}.csv"
)
trs_details_out_path = os.path.join(
trs_details_folder, trs_details_file_name
)
orig_shore_corr_dist_gdf.to_csv(trs_details_out_path, index=False)
print(f"File {trs_details_file_name} saving in {trs_details_folder}.")
else:
pass
big_final = pd.concat([big_final, final], ignore_index=True)
print(f"Done with {location}.")
return big_final
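The horizontal shift applied by corr_baseline_distance is computed elsewhere in common.py. As a rough guide, this kind of tidal correction typically rests on the waterline-excursion relationship sketched below (a minimal illustration under that assumption, not the library's implementation; the sign convention depends on the tide datum and on the baseline orientation):

```python
import numpy as np

def horizontal_tidal_shift(tide_height, slope_deg):
    """Horizontal excursion of the waterline for a given tide height (m) and
    beachface slope (degrees): dx = tide / tan(slope). Whether dx is added to or
    subtracted from the baseline distance depends on the adopted conventions."""
    return tide_height / np.tan(np.radians(slope_deg))

print(round(horizontal_tidal_shift(0.4, 3.0), 1))  # 7.6 m for a 0.4 m tide on a 3-degree beachface
```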
tiles_from_grid(grid, img_path, output_path, list_loc_codes, crs_dict_string, mode='rgb', sel_bands=None, driver='PNG', geotransform=False)
¶
Clips a raster into tiles using the provided grid polygons and saves them as image files (PNG or GeoTiff) in RGB, mask, multiband or custom-band mode. Tiles only partially overlapping the raster are padded to the expected shape. Optionally returns a dictionary with the geotransform of each tile.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| grid | GeoDataFrame | GeoDataFrame of the grid of only tiles containing the line. Output of grid_from_shore function. | required |
| img_path | str | Path of the directory containing the geotiff datasets (.tiff or .tif). | required |
| output_path | str | Path of the directory where to save the image tiles. | required |
| list_loc_codes | list | List of strings containing location codes. | required |
| mode | str, 'rgb', 'mask', 'multi', 'custom' | 'rgb': the output images are 3-channel RGB tiles. 'mask': 1-band output tiles. 'multi': multiband output tiles (all input bands). 'custom': use the band indices given in sel_bands to extract only those bands from the input multiband images (NOTE: in 'custom' mode, output tile band indices are reindexed, so they do not correspond to the original band indices but restart from 1). | 'rgb' |
| geotransform | bool or 'only' | If True, save tiles and also return a dictionary with the geotransform of each grid tile. If False, save tiles without the geotransform dictionary. If 'only', do not save tiles but return the geotransform dictionary only. | False |
| sel_bands | list | List of integers (minimum is 1, not zero-indexed) corresponding to the bands to be used to create the tiles. Only used with mode='custom'. | None |
| driver | str, "GTiff" or "PNG" | Tiles image file type. Default is "PNG". | 'PNG' |
Returns:
| Type | Description |
|---|---|
| | Saves tiles to the specified output folder and optionally returns the tiles' geotransform dictionary. |
Source code in sandpyper\common.py
def tiles_from_grid(
grid,
img_path,
output_path,
list_loc_codes,
crs_dict_string,
mode="rgb",
sel_bands=None,
driver="PNG",
geotransform=False
):
"""
Clips a raster into tiles using the provided grid polygons and saves them as image files (PNG or GeoTiff)
in RGB, mask, multiband or custom-band mode. Tiles only partially overlapping the raster are padded to the
expected shape. Optionally returns a dictionary with the geotransform of each tile.
Args:
grid (GeoDataFrame): GeoDataFrame of the grid of only tiles containing the line. Output of grid_from_shore function.
img_path (str): Path of the directory containing the geotiffs datasets (.tiff or .tif).
output_path (str): Path of the directory where to save the images tiles.
list_loc_codes (list): list of strings containing location codes.
mode (str,'rgb','mask','multi','custom'): 'rgb', the output images are 3-channels RGB tiles. 'mask', 1-band output tiles.
'multi', multibands output tiles (with all input bands).
'custom', use selected band indices (with sel_bands parameter) to only extract those bands from input multiband images
(NOTE: in 'custom' mode, output tile band indices are reindexed, so they do not correspond to the original band indices but restart from 1).
geotransform (bool or 'only'): If True, save tiles and also return a dictionary with the geotransform of each grid.
If False, save tiles without geotransform dictionary. If "only", do not save tiles but return the geotransform dictionary only.
sel_bands (list): list of integers (minimum is 1, not zero-indexed) corresponding to the bands to be used to create the tiles. Only used with mode='custom'.
Default is None.
driver (str, "GTiff" or "PNG"): tiles image file type. Default is "PNG".
Returns:
Saves tiles to the specified output folder and optionally return the tiles geotransform dictionary.
"""
loc = getLoc(img_path, list_loc_codes)
crs = crs_dict_string[loc]
if driver == "PNG":
ext = "png"
elif driver == "GTiff":
ext = "tif"
else:
raise NameError("Driver must be either 'PNG' or 'GTiff'.")
with ras.open(img_path, "r") as dataset:
if mode == "rgb":
count = 3
source_idx = [1, 2, 3] # the band indices of the source image
out_idx = source_idx
sel_bands = None
height_idx = 1
width_idx = 2
elif mode == "multi":
if driver == "PNG":
print(
"NOTE: PNG format doesn't support multibands. Returning GeoTiffs instead."
)
driver = "GTiff"
else:
pass
sel_bands = None
count = dataset.count
source_idx = list(dataset.indexes)
out_idx = None
height_idx = 1
width_idx = 2
elif mode == "mask":
count = 1
source_idx = 1
out_idx = 1
height_idx = 0
width_idx = 1
elif mode == "custom":
if len(sel_bands) > 3 and driver == "PNG":
print(
"NOTE: More than 3 bands selected for the creation of PNG images. PNG format doesn't support multibands. Returning GeoTiffs instead."
)
driver = "GTiff"
else:
pass
source_idx = sel_bands
out_idx = None
count = len(sel_bands)
height_idx = 1
width_idx = 2
# creates gereferenced bounding box of the image
geom_im = gpd.GeoSeries(box(*dataset.bounds), crs=crs_dict_string[loc] )
# evaluates which tiles are fully within the raster bounds
fully_contains = [
geom_im.geometry.contains(mask_geom)[0] for mask_geom in grid.geometry
]
# get the expected shape of fully contained tiles
full_in_geom = grid[fully_contains].iloc[[0]]["geometry"]
geom_wdw = geometry_window(dataset, full_in_geom)
expected_shape = (geom_wdw.height, geom_wdw.width)
print(f"Expected shape{expected_shape}")
for i, row in grid.iterrows():
tile_name = f"{loc}_{row.grid_id}_{getDate(img_path)}_{mode}.{ext}"
geom = Polygon( # create the square polygons to clip the raster with
(
(row.ulx, row.uly),
(row.urx, row.ury),
(row.lrx, row.lry),
(row.llx, row.lly),
(row.ulx, row.uly),
)
)
# get the future shape of the tile which is about to get created
geom_wdw = geometry_window(dataset, [mapping(geom)])
tile_shape = (geom_wdw.height, geom_wdw.width)
if tile_shape == expected_shape:
tile_to_disk(
dataset=dataset,
geom=geom,
crs=crs,
tile_name=tile_name,
output_path=output_path,
nodata=0,
source_idx=source_idx,
height_idx=height_idx,
width_idx=width_idx,
out_idx=out_idx,
count=count,
driver=driver,
geotransform=geotransform,
)
else:
partial_tile_padding(
dataset=dataset,
expected_shape=expected_shape,
crs=crs,
geom=geom,
tile_name=tile_name,
output_path=output_path,
nodata=0,
source_idx=source_idx,
height_idx=height_idx,
width_idx=width_idx,
out_idx=out_idx,
count=count,
driver=driver,
geotransform=geotransform,
)
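A hypothetical call, assuming sandpyper is importable and that the grid was produced by grid_from_shore; the paths, location codes and EPSG codes below are placeholders:

```python
import geopandas as gpd
from sandpyper.common import tiles_from_grid  # assumed import path

# Placeholder inputs: a grid previously created with grid_from_shore and an orthomosaic.
grid = gpd.read_file("C:/data/grids/apo_grid.gpkg")
crs_dict_string = {"apo": 32754, "leo": 32755}  # hypothetical location codes / EPSG codes

tiles_from_grid(
    grid=grid,
    img_path="C:/data/orthos/apo_20180601_ortho.tif",
    output_path="C:/data/tiles/rgb",
    list_loc_codes=["apo", "leo"],
    crs_dict_string=crs_dict_string,
    mode="rgb",          # 3-band RGB tiles
    driver="PNG",
    geotransform=False,  # True (or "only") to also obtain each tile's geotransform
)
```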
toes_candidates(df, location_field='location', date_field='raw_date', tr_id_field='tr_id', distance_field='distance', slope_field='slope', sigma=0, peak_height=30)
¶
Dataframe implementation of the toes_from_slopes function. Returns dune toe distances (from the transect origin) by extracting peaks higher than a given value from a Gaussian-filtered slope profile. It can return multiple candidates when multiple peaks are found. These are used to clip beachfaces, defined as going from the swash line to the dune toe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | pd.DataFrame | Dataframe containing the slope profiles. | required |
| location_field | str | Field where the location code is stored. Defaults to "location". | 'location' |
| date_field | str | Field where the survey date is stored. Defaults to "raw_date". | 'raw_date' |
| tr_id_field | str | Field where the transect ID is stored. Defaults to "tr_id". | 'tr_id' |
| distance_field | str | Field where the distance is stored. Defaults to "distance". | 'distance' |
| slope_field | str | Field where the slope is stored. Defaults to "slope". | 'slope' |
| sigma | int | Standard deviation used in the Gaussian smoothing filter. | 0 |
| peak_height | int | Threshold used to define a peak in the smoothed slope profile. | 30 |
Returns:
| Type | Description |
|---|---|
| df_formatted (pd.DataFrame) | Candidate toe distances for each slope profile. |
Source code in sandpyper\common.py
def toes_candidates(
df,
location_field="location",
date_field="raw_date",
tr_id_field="tr_id",
distance_field="distance",
slope_field="slope",
sigma=0,
peak_height=30,
):
"""Dataframe implementation of the toes_from_slope function.
Returns dune toe distance (from transect origin) by extracting peaks higher of a given value
from a Gaussian filtered slope profile. It can return multiple candidates when multiple peaks are found.
These will be used to clip beachfaces, defined as going from the swash line to dune toes.
Args:
df (pd.DataFrame): Dataframe containing the slope profiles.
location_field (str): Field where the location code is stored. Default to "location".
date_field (str): Field where the survey date is stored. Default to "raw_date".
tr_id_field (str): Field where the transect ID is stored. Default to "tr_id".
distance_field (str): Field where the distance is stored. Default to "distance".
slope_field (str): Field where the slope is stored. Default to "slope".
sigma (int): Standard deviation used in the Gaussian smoothing filter.
peak_height (int): Threshold to use to define a peak in the smoothed slope profile.
Returns:
df_formatted (pd.DataFrame): Candidate toe distances for each slope profile."""
apply_dict = {
"distance_field": distance_field,
"slope_field": slope_field,
"sigma": sigma,
"peak_height": peak_height,
}
dist_toe_ = df.groupby([location_field, date_field, tr_id_field]).apply(
toes_from_slopes, **apply_dict
)
dist_toe_df = pd.DataFrame(pd.Series(dist_toe_, name="toe_distances"))
df_formatted = pd.DataFrame(
dist_toe_df.reset_index()
.groupby([location_field, date_field, tr_id_field])["toe_distances"]
.apply(np.array)
)
return df_formatted
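A minimal, self-contained example with synthetic slope profiles (values are illustrative only; assumes toes_candidates can be imported from sandpyper.common):

```python
import pandas as pd
from sandpyper.common import toes_candidates  # assumed import path

# Two synthetic slope profiles (degrees) from one survey; the spike marks the dune toe.
profiles = pd.DataFrame({
    "location": ["apo"] * 10,
    "raw_date": [20180601] * 10,
    "tr_id":    [0] * 5 + [1] * 5,
    "distance": [0, 1, 2, 3, 4] * 2,
    "slope":    [2, 5, 50, 6, 3,    # transect 0: toe candidate at distance 2
                 1, 4, 55, 8, 2],   # transect 1: toe candidate at distance 2
})

toes_df = toes_candidates(profiles, sigma=1, peak_height=20)
print(toes_df)  # one array of candidate toe distances per (location, raw_date, tr_id)
```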
toes_from_slopes(series, distance_field='distance', slope_field='slope', sigma=0, peak_height=30)
¶
Returns dune toe distances (from the transect origin) by extracting peaks higher than a given value from a Gaussian-filtered slope profile. It can return multiple candidates when multiple peaks are found. These are used to clip beachfaces, defined as going from the swash line to the dune toe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| series | pd.Series | Slope profile. | required |
| distance_field | str | Field where the distance is stored. Defaults to "distance". | 'distance' |
| slope_field | str | Field where the slope is stored. Defaults to "slope". | 'slope' |
| sigma | int | Standard deviation used in the Gaussian smoothing filter. | 0 |
| peak_height | int | Threshold used to define a peak in the smoothed slope profile. | 30 |
Returns:
| Type | Description |
|---|---|
| | Toe distances of the given slope profile. |
Source code in sandpyper\common.py
def toes_from_slopes(
series, distance_field="distance", slope_field="slope", sigma=0, peak_height=30
):
"""Returns dune toe distance (from transect origin) by extracting peaks higher of a given value
from a Gaussian filtered slope profile. It can return multiple candidates when multiple peaks are found.
These will be used to clip beachfaces, defined as going from the swash line to dune toes.
Args:
series (pd.Series): Slope profile.
distance_field (str): Field where the distance is stored. Default to "distance".
slope_field (str): Field where the slope is stored. Default to "slope".
sigma (int): Standard deviation used in the Gaussian smoothing filter.
peak_height (int): Threshold to use to define a peak in the smoothed slope profile.
Returns:
Toe distances of the given slope profile.
"""
sorted_series = series.sort_values([distance_field])
gauss = gaussian_filter(sorted_series.loc[:, slope_field], sigma=sigma)
peak = sig.find_peaks(gauss, height=peak_height)
try:
toe_distances = sorted_series.iloc[peak[0]][distance_field]
except BaseException:
toe_distances = np.nan
return toe_distances
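A minimal usage sketch on a single synthetic profile (assumes toes_from_slopes can be imported from sandpyper.common; note that, as implemented, the function operates on a per-transect group of rows, i.e. a DataFrame slice, despite the pd.Series annotation):

```python
import pandas as pd
from sandpyper.common import toes_from_slopes  # assumed import path

profile = pd.DataFrame({
    "distance": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
    "slope":    [2.0, 4.0, 12.0, 45.0, 10.0, 3.0],  # degrees; the 45 spike marks the dune toe
})

toe_distances = toes_from_slopes(profile, sigma=1, peak_height=20)
print(toe_distances.values)  # [3.0] -> candidate toe 3 m from the transect origin
```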