ROOTPLOIT
Server: LiteSpeed
System: Linux in-mum-web1878.main-hosting.eu 5.14.0-570.21.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Jun 11 07:22:35 EDT 2025 x86_64
User: u435929562 (435929562)
PHP: 7.4.33
Disabled: system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail
Upload Files
File: //opt/gsutil/third_party/google-reauth-python/google_reauth/reauth.py
# Copyright 2017 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A module that provides functions for handling rapt authentication.

Reauth is a process of obtaining additional authentication (such as password,
security token, etc.) while refreshing OAuth 2.0 credentials for a user.

Credentials that use the Reauth flow must have the reauth scope,
``https://www.googleapis.com/auth/accounts.reauth``.

This module provides a high-level function for executing the Reauth process,
:func:`refresh_access_token`, and lower-level helpers for doing the individual
steps of the reauth process.

Those steps are:

1. Obtaining a list of challenges from the reauth server.
2. Running through each challenge and sending the result back to the reauth
   server.
3. Refreshing the access token using the returned rapt token.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import json
import sys

from google_reauth import challenges
from google_reauth import errors
from google_reauth import _helpers
from google_reauth import _reauth_client
from six.moves import http_client
from six.moves import range


# OAuth scope that get_rapt_token requests when it refreshes the access token
# used to talk to the reauth API.
_REAUTH_SCOPE = 'https://www.googleapis.com/auth/accounts.reauth'

# Values checked by _rapt_refresh_required in a failed token-refresh response:
# the 'error' field must equal _REAUTH_NEEDED_ERROR and the 'error_subtype'
# field must equal one of the two subtypes below.
_REAUTH_NEEDED_ERROR = 'invalid_grant'
_REAUTH_NEEDED_ERROR_INVALID_RAPT = 'invalid_rapt'
_REAUTH_NEEDED_ERROR_RAPT_REQUIRED = 'rapt_required'

# Values of the 'status' field of a reauth API response that _obtain_rapt
# understands: _AUTHENTICATED ends the flow with a rapt token, the other two
# mean another challenge has to be run. Any other status is an API error.
_AUTHENTICATED = 'AUTHENTICATED'
_CHALLENGE_REQUIRED = 'CHALLENGE_REQUIRED'
_CHALLENGE_PENDING = 'CHALLENGE_PENDING'


def _run_next_challenge(msg, http_request, access_token):
    """Run the first activated challenge listed in a reauth API response.

    Args:
        msg: Reauth API response body (either from the initial request to
            https://reauth.googleapis.com/v2/sessions:start or from sending the
            previous challenge response to
            https://reauth.googleapis.com/v2/sessions/id:continue)
        http_request: callable to run http requests. Accepts uri, method, body
            and headers. Returns a tuple: (response, content)
        access_token: reauth access token

    Returns:
        Whatever ``_reauth_client.send_challenge_result`` returns for the
        first challenge whose status is ``READY``, or None when no challenge
        is ready or the challenge yielded no client input.
    Raises:
        errors.ReauthFailError if the ready challenge has an unsupported type
            or is not locally eligible
    """
    # Non-activated challenges (any status other than READY) are skipped; only
    # the first activated one is ever attempted.
    activated = (
        candidate for candidate in msg['challenges']
        if candidate['status'] == 'READY')
    challenge = next(activated, None)
    if challenge is None:
        return None

    challenge_type = challenge['challengeType']
    handler = challenges.AVAILABLE_CHALLENGES.get(challenge_type)
    if not handler:
        supported_types = ','.join(challenges.AVAILABLE_CHALLENGES.keys())
        raise errors.ReauthFailError(
            'Unsupported challenge type {0}. Supported types: {1}'
            .format(challenge_type, supported_types))
    if not handler.is_locally_eligible:
        raise errors.ReauthFailError(
            'Challenge {0} is not locally eligible'.format(challenge_type))

    client_input = handler.obtain_challenge_input(challenge)
    if not client_input:
        # Nothing to send back to the server.
        return None

    return _reauth_client.send_challenge_result(
        http_request,
        msg['sessionId'],
        challenge['challengeId'],
        client_input,
        access_token)


def _obtain_rapt(http_request, access_token, requested_scopes, rounds_num=5):
    """Drive the challenge/response loop until the reauth API hands out a rapt.

    Args:
        http_request: callable to run http requests. Accepts uri, method, body
            and headers. Returns a tuple: (response, content)
        access_token: reauth access token
        requested_scopes: scopes required by the client application
        rounds_num: max number of attempts to get a rapt after the next
            challenge, before failing the reauth. This defines total number of
            challenges + number of additional retries if the challenge input
            wasn't accepted.

    Returns: rapt token.
    Raises:
        errors.ReauthError if reauth failed
    """
    reply = None

    for _ in range(rounds_num):
        # No pending reply (first round, or the last challenge produced
        # nothing): ask the server for a fresh list of challenges.
        if not reply:
            supported_types = list(challenges.AVAILABLE_CHALLENGES.keys())
            reply = _reauth_client.get_challenges(
                http_request,
                supported_types,
                access_token,
                requested_scopes)

        status = reply['status']
        if status == _AUTHENTICATED:
            return reply['encodedProofOfReauthToken']

        if status not in (_CHALLENGE_REQUIRED, _CHALLENGE_PENDING):
            raise errors.ReauthAPIError(
                'Challenge status {0}'.format(status))

        # A challenge needs user input, which is impossible unattended.
        if not _helpers.is_interactive():
            raise errors.ReauthUnattendedError()

        reply = _run_next_challenge(reply, http_request, access_token)

    # All rounds used up without reaching the authenticated state.
    raise errors.ReauthFailError()


def get_rapt_token(http_request, client_id, client_secret, refresh_token,
                   token_uri, scopes=None):
    """Obtain a rapt token, starting from a refresh token.

    First exchanges the refresh token for an access token carrying the reauth
    scope, then uses that access token to run the reauth challenges.

    Args:
        http_request: callable to run http requests. Accepts uri, method, body
            and headers. Returns a tuple: (response, content)
        client_id: client id to get access token for reauth scope.
        client_secret: client secret for the client_id
        refresh_token: refresh token to refresh access token
        token_uri: uri to refresh access token
        scopes: scopes required by the client application

    Returns: rapt token.
    Raises:
        errors.ReauthError if reauth failed
    """
    sys.stderr.write('Reauthentication required.\n')

    # Step 1: access token for the reauth scope.
    form_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    response, raw_body = _reauth_client.refresh_grant(
        http_request=http_request,
        client_id=client_id,
        client_secret=client_secret,
        refresh_token=refresh_token,
        token_uri=token_uri,
        scopes=_REAUTH_SCOPE,
        headers=form_headers)

    try:
        token_data = json.loads(raw_body)
    except (TypeError, ValueError):
        raise errors.ReauthAccessTokenRefreshError(
            'Invalid response {0}'.format(
                _substr_for_error_message(raw_body)))

    if response.status != http_client.OK:
        raise errors.ReauthAccessTokenRefreshError(
            _get_refresh_error_message(token_data), response.status)

    if 'access_token' not in token_data:
        raise errors.ReauthAccessTokenRefreshError(
            'Access token missing from the response')

    # Step 2: run the reauth challenges with that access token.
    return _obtain_rapt(
        http_request,
        token_data['access_token'],
        requested_scopes=scopes)


def _rapt_refresh_required(content):
    """Checks if the rapt refresh is required.

    Args:
        content: raw (unparsed) refresh response content

    Returns:
        True if the content is a JSON object whose ``error`` is
        ``invalid_grant`` and whose ``error_subtype`` asks for a new rapt
        (``invalid_rapt`` or ``rapt_required``). False otherwise, including
        when the content is not valid JSON or is not a JSON object.
    """
    try:
        content = json.loads(content)
    except (TypeError, ValueError):
        return False
    # Valid JSON is not necessarily an object (e.g. `[]`, `null`, a bare
    # string); such payloads cannot carry the error fields inspected below.
    if not isinstance(content, dict):
        return False
    return (
        content.get('error') == _REAUTH_NEEDED_ERROR and
        (content.get('error_subtype') == _REAUTH_NEEDED_ERROR_INVALID_RAPT or
         content.get('error_subtype') == _REAUTH_NEEDED_ERROR_RAPT_REQUIRED))


def _get_refresh_error_message(content):
    """Constructs an error message from the parsed token endpoint response.

    Args:
        content: parsed response content; expected to be a dict, any other
            type yields the generic message

    Returns:
        error message to show: ``<error>`` or ``<error>: <error_description>``
        when the response carries those fields, ``'Invalid response.'``
        otherwise
    """
    error_msg = 'Invalid response.'
    # Parsed JSON may be a list, string, number or None; only an object can
    # hold the 'error' / 'error_description' fields.
    if isinstance(content, dict) and 'error' in content:
        error_msg = content['error']
        if 'error_description' in content:
            error_msg += ': ' + content['error_description']
    return error_msg


def _substr_for_error_message(content):
    """Returns content string to include in the error message.

    Args:
        content: raw response content. Usually text, but may be bytes or any
            other object that failed to parse as JSON (e.g. None).

    Returns:
        The content as text, truncated to 97 characters followed by ``...``
        when it is longer than 100 characters.
    """
    if isinstance(content, bytes) and not isinstance(content, str):
        # Python 3 bytes: decode so the slice can be joined with "..." below.
        # (On Python 2 bytes is str, so this branch is skipped.)
        content = content.decode('utf-8', 'replace')
    elif not isinstance(content, (str, type(u''))):
        # None, numbers, etc. have no len()/slicing; use their text form.
        content = str(content)
    return content if len(content) <= 100 else content[0:97] + "..."


def refresh_access_token(
        http_request, client_id, client_secret, refresh_token,
        token_uri, rapt=None, scopes=None, headers=None):
    """Refresh the access_token using the refresh_token.

    If the token endpoint rejects the request because a rapt token is missing
    or invalid, a new rapt token is obtained through the reauth flow and the
    refresh is retried once with it.

    Args:
        http_request: callable to run http requests. Accepts uri, method, body
            and headers. Returns a tuple: (response, content)
        client_id: client id to get access token for reauth scope.
        client_secret: client secret for the client_id
        refresh_token: refresh token to refresh access token
        token_uri: uri to refresh access token
        rapt: existing rapt token to send with the refresh request, if any
        scopes: scopes required by the client application
        headers: headers passed through to the token refresh request

    Returns:
        A 6-tuple, in this order: the rapt token (the one passed in, or the
        newly obtained one), the parsed response content returned by the
        token endpoint, the access token, the new refresh token (or None),
        the ``expires_in`` value (or None) and the id token (or None).
    Raises:
        errors.ReauthError if reauth failed
        errors.HttpAccessTokenRefreshError if access token refresh failed
    """
    # Everything except the rapt token is identical between the first attempt
    # and the retry.
    grant_args = dict(
        http_request=http_request,
        client_id=client_id,
        client_secret=client_secret,
        refresh_token=refresh_token,
        token_uri=token_uri,
        headers=headers)

    response, content = _reauth_client.refresh_grant(rapt=rapt, **grant_args)

    if response.status != http_client.OK:
        # Check if we need a rapt token or if the rapt token is invalid.
        # Once we refresh the rapt token, retry the access token refresh.
        # If we did refresh the rapt token and still got an error, then the
        # refresh token is expired or revoked.
        if _rapt_refresh_required(content):
            rapt = get_rapt_token(
                http_request,
                client_id,
                client_secret,
                refresh_token,
                token_uri,
                scopes=scopes,
            )
            # retry with refreshed rapt
            response, content = _reauth_client.refresh_grant(
                rapt=rapt, **grant_args)

    try:
        content = json.loads(content)
    except (TypeError, ValueError):
        raise errors.HttpAccessTokenRefreshError(
            'Invalid response {0}'.format(_substr_for_error_message(content)),
            response.status)

    if response.status != http_client.OK:
        raise errors.HttpAccessTokenRefreshError(
            _get_refresh_error_message(content), response.status)

    # A successful status without an access token is still a failed refresh;
    # report it as such instead of leaking a KeyError/TypeError to the caller.
    if not isinstance(content, dict) or 'access_token' not in content:
        raise errors.HttpAccessTokenRefreshError(
            'Access token missing from the response', response.status)

    access_token = content['access_token']
    refresh_token = content.get('refresh_token', None)
    expires_in = content.get('expires_in', None)
    id_token = content.get('id_token', None)
    return rapt, content, access_token, refresh_token, expires_in, id_token