From 18b1430a4430747514230529f7afe069ab769a52 Mon Sep 17 00:00:00 2001 From: yulike Date: Mon, 23 Aug 2021 12:01:24 -0400 Subject: [PATCH 1/2] terra update --- models/terra/terra__transfers.sql | 190 +++++++++++++++++++----------- 1 file changed, 121 insertions(+), 69 deletions(-) diff --git a/models/terra/terra__transfers.sql b/models/terra/terra__transfers.sql index c7c8c871..2fabc1e0 100644 --- a/models/terra/terra__transfers.sql +++ b/models/terra/terra__transfers.sql @@ -2,31 +2,30 @@ config( materialized='incremental', sort='block_timestamp', - unique_key='block_id', + unique_key='block_id || event_from || event_to', incremental_strategy='delete+insert', cluster_by=['block_timestamp'], tags=['snowflake', 'terra', 'transfers'] ) }} - WITH prices as ( SELECT - date_trunc('hour', block_timestamp) as hour, - currency, - symbol, - avg(price_usd) as price_usd - FROM {{ ref('terra__oracle_prices')}} - GROUP BY 1,2,3 + date_trunc('hour', block_timestamp) as hour, + currency, + symbol, + avg(price_usd) as price_usd + FROM {{ ref('terra__oracle_prices')}} + GROUP BY 1,2,3 ), symbol as ( SELECT - currency, - symbol - FROM {{ ref('terra__oracle_prices')}} - WHERE block_timestamp >= CURRENT_DATE - 2 - GROUP BY 1,2 + currency, + symbol + FROM {{ ref('terra__oracle_prices')}} + WHERE block_timestamp >= CURRENT_DATE - 2 + GROUP BY 1,2 ), inputs as( @@ -39,82 +38,135 @@ inputs as( tx_id, msg_type, a.value:address::string as event_from, - a.value:coins[0]:amount / POW(10,6) as event_amount, - a.value:coins[0]:denom::string as event_currency, + a.value:coins[0]:amount / POW(10,6) as event_from_amount, + a.value:coins[0]:denom::string as event_from_currency, a.index as input_index FROM {{source('silver_terra', 'msgs')}} , lateral flatten(input => msg_value:inputs) a WHERE msg_module = 'bank' AND msg_type = 'bank/MsgMultiSend' - - {% if is_incremental() %} - AND block_timestamp >= getdate() - interval '1 days' - -- {% else %} - -- AND block_timestamp >= getdate() - interval '9 months' - {% endif %} + {% if is_incremental() %} + AND block_timestamp >= getdate() - interval '1 days' + {% endif %} ), outputs as( SELECT tx_id, a.value:address::string as event_to, + a.value:coins[0]:amount / POW(10,6) as event_to_amount, + a.value:coins[0]:denom::string as event_to_currency, a.index as output_index FROM {{source('silver_terra', 'msgs')}} , lateral flatten(input => msg_value:outputs) a WHERE msg_module = 'bank' AND msg_type = 'bank/MsgMultiSend' - - {% if is_incremental() %} - AND block_timestamp >= getdate() - interval '1 days' - -- {% else %} - -- AND block_timestamp >= getdate() - interval '9 months' - {% endif %} + {% if is_incremental() %} + AND block_timestamp >= getdate() - interval '1 days' + {% endif %} +), + +transfers_multisend AS ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + i.tx_id, + msg_type, + event_from, + event_from_amount, + event_from_currency, + event_to, + event_to_amount, + event_to_currency, + input_index, + output_index + FROM inputs i + + JOIN outputs o + ON i.tx_id = o.tx_id ), -transfers as( -SELECT - blockchain, - chain_id, - tx_status, - block_id, - block_timestamp, - i.tx_id, - msg_type, - event_from, - event_to, - event_amount, - event_currency -FROM inputs i - -JOIN outputs o - ON i.tx_id = o.tx_id - AND i.input_index = o.output_index +transfers_multisend_1_m_txs AS ( + SELECT DISTINCT + tx_id + FROM transfers_multisend + GROUP BY tx_id + HAVING MAX(input_index) = 0 AND MAX(output_index) <> 0 +), + +transfers_multisend_1_1_txs AS ( + SELECT DISTINCT + tx_id + FROM transfers_multisend + WHERE tx_id NOT IN (SELECT tx_id FROM transfers_multisend_1_m_txs) +), + +transfers AS ( + ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + transfers_multisend_1_1_txs.tx_id, + msg_type, + event_from, + event_to, + event_from_amount AS event_amount, + event_from_currency AS event_currency + FROM transfers_multisend_1_1_txs + JOIN transfers_multisend + ON transfers_multisend_1_1_txs.tx_id = transfers_multisend.tx_id + WHERE transfers_multisend.input_index = transfers_multisend.output_index + ) -UNION - -SELECT - blockchain, - chain_id, - tx_status, - block_id, - block_timestamp, - tx_id, - msg_type, - msg_value:from_address::string as event_from, - msg_value:to_address::string as event_to, - msg_value:amount[0]:amount / pow(10,6) as event_amount, - msg_value:amount[0]:denom::string as event_currency -FROM {{source('silver_terra', 'msgs')}} -WHERE msg_module = 'bank' - AND msg_type = 'bank/MsgSend' - -{% if is_incremental() %} - AND block_timestamp >= getdate() - interval '1 days' --- {% else %} --- AND block_timestamp >= getdate() - interval '9 months' -{% endif %} -) + UNION ALL + ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + transfers_multisend_1_m_txs.tx_id, + msg_type, + event_from, + event_to, + event_to_amount AS event_amount, + event_to_currency AS event_currency + FROM transfers_multisend_1_m_txs + JOIN transfers_multisend + ON transfers_multisend_1_m_txs.tx_id = transfers_multisend.tx_id + ) + + UNION ALL + + ( + SELECT + blockchain, + chain_id, + tx_status, + block_id, + block_timestamp, + tx_id, + msg_type, + msg_value:from_address::string as event_from, + msg_value:to_address::string as event_to, + msg_value:amount[0]:amount / pow(10,6) as event_amount, + msg_value:amount[0]:denom::string as event_currency + FROM {{source('silver_terra', 'msgs')}} + WHERE msg_module = 'bank' + AND msg_type = 'bank/MsgSend' + {% if is_incremental() %} + AND block_timestamp >= getdate() - interval '1 days' + {% endif %} + ) +) SELECT t.blockchain, From d7a65b57c127b296c3555ec4f5857683fd1f5e66 Mon Sep 17 00:00:00 2001 From: yulike Date: Mon, 23 Aug 2021 17:33:09 -0400 Subject: [PATCH 2/2] update the primary key and removed the sort logic --- models/terra/terra__transfers.sql | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/models/terra/terra__transfers.sql b/models/terra/terra__transfers.sql index 2fabc1e0..1cbfd5bb 100644 --- a/models/terra/terra__transfers.sql +++ b/models/terra/terra__transfers.sql @@ -1,8 +1,7 @@ {{ config( materialized='incremental', - sort='block_timestamp', - unique_key='block_id || event_from || event_to', + unique_key='block_id || tx_id || event_from || event_to || event_index', incremental_strategy='delete+insert', cluster_by=['block_timestamp'], tags=['snowflake', 'terra', 'transfers'] @@ -117,7 +116,8 @@ transfers AS ( event_from, event_to, event_from_amount AS event_amount, - event_from_currency AS event_currency + event_from_currency AS event_currency, + transfers_multisend.output_index AS event_index FROM transfers_multisend_1_1_txs JOIN transfers_multisend ON transfers_multisend_1_1_txs.tx_id = transfers_multisend.tx_id @@ -138,7 +138,8 @@ transfers AS ( event_from, event_to, event_to_amount AS event_amount, - event_to_currency AS event_currency + event_to_currency AS event_currency, + transfers_multisend.output_index AS event_index FROM transfers_multisend_1_m_txs JOIN transfers_multisend ON transfers_multisend_1_m_txs.tx_id = transfers_multisend.tx_id @@ -158,7 +159,8 @@ transfers AS ( msg_value:from_address::string as event_from, msg_value:to_address::string as event_to, msg_value:amount[0]:amount / pow(10,6) as event_amount, - msg_value:amount[0]:denom::string as event_currency + msg_value:amount[0]:denom::string as event_currency, + msg_index AS event_index FROM {{source('silver_terra', 'msgs')}} WHERE msg_module = 'bank' AND msg_type = 'bank/MsgSend' @@ -188,7 +190,8 @@ SELECT to_labels.address_name as event_to_address_name, t.event_amount, t.event_amount * price_usd as event_amount_usd, - s.symbol as event_currency + s.symbol as event_currency, + t.event_index FROM transfers t LEFT OUTER JOIN prices o