"""
This code compares the NDVI multiband trends (monthly composite) with a CSV
signature trend (from an uploaded asset) and plots only those pixels whose
NDVI time-series match the signature.
"""
import ee
import pandas as pd
import json
import os
import time

# Initialize Earth Engine (module-level side effect: requires prior
# authentication; raises if no credentials are available).
ee.Initialize()
print("\n[1/6] Earth Engine initialized successfully")


# ---------- Load GeoJSON and Process Polygons ----------
def load_polygons(path='cluster.geojson'):
    """Load polygon features from a GeoJSON file.

    Args:
        path: GeoJSON file to read (default preserves the original
            hard-coded 'cluster.geojson').

    Returns:
        The list stored under the GeoJSON 'features' key.
    """
    with open(path) as f:
        data = json.load(f)
    return data['features']


def calc_landsat_ndvi(img):
    """Compute NDVI for a Landsat 8/9 Collection 2 Level-2 image.

    Scales SR_B4 (red) and SR_B5 (NIR) with the per-image
    REFLECTANCE_MULT/ADD properties before taking the NDVI ratio.

    NOTE(review): C02 L2 surface reflectance is normally rescaled with the
    fixed factors 0.0000275 / -0.2; confirm that REFLECTANCE_MULT_BAND_* and
    REFLECTANCE_ADD_BAND_* properties are actually present on LC08/LC09
    C02 T1_L2 assets, otherwise these property reads yield null.

    Returns:
        Single-band 'ndvi' image carrying the source image's
        'system:time_start' and 'system:index' properties.
    """
    red = img.select('SR_B4') \
        .multiply(ee.Number(img.get('REFLECTANCE_MULT_BAND_4'))) \
        .add(ee.Number(img.get('REFLECTANCE_ADD_BAND_4')))
    nir = img.select('SR_B5') \
        .multiply(ee.Number(img.get('REFLECTANCE_MULT_BAND_5'))) \
        .add(ee.Number(img.get('REFLECTANCE_ADD_BAND_5')))
    ndvi = nir.subtract(red).divide(nir.add(red)).rename('ndvi')
    return ndvi.copyProperties(img, ['system:time_start', 'system:index'])


def calc_sentinel_ndvi(img):
    """Compute NDVI for a Sentinel-2 SR harmonized image.

    The 0.0001 scale factor cancels out of a pure normalized difference,
    but it is kept so intermediate band values are true reflectance.

    Returns:
        Single-band 'ndvi' image carrying the source image's
        'system:time_start' and 'system:index' properties.
    """
    ms = img.select(['B4', 'B8']).multiply(0.0001)
    ndvi = ms.normalizedDifference(['B8', 'B4']).rename('ndvi')
    return ndvi.copyProperties(img, ['system:time_start', 'system:index'])


def create_monthly_composite(month, ndvi_collection):
    """Build the median NDVI composite for one slot of the 16-month window.

    Slot mapping: 1-3 -> Oct-Dec 2023, 4-15 -> Jan-Dec 2024, 16 -> Jan 2025.

    Args:
        month: slot index 1..16 (plain number or ee.Number).
        ndvi_collection: ee.ImageCollection of single-band 'ndvi' images.

    Returns:
        ee.Image median composite stamped with the slot's
        'system:time_start' and a 'YYYY-MM' 'system:index'.
    """
    month = ee.Number(month)

    # Server-side branching: a Python `if` cannot inspect an ee.Number.
    is_first_three_months = month.lte(3)
    is_last_month = month.eq(16)

    year = ee.Algorithms.If(
        is_first_three_months,
        2023,
        ee.Algorithms.If(is_last_month, 2025, 2024)
    )
    actual_month = ee.Algorithms.If(
        is_first_three_months,
        month.add(9),                         # 1->10, 2->11, 3->12
        ee.Algorithms.If(
            is_last_month,
            1,                                # slot 16 -> January
            month.subtract(4).mod(12).add(1)  # 4->1, 5->2, ..., 15->12
        )
    )

    date = ee.Date.fromYMD(year, actual_month, 1)
    # BUG FIX: the original filtered with calendarRange on the month only,
    # so a slot such as Oct-2023 also medianed in Oct-2024 scenes (the
    # collection spans Oct 2023 - Feb 2025). Restrict to the exact
    # year+month window instead.
    month_img = ndvi_collection \
        .filterDate(date, date.advance(1, 'month')) \
        .median()

    return month_img \
        .set('system:time_start', date.millis()) \
        .set('system:index', date.format('YYYY-MM'))
def process_signature(feature, ndvi_multiband, band_names,
                      relative_tolerance, monthly_tolerance, min_months_match):
    """Mask `ndvi_multiband` down to pixels matching one signature feature.

    A pixel matches when BOTH criteria hold:
      1. The Euclidean distance between the pixel's 16-band NDVI vector and
         the signature is below relative_tolerance * (signature max - min).
      2. At least `min_months_match` individual months differ from the
         signature by less than `monthly_tolerance` (absolute NDVI units).

    Args:
        feature: ee.Feature carrying NDVI_01 .. NDVI_16 properties.
        ndvi_multiband: 16-band ee.Image of monthly NDVI composites.
        band_names: ee.List of the 16 band names (NDVI_01 .. NDVI_16).
        relative_tolerance: fraction of the signature's own range used as
            the Euclidean-distance threshold.
        monthly_tolerance: per-month absolute NDVI tolerance.
        min_months_match: minimum number of months within tolerance.

    Returns:
        ee.Image: `ndvi_multiband` masked to matching pixels.
    """
    # Pull NDVI_01 .. NDVI_16 off the feature as a server-side list.
    signature_list = ee.List.sequence(1, 16).map(
        lambda m: ee.Number(feature.get(
            ee.String('NDVI_').cat(ee.Number(m).format('%02d')))))

    # Constant image whose 16 bands replicate the signature values.
    signature_image = ee.Image.constant(signature_list).rename(band_names)

    # Dynamic threshold scaled to this signature's own value range.
    signature_min = ee.Number(signature_list.reduce(ee.Reducer.min()))
    signature_max = ee.Number(signature_list.reduce(ee.Reducer.max()))
    signature_range = signature_max.subtract(signature_min)
    dynamic_threshold = signature_range.multiply(relative_tolerance)

    # Euclidean distance for overall pattern matching.
    diff_squared = ndvi_multiband.subtract(signature_image).pow(2)
    distance = diff_squared.reduce(ee.Reducer.sum()).sqrt()

    # Band-wise (monthly) matches within the absolute tolerance.
    monthly_diffs = ndvi_multiband.subtract(signature_image).abs()
    matching_months_count = monthly_diffs.lt(monthly_tolerance) \
        .reduce(ee.Reducer.sum())

    # Both criteria must hold for a pixel to survive the mask.
    return ndvi_multiband.updateMask(
        distance.lt(dynamic_threshold)
        .And(matching_months_count.gte(min_months_match)))


def export_ndvi_collection(collection, geometry, base_folder, sensor_folder,
                           farm_id, description_prefix):
    """Start one Drive export task per image in `collection`.

    Args:
        collection: ee.ImageCollection of single-band 'ndvi' images.
        geometry: ee.Geometry export region.
        base_folder: top-level Drive folder name.
        sensor_folder: sensor-specific sub-path, e.g. 'Raw_NDVI/Landsat8'.
        farm_id: identifier used in the folder path.
        description_prefix: prefix for each export task's description.
    """
    # Materialize the collection as a list so images can be indexed.
    image_list = collection.toList(collection.size())
    size = image_list.size().getInfo()

    # Google Drive interprets '/' in the folder name as nested folders.
    folder_path = f"{base_folder}/{sensor_folder}/Farm_{farm_id}"

    for i in range(size):
        image = ee.Image(image_list.get(i))
        timestamp = ee.Date(image.get('system:time_start')) \
            .format('YYYY-MM-dd').getInfo()
        task = ee.batch.Export.image.toDrive(
            image=image.select('ndvi'),
            description=f'{description_prefix}_{timestamp}',
            folder=folder_path,
            scale=30,
            region=geometry,
            fileFormat='GeoTIFF',
            formatOptions={'cloudOptimized': True},
            maxPixels=1e9
        )
        task.start()
        print(f"Started export task for {description_prefix}_{timestamp} in {folder_path}")


def process_single_polygon(feature):
    """Run the full NDVI signature-matching pipeline for one GeoJSON feature.

    Builds a 16-month NDVI composite stack from Landsat 8/9 and Sentinel-2,
    matches every signature row of 'ClusterSignature_Cluster_1.csv' against
    it, filters by ESA WorldCover class and pixel connectivity, computes
    per-category areas (appended to polygon_areas.csv), and launches Drive
    exports for raw NDVI, monthly composites, and the colour-coded result.

    Args:
        feature: GeoJSON feature dict with 'geometry' and 'properties'
            (LOC_CODE, NAME, TALUK, DISTRICT).

    Returns:
        dict with 'farm_id', 'matching_pixels' and 'total_pixels'.
    """
    start_time = time.time()

    # Extract coordinates and attributes from the GeoJSON feature.
    coordinates = feature['geometry']['coordinates'][0]
    farm_id = feature['properties']['LOC_CODE']
    village_name = feature['properties']['NAME']
    taluk = feature['properties']['TALUK']
    district = feature['properties']['DISTRICT']
    # BUG FIX: a leftover debug line overwrote the real attributes with the
    # literal strings "farm_id"/"taluk"/"district"/"vilage_name", so exports
    # and the areas CSV carried placeholders. Removed.

    geometry = ee.Geometry.Polygon(coordinates)
    print(f"\nProcessing Farm ID: {farm_id}")

    time_start = '2023-10-01'
    time_end = '2025-02-28'

    # Landsat 8 and 9 NDVI
    landsat8 = ee.ImageCollection("LANDSAT/LC08/C02/T1_L2") \
        .filterDate(time_start, time_end) \
        .filterBounds(geometry) \
        .map(calc_landsat_ndvi)
    landsat9 = ee.ImageCollection("LANDSAT/LC09/C02/T1_L2") \
        .filterDate(time_start, time_end) \
        .filterBounds(geometry) \
        .map(calc_landsat_ndvi)

    # Sentinel-2 NDVI
    sen2 = ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED") \
        .filterDate(time_start, time_end) \
        .filterBounds(geometry) \
        .map(calc_sentinel_ndvi)

    # Merge all sensors and sort chronologically.
    ndvi_collection = landsat8.merge(landsat9).merge(sen2) \
        .sort('system:time_start')
    print("\n[2/6] NDVI collections created and merged successfully")
    print(f"Time elapsed: {time.time() - start_time:.2f} seconds")

    # Monthly NDVI composites for the 16 slots (Oct 2023 - Jan 2025).
    month_list = ee.List.sequence(1, 16)
    monthly_ndvi = ee.ImageCollection(
        month_list.map(lambda m: create_monthly_composite(m, ndvi_collection)))

    # Stack the composites as one 16-band image clipped to the polygon.
    ndvi_multiband = monthly_ndvi.toBands().clip(geometry)
    band_names = ee.List.sequence(1, 16).map(
        lambda m: ee.String('NDVI_').cat(ee.Number(m).format('%02d')))
    ndvi_multiband = ndvi_multiband.rename(band_names)
    print("\n[3/6] Monthly NDVI composites created successfully")
    print(f"Time elapsed: {time.time() - start_time:.2f} seconds")

    # Read the local signature CSV and convert each row to an ee.Feature
    # with NDVI_01 .. NDVI_16 properties.
    signature_df = pd.read_csv('ClusterSignature_Cluster_1.csv')
    features = []
    for _, row in signature_df.iterrows():
        props = {}
        for month in range(1, 17):
            col_name = f'NDVI_{month:02d}'
            if col_name in row:
                props[col_name] = float(row[col_name])
        features.append(ee.Feature(None, props))
    csv_asset = ee.FeatureCollection(features)

    # Matching parameters (comments corrected to match the actual values).
    relative_tolerance = 0.80   # 80% of the signature's NDVI range
    monthly_tolerance = 0.70    # absolute per-month NDVI tolerance
    min_months_match = 10       # months that must be within tolerance

    # Match every signature and mosaic the per-signature results.
    print("\n[4/6] Starting signature processing...")
    all_matching_pixels = csv_asset.map(
        lambda f: process_signature(f, ndvi_multiband, band_names,
                                    relative_tolerance, monthly_tolerance,
                                    min_months_match))
    combined_matching_pixels = ee.ImageCollection(all_matching_pixels).mosaic()
    print("Signature processing completed successfully")
    print(f"Time elapsed: {time.time() - start_time:.2f} seconds")

    # ESA WorldCover land-cover filter. Exclude classes 10, 20, 30, 50, 60,
    # 80, 90, 95 (the original chain repeated 50 and 60 redundantly).
    worldcover = ee.ImageCollection("ESA/WorldCover/v200").first()
    excluded_classes = [10, 20, 30, 50, 60, 80, 90, 95]
    non_urban_water_mask = worldcover.neq(excluded_classes[0])
    for cls in excluded_classes[1:]:
        non_urban_water_mask = non_urban_water_mask.And(worldcover.neq(cls))

    # Use the first month's band for visualization / statistics.
    matching_band = combined_matching_pixels.select("NDVI_01")
    matching_band = matching_band.updateMask(non_urban_water_mask)

    # Binary mask (0/1) for connected-components analysis.
    binary_mask = matching_band.mask().toInt()
    # Remove standalone pixels: keep pixels with at least one 8-connected
    # neighbour in the mask.
    connected_pixels = binary_mask.connectedPixelCount(8)
    matching_band = matching_band.updateMask(connected_pixels.gt(1))

    # (An empty SLD style placeholder and its commented-out application were
    # removed; the progress message is kept for log compatibility.)
    print("\n[5/6] Image styling completed successfully")
    print(f"Time elapsed: {time.time() - start_time:.2f} seconds")

    # Pixel-count statistics over the polygon.
    matching_count = combined_matching_pixels.reduceRegion(
        reducer=ee.Reducer.count(),
        geometry=geometry,
        scale=30,
        maxPixels=1e9
    )
    total_count = ndvi_multiband.reduceRegion(
        reducer=ee.Reducer.count(),
        geometry=geometry,
        scale=30,
        maxPixels=1e9
    )
    # Fetch once and reuse (the original called getInfo() twice per count).
    matching_count_info = matching_count.getInfo()
    total_count_info = total_count.getInfo()
    print("Matching pixels (all signatures):", matching_count_info)
    print("Total pixels:", total_count_info)

    print("Matching Configuration:")
    print(f"- Relative Tolerance: {relative_tolerance} (proportion of signature range)")
    print(f"- Monthly Tolerance: {monthly_tolerance} (absolute NDVI difference)")
    print(f"- Required Matching Months: {min_months_match} out of 16")
    print("Number of signatures processed:", csv_asset.size().getInfo())

    # NDVI categories for the colour-coded output.
    low_ndvi = matching_band.lt(0.35).And(matching_band.gt(0.2))
    mid_ndvi = matching_band.gte(0.35).And(matching_band.lte(0.55))
    high_ndvi = matching_band.gt(0.55).And(matching_band.lte(0.85))

    # 1 square meter = 0.000247105 acres.
    SQM_TO_ACRES = 0.000247105
    total_area = geometry.area().getInfo() * SQM_TO_ACRES

    def category_area_acres(condition):
        # Sum pixel areas (m^2) where `condition` holds, converted to acres.
        return ee.Image.pixelArea().mask(condition).reduceRegion(
            reducer=ee.Reducer.sum(),
            geometry=geometry,
            scale=30,
            maxPixels=1e9
        ).get('area').getInfo() * SQM_TO_ACRES

    orange_area = category_area_acres(low_ndvi)
    dark_green_area = category_area_acres(mid_ndvi)
    light_green_area = category_area_acres(high_ndvi)

    # Append the per-polygon area summary to a local CSV (best effort).
    try:
        area_data = {
            'farm_id': farm_id,
            'village_name': village_name,
            'taluk': taluk,
            'district': district,
            'total_area_acres': round(total_area, 2),
            'dark_green_area_acres': round(dark_green_area, 2),
            'light_green_area_acres': round(light_green_area, 2),
            'orange_area_acres': round(orange_area, 2)
        }
        csv_file = 'polygon_areas.csv'
        area_df = pd.DataFrame([area_data])
        if os.path.exists(csv_file):
            area_df.to_csv(csv_file, mode='a', header=False, index=False)
        else:
            area_df.to_csv(csv_file, mode='w', header=True, index=False)
        print(f"\nArea calculations saved to {csv_file} for Farm ID: {farm_id}")
        print(f"Total area: {round(total_area, 2)} acres")
        print(f"Dark green area: {round(dark_green_area, 2)} acres")
        print(f"Light green area: {round(light_green_area, 2)} acres")
        print(f"Orange area: {round(orange_area, 2)} acres")
    except Exception as e:
        print(f"Error saving area calculations to CSV: {str(e)}")

    # Colour coding: orange (255,165,0) for low NDVI, dark green (0,100,0)
    # for mid, light green (144,238,144) for high.
    base_mask = matching_band.mask()
    low_ndvi_masked = low_ndvi.updateMask(base_mask)
    mid_ndvi_masked = mid_ndvi.updateMask(base_mask)
    high_ndvi_masked = high_ndvi.updateMask(base_mask)

    r_band = ee.Image(0) \
        .where(low_ndvi_masked, 255) \
        .where(mid_ndvi_masked, 0) \
        .where(high_ndvi_masked, 144)
    g_band = ee.Image(0) \
        .where(low_ndvi_masked, 165) \
        .where(mid_ndvi_masked, 100) \
        .where(high_ndvi_masked, 238)
    b_band = ee.Image(0) \
        .where(low_ndvi_masked, 0) \
        .where(mid_ndvi_masked, 0) \
        .where(high_ndvi_masked, 144)
    color_coded = ee.Image.cat([r_band, g_band, b_band])

    base_folder = "GEE_Exports"

    # Export raw per-scene NDVI for each sensor.
    print("\nStarting raw NDVI exports...")
    export_ndvi_collection(
        landsat8, geometry, base_folder, "Raw_NDVI/Landsat8",
        farm_id, f"L8_NDVI_farm_{farm_id}"
    )
    export_ndvi_collection(
        landsat9, geometry, base_folder, "Raw_NDVI/Landsat9",
        farm_id, f"L9_NDVI_farm_{farm_id}"
    )
    export_ndvi_collection(
        sen2, geometry, base_folder, "Raw_NDVI/Sentinel2",
        farm_id, f"S2_NDVI_farm_{farm_id}"
    )

    # Export the 16 monthly composite bands individually.
    print("\nStarting monthly composite exports...")
    monthly_folder = f"{base_folder}/Monthly_Composites/Farm_{farm_id}"
    for i in range(16):
        month_num = i + 1
        month_band = ndvi_multiband.select(f'NDVI_{month_num:02d}')
        task = ee.batch.Export.image.toDrive(
            image=month_band,
            description=f'monthly_composite_{month_num:02d}_farm_{farm_id}',
            folder=monthly_folder,
            scale=30,
            region=geometry,
            fileFormat='GeoTIFF',
            formatOptions={'cloudOptimized': True},
            maxPixels=1e9
        )
        task.start()
        print(f"Started export for month {month_num:02d} in {monthly_folder}")

    # Export the final RGB colour-coded image as a GeoTIFF.
    processed_folder = f"{base_folder}/Processed/Farm_{farm_id}"
    task = ee.batch.Export.image.toDrive(
        image=color_coded,
        description=f'matching_ndvi_pixels_farm_{farm_id}_{taluk}_{district}',
        folder=processed_folder,
        scale=30,
        region=geometry,
        fileFormat='GeoTIFF',
        maxPixels=1e9,
        formatOptions={'cloudOptimized': True}
    )
    task.start()

    print(f"\n[6/6] Export task started successfully for Farm ID: {farm_id}")
    print(f"\nTotal processing time: {time.time() - start_time:.2f} seconds")

    return {
        'farm_id': farm_id,
        'matching_pixels': matching_count_info,
        'total_pixels': total_count_info
    }


def log_failed_polygon(farm_id, error_message):
    """Append a failed-polygon record to failed_polygons.csv (best effort).

    Args:
        farm_id: identifier of the polygon that failed.
        error_message: the exception text to record.
    """
    try:
        failed_data = {
            'farm_id': farm_id,
            'error_message': str(error_message),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        }
        csv_file = 'failed_polygons.csv'
        df = pd.DataFrame([failed_data])
        if os.path.exists(csv_file):
            df.to_csv(csv_file, mode='a', header=False, index=False)
        else:
            df.to_csv(csv_file, mode='w', header=True, index=False)
        print(f"\nLogged failed polygon {farm_id} to {csv_file}")
    except Exception as e:
        print(f"Error logging failed polygon: {str(e)}")


def main(start_index, end_index):
    """Process polygons whose 1-based position p satisfies
    start_index <= p < end_index, logging failures and printing a summary.

    Args:
        start_index: first 1-based polygon position to process (inclusive).
        end_index: position to stop before (exclusive).
    """
    total_start_time = time.time()

    polygons = load_polygons()
    print(f"\nFound {len(polygons)} polygons to process")

    results = []
    failed_count = 0
    for polygon_num, polygon in enumerate(polygons, start=1):
        if not (start_index <= polygon_num < end_index):
            continue
        # BUG FIX: extract farm_id before the try block so the except path
        # can always reference it (previously a failure on
        # polygon['properties'] left farm_id unbound -> NameError).
        farm_id = polygon.get('properties', {}).get('LOC_CODE', 'Unknown')
        try:
            result = process_single_polygon(polygon)
            results.append(result)
            print(f"\nSuccessfully processed polygon {farm_id}")
        except Exception as e:
            failed_count += 1
            print(f"\nError processing polygon {farm_id}: {str(e)}")
            log_failed_polygon(farm_id, str(e))
            continue  # Continue with next polygon

    # Final summary.
    successful_count = len(results)
    total_processed = successful_count + failed_count
    print("\nProcessing Summary:")
    print(f"Total polygons processed: {total_processed}")
    print(f"Successfully processed: {successful_count}")
    print(f"Failed to process: {failed_count}")
    print(f"Total processing time: {time.time() - total_start_time:.2f} seconds")

    if results:
        print("\nSuccessful results summary:")
        for result in results:
            print(f"Farm ID {result['farm_id']}:")
            print(f"- Matching pixels: {result['matching_pixels']}")
            print(f"- Total pixels: {result['total_pixels']}")
    if failed_count > 0:
        print("\nCheck failed_polygons.csv for details on failed processes")


# if __name__ == "__main__":
#     main(1, 2)