// This file is part of midnight-node. // Copyright (C) Midnight Foundation // SPDX-License-Identifier: Apache-2.0 // Licensed under the Apache License, Version 2.0 (the "License"); // You may not use this file except in compliance with the License. // You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use crate::data_source::candidates_data_source::observed_async_trait; use crate::data_source::metrics::{MidnightDataSourceMetrics, start_sub_query_timer}; use crate::db::{MultiAssetCache, get_deregistrations, get_registrations}; use crate::{ CreateData, DeregistrationData, MidnightCNightObservationDataSource, ObservedUtxo, ObservedUtxoData, ObservedUtxoHeader, RegistrationData, SpendData, UtxoIndexInTx, }; use cardano_serialization_lib::{ Address, BaseAddress, ConstrPlutusData, Credential, Ed25519KeyHash, EnterpriseAddress, PlutusData, RewardAddress, ScriptHash, }; use midnight_primitives_cnight_observation::{ CNightAddresses, CardanoPosition, CardanoRewardAddressBytes, DustPublicKeyBytes, ObservedUtxos, }; use sidechain_domain::{McBlockHash, McBlockNumber, McTxHash, McTxIndexInBlock, TX_HASH_SIZE}; pub use sqlx::PgPool; use std::fmt::Debug; #[derive( Debug, Copy, Clone, PartialEq, PartialOrd, parity_scale_codec::Encode, parity_scale_codec::Decode, scale_info::TypeInfo, )] pub struct TxHash(pub [u8; TX_HASH_SIZE]); #[derive( Debug, Clone, PartialEq, parity_scale_codec::Encode, parity_scale_codec::Decode, scale_info::TypeInfo, )] pub struct TxPosition { pub block_hash: McBlockHash, pub block_number: McBlockNumber, pub block_index: McTxIndexInBlock, } #[derive(thiserror::Error, Debug)] pub enum MidnightCNightObservationDataSourceError { #[error("missing reference for block hash `{0}` in db-sync")] MissingBlockReference(McBlockHash), #[error("Error querying database: {0}")] DBQueryError(#[from] sqlx::error::Error), #[error("Error extracting network id from Cardano address")] CardanoNetworkError(String), #[error("Invalid value for mapping validator address")] MappingValidatorInvalidAddress(String), } #[derive(thiserror::Error, Debug)] pub enum RegistrationDatumDecodeError { #[error("Cardano credential not bytes")] CardanoCredentialNotBytes, #[error("Cardano credential invalid tag")] CardanoCredentialInvalidTag(u64), #[error("Cardano credential invalid key hash")] CardanoCredentialInvalidKeyHash, #[error("Cardano credential invalid script hash")] CardanoCredentialInvalidScriptHash, #[error("Dust address not bytes")] DustAddressNotBytes, #[error("Dust address invalid length")] DustAddressInvalidLength(usize), } pub struct MidnightCNightObservationDataSourceImpl { pub pool: PgPool, pub metrics_opt: Option, #[allow(dead_code)] cache_size: u16, multi_asset_cache: MultiAssetCache, } impl MidnightCNightObservationDataSourceImpl { pub fn new( pool: PgPool, metrics_opt: Option, cache_size: u16, ) -> Self { let multi_asset_cache = MultiAssetCache::new(pool.clone()); Self { pool, metrics_opt, cache_size, multi_asset_cache } } } observed_async_trait!( impl MidnightCNightObservationDataSource for MidnightCNightObservationDataSourceImpl { async fn get_utxos_up_to_capacity( &self, config: &CNightAddresses, start_position: &CardanoPosition, current_tip: McBlockHash, tx_capacity: usize, ) -> Result> { let cnight_asset_name = config.cnight_asset_name.as_bytes(); let mapping_validator_address = Address::from_bech32(&config.mapping_validator_address) .map_err(|e| { MidnightCNightObservationDataSourceError::MappingValidatorInvalidAddress( e.to_string(), ) })?; let cardano_network = mapping_validator_address.network_id().map_err(|_| { MidnightCNightObservationDataSourceError::CardanoNetworkError( config.mapping_validator_address.clone(), ) })?; let mapping_validator_policy_id = EnterpriseAddress::from_address(&mapping_validator_address) .ok_or(MidnightCNightObservationDataSourceError::MappingValidatorInvalidAddress( "Not EnterpriseAddress".to_string(), ))? .payment_cred() .to_scripthash() .ok_or(MidnightCNightObservationDataSourceError::MappingValidatorInvalidAddress( "MappingValidator address does not contain a script hash".to_string(), ))?; // Resolve multi_asset.id for auth token and cNight token (cached after first call) let auth_token_ident = self .multi_asset_cache .resolve_ident( &mapping_validator_policy_id.to_bytes(), config.auth_token_asset_name.as_bytes(), ) .await?; let cnight_ident = self .multi_asset_cache .resolve_ident(&config.cnight_policy_id, cnight_asset_name) .await?; // Get end position from cardano block hash let _block_timer = start_sub_query_timer(&self.metrics_opt, "cnight_get_block_by_hash"); let end: CardanoPosition = crate::db::get_block_by_hash(&self.pool, current_tip.clone()) .await? .ok_or(MidnightCNightObservationDataSourceError::MissingBlockReference(current_tip))? .into(); drop(_block_timer); // Increment the end position to tx_index + 1 of the current mainchain position let end = end.increment(); // The "capacity" argument is capacity in terms of TRANSACTIONS, // but the various sql queries below want a capacity in terms of UTXOs. // Use a generous overestimate of how many UTXOs each TX _may_ have. let utxo_capacity = tx_capacity * 64; // Call db methods to get UTXOs (offset + limit) until we reach our capacity // TODO: (possibly) Replace this with grabbing from a queue that's filled async by an offchain thread // ^ We may not have to do the above if the queries are fast enough let (registration_utxos, deregistration_utxos, asset_create_utxos, asset_spend_utxos) = tokio::try_join!( async { let _sq_timer = start_sub_query_timer(&self.metrics_opt, "cnight_get_registrations"); match auth_token_ident { Some(ident) => self.get_registration_utxos( cardano_network, ident, &config.mapping_validator_address, start_position, &end, utxo_capacity, 0, ) .await .map_err(Into::>::into), None => Ok(vec![]), } }, async { let _sq_timer = start_sub_query_timer(&self.metrics_opt, "cnight_get_deregistrations"); self.get_deregistration_utxos( cardano_network, &config.mapping_validator_address, start_position, &end, utxo_capacity, 0, ) .await .map_err(Into::>::into) }, async { let _sq_timer = start_sub_query_timer(&self.metrics_opt, "cnight_get_asset_creates"); match cnight_ident { Some(ident) => self.get_asset_create_utxos( cardano_network, ident, start_position, &end, utxo_capacity, 0, ) .await .map_err(Into::>::into), None => Ok(vec![]), } }, async { let _sq_timer = start_sub_query_timer(&self.metrics_opt, "cnight_get_asset_spends"); match cnight_ident { Some(ident) => self.get_asset_spend_utxos( cardano_network, ident, start_position, &end, utxo_capacity, 0, ) .await .map_err(Into::>::into), None => Ok(vec![]), } } )?; let mut utxos = Vec::with_capacity( registration_utxos.len() + deregistration_utxos.len() + asset_create_utxos.len() + asset_spend_utxos.len(), ); utxos.extend(registration_utxos); utxos.extend(deregistration_utxos); utxos.extend(asset_create_utxos); utxos.extend(asset_spend_utxos); utxos.sort(); // Truncate UTXOs but include full transactions let mut truncated_utxos = Vec::with_capacity(utxo_capacity); let mut num_txs = 0; let mut cur_tx: Option = None; for utxo in utxos { if cur_tx.as_ref().is_none_or(|tx| tx < &utxo.header.tx_position) { num_txs += 1; cur_tx = Some(utxo.header.tx_position.clone()); } if num_txs == tx_capacity { break; } truncated_utxos.push(utxo); } if num_txs < tx_capacity { // We couldn't find enough UTXOs in the range, which means we're up-to-date with the // current_tip Ok(ObservedUtxos { start: start_position.clone(), end, utxos: truncated_utxos }) } else { Ok(ObservedUtxos { start: start_position.clone(), end: truncated_utxos .last() .map_or(start_position.clone(), |u| u.header.tx_position.clone()) .increment(), utxos: truncated_utxos, }) } } } ); impl MidnightCNightObservationDataSourceImpl { fn decode_registration_datum( datum: ConstrPlutusData, ) -> Result<(Credential, DustPublicKeyBytes), RegistrationDatumDecodeError> { // We use a Vec here because the `get` method on `PlutusList` can panic let list: Vec = datum.data().into_iter().cloned().collect(); let Some(cardano_credential) = list.first().and_then(|d| d.as_constr_plutus_data()) else { return Err(RegistrationDatumDecodeError::CardanoCredentialNotBytes); }; let credential = match u64::from(cardano_credential.alternative()) { 0 => cardano_credential .data() .into_iter() .next() .and_then(|d| d.as_bytes()) .and_then(|hash_bytes| Ed25519KeyHash::from_bytes(hash_bytes).ok()) .map(|hash| Credential::from_keyhash(&hash)) .ok_or(RegistrationDatumDecodeError::CardanoCredentialInvalidKeyHash)?, 1 => cardano_credential .data() .into_iter() .next() .and_then(|d| d.as_bytes()) .and_then(|hash_bytes| ScriptHash::from_bytes(hash_bytes).ok()) .map(|hash| Credential::from_scripthash(&hash)) .ok_or(RegistrationDatumDecodeError::CardanoCredentialInvalidScriptHash)?, tag => { return Err(RegistrationDatumDecodeError::CardanoCredentialInvalidTag(tag)); }, }; let Some(dust_address) = list.get(1).and_then(|d| d.as_bytes()) else { return Err(RegistrationDatumDecodeError::DustAddressNotBytes); }; let dust_addr_length = dust_address.len(); let Ok(dust_address) = ::try_from(dust_address) else { return Err(RegistrationDatumDecodeError::DustAddressInvalidLength(dust_addr_length)); }; Ok((credential, dust_address)) } #[allow(clippy::too_many_arguments)] async fn get_registration_utxos( &self, cardano_network: u8, auth_token_ident: i64, address: &str, start: &CardanoPosition, end: &CardanoPosition, limit: usize, offset: usize, ) -> Result, MidnightCNightObservationDataSourceError> { let rows = get_registrations(&self.pool, address, auth_token_ident, start, end, limit, offset) .await?; let mut utxos = Vec::new(); for row in rows { let header = ObservedUtxoHeader { tx_position: CardanoPosition { block_hash: McBlockHash(row.block_hash.0), block_number: row.block_number.0, block_timestamp: row.block_timestamp.and_utc().into(), tx_index_in_block: row.tx_index_in_block.0, }, tx_hash: McTxHash(row.tx_hash.0), utxo_tx_hash: McTxHash(row.tx_hash.0), utxo_index: UtxoIndexInTx(row.utxo_index.0), }; let Some(constr) = row.full_datum.0.as_constr_plutus_data() else { log::error!("Plutus data for mapping validator not Constr ({header:?})"); continue; }; let (credential, dust_public_key) = match Self::decode_registration_datum(constr) { Ok(pair) => pair, Err(e) => { log::error!("Failed to decode registration datum: {e:?} ({header:?})"); continue; }, }; let reward_address = RewardAddress::new(cardano_network, &credential); // Unwrap here is OK - we know the reward_address is always 29 bytes let cardano_address = reward_address.to_address().to_bytes().try_into().unwrap(); let utxo = ObservedUtxo { header, data: ObservedUtxoData::Registration(RegistrationData { cardano_reward_address: CardanoRewardAddressBytes(cardano_address), dust_public_key, }), }; utxos.push(utxo); } Ok(utxos) } async fn get_deregistration_utxos( &self, cardano_network: u8, address: &str, start: &CardanoPosition, end: &CardanoPosition, limit: usize, offset: usize, ) -> Result, MidnightCNightObservationDataSourceError> { let rows = get_deregistrations(&self.pool, address, start, end, limit, offset).await?; let mut utxos = Vec::new(); for row in rows { let header = ObservedUtxoHeader { tx_position: CardanoPosition { block_hash: McBlockHash(row.block_hash.0), block_number: row.block_number.0, block_timestamp: row.block_timestamp.and_utc().into(), tx_index_in_block: row.tx_index_in_block.0, }, tx_hash: McTxHash(row.tx_hash.0), utxo_tx_hash: McTxHash(row.utxo_tx_hash.0), utxo_index: UtxoIndexInTx(row.utxo_index.0), }; let Some(constr) = row.full_datum.0.as_constr_plutus_data() else { log::error!("Plutus data for mapping validator not Constr ({header:?})"); continue; }; let (credential, dust_public_key) = match Self::decode_registration_datum(constr) { Ok(pair) => pair, Err(e) => { log::error!("Failed to decode registration datum: {e:?} ({header:?})"); continue; }, }; let reward_address = RewardAddress::new(cardano_network, &credential); // Unwrap here is OK - we know the reward_address is always 29 bytes let cardano_address = reward_address.to_address().to_bytes().try_into().unwrap(); let utxo = ObservedUtxo { header, data: ObservedUtxoData::Deregistration(DeregistrationData { cardano_reward_address: CardanoRewardAddressBytes(cardano_address), dust_public_key, }), }; utxos.push(utxo); } Ok(utxos) } #[allow(clippy::too_many_arguments)] async fn get_asset_create_utxos( &self, cardano_network: u8, ident: i64, start: &CardanoPosition, end: &CardanoPosition, limit: usize, offset: usize, ) -> Result, MidnightCNightObservationDataSourceError> { let rows = crate::db::get_asset_creates(&self.pool, ident, start, end, limit, offset).await?; let mut utxos = Vec::new(); for row in rows { let header = ObservedUtxoHeader { tx_position: CardanoPosition { block_hash: McBlockHash(row.block_hash.0), block_number: row.block_number.0, block_timestamp: row.block_timestamp.and_utc().into(), tx_index_in_block: row.tx_index_in_block.0, }, tx_hash: McTxHash(row.tx_hash.0), utxo_tx_hash: McTxHash(row.tx_hash.0), utxo_index: UtxoIndexInTx(row.utxo_index.0), }; let Some(cardano_address) = cardano_serialization_lib::Address::from_bech32(&row.holder_address).ok() else { log::debug!( "Cardano address {:?} not valid bech32 cardano address", &row.holder_address ); continue; }; let Some(base_address) = BaseAddress::from_address(&cardano_address) else { // Non-base addresses (enterprise, pointer, reward) carry no stake // credential so they can't be mapped to a reward address — skip silently. continue; }; let reward_address = RewardAddress::new(cardano_network, &base_address.stake_cred()); let owner = reward_address.to_address().to_bytes().try_into().unwrap(); let utxo = ObservedUtxo { header, data: ObservedUtxoData::AssetCreate(CreateData { value: row.quantity as u128, owner, utxo_tx_hash: McTxHash(row.tx_hash.0), utxo_tx_index: row.utxo_index.0, }), }; utxos.push(utxo); } Ok(utxos) } #[allow(clippy::too_many_arguments)] async fn get_asset_spend_utxos( &self, cardano_network: u8, ident: i64, start: &CardanoPosition, end: &CardanoPosition, limit: usize, offset: usize, ) -> Result, MidnightCNightObservationDataSourceError> { let rows = crate::db::get_asset_spends(&self.pool, ident, start, end, limit, offset).await?; let mut utxos = Vec::new(); for row in rows { let header = ObservedUtxoHeader { tx_position: CardanoPosition { block_hash: McBlockHash(row.block_hash.0), block_number: row.block_number.0, block_timestamp: row.block_timestamp.and_utc().into(), tx_index_in_block: row.tx_index_in_block.0, }, tx_hash: McTxHash(row.spending_tx_hash.0), utxo_tx_hash: McTxHash(row.utxo_tx_hash.0), utxo_index: UtxoIndexInTx(row.utxo_index.0), }; let Some(cardano_address) = cardano_serialization_lib::Address::from_bech32(&row.holder_address).ok() else { log::debug!( "Cardano address {:?} not valid bech32 cardano address", row.holder_address ); continue; }; let Some(base_address) = BaseAddress::from_address(&cardano_address) else { // Non-base addresses (enterprise, pointer, reward) carry no stake // credential so they can't be mapped to a reward address — skip silently. continue; }; let reward_address = RewardAddress::new(cardano_network, &base_address.stake_cred()); let owner = reward_address.to_address().to_bytes().try_into().unwrap(); let utxo = ObservedUtxo { header, data: ObservedUtxoData::AssetSpend(SpendData { value: row.quantity as u128, owner, utxo_tx_hash: McTxHash(row.utxo_tx_hash.0), utxo_tx_index: row.utxo_index.0, spending_tx_hash: McTxHash(row.spending_tx_hash.0), }), }; utxos.push(utxo); } Ok(utxos) } }