Source code for http_message_signatures.resolvers
import urllib.parse
import http_sfv
from .exceptions import HTTPMessageSignaturesException
from .structures import CaseInsensitiveDict
class HTTPSignatureComponentResolver:
derived_component_names = {
"@method",
"@target-uri",
"@authority",
"@scheme",
"@request-target",
"@path",
"@query",
"@query-params",
"@status",
"@request-response"
}
# TODO: describe interface
def __init__(self, message):
self.message = message
self.message_type = "request"
if hasattr(message, "status_code"):
self.message_type = "response"
self.url = message.url
self.headers = CaseInsensitiveDict(message.headers)
def resolve(self, component_node: http_sfv.Item):
component_id = str(component_node.value)
if component_id.startswith("@"): # derived component
if component_id not in self.derived_component_names:
raise HTTPMessageSignaturesException(f'Unknown covered derived component name {component_id}')
resolver = getattr(self, "get_" + component_id[1:].replace("-", "_"))
return resolver(**component_node.params)
if component_id not in self.headers:
raise HTTPMessageSignaturesException(f'Covered header field "{component_id}" not found in the message')
return self.headers[component_id]
def get_method(self):
if self.message_type == "response":
return self.message.request.method.upper()
return self.message.method.upper()
def get_target_uri(self):
return self.url
def get_authority(self):
return urllib.parse.urlsplit(self.url).netloc.lower()
def get_scheme(self):
return urllib.parse.urlsplit(self.url).scheme.lower()
def get_request_target(self):
return self.get_path() + "?" + self.get_query()
def get_path(self):
return urllib.parse.urlsplit(self.url).path
def get_query(self):
return "?" + urllib.parse.urlsplit(self.url).query
def get_query_params(self, *, name: str):
query = urllib.parse.parse_qs(urllib.parse.urlsplit(self.url).query, keep_blank_values=True)
if name not in query:
raise HTTPMessageSignaturesException(f'Query parameter "{name}" not found in the message URL')
if len(query[name]) != 1:
raise HTTPMessageSignaturesException('Query parameters with multiple values are not supported.')
return query[name][0]
def get_status(self):
if self.message_type != "response":
raise HTTPMessageSignaturesException('Unexpected "@status" component in a request signature')
return str(self.message.status_code)
def get_request_response(self, *, key: str):
# See 2.2.11 Request-Response Signature Binding
# self.message.request.headers["Signature"][key]
raise NotImplementedError()
class HTTPSignatureKeyResolver:
def resolve_public_key(self, key_id: str):
raise NotImplementedError("This method must be implemented by a subclass.")
def resolve_private_key(self, key_id: str):
raise NotImplementedError("This method must be implemented by a subclass.")