import os import shutil import time import traceback from datetime import date, datetime, timedelta import firebase_admin import pytz from firebase_admin import credentials, db, firestore from google.cloud import storage from oct2py import octave # Assuming these are custom modules from the project directory from find_dem import find_dem from find_sar import find_sar from gen_geotiff2 import gen_geotiff2 from gen_report_new2 import gen_report_new from get_land_use import get_land_use from get_weather_data import get_weather_data from gmap_image import gmap_image from gmap_image_large import gmap_image_large from map_coords import map_coords from make_dir import make_dir from merge_dem import merge_dem from merge_sar import merge_sar import process_field_flag from satellite_visit_email import send_html_email from search_new_sentinel import search_new_sentinel from send_error_noti import send_error_noti from send_expired_noti import send_expired_noti from send_expiring_noti import send_expiring_noti from send_moni_noti import send_moni_noti from send_webhook_data import send_webhook_data from sendfailedreport import sendfailedreport # --- Configuration --- CONFIG = { "FIREBASE_URL": "https://farmbase-b2f7e-31c0c.firebaseio.com/", "SERVICE_KEY_PATH": "servicekey.json", "GCS_PROJECT_ID": "farmbase-b2f7e", "GCS_BUCKET_NAME": "farmbase-b2f7e.appspot.com", "DISABLED_UIDS": [ "snQYQZqQx3SmVbRztmEqYn5Mkcz2", "KZQ7TZIYXnXN0b07OtrL1hlyYij1", "CeMGYvLXrGR5ZThxZ46iV7vY8sa2", "TCXcp5VIsfhHZrh0nm2VsgBtcGy2", "mFFHQdEtiSbn2hbYQAwwoIdYVi02", "4fPRPyszwLOjweG1qbpiCx3CFQo1", "KYThGMoIlISJG4mVXArK6Y5QPDh1", "hOV9C5CalZc9mIYCQQxplvgAfkG3", "ipRHhCOFIDV2pxgg7Nfz1ufZBmV2", "4fPRPyszwLOjweG1qbpiCx3CFQo1" ], "IMAGE_TYPES": [ "TCI", "ETCI", "ndvi", "evi", "rvi", "rsm", "ndwi", "ndre", "vari", "soc", "savi", "ndmi", "evapo", "avi", "bsi", "si", "dem", "hybrid_blind", "vssi", "lulc", "lai" ], "CMAP2_IMAGE_TYPES": [ "soc_cmap2", "ndre_cmap2", "ndvi_cmap2", "evi_cmap2", "ndwi_cmap2", "vari_cmap2", "savi_cmap2", "avi_cmap2", "bsi_cmap2", "si_cmap2", "ndmi_cmap2", "vssi_cmap2", "lai_cmap2" ], "CHART_IMAGE_TYPES": [ "ndvi_pie", "evi_pie", "ndwi_pie", "ndre_pie", "vari_pie", "savi_pie", "avi_pie", "bsi_pie", "si_pie", "soc_pie", "ndmi_pie", "vssi_pie", "lai_pie", "ndvi_linegraph", "evi_linegraph", "ndwi_linegraph", "ndre_linegraph", "vari_linegraph", "savi_linegraph", "avi_linegraph", "bsi_linegraph", "si_linegraph", "soc_linegraph", "ndmi_linegraph", "vssi_linegraph", "weather_linegraph", "lai_linegraph" ], "TIF_IMAGE_TYPES": [ "tci.tif", "hybrid.tif", "etci.tif", "ndvi.tif", "evi.tif", "soc.tif", "ndre.tif", "vari.tif" ] } # --- Initialization --- def initialize_firebase(): """Initializes the Firebase app, handling cases where it's already initialized.""" if not firebase_admin._apps: try: cred = credentials.Certificate(CONFIG["SERVICE_KEY_PATH"]) firebase_admin.initialize_app(cred, {"databaseURL": CONFIG["FIREBASE_URL"]}) except Exception as e: print(f"Error initializing Firebase: {e}") raise def get_firestore_client(): """Returns a Firestore client instance.""" return firestore.client() def get_storage_client(): """Returns a Google Cloud Storage client instance.""" return storage.Client(project=CONFIG["GCS_PROJECT_ID"]) # --- Date Helper Functions --- def get_date_range_for_search(requested_date_yyyymmdd, days_offset, is_radar=False): """ Calculates a 'from' and 'to' date range for searching satellite imagery. """ base_window = 12 if is_radar else 4 additional_window = 1 window = base_window + (0 if days_offset > 0 else additional_window) to_date = datetime.strptime(requested_date_yyyymmdd, "%Y%m%d") from_date = to_date - timedelta(days=window) from_date_str = from_date.strftime("%Y-%m-%dT00:00:00") to_date_str = to_date.strftime("%Y-%m-%dT23:59:59") return from_date_str, to_date_str def get_today_ist_yyyymmdd(): """Returns the current date in IST as a 'YYYYMMDD' string.""" ist_now = datetime.utcnow() + timedelta(minutes=330) return ist_now.strftime("%Y%m%d") # --- Data Fetching and Pre-check Functions --- def get_fields_to_process(uid, fieldid): """ Determines which fields to process based on the input uid and fieldid. """ if uid in CONFIG["DISABLED_UIDS"]: return {} base_ref = db.reference("PaidMonitoredFields/PMF") if uid is None and fieldid is None: all_pmf = base_ref.get(shallow=True) if not all_pmf: return {} return { u: f for u, f in all_pmf.items() if u not in CONFIG["DISABLED_UIDS"] } elif uid is not None and fieldid is None: user_fields = base_ref.child(uid).get(shallow=True) return {uid: user_fields} if user_fields else {} elif uid is not None and fieldid is not None: return {uid: {fieldid: True}} return {} def is_recently_processed(uid, fieldid, days_threshold=5): """Checks if a field has been processed within a given number of days.""" latest_day_str = db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/LatestDay").get() if not latest_day_str: return False today_yyyymmdd = date.today().strftime("%Y%m%d") day_diff = int(today_yyyymmdd) - int(latest_day_str) if day_diff < days_threshold: print(f"Field {fieldid} recently processed on {latest_day_str}. Skipping.") return True return False def should_delete_expired_field(uid, fieldid): """ Checks if an expired or free-trial field is old enough to be deleted. """ field_ref = db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}") field_data = field_ref.get() if not field_data: return False is_expired = int(field_data.get("Expired", 0)) == 1 payment_type = float(field_data.get("PaymentType", 0)) if not (is_expired or payment_type == 0): return False if payment_type == 0: retention_millis = 24 * 60 * 60 * 1000 else: months_to_wait = 1 retention_millis = (abs(int(payment_type)) + months_to_wait) * 30 * 24 * 60 * 60 * 1000 field_creation_time_millis = int(fieldid) current_time_millis = int(round(time.time() * 1000)) if (current_time_millis - field_creation_time_millis) > retention_millis: print(f"Deleting expired field: UID={uid}, FieldID={fieldid}") try: db.reference(f"DeletedFields/PMF/{uid}/{fieldid}").set(field_data) field_ref.delete() return True except Exception: print(f"Error during field deletion: {traceback.format_exc()}") field_ref.delete() return True return False # --- Core Processing Logic --- def get_indices(uid, fieldid, latestday): """Fetches NDVI and NDWI values from Firebase for a given day.""" try: base_ref = db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/Health") ndvi_val = base_ref.child("ndvi").child(latestday).get() ndwi_val = base_ref.child("ndwi").child(latestday).get() ndvi = float(ndvi_val) / 100 if ndvi_val is not None else 0.0 ndwi = float(ndwi_val) / 100 if ndwi_val is not None else 0.0 print(f"Retrieved Indices for {latestday}: NDVI={ndvi}, NDWI={ndwi}") return ndvi, ndwi except Exception as e: print(f"Could not retrieve indices for {latestday}: {e}") return 0.0, 0.0 def process_single_field(uid, fieldid, session_type): """ Main processing pipeline for a single field. """ field_path = os.path.join(uid, str(fieldid)) if os.path.exists(field_path): shutil.rmtree(field_path) make_dir(uid, fieldid) print(f"Starting processing for UID: {uid}, FieldID: {fieldid}") if should_delete_expired_field(uid, fieldid): return processing_flags = process_field_flag.process_field(uid, fieldid) if not processing_flags.get("process_field"): print("Skipping field based on processing flags.") return try: field_ref = db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}") field_data = field_ref.get() if not field_data: print("Field data not found.") return satellite_processed = False if not is_recently_processed(uid, fieldid) and session_type == "main": if not field_data.get("PreviousDataRequests"): print("Performing regular satellite data processing.") run_satellite_analysis(uid, fieldid, field_data, None, None, None, None) satellite_processed = True previous_requests = field_data.get("PreviousDataRequests") if previous_requests and session_type in ["bulk", "api", "main"]: for req_date in list(previous_requests.keys()): print(f"Processing historical request for date: {req_date}") process_historical_request_with_retries(uid, fieldid, field_data, req_date) satellite_processed = True field_ref.child("PreviousDataRequests").child(req_date).delete() if satellite_processed: update_usage_counters(uid, processing_flags, session_type) except Exception: print(f"An unexpected error occurred in process_single_field: {traceback.format_exc()}") finally: if os.path.exists(field_path): shutil.rmtree(field_path) def process_historical_request_with_retries(uid, fieldid, field_data, requested_date, max_attempts=3): """Processes a single historical request with a retry mechanism.""" from_date, to_date = get_date_range_for_search(requested_date, 0, is_radar=False) radar_from_date, radar_to_date = get_date_range_for_search(requested_date, 0, is_radar=True) for attempt in range(max_attempts): try: run_satellite_analysis( uid, fieldid, field_data, from_date=from_date, to_date=to_date, radar_from_date=radar_from_date, radar_to_date=radar_to_date ) print(f"Successfully processed historical request for {requested_date} on attempt {attempt + 1}.") return except Exception: print(f"Attempt {attempt + 1}/{max_attempts} failed for historical request {requested_date}.") print(traceback.format_exc()) if attempt < max_attempts - 1: time.sleep(10) print(f"All {max_attempts} attempts failed for historical request {requested_date}.") def run_satellite_analysis(uid, fieldid, field_data, from_date, to_date, radar_from_date, radar_to_date): """ The core function that searches for satellite data, generates images, reports, and uploads artifacts. """ sentinel_settings = db.reference("SentinelSettings4").get() today_str = date.today().strftime("%Y%m%d") is_historical_req = from_date is not None sensed_days = field_data.get("SensedDays", {}) sar_days = field_data.get("SARDays", {}) dem_days = field_data.get("DEMDays", {}) latest_sensed_day = max(sensed_days.keys()) if sensed_days else "0" latest_rvi_day = max(sar_days.keys()) if sar_days else "0" latest_dem_day = max(dem_days.keys()) if dem_days else "0" if is_historical_req: latest_sensed_day = latest_rvi_day = latest_dem_day = "0" time_since_last_run = int(today_str) - int(latest_sensed_day) should_search = time_since_last_run > 4 or is_historical_req if not should_search: print("Skipping satellite search: field was processed recently.") return print("Searching for new satellite data...") latest_failed_day = max(field_data.get("FailedDays", {}).keys(), default="0") is_expired = int(field_data.get("Expired", 0)) is_paused = 1 if field_data.get("Paused") == "yes" else 0 client_id = sentinel_settings["ClientID"] client_secret = sentinel_settings["ClientSecret"] optical_search_json = search_new_sentinel( uid, fieldid, field_data, latest_sensed_day, latest_failed_day, is_expired, is_paused, from_date, to_date, client_id, client_secret, sentinel_settings["WMSID"] ) sar_search_json = find_sar( uid, fieldid, field_data, latest_rvi_day, is_expired, is_paused, radar_from_date, radar_to_date, client_id, client_secret, sentinel_settings["RVIID"] ) dem_search_json = find_dem( uid, fieldid, field_data, latest_dem_day, is_expired, is_paused, from_date, to_date, client_id, client_secret, sentinel_settings["DEMID"] ) get_land_use( uid, fieldid, field_data, latest_sensed_day, latest_failed_day, is_expired, is_paused, from_date, to_date, client_id, client_secret, sentinel_settings["WMSID"] ) imagedate = str(optical_search_json.get("LatestDay")) new_sar_date = str(sar_search_json.get("LatestDay", "0")).replace("-", "") new_dem_date = str(dem_search_json.get("LatestDay", "0")).replace("-", "") if not imagedate or imagedate == "0" or imagedate == "None": print("No new image date found from search. Aborting run.") return if imagedate in sensed_days or imagedate in field_data.get("FailedDays", {}): print(f"Image date {imagedate} has already been processed/failed. Aborting.") return result_status = "failed_pre_check" if optical_search_json.get("MGRS") != "NotA": generate_static_map_images(uid, fieldid, field_data, optical_search_json) print("Invoking Octave to make general maps...") octave.make_gen_maps(uid, fieldid) mgrs_tile = optical_search_json.get("MGRS") previous_day_for_tile = "11" # Default value if mgrs_tile: try: fetched_day = db.reference(f"LatestTileDates/{mgrs_tile}").get() if fetched_day: previous_day_for_tile = fetched_day except Exception as e: print(f"Could not fetch previous day for MGRS tile '{mgrs_tile}': {e}") db.reference(f"LatestTileDates/{str(mgrs_tile)}").set(imagedate) image_gen_params = { **optical_search_json, "FieldID": fieldid, "PlantDistance": field_data.get("PlantDistance", 0), "StartPixelLat": 180, "EndPixelLat": 180, "StartPixelLong": 180, "EndPixelLong": 180, "PreviousDay": previous_day_for_tile } print("Invoking Octave script to generate field images...") result_status = octave.monitored_field2022(image_gen_params) if "successful" in result_status: print("Image generation successful. Proceeding to reporting and uploading.") process_successful_run(uid, fieldid, field_data, imagedate, optical_search_json) send_webhook_data(uid, fieldid, imagedate) send_moni_noti(uid, field_data.get("FieldDescription", "N/A"), "") else: print(f"Image generation failed with status: {result_status}. Processing failure.") process_failed_run(uid, fieldid, field_data, imagedate, result_status) if int(new_sar_date) > int(latest_rvi_day.replace("-", "")): process_and_upload_sar_data(uid, fieldid, field_data, new_sar_date, optical_search_json) if int(new_dem_date) > int(latest_dem_day.replace("-", "")): process_and_upload_dem_data(uid, fieldid, field_data, new_dem_date, optical_search_json) # --- Sub-processing Functions --- def process_successful_run(uid, fieldid, field_data, imagedate, search_json): """Handles all tasks after a successful image generation.""" storage_client = get_storage_client() bucket = storage_client.get_bucket(CONFIG["GCS_BUCKET_NAME"]) map_coords( uid, fieldid, field_data.get("Coordinates"), imagedate, search_json["FieldMaxLat"], search_json["FieldMinLat"], search_json["FieldMaxLong"], search_json["FieldMinLong"], field_data.get("CropCode", -1), CONFIG["IMAGE_TYPES"], field_data.get("FieldArea") ) ndvi, ndwi = get_indices(uid, fieldid, imagedate) if field_data.get("GenTif") in ["yes", "abs_yes"]: for tif_type in CONFIG["IMAGE_TYPES"]: gen_geotiff2(search_json, uid, tif_type, f"{tif_type}.tif") report_obj = gen_report_new( uid, field_data.get("FieldAddress", "N/A"), search_json.get("CenterLat"), search_json.get("CenterLong"), imagedate, fieldid, field_data.get("FieldArea"), field_data.get("Language", "en"), 0, # Success field_data.get("Whitelabel", "farmonaut"), [search_json["FieldMinLat"], search_json["FieldMinLong"], search_json["FieldMaxLat"], search_json["FieldMaxLong"]], field_data.get("Coordinates"), None, ndvi, ndwi, field_data ) upload_all_artifacts(uid, fieldid, imagedate, bucket) db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/LatestDay").set(imagedate) if report_obj: db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/SensedDays/{imagedate}").set(report_obj) try: firestore_client = get_firestore_client() doc_ref = firestore_client.collection(uid).document(str(fieldid)) doc_ref.set({"LatestSensedDay": imagedate}, merge=True) except Exception as e: print(f"Warning: Failed to update Firestore document {uid}/{fieldid}: {e}") send_html_email( field_data.get("Email", "NA"), "Farmer", field_data.get("FieldAddress", "N/A"), " ", imagedate, " ", field_data.get("PaymentType", 0), uid, fieldid, field_data.get("Language", "en") ) def process_failed_run(uid, fieldid, field_data, imagedate, failure_status): """Handles tasks for a failed image generation run.""" print(f"Processing failure for field {fieldid}. Status: {failure_status}") db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/FailedDays/{imagedate}").set(failure_status) if "failed1" in failure_status: report_obj = gen_report_new( uid, field_data.get("FieldAddress", "N/A"), field_data.get("CenterLat"), field_data.get("CenterLong"), imagedate, fieldid, field_data.get("FieldArea"), field_data.get("Language", "en"), 1, # Failure field_data.get("Whitelabel", "farmonaut"), [], field_data.get("Coordinates"), None, 0, 0, field_data ) db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/FailedDays/{imagedate}").set(report_obj) sendfailedreport([], field_data.get("Email", "NA"), field_data.get("FieldAddress", "N/A"), imagedate, field_data.get("Whitelabel", "farmonaut"), None, None) send_error_noti(uid) def process_and_upload_sar_data(uid, fieldid, field_data, sar_date, search_json): """Processes and uploads SAR (RVI/RSM) images.""" print(f"Processing new SAR data for date: {sar_date}") try: merge_sar( uid, fieldid, field_data.get("Coordinates"), sar_date, search_json["FieldMaxLat"], search_json["FieldMinLat"], search_json["FieldMaxLong"], search_json["FieldMinLong"] ) bucket = get_storage_client().get_bucket(CONFIG["GCS_BUCKET_NAME"]) for img_type in ["rvi", "rsm"]: local_path = os.path.join(uid, fieldid, f"{img_type}.png") if not os.path.exists(local_path): continue gcs_path = f"PaidMonitoredFields/{uid}/{fieldid}/{sar_date}/{img_type}" bucket.blob(gcs_path).upload_from_filename(local_path) db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/SARDays/{sar_date}").set("yes") except Exception: print(f"Error processing SAR data: {traceback.format_exc()}") def process_and_upload_dem_data(uid, fieldid, field_data, dem_date, search_json): """Processes and uploads DEM images.""" print(f"Processing new DEM data for date: {dem_date}") try: merge_dem( uid, fieldid, field_data.get("Coordinates"), dem_date, search_json["FieldMaxLat"], search_json["FieldMinLat"], search_json["FieldMaxLong"], search_json["FieldMinLong"] ) bucket = get_storage_client().get_bucket(CONFIG["GCS_BUCKET_NAME"]) local_path = os.path.join(uid, fieldid, "dem.png") if not os.path.exists(local_path): return gcs_path = f"PaidMonitoredFields/{uid}/{fieldid}/{dem_date}/dem" bucket.blob(gcs_path).upload_from_filename(local_path) db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/DEMDays/{dem_date}").set("yes") except Exception: print(f"Error processing DEM data: {traceback.format_exc()}") # --- Utility and Helper Functions --- def generate_static_map_images(uid, fieldid, field_data, search_json): """Generates and uploads the small and large static map images if they don't exist.""" if not os.path.exists(os.path.join(uid, fieldid, "_static_map_large.png")): print("Generating large static map.") latlen, longlen, clat, clong = gmap_image_large( search_json["FieldMinLat"], search_json["FieldMaxLat"], search_json["FieldMinLong"], search_json["FieldMaxLong"], search_json["TileMaxLat"], search_json["TileMinLat"], search_json["TileMaxLong"], search_json["TileMinLong"], fieldid, uid, field_data.get("FieldArea") ) db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}").update({ "FieldLatLenLarge": latlen, "FieldLongLenLarge": longlen, "CenterLatLarge": clat, "CenterLongLarge": clong }) if not os.path.exists(os.path.join(uid, fieldid, "_static_map.png")): print("Generating small static map.") latlen, longlen, clat, clong = gmap_image( search_json["FieldMinLat"], search_json["FieldMaxLat"], search_json["FieldMinLong"], search_json["FieldMaxLong"], search_json["TileMaxLat"], search_json["TileMinLat"], search_json["TileMaxLong"], search_json["TileMinLong"], fieldid, uid, field_data.get("FieldArea") ) db.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}").update({ "FieldLatLen": latlen, "FieldLongLen": longlen, "CenterLat": clat, "CenterLong": clong }) def upload_all_artifacts(uid, fieldid, imagedate, bucket): """Finds all generated images and reports and uploads them to GCS.""" print("Uploading all generated artifacts to Google Cloud Storage...") local_field_dir = os.path.join(uid, str(fieldid)) all_image_files = CONFIG["IMAGE_TYPES"] + CONFIG["CMAP2_IMAGE_TYPES"] + CONFIG["CHART_IMAGE_TYPES"] all_image_files.append("mask_img") for file_base_name in all_image_files: local_path = os.path.join(local_field_dir, f"{file_base_name}.png") if os.path.exists(local_path): gcs_path = f"PaidMonitoredFields/{uid}/{fieldid}/{imagedate}/{file_base_name}" try: bucket.blob(gcs_path).upload_from_filename(local_path) except Exception: print(f"Warning: Failed to upload {local_path}") for tif_name in CONFIG["TIF_IMAGE_TYPES"]: local_path = os.path.join(local_field_dir, tif_name) if os.path.exists(local_path): gcs_path = f"PaidMonitoredFields/{uid}/{fieldid}/{imagedate}/{tif_name}" bucket.blob(gcs_path).upload_from_filename(local_path) for report_ext in ["html", "pdf"]: local_path = os.path.join(local_field_dir, f"report.{report_ext}") if os.path.exists(local_path): gcs_path = f"PaidMonitoredFields/{uid}/{fieldid}/{imagedate}/report.{report_ext}" bucket.blob(gcs_path).upload_from_filename(local_path) def update_usage_counters(uid, processing_flags, session_type): """Updates API credits or satellite visit counts after processing.""" process_as = processing_flags.get("process_as") if process_as == 2: # Subscription based visits_needed = processing_flags.get("visits_needed", 0) user_obj = processing_flags.get("user_obj", {}) pre_visits = user_obj.get("TotalSatelliteVisits", 0) user_visits_ref = db.reference(f"/{uid}/MyProfile/TotalSatelliteVisits", app=process_field_flag.default_app_2) user_visits_ref.set(pre_visits + visits_needed) print(f"Updated TotalSatelliteVisits for {uid} by {visits_needed}.") elif process_as == 3 and session_type == "bulk": # API based credits_needed = processing_flags.get("credits_needed", 0) org_api_obj = processing_flags.get("org_api_obj", {}) org_api_obj["UsedUnits"] = int(org_api_obj.get("UsedUnits", 0)) + credits_needed org_api_obj["remainingUnits"] = int(org_api_obj.get("remainingUnits", 0)) - credits_needed db.reference(f"Organizations/{uid}").set(org_api_obj) print(f"Updated API credits for organization {uid} by {credits_needed}.") # --- Main Entry Point --- def server2024(uid=None, fieldid=None, session_type="main"): """ Main function to process satellite images for one or more fields. """ try: initialize_firebase() uid_list = get_fields_to_process(uid, fieldid) print(f"Found {sum(len(f) for f in uid_list.values()) if uid_list else 0} fields to process across {len(uid_list) if uid_list else 0} users.") if not uid_list: return for user_id, fields in uid_list.items(): if not fields: continue for field_id in fields: try: process_single_field(user_id, str(field_id), session_type) except Exception: print(f"--- FATAL ERROR processing {user_id}/{field_id} ---") print(traceback.format_exc()) print("--- Continuing to next field ---") except Exception: print("--- A CRITICAL-LEVEL ERROR OCCURRED IN server2024 ---") print(traceback.format_exc()) # if __name__ == "__main__": # print("Running server2024 in standalone mode.") # server2024(session_type="main") # print("Script finished.")