"""Build the interactive satellite / JeevnAI HTML report for one user.

For every paid monitored field of a user the module fetches the latest
satellite image, measures how much of it falls into each palette colour,
collects an AI advisory for the field's crop, and injects everything into an
HTML template.
"""

import json
import os
import threading
import time
from datetime import datetime

import firebase_admin
import requests
from firebase_admin import credentials, db, firestore, storage
from google.cloud.firestore_v1.base_query import FieldFilter  # noqa: F401

# Upper bound for every outgoing HTTP call so a stalled request cannot block
# a worker thread (and therefore ``thread.join()``) forever.
_REQUEST_TIMEOUT_SECONDS = 300

# RGBA palette of the colour-mapped image. Fully transparent pixels
# (0, 0, 0, 0) and any colour not listed here are left out of the totals.
_PALETTE = {
    (255, 255, 255, 255): "White",  # FFFFFF
    (17, 167, 95, 255): "Green",  # 11A75F
    (145, 16, 44, 255): "Red",  # 91102C
    (247, 136, 90, 255): "Orange",  # F7885A
    (60, 19, 97, 255): "Purple",  # 3C1361
}


def get_sat_image(UID, fieldId, SensedDay, ImageType):
    """Return the URL of a field image from the ``getFieldImage`` endpoint.

    Args:
        UID: User id the field belongs to.
        fieldId: Id of the field.
        SensedDay: Day of the satellite visit (sent as-is to the endpoint).
        ImageType: Image type requested from the endpoint, e.g. ``"hybrid"``.

    Returns:
        The value of the ``"url"`` key of the JSON response.

    Raises:
        requests.RequestException: On network failure or timeout.
        KeyError: If the response JSON has no ``"url"`` key.
    """
    endpointUrl = "https://us-central1-farmbase-b2f7e.cloudfunctions.net/getFieldImage"
    bodyObj = {
        "UID": UID,
        "FieldID": fieldId,
        "ImageType": ImageType,
        "SensedDay": SensedDay,
        "Colormap": "1",
    }
    response = requests.post(
        endpointUrl, json=bodyObj, timeout=_REQUEST_TIMEOUT_SECONDS
    )
    return response.json()["url"]


def _download_image(url):
    """Download ``url`` and return it as a PIL image, or ``None`` on non-200."""
    # Imported lazily so the module can be imported without Pillow installed.
    from io import BytesIO

    from PIL import Image

    response = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)
    if response.status_code != 200:
        print("Failed to download image")
        return None
    return Image.open(BytesIO(response.content))


def calculate_pixel_count(image_url):
    """Return the percentage share of each palette colour in an image.

    Only pixels that exactly match one of the palette colours are counted;
    the percentages are relative to the number of such pixels, so they sum
    to 100 whenever at least one pixel matches.

    Args:
        image_url: URL of the colour-mapped image.

    Returns:
        Dict with keys ``"White"``, ``"Green"``, ``"Red"``, ``"Orange"`` and
        ``"Purple"`` mapping to float percentages. All values are ``0.0``
        when the download fails or no pixel matches the palette.
    """
    import numpy as np

    image = _download_image(image_url)
    if image is None:
        return {name: 0.0 for name in _PALETTE.values()}

    # Normalise to RGBA so the 4-channel palette comparison is always valid,
    # whatever mode the downloaded file was stored in.
    pixels = np.array(image.convert("RGBA"))

    color_counts = {}
    for rgba, name in _PALETTE.items():
        matches = np.all(pixels == np.array(rgba, dtype=pixels.dtype), axis=-1)
        color_counts[name] = int(np.count_nonzero(matches))

    total_pixels = sum(color_counts.values())
    if total_pixels == 0:
        # Avoid 0/0 (NaN) when nothing in the image matches the palette.
        return {name: 0.0 for name in color_counts}

    return {
        name: (count / total_pixels) * 100 for name, count in color_counts.items()
    }


def get_jeevan_ai_advisory(UID, FieldID, crop):
    """Request the advisory for a field from the ``askJeevnAPI`` endpoint.

    Args:
        UID: User id the field belongs to.
        FieldID: Id of the field.
        crop: Crop name sent to the endpoint.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    endpointUrl = "https://us-central1-farmbase-b2f7e.cloudfunctions.net/askJeevnAPI"
    bodyObj = {
        "UID": UID,
        "FieldID": FieldID,
        "Crop": crop,
    }
    response = requests.post(
        endpointUrl, json=bodyObj, timeout=_REQUEST_TIMEOUT_SECONDS
    )
    print(FieldID, " Status code: ", response.status_code)
    return response.json()


def _coordinate_sort_key(key):
    """Order coordinate keys: ``"a"`` first, then ``P_<n>`` by n, rest last."""
    if key == "a":
        return (0, 0)
    if key.startswith("P_"):
        try:
            return (1, int(key[2:]))
        except ValueError:
            pass
    # Unexpected keys must still be comparable, so push them to the end.
    return (2, 0)


def _resolve_field_name(ref):
    """Return the first available descriptive value for a field, or ``""``."""
    for child in ("FieldDescription", "FieldAddress", "Name", "Phone"):
        value = ref.child(child).get()
        if value is not None:
            return value
    return ""


def _format_field_area(area_sq_m):
    """Format an area given in square metres as acres with three decimals."""
    if area_sq_m is None:
        return "N/A"
    return "{:.3f} acres".format(float(area_sq_m) * 0.000247105)


def process_one_field_data(
    UID, fieldId, field_data_list, jeevan_ai_data_list, db_firestore
):
    """Collect report data for one field and append it to the shared lists.

    The field is skipped (nothing appended) when it is expired, paused, has
    no ``LatestDay`` or has no ``Coordinates``.

    Args:
        UID: User id the field belongs to.
        fieldId: Id of the field under ``/PaidMonitoredFields/PMF/<UID>``.
        field_data_list: List that receives one dict of field/satellite data.
        jeevan_ai_data_list: List that receives ``{fieldId: advisory}`` when
            a crop is known and the advisory endpoint returns one.
        db_firestore: Firestore client; currently unused by this function.
    """
    ref = db.reference("/PaidMonitoredFields/PMF/" + UID + "/" + str(fieldId))
    if ref.child("Expired").get() == str(1) or ref.child("Paused").get() == "yes":
        return

    fieldName = _resolve_field_name(ref)
    fieldArea = _format_field_area(ref.child("FieldArea").get())

    lastSatelliteVisit = ref.child("LatestDay").get()
    if lastSatelliteVisit is None:
        return

    coordinates_db = ref.child("Coordinates").get()
    if not coordinates_db:
        return

    lat_lon_pairs = []
    for point in sorted(coordinates_db.keys(), key=_coordinate_sort_key):
        coordinates = coordinates_db[point]
        lat_lon_pairs.append(
            [float(coordinates["Latitude"]), float(coordinates["Longitude"])]
        )
    # Repeat the first vertex at the end so the list describes a closed ring.
    lat_lon_pairs.append(lat_lon_pairs[0])

    latitudes = [coord[0] for coord in lat_lon_pairs]
    longitudes = [coord[1] for coord in lat_lon_pairs]

    image_url = get_sat_image(UID, fieldId, lastSatelliteVisit, "hybrid")
    percentage_data = calculate_pixel_count(image_url)
    print(fieldId)

    row_dict = {
        "fieldID": fieldId,
        "fieldName": fieldName,
        "fieldArea": fieldArea,
        "lastSatelliteVisit": datetime.strptime(lastSatelliteVisit, "%Y%m%d").strftime(
            "%d %B %Y"
        ),
        "coordinates": lat_lon_pairs,
        "maxLat": max(latitudes),
        "maxLon": max(longitudes),
        "minLat": min(latitudes),
        "minLon": min(longitudes),
        "url": image_url,
        "Satellite_Data": {
            "white": "{:.2f}%".format(float(percentage_data["White"])),
            "green": "{:.2f}%".format(float(percentage_data["Green"])),
            "orange": "{:.2f}%".format(float(percentage_data["Orange"])),
            "purple": "{:.2f}%".format(float(percentage_data["Purple"])),
            "red": "{:.2f}%".format(float(percentage_data["Red"])),
        },
    }
    field_data_list.append(row_dict)

    crop_name = "Unknown Crop"
    crops = ref.child("FieldCrops").get()
    if crops:
        crop_name = crops[0]
    else:
        crop_code = ref.child("CropCode").get()
        print(crop_code)
        if crop_code:
            # Only fetch the code table when it is actually needed.
            crop_code_dict = db.reference("/CropCodes").get() or {}
            crop_name = crop_code_dict.get(crop_code, "Unknown Crop")
        else:
            print(fieldId, "No crop defined for this field")
    print(crop_name)

    if crop_name == "Unknown Crop":
        print(fieldId, "Crop code not in database")
        return

    advisory_tree = get_jeevan_ai_advisory(UID, fieldId, crop_name)
    advisory = (
        advisory_tree.get("advisory") if isinstance(advisory_tree, dict) else None
    )
    if not isinstance(advisory, dict):
        # The endpoint answered without a usable advisory; keep the field
        # data that was already appended and skip only the advisory entry.
        print(fieldId, "No advisory returned for this field")
        return
    advisory["Crop"] = advisory_tree.get("Crop")
    jeevan_ai_data_list.append({fieldId: advisory})


def create_interactive_report(UID):
    """Generate ``<UID>/interactive_report.html`` for a non-admin user.

    Initialises Firebase (if not yet initialised), gathers data for every
    field of the user on worker threads and fills the HTML template
    ``template_Interactive_jeevanai_report.html`` with the results.

    Nothing is generated when the user is listed as an admin, has no fields,
    or has more than 50 fields.

    Args:
        UID: Id of the user to build the report for.
    """
    database_url = "https://farmbase-b2f7e-31c0c.firebaseio.com/"
    storage_url = "farmbase-b2f7e.appspot.com"

    cred = credentials.Certificate("servicekey.json")
    try:
        firebase_admin.initialize_app(
            cred, {"databaseURL": database_url, "storageBucket": storage_url}
        )
    except ValueError:
        # initialize_app raises ValueError when the default app already exists.
        print("Already initialized")

    db_firestore = firestore.client()

    is_admin = db.reference("SalesAssociatesPromo").child("AdminUIDs").child(UID).get()
    if UID == "mmC6X2DpreSxO6580MtUKhyaVgq1":
        # This account is treated as a regular user even if listed as admin.
        is_admin = None
    if is_admin is not None:
        return

    field_keys = db.reference("/PaidMonitoredFields/PMF/" + UID).get(shallow=True)
    if not field_keys:
        print("no fields found for user")
        return
    FieldIDs = sorted(field_keys.keys())
    if len(FieldIDs) > 50:
        print("number of fields more than 50")
        return

    # Field ids are used as millisecond timestamps of field creation here.
    last_field_added_timestamp = float(FieldIDs[-1]) / 1000

    try:
        # Creation time of the previously uploaded report, if there is one.
        bucket = storage.bucket()
        blob = bucket.blob("PaidMonitoredFields/" + UID + "/interactive_report.html")
        blob.reload()
        report_timestamp = blob.time_created.timestamp()
        condition_new_field_added = last_field_added_timestamp - report_timestamp >= 0
        twenty_four_hours = 24 * 60 * 60
        condition_24hr_passed = (
            datetime.now().timestamp() - report_timestamp >= twenty_four_hours
        )
    except Exception as exc:
        # Best effort: without a readable previous report, always regenerate.
        print("Could not read previous report metadata:", exc)
        condition_new_field_added = True
        condition_24hr_passed = True

    # NOTE(review): this override makes the freshness check above a no-op and
    # forces regeneration on every call. Kept as in the original; confirm
    # whether it is intentional.
    condition_new_field_added = True
    print(condition_new_field_added, condition_24hr_passed)

    # condition_24hr_passed is needed if a new satellite visit has occurred.
    if not (condition_new_field_added or condition_24hr_passed):
        return

    field_data_list = []
    jeevan_ai_data_list = []
    batch_size = 100
    for start in range(0, len(FieldIDs), batch_size):
        print(start, start + batch_size)
        threads = []
        for fieldId in FieldIDs[start : start + batch_size]:
            t = threading.Thread(
                target=process_one_field_data,
                args=(
                    UID,
                    fieldId,
                    field_data_list,
                    jeevan_ai_data_list,
                    db_firestore,
                ),
            )
            time.sleep(0.5)  # stagger thread start-up
            t.start()
            threads.append(t)
        for t in threads:
            t.join()  # wait for the whole batch before starting the next

    json_data = json.dumps(field_data_list, indent=2)
    jeevanai_data = json.dumps(jeevan_ai_data_list, indent=2)

    template_file_name = "template_Interactive_jeevanai_report.html"
    output_file_name = UID + "/interactive_report.html"

    with open(template_file_name, "r", encoding="utf-8") as file:
        content_str = file.read()

    # Report data
    content_str = content_str.replace("DATA_TO_REPLACE", json_data)
    # Report date
    content_str = content_str.replace(
        "DATE_TO_REPLACE", datetime.today().strftime("%d %B %Y")
    )
    # JeevnAI data
    content_str = content_str.replace("jeevan_ai_data_replace", jeevanai_data)

    # The per-user output directory may not exist yet.
    os.makedirs(os.path.dirname(output_file_name), exist_ok=True)
    # Write with the same encoding the template was read with.
    with open(output_file_name, "w", encoding="utf-8") as f:
        f.write(content_str)


#### How to use
# UID = "I0C7dewCrhfejCBqaAj2U9PyjQw2"  # Kapil farmonaut
# UID = "j7ji6kLaTmN9hbCCnnIRJAA7RB42"
# UID = "HF9dSIoDEYcTwebrRWVzb08SQln2"  ## loni_dcm - do not use for testing, has a very high number of farms
# UID = "W851sDRFXHUkocpMIgIzWm9mI8B2"  # demo account
# UID = "lWgPLqmGG5Qxi6gsqIuxyIetsXj1"
# UID = "Hcdz24Lz8MhMy4of7iqXzcDIrnB3"
# create_interactive_report(UID)