{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "# default_exp core"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Core\n",
    "\n",
    "> GitHub webhook server that tweets about new releases."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "#hide\n",
    "from nbdev.showdoc import *"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "import json,tweepy,hmac,hashlib\n",
    "\n",
    "from pdb import set_trace\n",
    "from http.server import HTTPServer, BaseHTTPRequestHandler\n",
    "from fastcore.imports import *\n",
    "from fastcore.script import *\n",
    "from configparser import ConfigParser"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "# Every key in the DEFAULT section of `twitter.ini` becomes a module-level global;\n",
    "# the four credential names used below are expected to come from there.\n",
    "_cfg = ConfigParser(interpolation=None)\n",
    "_cfg.read(['twitter.ini'])\n",
    "_cfg = _cfg['DEFAULT']\n",
    "globals().update(**_cfg)\n",
    "\n",
    "_auth = tweepy.OAuthHandler(consumer_key, consumer_secret)\n",
    "_auth.set_access_token(access_token, access_token_secret)\n",
    "_api = tweepy.API(_auth)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "def tweet_text(payload):\n",
    "    \"Build the text of a tweet announcing the release described by `payload`\"\n",
    "    rel_json = payload['release']\n",
    "    # The repo name is the second path component after `/repos/` in the release's API URL\n",
    "    _owner,repo = re.findall(r'https://api\\.github\\.com/repos/([^/]+)/([^/]+)/', rel_json['url'])[0]\n",
    "    res = f\"New #{repo} release: v{rel_json['tag_name']}. {rel_json['html_url']}\"\n",
    "    # Release notes may be missing or null; skip them rather than tweeting the text \"None\"\n",
    "    body = rel_json.get('body') or ''\n",
    "    if body: res += f\"\\n\\n{body}\"\n",
    "    if len(res)<=280: return res\n",
    "    # 279 characters plus the ellipsis keeps the result at exactly 280\n",
    "    return res[:279] + \"…\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NOTE(review): placeholder webhook secret. This cell has no `#export` flag, so confirm how\n",
    "# `gh_secret` is meant to reach the exported module (or pass `secret=` to `check_secret`).\n",
    "gh_secret = b'something'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "def check_secret(content, headers, secret=None):\n",
    "    \"Raise `AssertionError` unless `X-Hub-Signature` in `headers` is the HMAC-SHA1 of `content` (bytes) under `secret`\"\n",
    "    # `secret` defaults to the module-level `gh_secret`; a `str` secret is encoded to bytes for `hmac`\n",
    "    if secret is None: secret = gh_secret\n",
    "    if isinstance(secret, str): secret = secret.encode()\n",
    "    digest = hmac.new(secret, content, hashlib.sha1).hexdigest()\n",
    "    # A missing header becomes '' so it fails the comparison instead of raising `TypeError`\n",
    "    sig = headers.get('X-Hub-Signature') or ''\n",
    "    # Explicit raise (a bare `assert` is removed under `python -O`) with a constant-time comparison\n",
    "    if not hmac.compare_digest(f'sha1={digest}'.encode(), sig.encode()):\n",
    "        raise AssertionError('X-Hub-Signature does not match the request body')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "class _RequestHandler(BaseHTTPRequestHandler):\n",
    "    \"Handle webhook POSTs, tweeting when the payload's `action` is `released`\"\n",
    "    def do_POST(self):\n",
    "        \"Validate the delivery, tweet on `released`, and only then answer 200 `ok`\"\n",
    "        try:\n",
    "            # A missing `Content-Length` header is treated as an empty body\n",
    "            content = self.rfile.read(int(self.headers.get('content-length') or 0))\n",
    "            payload = json.loads(content.decode())\n",
    "        except ValueError: return self.send_error(400, 'Expected a JSON body')\n",
    "        # Not every delivery carries an `action` key, so use `.get` rather than indexing\n",
    "        if isinstance(payload, dict) and payload.get('action')=='released':\n",
    "            try: check_secret(content, self.headers)\n",
    "            except AssertionError: return self.send_error(403, 'Invalid signature')\n",
    "            # An exception here propagates before any 200 is sent, so the sender sees the failure\n",
    "            stat = _api.update_status(tweet_text(payload))\n",
    "            print(stat.id)\n",
    "        self.send_response(200)\n",
    "        self.end_headers()\n",
    "        self.wfile.write('ok'.encode(encoding='utf_8'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "@call_parse\n",
    "def run_server(hostname: Param(\"Host name or IP\", str)='localhost',\n",
    "               port: Param(\"Port to listen on\", int)=8000):\n",
    "    \"Run a GitHub webhook server that tweets about new releases\"\n",
    "    print(f\"Listening on {hostname}:{port}\")\n",
    "    with HTTPServer((hostname, port), _RequestHandler) as httpd: httpd.serve_forever()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [],
   "source": [
    "# run_server()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "#hide\n",
    "# with HTTPServer(server_address, RequestHandler) as httpd: httpd.handle_request()\n",
    "# rel = Path('release.json').read_text()\n",
    "# wh_json = json.loads(Path('ping.json').read_text())\n",
    "# _api.destroy_status(1311413699366678529);"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Export -"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Converted 00_core.ipynb.\n",
      "Converted index.ipynb.\n"
     ]
    }
   ],
   "source": [
    "#hide\n",
    "from nbdev.export import notebook2script\n",
    "notebook2script()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": false,
   "sideBar": true,
   "skip_h1_title": true,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}