B
    Noi%                 @   s   d dl Z d dlZd dlZd dlZd dlZd dlZdZdZdZdZ	dZ
dZdZd	Ze jed
d dZejeed
edZdd Zdd Zdd Zdd Zdd Zdd Zdd Zedkred ed dS )    Nz./ai_report_batchesz!init_interactive_report_worker.py   Zai_report_workerx   
   	localhosti  T)exist_okzFNlots951$#$)hostportZdecode_responsespasswordc             C   s<   t t d }dtjddd}d|  d| d| dS )	Ni   Zabcdef0123456789   )kZbatch__z.json)inttimejoinrandomchoices)
session_idmsZrand r   8/home/ankuromar296_gmail_com/init_interactive_report2.pygenerate_batch_filename   s    r   c          
   C   sl   t  |  }tj }d|d< tjdddd|dtt| g|tjtjd t	
d	|  t	d
|  dd |S )N1ZPYTHONUNBUFFEREDtmuxznew-sessionz-dz-sz	python3.7)envstdoutstderrztmux:ai_report_sessionsztmux:ai_report_session:z:statusrunning)SESSION_NAME_PREFIXosenvironcopy
subprocessPopenWORKER_FILEstrDEVNULLrsaddset)r   namer   r   r   r   start_tmux_session%   s$    

r+   c             C   s0   t  |  }tjddd|gtjtjd}|dkS )Nr   zhas-sessionz-t)r   r   r   )r   r"   callr&   )r   r*   resultr   r   r   tmux_session_existsA   s    

r.   c             C   sr   t | }|dkrdS d}d}x,|  D ] \}}|d |k r&|d }|}q&W ||k rj|td krjd|d fS d	|fS )
z
    Find session with least load.
    If all busy AND below max_sessions -> create new session.

    Returns: (session_type, session_id)
    r   )new   Nl    J)request_count   r/   r0   existing)lenitemsREQUESTS_PER_BATCH)session_statesmax_sessionsactive_countZbest_idZlowestsidstr   r   r   pick_best_sessionK   s    r<   c              C   s   i } xt dD ]}t|}t|}tt d| dp:d}t | }t d| dp^d}|rl|tkrpd}tt d| dpd}||d	| |< qW | S )
Nztmux:ai_report_sessionsztmux:ai_report_session:z:last_heartbeatr   z:statusunknowndeadz:request_count)statusr1   )r'   smembersr   r.   getr   HEARTBEAT_TIMEOUT)Zstatesr:   aliveZhbager?   Z	req_countr   r   r   load_sessionsl   s    rE   c          	   C   s   t | }tjt|}t|d}t|| W d Q R X d| d}t	 }x(|
 D ]}|d| ||| qVW |d| d|  |d| dtt  |  td|  d| td|  d	t| |S )
Nwztmux:ai_report_batch:z	:requestsztmux:ai_report_active_requestsz:sessionz:created_atztmux:ai_report_session:z:queuez:request_count)r   r   pathr   	BATCH_DIRopenjsondumpr'   Zpipelinekeysr(   r)   r   r   ZexecuteZrpushZincrbyr4   )r   Zreq_objsfilenamefullpathfkeypiperidr   r   r   create_batch_file   s    rS   c                s  t d  r& fdd|  D n| t| t }|rNtd| d s^td dS t }tt dprt}x6| D ]*\}}|d	 d
krtd|  t	| qW t
 }tdt| d}x|t|k rb|||t  }fdd|D }	t||\}
}|
dkr:td|  t	| ddd||< t||	}td| d|  |t7 }qW x>t dD ]0}t|}t|sptd| d t	| qpW td dS )z5
    new_requests: dict {request_id: {...}, ...}
    ztmux:ai_report_active_requestsc                s   i | ]\}}| kr||qS r   r   ).0rR   Zrobj)active_req_idsr   r   
<dictcomp>   s   z(run_assignment_cycle.<locals>.<dictcomp>zSkipped z requests already in processzNo new requests to assignNztmux:ai_report_max_sessionsr?   r>   zRestarting dead session new_requestsr   c                s   i | ]} | |qS r   r   )rT   rR   )filtered_requestsr   r   rV      s    r/   zCreating new tmux session r   )r?   r1   zAssigned batch z to session ztmux:ai_report_sessionszSession z missing; restartingzCycle complete.)r'   r@   r5   r4   printrE   r   rA   MAX_SESSIONSr+   listrL   r6   r<   rS   r.   )rW   skippedr7   r8   r:   r;   Zreq_idsichunkZreq_obj_chunkZ	sess_typerM   r   )rU   rX   r   run_assignment_cycle   sJ    




r_   __main__r0   done)r   rJ   r   r   r"   ZredisrH   r$   r6   r   rB   rZ   Z
REDIS_HOSTZ
REDIS_PORTmakedirsZredis_pwZRedisr'   r   r+   r.   r<   rE   rS   r_   __name__rY   r   r   r   r   <module>   s6   	
!U