import os import datetime import pytz import logging import traceback import time from typing import Dict, Optional, Tuple, List, Set, Any from socket import * from google.cloud import storage from firebase_admin import credentials, firestore, initialize_app, db from PIL import Image from dateutil.relativedelta import relativedelta from gen_report_new2 import gen_report_new from merge_sar import merge_sar from find_sar import find_sar from merge_dem import merge_dem from find_dem import find_dem from send_moni_noti import send_moni_noti from send_error_noti import send_error_noti from send_expiring_noti import send_expiring_noti from send_expired_noti import send_expired_noti from sendfailedreport import sendfailedreport from gmap_image import gmap_image from gmap_image_large import gmap_image_large from get_weather_data import get_weather_data from map_coords import map_coords from search_new_sentinel import search_new_sentinel from gen_geotiff2 import gen_geotiff2 from get_land_use import get_land_use from make_dir import make_dir from satellite_visit_email import send_html_email from send_webhook_data import send_webhook_data from oct2py import octave from process_field_flag import process_field_flag # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Constants PROJECT_ID = "farmbase-b2f7e" BUCKET_NAME = f"{PROJECT_ID}.appspot.com" DATABASE_URL = "https://farmbase-b2f7e-31c0c.firebaseio.com/" DISABLED_UIDS = [ "snQYQZqQx3SmVbRztmEqYn5Mkcz2", "KZQ7TZIYXnXN0b07OtrL1hlyYij1", "CeMGYvLXrGR5ZThxZ46iV7vY8sa2", "TCXcp5VIsfhHZrh0nm2VsgBtcGy2", "mFFHQdEtiSbn2hbYQAwwoIdYVi02", "4fPRPyszwLOjweG1qbpiCx3CFQo1", "KYThGMoIlISJG4mVXArK6Y5QPDh1", "hOV9C5CalZc9mIYCQQxplvgAfkG3", "ipRHhCOFIDV2pxgg7Nfz1ufZBmV2", "4fPRPyszwLOjweG1qbpiCx3CFQo1" ] IMAGE_TYPES = [ "tci", "etci", "ndvi", "evi", "rvi", "rsm", "ndwi", "ndre", "vari", "soc", "savi", "ndmi", "evapo", "avi", "bsi", "si", "dem", "hybrid_blind", "vssi", "lulc", "lai" ] CMAP_TYPES = [ "soc_cmap2", "ndre_cmap2", "ndvi_cmap2", "evi_cmap2", "ndwi_cmap2", "vari_cmap2", "savi_cmap2", "avi_cmap2", "bsi_cmap2", "si_cmap2", "ndmi_cmap2", "vssi_cmap2", "lai_cmap2" ] CHART_TYPES = [ "ndvi_pie", "evi_pie", "ndwi_pie", "ndre_pie", "vari_pie", "savi_pie", "avi_pie", "bsi_pie", "si_pie", "soc_pie", "ndmi_pie", "vssi_pie", "lai_pie", "ndvi_linegraph", "evi_linegraph", "ndwi_linegraph", "ndre_linegraph", "vari_linegraph", "savi_linegraph", "avi_linegraph", "bsi_linegraph", "si_linegraph", "soc_linegraph", "ndmi_linegraph", "vssi_linegraph", "weather_linegraph", "lai_linegraph" ] TIF_TYPES = [ "tci.tif", "hybrid.tif", "etci.tif", "ndvi.tif", "evi.tif", "soc.tif", "ndre.tif", "vari.tif" ] def initialize_firebase() -> Optional[Any]: """Initialize Firebase with service account credentials.""" try: cred = credentials.Certificate("servicekey.json") initialize_app(cred, {"databaseURL": DATABASE_URL}) return firestore.client() except Exception as e: logger.error(f"Failed to initialize Firebase: {e}") return None def get_to_and_from_date(requested_date: str, latest_sensed_day: int) -> Tuple[str, str]: """Calculate start and end dates for data processing.""" current_date = int(requested_date[6:]) current_month = int(requested_date[4:6]) current_year = int(requested_date[:4]) days_offset = 5 if latest_sensed_day == 0 else 4 if current_date < days_offset and current_month > 1: start_date = 31 - current_date - days_offset start_month = current_month - 1 start_year = current_year elif current_date < days_offset and current_month == 1: start_date = 31 - current_date - days_offset start_month = 12 start_year = current_year - 1 else: start_date = current_date - days_offset start_month = current_month start_year = current_year start_date = max(1, start_date) current_date = max(1, current_date) return ( f"{start_year}-{start_month:02d}-{start_date:02d}T00:00:00", f"{current_year}-{current_month:02d}-{current_date:02d}T23:59:59" ) def get_to_and_from_date_radar(requested_date: str, latest_sensed_day: int) -> Tuple[str, str]: """Calculate radar-specific start and end dates.""" current_date = int(requested_date[6:]) current_month = int(requested_date[4:6]) current_year = int(requested_date[:4]) days_offset = 13 if latest_sensed_day == 0 else 12 if current_date < days_offset and current_month > 1: start_date = 31 - current_date - days_offset start_month = current_month - 1 start_year = current_year elif current_date < days_offset and current_month == 1: start_date = 31 - current_date - days_offset start_month = 12 start_year = current_year - 1 else: start_date = current_date - days_offset start_month = current_month start_year = current_year start_date = max(1, start_date) current_date = max(1, current_date) return ( f"{start_year}-{start_month:02d}-{start_date:02d}T00:00:00", f"{current_year}-{current_month:02d}-{current_date:02d}T23:59:59" ) def is_data_already_processed(uid: str, fieldid: str, db_ref: Any) -> bool: """Check if field data has been recently processed.""" latest_day = db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/LatestDay").get() today = datetime.date.today().strftime("%Y%m%d") if latest_day is None: return False day_diff = int(today) - int(latest_day) return day_diff < 5 def should_delete_field(uid: str, fieldid: str, field_obj: Dict, db_ref: Any) -> bool: """Determine if a field should be deleted based on expiration and payment status.""" expired = field_obj.get("Expired", "0") payment_type = field_obj.get("PaymentType", "0") try: expired = int(expired) payment_type = float(payment_type) except (ValueError, TypeError): expired = 0 payment_type = 0 if payment_type != -2 and (expired == 1 or payment_type == 0): months_to_wait = 1 payment_millis = ( 24 * 60 * 60 * 1000 if payment_type == 0 else (payment_type + months_to_wait) * 30 * 24 * 60 * 60 * 1000 ) try: if (int(round(time.time() * 1000)) - int(fieldid)) > payment_millis: try: db_ref.reference(f"DeletedFields/PMF/{uid}/{fieldid}").set(field_obj) except Exception as e: logger.error(f"Failed to move field to DeletedFields: {e}") db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}").delete() return True except Exception as e: logger.error(f"Error checking deletion conditions: {e}") db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}").delete() return True return False def get_field_metadata(fie: Dict, uid: str) -> Dict: """Extract and process field metadata.""" metadata = { "field_description": fie.get("FieldDescription", fie.get("FieldAddress", "Farm")), "whitelabel": fie.get("Whitelabel", "farmonaut"), "language": fie.get("Language", "en"), "plant_distance": fie.get("PlantDistance", 0), "paid": fie.get("Paid", "no").lower() == "yes", "paused": fie.get("Paused", "no") == "yes", "expired": int(fie.get("Expired", "0")), "total_paid_months": float(fie.get("TotalPaidMonths", fie.get("PaymentType", 0))), "coordinates": fie.get("Coordinates", []), "field_area": fie.get("FieldArea", 0), "sensed_days": fie.get("SensedDays", {}), "failed_days": fie.get("FailedDays", {}), "sar_days": fie.get("SARDays", {}), "dem_days": fie.get("DEMDays", {}), "gen_tif": fie.get("GenTif", "no") in ["yes", "abs_yes"], "field_min_lat": fie.get("FieldMinLat", 0), "field_min_long": fie.get("FieldMinLong", 0), "email": fie.get("Email", "NA"), "crop_code": fie.get("CropCode", -1), "expiring": fie.get("Expiring", "no") == "yes" } # Override language for specific UIDs language_overrides = { "HC1KG5a2e1ZhXaPMpEcylZmeMYM2": "ar", "snQYQZqQx3SmVbRztmEqYn5Mkcz2": "te", "mFFHQdEtiSbn2hbYQAwwoIdYVi02": "uz", "8aGkNQm166bmk8cjHVHtwGli2DD2": "pa,hi,en" } if uid in language_overrides: metadata["language"] = language_overrides[uid] elif "biopixel" in metadata["whitelabel"]: metadata["language"] = "fr,ar" try: whitelabel_obj = db.reference(f"WhitelabelEmails/{metadata['whitelabel']}").get() metadata["whitelabel_email"] = whitelabel_obj.get("Email") metadata["whitelabel_password"] = whitelabel_obj.get("Password") except Exception: metadata["whitelabel_email"] = None metadata["whitelabel_password"] = None return metadata def get_indices(uid: str, fieldid: str, latest_day: str, db_ref: Any) -> Tuple[float, float]: """Retrieve NDVI and NDWI indices from the database.""" try: ndvi = db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/Health/ndvi/{latest_day}").get() ndwi = db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/Health/ndwi/{latest_day}").get() return float(ndvi) / 100, float(ndwi) / 100 except Exception as e: logger.error(f"Error retrieving indices: {e}") return 0.0, 0.0 def upload_images( uid: str, fieldid: str, imagedate: str, images: List[str], bucket: Any ) -> None: """Upload images to Google Cloud Storage.""" for image_name in images: original_name = image_name if image_name == "tci": image_name = "TCI" elif image_name == "etci": image_name = "ETCI" elif "pie" in image_name: image_name += "chart" elif image_name == "mask_img": original_name = "mask" image_path = f"{uid}/{fieldid}/{image_name}.png" destination = f"PaidMonitoredFields/{uid}/{fieldid}/{imagedate}/{original_name}" try: blob = bucket.blob(destination) blob.upload_from_filename(image_path) except Exception as e: logger.error(f"Failed to upload {image_name}: {e}") def upload_reports( languages: str, uid: str, whitelabel: str, imagedate: str, field_address: str, fieldid: str, bucket: Any ) -> None: """Upload reports to Google Cloud Storage for all specified languages.""" all_langs = languages.split(",") ff = field_address.replace(" ", "_").replace(",", "_").replace("/", "_").replace("'", "_") for idx, lang in enumerate(all_langs): doc_name = f"{uid}/{whitelabel}_report_{lang}_{imagedate}_{ff}.pdf" destination = ( f"PaidMonitoredFields/{uid}/{fieldid}/{imagedate}/report.pdf" if idx == 0 else f"PaidMonitoredFields/{uid}/{fieldid}/{imagedate}/report{lang}.pdf" ) try: blob = bucket.blob(destination) blob.upload_from_filename(doc_name) except Exception as e: logger.error(f"Failed to upload report for language {lang}: {e}") def make_field_images(new_string_json: Dict) -> str: """Generate field images using Octave.""" try: return octave.monitored_field2022(new_string_json) except Exception as e: logger.error(f"Error generating field images: {e}") return "failed" def process_satellite_data( uid: str, fieldid: str, fie: Dict, sentinel_settings: Dict, from_date: Optional[str], to_date: Optional[str], radar_from_date: Optional[str], radar_to_date: Optional[str], db_ref: Any ) -> Tuple[Dict, str, str]: """Process satellite data for a given field.""" metadata = get_field_metadata(fie, uid) client_id = sentinel_settings["ClientID"] client_secret = sentinel_settings["ClientSecret"] latest_sensed_day = max((int(day) for day in metadata["sensed_days"]), default=0) latest_failed_day = max((int(day) for day in metadata["failed_days"]), default=0) latest_rvi_day = max((int(day) for day in metadata["sar_days"]), default=0) latest_dem_day = max((int(day) for day in metadata["dem_days"]), default=0) if from_date: latest_sensed_day = latest_failed_day = latest_rvi_day = latest_dem_day = 0 today = datetime.date.today().strftime("%Y%m%d") if (int(today) - latest_sensed_day <= 4 or metadata["expired"] or metadata["paused"]): return {"LatestDay": "0", "MGRS": "NotA"}, str(latest_rvi_day), str(latest_dem_day) req_json = {} if metadata["paid"] and not metadata["expired"]: req_json_sar = find_sar( uid, fieldid, fie, latest_rvi_day, metadata["expired"], metadata["paused"], radar_from_date, radar_to_date, client_id, client_secret, sentinel_settings["RVIID"] ) latest_rvi_day = req_json_sar["LatestDay"] req_json_land = get_land_use( uid, fieldid, fie, latest_sensed_day, latest_failed_day, metadata["expired"], metadata["paused"], from_date, to_date, client_id, client_secret, sentinel_settings["WMSID"] ) req_json_dem = find_dem( uid, fieldid, fie, latest_dem_day, metadata["expired"], metadata["paused"], from_date, to_date, client_id, client_secret, sentinel_settings["DEMID"] ) latest_dem_day = req_json_dem["LatestDay"] req_json = search_new_sentinel( uid, fieldid, fie, latest_sensed_day, latest_failed_day, metadata["expired"], metadata["paused"], from_date, to_date, client_id, client_secret, sentinel_settings["WMSID"] ) if not req_json: req_json = {"MGRS": "NotA", "LatestDay": today} # Update LatestTileDates mgrs = req_json["MGRS"] imagedate = str(req_json["LatestDay"]) try: previous_day = db_ref.reference(f"LatestTileDates/{mgrs}").get() or "11" db_ref.reference(f"LatestTileDates/{mgrs}").set(imagedate) except Exception as e: logger.error(f"Error updating LatestTileDates: {e}") req_json["PreviousDay"] = previous_day return req_json, latest_rvi_day, latest_dem_day def check_subscription_status( uid: str, fieldid: str, fie: Dict, db_ref: Any ) -> Tuple[bool, int]: """Check and update subscription status.""" metadata = get_field_metadata(fie, uid) today = datetime.date.today().strftime("%Y%m%d") total_sensed_days = len(metadata["sensed_days"]) try: int_fieldid = float(fieldid) / 1000 order_date = datetime.datetime.fromtimestamp(int_fieldid).strftime("%Y-%m-%d") delta = datetime.date.today() - datetime.datetime.strptime(order_date, "%Y-%m-%d").date() days_since_order = delta.days except Exception: days_since_order = 0 expired = metadata["expired"] if (total_sensed_days >= 6 * metadata["total_paid_months"] + 1 and days_since_order > 30 * metadata["total_paid_months"] and metadata["total_paid_months"] != -2): expired = 1 db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/Expired").set("1") send_expired_noti(uid) else: db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/Expired").delete() expiring = ( 6 * metadata["total_paid_months"] - total_sensed_days < 3 and 30 * metadata["total_paid_months"] - days_since_order < 10 and metadata["total_paid_months"] != -2 ) db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/Expiring").set("yes" if expiring else "no") return expired, 1 if expiring else 0 def process_field_data( uid: str, fieldid: str, fie: Dict, sentinel_settings: Dict, db_ref: Any, bucket: Any ) -> None: """Process field data including images and reports.""" metadata = get_field_metadata(fie, uid) today = datetime.date.today().strftime("%Y%m%d") req_json, latest_rvi_day, latest_dem_day = process_satellite_data( uid, fieldid, fie, sentinel_settings, None, None, None, None, db_ref ) imagedate = str(req_json["LatestDay"]) # Check if data was already processed imgdate_in_senseddays = imagedate in metadata["sensed_days"] imgdate_in_faileddays = imagedate in metadata["failed_days"] expired, expiring = check_subscription_status(uid, fieldid, fie, db_ref) c_code = ( f"{int(imgdate_in_senseddays)}{int(metadata['paid'])}" f"{int(imgdate_in_faileddays)}1{int(metadata['paused'])}{expired}" ) logger.info(f"Processing pattern: {c_code}") if c_code != "010100": if not imgdate_in_senseddays and metadata["paid"] and not imgdate_in_faileddays: db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/FailedDays/{imagedate}").set("yes") send_error_noti(uid) return # Generate weather data if metadata["gen_tif"] and not expired: try: weather_data = fie.get("Weather", {}).get(today) if not weather_data: get_weather_data(metadata["field_min_lat"], metadata["field_min_long"], today, uid, fieldid) except Exception as e: logger.error(f"Error getting weather data: {e}") # Generate maps try: octave.make_gen_maps(uid, fieldid) except Exception as e: logger.error(f"Error generating maps: {e}") # Process SAR and DEM data field_max_lat = req_json.get("FieldMaxLat", 0) field_min_lat = req_json.get("FieldMinLat", 0) field_max_long = req_json.get("FieldMaxLong", 0) field_min_long = req_json.get("FieldMinLong", 0) last_rvi_day = max((int(day) for day in metadata["sar_days"]), default=0) last_dem_day = max((int(day) for day in metadata["dem_days"]), default=0) if int(latest_rvi_day.replace("-", "")) > last_rvi_day and not expired and not metadata["paused"]: try: merge_sar( uid, fieldid, metadata["coordinates"], latest_rvi_day, field_max_lat, field_min_lat, field_max_long, field_min_long ) merge_dem( uid, fieldid, metadata["coordinates"], latest_dem_day, field_max_lat, field_min_lat, field_max_long, field_min_long ) for img in ["rvi", "rsm"]: destination = f"PaidMonitoredFields/{uid}/{fieldid}/{latest_rvi_day}/{img}" file_name = f"{uid}/{fieldid}/{img}.png" try: blob = bucket.blob(destination) blob.upload_from_filename(file_name) except Exception as e: logger.error(f"Error uploading {img}: {e}") destination = f"PaidMonitoredFields/{uid}/{fieldid}/{latest_dem_day}/dem" file_name = f"{uid}/{fieldid}/dem.png" try: blob = bucket.blob(destination) blob.upload_from_filename(file_name) except Exception as e: logger.error(f"Error uploading DEM: {e}") db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/SARDays/{latest_rvi_day}").set("yes") except Exception as e: logger.error(f"Error processing SAR/DEM: {e}") # Process images mgrs = req_json["MGRS"] result_status = "failed4" if "NotA" in mgrs else make_field_images(req_json) if "successful" not in result_status: db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/FailedDays/{imagedate}").set("yes") send_error_noti(uid) if metadata["gen_tif"]: report_return = gen_report_new( uid, metadata["field_description"], 0, 0, imagedate, fieldid, metadata["field_area"], metadata["language"], 1, metadata["whitelabel"], [field_min_lat, field_min_long, field_max_lat, field_max_long], metadata["coordinates"], None, 0, 0, fie ) upload_reports( metadata["language"], uid, metadata["whitelabel"], imagedate, metadata["field_description"], fieldid, bucket ) sendfailedreport( set(), metadata["email"], metadata["field_description"], imagedate, metadata["whitelabel"], metadata["whitelabel_email"], metadata["whitelabel_password"] ) return # Process successful case try: center_lat = fie.get("CenterLatLarge", (float(field_max_lat) + float(field_min_lat)) / 2) center_long = fie.get("CenterLongLarge", (float(field_max_long) + float(field_min_long)) / 2) # Generate map images for img_type, func in [ ("_static_map_large.png", gmap_image_large), ("_static_map.png", gmap_image) ]: img_path = f"{uid}/{fieldid}/{img_type}" if not os.path.exists(img_path): latlen, longlen, clat, clong = func( field_min_lat, field_max_lat, field_min_long, field_max_long, req_json["TileMaxLat"], req_json["TileMinLat"], req_json["TileMaxLong"], req_json["TileMinLong"], fieldid, uid, metadata["field_area"] ) db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/FieldLatLen{'Large' if 'large' in img_type else ''}").set(latlen) db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/FieldLongLen{'Large' if 'large' in img_type else ''}").set(longlen) db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/CenterLat{'Large' if 'large' in img_type else ''}").set(clat) db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/CenterLong{'Large' if 'large' in img_type else ''}").set(clong) # Map coordinates map_coords( uid, fieldid, metadata["coordinates"], imagedate, field_max_lat, field_min_lat, field_max_long, field_min_long, metadata["crop_code"], IMAGE_TYPES + CMAP_TYPES + CHART_TYPES, metadata["field_area"] ) # Generate GeoTIFFs if metadata["gen_tif"]: for tif_name in TIF_TYPES: gen_geotiff2(req_json, uid, tif_name.replace(".tif", ""), tif_name) destination = f"PaidMonitoredFields/{uid}/{fieldid}/{imagedate}/{tif_name}" try: blob = bucket.blob(destination) blob.upload_from_filename(f"{uid}/{fieldid}/{tif_name}") except Exception as e: logger.error(f"Error uploading GeoTIFF {tif_name}: {e}") # Generate and upload report ndvi, ndwi = get_indices(uid, fieldid, imagedate, db_ref) report_return = gen_report_new( uid, metadata["field_description"], center_lat, center_long, imagedate, fieldid, metadata["field_area"], metadata["language"], 0, metadata["whitelabel"], [field_min_lat, field_min_long, field_max_lat, field_max_long], metadata["coordinates"], None, ndvi, ndwi, fie ) upload_images(uid, fieldid, imagedate, IMAGE_TYPES + CMAP_TYPES + CHART_TYPES, bucket) upload_images(uid, fieldid, imagedate, ["mask_img"], bucket) upload_reports( metadata["language"], uid, metadata["whitelabel"], imagedate, metadata["field_description"], fieldid, bucket ) # Update database and send notifications if report_return: db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/SensedDays/{imagedate}").set(report_return) db_ref.reference(f"PaidMonitoredFields/PMF/{uid}/{fieldid}/LatestDay").set(imagedate) send_html_email( metadata["email"], "Farmer", metadata["field_description"], " ", imagedate, " ", fie.get("PaymentType", 0), uid, fieldid, metadata["language"] ) send_webhook_data(uid, fieldid, imagedate) try: db_f.collection(uid).document(fieldid).update({"LatestSensedDay": imagedate}) except Exception as e: logger.error(f"Error updating Firestore: {e}") if metadata["whitelabel"].lower() == "farmonaut": send_moni_noti(uid, metadata["field_description"], "") if metadata["expiring"]: send_expiring_noti(uid) except Exception as e: logger.error(f"Error processing successful case: {e}") def server2024(uid: Optional[str], fieldid: Optional[str], session_type: str) -> None: """ Process satellite images for fields. Args: uid: User ID (optional) fieldid: Field ID (optional) session_type: Type of session ('main', 'api', 'bulk') """ db_f = initialize_firebase() if not db_f: logger.error("Firebase initialization failed") return storage_client = storage.Client(project=PROJECT_ID) bucket = storage_client.bucket(BUCKET_NAME) db_ref = db today = datetime.date.today().strftime("%Y%m%d") sentinel_settings = db_ref.reference("SentinelSettings4").get() uid_list = {} if uid not in DISABLED_UIDS: if not uid and not fieldid: temp_list = db_ref.reference("PaidMonitoredFields/PMF").get(False, True) for m_uid in temp_list: if m_uid not in DISABLED_UIDS: uid_list[m_uid] = db_ref.reference(f"PaidMonitoredFields/PMF/{m_uid}").get(False, True) elif uid and not fieldid: uid_list[uid] = db_ref.reference(f"PaidMonitoredFields/PMF/{uid}").get(False, True) else: uid_list[uid] = {fieldid: True} logger.info(f"Processing {len(uid_list)} users, UID: {uid}, FieldID: {fieldid}") for m_uid, fields in uid_list.items(): for field_id in fields: make_dir(m_uid, field_id) fie = db_ref.reference(f"PaidMonitoredFields/PMF/{m_uid}/{field_id}").get() if should_delete_field(m_uid, field_id, fie, db_ref): continue process_data = process_field_flag.process_field(m_uid, field_id) if not process_data["process_field"]: os.system(f"rm -rf {m_uid}/{field_id}") continue try: previous_requests = fie.get("PreviousDataRequests", {}) satellite_processed = False if not is_data_already_processed(m_uid, field_id, db_ref) and session_type == "main": process_field_data(m_uid, field_id, fie, sentinel_settings, db_ref, bucket) satellite_processed = True for req_date in previous_requests: from_date, to_date = get_to_and_from_date(req_date, 0) radar_from_date, radar_to_date = get_to_and_from_date_radar(req_date, 0) for attempt in range(3): try: process_field_data( m_uid, field_id, fie, sentinel_settings, db_ref, bucket ) satellite_processed = True break except Exception as e: logger.error(f"Attempt {attempt + 1} failed: {traceback.format_exc()}") time.sleep(10) db_ref.reference(f"PaidMonitoredFields/PMF/{m_uid}/{field_id}/PreviousDataRequests/{req_date}").delete() if satellite_processed and process_data["process_as"] in [2, 3]: if process_data["process_as"] == 2: visits_needed = process_data.get("visits_needed", 0) user_obj = process_data.get("user_obj", {}) pre_visits = user_obj.get("TotalSatelliteVisits", 0) user_visits_ref = process_field_flag.db.reference( f"/{m_uid}/MyProfile/TotalSatelliteVisits", app=process_field_flag.default_app_2 ) user_visits_ref.set(pre_visits + visits_needed) elif process_data["process_as"] == 3 and session_type == "bulk": credits_needed = process_data.get("credits_needed", 0) org_api_obj = process_data.get("org_api_obj", {}) org_api_obj["UsedUnits"] = int(org_api_obj.get("UsedUnits", 0)) + credits_needed org_api_obj["remainingUnits"] = int(org_api_obj.get("remainingUnits", 0)) - credits_needed db_ref.reference(f"Organizations/{m_uid}").set(org_api_obj) except Exception as e: logger.error(f"Error processing field {field_id}: {traceback.format_exc()}") os.system(f"rm -rf {m_uid}/{field_id}")