import os import json import glob from PIL import Image import numpy as np from google.cloud import storage import firebase_admin from firebase_admin import credentials, db from collections import Counter import logging # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ImageProcessor: def __init__(self, firebase_cred_path, gcs_bucket_name, firebase_db_url): """ Initialize the processor with Firebase and GCS credentials Args: firebase_cred_path: Path to Firebase service account JSON file gcs_bucket_name: Name of the GCS bucket firebase_db_url: Firebase Realtime Database URL """ # Initialize Firebase cred = credentials.Certificate(firebase_cred_path) firebase_admin.initialize_app(cred, {'databaseURL': firebase_db_url}) # Initialize GCS client self.storage_client = storage.Client() self.bucket = self.storage_client.bucket(gcs_bucket_name) # Hex code to value mapping self.hex_values = { '#06653d': 0.9, '#11a75f': 0.8, '#81bf6c': 0.7, '#bae383': 0.6, '#e6f3a4': 0.5, '#fff0b5': 0.4, '#fbc07e': 0.3, '#f7885a': 0.2, '#ea4f3b': 0.1, '#ab0535': 0.0 } # Normalize hex codes (remove # and convert to lowercase) self.normalized_hex_values = { hex_code.replace('#', '').lower(): value for hex_code, value in self.hex_values.items() } self.date_folder = "20250528" def load_polygon_field_mapping(self, json_file_path): """Load the polygon_id to field_id mapping from JSON file""" try: with open(json_file_path, 'r') as f: return json.load(f) except FileNotFoundError: logger.error(f"JSON file not found: {json_file_path}") return {} except json.JSONDecodeError: logger.error(f"Invalid JSON file: {json_file_path}") return {} def rgb_to_hex(self, rgb): """Convert RGB tuple to hex string""" return f"{rgb[0]:02x}{rgb[1]:02x}{rgb[2]:02x}" def calculate_image_average(self, image_path): """ Calculate average value for an image based on hex code mapping Args: image_path: Path to the image file Returns: float: Average value rounded to 2 decimal places, or None if no valid pixels """ try: # Open and convert image to RGB with Image.open(image_path) as img: img_rgb = img.convert('RGB') pixels = np.array(img_rgb) # Reshape to get all pixels pixel_colors = pixels.reshape(-1, 3) valid_values = [] # Process each pixel for pixel in pixel_colors: hex_color = self.rgb_to_hex(tuple(pixel)) if hex_color in self.normalized_hex_values: valid_values.append(self.normalized_hex_values[hex_color]) if not valid_values: logger.warning(f"No valid pixels found in {image_path}") return None average = sum(valid_values) / len(valid_values) return round(average, 2) except Exception as e: logger.error(f"Error processing image {image_path}: {str(e)}") return None def upload_to_gcs(self, local_file_path, gcs_path): """Upload a file to Google Cloud Storage""" try: blob = self.bucket.blob(gcs_path) blob.upload_from_filename(local_file_path) logger.info(f"Uploaded {local_file_path} to gs://{self.bucket.name}/{gcs_path}") return True except Exception as e: logger.error(f"Failed to upload {local_file_path} to GCS: {str(e)}") return False def update_firebase_sensed_days(self, user_id, field_id, polygon_id): """Update Firebase Realtime Database - SensedDays location""" try: ref_path = f"/PaidMonitoredFields/PMF/{user_id}/{field_id}/Polygons/{polygon_id}/SensedDays/{self.date_folder}" ref = db.reference(ref_path) ref.set("yes") logger.info(f"Updated Firebase SensedDays: {ref_path}") return True except Exception as e: logger.error(f"Failed to update Firebase SensedDays: {str(e)}") return False def update_firebase_health(self, user_id, field_id, polygon_id, image_type, average_value): """Update Firebase Realtime Database - Health location""" try: ref_path = f"/PaidMonitoredFields/PMF/{user_id}/{field_id}/Polygons/{polygon_id}/Health/{self.date_folder}/{image_type}" ref = db.reference(ref_path) ref.set(average_value) logger.info(f"Updated Firebase Health: {ref_path} = {average_value}") return True except Exception as e: logger.error(f"Failed to update Firebase Health: {str(e)}") return False def process_folder(self, base_folder_path, json_file_path): """ Process the main folder containing polygon subfolders Args: base_folder_path: Path to TCXcp5VIsfhHZrh0nm2VsgBtcGy2 folder json_file_path: Path to full_gavl_polygons_obj.json file """ # Load polygon to field mapping polygon_field_mapping = self.load_polygon_field_mapping(json_file_path) if not polygon_field_mapping: logger.error("No polygon-field mapping found. Exiting.") return # Get user_id from folder name user_id = os.path.basename(base_folder_path) # Get all polygon folders polygon_folders = [d for d in os.listdir(base_folder_path) if os.path.isdir(os.path.join(base_folder_path, d))] logger.info(f"Found {len(polygon_folders)} polygon folders to process") # Process each polygon folder for polygon_id in polygon_folders: polygon_path = os.path.join(base_folder_path, polygon_id) # Get field_id from mapping field_id = polygon_field_mapping.get(polygon_id) if not field_id: logger.warning(f"No field_id found for polygon_id: {polygon_id}") continue logger.info(f"Processing polygon {polygon_id} -> field {field_id}") # Update Firebase SensedDays self.update_firebase_sensed_days(user_id, field_id, polygon_id) # Process images in the polygon folder image_types = ['NDRE.png', 'NDVI.png', 'NDWI.png', 'RECL.png'] for image_type in image_types: image_path = os.path.join(polygon_path, image_type) if os.path.exists(image_path): logger.info(f"Processing {image_type} for polygon {polygon_id}") # Upload to GCS gcs_path = f"PaidMonitoredFieldsPolygons/{user_id}/{field_id}/{polygon_id}/{self.date_folder}/{image_type}" upload_success = self.upload_to_gcs(image_path, gcs_path) # Calculate average and update Firebase Health if upload_success: average_value = self.calculate_image_average(image_path) if average_value is not None: image_name = image_type.replace('.png', '') self.update_firebase_health(user_id, field_id, polygon_id, image_name, average_value) else: logger.warning(f"Could not calculate average for {image_path}") else: logger.info(f"Image {image_type} not found in {polygon_path}") # Upload all other files in the polygon folder all_files = glob.glob(os.path.join(polygon_path, '*')) for file_path in all_files: if os.path.isfile(file_path): file_name = os.path.basename(file_path) if file_name not in image_types: # Skip already processed images gcs_path = f"PaidMonitoredFieldsPolygons/{user_id}/{field_id}/{polygon_id}/{self.date_folder}/{file_name}" self.upload_to_gcs(file_path, gcs_path) def main(): # Configuration - Update these paths according to your setup FIREBASE_CRED_PATH = "servicekey.json" GCS_BUCKET_NAME = "farmbase-b2f7e.appspot.com" FIREBASE_DB_URL = "https://farmbase-b2f7e-31c0c.firebaseio.com/" BASE_FOLDER_PATH = "TCXcp5VIsfhHZrh0nm2VsgBtcGy2" JSON_FILE_PATH = "full_gavl_polygons_obj.json" try: # Initialize processor processor = ImageProcessor(FIREBASE_CRED_PATH, GCS_BUCKET_NAME, FIREBASE_DB_URL) # Process the folder processor.process_folder(BASE_FOLDER_PATH, JSON_FILE_PATH) logger.info("Processing completed successfully!") except Exception as e: logger.error(f"Error in main execution: {str(e)}") if __name__ == "__main__": main()