diff --git a/bot.py b/bot.py index 50390e6..6232fce 100644 --- a/bot.py +++ b/bot.py @@ -5,7 +5,12 @@ ConversationHandler, CallbackContext, ContextTypes) from logger_config import logger -from constants import TIMEOUT_IN_SEC, STATION_SELECT_ONE_TIME, STATION_SELECT_SUBSCRIBE, ONE_TIME, SUBSCRIBE, UNSUBSCRIBE, VALID_SUMMARY_INTERVALS, JOBQUEUE_DELAY, DEFAULT_USER_ID + +from constants import (TIMEOUT_IN_SEC, STATION_SELECT_ONE_TIME, + STATION_SELECT_SUBSCRIBE, ONE_TIME, SUBSCRIBE, + UNSUBSCRIBE, VALID_SUMMARY_INTERVALS, + BOT_JOBQUEUE_DELAY, BOT_DEFAULT_USER_ID, + BOT_MAX_RESCHEDULE_TIME) class PlotBot: @@ -107,28 +112,27 @@ def __init__(self, self.app.add_handler(one_time_forecast_handler) self.app.add_error_handler(self._error) + # schedule jobs self.app.job_queue.run_once( self._override_basetime, when=0, - name='Override basetime', + name='override basetime', ) self.app.job_queue.run_repeating( self._update_basetime, + first=120, interval=60, - first=60, - name='Update basetime', + name='update basetime', ) self.app.job_queue.run_repeating( self._cache_plots, interval=30, - first=30, - name='Cache plots', + name='cache plots', ) self.app.job_queue.run_repeating( - self._broadcast_from_queue, - interval=90, - first=60, - name='Broadcast', + self._broadcast, + interval=30, + name='broadcast', ) async def _override_basetime(self, context: CallbackContext): @@ -138,11 +142,20 @@ async def _update_basetime(self, context: CallbackContext): self._ecmwf.upgrade_basetime_global() self._ecmwf.upgrade_basetime_stations() - async def _send_plot_from_queue(self, context: CallbackContext): + async def _process_request(self, context: CallbackContext): job = context.job user_id, station_name = job.data - plots = self._ecmwf.download_plots([station_name]) - await self._send_plot_to_user(plots, station_name, user_id) + + plots = self._ecmwf.download_plots([station_name + ]).get(station_name, None) + + # plots are available + if plots and len(plots) > 0: + await self._send_plots_to_user(plots, station_name, user_id) + job.schedule_removal() + else: + logger.info( + f"Plots not available for {station_name}, rescheduling job.") def start(self): logger.info('Starting bot') @@ -153,7 +166,7 @@ async def _error(self, update: Update, context: CallbackContext): if update: user_id = update.message.chat_id else: - user_id = DEFAULT_USER_ID + user_id = BOT_DEFAULT_USER_ID logger.error(f"Exception while handling an update: {context.error}") self._db.log_activity( activity_type="bot-error", @@ -338,10 +351,9 @@ async def _subscribe_for_station(self, update: Update, ) self._db.add_subscription(msg_text, user.id) + self._schedule_process_request(f"subscription_{msg_text}_{user.id}", + data=(user.id, msg_text)) logger.info(f' {user.first_name} subscribed for Station {msg_text}') - context.job_queue.run_once(self._send_plot_from_queue, - JOBQUEUE_DELAY, - data=(user.id, msg_text)) self._db.log_activity( activity_type="subscription", @@ -351,6 +363,15 @@ async def _subscribe_for_station(self, update: Update, return ConversationHandler.END + def _schedule_process_request(self, job_name, data): + self.app.job_queue.run_repeating(self._process_request, + first=BOT_JOBQUEUE_DELAY, + interval=60, + last=BOT_MAX_RESCHEDULE_TIME, + name=job_name, + data=data) + logger.debug(f"Scheduled job {job_name} with data {data}") + async def _request_one_time_forecast_for_station( self, update: Update, context: CallbackContext) -> int: user = update.message.from_user @@ -361,9 +382,9 @@ async def _request_one_time_forecast_for_station( reply_markup=ReplyKeyboardRemove(), ) - context.job_queue.run_once(self._send_plot_from_queue, - JOBQUEUE_DELAY, - data=(user.id, msg_text)) + self._schedule_process_request( + f"one_time_forecast_{msg_text}_{user.id}", + data=(user.id, msg_text)) logger.info( f' {user.first_name} requested forecast for Station {msg_text}') @@ -387,22 +408,28 @@ async def _cancel(self, update: Update, context: CallbackContext) -> int: async def _cache_plots(self, context: CallbackContext): self._ecmwf.cache_plots() - async def _send_plot_to_user(self, plots, station_name, user_id): - logger.debug(f'Send plot to user: {user_id}') + async def _send_plots_to_user(self, plots, station_name, user_id): + logger.debug(f'Send plots of {station_name} to user: {user_id}') + try: await self.app.bot.send_message(chat_id=user_id, text=station_name) - for plot in plots[station_name]: + for plot in plots: + logger.debug(f'Plot: {plot}') await self.app.bot.send_photo(chat_id=user_id, photo=open(plot, 'rb')) except Exception as e: - logger.error(f'Error sending plot to user {user_id}: {e}') + logger.error(f'Error sending plots to user {user_id}: {e}') - async def _broadcast_from_queue(self, context: CallbackContext): - plots = self._ecmwf.download_latest_plots( + async def _broadcast(self, context: CallbackContext): + latest_plots = self._ecmwf.download_latest_plots( self._db.stations_with_subscribers()) - if plots: - for station_name in plots: - for user_id in self._db.get_subscriptions_by_station( - station_name): - await self._send_plot_to_user(plots, station_name, user_id) - logger.info('plots sent to all users') + if latest_plots: + for station_name, plots in latest_plots.items(): + if len(plots) == 0: + continue + else: + for user_id in self._db.get_subscriptions_by_station( + station_name): + await self._send_plots_to_user(plots, station_name, + user_id) + logger.info(f'Broadcasted {station_name}') diff --git a/constants.py b/constants.py index af9b9c0..318c8e2 100644 --- a/constants.py +++ b/constants.py @@ -8,6 +8,6 @@ 5) VALID_SUMMARY_INTERVALS = ['24 HOURS', '7 DAYS', '30 DAYS', '1 YEAR'] -JOBQUEUE_DELAY = 10 - -DEFAULT_USER_ID = 999 +BOT_DEFAULT_USER_ID = 999 +BOT_MAX_RESCHEDULE_TIME = 600 # [s] +BOT_JOBQUEUE_DELAY = 10 # [s]