import os
import time
import traceback
import random
import threading
import datetime
from datetime import date, timedelta
import base64
import requests
import pytz
import firebase_admin
from firebase_admin import credentials, db, firestore
from socket import *
from oct2py import octave
from io import BytesIO
from google.cloud import storage
from oauth2client.service_account import ServiceAccountCredentials
from PIL import Image, ImageFilter
from geopy.geocoders import Nominatim
import scipy
from scipy import ndimage

# =============== Custom Modules ===============
from send_webhook_data import send_webhook_data
from send_notification import send_notification
from sendemail import sendemail
from sen_start_noti import sen_start_noti
from send_sar_email import send_sar_email
from gen_report_new2 import gen_report_new
from merge_sar import merge_sar
from find_sar import find_sar
from merge_dem import merge_dem
from find_dem import find_dem
from make_bigquery import make_bigquery
from send_moni_noti import send_moni_noti
from send_error_noti import send_error_noti
from gmap_image import gmap_image
from gmap_image_large import gmap_image_large
from find_img import find_img
from find_img_large import find_img_large
from merge_img import merge_img
from all_proc import all_proc
from contour_images import contour_images
from send_expiring_noti import send_expiring_noti
from send_expired_noti import send_expired_noti
from make_trial_bigquery import make_trial_bigquery
from gen_geotiff import gen_geotiff
from sendgeotifs import sendgeotifs
from gen_report import gen_report
from get_weather_data import get_weather_data
from sendonlyreport import sendonlyreport
from gen_failed_report import gen_failed_report
from sendfailedreport import sendfailedreport
from map_coords import map_coords
from search_new_sentinel import search_new_sentinel
from convert_to_pdf import convert_to_pdf
from latlon_jp2_to_pixel import latlon_jp2_to_pixel
from gen_geotiff2 import gen_geotiff2
from search_sentinel_again import search_sentinel_again
from get_prev_date import get_prev_date
from make_bigquery_again import make_bigquery_again
import pdftotree
from convert_to_html import convert_to_html
from get_land_use import get_land_use
from make_dir import make_dir
from create_interactive_html_report import make_interactive_html_report
import process_field_flag
from send_whatsapp2024 import generate_jeevnai_whatsapp_image
from satellite_visit_email import send_html_email


def server2024(user_id, field_id):
    """
    Main function to process and update field data using Sentinel satellite
    imagery, DEM, SAR data, weather data, etc. It updates Firestore/Realtime
    database entries, generates reports, and handles image uploads to
    Firebase storage.

    :param user_id: User ID (string), or None to process all users
    :param field_id: Field ID (string), or None to process all fields of a user
    """
    # ===================== Basic Setup =====================
    today = date.today()
    today_yyyymmdd = today.strftime("%Y%m%d")  # e.g. 20250108

    # Attempt to initialize Firebase if not already initialized
    # (initialize_app raises if an app already exists; we swallow that case).
    credential_obj = credentials.Certificate("servicekey.json")
    try:
        firebase_admin.initialize_app(
            credential_obj,
            {"databaseURL": "https://farmbase-b2f7e-31c0c.firebaseio.com/"},
        )
    except Exception as init_exception:
        print(f"Firebase app initialization error: {init_exception}")

    # Firebase references
    firestore_db = firestore.client()
    storage_client = storage.Client(project="farmbase-b2f7e")
    bucket_name = "farmbase-b2f7e.appspot.com"

    # Load sentinel settings from Firebase
    sentinel_settings = db.reference("SentinelSettings4").get()

    # List of disabled users
    disabled_users = [
        "snQYQZqQx3SmVbRztmEqYn5Mkcz2",
        "KZQ7TZIYXnXN0b07OtrL1hlyYij1",
        "CeMGYvLXrGR5ZThxZ46iV7vY8sa2",
        "TCXcp5VIsfhHZrh0nm2VsgBtcGy2",
        "mFFHQdEtiSbn2hbYQAwwoIdYVi02",
        "4fPRPyszwLOjweG1qbpiCx3CFQo1",
    ]

    # ===================== Build a Dictionary of Users & Fields to Process =====================
    if user_id not in disabled_users:
        if user_id is None and field_id is None:
            # Get all paid monitored fields
            all_users_fields = (
                db.reference("PaidMonitoredFields").child("PMF").get(False, True)
            )
            all_users_dict = {}
            for current_user_id, current_user_fields in all_users_fields.items():
                if current_user_id not in disabled_users:
                    all_users_dict[current_user_id] = (
                        db.reference("PaidMonitoredFields")
                        .child("PMF")
                        .child(current_user_id)
                        .get(False, True)
                    )
        elif user_id is not None and field_id is None:
            # Get all fields for a single user
            user_fields = (
                db.reference("PaidMonitoredFields")
                .child("PMF")
                .child(user_id)
                .get(False, True)
            )
            all_users_dict = {user_id: user_fields}
        else:
            # Get a specific field for a user
            single_field_obj = (
                db.reference("PaidMonitoredFields")
                .child("PMF")
                .child(user_id)
                .child(field_id)
                .get(False, True)
            )
            field_data = {field_id: single_field_obj}
            all_users_dict = {user_id: field_data}
    else:
        # User is disabled
        all_users_dict = {}

    # ===================== Utility Functions =====================
    def get_to_and_from_date(requested_date_str, latest_sensed_day_flag):
        """
        Convert requested_date_str (YYYYMMDD) into a from_date and to_date
        range by shifting back ~4-5 days.

        :param requested_date_str: string in YYYYMMDD format
        :param latest_sensed_day_flag: indicates if there's a latest sensed day
            already used (0 or 1); shifts 5 days back when 0, 4 when 1
        :return: (from_date, to_date) in ISO date-time format
        """
        current_day = int(requested_date_str[6:])
        current_month = int(requested_date_str[4:6])
        current_year = int(requested_date_str[:4])
        start_year = current_year

        # Adjust month/day based on boundary conditions.
        # NOTE(review): the hand-rolled "31 - day - N" month rollover is only an
        # approximation (assumes 31-day months); preserved as-is to keep the
        # historical date windows identical.
        if current_day < 5 and current_month > 1:
            if latest_sensed_day_flag == 0:
                start_day = 31 - current_day - 5
            else:
                start_day = 31 - current_day - 4
            start_month = current_month - 1
        elif current_day < 5 and current_month == 1:
            if latest_sensed_day_flag == 0:
                start_day = 31 - current_day - 5
            else:
                start_day = 31 - current_day - 4
            start_month = 12
            start_year -= 1
        else:
            if latest_sensed_day_flag == 0:
                start_day = current_day - 5
            else:
                start_day = current_day - 4
            start_month = current_month

        # Prevent zero or negative day
        if start_day <= 0:
            start_day = 1
        if current_day <= 0:
            current_day = 1

        # Format to strings with leading zeros if needed
        if start_month < 10:
            start_month = f"0{start_month}"
        else:
            start_month = str(start_month)
        if current_month < 10:
            current_month = f"0{current_month}"
        else:
            current_month = str(current_month)
        if start_day < 10:
            start_day = f"0{start_day}"
        else:
            start_day = str(start_day)
        if current_day < 10:
            current_day = f"0{current_day}"
        else:
            current_day = str(current_day)

        from_date = f"{start_year}-{start_month}-{start_day}T00:00:00"
        to_date = f"{current_year}-{current_month}-{current_day}T23:59:59"
        # l_date is computed but not returned (kept for parity with older code)
        l_date = f"{current_year}{current_month}{current_day}"
        return from_date, to_date

    def is_data_already_processed(check_user_id, check_field_id):
        """
        Checks if data was processed recently (less than 5 days ago) for the
        given user & field.

        :param check_user_id: user ID (string)
        :param check_field_id: field ID (string)
        :return: True if processed <5 days ago, otherwise False
        """
        latest_day_str = (
            db.reference("PaidMonitoredFields")
            .child("PMF")
            .child(check_user_id)
            .child(check_field_id)
            .child("LatestDay")
            .get()
        )
        current_date_str = date.today().strftime("%Y%m%d")
        if latest_day_str is not None:
            # NOTE(review): plain integer subtraction of YYYYMMDD values, not a
            # calendar diff — works for the <5-day check except across
            # month/year boundaries; preserved as-is.
            day_diff = int(current_date_str) - int(latest_day_str)
        else:
            day_diff = 5
        if day_diff < 5:
            print(("Farm recently processed:", latest_day_str, day_diff))
            return True
        return False

    def shall_we_process_data(check_user_id, check_field_id):
        """
        Determine if we should process the data for this field.
        Checks 'Expired' and 'Paused' flags. Also calls
        process_field_flag.process_field(...) to see if the field meets other
        conditions.

        :param check_user_id: user ID
        :param check_field_id: field ID
        :return: dict: {"process_field": bool, "update_counter": bool,
                        "total_satellite_visits": int}
        """
        expired_val = (
            db.reference("PaidMonitoredFields")
            .child("PMF")
            .child(check_user_id)
            .child(check_field_id)
            .child("Expired")
            .get()
            or 0
        )
        try:
            expired_val = int(expired_val)
        except Exception:
            expired_val = 0

        paused_val = (
            db.reference("PaidMonitoredFields")
            .child("PMF")
            .child(check_user_id)
            .child(check_field_id)
            .child("Paused")
            .get()
            or "no"
        )
        is_paused = 1 if str(paused_val).lower() == "yes" else 0

        # If farm is expired or paused, no processing
        if expired_val != 0 or is_paused == 1:
            print(("Farm expired or paused:", expired_val, is_paused))
            return {"process_field": False, "update_counter": False}

        # Otherwise, check advanced conditions
        process_decision = process_field_flag.process_field(check_user_id, check_field_id)
        print(process_decision)
        print("Proceeding to process data if indicated.")
        return process_decision

    def satellite_data(
        current_user_id,
        language,
        current_field_id,
        from_date_val,
        to_date_val,
        sentinel_config,
    ):
        """
        Main logic to handle Sentinel, SAR, DEM retrieval and subsequent
        processing for a field. Generates images, reports, uploads to storage,
        updates Realtime DB, triggers notifications.

        :param current_user_id: user ID
        :param language: language code(s) string, e.g. "en" or "en,fr"
        :param current_field_id: field ID
        :param from_date_val: start date for data retrieval (ISO format) or None
        :param to_date_val: end date for data retrieval (ISO format) or None
        :param sentinel_config: dict with {ClientID, ClientSecret, WMSID, RVIID, DEMID}
        """
        # Unpack sentinel settings
        client_id = sentinel_config["ClientID"]
        client_secret = sentinel_config["ClientSecret"]
        wms_id = sentinel_config["WMSID"]
        rvi_id = sentinel_config["RVIID"]
        dem_id = sentinel_config["DEMID"]

        # Get field object from DB
        field_obj = (
            db.reference("PaidMonitoredFields")
            .child("PMF")
            .child(current_user_id)
            .child(current_field_id)
            .get()
        )

        # Initialize placeholders (0 means "no day seen yet")
        latest_sensed_day_str = 0
        latest_failed_day_str = 0
        latest_rvi_day_str = 0
        latest_dem_day_str = 0

        # Try to get field address/description
        field_address = field_obj.get("FieldDescription", field_obj.get("FieldAddress", "Farm"))
        whitelabel_name = field_obj.get("Whitelabel", "farmonaut")

        # Extract whitelabel email & password if any
        try:
            whitelabel_data = db.reference("WhitelabelEmails").child(whitelabel_name).get()
            whitelabel_email = whitelabel_data["Email"]
            whitelabel_password = whitelabel_data["Password"]
        except Exception:
            whitelabel_email = None
            whitelabel_password = None

        # Determine language (some users have overrides)
        try:
            language = field_obj["Language"]
        except Exception:
            language = "en"

        # Special-cases for user-languages
        if current_user_id == "HC1KG5a2e1ZhXaPMpEcylZmeMYM2":
            language = "ar"
        elif current_user_id == "snQYQZqQx3SmVbRztmEqYn5Mkcz2":
            language = "te"
        elif current_user_id == "mFFHQdEtiSbn2hbYQAwwoIdYVi02":
            language = "uz"
        elif current_user_id == "8aGkNQm166bmk8cjHVHtwGli2DD2":
            language = "pa,hi,en"
        elif "biopixel" in whitelabel_name:
            language = "fr,ar"

        # Additional field metadata
        field_description = field_obj.get("FieldDescription", "NotAvailable")
        plant_distance = field_obj.get("PlantDistance", 0)

        # Payment check
        try:
            payment_status = field_obj["Paid"].lower()
            is_paid = 1 if payment_status == "yes" else 0
        except Exception:
            is_paid = 0

        # If the user has paid
        if is_paid == 1:
            # Gather more data
            sensed_days = field_obj.get("SensedDays", None)
            failed_days = field_obj.get("FailedDays", None)
            coordinates_list = field_obj.get("Coordinates", None)
            field_area_str = field_obj.get("FieldArea", 0)
            field_area_val = float(field_area_str)

            # Paused / Expired flags
            pause_check = field_obj.get("Paused", "no")
            is_paused = 1 if pause_check.lower() == "yes" else 0
            try:
                expired_check = field_obj["Expired"]
                is_expired = 1 if int(expired_check) != 0 else 0
            except Exception:
                is_expired = 0

            # Payment type => number of months (or special values; -2 = never expires)
            try:
                payment_type_val = float(field_obj["PaymentType"])
            except Exception:
                payment_type_val = 0

            # Identify the latest sensed day if any
            is_new_field = 0
            if sensed_days is not None:
                total_sensed_days_count = 0
                for day_key in sensed_days.keys():
                    total_sensed_days_count += 1
                    if int(day_key) > int(latest_sensed_day_str):
                        latest_sensed_day_str = day_key
                is_new_field = 0
            else:
                is_new_field = 1

            # Identify the latest failed day if any
            if failed_days is not None:
                for day_key in failed_days.keys():
                    if int(day_key) > int(latest_failed_day_str):
                        latest_failed_day_str = day_key

            # Check or update expiration logic: the field ID is a millisecond
            # epoch timestamp of the order time.
            field_id_as_float = float(current_field_id) / 1000
            order_date_full = datetime.datetime.fromtimestamp(field_id_as_float).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            )[:10]
            order_year, order_month, order_day = order_date_full.split("-")

            # Current date
            current_date_str = date.today().strftime("%Y%m%d")
            today_year, today_month, today_day = (
                current_date_str[:4],
                current_date_str[4:6],
                current_date_str[6:],
            )
            order_date_dt = date(int(order_year), int(order_month), int(order_day))
            current_date_dt = date(int(today_year), int(today_month), int(today_day))
            days_since_order = (current_date_dt - order_date_dt).days

            # If usage (sensed days) or time-based subscription is exceeded
            if (
                not is_new_field
                and sensed_days is not None
                and len(sensed_days.keys()) >= 6 * payment_type_val + 1
                and days_since_order > 30 * payment_type_val
                and payment_type_val != -2
            ):
                is_expired = 1
                db.reference("PaidMonitoredFields").child("PMF").child(current_user_id).child(
                    current_field_id
                ).child("Expired").set("1")
                send_expired_noti(current_user_id)
            else:
                db.reference("PaidMonitoredFields").child("PMF").child(current_user_id).child(
                    current_field_id
                ).child("Expired").delete()
                is_expired = 0

            # Check if subscription is expiring soon (<3 visits or <10 days left)
            total_sensed_days_so_far = 0
            if sensed_days is not None:
                total_sensed_days_so_far = len(sensed_days.keys())
            if (
                (6 * payment_type_val - total_sensed_days_so_far < 3)
                and (30 * payment_type_val - days_since_order < 10)
                and payment_type_val != -2
            ):
                db.reference("PaidMonitoredFields").child("PMF").child(current_user_id).child(
                    current_field_id
                ).child("Expiring").set("yes")
            else:
                db.reference("PaidMonitoredFields").child("PMF").child(current_user_id).child(
                    current_field_id
                ).child("Expiring").set("no")

            # If custom date range is set, re-initialize day trackers
            if from_date_val is not None:
                latest_sensed_day_str = 0
                latest_failed_day_str = 0
                latest_rvi_day_str = 0
                latest_dem_day_str = 0
            if latest_sensed_day_str is None:
                latest_sensed_day_str = 0

            # If it's time to fetch new data (no new data for last 4+ days,
            # not expired/paused)
            if (
                (int(today_yyyymmdd) - int(latest_sensed_day_str) > 4)
                and (is_expired == 0)
                and (is_paused == 0)
            ):
                # For a new field
                if is_new_field == 1 and is_paid == 1:
                    sar_response = find_sar(
                        current_user_id,
                        current_field_id,
                        field_obj,
                        latest_rvi_day_str,
                        is_expired,
                        is_paused,
                        from_date_val,
                        to_date_val,
                        client_id,
                        client_secret,
                        rvi_id,
                    )
                    latest_rvi_day_str = sar_response["LatestDay"]
                    land_response = get_land_use(
                        current_user_id,
                        current_field_id,
                        field_obj,
                        latest_sensed_day_str,
                        latest_failed_day_str,
                        is_expired,
                        is_paused,
                        from_date_val,
                        to_date_val,
                        client_id,
                        client_secret,
                        wms_id,
                    )
                    dem_response = find_dem(
                        current_user_id,
                        current_field_id,
                        field_obj,
                        latest_dem_day_str,
                        is_expired,
                        is_paused,
                        from_date_val,
                        to_date_val,
                        client_id,
                        client_secret,
                        dem_id,
                    )
                    latest_dem_day_str = dem_response["LatestDay"]
                    sentinel_search_response = search_new_sentinel(
                        current_user_id,
                        current_field_id,
                        field_obj,
                        latest_sensed_day_str,
                        latest_failed_day_str,
                        is_expired,
                        is_paused,
                        from_date_val,
                        to_date_val,
                        client_id,
                        client_secret,
                        wms_id,
                    )
                elif is_new_field == 0 and is_paid == 1 and is_expired == 0:
                    # Already has data, not new
                    sar_response = find_sar(
                        current_user_id,
                        current_field_id,
                        field_obj,
                        latest_rvi_day_str,
                        is_expired,
                        is_paused,
                        from_date_val,
                        to_date_val,
                        client_id,
                        client_secret,
                        rvi_id,
                    )
                    latest_rvi_day_str = sar_response["LatestDay"]
                    land_response = get_land_use(
                        current_user_id,
                        current_field_id,
                        field_obj,
                        latest_sensed_day_str,
                        latest_failed_day_str,
                        is_expired,
                        is_paused,
                        from_date_val,
                        to_date_val,
                        client_id,
                        client_secret,
                        wms_id,
                    )
                    dem_response = find_dem(
                        current_user_id,
                        current_field_id,
                        field_obj,
                        latest_dem_day_str,
                        is_expired,
                        is_paused,
                        from_date_val,
                        to_date_val,
                        client_id,
                        client_secret,
                        dem_id,
                    )
                    latest_dem_day_str = dem_response["LatestDay"]
                    sentinel_search_response = search_new_sentinel(
                        current_user_id,
                        current_field_id,
                        field_obj,
                        latest_sensed_day_str,
                        latest_failed_day_str,
                        is_expired,
                        is_paused,
                        from_date_val,
                        to_date_val,
                        client_id,
                        client_secret,
                        wms_id,
                    )
                else:
                    # Not paid or some other condition
                    sentinel_search_response = {"MGRS": "NotA", "LatestDay": today_yyyymmdd}

                # Extract the newly found image date
                image_date_str = str(sentinel_search_response["LatestDay"])
                mgrs_id = sentinel_search_response["MGRS"]

                # Attempt to retrieve previous day tile date
                try:
                    previous_tile_date = db.reference("LatestTileDates").child(mgrs_id).get()
                    if previous_tile_date is None:
                        db.reference("LatestTileDates").child(str(mgrs_id)).set(image_date_str)
                except Exception:
                    previous_tile_date = "11"
                # NOTE(review): PreviousDay may be None when the tile date was
                # just created above — confirm downstream consumers tolerate it.
                sentinel_search_response["PreviousDay"] = previous_tile_date
            else:
                # No new image needed
                image_date_str = 0
        else:
            # If field is not paid
            image_date_str = 0
            sensed_days = None
            failed_days = None
            is_paused = 0
            is_expired = 0

        # Check if the newly fetched date is already in failed days
        failed_day_flag = 0
        if failed_days and image_date_str != 0:
            for failed_day_key in failed_days.keys():
                if failed_day_key == image_date_str:
                    print("Already failed processing for this date.")
                    failed_day_flag = 1
                    break

        # Check if the newly fetched date is already in sensed days
        sensed_day_flag = 0
        if sensed_days and image_date_str != 0:
            for sensed_day_key in sensed_days.keys():
                if sensed_day_key == image_date_str:
                    print("Already done processing for this date:", image_date_str)
                    sensed_day_flag = 1
                    break

        # Check if we should generate GeoTIFF
        try:
            gen_tif_val = field_obj["GenTif"]
            generate_tifs = 1 if gen_tif_val in ["yes", "abs_yes"] else 0
        except Exception:
            generate_tifs = 0

        # Get weather data if new date is found and field not expired
        if image_date_str and is_expired == 0:
            try:
                field_min_lat = field_obj["FieldMinLat"]
                field_min_long = field_obj["FieldMinLong"]
                get_weather_data(
                    field_min_lat, field_min_long, today_yyyymmdd, current_user_id, current_field_id
                )
            except Exception:
                pass

        # Build a pattern code for debugging or quick checks
        # pattern = [sensed_day_flag, is_paid, failed_day_flag, big_query_perf, paused, expired]
        big_query_perf = 1 if (image_date_str not in [None, 0]) else 0
        status_pattern = (
            str(sensed_day_flag)
            + str(is_paid)
            + str(failed_day_flag)
            + str(big_query_perf)
            + str(is_paused)
            + str(is_expired)
        )
        print(("pattern:", status_pattern))

        # ===================== Core Processing (Octave or Merging) =====================
        # If pattern indicates we have new, valid data to process
        # ("paid, new date found, not previously sensed/failed, active farm")
        if status_pattern == "010100":
            # Notify if field is near expiration
            if field_obj.get("Expiring", "no") == "yes":
                send_expiring_noti(current_user_id)

            # Gather field bounding coordinates to generate static maps
            try:
                field_max_lat = field_obj["FieldMaxLat"]
                field_min_lat = field_obj["FieldMinLat"]
                field_max_long = field_obj["FieldMaxLong"]
                field_min_long = field_obj["FieldMinLong"]
            except Exception:
                field_max_lat = field_min_lat = field_max_long = field_min_long = 0

            # Create directories if missing
            if not os.path.exists(current_user_id):
                os.makedirs(current_user_id)
            if not os.path.exists(f"{current_user_id}/{current_field_id}"):
                os.makedirs(f"{current_user_id}/{current_field_id}")

            # Attempt to open large static map, else create it
            try:
                large_map_file = f"{current_user_id}/{current_field_id}/_static_map_large.png"
                Image.open(large_map_file)
            except Exception as e:
                print(f"Generating large static map: {e}")
                gmap_image_large(
                    field_min_lat,
                    field_max_lat,
                    field_min_long,
                    field_max_long,
                    field_obj.get("TileMaxLat", 0),
                    field_obj.get("TileMinLat", 0),
                    field_obj.get("TileMaxLong", 0),
                    field_obj.get("TileMinLong", 0),
                    current_field_id,
                    current_user_id,
                    field_area_val,
                )

            # Attempt to open normal static map, else create it
            try:
                normal_map_file = f"{current_user_id}/{current_field_id}/_static_map.png"
                Image.open(normal_map_file)
            except Exception as e:
                print(f"Generating static map: {e}")
                gmap_image(
                    field_min_lat,
                    field_max_lat,
                    field_min_long,
                    field_max_long,
                    field_obj.get("TileMaxLat", 0),
                    field_obj.get("TileMinLat", 0),
                    field_obj.get("TileMaxLong", 0),
                    field_obj.get("TileMinLong", 0),
                    current_field_id,
                    current_user_id,
                    field_area_val,
                )

            # Helper to invoke Octave-based field image generation
            def make_field_images(param_json):
                return octave.monitored_field2022(param_json)

            # Prepare metadata for Octave
            new_field_data = {
                "LatestDay": str(image_date_str),
                "PreviousDay": field_obj.get("PreviousDay", None),
                "MGRS": field_obj.get("MGRS", "N/A"),
                "FieldID": current_field_id,
                "PlantDistance": plant_distance,
                "FieldArea": float(field_area_val) / 10000,  # m^2 -> hectares
                "StartPixelLat": 180,
                "EndPixelLat": 180,
                "StartPixelLong": 180,
                "EndPixelLong": 180,
            }

            # Call Octave to generate images
            field_generation_status = make_field_images(new_field_data)

            # If new SAR data arrived, merge DEM and upload it
            storage_bucket = storage_client.get_bucket(bucket_name)
            try:
                if int(latest_rvi_day_str) > 0 and is_expired == 0 and is_paused == 0:
                    merge_dem(
                        current_user_id,
                        current_field_id,
                        coordinates_list,
                        latest_dem_day_str,
                        field_max_lat,
                        field_min_lat,
                        field_max_long,
                        field_min_long,
                    )
                    dem_filename = f"{current_user_id}/{current_field_id}/dem.png"
                    dem_destination = (
                        f"PaidMonitoredFields/{current_user_id}/{current_field_id}/"
                        f"{latest_rvi_day_str}/dem"
                    )
                    blob_dem = storage_bucket.blob(dem_destination)
                    blob_dem.upload_from_filename(dem_filename)
            except Exception:
                print(traceback.format_exc())

            # Merge SAR (rvi/rsm) images and re-merge DEM, then upload
            try:
                if int(latest_rvi_day_str) > 0 and is_expired == 0 and is_paused == 0:
                    db.reference("PaidMonitoredFields").child("PMF").child(current_user_id).child(
                        current_field_id
                    ).child("SARDays").child(str(latest_rvi_day_str)).set("yes")
                    merge_sar(
                        current_user_id,
                        current_field_id,
                        coordinates_list,
                        latest_rvi_day_str,
                        field_max_lat,
                        field_min_lat,
                        field_max_long,
                        field_min_long,
                    )
                    for sar_img in ["rvi", "rsm"]:
                        source_sar_img = f"{current_user_id}/{current_field_id}/{sar_img}.png"
                        dest_sar_img = (
                            f"PaidMonitoredFields/{current_user_id}/{current_field_id}/"
                            f"{latest_rvi_day_str}/{sar_img}"
                        )
                        sar_blob = storage_bucket.blob(dest_sar_img)
                        sar_blob.upload_from_filename(source_sar_img)
                    # Merge DEM again if needed
                    merge_dem(
                        current_user_id,
                        current_field_id,
                        coordinates_list,
                        latest_dem_day_str,
                        field_max_lat,
                        field_min_lat,
                        field_max_long,
                        field_min_long,
                    )
                    dem_filename2 = f"{current_user_id}/{current_field_id}/dem.png"
                    dem_destination2 = (
                        f"PaidMonitoredFields/{current_user_id}/{current_field_id}/"
                        f"{latest_dem_day_str}/dem"
                    )
                    dem_blob2 = storage_bucket.blob(dem_destination2)
                    dem_blob2.upload_from_filename(dem_filename2)
            except Exception:
                print(traceback.format_exc())

            # Check the result of the field image generation
            if "successful" not in field_generation_status:
                # Mark field as failed
                if "failed" in field_generation_status:
                    db.reference("PaidMonitoredFields").child("PMF").child(current_user_id).child(
                        current_field_id
                    ).child("FailedDays").child(str(image_date_str)).set("yes")
                    send_error_noti(current_user_id)
            else:
                # Processing successful
                db.reference("PaidMonitoredFields").child("PMF").child(current_user_id).child(
                    current_field_id
                ).child("LatestDay").set(str(image_date_str))
                db.reference("PaidMonitoredFields").child("PMF").child(current_user_id).child(
                    current_field_id
                ).child("SensedDays").child(str(image_date_str)).set("OK")

                # Update Firestore
                try:
                    city_ref = firestore_db.collection(current_user_id).document(
                        str(current_field_id)
                    )
                    city_ref.update({"LatestSensedDay": str(image_date_str)})
                except Exception:
                    pass

                # Send monitoring notification
                send_moni_noti(current_user_id, field_obj.get("FieldDescription", ""), "")
                if current_user_id == "HC1KG5a2e1ZhXaPMpEcylZmeMYM2":
                    send_webhook_data(current_field_id)

    def should_delete_field(check_user_id, check_field_id, field_data_obj):
        """
        Checks if a field should be deleted based on expiration & payment type.
        If so, it moves the field data to 'DeletedFields' and removes it from
        'PaidMonitoredFields'. Returns 1 if deleted, 0 otherwise.
        """
        expired_field_val = (
            db.reference("PaidMonitoredFields")
            .child("PMF")
            .child(check_user_id)
            .child(check_field_id)
            .child("Expired")
            .get()
            or 0
        )
        try:
            expired_field_val = int(expired_field_val)
        except Exception:
            expired_field_val = 0

        payment_type_field = (
            db.reference("PaidMonitoredFields")
            .child("PMF")
            .child(check_user_id)
            .child(check_field_id)
            .child("PaymentType")
            .get()
            or 0
        )
        try:
            payment_type_field = float(payment_type_field)
        except Exception:
            payment_type_field = 0

        # Grace period in months before deletion
        months_to_wait_before_deletion = 1

        if payment_type_field != -2 and (expired_field_val == 1 or payment_type_field == 0):
            # If never paid (0) or subscription ended (expired=1)
            if payment_type_field == 0:
                # 1 day grace period for never-paid fields
                payment_milliseconds = 24 * 60 * 60 * 1000
            else:
                try:
                    payment_milliseconds = (
                        (int(payment_type_field) + months_to_wait_before_deletion)
                        * 30 * 24 * 60 * 60 * 1000
                    )
                except Exception:
                    # BUGFIX: fallback previously used `* 100` instead of
                    # `* 1000` for the ms conversion, shrinking the grace
                    # period tenfold.
                    payment_milliseconds = (
                        months_to_wait_before_deletion * 30 * 24 * 60 * 60 * 1000
                    )
            try:
                # Field ID is a millisecond epoch timestamp of order time.
                if (int(round(time.time() * 1000)) - int(check_field_id)) > payment_milliseconds:
                    print(("Deleting field:", check_user_id, check_field_id))
                    field_full_obj = (
                        db.reference("PaidMonitoredFields")
                        .child("PMF")
                        .child(check_user_id)
                        .child(check_field_id)
                        .get()
                    )
                    # Move it to DeletedFields
                    try:
                        db.reference("DeletedFields").child("PMF").child(check_user_id).child(
                            check_field_id
                        ).set(field_full_obj)
                    except Exception:
                        print("Field object not found for deletion.")
                    # Remove from PaidMonitoredFields
                    db.reference("PaidMonitoredFields").child("PMF").child(check_user_id).child(
                        check_field_id
                    ).delete()
                    return 1
            except Exception:
                print(traceback.format_exc())
                # If error, still delete the field
                db.reference("PaidMonitoredFields").child("PMF").child(check_user_id).child(
                    check_field_id
                ).delete()
                return 1
        return 0

    def process_data_for_field(
        processing_user_id, processing_field_id, language, user_fields_data, sentinel_config
    ):
        """
        Orchestrates the logic for a single field:
          1. Cleanup directories
          2. Check for deletion
          3. Check if we should process data
          4. Calls satellite_data if needed
          5. Re-process previous data requests if any
        """
        # Remove existing directory and recreate
        remove_command = f"rm -rf {processing_user_id}/{processing_field_id}"
        os.system(remove_command)
        make_dir(processing_user_id, processing_field_id)
        print("Processing data for user:", processing_user_id, "Field:", processing_field_id)

        # Retrieve the field object from the dictionary
        field_data_obj = user_fields_data[processing_field_id]

        # Check if field should be deleted
        is_deleted = should_delete_field(processing_user_id, processing_field_id, field_data_obj)

        # Check if we should process the field
        process_decision = shall_we_process_data(processing_user_id, processing_field_id)
        if process_decision["process_field"]:
            if is_deleted == 1:
                print("Field is deleted; skipping processing.")
            else:
                try:
                    # Process only if not processed in the last 4 days
                    if not is_data_already_processed(processing_user_id, processing_field_id):
                        print("Processing fresh data now...")
                        satellite_data(
                            processing_user_id,
                            language,
                            processing_field_id,
                            None,
                            None,
                            sentinel_config,
                        )
                        # Update counter if indicated
                        if process_decision["update_counter"]:
                            total_visits = process_decision["total_satellite_visits"]
                            ref_path = f"/{processing_user_id}/MyProfile/TotalSatelliteVisits"
                            db_app2 = process_field_flag.db.reference(
                                ref_path, app=process_field_flag.default_app_2
                            )
                            db_app2.set(total_visits + 1)
                        # Optionally generate a WhatsApp promotional image
                        # generate_jeevnai_whatsapp_image('https://farmonaut.com/jeevn_ai_promo.jpg', processing_user_id, processing_field_id, language, None)

                    # Check old data requests (if user manually requested historical data)
                    prev_requests = field_data_obj.get("PreviousDataRequests", None)
                    if prev_requests is not None:
                        for timestamp_key, req_obj in prev_requests.items():
                            print("Found older data request date:", timestamp_key)
                            from_date_val, to_date_val = get_to_and_from_date(timestamp_key, 0)
                            attempt_count = 0
                            # Retry historical fetch up to 3 times
                            while attempt_count < 3:
                                try:
                                    satellite_data(
                                        processing_user_id,
                                        language,
                                        processing_field_id,
                                        from_date_val,
                                        to_date_val,
                                        sentinel_config,
                                    )
                                    # Update global satellite visits if needed
                                    if process_decision["update_counter"]:
                                        total_visits = process_decision["total_satellite_visits"]
                                        ref_path = (
                                            f"/{processing_user_id}/MyProfile/TotalSatelliteVisits"
                                        )
                                        db_app2 = process_field_flag.db.reference(
                                            ref_path, app=process_field_flag.default_app_2
                                        )
                                        db_app2.set(total_visits + 1)
                                    attempt_count = 3
                                except Exception:
                                    print(traceback.format_exc())
                                    attempt_count += 1
                                    time.sleep(10)
                            # Remove the entry once done
                            db.reference("PaidMonitoredFields").child("PMF").child(
                                processing_user_id
                            ).child(processing_field_id).child("PreviousDataRequests").child(
                                timestamp_key
                            ).delete()
                except Exception:
                    print(traceback.format_exc())

        # Cleanup
        os.system(remove_command)

    # ===================== Iterate Over All Users & Fields =====================
    for current_user_id, fields_data in all_users_dict.items():
        # Skip certain users
        if current_user_id not in [
            "CeMGYvLXrGR5ZThxZ46iV7vY8sa2",
            "TCXcp5VIsfhHZrh0nm2VsgBtcGy2",
        ]:
            # Default language is English unless determined otherwise
            default_language = "en"
            try:
                for single_field_id, single_field_obj in fields_data.items():
                    process_data_for_field(
                        current_user_id,
                        single_field_id,
                        default_language,
                        fields_data,
                        sentinel_settings,
                    )
            except Exception:
                print(traceback.format_exc())

    # Optionally stop the VM if needed:
    # requests.post('https://us-central1-farmbase-b2f7e.cloudfunctions.net/stopVM')