I wrote a Python script that completes the OAuth authorization-code flow and obtains an access token. I plan to add explanations later.
I referred to https://qiita.com/kai_kou/items/d03abd6012f32071c1aa.
oauth_authenticator.py
from http.server import HTTPServer
import ssl
from webbrowser import open_new
import random
import string
import urllib.parse
from access_token_request_handler import AccessTokenRequestHandler
class OAuthAuthenticator:
    """Run an OAuth authorization-code flow using a local HTTPS callback server.

    The authenticator opens the provider's ``authorize`` page in a browser and
    serves requests on ``client_info['host']:client_info['port']`` until a
    request handler stores something other than ``None`` in ``server.result``.

    Args:
        client_credential: Mapping with the client ``'id'`` (and ``'secret'``,
            which is forwarded to the request handler).
        client_info: Mapping with the ``'host'`` and ``'port'`` the local
            callback server listens on; also used to build the redirect URI.
        authorize_url: Base URL of the OAuth endpoints. ``urljoin`` is used to
            append ``authorize``, so the URL should end with ``/`` — otherwise
            its last path segment is replaced.
        certfile: Path of the certificate served by the callback server.
        keyfile: Path of the private key matching ``certfile``.
    """

    def __init__(self, client_credential, client_info, authorize_url,
                 certfile='./ssl_test.crt', keyfile='./ssl_test.key'):
        # Client credentials and endpoints.
        self._client_credential = client_credential
        self._client_info = client_info
        self._authorize_url = authorize_url
        self._authorization_result = None
        self._app_uri = 'https://%s:%s' % (client_info['host'], client_info['port'])
        # Certificate and key used to serve the callback over TLS.
        self._certfile = certfile
        self._keyfile = keyfile

    def get_access_token(self):
        """Open the authorization page and wait for the callback.

        Returns:
            The value the request handler stored in ``server.result``, or
            ``None`` if the wait was interrupted with Ctrl-C. The same value
            is available afterwards through :meth:`result`.
        """
        # NOTE(review): the state value is sent to the provider but is never
        # compared with the value echoed back on the callback.
        params = {
            'client_id': self._client_credential['id'],
            'grant_type': 'authorization_code',
            'redirect_uri': self._app_uri,
            'response_type': 'code',
            'state': self.__randomname(40)
        }
        access_url = urllib.parse.urljoin(self._authorize_url, 'authorize')
        access_url = '?'.join([access_url, urllib.parse.urlencode(params)])

        def handler(request, address, server):
            # HTTPServer only passes three arguments; bind the extra ones here.
            return AccessTokenRequestHandler(
                request, address, server,
                self._client_credential, self._app_uri, self._authorize_url
            )

        host = self._client_info['host']
        port = self._client_info['port']
        token = None
        with HTTPServer((host, port), handler) as server:
            print('Server Starts - %s:%s' % (host, port))
            server.socket = self.__wrap_socket_ssl(server.socket)
            # Authorization code request. The browser is opened only once the
            # socket is bound and wrapped, so the redirect cannot arrive before
            # the listener exists, and a bad certificate fails before any
            # browser window appears.
            self.__request_authorization_code(access_url)
            # Token request: keep serving until a handler reports a result
            # (browsers may also request e.g. /favicon.ico).
            try:
                while token is None:
                    server.result = None
                    server.handle_request()
                    token = server.result
            except KeyboardInterrupt:
                # Deliberate: Ctrl-C aborts the wait and leaves the result None.
                pass
        print('Server Stops - %s:%s' % (host, port))
        # Store the outcome so that result() reports it.
        self._authorization_result = token
        return token

    def result(self):
        """Return the outcome of the last :meth:`get_access_token` call (or None)."""
        return self._authorization_result

    def __request_authorization_code(self, access_url):
        """Open ``access_url`` in a new browser window."""
        open_new(access_url)

    def __wrap_socket_ssl(self, socket):
        """Return ``socket`` wrapped in a server-side TLS context (TLS 1.0/1.1 disabled)."""
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(self._certfile, keyfile=self._keyfile)
        context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
        return context.wrap_socket(socket)

    def __randomname(self, n):
        """Return ``n`` random ASCII letters/digits from the OS entropy source."""
        # The state parameter protects against CSRF, so it must be
        # unpredictable; SystemRandom draws from os.urandom.
        rng = random.SystemRandom()
        alphabet = string.ascii_letters + string.digits
        return ''.join(rng.choice(alphabet) for _ in range(n))
access_token_request_handler.py
from http.server import BaseHTTPRequestHandler
import urllib.parse
import requests
class AccessTokenRequestHandler(BaseHTTPRequestHandler):
    """Handle the OAuth redirect and exchange the authorization code for a token.

    On a GET whose query string carries a ``code`` parameter, the handler
    POSTs it to the ``token`` endpoint, stores the HTTP response in
    ``self.server.result`` and renders a short summary for the browser.

    Args:
        request, address, server: Passed through to ``BaseHTTPRequestHandler``.
        client_credential: Mapping with the client ``'id'`` and ``'secret'``.
        client_url: Redirect URI sent with the token request.
        authorize_url: Base URL of the OAuth endpoints; ``token`` is appended
            with ``urljoin``, so it should end with ``/``.
    """

    def __init__(self, request, address, server, client_credential, client_url, authorize_url):
        # These must be set before super().__init__(), because the base class
        # processes the request from inside its constructor.
        self._client_credential = client_credential
        self._authorize_url = authorize_url
        self._client_url = client_url
        super().__init__(request, address, server)

    def do_GET(self):
        """Answer the redirect; request an access token when a code is present."""
        self.__responde_200()
        code = self.__authorization_code()
        if code is None:
            self.wfile.write(bytes('failed to get authorization code.', 'utf-8'))
            return
        # Get access token
        response = self.__request_access_token(code)
        self.server.result = response
        self.__write_response_message(response)

    def __write_response_message(self, response):
        """Print the token response and render it as HTML for the browser."""
        print('status code:', response.status_code)
        self.__wfwrite('Status Code: %s' % response.status_code)
        print('response:', response.reason)
        self.__wfwrite('Response: %s' % response.reason)
        try:
            payload = response.json()
        except ValueError:
            # Body is not JSON (e.g. a proxy error page): show it verbatim
            # instead of crashing after the headers were already sent.
            print(response.text)
            self.__wfwrite(response.text)
            return
        print(payload)
        if response.ok:
            if isinstance(payload, dict):
                lines = [self.__escape('%s: %s' % item) for item in payload.items()]
                self.__wfwrite('<br>'.join(lines), escape=False)
            else:
                self.__wfwrite(payload)
        else:
            self.__wfwrite(self.__error_message(payload, response.text))

    def __error_message(self, payload, fallback):
        """Pull a readable message out of an error payload, else return ``fallback``."""
        try:
            return payload['errors'][0]['message']
        except (KeyError, IndexError, TypeError):
            pass
        if isinstance(payload, dict):
            # Field names used by RFC 6749 error responses.
            for key in ('error_description', 'error'):
                if key in payload:
                    return payload[key]
        return fallback

    def __escape(self, value):
        """Return ``value`` as text that is safe to embed in HTML."""
        from html import escape
        return escape(str(value))

    def __wfwrite(self, text, escape=True):
        """Write ``text`` to the client as one ``<p>`` element.

        ``text`` is HTML-escaped unless ``escape`` is false, in which case the
        caller must already have escaped it.
        """
        body = self.__escape(text) if escape else text
        self.wfile.write(bytes('<p>%s</p>' % body, 'utf-8'))

    def __responde_200(self):
        """Send a 200 status line and headers announcing a UTF-8 HTML body."""
        self.send_response(200)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.end_headers()

    def __request_access_token(self, code):
        """POST the authorization ``code`` to the token endpoint and return the response."""
        access_url = urllib.parse.urljoin(self._authorize_url, 'token')
        post_params = {
            'client_id': self._client_credential['id'],
            'client_secret': self._client_credential['secret'],
            'grant_type': 'authorization_code',
            'redirect_uri': self._client_url,
            'code': code
        }
        # Failure modes noted by the original author:
        #   - the redirect URI is missing or does not match
        #   - the code doesn't exist or is invalid for the client
        # WARN: verify=False disables TLS certificate verification while the
        # client secret is being sent. Kept so self-signed test servers keep
        # working; do not use this against a real provider.
        response = requests.post(access_url, data=post_params, verify=False)
        return response

    def __authorization_code(self):
        """Return the first ``code`` query value of the request, or ``None``."""
        # Look at the parsed query rather than the raw path, so that paths
        # which merely contain the substring "code" are not mistaken for a
        # successful redirect.
        values = self.__params_from_path().get('code')
        return values[0] if values else None

    def __params_from_path(self):
        """Parse the query string of ``self.path`` into a dict of value lists."""
        query = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(query)
        return params
main.py
from oauth_authenticator import OAuthAuthenticator
# Base URL of the OAuth endpoints; the trailing slash matters because the
# endpoint names are appended with urljoin.
AUTHORIZATION_URL = 'https://0.0.0.0/oauth/v2/'
# NOTE(review): sample credentials are hard-coded here; load real ones from
# the environment or a secrets store instead of committing them.
CLIENT_ID = '1_5j8ecbsu9cowo4wk8kwwcc8k8wc08c8o4sgo4s084cg880ggo0'
CLIENT_SECRET = '172h8p6mevy8w8cggc44gw4w4ookk4ockg440osggkw808c00g'
# Address of the local callback server. APP_URI is derived from the host and
# port so the three values cannot drift apart.
APP_HOST = '0.0.0.0'
APP_PORT = 8888
APP_URI = 'https://%s:%s' % (APP_HOST, APP_PORT)
if __name__ == '__main__':
    # Build the authenticator from the constants above rather than from
    # repeated literals, run the flow, and show what it produced.
    authenticator = OAuthAuthenticator(
        {'id': CLIENT_ID, 'secret': CLIENT_SECRET},
        {'host': APP_HOST, 'port': APP_PORT},
        AUTHORIZATION_URL
    )
    authenticator.get_access_token()
    print('result', authenticator.result())
I used it as a reference
Recommended Posts