""" Satellite imagery processing module for calculating vegetation indices. This module processes satellite imagery data to calculate various vegetation indices such as NDVI, NDRE, NDWI, etc., and generates visualizations of these indices. """ import os import json from typing import Dict, List, Tuple, Union, Any, Optional import numpy as np import cv2 from functools import partial # Constants MAX_PIXEL_LONG = 10980 DEFAULT_SIZE_B4 = [10980, 10980] SMALLER_SIZE_B4 = [5490, 5490] MAX_INDEX_VALUE = 1.0 MIN_INDEX_VALUE = -1.0 MAX_NDMI_VALUE = 0.8 MIN_NDMI_VALUE = -0.8 MAX_BSI_VALUE = 2.5 CLOUD_THRESHOLD = 170 PIXEL_MAX_VALUE = 4096 ETCI_LIMIT = 0.5 ETCI_LOW_LIMIT = 0.008 ETCI_UP_LIMIT = 0.992 ETCI_CR_LIMIT = 0.04 ETCI_CB_LIMIT = -0.04 CR_ADJUSTMENT = 0.596 CB_ADJUSTMENT = 0.523 SOC_CONSTANTS = { 'LAI_OFFSET': 0.0411, 'LAI_SCALE': 0.0087, 'CONVERSION_FACTOR': 0.004536 * 100 / (350 * 9 * 20) } # Color definitions (BGR format for OpenCV) COLORS = { 'badveg': [90, 136, 247], 'vbadveg': [26, 25, 229], 'badwater': [200, 145, 180], 'vbadwater': [97, 19, 60], 'badcloud': [255, 255, 255], 'bothbad': [44, 16, 145], 'otherregions': [95, 167, 17], 'default_blind': [111, 111, 111] } # YCrCb transformation matrix YCRCB_TRANS = np.array([ [0.299, 0.596, 0.211], [0.587, -0.274, -0.523], [0.114, -0.322, 0.312] ]) def parse_json_data(json_string: Union[str, Dict]) -> Dict: """ Parse JSON data from string or dictionary. Args: json_string: JSON data as string or dictionary Returns: Dictionary containing parsed JSON data """ if isinstance(json_string, str): return json.loads(json_string) return json_string def format_date(date_str: str) -> str: """ Format date string from YYYYMMDD to YYYY-MM-DD. Args: date_str: Date string in YYYYMMDD format Returns: Formatted date string in YYYY-MM-DD format """ return f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}" def get_file_paths(mgrs: str, date: str, version: int = 0) -> Dict[str, str]: """ Generate file paths for satellite imagery. Args: mgrs: Military Grid Reference System code date: Formatted date string version: Version number (0 or 1) Returns: Dictionary of file paths for different bands """ filepath = f"AwsData/{mgrs},{date},{version}/" return { 'B4': filepath + "B04.jp2", 'B8': filepath + "B08.jp2", 'TCI': filepath + "TCI.jp2", 'B12': filepath + "B12.jp2", 'B11': filepath + "B11.jp2", 'B5': filepath + "B05.jp2", 'B2': filepath + "B02.jp2", 'B3': filepath + "B03.jp2" } def load_images(file_paths: Dict[str, str]) -> Dict[str, np.ndarray]: """ Load satellite imagery from file paths. Args: file_paths: Dictionary of file paths for different bands Returns: Dictionary of loaded images Raises: IOError: If images cannot be loaded """ images = {} try: images['B4'] = cv2.imread(file_paths['B4'], cv2.IMREAD_UNCHANGED) images['B5'] = cv2.imread(file_paths['B5'], cv2.IMREAD_UNCHANGED) images['B2'] = cv2.imread(file_paths['B2'], cv2.IMREAD_UNCHANGED) images['B3'] = cv2.imread(file_paths['B3'], cv2.IMREAD_UNCHANGED) images['B8'] = cv2.imread(file_paths['B8'], cv2.IMREAD_UNCHANGED) images['TCI'] = cv2.imread(file_paths['TCI']) images['B12'] = cv2.imread(file_paths['B12'], cv2.IMREAD_UNCHANGED) images['B11'] = cv2.imread(file_paths['B11'], cv2.IMREAD_UNCHANGED) # Check if any images are None if any(img is None for img in images.values()): raise IOError("One or more images could not be loaded") return images except Exception as e: raise IOError(f"Error loading images: {str(e)}") def clip_images(images: Dict[str, np.ndarray], clip: List[int]) -> Dict[str, np.ndarray]: """ Clip images to region of interest. Args: images: Dictionary of images clip: Clipping coordinates [StartPixelLat, EndPixelLat, StartPixelLong, EndPixelLong] Returns: Dictionary of clipped images Raises: ValueError: If clipping fails """ try: clipped_images = {} # Clip RGB image clipped_images['TCI'] = images['TCI'][clip[0]:clip[1]+1, clip[2]:clip[3]+1, :] # Clip single-channel images for band in ['B4', 'B8', 'B2', 'B3']: clipped_images[band] = images[band][clip[0]:clip[1]+1, clip[2]:clip[3]+1] return clipped_images except Exception as e: raise ValueError(f"Error clipping images: {str(e)}") def clip_smaller_images(images: Dict[str, np.ndarray], clip: List[int]) -> Dict[str, np.ndarray]: """ Clip smaller resolution images. Args: images: Dictionary of images clip: Clipping coordinates [StartPixelLat, EndPixelLat, StartPixelLong, EndPixelLong] Returns: Dictionary of clipped images """ clipped_images = {} # Clip multi-channel images for band in ['B12', 'B11', 'B5']: clipped_images[band] = images[band][clip[0]:clip[1]+1, clip[2]:clip[3]+1, :] return clipped_images def prepare_images(clipped_images: Dict[str, np.ndarray], smaller_images: Dict[str, np.ndarray], clip: List[int]) -> Dict[str, np.ndarray]: """ Prepare images for processing by resizing and converting to float32. Args: clipped_images: Dictionary of clipped images smaller_images: Dictionary of smaller clipped images clip: Clipping coordinates Returns: Dictionary of prepared images """ prepared_images = {} # Copy images to avoid modifying originals prepared_images['B8NDRE'] = clipped_images['B8'].copy() prepared_images['B4BSI'] = clipped_images['B4'].copy() prepared_images['B12BSI'] = smaller_images['B12'].copy() # Resize images target_size = (abs(clip[3]-clip[2]+1), abs(clip[1]-clip[0]+1)) prepared_images['B8NDRE'] = cv2.resize(prepared_images['B8NDRE'], target_size).astype(np.float32) prepared_images['B4BSI'] = cv2.resize(prepared_images['B4BSI'], target_size).astype(np.float32) prepared_images['B12BSI'] = cv2.resize(prepared_images['B12BSI'], target_size).astype(np.float32) # Convert all images to float32 for band in clipped_images: prepared_images[band] = clipped_images[band].astype(np.float32) for band in smaller_images: prepared_images[band] = smaller_images[band].astype(np.float32) return prepared_images def calculate_indices(images: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: """ Calculate various vegetation indices from satellite imagery. Args: images: Dictionary of prepared images Returns: Dictionary of calculated indices """ indices = {} # NDVI (Normalized Difference Vegetation Index) indices['NDVI'] = (images['B8'] - images['B4']) / (images['B8'] + images['B4']) # AVI (Advanced Vegetation Index) indices['AVI'] = np.power((images['B8'] * (PIXEL_MAX_VALUE - images['B4']) * (images['B8'] - images['B4'])), 1/3) indices['AVI'] = indices['AVI'] / PIXEL_MAX_VALUE # BSI (Bare Soil Index) indices['BSI'] = (images['B8'] + images['B3'] + images['B4']) / (images['B8'] + images['B3'] - images['B4']) indices['BSI'] = np.clip(indices['BSI'], 0, MAX_BSI_VALUE) / MAX_BSI_VALUE # SI (Soil Index) indices['SI'] = np.power(((PIXEL_MAX_VALUE - images['B2']) * (PIXEL_MAX_VALUE - images['B3']) * (PIXEL_MAX_VALUE - images['B4'])), 1/3) indices['SI'] = indices['SI'] / PIXEL_MAX_VALUE # NDRE (Normalized Difference Red Edge) indices['NDRE'] = (images['B8NDRE'] - images['B5']) / (images['B8NDRE'] + images['B5']) # NDWI (Normalized Difference Water Index) indices['NDWI'] = (images['B8NDRE'] - images['B12']) / (images['B8NDRE'] + images['B12']) # NDMI (Normalized Difference Moisture Index) indices['NDMI'] = (images['B8NDRE'] - images['B11']) / (images['B8NDRE'] + images['B11']) # VARI (Visible Atmospherically Resistant Index) num_vari = images['B3'] - images['B4'] den_vari = (images['B3'] + images['B4'] - images['B2']) indices['VARI'] = num_vari / den_vari # EVI (Enhanced Vegetation Index) indices['EVI'] = (2.5 * (images['B8'] - images['B4'])) / ((images['B8'] + 2.4 * images['B4']) + PIXEL_MAX_VALUE) # SAVI (Soil Adjusted Vegetation Index) indices['SAVI'] = 1.5 * (images['B8'] - images['B4']) / (images['B8'] + images['B4'] + 0.5) # VSSI (Visible Soil Saturation Index) indices['VSSI'] = 2 * images['B3'] - 5 * (images['B4'] + images['B8']) # BI (Brightness Index) indices['BI'] = images['B8NDRE'] / images['B5'] - 1 # Clip indices to valid ranges indices['NDRE'] = np.clip(indices['NDRE'], MIN_INDEX_VALUE, MAX_INDEX_VALUE) indices['NDWI'] = np.clip(indices['NDWI'], MIN_INDEX_VALUE, MAX_INDEX_VALUE) indices['NDMI'] = np.clip(indices['NDMI'], MIN_NDMI_VALUE, MAX_NDMI_VALUE) indices['VARI'] = np.clip(indices['VARI'], MIN_INDEX_VALUE, MAX_INDEX_VALUE) return indices def calculate_soc(ndvi: np.ndarray) -> np.ndarray: """ Calculate Soil Organic Carbon (SOC) from NDVI. Args: ndvi: NDVI array Returns: SOC array """ # Vectorized calculation min_ndvi = -1 max_ndvi = 1 # Calculate fractional cover fc = 1 - (max_ndvi - ndvi) / (max_ndvi - min_ndvi) # Calculate LAI (Leaf Area Index) # Handle potential log(0) by adding a small epsilon epsilon = 1e-10 lai = -2 * np.log(np.maximum(1 - fc, epsilon)) # Calculate SOC soc_pc = 100 * (lai - SOC_CONSTANTS['LAI_OFFSET']) / SOC_CONSTANTS['LAI_SCALE'] soc = soc_pc * SOC_CONSTANTS['CONVERSION_FACTOR'] return soc.astype(np.float32) def expand_indices(indices: Dict[str, np.ndarray], ndvi_shape: Tuple[int, int]) -> Dict[str, np.ndarray]: """ Expand smaller resolution indices to match NDVI resolution. Args: indices: Dictionary of indices ndvi_shape: Shape of NDVI array Returns: Dictionary of expanded indices """ expanded = {} # Create empty arrays for expanded indices for idx in ['NDRE', 'NDWI', 'BI', 'NDMI']: expanded[f'{idx}_expanded'] = np.zeros(ndvi_shape) # Get shapes ndre_shape = indices['NDRE'].shape # Use broadcasting for expansion where possible for i in range(ndre_shape[0]): for j in range(ndre_shape[1]): if 2*i < ndvi_shape[0] and 2*j < ndvi_shape[1]: # Define the 2x2 block in the expanded array rows = slice(2*i-1, 2*i+1) cols = slice(2*j-1, 2*j+1) # Set all 4 pixels in the block to the same value for idx in ['NDRE', 'NDWI', 'BI', 'NDMI']: expanded[f'{idx}_expanded'][rows, cols] = indices[idx][i, j] return expanded def adjust_indices_by_plant_distance(indices: Dict[str, np.ndarray], plant_distance: float) -> Tuple[Dict[str, np.ndarray], float]: """ Adjust indices based on plant distance. Args: indices: Dictionary of indices plant_distance: Plant distance value Returns: Tuple of adjusted indices and delta value """ # Determine delta based on plant distance delta = 0 if 1 < plant_distance < 3: delta = 0.1 elif 3 < plant_distance < 6: delta = 0.2 elif plant_distance > 6: delta = 0.3 # Apply delta and clip values for idx in ['NDRE', 'NDWI', 'BI', 'NDMI']: indices[idx] = np.minimum(0.9, indices[idx] + delta) return indices, delta def create_hybrid_image(ndvi: np.ndarray, expanded_indices: Dict[str, np.ndarray], tci_gray: np.ndarray, delta: float) -> Tuple[np.ndarray, np.ndarray]: """ Create hybrid and blind hybrid images based on indices. Args: ndvi: NDVI array expanded_indices: Dictionary of expanded indices tci_gray: Grayscale TCI image delta: Delta value for adjustment Returns: Tuple of hybrid image and blind hybrid image """ # Get shape shape = ndvi.shape # Create empty arrays hybrid_image = np.zeros((shape[0], shape[1], 3), dtype=np.uint8) hybrid_blind_image = np.zeros((shape[0], shape[1], 3), dtype=np.uint8) # Set default colors hybrid_image[:] = COLORS['otherregions'] hybrid_blind_image[:] = COLORS['default_blind'] # Create masks for different conditions cloud_mask = tci_gray > CLOUD_THRESHOLD # Adjust NDVI and get expanded NDWI ndvi_adj = ndvi + delta ndwi_expanded = expanded_indices['NDWI_expanded'] # Create masks for vegetation and water conditions bad_veg_mask = np.logical_and(ndvi_adj > 0.3, ndvi_adj < 0.45) very_bad_veg_mask = ndvi_adj < 0.3 bad_water_mask = np.logical_and(ndwi_expanded < 0.4, ndvi_adj > 0.3) very_bad_water_mask = ndwi_expanded < 0.3 both_bad_mask = np.logical_and(ndvi_adj < 0.45, ndwi_expanded < 0.4) # Apply colors based on masks # Cloud hybrid_image[cloud_mask] = COLORS['badcloud'] hybrid_blind_image[cloud_mask] = [0, 0, 0] # Non-cloud pixels non_cloud_mask = ~cloud_mask # Bad vegetation mask = np.logical_and(non_cloud_mask, bad_veg_mask) hybrid_image[mask] = COLORS['badveg'] hybrid_blind_image[mask] = [0, 0, 0] # Very bad vegetation mask = np.logical_and(non_cloud_mask, very_bad_veg_mask) hybrid_image[mask] = COLORS['vbadveg'] hybrid_blind_image[mask] = [0, 0, 0] # Bad water mask = np.logical_and(non_cloud_mask, bad_water_mask) hybrid_image[mask] = COLORS['badwater'] hybrid_blind_image[mask] = [255, 255, 255] # Very bad water mask = np.logical_and(non_cloud_mask, very_bad_water_mask) hybrid_image[mask] = COLORS['vbadwater'] hybrid_blind_image[mask] = [255, 255, 255] # Both bad mask = np.logical_and(non_cloud_mask, both_bad_mask) hybrid_image[mask] = COLORS['bothbad'] hybrid_blind_image[mask] = [0, 0, 0] return hybrid_image, hybrid_blind_image def create_etci_image(tci: np.ndarray) -> np.ndarray: """ Create Enhanced True Color Image (ETCI) from TCI. Args: tci: True Color Image array Returns: Enhanced True Color Image array """ img = tci.copy() m1, n1, r1 = img.shape if r1 == 3: try: a = cv2.cvtColor(img, cv2.COLOR_BGR2YCrCb) except: img = img.astype(np.float32) / 255.0 a = cv2.cvtColor(img, cv2.COLOR_BGR2YCrCb) # Adjust Cr channel mean_adjustment = ETCI_CR_LIMIT - np.mean(a[:, :, 1]) a[:, :, 1] = a[:, :, 1] + mean_adjustment * (CR_ADJUSTMENT - a[:, :, 1]) # Adjust Cb channel mean_adjustment = ETCI_CB_LIMIT - np.mean(a[:, :, 2]) a[:, :, 2] = a[:, :, 2] + mean_adjustment * (CB_ADJUSTMENT - a[:, :, 2]) else: a = img.astype(np.float32) / 255.0 # Adjust Y channel mean_adjustment = ETCI_LIMIT - np.mean(a[:, :, 0]) a[:, :, 0] = a[:, :, 0] + mean_adjustment * (1 - a[:, :, 0]) if r1 == 3: a = cv2.cvtColor(a, cv2.COLOR_YCrCb2BGR) img = a * 255.0 # Calculate vmin and vmax v_min = [] v_max = [] for k in range(r1): arr = np.sort(img[:, :, k].flatten()) v_min.append(arr[int(np.ceil(ETCI_LOW_LIMIT * m1 * n1))]) v_max.append(arr[int(np.ceil(ETCI_UP_LIMIT * m1 * n1))]) if r1 == 3: v_min = np.dot(np.array(v_min), YCRCB_TRANS) v_max = np.dot(np.array(v_max), YCRCB_TRANS) # Normalize image img = (img - v_min[0]) / (v_max[0] - v_min[0]) return (img * 255).astype(np.uint8) def save_index_image(index: np.ndarray, path: str, map_func: callable, resize_func: Optional[callable] = None, target_shape: Optional[Tuple[int, int]] = None) -> None: """ Save index image with optional resizing. Args: index: Index array path: Output file path map_func: Color mapping function resize_func: Optional resize function target_shape: Optional target shape for resizing """ img = map_func(index) if resize_func is not None and target_shape is not None: img = resize_func(img, target_shape) cv2.imwrite(path, img) def save_index_data(index: np.ndarray, path: str) -> None: """ Save index data to CSV file. Args: index: Index array path: Output file path """ np.savetxt(path, index, delimiter=",") def resize_index_image(img: np.ndarray, size: Tuple[int, int]) -> np.ndarray: """ Resize index image to target size. Args: img: Image array size: Target size (height, width) Returns: Resized image array """ return cv2.resize(img, (size[1], size[0])) def gen_map1_img(index: np.ndarray) -> np.ndarray: """ Generate color map 1 for index visualization. Args: index: Index array Returns: Color-mapped image array """ # This is a placeholder for the MATLAB gen_map1_img function # In a real implementation, you would use a proper color mapping return np.uint8(index * 255) def gen_map2_img(index: np.ndarray) -> np.ndarray: """ Generate color map 2 for index visualization. Args: index: Index array Returns: Color-mapped image array """ # This is a placeholder for the MATLAB gen_map2_img function return np.uint8(index * 255) def gen_map1_soc(index: np.ndarray) -> np.ndarray: """ Generate color map 1 for SOC visualization. Args: index: SOC array Returns: Color-mapped image array """ # This is a placeholder for the MATLAB gen_map1_soc function return np.uint8(index * 255) def gen_map2_soc(index: np.ndarray) -> np.ndarray: """ Generate color map 2 for SOC visualization. Args: index: SOC array Returns: Color-mapped image array """ # This is a placeholder for the MATLAB gen_map2_soc function return np.uint8(index * 255) def gen_map3_img(index: np.ndarray) -> np.ndarray: """ Generate color map 3 for index visualization. Args: index: Index array Returns: Color-mapped image array """ # This is a placeholder for the MATLAB gen_map3_img function return np.uint8(index * 255) def gen_map4_img(index: np.ndarray) -> np.ndarray: """ Generate color map 4 for index visualization. Args: index: Index array Returns: Color-mapped image array """ # This is a placeholder for the MATLAB gen_map4_img function return np.uint8(index * 255) def process_additional_images(main_path: str, size: Tuple[int, int]) -> None: """ Process additional images if they exist. Args: main_path: Main output directory path size: Target size for resizing """ for img_name in ['evapo', 'lulc', 'rvi', 'dem']: try: filename = f"{main_path}/{img_name}.png" img = cv2.imread(filename) if img is not None: img_new = cv2.resize(img, (size[1], size[0])) cv2.imwrite(filename, img_new) except: pass def save_all_indices(indices: Dict[str, np.ndarray], main_path: str, ndvi_shape: Tuple[int, int]) -> None: """ Save all indices data and visualizations. Args: indices: Dictionary of indices main_path: Main output directory path ndvi_shape: Shape of NDVI array for resizing """ # Define mapping functions map_funcs = { 'default': gen_map1_img, 'cmap2': gen_map2_img, 'soc1': gen_map1_soc, 'soc2': gen_map2_soc, 'vari1': gen_map3_img, 'vari2': gen_map4_img } # Define index-specific settings index_settings = { 'NDVI': {'maps': ['default', 'cmap2']}, 'VSSI': {'maps': ['default', 'cmap2'], 'save_data': True}, 'SOC': {'maps': ['soc1', 'soc2'], 'save_data': True}, 'EVI': {'maps': ['default', 'cmap2'], 'save_data': True}, 'VARI': {'maps': ['vari1', 'vari2'], 'save_data': True}, 'NDRE': {'maps': ['default', 'cmap2'], 'save_data': True, 'resize': True}, 'AVI': {'maps': ['default', 'cmap2'], 'save_data': True}, 'BSI': {'maps': ['default', 'cmap2'], 'save_data': True}, 'SI': {'maps': ['default', 'cmap2'], 'save_data': True}, 'SAVI': {'maps': ['default', 'cmap2'], 'save_data': True}, 'NDMI': {'maps': ['default', 'cmap2'], 'save_data': True, 'resize': True}, 'NDWI': {'maps': ['default', 'cmap2'], 'save_data': True, 'resize': True} } # Save each index for idx_name, settings in index_settings.items(): # Skip if index not in indices if idx_name not in indices: continue # Save data if needed if settings.get('save_data', False): save_index_data(indices[idx_name], f"{main_path}/IndexValues_{idx_name.lower()}.csv") # Save visualizations for map_type in settings.get('maps', []): map_func = map_funcs.get(map_type, map_funcs['default']) # Determine output filename if map_type == 'default': filename = f"{main_path}/{idx_name.lower()}.png" else: filename = f"{main_path}/{idx_name.lower()}_{map_type}.png" # Save with or without resizing if settings.get('resize', False): save_index_image( indices[idx_name], filename, map_func, resize_index_image, ndvi_shape ) else: save_index_image(indices[idx_name], filename, map_func) def monitored_fields_data(JSONString: Union[str, Dict]) -> str: """ Process satellite imagery data to calculate various vegetation indices. Args: JSONString: JSON data containing satellite imagery information Returns: str: Status of the processing """ try: # Parse JSON data data = parse_json_data(JSONString) # Extract field information uid = data["UID"] field_id = data["FieldID"] plant_distance = data["PlantDistance"] mgrs = data["MGRS"] # Adjust end pixel if needed end_pixel_long = data["EndPixelLong"] if end_pixel_long > MAX_PIXEL_LONG: end_pixel_long = MAX_PIXEL_LONG - 1 # Format date and get file paths date_str = data["LatestDay"] formatted_date = format_date(date_str) # Try to load images from primary path try: file_paths = get_file_paths(mgrs, formatted_date, 0) images = load_images(file_paths) except IOError: # Try alternative path try: file_paths = get_file_paths(mgrs, formatted_date, 1) images = load_images(file_paths) except IOError: return f"{uid}failed2" # Clip images clip = [data["StartPixelLat"], data["EndPixelLat"], data["StartPixelLong"], end_pixel_long] try: clipped_images = clip_images(images, clip) except ValueError: return f"{uid}failed2" # Process smaller resolution images smaller_clip = [round(x / 2) for x in clip] smaller_images = clip_smaller_images(images, smaller_clip) # Prepare images for processing prepared_images = prepare_images(clipped_images, smaller_images, smaller_clip) # Calculate indices indices = calculate_indices(prepared_images) # Calculate SOC indices['SOC'] = calculate_soc(indices['NDVI']) # Create output directory main_path = f"{uid}/{field_id}" os.makedirs(main_path, exist_ok=True) # Save NDVI values to CSV and check for clouds save_index_data(indices['NDVI'], f"{main_path}/IndexValues_ndvi.csv") # Check if there is too much cloud is_nan_mat = np.isnan(indices['NDVI']) is_nan_count = np.sum(is_nan_mat) if is_nan_count > clipped_images['B4'].shape[0] * clipped_images['B4'].shape[1] * 0.8: return f"{uid}failed1" # Save TCI image cv2.imwrite(f"{main_path}/TCI.png", clipped_images['TCI']) # Process additional images if they exist process_additional_images(main_path, indices['VARI'].shape) # Create and save ETCI image etci = create_etci_image(clipped_images['TCI']) cv2.imwrite(f"{main_path}/ETCI.png", etci) cv2.imwrite(f"{main_path}/ETCI_large.png", etci) # Save TCI images cv2.imwrite(f"{main_path}/TCI.png", clipped_images['TCI']) cv2.imwrite(f"{main_path}/TCI_large.png", clipped_images['TCI']) # Create grayscale TCI image for hybrid image creation try: tci = cv2.imread(f"{main_path}/ETCI.png") tci_gray = cv2.cvtColor(tci, cv2.COLOR_BGR2GRAY) except: return f"{uid}failed1" # Expand indices to match NDVI resolution expanded_indices = expand_indices(indices, indices['NDVI'].shape) # Adjust indices based on plant distance indices, delta = adjust_indices_by_plant_distance(indices, plant_distance) # Create and save hybrid images hybrid_image, hybrid_blind_image = create_hybrid_image( indices['NDVI'], expanded_indices, tci_gray, delta ) cv2.imwrite(f"{main_path}/hybrid.png", hybrid_image) cv2.imwrite(f"{main_path}/hybrid_blind.png", hybrid_blind_image) # Save all indices data and visualizations save_all_indices(indices, main_path, indices['NDVI'].shape) return f"{uid}successful" except Exception as e: # Get UID for error message if possible uid = "unknown" try: if isinstance(JSONString, str): data = json.loads(JSONString) else: data = JSONString uid = data.get("UID", "unknown") except: pass # Log error and return failure status print(f"Error processing data: {str(e)}") return f"{uid}failed1"