Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 126 additions & 71 deletions models/terra/terra__transfers.sql
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
{{
config(
materialized='incremental',
sort='block_timestamp',
unique_key='block_id',
unique_key='block_id || tx_id || event_from || event_to || event_index',
incremental_strategy='delete+insert',
cluster_by=['block_timestamp'],
tags=['snowflake', 'terra', 'transfers']
)
}}


WITH prices as (
SELECT
date_trunc('hour', block_timestamp) as hour,
currency,
symbol,
avg(price_usd) as price_usd
FROM {{ ref('terra__oracle_prices')}}
GROUP BY 1,2,3
date_trunc('hour', block_timestamp) as hour,
currency,
symbol,
avg(price_usd) as price_usd
FROM {{ ref('terra__oracle_prices')}}
GROUP BY 1,2,3
),

symbol as (
SELECT
currency,
symbol
FROM {{ ref('terra__oracle_prices')}}
WHERE block_timestamp >= CURRENT_DATE - 2
GROUP BY 1,2
currency,
symbol
FROM {{ ref('terra__oracle_prices')}}
WHERE block_timestamp >= CURRENT_DATE - 2
GROUP BY 1,2
),

inputs as(
Expand All @@ -39,82 +37,138 @@ inputs as(
tx_id,
msg_type,
a.value:address::string as event_from,
a.value:coins[0]:amount / POW(10,6) as event_amount,
a.value:coins[0]:denom::string as event_currency,
a.value:coins[0]:amount / POW(10,6) as event_from_amount,
a.value:coins[0]:denom::string as event_from_currency,
a.index as input_index
FROM {{source('silver_terra', 'msgs')}}
, lateral flatten(input => msg_value:inputs) a
WHERE msg_module = 'bank'
AND msg_type = 'bank/MsgMultiSend'

{% if is_incremental() %}
AND block_timestamp >= getdate() - interval '1 days'
-- {% else %}
-- AND block_timestamp >= getdate() - interval '9 months'
{% endif %}
{% if is_incremental() %}
AND block_timestamp >= getdate() - interval '1 days'
{% endif %}
),

outputs as(
SELECT
tx_id,
a.value:address::string as event_to,
a.value:coins[0]:amount / POW(10,6) as event_to_amount,
a.value:coins[0]:denom::string as event_to_currency,
a.index as output_index
FROM {{source('silver_terra', 'msgs')}}
, lateral flatten(input => msg_value:outputs) a
WHERE msg_module = 'bank'
AND msg_type = 'bank/MsgMultiSend'

{% if is_incremental() %}
AND block_timestamp >= getdate() - interval '1 days'
-- {% else %}
-- AND block_timestamp >= getdate() - interval '9 months'
{% endif %}
{% if is_incremental() %}
AND block_timestamp >= getdate() - interval '1 days'
{% endif %}
),

transfers_multisend AS (
SELECT
blockchain,
chain_id,
tx_status,
block_id,
block_timestamp,
i.tx_id,
msg_type,
event_from,
event_from_amount,
event_from_currency,
event_to,
event_to_amount,
event_to_currency,
input_index,
output_index
FROM inputs i

JOIN outputs o
ON i.tx_id = o.tx_id
),

transfers as(
SELECT
blockchain,
chain_id,
tx_status,
block_id,
block_timestamp,
i.tx_id,
msg_type,
event_from,
event_to,
event_amount,
event_currency
FROM inputs i

JOIN outputs o
ON i.tx_id = o.tx_id
AND i.input_index = o.output_index
transfers_multisend_1_m_txs AS (
SELECT DISTINCT
tx_id
FROM transfers_multisend
GROUP BY tx_id
HAVING MAX(input_index) = 0 AND MAX(output_index) <> 0
),

transfers_multisend_1_1_txs AS (
SELECT DISTINCT
tx_id
FROM transfers_multisend
WHERE tx_id NOT IN (SELECT tx_id FROM transfers_multisend_1_m_txs)
),

transfers AS (
(
SELECT
blockchain,
chain_id,
tx_status,
block_id,
block_timestamp,
transfers_multisend_1_1_txs.tx_id,
msg_type,
event_from,
event_to,
event_from_amount AS event_amount,
event_from_currency AS event_currency,
transfers_multisend.output_index AS event_index
FROM transfers_multisend_1_1_txs
JOIN transfers_multisend
ON transfers_multisend_1_1_txs.tx_id = transfers_multisend.tx_id
WHERE transfers_multisend.input_index = transfers_multisend.output_index
)

UNION

SELECT
blockchain,
chain_id,
tx_status,
block_id,
block_timestamp,
tx_id,
msg_type,
msg_value:from_address::string as event_from,
msg_value:to_address::string as event_to,
msg_value:amount[0]:amount / pow(10,6) as event_amount,
msg_value:amount[0]:denom::string as event_currency
FROM {{source('silver_terra', 'msgs')}}
WHERE msg_module = 'bank'
AND msg_type = 'bank/MsgSend'

{% if is_incremental() %}
AND block_timestamp >= getdate() - interval '1 days'
-- {% else %}
-- AND block_timestamp >= getdate() - interval '9 months'
{% endif %}
)
UNION ALL

(
SELECT
blockchain,
chain_id,
tx_status,
block_id,
block_timestamp,
transfers_multisend_1_m_txs.tx_id,
msg_type,
event_from,
event_to,
event_to_amount AS event_amount,
event_to_currency AS event_currency,
transfers_multisend.output_index AS event_index
FROM transfers_multisend_1_m_txs
JOIN transfers_multisend
ON transfers_multisend_1_m_txs.tx_id = transfers_multisend.tx_id
)

UNION ALL

(
SELECT
blockchain,
chain_id,
tx_status,
block_id,
block_timestamp,
tx_id,
msg_type,
msg_value:from_address::string as event_from,
msg_value:to_address::string as event_to,
msg_value:amount[0]:amount / pow(10,6) as event_amount,
msg_value:amount[0]:denom::string as event_currency,
msg_index AS event_index
FROM {{source('silver_terra', 'msgs')}}
WHERE msg_module = 'bank'
AND msg_type = 'bank/MsgSend'
{% if is_incremental() %}
AND block_timestamp >= getdate() - interval '1 days'
{% endif %}
)
)

SELECT
t.blockchain,
Expand All @@ -136,7 +190,8 @@ SELECT
to_labels.address_name as event_to_address_name,
t.event_amount,
t.event_amount * price_usd as event_amount_usd,
s.symbol as event_currency
s.symbol as event_currency,
t.event_index
FROM transfers t

LEFT OUTER JOIN prices o
Expand Down