add: demo server and client

This commit is contained in:
Charles
2024-04-06 20:45:15 -07:00
parent 5a149bc922
commit 72effb5b2a
100 changed files with 11603 additions and 62 deletions

View File

@@ -0,0 +1,43 @@
# coding: utf-8
# flake8: noqa
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
__version__ = "1.0.0"
# import apis into sdk package
from openapi_client.api.peer_net_service_api import PeerNetServiceApi
# import ApiClient
from openapi_client.api_response import ApiResponse
from openapi_client.api_client import ApiClient
from openapi_client.configuration import Configuration
from openapi_client.exceptions import OpenApiException
from openapi_client.exceptions import ApiTypeError
from openapi_client.exceptions import ApiValueError
from openapi_client.exceptions import ApiKeyError
from openapi_client.exceptions import ApiAttributeError
from openapi_client.exceptions import ApiException
# import models into sdk package
from openapi_client.models.claim_ice_candidates_response import ClaimIceCandidatesResponse
from openapi_client.models.google_protobuf_any import GoogleProtobufAny
from openapi_client.models.ice_candidate import IceCandidate
from openapi_client.models.ice_session_description import IceSessionDescription
from openapi_client.models.knock import Knock
from openapi_client.models.list_knocks_response import ListKnocksResponse
from openapi_client.models.room import Room
from openapi_client.models.server import Server
from openapi_client.models.service import Service
from openapi_client.models.status import Status

View File

@@ -0,0 +1,5 @@
# flake8: noqa
# import apis into api package
from openapi_client.api.peer_net_service_api import PeerNetServiceApi

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,758 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
import datetime
from dateutil.parser import parse
from enum import Enum
import json
import mimetypes
import os
import re
import tempfile
from urllib.parse import quote
from typing import Tuple, Optional, List, Dict
from openapi_client.configuration import Configuration
from openapi_client.api_response import ApiResponse, T as ApiResponseT
import openapi_client.models
from openapi_client import rest
from openapi_client.exceptions import (
ApiValueError,
ApiException,
BadRequestException,
UnauthorizedException,
ForbiddenException,
NotFoundException,
ServiceException
)
RequestSerialized = Tuple[str, str, Dict[str, str], Optional[str], List[str]]
class ApiClient:
"""Generic API client for OpenAPI client library builds.
OpenAPI generic API client. This client handles the client-
server communication, and is invariant across implementations. Specifics of
the methods and models for each application are generated from the OpenAPI
templates.
:param configuration: .Configuration object for this client
:param header_name: a header to pass when making calls to the API.
:param header_value: a header value to pass when making calls to
the API.
:param cookie: a cookie to include in the header when making calls
to the API
"""
PRIMITIVE_TYPES = (float, bool, bytes, str, int)
NATIVE_TYPES_MAPPING = {
'int': int,
'long': int, # TODO remove as only py3 is supported?
'float': float,
'str': str,
'bool': bool,
'date': datetime.date,
'datetime': datetime.datetime,
'object': object,
}
_pool = None
def __init__(
self,
configuration=None,
header_name=None,
header_value=None,
cookie=None
) -> None:
# use default configuration if none is provided
if configuration is None:
configuration = Configuration.get_default()
self.configuration = configuration
self.rest_client = rest.RESTClientObject(configuration)
self.default_headers = {}
if header_name is not None:
self.default_headers[header_name] = header_value
self.cookie = cookie
# Set default User-Agent.
self.user_agent = 'OpenAPI-Generator/1.0.0/python'
self.client_side_validation = configuration.client_side_validation
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
pass
@property
def user_agent(self):
"""User agent for this API client"""
return self.default_headers['User-Agent']
@user_agent.setter
def user_agent(self, value):
self.default_headers['User-Agent'] = value
def set_default_header(self, header_name, header_value):
self.default_headers[header_name] = header_value
_default = None
@classmethod
def get_default(cls):
"""Return new instance of ApiClient.
This method returns newly created, based on default constructor,
object of ApiClient class or returns a copy of default
ApiClient.
:return: The ApiClient object.
"""
if cls._default is None:
cls._default = ApiClient()
return cls._default
@classmethod
def set_default(cls, default):
"""Set default instance of ApiClient.
It stores default ApiClient.
:param default: object of ApiClient.
"""
cls._default = default
def param_serialize(
self,
method,
resource_path,
path_params=None,
query_params=None,
header_params=None,
body=None,
post_params=None,
files=None, auth_settings=None,
collection_formats=None,
_host=None,
_request_auth=None
) -> RequestSerialized:
"""Builds the HTTP request params needed by the request.
:param method: Method to call.
:param resource_path: Path to method endpoint.
:param path_params: Path parameters in the url.
:param query_params: Query parameters in the url.
:param header_params: Header parameters to be
placed in the request header.
:param body: Request body.
:param post_params dict: Request post form parameters,
for `application/x-www-form-urlencoded`, `multipart/form-data`.
:param auth_settings list: Auth Settings names for the request.
:param files dict: key -> filename, value -> filepath,
for `multipart/form-data`.
:param collection_formats: dict of collection formats for path, query,
header, and post parameters.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:return: tuple of form (path, http_method, query_params, header_params,
body, post_params, files)
"""
config = self.configuration
# header parameters
header_params = header_params or {}
header_params.update(self.default_headers)
if self.cookie:
header_params['Cookie'] = self.cookie
if header_params:
header_params = self.sanitize_for_serialization(header_params)
header_params = dict(
self.parameters_to_tuples(header_params,collection_formats)
)
# path parameters
if path_params:
path_params = self.sanitize_for_serialization(path_params)
path_params = self.parameters_to_tuples(
path_params,
collection_formats
)
for k, v in path_params:
# specified safe chars, encode everything
resource_path = resource_path.replace(
'{%s}' % k,
quote(str(v), safe=config.safe_chars_for_path_param)
)
# post parameters
if post_params or files:
post_params = post_params if post_params else []
post_params = self.sanitize_for_serialization(post_params)
post_params = self.parameters_to_tuples(
post_params,
collection_formats
)
post_params.extend(self.files_parameters(files))
# auth setting
self.update_params_for_auth(
header_params,
query_params,
auth_settings,
resource_path,
method,
body,
request_auth=_request_auth
)
# body
if body:
body = self.sanitize_for_serialization(body)
# request url
if _host is None:
url = self.configuration.host + resource_path
else:
# use server/host defined in path or operation instead
url = _host + resource_path
# query parameters
if query_params:
query_params = self.sanitize_for_serialization(query_params)
url_query = self.parameters_to_url_query(
query_params,
collection_formats
)
url += "?" + url_query
return method, url, header_params, body, post_params
def call_api(
self,
method,
url,
header_params=None,
body=None,
post_params=None,
_request_timeout=None
) -> rest.RESTResponse:
"""Makes the HTTP request (synchronous)
:param method: Method to call.
:param url: Path to method endpoint.
:param header_params: Header parameters to be
placed in the request header.
:param body: Request body.
:param post_params dict: Request post form parameters,
for `application/x-www-form-urlencoded`, `multipart/form-data`.
:param _request_timeout: timeout setting for this request.
:return: RESTResponse
"""
try:
# perform request and return response
response_data = self.rest_client.request(
method, url,
headers=header_params,
body=body, post_params=post_params,
_request_timeout=_request_timeout
)
except ApiException as e:
raise e
return response_data
def response_deserialize(
self,
response_data: rest.RESTResponse,
response_types_map: Optional[Dict[str, ApiResponseT]]=None
) -> ApiResponse[ApiResponseT]:
"""Deserializes response into an object.
:param response_data: RESTResponse object to be deserialized.
:param response_types_map: dict of response types.
:return: ApiResponse
"""
msg = "RESTResponse.read() must be called before passing it to response_deserialize()"
assert response_data.data is not None, msg
response_type = response_types_map.get(str(response_data.status), None)
if not response_type and isinstance(response_data.status, int) and 100 <= response_data.status <= 599:
# if not found, look for '1XX', '2XX', etc.
response_type = response_types_map.get(str(response_data.status)[0] + "XX", None)
# deserialize response data
response_text = None
return_data = None
try:
if response_type == "bytearray":
return_data = response_data.data
elif response_type == "file":
return_data = self.__deserialize_file(response_data)
elif response_type is not None:
match = None
content_type = response_data.getheader('content-type')
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
encoding = match.group(1) if match else "utf-8"
response_text = response_data.data.decode(encoding)
return_data = self.deserialize(response_text, response_type)
finally:
if not 200 <= response_data.status <= 299:
raise ApiException.from_response(
http_resp=response_data,
body=response_text,
data=return_data,
)
return ApiResponse(
status_code = response_data.status,
data = return_data,
headers = response_data.getheaders(),
raw_data = response_data.data
)
def sanitize_for_serialization(self, obj):
"""Builds a JSON POST object.
If obj is None, return None.
If obj is str, int, long, float, bool, return directly.
If obj is datetime.datetime, datetime.date
convert to string in iso8601 format.
If obj is list, sanitize each element in the list.
If obj is dict, return the dict.
If obj is OpenAPI model, return the properties dict.
:param obj: The data to serialize.
:return: The serialized form of data.
"""
if obj is None:
return None
elif isinstance(obj, self.PRIMITIVE_TYPES):
return obj
elif isinstance(obj, list):
return [
self.sanitize_for_serialization(sub_obj) for sub_obj in obj
]
elif isinstance(obj, tuple):
return tuple(
self.sanitize_for_serialization(sub_obj) for sub_obj in obj
)
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
elif isinstance(obj, dict):
obj_dict = obj
else:
# Convert model obj to dict except
# attributes `openapi_types`, `attribute_map`
# and attributes which value is not None.
# Convert attribute name to json key in
# model definition for request.
obj_dict = obj.to_dict()
return {
key: self.sanitize_for_serialization(val)
for key, val in obj_dict.items()
}
def deserialize(self, response_text, response_type):
"""Deserializes response into an object.
:param response: RESTResponse object to be deserialized.
:param response_type: class literal for
deserialized object, or string of class name.
:return: deserialized object.
"""
# fetch data from response object
try:
data = json.loads(response_text)
except ValueError:
data = response_text
return self.__deserialize(data, response_type)
def __deserialize(self, data, klass):
"""Deserializes dict, list, str into an object.
:param data: dict, list or str.
:param klass: class literal, or string of class name.
:return: object.
"""
if data is None:
return None
if isinstance(klass, str):
if klass.startswith('List['):
m = re.match(r'List\[(.*)]', klass)
assert m is not None, "Malformed List type definition"
sub_kls = m.group(1)
return [self.__deserialize(sub_data, sub_kls)
for sub_data in data]
if klass.startswith('Dict['):
m = re.match(r'Dict\[([^,]*), (.*)]', klass)
assert m is not None, "Malformed Dict type definition"
sub_kls = m.group(2)
return {k: self.__deserialize(v, sub_kls)
for k, v in data.items()}
# convert str to class
if klass in self.NATIVE_TYPES_MAPPING:
klass = self.NATIVE_TYPES_MAPPING[klass]
else:
klass = getattr(openapi_client.models, klass)
if klass in self.PRIMITIVE_TYPES:
return self.__deserialize_primitive(data, klass)
elif klass == object:
return self.__deserialize_object(data)
elif klass == datetime.date:
return self.__deserialize_date(data)
elif klass == datetime.datetime:
return self.__deserialize_datetime(data)
elif issubclass(klass, Enum):
return self.__deserialize_enum(data, klass)
else:
return self.__deserialize_model(data, klass)
def parameters_to_tuples(self, params, collection_formats):
"""Get parameters as list of tuples, formatting collections.
:param params: Parameters as dict or list of two-tuples
:param dict collection_formats: Parameter collection formats
:return: Parameters as list of tuples, collections formatted
"""
new_params: List[Tuple[str, str]] = []
if collection_formats is None:
collection_formats = {}
for k, v in params.items() if isinstance(params, dict) else params:
if k in collection_formats:
collection_format = collection_formats[k]
if collection_format == 'multi':
new_params.extend((k, value) for value in v)
else:
if collection_format == 'ssv':
delimiter = ' '
elif collection_format == 'tsv':
delimiter = '\t'
elif collection_format == 'pipes':
delimiter = '|'
else: # csv is the default
delimiter = ','
new_params.append(
(k, delimiter.join(str(value) for value in v)))
else:
new_params.append((k, v))
return new_params
def parameters_to_url_query(self, params, collection_formats):
"""Get parameters as list of tuples, formatting collections.
:param params: Parameters as dict or list of two-tuples
:param dict collection_formats: Parameter collection formats
:return: URL query string (e.g. a=Hello%20World&b=123)
"""
new_params: List[Tuple[str, str]] = []
if collection_formats is None:
collection_formats = {}
for k, v in params.items() if isinstance(params, dict) else params:
if isinstance(v, bool):
v = str(v).lower()
if isinstance(v, (int, float)):
v = str(v)
if isinstance(v, dict):
v = json.dumps(v)
if k in collection_formats:
collection_format = collection_formats[k]
if collection_format == 'multi':
new_params.extend((k, str(value)) for value in v)
else:
if collection_format == 'ssv':
delimiter = ' '
elif collection_format == 'tsv':
delimiter = '\t'
elif collection_format == 'pipes':
delimiter = '|'
else: # csv is the default
delimiter = ','
new_params.append(
(k, delimiter.join(quote(str(value)) for value in v))
)
else:
new_params.append((k, quote(str(v))))
return "&".join(["=".join(map(str, item)) for item in new_params])
def files_parameters(self, files=None):
"""Builds form parameters.
:param files: File parameters.
:return: Form parameters with files.
"""
params = []
if files:
for k, v in files.items():
if not v:
continue
file_names = v if type(v) is list else [v]
for n in file_names:
with open(n, 'rb') as f:
filename = os.path.basename(f.name)
filedata = f.read()
mimetype = (
mimetypes.guess_type(filename)[0]
or 'application/octet-stream'
)
params.append(
tuple([k, tuple([filename, filedata, mimetype])])
)
return params
def select_header_accept(self, accepts: List[str]) -> Optional[str]:
"""Returns `Accept` based on an array of accepts provided.
:param accepts: List of headers.
:return: Accept (e.g. application/json).
"""
if not accepts:
return None
for accept in accepts:
if re.search('json', accept, re.IGNORECASE):
return accept
return accepts[0]
def select_header_content_type(self, content_types):
"""Returns `Content-Type` based on an array of content_types provided.
:param content_types: List of content-types.
:return: Content-Type (e.g. application/json).
"""
if not content_types:
return None
for content_type in content_types:
if re.search('json', content_type, re.IGNORECASE):
return content_type
return content_types[0]
def update_params_for_auth(
self,
headers,
queries,
auth_settings,
resource_path,
method,
body,
request_auth=None
) -> None:
"""Updates header and query params based on authentication setting.
:param headers: Header parameters dict to be updated.
:param queries: Query parameters tuple list to be updated.
:param auth_settings: Authentication setting identifiers list.
:resource_path: A string representation of the HTTP request resource path.
:method: A string representation of the HTTP request method.
:body: A object representing the body of the HTTP request.
The object type is the return value of sanitize_for_serialization().
:param request_auth: if set, the provided settings will
override the token in the configuration.
"""
if not auth_settings:
return
if request_auth:
self._apply_auth_params(
headers,
queries,
resource_path,
method,
body,
request_auth
)
else:
for auth in auth_settings:
auth_setting = self.configuration.auth_settings().get(auth)
if auth_setting:
self._apply_auth_params(
headers,
queries,
resource_path,
method,
body,
auth_setting
)
def _apply_auth_params(
self,
headers,
queries,
resource_path,
method,
body,
auth_setting
) -> None:
"""Updates the request parameters based on a single auth_setting
:param headers: Header parameters dict to be updated.
:param queries: Query parameters tuple list to be updated.
:resource_path: A string representation of the HTTP request resource path.
:method: A string representation of the HTTP request method.
:body: A object representing the body of the HTTP request.
The object type is the return value of sanitize_for_serialization().
:param auth_setting: auth settings for the endpoint
"""
if auth_setting['in'] == 'cookie':
headers['Cookie'] = auth_setting['value']
elif auth_setting['in'] == 'header':
if auth_setting['type'] != 'http-signature':
headers[auth_setting['key']] = auth_setting['value']
elif auth_setting['in'] == 'query':
queries.append((auth_setting['key'], auth_setting['value']))
else:
raise ApiValueError(
'Authentication token must be in `query` or `header`'
)
def __deserialize_file(self, response):
"""Deserializes body to file
Saves response body into a file in a temporary folder,
using the filename from the `Content-Disposition` header if provided.
handle file downloading
save response body into a tmp file and return the instance
:param response: RESTResponse.
:return: file path.
"""
fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path)
os.close(fd)
os.remove(path)
content_disposition = response.getheader("Content-Disposition")
if content_disposition:
m = re.search(
r'filename=[\'"]?([^\'"\s]+)[\'"]?',
content_disposition
)
assert m is not None, "Unexpected 'content-disposition' header value"
filename = m.group(1)
path = os.path.join(os.path.dirname(path), filename)
with open(path, "wb") as f:
f.write(response.data)
return path
def __deserialize_primitive(self, data, klass):
"""Deserializes string to primitive type.
:param data: str.
:param klass: class literal.
:return: int, long, float, str, bool.
"""
try:
return klass(data)
except UnicodeEncodeError:
return str(data)
except TypeError:
return data
def __deserialize_object(self, value):
"""Return an original value.
:return: object.
"""
return value
def __deserialize_date(self, string):
"""Deserializes string to date.
:param string: str.
:return: date.
"""
try:
return parse(string).date()
except ImportError:
return string
except ValueError:
raise rest.ApiException(
status=0,
reason="Failed to parse `{0}` as date object".format(string)
)
def __deserialize_datetime(self, string):
"""Deserializes string to datetime.
The string should be in iso8601 datetime format.
:param string: str.
:return: datetime.
"""
try:
return parse(string)
except ImportError:
return string
except ValueError:
raise rest.ApiException(
status=0,
reason=(
"Failed to parse `{0}` as datetime object"
.format(string)
)
)
def __deserialize_enum(self, data, klass):
"""Deserializes primitive type to enum.
:param data: primitive type.
:param klass: class literal.
:return: enum value.
"""
try:
return klass(data)
except ValueError:
raise rest.ApiException(
status=0,
reason=(
"Failed to parse `{0}` as `{1}`"
.format(data, klass)
)
)
def __deserialize_model(self, data, klass):
"""Deserializes list or dict to model.
:param data: dict, list.
:param klass: class literal.
:return: model object.
"""
return klass.from_dict(data)

View File

@@ -0,0 +1,21 @@
"""API response object."""
from __future__ import annotations
from typing import Optional, Generic, Mapping, TypeVar
from pydantic import Field, StrictInt, StrictBytes, BaseModel
T = TypeVar("T")
class ApiResponse(BaseModel, Generic[T]):
"""
API response object
"""
status_code: StrictInt = Field(description="HTTP status code")
headers: Optional[Mapping[str, str]] = Field(None, description="HTTP headers")
data: T = Field(description="Deserialized data given the data type")
raw_data: StrictBytes = Field(description="Raw data (HTTP response body)")
model_config = {
"arbitrary_types_allowed": True
}

View File

@@ -0,0 +1,436 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
import copy
import logging
from logging import FileHandler
import multiprocessing
import sys
from typing import Optional
import urllib3
import http.client as httplib
JSON_SCHEMA_VALIDATION_KEYWORDS = {
'multipleOf', 'maximum', 'exclusiveMaximum',
'minimum', 'exclusiveMinimum', 'maxLength',
'minLength', 'pattern', 'maxItems', 'minItems'
}
class Configuration:
"""This class contains various settings of the API client.
:param host: Base url.
:param api_key: Dict to store API key(s).
Each entry in the dict specifies an API key.
The dict key is the name of the security scheme in the OAS specification.
The dict value is the API key secret.
:param api_key_prefix: Dict to store API prefix (e.g. Bearer).
The dict key is the name of the security scheme in the OAS specification.
The dict value is an API key prefix when generating the auth data.
:param username: Username for HTTP basic authentication.
:param password: Password for HTTP basic authentication.
:param access_token: Access token.
:param server_index: Index to servers configuration.
:param server_variables: Mapping with string values to replace variables in
templated server configuration. The validation of enums is performed for
variables with defined enum values before.
:param server_operation_index: Mapping from operation ID to an index to server
configuration.
:param server_operation_variables: Mapping from operation ID to a mapping with
string values to replace variables in templated server configuration.
The validation of enums is performed for variables with defined enum
values before.
:param ssl_ca_cert: str - the path to a file of concatenated CA certificates
in PEM format.
"""
_default = None
def __init__(self, host=None,
api_key=None, api_key_prefix=None,
username=None, password=None,
access_token=None,
server_index=None, server_variables=None,
server_operation_index=None, server_operation_variables=None,
ssl_ca_cert=None,
) -> None:
"""Constructor
"""
self._base_path = "http://localhost" if host is None else host
"""Default Base url
"""
self.server_index = 0 if server_index is None and host is None else server_index
self.server_operation_index = server_operation_index or {}
"""Default server index
"""
self.server_variables = server_variables or {}
self.server_operation_variables = server_operation_variables or {}
"""Default server variables
"""
self.temp_folder_path = None
"""Temp file folder for downloading files
"""
# Authentication Settings
self.api_key = {}
if api_key:
self.api_key = api_key
"""dict to store API key(s)
"""
self.api_key_prefix = {}
if api_key_prefix:
self.api_key_prefix = api_key_prefix
"""dict to store API prefix (e.g. Bearer)
"""
self.refresh_api_key_hook = None
"""function hook to refresh API key if expired
"""
self.username = username
"""Username for HTTP basic authentication
"""
self.password = password
"""Password for HTTP basic authentication
"""
self.access_token = access_token
"""Access token
"""
self.logger = {}
"""Logging Settings
"""
self.logger["package_logger"] = logging.getLogger("openapi_client")
self.logger["urllib3_logger"] = logging.getLogger("urllib3")
self.logger_format = '%(asctime)s %(levelname)s %(message)s'
"""Log format
"""
self.logger_stream_handler = None
"""Log stream handler
"""
self.logger_file_handler: Optional[FileHandler] = None
"""Log file handler
"""
self.logger_file = None
"""Debug file location
"""
self.debug = False
"""Debug switch
"""
self.verify_ssl = True
"""SSL/TLS verification
Set this to false to skip verifying SSL certificate when calling API
from https server.
"""
self.ssl_ca_cert = ssl_ca_cert
"""Set this to customize the certificate file to verify the peer.
"""
self.cert_file = None
"""client certificate file
"""
self.key_file = None
"""client key file
"""
self.assert_hostname = None
"""Set this to True/False to enable/disable SSL hostname verification.
"""
self.tls_server_name = None
"""SSL/TLS Server Name Indication (SNI)
Set this to the SNI value expected by the server.
"""
self.connection_pool_maxsize = multiprocessing.cpu_count() * 5
"""urllib3 connection pool's maximum number of connections saved
per pool. urllib3 uses 1 connection as default value, but this is
not the best value when you are making a lot of possibly parallel
requests to the same host, which is often the case here.
cpu_count * 5 is used as default value to increase performance.
"""
self.proxy: Optional[str] = None
"""Proxy URL
"""
self.proxy_headers = None
"""Proxy headers
"""
self.safe_chars_for_path_param = ''
"""Safe chars for path_param
"""
self.retries = None
"""Adding retries to override urllib3 default value 3
"""
# Enable client side validation
self.client_side_validation = True
self.socket_options = None
"""Options to pass down to the underlying urllib3 socket
"""
self.datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z"
"""datetime format
"""
self.date_format = "%Y-%m-%d"
"""date format
"""
def __deepcopy__(self, memo):
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
if k not in ('logger', 'logger_file_handler'):
setattr(result, k, copy.deepcopy(v, memo))
# shallow copy of loggers
result.logger = copy.copy(self.logger)
# use setters to configure loggers
result.logger_file = self.logger_file
result.debug = self.debug
return result
def __setattr__(self, name, value):
object.__setattr__(self, name, value)
@classmethod
def set_default(cls, default):
"""Set default instance of configuration.
It stores default configuration, which can be
returned by get_default_copy method.
:param default: object of Configuration
"""
cls._default = default
@classmethod
def get_default_copy(cls):
"""Deprecated. Please use `get_default` instead.
Deprecated. Please use `get_default` instead.
:return: The configuration object.
"""
return cls.get_default()
@classmethod
def get_default(cls):
"""Return the default configuration.
This method returns newly created, based on default constructor,
object of Configuration class or returns a copy of default
configuration.
:return: The configuration object.
"""
if cls._default is None:
cls._default = Configuration()
return cls._default
@property
def logger_file(self):
"""The logger file.
If the logger_file is None, then add stream handler and remove file
handler. Otherwise, add file handler and remove stream handler.
:param value: The logger_file path.
:type: str
"""
return self.__logger_file
@logger_file.setter
def logger_file(self, value):
"""The logger file.
If the logger_file is None, then add stream handler and remove file
handler. Otherwise, add file handler and remove stream handler.
:param value: The logger_file path.
:type: str
"""
self.__logger_file = value
if self.__logger_file:
# If set logging file,
# then add file handler and remove stream handler.
self.logger_file_handler = logging.FileHandler(self.__logger_file)
self.logger_file_handler.setFormatter(self.logger_formatter)
for _, logger in self.logger.items():
logger.addHandler(self.logger_file_handler)
@property
def debug(self):
"""Debug status
:param value: The debug status, True or False.
:type: bool
"""
return self.__debug
@debug.setter
def debug(self, value):
"""Debug status
:param value: The debug status, True or False.
:type: bool
"""
self.__debug = value
if self.__debug:
# if debug status is True, turn on debug logging
for _, logger in self.logger.items():
logger.setLevel(logging.DEBUG)
# turn on httplib debug
httplib.HTTPConnection.debuglevel = 1
else:
# if debug status is False, turn off debug logging,
# setting log level to default `logging.WARNING`
for _, logger in self.logger.items():
logger.setLevel(logging.WARNING)
# turn off httplib debug
httplib.HTTPConnection.debuglevel = 0
@property
def logger_format(self):
"""The logger format.
The logger_formatter will be updated when sets logger_format.
:param value: The format string.
:type: str
"""
return self.__logger_format
@logger_format.setter
def logger_format(self, value):
"""The logger format.
The logger_formatter will be updated when sets logger_format.
:param value: The format string.
:type: str
"""
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier, alias=None):
"""Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:param alias: The alternative identifier of apiKey.
:return: The token for api key authentication.
"""
if self.refresh_api_key_hook is not None:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
return "%s %s" % (prefix, key)
else:
return key
def get_basic_auth_token(self):
"""Gets HTTP basic authentication header (string).
:return: The token for basic HTTP authentication.
"""
username = ""
if self.username is not None:
username = self.username
password = ""
if self.password is not None:
password = self.password
return urllib3.util.make_headers(
basic_auth=username + ':' + password
).get('authorization')
def auth_settings(self):
"""Gets Auth Settings dict for api client.
:return: The Auth Settings information dict.
"""
auth = {}
return auth
def to_debug_report(self):
"""Gets the essential information for debugging.
:return: The report for debugging.
"""
return "Python SDK Debug Report:\n"\
"OS: {env}\n"\
"Python Version: {pyversion}\n"\
"Version of the API: 0.0.1\n"\
"SDK Package Version: 1.0.0".\
format(env=sys.platform, pyversion=sys.version)
def get_host_settings(self):
"""Gets an array of host settings
:return: An array of host settings
"""
return [
{
'url': "",
'description': "No description provided",
}
]
def get_host_from_settings(self, index, variables=None, servers=None):
"""Gets host URL based on the index and variables
:param index: array index of the host settings
:param variables: hash of variable and the corresponding value
:param servers: an array of host settings or None
:return: URL based on host settings
"""
if index is None:
return self._base_path
variables = {} if variables is None else variables
servers = self.get_host_settings() if servers is None else servers
try:
server = servers[index]
except IndexError:
raise ValueError(
"Invalid index {0} when selecting the host settings. "
"Must be less than {1}".format(index, len(servers)))
url = server['url']
# go through variables and replace placeholders
for variable_name, variable in server.get('variables', {}).items():
used_value = variables.get(
variable_name, variable['default_value'])
if 'enum_values' in variable \
and used_value not in variable['enum_values']:
raise ValueError(
"The variable `{0}` in the host URL has invalid value "
"{1}. Must be {2}.".format(
variable_name, variables[variable_name],
variable['enum_values']))
url = url.replace("{" + variable_name + "}", used_value)
return url
@property
def host(self):
"""Return generated host."""
return self.get_host_from_settings(self.server_index, variables=self.server_variables)
@host.setter
def host(self, value):
"""Fix base path."""
self._base_path = value
self.server_index = None

View File

@@ -0,0 +1,199 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from typing import Any, Optional
from typing_extensions import Self
class OpenApiException(Exception):
"""The base exception class for all OpenAPIExceptions"""
class ApiTypeError(OpenApiException, TypeError):
def __init__(self, msg, path_to_item=None, valid_classes=None,
key_type=None) -> None:
""" Raises an exception for TypeErrors
Args:
msg (str): the exception message
Keyword Args:
path_to_item (list): a list of keys an indices to get to the
current_item
None if unset
valid_classes (tuple): the primitive classes that current item
should be an instance of
None if unset
key_type (bool): False if our value is a value in a dict
True if it is a key in a dict
False if our item is an item in a list
None if unset
"""
self.path_to_item = path_to_item
self.valid_classes = valid_classes
self.key_type = key_type
full_msg = msg
if path_to_item:
full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
super(ApiTypeError, self).__init__(full_msg)
class ApiValueError(OpenApiException, ValueError):
def __init__(self, msg, path_to_item=None) -> None:
"""
Args:
msg (str): the exception message
Keyword Args:
path_to_item (list) the path to the exception in the
received_data dict. None if unset
"""
self.path_to_item = path_to_item
full_msg = msg
if path_to_item:
full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
super(ApiValueError, self).__init__(full_msg)
class ApiAttributeError(OpenApiException, AttributeError):
def __init__(self, msg, path_to_item=None) -> None:
"""
Raised when an attribute reference or assignment fails.
Args:
msg (str): the exception message
Keyword Args:
path_to_item (None/list) the path to the exception in the
received_data dict
"""
self.path_to_item = path_to_item
full_msg = msg
if path_to_item:
full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
super(ApiAttributeError, self).__init__(full_msg)
class ApiKeyError(OpenApiException, KeyError):
def __init__(self, msg, path_to_item=None) -> None:
"""
Args:
msg (str): the exception message
Keyword Args:
path_to_item (None/list) the path to the exception in the
received_data dict
"""
self.path_to_item = path_to_item
full_msg = msg
if path_to_item:
full_msg = "{0} at {1}".format(msg, render_path(path_to_item))
super(ApiKeyError, self).__init__(full_msg)
class ApiException(OpenApiException):
def __init__(
self,
status=None,
reason=None,
http_resp=None,
*,
body: Optional[str] = None,
data: Optional[Any] = None,
) -> None:
self.status = status
self.reason = reason
self.body = body
self.data = data
self.headers = None
if http_resp:
if self.status is None:
self.status = http_resp.status
if self.reason is None:
self.reason = http_resp.reason
if self.body is None:
try:
self.body = http_resp.data.decode('utf-8')
except Exception:
pass
self.headers = http_resp.getheaders()
@classmethod
def from_response(
cls,
*,
http_resp,
body: Optional[str],
data: Optional[Any],
) -> Self:
if http_resp.status == 400:
raise BadRequestException(http_resp=http_resp, body=body, data=data)
if http_resp.status == 401:
raise UnauthorizedException(http_resp=http_resp, body=body, data=data)
if http_resp.status == 403:
raise ForbiddenException(http_resp=http_resp, body=body, data=data)
if http_resp.status == 404:
raise NotFoundException(http_resp=http_resp, body=body, data=data)
if 500 <= http_resp.status <= 599:
raise ServiceException(http_resp=http_resp, body=body, data=data)
raise ApiException(http_resp=http_resp, body=body, data=data)
def __str__(self):
"""Custom error messages for exception"""
error_message = "({0})\n"\
"Reason: {1}\n".format(self.status, self.reason)
if self.headers:
error_message += "HTTP response headers: {0}\n".format(
self.headers)
if self.data or self.body:
error_message += "HTTP response body: {0}\n".format(self.data or self.body)
return error_message
class BadRequestException(ApiException):
pass
class NotFoundException(ApiException):
pass
class UnauthorizedException(ApiException):
pass
class ForbiddenException(ApiException):
pass
class ServiceException(ApiException):
pass
def render_path(path_to_item):
"""Returns a string representation of a path"""
result = ""
for pth in path_to_item:
if isinstance(pth, int):
result += "[{0}]".format(pth)
else:
result += "['{0}']".format(pth)
return result

View File

@@ -0,0 +1,26 @@
# coding: utf-8
# flake8: noqa
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
# import models into model package
from openapi_client.models.claim_ice_candidates_response import ClaimIceCandidatesResponse
from openapi_client.models.google_protobuf_any import GoogleProtobufAny
from openapi_client.models.ice_candidate import IceCandidate
from openapi_client.models.ice_session_description import IceSessionDescription
from openapi_client.models.knock import Knock
from openapi_client.models.list_knocks_response import ListKnocksResponse
from openapi_client.models.room import Room
from openapi_client.models.server import Server
from openapi_client.models.service import Service
from openapi_client.models.status import Status

View File

@@ -0,0 +1,95 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, Field
from typing import Any, ClassVar, Dict, List, Optional
from openapi_client.models.ice_candidate import IceCandidate
from typing import Optional, Set
from typing_extensions import Self
class ClaimIceCandidatesResponse(BaseModel):
"""
ClaimIceCandidatesResponse
""" # noqa: E501
ice_candidates: Optional[List[IceCandidate]] = Field(default=None, alias="iceCandidates")
__properties: ClassVar[List[str]] = ["iceCandidates"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of ClaimIceCandidatesResponse from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
# override the default output from pydantic by calling `to_dict()` of each item in ice_candidates (list)
_items = []
if self.ice_candidates:
for _item in self.ice_candidates:
if _item:
_items.append(_item.to_dict())
_dict['iceCandidates'] = _items
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of ClaimIceCandidatesResponse from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"iceCandidates": [IceCandidate.from_dict(_item) for _item in obj["iceCandidates"]] if obj.get("iceCandidates") is not None else None
})
return _obj

View File

@@ -0,0 +1,100 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, Field, StrictStr
from typing import Any, ClassVar, Dict, List, Optional
from typing import Optional, Set
from typing_extensions import Self
class GoogleProtobufAny(BaseModel):
"""
Contains an arbitrary serialized message along with a @type that describes the type of the serialized message.
""" # noqa: E501
type: Optional[StrictStr] = Field(default=None, description="The type of the serialized message.", alias="@type")
additional_properties: Dict[str, Any] = {}
__properties: ClassVar[List[str]] = ["@type"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of GoogleProtobufAny from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
* Fields in `self.additional_properties` are added to the output dict.
"""
excluded_fields: Set[str] = set([
"additional_properties",
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
# puts key-value pairs in additional_properties in the top level
if self.additional_properties is not None:
for _key, _value in self.additional_properties.items():
_dict[_key] = _value
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of GoogleProtobufAny from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"@type": obj.get("@type")
})
# store additional fields in additional_properties
for _key in obj.keys():
if _key not in cls.__properties:
_obj.additional_properties[_key] = obj.get(_key)
return _obj

View File

@@ -0,0 +1,95 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, Field, StrictInt, StrictStr
from typing import Any, ClassVar, Dict, List, Optional
from typing import Optional, Set
from typing_extensions import Self
class IceCandidate(BaseModel):
"""
IceCandidate
""" # noqa: E501
name: Optional[StrictStr] = None
candidate: Optional[StrictStr] = None
sdp_mid: Optional[StrictStr] = Field(default=None, alias="sdpMid")
sdp_line_index: Optional[StrictInt] = Field(default=None, alias="sdpLineIndex")
username_fragment: Optional[StrictStr] = Field(default=None, alias="usernameFragment")
__properties: ClassVar[List[str]] = ["name", "candidate", "sdpMid", "sdpLineIndex", "usernameFragment"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of IceCandidate from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of IceCandidate from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"name": obj.get("name"),
"candidate": obj.get("candidate"),
"sdpMid": obj.get("sdpMid"),
"sdpLineIndex": obj.get("sdpLineIndex"),
"usernameFragment": obj.get("usernameFragment")
})
return _obj

View File

@@ -0,0 +1,91 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, Field, StrictStr
from typing import Any, ClassVar, Dict, List, Optional
from typing import Optional, Set
from typing_extensions import Self
class IceSessionDescription(BaseModel):
"""
IceSessionDescription
""" # noqa: E501
name: Optional[StrictStr] = Field(default=None, description="A unique identifier which can be used to send ICE candidates Maps to the session name")
sdp_type: Optional[StrictStr] = Field(default=None, description="Used to construct the remote description in WebRTC", alias="sdpType")
sdp: Optional[StrictStr] = None
__properties: ClassVar[List[str]] = ["name", "sdpType", "sdp"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of IceSessionDescription from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of IceSessionDescription from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"name": obj.get("name"),
"sdpType": obj.get("sdpType"),
"sdp": obj.get("sdp")
})
return _obj

View File

@@ -0,0 +1,98 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, Field, StrictStr
from typing import Any, ClassVar, Dict, List, Optional
from openapi_client.models.ice_session_description import IceSessionDescription
from typing import Optional, Set
from typing_extensions import Self
class Knock(BaseModel):
"""
Knock
""" # noqa: E501
name: Optional[StrictStr] = None
offer: Optional[IceSessionDescription] = Field(default=None, description="The service being connected too")
answer: Optional[IceSessionDescription] = None
__properties: ClassVar[List[str]] = ["name", "offer", "answer"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of Knock from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
# override the default output from pydantic by calling `to_dict()` of offer
if self.offer:
_dict['offer'] = self.offer.to_dict()
# override the default output from pydantic by calling `to_dict()` of answer
if self.answer:
_dict['answer'] = self.answer.to_dict()
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of Knock from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"name": obj.get("name"),
"offer": IceSessionDescription.from_dict(obj["offer"]) if obj.get("offer") is not None else None,
"answer": IceSessionDescription.from_dict(obj["answer"]) if obj.get("answer") is not None else None
})
return _obj

View File

@@ -0,0 +1,95 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict
from typing import Any, ClassVar, Dict, List, Optional
from openapi_client.models.knock import Knock
from typing import Optional, Set
from typing_extensions import Self
class ListKnocksResponse(BaseModel):
"""
ListKnocksResponse
""" # noqa: E501
knocks: Optional[List[Knock]] = None
__properties: ClassVar[List[str]] = ["knocks"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of ListKnocksResponse from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
# override the default output from pydantic by calling `to_dict()` of each item in knocks (list)
_items = []
if self.knocks:
for _item in self.knocks:
if _item:
_items.append(_item.to_dict())
_dict['knocks'] = _items
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of ListKnocksResponse from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"knocks": [Knock.from_dict(_item) for _item in obj["knocks"]] if obj.get("knocks") is not None else None
})
return _obj

View File

@@ -0,0 +1,99 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, Field, StrictStr
from typing import Any, ClassVar, Dict, List, Optional
from openapi_client.models.server import Server
from typing import Optional, Set
from typing_extensions import Self
class Room(BaseModel):
"""
Room
""" # noqa: E501
name: Optional[StrictStr] = None
display_name: Optional[StrictStr] = Field(default=None, alias="displayName")
servers: Optional[List[Server]] = None
__properties: ClassVar[List[str]] = ["name", "displayName", "servers"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of Room from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
# override the default output from pydantic by calling `to_dict()` of each item in servers (list)
_items = []
if self.servers:
for _item in self.servers:
if _item:
_items.append(_item.to_dict())
_dict['servers'] = _items
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of Room from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"name": obj.get("name"),
"displayName": obj.get("displayName"),
"servers": [Server.from_dict(_item) for _item in obj["servers"]] if obj.get("servers") is not None else None
})
return _obj

View File

@@ -0,0 +1,103 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, Field, StrictStr
from typing import Any, ClassVar, Dict, List, Optional
from openapi_client.models.service import Service
from typing import Optional, Set
from typing_extensions import Self
class Server(BaseModel):
"""
Server
""" # noqa: E501
name: Optional[StrictStr] = None
rooms: Optional[List[StrictStr]] = Field(default=None, description="Tracks which rooms the server should be listed in; this will not be set when a room is listed to preserve privacy of servers.")
auth_token: Optional[StrictStr] = Field(default=None, description="Used to authenticate requests which access knocks for this server or attempt to update, delete or create services. In future calls, add a header with the format: Authorization: Bearer <auth_token>", alias="authToken")
display_name: Optional[StrictStr] = Field(default=None, alias="displayName")
services: Optional[List[Service]] = None
__properties: ClassVar[List[str]] = ["name", "rooms", "authToken", "displayName", "services"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of Server from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
# override the default output from pydantic by calling `to_dict()` of each item in services (list)
_items = []
if self.services:
for _item in self.services:
if _item:
_items.append(_item.to_dict())
_dict['services'] = _items
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of Server from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"name": obj.get("name"),
"rooms": obj.get("rooms"),
"authToken": obj.get("authToken"),
"displayName": obj.get("displayName"),
"services": [Service.from_dict(_item) for _item in obj["services"]] if obj.get("services") is not None else None
})
return _obj

View File

@@ -0,0 +1,91 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, StrictStr
from typing import Any, ClassVar, Dict, List, Optional
from typing import Optional, Set
from typing_extensions import Self
class Service(BaseModel):
"""
Service
""" # noqa: E501
name: Optional[StrictStr] = None
protocol: Optional[StrictStr] = None
version: Optional[StrictStr] = None
__properties: ClassVar[List[str]] = ["name", "protocol", "version"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of Service from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of Service from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"name": obj.get("name"),
"protocol": obj.get("protocol"),
"version": obj.get("version")
})
return _obj

View File

@@ -0,0 +1,99 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel, ConfigDict, Field, StrictInt, StrictStr
from typing import Any, ClassVar, Dict, List, Optional
from openapi_client.models.google_protobuf_any import GoogleProtobufAny
from typing import Optional, Set
from typing_extensions import Self
class Status(BaseModel):
"""
The `Status` type defines a logical error model that is suitable for different programming environments, including REST APIs and RPC APIs. It is used by [gRPC](https://github.com/grpc). Each `Status` message contains three pieces of data: error code, error message, and error details. You can find out more about this error model and how to work with it in the [API Design Guide](https://cloud.google.com/apis/design/errors).
""" # noqa: E501
code: Optional[StrictInt] = Field(default=None, description="The status code, which should be an enum value of [google.rpc.Code][google.rpc.Code].")
message: Optional[StrictStr] = Field(default=None, description="A developer-facing error message, which should be in English. Any user-facing error message should be localized and sent in the [google.rpc.Status.details][google.rpc.Status.details] field, or localized by the client.")
details: Optional[List[GoogleProtobufAny]] = Field(default=None, description="A list of messages that carry the error details. There is a common set of message types for APIs to use.")
__properties: ClassVar[List[str]] = ["code", "message", "details"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of Status from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
# override the default output from pydantic by calling `to_dict()` of each item in details (list)
_items = []
if self.details:
for _item in self.details:
if _item:
_items.append(_item.to_dict())
_dict['details'] = _items
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of Status from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"code": obj.get("code"),
"message": obj.get("message"),
"details": [GoogleProtobufAny.from_dict(_item) for _item in obj["details"]] if obj.get("details") is not None else None
})
return _obj

View File

@@ -0,0 +1,255 @@
# coding: utf-8
"""
PeerNetService API
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
The version of the OpenAPI document: 0.0.1
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
import io
import json
import re
import ssl
import urllib3
from openapi_client.exceptions import ApiException, ApiValueError
SUPPORTED_SOCKS_PROXIES = {"socks5", "socks5h", "socks4", "socks4a"}
RESTResponseType = urllib3.HTTPResponse
def is_socks_proxy_url(url):
if url is None:
return False
split_section = url.split("://")
if len(split_section) < 2:
return False
else:
return split_section[0].lower() in SUPPORTED_SOCKS_PROXIES
class RESTResponse(io.IOBase):
def __init__(self, resp) -> None:
self.response = resp
self.status = resp.status
self.reason = resp.reason
self.data = None
def read(self):
if self.data is None:
self.data = self.response.data
return self.data
def getheaders(self):
"""Returns a dictionary of the response headers."""
return self.response.headers
def getheader(self, name, default=None):
"""Returns a given response header."""
return self.response.headers.get(name, default)
class RESTClientObject:
def __init__(self, configuration) -> None:
# urllib3.PoolManager will pass all kw parameters to connectionpool
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501
# Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501
# cert_reqs
if configuration.verify_ssl:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
pool_args = {
"cert_reqs": cert_reqs,
"ca_certs": configuration.ssl_ca_cert,
"cert_file": configuration.cert_file,
"key_file": configuration.key_file,
}
if configuration.assert_hostname is not None:
pool_args['assert_hostname'] = (
configuration.assert_hostname
)
if configuration.retries is not None:
pool_args['retries'] = configuration.retries
if configuration.tls_server_name:
pool_args['server_hostname'] = configuration.tls_server_name
if configuration.socket_options is not None:
pool_args['socket_options'] = configuration.socket_options
if configuration.connection_pool_maxsize is not None:
pool_args['maxsize'] = configuration.connection_pool_maxsize
# https pool manager
self.pool_manager: urllib3.PoolManager
if configuration.proxy:
if is_socks_proxy_url(configuration.proxy):
from urllib3.contrib.socks import SOCKSProxyManager
pool_args["proxy_url"] = configuration.proxy
pool_args["headers"] = configuration.proxy_headers
self.pool_manager = SOCKSProxyManager(**pool_args)
else:
pool_args["proxy_url"] = configuration.proxy
pool_args["proxy_headers"] = configuration.proxy_headers
self.pool_manager = urllib3.ProxyManager(**pool_args)
else:
self.pool_manager = urllib3.PoolManager(**pool_args)
def request(
self,
method,
url,
headers=None,
body=None,
post_params=None,
_request_timeout=None
):
"""Perform requests.
:param method: http request method
:param url: http request url
:param headers: http request headers
:param body: request json body, for `application/json`
:param post_params: request post parameters,
`application/x-www-form-urlencoded`
and `multipart/form-data`
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
"""
method = method.upper()
assert method in [
'GET',
'HEAD',
'DELETE',
'POST',
'PUT',
'PATCH',
'OPTIONS'
]
if post_params and body:
raise ApiValueError(
"body parameter cannot be used with post_params parameter."
)
post_params = post_params or {}
headers = headers or {}
timeout = None
if _request_timeout:
if isinstance(_request_timeout, (int, float)):
timeout = urllib3.Timeout(total=_request_timeout)
elif (
isinstance(_request_timeout, tuple)
and len(_request_timeout) == 2
):
timeout = urllib3.Timeout(
connect=_request_timeout[0],
read=_request_timeout[1]
)
try:
# For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE`
if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']:
# no content type provided or payload is json
content_type = headers.get('Content-Type')
if (
not content_type
or re.search('json', content_type, re.IGNORECASE)
):
request_body = None
if body is not None:
request_body = json.dumps(body)
r = self.pool_manager.request(
method,
url,
body=request_body,
timeout=timeout,
headers=headers,
preload_content=False
)
elif content_type == 'application/x-www-form-urlencoded':
r = self.pool_manager.request(
method,
url,
fields=post_params,
encode_multipart=False,
timeout=timeout,
headers=headers,
preload_content=False
)
elif content_type == 'multipart/form-data':
# must del headers['Content-Type'], or the correct
# Content-Type which generated by urllib3 will be
# overwritten.
del headers['Content-Type']
r = self.pool_manager.request(
method,
url,
fields=post_params,
encode_multipart=True,
timeout=timeout,
headers=headers,
preload_content=False
)
# Pass a `string` parameter directly in the body to support
# other content types than JSON when `body` argument is
# provided in serialized form.
elif isinstance(body, str) or isinstance(body, bytes):
r = self.pool_manager.request(
method,
url,
body=body,
timeout=timeout,
headers=headers,
preload_content=False
)
elif headers['Content-Type'] == 'text/plain' and isinstance(body, bool):
request_body = "true" if body else "false"
r = self.pool_manager.request(
method,
url,
body=request_body,
preload_content=False,
timeout=timeout,
headers=headers)
else:
# Cannot generate the request from given parameters
msg = """Cannot prepare a request message for provided
arguments. Please check that your arguments match
declared content type."""
raise ApiException(status=0, reason=msg)
# For `GET`, `HEAD`
else:
r = self.pool_manager.request(
method,
url,
fields={},
timeout=timeout,
headers=headers,
preload_content=False
)
except urllib3.exceptions.SSLError as e:
msg = "\n".join([type(e).__name__, str(e)])
raise ApiException(status=0, reason=msg)
return RESTResponse(r)