diff --git a/.env.development b/.env.development deleted file mode 100644 index 503313c..0000000 --- a/.env.development +++ /dev/null @@ -1,15 +0,0 @@ -MAPLE_BACKEND_IP = "0.0.0.0" -MAPLE_BACKEND_PORT = 3000 - -MAPLE_CHAT_IP = "0.0.0.0" -MAPLE_CHAT_PORT = 5003 - -MAPLE_DATA_FETCHER_SPIDERS_INTERVAL_SECONDS = 10 - -MAPLE_DATA_PATH = "data" -MAPLE_MODEL_ITERATION_PATH = "modelIteration" - -MAPLE_MODEL_ITERATION_DATA_PERSISTENCE_DAYS = 30 - -# SECRET KEYS -# MAPLE_CHATGPT35TURBO_APIKEY = '' diff --git a/.env.production b/.env.production deleted file mode 100644 index 6e2dd27..0000000 --- a/.env.production +++ /dev/null @@ -1,10 +0,0 @@ -MAPLE_BACKEND_IP = "134.117.214.192" -MAPLE_BACKEND_PORT = 80 - - - -MAPLE_DATA_FETCHER_SPIDERS_INTERVAL_SECONDS = 300 - - -# SECRET KEYS -# MAPLE_CHATGPT35TURBO_APIKEY = '' \ No newline at end of file diff --git a/.gitignore b/.gitignore index a9eef75..4383bb1 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,5 @@ log_* logs/ *.log -summaries.csv \ No newline at end of file +summaries.csv +.env* \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 13a9585..513a218 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,8 +4,6 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ - - { "name": "Python: maple_models", "type": "python", @@ -18,7 +16,7 @@ "debug", "--logname", "maple_models_bert_debug", - "--debug-limits", + // "--debug-limits", // "--run-once" ], "console": "integratedTerminal", @@ -54,7 +52,7 @@ "-i", "600", "-l", - "info" + "debug" ], "console": "integratedTerminal", "justMyCode": true @@ -82,7 +80,7 @@ }, { "name": "Python: data transfer from prod", - "type": "python", + "type": "debugpy", "request": "launch", "program": "scripts/transfer_data.py", // "args": ["-s", "0.0.0.0:3000", "-d", "0.0.0.0:3000", "-n", "1"], @@ -103,7 +101,7 @@ }, { "name": "Python: Create summaries", - "type": "python", + 
"type": "debugpy", "request": "launch", "program": "scripts/create_summaries.py", // "args": ["-s", "0.0.0.0:3000", "-d", "0.0.0.0:3000", "-n", "1"], @@ -116,5 +114,23 @@ "console": "integratedTerminal", "justMyCode": true }, + { + "name": "Python: delete model iteration", + "type": "debugpy", + "request": "launch", + "program": "runtime_scripts/delete_model_iteration.py", + "args": [ + "-t", + "old", + "-a", + "-c", + "-l", + "debug", + "--use_config", + ], + "console": "integratedTerminal", + "justMyCode": true + }, + ] } \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 82175a2..5ad5378 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -12,5 +12,17 @@ "maple_proc", "maple_structures", "~/rcs-utils", - ] + ], + "python.testing.unittestArgs": [ + "-v", + "-s", + "./maple_interface/tests", + "-s", + "tests", + "-p", + "test_*.py" + ], + "python.testing.pytestEnabled": false, + "python.testing.unittestEnabled": true, + "auto-scroll.enabled": false } \ No newline at end of file diff --git a/install.sh b/install.sh index 0b88b1b..e98f4dc 100755 --- a/install.sh +++ b/install.sh @@ -24,6 +24,8 @@ install_packages(){ pip install -r requirements.txt + pip install scrapy-fake-useragent + pip install --upgrade pip setuptools pip install python-socketio python-socketio[client] @@ -60,7 +62,7 @@ install_packages(){ pip install -e . cd ../ - [ -d RTPTResearch ] && echo "RTPTResearch directory already exist || git clone git@github.com:ResearchComputingServices/RTPTResearch.git + [ -d RTPTResearch ] && echo "RTPTResearch directory already exist" || git clone git@github.com:ResearchComputingServices/RTPTResearch.git cd RTPTResearch git pull pip install -e . 
@@ -71,8 +73,8 @@ install_packages(){ create_pm2_tasks(){ pm2 delete chatgpt 2> /dev/null && pm2 start runtime_scripts/chatgpt.py --interpreter .venv/bin/python3 - pm2 delete data_fetcher 2> /dev/null && pm2 start maple_data_fetcher/data_fetcher.py --interpreter .venv/bin/python3 -- -e prod -i 600 -l info - pm2 delete delete_model_iteration 2> /dev/null && pm2 start runtime_scripts/delete_model_iteration.py --interpreter .venv/bin/python3 -- -t old -a -l debug + pm2 delete data_fetcher 2> /dev/null && pm2 start runtime_scripts/data_fetcher.py --interpreter .venv/bin/python3 -- -e prod -i 600 -l info + pm2 delete delete_model_iteration 2> /dev/null && pm2 start runtime_scripts/delete_model_iteration.py --interpreter .venv/bin/python3 -- -t old -a -c -l debug --use_config pm2 delete maple_models_bert 2> /dev/null && pm2 start runtime_scripts/maple_models.py --interpreter .venv/bin/python3 --name maple_models_bert -- --model bert --level debug --logname maple_models_bert pm2 save pm2 kill diff --git a/maple_chat/maple_chatgpt/chatgpt_server.py b/maple_chat/maple_chatgpt/chatgpt_server.py index a4ae74c..74458b0 100644 --- a/maple_chat/maple_chatgpt/chatgpt_server.py +++ b/maple_chat/maple_chatgpt/chatgpt_server.py @@ -7,7 +7,7 @@ import time import random import rcs -from maple_processing.process import chatgpt_summary, chatgpt_topic_name, chatgpt_bullet_summary +from maple_processing.process import LLMProcess, chatgpt_summary, chatgpt_topic_name, chatgpt_bullet_summary from maple_structures import Article, Topic from maple_interface import MapleAPI from .utils import JobType @@ -78,7 +78,8 @@ def __init__( socket_io_port: int, socket_io_api_key: str, chatgpt_api_key: str = None, - article_fetching: bool = False) -> None: + article_fetching: bool = False, + use_config: bool = True) -> None: super().__init__(ping_timeout=600) self.logger=logging.getLogger('ChatgptServer') @@ -87,6 +88,8 @@ def __init__( self.maple_keys_in_use = [] self.maple_clients = [] self.maple_jobs 
= [] + self._maple_config = None + self._use_config = use_config self.maple_api = maple_api @@ -99,7 +102,26 @@ def __init__( self.attach(self._app) self.register_namespace(ChatgptServerNamespace('/')) self.loop = asyncio.get_event_loop() - + + async def update_config(self): + FETCH_CONFIG_INTERVAL = 60 + while True: + if self._use_config: + max_attempts = 5 + attempts = 0 + while attempts < max_attempts: + self.logger.debug("Fetching maple_config") + maple_config = self.maple_api.config_get() + if maple_config is not None: + if self._maple_config != maple_config: + self.logger.info('maple_config has changed') + self._maple_config = maple_config + self.logger.info('Updated maple_config') + break + attempts +=1 + await asyncio.sleep(1) + await asyncio.sleep(FETCH_CONFIG_INTERVAL) + def maple_add_job(self, sid: str, api_key: str, job_type: JobType, job_details: any): with self.maple_lock: @@ -207,7 +229,9 @@ async def _process_job_summary(self, job, force_update: bool = False): return for _ in range(3): try: - summary = await chatgpt_summary(article.content, job['api_key']) + llm_process = LLMProcess(config = self._maple_config) + summary = await llm_process.get_summary(article.content, job['api_key']) + # summary = await chatgpt_summary(article.content, job['api_key']) # summary = chatgpt_summary(article.content, job['api_key']) break except Exception as exc: @@ -227,7 +251,10 @@ async def _process_job_topic_name(self, job): job_send = job.copy() while True: try: - topic_name = await chatgpt_topic_name( job_send['job_details']['keyword'], job_send['api_key']) + llm_process = LLMProcess(config = self._maple_config) + topic_name = await llm_process.get_topic_name(job['job_details']['keyword'], job['api_key']) + + # topic_name = await chatgpt_topic_name( job_send['job_details']['keyword'], job_send['api_key']) job_send['results'] = topic_name break except Exception as exc: @@ -253,7 +280,11 @@ async
def _process_job_bullet_summary(self, job): while True: try: - bullet_summary = await chatgpt_bullet_summary(job['job_details']['content'], job['api_key']) + articles = job['job_details']['content'] + llm_process = LLMProcess(config = self._maple_config) + bullet_summary = await llm_process.get_bullet_summary(articles, job['api_key']) + + # bullet_summary = await chatgpt_bullet_summary(articles, job['api_key']) job_send['results'] = bullet_summary break except Exception as exc: @@ -346,11 +377,13 @@ async def _process(self): def run(self): """run server and tasks """ + self.start_background_task(self.update_config) self.start_background_task(self._process) # self.loop.create_task(self._process()) if self._article_fetching: self.start_background_task(self._fetch_pending_summaries) # self.loop.create_task(self._fetch_pending_summaries()) + web.run_app( self._app, host = self._socket_io_ip, diff --git a/maple_data_fetcher/data_fetcher.py b/maple_data_fetcher/data_fetcher.py index 9eedfaf..63b40a1 100644 --- a/maple_data_fetcher/data_fetcher.py +++ b/maple_data_fetcher/data_fetcher.py @@ -5,8 +5,10 @@ import os import sys import time +from maple_interface.maple import MapleAPI import rcs from maple_config import config as cfg + sys.path.append(os.path.join(os.path.abspath(""), "newsscrapy")) print(sys.path) @@ -65,11 +67,14 @@ class DataFetcher: def __init__( self, + backend_ip: str, + backend_port: str, spiders: list = [scrapyCBC.CBCHeadlinesSpider, scrapyCTVNews.CTVNewsSpider], spider_output_file: bool = False, spider_log_level: str = "warning", spider_interval_sec: int = 120, - environment= '.env.development' + environment= '.env.development', + ) -> None: self.spider_output_file = spider_output_file @@ -82,7 +87,28 @@ def __init__( self._spider_interval_sec = spider_interval_sec self._spiders = spiders - + + self._maple_api = MapleAPI( + f"http://{backend_ip}:{backend_port}" + ) + + def _update_spider_interval_sec_from_config(self): + if self._environment == 
'.env.development': + logger.info("Development environment. Not updating spider interval from config.") + return + config = self._maple_api.config_get() + if isinstance(config, dict): + if 'spider_interval_seconds' in config: + if self._spider_interval_sec != config['spider_interval_seconds']: + logger.info("Updating spider interval from %s to %s", self._spider_interval_sec, config['spider_interval_seconds']) + self._spider_interval_sec = config['spider_interval_seconds'] + elif config == {}: + logger.warning("Failed to retrieve config from backend. Using last updated spider interval.") + else: + logger.error("spider_interval_seconds not found in config.") + else: + logger.warning("Failed to get config from backend. Using last updated spider interval.") + def _get_project_settings(self): self._scrapy_settings = get_project_settings() @@ -129,6 +155,8 @@ def _catch_error(self, failure): def _crawl(self, spider): """crawl a spider and set callback to schedule next crawl""" logger.info("Crawling spider: %s", spider) + # Fetch config from backend to update interval in case it changed. 
+ self._update_spider_interval_sec_from_config() job = self._crawl_job(spider) job.addCallback(self._schedule_next_crawl, self._spider_interval_sec, spider) # job.errback(self._catch_error) @@ -173,6 +201,8 @@ def main(args): logger.debug("config: %s", config) data_fetcher = DataFetcher( + backend_ip=config['MAPLE_BACKEND_IP'], + backend_port=config['MAPLE_BACKEND_PORT'], spider_output_file=args.o, spider_interval_sec=args.i, spider_log_level=args.l, @@ -183,4 +213,4 @@ def main(args): if __name__ == "__main__": args = parser.parse_args() - main(args) + main(args) \ No newline at end of file diff --git a/maple_interface/maple_interface/maple.py b/maple_interface/maple_interface/maple.py index 3df1529..6893e88 100644 --- a/maple_interface/maple_interface/maple.py +++ b/maple_interface/maple_interface/maple.py @@ -1,7 +1,7 @@ +from typing import Union, List +import logging import requests from requests import exceptions as request_exc - -import logging from maple_structures import Article from maple_structures import Topic from maple_structures import Model @@ -17,12 +17,15 @@ def __init__( mapleapi, limit: int = 100, page: int = 0, - hours: int = None + hours: int = None, + skip : int = None, ) -> None: self._mapleapi = mapleapi self._limit = limit self._page_start = page + self._page = page self._hours = hours + self._skip = skip def __iter__(self): self._page = self._page_start @@ -32,7 +35,11 @@ def __iter__(self): def __next__(self): articles = self._mapleapi.article_get( - limit=self._limit, page=self._page, hours=self._hours) + limit=self._limit, + page=self._page, + hours=self._hours, + skip=self._skip, + ) if isinstance(articles, requests.Response): raise StopIteration @@ -92,6 +99,17 @@ def _delete(self, path: str, uuid: str, timeout=10): logger.error(exc) return response + def config_get(self): + response = self._get("config") + if response.status_code != 200: + return response + else: + try: + return response.json() + except Exception as exc: + 
logger.error(exc) + return {} + def article_post(self, article: Article, update: bool = False): "Posts an article in the database." response = self._post("article", params=None, body=article.to_dict()) @@ -129,7 +147,8 @@ def article_get( page: int = None, hours: int = None, url: str = None, - uuid: str = None): + uuid: str = None, + skip: int = None): params = dict() if limit is not None: params["limit"] = limit @@ -141,6 +160,8 @@ def article_get( params["url"] = url if uuid is not None: params['uuid'] = uuid + if skip is not None: + params['skip'] = skip response = self._get("article", params=params) if response.status_code != 200: return response @@ -155,13 +176,25 @@ def article_get( logger.error(exc) return [] - def article_iterator(self, limit: int = 100, page: int = None, hours: int = None): + def article_count_get(self) -> int: + response = self._get("article/count") + if response.status_code != 200: + return response + else: + try: + return response.json() + except Exception as exc: + logger.error(exc) + return 0 + + def article_iterator(self, limit: int = 100, page: int = None, hours: int = None, skip: int = None): '''function to iterate through articles.''' return iter(Articles( self, limit=limit if limit is not None else 100, page=page if page is not None else 0, - hours=hours)) + hours=hours, + skip=skip)) # while True: # articles = self.article_get(limit, page, hours) # if isinstance(articles, requests.Response): @@ -213,7 +246,8 @@ def topic_put(self, topic: Topic) -> Topic: if response.status_code == 200: try: return Topic.from_dict(response.json()) - except: + except Exception as exc: + logger.error(exc) return response return response @@ -304,8 +338,15 @@ def model_iteration_post(self, model_iteration: ModelIteration, include_model=Tr raise ConnectionError() return {} - def model_iteration_get(self): - response = self._get("model-iteration") + def model_iteration_get(self, uuid: str = None, reduced: bool = None, type_: str = None, complete: bool 
= None, **kwargs) -> List[ModelIteration]: + params = dict() + for var, val in zip(['uuid', 'reduced', 'type', 'complete'], [uuid, reduced, type_, complete]): + if val is not None: + if isinstance(val, bool): + params[var] = str(val).lower() + else: + params[var] = val + response = self._get("model-iteration", params=params, **kwargs) if response.status_code != 200: return response else: @@ -337,8 +378,8 @@ def model_iteration_put( return response return response - def model_iteration_delete(self, uuid: str): - response = self._delete(path="model-iteration", uuid=uuid) + def model_iteration_delete(self, uuid: str, timeout=10): + response = self._delete(path="model-iteration", uuid=uuid, timeout=timeout) return response.status_code def processed_post_many(self, processed: list[Processed]): diff --git a/maple_interface/tests/__init__.py b/maple_interface/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/maple_interface/tests/test_maple.py b/maple_interface/tests/test_maple.py new file mode 100644 index 0000000..3f11e52 --- /dev/null +++ b/maple_interface/tests/test_maple.py @@ -0,0 +1,79 @@ +import asyncio +from json import load +import unittest +from requests import Response +from maple_config.config import load_config +from maple_interface import MapleAPI +from maple_processing.process import personalized_summary, personalized_topic_name + + +class TestConfigGet(unittest.TestCase): + def setUp(self) -> None: + self.maple = MapleAPI( + authority="http://localhost:3000") + + def test_config_get(self): + config = self.maple.config_get() + if isinstance(config, Response): + print("Could not reach the server") + else: + self.assertTrue(isinstance(config, dict)) + keys = [ + "article_summary_length", + "max_bullet_points", + "model_iteration_persistence_days", + "spider_interval_seconds", + ] + for key in keys: + self.assertTrue(key in config, f"Key {key} not found in config") + + print(config) + +class 
TestPersonalizedRequests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self) -> None: + # self.loop = asyncio.new_event_loop() + # asyncio.set_event_loop(None) + self.config = load_config(environment="development") + + self.maple = MapleAPI( + authority="http://localhost:3000") + self.backend_config = self.maple.config_get() + + + async def test_personalized_summary(self): + config = self.backend_config + article = self.maple.article_get(limit=1) + article = article[0] + # TODO remove comment and implement in backend. + # prompt = config['prompts']['default'] + + prompt = None + if config['model']['selectedModel'] in config['prompts']: + prompt = config['prompts'][config['model']['selectedModel']]['summary'] + elif config['model']['default'] in config['prompts']: + prompt = config['prompts'][config['model']['default']]['summary'] + + response = await personalized_summary( + host = config['model']['host'], + port= config['model']['port'], + api_key=config['model']['api_key'], + model_type=config['model']['selectedModel'], + prompt=prompt, + content=article.content, + ) + + print(response) + + async def test_personalized_topic_name(self): + config = self.backend_config + keywords = "politics, canada, immigration, refugees, economy" + + response = await personalized_topic_name( + host = config['model']['host'], + port= config['model']['port'], + api_key=config['model']['api_key'], + model_type=config['model']['selectedModel'], + keywords=[k.strip() for k in keywords.split(',')], + ) + + print(response) diff --git a/maple_proc/maple_processing/process.py b/maple_proc/maple_processing/process.py index 4976a2a..e5ead80 100644 --- a/maple_proc/maple_processing/process.py +++ b/maple_proc/maple_processing/process.py @@ -1,12 +1,13 @@ """functions to extract information from articles.""" import logging -import nltk import re import heapq -from maple_structures import Article -# from openai import OpenAI -from openai import AsyncOpenAI import asyncio +import 
aiohttp +import nltk +from openai import AsyncOpenAI, api_key +from maple_structures import Article + logger = logging.getLogger('maple_proc') @@ -111,7 +112,22 @@ def sentiment_analysis(articles: list[Article]) -> None: for article in articles: article.sentiment = sentiment.polarity_scores(article.content) - + +async def chatgpt_summary_v2(prompt, content: str, api_key: str): + client = AsyncOpenAI(api_key=api_key) + + completion = await client.chat.completions.create( + model="gpt-3.5-turbo-16k", + messages=[ + { + "role":"user", + "content": f"{prompt} '{content}'", + }, + ], + timeout = 40, + ) + return completion.choices[0].message.content + async def chatgpt_summary(content: str, api_key: str): client = AsyncOpenAI(api_key=api_key) @@ -127,6 +143,9 @@ async def chatgpt_summary(content: str, api_key: str): ) return completion.choices[0].message.content +default_prompts = { + 'summary': "summarize in less than 300 words the following content: ", +} async def chatgpt_topic_name(keywords: list[str], api_key: str): client = AsyncOpenAI(api_key=api_key) @@ -162,3 +181,238 @@ async def chatgpt_bullet_summary(content: list[str], api_key: str): out = completion.choices[0].message.content logger.debug("chatgpt bullet summary returned: %s", out) return out.split('\n') + + +async def personalized_summary( + host: str, + port: str, + model_type: str, + content: str, + api_key: str | None, + prompt: str = None, + timeout: int = 300, + max_tokens: int = None, + ): + headers = {} + if api_key is not None: + headers["x-api-key"] = api_key + if prompt is None: + prompt = default_prompts["summary"] + + body = { + "model_type": model_type, + "articles": [content], + "prompt": prompt, + } + if max_tokens is not None: + body["max_tokens"] = max_tokens + + async with aiohttp.ClientSession(trust_env=True) as session: + async with session.post( + f"{host}:{port}/llm/article_summary", + headers=headers, + timeout=timeout, + json=body, + ssl=False, + ) as response: + data = await 
response.json() + if len(data) == 0: + return None + return data[0] + +async def personalized_topic_name( + host: str, + port: str, + model_type: str, + keywords: list[str], + api_key: str | None, + prompt: str = None, + timeout: int = 300, + max_tokens: int = None, + ): + headers = {} + if api_key is not None: + headers["x-api-key"] = api_key + + # content = ', '.join(keywords) + body = { + "model_type": model_type, + "keywords": keywords, + "prompt": prompt if prompt is not None else "In two words represent this: ", + } + if max_tokens is not None: + body["max_tokens"] = max_tokens + + async with aiohttp.ClientSession(trust_env=True) as session: + async with session.post( + f"{host}:{port}/llm/topic_name", + headers=headers, + timeout=timeout, + json=body, + ssl=False, + ) as response: + data = await response.json() + if len(data) == 0: + return None + return data + +async def personalized_bullet_point(host: str, + port: str, + model_type: str, + articles: list[str], + api_key: str | None, + prompt: str = None, + timeout: int = 300, + max_tokens: int = None,): + headers = {} + if api_key is not None: + headers["x-api-key"] = api_key + body = { + "model_type": model_type, + "articles": articles, + "prompt": prompt, + } + if max_tokens is not None: + body["max_tokens"] = max_tokens + + async with aiohttp.ClientSession(trust_env=True) as session: + async with session.post( + f"{host}:{port}/llm/bullet_point", + headers=headers, + timeout=timeout, + json=body, + ssl=False, + ) as response: + data = await response.json() + if len(data) == 0: + return None + return data + +class LLMProcess: + def __init__(self, config:dict): + self.set_config(config) + + def set_config(self, config: dict): + self._config = config + + @property + def summary_prompt(self): + prompt = 'You are a reporter. Your task is to create a summary of an article with a limit of 50 words. Do not include any description of the task.' 
+ if self._config: + model_name = self.get_model_name() + if model_name == 'chatgpt': + if 'chatgpt' in self._config['prompts']: + prompt = self._config['prompts']['chatgpt']['summary'] + else: + if model_name in self._config['prompts']: + prompt = self._config['prompts'][model_name]['summary'] + + return prompt + + @property + def topic_name_prompt(self): + prompt = 'You are provided with a list of keywords. Your task is to find the best possible word to represent them. Provide at least one word, and a maximum of 3 words.\n# Keywords:\n' + if self._config: + model_name = self.get_model_name() + if model_name == 'chatgpt': + if 'chatgpt' in self._config['prompts']: + prompt = self._config['prompts']['chatgpt']['topic_name'] + else: + if model_name in self._config['prompts']: + prompt = self._config['prompts'][model_name]['topic_name'] + return prompt + + @property + def bullet_summary_prompt(self): + prompt = "Given the articles, create a list with maximum of 5 bullet points, and each bullet point should not exceed 50 words.\n#Articles:\n" + if self._config: + model_name = self.get_model_name() + if model_name == 'chatgpt': + if 'chatgpt' in self._config['prompts']: + prompt = self._config['prompts']['chatgpt']['bullet_points'] + else: + if model_name in self._config['prompts']: + prompt = self._config['prompts'][model_name]['bullet_points'] + return prompt + + def get_model_type(self): + model_type = 'ChatGPT' + if self._config: + if 'model' in self._config: + if 'name' in self._config['model']: + model_type = self._config['model']['name'] + return model_type + + def get_model_name(self): + model_type = self.get_model_type() + model_name='chatgpt' + if model_type == 'ChatGPT': + model_name = model_type + elif model_type == 'Personalized': + model_name = self._config['model']['selectedModel'] + return model_name + + def get_api_key(self, api_key: str = None): + if self._config: + if self.get_model_type() == 'ChatGPT': + if 'api_key' in 
self._config['model']['config']: + api_key = self._config['model']['config']['api_key'] + elif self.get_model_type() == 'Personalized': + if 'api_key' in self._config['model']: + api_key = self._config['model']['api_key'] + return api_key + + async def get_summary(self, content: str, chatgpt_api_key: str): + model_type = self.get_model_type() + if not self._config: + return await chatgpt_summary_v2(self.summary_prompt, content, api_key=chatgpt_api_key) + else: + api_key_ = self.get_api_key(chatgpt_api_key) + if model_type == 'ChatGPT': + return await chatgpt_summary_v2(self.summary_prompt, content, api_key=api_key_) + elif model_type == 'Personalized': + return await personalized_summary( + host=self._config['model']['host'], + port=self._config['model']['port'], + model_type=self.get_model_name(), + content=content, + api_key=self.get_api_key(), + prompt=self.summary_prompt, + ) + + async def get_topic_name(self, keywords: list[str], chatgpt_api_key: str): + model_type = self.get_model_type() + if not self._config: + return await chatgpt_topic_name(keywords, chatgpt_api_key) + else: + api_key_ = self.get_api_key(chatgpt_api_key) + if model_type == 'ChatGPT': + return await chatgpt_topic_name(keywords, api_key_) + elif model_type == 'Personalized': + return await personalized_topic_name( + host=self._config['model']['host'], + port=self._config['model']['port'], + model_type=self.get_model_name(), + keywords=keywords, + api_key=api_key_, + prompt=self.topic_name_prompt, + ) + + async def get_bullet_summary(self, content: list[str], chatgpt_api_key: str): + model_type = self.get_model_type() + if not self._config: + return await chatgpt_bullet_summary(content, chatgpt_api_key) + else: + api_key_ = self.get_api_key(chatgpt_api_key) + if model_type == 'ChatGPT': + return await chatgpt_bullet_summary(content, api_key_) + elif model_type == 'Personalized': + return await personalized_bullet_point( + host=self._config['model']['host'], + 
port=self._config['model']['port'], + model_type=self.get_model_name(), + articles=content, + api_key=api_key_, + prompt=self.bullet_summary_prompt, + ) + \ No newline at end of file diff --git a/maple_proc/maple_processing/processing.py b/maple_proc/maple_processing/processing.py index 1af6583..25e18eb 100644 --- a/maple_proc/maple_processing/processing.py +++ b/maple_proc/maple_processing/processing.py @@ -24,7 +24,7 @@ class MapleProcessing: DEBUG_LIMIT_PROCESS_COUNT = 2000 - ARTICLE_PAGE_SIZE=1000 + ARTICLE_PAGE_SIZE=200 def __init__( self, *, @@ -40,6 +40,9 @@ def __init__( ): self.logger = logging.getLogger('MapleProcessing') self.maple_api = maple + self._last_updated_maple_config = None + self._maple_config = None + self.logger.debug(f"Maple config: {self.maple_config}") self._models = models self._hours = hours self._max_hours = max_hours @@ -57,7 +60,18 @@ def _init_vars(self): self._training_data = [] self._article_classified = [] self._processed = [] - + + @property + def maple_config(self): + if getattr(self, '_maple_config', None) is None: + self._maple_config = self.maple_api.config_get() + self._last_updated_maple_config = time.time() + elif time.time()-self._last_updated_maple_config > 60: + self._maple_config = self.maple_api.config_get() + self._last_updated_maple_config = time.time() + self.logger.debug("Maple config (%s): %s", self._last_updated_maple_config, self.maple_config) + return self._maple_config + @property def models(self): return [ @@ -133,8 +147,22 @@ def _classify_all_articles(self): model_structure=model_structure, keep_fields=['status']) + maple_config = self.maple_config + if maple_config: + if 'max_articles_per_model_iteration' in maple_config: + max_articles = maple_config['max_articles_per_model_iteration'] + else: + self.logger.warning("max_articles_per_model_iteration not found in config. 
Using default value (2000).") + max_articles = 2000 + articles_in_db = self.maple_api.article_count_get() + if isinstance(articles_in_db, int): + skip_article_count = articles_in_db - max_articles + else: + skip_article_count = 0 + else: + skip_article_count = 0 - for articles_it in self.maple_api.article_iterator(limit=self.ARTICLE_PAGE_SIZE, page=0): + for articles_it in self.maple_api.article_iterator(limit=self.ARTICLE_PAGE_SIZE, page=0, skip=skip_article_count): time_start = timeit.default_timer() # remove articles without chat_summaries. @@ -201,8 +229,9 @@ def _classify_all_articles(self): self.logger.debug('Successfully sent processed from %d to %d', pliststart, pliststart+plistsize) pliststart+= plistsize elif isinstance(response, Response): - self.logger.error('Failed to post processed. Reattempting. %s', response) - + wait_time=random.random()*2 + self.logger.error('Failed to post processed. Reattempting in %.2fs. %s', wait_time, response) + time.sleep(wait_time) self.logger.debug("Time to post processed: %.2fs", timeit.default_timer()-tstart_post) self._model_iteration.article_classified += len(processed_list) self._update_model_iteration(keep_fields=['article_classified']) @@ -702,13 +731,17 @@ def run(self, *, run_once: bool = False): self._chatgpt_tasks() - #TODO create plot data for topics and models self._create_chart_data() + # Set status of model_iteration to complete. 
for level in range(1, 4): model_structure = getattr(self._model_iteration, f'model_level{level}') model_structure.status = 'complete' + model_structure.path = os.path.join( + self.model_iteration_path, + f'model_level{level}' + ) self._update_model_structure( level=level, model_structure=model_structure, diff --git a/newsscrapy/newsscrapy/pipelines.py b/newsscrapy/newsscrapy/pipelines.py index 86b5463..c6e34f9 100644 --- a/newsscrapy/newsscrapy/pipelines.py +++ b/newsscrapy/newsscrapy/pipelines.py @@ -96,7 +96,7 @@ def process_item(self, item, spider): while True: if len(self._url_history) > 0: if (time.time() - self._url_history[0][1]) > 86400: - self.logger.debug('removing article from url history: %s', self._url_history[0][1]['url']) + self.logger.debug('removing article from url history: %s', self._url_history[0]) self._url_history.pop(0) else: break diff --git a/runtime_scripts/chatgpt.py b/runtime_scripts/chatgpt.py index 30ccb0b..5b9de75 100644 --- a/runtime_scripts/chatgpt.py +++ b/runtime_scripts/chatgpt.py @@ -38,6 +38,7 @@ socket_io_port=config['MAPLE_CHAT_PORT'], socket_io_api_key=config['MAPLE_CHAT_SOCKETIO_KEY'], article_fetching=True, + use_config = True, ) server.run() diff --git a/runtime_scripts/data_fetcher.py b/runtime_scripts/data_fetcher.py index 9eedfaf..bba29bd 100644 --- a/runtime_scripts/data_fetcher.py +++ b/runtime_scripts/data_fetcher.py @@ -1,29 +1,37 @@ """fetches data and stores in database""" import logging -from logging.handlers import RotatingFileHandler import argparse import os import sys -import time +import scrapy +import numpy as np +from scrapy.crawler import CrawlerRunner +from scrapy.utils.project import get_project_settings +from twisted.internet import defer + import rcs +from maple_interface.maple import MapleAPI + from maple_config import config as cfg + sys.path.append(os.path.join(os.path.abspath(""), "newsscrapy")) +sys.path.append(os.path.join(os.path.abspath(""), "../newsscrapy")) +from newsscrapy.spiders import 
scrapyCBC, scrapyCTVNews print(sys.path) -import scrapy -from scrapy.crawler import CrawlerProcess, CrawlerRunner -from scrapy.utils.project import get_project_settings + + scrapy.utils.reactor.install_reactor( "twisted.internet.asyncioreactor.AsyncioSelectorReactor" ) -from twisted.internet import defer + defer.setDebugging(True) -from newsscrapy.spiders import scrapyCBC, scrapyCTVNews + parser = argparse.ArgumentParser( @@ -54,7 +62,7 @@ logger = logging.getLogger("data_fetcher") -spiders = [ +spiders_ = [ scrapyCBC.CBCHeadlinesSpider, scrapyCTVNews.CTVNewsSpider, ] @@ -65,11 +73,14 @@ class DataFetcher: def __init__( self, + backend_ip: str, + backend_port: str, spiders: list = [scrapyCBC.CBCHeadlinesSpider, scrapyCTVNews.CTVNewsSpider], spider_output_file: bool = False, spider_log_level: str = "warning", spider_interval_sec: int = 120, - environment= '.env.development' + environment= '.env.development', + ) -> None: self.spider_output_file = spider_output_file @@ -82,7 +93,28 @@ def __init__( self._spider_interval_sec = spider_interval_sec self._spiders = spiders - + + self._maple_api = MapleAPI( + f"http://{backend_ip}:{backend_port}" + ) + + def _update_spider_interval_sec_from_config(self): + if self._environment == '.env.development': + logger.info("Development environment. Not updating spider interval from config.") + return + config = self._maple_api.config_get() + if isinstance(config, dict): + if 'spider_interval_seconds' in config: + if self._spider_interval_sec != config['spider_interval_seconds']: + logger.info("Updating spider interval from %s to %s", self._spider_interval_sec, config['spider_interval_seconds']) + self._spider_interval_sec = config['spider_interval_seconds'] + elif config == {}: + logger.warning("Failed to retrieve config from backend. Using last updated spider interval.") + else: + logger.error("spider_interval_seconds not found in config.") + else: + logger.warning("Failed to get config from backend. 
Using last updated spider interval.") + def _get_project_settings(self): self._scrapy_settings = get_project_settings() @@ -103,8 +135,71 @@ def _get_project_settings(self): } self._scrapy_settings["MAPLE_ENVIRONMENT"] = self._environment - + + self._scrapy_settings["COOKIES_ENABLED"] = False + self._scrapy_settings["DOWNLOAD_DELAY"] = np.random.uniform(1.1, 3.5) + self._scrapy_settings["RANDOMIZE_DOWNLOAD_DELAY"] = True + + # self._scrapy_settings["USER_AGENT"]= "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Googlebot/2.1; http://www.google.com/bot.html) Chrome/W.X.Y.Z‡ Safari/537.36" + # user_agent_keywords = [ + # "Ambitious", + # "Helpful", + # "Adventurous", + # "Bright", + # "Cheerful", + # "Considerate", + # "Courageous", + # "Adaptable", + # "Affable", + # "Affectionate", + # "Agreeable", + # "Attentive", + # "Authentic", + # "Brave", + # "Calm", + # "Creative", + # "Diligent", + # "Empathetic", + # "Witty", + # "Amazing", + # "Amiable", + # "Amicable", + # "Compassionate", + # "Curious" + # ] + # user_agent_adjectives = [f"{np.random.choice(user_agent_keywords)}" for _ in range(np.random.randint(2, 5))] + # user_agent_name = 'And'.join(user_agent_adjectives) + # self._scrapy_settings["USER_AGENT"]= f"my-{user_agent_name}-project (http://{'example'}.com)" + + self._scrapy_settings["DOWNLOADER_MIDDLEWARES"] = { + 'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None, + 'scrapy.downloadermiddlewares.retry.RetryMiddleware': None, + 'scrapy_fake_useragent.middleware.RandomUserAgentMiddleware': 400, + 'scrapy_fake_useragent.middleware.RetryUserAgentMiddleware': 401, + 'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware': 110, + } + + self._scrapy_settings["FAKEUSERAGENT_PROVIDERS"] = [ + 'scrapy_fake_useragent.providers.FakerProvider', + 'scrapy_fake_useragent.providers.FakeUserAgentProvider', + 'scrapy_fake_useragent.providers.FixedUserAgentProvider', + ] + + # If using proxies, should include them here. 
+ # self._scrapy_settings["ROTATING_PROXY_LIST"] = [ + # 'http://proxy1.com:8000', + # 'http://proxy2.com:8031', + # # Add more proxies here + # ] + + self._scrapy_settings["AUTOTHROTTLE_ENABLED"] = True + self._scrapy_settings["AUTOTHROTTLE_START_DELAY"] = 1 + self._scrapy_settings["AUTOTHROTTLE_MAX_DELAY"] = 10 + self._scrapy_settings["AUTOTHROTTLE_TARGET_CONCURRENCY"] = 1.0 + self._scrapy_settings["AUTOTHROTTLE_DEBUG"] = False + self._scrapy_settings["ENV"] = "0.0.0.0" + logging.debug(self._scrapy_settings) def _crawl_job(self, spider): """create the crawl job""" @@ -129,6 +224,8 @@ def _catch_error(self, failure): def _crawl(self, spider): """crawl a spider and set callback to schedule next crawl""" logger.info("Crawling spider: %s", spider) + # Fetch config from backend to update interval in case it changed. + self._update_spider_interval_sec_from_config() job = self._crawl_job(spider) job.addCallback(self._schedule_next_crawl, self._spider_interval_sec, spider) # job.errback(self._catch_error) @@ -173,6 +270,9 @@ def main(args): logger.debug("config: %s", config) data_fetcher = DataFetcher( + backend_ip=config['MAPLE_BACKEND_IP'], + backend_port=config['MAPLE_BACKEND_PORT'], + spiders=spiders_, spider_output_file=args.o, spider_interval_sec=args.i, spider_log_level=args.l, @@ -183,4 +283,4 @@ def main(args): if __name__ == "__main__": args = parser.parse_args() - main(args) + main(args) \ No newline at end of file diff --git a/runtime_scripts/delete_model_iteration.py b/runtime_scripts/delete_model_iteration.py index 17ec71d..6973418 100644 --- a/runtime_scripts/delete_model_iteration.py +++ b/runtime_scripts/delete_model_iteration.py @@ -1,6 +1,5 @@ import logging -import coloredlogs import argparse import enum import datetime @@ -8,13 +7,19 @@ import time import os import shutil +import coloredlogs +from openai import timeout import rcs +from maple_structures.model import ModelIteration from maple_interface import MapleAPI from maple_config import config as 
cfg ENV = cfg.PRODUCTION logger = logging.getLogger('ModelIterationDeletion') +logger.info('Starting ModelIterationDeletion script') + +TIMEOUT = 120 class DeleteType(enum.Enum): all = 'all' @@ -22,25 +27,64 @@ class DeleteType(enum.Enum): old = 'old' -def delete_model_iteration(delete_type: DeleteType): +def delete_model_iteration(delete_type: DeleteType, use_config: bool = True): logger.info("Delete model iterations. Deletion type: %s", delete_type.value) - + config = cfg.load_config(ENV) - + maple = MapleAPI( authority=f"http://{config['MAPLE_BACKEND_IP']}:{config['MAPLE_BACKEND_PORT']}" ) - - model_iterations = maple.model_iteration_get() - logger.debug("Model iterations retrieved: %d", len(model_iterations)) - delete_model_iterations = [] - - if delete_type == DeleteType.old: days_limit = int(config['MAPLE_MODEL_ITERATION_DATA_PERSISTENCE_DAYS']) - logger.info("Attempt deletion of model iterations older than %d days", days_limit) + logger.debug("Use configuration: %s", use_config) + if use_config: + backend_config = maple.config_get() + logger.debug("Backend configuration: %s", backend_config) + if isinstance(backend_config, dict): + if 'model_iteration_persistence_days' in backend_config: + days_limit = int( + backend_config['model_iteration_persistence_days']) + else: + logger.error( + "Backend configuration does not have 'model_iteration_persistence_days'") + return + else: + logger.error( + "Failed to fetch backend configuration. 
Cancelling deletion.") + return + + logger.info( + "Attempt deletion of model iterations older than %d days", days_limit) + + model_iterations_reduced = maple.model_iteration_get(reduced=True, timeout=TIMEOUT) + model_iterations = [] + for model_iteration in model_iterations_reduced: + if delete_type == DeleteType.old: + datediff = datetime.datetime.now(datetime.timezone.utc)-datetime.datetime.fromisoformat( + model_iteration.createDate.replace('Z', '+00:00') + ) + if datediff > datetime.timedelta(days=days_limit): + model_iteration = maple.model_iteration_get(uuid=model_iteration.uuid, timeout=TIMEOUT) + if isinstance(model_iteration, list): + for model_iteration_ in model_iteration: + if isinstance(model_iteration_, ModelIteration): + model_iterations.append(model_iteration_) + else: + model_iteration = maple.model_iteration_get(uuid=model_iteration.uuid, timeout=TIMEOUT) + if isinstance(model_iteration, list): + for model_iteration_ in model_iteration: + if isinstance(model_iteration_, ModelIteration): + model_iterations.append(model_iteration_) + logger.debug( + "Model iterations reduced/fully retrieved: %d/%d", + len(model_iterations_reduced), + len(model_iterations)) + logger.debug("Model iterations retrieved: %d", len(model_iterations)) + + delete_model_iterations = [] for model_iteration in model_iterations: if delete_type == DeleteType.old: datediff = datetime.datetime.now(datetime.timezone.utc)-datetime.datetime.fromisoformat( @@ -53,7 +97,7 @@ def delete_model_iteration(delete_type: DeleteType): models = [ getattr(model_iteration, f'model_level{level}') for level in range(1, 4)] complete_models = [True if model.status == - 'complete' else False for model in models] + 'complete' else False for model in models] if not all(complete_models): delete_model_iterations.append(model_iteration) @@ -76,41 +120,59 @@ def delete_model_iteration(delete_type: DeleteType): model_iteration.uuid) if os.path.exists(model_iteration_data_path): try: - logger.debug('Removing 
files associated with model iteration %s', model_iteration.uuid) + logger.debug( + 'Removing files associated with model iteration %s', + model_iteration.uuid) shutil.rmtree(model_iteration_data_path) except Exception as exc: - logger.error('Failed to remove directory for model iteration %s. %s', model_iteration.uuid, exc) - logger.info('Model iteration %s will not be deleted.', model_iteration.uuid) + logger.error( + 'Failed to remove directory for model iteration %s. %s', + model_iteration.uuid, + exc) + logger.info( + 'Model iteration %s will not be deleted.', + model_iteration.uuid) continue else: - logger.debug('Model iteration %s does not have data. Skip data deletion.', model_iteration.uuid) - + logger.debug( + 'Model iteration %s does not have data. Skip data deletion.', + model_iteration.uuid) + if os.path.isfile(f"{model_iteration_data_path}.zip"): os.remove(f"{model_iteration_data_path}.zip") - + # remove from backend - response = maple.model_iteration_delete(model_iteration.uuid) + response = maple.model_iteration_delete(model_iteration.uuid, timeout=TIMEOUT) if response != 200: logger.error( - 'Failed deletion of model_iteration with uuid: %s. %s', model_iteration.uuid, response) + 'Failed deletion of model_iteration with uuid: %s. 
%s', + model_iteration.uuid, + response) except Exception as exc: logger.error('Failed to remove model iteration %s.', exc) -async def delete_model_iteration_async(delete_type: DeleteType, period_seconds: int = None): +async def delete_model_iteration_async( + delete_type: DeleteType, + period_seconds: int | None = None, + use_config: bool = True): + tstart = time.time() while True: try: - delete_model_iteration(delete_type=delete_type) + delete_model_iteration( + delete_type=delete_type, use_config=use_config) if period_seconds is None: wait_time = rcs.utils.time_to_midnight() else: - wait_time = period_seconds - ((time.time()-tstart)%period_seconds) + wait_time = period_seconds - \ + ((time.time()-tstart) % period_seconds) if wait_time < 0: wait_time = 0 logger.debug("wait_time is %f", wait_time) - logger.info('Next deletion will occur in %.2f hours', wait_time/60/60) + logger.info('Next deletion will occur in %.2f hours', + wait_time/60/60) await asyncio.sleep(wait_time) except Exception as exc: logger.error('Failed to remove model iteration. %s', exc) @@ -140,6 +202,13 @@ async def delete_model_iteration_async(delete_type: DeleteType, period_seconds: default='info', choices=['debug', 'info', 'warning', 'error', 'critical'], ) +parser.add_argument( + '-c', + '--use_config', + action='store_true', + help='''If provided, the configuration will be fetched from backend and used + for deletion in case type is old''', +) if __name__ == '__main__': args = parser.parse_args() @@ -151,21 +220,28 @@ async def delete_model_iteration_async(delete_type: DeleteType, period_seconds: n_log_files=1, use_postfix_hour=False, ) - + coloredlogs.install(level=getattr(logging, args.l.upper())) - + logger.debug('Arguments provided are: %s', args) - - delete_type = getattr(DeleteType, args.t) - + + delete_type_ = getattr(DeleteType, args.t) + if args.a: if not args.p: - logger.info('Period not provided. 
Deletion will occur every midnight.') - if delete_type != DeleteType.old: - raise ValueError("In async (a) mode, only 'old' deletion type is accepted.") + logger.info( + 'Period not provided. Deletion will occur every midnight.') + if delete_type_ != DeleteType.old: + raise ValueError( + "In async (a) mode, only 'old' deletion type is accepted.") logger.debug("Running script in async mode with period %d.", args.p) asyncio.run( - delete_model_iteration_async(delete_type=delete_type, period_seconds=args.p) + delete_model_iteration_async( + delete_type=delete_type_, + period_seconds=None if not args.p else args.p, + use_config=args.use_config) ) else: - delete_model_iteration(delete_type=delete_type) + delete_model_iteration( + delete_type=delete_type_, + use_config=args.use_config) diff --git a/runtime_scripts/maple_models.py b/runtime_scripts/maple_models.py index 6b17f6f..6172716 100644 --- a/runtime_scripts/maple_models.py +++ b/runtime_scripts/maple_models.py @@ -1,11 +1,10 @@ import logging -import coloredlogs import argparse import os +import coloredlogs import rcs -from maple_chatgpt import ChatgptClient, chatgpt_client +from maple_chatgpt import ChatgptClient from maple_processing import MapleProcessing, MapleBert, MapleModel -from maple_structures import Article from maple_interface import MapleAPI from maple_config import config as cfg diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_maple_api.py b/tests/test_maple_api.py new file mode 100644 index 0000000..67a5bc0 --- /dev/null +++ b/tests/test_maple_api.py @@ -0,0 +1,32 @@ +import unittest +from maple_interface import MapleAPI +from maple_config import config as cfg + +class TestMapleAPI(unittest.TestCase): + + def setUp(self): + config = cfg.load_config(cfg.PRODUCTION) + self.api = MapleAPI( + authority=f"http://{config['MAPLE_BACKEND_IP']}:{config['MAPLE_BACKEND_PORT']}") + + def test_get_article_count(self): + count = 
self.api.article_count_get() + self.assertIsInstance(count, int) + self.assertGreaterEqual(count, 0) + + def test_article_skip(self): + articles = self.api.article_get(limit=1) + article = articles[0] + articles = self.api.article_get(limit=1, skip=1) + self.assertNotEqual(article.uuid, articles[0].uuid) + + def test_article_iterator(self): + total_count = self.api.article_count_get() + skip = total_count - 2000 + articles = [] + for iterator in self.api.article_iterator(limit=200, page=0, skip=skip): + articles.extend(iterator) + self.assertEqual(len(articles), 2000) + +if __name__ == '__main__': + unittest.main()