diff --git a/models/terra/terra__transfers.sql b/models/terra/terra__transfers.sql index 160c135c..4a2a8735 100644 --- a/models/terra/terra__transfers.sql +++ b/models/terra/terra__transfers.sql @@ -2,31 +2,30 @@ config( materialized='incremental', sort='block_timestamp', - unique_key='block_id', + unique_key='block_id || event_from || event_to', incremental_strategy='delete+insert', cluster_by=['block_timestamp'], tags=['snowflake', 'terra', 'transfers'] ) }} - WITH prices as ( SELECT - date_trunc('hour', block_timestamp) as hour, - currency, - symbol, - avg(price_usd) as price_usd - FROM {{ ref('terra__oracle_prices')}} - GROUP BY 1,2,3 + date_trunc('hour', block_timestamp) as hour, + currency, + symbol, + avg(price_usd) as price_usd + FROM {{ ref('terra__oracle_prices')}} + GROUP BY 1,2,3 ), symbol as ( SELECT - currency, - symbol - FROM {{ ref('terra__oracle_prices')}} - WHERE block_timestamp >= CURRENT_DATE - 2 - GROUP BY 1,2 + currency, + symbol + FROM {{ ref('terra__oracle_prices')}} + WHERE block_timestamp >= CURRENT_DATE - 2 + GROUP BY 1,2 ), inputs as( @@ -39,82 +38,141 @@ inputs as( tx_id, msg_type, a.value:address::string as event_from, - a.value:coins[0]:amount / POW(10,6) as event_amount, - a.value:coins[0]:denom::string as event_currency, + a.value:coins[0]:amount / POW(10,6) as event_from_amount, + a.value:coins[0]:denom::string as event_from_currency, a.index as input_index FROM {{source('silver_terra', 'msgs')}} , lateral flatten(input => msg_value:inputs) a WHERE msg_module = 'bank' AND msg_type = 'bank/MsgMultiSend' - - {% if is_incremental() %} - AND block_timestamp >= getdate() - interval '1 days' - {% else %} - AND block_timestamp >= getdate() - interval '9 months' - {% endif %} + {% if is_incremental() %} + AND block_timestamp >= getdate() - interval '1 days' + {% else %} + AND block_timestamp >= getdate() - interval '9 months' + {% endif %} ), outputs as( SELECT tx_id, a.value:address::string as event_to, + a.value:coins[0]:amount / POW(10,6) as event_to_amount, + a.value:coins[0]:denom::string as event_to_currency, a.index as output_index FROM {{source('silver_terra', 'msgs')}} , lateral flatten(input => msg_value:outputs) a WHERE msg_module = 'bank' AND msg_type = 'bank/MsgMultiSend' - - {% if is_incremental() %} - AND block_timestamp >= getdate() - interval '1 days' - {% else %} - AND block_timestamp >= getdate() - interval '9 months' - {% endif %} + {% if is_incremental() %} + AND block_timestamp >= getdate() - interval '1 days' + {% else %} + AND block_timestamp >= getdate() - interval '9 months' + {% endif %} +), + +transfers_multisend AS ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + i.tx_id, + msg_type, + event_from, + event_from_amount, + event_from_currency, + event_to, + event_to_amount, + event_to_currency, + input_index, + output_index + FROM inputs i + + JOIN outputs o + ON i.tx_id = o.tx_id ), -transfers as( -SELECT - blockchain, - chain_id, - tx_status, - block_id, - block_timestamp, - i.tx_id, - msg_type, - event_from, - event_to, - event_amount, - event_currency -FROM inputs i - -JOIN outputs o - ON i.tx_id = o.tx_id - AND i.input_index = o.output_index +transfers_multisend_1_m_txs AS ( + SELECT DISTINCT + tx_id + FROM transfers_multisend + GROUP BY tx_id + HAVING MAX(input_index) = 0 AND MAX(output_index) <> 0 +), + +transfers_multisend_1_1_txs AS ( + SELECT DISTINCT + tx_id + FROM transfers_multisend + WHERE tx_id NOT IN (SELECT tx_id FROM transfers_multisend_1_m_txs) +), + +transfers AS ( + ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + transfers_multisend_1_1_txs.tx_id, + msg_type, + event_from, + event_to, + event_from_amount AS event_amount, + event_from_currency AS event_currency + FROM transfers_multisend_1_1_txs + JOIN transfers_multisend + ON transfers_multisend_1_1_txs.tx_id = transfers_multisend.tx_id + WHERE transfers_multisend.input_index = transfers_multisend.output_index + ) -UNION - -SELECT - blockchain, - chain_id, - tx_status, - block_id, - block_timestamp, - tx_id, - msg_type, - msg_value:from_address::string as event_from, - msg_value:to_address::string as event_to, - msg_value:amount[0]:amount / pow(10,6) as event_amount, - msg_value:amount[0]:denom::string as event_currency -FROM {{source('silver_terra', 'msgs')}} -WHERE msg_module = 'bank' - AND msg_type = 'bank/MsgSend' - -{% if is_incremental() %} - AND block_timestamp >= getdate() - interval '1 days' -{% else %} - AND block_timestamp >= getdate() - interval '9 months' -{% endif %} -) + UNION ALL + ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + transfers_multisend_1_m_txs.tx_id, + msg_type, + event_from, + event_to, + event_to_amount AS event_amount, + event_to_currency AS event_currency + FROM transfers_multisend_1_m_txs + JOIN transfers_multisend + ON transfers_multisend_1_m_txs.tx_id = transfers_multisend.tx_id + ) + + UNION ALL + + ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + tx_id, + msg_type, + msg_value:from_address::string as event_from, + msg_value:to_address::string as event_to, + msg_value:amount[0]:amount / pow(10,6) as event_amount, + msg_value:amount[0]:denom::string as event_currency + FROM {{source('silver_terra', 'msgs')}} + WHERE msg_module = 'bank' + AND msg_type = 'bank/MsgSend' + {% if is_incremental() %} + AND block_timestamp >= getdate() - interval '1 days' + {% else %} + AND block_timestamp >= getdate() - interval '9 months' + {% endif %} + ) +) SELECT t.blockchain,