from server2024 import server2024
import firebase_admin
from firebase_admin import credentials
from firebase_admin import db
from firebase_admin import messaging
import time
import os
import sys
import json
import traceback
from logger.main_logger2 import Logger
import signal

# --- CLI args --------------------------------------------------------------
# session_type selects which worker loop this process runs (see the dispatch
# at the bottom of the file): 0 = tmux scheduler, 1 = api, 2 = bulk api.
try:
    session_type = sys.argv[1]
except IndexError:  # only a missing argv entry can occur here (was a bare except)
    print("err getting args", traceback.format_exc())
    session_type = "0"  # Default session type if no arg provided

# --- Firebase init ---------------------------------------------------------
cred = credentials.Certificate("servicekey.json")
try:
    firebase_admin.initialize_app(
        cred, {"databaseURL": "https://farmbase-b2f7e-31c0c.firebaseio.com/"}
    )
except ValueError:  # initialize_app raises ValueError if the app already exists
    print("fire running")

# --- Logger init (named after the tmux session this process serves) --------
if int(session_type) == 0:
    logger = Logger("main_tmux")
elif int(session_type) == 1:
    logger = Logger("api_tmux")
elif int(session_type) == 2:
    logger = Logger("bulk_tmux")
else:
    logger = Logger("default_tmux")


# set cleanup
def signal_handler(signum, frame):
    """Flush and close the logger on SIGINT, then exit.

    Parameters renamed from (signal, frame) so the handler no longer shadows
    the ``signal`` module.  ``sys.exit(0)`` added: the original handler
    swallowed SIGINT without terminating, leaving the process running with a
    shut-down logger.
    """
    if logger:
        logger.shutdown()
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)

# constants
uid = "snQYQZqQx3SmVbRztmEqYn5Mkcz2"


def bulk_api_thread(ww, is_test=False):
    """Worker loop: drain bulk historical-data requests queued in Firebase
    under ``BulkPendingData`` (keyed uid -> fieldid -> status).

    :param ww: unused placeholder, kept for interface parity with the other
        ``*_thread`` entry points.
    :param is_test: when True, skip the ``server2024`` call and all Firebase
        deletes (dry run).
    """
    # run forever
    while True:
        try:
            bulk_pending_list = db.reference("BulkPendingData").get()
            pending_list = db.reference(
                "PendingData"
            ).get()  # old way of single historical request (NOT USED ANYMORE)

            # check single request — a pending single request takes priority,
            # so back off and re-poll
            if pending_list is not None:
                time.sleep(60)
                continue

            # check bulk request
            if bulk_pending_list is None:
                print("No bulk pending requests")
                time.sleep(60)
                continue

            for uid, field_obj in bulk_pending_list.items():
                for fieldid, status_obj in field_obj.items():
                    print("processing", uid, fieldid)
                    if not is_test:
                        server2024(uid, fieldid, "bulk")  # nextRelease

                    # remove bulk request obj if all done: PreviousDataRequests
                    # empty means every queued date for this field is processed
                    hist_obj = (
                        None
                        if is_test
                        else db.reference("PaidMonitoredFields")
                        .child("PMF")
                        .child(uid)
                        .child(fieldid)
                        .child("PreviousDataRequests")
                        .get()
                    )
                    if hist_obj is None:
                        # all requests processed
                        print("all requests processed", fieldid)
                        if not is_test:
                            print("deleting bulk pending data", uid, fieldid)
                            db.reference("BulkPendingData").child(uid).child(
                                fieldid
                            ).delete()  # nextRelease
                    else:
                        print("not all requests processed", fieldid)
        except Exception:
            # record the line number that raised plus the full trace, then
            # back off for a minute before re-polling
            _, _, exc_traceback = sys.exc_info()
            lineno = traceback.extract_tb(exc_traceback)[-1].lineno
            if logger and logger.mlogger:
                logger.mlogger.error(
                    "bulk main err",
                    {"line_no": lineno, "trace": traceback.format_exc()},
                )
            else:
                print("Logger not initialized properly")
            print("builk err trace", traceback.format_exc())
            print("sleeping for 1 mins.")
            time.sleep(60)


def api_thread(ww):
    """Worker loop: drain single-field API requests queued in Firebase under
    ``PendingData`` (keyed uid -> fieldid).

    :param ww: unused placeholder, kept for interface parity with the other
        ``*_thread`` entry points.
    """
    while True:
        try:
            pending_list = db.reference("PendingData").get()
            print(pending_list)
            if pending_list is None:
                print("No pending requests")
                time.sleep(60)
                continue

            for p, q in pending_list.items():
                uid = p
                # presumably a blocked/special account — skip it entirely;
                # TODO(review): confirm why this uid is excluded
                if uid != "ipRHhCOFIDV2pxgg7Nfz1ufZBmV2":
                    for x, y in q.items():
                        fieldid = x
                        server2024(uid, fieldid, "api")

                        # check if any more req left
                        hist_obj = (
                            db.reference("PaidMonitoredFields")
                            .child("PMF")
                            .child(uid)
                            .child(fieldid)
                            .child("PreviousDataRequests")
                            .get()
                        )
                        print("hist_obj", hist_obj)
                        # the pending entry is always cleared here; the original
                        # repeated this delete inside the branch below, which
                        # was redundant (same path, already deleted)
                        db.reference("PendingData").child(uid).child(
                            fieldid
                        ).delete()
                        if hist_obj is None:
                            print("all requests processed")
        except Exception:
            # record the line number that raised plus the full trace, then
            # back off for a minute before re-polling
            _, _, exc_traceback = sys.exc_info()
            lineno = traceback.extract_tb(exc_traceback)[-1].lineno
            if logger and logger.mlogger:
                logger.mlogger.error(
                    "api main err",
                    {"line_no": lineno, "trace": traceback.format_exc()},
                )
            else:
                print("Logger not initialized properly")
            print("api err trace", traceback.format_exc())
            print("sleeping for 1 mins.")
            time.sleep(60)


def tmux_thread(ww):
    """Re-shard all monitored fields across tmux sessions once per day.

    :param ww: unused placeholder, kept for interface parity with the other
        ``*_thread`` entry points.
    """
    while True:
        tmux_init()
        time.sleep(86400)  # time equals to 24 hours


def send_keys_tmux_session(session_num, fields_arr):
    """Write one shard of fields to ``p<session_num>.json`` and tell that tmux
    session to start processing it via ``init_sat_tmux.py``.

    :param session_num: 1-based tmux session index (session name ``p<N>``).
    :param fields_arr: list of ``(uid, fieldid)`` tuples.
    """
    # JSON payload maps fieldid -> uid
    fields_obj = {single_field[1]: single_field[0] for single_field in fields_arr}

    session_char = "p"
    file_name = session_char + str(session_num) + ".json"
    with open(file_name, "w") as json_file:
        json.dump(fields_obj, json_file)

    print(len(fields_arr))
    tmux_cmd = (
        "tmux send-keys -t "
        + session_char
        + str(session_num)
        + ' "python3.7 '
        + " init_sat_tmux.py '"
        + file_name
        + "'\" Enter"
    )
    print(tmux_cmd)
    os.system(tmux_cmd)


def tmux_init():
    """
    Initialize tmux sessions for farm monitoring.

    This function will divide the fields into 'n_tmux' sessions
    and send the commands to the tmux sessions.

    :return: None
    """
    all_fields = []
    field_num = 0
    n_tmux = 9

    # shallow read: keys only (uids); values come back as True
    temp_list = db.reference("PaidMonitoredFields").child("PMF").get(False, True)

    # accounts that must never be scheduled
    disabledUIDs = [
        "jsBfju6ZH5YR0WLswL9g77XnQaw2",
        "snQYQZqQx3SmVbRztmEqYn5Mkcz2",
        "KZQ7TZIYXnXN0b07OtrL1hlyYij1",
        "CeMGYvLXrGR5ZThxZ46iV7vY8sa2",
        "TCXcp5VIsfhHZrh0nm2VsgBtcGy2",
        "mFFHQdEtiSbn2hbYQAwwoIdYVi02",
        "sAiJIWRbNOR5grDmRabVxW5BXp03",
        "D4xU2QGhooXfK6qiEeHdAlp0wk53",
        "HxugQlTZOYXSHIAv5Qh63iBlMof2",
        "ipRHhCOFIDV2pxgg7Nfz1ufZBmV2",
        "4fPRPyszwLOjweG1qbpiCx3CFQo1",
    ]

    # Iterate over the fields and add them to the 'all_fields' list
    if temp_list:
        for uid in temp_list:
            if uid in disabledUIDs:
                continue
            uid_obj = (
                db.reference("PaidMonitoredFields")
                .child("PMF")
                .child(uid)
                .get(False, True)
            )
            if uid_obj:
                for fieldid in uid_obj:
                    all_fields.append((uid, fieldid))
                    field_num = field_num + 1

    if field_num > 0:
        # Calculate the interval for dividing the fields into 'n_tmux'
        # sessions.  max(1, ...) prevents a zero interval (fewer fields than
        # sessions), which previously made every chunk empty.
        interval = max(1, round(field_num / (n_tmux - 1)))
        print([field_num, interval])

        clipped_fields_arr = []
        i = 0
        clip_interval_num = 0
        # Iterate over the fields and send the commands to the tmux sessions
        for single_field in all_fields:
            if clip_interval_num < interval:
                clipped_fields_arr.append(single_field)
                clip_interval_num = clip_interval_num + 1
            else:
                # chunk full: dispatch it, then start the next chunk WITH the
                # current field.  (Bug fix: the original discarded the field
                # that landed on each chunk boundary.)
                send_keys_tmux_session((i + 1), clipped_fields_arr)
                clipped_fields_arr = [single_field]
                clip_interval_num = 1
                i = i + 1
                time.sleep(10)
        if clipped_fields_arr:  # Send any remaining fields
            send_keys_tmux_session((i + 1), clipped_fields_arr)
    else:
        print("No fields found")


def _test():
    """Test Function"""
    bulk_api_thread("", True)


# START FUNCTION CALL
print("session_type", session_type)
if int(session_type) == 0:
    # Logger is already initialized above
    tmux_thread("1")
    print("tmux")  # unreachable: tmux_thread loops forever
elif int(session_type) == 1:
    # Logger is already initialized above
    api_thread("1")
    print("api")  # unreachable: api_thread loops forever
elif int(session_type) == 2:
    # Logger is already initialized above
    bulk_api_thread("1")
    print("bulk api")  # unreachable: bulk_api_thread loops forever