// This file is part of midnight-node. // Copyright (C) 2025 Midnight Foundation // SPDX-License-Identifier: Apache-2.0 // Licensed under the Apache License, Version 2.0 (the "License"); // You may not use this file except in compliance with the License. // You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use crate::db::{ get_deregistrations, get_redemption_creates, get_redemption_spends, get_registrations, }; use crate::{ CreateData, DeregistrationData, MidnightCNightObservationDataSource, ObservedUtxo, ObservedUtxoData, ObservedUtxoHeader, RedemptionCreateData, RedemptionSpendData, RegistrationData, SpendData, UtxoIndexInTx, }; use cardano_serialization_lib::{ Address, BaseAddress, ConstrPlutusData, Credential, Ed25519KeyHash, EnterpriseAddress, PlutusData, RewardAddress, ScriptHash, }; use derive_new::new; use midnight_primitives_cnight_observation::{ CNightAddresses, CardanoPosition, CardanoRewardAddressBytes, DustPublicKeyBytes, ObservedUtxos, }; use partner_chains_db_sync_data_sources::McFollowerMetrics; use sidechain_domain::{McBlockHash, McBlockNumber, McTxHash, McTxIndexInBlock, TX_HASH_SIZE}; pub use sqlx::PgPool; use std::fmt::Debug; #[derive( Debug, Copy, Clone, PartialEq, PartialOrd, parity_scale_codec::Encode, parity_scale_codec::Decode, scale_info::TypeInfo, )] pub struct TxHash(pub [u8; TX_HASH_SIZE]); #[derive( Debug, Clone, PartialEq, parity_scale_codec::Encode, parity_scale_codec::Decode, scale_info::TypeInfo, )] pub struct TxPosition { pub block_hash: McBlockHash, pub block_number: McBlockNumber, pub block_index: McTxIndexInBlock, } #[derive(thiserror::Error, Debug)] pub enum MidnightCNightObservationDataSourceError { #[error("missing reference for block hash `{0}` in db-sync")] MissingBlockReference(McBlockHash), #[error("Error querying database")] DBQueryError(#[from] sqlx::error::Error), #[error("Error extracting network id from Cardano address")] CardanoNetworkError(String), #[error("Invalid value for mapping validator address")] MappingValidatorInvalidAddress(String), } #[derive(thiserror::Error, Debug)] pub enum RegistrationDatumDecodeError { #[error("Cardano credential not bytes")] CardanoCredentialNotBytes, #[error("Cardano credential invalid tag")] CardanoCredentialInvalidTag(u64), #[error("Cardano credential invalid key hash")] CardanoCredentialInvalidKeyHash, #[error("Cardano credential invalid script hash")] CardanoCredentialInvalidScriptHash, #[error("Dust address not bytes")] DustAddressNotBytes, #[error("Dust address invalid length")] DustAddressInvalidLength(usize), } #[derive(new)] pub struct MidnightCNightObservationDataSourceImpl { pub pool: PgPool, pub metrics_opt: Option, #[allow(dead_code)] cache_size: u16, } // If we need better logging here, we could use use db_sync_follower::observed_async_trait // But perhaps there are better options for tracing #[async_trait::async_trait] impl MidnightCNightObservationDataSource for MidnightCNightObservationDataSourceImpl { async fn get_utxos_up_to_capacity( &self, config: &CNightAddresses, start_position: &CardanoPosition, current_tip: McBlockHash, tx_capacity: usize, ) -> Result> { let cnight_asset_name = config.cnight_asset_name.as_bytes(); let mapping_validator_address = Address::from_bech32(&config.mapping_validator_address) .map_err(|e| { MidnightCNightObservationDataSourceError::MappingValidatorInvalidAddress( e.to_string(), ) })?; let cardano_network = mapping_validator_address.network_id().map_err(|_| { MidnightCNightObservationDataSourceError::CardanoNetworkError( config.mapping_validator_address.clone(), ) })?; let mapping_validator_policy_id = EnterpriseAddress::from_address(&mapping_validator_address) .ok_or(MidnightCNightObservationDataSourceError::MappingValidatorInvalidAddress( "Not EnterpriseAddress".to_string(), ))? .payment_cred() .to_scripthash() .ok_or(MidnightCNightObservationDataSourceError::MappingValidatorInvalidAddress( "MappingValidator address does not contain a script hash".to_string(), ))?; // Get end position from cardano block hash let end: CardanoPosition = crate::db::get_block_by_hash(&self.pool, current_tip.clone()) .await? .ok_or(MidnightCNightObservationDataSourceError::MissingBlockReference(current_tip))? .into(); // Increment the end position to tx_index + 1 of the current mainchain position let end = end.increment(); // The "capacity" argument is capacity in terms of TRANSACTIONS, // but the various sql queries below want a capacity in terms of UTXOs. // Use a generous overestimate of how many UTXOs each TX _may_ have. let utxo_capacity = tx_capacity * 64; // Call db methods to get UTXOs (offset + limit) until we reach our capacity // TODO: (possibly) Replace this with grabbing from a queue that's filled async by an offchain thread // ^ We may not have to do the above if the queries are fast enough let mut utxos = [ self.get_registration_utxos( cardano_network, &mapping_validator_policy_id, &config.mapping_validator_address, &config.auth_token_asset_name, start_position, &end, utxo_capacity, 0, ) .await?, self.get_deregistration_utxos( cardano_network, &config.mapping_validator_address, start_position, &end, utxo_capacity, 0, ) .await?, self.get_asset_create_utxos( cardano_network, config.cnight_policy_id, cnight_asset_name, start_position, &end, utxo_capacity, 0, ) .await?, self.get_asset_spend_utxos( cardano_network, config.cnight_policy_id, cnight_asset_name, start_position, &end, utxo_capacity, 0, ) .await?, self.get_redemption_create_utxos( cardano_network, &config.redemption_validator_address, config.cnight_policy_id, cnight_asset_name, start_position, &end, utxo_capacity, 0, ) .await?, self.get_redemption_spend_utxos( cardano_network, &config.redemption_validator_address, config.cnight_policy_id, cnight_asset_name, start_position, &end, utxo_capacity, 0, ) .await?, ] .concat(); utxos.sort(); // Truncate UTXOs but include full transactions let mut truncated_utxos = Vec::with_capacity(utxo_capacity); let mut num_txs = 0; let mut cur_tx: Option = None; for utxo in utxos { if cur_tx.as_ref().is_none_or(|tx| tx < &utxo.header.tx_position) { num_txs += 1; cur_tx = Some(utxo.header.tx_position.clone()); } if num_txs == tx_capacity { break; } truncated_utxos.push(utxo); } if num_txs < tx_capacity { // We couldn't find enough UTXOs in the range, which means we're up-to-date with the // current_tip Ok(ObservedUtxos { start: start_position.clone(), end, utxos: truncated_utxos }) } else { Ok(ObservedUtxos { start: start_position.clone(), end: truncated_utxos .last() .map_or(start_position.clone(), |u| u.header.tx_position.clone()) .increment(), utxos: truncated_utxos, }) } } } impl MidnightCNightObservationDataSourceImpl { fn decode_registration_datum( datum: ConstrPlutusData, ) -> Result<(Credential, DustPublicKeyBytes), RegistrationDatumDecodeError> { // We use a Vec here because the `get` method on `PlutusList` can panic let list: Vec = datum.data().into_iter().cloned().collect(); let Some(cardano_credential) = list.first().and_then(|d| d.as_constr_plutus_data()) else { return Err(RegistrationDatumDecodeError::CardanoCredentialNotBytes); }; let credential = match u64::from(cardano_credential.alternative()) { 0 => cardano_credential .data() .into_iter() .next() .and_then(|d| d.as_bytes()) .and_then(|hash_bytes| Ed25519KeyHash::from_bytes(hash_bytes).ok()) .map(|hash| Credential::from_keyhash(&hash)) .ok_or(RegistrationDatumDecodeError::CardanoCredentialInvalidKeyHash)?, 1 => cardano_credential .data() .into_iter() .next() .and_then(|d| d.as_bytes()) .and_then(|hash_bytes| ScriptHash::from_bytes(hash_bytes).ok()) .map(|hash| Credential::from_scripthash(&hash)) .ok_or(RegistrationDatumDecodeError::CardanoCredentialInvalidScriptHash)?, tag => { return Err(RegistrationDatumDecodeError::CardanoCredentialInvalidTag(tag)); }, }; let Some(dust_address) = list.get(1).and_then(|d| d.as_bytes()) else { return Err(RegistrationDatumDecodeError::DustAddressNotBytes); }; let dust_addr_length = dust_address.len(); let Ok(dust_address) = ::try_from(dust_address) else { return Err(RegistrationDatumDecodeError::DustAddressInvalidLength(dust_addr_length)); }; Ok((credential, dust_address)) } #[allow(clippy::too_many_arguments)] async fn get_redemption_create_utxos( &self, cardano_network: u8, address: &str, policy_id: [u8; 28], asset_name: &[u8], start: &CardanoPosition, end: &CardanoPosition, limit: usize, offset: usize, ) -> Result, Box> { let rows = get_redemption_creates( &self.pool, address, policy_id, asset_name, start, end, limit, offset, ) .await .map_err(|e| format!("Failed to fetch data: {e}"))?; let mut utxos = Vec::new(); for row in rows { let header = ObservedUtxoHeader { tx_position: CardanoPosition { block_hash: McBlockHash(row.block_hash.0), block_number: row.block_number.0, block_timestamp: row.block_timestamp.and_utc().into(), tx_index_in_block: row.tx_index_in_block.0, }, tx_hash: McTxHash(row.tx_hash.0), utxo_tx_hash: McTxHash(row.tx_hash.0), utxo_index: UtxoIndexInTx(row.utxo_index.0), }; let Some(constr) = row.full_datum.0.as_constr_plutus_data() else { log::error!("Plutus data for mapping validator not Constr ({header:?})"); continue; }; let list = constr.data(); let Some(owner_bytes) = list.get(0).as_bytes() else { log::error!("Owner Cardano address not bytes ({header:?})"); continue; }; let Some(cardano_address) = cardano_serialization_lib::Address::from_bytes(owner_bytes.clone()).ok() else { log::error!( "Cardano address {owner_bytes:?} not valid cardano address ({header:?})" ); continue; }; let Some(base_address) = BaseAddress::from_address(&cardano_address) else { log::error!( "Cardano Address {:?} has no delegation part", cardano_address.to_hex() ); continue; }; let reward_address = RewardAddress::new(cardano_network, &base_address.stake_cred()); let owner = reward_address.to_address().to_bytes().try_into().unwrap(); let utxo = ObservedUtxo { header, data: ObservedUtxoData::RedemptionCreate(RedemptionCreateData { owner, value: row.quantity as u128, utxo_tx_hash: McTxHash(row.tx_hash.0), utxo_tx_index: row.utxo_index.0, }), }; utxos.push(utxo); } Ok(utxos) } #[allow(clippy::too_many_arguments)] async fn get_redemption_spend_utxos( &self, cardano_network: u8, address: &str, policy_id: [u8; 28], asset_name: &[u8], start: &CardanoPosition, end: &CardanoPosition, limit: usize, offset: usize, ) -> Result, Box> { let rows = get_redemption_spends( &self.pool, address, policy_id, asset_name, start, end, limit, offset, ) .await .map_err(|e| format!("Failed to fetch data: {e}"))?; let mut utxos = Vec::new(); for row in rows { let header = ObservedUtxoHeader { tx_position: CardanoPosition { block_hash: McBlockHash(row.block_hash.0), block_number: row.block_number.0, block_timestamp: row.block_timestamp.and_utc().into(), tx_index_in_block: row.tx_index_in_block.0, }, tx_hash: McTxHash(row.tx_hash.0), utxo_tx_hash: McTxHash(row.utxo_tx_hash.0), utxo_index: UtxoIndexInTx(row.utxo_index.0), }; let Some(constr) = row.full_datum.0.as_constr_plutus_data() else { log::error!( "INTERNAL ERROR: Plutus data for mapping validator not Constr ({header:?})" ); continue; }; let list = constr.data(); let Some(owner_bytes) = list.get(0).as_bytes() else { log::error!("Owner Cardano address not bytes ({header:?})"); continue; }; let Some(cardano_address) = cardano_serialization_lib::Address::from_bytes(owner_bytes.clone()).ok() else { log::error!( "Cardano address {owner_bytes:?} not valid cardano address ({header:?})" ); continue; }; let Some(base_address) = BaseAddress::from_address(&cardano_address) else { log::error!( "Cardano Address {:?} has no delegation part", cardano_address.to_hex() ); continue; }; let reward_address = RewardAddress::new(cardano_network, &base_address.stake_cred()); let owner = reward_address.to_address().to_bytes().try_into().unwrap(); let utxo = ObservedUtxo { header, data: ObservedUtxoData::RedemptionSpend(RedemptionSpendData { value: row.quantity as u128, owner, utxo_tx_hash: McTxHash(row.utxo_tx_hash.0), utxo_tx_index: row.utxo_index.0, spending_tx_hash: McTxHash(row.tx_hash.0), }), }; utxos.push(utxo); } Ok(utxos) } #[allow(clippy::too_many_arguments)] async fn get_registration_utxos( &self, cardano_network: u8, mapping_validator_policy_id: &ScriptHash, address: &str, auth_asset_name: &str, start: &CardanoPosition, end: &CardanoPosition, limit: usize, offset: usize, ) -> Result, MidnightCNightObservationDataSourceError> { let rows = get_registrations( &self.pool, address, mapping_validator_policy_id, auth_asset_name, start, end, limit, offset, ) .await?; let mut utxos = Vec::new(); for row in rows { let header = ObservedUtxoHeader { tx_position: CardanoPosition { block_hash: McBlockHash(row.block_hash.0), block_number: row.block_number.0, block_timestamp: row.block_timestamp.and_utc().into(), tx_index_in_block: row.tx_index_in_block.0, }, tx_hash: McTxHash(row.tx_hash.0), utxo_tx_hash: McTxHash(row.tx_hash.0), utxo_index: UtxoIndexInTx(row.utxo_index.0), }; let Some(constr) = row.full_datum.0.as_constr_plutus_data() else { log::error!("Plutus data for mapping validator not Constr ({header:?})"); continue; }; let (credential, dust_public_key) = match Self::decode_registration_datum(constr) { Ok(pair) => pair, Err(e) => { log::error!("Failed to decode registration datum: {e:?} ({header:?})"); continue; }, }; let reward_address = RewardAddress::new(cardano_network, &credential); // Unwrap here is OK - we know the reward_address is always 29 bytes let cardano_address = reward_address.to_address().to_bytes().try_into().unwrap(); let utxo = ObservedUtxo { header, data: ObservedUtxoData::Registration(RegistrationData { cardano_reward_address: CardanoRewardAddressBytes(cardano_address), dust_public_key, }), }; utxos.push(utxo); } Ok(utxos) } async fn get_deregistration_utxos( &self, cardano_network: u8, address: &str, start: &CardanoPosition, end: &CardanoPosition, limit: usize, offset: usize, ) -> Result, MidnightCNightObservationDataSourceError> { let rows = get_deregistrations(&self.pool, address, start, end, limit, offset).await?; let mut utxos = Vec::new(); for row in rows { let header = ObservedUtxoHeader { tx_position: CardanoPosition { block_hash: McBlockHash(row.block_hash.0), block_number: row.block_number.0, block_timestamp: row.block_timestamp.and_utc().into(), tx_index_in_block: row.tx_index_in_block.0, }, tx_hash: McTxHash(row.tx_hash.0), utxo_tx_hash: McTxHash(row.utxo_tx_hash.0), utxo_index: UtxoIndexInTx(row.utxo_index.0), }; let Some(constr) = row.full_datum.0.as_constr_plutus_data() else { log::error!("Plutus data for mapping validator not Constr ({header:?})"); continue; }; let (credential, dust_public_key) = match Self::decode_registration_datum(constr) { Ok(pair) => pair, Err(e) => { log::error!("Failed to decode registration datum: {e:?} ({header:?})"); continue; }, }; let reward_address = RewardAddress::new(cardano_network, &credential); // Unwrap here is OK - we know the reward_address is always 29 bytes let cardano_address = reward_address.to_address().to_bytes().try_into().unwrap(); let utxo = ObservedUtxo { header, data: ObservedUtxoData::Deregistration(DeregistrationData { cardano_reward_address: CardanoRewardAddressBytes(cardano_address), dust_public_key, }), }; utxos.push(utxo); } Ok(utxos) } #[allow(clippy::too_many_arguments)] async fn get_asset_create_utxos( &self, cardano_network: u8, policy_id: [u8; 28], asset_name: &[u8], start: &CardanoPosition, end: &CardanoPosition, limit: usize, offset: usize, ) -> Result, MidnightCNightObservationDataSourceError> { let rows = crate::db::get_asset_creates( &self.pool, policy_id, asset_name, start, end, limit, offset, ) .await?; let mut utxos = Vec::new(); for row in rows { let header = ObservedUtxoHeader { tx_position: CardanoPosition { block_hash: McBlockHash(row.block_hash.0), block_number: row.block_number.0, block_timestamp: row.block_timestamp.and_utc().into(), tx_index_in_block: row.tx_index_in_block.0, }, tx_hash: McTxHash(row.tx_hash.0), utxo_tx_hash: McTxHash(row.tx_hash.0), utxo_index: UtxoIndexInTx(row.utxo_index.0), }; let Some(cardano_address) = cardano_serialization_lib::Address::from_bech32(&row.holder_address).ok() else { log::error!( "Cardano address {:?} not valid bech32 cardano address", &row.holder_address ); continue; }; let Some(base_address) = BaseAddress::from_address(&cardano_address) else { log::error!("Cardano Address {:?} has no delegation part", &row.holder_address); continue; }; let reward_address = RewardAddress::new(cardano_network, &base_address.stake_cred()); let owner = reward_address.to_address().to_bytes().try_into().unwrap(); let utxo = ObservedUtxo { header, data: ObservedUtxoData::AssetCreate(CreateData { value: row.quantity as u128, owner, utxo_tx_hash: McTxHash(row.tx_hash.0), utxo_tx_index: row.utxo_index.0, }), }; utxos.push(utxo); } Ok(utxos) } #[allow(clippy::too_many_arguments)] async fn get_asset_spend_utxos( &self, cardano_network: u8, policy_id: [u8; 28], asset_name: &[u8], start: &CardanoPosition, end: &CardanoPosition, limit: usize, offset: usize, ) -> Result, MidnightCNightObservationDataSourceError> { let rows = crate::db::get_asset_spends( &self.pool, policy_id, asset_name, start, end, limit, offset, ) .await?; let mut utxos = Vec::new(); for row in rows { let header = ObservedUtxoHeader { tx_position: CardanoPosition { block_hash: McBlockHash(row.block_hash.0), block_number: row.block_number.0, block_timestamp: row.block_timestamp.and_utc().into(), tx_index_in_block: row.tx_index_in_block.0, }, tx_hash: McTxHash(row.spending_tx_hash.0), utxo_tx_hash: McTxHash(row.utxo_tx_hash.0), utxo_index: UtxoIndexInTx(row.utxo_index.0), }; let Some(cardano_address) = cardano_serialization_lib::Address::from_bech32(&row.holder_address).ok() else { log::error!( "Cardano address {:?} not valid bech32 cardano address", row.holder_address ); continue; }; let Some(base_address) = BaseAddress::from_address(&cardano_address) else { log::error!("Cardano Address {:?} has no delegation part", &row.holder_address); continue; }; let reward_address = RewardAddress::new(cardano_network, &base_address.stake_cred()); let owner = reward_address.to_address().to_bytes().try_into().unwrap(); let utxo = ObservedUtxo { header, data: ObservedUtxoData::AssetSpend(SpendData { value: row.quantity as u128, owner, utxo_tx_hash: McTxHash(row.utxo_tx_hash.0), utxo_tx_index: row.utxo_index.0, spending_tx_hash: McTxHash(row.spending_tx_hash.0), }), }; utxos.push(utxo); } Ok(utxos) } }