# prepare_data.py - Prepare test data for Milvus backup/restore testing
# Supports two scenarios:
# 1. Single-stage: All data inserted at once (for testing backup/restore of old version data)
# 2. Multi-stage: Data inserted in stages (for testing cross-version backup/restore with incremental data)
#
# Usage:
#   Single-stage mode (default): python prepare_data.py
#   Multi-stage mode:            python prepare_data.py --stage 1   # then later: --stage 2

import argparse
import time

import numpy as np
from pymilvus import (
    connections,
    utility,
    FieldSchema,
    CollectionSchema,
    DataType,
    Collection,
    MilvusClient,
)
from pymilvus.exceptions import MilvusException

# Keys looked up in the dict returned by MilvusClient.get_collection_stats().
SCHEMA_VERSION_CONSISTENT_KEY = "schema_version_consistent_segments"
SCHEMA_VERSION_TOTAL_KEY = "schema_version_total_segments"


def is_schema_version_not_ready(exc) -> bool:
    """Return True when *exc* reads like a transient schema-version error.

    The check is purely textual: the exception message must contain
    "schema version" together with either "not ready" or
    "consistency check failed".
    """
    message = str(exc)
    if "schema version" not in message:
        return False
    return "not ready" in message or "consistency check failed" in message


def wait_schema_version_consistent(client, collection_name, timeout=60, poll_interval=0.2):
    """Poll collection stats until the schema version is consistent.

    Returns as soon as the consistent-segment count equals the
    total-segment count. Also returns immediately when the stats contain
    neither key (presumably a server that does not report them -- confirm).

    Args:
        client: object exposing ``get_collection_stats(collection_name=...)``.
        collection_name: name of the collection to poll.
        timeout: maximum number of seconds to keep polling.
        poll_interval: seconds to sleep between polls.

    Raises:
        TimeoutError: the counts did not converge within *timeout* seconds.
    """
    # Monotonic clock: a wall-clock adjustment must not stretch or cut the wait.
    deadline = time.monotonic() + timeout
    last_consistent = None
    last_total = None
    while time.monotonic() < deadline:
        stats = client.get_collection_stats(collection_name=collection_name)
        consistent = stats.get(SCHEMA_VERSION_CONSISTENT_KEY)
        total = stats.get(SCHEMA_VERSION_TOTAL_KEY)
        if consistent is None and total is None:
            return
        if consistent is not None and total is not None and int(consistent) == int(total):
            return
        last_consistent = consistent
        last_total = total
        time.sleep(poll_interval)
    raise TimeoutError(
        f"schema version is not consistent for {collection_name}: "
        f"{last_consistent}/{last_total} segments are consistent"
    )


def add_collection_field_with_schema_retry(
    client,
    collection_name,
    field_name,
    data_type,
    timeout=120,
    **kwargs,
):
    """Add a field to a collection, retrying while the schema version settles.

    Before every attempt the function waits for the schema version to become
    consistent (at most 30 s per wait, at least 1 s). A MilvusException that
    ``is_schema_version_not_ready`` recognises triggers a retry after one
    second; any other MilvusException is re-raised unchanged.

    Args:
        client: MilvusClient used for the stats polling and the field addition.
        collection_name: target collection.
        field_name: name of the field to add.
        data_type: DataType of the new field.
        timeout: overall budget in seconds for all attempts.
        **kwargs: forwarded verbatim to ``client.add_collection_field``.

    Returns:
        Whatever ``client.add_collection_field`` returns.

    Raises:
        TimeoutError: the field could not be added within *timeout* seconds
            (chained from the last retryable error), or the inner
            consistency wait timed out.
    """
    deadline = time.monotonic() + timeout
    last_error = None
    while time.monotonic() < deadline:
        wait_schema_version_consistent(
            client,
            collection_name,
            timeout=max(1, min(30, deadline - time.monotonic())),
        )
        try:
            return client.add_collection_field(
                collection_name=collection_name,
                field_name=field_name,
                data_type=data_type,
                **kwargs,
            )
        except MilvusException as exc:
            if not is_schema_version_not_ready(exc):
                raise
            last_error = exc
            print(f"schema version not ready after adding fields, retrying: {exc}")
            time.sleep(1)
    raise TimeoutError(
        f"timed out adding field {field_name} to {collection_name}"
    ) from last_error


def main(uri="http://127.0.0.1:19530", token="root:Milvus", stage=None, total_entities=3000):
    """Create and populate hello_milvus, hello_milvus2 and hello_milvus_dynamic.

    Args:
        uri: Milvus server uri.
        token: Milvus server token.
        stage: None for single-stage mode, or 1 / 2 for multi-stage mode.
            In stage 2 no rows are inserted into hello_milvus and
            hello_milvus_dynamic; hello_milvus2 receives the second half
            of the data.
        total_entities: number of rows generated for hello_milvus and
            hello_milvus_dynamic; hello_milvus2 takes all of them or one
            half depending on *stage*.
    """
    fmt = "\n=== {:30} ===\n"
    dim = 8

    #################################################################################
    # 1. connect to Milvus
    # Add a new connection alias `default` for Milvus server in `localhost:19530`
    # Actually the "default" alias is built into PyMilvus.
    # If the address of Milvus is the same as `localhost:19530`, you can omit all
    # parameters and call the method as: `connections.connect()`.
    #
    # Note: the `using` parameter of the following methods defaults to "default".
    print(fmt.format("start connecting to Milvus"))
    print(fmt.format(f"Milvus uri: {uri}"))
    connections.connect("default", uri=uri, token=token)

    has = utility.has_collection("hello_milvus")
    print(f"Does collection hello_milvus exist in Milvus: {has}")

    #################################################################################
    # 2. create collection
    # We're going to create a collection with 4 fields.
    # +-+------------+------------+------------------+------------------------------+
    # | | field name | field type | other attributes |       field description      |
    # +-+------------+------------+------------------+------------------------------+
    # |1|    "pk"    |    Int64   |  is_primary=True |      "primary field"         |
    # | |            |            |   auto_id=False  |                              |
    # +-+------------+------------+------------------+------------------------------+
    # |2|  "random"  |    Double  |                  |      "a double field"        |
    # +-+------------+------------+------------------+------------------------------+
    # |3|   "var"    |   VarChar  | max_length=65535 |      "a varchar field"       |
    # +-+------------+------------+------------------+------------------------------+
    # |4|"embeddings"| FloatVector|     dim=8        |  "float vector with dim 8"   |
    # +-+------------+------------+------------------+------------------------------+
    fields = [
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="random", dtype=DataType.DOUBLE),
        FieldSchema(name="var", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim),
    ]
    schema = CollectionSchema(fields, "hello_milvus")

    print(fmt.format("Create collection `hello_milvus`"))
    hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong")

    ################################################################################
    # 3. insert data
    # We are going to insert rows of data into the collection
    # Data to be inserted must be organized in fields.
    #
    # The insert() method returns:
    # - either automatically generated primary keys by Milvus if auto_id=True in the schema;
    # - or the existing primary key field from the entities if auto_id=False in the schema.

    # Only insert data to hello_milvus when stage is None or 1
    if stage != 2:
        print(fmt.format("Start inserting entities to hello_milvus"))
        rng = np.random.default_rng(seed=19530)
        # hello_milvus always inserts all data when inserting
        num_entities = total_entities
        pk_list = list(range(num_entities))
        random_list = rng.random(num_entities).tolist()
        var_list = [str(i) for i in range(num_entities)]  # Always use original format
        embeddings_list = rng.random((num_entities, dim))

        # Split data into 10 batches for insertion; the last batch absorbs
        # the remainder when num_entities is not a multiple of 10.
        batch_size = num_entities // 10
        if batch_size == 0:
            batch_size = 1

        for j in range(10):
            start_idx = j * batch_size
            end_idx = (j + 1) * batch_size if j < 9 else num_entities
            if start_idx >= num_entities:
                # Fewer than 10 rows in total: nothing left to insert.
                break

            # Prepare batch data (column-oriented, one list per field)
            batch_entities = [
                pk_list[start_idx:end_idx],
                random_list[start_idx:end_idx],
                var_list[start_idx:end_idx],
                embeddings_list[start_idx:end_idx].tolist(),
            ]

            # Insert batch data
            hello_milvus.insert(batch_entities)
            time.sleep(1)  # Add delay to prevent inserting too quickly
            print(f"epoch {j+1}/10")
        hello_milvus.flush()
    else:
        print("Stage 2: Skipping data insertion to hello_milvus")
        rng = np.random.default_rng(seed=19530)  # Initialize rng for hello_milvus2

    print(f"Number of entities in hello_milvus: {hello_milvus.num_entities}")

    # create another collection
    fields2 = [
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="random", dtype=DataType.DOUBLE),
        FieldSchema(name="var", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim),
    ]
    schema2 = CollectionSchema(fields2, "hello_milvus2")

    print(fmt.format("Create collection `hello_milvus2`"))
    hello_milvus2 = Collection("hello_milvus2", schema2, consistency_level="Strong")

    # For hello_milvus2, apply stage-based data generation
    if stage is None:
        # Original scenario: all data in one go
        num_entities2 = total_entities
        entity_offset2 = 0
    elif stage == 1:
        # Multi-stage scenario: first half of data
        num_entities2 = total_entities // 2
        entity_offset2 = 0
    else:  # stage == 2
        # Multi-stage scenario: second half of data
        num_entities2 = total_entities - (total_entities // 2)
        entity_offset2 = total_entities // 2

    if stage is None:
        var_list2 = [str(i) for i in range(num_entities2)]  # Original format
    else:
        var_list2 = [f"stage{stage}_entity_{i + entity_offset2}" for i in range(num_entities2)]

    entities2 = [
        rng.random(num_entities2).tolist(),  # field random, only supports list
        var_list2,
        rng.random((num_entities2, dim)),  # field embeddings, supports numpy.ndarray and list
    ]

    # The same batch is inserted twice (pk is auto_id=True, so no keys are
    # supplied). NOTE(review): the duplicate insert looks deliberate --
    # confirm against the restore verification before changing it.
    hello_milvus2.insert(entities2)
    hello_milvus2.flush()
    hello_milvus2.insert(entities2)
    hello_milvus2.flush()

    if stage is None:
        print(f"Number of entities in hello_milvus2: {hello_milvus2.num_entities}")
    else:
        print(f"Stage {stage} - Number of entities in hello_milvus2: {hello_milvus2.num_entities}")
        print(fmt.format(f"Stage {stage} completed for hello_milvus2"))
        print(f"Stage {stage} inserted {num_entities2} entities starting from offset {entity_offset2}")

    # hello_milvus_dynamic exercises the dynamic schema path. The $meta field
    # attributes are version-dependent (Milvus #46419 added Nullable=true and
    # DefaultValue={} starting from v2.6.8 / master), so creating this collection
    # on a pre-#46419 Milvus and then doing secondary restore reproduces the
    # schema misalignment described in zilliztech/milvus-backup#1013.
    fields_dynamic = [
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="random", dtype=DataType.DOUBLE),
        FieldSchema(name="var", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim),
    ]
    schema_dynamic = CollectionSchema(
        fields_dynamic, "hello_milvus_dynamic", enable_dynamic_field=True
    )
    print(fmt.format("Create collection `hello_milvus_dynamic`"))
    hello_milvus_dynamic = Collection(
        "hello_milvus_dynamic", schema_dynamic, consistency_level="Strong"
    )

    if stage != 2:
        print(fmt.format("Start inserting entities to hello_milvus_dynamic"))
        num_entities_d = total_entities
        rng_d = np.random.default_rng(seed=19531)
        rows = []
        for i in range(num_entities_d):
            row = {
                "pk": i,
                "random": float(rng_d.random()),
                "var": str(i),
                "embeddings": rng_d.random(dim).tolist(),
                # Populate dynamic fields on every row so the collection has
                # real dynamic schema usage, not just a declared $meta field.
                "extra_int": i,
                "extra_str": f"dyn_{i}",
            }
            rows.append(row)

        batch_size = max(1, num_entities_d // 10)
        for j in range(0, num_entities_d, batch_size):
            hello_milvus_dynamic.insert(rows[j : j + batch_size])
            time.sleep(1)
        hello_milvus_dynamic.flush()
        print(
            f"Number of entities in hello_milvus_dynamic: {hello_milvus_dynamic.num_entities}"
        )
    else:
        print("Stage 2: Skipping data insertion to hello_milvus_dynamic")


def main_added(uri="http://127.0.0.1:19530", token="root:Milvus", total_entities=3000):
    """Create hello_milvus_added.

    Inserts half the rows, flushes, calls add_collection_field to add five
    nullable fields (with default_value where supported), then inserts the
    remaining rows populating the new fields. The added fields carry
    Nullable=true and default_value attributes that must round-trip through
    backup metadata into secondary restore.

    Requires Milvus master / 2.6+: add_collection_field does not exist on
    older versions, so this entry point is split out from main() and only
    invoked by the secondary-restore CI workflow.
    """
    fmt = "\n=== {:30} ===\n"
    dim = 8

    print(fmt.format("start connecting to Milvus"))
    print(fmt.format(f"Milvus uri: {uri}"))
    connections.connect("default", uri=uri, token=token)

    fields_added = [
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="random", dtype=DataType.DOUBLE),
        FieldSchema(name="var", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim),
    ]
    schema_added = CollectionSchema(fields_added, "hello_milvus_added")
    print(fmt.format("Create collection `hello_milvus_added`"))
    hello_milvus_added = Collection(
        "hello_milvus_added", schema_added, consistency_level="Strong"
    )

    first_half = total_entities // 2
    rng_a = np.random.default_rng(seed=19532)

    print(fmt.format("Insert first half to hello_milvus_added"))
    # Column-oriented payload: one list per schema field, in schema order.
    first_rows = [
        list(range(first_half)),
        rng_a.random(first_half).tolist(),
        [str(i) for i in range(first_half)],
        rng_a.random((first_half, dim)).tolist(),
    ]
    hello_milvus_added.insert(first_rows)
    hello_milvus_added.flush()

    # Use MilvusClient because the ORM Collection wrapper does not expose
    # add_collection_field directly. All fields are nullable; some carry a
    # default_value so the backup round-trip must preserve both attributes
    # (see zilliztech/milvus-backup#1013).
    print(fmt.format("add_collection_field to hello_milvus_added"))
    client = MilvusClient(uri=uri, token=token)
    try:
        added_fields = [
            ("added_int64", DataType.INT64, {"nullable": True, "default_value": 0}),
            (
                "added_varchar",
                DataType.VARCHAR,
                {"nullable": True, "max_length": 256, "default_value": "default"},
            ),
            ("added_double", DataType.DOUBLE, {"nullable": True, "default_value": 0.0}),
            ("added_bool", DataType.BOOL, {"nullable": True, "default_value": False}),
            ("added_json", DataType.JSON, {"nullable": True}),
        ]
        for field_name, data_type, kwargs in added_fields:
            add_collection_field_with_schema_retry(
                client,
                "hello_milvus_added",
                field_name,
                data_type,
                **kwargs,
            )
            print(f"added field {field_name} ({data_type.name}) {kwargs}")

        print(fmt.format("Insert second half to hello_milvus_added"))
        second_rows = []
        for i in range(first_half, total_entities):
            second_rows.append(
                {
                    "pk": i,
                    "random": float(rng_a.random()),
                    "var": str(i),
                    "embeddings": rng_a.random(dim).tolist(),
                    "added_int64": i * 10,
                    "added_varchar": f"value_{i}",
                    "added_double": float(i) * 1.5,
                    "added_bool": i % 2 == 0,
                    "added_json": {"k": i},
                }
            )
        # Use MilvusClient.insert here so the schema cache picks up the
        # newly-added fields.
        client.insert(collection_name="hello_milvus_added", data=second_rows)
        client.flush(collection_name="hello_milvus_added")
        print(
            f"Number of entities in hello_milvus_added: {hello_milvus_added.num_entities}"
        )
    finally:
        # Release the client's connection even when a step above fails.
        client.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="prepare data for backup/restore testing")
    parser.add_argument("--uri", type=str, default="http://127.0.0.1:19530", help="Milvus server uri")
    parser.add_argument("--token", type=str, default="root:Milvus", help="Milvus server token")
    parser.add_argument("--stage", type=int, choices=[1, 2], required=False,
                        help="Stage 1 or 2 for multi-stage data preparation (only affects hello_milvus2). Omit for single-stage mode")
    parser.add_argument("--total-entities", type=int, default=3000,
                        help="Total number of entities (hello_milvus always gets all, hello_milvus2 respects stage)")
    parser.add_argument("--scenario", choices=["base", "added"], default="base",
                        help="base: hello_milvus / hello_milvus2 / hello_milvus_dynamic. added: hello_milvus_added (requires Milvus master / 2.6+).")
    args = parser.parse_args()
    # --stage is only consumed by the base scenario.
    if args.scenario == "base":
        main(args.uri, args.token, args.stage, args.total_entities)
    else:
        main_added(args.uri, args.token, args.total_entities)