#!/usr/bin/env python3
#
# DISCLAIMER: THIS SCRIPT COMES WITH NO WARRANTY OR GUARANTEE
# OF ANY KIND.
#
# DISCLAIMER 2: THIS TOOL USES A CEPH FEATURE MARKED "(developers only)".
# YOU SHOULD NOT RUN THIS UNLESS YOU KNOW EXACTLY HOW THAT
# FUNCTIONALITY WORKS.
#
# upmap-remapped.py
#
# Usage (print only): ./upmap-remapped.py
# Usage (production): ./upmap-remapped.py | sh
#
# Optionally, ignore PGs which are actively backfilling (acting only on
# those in backfill_wait):
# Usage: ./upmap-remapped.py --ignore-backfilling
#
# This tool uses ceph's pg-upmap-items functionality to quickly
# modify all PGs which are currently remapped to become
# active+clean. I use it in combination with the ceph-mgr upmap
# balancer and the norebalance flag for these use-cases:
#
# - Change crush rules or tunables.
# - Add capacity (new host, rack, ...).
#
# In general, the correct procedure for using this script is:
#
# 1. Backup your osdmaps, crush maps, ...
# 2. Set the norebalance flag.
# 3. Make your change (tunables, add osds, etc...)
# 4. Run this script a few times. (Remember to | sh)
# 5. Cluster should now be 100% active+clean.
# 6. Unset the norebalance flag.
# 7. The ceph-mgr balancer in upmap mode should now gradually
#    remove the upmap-items entries which were created by this
#    tool.
#
# Hacked by: Dan van der Ster

import json
import subprocess
import sys


def get_command_output(command):
    result = subprocess.run(command, capture_output=True,
                            universal_newlines=True, check=True, shell=True)
    return result.stdout


# Prefer the librados python binding; fall back to shelling out to the
# ceph CLI if it is unavailable or the cluster cannot be reached.
try:
    import rados
    cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
    cluster.connect()
except Exception:
    use_shell = True
else:
    use_shell = False


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


try:
    if use_shell:
        OSDS = json.loads(get_command_output('ceph osd ls -f json | jq -r .'))
        DF = json.loads(get_command_output('ceph osd df -f json | jq -r .nodes'))
    else:
        cmd = {"prefix": "osd ls", "format": "json"}
        ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
        OSDS = json.loads(output.decode('utf-8').strip())
        cmd = {"prefix": "osd df", "format": "json"}
        ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
        DF = json.loads(output.decode('utf-8').strip())['nodes']
except ValueError:
    eprint('Error loading OSD IDs')
    sys.exit(1)

ignore_backfilling = False
for arg in sys.argv[1:]:
    if arg == "--ignore-backfilling":
        eprint("All actively backfilling PGs will be ignored.")
        ignore_backfilling = True


def crush_weight(osd_id):
    for o in DF:
        if o['id'] == osd_id:
            return o['crush_weight'] * o['reweight']
    return 0


def gen_upmap(up, acting, replicated=False):
    assert len(up) == len(acting)
    # Create the up -> acting mappings needed to make the PG clean, skipping
    # OSDs which no longer exist or have zero effective crush weight.
    mappings = [(u, a) for u, a in zip(up, acting)
                if u != a and u in OSDS and crush_weight(a) > 0]
    if replicated:
        # Remove indirect mappings on replicated pools,
        # e.g. ceph osd pg-upmap-items 4.5fd 603 383 499 804 804 530 &
        p = list(mappings)
        u = set(x[0] for x in p)
        a = set(x[1] for x in p)
        mappings = list(zip(u - a, a - u))
    else:
        # Order the mappings on erasure-coded pools so that data is moved off
        # an osd before it is moved on to it,
        # e.g. ceph osd pg-upmap-items 15.c9 714 803 929 714
        #
        # First handle the situation where the src and dst of one mapping
        # matches the dst and src of another. Example: (314, 272) & (272, 314)
        # Iterate over a copy, since we remove items from the list itself.
        for (x, y) in list(mappings):
            if (x, y) in mappings and (y, x) in mappings:
                mappings.remove((x, y))
                mappings.remove((y, x))
        # Do multiple passes of a modified bubble sort to order the mappings
        # so that data is moved off an OSD before it is moved on to it. Stop
        # when no mappings are swapped.
        while True:
            swapped = False
            for i in range(len(mappings) - 1):
                for j in range(i + 1, len(mappings)):
                    if mappings[j][0] == mappings[i][1] and mappings[j][1] != mappings[i][0]:
                        mappings[i], mappings[j] = mappings[j], mappings[i]
                        swapped = True
            if not swapped:
                break
    return mappings
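
# Illustration of gen_upmap (the OSD ids here are hypothetical, not from a
# real cluster): for an erasure-coded PG with
#     up     = [10, 11, 12]   # where CRUSH now wants the data
#     acting = [10, 20, 21]   # where the data currently is
# gen_upmap(up, acting) would return [(11, 20), (12, 21)], i.e. map each
# changed up OSD back to its acting OSD, so the PG becomes active+clean
# without moving any data. upmap_pg_items() below turns this into:
#     ceph osd pg-upmap-items <pgid> 11 20 12 21 &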

def upmap_pg_items(pgid, mapping):
    if len(mapping):
        print('ceph osd pg-upmap-items %s ' % pgid, end='')
        for pair in mapping:
            print('%s %s ' % pair, end='')
        print('&')


def rm_upmap_pg_items(pgid):
    print('ceph osd rm-pg-upmap-items %s &' % pgid)


# start here

# discover remapped pgs
try:
    if use_shell:
        remapped_json = get_command_output('ceph pg ls remapped -f json | jq -r .')
    else:
        cmd = {"prefix": "pg ls", "states": ["remapped"], "format": "json"}
        ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
        remapped_json = output.decode('utf-8').strip()
    try:
        remapped = json.loads(remapped_json)['pg_stats']
    except KeyError:
        eprint('There are no remapped PGs')
        sys.exit(0)
except ValueError:
    eprint('Error loading remapped pgs')
    sys.exit(1)

# discover existing upmaps
try:
    if use_shell:
        osd_dump_json = get_command_output('ceph osd dump -f json | jq -r .')
    else:
        cmd = {"prefix": "osd dump", "format": "json"}
        ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
        osd_dump_json = output.decode('utf-8').strip()
    upmaps = json.loads(osd_dump_json)['pg_upmap_items']
except ValueError:
    eprint('Error loading existing upmaps')
    sys.exit(1)

# discover whether each pool is replicated or erasure
pool_type = {}
try:
    if use_shell:
        osd_pool_ls_detail = get_command_output('ceph osd pool ls detail')
    else:
        cmd = {"prefix": "osd pool ls", "detail": "detail", "format": "plain"}
        ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
        osd_pool_ls_detail = output.decode('utf-8').strip()
    # Lines look like: pool 1 'foo' replicated size 3 ...
    for line in osd_pool_ls_detail.split('\n'):
        if 'pool' in line:
            x = line.split(' ')
            pool_type[x[1]] = x[3]
except Exception:
    eprint('Error parsing pool types')
    sys.exit(1)

# discover if each pg is already upmapped
has_upmap = {}
for pg in upmaps:
    pgid = str(pg['pgid'])
    has_upmap[pgid] = True

# handle each remapped pg
print(r'while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done')
num = 0
for pg in remapped:
    if num == 50:
        # Pause after each batch of 50 PGs to let the cluster settle.
        print(r'wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done')
        num = 0
    if ignore_backfilling and 'backfilling' in pg['state']:
        continue
    pgid = pg['pgid']
    try:
        if has_upmap[pgid]:
            # The PG already has upmap entries; removing them is what makes
            # it clean again.
            rm_upmap_pg_items(pgid)
            num += 1
            continue
    except KeyError:
        pass
    up = pg['up']
    acting = pg['acting']
    pool = pgid.split('.')[0]
    if pool_type[pool] == 'replicated':
        try:
            pairs = gen_upmap(up, acting, replicated=True)
        except Exception:
            continue
    elif pool_type[pool] == 'erasure':
        try:
            pairs = gen_upmap(up, acting)
        except Exception:
            continue
    else:
        eprint('Unknown pool type for %s' % pool)
        sys.exit(1)
    upmap_pg_items(pgid, pairs)
    num += 1

print(r'wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done')

if not use_shell:
    cluster.shutdown()
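
# For reference, the generated shell script looks roughly like this
# (the pgids and osd ids are hypothetical):
#
#     while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done
#     ceph osd pg-upmap-items 15.c9 714 803 929 714 &
#     ceph osd rm-pg-upmap-items 4.5fd &
#     wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done
#
# Piping it through `| sh` runs the upmap commands in parallel batches of up
# to 50, waiting for the cluster to settle between batches.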