Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
from routers import attach
from routers import inventory
from api import api_call
from tariff import Tariff
from config import API_KEY as APIKEY

app = FastAPI(title='SmartLinkAPI')

app.state.tariffs = {
tariff['billing_uuid']: unescape(tariff['name'])
tariff['billing_uuid']: Tariff(unescape(tariff['name']))
for tariff in api_call('tariff', 'get')['data'].values()
}
app.state.customer_groups = {
Expand Down
83 changes: 19 additions & 64 deletions routers/customer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from html import unescape
from ipaddress import IPv4Address
from datetime import datetime

from fastapi import APIRouter
from fastapi.requests import Request
Expand All @@ -8,6 +9,7 @@
from api import api_call
from utils import list_to_str, to_2gis_link, to_neo_link, normalize_items, extract_sn, remove_sn,\
parse_agreement, status_to_str, format_mac
from tariff import calc_disconnect

router = APIRouter(prefix='/customer')
PHONE_LENGTH = 9
Expand Down Expand Up @@ -62,7 +64,7 @@ def api_get_customer(request: Request, id: int):
return JSONResponse({'status': 'fail', 'detail': 'customer not found'}, 404)

tariffs = [
{'id': int(tariff['id']), 'name': request.app.state.tariffs[tariff['id']]}
{'id': int(tariff['id']), 'name': request.app.state.tariffs[tariff['id']].content}
for tariff in customer['tariff']['current'] if tariff['id']
]

Expand Down Expand Up @@ -95,66 +97,13 @@ def api_get_customer(request: Request, id: int):
else:
olt_id = olt['finish']['object_id']

will_disconnect = calc_disconnect(
[request.app.state.tariffs[tariff['id']] for tariff in customer['tariff']['current'] if tariff['id']],
customer['balance'], datetime.strptime(customer['date_connect'], '%Y-%m-%d')
) if customer.get('date_connect') else None
if will_disconnect:
will_disconnect = will_disconnect.strftime('%Y-%m-%d')

# INVENTORY
# items = api_call('inventory', 'get_inventory_amount', f'location=customer&object_id={id}')\
# .get('data', {})
# if isinstance(items, dict):
# items = items.values()

# item_names = [
# {
# 'id': str(item['id']),
# 'name': unescape(item['name']),
# 'catalog': item['inventory_section_catalog_id']
# }
# for item in api_call('inventory', 'get_inventory_catalog',
# f'id={list_to_str([str(i["inventory_type_id"]) for i in items])}')['data'].values()
# ]
# inventory = []
# for item in items:
# item_name = [i for i in item_names if i['id'] == str(item['inventory_type_id'])][0]
# inventory.append({
# 'id': item['id'],
# 'catalog_id': item['inventory_type_id'],
# 'name': item_name['name'],
# 'amount': item['amount'],
# 'category_id': item_name['catalog'],
# 'sn': item['serial_number']
# })


# TASK
# tasks_id = str_to_list(api_call('task', 'get_list', f'customer_id={id}')['list'])
# if tasks_id:
# tasks_data = normalize_items(api_call('task', 'show', f'id={list_to_str(tasks_id)}'))
# tasks = []
# for task in tasks_data:
# dates = {}
# if 'create' in task['date']:
# dates['create'] = task['date']['create']
# if 'update' in task['date']:
# dates['update'] = task['date']['update']
# if 'complete' in task['date']:
# dates['complete'] = task['date']['complete']
# if task['type']['name'] != 'Обращение абонента' and \
# task['type']['name'] != 'Регистрация звонка':
# tasks.append({
# 'id': task['id'],
# 'customer_id': task['customer'][0],
# 'employee_id': list(task['staff']['employee'].values())[0]
# if 'staff' in task and 'employee' in task['staff'] else None,
# 'name': task['type']['name'],
# 'status': {
# 'id': task['state']['id'],
# 'name': task['state']['name'],
# 'system_id': task['state']['system_role']
# },
# 'address': task['address']['text'],
# 'dates': dates
# })
# else:
# tasks = []
return {
'status': 'success',
'data': {
Expand All @@ -175,9 +124,6 @@ def api_get_customer(request: Request, id: int):
'is_disabled': bool(customer.get('is_disable', False)),
'is_potential': bool(customer.get('is_potential', False)),

# 'inventory': inventory,
# 'tasks': tasks,

# ONT
'olt_id': olt_id,
'sn': extract_sn(customer['full_name']),
Expand All @@ -188,7 +134,7 @@ def api_get_customer(request: Request, id: int):
# billing
'has_billing': bool(customer.get('is_in_billing', False)),
'billing': {
'id': int(customer['billing_id']) if 'billing_id' in customer and customer['billing_id'] else None,
'id': int(customer['billing_id']) if customer.get('billing_id') else None,
'crc': customer.get('crc_billing')
},
'balance': customer['balance'],
Expand All @@ -205,6 +151,15 @@ def api_get_customer(request: Request, id: int):
'geodata': geodata,

# timestamps
'created_at': customer.get('date_create'),
'connected_at': customer.get('date_connect'),
'agreed_at': datetime.strptime(customer['agreement'][0]['date'], '%d.%m.%Y').strftime('%Y-%m-%d'),
'positive_balance_at': customer.get('date_positive_balance'),
'last_active_at': customer.get('date_activity'),
'last_inet_active_at': customer.get('date_activity_inet'),
'will_disconnect_at': will_disconnect,

# OLD (for backward compitability)
'timestamps': {
'created_at': customer.get('date_create'),
'connected_at': customer.get('date_connect'),
Expand Down
96 changes: 96 additions & 0 deletions tariff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from re import search, escape
from enum import Enum
from datetime import datetime, timedelta
from functools import reduce

class TariffType(Enum):
BASE = 'base' # priced
PROMO = 'promo' # free
SALE = 'sale' # % sale
NONE = 'none' # no price/sale

class Tariff:
def __init__(self, content: str):
# self.id = id
self.content = content
self.price = 0
self.free_days = 0
self.sale = 0
self.sale_days = 0

content = content.lower()

if search(r'\(\d+\s*сом\)', content):
self.type = TariffType.BASE
match = search(r'\((\d+)\s*сом\)', content)
if match:
self.price = int(match.group(1))

elif 'бесплатно' in content:
self.type = TariffType.PROMO
match = search(r'(\S+)\s+бесплатно', content)
if match:
word = match.group(1)

multiplier_match = search(rf'(\d+)\s+{escape(match.group(1))}', content)
multiplier = multiplier_match.group(1) if multiplier_match else 1

if 'месяц' in word:
self.free_days = 30 * int(multiplier)

elif 'день' in word or 'дн' in word:
self.free_days = int(multiplier)

elif '%' in content:
self.type = TariffType.SALE
match = search(r'(\d+)%', content)
if match:
sale = match.group(1)
self.sale = int(sale)

word_match = search(fr'{sale}%\s+на\s+(\w+)', content)
if word_match:
word = word_match.group(1)

if 'год' in word:
self.sale_days = 365
else:
self.type = TariffType.NONE


def calc_disconnect(tariffs: list[Tariff], balance: float, connected_at: datetime) -> datetime | None:
base_sum = sum([t.price for t in tariffs]) / 30 # sum per day
free = sum([t.free_days for t in tariffs])
sale = reduce(lambda a, b: a * b, [t.sale / 100 for t in tariffs if t.type == TariffType.SALE], 0)

saled_sum = (base_sum - base_sum * sale) if sale else base_sum
sale_tariff = next((t for t in tariffs if t.type == TariffType.SALE), None) # first sale tariff
now = datetime.now()
days_since_connect = (now - connected_at).days

if sale_tariff and sale_tariff.sale_days > 0:
sale_remaining = max(0, sale_tariff.sale_days - days_since_connect)
sale_multiplier = 1 - sale_tariff.sale / 100
else:
sale_remaining = 0
sale_multiplier = 1 if not sale_tariff else 1 - sale_tariff.sale / 100

saled_sum = base_sum * sale_multiplier

# spend balance
if sale_remaining > 0:
# sale period cost
sale_period_cost = sale_remaining * saled_sum

if balance >= sale_period_cost:
# balance enough for whole sale period
balance -= sale_period_cost
days = free + sale_remaining + balance / base_sum
else:
# disconnect while sale period
days = free + balance / saled_sum
else:
# sale expired or no sale
days = free + balance / (saled_sum if sale_tariff and sale_tariff.sale_days == 0 else base_sum)

return now + timedelta(days=int(days))