
Commit 394220d

chore: Refactor code to import pictures from cache
1 parent: e9c6e08

File tree: 4 files changed, +279 -35 lines changed

Makefile

Lines changed: 2 additions & 1 deletion
@@ -5,6 +5,7 @@ IMG_NAME = "ghcr.io/libertech-fr/sesame-taiga_crawler"
 BASE_NAME = "sesame"
 APP_NAME = "sesame-taiga_crawler"
 PLATFORM = "linux/amd64"
+STC_RUN = ""
 include .env
 
 .DEFAULT_GOAL := help
@@ -28,7 +29,7 @@ run-crawler-docker: ## Lance le crawler Sesame - Taiga avec python !
 	--platform $(PLATFORM) \
 	--name $(APP_NAME) \
 	-v $(CURDIR):/data \
-	$(IMG_NAME)
+	$(IMG_NAME) python main.py $(STC_RUN)
 
 run-crawler: ## Lance le crawler Sesame - Taiga avec python !
 	@python3 main.py
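
Note: the new STC_RUN variable is appended to the container command, so the Docker target now runs python main.py $(STC_RUN) instead of relying on the image's default command. Presumably a run mode can be forwarded as, for example, make run-crawler-docker STC_RUN=taiga; the argument handling that consumes it is sketched after the main.py diff below.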

main.py

Lines changed: 47 additions & 34 deletions
@@ -6,11 +6,12 @@
 import hashlib
 import sys
 from datetime import datetime
-import argparse
 
 from src.a_moins_b import a_moins_b
+from src.export_pictures import export_pictures
 from src.export_ind import export_ind
 from src.import_ind import import_ind
+from src.import_pictures import import_pictures
 
 logging.basicConfig(level=logging.INFO)
 logger: logging.Logger = logging.getLogger(__name__)
@@ -36,64 +37,76 @@
     "pass_ensa": hashlib.sha1(ensa_pass.encode()).hexdigest(),
 }
 
+print(ensa_infos)
+print(headers)
+
 collections = [
+    # {
+    #     "function": export_ind,
+    #     "method": "ExportInd",
+    #     "params": {
+    #         **ensa_infos,
+    #         "type": "etd",
+    #         "id": "*",
+    #     },
+    # },
+    # {
+    #     "function": export_ind,
+    #     "method": "ExportInd",
+    #     "params": {
+    #         **ensa_infos,
+    #         "type": "adm",
+    #         "id": "*",
+    #     },
+    # },
+    # {
+    #     "function": export_ind,
+    #     "method": "ExportInd",
+    #     "params": {
+    #         **ensa_infos,
+    #         "type": "esn",
+    #         "id": "*",
+    #     },
+    # },
     {
-        "function": export_ind,
-        "method": "ExportInd",
+        "function": export_pictures,
+        "method": "ExportPhotos",
+        "methodBase64": "ExportPhoto",
         "params": {
             **ensa_infos,
             "type": "etd",
-            "id": "*"
-        },
-    },
-    {
-        "function": export_ind,
-        "method": "ExportInd",
-        "params": {
-            **ensa_infos,
-            "type": "adm",
-            "id": "*"
+            "id": "*",
         },
-    },
-    {
-        "function": export_ind,
-        "method": "ExportInd",
-        "params": {
+        "paramsBase64": {
+            "type": "etd",
             **ensa_infos,
-            "type": "esn",
-            "id": "*"
         },
     },
 ]
 
 
-async def main():
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--run', help='all | taiga | sesame',default='all')
-    parser.add_argument('--an', help='Année universitaire à importer ',default="0")
-    args = parser.parse_args()
-    if args.an != 0 :
-        print(f"Import pour l'annee {args.an}")
-        for col in collections:
-            col.get('params')['au']=int(args.an)
-
-    if args.run == 'taiga' or args.run == 'all':
+async def main(args):
+    run = ""
+    if len(args) == 2:
+        run = args[1]
+    if run == 'taiga' or run == '':
         logger.info("Starting Taiga crawler...")
         await a_moins_b(url, 0, -1, headers)
         collection_tasks = [col.get('function')(url, col, headers) for col in collections]
         await asyncio.gather(*collection_tasks)
         print("Taiga crawler ended successful !!!")
-    if args.run == 'sesame' or args.run == 'all':
+    if run == 'sesame' or run == '':
        print("Starting import_ind...")
        start_time = datetime.now()
-       await import_ind()
+       # await import_ind()
+       await import_pictures()
        end_time = datetime.now()
        execution_time = end_time - start_time
        print(f"import_ind completed in {execution_time}")
 
 if __name__ == '__main__':
     loop = asyncio.get_event_loop()
     try:
-        loop.run_until_complete(main())
+        loop.run_until_complete(main(sys.argv))
     finally:
         loop.close()
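
The argument parsing moves from argparse to a bare sys.argv check: main(sys.argv) treats the first CLI argument as the run mode ('taiga', 'sesame', or nothing for both), and the old --run and --an (academic year) options disappear with argparse. A minimal sketch of the new mode selection, assuming the Makefile forwards STC_RUN as the lone argument (illustrative only, not part of the commit):

# Sketch of the new run-mode handling (illustrative; main() in the diff above
# also drives the crawler and the importer).
import sys

def resolve_run_mode(argv):
    run = ""
    if len(argv) == 2:
        run = argv[1]                      # value forwarded by the Makefile via $(STC_RUN)
    return (run == 'taiga' or run == '',   # run the Taiga export
            run == 'sesame' or run == '')  # run the Sesame import

if __name__ == '__main__':
    # "python main.py taiga" -> (True, False); "python main.py" -> (True, True)
    print(resolve_run_mode(sys.argv))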

src/export_pictures.py

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
+import json
+import os
+import uuid
+import requests
+import base64
+import logging
+import urllib3
+from logging import Logger
+
+urllib3.disable_warnings()
+logging.basicConfig(level=logging.INFO)
+logger: Logger = logging.getLogger(__name__)
+
+def compare_fingerprints(current, new):
+    current_data = {(data['ident'], data['size'], data['date']) for data in current}
+    new_data = {(data['ident'], data['size'], data['date']) for data in new}
+    return new_data - current_data
+
+def copy_file(source, destination):
+    with open(source, 'r', encoding='utf-8') as src_file:
+        data = src_file.read()
+    with open(destination, 'w', encoding='utf-8') as dest_file:
+        dest_file.write(data)
+
+async def export_pictures(url, col, headers):
+    payload = {
+        "jsonrpc": "2.0",
+        "method": col.get('method'),
+        "params": col.get("params"),
+        "id": str(uuid.uuid4()),
+    }
+    try:
+        response = requests.post(url, json=payload, headers=headers, verify=False, timeout=10000)
+        response.raise_for_status()
+        if response.text == 'ko!':
+            raise Exception("ko!")
+        elif not response.json()['result']['output']:
+            raise Exception("Empty response from ExportPhotos", response.json())
+        data = response.json()['result']['output']
+        os.makedirs(f'./cache', exist_ok=True)
+        os.makedirs(f'./cache/pictures', exist_ok=True)
+
+        if os.path.exists(f'./cache/pictures/taiga_{col.get("params")["type"]}.json'):
+            logger.info(f'<./cache/pictures/taiga_{col.get("params")["type"]}.json> Already exists checking datas...')
+            current = json.load(open(f'./cache/pictures/taiga_{col.get("params")["type"]}.json', 'r', encoding='utf-8'))
+            compared = compare_fingerprints(current['data'], data[0][1])
+
+            if compared.__len__() > 0:
+                logger.info(f'<./cache/pictures/taiga_{col.get("params")["type"]}.json> Already exists moving to .old file !')
+                os.rename(f'./cache/pictures/taiga_{col.get("params")["type"]}.json', f'./cache/pictures/taiga_{col.get("params")["type"]}.json.old')
+            else:
+                logger.info(f'<./cache/pictures/taiga_{col.get("params")["type"]}.json> All datas are the same !')
+
+        with open(f'./cache/pictures/taiga_{col.get("params")["type"]}.json', 'w', encoding='utf-8') as fichier:
+            json.dump(
+                {
+                    "type": data[0][0][0],
+                    "data": data[0][1],
+                    "total": data[0][0][1],
+                },
+                fichier,
+                ensure_ascii=False,
+                indent=4,
+            )
+        logger.info(f"{col.get('method')}")
+
+        if os.path.exists(f'./cache/pictures/taiga_{col.get("params")["type"]}.json.old'):
+            compare_now = compare_fingerprints(
+                json.load(open(f'./cache/pictures/taiga_{col.get("params")["type"]}.json.old', 'r', encoding='utf-8'))['data'],
+                json.load(open(f'./cache/pictures/taiga_{col.get("params")["type"]}.json', 'r', encoding='utf-8'))['data'],
+            )
+
+            if compare_now.__len__() > 0:
+                logger.info(f'<./cache/pictures/taiga_{col.get("params")["type"]}.json> Datas are different, starting exportation...')
+                for picture in compare_now:
+                    await export_picture(url, col, headers, picture[0])
+            else:
+                logger.info(f'<./cache/pictures/taiga_{col.get("params")["type"]}.json> Datas are the same, nothing to do !')
+        else:
+            logger.info(f'<./cache/pictures/taiga_{col.get("params")["type"]}.json> No old file found, starting exportation...')
+            for picture in data[0][1]:
+                await export_picture(url, col, headers, picture.get('ident'))
+
+        copy_file(f'./cache/pictures/taiga_{col.get("params")["type"]}.json', f'./cache/pictures/taiga_{col.get("params")["type"]}.json.old')
+    except requests.exceptions.HTTPError as e:
+        logger.warning(f"Failed to insert {col.get('method')}: {e} \n {e.response.text}")
+
+
+async def export_picture(url, col, headers, id):
+    payload = {
+        "jsonrpc": "2.0",
+        "method": col.get('methodBase64'),
+        "params": {
+            **col.get("paramsBase64"),
+            "id": id.replace(col.get("paramsBase64").get('type') + '-', ''),
+        },
+        "id": str(uuid.uuid4()),
+    }
+
+    try:
+        response = requests.post(url, json=payload, headers=headers, verify=False, timeout=10000)
+        response.raise_for_status()
+        if response.text == 'ko!':
+            raise Exception("ko!")
+        elif not response.json()['result']['output']:
+            raise Exception("Empty response from ExportPhoto", response.json())
+        data = response.json()['result']['output']
+        os.makedirs(f'./cache/pictures/files', exist_ok=True)
+        with open(f'./cache/pictures/files/{id}.jpg', 'wb') as fichier:
+            fichier.write(base64.b64decode(data[0][0].get('fichier')))
+        logger.info(f"{col.get('methodBase64')} -> {id}")
+    except requests.exceptions.HTTPError as e:
+        logger.warning(f"Failed to insert {col.get('methodBase64')}: {e} \n {e.response.text}")
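
export_pictures caches each photo listing as ./cache/pictures/taiga_<type>.json, renames the previous listing to .json.old when anything changed, and then re-downloads (via the base64 ExportPhoto call) only the photos whose (ident, size, date) fingerprint differs from the old snapshot; with no snapshot, everything is downloaded. The comparison itself is a plain set difference. A small illustration with made-up entries (the field values below are hypothetical, only the field names come from the listing format above):

# Illustrative only: sample entries are invented, but the comparison mirrors
# compare_fingerprints() in src/export_pictures.py (set difference over
# (ident, size, date) tuples).
old = [
    {"ident": "etd-1001", "size": 20480, "date": "2024-01-10"},
    {"ident": "etd-1002", "size": 18231, "date": "2024-01-10"},
]
new = [
    {"ident": "etd-1001", "size": 20480, "date": "2024-01-10"},  # unchanged -> skipped
    {"ident": "etd-1002", "size": 19005, "date": "2024-02-02"},  # size/date changed -> re-exported
    {"ident": "etd-1003", "size": 17322, "date": "2024-02-02"},  # new ident -> exported
]

def compare_fingerprints(current, new):
    current_data = {(d['ident'], d['size'], d['date']) for d in current}
    new_data = {(d['ident'], d['size'], d['date']) for d in new}
    return new_data - current_data

print(compare_fingerprints(old, new))
# {('etd-1002', 19005, '2024-02-02'), ('etd-1003', 17322, '2024-02-02')}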

src/import_pictures.py

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
+import asyncio
+import json
+import os
+from data_weaver import weave_entries, weave_entry
+import aiohttp
+import dotenv
+import yaml
+
+dotenv.load_dotenv()
+sesame_api_baseurl = os.getenv('SESAME_API_BASEURL')
+sesame_api_token = os.getenv('SESAME_API_TOKEN')
+sesame_import_parallels_files = int(os.getenv('SESAME_IMPORT_PARALLELS_FILES', 1))
+sesame_import_parallels_entries = int(os.getenv('SESAME_IMPORT_PARALLELS_ENTRIES', 5))
+
+async def gather_with_concurrency(n, tasks):
+    semaphore = asyncio.Semaphore(n)
+
+    async def sem_task(task):
+        async with semaphore:
+            await asyncio.sleep(0.1)  # Simulate i/o time
+            return await task
+
+    return await asyncio.gather(*[sem_task(task) for task in tasks])
+
+async def read_response(response):
+    message = await response.content.read()
+    jsonMessage = json.loads(message)
+    print(jsonMessage)
+
+async def send_request(session, url, json):
+    print(f"Sending request to {url} with {json.get('id')}")
+    headers = {
+        "Authorization": f"Bearer {sesame_api_token}",
+    }
+    params = {
+        "filters[inetOrgPerson.employeeNumber]": f"{json.get('id')}",
+        "filters[inetOrgPerson.employeeType]": "TAIGA",
+    }
+
+    print('filters', params)
+
+    try:
+        form = aiohttp.FormData()
+        form.add_field('file', json.get('file'), filename='photoJpeg.jpg', content_type='image/jpeg')
+
+        async with session.post(url, data=form, headers=headers, params=params) as response:
+            print(f"Request to {url} successful: {response.status}")
+            if response.status == 304:
+                print(f"Cached entry {json.get('id')}")
+            else:
+                print(f"Response to {json.get('id')}:")
+                await read_response(response)
+            response.raise_for_status()  # Raises error for 4xx/5xx responses
+    except aiohttp.ClientResponseError as e:
+        # This catches responses like 400, 404, 500 etc.
+        print(f"Request to {url} failed with status {e.status}: {e.message}")
+    except aiohttp.ClientError as e:
+        print(f"Request to {url} failed: {str(e)}")
+    except asyncio.TimeoutError:
+        print(f"Request to {url} timed out")
+
+async def process_data(data, file, session):
+    files = []
+    print(f"Processing {file}")
+
+    for entry in data:
+        with open(f'./cache/pictures/files/{entry["ident"]}.jpg', 'rb') as fichier:
+            picture = fichier.read()
+            files.append({
+                "id": entry["ident"].split('-')[1],
+                "file": picture,
+            })
+
+    tasks = [send_request(session, f'{sesame_api_baseurl}/management/identities/upsert/photo', part) for part in files]
+    await gather_with_concurrency(sesame_import_parallels_files, tasks)
+    print(f"Processed {file}")
+
+def list_files_in_dir(directory):
+    files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
+    return files
+
+def filter_datas(datasCurrent, datasOld):
+    datas = []
+    old_data_tuples = {(data['ident'], data['size'], data['date']) for data in datasOld}
+
+    for data in datasCurrent:
+        data_tuple = (data['ident'], data['size'], data['date'])
+        if data_tuple not in old_data_tuples:
+            datas.append(data)
+
+    return datas
+
+async def import_pictures():
+    cache_files = os.listdir('./cache/pictures/files')
+    datasCurrent = {}
+    datasOld = {}
+    datas = {}
+
+    files = list_files_in_dir('./cache/pictures')
+
+    for file in files:
+        if file.endswith(".old"):
+            with open(f'./cache/pictures/{file}', 'r', encoding='utf-8') as fichier:
+                datasOld[file.split('.')[0]] = json.load(fichier).get('data')
+        else:
+            with open(f'./cache/pictures/{file}', 'r', encoding='utf-8') as fichier:
+                datasCurrent[file.split('.')[0]] = json.load(fichier).get('data')
+
+    for file in files:
+        if datasOld.get(file.split('.')[0]) is not None:
+            datas[file.split('.')[0]] = filter_datas(datasCurrent[file.split('.')[0]], datasOld[file.split('.')[0]])
+        else:
+            datas[file.split('.')[0]] = datasCurrent[file.split('.')[0]]
+
+    async with aiohttp.ClientSession() as session:
+        tasks = [process_data(datas[entry], entry, session) for entry in datas]
+        await gather_with_concurrency(sesame_import_parallels_files, tasks)
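
import_pictures pairs each cached listing with its .old snapshot, keeps only the entries missing from the snapshot (filter_datas), and uploads the matching JPEGs from ./cache/pictures/files to the Sesame upsert/photo endpoint as multipart form data. Upload parallelism is capped by gather_with_concurrency, an asyncio.Semaphore wrapper sized by SESAME_IMPORT_PARALLELS_FILES. A minimal, self-contained sketch of that pattern (fake_upload is a stand-in for send_request, not part of the commit):

# Minimal sketch of the gather_with_concurrency pattern used above.
import asyncio

async def gather_with_concurrency(n, tasks):
    # At most n of the given coroutines run at the same time.
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*[sem_task(task) for task in tasks])

async def fake_upload(i):
    await asyncio.sleep(0.1)  # stands in for the aiohttp POST to the Sesame API
    return i

async def demo():
    # SESAME_IMPORT_PARALLELS_FILES defaults to 1 above, i.e. one upload at a time.
    print(await gather_with_concurrency(1, [fake_upload(i) for i in range(5)]))

asyncio.run(demo())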
