67 changes: 44 additions & 23 deletions application/single_app/app.py
@@ -309,14 +309,34 @@ def check_retention_policy():

if personal_enabled or group_enabled or public_enabled:
current_time = datetime.now(timezone.utc)
execution_hour = settings.get('retention_policy_execution_hour', 2)

# Check if we're in the execution hour
if current_time.hour == execution_hour:
# Check if we haven't run today yet
# Check if next scheduled run time has passed
next_run = settings.get('retention_policy_next_run')
should_run = False

if next_run:
try:
next_run_dt = datetime.fromisoformat(next_run)
# Run if we've passed the scheduled time
if current_time >= next_run_dt:
should_run = True
except Exception as parse_error:
print(f"Error parsing next_run timestamp: {parse_error}")
# If we can't parse, fall back to checking last_run
last_run = settings.get('retention_policy_last_run')
if last_run:
try:
last_run_dt = datetime.fromisoformat(last_run)
# Run if last run was more than 23 hours ago
if (current_time - last_run_dt).total_seconds() > (23 * 3600):
should_run = True
except:
should_run = True
else:
should_run = True
else:
# No next_run set, check last_run instead
last_run = settings.get('retention_policy_last_run')
should_run = False

if last_run:
try:
last_run_dt = datetime.fromisoformat(last_run)
@@ -326,29 +346,30 @@ def check_retention_policy():
except:
should_run = True
else:
# Never run before, execute now
should_run = True

if should_run:
print(f"Executing scheduled retention policy at {current_time.isoformat()}")
from functions_retention_policy import execute_retention_policy
results = execute_retention_policy(manual_execution=False)

if should_run:
print(f"Executing scheduled retention policy at {current_time.isoformat()}")
from functions_retention_policy import execute_retention_policy
results = execute_retention_policy(manual_execution=False)

if results.get('success'):
print(f"Retention policy execution completed: "
f"{results['personal']['conversations']} personal conversations, "
f"{results['personal']['documents']} personal documents, "
f"{results['group']['conversations']} group conversations, "
f"{results['group']['documents']} group documents, "
f"{results['public']['conversations']} public conversations, "
f"{results['public']['documents']} public documents deleted.")
else:
print(f"Retention policy execution failed: {results.get('errors')}")
if results.get('success'):
print(f"Retention policy execution completed: "
f"{results['personal']['conversations']} personal conversations, "
f"{results['personal']['documents']} personal documents, "
f"{results['group']['conversations']} group conversations, "
f"{results['group']['documents']} group documents, "
f"{results['public']['conversations']} public conversations, "
f"{results['public']['documents']} public documents deleted.")
else:
print(f"Retention policy execution failed: {results.get('errors')}")

except Exception as e:
print(f"Error in retention policy check: {e}")

# Check every hour
time.sleep(3600)
# Check every 5 minutes for more responsive scheduling
time.sleep(300)

# Start the retention policy check thread
retention_thread = threading.Thread(target=check_retention_policy, daemon=True)
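
The scheduling change above replaces the old "run during a fixed execution hour" check with an explicit schedule: prefer a stored retention_policy_next_run timestamp, fall back to retention_policy_last_run plus a 23-hour window, and run immediately if neither exists. A minimal standalone sketch of that decision, assuming settings is a plain dict of ISO-8601 strings (names follow the diff; this is an illustration, not the committed code):

    from datetime import datetime, timezone

    def should_run_retention(settings, current_time=None):
        # Decide whether the retention job is due.
        # Preference order: next_run timestamp, then last_run + 23h, then "never ran".
        current_time = current_time or datetime.now(timezone.utc)

        next_run = settings.get('retention_policy_next_run')
        if next_run:
            try:
                return current_time >= datetime.fromisoformat(next_run)
            except ValueError:
                pass  # unparseable next_run: fall back to the last_run check below

        last_run = settings.get('retention_policy_last_run')
        if last_run:
            try:
                last_run_dt = datetime.fromisoformat(last_run)
            except ValueError:
                return True  # corrupt timestamp: err on the side of running
            return (current_time - last_run_dt).total_seconds() > 23 * 3600

        return True  # never ran before, execute now

The 23-hour window (rather than a full 24) gives the 5-minute polling loop some slack, so a run that fires slightly late one day does not push the schedule forward indefinitely.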
2 changes: 1 addition & 1 deletion application/single_app/config.py
@@ -88,7 +88,7 @@
EXECUTOR_TYPE = 'thread'
EXECUTOR_MAX_WORKERS = 30
SESSION_TYPE = 'filesystem'
VERSION = "0.235.012"
VERSION = "0.235.025"


SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
94 changes: 68 additions & 26 deletions application/single_app/functions_retention_policy.py
@@ -18,6 +18,7 @@
from functions_activity_logging import log_conversation_deletion, log_conversation_archival
from functions_notifications import create_notification, create_group_notification, create_public_workspace_notification
from functions_debug import debug_print
from functions_appinsights import log_event
from datetime import datetime, timezone, timedelta


@@ -36,6 +37,7 @@ def get_all_user_settings():
))
return users
except Exception as e:
log_event("get_all_user_settings_error", {"error": str(e)})
debug_print(f"Error fetching all user settings: {e}")
return []

@@ -55,6 +57,7 @@ def get_all_groups():
))
return groups
except Exception as e:
log_event("get_all_groups_error", {"error": str(e)})
debug_print(f"Error fetching all groups: {e}")
return []

@@ -74,6 +77,7 @@ def get_all_public_workspaces():
))
return workspaces
except Exception as e:
log_event("get_all_public_workspaces_error", {"error": str(e)})
debug_print(f"Error fetching all public workspaces: {e}")
return []

@@ -156,6 +160,7 @@ def execute_retention_policy(workspace_scopes=None, manual_execution=False):
return results

except Exception as e:
log_event("execute_retention_policy_error", {"error": str(e), "workspace_scopes": workspace_scopes, "manual_execution": manual_execution})
debug_print(f"Error executing retention policy: {e}")
results['success'] = False
results['errors'].append(str(e))
@@ -196,6 +201,8 @@ def process_personal_retention():
if conversation_retention_days == 'none' and document_retention_days == 'none':
continue

debug_print(f"Processing retention for user {user_id}: conversations={conversation_retention_days} days, documents={document_retention_days} days")

user_deletion_summary = {
'user_id': user_id,
'conversations_deleted': 0,
@@ -216,6 +223,7 @@ def process_personal_retention():
user_deletion_summary['conversation_details'] = conv_results['details']
results['conversations'] += conv_results['count']
except Exception as e:
log_event("process_personal_retention_conversations_error", {"error": str(e), "user_id": user_id})
debug_print(f"Error processing conversations for user {user_id}: {e}")

# Process documents
@@ -230,6 +238,7 @@ def process_personal_retention():
user_deletion_summary['document_details'] = doc_results['details']
results['documents'] += doc_results['count']
except Exception as e:
log_event("process_personal_retention_documents_error", {"error": str(e), "user_id": user_id})
debug_print(f"Error processing documents for user {user_id}: {e}")

# Send notification if anything was deleted
@@ -241,6 +250,7 @@ def process_personal_retention():
return results

except Exception as e:
log_event("process_personal_retention_error", {"error": str(e)})
debug_print(f"Error in process_personal_retention: {e}")
return results

@@ -299,6 +309,7 @@ def process_group_retention():
group_deletion_summary['conversation_details'] = conv_results['details']
results['conversations'] += conv_results['count']
except Exception as e:
log_event("process_group_retention_conversations_error", {"error": str(e), "group_id": group_id})
debug_print(f"Error processing conversations for group {group_id}: {e}")

# Process documents
@@ -313,6 +324,7 @@ def process_group_retention():
group_deletion_summary['document_details'] = doc_results['details']
results['documents'] += doc_results['count']
except Exception as e:
log_event("process_group_retention_documents_error", {"error": str(e), "group_id": group_id})
debug_print(f"Error processing documents for group {group_id}: {e}")

# Send notification if anything was deleted
@@ -324,6 +336,7 @@ def process_group_retention():
return results

except Exception as e:
log_event("process_group_retention_error", {"error": str(e)})
debug_print(f"Error in process_group_retention: {e}")
return results

@@ -382,6 +395,7 @@ def process_public_retention():
workspace_deletion_summary['conversation_details'] = conv_results['details']
results['conversations'] += conv_results['count']
except Exception as e:
log_event("process_public_retention_conversations_error", {"error": str(e), "public_workspace_id": workspace_id})
debug_print(f"Error processing conversations for public workspace {workspace_id}: {e}")

# Process documents
@@ -396,6 +410,7 @@ def process_public_retention():
workspace_deletion_summary['document_details'] = doc_results['details']
results['documents'] += doc_results['count']
except Exception as e:
log_event("process_public_retention_documents_error", {"error": str(e), "public_workspace_id": workspace_id})
debug_print(f"Error processing documents for public workspace {workspace_id}: {e}")

# Send notification if anything was deleted
@@ -407,6 +422,7 @@ def process_public_retention():
return results

except Exception as e:
log_event("process_public_retention_error", {"error": str(e)})
debug_print(f"Error in process_public_retention: {e}")
return results

@@ -447,31 +463,42 @@ def delete_aged_conversations(retention_days, workspace_type='personal', user_id
cutoff_iso = cutoff_date.isoformat()

# Query for aged conversations
# Check for null/undefined FIRST to avoid comparing null values with dates
query = f"""
SELECT c.id, c.title, c.last_activity_at, c.{partition_field}
FROM c
WHERE c.{partition_field} = @partition_value
AND (c.last_activity_at < @cutoff_date OR IS_NULL(c.last_activity_at))
AND (NOT IS_DEFINED(c.last_activity_at)
OR IS_NULL(c.last_activity_at)
OR (IS_DEFINED(c.last_activity_at) AND NOT IS_NULL(c.last_activity_at) AND c.last_activity_at < @cutoff_date))
"""

parameters = [
{"name": "@partition_value", "value": partition_value},
{"name": "@cutoff_date", "value": cutoff_iso}
]

aged_conversations = list(container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True
))
debug_print(f"Querying aged conversations: workspace_type={workspace_type}, partition_field={partition_field}, partition_value={partition_value}, cutoff_date={cutoff_iso}, retention_days={retention_days}")

try:
aged_conversations = list(container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True
))
debug_print(f"Found {len(aged_conversations)} aged conversations for {workspace_type} workspace")
except Exception as query_error:
log_event("delete_aged_conversations_query_error", {"error": str(query_error), "workspace_type": workspace_type, "partition_value": partition_value})
debug_print(f"Error querying aged conversations for {workspace_type} (partition_value={partition_value}): {query_error}")
return {'count': 0, 'details': []}

deleted_details = []

for conv in aged_conversations:
conversation_id = conv.get('id')
conversation_title = conv.get('title', 'Untitled')

try:
conversation_id = conv.get('id')
conversation_title = conv.get('title', 'Untitled')

# Read full conversation for archiving/logging
conversation_item = container.read_item(
item=conversation_id,
@@ -535,7 +562,7 @@ def delete_aged_conversations(retention_days, workspace_type='personal', user_id
is_bulk_operation=True,
group_id=conversation_item.get('group_id'),
public_workspace_id=conversation_item.get('public_workspace_id'),
deletion_reason='retention_policy'
additional_context={'deletion_reason': 'retention_policy'}
)

# Delete conversation
@@ -553,7 +580,9 @@ def delete_aged_conversations(retention_days, workspace_type='personal', user_id
debug_print(f"Deleted conversation {conversation_id} ({conversation_title}) due to retention policy")

except Exception as e:
debug_print(f"Error deleting conversation {conversation_id}: {e}")
conv_id = conv.get('id', 'unknown') if conv else 'unknown'
log_event("delete_aged_conversations_deletion_error", {"error": str(e), "conversation_id": conv_id, "workspace_type": workspace_type})
debug_print(f"Error deleting conversation {conv_id}: {e}")

return {
'count': len(deleted_details),
@@ -593,37 +622,48 @@ def delete_aged_documents(retention_days, workspace_type='personal', user_id=Non
deletion_user_id = user_id

# Calculate cutoff date
# Documents use format like '2026-01-08T21:49:15Z' so we match that format
cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)
cutoff_iso = cutoff_date.isoformat()
cutoff_iso = cutoff_date.strftime('%Y-%m-%dT%H:%M:%SZ')

# Query for aged documents
# Documents use 'last_updated' field (not 'last_activity_at' like conversations)
# Use simple date comparison - documents always have last_updated field
query = f"""
SELECT c.id, c.file_name, c.title, c.last_activity_at, c.{partition_field}, c.user_id
SELECT c.id, c.file_name, c.title, c.last_updated, c.user_id
FROM c
WHERE c.{partition_field} = @partition_value
AND (c.last_activity_at < @cutoff_date OR IS_NULL(c.last_activity_at))
AND c.last_updated < @cutoff_date
"""

parameters = [
{"name": "@partition_value", "value": partition_value},
{"name": "@cutoff_date", "value": cutoff_iso}
]

aged_documents = list(container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True
))
debug_print(f"Querying aged documents: workspace_type={workspace_type}, partition_field={partition_field}, partition_value={partition_value}, cutoff_date={cutoff_iso}, retention_days={retention_days}")

try:
aged_documents = list(container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True
))
debug_print(f"Found {len(aged_documents)} aged documents for {workspace_type} workspace")
except Exception as query_error:
log_event("delete_aged_documents_query_error", {"error": str(query_error), "workspace_type": workspace_type, "partition_value": partition_value})
debug_print(f"Error querying aged documents for {workspace_type} (partition_value={partition_value}): {query_error}")
return {'count': 0, 'details': []}

deleted_details = []

for doc in aged_documents:
document_id = doc.get('id')
file_name = doc.get('file_name', 'Unknown')
title = doc.get('title', file_name)
doc_user_id = doc.get('user_id') or deletion_user_id

try:
document_id = doc.get('id')
file_name = doc.get('file_name', 'Unknown')
title = doc.get('title', file_name)
doc_user_id = doc.get('user_id') or deletion_user_id

# Delete document chunks from search index
delete_document_chunks(document_id, group_id, public_workspace_id)

@@ -634,13 +674,15 @@ def delete_aged_documents(retention_days, workspace_type='personal', user_id=Non
'id': document_id,
'file_name': file_name,
'title': title,
'last_activity_at': doc.get('last_activity_at')
'last_updated': doc.get('last_updated')
})

debug_print(f"Deleted document {document_id} ({file_name}) due to retention policy")

except Exception as e:
debug_print(f"Error deleting document {document_id}: {e}")
doc_id = doc.get('id', 'unknown') if doc else 'unknown'
log_event("delete_aged_documents_deletion_error", {"error": str(e), "document_id": doc_id, "workspace_type": workspace_type})
debug_print(f"Error deleting document {doc_id}: {e}")

return {
'count': len(deleted_details),
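
Two details in delete_aged_conversations and delete_aged_documents are worth calling out. Cosmos DB stores these timestamps as strings and compares them lexicographically, so each cutoff literal has to match the textual shape of the stored field, and null or undefined fields have to be excluded from the date comparison explicitly. A condensed sketch of the two query shapes used above (names taken from the diff; treat it as an illustration, not the committed code):

    from datetime import datetime, timezone, timedelta

    def build_aged_item_queries(retention_days, partition_field):
        cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)

        # Conversations: last_activity_at may be missing or null, and a null
        # is intentionally treated as "aged", so the null checks come before
        # the date comparison.
        conversation_query = f"""
            SELECT c.id FROM c
            WHERE c.{partition_field} = @partition_value
              AND (NOT IS_DEFINED(c.last_activity_at)
                   OR IS_NULL(c.last_activity_at)
                   OR c.last_activity_at < @cutoff_date)
        """
        conversation_cutoff = cutoff.isoformat()

        # Documents: last_updated is always present and stored in the
        # 'Z'-suffixed form '2026-01-08T21:49:15Z', so the cutoff string is
        # formatted the same way to keep the string comparison meaningful.
        document_query = f"""
            SELECT c.id FROM c
            WHERE c.{partition_field} = @partition_value
              AND c.last_updated < @cutoff_date
        """
        document_cutoff = cutoff.strftime('%Y-%m-%dT%H:%M:%SZ')

        return (conversation_query, conversation_cutoff), (document_query, document_cutoff)

Either pair feeds container.query_items(query=..., parameters=[...]) exactly as in the diff; the per-item deletion loops then wrap each delete in its own try/except so one bad item cannot abort the whole sweep.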