""" This code compares the NDVI multiband trends (monthly composite) with a CSV signature trend (from an uploaded asset) and plots only those pixels whose NDVI time‐series match the signature. """ import ee import pandas as pd import json import os import time # Initialize Earth Engine ee.Initialize() print("\n[1/6] Earth Engine initialized successfully") # ---------- Load GeoJSON and Process Polygons ---------- def load_polygons(): with open('cluster.geojson') as f: data = json.load(f) return data['features'] def calc_landsat_ndvi(img): red = img.select('SR_B4') \ .multiply(ee.Number(img.get('REFLECTANCE_MULT_BAND_4'))) \ .add(ee.Number(img.get('REFLECTANCE_ADD_BAND_4'))) nir = img.select('SR_B5') \ .multiply(ee.Number(img.get('REFLECTANCE_MULT_BAND_5'))) \ .add(ee.Number(img.get('REFLECTANCE_ADD_BAND_5'))) ndvi = nir.subtract(red).divide(nir.add(red)).rename('ndvi') return ndvi.copyProperties(img, ['system:time_start', 'system:index']) def calc_sentinel_ndvi(img): ms = img.select(['B4','B8']).multiply(0.0001) ndvi = ms.normalizedDifference(['B8','B4']).rename('ndvi') return ndvi.copyProperties(img, ['system:time_start', 'system:index']) def create_monthly_composite(month, ndvi_collection): month = ee.Number(month) # Calculate year and actual_month using Earth Engine operations is_first_three_months = month.lte(3) is_last_month = month.eq(16) # Use ee.Algorithms.If() for conditional logic year = ee.Algorithms.If( is_first_three_months, 2023, ee.Algorithms.If(is_last_month, 2025, 2024) ) actual_month = ee.Algorithms.If( is_first_three_months, month.add(9), # 1->10, 2->11, 3->12 ee.Algorithms.If( is_last_month, 1, # month 16 -> January month.subtract(3).subtract(1).mod(12).add(1) # 4->1, 5->2, ..., 15->12 ) ) month_img = ndvi_collection.filter(ee.Filter.calendarRange(actual_month, actual_month, 'month')).median() date = ee.Date.fromYMD(year, actual_month, 1) return month_img \ .set('system:time_start', date.millis()) \ .set('system:index', date.format('YYYY-MM')) 
def process_signature(feature, ndvi_multiband, band_names, relative_tolerance, monthly_tolerance, min_months_match):
    """Mask `ndvi_multiband` to pixels matching the signature on `feature`.

    The signature is read from the feature's NDVI_01..NDVI_16 properties.
    A pixel matches when BOTH criteria hold:
      * its Euclidean distance to the signature across all 16 bands is below
        `relative_tolerance` * (signature max - signature min), and
      * at least `min_months_match` individual months differ from the
        signature by less than `monthly_tolerance` (absolute NDVI units).

    Returns the multiband image with non-matching pixels masked out.
    """
    # Extract signature values NDVI_01..NDVI_16 for this feature
    signature_list = ee.List.sequence(1, 16).map(lambda m: ee.Number(feature.get(ee.String('NDVI_').cat(ee.Number(m).format('%02d')))))
    # Constant image with one band per signature month
    signature_image = ee.Image.constant(signature_list).rename(band_names)
    # Dynamic threshold scaled to this signature's min-max range
    signature_min = ee.Number(signature_list.reduce(ee.Reducer.min()))
    signature_max = ee.Number(signature_list.reduce(ee.Reducer.max()))
    signature_range = signature_max.subtract(signature_min)
    dynamic_threshold = signature_range.multiply(relative_tolerance)
    # Euclidean distance across bands for overall pattern matching
    diff_squared = ndvi_multiband.subtract(signature_image).pow(2)
    sum_squared = diff_squared.reduce(ee.Reducer.sum())
    distance = sum_squared.sqrt()
    # Per-band (monthly) absolute differences, thresholded and counted per pixel
    monthly_diffs = ndvi_multiband.subtract(signature_image).abs()
    monthly_matches = monthly_diffs.lt(monthly_tolerance)
    matching_months_count = monthly_matches.reduce(ee.Reducer.sum())
    # Keep only pixels satisfying both the distance and month-count criteria
    matching_pixels = ndvi_multiband.updateMask(
        distance.lt(dynamic_threshold)
        .And(matching_months_count.gte(min_months_match))
    )
    return matching_pixels


def export_ndvi_collection(collection, geometry, base_folder, sensor_folder, farm_id, description_prefix):
    """Helper function to export individual NDVI images from a collection.

    Starts one Drive export task per image in `collection`; tasks run
    asynchronously on Earth Engine's servers and are not awaited here.
    """
    # Get list of images with their timestamps
    image_list = collection.toList(collection.size())
    # Blocking getInfo() round-trip: fetch the image count to drive the loop
    size = image_list.size().getInfo()
    # Create folder path that Google Drive will interpret as nested folders
    folder_path = f"{base_folder}/{sensor_folder}/Farm_{farm_id}"
    for i in range(size):
        image = ee.Image(image_list.get(i))
        # One more blocking round-trip per image to resolve its date string
        timestamp = ee.Date(image.get('system:time_start')).format('YYYY-MM-dd').getInfo()
        task = ee.batch.Export.image.toDrive(
            image=image.select('ndvi'),
            description=f'{description_prefix}_{timestamp}',
            folder=folder_path,  # Google Drive creates nested folders from this path
            scale=30,
            region=geometry,
            fileFormat='GeoTIFF',
            formatOptions={'cloudOptimized': True},
            maxPixels=1e9
        )
        task.start()
        print(f"Started export task for {description_prefix}_{timestamp} in {folder_path}")


def process_single_polygon(feature):
    """Process one GeoJSON polygon feature end-to-end.

    Builds a 16-month NDVI composite stack from Landsat 8/9 and Sentinel-2,
    matches it against the cluster signatures in a local CSV, computes area
    statistics (saved to 'polygon_areas.csv'), and launches Drive export
    tasks for raw NDVI, monthly composites, and the final RGB result.

    Returns a dict with 'farm_id', 'matching_pixels', and 'total_pixels'.
    """
    # Start timing
    start_time = time.time()

    # Extract coordinates and properties
    coordinates = feature['geometry']['coordinates'][0]
    farm_id = feature['properties']['LOC_CODE']
    village_name = feature['properties']['NAME']
    taluk = feature['properties']['TALUK']
    district = feature['properties']['DISTRICT']
    # NOTE(review): this line overwrites the real metadata extracted above
    # with placeholder string literals (including the 'vilage_name' typo),
    # so exports and the areas CSV carry dummy identifiers. Looks like a
    # debug leftover -- confirm and remove if real metadata is wanted.
    farm_id, taluk, district, village_name = "farm_id", "taluk", "district", "vilage_name"

    # Create EE geometry
    geometry = ee.Geometry.Polygon(coordinates)
    print(f"\nProcessing Farm ID: {farm_id}")

    # 17 calendar months Oct 2023 .. Feb 2025; only 16 composites are built.
    time_start = '2023-10-01'
    time_end = '2025-02-28'

    # Landsat 8 and 9 NDVI
    landsat8 = ee.ImageCollection("LANDSAT/LC08/C02/T1_L2") \
        .filterDate(time_start, time_end) \
        .filterBounds(geometry) \
        .map(calc_landsat_ndvi)
    landsat9 = ee.ImageCollection("LANDSAT/LC09/C02/T1_L2") \
        .filterDate(time_start, time_end) \
        .filterBounds(geometry) \
        .map(calc_landsat_ndvi)

    # Sentinel-2 NDVI
    sen2 = ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED") \
        .filterDate(time_start, time_end) \
        .filterBounds(geometry) \
        .map(calc_sentinel_ndvi)

    # Merge and sort by time
    ndvi_collection = landsat8.merge(landsat9).merge(sen2) \
        .sort('system:time_start')
    print("\n[2/6] NDVI collections created and merged successfully")
    print(f"Time elapsed: {time.time() - start_time:.2f} seconds")

    # Monthly NDVI Composite: one image per series month 1..16
    month_list = ee.List.sequence(1, 16)
    monthly_ndvi = ee.ImageCollection(month_list.map(lambda m: create_monthly_composite(m, ndvi_collection)))

    # Flatten the collection to a single 16-band image named NDVI_01..NDVI_16
    ndvi_multiband = monthly_ndvi.toBands().clip(geometry)
    band_names = ee.List.sequence(1, 16).map(lambda m:
        ee.String('NDVI_').cat(ee.Number(m).format('%02d')))
    ndvi_multiband = ndvi_multiband.rename(band_names)
    print("\n[3/6] Monthly NDVI composites created successfully")
    print(f"Time elapsed: {time.time() - start_time:.2f} seconds")

    # Read local CSV file of cluster signatures (columns NDVI_01..NDVI_16)
    df = pd.read_csv('ClusterSignature_Cluster_1.csv')

    # Convert DataFrame to a list of feature dictionaries
    features = []
    for _, row in df.iterrows():
        # NOTE(review): this local `feature` dict shadows the function's
        # `feature` parameter (the GeoJSON polygon) from here on.
        feature = {}
        for month in range(1, 17):
            col_name = f'NDVI_{month:02d}'
            if col_name in row:
                feature[col_name] = float(row[col_name])
        features.append(ee.Feature(None, feature))

    # Create FeatureCollection from features
    csv_asset = ee.FeatureCollection(features)

    # Set tolerance parameters for matching
    relative_tolerance = 0.80  # 80% of the signature's min-max range
    monthly_tolerance = 0.70   # absolute NDVI difference allowed per month
    min_months_match = 10      # Number of months that must be within tolerance

    # Process all signatures and combine results
    print("\n[4/6] Starting signature processing...")
    all_matching_pixels = csv_asset.map(lambda f: process_signature(f, ndvi_multiband, band_names, relative_tolerance, monthly_tolerance, min_months_match))
    # Mosaic: a pixel survives if it matched ANY signature
    combined_matching_pixels = ee.ImageCollection(all_matching_pixels).mosaic()
    print("Signature processing completed successfully")
    print(f"Time elapsed: {time.time() - start_time:.2f} seconds")

    # Get ESA WorldCover data
    worldcover = ee.ImageCollection("ESA/WorldCover/v200").first()
    # Mask out WorldCover classes 10, 20, 30, 50, 60, 80, 90, 95.
    # NOTE(review): .neq(50) and .neq(60) each appear twice in this chain --
    # harmless but redundant; the mask equals the set of classes listed above.
    non_urban_water_mask = worldcover.neq(80).And(worldcover.neq(50)).And(worldcover.neq(10)).And(worldcover.neq(20)).And(worldcover.neq(30)).And(worldcover.neq(60)).And(worldcover.neq(50)).And(worldcover.neq(60)).And(worldcover.neq(90)).And(worldcover.neq(95))

    # For visualization, display the combined matching pixels (first month only)
    matching_band = combined_matching_pixels.select("NDVI_01")
    # Apply the land cover mask
    matching_band = matching_band.updateMask(non_urban_water_mask)

    # Convert to binary mask (0 or 1) for connected components analysis
    binary_mask = matching_band.mask().toInt()
    # NOTE(review): the first positional argument of connectedPixelCount is
    # maxSize, not connectivity; eightConnected defaults to False, so this
    # counts 4-connected neighbors capped at 8 pixels. If 8-connectivity was
    # intended, it should be connectedPixelCount(8, True) -- confirm intent.
    connected_pixels = binary_mask.connectedPixelCount(8)
    # Remove standalone pixels (those with no connected neighbor)
    matching_band = matching_band.updateMask(connected_pixels.gt(1))

    # NOTE(review): the SLD style string is empty and the sldStyle call below
    # is commented out, so no styling is actually applied.
    sld_style = ''' '''
    # Apply the SLD style to the image
    #styled_image = matching_band.sldStyle(sld_style)
    print("\n[5/6] Image styling completed successfully")
    print(f"Time elapsed: {time.time() - start_time:.2f} seconds")

    # Calculate statistics for combined matching pixels
    matching_count = combined_matching_pixels.reduceRegion(
        reducer=ee.Reducer.count(),
        geometry=geometry,
        scale=30,
        maxPixels=1e9
    )
    total_count = ndvi_multiband.reduceRegion(
        reducer=ee.Reducer.count(),
        geometry=geometry,
        scale=30,
        maxPixels=1e9
    )
    print("Matching pixels (all signatures):", matching_count.getInfo())
    print("Total pixels:", total_count.getInfo())

    # Print matching configuration and results
    print("Matching Configuration:")
    print(f"- Relative Tolerance: {relative_tolerance} (proportion of signature range)")
    print(f"- Monthly Tolerance: {monthly_tolerance} (absolute NDVI difference)")
    print(f"- Required Matching Months: {min_months_match} out of 16")

    # Print processing results
    print("Number of signatures processed:", csv_asset.size().getInfo())

    # Classify matching pixels by NDVI of band NDVI_01:
    #   low  (orange):      0.2  < NDVI < 0.35
    #   mid  (dark green):  0.35 <= NDVI <= 0.55
    #   high (light green): 0.55 < NDVI <= 0.85
    # Values outside (0.2, 0.85] fall in no class.
    low_ndvi = matching_band.lt(0.35).And(matching_band.gt(0.2))
    mid_ndvi = matching_band.gte(0.35).And(matching_band.lte(0.55))
    high_ndvi = matching_band.gt(0.55).And(matching_band.lte(0.85))

    # Calculate areas in square meters and convert to acres (1 sq meter = 0.000247105 acres)
    total_area = geometry.area().getInfo() * 0.000247105

    # Calculate areas for each NDVI category (each getInfo() blocks on EE)
    orange_area = ee.Image.pixelArea().mask(low_ndvi).reduceRegion(
        reducer=ee.Reducer.sum(),
        geometry=geometry,
        scale=30,
        maxPixels=1e9
    ).get('area').getInfo() * 0.000247105
    dark_green_area = ee.Image.pixelArea().mask(mid_ndvi).reduceRegion(
        reducer=ee.Reducer.sum(),
        geometry=geometry,
        scale=30,
        maxPixels=1e9
    ).get('area').getInfo() * 0.000247105
    light_green_area = ee.Image.pixelArea().mask(high_ndvi).reduceRegion(
        reducer=ee.Reducer.sum(),
        geometry=geometry,
        scale=30,
        maxPixels=1e9
    ).get('area').getInfo() * 0.000247105

    # Save areas to CSV (append mode; header only written on first creation)
    try:
        area_data = {
            'farm_id': farm_id,
            'village_name': village_name,
            'taluk': taluk,
            'district': district,
            'total_area_acres': round(total_area, 2),
            'dark_green_area_acres': round(dark_green_area, 2),
            'light_green_area_acres': round(light_green_area, 2),
            'orange_area_acres': round(orange_area, 2)
        }
        csv_file = 'polygon_areas.csv'
        # NOTE(review): `df` here rebinds the signature DataFrame read above;
        # harmless since the signatures are no longer needed at this point.
        df = pd.DataFrame([area_data])
        if os.path.exists(csv_file):
            df.to_csv(csv_file, mode='a', header=False, index=False)
        else:
            df.to_csv(csv_file, mode='w', header=True, index=False)
        print(f"\nArea calculations saved to {csv_file} for Farm ID: {farm_id}")
        print(f"Total area: {round(total_area, 2)} acres")
        print(f"Dark green area: {round(dark_green_area, 2)} acres")
        print(f"Light green area: {round(light_green_area, 2)} acres")
        print(f"Orange area: {round(orange_area, 2)} acres")
    except Exception as e:
        # Best-effort persistence: a CSV write failure must not abort the run
        print(f"Error saving area calculations to CSV: {str(e)}")

    # Print NDVI value distribution
    # ndvi_stats = matching_band.reduceRegion(
    #     reducer=ee.Reducer.percentile([0, 25, 50, 75, 100]),
    #     geometry=geometry,
    #     scale=30,
    #     maxPixels=1e9
    # )
    # print("\nNDVI Distribution Statistics:")
    # print(ndvi_stats.getInfo())

    # Create R, G, B bands based on conditions with explicit masking.
    # Color coding (per the low/mid/high classes defined above):
    #   Orange      (255,165,0)   for the low class
    #   Dark green  (0,100,0)     for the mid class
    #   Light green (144,238,144) for the high class

    # Create base mask from matching_band
    base_mask = matching_band.mask()

    # Apply masks to conditions
    low_ndvi_masked = low_ndvi.updateMask(base_mask)
    mid_ndvi_masked = mid_ndvi.updateMask(base_mask)
    high_ndvi_masked = high_ndvi.updateMask(base_mask)

    r_band = ee.Image(0) \
        .where(low_ndvi_masked, 255) \
        .where(mid_ndvi_masked, 0) \
        .where(high_ndvi_masked, 144)
    g_band = ee.Image(0) \
        .where(low_ndvi_masked, 165) \
        .where(mid_ndvi_masked, 100) \
        .where(high_ndvi_masked, 238)
    b_band = ee.Image(0) \
        .where(low_ndvi_masked, 0) \
        .where(mid_ndvi_masked, 0) \
        .where(high_ndvi_masked, 144)

    # Combine bands into a single RGB image
    color_coded = ee.Image.cat([r_band, g_band, b_band])

    # Create base folder
    base_folder = "GEE_Exports"

    # Export raw NDVI collections
    print("\nStarting raw NDVI exports...")

    # Export Landsat 8 NDVI
    export_ndvi_collection(
        landsat8,
        geometry,
        base_folder,
        "Raw_NDVI/Landsat8",
        farm_id,
        f"L8_NDVI_farm_{farm_id}"
    )

    # Export Landsat 9 NDVI
    export_ndvi_collection(
        landsat9,
        geometry,
        base_folder,
        "Raw_NDVI/Landsat9",
        farm_id,
        f"L9_NDVI_farm_{farm_id}"
    )

    # Export Sentinel-2 NDVI
    export_ndvi_collection(
        sen2,
        geometry,
        base_folder,
        "Raw_NDVI/Sentinel2",
        farm_id,
        f"S2_NDVI_farm_{farm_id}"
    )

    # Export monthly composites, one Drive task per band
    print("\nStarting monthly composite exports...")
    monthly_folder = f"{base_folder}/Monthly_Composites/Farm_{farm_id}"
    for i in range(16):
        month_num = i + 1
        month_band = ndvi_multiband.select(f'NDVI_{month_num:02d}')
        task = ee.batch.Export.image.toDrive(
            image=month_band,
            description=f'monthly_composite_{month_num:02d}_farm_{farm_id}',
            folder=monthly_folder,
            scale=30,
            region=geometry,
            fileFormat='GeoTIFF',
            formatOptions={'cloudOptimized': True},
            maxPixels=1e9
        )
        task.start()
        print(f"Started export for month {month_num:02d} in {monthly_folder}")

    # Export the final RGB color-coded image as GeoTIFF
    processed_folder = f"{base_folder}/Processed/Farm_{farm_id}"
    task = ee.batch.Export.image.toDrive(
        image=color_coded,
        description=f'matching_ndvi_pixels_farm_{farm_id}_{taluk}_{district}',
        folder=processed_folder,
        scale=30,
        region=geometry,
        fileFormat='GeoTIFF',
        maxPixels=1e9,
        formatOptions={
            'cloudOptimized': True
        }
    )
    task.start()
    print(f"\n[6/6] Export task started successfully for Farm ID: {farm_id}")
    print(f"\nTotal processing time: {time.time() - start_time:.2f} seconds")
    return {
        'farm_id': farm_id,
        'matching_pixels': matching_count.getInfo(),
        'total_pixels': total_count.getInfo()
    }


def log_failed_polygon(farm_id, error_message):
    """Log failed polygon processing to a CSV file.

    Appends (farm_id, error_message, timestamp) to 'failed_polygons.csv',
    creating the file with a header on first use. Never raises: logging
    failures are reported to stdout only.
    """
    try:
        failed_data = {
            'farm_id': farm_id,
            'error_message': str(error_message),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        }
        csv_file = 'failed_polygons.csv'
        df = pd.DataFrame([failed_data])
        if os.path.exists(csv_file):
            df.to_csv(csv_file, mode='a', header=False, index=False)
        else:
            df.to_csv(csv_file, mode='w', header=True, index=False)
        print(f"\nLogged failed polygon {farm_id} to {csv_file}")
    except Exception as e:
        print(f"Error logging failed polygon: {str(e)}")


def main(start_index, end_index):
    """Process polygons whose 1-based position is in [start_index, end_index).

    Loads all polygons from the GeoJSON, runs process_single_polygon on each
    selected one, logs failures to 'failed_polygons.csv', and prints a
    summary. Note the half-open range: end_index itself is excluded.
    """
    # start_index =0
    # end_index = 100
    polygon_num = 0
    # Start timing for all polygons
    total_start_time = time.time()
    # Load all polygons from GeoJSON
    polygons = load_polygons()
    print(f"\nFound {len(polygons)} polygons to process")
    # Process each polygon
    results = []
    failed_count = 0
    for polygon in polygons:
        polygon_num = polygon_num + 1
        if polygon_num >= start_index and polygon_num < end_index:
            try:
                # Extract farm_id before processing in case of early failure
                farm_id = polygon['properties'].get('LOC_CODE', 'Unknown')
                result = process_single_polygon(polygon)
                results.append(result)
                print(f"\nSuccessfully processed polygon {farm_id}")
            except Exception as e:
                # Broad catch is deliberate: one bad polygon must not stop
                # the batch. The failure is recorded for later inspection.
                failed_count += 1
                print(f"\nError processing polygon {farm_id}: {str(e)}")
                log_failed_polygon(farm_id, str(e))
                continue  # Continue with next polygon

    # Print final summary
    successful_count = len(results)
    total_processed = successful_count + failed_count
    print("\nProcessing Summary:")
    print(f"Total polygons processed: {total_processed}")
    print(f"Successfully processed: {successful_count}")
    print(f"Failed to process: {failed_count}")
    print(f"Total processing time: {time.time() - total_start_time:.2f} seconds")
    if results:
        print("\nSuccessful results summary:")
        for result in results:
            print(f"Farm ID {result['farm_id']}:")
            print(f"- Matching pixels: {result['matching_pixels']}")
            print(f"- Total pixels: {result['total_pixels']}")
    if failed_count > 0:
        print(f"\nCheck failed_polygons.csv for details on failed processes")


# NOTE(review): the entry point is commented out, so running this file only
# initializes Earth Engine and defines functions -- nothing is processed.
# if __name__ == "__main__":
#     main(1, 2)