#!/usr/bin/env python
"""Command-line client that keeps Cloudflare A/AAAA records pointed at this host.

``--configure`` interactively writes the credentials and domain list to
CONFIGURATION_FILE; ``--update-now`` looks up the current external IPv4/IPv6
addresses and patches the matching DNS records through the Cloudflare v4 API.
"""
import argparse
import configparser
import json
import os
import re
import stat
import subprocess

import requests
from tld import get_fld

CONFIGURATION_FILE = os.path.expanduser('~/') + '.cloudflare-ddns'
CONFIGURATION_SECTION = 'Cloudflare DDNS'

EXTERNAL_IP_QUERY_APIS = ['https://api.ipify.org', 'https://checkip.amazonaws.com', 'https://v4.ident.me/',
                          'https://ifconfig.me/ip', 'https://ipv4.icanhazip.com/']

CLOUDFLARE_ZONE_QUERY_API = 'https://api.cloudflare.com/client/v4/zones'  # GET
CLOUDFLARE_ZONE_DNS_RECORDS_QUERY_API = 'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records'  # GET
CLOUDFLARE_ZONE_DNS_RECORDS_UPDATE_API = 'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{dns_record_id}'  # PATCH

# Timeout, in seconds, applied to every HTTP request made by this script.
REQUEST_TIMEOUT = 6

# Owner read/write only: the configuration file holds API credentials.
CONFIGURATION_FILE_MODE = stat.S_IREAD | stat.S_IWRITE

# Matches the address on an "inet6 <address>/<prefix> ..." line of `ip -6 addr` output.
# Leading whitespace is matched loosely so the amount of indentation does not matter.
INET6_FINDER = re.compile(r'^\s*inet6 ([0-9a-f:]+)')

# Backwards compatible with Python 2
try:
    input = raw_input
except NameError:
    pass


def load_arguments():
    """
    Arguments to the program.

    :return: An object with one attribute per command-line flag
             (``configure``, ``update_now``, ``debug``)
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--configure', action='store_true',
                        help='Interactively configure the account and domain for the DDNS updates.')
    parser.add_argument('--update-now', action='store_true', help='Update DNS records right now.')
    parser.add_argument('--debug', action='store_true', help='Print detailed debug output.')
    return parser.parse_args()


# Parsed once at import time; several functions below read START_ARGS.debug.
START_ARGS = load_arguments()


def load_configuration():
    """
    Loads the configuration file from disk.

    For token authentication the token may be absent from the file; in that case it is
    taken from the API_TOKEN environment variable when that variable is set.

    :return: The configuration section (a dictionary-like object) when the file is present
             and has the required keys, or an empty dictionary if there was an error reading
             the file or parameters are missing.
    """
    try:
        # Attempt to parse configuration file
        config = configparser.ConfigParser()
        config.read(CONFIGURATION_FILE)
        section = config[CONFIGURATION_SECTION]

        # Ensure all fields are present
        if 'domains' in section:
            auth_type = section['auth_type']
            if auth_type == 'token':
                if 'api_token' not in section and os.environ.get('API_TOKEN'):
                    section['api_token'] = os.environ.get('API_TOKEN')
                return section
            if auth_type == 'key' and all(key in section for key in ('email', 'api_key')):
                return section
    except KeyError:
        print('Configuration file {config_file} not found or invalid! Did you run cloudflare-ddns --configure?'
              .format(config_file=CONFIGURATION_FILE))
        return {}

    # Every path that did not return a usable section ends here, so callers always
    # receive a dictionary and never None.
    print('Configuration file {config_file} is missing parameters. Run cloudflare-ddns --configure'
          ' to set the configuration.'.format(config_file=CONFIGURATION_FILE))
    return {}


def initialize_configuration():
    """
    Initializes the configuration file via an interactive shell.

    :return: None, but writes the data to CONFIGURATION_FILE
    :raises RuntimeError: if the stored authentication type is neither "token" nor "key"
    """
    config = configparser.ConfigParser()
    config[CONFIGURATION_SECTION] = load_configuration()
    section = config[CONFIGURATION_SECTION]

    print('=============Configuring CloudFlare automatic DDNS update client=============')
    print('You may rerun this at any time with cloudflare-ddns --configure')
    print('Quit and cancel at any time with Ctrl-C\n')

    auth_type = None
    change_token = 'y'
    if os.path.isfile(CONFIGURATION_FILE):
        while True:
            change_token = input('Change API token/key (y/n): ')
            change_token = change_token.strip().lower()
            if change_token in {'y', 'n', 'yes', 'no'}:
                break
        if change_token in {'n', 'no'}:
            auth_type = section.get('auth_type')
            if not auth_type:
                # The existing file could not be read, so there are no credentials to
                # keep; fall back to asking for new ones.
                change_token = 'y'

    while not auth_type:
        auth_type_input = input('Use API token or API key to authenticate?\n'
                                'See https://dash.cloudflare.com/profile/api-tokens for more info.\n'
                                'Choose [T]oken or [K]ey: ')
        if auth_type_input.strip().lower() in ['t', 'token']:
            auth_type = "token"
        elif auth_type_input.strip().lower() in ['k', 'key']:
            auth_type = "key"

    if auth_type == "token":
        if change_token in {'y', 'yes'}:
            section['auth_type'] = 'token'
            api_token = input('Enter the API token you created at https://dash.cloudflare.com/profile/api-tokens.\n'
                              'Required permissions are READ Account.Access: Organizations, Identity Providers, and Groups; '
                              'READ Zone.Zone; EDIT Zone.DNS\nExample: XhtcWvLmWBFGRW-YSK52WBghKtfC40rtuysLAETs\n'
                              'CloudFlare API token: ')
            section['api_token'] = api_token
        # Token authentication does not use the key-based credentials.
        if config.has_option(CONFIGURATION_SECTION, 'email'):
            config.remove_option(CONFIGURATION_SECTION, 'email')
        if config.has_option(CONFIGURATION_SECTION, 'api_key'):
            config.remove_option(CONFIGURATION_SECTION, 'api_key')
    elif auth_type == "key":
        if change_token in {'y', 'yes'}:
            section['auth_type'] = 'key'
            email = input('\nEnter the email address associated with your CloudFlare account.\nExample: developer@kevinlin.info\nEmail: ')
            api_key = input('\nEnter the API key associated with your CloudFlare account. You can find your API key at '
                            'https://dash.cloudflare.com/profile/api-tokens\n'
                            'Example: 7d9dfl2fid74lsg50saa9j2dbqm67zn39v673\nCloudFlare API key: ')
            section['email'] = email
            section['api_key'] = api_key
        # Key authentication does not use the token.
        if config.has_option(CONFIGURATION_SECTION, 'api_token'):
            config.remove_option(CONFIGURATION_SECTION, 'api_token')
    else:
        raise RuntimeError("Invalid auth type provided!")

    domains = input('Enter the domains for which you would like to automatically update the DNS records, '
                    'delimited by a single comma.\nExample: kevinlin.info,cloudflaremanager.com\n'
                    'Comma-delimited domains: ')
    section['domains'] = domains

    # Create the file owner-only and tighten a pre-existing file *before* the
    # credentials are written, so they are never readable by other users.
    descriptor = os.open(CONFIGURATION_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, CONFIGURATION_FILE_MODE)
    with os.fdopen(descriptor, 'w') as config_file:
        os.chmod(CONFIGURATION_FILE, CONFIGURATION_FILE_MODE)
        config.write(config_file)

    print('\nConfiguration file written to {config_file} successfully.'.format(config_file=CONFIGURATION_FILE))


def get_external_ip():
    """
    Get the external IP of the network where the script is being executed.

    Each service in EXTERNAL_IP_QUERY_APIS is tried in order; the first one that answers
    successfully with a non-empty body wins.

    :return: A string representing the network's external IP address, or None if no
             service returned one
    """
    for api in EXTERNAL_IP_QUERY_APIS:
        try:
            if START_ARGS.debug:
                print("Fetching {}".format(api))
            r = requests.get(api, timeout=REQUEST_TIMEOUT)
            # An HTTP error page must not be mistaken for an IP address.
            r.raise_for_status()
        except requests.exceptions.RequestException:
            print('Cannot fetch your external ip. {} not reachable.'.format(api))
            continue

        ip_address = r.text.strip()  # Fix bug with trailing whitespace, Fixes #39
        if ip_address:
            return ip_address
        print('Cannot fetch your external ip. {} returned an empty response.'.format(api))

    print("All {} checked services did not return any external ip address. Please check your internet connection."
          .format(len(EXTERNAL_IP_QUERY_APIS)))
    return None


def _is_ula_addr(ip_addr):
    """
    Tell whether an IPv6 address is a unique local address (fc00::/7, RFC 4193).

    :param ip_addr: Textual IPv6 address
    :return: True if the first 16-bit group falls in fc00::/7, False otherwise
             (including when that group is empty or not hexadecimal)
    """
    try:
        prefix_value = int(ip_addr.split(':', 1)[0], 16)
    except ValueError:
        return False
    return (prefix_value & 0xfe00) == 0xfc00


def get_ipv6():
    """
    Based on: https://gist.github.com/corny/7a07f5ac901844bd20c9

    Runs ``ip -6 addr list scope global -deprecated`` and picks the first listed address
    that is not a unique local address.

    :return: A string representing one of the network's IPv6 addresses, or None if none
             was found or the ``ip`` command could not be run
    """
    # Some network configurations might allow the gateway to provide unique local addresses (ULA) to the
    # devices on the LAN. In this case, one network adapter could have two non-temporary IPv6 addresses:
    # the IPv6 address on the Internet (2000::/3) and the unique local address.
    # The second one cannot be accessed from outside; it is designed to provide convenience when accessing
    # devices on the same network.
    # The ULA prefix is statically configured by the gateway, which means devices on the same local network
    # could have a fixed IPv6 address locally even though the IPv6 address prefix on the Internet might
    # change over time.
    # According to RFC 4193, the IPv6 address range fc00::/7 is reserved for this kind of purpose.
    try:
        output = subprocess.check_output(['ip', '-6', 'addr', 'list', 'scope', 'global', '-deprecated'])
    except (OSError, subprocess.CalledProcessError) as error:
        # The `ip` tool is missing or failed; IPv4 may still be usable, so do not abort.
        if START_ARGS.debug:
            print('Cannot determine the IPv6 address: {}'.format(error))
        return None

    for line in output.decode('utf-8').split('\n'):
        match = INET6_FINDER.match(line)
        if match is not None:
            # Multiple address might be present, assuming the first one is the best
            ipv6_addr = match.group(1)
            if not _is_ula_addr(ipv6_addr):
                return ipv6_addr
    return None


def _report_api_error(response):
    """
    Print a human-readable explanation of a failed Cloudflare API response.

    :param response: The ``requests`` response whose status code was not 200
    :return: None
    """
    try:
        errors = [x.get("message") for x in response.json()["errors"]]
    except (AttributeError, KeyError, TypeError, ValueError):
        # The body was not JSON or did not have the expected "errors" list.
        print('Authentication error: make sure your email and API key are correct. '
              'To set new values, run cloudflare-ddns --configure')
        return

    print('===================================')
    print('Request failed.')
    print('Check your permissions and API key.')
    if START_ARGS.debug:
        print('Status code: {}'.format(response.status_code))
        print('Text: "{}"'.format(response.text))
    else:
        print('Use --debug for more details')
    # This will print detailed info about missing permission etc.
    # e.g. "requires permission 'com.cloudflare.api.account.zone.list' to list zones"
    for message in errors:
        if message:
            print(message)


def update_dns_record(auth, zone_id, record, ip_address):
    """
    Point a single DNS record at the given IP address if it is not already.

    :param auth: Dictionary of API authentication headers
    :param zone_id: ID of the Cloudflare zone that owns the record
    :param record: DNS record dictionary as returned by the Cloudflare API (its ``id``,
                   ``type``, ``name`` and ``content`` keys are used), or None
    :param ip_address: Address to store in the record, or None
    :return: None; does nothing when either ``record`` or ``ip_address`` is None
    """
    if record is None or ip_address is None:
        return

    print('Updating the {type} record (ID {dns_record_id}) of (sub)domain {subdomain} (ID {zone_id}) to {ip_address}.'
          .format(type=record['type'], dns_record_id=record['id'], zone_id=zone_id,
                  subdomain=record['name'], ip_address=ip_address))

    if record['content'] == ip_address:
        print('DNS record is already up-to-date; taking no action')
        return

    headers = dict(auth)
    headers['Content-Type'] = 'application/json'
    update_resp = requests.patch(
        CLOUDFLARE_ZONE_DNS_RECORDS_UPDATE_API.format(zone_id=zone_id, dns_record_id=record['id']),
        headers=headers,
        data=json.dumps({'content': ip_address}),
        timeout=REQUEST_TIMEOUT,
    )

    # A failed request may come back without a JSON body or without the usual keys;
    # reporting the failure must not itself raise.
    try:
        result = update_resp.json()
    except ValueError:
        result = {}
    if not isinstance(result, dict):
        result = {}

    if result.get('success'):
        print('DNS record updated successfully!')
    else:
        print('DNS record failed to update.\nCloudFlare returned the following errors: {errors}.\n\n'
              'CloudFlare returned the following messages: {messages}'
              .format(errors=result.get('errors'), messages=result.get('messages')))


def update_dns(subdomain, auth, ipv4_address, ipv6_address):
    """
    Updates the specified domain with the given IP address, given authentication parameters.

    :param subdomain: String representing domain to update
    :param auth: Dictionary of API authentication credentials
    :param ipv4_address: IPv4 address with which to update the A record
    :param ipv6_address: IPv6 address with which to update the AAAA record
    :return: None
    """
    # Extract the domain
    domain = get_fld(subdomain, fix_protocol=True, fail_silently=True)
    if domain is None:
        print('Could not determine the registered domain for "{}"; skipping.'.format(subdomain))
        return

    # Find the zone ID corresponding to the domain
    cur_page = 1
    zone_names_to_ids = {}
    print('Listing all zones.')
    while True:
        zone_resp = requests.get(CLOUDFLARE_ZONE_QUERY_API, headers=auth, timeout=REQUEST_TIMEOUT,
                                 params={'per_page': 50, 'page': cur_page})
        if zone_resp.status_code != 200:
            _report_api_error(zone_resp)
            return

        data = zone_resp.json()
        for zone in data['result']:
            zone_names_to_ids[zone['name']] = zone['id']
        if cur_page >= data['result_info']['total_pages']:
            break
        cur_page += 1

    if domain not in zone_names_to_ids:
        # Build a real list: on Python 3 a bare map() would print as "<map object ...>".
        print('The domain {domain} doesn\'t appear to be one of your CloudFlare domains. We only found {domain_list}.'
              .format(domain=domain, domain_list=[str(name) for name in zone_names_to_ids]))
        return
    zone_id = zone_names_to_ids[domain]

    # Find DNS records
    print('Finding all DNS records for domain "{}".'.format(subdomain))
    r = requests.get(
        CLOUDFLARE_ZONE_DNS_RECORDS_QUERY_API.format(zone_id=zone_id),
        headers=auth,
        params={'name': subdomain},
        timeout=REQUEST_TIMEOUT,
    )
    if r.status_code != 200:
        _report_api_error(r)
        return

    record_a = None
    record_aaaa = None
    for dns_record in r.json()['result']:
        if dns_record['type'] == 'A':
            record_a = dns_record
        elif dns_record['type'] == 'AAAA':
            record_aaaa = dns_record

    if record_a is None and record_aaaa is None:
        print('No A or AAAA records defined for domain "{}".'.format(subdomain))
        if START_ARGS.debug:
            print(r.json())
        else:
            print('Use --debug for more details')
        return

    # Update the record as necessary
    update_dns_record(auth, zone_id, record_a, ipv4_address)
    update_dns_record(auth, zone_id, record_aaaa, ipv6_address)


def main():
    """
    Main program: either make the configuration file or update the DNS

    :raises RuntimeError: if the configuration is unusable, no external address could be
                          found, or no domains are configured
    """
    if START_ARGS.configure:
        initialize_configuration()
    elif START_ARGS.update_now:
        config = load_configuration()
        if not config:
            raise RuntimeError('There was a problem with the configuration file {config_file}! '
                               'Try running cloudflare-ddns --configure'.format(config_file=CONFIGURATION_FILE))

        if config['auth_type'] == 'token':
            # The token may legitimately be absent from the file (it can come from the
            # API_TOKEN environment variable), so check it is actually available.
            api_token = config.get('api_token')
            if not api_token:
                raise RuntimeError('No API token found in {config_file} or in the API_TOKEN environment variable! '
                                   'Try running cloudflare-ddns --configure'.format(config_file=CONFIGURATION_FILE))
            auth = {'Authorization': 'Bearer {token}'.format(token=api_token)}
        elif config['auth_type'] == 'key':
            auth = {'X-Auth-Email': config['email'], 'X-Auth-Key': config['api_key']}
        else:
            raise RuntimeError('There was a problem with the configuration file {config_file}! '
                               'Try running cloudflare-ddns --configure'.format(config_file=CONFIGURATION_FILE))

        external_ip = get_external_ip()
        if external_ip:
            print('Found external IPv4: "{}"'.format(str(external_ip)))

        ipv6 = get_ipv6()
        if ipv6:
            print('Found external IPv6: "{}"'.format(str(ipv6)))

        if not external_ip and not ipv6:
            raise RuntimeError("Neither external IPv4 nor IPv6 found. Please check your internet connection.")

        try:
            raw_domains = config['domains']
        except KeyError:
            raise RuntimeError('There was a problem with the configuration file {config_file}! '
                               'Try running cloudflare-ddns --configure'.format(config_file=CONFIGURATION_FILE))

        # Tolerate spaces around the commas and ignore empty entries.
        domains = [name.strip() for name in raw_domains.split(',') if name.strip()]
        if not domains:
            raise RuntimeError("No domains found to update.")

        print("Handling domains: {}.".format(domains))
        for domain in domains:
            update_dns(domain, auth, external_ip, ipv6)
    else:
        print('No arguments passed; exiting.')
        print('Try cloudflare-ddns --help.')


if __name__ == '__main__':
    main()