# UID = "CFv7IjeJR8ZKFWSXb95qzgvymCv1"  # Kapil farmonaut
# UID = "HF9dSIoDEYcTwebrRWVzb08SQln2"  ## loni_dcm
# UID = "W851sDRFXHUkocpMIgIzWm9mI8B2"  # demo account

import requests
from datetime import datetime
import json
import os
import threading
import firebase_admin
from firebase_admin import db, credentials, storage

# Upper bound (seconds) on every outbound HTTP call, so a stalled request
# cannot block a worker thread (and the join() waiting on it) forever.
REQUEST_TIMEOUT_SECONDS = 60


def get_sat_image(UID, fieldId, SensedDay, ImageType):
    """Request an image URL from the ``getFieldImage`` cloud function.

    Args:
        UID: User id sent as ``"UID"`` in the request body.
        fieldId: Field id sent as ``"FieldID"``.
        SensedDay: Sensed day sent as ``"SensedDay"``.
        ImageType: Image type sent as ``"ImageType"`` (e.g. ``"hybrid"``).

    Returns:
        The value stored under ``"url"`` in the JSON response.

    Raises:
        requests.HTTPError: The endpoint answered with an error status.
        requests.RequestException: The request failed or timed out.
        KeyError: The JSON response has no ``"url"`` entry.
    """
    endpointUrl = "https://us-central1-farmbase-b2f7e.cloudfunctions.net/getFieldImage"
    bodyObj = {
        "UID": UID,
        "FieldID": fieldId,
        "ImageType": ImageType,
        "SensedDay": SensedDay,
        "Colormap": "1",
    }
    response = requests.post(
        endpointUrl, json=bodyObj, timeout=REQUEST_TIMEOUT_SECONDS
    )
    # Fail with the HTTP error itself instead of a confusing KeyError below.
    response.raise_for_status()
    return response.json()["url"]


def calculate_pixel_count(image_url):
    """Return the share of each palette colour in the image at *image_url*.

    The image is downloaded, converted to RGBA and every pixel that exactly
    matches one of the palette colours is counted. Percentages are relative
    to the number of matched pixels only; "EmptyColor" (fully transparent)
    pixels and pixels of any other colour are ignored.

    Args:
        image_url: URL of the image to analyse.

    Returns:
        Dict mapping "White", "Green", "Red", "Orange" and "Purple" to a
        float percentage. Every value is ``0.0`` when the download fails or
        when no pixel matches the palette.
    """
    from io import BytesIO

    import numpy as np
    from PIL import Image

    # Define the color palette
    colors = {
        (255, 255, 255, 255): "White",
        (0, 0, 0, 0): "EmptyColor",
        (17, 167, 95, 255): "Green",  # #11a75f
        (145, 16, 44, 255): "Red",  # #91102c
        (234, 79, 59, 255): "Orange",  # #ea4f3b
        (60, 19, 97, 255): "Purple",  # #3c1361
    }
    counted_colors = {
        color: name for color, name in colors.items() if name != "EmptyColor"
    }

    response = requests.get(image_url, timeout=REQUEST_TIMEOUT_SECONDS)
    if response.status_code != 200:
        print("Failed to download image")
        # Well-defined zeros instead of the NaN values a 0/0 would produce.
        return {name: 0.0 for name in counted_colors.values()}

    # The palette is RGBA; normalise the mode so the 4-tuple comparison is
    # valid for RGB or palette-mode images as well.
    image = Image.open(BytesIO(response.content)).convert("RGBA")
    image_array = np.array(image)

    # Count pixels for each color
    color_counts = {
        name: int(np.sum(np.all(image_array == color, axis=-1)))
        for color, name in counted_colors.items()
    }
    total_pixels = sum(color_counts.values())
    if total_pixels == 0:
        # No palette pixel at all: avoid dividing by zero.
        return {name: 0.0 for name in color_counts}

    # Calculate percentage for each color
    return {
        name: (count / total_pixels) * 100 for name, count in color_counts.items()
    }


def _coordinate_sort_key(key):
    """Order coordinate keys: "a" first, then "P_<n>" by n, anything else last."""
    if key == "a":
        return (0, 0)
    if key.startswith("P_"):
        try:
            return (0, int(key[2:]))
        except ValueError:
            pass
    # Unexpected keys sort after the known ones instead of breaking sorted().
    return (1, 0)


def _format_field_area(field_area):
    """Render the stored field area as "<n> sq m"; empty string when missing."""
    if field_area is None:
        return ""
    try:
        return "{:,} sq m".format(field_area)
    except (TypeError, ValueError):
        # Non-numeric value (e.g. a string): skip the thousands separator.
        return "{} sq m".format(field_area)


def process_one_field_data(fieldId, UID, data_list):
    """Build the report row for one field and append it to *data_list*.

    Reads the field record under ``/PaidMonitoredFields/PMF/<UID>/<fieldId>``,
    fetches its "hybrid" image URL and colour percentages, and appends one
    dict to *data_list*. Nothing is appended when the field has no
    ``LatestDay`` or no ``Coordinates``.

    Args:
        fieldId: Id of the field to process.
        UID: Id of the user owning the field.
        data_list: List that receives the row dict (mutated in place).

    Returns:
        None.
    """
    str_ref_db_userId = "/PaidMonitoredFields/PMF/" + UID + "/" + str(fieldId)
    ref = db.reference(f"{str_ref_db_userId}")

    # Skip checks first, so skipped fields cost no further reads.
    lastSatelliteVisit = ref.child("LatestDay").get()
    if lastSatelliteVisit is None:
        return
    coordinates_db = ref.child("Coordinates").get()
    if not coordinates_db:
        return

    # First available description wins; fall back to an empty name.
    fieldName = ""
    for name_key in ("FieldDescription", "FieldAddress", "Name", "Phone"):
        candidate = ref.child(name_key).get()
        if candidate is not None:
            fieldName = candidate
            break

    fieldArea = _format_field_area(ref.child("FieldArea").get())

    # Collect the polygon vertices in their stored order.
    lat_lon_pairs = []
    for point in sorted(coordinates_db.keys(), key=_coordinate_sort_key):
        coordinates = coordinates_db[point]
        lat_lon_pairs.append(
            [float(coordinates["Latitude"]), float(coordinates["Longitude"])]
        )
    if not lat_lon_pairs:
        return
    # Repeat the first vertex so the polygon is closed.
    lat_lon_pairs.append(lat_lon_pairs[0])

    # Separate the latitude and longitude values
    latitudes = [coord[0] for coord in lat_lon_pairs]
    longitudes = [coord[1] for coord in lat_lon_pairs]

    # Generate image URL
    ImageType = "hybrid"
    image_url = get_sat_image(UID, fieldId, lastSatelliteVisit, ImageType)

    # Calculate pixel count
    percentage_data = calculate_pixel_count(image_url)

    row_dict = {
        "fieldName": fieldName,
        "fieldArea": fieldArea,
        "lastSatelliteVisit": datetime.strptime(lastSatelliteVisit, "%Y%m%d").strftime(
            "%d %B %Y"
        ),
        "coordinates": lat_lon_pairs,
        "maxLat": max(latitudes),
        "maxLon": max(longitudes),
        "minLat": min(latitudes),
        "minLon": min(longitudes),
        "url": image_url,
        "Satellite_Data": {
            "white": "{:.2f}%".format(float(percentage_data["White"])),
            "green": "{:.2f}%".format(float(percentage_data["Green"])),
            "orange": "{:.2f}%".format(float(percentage_data["Orange"])),
            "purple": "{:.2f}%".format(float(percentage_data["Purple"])),
            "red": "{:.2f}%".format(float(percentage_data["Red"])),
        },
    }

    # Append the dictionary to the list
    data_list.append(row_dict)


def make_interactive_html_report(UID):
    """Regenerate ``<UID>/interactive_report.html`` for a user when it is stale.

    The report is rebuilt when the newest field id (interpreted as a
    millisecond timestamp) is not older than the stored report blob, when
    the blob is at least 24 hours old, or when the blob's age cannot be
    determined. The rebuilt report is the content of
    ``report_template_interactive.html`` with ``DATA_TO_REPLACE`` and
    ``DATE_TO_REPLACE`` substituted, written to the local path
    ``<UID>/interactive_report.html``.

    Args:
        UID: Id of the user whose fields are reported.

    Returns:
        None.
    """
    # Firebase Config/Setup
    database_url = "https://farmbase-b2f7e-31c0c.firebaseio.com/"
    storage_url = "farmbase-b2f7e.appspot.com"

    try:
        # get_app() raises ValueError when no default app exists yet.
        firebase_admin.get_app()
        print("Already initialized")
    except ValueError:
        cred = credentials.Certificate("servicekey.json")
        firebase_admin.initialize_app(
            cred, {"databaseURL": database_url, "storageBucket": storage_url}
        )

    # Get all FieldIDs of the user
    str_ref_db_userId = "/PaidMonitoredFields/PMF/" + UID
    ref = db.reference(f"{str_ref_db_userId}")
    field_keys = ref.get(shallow=True)  # Shallow to get only keys
    if not field_keys:
        # No fields for this user: nothing to report.
        print("No fields found for user")
        return
    FieldIDs = sorted(field_keys.keys())
    last_field_added_timestamp = float(FieldIDs[-1]) / 1000

    try:
        # Get the timestamp of the stored interactive_report.html
        bucket = storage.bucket()
        blob = bucket.blob("PaidMonitoredFields/" + UID + "/interactive_report.html")
        blob.reload()
        report_timestamp = blob.time_created.timestamp()
        condition_new_field_added = last_field_added_timestamp - report_timestamp >= 0
        current_timestamp = datetime.now().timestamp()
        twenty_four_hours = 24 * 60 * 60
        condition_24hr_passed = current_timestamp - report_timestamp >= twenty_four_hours
    except Exception:
        # Best effort: if the report's age cannot be read (e.g. the blob
        # does not exist yet), regenerate unconditionally.
        condition_new_field_added = True
        condition_24hr_passed = True

    # condition_24hr_passed is needed in case a new satellite visit has occurred
    if not (condition_new_field_added or condition_24hr_passed):
        return

    data_list = []
    batch_size = 100  # Limit the number of concurrent threads
    for start in range(0, len(FieldIDs), batch_size):
        batch = FieldIDs[start : start + batch_size]
        print(start, start + batch_size)

        # One private list per worker keeps the report in field-id order
        # regardless of which thread finishes first.
        per_field_rows = [[] for _ in batch]
        threads = [
            threading.Thread(target=process_one_field_data, args=(fieldId, UID, rows))
            for fieldId, rows in zip(batch, per_field_rows)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()  # Wait for all threads to complete
        for rows in per_field_rows:
            data_list.extend(rows)

    # Convert the list of dictionaries to a JSON string
    json_data = json.dumps(data_list, indent=2)

    # Read the content of the HTML file
    template_file_name = "report_template_interactive.html"
    file_name = UID + "/interactive_report.html"
    with open(template_file_name, "r", encoding="utf-8") as file:
        content_str = file.read()

    ## Report Data
    content_str = content_str.replace("DATA_TO_REPLACE", json_data)
    ## Report Date
    content_str = content_str.replace(
        "DATE_TO_REPLACE", datetime.today().strftime("%d %B %Y")
    )

    # Save the final report = modified HTML content. The target directory
    # may not exist yet, and the encoding must match the template's.
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    with open(file_name, "w", encoding="utf-8") as f:
        f.write(content_str)