ROOTPLOIT
Server: LiteSpeed
System: Linux in-mum-web1878.main-hosting.eu 5.14.0-570.21.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Jun 11 07:22:35 EDT 2025 x86_64
User: u435929562 (435929562)
PHP: 7.4.33
Disabled: system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail
Upload Files
File: //proc/self/root/opt/gsutil/third_party/pyu2f/pyu2f/tests/customauthenticator_test.py
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for pyu2f.convenience.customauthenticator."""

import base64
import json
import struct
import sys

import mock
from pyu2f import errors
from pyu2f import model
from pyu2f.convenience import customauthenticator


if sys.version_info[:2] < (2, 7):
  import unittest2 as unittest  # pylint: disable=g-import-not-at-top
else:
  import unittest  # pylint: disable=g-import-not-at-top


# Input/ouput values recorded from a successful signing flow
SIGN_SUCCESS = {
    'app_id': 'test_app_id',
    'app_id_hash_encoded': 'TnMguTdPn7OcIO9f-0CgfQdY254bvc6WR-DTPZnJ49w',
    'challenge': b'asdfasdf',
    'challenge_hash_encoded': 'qhJtbTQvsU0BmLLpDWes-3zFGbegR2wp1mv5BJ2BwC0',
    'key_handle_encoded': ('iBbl9-VYt-XSdWeHVNX-gfQcXGzlrAQ7BcngVNUxWijIQQlnZEI'
                           '4Vb0Bp2ydBCbIQu_5rNlKqPH6NK1TtnM7fA'),
    'origin': 'test_origin',
    'signature_data_encoded': ('AQAAAI8wRQIhALlIPo6Hg8HwzELdYRIXnAnpsiHYCSXHex'
                               'CS34eiS2ixAiBt3TRmKE1A9WyMjc3JGrGI7gSPg-QzDSNL'
                               'aIj7JwcCTA'),
    'client_data_encoded': ('eyJjaGFsbGVuZ2UiOiAiWVhOa1ptRnpaR1kiLCAib3JpZ2luI'
                            'jogInRlc3Rfb3JpZ2luIiwgInR5cCI6ICJuYXZpZ2F0b3IuaW'
                            'QuZ2V0QXNzZXJ0aW9uIn0'),
    'u2f_version': 'U2F_V2',
    'registered_key': model.RegisteredKey(base64.urlsafe_b64decode(
        'iBbl9-VYt-XSdWeHVNX-gfQcXGzlrAQ7BcngVNUxWijIQQlnZEI4Vb0Bp2ydBCbIQu'
        '_5rNlKqPH6NK1TtnM7fA=='
    ))
}


@mock.patch.object(sys, 'stderr', new=mock.MagicMock())
class CustomAuthenticatorTest(unittest.TestCase):

  @mock.patch.object(customauthenticator.os.environ, 'get', return_value=None)
  def testEnvVarNotSet(self, os_get_method):
    authenticator = customauthenticator.CustomAuthenticator('testorigin')

    self.assertFalse(authenticator.IsAvailable(),
                     'Should return false when no env var is present')

    # Assert exception if Authenticate is called when no env var is set
    with self.assertRaises(errors.PluginError):
      authenticator.Authenticate('appid', {})

  @mock.patch.object(customauthenticator.subprocess, 'Popen')
  @mock.patch.object(customauthenticator.os.environ, 'get',
                     return_value='gnubbyagent')
  def testSuccessAuthenticate(self, os_get_method, popen_method):
    """Test plugin Authenticate success."""
    valid_plugin_response = {
        'type': 'sign_helper_reply',
        'code': 0,
        'errorDetail': '',
        'responseData': {
            'appIdHash': SIGN_SUCCESS['app_id_hash_encoded'],
            'challengeHash': SIGN_SUCCESS['challenge_hash_encoded'],
            'keyHandle': SIGN_SUCCESS['key_handle_encoded'],
            'version': SIGN_SUCCESS['u2f_version'],
            'signatureData': SIGN_SUCCESS['signature_data_encoded']},
        'data': None
    }
    valid_plugin_response_json = json.dumps(valid_plugin_response).encode()
    valid_plugin_response_len = struct.pack('<I',
                                            len(valid_plugin_response_json))

    # process returns sign response in json
    mock_communicate_method = mock.MagicMock()
    mock_communicate_method.return_value = [valid_plugin_response_len +
                                            valid_plugin_response_json]

    # process returns with return code of 0
    mock_wait_method = mock.MagicMock()
    mock_wait_method.return_value = 0

    process_mock = mock.MagicMock()
    process_mock.communicate = mock_communicate_method
    process_mock.wait = mock_wait_method

    popen_method.return_value = process_mock

    # Call IsAvailable()
    authenticator = customauthenticator.CustomAuthenticator(
        SIGN_SUCCESS['origin'])

    self.assertTrue(authenticator.IsAvailable(),
                    'Should return true if env var is present')
    self.assertTrue(os_get_method.called)

    # Call Authenticate()
    challenge_data = [{'key': SIGN_SUCCESS['registered_key'],
                       'challenge': SIGN_SUCCESS['challenge']}]
    result = authenticator.Authenticate(SIGN_SUCCESS['app_id'], challenge_data)

    # Validate expected plugin request
    self.assertTrue(popen_method.called)

    self.assertTrue(mock_communicate_method.called)
    communicate_args = mock_communicate_method.call_args[0]
    self.assertEqual(len(communicate_args), 1,
                     'communicate() should have been called with two args')

    communicate_stdin = communicate_args[0]
    communicate_json_len_le = communicate_stdin[:4]
    communicate_json_len = struct.unpack('<I', communicate_json_len_le)[0]
    communicate_json = communicate_stdin[4:]
    self.assertEqual(len(communicate_json), communicate_json_len,
                     'communicate() should have been called with correct'
                     'length field')

    communicate_dict = json.loads(communicate_json.decode("utf8"))
    self.assertEqual(communicate_dict.get('type'), 'sign_helper_request')
    self.assertEqual(communicate_dict.get('timeoutSeconds'), 5)
    self.assertEqual(communicate_dict.get('localAlways'), True)
    challenges = communicate_dict.get('signData')

    # Validate Challenge portion of plugin request
    self.assertIsNotNone(challenges)
    self.assertEqual(len(challenges), 1)
    challenge = challenges[0]
    self.assertEqual(challenge.get('appIdHash'),
                     SIGN_SUCCESS['app_id_hash_encoded'])
    self.assertEqual(challenge.get('challengeHash'),
                     SIGN_SUCCESS['challenge_hash_encoded'])
    self.assertEqual(challenge.get('keyHandle'),
                     SIGN_SUCCESS['key_handle_encoded'])
    self.assertEqual(challenge.get('version'),
                     SIGN_SUCCESS['u2f_version'])

    mock_wait_method.assert_called_with()

    # Validate Authenticate() response
    self.assertEqual(result['applicationId'], SIGN_SUCCESS['app_id'])
    self.assertEqual(result['clientData'], SIGN_SUCCESS['client_data_encoded'])
    self.assertEqual(result['keyHandle'], SIGN_SUCCESS['key_handle_encoded'])
    self.assertEqual(result['signatureData'],
                     SIGN_SUCCESS['signature_data_encoded'])

  @mock.patch.object(customauthenticator.subprocess, 'Popen')
  @mock.patch.object(customauthenticator.os.environ, 'get',
                     return_value='gnubbyagent')
  def testPluginReturnsMalformedJson(self, os_get_method, popen_method):
    """Test when plugin returns non-json response."""
    plugin_response = b'abc'
    plugin_response_len = struct.pack('<I', len(plugin_response))

    # process returns sign response in json
    mock_communicate_method = mock.MagicMock()
    mock_communicate_method.return_value = [plugin_response_len +
                                            plugin_response]

    # process returns with return code of 0
    mock_wait_method = mock.MagicMock()
    mock_wait_method.return_value = 0

    process_mock = mock.MagicMock()
    process_mock.communicate = mock_communicate_method
    process_mock.wait = mock_wait_method

    popen_method.return_value = process_mock

    # Call Authenticate()
    challenge_data = [{'key': SIGN_SUCCESS['registered_key'],
                       'challenge': SIGN_SUCCESS['challenge']}]

    authenticator = customauthenticator.CustomAuthenticator(
        SIGN_SUCCESS['origin'])
    with self.assertRaises(errors.PluginError):
      authenticator.Authenticate(SIGN_SUCCESS['app_id'], challenge_data)

  @mock.patch.object(customauthenticator.subprocess, 'Popen')
  @mock.patch.object(customauthenticator.os.environ, 'get',
                     return_value='gnubbyagent')
  def testPluginResponseMissingType(self, os_get_method, popen_method):
    """Test when plugin response is missing sign_helper_reply type."""
    valid_plugin_response = {
        'errorDetail': '',
        'code': 0,
        'responseData': {
            'appIdHash': SIGN_SUCCESS['app_id_hash_encoded'],
            'challengeHash': SIGN_SUCCESS['challenge_hash_encoded'],
            'keyHandle': SIGN_SUCCESS['key_handle_encoded'],
            'version': SIGN_SUCCESS['u2f_version'],
            'signatureData': SIGN_SUCCESS['signature_data_encoded']},
        'data': None
    }
    plugin_response_json = json.dumps(valid_plugin_response).encode()
    plugin_response_len = struct.pack('<I', len(plugin_response_json))

    # process returns sign response in json
    mock_communicate_method = mock.MagicMock()
    mock_communicate_method.return_value = [plugin_response_len +
                                            plugin_response_json]

    # process returns with return code of 0
    mock_wait_method = mock.MagicMock()
    mock_wait_method.return_value = 0

    process_mock = mock.MagicMock()
    process_mock.communicate = mock_communicate_method
    process_mock.wait = mock_wait_method

    popen_method.return_value = process_mock

    # Call Authenticate()
    challenge_data = [{'key': SIGN_SUCCESS['registered_key'],
                       'challenge': SIGN_SUCCESS['challenge']}]

    authenticator = customauthenticator.CustomAuthenticator(
        SIGN_SUCCESS['origin'])
    with self.assertRaises(errors.PluginError):
      authenticator.Authenticate(SIGN_SUCCESS['app_id'], challenge_data)

  @mock.patch.object(customauthenticator.subprocess, 'Popen')
  @mock.patch.object(customauthenticator.os.environ, 'get',
                     return_value='gnubbyagent')
  def testPluginReturnsIncompleteResponse(self, os_get_method, popen_method):
    """Test when plugin response has missing fields."""
    valid_plugin_response = {
        'type': 'sign_helper_reply',
        'errorDetail': '',
        'responseData': {
            'appIdHash': SIGN_SUCCESS['app_id_hash_encoded'],
            'challengeHash': SIGN_SUCCESS['challenge_hash_encoded'],
            'keyHandle': SIGN_SUCCESS['key_handle_encoded'],
            'version': SIGN_SUCCESS['u2f_version'],
            'signatureData': SIGN_SUCCESS['signature_data_encoded']},
        'data': None
    }
    plugin_response_json = json.dumps(valid_plugin_response).encode()
    plugin_response_len = struct.pack('<I', len(plugin_response_json))

    # process returns sign response in json
    mock_communicate_method = mock.MagicMock()
    mock_communicate_method.return_value = [plugin_response_len +
                                            plugin_response_json]

    # process returns with return code of 0
    mock_wait_method = mock.MagicMock()
    mock_wait_method.return_value = 0

    process_mock = mock.MagicMock()
    process_mock.communicate = mock_communicate_method
    process_mock.wait = mock_wait_method

    popen_method.return_value = process_mock

    # Call Authenticate()
    challenge_data = [{'key': SIGN_SUCCESS['registered_key'],
                       'challenge': SIGN_SUCCESS['challenge']}]

    authenticator = customauthenticator.CustomAuthenticator(
        SIGN_SUCCESS['origin'])
    with self.assertRaises(errors.PluginError):
      authenticator.Authenticate(SIGN_SUCCESS['app_id'], challenge_data)

  @mock.patch.object(customauthenticator.subprocess, 'Popen')
  @mock.patch.object(customauthenticator.os.environ, 'get',
                     return_value='gnubbyagent')
  def testPluginReturnsTouchRequired(self, os_get_method, popen_method):
    """Test when plugin with error 'touch required'."""
    valid_plugin_response = {
        'type': 'sign_helper_reply',
        'errorDetail': '',
        'code': customauthenticator.SK_SIGNING_PLUGIN_TOUCH_REQUIRED,
        'responseData': {
            'appIdHash': SIGN_SUCCESS['app_id_hash_encoded'],
            'challengeHash': SIGN_SUCCESS['challenge_hash_encoded'],
            'keyHandle': SIGN_SUCCESS['key_handle_encoded'],
            'version': SIGN_SUCCESS['u2f_version'],
            'signatureData': SIGN_SUCCESS['signature_data_encoded']},
        'data': None
    }
    plugin_response_json = json.dumps(valid_plugin_response).encode()
    plugin_response_len = struct.pack('<I', len(plugin_response_json))

    # process returns sign response in json
    mock_communicate_method = mock.MagicMock()
    mock_communicate_method.return_value = [plugin_response_len +
                                            plugin_response_json]

    # process returns with return code of 0
    mock_wait_method = mock.MagicMock()
    mock_wait_method.return_value = 0

    process_mock = mock.MagicMock()
    process_mock.communicate = mock_communicate_method
    process_mock.wait = mock_wait_method

    popen_method.return_value = process_mock

    # Call Authenticate()
    challenge_data = [{'key': SIGN_SUCCESS['registered_key'],
                       'challenge': SIGN_SUCCESS['challenge']}]

    authenticator = customauthenticator.CustomAuthenticator(
        SIGN_SUCCESS['origin'])

    with self.assertRaises(errors.U2FError) as cm:
      authenticator.Authenticate(SIGN_SUCCESS['app_id'], challenge_data)
    self.assertEqual(cm.exception.code, errors.U2FError.TIMEOUT)

  @mock.patch.object(customauthenticator.subprocess, 'Popen')
  @mock.patch.object(customauthenticator.os.environ, 'get',
                     return_value='gnubbyagent')
  def testPluginReturnsWrongData(self, os_get_method, popen_method):
    """Test when plugin with error 'wrong data'."""
    valid_plugin_response = {
        'type': 'sign_helper_reply',
        'errorDetail': '',
        'code': customauthenticator.SK_SIGNING_PLUGIN_WRONG_DATA,
        'responseData': {
            'appIdHash': SIGN_SUCCESS['app_id_hash_encoded'],
            'challengeHash': SIGN_SUCCESS['challenge_hash_encoded'],
            'keyHandle': SIGN_SUCCESS['key_handle_encoded'],
            'version': SIGN_SUCCESS['u2f_version'],
            'signatureData': SIGN_SUCCESS['signature_data_encoded']},
        'data': None
    }
    plugin_response_json = json.dumps(valid_plugin_response).encode()
    plugin_response_len = struct.pack('<I', len(plugin_response_json))

    # process returns sign response in json
    mock_communicate_method = mock.MagicMock()
    mock_communicate_method.return_value = [plugin_response_len +
                                            plugin_response_json]

    # process returns with return code of 0
    mock_wait_method = mock.MagicMock()
    mock_wait_method.return_value = 0

    process_mock = mock.MagicMock()
    process_mock.communicate = mock_communicate_method
    process_mock.wait = mock_wait_method

    popen_method.return_value = process_mock

    # Call Authenticate()
    challenge_data = [{'key': SIGN_SUCCESS['registered_key'],
                       'challenge': SIGN_SUCCESS['challenge']}]

    authenticator = customauthenticator.CustomAuthenticator(
        SIGN_SUCCESS['origin'])

    with self.assertRaises(errors.U2FError) as cm:
      authenticator.Authenticate(SIGN_SUCCESS['app_id'], challenge_data)
    self.assertEqual(cm.exception.code, errors.U2FError.DEVICE_INELIGIBLE)


if __name__ == '__main__':
  unittest.main()