Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 59 additions & 32 deletions bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
ConversationHandler, CallbackContext, ContextTypes)

from logger_config import logger
from constants import TIMEOUT_IN_SEC, STATION_SELECT_ONE_TIME, STATION_SELECT_SUBSCRIBE, ONE_TIME, SUBSCRIBE, UNSUBSCRIBE, VALID_SUMMARY_INTERVALS, JOBQUEUE_DELAY, DEFAULT_USER_ID

from constants import (TIMEOUT_IN_SEC, STATION_SELECT_ONE_TIME,
STATION_SELECT_SUBSCRIBE, ONE_TIME, SUBSCRIBE,
UNSUBSCRIBE, VALID_SUMMARY_INTERVALS,
BOT_JOBQUEUE_DELAY, BOT_DEFAULT_USER_ID,
BOT_MAX_RESCHEDULE_TIME)


class PlotBot:
Expand Down Expand Up @@ -107,28 +112,27 @@ def __init__(self,
self.app.add_handler(one_time_forecast_handler)
self.app.add_error_handler(self._error)

# schedule jobs
self.app.job_queue.run_once(
self._override_basetime,
when=0,
name='Override basetime',
name='override basetime',
)
self.app.job_queue.run_repeating(
self._update_basetime,
first=120,
interval=60,
first=60,
name='Update basetime',
name='update basetime',
)
self.app.job_queue.run_repeating(
self._cache_plots,
interval=30,
first=30,
name='Cache plots',
name='cache plots',
)
self.app.job_queue.run_repeating(
self._broadcast_from_queue,
interval=90,
first=60,
name='Broadcast',
self._broadcast,
interval=30,
name='broadcast',
)

async def _override_basetime(self, context: CallbackContext):
Expand All @@ -138,11 +142,20 @@ async def _update_basetime(self, context: CallbackContext):
self._ecmwf.upgrade_basetime_global()
self._ecmwf.upgrade_basetime_stations()

async def _send_plot_from_queue(self, context: CallbackContext):
async def _process_request(self, context: CallbackContext):
job = context.job
user_id, station_name = job.data
plots = self._ecmwf.download_plots([station_name])
await self._send_plot_to_user(plots, station_name, user_id)

plots = self._ecmwf.download_plots([station_name
]).get(station_name, None)

# plots are available
if plots and len(plots) > 0:
await self._send_plots_to_user(plots, station_name, user_id)
job.schedule_removal()
else:
logger.info(
f"Plots not available for {station_name}, rescheduling job.")

def start(self):
logger.info('Starting bot')
Expand All @@ -153,7 +166,7 @@ async def _error(self, update: Update, context: CallbackContext):
if update:
user_id = update.message.chat_id
else:
user_id = DEFAULT_USER_ID
user_id = BOT_DEFAULT_USER_ID
logger.error(f"Exception while handling an update: {context.error}")
self._db.log_activity(
activity_type="bot-error",
Expand Down Expand Up @@ -338,10 +351,9 @@ async def _subscribe_for_station(self, update: Update,
)
self._db.add_subscription(msg_text, user.id)

self._schedule_process_request(f"subscription_{msg_text}_{user.id}",
data=(user.id, msg_text))
logger.info(f' {user.first_name} subscribed for Station {msg_text}')
context.job_queue.run_once(self._send_plot_from_queue,
JOBQUEUE_DELAY,
data=(user.id, msg_text))

self._db.log_activity(
activity_type="subscription",
Expand All @@ -351,6 +363,15 @@ async def _subscribe_for_station(self, update: Update,

return ConversationHandler.END

def _schedule_process_request(self, job_name, data):
self.app.job_queue.run_repeating(self._process_request,
first=BOT_JOBQUEUE_DELAY,
interval=60,
last=BOT_MAX_RESCHEDULE_TIME,
name=job_name,
data=data)
logger.debug(f"Scheduled job {job_name} with data {data}")

async def _request_one_time_forecast_for_station(
self, update: Update, context: CallbackContext) -> int:
user = update.message.from_user
Expand All @@ -361,9 +382,9 @@ async def _request_one_time_forecast_for_station(
reply_markup=ReplyKeyboardRemove(),
)

context.job_queue.run_once(self._send_plot_from_queue,
JOBQUEUE_DELAY,
data=(user.id, msg_text))
self._schedule_process_request(
f"one_time_forecast_{msg_text}_{user.id}",
data=(user.id, msg_text))
logger.info(
f' {user.first_name} requested forecast for Station {msg_text}')

Expand All @@ -387,22 +408,28 @@ async def _cancel(self, update: Update, context: CallbackContext) -> int:
async def _cache_plots(self, context: CallbackContext):
self._ecmwf.cache_plots()

async def _send_plot_to_user(self, plots, station_name, user_id):
logger.debug(f'Send plot to user: {user_id}')
async def _send_plots_to_user(self, plots, station_name, user_id):
logger.debug(f'Send plots of {station_name} to user: {user_id}')

try:
await self.app.bot.send_message(chat_id=user_id, text=station_name)
for plot in plots[station_name]:
for plot in plots:
logger.debug(f'Plot: {plot}')
await self.app.bot.send_photo(chat_id=user_id,
photo=open(plot, 'rb'))
except Exception as e:
logger.error(f'Error sending plot to user {user_id}: {e}')
logger.error(f'Error sending plots to user {user_id}: {e}')

async def _broadcast_from_queue(self, context: CallbackContext):
plots = self._ecmwf.download_latest_plots(
async def _broadcast(self, context: CallbackContext):
latest_plots = self._ecmwf.download_latest_plots(
self._db.stations_with_subscribers())
if plots:
for station_name in plots:
for user_id in self._db.get_subscriptions_by_station(
station_name):
await self._send_plot_to_user(plots, station_name, user_id)
logger.info('plots sent to all users')
if latest_plots:
for station_name, plots in latest_plots.items():
if len(plots) == 0:
continue
else:
for user_id in self._db.get_subscriptions_by_station(
station_name):
await self._send_plots_to_user(plots, station_name,
user_id)
logger.info(f'Broadcasted {station_name}')
6 changes: 3 additions & 3 deletions constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
5)
VALID_SUMMARY_INTERVALS = ['24 HOURS', '7 DAYS', '30 DAYS', '1 YEAR']

JOBQUEUE_DELAY = 10

DEFAULT_USER_ID = 999
BOT_DEFAULT_USER_ID = 999
BOT_MAX_RESCHEDULE_TIME = 600 # [s]
BOT_JOBQUEUE_DELAY = 10 # [s]