""" Authenticate a user """ from __future__ import unicode_literals import json import getpass from os.path import join import logging import platform import sys import re import requests from six.moves import input from urllib.parse import urlparse import webbrowser from ..utils.config import ( store_token, get_config, OIDC_CLIENT_ID ) from .. import errors from ..utils.api import RepoApi from ..utils.local_auth_server import WebServer from .base import SubCommandBase logger = logging.getLogger('repo_cli') def get_login_and_password(args): if getattr(args, 'login_username', None): username = args.login_username else: username = input('Username: ') # password = getattr(args, 'login_password', None) return username, password def interactive_get_token(args, fail_if_already_exists=True): # bs = get_server_api(args.token, args.site) config = get_config(site=args.site) token = None # This function could be called from a totally different CLI, so we don't # know if the attribute hostname exists. # args.site or config.get('default_site') url = config.get('url') username, password = get_login_and_password(args) for _ in range(3): try: if password is None: password = getpass.getpass(stream=sys.stderr) api = RepoApi(base_url=url) token = api.login(username, password) # token = login_user(username, password, url) if not token: msg = 'Unable to request the user token. Server was unable to return any valid token!' logger.error(msg) raise errors.RepoCLIError(msg) return token['user'] except errors.Unauthorized: logger.error('Invalid Username password combination, please try again') password = None continue return token['user'] def interactive_login(args): token = interactive_get_token(args) store_token(token, args) logger.info('login successful') return token def main(args): interactive_login(args) class SubCommand(SubCommandBase): name = "login" manages_auth = True def main(self): self.login() def login(self): token = self.interactive_get_token() store_token(token, self.args) is_admin = self.api.is_admin_jwt msg = 'login as ADMIN successful' if is_admin else 'login successful' self.log.info(msg) return token # ['id'] def get_login_and_password(self): if getattr(self.args, 'login_username', None): username = self.args.login_username else: username = input('Username: ') self.username = username password = getattr(self.args, 'login_password', None) return username, password def interactive_get_token(self): config = get_config() if config.get('oauth2'): token = self.oauth2_get_token() else: token = self.direct_get_token() return token def get_and_validate_user_token(self): user_token = self.api.get_user_token() if not user_token: msg = 'Unable to request the user token. Server was unable to return any valid user token!' logger.error(msg) raise errors.RepoCLIError(msg) return user_token def direct_get_token(self): username, password = self.get_login_and_password() for _ in range(3): try: if password is None: password = getpass.getpass(stream=sys.stderr) jwt_token = self.api.login(username, password) user_token = self.get_and_validate_user_token() return user_token except errors.Unauthorized: logger.error('Invalid Username password combination, please try again') password = None continue raise errors.RepoCLIError("You've reached maximum login attempts") def get_openid_configuration_url(self): url = self.api.get_authorize_url() matches = re.match('(/auth/realms/([a-z]+)/).*', url.path) if not matches: logger.info(f'Auth path is not supported: {url.path}') raise errors.WrongRepoAuthSetup() url = url._replace(path=matches[1] + '.well-known/openid-configuration') return url.geturl() def oauth2_get_token(self): openid_configuration_url=self.get_openid_configuration_url() self.log.debug(f"OpenID configuration: {openid_configuration_url}") server = WebServer( client_id=OIDC_CLIENT_ID, openid_configuration_url=openid_configuration_url ) thread = server.start() thread.start() self.log.info("Opening browser to get a token...") webbrowser.open(server.localhost_url()) thread.join(120) if not server.access_token: msg = 'Unable to request the user token. Server was unable to return any valid token!' logger.error(msg) raise errors.RepoCLIError(msg) self.api._jwt = server.access_token user_token = self.get_and_validate_user_token() return user_token def add_parser(self, subparsers): self.subparser = subparser = subparsers.add_parser('login', help='Authenticate a user', description=__doc__) subparser.add_argument('--hostname', default=platform.node(), help="Specify the host name of this login, this should be unique (default: %(default)s)") subparser.add_argument('--username', dest='login_username', help="Specify your username. If this is not given, you will be prompted") subparser.add_argument('--password', dest='login_password', help="Specify your password. If this is not given, you will be prompted") subparser.set_defaults(main=self.main)