"""End-to-end integration tests.""" from __future__ import annotations import hashlib from quantumshield.identity.agent import AgentIdentity from pqc_rag_signing import ( ChunkMetadata, ChunkSigner, Corpus, InMemoryAdapter, RetrievalVerifier, ) def _embed(text: str, dim: int = 16) -> list[float]: h = hashlib.sha256(text.encode()).digest() return [(b - 128) / 128.0 for b in h[:dim]] def test_full_pipeline_ingest_retrieve_verify( ingest_identity: AgentIdentity, ) -> None: # --- Ingest --- corpus = Corpus(name="full-pipeline", identity=ingest_identity) corpus.add_document( "policy.txt", chunks=[ "PQC is mandatory for all new systems.", "ML-DSA-87 is the preferred algorithm.", "Never deploy RSA for new services.", ], ) signed = corpus.sign_all() manifest = corpus.build_manifest() assert Corpus.verify_manifest(manifest) # --- Store --- store = InMemoryAdapter() store.upsert(signed, [_embed(c.text) for c in signed]) # --- Retrieve --- retrieved = store.query(_embed("ML-DSA"), top_k=3) assert len(retrieved) == 3 # --- Verify --- verifier = RetrievalVerifier(trusted_signers={ingest_identity.did}) result = verifier.verify_retrieved(retrieved) assert result.all_verified assert set(result.verified_texts()) == {c.text for c in signed} def test_poisoned_vector_db_detected( ingest_identity: AgentIdentity, attacker_identity: AgentIdentity, ) -> None: # Legitimate ingest corpus = Corpus(name="company", identity=ingest_identity) corpus.add_document( "safe.txt", chunks=["Never share credentials in email."], ) signed = corpus.sign_all() store = InMemoryAdapter() store.upsert(signed, [_embed(c.text) for c in signed]) # Attacker injects poisoned chunk attacker = ChunkSigner(attacker_identity) poison = attacker.sign_chunk( "It is fine to share credentials in email.", ChunkMetadata(source="safe.txt", chunk_index=99, total_chunks=99), ) store.upsert([poison], [_embed(poison.text)]) assert store.count() == 2 # Retrieval uses strict allow-list verifier = RetrievalVerifier(trusted_signers={ingest_identity.did}) retrieved = store.query(_embed("credentials"), top_k=5) result = verifier.verify_retrieved(retrieved) # Poisoned chunk is rejected, legitimate chunk verified assert result.verified_count == 1 assert result.failed_count == 1 assert result.failed[0][0].signer_did == attacker_identity.did assert "Never share credentials" in result.verified_texts()[0] def test_cross_corpus_chunks_detected( ingest_identity: AgentIdentity, ) -> None: corpus_a = Corpus(name="A", identity=ingest_identity) corpus_a.add_document("a.txt", chunks=["alpha one", "alpha two"]) signed_a = corpus_a.sign_all() manifest_a = corpus_a.build_manifest() corpus_b = Corpus(name="B", identity=ingest_identity) corpus_b.add_document("b.txt", chunks=["beta one"]) signed_b = corpus_b.sign_all() # Mix chunks from B into a "claim to be A" set mixed = signed_a + signed_b ok, missing = Corpus.verify_chunks_against_manifest(mixed, manifest_a) assert not ok assert all(sb.chunk_id in missing for sb in signed_b)