#!/usr/bin/env python3 """ Ceph balancer. (c) 2020-2024 Jonas Jelten GPLv3 or later """ import argparse import binascii import datetime import itertools import io import json import logging import lzma import math import re import shlex import socket import statistics import struct import subprocess import sys import time import uuid from enum import Enum from collections import defaultdict from functools import lru_cache from itertools import chain, zip_longest from pprint import pformat, pprint from typing import Optional, Callable, Dict, List, Tuple def parse_args(): cli = argparse.ArgumentParser() cli.add_argument("-v", "--verbose", action="count", default=0, help="increase program verbosity") cli.add_argument("-q", "--quiet", action="count", default=0, help="decrease program verbosity") cli.add_argument("--profile", action="store_true", help=("activate the performance profiler for the balancer itself")) sp = cli.add_subparsers(dest='mode') sp.required = True ### parsers used in subcommands statep = argparse.ArgumentParser(add_help=False) statep.add_argument("--state", "-s", help="load cluster state from this jsonfile") osdsizep = argparse.ArgumentParser(add_help=False) osdsizep.add_argument('--osdsize', choices=['device', 'weighted', 'crush'], default="crush", help=("what parameter to take for determining the osd size. default: %(default)s. " "device=device_size, weighted=devsize*weight, crush=crushweight*weight")) # upmap item filtering upmapignorep = argparse.ArgumentParser(add_help=False) upmapignorep.add_argument('--ignore-state-upmaps', action='store_true', help='pretend the cluster had no upmap items at all') # for experiments with generated movements upmapp = argparse.ArgumentParser(add_help=False, parents=[upmapignorep]) upmapp.add_argument('--add-upmaps', help=('after loading cluster state, simulate application of pg movements' 'from an output generated by "balance"')) upmapp.add_argument('--emit-ignored-upmaps', action='store_true', help='return the ignored statemaps as "reverted" when generating new upmaps (e.g. in balancing)') predictionp = argparse.ArgumentParser(add_help=False) predictionp.add_argument('--avail-prediction', choices=['weight', 'limiting'], default='limiting', help=("algorithm to use for available pool size prediction. default: %(default)s\n" "weight: use ceph's original prediction based on crush weights. " "it's unfortunately wrong for multi-take crush rules, " "and it doesn't respect the current pg placements at all.\n" "limiting: determine space by finding the pool's limiting osd and extrapolate possible usage.")) usedestimatep = argparse.ArgumentParser(add_help=False) usedestimatep.add_argument('--osdused', choices=["delta", "shardsum"], default="shardsum", help=('how is the osd usage predicted during simulation? default: %(default)s. ' "delta: adjust the builtin osd usage report by in-move pg deltas - more accurate but doesn't account pending data deletion.\n" "shardsum: estimate the usage by summing up all pg shardsizes - doesn't account PG metadata overhead.")) savemappingp = argparse.ArgumentParser(add_help=False) savemappingp.add_argument("--save-mappings", help="filename to store the resulting up osdid set for all pgs (to see where things are placed)") ### subcommands gathersp = sp.add_parser('gather', help="only gather cluster information, i.e. 
generate a state file") gathersp.add_argument("output_file", help="file to store cluster balancing information to") showsp = sp.add_parser('show', parents=[statep, upmapp, predictionp, osdsizep, usedestimatep, savemappingp], help=("show cluster properties like free pool space or OSD utilizations. " "it shows all info for the 'acting' state by default. " "use '--pgstate up' to look into the future and print how it will look after all movements are done.")) showsp.add_argument('--only-crushclass', help="only display devices of this crushclass") showsp.add_argument('--sort-shardsize', action='store_true', help="sort the pool overview by shardsize") showsp.add_argument('--osds', action='store_true', help="show info about all the osds instead of just the pool overview") showsp.add_argument('--format', choices=['plain', 'json'], default='plain', help="output formatting: plain or json. default: %(default)s") showsp.add_argument('--pgstate', choices=['up', 'acting'], default='acting', help="which PG state to consider: up (planned) or acting (active). default: %(default)s") showsp.add_argument('--per-pool-count', action='store_true', help="in text formatting mode, show how many pgs for each pool are mapped") showsp.add_argument('--normalize-pg-count', action='store_true', help="normalize the pg count by disk size") showsp.add_argument('--sort-pg-count', type=int, help="sort osds by pg count of given pool id") showsp.add_argument('--sort-utilization', action='store_true', help="sort osds by utilization") showsp.add_argument('--use-weighted-utilization', action='store_true', help="calculate osd utilization by weighting device size") showsp.add_argument('--use-shardsize-sum', action='store_true', help="calculate osd utilization by adding all PG shards on it") showsp.add_argument('--show-max-avail', action='store_true', help="show how much space would be available if the pool had infinity many pgs") showsp.add_argument('--osd-fill-min', type=int, default=0, help='minimum fill %% to show an osd, default: %(default)s%%') showsp.add_argument('--save-upmap-progress', help="filename to store cluster stats after each after each upmap change") remappsp = sp.add_parser('showremapped', parents=[statep, osdsizep], help="show current PG remaps and their progress") remappsp.add_argument('--by-osd', action='store_true', help="group the results by osd") remappsp.add_argument('--osds', help="only look at these osds when using --by-osd, comma separated") balancep = sp.add_parser('balance', parents=[statep, upmapp, osdsizep, usedestimatep, savemappingp], help="distribute PGs for better capacity and performance in your cluster") balancep.add_argument('--output', '-o', default="-", help="output filename for resulting movement instructions. default stdout.") balancep.add_argument('--max-pg-moves', '-m', type=int, default=10, help='maximum number of pg movements to find, default: %(default)s') balancep.add_argument('--only-pool', help='comma separated list of pool names to consider for balancing') balancep.add_argument('--only-poolid', help='comma separated list of pool ids to consider for balancing') balancep.add_argument('--only-crushclass', help='comma separated list of crush classes to balance') balancep.add_argument('--source-osds', help=("only consider these osds as movement source, separated by ',' or ' '. 
" 'to balance just one bucket, use "$(ceph osd ls-tree bucketname)"')) balancep.add_argument('--pg-choice', choices=['largest', 'median', 'auto'], default='largest', help=('method to select a PG move candidate on a OSD based on its size. ' 'auto tries to determine the best PG size by looking at ' 'the currently emptiest OSD. ' 'default: %(default)s')) balancep.add_argument('--osdfrom', choices=["fullest", "limiting", "alternate"], default="alternate", help=('how to determine the source osd for a movement? default: %(default)s. ' "fullest=start with the fullest osd (by percent of device size). " "limiting=start with the fullest osd that actually limits the usable pool space. " "alternate=alternate between limiting and fullest devices")) balancep.add_argument('--ignore-ideal-pgcounts', choices=['all', 'source', 'destination', 'none'], default='none', help=("don't consider balancing by placement group count on an source/destination/both OSD. " "if you set this, the ceph's built-in balancer and this one will have a fight.")) balancep.add_argument('--ignore-pgsize-toolarge', action="store_true", help=("don't pre-filter PGs to rule out those that will for sure not gain any space " "- the target OSD would become fuller than the source OSD of a movement is.")) balancep.add_argument('--ignore-target-usage', action="store_true", help=("don't ensure the target device is less used than the source after a move. ")) balancep.add_argument('--ensure-target-limits', action="store_true", help=("make sure the target device does not become the pool's size limit after a move")) balancep.add_argument('--ensure-optimal-moves', action='store_true', help='make sure that only movements which win full shardsizes are done') balancep.add_argument('--ensure-variance-decrease', action='store_true', help='make sure that only movements which decrease the fill rate variance are performed') balancep.add_argument('--max-move-attempts', type=int, default=2, help=("current source osd can't be emptied more, " "try this many more other osds candidates to empty. default: %(default)s")) balancep.add_argument('--save-timings', help="filename to save timing information for each generated move") pooldiffp = sp.add_parser('poolosddiff', parents=[statep, osdsizep]) pooldiffp.add_argument('--pgstate', choices=['up', 'acting'], default="acting", help="what pg set to take, up or acting (default acting).") pooldiffp.add_argument('pool1', help="use this pool for finding involved osds") pooldiffp.add_argument('pool2', help="compare to this pool which osds are involved") sp.add_parser('repairstats', parents=[statep, osdsizep], help="which OSDs repaired their stored data?") testp = sp.add_parser('test', help="test internal stuff") testp.add_argument('--name', '-n', help='doctest name to run') osdmapp = sp.add_parser('osdmap', parents=[], help="compatibility with ceph osd maps") osdmapsp = osdmapp.add_subparsers(dest='osdmapmode') osdmapsp.required = True osdmapexportsp = osdmapsp.add_parser('export', parents=[statep, upmapignorep], help="create osdmap files") osdmapexportsp.add_argument("output_file", help="osdmap filename to save to") args = cli.parse_args() return args def log_setup(setting, default=1): """ Perform setup for the logger. Run before any logging.log thingy is called. if setting is 0: the default is used, which is WARNING. else: setting + default is used. 
""" levels = (logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG, logging.NOTSET) factor = clamp(default + setting, 0, len(levels) - 1) level = levels[factor] logging.basicConfig(level=level, format="[%(asctime)s] %(message)s") logging.captureWarnings(True) def clamp(number, smallest, largest): """ return number but limit it to the inclusive given value range """ return max(smallest, min(number, largest)) class strlazy: """ to be used like this: logging.debug("rolf %s", strlazy(lambda: do_something())) so do_something is only called when the debug message is actually printed do_something could also be an f-string. """ def __init__(self, fun): self.fun = fun def __str__(self): return self.fun() class PGState(Enum): """ what pg state set to use """ UP = 1 # use the future planned pg assignments ACTING = 2 # use the current data-serving pg assignments class OSDSizeMethod(Enum): """ how to determine the OSD size """ CRUSH = 1 # use the crush size DEVICE = 2 # use the device size WEIGHTED = 3 # weighted device size class OSDUsedMethod(Enum): """ how to determine the OSD usage size during simulation. we don't know what the OSD will actually do for movement and cleanup, so we have options to choose from how to estimate the new usage. """ # adjusting the reported osd usage report by adding fractions of currently-in-move pg sizes. # more accurate but doesn't account pending data deletion. DELTA = 1 # estimate the usage by summing up all pg shardsizes, # doesn't account PG metadata. SHARDSUM = 2 class OSDFromChoiceMethod(Enum): """ how to choose a osd to move data from """ FULLEST = 1 # use the fullest osd LIMITING = 2 # use devices limiting the pool available space ALTERNATE = 3 # alternate between limiting and fullest devices class PGChoiceMethod(Enum): """ how to select a pg for movement """ # take the largest pg from the best source osd LARGEST = 1 # take the median pg size from the best source osd MEDIAN = 2 # determine the best pg size automatically by looking at the ideal space needed on the emptiest osd. AUTO = 3 class PoolFreeMethod(Enum): """ how available pool space is predicted """ # use device utilization and osd weight distribution WEIGHT = 0 # determine the limiting osd device and exact placement group availability LIMITING = 1 def jsoncall(cmd, swallow_stderr=False): if not isinstance(cmd, list): raise ValueError("need cmd as list") stderrval = subprocess.DEVNULL if swallow_stderr else None rawdata = subprocess.check_output(cmd, stderr=stderrval) # in ceph reef, inf is encoded in invalid format for python's json. rawdata = rawdata.replace(b':inf', b':Infinity') rawdata = rawdata.decode() return json.loads(rawdata) def pformatsize(size_bytes, commaplaces=1): prefixes = ((1, 'K'), (2, 'M'), (3, 'G'), (4, 'T'), (5, 'P'), (6, 'E'), (7, 'Z')) for exp, name in prefixes: if abs(size_bytes) >= 1024 ** exp and abs(size_bytes) < 1024 ** (exp + 1): new_size = size_bytes / 1024 ** exp fstring = "%%.%df%%s" % commaplaces return fstring % (new_size, name) return "%.1fB" % size_bytes def cephtime_to_datetime(cephtime: str) -> datetime.datetime: """ converts a ceph dump timestamp to python datetime. """ try: # we've seen: 0.000000, seems to be an old pool :) val = float(cephtime) return datetime.datetime.fromtimestamp(val) except ValueError: # we have to insert a : in the timezone part so python is happy... 
cephtime_with_fixed_timezone = cephtime[:-2] + ":" + cephtime[-2:] return datetime.datetime.fromisoformat(cephtime_with_fixed_timezone) def datetime_to_osdmaptime(when: datetime.datetime): ns = when.time().microsecond * 1000 ts = when.timestamp() sec = int(ts // 1) # since osdmap stores real nanoseconds, we can't produce perfect results # to reduce confusion when comparing diffs, mark the broken nanoseconds return struct.pack(" # printf("sizeof(struct sockaddr_in): %zu\n", sizeof(struct sockaddr_in)); # -> # sizeof(struct sockaddr_in): 16 # sizeof(struct sockaddr_in6): 28 elen = 16 if addr_family == socket.AF_INET else 28 # elen: u32 sockaddr length: sizeof sockaddr_in or sockaddr_in6 ret.extend(struct.pack(' 14 bytes total size sockaddr_data = struct.pack('!H4s8x', port, ip_bytes) elif addr_family == socket.AF_INET6: # struct sockaddr_in6 { # u16 family; // -- omitted # u16 sin6_port; // Transport layer port # u32 sin6_flowinfo; // IPv6 flow information # struct in6_addr sin6_addr; // IPv6 address 16 bytes # u32 sin6_scope_id; # }; # omit u16 family as first field sockaddr_data = struct.pack('!HI16sI', port, 0, ip_bytes, 0) else: raise RuntimeError("unhandled family") # only extend by elen - sizeof(family), since we already wrote the family assert len(sockaddr_data) == (elen - 2) ret.extend(sockaddr_data) return ret def encode_pg_t_from_pgid(pgid): """ given: pgid (e.g. 32.af23) returns: ceph binary encoding of pg_t """ # pg_t struct # u8 v=1, pool, seed, 'was preferred' (sic) srcpool, srcseed = pgid.split('.') return struct.pack('>> moves_from_up_acting(up_osds=[], acting_osds=[], is_ec=False) [] >>> moves_from_up_acting(up_osds=[], acting_osds=[], is_ec=True) [] >>> moves_from_up_acting(up_osds=[1, 2, 3], acting_osds=[1, 2, 3], is_ec=True) [] >>> moves_from_up_acting(up_osds=[1, 2, 3], acting_osds=[1, 2, 3], is_ec=False) [] # ec tests: >>> moves_from_up_acting(up_osds=[10, 2, 5, 4], acting_osds=[1, 2, 3, 4], is_ec=True) [((1,), (10,)), ((3,), (5,))] >>> moves_from_up_acting(up_osds=[10, 2, 5, 4], acting_osds=[1, 2, -1, 4], is_ec=True) [((1,), (10,)), ((-1,), (5,))] # non-ec tests: >>> moves_from_up_acting(up_osds=[397, 902, 888, 74], acting_osds=[397, 902, 888], is_ec=False) [((-1,), (74,))] >>> moves_from_up_acting(up_osds=[397, 902, 888, 74], acting_osds=[397, 902], is_ec=False) [((-1, -1), (74, 888))] >>> moves_from_up_acting(up_osds=[1, 2, 3, 4], acting_osds=[1, 3, 2], is_ec=False) [((-1,), (4,))] >>> moves_from_up_acting(up_osds=[1, 2, 3, 4], acting_osds=[5, 3, 2], is_ec=False) [((-1, 5), (1, 4))] >>> moves_from_up_acting(up_osds=[1, 2, 3, 4], acting_osds=[3, 2], is_ec=False) [((-1, -1), (1, 4))] """ moves = list() if is_ec: # for ec, order is important. for up_osd, acting_osd in zip(up_osds, acting_osds): if up_osd != acting_osd: moves.append(((acting_osd,), (up_osd,))) else: ups = set(up_osds) actings = set(acting_osds) # all ups that are not yet acting to_osds = list(ups - actings) # all that are acting but don't stay in up. from_osds = list(actings - ups) missing_osds = len(to_osds) - len(from_osds) from_osds.extend([-1] * missing_osds) if not len(from_osds) == len(to_osds): raise Exception(f"|from| != |to|: |{from_osds}| != |{to_osds}|") if from_osds: moves.append(( tuple(sorted(from_osds)), tuple(sorted(to_osds)), )) return moves def remaps_merge(target_remaps: Dict[int, int], merge_remaps: Optional[Dict[int, int]] = None, in_place: bool = False): """ remove cycles and transitive remaps from a remap dict. modifies 'target_remaps'! 
    if merge_remaps is given, merge those remaps onto the `target_remaps` dict.

    >>> remaps_merge({})
    {}
    >>> remaps_merge({1: 2, 3: 4})
    {1: 2, 3: 4}
    >>> remaps_merge({1: 2, 2: 3})
    {1: 3}
    >>> remaps_merge({1: 2}, {2: 3})
    {1: 3}
    >>> remaps_merge({1: 2, 2: 1})
    {}
    >>> remaps_merge({1: 2, 2: 3, 3: 4})
    {1: 4}
    >>> remaps_merge({1: 2, 2: 3}, {3: 4})
    {1: 4}
    >>> remaps_merge({216: 205, 360: 294}, {294: 216})
    {360: 205}
    >>> remaps_merge({1: 2, 2: 3}, {1: 4})
    {1: 4}
    >>> remaps_merge({1: 2, 2: 3}, {1: 4, 4: 5})
    {1: 5}
    """
    ret = dict()

    if in_place:
        destination = target_remaps
    else:
        destination = target_remaps.copy()

    def merge_chains(remaps):
        for osd_from, osd_to in remaps.copy().items():
            while True:
                # given (a, b) is there (b, c)?
                next_to = remaps.get(osd_to)
                if next_to is not None:
                    if osd_from == next_to:
                        # symmetric: it's (a, b), (b, a)
                        del remaps[osd_to]
                        if merge_remaps is None:
                            # in-place operation
                            del remaps[next_to]
                    else:
                        # transitive: it's (a, b), (b, c) so we set (a, c) and remove (b, c)
                        remaps[osd_from] = next_to
                        del remaps[osd_to]
                    # follow the remap-chain
                    osd_to = next_to
                else:
                    break

    merge_chains(destination)

    if merge_remaps is not None:
        destination.update(merge_remaps)
        merge_chains(destination)

    if False:
        # TODO: remove once confident enough the above is correct :)
        for new_from, new_to in destination.items():
            if new_from == new_to:
                raise RuntimeError(f"somewhere something went wrong: "
                                   f"we map from osd.{new_from} to osd.{new_to} in "
                                   f"{destination}")

            while True:
                next_to = destination.get(new_to)
                if next_to is not None:
                    raise RuntimeError(f"something went wrong: "
                                       f"there's still transitive remaps left for {new_to}: "
                                       f"{destination}")
                else:
                    break

    return destination


def pool_from_pg(pg):
    return int(pg.split(".")[0])


def bucket_fill(id, bucket_info, parent_id=None):
    """
    returns the list of all child buckets for a given id
    plus for each of those, their children.
    """
    bucket = bucket_info[id]

    children = list()
    ids = dict()

    this_bucket = {
        "id": id,
        "name": bucket["name"],
        "type_name": bucket["type_name"],
        "weight": bucket["weight"],
        "parent": parent_id,
        "children": children,
    }
    ids[id] = this_bucket

    for child_item in bucket["items"]:
        child = bucket_info[child_item["id"]]
        cid = child["id"]
        if cid < 0:
            new_nodes, new_ids = bucket_fill(cid, bucket_info, id)
            ids.update(new_ids)
            children.extend(new_nodes)
        else:
            # it's a device
            new_node = {
                "id": cid,
                "name": child["name"],
                "type_name": "osd",
                "class": child["class"],
                "parent": id,
            }
            ids[cid] = new_node
            children.append(new_node)

    return this_bucket, ids


class ClusterState:
    # to detect if imported state files are incompatible
    STATE_VERSION = 1

    def __init__(self, statefile: Optional[str] = None,
                 osdsize_method: OSDSizeMethod = OSDSizeMethod.CRUSH):
        self.state = dict()
        self.load(statefile)
        self.osdsize_method = osdsize_method

    def load(self, statefile: Optional[str]):
        # use cluster state from a file
        if statefile:
            logging.info(f"loading cluster state from file {statefile}...")
            with lzma.open(statefile) as hdl:
                self.state = json.load(hdl)

            import_version = self.state['stateversion']
            if import_version != self.STATE_VERSION:
                raise RuntimeError(f"imported file stores state in version {import_version}, but we need {self.STATE_VERSION}")

        else:
            logging.info(f"gathering cluster state via ceph api...")

            # this is shitty: this whole script depends on these outputs,
            # but they might be inconsistent, if the cluster had changes
            # between calls....
# it would be really nice if we could "start a transaction" self.state = dict( stateversion=self.STATE_VERSION, timestamp=datetime.datetime.now().isoformat(), versions=jsoncall("ceph versions --format=json".split()), health_detail=jsoncall("ceph health detail --format=json".split()), osd_dump=jsoncall("ceph osd dump --format json".split()), # ceph pg dump always echoes "dumped all" on stderr, silence that. pg_dump=jsoncall("ceph pg dump --format json".split(), swallow_stderr=True), osd_df_dump=jsoncall("ceph osd df --format json".split()), osd_df_tree_dump=jsoncall("ceph osd df tree --format json".split()), df_dump=jsoncall("ceph df detail --format json".split()), pool_dump=jsoncall("ceph osd pool ls detail --format json".split()), crush_dump=jsoncall("ceph osd crush dump --format json".split()), crush_class_osds=dict(), ) crush_classes = jsoncall("ceph osd crush class ls --format json".split()) for crush_class in crush_classes: class_osds = jsoncall(f"ceph osd crush class ls-osd {crush_class} --format json".split()) if not class_osds: continue self.state["crush_class_osds"][crush_class] = class_osds # check if the osdmap version changed meanwhile # => we'd have inconsistent state if self.state['osd_dump']['epoch'] != jsoncall("ceph osd dump --format json".split())['epoch']: raise RuntimeError("Cluster topology changed during information gathering (e.g. a pg changed state). " "Wait for things to calm down and try again") def dump(self, output_file): logging.info(f"cluster state dumped. now saving to {output_file}...") with lzma.open(output_file, "wt") as hdl: json.dump(self.state, hdl, indent='\t') logging.warning(f"cluster state saved to {output_file}") def export_osdmap(self, output_file, ignore_state_upmaps=False): logging.info(f"exporting cluster state as osdmap to {output_file}...") out = io.BytesIO() def writeb(data, name=None): logging.debug(strlazy(lambda: f"[{out.tell():>10x}] {binascii.hexlify(data)} {'<- %s' % name if name else ''}")) out.write(data) class StructLength: """ write a u32 length field and remember its position. 
""" def __init__(self, stream, name=None): self.stream = stream self.name = name or "struct" self.len_field_pos = stream.tell() # struct_len: u32 # fill the len hole with dummy data writeb(struct.pack('= 0 return_pos = self.stream.tell() out.seek(self.len_field_pos) writeb(struct.pack(' (u32 map.size()) pools = self.state['osd_dump']['pools'] writeb(struct.pack(' writeb(struct.pack(' removed_snaps writeb(struct.pack(' tiers; writeb(struct.pack(' properties; writeb(struct.pack(' (len(pool['options'])) # u32 options_len writeb(struct.pack('{val_type_id} for value={value}") pool_opts_struct.write_len() # u32 last_force_op_resend_prenautilus writeb(struct.pack(' application_metadata writeb(struct.pack(' writeb(struct.pack(' writeb(struct.pack(' writeb(struct.pack('client_addrs: vector> writeb(struct.pack(' writeb(struct.pack(' writeb(struct.pack(' writeb(struct.pack(' class_name buckets_with_class = dict() # buckets that have no device class (regular buckets) # we map them their shadow buckets (by class_id) # bucket_id -> {class_id -> bucket_id} bucket_shadows = defaultdict(dict) min_bucket_id = 0 for bucket in buckets: bucket_id = bucket["id"] bucket_name = bucket["name"] buckets_by_id[bucket_id] = bucket buckets_by_name[bucket_name] = bucket if bucket_id < min_bucket_id: min_bucket_id = bucket_id for bucket in buckets: bucket_id = bucket["id"] bucket_name = bucket["name"] if "~" in bucket_name: base_bucket_name, class_name = bucket_name.split("~") class_id = crush_class_ids.get(class_name) if class_id is None: # this is a class that has no osds, so we assign it a new id... class_id = len(crush_class_ids) crush_class_ids[class_name] = class_id buckets_with_class[bucket_id] = class_id # store what base bucket this shadow bucket is associated to. base_bucket_id = buckets_by_name[base_bucket_name]["id"] bucket_shadows[base_bucket_id][class_id] = bucket_id # the dump loops for (i=-1, i > -max_buckets-1, i--) # and dumps id=i max_buckets = -min_bucket_id # max_buckets: s32 writeb(struct.pack(' max_rule_id: max_rule_id = rule_id max_rules = max_rule_id + 1 writeb(struct.pack(' wraparound handling # which we happily ignore. crush_class_ids[device_class] = len(crush_class_ids) if device["id"] > max_device_id: max_device_id = device_id max_devices = max_device_id + 1 writeb(struct.pack(' (0xff + 1): raise RuntimeError("max 255 rules supported, as rule_id is encoded as u8") op_to_id = { "noop": (0, None, None), # CRUSH_RULE_NOOP "take": (1, "item", None), # CRUSH_RULE_TAKE arg1=which crush item to start walk "choose_firstn": (2, "num", "type"), # CRUSH_RULE_CHOOSE_FIRSTN arg1=num items to pick arg2=choose type "choose_indep": (3, "num", "type"), # CRUSH_RULE_CHOOSE_INDEP arg1=num items to pick arg2=choose type "emit": (4, None, None), # CRUSH_RULE_EMIT "chooseleaf_firstn": (6, "num", "type"), # CRUSH_RULE_CHOOSELEAF_FIRSTN arg1=num items to pick arg2=choose type "chooseleaf_indep": (7, "num", "type"), # CRUSH_RULE_CHOOSELEAF_INDEP arg1=num items to pick arg2=choose type "set_choose_tries": (8, "num", None), # CRUSH_RULE_SET_CHOOSE_TRIES override choose_total_tries "set_chooseleaf_tries": (9, "num", None), # CRUSH_RULE_SET_CHOOSELEAF_TRIES override chooseleaf_descend_once # ignore 10-13, they're not json-encoded anyway. 
} crush_types_to_id = dict() for crush_type in self.state['crush_dump']['types']: crush_types_to_id[crush_type["name"]] = crush_type["type_id"] for rule_id in range(max_rules): rule = rules_by_id.get(rule_id) # yes: u32 writeb(struct.pack(' crush_types = self.state['crush_dump']['types'] writeb(struct.pack(': crush item id -> item name writeb(struct.pack(' writeb(struct.pack(': nodeid->classid # this not only contains existing devices with their class, # but also shadow buckets (ie. servername~ssd)! # unfortunately, the class ids are not part of any of the json dumps :( # but as long as we create a sound mapping, we're good. writeb(struct.pack(' writeb(struct.pack('> # maps what shadow buckets are associated to a regular buckets # -> regular_bucket_id -> {class_id -> shadow_bucket_id} writeb(struct.pack('> ec_profiles = self.state['osd_dump']['erasure_code_profiles'] writeb(struct.pack('> pg_upmaps = self.state['osd_dump']['pg_upmap'] writeb(struct.pack('>> if ignore_state_upmaps: pg_upmap_items = [] else: pg_upmap_items = self.state['osd_dump']['pg_upmap_items'] writeb(struct.pack(' new_removed_snaps = self.state['osd_dump']['new_removed_snaps'] writeb(struct.pack(' # == interval_set # == map (maps start -> len) writeb(struct.pack(' new_purged_snaps = self.state['osd_dump']['new_purged_snaps'] writeb(struct.pack(' props self.poolnames = dict() # poolname => poolid self.crushrules = dict() # ruleid => props self.crushclass_osds = defaultdict(set) # crushclass => osdidset self.crushclasses_usage = dict() # crushclass => size props self.osd_crushclass = dict() # osdid => crushclass self.ec_profiles = dict() # erasure coding profile names # current crush placement overrides # map pgid -> {from: to, ...} self.upmap_items = dict() # map pg -> osds involved self.pg_osds_up = defaultdict(set) self.pg_osds_acting = defaultdict(set) # pg metadata # pgid -> pg dump pgstats entry self.pgs = dict() # osdid -> various osdinfos self.osds = dict() # osds used by a pool: # pool_id -> {osdid} self.pool_osds_up = defaultdict(set) self.pool_osds_acting = defaultdict(set) # crush root name -> (bucket_tree, child bucket_ids) self.bucket_roots = dict() # cluster id self.fsid = self.state['osd_dump']['fsid'] # what full percentage OSDs no longer accept data self.full_ratio = self.state["osd_dump"]["full_ratio"] for crush_class, class_osds in self.state["crush_class_osds"].items(): if not class_osds: continue self.crushclass_osds[crush_class].update(class_osds) for osdid in class_osds: self.osd_crushclass[osdid] = crush_class class_df_stats = self.state["df_dump"]["stats_by_class"][crush_class] # there's more stats, but raw is probably ok # cf. the ceph df output lists this at the top. # TODO: does this exclude the full-ratio lost space? it appears so? 
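            # rough shape of a stats_by_class entry (illustrative values only,
            # inferred from the keys used below):
            #   {"total_bytes": 100e12, "total_avail_bytes": 40e12,
            #    "total_used_raw_bytes": 60e12, "total_used_raw_ratio": 0.6, ...}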
self.crushclasses_usage[crush_class] = { 'size': class_df_stats["total_bytes"], # sum of all devices in that class 'avail': class_df_stats["total_avail_bytes"], 'used': class_df_stats["total_used_raw_bytes"], 'percent_used': class_df_stats["total_used_raw_ratio"] * 100, 'osd_count': len(class_osds), } # longest poolname's length self.max_poolname_len = 0 for pool in self.state["osd_dump"]["pools"]: id = pool["pool"] name = pool["pool_name"] if len(name) > self.max_poolname_len: self.max_poolname_len = len(name) self.pools[id] = { 'name': name, 'crush_rule': pool["crush_rule"], 'pg_num': pool["pg_num"], # current pgs before merge 'pgp_num': pool["pg_placement_num"], # actual placed pg count 'pg_num_target': pool["pg_num_target"], # target pg num 'size': pool["size"], 'min_size': pool["min_size"], } self.poolnames[name] = id for ec_profile, ec_spec in self.state["osd_dump"]["erasure_code_profiles"].items(): self.ec_profiles[ec_profile] = { "data_chunks": int(ec_spec["k"]), "coding_chunks": int(ec_spec["m"]), } for pool_stat in self.state["pg_dump"]["pg_map"]["pool_stats"]: id = pool_stat["poolid"] self.pools[id].update({ "num_objects": pool_stat["stat_sum"]["num_objects"], "num_object_copies": pool_stat["stat_sum"]["num_object_copies"], "num_objects_degraded": pool_stat["stat_sum"]["num_objects_degraded"], "num_objects_misplaced": pool_stat["stat_sum"]["num_objects_misplaced"], "num_omap_bytes": pool_stat["stat_sum"]["num_omap_bytes"], "num_omap_keys": pool_stat["stat_sum"]["num_omap_keys"], # this is just without omap without redundancy "num_bytes": pool_stat["stat_sum"]["num_bytes"], }) for pooldf in self.state["df_dump"]["pools"]: id = pooldf["id"] pool = self.pools[id] pool.update({ "stored": pooldf["stats"]["stored"], # stored_data + stored_omap without redundancy "objects": pooldf["stats"]["objects"], # number of pool objects # data_bytes_used + omap_bytes_used including redundancy of data + omap "used": pooldf["stats"]["bytes_used"], # available storage amount according to ceph's internal calculation "store_avail": pooldf["stats"]["max_avail"], "percent_used": pooldf["stats"]["percent_used"], "quota_bytes": pooldf["stats"]["quota_bytes"], "quota_objects": pooldf["stats"]["quota_objects"], }) for poolmeta in self.state["pool_dump"]: id = poolmeta["pool_id"] pool = self.pools[id] pool_type = pool_repl_type(poolmeta["type"]) ec_profile = poolmeta["erasure_code_profile"] pg_shard_size_avg = pool["stored"] / pool["pg_num"] if pool_type == "ec": profile = self.ec_profiles[ec_profile] data_chunks = profile["data_chunks"] pg_shard_size_avg /= data_chunks blowup_rate = (data_chunks + profile["coding_chunks"]) / data_chunks elif pool_type == "repl": blowup_rate = poolmeta["size"] else: raise RuntimeError(f"unknown pool_type={pool_type}") pool.update({ "erasure_code_profile": ec_profile if pool_type == "ec" else None, "repl_type": pool_type, "pg_shard_size_avg": pg_shard_size_avg, "blowup_rate": blowup_rate, }) # apparently ceph 17.2.5 reports the same value for stored & used due to some bug? if pool['stored'] == pool['used']: # TODO: when ec supports omap, we may need to blowup omap differently. 
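                # worked example (illustrative numbers): an ec pool with k=4, m=2 gets
                # blowup_rate = (4 + 2) / 4 = 1.5, a size=3 replicated pool gets
                # blowup_rate = 3. when ceph reports stored == used, 'used' apparently
                # lacks this redundancy factor, so it is multiplied back in below.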
pool['used'] *= blowup_rate # create osd base structure for osd in self.state["osd_dump"]["osd_xinfo"]: self.osds[osd["osd"]] = { 'features': osd['features'], 'laggy_probability': osd['laggy_probability'], } for osd in self.state["osd_df_dump"]["nodes"]: osdid = osd["id"] self.osds[osdid].update({ "utilization": osd["utilization"], "status": osd["status"], }) # adjustments to the crush mappings # these are already applied in the pg->osd mapping infos via up/acting. for upmap_item in self.state["osd_dump"]["pg_upmap_items"]: remaps = dict() for remap in upmap_item["mappings"]: osd_from = remap["from"] osd_to = remap["to"] if osd_from in remaps: # apparently ceph applies the first upmap item only # for the same device. # observed by: # pg_upmaps[10.7e6] = [(361, 603), (751, 750), (361, 492)] # up_osds[10.7e6] = [546, 603, 10, 786, 750, 785, 799] continue remaps[osd_from] = osd_to # modify remaps to filter out redundant info remaps = remaps_merge(remaps) self.upmap_items[upmap_item["pgid"]] = remaps # map osd -> pgs on it osd_mappings = defaultdict( lambda: {'up': set(), 'primary': set(), 'acting': set()} ) for pginfo in self.state["pg_dump"]["pg_map"]["pg_stats"]: if pginfo["state"] in ("unknown",): # skip pgs with no active osds continue pgid = pginfo["pgid"] self.pgs[pgid] = pginfo up = replace_missing_osds(pginfo["up"]) pginfo["up"] = up acting = replace_missing_osds(pginfo["acting"]) pginfo["acting"] = acting primary = acting[0] self.pgs[pgid].update({ # osdid -> metadata amount estimated by object count "metadata_estimates": dict(), # average metadata size estimation (avg ov above dict) "metadata_bytes": 0, }) self.pg_osds_up[pgid] = up self.pg_osds_acting[pgid] = acting osd_mappings[primary]['primary'].add(pgid) for osd in up: osd_mappings[osd]['up'].add(pgid) for osd in acting: osd_mappings[osd]['acting'].add(pgid) # gather which pgs are on what osd # and which pools have which osds for osdid, osdpgs in osd_mappings.items(): if osdid == -1: continue osd_pools_up = set() osd_pools_acting = set() pgs_up = set() pgs_acting = set() pg_count_up = defaultdict(int) pg_count_acting = defaultdict(int) osd_objs_up = 0 osd_objs_acting = 0 for pgid in osdpgs['up']: poolid = pool_from_pg(pgid) osd_pools_up.add(poolid) pgs_up.add(pgid) pg_count_up[poolid] += 1 self.pool_osds_up[poolid].add(osdid) osd_objs_up += self.pgs[pgid]['stat_sum']['num_objects'] for pgid in osdpgs['acting']: poolid = pool_from_pg(pgid) osd_pools_acting.add(poolid) pgs_acting.add(pgid) pg_count_acting[poolid] += 1 self.pool_osds_acting[poolid].add(osdid) osd_objs_acting += self.pgs[pgid]['stat_sum']['num_objects'] self.osds[osdid].update({ 'pools_up': list(sorted(osd_pools_up)), 'pools_acting': list(sorted(osd_pools_acting)), 'pg_count_up': pg_count_up, 'pg_count_acting': pg_count_acting, 'pg_num_up': len(pgs_up), 'pgs_up': list(pgs_up), 'pg_num_acting': len(pgs_acting), 'pgs_acting': list(pgs_acting), 'objs_up': osd_objs_up, 'objs_acting': osd_objs_acting, }) for osd in self.state["osd_dump"]["osds"]: osdid = osd["osd"] # is None when crushclass is not known crushclass = self.osd_crushclass.get(osdid) if crushclass is None and osd["weight"] > 0: raise Exception(f"crushclass for osd.{osdid} unknown but weight is > 0") self.osds[osdid].update({ "weight": osd["weight"], "crush_weight": 0, # also in osd_df_dump ["crush_weight"], but we set it when walking the crush trees "cluster_addr": osd["cluster_addr"], "public_addr": osd["public_addr"], "state": tuple(osd["state"]), 'crush_class': crushclass, }) for osd_stat in 
self.state["pg_dump"]["pg_map"]["osd_stats"]: osdid = osd_stat['osd'] statfs = osd_stat['statfs'] self.osds[osdid].update({ "device_size": statfs["total"], "device_used": statfs["allocated"] + statfs["internal_metadata"], "device_used_data": statfs["allocated"], "device_used_meta": statfs["internal_metadata"], "device_used_omap": statfs["omap_allocated"], "device_available": statfs["available"], # TODO osd["statfs"]["data_compressed", "data_compressed_allocated", "data_compressed_original"] }) # estimate pg metadata sizes for osdid, osdpgs in osd_mappings.items(): if osdid == -1: continue osdinfo = self.osds[osdid] meta_amount = osdinfo["device_used_meta"] if meta_amount == 0: continue osd_objs_acting = osdinfo['objs_acting'] # TODO: this is off by the currently in-transfer pgs that occupy space on the current osdid as destination already # we'd have to remove this metadata estimate by subtracting it from device_meta_amount. # meta_amount = meta_amount * (1 - (partially_transferred_objects / total_obj_count_of_in_transfer_pgs) # -> unify this section with osd_transfer_remainings calculation. # TODO: we could unify it a bit accross pgs of a pool: under equal distribution between pgs, # each should have the same amount of metadata? (maybe weighted by pg's object count) for pgid in osdpgs['acting']: # this calculation is done multiple times: once for each acting pg shard! # this is an advantage: for each one we can calculate the metadata_bytes in context of the current shard's osd # and hence get a better estimate of the real metadata size: # calculate a list of all the metadata estimates, and in the end, store the average of them. # calculated by weighting osd metadata sizes by pg object counts pg_objects = self.pgs[pgid]['stat_sum']['num_objects'] if osd_objs_acting > 0: metadata_estimate = int(meta_amount * pg_objects / osd_objs_acting) else: metadata_estimate = 0 estimates = self.pgs[pgid]["metadata_estimates"] estimates[osdid] = metadata_estimate self.pgs[pgid]["metadata_bytes"] = int(sum(estimates.values()) / len(estimates)) #for pgid, p in self.pgs.items(): # print(f"id {pgid}, shrd={pformatsize(self.get_pg_shardsize(pgid))}, meta={pformatsize(p['metadata_bytes'])}, est={p['metadata_estimates']}") # osdid -> used kb change, based on the osd-level utilization report and ongoing movemements # to estimate osd utilization with DELTA method. # this is the expected utilization change once we reach the up pg state. self.osd_transfer_remainings = dict() for osdid, osdpgs in osd_mappings.items(): if osdid == -1: continue osd = self.osds[osdid] # this is the estimated size of remaining pg transfer amount # the already-transferred part is part of device_used. osd_transfer_remaining = 0 # this is a bit similar to the implementation of `showremapped` to figure out # movement progress. # # we want to estimate the up-utilization, i.e. the osd utilization when all moves # would be finished. # to estimate it, we need to add the remaining (partial) shards of (up - acting) = incoming # and remove the shards of (acting - up) = outgoing. # osd['device_used'] already contains the size of partially transferred shard's objects. # now we account for ongoing/planned transfers. 
# pgs that are transferred to the osd, so they will be acting soon(tm) for pg_incoming in (osdpgs['up'] - osdpgs['acting']): shardsize = self.get_pg_shardsize(pg_incoming) pginfo = self.pgs[pg_incoming] pg_objs = pginfo["stat_sum"]["num_objects"] if pg_objs <= 0: shardsize_already_transferred = 0 else: # in ceph 17 the degraded calculation is done in PeeringState::update_calc_stats # if replicated: # degraded += num_objects * (pool_size - len(up_osds)) # else: # ec # for shardid in pool_size: # degraded += objs_missing_on_shard[shardid] # # -> for each shard the missing objects are counted separately: # for each move to be done, num_objects_misplaced+=num_objects, # for each restore to be done, num_objects_degraded+=num_objects # # but we only get the sum of the missing objects for a pg, not per shard. # another problem: we only know the shard size, not the object sizes, # so we can only estimate the remaining object sizes. # -> utilization estimation will work best if there are no more remapped PGs! # we figure out how many shards are misplaced/degraded, # and divide these numbers to get an estimate per shard. # this we can then assign to our current osdid. pg_objs_misplaced = pginfo["stat_sum"]["num_objects_misplaced"] pg_objs_degraded = pginfo["stat_sum"]["num_objects_degraded"] # the pginfo statistics only provide us with pg-overall statistics # so we need to figure out how much of that affects the current osdid # -> see how many moves exist for the whole pg, # and estimate the fraction due to movements with this osdid. # the pg is moved and the source is missing pg_degraded_moves = 0 # the pg is moved and the source exists pg_misplaced_moves = 0 # this osd is the target of a pg move where the source is nonexistant pg_degraded_shards_on_osd = 0 # this osd is the target of a pg move where the source does exist pg_misplaced_shards_on_osd = 0 # get the real current remaps active in the cluster incoming_pg_moves = self.get_remaps(pginfo) # TODO in pginfo there's shards in object_location_counts, but that doesn't seem to contain # sensible content, just the pg object count? # maybe this info is available sometime in the future for osds_from, osds_to in incoming_pg_moves: # -1 in osds_from means -> move source is degraded # (from_osdid, ...), (to_osdid, ...) # osd list lengths are always equal for osd_from, osd_to in zip(osds_from, osds_to): if osd_from == -1: pg_degraded_moves += 1 if osd_to == osdid: pg_degraded_shards_on_osd += 1 elif osd_from != osd_to: pg_misplaced_moves += 1 if osd_to == osdid: pg_misplaced_shards_on_osd += 1 # the following assumes that all shards of this pg are recovered at the same rate, # when there's multiple from/to osds for this pg # we can't do better apparently since we don't get missing object # counts per shard, but just per-pg unfortunately. osd_pg_objs_to_restore = 0 # estimate the degraded object counts for each shard if pg_degraded_moves > 0: pg_shard_objs_degraded = pg_objs_degraded / pg_degraded_moves osd_pg_objs_to_restore += pg_shard_objs_degraded * pg_degraded_shards_on_osd # same for misplaced objects if pg_misplaced_moves > 0: pg_shard_objs_misplaced = pg_objs_misplaced / pg_misplaced_moves osd_pg_objs_to_restore += pg_shard_objs_misplaced * pg_misplaced_shards_on_osd # adjust fs size by average object size times estimated restoration count. # this is also kinda lame but it seems one can't get more info easily. 
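                    # illustrative example (made-up numbers): a pg with 1000 objects,
                    # two remapped shards whose sources still exist, and
                    # num_objects_misplaced = 600 -> ~300 misplaced objects per moving
                    # shard. if this osd is the target of one of those moves,
                    # ~700 objects and thus ~70% of the shardsize are assumed to be
                    # transferred already.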
pg_obj_size = shardsize / pg_objs pg_objs_transferred = pg_objs - osd_pg_objs_to_restore if pg_objs_transferred < 0: raise RuntimeError(f"pg {pg_incoming} to be moved to osd.{osdid} is misplaced " f"with {pg_objs_transferred}<0 objects already transferred") shardsize_already_transferred = int(pg_obj_size * pg_objs_transferred) # already-transferred is not missing - it's included in the device fill level already missing_shardsize = shardsize - shardsize_already_transferred if missing_shardsize < 0: raise Exception("a negative amount of shardsize is not yet transferred?") # add the estimated future shardsize osd_transfer_remaining += missing_shardsize # pgs that will be transferred off the osd for pg_outgoing in (osdpgs['acting'] - osdpgs['up']): osd_transfer_remaining -= self.get_pg_shardsize(pg_outgoing) self.osd_transfer_remainings[osdid] = osd_transfer_remaining if False: osd_fs_used = osd_transfer_remaining + osd['device_used'] logging.debug(strlazy(lambda: ( f"estimated {'osd.%s' % osdid: <8} weight={osd['weight']:.4f} " f"#acting={len(osdpgs['acting']):<3} " f"acting={pformatsize(osd['device_used'], 3)}={0 if osd['device_size'] == 0 else ((100 * osd['device_used']) / osd['device_size']):.03f}% " f"#up={len(osdpgs['up']):<3} " f"up={pformatsize(osd_fs_used, 3)}={0 if osd['device_size'] == 0 else ((100 * osd_fs_used) / osd['device_size']):.03f}% " f"size={pformatsize(osd['device_size'], 3)}"))) # store osd host name for node in self.state["osd_df_tree_dump"]["nodes"]: if node['type'] == "host": for osdid in node['children']: self.osds[osdid]["host_name"] = node['name'] # crush infos for rule in self.state["crush_dump"]["rules"]: id = rule['rule_id'] name = rule['rule_name'] steps = rule['steps'] self.crushrules[id] = { 'name': name, 'steps': steps, } # create the crush trees buckets = self.state["crush_dump"]["buckets"] # bucketid -> bucket dict # bucketid is a negative number bucket_info = dict() # all bucket ids of roots bucket_root_ids = list() # assign devices to bucket ids for device in self.state["crush_dump"]["devices"]: id = device["id"] assert id >= 0 bucket_info[id] = device for bucket in buckets: id = bucket["id"] assert id < 0 bucket_info[id] = bucket # collect all root buckets if bucket["type_name"] == "root": bucket_root_ids.append(id) # get osd crush weights for item in bucket["items"]: item_id = item["id"] # it's an osd if item_id >= 0: # json-crushweight is in 64-gbyte blocks apparently size = (item["weight"] / 64) * 1024 ** 3 self.osds[item_id].update({ "crush_weight": size, }) # populare all bucket roots # TODO: collect all other buckets too since they can be 'take'n too. for root_bucket_id in bucket_root_ids: bucket_tree, bucket_ids = bucket_fill(root_bucket_id, bucket_info) self.bucket_roots[bucket_tree['name']] = (bucket_tree, bucket_ids) del bucket_info @lru_cache(maxsize=None) def candidates_for_root(self, root_name): """ get the all osds where a crush rule could place shards. returns {osdid: osdweight} """ root_data = self.bucket_roots.get(root_name) if not root_data: raise RuntimeError(f"crush root {root_name} not known?") _, root_ids = root_data ret = dict() for nodeid in root_ids.keys(): if (nodeid >= 0 and self.osds[nodeid]['weight'] != 0 and self.osds[nodeid]['crush_weight'] != 0): ret[nodeid] = self.osds[nodeid]['weight'] * self.osds[nodeid]['crush_weight'] return ret @lru_cache(maxsize=None) def candidates_for_pool(self, poolid): """ get all osd candidates for a given pool (due to its crush rule). 
        returns {osdid: osdweight_summed_and_normalized_to_weight_sum}
        """
        pool = self.pools[poolid]
        pool_size = pool['size']
        pool_pg_num = pool['pg_num']
        pool_crushrule = self.crushrules[pool['crush_rule']]

        # cf. PGMap::get_rule_avail
        # {rootname -> relative_root_selection_weight}
        rootweights = rootweights_from_rule(pool_crushrule, pool_size)
        root_weight_sum = sum(rootweights.values())

        osd_weights = defaultdict(lambda: 0.0)
        crush_sum = 0.0

        for root_name, root_weight in rootweights.items():
            root_weight_fraction = root_weight / root_weight_sum

            # for each crush root chosen in the pool's rule, get the candidates
            candidates = self.candidates_for_root(root_name)

            # accumulate osd weights by how often they can be chosen from a crush rule.
            for osdid, osdweight in candidates.items():
                # apply crush rule weight (because the osd would be chosen more often due to the rule)
                osdweight *= root_weight_fraction
                crush_sum += osdweight
                osd_weights[osdid] += osdweight

        for osdid in osd_weights.keys():
            osd_weights[osdid] /= crush_sum

        return osd_weights

    @lru_cache(maxsize=None)
    def trace_crush_root(self, osdid, root_name):
        """
        in the given root, trace back all items from the osd up to the root
        """
        found = False
        root_data = self.bucket_roots.get(root_name)
        if not root_data:
            raise RuntimeError(f"crush root {root_name} not known?")

        _, root_ids = root_data

        try_node_in_root = root_ids.get(osdid)
        if try_node_in_root is None:
            # osd is not part of this root, i.e. wrong device class
            return None

        node_id = try_node_in_root["id"]
        assert node_id == osdid

        # walk from leaf (osd) to the tree root
        bottomup = list()
        while True:
            if node_id is None:
                # we reached the root
                break

            bottomup.append({
                "id": node_id,
                "type_name": root_ids[node_id]["type_name"],
            })

            if root_ids[node_id]["name"] == root_name:
                found = True
                break

            node_id = root_ids[node_id]["parent"]

        if not found:
            raise RuntimeError(f"could not find a crush-path from osd={osdid} to {root_name!r}")

        topdown = list(reversed(bottomup))
        return topdown

    def get_crushclass_osds(self, crushclass, skip_zeroweight: bool = False):
        """
        get all osdids belonging to given crush class.
        if weight/crushweight is 0, it can be skipped.
        """
        for osdid in self.crushclass_osds[crushclass]:
            if (skip_zeroweight
                and (self.osds[osdid]['weight'] == 0
                     or self.osds[osdid]['crush_weight'] == 0)):
                continue
            yield osdid

    def _get_osd_weighted_size(self, osdid):
        """
        return the weighted OSD device size
        """
        osd = self.osds[osdid]
        size = osd['device_size']
        weight = osd['weight']
        return size * weight

    def _get_osd_crush_weighted_size(self, osdid):
        """
        return the weighted OSD crush size
        """
        osd = self.osds[osdid]
        size = osd['crush_weight']
        weight = osd['weight']
        return size * weight

    @lru_cache(maxsize=None)
    def get_osd_size(self, osdid: int, adjust_full_ratio: bool):
        """
        return the osd size in bytes, depending on the size determination variant.
        can take into account the size loss due to "full_ratio"
        """
        if self.osdsize_method == OSDSizeMethod.DEVICE:
            osd_size = self.osds[osdid]['device_size']
        elif self.osdsize_method == OSDSizeMethod.WEIGHTED:
            osd_size = self._get_osd_weighted_size(osdid)
        elif self.osdsize_method == OSDSizeMethod.CRUSH:
            osd_size = self._get_osd_crush_weighted_size(osdid)
        else:
            raise RuntimeError(f"unknown osd weight method {self.osdsize_method!r}")

        if adjust_full_ratio:
            osd_size *= self.full_ratio

        return osd_size

    def pool_pg_shard_count_ideal(self, poolid, candidate_osds):
        """
        return the ideal pg count for a poolid,
        given the candidate osd ids, expressed pgs/byte
        """
        pool = self.pools[poolid]
        pool_total_pg_count = pool['size'] * pool['pg_num']

        size_sum = 0
        for osdid in candidate_osds:
            size_sum += self.get_osd_size(osdid, adjust_full_ratio=True)

        # uuh somehow no osd had a size or all weights 0?
        assert size_sum > 0

        pgs_per_size = pool_total_pg_count / size_sum
        return pgs_per_size

    def osd_pool_pg_shard_count_ideal(self, poolid, osdid, candidate_osds):
        """
        return the ideal pg count for a pool id for some osdid.
        """
        osd_size = self.get_osd_size(osdid, adjust_full_ratio=True)
        if osd_size == 0:
            return 0

        return self.pool_pg_shard_count_ideal(poolid, candidate_osds) * osd_size


def root_uses_from_rule(rule, pool_size):
    """
    rule: the crush rule (the rule's dict from the crush dump, not just its id)
    pool_size: number of osds in one pg

    return {root_name: choice_count}, [root_for_first_emitted_osd, next_root, ...]
    for the given crush rule.
    """
    # rootname -> number of chooses for this root
    root_usages = defaultdict(int)
    root_order = list()

    chosen = 0
    root_name = None
    root_choice_count = None

    for step in rule["steps"]:
        if step["op"] == "take":
            # new root take
            root_name = step["item_name"]
            root_choice_count = 1
            num = 1
        elif step["op"].startswith("choose"):
            num = step["num"]
        elif step["op"] == "emit":
            if root_choice_count is not None:
                # limit to pool size
                root_choice_count = min(pool_size - chosen, root_choice_count)
                root_usages[root_name] += root_choice_count
                root_order.extend([root_name] * root_choice_count)
                chosen += root_choice_count
                if chosen == pool_size:
                    break
            num = 1
        else:
            continue

        if num <= 0:
            num += pool_size

        root_choice_count *= num

    if not root_usages or chosen == 0:
        raise RuntimeError(f"rule chooses no roots")

    return root_usages, root_order


def rootweights_from_rule(rule, pool_size):
    """
    given a crush rule and a pool size (involved osds in a pg),
    calculate the weights crush-roots are chosen for each pg.

    returns {root_name -> relative_choice_weight_to_all_root_choices}
    """
    # root_name -> choice_count
    root_usages, _ = root_uses_from_rule(rule, pool_size)

    # normalize the weights:
    weight_sum = sum(root_usages.values())

    root_weights = dict()
    for root_name, root_usage in root_usages.items():
        root_weights[root_name] = root_usage / weight_sum

    return root_weights


def get_max_reuses(rule, pool_size):
    """
    generate a list of item reuses per rule step.
    one list entry per root take.

    take_index -> take_step -> max_reuses -> e.g.
[[4, 2, 1, 1], [2, 1, 1]] for pool_size = 6, [[take root, choose2 rack, choose2 server, choose1 osd], [take otherroot, choose2 server, choose1 osd]] """ reuses = [] reuses_for_take: Optional[list] = None fanout_cum = 1 # calculate how often one bucket layer can be reused # this is the crush-constraint, set up by the rule for idx, step in enumerate(rule["steps"]): if step["op"] == "take": if reuses_for_take is not None: reuses.append(list(reversed(reuses_for_take))) reuses_for_take = [] fanout_cum = 1 num = 1 elif step["op"].startswith("choose"): num = step["num"] elif step["op"] == "emit": num = 1 else: continue reuses_for_take.append(fanout_cum) if num <= 0: num += pool_size fanout_cum *= num reuses.append(list(reversed(reuses_for_take))) return reuses class PGMoveChecker: """ for the given rule and utilized pg_osds, create a checker that can verify osd replacements are valid. """ def __init__(self, pg_mappings, move_pgid): self.cluster = pg_mappings.cluster # which pg to relocate self.pg = move_pgid self.pg_mappings = pg_mappings # current pg->[osd] mapping state self.pg_osds = pg_mappings.get_mapping(move_pgid) # acting osds managing this pg self.pool = self.cluster.pools[pool_from_pg(move_pgid)] self.pool_size = self.pool["size"] self.rule = self.cluster.crushrules[self.pool['crush_rule']] logging.debug(strlazy(lambda: (f"movecheck for pg {move_pgid} on {self.pg_osds} (poolsize={self.pool_size})"))) # crush root names and usages for this pg root_uses, root_order = root_uses_from_rule(self.rule, self.pool_size) if len(root_order) != len(self.pg_osds): raise RuntimeError(f"not as many roots as shards! root_order={root_order} osds={self.pg_osds}") # map osd -> crush root name self.osd_roots = dict() for osdid, root in zip(self.pg_osds, root_order): self.osd_roots[osdid] = root # root_name -> osdid candidates -> weight self.osd_candidates = dict() for root_name in root_uses.keys(): self.osd_candidates[root_name] = self.cluster.candidates_for_root(root_name) def get_osd_candidates(self, osd_from): """ return all possible candidate OSDs for the PG to relocate. returns [osdid, ...] """ root_name = self.osd_roots[osd_from] return self.osd_candidates[root_name] @staticmethod def use_item_type(item_uses, trace, item_type, rule_step): """ given a trace (or a part), walk forward, until a given item type is found. increase its use. """ for idx, item in enumerate(trace): if item["type_name"] == item_type: item_id = item["id"] cur_item_uses = item_uses[rule_step].get(item_id, 0) cur_item_uses += 1 item_uses[rule_step][item_id] = cur_item_uses return idx return None def prepare_crush_check(self): """ perform precalculations for moving this pg """ logging.debug(strlazy(lambda: f"prepare crush check for pg {self.pg} currently up={self.pg_osds}")) logging.debug(strlazy(lambda: f"rule:\n{pformat(self.rule)}")) # example: 4+2 ec -> size=6 # 4 ssds and 2 hdds # # root __________-9____________________________ _-13_ # room: -14 -15 # rack: _____-7_______ _________-8_____ ___-10____ -16 -17 # host: -1 -2 -3 -4 -5 -6 -11 -12 -18 -19 # osd: 1 2 | 3 4 | 5 6 | 7 8 | 9 10 | 11 12 | 13 14 | 15 16 17 18 | 19 20 # ^ ^ ^ ^ ^ ^ # # crush rule with two separate root choices: # 0 take root -9 # 1 choose 2 rack # 2 chooseleaf 2 hosts class ssd # 3 emit # # 4 take root -13 # 5 chooseleaf -2 rack class hdd # 6 emit # # inverse reuse aggregation, starting with 1, for each take. 
# reuses_per_step = [ # [4, 2, 2, 1], # first take # [2, 1, 1], # second take # ] # # current osds=[2, 4, 7, 9, 17 20] # # traces from root down to osd: # 2: [-9, -7, -1, 2] # 4: [-9, -7, -2, 4] # 7: [-9, -8, -4, 7] # 9: [-9, -8, -5, 9] # 17: [-13, -14, -16, -18, 17] # 20: [-13, -15, -17, -10, 20] # # mapping from osdindex to take-index (which take we're in) # osd_take_index=[0, 0, 0, 0, 1, 1] # mapping from rule step to take-index # rule_take_index=[0, 0, 0, 0, 1, 1, 1] # # reuses_per_step: take-index -> rule_step -> use_counts # [{0: {-9: 4}, # for rule_step 0 # 1: {-7: 2, -8: 2}, # for rule_step 1 # 2: {-1: 1, -2: 1, -4: 1, -5: 1}} # for rule_step 2 # {0: {-13: 2}, # for rule_step 4 # 1: {-16: 1, -17: 1}}] # for rule_step 5 # # from these use_count, subtract the trace of the replaced osd # # to eliminate candidates: # * replace old_osd with new_osd # * find take_index by looking up old_osd in osd_take_index # * select reuses from reuses_per_step by take_index # * check if new_osd is candidate of the same root as old_osd # * get trace to root after replacing with new_osd # * check use new counts against reuses_per_step # # replacement: # let's replace osd 2 by: # 1 -> get osd trace = [-9 -7, -1, 1] # max_use = reuses_per_step[takeindex=1] = [4, 2, 2, 1] # -9 used 3<4, -7 used 1<2, -1 used 0<1 -> ok # 2 -> ...skip, we want to replace it # 3 -> -9 used 3<4, -7 used 1<2, -2 used 1<1 -> fail # 4 -> keep, not replaced # 5 -> -9 used 3<4, -7 used 1<2, -3 used 0<1 -> ok # 6 -> -9 used 3<4, -7 used 1<2, -3 used 0<1 -> ok # 7 -> keep, not replaced # 8 -> -9 used 3<4, -8 used 2<2, -4 used 1<1 -> fail # [...] # 17 -> not in candidate list of root of 2! # # replacement candidates for 2: 1, 5, 6, 13 14 15 16 # # let's replace osd 17 by: # 1 -> not in candidate list of root of 17! # [...] # 18 -> ok # 19 -> -19 reused too often -> fail # take_index -> take_step -> allowed number of bucket reuse # e.g. [[6, 2, 1, 1], [3, 1, 1]] for [[root, rack, host, osds], [root, rack, osd]] max_reuses = get_max_reuses(self.rule, self.pool_size) logging.debug(strlazy(lambda: f"allowed reuses per rule step, starting at root: {pformat(max_reuses)}")) # did we encounter an emit? emit = False # collect trace for each osd. # osd -> crush-root-trace constraining_traces = dict() # how many osds were missing (== -1)? # doesn't make sense though since the up set should never contain -1, # but hey apparently it happens. pg_missing_up_osds = 0 # how far down the crush hierarchy have we stepped # 0 = observe the whole tree # each value cuts one layer of the current step's trace tree_depth = 0 # at what rule position is our processing rule_step = 0 # for each rule step, count how often items were used # the rule_steps continue counting after several root takes # rule_step -> {itemid -> use count} item_uses = defaultdict(dict) # a rule may have different crush roots # this offset slides through the rules' devices # for each root. devices_chosen = 0 # rule_step -> tree_depth to next rule (what tree layer is this rule step) # because "choose" steps may skip layers in the crush hierarchy # this is then used for the candidate checks: # when checking item uses of a new trace, # determines which of the trace elements to look at. rule_tree_depth = list() # only these traces are active due to the current "take" take_traces = dict() # at which "take" in the crushrule are we? take_index = -1 # what rule step was the last take? 
take_rule_step = -1 # at which rule step does a take start # take_index -> rule_step take_rule_steps = dict() # take_index -> rule_steps_taken take_rule_step_count = dict() # record which take a device selection came from # osdid -> take_index device_take_index = dict() # basically analyze how often an item was used # due to the crush rule choosing it # we go through each (meaningful) step in the rule, # see what it selects, and increase the item use counter. # this should match the reuses_per_step we determined before. # # since each step may "skip down" crush tree levels, we have to remember # what step used what crush level (so we can later check changes against the same level of a trace) for step in self.rule["steps"]: if step["op"].startswith("set_"): # skip rule configuration steps continue logging.debug(strlazy(lambda: f"processing crush step {step} with tree_depth={tree_depth}, " f"rule_step={rule_step}, item_uses={item_uses}")) if step["op"] == "take": # "take" just defines what crush-subtree we wanna use for choosing devices next. current_root_name = step["item_name"] tree_depth = 0 take_index += 1 take_rule_step = rule_step take_rule_steps[take_index] = rule_step rule_tree_depth.append(tree_depth) # how many devices will be selected in this root? # maximum is gonna be the pool size anyway root_device_count = min(self.pool_size - devices_chosen, max_reuses[take_index][0]) root_osds = self.pg_osds[devices_chosen:(devices_chosen + root_device_count)] # generate tracebacks for all osds that ends up in this root. # we collect root-traces of all acting osds of the pg we wanna check for. for pg_osd in root_osds: if pg_osd == -1: pg_missing_up_osds += 1 logging.debug(" trace for -1: ignoring missing") continue trace = self.cluster.trace_crush_root(pg_osd, current_root_name) logging.debug(strlazy(lambda: f" trace for {pg_osd:4d}: {trace}")) if trace is None or len(trace) < 2: raise RuntimeError(f"no trace found for {pg_osd} in {current_root_name}") constraining_traces[pg_osd] = trace # the root was "used" root_id = trace[0]["id"] root_use = item_uses[rule_step].get(root_id, 0) + 1 item_uses[rule_step][root_id] = root_use device_take_index[pg_osd] = take_index if not constraining_traces: raise RuntimeError(f"no device traces captured for step {step}") # only consider traces for the current choose step # and only consider existing devices (whyever it's possible to have -1 in the up set) take_traces = {osdid: constraining_traces[osdid] for osdid in root_osds if osdid != -1} # we just processed the root depth rule_step += 1 elif step["op"].startswith("choose"): # find the new tree_depth by looking how far we need to step for the choosen next bucket choose_type = step["type"] steps_taken = -1 for constraining_trace in take_traces.values(): # cut each trace at the current tree depth # in that part of the trace, walk forward until we can "choose" the the item we're looking for. # how many steps we took is then returned. used_steps = self.use_item_type(item_uses, constraining_trace[tree_depth:], choose_type, rule_step) if used_steps is None: raise RuntimeError(f"could not find item type {choose_type} " f"requested by rule step {step}") if steps_taken != -1 and used_steps != steps_taken: raise RuntimeError(f"for trace {constraining_trace} we needed {used_steps} steps " f"to reach {choose_type}, " f"but for the previous trace we took {steps_taken} steps!") steps_taken = used_steps # how many layers we went down the tree, i.e. 
how many entries in each trace did we proceed tree_depth += steps_taken # at what crush hierarchy layer are we now? rule_tree_depth.append(tree_depth) rule_step += 1 elif step["op"] == "emit": # we may not have reached osd tree level, so we step down further # if we have reached it already, we'll take 0 steps. steps_taken = -1 for constraining_trace in take_traces.values(): used_steps = self.use_item_type(item_uses, constraining_trace[tree_depth:], "osd", rule_step) if used_steps is None: raise RuntimeError(f"could not find trace steps down to osd" f"requested by rule step {step}") if steps_taken != -1 and used_steps != steps_taken: raise RuntimeError(f"for trace {constraining_trace} we needed {used_steps} steps " f"to reach {choose_type}, " f"but for the previous trace we took {steps_taken} steps!") steps_taken = used_steps rule_tree_depth.append(tree_depth + steps_taken) take_rule_step_count[take_index] = rule_step - take_rule_steps[take_index] + 1 rule_step += 1 emit = True # we now emitted this many devices - so we shift what device # of the pg we're looking at now. devices_chosen += root_device_count # did we enter remember the tree depth for each rule step? assert len(rule_tree_depth) == rule_step # did we remember how often items were reused in a rule step? assert len(item_uses) == rule_step, f"len(item_uses{item_uses}) != (rule_step={rule_step})" # this take must have as many steps as the max-reuse list has items assert take_rule_step_count[take_index] == len(max_reuses[take_index]),\ f"{take_rule_step_count[take_index]} != {len(max_reuses[take_index])}" # check, for the current take, if collected item uses respect the max_reuses_per_step for idx, step_reuses in enumerate(max_reuses[take_index]): for item, uses in item_uses[take_rule_step + idx].items(): # uses may be <= since crush rules can emit more osds than the pool size needs if uses > step_reuses: print(f"rule:\n{pformat(self.rule)}") print(f"at take: {take_index}") print(f"reuses: {max_reuses}") print(f"item_uses: {pformat(item_uses)}") print(f"constraining_traces: {pformat(constraining_traces)}") raise RuntimeError(f"during emit, rule take {take_index} at step " f"{take_rule_step + idx} item {item} was used {uses} > {step_reuses} expected") else: raise RuntimeError(f"unknown crush operation encountered: {step['op']}") if not emit: raise RuntimeError("no emit in crush rule processed") # we should have processed exactly pool-size many devices assert devices_chosen == self.pool_size, f"devices_chosen={devices_chosen} != poolsize={self.pool_size}" assert devices_chosen == len(constraining_traces) + pg_missing_up_osds, f"not enough traces collected for {self.pg} on {self.pg_osds}" # for each rule step there should be a tree depth entry assert len(rule_tree_depth) == rule_step # for each rule step, we should have registered item usages assert len(item_uses) == rule_step # we should have processed all takes, hence all per-take reuse constraints assert (take_index + 1) == len(max_reuses), f"{take_index + 1} != {len(max_reuses)}" self.constraining_traces = constraining_traces # osdid->crush-root-trace self.rule_tree_depth = rule_tree_depth # rule_step->tree_depth self.max_reuses = max_reuses # take_index->rule_step->allowed_item_reuses self.item_uses = item_uses # rule_step->{item->use_count} self.device_take_index = device_take_index # osdid->take_index self.take_rule_steps = take_rule_steps # take_index -> rule_step of take beginning self.take_rule_step_count = take_rule_step_count # take_index -> rule_step_count 
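# illustrative sketch, hand-derived from the 4+2 example rule above (type names
# root/room/rack/host assumed from the drawing, not verified on a live cluster):
# the stored per-take structures would roughly look like
#   take_rule_steps      = {0: 0, 1: 4}            # first take starts at rule_step 0, second at 4
#   take_rule_step_count = {0: 4, 1: 3}            # take/choose/chooseleaf/emit vs. take/chooseleaf/emit
#   rule_tree_depth      = [0, 1, 2, 3, 0, 2, 4]   # the chooseleaf-rack in the second take skips the "room" layer
# is_move_valid() later slices rule_tree_depth, item_uses and max_reuses with
# take_rule_steps/take_rule_step_count so only the take of the replaced osd is checked.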
logging.debug(strlazy(lambda: f"crush check preparation done: rule_tree_depth={rule_tree_depth} item_uses={item_uses}")) def is_move_valid(self, old_osd, new_osd): """ verify that the given new osd does not violate the crush rules' constraints of placement. """ if new_osd in self.pg_osds: logging.debug(strlazy(lambda: f" crush invalid: osd.{new_osd} in {self.pg_osds}")) return False # the trace we no longer consider (since we replace the osd) old_trace = self.constraining_traces[old_osd] # what root name is the old osd in? # the new one needs to be under the same root. root_name = self.osd_roots[old_osd] if new_osd not in self.osd_candidates[root_name]: raise RuntimeError(f"target osd.{new_osd} not in same crush root as source") # which "take" was the old osd in - we need to respect the same "take" take_index = self.device_take_index[old_osd] # create trace for the replacement candidate new_trace = self.cluster.trace_crush_root(new_osd, root_name) if new_trace is None: # new_osd should never be tried as candidate if it's not part of root_name raise RuntimeError(f"no trace found for {new_osd} up to {root_name}") logging.debug(strlazy(lambda: f" trace for old osd.{old_osd}: {old_trace}")) logging.debug(strlazy(lambda: f" trace for new osd.{new_osd}: {new_trace}")) overuse = False # perform the crush steps again, checking for item reusages on each layer. step_start = self.take_rule_steps[take_index] step_count = self.take_rule_step_count[take_index] for idx, tree_stepwidth in enumerate(self.rule_tree_depth[step_start:step_start + step_count]): use_max_allowed = self.max_reuses[take_index][idx] # as we would remove the old osd trace, # the item would no longer be occupied in the new trace old_item = old_trace[tree_stepwidth]["id"] # this trace now adds to the item uses: new_item = new_trace[tree_stepwidth]["id"] # how often is new_item used now? # if not used at all, it's not in the dict. uses = self.item_uses[step_start + idx].get(new_item, 0) # we wanna use the new item now. # the use count doesn't increase when we already use the item on the current level if old_item != new_item: uses += 1 # if we used it, it'd be violating crush if uses > use_max_allowed: logging.debug(strlazy(lambda: (f" item reuse check fail: osd.{old_osd}@[{tree_stepwidth}]={old_item} -> " f"osd.{new_osd}@[{tree_stepwidth}]={new_item} x uses={uses} > max_allowed={use_max_allowed}"))) overuse = True break else: logging.debug(strlazy(lambda: (f" item reuse check ok: osd.{old_osd}@[{tree_stepwidth}]={old_item} -> " f"osd.{new_osd}@[{tree_stepwidth}]={new_item} x uses={uses} <= max_allowed={use_max_allowed}"))) return not overuse def get_placement_variance(self, osd_from=None, osd_to=None): """ calculate the variance of weighted OSD usage for all OSDs that are candidates for this PG osd_from -> osd_to: how would the variance look, if we had moved data. if only osd_from given: restrict the osd candidates to that source osd's crush tree. 
""" simulate_move = False if osd_from and osd_to: simulate_move = True pg_shardsize = self.cluster.get_pg_shardsize(self.pg) if osd_from is not None: candidates = self.get_osd_candidates(osd_from) else: candidates = itertools.chain(*self.osd_candidates.values()) osds_used = list() for osd in candidates: delta = 0 if simulate_move: if osd == osd_from: delta = -pg_shardsize elif osd == osd_to: delta = pg_shardsize if self.cluster.osds[osd]['weight'] == 0 or self.cluster.osds[osd]['crush_weight'] == 0: # relative usage of weight 0 is impossible continue osd_used = self.pg_mappings.get_osd_usage(osd, add_size=delta) osds_used.append(osd_used) var = stat_variance(osds_used) return var class PGMappings: """ PG mapping simulator used to calculate device usage when moving around pgs. """ def __init__(self, cluster: ClusterState, only_crushclasses: Optional[set] = None, only_poolids: Optional[set] = None, osdused_method: OSDUsedMethod = OSDUsedMethod.SHARDSUM, osd_from_choice_method: OSDFromChoiceMethod = OSDFromChoiceMethod.ALTERNATE, pg_choice_method: PGChoiceMethod = PGChoiceMethod.LARGEST, ignore_cluster_upmaps: bool = False, emit_ignored_upmaps: bool = False, add_upmaps: Optional[str] = None, analyzer: Optional["MappingAnalyzer"] = None, init_analyzer: Optional["MappingAnalyzer"] = None, need_simulation: bool = True): # the "real" devices, just used for their "fixed" properties like # device size self.cluster = cluster # how to simulate the osd usage self.osdused_method = osdused_method # how to select a osd where we then can move a pg from self.osd_from_choice_method = osd_from_choice_method # how to select pgs to move around self.pg_choice_method = pg_choice_method # consider only these crushclasses self.only_crushclasses = only_crushclasses or cluster.crushclass_osds.keys() # which pools are candidates for mapping changes self.pool_candidates = set() # up state: osdid -> {pg, ...} self.osd_pgs_up = defaultdict(set) # acting state: osdid -> {pg, ...} self.osd_pgs_acting = defaultdict(set) # choose the up mapping, since we wanna optimize the "future" cluster # up pg mapping: pgid -> [up_osd, ...] self.pg_mappings = dict() # osdid -> poolid -> count of shards # is updated for each move! self.osd_pool_up_shard_count = defaultdict(lambda: defaultdict(lambda: 0)) # osdid -> poolid -> count of shards # not updated for each moves - is the current acting mapping! self.osd_pool_acting_shard_count = defaultdict(lambda: defaultdict(lambda: 0)) # osdid -> sum size of all shards on the osd # used for usage estimation method SHARDSUM self.osd_shardsize_sum_up = defaultdict(lambda: 0) self.osd_shardsize_sum_acting = defaultdict(lambda: 0) # {osd we moved data to/from: amount moved} # used for both SHARDSUM and DELTA osd usage estimation methods self.osd_size_change = defaultdict(lambda: 0) # collect mapped sizes for SHARDSUM estimation for pgid, pginfo in self.cluster.pgs.items(): up_osds = pginfo["up"] acting_osds = pginfo["acting"] self.pg_mappings[pgid] = list(up_osds) poolid = pool_from_pg(pgid) for osdid in up_osds: self.osd_pgs_up[osdid].add(pgid) self.osd_pool_up_shard_count[osdid][poolid] += 1 self.osd_shardsize_sum_up[osdid] += self.cluster.get_pg_shardsize(pgid) for osdid in acting_osds: self.osd_pgs_acting[osdid].add(pgid) self.osd_pool_acting_shard_count[osdid][poolid] += 1 self.osd_shardsize_sum_acting[osdid] += self.cluster.get_pg_shardsize(pgid) # don't prepare as much if we don't want to simulate movements. 
self.enable_simulation = need_simulation if add_upmaps or ignore_cluster_upmaps or analyzer is not None: self.enable_simulation = True # un-do all present upmap items in the cluster self.ignore_cluster_upmaps = ignore_cluster_upmaps # create new upmap items to un-do existing upmap items # -> creates movements so a cluster would become upmap-free self.emit_ignored_upmaps = emit_ignored_upmaps # movements we did by apply_remap # pg->{osd_from: osd_to, ...} self._remaps = defaultdict(dict) # combined version of self._remaps and cluster.upmap_items # caches the merged remaps self._upmap_items = defaultdict(dict) ## things only needed for simulation if not self.enable_simulation: return # collect crushclasses from all enabled pools self._enabled_crushclasses = cluster.crushclass_osds.keys() self.pool_candidates = cluster.pools.keys() # if we have pool id restrictions if only_poolids: # only keep crushclasses used in the given pools self._enabled_crushclasses = set() for poolid in only_poolids: pool = cluster.pools[poolid] pool_size = pool['size'] pool_crushrule = cluster.crushrules[pool['crush_rule']] for root_name in root_uses_from_rule(pool_crushrule, pool_size)[0].keys(): for osdid in cluster.candidates_for_root(root_name).keys(): self._enabled_crushclasses.add(cluster.osd_crushclass[osdid]) self.pool_candidates = only_poolids # if we have a crushclass restriction, figure out which pools can be affected. # this further reduces the pool candidates if only_crushclasses: pool_crush_candidates = set() # figure out which pools are affected by these crushclasses for poolid, poolprops in cluster.pools.items(): rootweights = rootweights_from_rule(cluster.crushrules[poolprops['crush_rule']], poolprops["size"]) # crush root names used in that pool for crushroot in rootweights.keys(): # cut deviceclass from crush root name -> default~hdd -> get class hdd classsplit = crushroot.split('~') if len(classsplit) == 2: rootname, deviceclass = classsplit if deviceclass in only_crushclasses: pool_crush_candidates.add(poolid) else: # no explicit device class given in crush rule # -> check if any of this root's candidate's osd crushclass matches the enabled ones for osdid in cluster.candidates_for_root(crushroot): if cluster.osd_crushclass[osdid] in only_crushclasses: pool_crush_candidates.add(poolid) break # filter the pool candidates by the crush constraint. self.pool_candidates &= pool_crush_candidates self._enabled_crushclasses &= only_crushclasses # collect all possible osd candidates for discovered crushclasses remove weight=0 # which we can consider for moving from/to self.osd_candidates = set() # crushclass -> {osdid, ...} self.osd_candidates_class = dict() for crushclass in self._enabled_crushclasses: crushclass_osds = set(self.cluster.get_crushclass_osds(crushclass, skip_zeroweight=True)) self.osd_candidates.update(crushclass_osds) self.osd_candidates_class[crushclass] = crushclass_osds # pg movement statistics tracking # # analysis order: # simulate cluster state # ->init_analysis if emit_ignored_upmaps # revert cluster upmaps # ->init_analysis if not emit_ignored_upmaps # ->analysis # file upmaps (and in apply_remap, record moves) # # analysis for the base state self.init_analyzer = init_analyzer # analysis for the current state self.analyzer = analyzer # which movements did we un-do: pgid->{from: to}, ... # (i.e. 
we applied the opposite) self.reverted_upmaps = defaultdict(dict) if emit_ignored_upmaps and not ignore_cluster_upmaps: raise RuntimeError('when emitting ignored upmaps, upmap ignoring must be enabled') if ignore_cluster_upmaps: # base state without upmaps -> analyze if emit_ignored_upmaps: if self.init_analyzer is not None: self.init_analyzer.analyze(self) # revert existing upmaps in the mappings and size estimation self.revert_upmaps(emit_ignored_upmaps, skip_failures=True) if not emit_ignored_upmaps and self.init_analyzer is not None: # ignored upmaps are not emitted -> analyze after the reverts were done self.init_analyzer.analyze(self) if self.analyzer is not None: self.analyzer.analyze(self) if add_upmaps: # add custom movements self.apply_file_upmaps(add_upmaps) def revert_upmaps(self, emit: bool = False, skip_failures: bool = False): """ un-do all adjustments from the default crush-mapping state. """ if emit: def movement_applier(pgid, osdfrom, osdto): return self.apply_remap(pgid, osdfrom, osdto, revert=True) else: movement_applier = self._move_pg move_count = 0 move_amount = 0 for pgid, pg_upmaps in self.cluster.upmap_items.items(): for osd_from, osd_to in pg_upmaps.items(): move_size = self.cluster.get_pg_shardsize(pgid) logging.debug(strlazy(lambda: (f"move pg {pgid} from osd.{osd_to}->osd.{osd_from} " f"(size {pformatsize(move_size)}) to revert{' and emit' if emit else ''} upmap"))) move_ok, msg = movement_applier(pgid, osd_to, osd_from) if not move_ok: info = lambda: ( f'failed reverting move of {pgid} from osd.{osd_to} to osd.{osd_from}: {msg}\n' f"pg_upmaps[{pgid}] = {pg_upmaps}\n" f"cluster[{pgid}] = {self.cluster.pgs[pgid]['up']!r}\n" f"pgmappings[{pgid}] = {self.get_mapping(pgid)!r}\n" ) if skip_failures: logging.warning(strlazy(info)) else: raise RuntimeError(info()) self.reverted_upmaps[pgid][osd_from] = osd_to move_count += 1 move_amount += move_size logging.info(strlazy(lambda: f"reverted {move_count} moves ({pformatsize(move_amount, 2)})")) def apply_file_upmaps(self, filename): """ simulate the movement of the given upmap-items in a file. """ upmap_re = re.compile(r'^ceph osd pg-upmap-items ([0-9a-f\.]+) ([ 0-9]+)$') upmap_rm_re = re.compile(r'^ceph osd rm-pg-upmap-items ([0-9a-f\.]+)$') with open(filename) as upmap_file: move_count = 0 move_amount = 0 for upmap_line in upmap_file: if upmap_line.strip().startswith('#'): continue logging.debug( strlazy(lambda: (f"line {upmap_line!r}")) ) items_match = upmap_re.match(upmap_line) items_rm_match = upmap_rm_re.match(upmap_line) moves = list() if items_match: pgid = items_match.group(1) osds = items_match.group(2).split() if len(osds) % 2 > 0: raise RuntimeError(f"for pg={pgid}: uneven osd count") for i in range(int(len(osds) // 2)): # group the movement pairs moves.append((int(osds[i*2]), int(osds[i*2 + 1]))) elif items_rm_match: pgid = items_rm_match.group(1) reverted = self.reverted_upmaps.get(pgid, {}) for osd_from, osd_to in self.cluster.upmap_items.get(pgid, {}).items(): if reverted.get(osd_from) == osd_to: # if we reverted this move already, no need to move again. continue # assuming no pg-upmap-items line for the same pg was fed in previously # that added more mappings... # TODO: record the upmap-items we saw so far with previous commands, # so we can revert them here. 
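# hypothetical example (pgid and osd ids made up): if the dumped cluster state has
#   upmap_items["1.a"] = {5: 7}    # pg 1.a remapped from osd.5 to osd.7
# and the file contains "ceph osd rm-pg-upmap-items 1.a", we simulate the reverse
# movement 7 -> 5 so the pg returns to its crush-computed position - unless that
# pair was already undone by revert_upmaps() (checked via reverted above).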
moves.append((osd_to, osd_from)) else: raise RuntimeError(f'unknown line content {upmap_line}') logging.debug( strlazy(lambda: (f"process pgid={pgid} moves: {moves}")) ) for osd_from, osd_to in moves: # skip the move if it's already 'applied' # it was already considered when the cluster state was dumped # when generating file_upmaps, previously existing upmaps are preserved # and hence can't be applied again. cluster_upmaps = self.cluster.upmap_items.get(pgid, {}) reverted_upmaps = self.reverted_upmaps.get(pgid, {}) if cluster_upmaps.get(osd_from) == osd_to and\ reverted_upmaps.get(osd_from) != osd_to: continue # when we have a->b and now want to do a->c, we have to do b->c instead. pg_upmaps = self.get_pg_upmap(pgid) while True: prev_remap = pg_upmaps.get(osd_from) if prev_remap is not None: osd_from = prev_remap else: break move_ok, msg = self.apply_remap(pgid, osd_from, osd_to) if move_ok: move_count += 1 move_size = self.cluster.get_pg_shardsize(pgid) move_amount += move_size logging.debug( strlazy(lambda: (f"move pg={pgid} (size {pformatsize(move_size)}) from {osd_from}->{osd_to} " f"now mapped to: {self.get_mapping(pgid)}")) ) else: logging.warning( strlazy(lambda: (f"skipped moving pg={pgid} from {osd_from}->{osd_to}: {msg}\n" f"currently mapped to: {self.get_mapping(pgid)}\n" f"cluster mapping: {self.cluster.pg_osds_up[pgid]}\n" f"cluster upmaps for pg={pgid}: {cluster_upmaps}\n" f"recorded remaps for pg={pgid}: {self._remaps[pgid]}\n" f"reverted upmaps for pg={pgid}: {reverted_upmaps}"))) logging.info(strlazy(lambda: f"applied {move_count} moves from file ({pformatsize(move_amount, 2)})")) def save_mappings(self, filename): """ save the current mapping (useful for comparing clusters when debugging...) saves {pgid} {uposds} on each line. """ with open_or_stdout(filename, "w") as outfd: outfd.write("pg mappings:\n") for pgid, uposds in sorted(self.pg_mappings.items()): outfd.write(f"{pgid: <10} {uposds}\n") outfd.write("upmaps active:\n") for pgid, upmaps in sorted(self.get_upmaps(all_pgs=True).items()): if not upmaps: continue outfd.write(f"{pgid: <10} {upmaps}\n") def _move_pg(self, pgid, osd_from, osd_to) -> bool: """ simulate a remap pg from one osd to another. tracked sizes are estimated, but this is not recorded as a upmap-movement. you should probably use apply_remap to record this movement in the upmap list! this function does not record the movement in the upmap adjustment list (so one can revert upmaps without emitting that). 
""" pg_mapping = self.pg_mappings[pgid] if pgid not in self.osd_pgs_up[osd_from]: return False, f"move {pgid} on {pg_mapping} doesn't have source osd.{osd_from}" if osd_to in pg_mapping: return False, f"move {pgid} on {pg_mapping} from osd.{osd_from}->osd.{osd_to} with target already used" for i in range(len(pg_mapping)): if pg_mapping[i] == osd_from: pg_mapping[i] = osd_to break else: raise RuntimeError(f"osd_from={osd_from} not in {pg_mapping}") self.osd_pgs_up[osd_from].remove(pgid) self.osd_pgs_up[osd_to].add(pgid) # adjust the tracked sizes shard_size = self.cluster.get_pg_shardsize(pgid) self.osd_size_change[osd_from] -= shard_size self.osd_size_change[osd_to] += shard_size poolid = pool_from_pg(pgid) self.osd_pool_up_shard_count[osd_from][poolid] -= 1 self.osd_pool_up_shard_count[osd_to][poolid] += 1 # flush caches self._upmap_items.pop(pgid, None) self.get_class_osd_usages.cache_clear() self.get_osd_usage.cache_clear() self.get_osd_usage_size.cache_clear() self.get_pool_max_avail_limited.cache_clear() self.get_pool_max_avail_weight.cache_clear() return True, "" def apply_remap(self, pgid: str, osd_from: int, osd_to: int, revert: bool = False) -> bool: """ simulate a remap pg from one osd to another. this updates the emitted upmap mappings. """ # transfer between osds move_ok, msg = self._move_pg(pgid, osd_from, osd_to) if move_ok: logging.debug(strlazy(lambda: f"recording move of pg={pgid} from {osd_from}->{osd_to}")) # to track, insert a new mapping pair self._remaps[pgid][osd_from] = osd_to if self.analyzer is not None: if not revert: # TODO: maybe allow reverted move analysis if requested by cli arg self.analyzer.record_move(pgid, osd_from, osd_to) return move_ok, msg def get_enabled_crushclasses(self): """ osds of which crushclasses are we gonna touch? """ return self._enabled_crushclasses def get_potentially_affected_pools(self): """ returns set(poolid) for pools that may be adjusted through changes in the mappings """ return self.pool_candidates def get_osd_usages(self): """ calculate {osdid -> usage percent} """ return {osdid: self.get_osd_usage(osdid) for osdid in self.osd_candidates} @lru_cache(maxsize=None) def get_class_osd_usages(self): """ calculate {crushclass -> {osdid -> usage percent}} """ ret = dict() for crushclass in self._enabled_crushclasses: ret[crushclass] = {osdid: self.get_osd_usage(osdid) for osdid in self.osd_candidates_class[crushclass]} return ret def get_mapping(self, pgid): """ return the up set for a given pgid. returns [uposd, ...] """ return self.pg_mappings[pgid] def get_mappings(self): """ get the current mapping (useful for comparing clusters when debugging...) returns {"pgid": [uposd, ...]} """ return self.pg_mappings def get_osd_pgs_up(self, osdid): return self.osd_pgs_up[osdid] def get_osd_pgs_acting(self, osdid): return self.osd_pgs_acting[osdid] def get_pg_move_candidates(self, osdid): return PGCandidates(self, osdid, pool_candidates=self.pool_candidates, pg_choice_method=self.pg_choice_method) def get_osd_from_candidates(self): """ generate source candidates to move a pg from. method: fullest use the relatively fullest osd. method: limiting first, use the limiting osds from predicted pool usage, then continue with the relatively fullest osds. method: limit_hybrid: pick from limiting, then fullest, then limiting, etc. 
""" alternate = False pool_limit = False fullest_osd = False if self.osd_from_choice_method == OSDFromChoiceMethod.LIMITING: pool_limit = True elif self.osd_from_choice_method == OSDFromChoiceMethod.FULLEST: fullest_osd = True elif self.osd_from_choice_method == OSDFromChoiceMethod.ALTERNATE: alternate = True pool_limit = True fullest_osd = True else: raise RuntimeError(f"unknown OSDFromChoiceMethod: {self.osd_from_choice_method}") emitted = set() def limiting_osds(): # osdid -> sum of normalized limit_positions # most limiting one time: 1, least limiting one time: 1/osdcount # -> for each pool, sum those up. limiting_osds_weight = defaultdict(lambda: 0) for poolid in self.pool_candidates: # all limiting osds _min_avail, limiting_osdids = self.get_pool_max_avail_limited(poolid, negative_ok=True, all_limitings=True) osdcount = len(limiting_osdids) for idx, osdid in enumerate(limiting_osdids): limiting_osds_weight[osdid] += (1 - idx) / osdcount limiting_osd_usages = {osdid: self.get_osd_usage(osdid) for osdid in limiting_osds_weight.keys()} pool_count = len(self.pool_candidates) def cmp_limit_usage(osdid_usage): osdid, usage = osdid_usage # weight the usage (in percent) by the pool limiting percentage # if the same device limited 3 pools, it would be device_usage * 3/poolcount. return (limiting_osds_weight[osdid] / pool_count) * usage # not only sort by size, but prefer osds that limit many pools # osd_usage * pools_limited/pool_count for osdid, usage in sorted(limiting_osd_usages.items(), key=cmp_limit_usage, reverse=True): yield osdid, usage def fullest_osds(): for osdid, usage in sorted(self.get_osd_usages().items(), key=lambda osd: osd[1], reverse=True): yield osdid, usage def fullest_osds_fairclasses(): class_usages = self.get_class_osd_usages() class_variance = dict() class_osds_sorted = dict() for cls, usages in class_usages.items(): class_variance[cls] = stat_variance(usages.values()) sorted_osds = list(sorted(usages.items(), key=lambda osd: osd[1], reverse=True)) class_osds_sorted[cls] = sorted_osds # most varying crushclass first class_order = list(clsname for clsname, _ in sorted(class_variance.items(), key=lambda v: v[1], reverse=True)) # yield devices of high crushclass variance first class_pos = 0 class_idx = {cls: 0 for cls in class_osds_sorted.keys()} while True: if not class_order: # all classes exhausted, no more osd candidates return cls = class_order[class_pos % len(class_order)] pos = class_idx[cls] yield class_osds_sorted[cls][pos] if pos + 1 >= len(class_osds_sorted[cls]): del class_order[class_order.index(cls)] else: class_idx[cls] += 1 iterators = list() if pool_limit: iterators.append(limiting_osds()) if fullest_osd: # simpler: just the fullest osd while ignoring crush classes # iterators.append(fullest_osds()) # better: the fullest osd of each crush class, interleaved iterators.append(fullest_osds_fairclasses()) if alternate: def alternator(iterators): for value in zip_longest(*iterators): for elem in value: if elem is not None: yield elem iterators = [alternator(iterators)] for osdid, usage in chain(*iterators): if osdid in emitted: continue emitted.add(osdid) yield osdid, usage def get_osd_target_candidates(self, osd_candidates: Optional[dict] = None, move_pg: Optional[str] = None): """ generate target candidates to move a pg to. if given a pg to be moved, add that to the size estimation so we sort candidates by fullness "after" the move. method: emptiest use the relatively emptiest osd. 
""" move_pg_shardsize = 0 if move_pg: move_pg_shardsize = self.cluster.get_pg_shardsize(move_pg) # {osdid: usage_in_percent} # either the supplied candidates, or all possible osdids = osd_candidates or self.osd_candidates candidates = ((osdid, self.get_osd_usage(osdid, add_size=move_pg_shardsize)) for osdid in osdids) # emit empty first for osdid, usage in sorted(candidates, key=lambda osd: osd[1], reverse=False): yield osdid, usage @lru_cache(maxsize=None) def get_osd_usage_size(self, osdid, add_size=0, pgstate=PGState.UP): """ returns the occupied space on an osd. can be calculated by two variants by estimating the placed shards: - either by summing up all placed pg shard sizes (SHARDSUM) - or by subtracting moved-away shards from the OSD-allocated space (DELTA) temporarily add to the used data amount with add_size to test the resulting size. """ if self.osdused_method == OSDUsedMethod.DELTA: used = self.cluster.osds[osdid]['device_used'] if pgstate == PGState.UP: if not self.enable_simulation: raise RuntimeError('OSD usage method DELTA for --pgstate=up needs simulation') # this returns the estimated up-size by adjusting for ongoing transfers # (outgoing ones to be deleted, incoming ones to be completed) used += self.cluster.osd_transfer_remainings[osdid] elif pgstate == PGState.ACTING: # used already has the correct current amount. pass else: raise RuntimeError('unknown pgstate') elif self.osdused_method == OSDUsedMethod.SHARDSUM: # calculate the osd usage by summing all the mapped PGs shardsizes # used = sum(pgs on the osd) # no need to account for pending pg moves - we don't use the device usage report. if pgstate == PGState.UP: used = self.osd_shardsize_sum_up[osdid] elif pgstate == PGState.ACTING: used = self.osd_shardsize_sum_acting[osdid] else: raise RuntimeError('invalid pgstate') else: raise RuntimeError(f"unknown osd usage estimator: {self.osdused_method!r}") # data we moved around so far in the simulation used += self.osd_size_change[osdid] # testing size changes used += add_size return used @lru_cache(maxsize=None) def get_osd_usage(self, osdid, add_size=0): """ returns the occupied OSD space in percent. the fullness is calculated without the full_ratio loss. so when fullratio is 0.95 and and device was written to its limit, this function returns 0.95 instead of 1. """ osd_size = self.cluster.get_osd_size(osdid, adjust_full_ratio=False) if osd_size == 0: raise RuntimeError(f"getting relative usage of osd.{osdid} which has size 0 impossible") used = self.get_osd_usage_size(osdid, add_size) # make it relative used *= 100 / osd_size # logging.debug(f"{osdid} {pformatsize(used)}/{pformatsize(osd_size)} = {used/osd_size * 100:.2f}%") return used def get_cluster_variance(self): """ get {crushclass -> variance} for given mapping """ variances = dict() for crushclass, usages in self.get_class_osd_usages().items(): variances[crushclass] = stat_variance(usages.values()) return variances def get_remaps_shardsize_count(self): """ return (size_sum, count) of all calculated movements """ move_sum = 0 move_count = 0 for pgid, moves in self._remaps.items(): moves = remaps_merge(moves) count = len(moves) move_count += count move_sum += count * self.cluster.get_pg_shardsize(pgid) return move_sum, move_count def get_upmaps(self, all_pgs=False): """ get all applied mappings for generating movement instructions. all_pgs: True - return for all pgs, False - return for balancing-adjusted pgs only. 
return {pgid -> {map_from: map_to, ...}} """ upmap_results = dict() if all_pgs: pgs = self.pg_mappings.keys() else: pgs = self._remaps.keys() # only look at newly adjusted pgs for pgid in pgs: upmap_results[pgid] = self.get_pg_upmap(pgid) return upmap_results def get_upmap_items_commands(self, all_pgs=False): """ all_pgs: if true, return for all known pgs, else return for adjusted pgs only. return ceph upmap_items instructions """ for pgid, upmaps in self.get_upmaps(all_pgs).items(): if upmaps: upmap_str = " ".join([f"{osd_from} {osd_to}" for (osd_from, osd_to) in upmaps.items()]) yield f"ceph osd pg-upmap-items {pgid} {upmap_str}" else: # cluster had previous upmaps for this pg if len(self._get_cluster_upmaps(pgid)) > 0: yield f"ceph osd rm-pg-upmap-items {pgid}" def _get_cluster_upmaps(self, pgid): # current_upmaps are is [(osdfrom, osdto), ...] if self.ignore_cluster_upmaps and not self.emit_ignored_upmaps: # since we wanted to start with a "non-upmapped" cluster, # let's simulate there's no current upmaps. # (in the constructor we already inverted these upmaps) return {} else: return self.cluster.upmap_items.get(pgid, {}) def get_pg_upmap(self, pgid): """ get all applied mappings for a pg return {map_from: map_to, ...} """ cached = self._upmap_items.get(pgid) if cached is not None: return cached # this is already redundancy-filtered previous_remaps = self._get_cluster_upmaps(pgid) # this may not be redundancy-filtered new_remaps = self._remaps.get(pgid, {}) if not new_remaps and not previous_remaps: return {} # merge new upmaps # this actually modifies the self._remaps, intentionally. new_remaps = remaps_merge(new_remaps, in_place=True) if not previous_remaps: ret = new_remaps else: # now, let's merge new_remaps onto previous_remaps into results: ret = remaps_merge(previous_remaps.copy(), new_remaps) # remember in cache self._upmap_items[pgid] = ret return ret @lru_cache(maxsize=None) def get_pool_max_avail_weight(self, poolid, pgstate: PGState = PGState.UP, negative_ok: bool = False): """ given a pool id, predict how much space is available with the current mapping. similar how `ceph df` calculates available pool size approach using just crush weights. this calculates maximum available space if the pool had infinity many pgs. """ pool = self.cluster.pools[poolid] blowup_rate = pool['blowup_rate'] pool_size = pool['size'] # scale by objects_there/objects_expected if pool['num_object_copies'] > 0: blowup_rate *= (pool['num_object_copies'] - pool['num_objects_degraded']) / pool['num_object_copies'] # cf. PGMap::get_rule_avail # {osdid -> osd_weight_probability=[selectionweight/allcandidateweights]} osdid_candidates = self.cluster.candidates_for_pool(poolid) # how much space is available in a pool? # this is determined by the "fullest" osd: # we try for each osd how much it can take, # given its selection probabiliy (due to its weight) min_avail = 0 limiting_osd = None for osdid, osdweight in osdid_candidates.items(): # since we will adjust weight below, we need the raw device size # shrink the device size by configured cluster full_ratio device_size = self.cluster.get_osd_size(osdid, adjust_full_ratio=True) used_size = self.get_osd_usage_size(osdid, pgstate=pgstate) usable = device_size - used_size # how much space will be occupied on this osd by the pool? # for that, take the amount of current osd weight into account, # relative to whole pool weight possible for all osds. # how much of the whole distribution will be "this" osd. 
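# rough illustration with made-up numbers (assuming blowup_rate is the
# raw-per-stored overhead factor, e.g. ~1.5 for a 4+2 EC profile):
# if this osd has usable = 4 TiB left and receives osdweight = 0.05 (5%) of the
# pool's placements, the pool could still store about
# 4 TiB / (0.05 * 1.5) ~= 53 TiB of user data before this osd runs full.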
weighted_usable = usable / (osdweight * blowup_rate) if False: print( f"usable on {osdid:>4}: " f"size={pformatsize(device_size):>5} " f"used={pformatsize(used_size):>8} " f"usable={pformatsize(usable):>7} " f"blowup={blowup_rate:>7} " f"weight={osdweight:>7} " f"weighted={pformatsize(weighted_usable, 2):>7} " ) if limiting_osd is None or weighted_usable < min_avail: min_avail = weighted_usable limiting_osd = osdid if not negative_ok and min_avail < 0: min_avail = 0 return min_avail, limiting_osd @lru_cache(maxsize=None) def get_pool_max_avail_limited(self, poolid, pgstate: PGState = PGState.UP, negative_ok: bool = False, all_limitings: bool = False, add_size: Optional[Tuple[int, int]] = None): """ given a pool id, predict how much space is available with the current mapping. uses the current pg distribution on an osd and hence free_osd_space * (pool_pg_num / ((count(pool_pgs) on this osd) * blowup_rate)) this calculates maximum for the current pg placement. add_size (osdid, size): allows to simulate osd usage changes while testing for the limits. all_limitings: return a list of limiting osds, rather just the one (more compute time) returns (max_avail, [most_limiting_osdid, ...]) returns (max_avail, limiting_osdid) """ pool = self.cluster.pools[poolid] pool_pg_num = pool['pg_num'] pool_size = pool['size'] blowup_rate = pool['blowup_rate'] num_shards = pool_pg_num * pool_size osdid_candidates = self.cluster.candidates_for_pool(poolid).keys() # how much space is available in a pool? # we try for each osd how much it can take, # given its selection probabiliy (due to pg distributions) min_avail = 0 limiting_osd = None # osdid -> available space osds_avail = dict() if pgstate == PGState.UP: shard_counts = self.osd_pool_up_shard_count elif pgstate == PGState.ACTING: shard_counts = self.osd_pool_acting_shard_count else: raise RuntimeError() for osdid in osdid_candidates: # how much space will be occupied on this osd by the pool? # for that, take the amount of current osd weight into account, # relative to whole pool weight possible for all osds. # how much of the whole distribution will be "this" osd. pool_pg_shards_on_osd = shard_counts[osdid][poolid] if pool_pg_shards_on_osd == 0: # no pg of this pool is on this osd. # so this osd won't be filled up by this pool. 
continue # raw device size as reported by osd # shrink the device size by configured cluster full_ratio device_size = self.cluster.get_osd_size(osdid, adjust_full_ratio=True) size_adjust = 0 if not add_size or (add_size[0] == osdid) else add_size[1] used_size = self.get_osd_usage_size(osdid, pgstate=pgstate, add_size=size_adjust) usable = device_size - used_size # this is the "real" size prediction of an osd: # (usable_on_osd * pool_usable_rate) / osd_probability_for_getting_data_of_pool placement_probability = pool_pg_shards_on_osd / num_shards predicted_usable = usable / (placement_probability * blowup_rate) if limiting_osd is None or predicted_usable < min_avail: min_avail = predicted_usable limiting_osd = osdid osds_avail[osdid] = predicted_usable if False: print( f"usable on {osdid:>4}: " f"size={pformatsize(device_size):>5} " f"used={pformatsize(used_size):>8} " f"usable={pformatsize(usable):>7} " f"raw_blowup={blowup_rate:8.03f} " f"on_osd={pool_pg_shards_on_osd:>4} " f"probability={placement_probability:>7} " f"weighted={pformatsize(predicted_usable, 2):>7} " ) if limiting_osd is None: raise RuntimeError(f"no pg of poolid={poolid} mapped on any osd to use space") # if mappings are bigger than a device size can take, # available space becomes negative... if not negative_ok and min_avail < 0: min_avail = 0 if all_limitings: limiting_osds = sorted(osds_avail.items(), key=lambda elem: elem[1]) assert limiting_osd == limiting_osds[0][0] return min_avail, [elem[0] for elem in limiting_osds] else: return min_avail, limiting_osd class PGShardProps: """ info about one shard of a PG, i.e. the PG piece present on an OSD. """ def __init__(self, size, remapped, upmap_count): self.size = size # bool: true, if the shard is not yet fully moved, # i.e. its in up, but not in acting of the osd # so we can prefer moving a pg that's not yet copied fully self.remapped = remapped # int: number of remappings the pg currently has # the more upmap items it has, the farther it is away from crush # -> the lower, the better. self.upmap_count = upmap_count def __lt__(self, other): """ the "least" pg props entry is the one we try first. """ # TODO: smarter selection of pg movement candidates -> create weighted "score" to sort by # [x] order pgs by size, from big to small # [ ] order pgs by pg's num_omap_bytes # [ ] order pgs by object count? # [x] prefer pgs that are remapped (up != acting) # [x] prefer pgs that don't have upmaps already to minimize "distance" to crush mapping # TODO: for those, change the move loops: loop src, loop dst, loop candidate pgs. # [ ] prefer pgs that have a upmap item and could be removed (tricky - since we have to know destination OSD) # [ ] prefer pgs that have optimal moves: # prefer if predicted_target_usage < (current_target_usage + source_usage)/2 # [ ] prefer pgs whose upmap item uses the current osd as mapping target (-> so we don't add more items) # [ ] perfer pgs that have bad pool balance (i.e. 
their count on this osd doesn't match the expected one) (we enforce that constraint anyway, but we may never pick a pg from another pool that would be useful to balance) # [ ] only consider source/dest osds of a pg that was remapped, not any osd of the whole pg # "most important" sorting first # so if the "important" path is the same, sort by other means # prefer pgs that are remapped if self.remapped != other.remapped: return self.remapped if self.upmap_count != other.upmap_count: return self.upmap_count < other.upmap_count # prefer biggest sized pgs # TODO: integrate the automatic size selection feature if self.size != other.size: return self.size > other.size return False def __str__(self): return f"size={pformatsize(self.size)} remapped={self.remapped} upmaps={self.upmap_count}" class PGCandidates: """ Generate movement candidates to empty the given osd. """ def __init__(self, pg_mappings, osdid, pool_candidates, pg_choice_method): up_pgs = pg_mappings.get_osd_pgs_up(osdid) acting_pgs = pg_mappings.get_osd_pgs_acting(osdid) remapped_pgs = up_pgs - acting_pgs self.cluster = pg_mappings.cluster self.pg_candidates = list() self.pg_properties = dict() # we can move up pgs away - they are on the osd or planned to be on it. for pgid in up_pgs: pg_pool = pool_from_pg(pgid) if pool_candidates and pg_pool not in pool_candidates: continue self.pg_candidates.append(pgid) self.pg_properties[pgid] = PGShardProps( size=self.cluster.get_pg_shardsize(pgid), remapped=(pgid in remapped_pgs), # the shard is not yet fully moved upmap_count=len(pg_mappings.get_pg_upmap(pgid)) ) # best candidate first pg_candidates_desc = list(sorted(self.pg_candidates, key=lambda pg: self.pg_properties[pg])) # reorder potential pg_candidates by configurable approaches pg_walk_anchor = None # TODO: move these choice methods also into the PGShardProps comparison function if pg_choice_method == PGChoiceMethod.AUTO: # choose the PG choice method based on # the utilization differences of the fullest and emptiest candidate OSD. # if we could move there, most balance would be created. # use this to guess the shard size to move. best_target_osd, best_target_osd_usage = next(pg_mappings.get_osd_target_candidates()) osd_from_used_percent = pg_mappings.cluster.get_osd_usage(osdid) for idx, pg_candidate in enumerate(pg_candidates_desc): pg_candidate_size = self.pg_properties[pg_candidate].size # try the largest pg candidate first, and become smaller every step target_predicted_usage = pg_mappings.get_osd_usage(best_target_osd, add_size=pg_candidate_size) # if the optimal osd's predicted usage is < (target_usage + source_usage)/2 + limit # the whole PG fits (at least size-wise. the crush check comes later) # -> we try to move the biggest fitting PG first. mean_usage = (best_target_osd_usage + osd_from_used_percent) / 2 if target_predicted_usage <= mean_usage: logging.debug(strlazy(lambda: f" START with PG candidate {pg_candidate} due to perfect size fit " f"when moving to best osd.{best_target_osd}.")) pg_walk_anchor = idx break logging.debug(strlazy(lambda: f" SKIP candidate size estimation of {pg_candidate} " f"when moving to best osd.{best_target_osd}. " f"predicted={target_predicted_usage:.3f}% > mean={mean_usage:.3f}%")) if pg_walk_anchor is None: # no PG fitted, i.e. no pg was small enough for an optimal move. # so we use the smallest pg (which is still too big by the estimation above) # to work towards the optimal mean usage to achieve ideal balance. 
# in other words, overshoot the usage mean between the OSDs # by the minimally possible amount. self.pg_candidates = reversed(pg_candidates_desc) elif pg_choice_method == PGChoiceMethod.LARGEST: self.pg_candidates = pg_candidates_desc elif pg_choice_method == PGChoiceMethod.MEDIAN: # here, we decide to move not the largest/smallest pg, but rather the median one. # order PGs around the median-sized one # [5, 4, 3, 2, 1] => [3, 4, 2, 5, 1] pg_candidates_desc_sizes = [self.pg_properties[pg].size for pg in pg_candidates_desc] pg_candidates_median = statistics.median_low(pg_candidates_desc_sizes) pg_walk_anchor = pg_candidates_desc_sizes.index(pg_candidates_median) else: raise RuntimeError(f'unhandled shard choice {pg_choice_method!r}') # if we have a walk anchor, alternate pg sizes around that anchor point # and build the pg_candidates that way. if pg_walk_anchor is not None: self.pg_candidates = list() hit_left = False hit_right = False for walk_jump in A001057(): pg_walk_pos = pg_walk_anchor + walk_jump if hit_left and hit_right: break elif pg_walk_pos < 0: hit_left = True continue elif pg_walk_pos >= len(pg_candidates_desc): hit_right = True continue pg = pg_candidates_desc[pg_walk_pos] self.pg_candidates.append(pg) assert len(self.pg_candidates) == len(pg_candidates_desc) def get_candidates(self): """ return an iterable with pgids on the osd, in the order we should try moving them away. """ return self.pg_candidates def get_properties(self, pgid): return self.pg_properties[pgid] def get_size(self, pgid): return self.pg_properties[pgid].size def __len__(self): return len(self.pg_candidates) # should be a @dataclass, but that needs py3.7 class CrushclassUsage: def __init__(self, fill_average, usage_percent, osd_min, osd_min_used, osd_median, osd_median_used, osd_max, osd_max_used): self.fill_average: float = fill_average self.usage_percent: float = usage_percent self.osd_min: int = osd_min self.osd_min_used: float = osd_min_used self.osd_median: int = osd_median self.osd_median_used: float = osd_median_used self.osd_max: int = osd_max self.osd_max_used: float = osd_max_used class MappingAnalyzer: def __init__(self, pool_free_method: PoolFreeMethod = PoolFreeMethod.LIMITING, save_file: Optional[str] = None): self.pool_free_method = pool_free_method self.save_file = save_file self.pg_mappings: Optional[PGMappings] = None self.max_poolname_len: int = None self.cluster_variance = None self.pools_affected_by_balance = None self.pool_avail = None self.crushclass_usages = None # how many movements did we see? # 0 = none yet. self.move_idx = 0 if self.save_file: self.dump_moves = list() self.dump_pool_moves = list() self.dump_osd_moves = list() self.dump_cluster_moves = list() # so it doesn't crash later if pandas is not there... 
import pandas as pd def analyze(self, pg_mappings: PGMappings): self.pg_mappings = pg_mappings self.max_poolname_len = pg_mappings.cluster.max_poolname_len self._update_stats() def is_analyzed(self) -> bool: return self.pg_mappings is not None def record_move(self, pgid, osd_from, osd_to): self.move_idx += 1 self._update_stats((osd_from, osd_to)) if self.save_file: shard_size = self.pg_mappings.cluster.get_pg_shardsize(pgid) self.dump_moves.append((self.move_idx, shard_size, pool_from_pg(pgid), pgid, osd_from, osd_to)) def finish(self) -> None: if self.save_file: logging.info('generating output DataFrame...') import pandas as pd class PandasEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, pd.DataFrame): return obj.to_dict() return super().default(obj) cluster = self.pg_mappings.cluster # metadata osd_props = pd.DataFrame( [(osdid, cluster.get_osd_size(osdid, adjust_full_ratio=False), osdprops['crush_class']) for osdid, osdprops in cluster.osds.items()], columns=['osdid', 'osdsize', 'crushclass'], ) pool_props = pd.DataFrame( [(poolid, pooldata["name"], pooldata['pg_num'], pooldata['stored'], pooldata['used']) for poolid, pooldata in cluster.pools.items()], columns=['poolid', 'pool_name', 'pg_num', 'stored', 'used'], ) # movement logs moves = pd.DataFrame( self.dump_moves, columns=['move_idx', 'move_size', 'poolid', 'pgid', 'osd_from', 'osd_to'], ) pool_moves = pd.DataFrame( self.dump_pool_moves, columns=['move_idx', 'poolid', 'pool_available'], ) osd_moves = pd.DataFrame( self.dump_osd_moves, columns=['move_idx', 'osdid', 'osd_usage_percent', 'osd_used'], ) cluster_moves = pd.DataFrame( self.dump_cluster_moves, columns=['move_idx', 'crushclass', 'class_variance'], ) logging.info(f'saving data to {self.save_file!r}...') with open(self.save_file, "w") as outfd: json.dump( { "generated_at": datetime.datetime.now().astimezone().isoformat(), "fsid": cluster.fsid, "argv": sys.argv, "pool": pool_props, "osd": osd_props, "moves": moves, "cluster_moves": cluster_moves, "pool_moves": pool_moves, "osd_moves": osd_moves, }, outfd, cls=PandasEncoder, ) def _update_stats(self, modified_osds=tuple()): # TODO: use modified_osds for more efficient update? # if it's not the limit_osd, a pool didn't gain space, but it may have lost space. 
if self.pg_mappings is None: raise RuntimeError("analysis missing pg mappings") cluster = self.pg_mappings.cluster # cluster utilization variance by crushclass self.cluster_variance = self.pg_mappings.get_cluster_variance() # poolids for all that can be balanced due to their candidates osds having their contents moved around self.pools_affected_by_balance = self.pg_mappings.get_potentially_affected_pools() # pool utilization self.pool_avail = dict() for poolid in self.pools_affected_by_balance: poolname = cluster.pools[poolid]['name'] if self.pool_free_method == PoolFreeMethod.LIMITING: avail, limit_osd = self.pg_mappings.get_pool_max_avail_limited(poolid) elif self.pool_free_method == PoolFreeMethod.WEIGHT: avail, limit_osd = self.pg_mappings.get_pool_max_avail_weight(poolid) else: raise RuntimeError() self.pool_avail[poolid] = (poolname, avail, limit_osd) # per-crushclass osd usages self.crushclass_usages = dict() for crushclass in self.pg_mappings.get_enabled_crushclasses(): cc_osd_usages = dict() for osdid in cluster.get_crushclass_osds(crushclass, skip_zeroweight=True): cc_osd_usages[osdid] = self.pg_mappings.get_osd_usage(osdid) # osd size utilization osds_size_sorted = list(sorted(cc_osd_usages.items(), key=lambda x: x[1])) osd_min, osd_min_used = osds_size_sorted[0] osd_median, osd_median_used = osds_size_sorted[int(len(cc_osd_usages) // 2)] osd_max, osd_max_used = osds_size_sorted[-1] self.crushclass_usages[crushclass] = CrushclassUsage( fill_average=stat_mean(cc_osd_usages.values()), usage_percent=cluster.crushclasses_usage[crushclass]['percent_used'], osd_min=osd_min, osd_min_used=osd_min_used, osd_median=osd_median, osd_median_used=osd_median_used, osd_max=osd_max, osd_max_used=osd_max_used, ) if self.save_file: for osdid in cluster.get_crushclass_osds(crushclass, skip_zeroweight=True): self.dump_osd_moves.append(( self.move_idx, osdid, cc_osd_usages[osdid], self.pg_mappings.get_osd_usage_size(osdid), )) if self.save_file: for pool_id, (_, avail, limit_osd) in self.pool_avail.items(): self.dump_pool_moves.append((self.move_idx, pool_id, avail)) for crushclass, variance in self.cluster_variance.items(): self.dump_cluster_moves.append((self.move_idx, crushclass, variance)) def log(self): if self.pg_mappings is None: raise RuntimeError("initial analysis missing") logging.info("cluster variance for crushclasses:") for crushclass, variance in self.cluster_variance.items(): logging.info(f" {crushclass: >15}: {variance:.3f}") logging.debug("usable space, caused by osd") logging.debug(strlazy(lambda: f" {'pool'.ljust(self.max_poolname_len)} {'avail': >8} {'limitosd': >8}")) for poolname, avail, limit_osd in self.pool_avail.values(): poolname = poolname.ljust(self.max_poolname_len) logging.debug(strlazy(lambda: f" {poolname} {pformatsize(avail, 2): >8} {limit_osd: >8}")) logging.info("OSD fill rate by crushclass:") for crushclass, usage in self.crushclass_usages.items(): logging.info(f" {crushclass}: average={usage.fill_average:3.2f}%, " f"unconstrained={usage.usage_percent:3.2f}%") logging.info(f" min osd.{usage.osd_min: <5} {usage.osd_min_used:3.3f}%") logging.info(f" median osd.{usage.osd_median: <5} {usage.osd_median_used:3.3f}%") logging.info(f" max osd.{usage.osd_max: <5} {usage.osd_max_used:3.3f}%") def log_compare_with(self, ana_new: "MappingAnalyzer"): if not isinstance(ana_new, MappingAnalyzer): raise RuntimeError("can only compare with other analysis") if self.pg_mappings is None: raise RuntimeError("analysis missing") logging.info("OSD fill rate by crushclass:") 
logging.info(f" OLD NEW") for crushclass, old_usage in self.crushclass_usages.items(): new_usage = ana_new.crushclass_usages[crushclass] old_variance = self.cluster_variance[crushclass] new_variance = ana_new.cluster_variance[crushclass] logging.info(f" {crushclass}:") logging.info(f" raw {old_usage.usage_percent:7.3f}%" f" {new_usage.usage_percent:7.3f}%") logging.info(f" avg {old_usage.fill_average:7.3f}%" f" {new_usage.fill_average:7.3f}%") logging.info(f" variance {old_variance:8.3f} " f" {new_variance:8.3f}") logging.info(f" min osd.{old_usage.osd_min: <5} {old_usage.osd_min_used:7.3f}%" f" osd.{new_usage.osd_min: <5} {new_usage.osd_min_used:7.3f}%") logging.info(f" median osd.{old_usage.osd_median: <5} {old_usage.osd_median_used:7.3f}%" f" osd.{new_usage.osd_median: <5} {new_usage.osd_median_used:7.3f}%") logging.info(f" max osd.{old_usage.osd_max: <5} {old_usage.osd_max_used:7.3f}%" f" osd.{new_usage.osd_max: <5} {new_usage.osd_max_used:7.3f}%") logging.info("") logging.info(strlazy(lambda: f"new usable space:")) # poolid -> (prev, new, gained space) space_won = dict() for poolid in self.pools_affected_by_balance: _, prev_avail, prev_limit_osd = self.pool_avail[poolid] poolname, new_avail, new_limit_osd = ana_new.pool_avail[poolid] poolname = poolname.ljust(self.max_poolname_len) gain = new_avail - prev_avail space_won[poolid] = (poolname, prev_avail, new_avail, gain) if gain < 0: change = self.pg_mappings.osd_size_change.get(prev_limit_osd) if change is not None: change_newlimit = self.pg_mappings.osd_size_change.get(new_limit_osd) if change_newlimit is not None: change_newlimit_f = f"changed by {pformatsize(change_newlimit, 2)} overall" else: change_newlimit_f = "was not touched not at all" logging.info(f"pool {poolname} lost {pformatsize(gain, 2)} while we changed limiting osd.{prev_limit_osd} " f"by {pformatsize(change, 2)}, " f"now limited by osd.{new_limit_osd} (which {change_newlimit_f})") poolnameheader = 'pool name'.ljust(self.max_poolname_len) logging.info(strlazy(lambda: f" {poolnameheader} {'id': >4} {'previous': >8} {'new': >8} {'change': >8}")) gain_sum = 0 for poolid, (poolname, prev_avail, new_avail, gain) in space_won.items(): gain_sum += gain poolname = poolname.ljust(self.max_poolname_len) logging.info(strlazy(lambda: (f" {poolname} {poolid: >4} {pformatsize(prev_avail, 2): >8} " f"-> {pformatsize(new_avail, 2): >8} => {pformatsize(gain, 2): >8}"))) logging.info(strlazy(lambda: (f" {' ' * (self.max_poolname_len + 24)} sum: {pformatsize(gain_sum, 2): >8}"))) def A001057(): """ generate [0, 1, -1, 2, -2, 3, -3, ...] https://oeis.org/A001057 """ idx = 0 while True: val = int((1 - (2 * idx + 1) * (-1)**idx) / 4) idx += 1 yield val def list_highlight(osdlist, changepos, colorcode): """ highlight an element at given list position with an ansi color code. 
""" ret = list() for idx, osd in enumerate(osdlist): if idx == changepos: ret.append(f"\x1b[{colorcode};1m{osd}\x1b[m") else: ret.append(str(osd)) return f"[{', '.join(ret)}]" def balance(args, cluster): logging.info("running pg balancer") # this is basically my approach to OSDMap::calc_pg_upmaps # and a CrushWrapper::try_remap_rule python-implementation if args.osdused == "delta": osdused_method = OSDUsedMethod.DELTA elif args.osdused == "shardsum": osdused_method = OSDUsedMethod.SHARDSUM else: raise RuntimeError(f"unknown osd usage rate method {args.osdused!r}") if args.osdfrom == "fullest": osdfrom_method = OSDFromChoiceMethod.FULLEST elif args.osdfrom == "limiting": osdfrom_method = OSDFromChoiceMethod.LIMITING elif args.osdfrom == "alternate": osdfrom_method = OSDFromChoiceMethod.ALTERNATE else: raise RuntimeError(f"unknown osd usage rate method {args.osdused!r}") if args.pg_choice == "largest": pg_choice_method = PGChoiceMethod.LARGEST elif args.pg_choice == "median": pg_choice_method = PGChoiceMethod.MEDIAN elif args.pg_choice == "auto": pg_choice_method = PGChoiceMethod.AUTO else: raise RuntimeError(f"unknown osd usage rate method {args.pg_choice!r}") only_crushclasses = None if args.only_crushclass and not (args.only_pool or args.only_poolid): only_crushclasses = {cls.strip() for cls in args.only_crushclass.split(",") if cls.strip()} logging.info(f"only considering crushclasses {only_crushclasses}") only_poolids = None if args.only_poolid: only_poolids = {int(pool) for pool in args.only_poolid.split(",") if pool.strip()} if args.only_pool: only_poolids = {int(cluster.poolnames[pool.strip()]) for pool in args.only_pool.split(",") if pool.strip()} if only_poolids: logging.info(f"only considering pools {only_poolids}") if args.only_crushclass: logging.info(f" ignoring the only-crushclass option since explicit pools were given.") ignore_ideal_pgcounts_src = args.ignore_ideal_pgcounts in ('source', 'all') ignore_ideal_pgcounts_dest = args.ignore_ideal_pgcounts in ('destination', 'all') target_constrain_usage = not args.ignore_target_usage target_constrain_limit = args.ensure_target_limits # movement change tracking and logging analyzer = MappingAnalyzer() # remember start state for comparison init_analyzer = MappingAnalyzer() # we'll do all the optimizations in this mapping state pg_mappings = PGMappings(cluster, only_crushclasses=only_crushclasses, only_poolids=only_poolids, osdused_method=osdused_method, osd_from_choice_method=osdfrom_method, pg_choice_method=pg_choice_method, ignore_cluster_upmaps=args.ignore_state_upmaps, emit_ignored_upmaps=args.emit_ignored_upmaps, add_upmaps=args.add_upmaps, analyzer=analyzer, init_analyzer=init_analyzer, need_simulation=True) # what't the status before moving around shards init_analyzer.log() if args.save_timings: # move_idx -> duration since start move_calc_times = dict() move_calc_start = time.time() # so we can skip over the informational analysis phase duration analysis_duration = 0 # to restrict source osds source_osds = None if args.source_osds: splitter = ',' if ',' in args._source_osds else None source_osds = [int(osdid) for osdid in args.source_osds.split(splitter)] # number of found remaps found_remap_count = 0 force_finish = False found_remap = False while True: if found_remap_count >= args.max_pg_moves: logging.info("enough remaps found") break if force_finish: break if found_remap or found_remap_count == 0: source_attempts = 0 found_remap = False unsuccessful_pools = set() last_attempt = -1 # try to move the biggest pg from the 
fullest disk to the next suiting smaller disk for osd_from, osd_from_used_percent in pg_mappings.get_osd_from_candidates(): if found_remap or force_finish: break # filter only-allowed source osds if source_osds is not None: if osd_from not in source_osds: continue source_attempts += 1 if source_attempts > args.max_move_attempts: logging.info(f"couldn't empty osd.{last_attempt}, so we're done. " f"if you want to try more often, set --max-move-attempts=$nr, this may unlock " f"more balancing possibilities. " f"setting --ignore-ideal-pgcounts also unlocks more, but will then we will " f"fight with ceph's default balancer.") force_finish = True continue last_attempt = osd_from logging.debug("trying to empty osd.%s (%f %%)", osd_from, osd_from_used_percent) # these pgs are up on the source osd, # and are in the order of preference moving them away # TODO: it would be great if we could already know the destination osd candidates here # for better pg candidate selection. pg_candidates = pg_mappings.get_pg_move_candidates(osd_from) from_osd_pg_count = pg_mappings.osd_pool_up_shard_count[osd_from] # pools whose pgs are below count limit on the from osd -> no need to try other pgs of these pools from_osd_satisfied_pool_pgcount = set() # pools whose pgs are too big from_osd_satisfied_pool_pgsize = set() # now try to move the candidates to their possible destination # TODO: instead of having all candidates, # always get the "next" one from a "smart" iterator # there we can also feed in the current osd_usages_asc # in order to "optimize" candidates by "anticipating" "target" "osds" "." # even better, we can integrate the PGMoveChecker directly, and thus calculate # target OSD candidates _within_ the pg candidate generation. for move_pg_idx, move_pg in enumerate(pg_candidates.get_candidates()): if found_remap: break pg_pool = pool_from_pg(move_pg) if pg_pool in from_osd_satisfied_pool_pgcount: logging.debug("SKIP pg %s from osd.%s since pool=%s-pgs are already below expected", move_pg, osd_from, pg_pool) continue if pg_pool in from_osd_satisfied_pool_pgsize: logging.debug("SKIP pg %s from osd.%s since pool=%s-pgs can't fit on any of the osd_to candidates", move_pg, osd_from, pg_pool) continue if pg_pool in unsuccessful_pools: logging.debug("SKIP pg %s from osd.%s since pool (%s) can't be balanced more", move_pg, osd_from, pg_pool) continue move_pg_shardsize = pg_candidates.get_size(move_pg) logging.debug("TRY-0 moving pg %s (%s/%s) with %s from osd.%s", move_pg, move_pg_idx+1, len(pg_candidates), pformatsize(move_pg_shardsize), osd_from) try_pg_move = PGMoveChecker(pg_mappings, move_pg) osd_to_candidates = try_pg_move.get_osd_candidates(osd_from) pool_pg_shard_count_ideal = cluster.pool_pg_shard_count_ideal(pg_pool, osd_to_candidates) from_osd_pg_count_ideal = pool_pg_shard_count_ideal * cluster.get_osd_size(osd_from, adjust_full_ratio=True) # only move the pg if the source osd has more PGs of the pool than average # otherwise the regular balancer will fill this OSD again # with another PG (of the same pool) from somewhere if not ignore_ideal_pgcounts_src: from_idealcount_difference = from_osd_pg_count_ideal - from_osd_pg_count[pg_pool] if from_idealcount_difference >= 0.0 or math.isclose(from_idealcount_difference, 0.0): # other pgs of this pool would also violate this check. 
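                        # illustrative numbers (not from any real cluster): if the pool's ideal
                        # density works out to 1 shard per TiB and osd_from has 8 TiB usable,
                        # from_osd_pg_count_ideal is ~8 - once this osd holds 8 or fewer shards
                        # of the pool, we leave it alone and let other osds be the source.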
from_osd_satisfied_pool_pgcount.add(pg_pool) logging.debug(" BAD => skipping pg %s since source osd.%s " "doesn't have too many of pool=%s (%s <= %s)", move_pg, osd_from, pg_pool, from_osd_pg_count[pg_pool], from_osd_pg_count_ideal) continue logging.debug(" OK => taking pg %s from source osd.%s " "since it has too many of pool=%s (%s > %s)", move_pg, osd_from, pg_pool, from_osd_pg_count[pg_pool], from_osd_pg_count_ideal) else: logging.debug(" OK => taking pg %s from source osd.%s, " " pool=%s count %s, ideal %s", move_pg, osd_from, pg_pool, from_osd_pg_count[pg_pool], from_osd_pg_count_ideal) if not args.ignore_pgsize_toolarge: # pre-filter PGs to rule out those that will for sure not gain any space pg_small_enough = False for osd_to in osd_to_candidates: target_predicted_usage = pg_mappings.get_osd_usage(osd_to, add_size=move_pg_shardsize) # check if the target osd will be used less than the source # after we moved the pg if target_predicted_usage < osd_from_used_percent: pg_small_enough = True break if not pg_small_enough: # the pg is so big, it would increase the fill % of all osd_tos more than osd_from is # so we would increase the variance. logging.debug(" BAD => skipping pg %s, since it doesn't fit on any target osd candidate", move_pg) # don't try other pgs from this pool (they would also bee to big) from_osd_satisfied_pool_pgsize.add(pg_pool) continue try_pg_move.prepare_crush_check() # check variance for this crush root variance_before = try_pg_move.get_placement_variance(osd_from) for osd_to, osd_to_usage in pg_mappings.get_osd_target_candidates(osd_to_candidates, move_pg): if osd_to not in osd_to_candidates: raise RuntimeError("tried non-candidate target osd") if osd_to == osd_from: continue logging.debug("TRY-1 move %s osd.%s => osd.%s", move_pg, osd_from, osd_to) # in order to not fight the regular balancer, don't move the PG to a disk # where the weighted pg count of this pool is already good # otherwise the balancer will move a pg on the osd_from of this pool somewhere else # i.e. don't be the +1 to_osd_pg_count = pg_mappings.osd_pool_up_shard_count[osd_to] to_osd_pg_count_ideal = pool_pg_shard_count_ideal * cluster.get_osd_size(osd_to, adjust_full_ratio=True) if ignore_ideal_pgcounts_dest: logging.debug(strlazy(lambda: f" OK => osd.{osd_to} has pool={pg_pool} " f"{to_osd_pg_count[pg_pool]}, target {to_osd_pg_count_ideal}")) else: to_idealcount_difference = to_osd_pg_count_ideal - to_osd_pg_count[pg_pool] if (to_idealcount_difference <= 0 or math.isclose(to_idealcount_difference, 0.0)) and not to_osd_pg_count_ideal < 1.0: logging.debug(strlazy(lambda: f" BAD => osd.{osd_to} already has too many of pool={pg_pool} " f"({to_osd_pg_count[pg_pool]} >= {to_osd_pg_count_ideal})")) continue elif to_osd_pg_count_ideal >= 1.0: logging.debug(strlazy(lambda: f" OK => osd.{osd_to} has too few of pool={pg_pool} " f"({to_osd_pg_count[pg_pool]} < {to_osd_pg_count_ideal})")) else: logging.debug(strlazy(lambda: f" OK => osd.{osd_to} doesn't have pool={pg_pool} yet " f"({to_osd_pg_count[pg_pool]} < {to_osd_pg_count_ideal})")) if target_constrain_usage: # how full will the target be target_predicted_usage = pg_mappings.get_osd_usage(osd_to, add_size=move_pg_shardsize) source_usage = pg_mappings.get_osd_usage(osd_from) # check that the destination osd won't be more full than the source osd # but what if we're balanced very good already? wouldn't we allow this if the variance decreased anyway? # the nolimit target usage constraint may be better suited then. 
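                        # made-up example of this constraint: with the source osd at 81.2% and
                        # the target predicted to reach 83.0% after the move, the move is
                        # rejected here - even if it would still lower the overall variance.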
if target_predicted_usage > source_usage: logging.debug(strlazy(lambda: f" BAD target will be more full than source currently is: " f"osd.{osd_to} would have {target_predicted_usage:.3f}%, " f"and source's osd.{osd_from} currently is {source_usage:.3f}%")) continue if target_constrain_limit: _, new_limiting_osd = pg_mappings.get_pool_max_avail_limited(pg_pool, add_size=(osd_to, move_pg_shardsize)) if new_limiting_osd == osd_to: logging.debug(" BAD => osd.%s would become the new limiting osd", osd_to) continue # check if the movement size is nice if args.ensure_optimal_moves: # check if there's a target osd that will be filled less than mean of osd_to and osd_from after move # predicted_target_usage < (target_usage + source_usage)/2 + limit mean_usage = (pg_mappings.get_osd_usage(osd_to) + osd_from_used_percent) / 2 if target_predicted_usage > mean_usage: logging.debug(f" BAD non-optimal size {move_pg} predicted={target_predicted_usage:.3f}% > mean={mean_usage:.3f}%") continue if not try_pg_move.is_move_valid(osd_from, osd_to): logging.debug(f" BAD move {move_pg} osd.{osd_from} => osd.{osd_to}") continue # check if the variance is decreasing new_variance = try_pg_move.get_placement_variance(osd_from, osd_to) if args.ensure_variance_decrease and new_variance >= variance_before: # even if the variance increases, we ensure we do progress by the # guaranteed usage rate decline (or limiting device elimination) logging.debug(f" BAD => variance not decreasing: {new_variance:.6f} not < {variance_before:.6f}") continue new_mapping_pos = None # for logging below prev_pg_mapping = list(pg_mappings.get_mapping(move_pg)) new_pg_mapping = list() for idx, osdid in enumerate(prev_pg_mapping): if osdid == osd_from: osdid = osd_to new_mapping_pos = idx new_pg_mapping.append(osdid) if new_mapping_pos is None: raise RuntimeError(f"prev_mapping={prev_pg_mapping} doesn't contain osd_from={osd_from}") # record the mapping move_ok, msg = pg_mappings.apply_remap(move_pg, osd_from, osd_to) if not move_ok: raise RuntimeError(f'failed to move pg {move_pg} from osd.{osd_from}: {msg}') if args.save_timings: analysis_start = time.time() move_calc_times[len(move_calc_times)] = time.time() - move_calc_start - analysis_duration if new_variance > variance_before: variance_op = ">" elif new_variance < variance_before: variance_op = "<" else: variance_op = "==" logging.info(f" SAVE move {move_pg} osd.{osd_from} => osd.{osd_to} ") logging.info(f" props: {pg_candidates.get_properties(move_pg)}") logging.debug(strlazy(lambda: f" pg {move_pg} was on {list_highlight(prev_pg_mapping, new_mapping_pos, 31)}")) logging.debug(strlazy(lambda: f" pg {move_pg} now on {list_highlight(new_pg_mapping, new_mapping_pos, 32)}")) logging.info(f" => variance new={new_variance:.6f} {variance_op} {variance_before:.6f}=old") analyzer.log() if args.save_timings: analysis_duration += time.time() - analysis_start found_remap = True found_remap_count += 1 break if not found_remap: # we tried all osds to place this pg, # so the shardsize is just too big # if pg_size_choice is auto, we try to avoid this PG anyway, # but if we still end up here, it means the choices for moves are really # becoming tight. 
unsuccessful_pools.add(pg_pool) # end of to-loop # end of pg loop # end of from-loop move_size, move_count = pg_mappings.get_remaps_shardsize_count() # generation performance if args.save_timings: import socket import platform with open(args.save_timings, "w") as timingfd: json.dump({ "generated_at": datetime.datetime.now().astimezone().isoformat(), "hostname": socket.gethostname(), "cpuname": platform.processor(), "fsid": cluster.fsid, "argv": sys.argv, "max_steps": args.max_pg_moves, "move_count": move_count, "move_steps": found_remap_count, "move_size": move_size, "timings": move_calc_times, }, timingfd, indent=4) if args.save_mappings: pg_mappings.save_mappings(args.save_mappings) # show results! logging.info(80*"-") logging.info("generated %s remaps in %s steps.", move_count, found_remap_count) logging.info("total movement size: %s", pformatsize(move_size)) logging.info(80*"-") init_analyzer.log_compare_with(analyzer) logging.info(80*"-") # return what we have been waiting for :) with open_or_stdout(args.output, "w") as outfd: for cmd in pg_mappings.get_upmap_items_commands(): outfd.write(cmd) outfd.write("\n") def show(args, cluster): analyzer = None mappings = None if args.avail_prediction == "weight": pool_free_method = PoolFreeMethod.WEIGHT elif args.avail_prediction == "limiting": pool_free_method = PoolFreeMethod.LIMITING else: raise RuntimeError() if args.pgstate == "up": pgstate = PGState.UP elif args.pgstate == "acting": pgstate = PGState.ACTING else: raise RuntimeError() if args.osdused == "delta": osdused_method = OSDUsedMethod.DELTA elif args.osdused == "shardsum": osdused_method = OSDUsedMethod.SHARDSUM else: raise RuntimeError(f"unknown osd usage rate method {args.osdused!r}") if (args.show_max_avail or pool_free_method == PoolFreeMethod.LIMITING or args.osds or args.save_upmap_progress or args.save_mappings): if args.save_upmap_progress: analyzer = MappingAnalyzer( pool_free_method=pool_free_method, save_file=args.save_upmap_progress) mappings = PGMappings(cluster, osdused_method=osdused_method, ignore_cluster_upmaps=args.ignore_state_upmaps, add_upmaps=args.add_upmaps, need_simulation=(osdused_method == OSDUsedMethod.DELTA and pgstate == PGState.UP), analyzer=analyzer) if args.save_mappings: mappings.save_mappings(args.save_mappings) if args.format == 'plain': print(f"cluster {'up' if pgstate == PGState.UP else 'acting'} state") maxpoolnamelen = cluster.max_poolname_len maxcrushclasslen = 0 for crush_class in cluster.crushclass_osds.keys(): if len(crush_class) > maxcrushclasslen: maxcrushclasslen = len(crush_class) print() crushclasscolumnlen = max(maxcrushclasslen, len('class')) crushclassheader = 'class'.ljust(crushclasscolumnlen) print(f"{crushclassheader} {'size': >7} {'avail': >7} {'used': >7} {'%used': >7} {'osds': >6}") for crushclass, usage in cluster.crushclasses_usage.items(): crushclassname = crushclass.ljust(crushclasscolumnlen) size = pformatsize(usage['size'], 2) avail = pformatsize(usage['avail'], 2) used = pformatsize(usage['used'], 2) devcount = usage['osd_count'] print(f"{crushclassname} {size: >7} {avail: >7} {used: >7} {usage['percent_used']: >6.02f}% {devcount: >6}") # TODO: allow showing cluster hierarchy if args.show_max_avail: maxavail_hdr = f" {'maxavail': >8}" else: maxavail_hdr = "" poolname = 'name'.ljust(maxpoolnamelen) print() print(f"{'poolid': >6} {poolname} {'type': <7} {'size': >5} {'min': >3} {'pg_num': >6} {'stored': >7} {'used': >7} {'avail': >8}{maxavail_hdr} {'shrdsize': >8} crush") # default, sort by pool id sort_func = lambda x: 
x[0] if args.sort_shardsize: sort_func = lambda x: x[1]['pg_shard_size_avg'] stored_sum = 0 stored_sum_class = defaultdict(int) used_sum = 0 used_sum_class = defaultdict(int) pg_sum = 0 for poolid, poolprops in sorted(cluster.pools.items(), key=sort_func): poolname = poolprops['name'].ljust(maxpoolnamelen) repl_type = poolprops['repl_type'] if repl_type == "ec": profile = cluster.ec_profiles[poolprops['erasure_code_profile']] repl_type = f"ec{profile['data_chunks']}+{profile['coding_chunks']}" crushruleid = poolprops['crush_rule'] crushrule = cluster.crushrules[crushruleid] crushrulename = crushrule['name'] rootweights = rootweights_from_rule(crushrule, poolprops["size"]) rootweights_ppl = list() for crushroot, weight in rootweights.items(): stored_sum_class[crushroot] += poolprops['stored'] * weight used_sum_class[crushroot] += poolprops['used'] * weight rootweights_ppl.append(f"{crushroot}*{weight:.3f}") size = poolprops['size'] min_size = poolprops['min_size'] pg_num = poolprops['pg_num'] stored_amount = poolprops['stored'] used_amount = poolprops['used'] stored_sum += stored_amount used_sum += used_amount pg_sum += pg_num stored = pformatsize(stored_amount, 2) # used data without redundancy used = pformatsize(used_amount, 2) # raw usage incl redundancy # predicted pool free space - either our or ceph's original size prediction if pool_free_method == PoolFreeMethod.WEIGHT: # exactly the same as mappings.get_pool_max_avail_weight! avail = pformatsize(poolprops['store_avail'], 2) elif pool_free_method == PoolFreeMethod.LIMITING: avail = pformatsize(mappings.get_pool_max_avail_limited(poolid, pgstate=pgstate)[0], 2) else: raise RuntimeError() if args.show_max_avail: # if we had inf many pgs maxavail = pformatsize(mappings.get_pool_max_avail_weight(poolid, pgstate=pgstate)[0], 2) maxavail = f" {maxavail: >8}" else: maxavail = "" shard_size = pformatsize(poolprops['pg_shard_size_avg']) rootweights_pp = ",".join(rootweights_ppl) print(f"{poolid: >6} {poolname} {repl_type: <7} {size: >5} {min_size: >3} {pg_num: >6} {stored: >7} {used: >7} {avail: >8}{maxavail} {shard_size: >8} {crushruleid}:{crushrulename} {rootweights_pp: >12}") for crushroot in stored_sum_class.keys(): crush_tree_class = crushroot.split("~") if len(crush_tree_class) >= 2: # it's a shadow-tree - df dump provides stats for deviceclass usage directly crush_class = crush_tree_class[1] crushclass_usage = f"{cluster.crushclasses_usage[crush_class]['percent_used']:.3f}%" else: # it's a regular tree -> we need to figure out what crushclasses the rule uses # and combine the device-class usages # maybe usage=sum[all root's classes](rootweight*crushclass_usage[crush_class]) # or max(same stuff) since only the fullest device counts again? 
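                # for illustration: a shadow root is named like "default~hdd", so the
                # split above yields ["default", "hdd"]; a plain root is just "default"
                # and lands here, where no single device class can be reported (yet).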
crushclass_usage = "" print(f"{crushroot: <14} {' ' * maxpoolnamelen} {' ' * 13} " f"{pformatsize(stored_sum_class[crushroot], 2): >7} " f"{pformatsize(used_sum_class[crushroot], 2): >7}" f"{crushclass_usage: >10}") print(f"sum {' ' * maxpoolnamelen} {' ' * 17} {pg_sum: >6} {pformatsize(stored_sum, 2): >7} {pformatsize(used_sum, 2): >7}") if args.osds: if args.only_crushclass: maxcrushclasslen = len(args.only_crushclass) maxcrushclasslen = min(maxcrushclasslen, len('class')) crushclassheader = 'cls'.rjust(maxcrushclasslen) osd_entries = list() for osdid, props in cluster.osds.items(): hostname = props.get('host_name', '?') crushclass = props['crush_class'] devsize = cluster.osds[osdid]['device_size'] if args.only_crushclass: if crushclass != args.only_crushclass: continue if crushclass is None: crushclass = "?" if devsize > 0: util = mappings.get_osd_usage(osdid) used = mappings.get_osd_usage_size(osdid) else: util = 0 used = 0 weight_val = props['weight'] cweight = props.get('crush_weight', 0) if args.use_weighted_utilization: if weight_val == 0: util = 0 else: util /= weight_val if util < args.osd_fill_min: continue if pgstate == PGState.UP: pg_count = props.get('pg_count_up', {}) pg_num = props.get('pg_num_up', 0) elif pgstate == PGState.ACTING: pg_count = props.get('pg_count_acting', dict()) pg_num = props.get('pg_num_acting', 0) else: raise RuntimeError() pool_list = dict() for pool, count in sorted(pg_count.items()): if args.normalize_pg_count: # normalize to terrabytes if devsize >= 0: count /= devsize / 1024 ** 4 else: count = 0 pool_list[pool] = count class_val = crushclass.rjust(maxcrushclasslen) osd_entries.append((osdid, hostname, class_val, devsize, weight_val, cweight, used, util, pg_num, pool_list)) # default sort by osdid sort_func = lambda x: x[0] if args.sort_utilization: sort_func = lambda x: x[6] if args.sort_pg_count is not None: sort_func = lambda x: x[7].get(args.sort_pg_count, 0) # header: print() print(f"{'osdid': >6} {'hostname': >10} {crushclassheader} {'devsize': >7} {'weight': >6} {'cweight': >7} {'used': >7} {'util': >5} {'pg_num': >6} pools") for osdid, hostname, crushclass, devsize, weight, cweight, used, util, pg_num, pool_pgs in sorted(osd_entries, key=sort_func): pool_overview = list() for pool, count in pool_pgs.items(): if args.per_pool_count: if type(count) == float: entry = f"{pool}({count:.1f})" else: entry = f"{pool}({count})" else: entry = f"{pool}" if args.sort_pg_count == pool: pool_overview.insert(0, entry) else: pool_overview.append(entry) util = "%.1f%%" % util weight = "%.2f" % weight pool_list_str = ' '.join(pool_overview) print(f"{osdid: >6} {hostname: >10} {crushclass} {pformatsize(devsize): >7} {weight: >6} {pformatsize(cweight): >7} {pformatsize(used): >7} {util: >5} {pg_num: >6} {pool_list_str}") elif args.format == 'json': if pool_free_method == PoolFreeMethod.WEIGHT: pool_info = cluster.pools elif pool_free_method == PoolFreeMethod.LIMITING: # better free-space size prediction pool_info = dict() for poolid, poolprops in cluster.pools.items(): pool_info[poolid] = dict(poolprops) pool_info[poolid]['store_avail'] = mappings.get_pool_max_avail_limited(poolid, pgstate=pgstate)[0] else: raise RuntimeError() ret = { 'pgstate': args.pgstate, 'pools': pool_info, 'osds': cluster.osds, } json.dump(ret, sys.stdout) else: raise RuntimeError(f'unhandled output format {args.format!r}') if analyzer is not None: analyzer.finish() def showremapped(args, cluster): # pgid -> move information pg_move_status = dict() # osdid => {"to": {pgid -> osdid}, 
"from": {pgid -> osdid}} osd_actions = defaultdict(lambda: defaultdict(dict)) for pgid, pginfo in cluster.pgs.items(): pgstate = pginfo["state"].split("+") if "remapped" in pgstate: # this calculation is a bit similar to what we do in PGMappings # pending move size estimation. moves = list() osd_move_count = 0 for osds_from, osds_to in cluster.get_remaps(pginfo): for osd_from, osd_to in zip(osds_from, osds_to): osd_actions[osd_from]["to"][pgid] = osd_to osd_actions[osd_to]["from"][pgid] = osd_from froms = ','.join(str(osdid) for osdid in osds_from) tos = ','.join(str(osdid) for osdid in osds_to) moves.append(f"{froms}->{tos}") osd_move_count += len(osds_to) # multiply with move-count since each shard remap adds all objects # to the num_objects_{misplaced,degraded} again objs_total = pginfo["stat_sum"]["num_objects"] * osd_move_count objs_misplaced = pginfo["stat_sum"]["num_objects_misplaced"] objs_degraded = pginfo["stat_sum"]["num_objects_degraded"] objs_to_restore = objs_misplaced + objs_degraded if objs_total > 0: progress = 1 - (objs_to_restore / objs_total) else: progress = 1 progress *= 100 state = "backfill" if "backfilling" in pgstate else "waiting" if "backfill_toofull" in pgstate: state = "toofull" if "degraded" in pgstate: state = f"degraded+{state:<8}" else: state = f" {state:<8}" pg_move_size = cluster.get_pg_shardsize(pgid) pg_move_size_pp = pformatsize(pg_move_size) pg_move_status[pgid] = { "state": state, "objs_total": objs_total, "objs_pending": objs_total - objs_to_restore, "size": pg_move_size, "sizepp": pg_move_size_pp, "progress": progress, "moves": moves, } if args.by_osd: # generate [(osdid, actions), ...] # where actions = {"to": {pgid->osdid}, "from": {pgid->osdid}} if args.osds: osdids = [int(osdid) for osdid in args.osds.split(",")] osdlist = sorted((osdid, osd_actions[osdid]) for osdid in osdids) else: osdlist = sorted(osd_actions.items()) for osdid, actions in osdlist: if osdid != -1: osd_d_size = cluster.get_osd_size(osdid, adjust_full_ratio=False) osd_d_size_pp = pformatsize(osd_d_size, 2) osd_d_used = cluster.osds[osdid]['device_used'] osd_d_used_pp = pformatsize(osd_d_used, 2) osd_c_size = cluster.osds[osdid]['crush_weight'] * cluster.osds[osdid]['weight'] osd_c_size_pp = pformatsize(osd_c_size, 2) if osd_d_size == 0: osd_d_fullness = 0 else: osd_d_fullness = osd_d_used / osd_d_size * 100 if osd_c_size == 0: osd_c_fullness = 0 else: osd_c_fullness = osd_d_used / osd_c_size * 100 osdname = f"osd.{osdid}" fullness = (f" drive={osd_d_fullness:.1f}% {osd_d_used_pp}/{osd_d_size_pp}" f" crush={osd_c_fullness:.1f}% {osd_d_used_pp}/{osd_c_size_pp}") else: osdname = "osd.missing" fullness = "" sum_to = len(actions['to']) sum_from = len(actions['from']) sum_data_to = sum((pg_move_status[pg]['size'] for pg in actions["to"].keys())) sum_data_from = sum((pg_move_status[pg]['size'] for pg in actions["from"].keys())) sum_data_delta = sum_data_from - sum_data_to sum_data_to_pp = pformatsize(sum_data_to, 2) sum_data_from_pp = pformatsize(sum_data_from, 2) sum_data_delta_pp = pformatsize(sum_data_delta, 2) print(f"{osdname}: {cluster.osds[osdid]['host_name']} =>{sum_to} {sum_data_to_pp} <={sum_from} {sum_data_from_pp}" f" (\N{Greek Capital Letter Delta}{sum_data_delta_pp}) {fullness}") for pgid, to_osd in actions["to"].items(): pgstatus = pg_move_status[pgid] print(f" ->{pgid: <6} {pgstatus['state']} {pgstatus['sizepp']: >6} {osdid: >4}->{to_osd: <4} {pgstatus['objs_pending']} of {pgstatus['objs_total']}, {pgstatus['progress']:.1f}%") for pgid, from_osd in 
actions["from"].items(): pgstatus = pg_move_status[pgid] print(f" <-{pgid: <6} {pgstatus['state']} {pgstatus['sizepp']: >6} {osdid: >4}<-{from_osd: <4} {pgstatus['objs_pending']} of {pgstatus['objs_total']}, {pgstatus['progress']:.1f}%") print() else: for pgid, pgmoveinfo in pg_move_status.items(): state = pgmoveinfo['state'] move_size_pp = pgmoveinfo['sizepp'] objs_total = pgmoveinfo['objs_total'] objs_pending = pgmoveinfo['objs_pending'] progress = pgmoveinfo['progress'] move_actions = ';'.join(pgmoveinfo['moves']) print(f"pg {pgid: <6} {state} {move_size_pp: >6}: {objs_pending} of {objs_total}, {progress:.1f}%, {move_actions}") def poolosddiff(args, cluster): if args.pgstate == "up": pool_osds = cluster.pool_osds_up elif args.pgstate == "acting": pool_osds = cluster.pool_osds_acting else: raise RuntimeError("unknown pgstate {args.pgstate!r}") p1_id = cluster.poolnames[args.pool1] p2_id = cluster.poolnames[args.pool2] p1_osds = pool_osds[p1_id] p2_osds = pool_osds[p2_id] ret = { "pool1": args.pool1, "pool2": args.pool2, "union": p1_osds | p2_osds, "intersect": p1_osds & p2_osds, "p1-p2": p1_osds - p2_osds, "p2-p1": p2_osds - p1_osds, } pprint(ret) def repairstats(args, cluster): stats_sum = cluster.state["pg_dump"]["pg_map"]["osd_stats_sum"] print(f"repaired reads globally: {stats_sum['num_shards_repaired']}") osd_repairs = list() for osdid, osdinfos in cluster.osds.items(): stats = osdinfos.get("stats") if stats is None: continue repairs = stats["num_shards_repaired"] if repairs != 0: osd_repairs.append((osdid, repairs)) for osdid, repairs in sorted(osd_repairs, key=lambda x: (x[1], x[0])): print(f"repaired on {osdid:>6}: {repairs}") def main(): args = parse_args() level = args.verbose - args.quiet log_setup(level) verbose = level >= 1 if args.mode == 'test': import doctest if args.name is None: failed, num_tests = doctest.testmod(exclude_empty=True, verbose=verbose) else: # like doctest.run_docstring_examples but returns results finder = doctest.DocTestFinder(verbose=verbose, recurse=False) runner = doctest.DocTestRunner(verbose=verbose) for test in finder.find(globals()[args.name], args.name, globs=globals()): runner.run(test) runner.summarize() failed, num_tests = runner.failures, runner.tries return 1 if failed > 0 else 0 elif args.mode == 'gather': state = ClusterState() state.dump(args.output_file) else: if args.mode == "osdmap": # so that arg doesn't have to be in the osdmap argparser... 
args.osdsize = "device" if args.osdsize == "device": osdsize_method = OSDSizeMethod.DEVICE elif args.osdsize == "weighted": osdsize_method = OSDSizeMethod.WEIGHTED elif args.osdsize == "crush": osdsize_method = OSDSizeMethod.CRUSH else: raise RuntimeError(f"unknown osd weight method {args.osdsize!r}") state = ClusterState(args.state, osdsize_method=osdsize_method) state.preprocess() if args.mode == 'balance': run = lambda: balance(args, state) elif args.mode == 'show': run = lambda: show(args, state) elif args.mode == 'showremapped': run = lambda: showremapped(args, state) elif args.mode == 'poolosddiff': run = lambda: poolosddiff(args, state) elif args.mode == 'repairstats': run = lambda: repairstats(args, state) elif args.mode == "osdmap": if args.osdmapmode == "export": run = lambda: state.export_osdmap( args.output_file, ignore_state_upmaps=args.ignore_state_upmaps ) else: raise RuntimeError(f"unknown osdmap mode {args.osdmapmode!r}") else: raise RuntimeError(f"unknown mode: {args.mode}") if args.profile: import cProfile from pstats import SortKey, Stats with cProfile.Profile() as pr: run() s = Stats(pr) s.sort_stats(SortKey.CUMULATIVE) s.print_stats() s.print_callers() else: run() return 0 if __name__ == "__main__": exit(main())
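
# Example invocations, for illustration only (the script filename below is an
# assumption - use whatever this file is saved as; the subcommands and flags
# are the ones defined in parse_args() above):
#
#   ./placementoptimizer.py gather statefile
#   ./placementoptimizer.py -v balance --state statefile --max-pg-moves 10 > moves.txt
#   ./placementoptimizer.py show --state statefile --osds --sort-utilization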