{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# default_exp core" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# core\n", "\n", "> API details." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "import json,tweepy,hmac,hashlib,traceback,shutil,time,fcntl,re\n", "\n", "from fastcore.imports import *\n", "from fastcore.foundation import *\n", "from fastcore.utils import *\n", "from fastcore.script import *\n", "from fastcore.meta import *\n", "from fastcore.test import *\n", "from configparser import ConfigParser\n", "from ipaddress import ip_address,ip_network\n", "from socketserver import ThreadingTCPServer\n", "from fastcgi.http import MinimalHTTPHandler\n", "from fastcgi import ReuseThreadingServer\n", "from ghapi.all import GhApi\n", "\n", "from textwrap import dedent" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## fastwebhook server" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def clean_tweet_body(body):\n", " \"Cleans links and sets proper @'s in the tweet body\"\n", " links = re.findall(r'\\[([^\\]]+)\\]\\(([^)]+)\\)', body)\n", " for issue, link in links:\n", " str_replace = \"\"\n", " if \"@\" in issue:\n", " str_replace = issue[1:]\n", " username = GhApi().users.get_by_username(str_replace).twitter_username\n", " if username: str_replace = f\"@{username}\"\n", " original_link = f\"[{issue}]({link})\"\n", " else: original_link = f\" ([{issue}]({link}))\"\n", " body = body.replace(original_link, str_replace)\n", " body = body.replace(\"### \", \"\")\n", " return body" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#hide\n", "body = '''### New Features\n", "- Some dummy feature ([#1234](https://github.com/user/repo/link-to-pr))\n", "\n", "### Bugs Squashed\n", "- Some dummy bugfix ([#2345](https://github.com/user/repo/linktoissue)), thanks to [@jph00](https://github.com/jph00)'''\n", "cleaned_body = '''New Features\n", "- Some dummy feature\n", "\n", "Bugs Squashed\n", "- Some dummy bugfix, thanks to @jeremyphoward'''\n", "\n", "test_eq(clean_tweet_body(body), cleaned_body)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#hide\n", "body = '''### New Features\n", "- Some dummy feature ([#1234](https://github.com/user/repo/link-to-pr))\n", "\n", "### Bugs Squashed\n", "- Some dummy bugfix ([#2345](https://github.com/user/repo/linktoissue)), thanks to [@fastai](https://github.com/fastai)'''\n", "cleaned_body = '''New Features\n", "- Some dummy feature\n", "\n", "Bugs Squashed\n", "- Some dummy bugfix, thanks to fastai'''\n", "\n", "test_eq(clean_tweet_body(body), cleaned_body)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def tweet_text(payload):\n", " \"Send a tweet announcing release based on `payload`\"\n", " rel_json = payload['release']\n", " url = rel_json['url']\n", " owner,repo = re.findall(r'https://api.github.com/repos/([^/]+)/([^/]+)/', url)[0]\n", " tweet_tmpl = \"New #{repo} release: v{tag_name}. {html_url}\\n\\n{body}\"\n", " res = tweet_tmpl.format(repo=repo, tag_name=rel_json['tag_name'],\n", " html_url=rel_json['html_url'], body=clean_tweet_body(rel_json['body']))\n", " if len(res)<=280: return res\n", " return res[:279] + \"…\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def check_sig(content, headers, secret):\n", " digest = hmac.new(secret, content, hashlib.sha1).hexdigest()\n", " if f'sha1={digest}' == headers.get('X-Hub-Signature'): return True\n", " self.wfile.write(b'mismatch')\n", " return False" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "class _RequestHandler(MinimalHTTPHandler):\n", " def _post(self):\n", " assert self.command == 'POST'\n", " if self.server.check_ip:\n", " src_ip = re.split(', *', self.headers.get('X-Forwarded-For', ''))[0] or self.client_address[0]\n", " src_ip = ip_address(src_ip)\n", " assert any((src_ip in wl) for wl in self.server.whitelist)\n", " self.send_response(200)\n", " self.end_headers()\n", " length = self.headers.get('content-length')\n", " if not length: return\n", " content = self.rfile.read(int(length))\n", " if self.server.debug:\n", " print(self.headers, content)\n", " return\n", " payload = json.loads(content.decode())\n", " if payload.get('action',None)!='released': return\n", " if not check_sig(content, self.headers, self.server.gh_secret): return\n", " tweet = tweet_text(payload)\n", " stat = self.server.api.update_status(tweet)\n", " self.wfile.write(f'{stat.id}'.encode())\n", " self.wfile.write(b'ok')\n", "\n", " def handle(self):\n", " try: self._post()\n", " except Exception as e: sys.stderr.write(traceback.format_exc())\n", "\n", " def log_message(self, fmt, *args): sys.stderr.write(fmt%args)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def reconfig(s):\n", " if hasattr(s, 'reconfigure'): return s.reconfigure(line_buffering=True)\n", " try:\n", " fl = fcntl.fcntl(s.fileno(), fcntl.F_GETFL)\n", " fl |= os.O_SYNC\n", " fcntl.fcntl(s.fileno(), fcntl.F_SETFL, fl)\n", " except io.UnsupportedOperation: pass" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "@call_parse\n", "def run_server(\n", " hostname:str='localhost', # Host name or IP\n", " port:int=8000, # Port to listen on\n", " debug:bool_arg=False, # If True, do not trigger actions, just print\n", " inifile:str='twitter.ini', # Path to settings ini file\n", " check_ip:bool_arg=True, # Check source IP against GitHub list\n", " single_request:bool_arg=False # Handle one request\n", "):\n", " \"Run a GitHub webhook server that tweets about new releases\"\n", " assert os.path.exists(inifile), f\"{inifile} not found\"\n", " cfg = ConfigParser(interpolation=None)\n", " cfg.read([inifile])\n", " cfg = cfg['DEFAULT']\n", " auth = tweepy.OAuthHandler(cfg['consumer_key'], cfg['consumer_secret'])\n", " auth.set_access_token(cfg['access_token'], cfg['access_token_secret'])\n", " os.environ['PYTHONUNBUFFERED'] = '1'\n", " print(f\"Listening on {(hostname,port)}\")\n", "\n", " with ReuseThreadingServer((hostname, port), _RequestHandler) as httpd:\n", " httpd.gh_secret = bytes(cfg['gh_secret'], 'utf-8')\n", " httpd.api = tweepy.API(auth)\n", " httpd.whitelist = L(urljson('https://api.github.com/meta')['hooks']).map(ip_network)\n", " httpd.check_ip,httpd.debug = check_ip,debug\n", " if single_request: httpd.handle_request()\n", " else:\n", " try: httpd.serve_forever()\n", " except KeyboardInterrupt: print(\"Closing\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Listening on ('localhost', 8000)\n" ] } ], "source": [ "time.sleep(0.5) # wait for previous server to stop\n", "threaded(partial(run_server, check_ip=False, debug=True, single_request=True))();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Accept-Encoding: identity\n", "Content-Type: application/x-www-form-urlencoded\n", "Content-Length: 6\n", "Host: localhost:8000\n", "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9\n", "Accept-Language: en-US,en;q=0.9\n", "Cache-Control: max-age=0\n", "Sec-Fetch-Dest: document\n", "Sec-Fetch-Mode: navigate\n", "Sec-Fetch-Site: none\n", "Sec-Fetch-User: ?1\n", "Upgrade-Insecure-Requests: 1\n", "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36\n", "Connection: close\n", "\n", " b'spam=1'\n" ] }, { "data": { "text/plain": [ "''" ] }, "execution_count": null, "metadata": {}, "output_type": "execute_result" } ], "source": [ "time.sleep(0.5)\n", "urlread(\"http://localhost:8000\", spam=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Installer" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "@call_parse\n", "def fastwebhook_install_service(\n", " hostname:str='0.0.0.0', # Host name or IP\n", " port:int=8000, # Port to listen on\n", " inifile:str='twitter.ini', # Path to settings ini file\n", " check_ip:bool_arg=True, # Check source IP against GitHub list\n", " service_path:str=\"/etc/systemd/system/\" # Directory to write service file to\n", "):\n", " \"Install fastwebhook as a service\"\n", " script_loc = shutil.which('fastwebhook')\n", " inifile = Path(inifile).absolute()\n", " _unitfile = dedent(f\"\"\"\n", " [Unit]\n", " Description=fastwebhook\n", " Wants=network-online.target\n", " After=network-online.target\n", "\n", " [Service]\n", " ExecStart={script_loc} --inifile {inifile} --check_ip {check_ip} --hostname {hostname} --port {port}\n", " Restart=always\n", "\n", " [Install]\n", " WantedBy=multi-user.target\"\"\")\n", " Path(\"fastwebhook.service\").write_text(_unitfile)\n", " run(f\"sudo cp fastwebhook.service {service_path}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This `fastcore.script` CLI installs `fastwebhook` as a `systemd` service. Run `fastwebhook_install_service --help` in your terminal for options." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Export -" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Converted 00_core.ipynb.\n", "Converted index.ipynb.\n" ] } ], "source": [ "#hide\n", "from nbdev.export import notebook2script\n", "notebook2script()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }