ROOTPLOIT
Server: LiteSpeed
System: Linux in-mum-web1878.main-hosting.eu 5.14.0-570.21.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Jun 11 07:22:35 EDT 2025 x86_64
User: u435929562 (435929562)
PHP: 7.4.33
Disabled: system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail
Upload Files
File: //proc/self/root/opt/gsutil/third_party/apitools/apitools/gen/util.py
#!/usr/bin/env python
#
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Assorted utilities shared between parts of apitools."""
from __future__ import print_function
from __future__ import unicode_literals

import collections
import contextlib
import gzip
import json
import keyword
import logging
import os
import re
import tempfile

import six
from six.moves import urllib_parse
import six.moves.urllib.error as urllib_error
import six.moves.urllib.request as urllib_request


class Error(Exception):

    """Base error for apitools generation."""


class CommunicationError(Error):

    """Error in network communication."""


def _SortLengthFirstKey(a):
    return -len(a), a


class Names(object):

    """Utility class for cleaning and normalizing names in a fixed style."""
    DEFAULT_NAME_CONVENTION = 'LOWER_CAMEL'
    NAME_CONVENTIONS = ['LOWER_CAMEL', 'LOWER_WITH_UNDER', 'NONE']

    def __init__(self, strip_prefixes,
                 name_convention=None,
                 capitalize_enums=False):
        self.__strip_prefixes = sorted(strip_prefixes, key=_SortLengthFirstKey)
        self.__name_convention = (
            name_convention or self.DEFAULT_NAME_CONVENTION)
        self.__capitalize_enums = capitalize_enums

    @staticmethod
    def __FromCamel(name, separator='_'):
        name = re.sub(r'([a-z0-9])([A-Z])', r'\1%s\2' % separator, name)
        return name.lower()

    @staticmethod
    def __ToCamel(name, separator='_'):
        # TODO(craigcitro): Consider what to do about leading or trailing
        # underscores (such as `_refValue` in discovery).
        return ''.join(s[0:1].upper() + s[1:] for s in name.split(separator))

    @staticmethod
    def __ToLowerCamel(name, separator='_'):
        name = Names.__ToCamel(name, separator=separator)
        return name[0].lower() + name[1:]

    def __StripName(self, name):
        """Strip strip_prefix entries from name."""
        if not name:
            return name
        for prefix in self.__strip_prefixes:
            if name.startswith(prefix):
                return name[len(prefix):]
        return name

    @staticmethod
    def CleanName(name):
        """Perform generic name cleaning."""
        name = re.sub('[^_A-Za-z0-9]', '_', name)
        if name[0].isdigit():
            name = '_%s' % name
        while keyword.iskeyword(name) or name == 'exec':
            name = '%s_' % name
        # If we end up with __ as a prefix, we'll run afoul of python
        # field renaming, so we manually correct for it.
        if name.startswith('__'):
            name = 'f%s' % name
        return name

    @staticmethod
    def NormalizeRelativePath(path):
        """Normalize camelCase entries in path."""
        path_components = path.split('/')
        normalized_components = []
        for component in path_components:
            if re.match(r'{[A-Za-z0-9_]+}$', component):
                normalized_components.append(
                    '{%s}' % Names.CleanName(component[1:-1]))
            else:
                normalized_components.append(component)
        return '/'.join(normalized_components)

    def NormalizeEnumName(self, enum_name):
        if self.__capitalize_enums:
            enum_name = enum_name.upper()
        return self.CleanName(enum_name)

    def ClassName(self, name, separator='_'):
        """Generate a valid class name from name."""
        # TODO(craigcitro): Get rid of this case here and in MethodName.
        if name is None:
            return name
        # TODO(craigcitro): This is a hack to handle the case of specific
        # protorpc class names; clean this up.
        if name.startswith(('protorpc.', 'message_types.',
                            'apitools.base.protorpclite.',
                            'apitools.base.protorpclite.message_types.')):
            return name
        name = self.__StripName(name)
        name = self.__ToCamel(name, separator=separator)
        return self.CleanName(name)

    def MethodName(self, name, separator='_'):
        """Generate a valid method name from name."""
        if name is None:
            return None
        name = Names.__ToCamel(name, separator=separator)
        return Names.CleanName(name)

    def FieldName(self, name):
        """Generate a valid field name from name."""
        # TODO(craigcitro): We shouldn't need to strip this name, but some
        # of the service names here are excessive. Fix the API and then
        # remove this.
        name = self.__StripName(name)
        if self.__name_convention == 'LOWER_CAMEL':
            name = Names.__ToLowerCamel(name)
        elif self.__name_convention == 'LOWER_WITH_UNDER':
            name = Names.__FromCamel(name)
        return Names.CleanName(name)


@contextlib.contextmanager
def Chdir(dirname, create=True):
    if not os.path.exists(dirname):
        if not create:
            raise OSError('Cannot find directory %s' % dirname)
        else:
            os.mkdir(dirname)
    previous_directory = os.getcwd()
    try:
        os.chdir(dirname)
        yield
    finally:
        os.chdir(previous_directory)


def NormalizeVersion(version):
    # Currently, '.' is the only character that might cause us trouble.
    return version.replace('.', '_')


def _ComputePaths(package, version, root_url, service_path):
    """Compute the base url and base path.

    Attributes:
      package: name field of the discovery, i.e. 'storage' for storage service.
      version: version of the service, i.e. 'v1'.
      root_url: root url of the service, i.e. 'https://www.googleapis.com/'.
      service_path: path of the service under the rool url, i.e. 'storage/v1/'.

    Returns:
      base url: string, base url of the service,
        'https://www.googleapis.com/storage/v1/' for the storage service.
      base path: string, common prefix of service endpoints after the base url.
    """
    full_path = urllib_parse.urljoin(root_url, service_path)
    api_path_component = '/'.join((package, version, ''))
    if api_path_component not in full_path:
        return full_path, ''
    prefix, _, suffix = full_path.rpartition(api_path_component)
    return prefix + api_path_component, suffix


class ClientInfo(collections.namedtuple('ClientInfo', (
        'package', 'scopes', 'version', 'client_id', 'client_secret',
        'user_agent', 'client_class_name', 'url_version', 'api_key',
        'base_url', 'base_path', 'mtls_base_url'))):

    """Container for client-related info and names."""

    @classmethod
    def Create(cls, discovery_doc,
               scope_ls, client_id, client_secret, user_agent, names, api_key,
               version_identifier):
        """Create a new ClientInfo object from a discovery document."""
        scopes = set(
            discovery_doc.get('auth', {}).get('oauth2', {}).get('scopes', {}))
        scopes.update(scope_ls)
        package = discovery_doc['name']
        version = (
            version_identifier or NormalizeVersion(discovery_doc['version']))
        url_version = discovery_doc['version']
        base_url, base_path = _ComputePaths(package, url_version,
                                            discovery_doc['rootUrl'],
                                            discovery_doc['servicePath'])

        mtls_root_url = discovery_doc.get('mtlsRootUrl', '')
        mtls_base_url = ''
        if mtls_root_url:
            mtls_base_url, _ = _ComputePaths(package, url_version,
                                             mtls_root_url,
                                             discovery_doc['servicePath'])

        client_info = {
            'package': package,
            'version': version,
            'url_version': url_version,
            'scopes': sorted(list(scopes)),
            'client_id': client_id,
            'client_secret': client_secret,
            'user_agent': user_agent,
            'api_key': api_key,
            'base_url': base_url,
            'base_path': base_path,
            'mtls_base_url': mtls_base_url,
        }
        client_class_name = '%s%s' % (
            names.ClassName(client_info['package']),
            names.ClassName(client_info['version']))
        client_info['client_class_name'] = client_class_name
        return cls(**client_info)

    @property
    def default_directory(self):
        return self.package

    @property
    def client_rule_name(self):
        return '%s_%s_client' % (self.package, self.version)

    @property
    def client_file_name(self):
        return '%s.py' % self.client_rule_name

    @property
    def messages_rule_name(self):
        return '%s_%s_messages' % (self.package, self.version)

    @property
    def services_rule_name(self):
        return '%s_%s_services' % (self.package, self.version)

    @property
    def messages_file_name(self):
        return '%s.py' % self.messages_rule_name

    @property
    def messages_proto_file_name(self):
        return '%s.proto' % self.messages_rule_name

    @property
    def services_proto_file_name(self):
        return '%s.proto' % self.services_rule_name


def ReplaceHomoglyphs(s):
    """Returns s with unicode homoglyphs replaced by ascii equivalents."""
    homoglyphs = {
        '\xa0': ' ',  #   ?
        '\u00e3': '',  # TODO(gsfowler) drop after .proto spurious char elided
        '\u00a0': ' ',  #   ?
        '\u00a9': '(C)',  # COPYRIGHT SIGN (would you believe "asciiglyph"?)
        '\u00ae': '(R)',  # REGISTERED SIGN (would you believe "asciiglyph"?)
        '\u2014': '-',  # EM DASH
        '\u2018': "'",  # LEFT SINGLE QUOTATION MARK
        '\u2019': "'",  # RIGHT SINGLE QUOTATION MARK
        '\u201c': '"',  # LEFT DOUBLE QUOTATION MARK
        '\u201d': '"',  # RIGHT DOUBLE QUOTATION MARK
        '\u2026': '...',  # HORIZONTAL ELLIPSIS
        '\u2e3a': '-',  # TWO-EM DASH
    }

    def _ReplaceOne(c):
        """Returns the homoglyph or escaped replacement for c."""
        equiv = homoglyphs.get(c)
        if equiv is not None:
            return equiv
        try:
            c.encode('ascii')
            return c
        except UnicodeError:
            pass
        try:
            return c.encode('unicode-escape').decode('ascii')
        except UnicodeError:
            return '?'

    return ''.join([_ReplaceOne(c) for c in s])


def CleanDescription(description):
    """Return a version of description safe for printing in a docstring."""
    if not isinstance(description, six.string_types):
        return description
    if six.PY3:
        # https://docs.python.org/3/reference/lexical_analysis.html#index-18
        description = description.replace('\\N', '\\\\N')
        description = description.replace('\\u', '\\\\u')
        description = description.replace('\\U', '\\\\U')
    description = ReplaceHomoglyphs(description)
    return description.replace('"""', '" " "')


class SimplePrettyPrinter(object):

    """Simple pretty-printer that supports an indent contextmanager."""

    def __init__(self, out):
        self.__out = out
        self.__indent = ''
        self.__skip = False
        self.__comment_context = False

    @property
    def indent(self):
        return self.__indent

    def CalculateWidth(self, max_width=78):
        return max_width - len(self.indent)

    @contextlib.contextmanager
    def Indent(self, indent='  '):
        previous_indent = self.__indent
        self.__indent = '%s%s' % (previous_indent, indent)
        yield
        self.__indent = previous_indent

    @contextlib.contextmanager
    def CommentContext(self):
        """Print without any argument formatting."""
        old_context = self.__comment_context
        self.__comment_context = True
        yield
        self.__comment_context = old_context

    def __call__(self, *args):
        if self.__comment_context and args[1:]:
            raise Error('Cannot do string interpolation in comment context')
        if args and args[0]:
            if not self.__comment_context:
                line = (args[0] % args[1:]).rstrip()
            else:
                line = args[0].rstrip()
            line = ReplaceHomoglyphs(line)
            try:
                print('%s%s' % (self.__indent, line), file=self.__out)
            except UnicodeEncodeError:
                line = line.encode('ascii', 'backslashreplace').decode('ascii')
                print('%s%s' % (self.__indent, line), file=self.__out)
        else:
            print('', file=self.__out)


def _NormalizeDiscoveryUrls(discovery_url):
    """Expands a few abbreviations into full discovery urls."""
    if discovery_url.startswith('http'):
        return [discovery_url]
    elif '.' not in discovery_url:
        raise ValueError('Unrecognized value "%s" for discovery url')
    api_name, _, api_version = discovery_url.partition('.')
    return [
        'https://www.googleapis.com/discovery/v1/apis/%s/%s/rest' % (
            api_name, api_version),
        'https://%s.googleapis.com/$discovery/rest?version=%s' % (
            api_name, api_version),
    ]


def _Gunzip(gzipped_content):
    """Returns gunzipped content from gzipped contents."""
    f = tempfile.NamedTemporaryFile(suffix='gz', mode='w+b', delete=False)
    try:
        f.write(gzipped_content)
        f.close()  # force file synchronization
        with gzip.open(f.name, 'rb') as h:
            decompressed_content = h.read()
        return decompressed_content
    finally:
        os.unlink(f.name)


def _GetURLContent(url):
    """Download and return the content of URL."""
    response = urllib_request.urlopen(url)
    encoding = response.info().get('Content-Encoding')
    if encoding == 'gzip':
        content = _Gunzip(response.read())
    else:
        content = response.read()
    return content


def FetchDiscoveryDoc(discovery_url, retries=5):
    """Fetch the discovery document at the given url."""
    discovery_urls = _NormalizeDiscoveryUrls(discovery_url)
    discovery_doc = None
    last_exception = None
    for url in discovery_urls:
        for _ in range(retries):
            try:
                content = _GetURLContent(url)
                if isinstance(content, bytes):
                    content = content.decode('utf8')
                discovery_doc = json.loads(content)
                if discovery_doc:
                    return discovery_doc
            except (urllib_error.HTTPError, urllib_error.URLError) as e:
                logging.info(
                    'Attempting to fetch discovery doc again after "%s"', e)
                last_exception = e
    if discovery_doc is None:
        raise CommunicationError(
            'Could not find discovery doc at any of %s: %s' % (
                discovery_urls, last_exception))