import os import time import traceback from datetime import date, datetime import firebase_admin from firebase_admin import credentials, db, firestore from google.cloud import storage from PIL import Image from oct2py import octave # =============== Custom (Used) Modules =============== from send_webhook_data import send_webhook_data from process_field_flag import process_field, db as process_db, default_app_2 from send_expiring_noti import send_expiring_noti from send_expired_noti import send_expired_noti from send_moni_noti import send_moni_noti from send_error_noti import send_error_noti from gmap_image_large import gmap_image_large from gmap_image import gmap_image from find_sar import find_sar from find_dem import find_dem from search_new_sentinel import search_new_sentinel from merge_sar import merge_sar from merge_dem import merge_dem from get_weather_data import get_weather_data from get_land_use import get_land_use def server2024(user_id, field_id): """ Main function to process and update field data using Sentinel satellite imagery, DEM, SAR data, weather data, etc. It updates Firestore/Realtime database entries, generates field images (via Octave), and handles data uploads to Firebase storage. :param user_id: The user ID for which fields are to be processed (string) :param field_id: The field ID to process (string); None if all fields for a user. """ # ===================== Basic Setup ===================== today_str = date.today().strftime("%Y%m%d") # e.g. '20250108' # Try initializing Firebase if not already try: firebase_admin.initialize_app( credentials.Certificate("servicekey.json"), {"databaseURL": "https://farmbase-b2f7e-31c0c.firebaseio.com/"} ) except Exception as e: print(f"Firebase app initialization error (possibly already initialized): {e}") # Set up Firestore & Storage firestore_db = firestore.client() storage_client = storage.Client(project="farmbase-b2f7e") bucket_name = "farmbase-b2f7e.appspot.com" # Load Sentinel settings from Firebase Realtime DB sentinel_settings = db.reference("SentinelSettings4").get() or {} # List of disabled users disabled_users = [ "snQYQZqQx3SmVbRztmEqYn5Mkcz2", "KZQ7TZIYXnXN0b07OtrL1hlyYij1", "CeMGYvLXrGR5ZThxZ46iV7vY8sa2", "TCXcp5VIsfhHZrh0nm2VsgBtcGy2", "mFFHQdEtiSbn2hbYQAwwoIdYVi02", "4fPRPyszwLOjweG1qbpiCx3CFQo1", ] # ===================== Build a Dictionary of Users & Fields to Process ===================== if user_id not in disabled_users: if user_id is None and field_id is None: # Get ALL users' paid monitored fields all_users_fields = db.reference("PaidMonitoredFields").child("PMF").get(False, True) or {} all_users_dict = {} for each_uid, fields_obj in all_users_fields.items(): if each_uid not in disabled_users: user_fields = db.reference("PaidMonitoredFields").child("PMF").child(each_uid).get(False, True) all_users_dict[each_uid] = user_fields elif user_id is not None and field_id is None: # Get all fields for this user user_fields = db.reference("PaidMonitoredFields").child("PMF").child(user_id).get(False, True) or {} all_users_dict = {user_id: user_fields} else: # Get a specific field single_field_obj = ( db.reference("PaidMonitoredFields") .child("PMF") .child(user_id) .child(field_id) .get(False, True) ) field_data = {field_id: single_field_obj} all_users_dict = {user_id: field_data} else: # No processing if user is disabled all_users_dict = {} # ===================== Helper Functions ===================== def get_to_and_from_date(requested_yyyymmdd, latest_sensed_day_exists): """ Convert requested_yyyymmdd (str: YYYYMMDD) into from_date & to_date ~4-5 days before that date. :param requested_yyyymmdd: a string in YYYYMMDD format :param latest_sensed_day_exists: integer (0 or 1) indicating if there's a latest sensed day :return: from_date (ISO datetime), to_date (ISO datetime) """ year = int(requested_yyyymmdd[:4]) month = int(requested_yyyymmdd[4:6]) day = int(requested_yyyymmdd[6:]) adjusted_year = year if day < 5 and month > 1: shift = 5 if latest_sensed_day_exists == 0 else 4 new_day = 31 - day - shift new_month = month - 1 elif day < 5 and month == 1: shift = 5 if latest_sensed_day_exists == 0 else 4 new_day = 31 - day - shift new_month = 12 adjusted_year -= 1 else: shift = 5 if latest_sensed_day_exists == 0 else 4 new_day = day - shift new_month = month if new_day < 1: new_day = 1 if day < 1: day = 1 # Zero-pad new_month_str = f"{new_month:02d}" new_day_str = f"{new_day:02d}" month_str = f"{month:02d}" day_str = f"{day:02d}" from_date = f"{adjusted_year}-{new_month_str}-{new_day_str}T00:00:00" to_date = f"{year}-{month_str}-{day_str}T23:59:59" return from_date, to_date def is_data_recently_processed(check_uid, check_field_id): """ Checks if data was processed in the last 5 days for the given user & field. :return: True if processed <5 days ago, False otherwise """ latest_day = ( db.reference("PaidMonitoredFields") .child("PMF") .child(check_uid) .child(check_field_id) .child("LatestDay") .get() ) current_day_str = date.today().strftime("%Y%m%d") if latest_day is not None: day_diff = int(current_day_str) - int(latest_day) if day_diff < 5: print(f"Field recently processed. Latest: {latest_day}, diff: {day_diff}") return True return False def can_process_field(check_uid, check_field_id): """ Checks if the field is not expired or paused, and calls process_field(...) to determine if advanced conditions are met. :return: dict with "process_field", "update_counter", "total_satellite_visits" """ expired_val = ( db.reference("PaidMonitoredFields") .child("PMF") .child(check_uid) .child(check_field_id) .child("Expired") .get() or "0" ) paused_val = ( db.reference("PaidMonitoredFields") .child("PMF") .child(check_uid) .child(check_field_id) .child("Paused") .get() or "no" ) try: expired_val = int(expired_val) except Exception: expired_val = 0 is_paused = 1 if paused_val.lower() == "yes" else 0 # If farm is expired or paused => do not process if expired_val != 0 or is_paused: print(f"Cannot process: Expired={expired_val}, Paused={is_paused}") return {"process_field": False, "update_counter": False} # Else check further via process_field_flag result_check = process_field(check_uid, check_field_id) print(f"Process check result: {result_check}") return result_check def remove_field_if_expired(del_uid, del_field_id): """ If a field is expired or never paid, and meets the time threshold, delete it from PaidMonitoredFields and move it to DeletedFields. Returns 1 if deleted, 0 if not. """ field_expired_val = ( db.reference("PaidMonitoredFields") .child("PMF") .child(del_uid) .child(del_field_id) .child("Expired") .get() or 0 ) try: field_expired_val = int(field_expired_val) except Exception: field_expired_val = 0 payment_type_val = ( db.reference("PaidMonitoredFields") .child("PMF") .child(del_uid) .child(del_field_id) .child("PaymentType") .get() or 0 ) try: payment_type_val = float(payment_type_val) except Exception: payment_type_val = 0 # Grace period months_to_deletion = 1 # If never paid or subscription ended if payment_type_val != -2 and (field_expired_val == 1 or payment_type_val == 0): if payment_type_val == 0: # 1 day grace period for never-paid pay_millis = 24 * 60 * 60 * 1000 else: try: pay_millis = int((payment_type_val + months_to_deletion) * 30 * 24 * 60 * 60 * 1000) except Exception: pay_millis = months_to_deletion * 30 * 24 * 60 * 60 * 100 try: # If enough time has passed since creation if (int(round(time.time() * 1000)) - int(del_field_id)) > pay_millis: print(f"Deleting field {del_uid}-{del_field_id} from PaidMonitoredFields...") field_snapshot = ( db.reference("PaidMonitoredFields") .child("PMF") .child(del_uid) .child(del_field_id) .get() ) try: db.reference("DeletedFields").child("PMF").child(del_uid).child(del_field_id).set(field_snapshot) except Exception: pass db.reference("PaidMonitoredFields").child("PMF").child(del_uid).child(del_field_id).delete() return 1 except Exception: print(traceback.format_exc()) # On error, still delete db.reference("PaidMonitoredFields").child("PMF").child(del_uid).child(del_field_id).delete() return 1 return 0 def satellite_data_processing( current_uid, current_lang, current_field_id, from_date, to_date, sentinel_config ): """ Main logic to fetch and process satellite data (Sentinel, SAR, DEM) for a given field. Generates images via Octave, merges SAR/DEM, uploads images to Firebase Storage, and updates Firestore/Realtime Database accordingly. """ # Sentinel config client_id = sentinel_config.get("ClientID", "") client_secret = sentinel_config.get("ClientSecret", "") wms_id = sentinel_config.get("WMSID", "") rvi_id = sentinel_config.get("RVIID", "") dem_id = sentinel_config.get("DEMID", "") field_data = ( db.reference("PaidMonitoredFields") .child("PMF") .child(current_uid) .child(current_field_id) .get() or {} ) # Basic checks & metadata is_paid = 0 if field_data.get("Paid", "").lower() == "yes": is_paid = 1 is_paused = 1 if (field_data.get("Paused", "no").lower() == "yes") else 0 try: is_expired = 1 if int(field_data.get("Expired", 0)) != 0 else 0 except Exception: is_expired = 0 # Potentially relevant existing days sensed_days = field_data.get("SensedDays", {}) failed_days = field_data.get("FailedDays", {}) # If from_date is provided, reset trackers to 0 to re-check latest_sensed_day_str = 0 latest_failed_day_str = 0 latest_rvi_day_str = 0 latest_dem_day_str = 0 if not from_date: # If from_date is None => normal run => see what's in DB if sensed_days: for day_key in sensed_days.keys(): if int(day_key) > int(latest_sensed_day_str): latest_sensed_day_str = day_key if failed_days: for day_key in failed_days.keys(): if int(day_key) > int(latest_failed_day_str): latest_failed_day_str = day_key # If user is paid, active, and enough days have passed since last sense today_str_local = date.today().strftime("%Y%m%d") process_new_data = ( is_paid == 1 and is_paused == 0 and is_expired == 0 and (int(today_str_local) - int(latest_sensed_day_str) > 4) ) image_date_str = "0" if process_new_data: # SAR, DEM, Land Use logic sar_data = find_sar( current_uid, current_field_id, field_data, latest_rvi_day_str, is_expired, is_paused, from_date, to_date, client_id, client_secret, rvi_id, ) latest_rvi_day_str = sar_data["LatestDay"] find_dem( current_uid, current_field_id, field_data, latest_dem_day_str, is_expired, is_paused, from_date, to_date, client_id, client_secret, dem_id ) get_land_use( current_uid, current_field_id, field_data, latest_sensed_day_str, latest_failed_day_str, is_expired, is_paused, from_date, to_date, client_id, client_secret, wms_id ) sentinel_search = search_new_sentinel( current_uid, current_field_id, field_data, latest_sensed_day_str, latest_failed_day_str, is_expired, is_paused, from_date, to_date, client_id, client_secret, wms_id, ) image_date_str = str(sentinel_search["LatestDay"]) mgrs_id = sentinel_search["MGRS"] # Update the tile date if newer try: previous_tile_date = db.reference("LatestTileDates").child(mgrs_id).get() if previous_tile_date is None: db.reference("LatestTileDates").child(str(mgrs_id)).set(image_date_str) except Exception: pass # If new date is found, check for duplication in SensedDays/FailedDays if image_date_str != "0": if image_date_str in failed_days: print(f"Already failed for {image_date_str}. Skipping.") return if image_date_str in sensed_days: print(f"Already sensed for {image_date_str}. Skipping.") return # If not expired => get weather data if is_expired == 0: try: lat_min = field_data.get("FieldMinLat", 0) lon_min = field_data.get("FieldMinLong", 0) get_weather_data(lat_min, lon_min, today_str_local, current_uid, current_field_id) except Exception: pass # For debugging a pattern (optional) # Pattern: [sensed_day_flag=0, paid=1, failed_day_flag=0, big_query=1, paused=0, expired=0] => "010100" # We'll skip that here; you can add any pattern check if needed. # =============== Generate Field Images via Octave =============== field_area_val = float(field_data.get("FieldArea", 0)) large_map_file = f"{current_uid}/{current_field_id}/_static_map_large.png" normal_map_file = f"{current_uid}/{current_field_id}/_static_map.png" # Create directories if not os.path.exists(current_uid): os.makedirs(current_uid) if not os.path.exists(f"{current_uid}/{current_field_id}"): os.makedirs(f"{current_uid}/{current_field_id}") # Generate large static map if missing if not os.path.exists(large_map_file): try: gmap_image_large( field_data.get("FieldMinLat", 0), field_data.get("FieldMaxLat", 0), field_data.get("FieldMinLong", 0), field_data.get("FieldMaxLong", 0), field_data.get("TileMaxLat", 0), field_data.get("TileMinLat", 0), field_data.get("TileMaxLong", 0), field_data.get("TileMinLong", 0), current_field_id, current_uid, field_area_val, ) except Exception: pass # Generate normal static map if missing if not os.path.exists(normal_map_file): try: gmap_image( field_data.get("FieldMinLat", 0), field_data.get("FieldMaxLat", 0), field_data.get("FieldMinLong", 0), field_data.get("FieldMaxLong", 0), field_data.get("TileMaxLat", 0), field_data.get("TileMinLat", 0), field_data.get("TileMaxLong", 0), field_data.get("TileMinLong", 0), current_field_id, current_uid, field_area_val, ) except Exception: pass # Prepare JSON for Octave function def make_field_images(param_json): return octave.monitored_field2022(param_json) param_json = { "LatestDay": image_date_str, "PreviousDay": field_data.get("PreviousDay", None), "MGRS": field_data.get("MGRS", "N/A"), "FieldID": current_field_id, "PlantDistance": field_data.get("PlantDistance", 0), "FieldArea": field_area_val / 10000.0, "StartPixelLat": 180, "EndPixelLat": 180, "StartPixelLong": 180, "EndPixelLong": 180, } result_status = make_field_images(param_json) # Merge SAR/DEM if new RVI day is found try: storage_bucket = storage_client.get_bucket(bucket_name) if int(result_status.get("LatestRVIDay", 0)) > 0: # Merge SAR merge_sar( current_uid, current_field_id, field_data.get("Coordinates", []), result_status["LatestRVIDay"], field_data.get("FieldMaxLat", 0), field_data.get("FieldMinLat", 0), field_data.get("FieldMaxLong", 0), field_data.get("FieldMinLong", 0), ) for sar_img in ["rvi", "rsm"]: local_file = f"{current_uid}/{current_field_id}/{sar_img}.png" remote_name = ( f"PaidMonitoredFields/{current_uid}/{current_field_id}/" f"{result_status['LatestRVIDay']}/{sar_img}" ) blob = storage_bucket.blob(remote_name) blob.upload_from_filename(local_file) # Merge DEM merge_dem( current_uid, current_field_id, field_data.get("Coordinates", []), result_status["LatestRVIDay"], field_data.get("FieldMaxLat", 0), field_data.get("FieldMinLat", 0), field_data.get("FieldMaxLong", 0), field_data.get("FieldMinLong", 0), ) dem_local_file = f"{current_uid}/{current_field_id}/dem.png" dem_remote_name = ( f"PaidMonitoredFields/{current_uid}/{current_field_id}/" f"{result_status['LatestRVIDay']}/dem" ) blob_dem = storage_bucket.blob(dem_remote_name) blob_dem.upload_from_filename(dem_local_file) except Exception: print(traceback.format_exc()) # Check if image generation was successful if isinstance(result_status, dict) and "successful" in result_status.get("status", ""): # Update DB db.reference("PaidMonitoredFields").child("PMF").child(current_uid).child( current_field_id ).child("LatestDay").set(image_date_str) db.reference("PaidMonitoredFields").child("PMF").child(current_uid).child( current_field_id ).child("SensedDays").child(image_date_str).set("OK") # Update Firestore try: doc_ref = firestore_db.collection(current_uid).document(str(current_field_id)) doc_ref.update({"LatestSensedDay": image_date_str}) except Exception: pass # Send monitoring notification send_moni_noti(current_uid, field_data.get("FieldDescription", ""), "") # Possibly trigger a webhook for a specific user if current_uid == "HC1KG5a2e1ZhXaPMpEcylZmeMYM2": send_webhook_data(current_field_id) else: # Mark as failed db.reference("PaidMonitoredFields").child("PMF").child(current_uid).child( current_field_id ).child("FailedDays").child(image_date_str).set("yes") send_error_noti(current_uid) def run_processing_for_field( proc_uid, proc_field_id, proc_lang, user_fields_obj, sentinel_cfg ): """ Orchestrates the logic for a single field: 1. Remove old directory and recreate 2. Potentially delete if expired 3. Check if we can process 4. Invoke satellite data processing if needed 5. Handle any previously requested data ranges """ remove_dir_cmd = f"rm -rf {proc_uid}/{proc_field_id}" os.system(remove_dir_cmd) # Recreate directory path_to_create = f"{proc_uid}/{proc_field_id}" if not os.path.exists(proc_uid): os.makedirs(proc_uid) if not os.path.exists(path_to_create): os.makedirs(path_to_create) print(f"Processing data for user={proc_uid}, field={proc_field_id}...") field_obj_local = user_fields_obj[proc_field_id] deleted_flag = remove_field_if_expired(proc_uid, proc_field_id) # Should we process the field? can_proc = can_process_field(proc_uid, proc_field_id) if can_proc["process_field"] and not deleted_flag: try: if not is_data_recently_processed(proc_uid, proc_field_id): # Fresh data satellite_data_processing(proc_uid, proc_lang, proc_field_id, None, None, sentinel_cfg) if can_proc["update_counter"]: visits_count = can_proc["total_satellite_visits"] ref_str = f"/{proc_uid}/MyProfile/TotalSatelliteVisits" db_app2 = process_db.reference(ref_str, app=default_app_2) db_app2.set(visits_count + 1) # Check old data requests prev_data_requests = field_obj_local.get("PreviousDataRequests", {}) for date_key in prev_data_requests: from_d, to_d = get_to_and_from_date(date_key, 0) attempt = 0 while attempt < 3: try: satellite_data_processing(proc_uid, proc_lang, proc_field_id, from_d, to_d, sentinel_cfg) if can_proc["update_counter"]: visits_count = can_proc["total_satellite_visits"] ref_str = f"/{proc_uid}/MyProfile/TotalSatelliteVisits" db_app2 = process_db.reference(ref_str, app=default_app_2) db_app2.set(visits_count + 1) break except Exception: print(traceback.format_exc()) attempt += 1 time.sleep(10) # Remove this request after processing db.reference("PaidMonitoredFields").child("PMF").child(proc_uid).child( proc_field_id ).child("PreviousDataRequests").child(date_key).delete() except Exception: print(traceback.format_exc()) # Cleanup os.system(remove_dir_cmd) # ===================== Iterate Over Users & Fields ===================== for each_uid, fields_dict in all_users_dict.items(): # Skip certain known IDs if each_uid in ["CeMGYvLXrGR5ZThxZ46iV7vY8sa2", "TCXcp5VIsfhHZrh0nm2VsgBtcGy2"]: continue default_lang = "en" # fields_dict is a dict: {fieldID: fieldDataObj} for fld_id in fields_dict: run_processing_for_field(each_uid, fld_id, default_lang, fields_dict, sentinel_settings) # Optionally stop the VM if needed: # os.system("curl -X POST 'https://us-central1-farmbase-b2f7e.cloudfunctions.net/stopVM'")