From 46762f26297c8a02f2bf77476b4c2f1ee5e3700c Mon Sep 17 00:00:00 2001 From: yulike Date: Wed, 4 Aug 2021 16:22:06 -0400 Subject: [PATCH 1/2] update the transfer table --- models/terra/terra__transfers.sql | 189 +++++++++++++++++++----------- 1 file changed, 122 insertions(+), 67 deletions(-) diff --git a/models/terra/terra__transfers.sql b/models/terra/terra__transfers.sql index 160c135c..e3872549 100644 --- a/models/terra/terra__transfers.sql +++ b/models/terra/terra__transfers.sql @@ -2,31 +2,30 @@ config( materialized='incremental', sort='block_timestamp', - unique_key='block_id', + unique_key='block_id || event_from || event_to', incremental_strategy='delete+insert', cluster_by=['block_timestamp'], tags=['snowflake', 'terra', 'transfers'] ) }} - WITH prices as ( SELECT - date_trunc('hour', block_timestamp) as hour, - currency, - symbol, - avg(price_usd) as price_usd - FROM {{ ref('terra__oracle_prices')}} - GROUP BY 1,2,3 + date_trunc('hour', block_timestamp) as hour, + currency, + symbol, + avg(price_usd) as price_usd + FROM {{ ref('terra__oracle_prices')}} + GROUP BY 1,2,3 ), symbol as ( SELECT - currency, - symbol - FROM {{ ref('terra__oracle_prices')}} - WHERE block_timestamp >= CURRENT_DATE - 2 - GROUP BY 1,2 + currency, + symbol + FROM {{ ref('terra__oracle_prices')}} + WHERE block_timestamp >= CURRENT_DATE - 2 + GROUP BY 1,2 ), inputs as( @@ -39,82 +38,138 @@ inputs as( tx_id, msg_type, a.value:address::string as event_from, - a.value:coins[0]:amount / POW(10,6) as event_amount, - a.value:coins[0]:denom::string as event_currency, + a.value:coins[0]:amount / POW(10,6) as event_from_amount, + a.value:coins[0]:denom::string as event_from_currency, a.index as input_index FROM {{source('silver_terra', 'msgs')}} , lateral flatten(input => msg_value:inputs) a WHERE msg_module = 'bank' AND msg_type = 'bank/MsgMultiSend' - - {% if is_incremental() %} - AND block_timestamp >= getdate() - interval '1 days' - {% else %} - AND block_timestamp >= getdate() - interval '9 months' - {% endif %} + {% if is_incremental() %} + AND block_timestamp >= getdate() - interval '1 days' + {% else %} + AND block_timestamp >= getdate() - interval '9 months' + {% endif %} ), outputs as( SELECT tx_id, a.value:address::string as event_to, + a.value:coins[0]:amount / POW(10,6) as event_to_amount, + a.value:coins[0]:denom::string as event_to_currency, a.index as output_index FROM {{source('silver_terra', 'msgs')}} , lateral flatten(input => msg_value:outputs) a WHERE msg_module = 'bank' AND msg_type = 'bank/MsgMultiSend' - - {% if is_incremental() %} - AND block_timestamp >= getdate() - interval '1 days' - {% else %} - AND block_timestamp >= getdate() - interval '9 months' - {% endif %} + {% if is_incremental() %} + AND block_timestamp >= getdate() - interval '1 days' + {% else %} + AND block_timestamp >= getdate() - interval '9 months' + {% endif %} ), -transfers as( -SELECT - blockchain, - chain_id, - tx_status, - block_id, - block_timestamp, - i.tx_id, - msg_type, - event_from, - event_to, - event_amount, - event_currency -FROM inputs i +transfers_multisend AS ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + i.tx_id, + msg_type, + event_from, + event_from_amount, + event_from_currency, + event_to, + event_to_amount, + event_to_currency, + input_index, + output_index + FROM inputs i -JOIN outputs o - ON i.tx_id = o.tx_id - AND i.input_index = o.output_index - -UNION + JOIN outputs o + ON i.tx_id = o.tx_id +), -SELECT - blockchain, - chain_id, - tx_status, - block_id, - block_timestamp, - tx_id, - msg_type, - msg_value:from_address::string as event_from, - msg_value:to_address::string as event_to, - msg_value:amount[0]:amount / pow(10,6) as event_amount, - msg_value:amount[0]:denom::string as event_currency -FROM {{source('silver_terra', 'msgs')}} -WHERE msg_module = 'bank' - AND msg_type = 'bank/MsgSend' +transfers_multisend_1_m_txs AS ( + SELECT DISTINCT + tx_id + FROM transfers_multisend + GROUP BY tx_id + HAVING MAX(input_index) = 0 AND MAX(output_index) <> 0 +), -{% if is_incremental() %} - AND block_timestamp >= getdate() - interval '1 days' -{% else %} - AND block_timestamp >= getdate() - interval '9 months' -{% endif %} -) +transfers_multisend_1_1_txs AS ( + SELECT DISTINCT + tx_id + FROM transfers_multisend + WHERE tx_id NOT IN (SELECT tx_id FROM transfers_multisend_1_m_txs) +), +transfers AS ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + transfers_multisend_1_1_txs.tx_id, + msg_type, + event_from, + event_to, + event_from_amount AS event_amount, + event_from_currency AS event_currency + FROM transfers_multisend_1_1_txs + + LEFT JOIN transfers_multisend + ON transfers_multisend_1_1_txs.tx_id = transfers_multisend.tx_id + + WHERE transfers_multisend.input_index = transfers_multisend.output_index + + UNION + + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + transfers_multisend_1_m_txs.tx_id, + msg_type, + event_from, + event_to, + event_to_amount AS event_amount, + event_to_currency AS event_currency + FROM transfers_multisend_1_m_txs + + LEFT JOIN transfers_multisend + ON transfers_multisend_1_m_txs.tx_id = transfers_multisend.tx_id + + UNION + + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + tx_id, + msg_type, + msg_value:from_address::string as event_from, + msg_value:to_address::string as event_to, + msg_value:amount[0]:amount / pow(10,6) as event_amount, + msg_value:amount[0]:denom::string as event_currency + FROM {{source('silver_terra', 'msgs')}} + WHERE msg_module = 'bank' + AND msg_type = 'bank/MsgSend' + {% if is_incremental() %} + AND block_timestamp >= getdate() - interval '1 days' + {% else %} + AND block_timestamp >= getdate() - interval '9 months' + {% endif %} +) SELECT t.blockchain, From beacec1962796ec56640c8bd823da46fbc1be797 Mon Sep 17 00:00:00 2001 From: yulike Date: Mon, 9 Aug 2021 11:41:36 -0400 Subject: [PATCH 2/2] updated the transfer data --- models/terra/terra__transfers.sql | 117 +++++++++++++++--------------- 1 file changed, 60 insertions(+), 57 deletions(-) diff --git a/models/terra/terra__transfers.sql b/models/terra/terra__transfers.sql index e3872549..4a2a8735 100644 --- a/models/terra/terra__transfers.sql +++ b/models/terra/terra__transfers.sql @@ -109,66 +109,69 @@ transfers_multisend_1_1_txs AS ( ), transfers AS ( - SELECT - blockchain, - chain_id, - tx_status, - block_id, - block_timestamp, - transfers_multisend_1_1_txs.tx_id, - msg_type, - event_from, - event_to, - event_from_amount AS event_amount, - event_from_currency AS event_currency - FROM transfers_multisend_1_1_txs - - LEFT JOIN transfers_multisend - ON transfers_multisend_1_1_txs.tx_id = transfers_multisend.tx_id - - WHERE transfers_multisend.input_index = transfers_multisend.output_index - - UNION - - SELECT - blockchain, - chain_id, - tx_status, - block_id, - block_timestamp, - transfers_multisend_1_m_txs.tx_id, - msg_type, - event_from, - event_to, - event_to_amount AS event_amount, - event_to_currency AS event_currency - FROM transfers_multisend_1_m_txs + ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + transfers_multisend_1_1_txs.tx_id, + msg_type, + event_from, + event_to, + event_from_amount AS event_amount, + event_from_currency AS event_currency + FROM transfers_multisend_1_1_txs + JOIN transfers_multisend + ON transfers_multisend_1_1_txs.tx_id = transfers_multisend.tx_id + WHERE transfers_multisend.input_index = transfers_multisend.output_index + ) - LEFT JOIN transfers_multisend - ON transfers_multisend_1_m_txs.tx_id = transfers_multisend.tx_id + UNION ALL + + ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + transfers_multisend_1_m_txs.tx_id, + msg_type, + event_from, + event_to, + event_to_amount AS event_amount, + event_to_currency AS event_currency + FROM transfers_multisend_1_m_txs + JOIN transfers_multisend + ON transfers_multisend_1_m_txs.tx_id = transfers_multisend.tx_id + ) - UNION + UNION ALL - SELECT - blockchain, - chain_id, - tx_status, - block_id, - block_timestamp, - tx_id, - msg_type, - msg_value:from_address::string as event_from, - msg_value:to_address::string as event_to, - msg_value:amount[0]:amount / pow(10,6) as event_amount, - msg_value:amount[0]:denom::string as event_currency - FROM {{source('silver_terra', 'msgs')}} - WHERE msg_module = 'bank' - AND msg_type = 'bank/MsgSend' - {% if is_incremental() %} - AND block_timestamp >= getdate() - interval '1 days' - {% else %} - AND block_timestamp >= getdate() - interval '9 months' - {% endif %} + ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + tx_id, + msg_type, + msg_value:from_address::string as event_from, + msg_value:to_address::string as event_to, + msg_value:amount[0]:amount / pow(10,6) as event_amount, + msg_value:amount[0]:denom::string as event_currency + FROM {{source('silver_terra', 'msgs')}} + WHERE msg_module = 'bank' + AND msg_type = 'bank/MsgSend' + {% if is_incremental() %} + AND block_timestamp >= getdate() - interval '1 days' + {% else %} + AND block_timestamp >= getdate() - interval '9 months' + {% endif %} + ) ) SELECT