diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 1f87ed406f..efe14490b3 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -71,7 +71,10 @@ from synapse.media.thumbnailer import Thumbnailer, ThumbnailError from synapse.media.url_previewer import UrlPreviewer from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.replication.http.media import ReplicationCopyMediaServlet +from synapse.replication.http.media import ( + ReplicationCopyMediaServlet, + ReplicationDeleteMediaServlet, +) from synapse.state import CREATE_KEY, POWER_KEY from synapse.storage.databases.main.media_repository import ( LocalMedia, @@ -186,6 +189,13 @@ async def copy_media( "Sorry Mario, your MediaRepository related function is in another castle" ) + async def _remove_local_media_from_disk( + self, media_ids: List[str] + ) -> Tuple[List[str], int]: + raise NotImplementedError( + "Sorry Mario, your MediaRepository related function is in another castle" + ) + async def reached_pending_media_limit(self, auth_user: UserID) -> Tuple[bool, int]: raise NotImplementedError( "Sorry Mario, your MediaRepository related function is in another castle" @@ -581,6 +591,7 @@ def __init__(self, hs: "HomeServer"): super().__init__(hs) # initialize replication endpoint here self.copy_media_client = ReplicationCopyMediaServlet.make_client(hs) + self.delete_media_client = ReplicationDeleteMediaServlet.make_client(hs) async def copy_media( self, existing_mxc: MXCUri, auth_user: UserID, max_timeout_ms: int @@ -597,6 +608,18 @@ async def copy_media( ) return MXCUri.from_str(result["content_uri"]) + async def _remove_local_media_from_disk( + self, media_ids: List[str] + ) -> Tuple[List[str], int]: + """ + Call out to the worker responsible for handling media to delete this media object + """ + result = await self.delete_media_client( + instance_name=self.hs.config.worker.workers_doing_media_duty[0], + media_ids=media_ids, + ) + return result["deleted"], result["count"] + class MediaRepository(AbstractMediaRepository): def __init__(self, hs: "HomeServer"): diff --git a/synapse/replication/http/media.py b/synapse/replication/http/media.py index 42ecfcd9d4..fcb6f0e7d7 100644 --- a/synapse/replication/http/media.py +++ b/synapse/replication/http/media.py @@ -96,5 +96,46 @@ async def _handle_request( # type: ignore[override] return 200, {"content_uri": str(mxc_uri)} +class ReplicationDeleteMediaServlet(ReplicationEndpoint): + """Request the MediaRepository to delete a piece of media from filesystem. + + Request format: + + DELETE /_synapse/replication/delete_media + + { + "media_ids": [...], # List of media IDs to delete + } + + """ + + NAME = "delete_media" + PATH_ARGS = () + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + self.media_repo = hs.get_media_repository() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + media_ids: list[str], + ) -> JsonDict: + """ + Args: + media_ids: The list of media IDs to delete. + """ + return {"media_ids": media_ids} + + async def _handle_request( # type: ignore[override] + self, + request: Request, + content: JsonDict, + ) -> Tuple[int, JsonDict]: + media_ids = content["media_ids"] + deleted, count = await self.media_repo._remove_local_media_from_disk(media_ids) + return 200, {"deleted": deleted, "count": count} + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: ReplicationCopyMediaServlet(hs).register(http_server) + ReplicationDeleteMediaServlet(hs).register(http_server) diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 5b15fd707d..01c9d892c4 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -59,7 +59,7 @@ def __init__( @wrap_as_background_process("_censor_redactions") async def _censor_redactions(self) -> None: """Censors all redactions older than the configured period that haven't - been censored yet. + been censored yet and deletes any media attached to the redacted events. By censor we mean update the event_json table with the redacted event. """ @@ -104,12 +104,16 @@ async def _censor_redactions(self) -> None: ) updates = [] + media = [] for redaction_id, event_id in rows: redaction_event = await self.get_event(redaction_id, allow_none=True) original_event = await self.get_event( event_id, allow_rejected=True, allow_none=True ) + attached_media_ids = ( + await self.hs.get_datastores().main.get_attached_media_ids(event_id) + ) # The SQL above ensures that we have both the redaction and # original event, so if the `get_event` calls return None it @@ -131,6 +135,7 @@ async def _censor_redactions(self) -> None: pruned_json = None updates.append((redaction_id, event_id, pruned_json)) + media.extend(attached_media_ids) def _update_censor_txn(txn: LoggingTransaction) -> None: for redaction_id, event_id, pruned_json in updates: @@ -145,6 +150,7 @@ def _update_censor_txn(txn: LoggingTransaction) -> None: ) await self.db_pool.runInteraction("_update_censor_txn", _update_censor_txn) + await self.hs.get_media_repository()._remove_local_media_from_disk(media) def _censor_event_txn( self, txn: LoggingTransaction, event_id: str, pruned_json: str diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index de16f9b736..8a2c557c15 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -19,6 +19,7 @@ # [This file includes modifications made by New Vector Limited] # # +import json import logging from enum import Enum from http import HTTPStatus @@ -45,6 +46,7 @@ LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, UserID from synapse.util import json_encoder @@ -1022,6 +1024,11 @@ def delete_remote_media_txn(txn: LoggingTransaction) -> None: "remote_media_cache_thumbnails", keyvalues={"media_origin": media_origin, "media_id": media_id}, ) + self.db_pool.simple_delete_txn( + txn, + "media_attachments", + keyvalues={"server_name": media_origin, "media_id": media_id}, + ) await self.db_pool.runInteraction( "delete_remote_media", delete_remote_media_txn @@ -1413,3 +1420,33 @@ def _get_reference_count_txn(txn: LoggingTransaction) -> int: return await self.db_pool.runInteraction( "get_media_reference_count_for_sha256", _get_reference_count_txn ) + + async def get_attached_media_ids(self, event_id: str) -> list[str]: + """ + Get a list of media_ids that are attached to a specific event_id. + """ + + def get_attached_media_ids_txn(txn: LoggingTransaction) -> list[str]: + if isinstance(self.db_pool.engine, PostgresEngine): + # Use GIN index for Postgres + sql = """ + SELECT media_id + FROM media_attachments + WHERE restrictions_json @> %s AND server_name = %s + """ + json_param = json.dumps({"restrictions": {"event_id": event_id}}) + txn.execute(sql, (json_param, self.hs.hostname)) + else: + sql = """ + SELECT media_id + FROM media_attachments + WHERE restrictions_json->'restrictions'->>'event_id' = ? AND server_name = ? + """ + txn.execute(sql, (event_id, self.hs.hostname)) + + return [row[0] for row in txn.fetchall()] + + return await self.db_pool.runInteraction( + "get_attached_media_ids", + get_attached_media_ids_txn, + ) diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py index 32bb1f8d53..09ceb96237 100644 --- a/tests/replication/test_multi_media_repo.py +++ b/tests/replication/test_multi_media_repo.py @@ -33,6 +33,7 @@ from twisted.web.server import Request from synapse.api.constants import EventTypes, HistoryVisibility +from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME from synapse.media._base import FileInfo from synapse.media.media_repository import MediaRepository from synapse.rest import admin @@ -967,6 +968,173 @@ def test_copy_remote_restricted_resource_fails_when_requester_does_not_have_acce self.assertEqual(channel.code, 403) +class DeleteRestrictedMediaOnEventRedactionReplicationTestCase( + BaseMultiWorkerStreamTestCase +): + """ + Tests that media attached to redacted events are deleted after the retention period + when `msc3911.enabled` is configured to be True. + """ + + servlets = [ + login.register_servlets, + admin.register_servlets, + room.register_servlets, + ] + use_isolated_media_paths = True + + def default_config(self) -> Dict[str, Any]: + config = super().default_config() + config.update( + { + "experimental_features": {"msc3911": {"enabled": True}}, + "media_repo_instances": ["media_worker_1"], + "run_background_tasks_on": MAIN_PROCESS_INSTANCE_NAME, + "redaction_retention_period": "7d", + } + ) + config["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, + "media_worker_1": {"host": "testserv", "port": 1001}, + } + + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.user = self.register_user("user", "testpass") + self.user_tok = self.login("user", "testpass") + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + def test_delete_media_on_event_redaction(self) -> None: + """ + Tests that media is deleted when its attached event is redacted. + """ + # Make sure that censor_redaction loops runs on main hs + assert self.hs.config.worker.run_background_tasks, ( + "Main HS should run background tasks" + ) + assert self.hs.config.server.redaction_retention_period is not None, ( + "Redaction retention should be configured" + ) + + # Create media worker and it does not run the background tasks + media_worker = self.make_worker_hs( + "synapse.app.generic_worker", + { + "worker_name": "media_worker_1", + "run_background_tasks_on": MAIN_PROCESS_INSTANCE_NAME, + }, + ) + media_worker.get_media_repository_resource().register_servlets( + self._hs_to_site[media_worker].resource, media_worker + ) + media.register_servlets(media_worker, self._hs_to_site[media_worker].resource) + media_repo = media_worker.get_media_repository() + + assert not media_worker.config.worker.run_background_tasks, ( + "Worker should not run background tasks" + ) + + # Create a private room + room_id = self.helper.create_room_as( + self.user, + is_public=False, + tok=self.user_tok, + ) + + # The media is created with user_tok + content = io.BytesIO(SMALL_PNG) + content_uri = self.get_success( + media_repo.create_or_update_content( + "image/png", + "test_png_upload", + content, + 67, + UserID.from_string(self.user), + restricted=True, + ) + ) + media_id = content_uri.media_id + + # User sends a message with media + channel = self.make_request( + "PUT", + f"/rooms/{room_id}/send/m.room.message/{str(time.time())}?org.matrix.msc3911.attach_media={str(content_uri)}", + content={"msgtype": "m.text", "body": "Hi, this is a message"}, + access_token=self.user_tok, + ) + assert channel.code == HTTPStatus.OK, channel.json_body + assert "event_id" in channel.json_body + event_id = channel.json_body["event_id"] + + # Redact the event + channel = self.make_request( + "POST", + f"/_matrix/client/r0/rooms/{room_id}/redact/{event_id}", + content={}, + access_token=self.user_tok, + ) + assert channel.code == HTTPStatus.OK, channel.json_body + + # Verify the event is redacted before censoring + event_dict = self.helper.get_event(room_id, event_id, self.user_tok) + assert "redacted_because" in event_dict, "Event should be redacted" + + # Media should still be accessible before retention period is over + channel = make_request( + self.reactor, + self._hs_to_site[media_worker], + "GET", + f"/_matrix/client/v1/media/download/{self.hs.hostname}/{media_id}?allow_redacted_media=true", + shorthand=False, + access_token=self.user_tok, + ) + assert channel.code == 200, channel.result + assert channel.result["body"] == SMALL_PNG + + # Fast forward 7 days and 6 minutes to make sure the censor_redactions looping + # call detects the events are eligible for censorship. + self.reactor.advance(7 * 24 * 60 * 60 + 6 * 60) + + # Since we fast forward the reactor time, give some moment for the background + # censor redactions task to get caught up. + self.pump(0.01) + + # Check that the media has been deleted from the database + deleted_media = self.get_success( + self.hs.get_datastores().main.get_local_media(media_id) + ) + assert deleted_media is None, deleted_media + + # Check if the file is deleted from the storage as well. + assert isinstance(media_repo, MediaRepository) + assert not os.path.exists(media_repo.filepaths.local_media_filepath(media_id)) + + # Verify the redaction was censored in the database + redaction_censored = self.get_success( + self.hs.get_datastores().main.db_pool.simple_select_one_onecol( + table="redactions", + keyvalues={"redacts": event_id}, + retcol="have_censored", + ) + ) + assert redaction_censored, ( + "Redaction should have been censored by _censor_redactions loop" + ) + + channel = make_request( + self.reactor, + self._hs_to_site[media_worker], + "GET", + f"/_matrix/client/v1/media/download/{self.hs.hostname}/{media_id}?allow_redacted_media=true", + shorthand=False, + access_token=self.user_tok, + ) + assert channel.code == 404, channel.result + assert channel.json_body["errcode"] == "M_NOT_FOUND" + + def _log_request(request: Request) -> None: """Implements Factory.log, which is expected by Request.finish""" logger.info("Completed request %s", request) diff --git a/tests/storage/test_censor_events.py b/tests/storage/test_censor_events.py new file mode 100644 index 0000000000..95a61063df --- /dev/null +++ b/tests/storage/test_censor_events.py @@ -0,0 +1,253 @@ +import io +import os +import time +from http import HTTPStatus +from typing import Optional, Tuple + +from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource + +from synapse.media.media_repository import ( + MediaRepository, +) +from synapse.rest import admin +from synapse.rest.client import login, media, room +from synapse.server import HomeServer +from synapse.types import JsonDict, UserID +from synapse.util import Clock + +from tests.test_utils import SMALL_PNG +from tests.unittest import HomeserverTestCase + + +class MediaDeletionOnRedactionCensorshipTests(HomeserverTestCase): + """Tests for deleting media attached to redacted events.""" + + servlets = [ + media.register_servlets, + login.register_servlets, + admin.register_servlets, + room.register_servlets, + ] + use_isolated_media_paths = True + + def default_config(self) -> JsonDict: + config = super().default_config() + config.setdefault("experimental_features", {}) + config["experimental_features"].update({"msc3911": {"enabled": True}}) + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.media_repo = hs.get_media_repository() + self.bad_user = self.register_user("bad_user", "hackme") + self.bad_user_tok = self.login("bad_user", "hackme") + self.user = self.register_user("user", "pass") + self.user_tok = self.login("user", "pass") + self.admin = self.register_user("admin", "admin_pass", admin=True) + self.admin_tok = self.login("admin", "admin_pass") + self.room = self.helper.create_room_as( + room_creator=self.admin, tok=self.admin_tok + ) + + def create_resource_dict(self) -> dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + + def _redact_event( + self, + access_token: str, + room_id: str, + event_id: str, + expect_code: int = 200, + with_relations: Optional[list[str]] = None, + content: Optional[JsonDict] = None, + ) -> JsonDict: + """Helper function to send a redaction event. + + Returns the json body. + """ + path = "/_matrix/client/r0/rooms/%s/redact/%s" % (room_id, event_id) + + request_content = content or {} + if with_relations: + request_content["org.matrix.msc3912.with_relations"] = with_relations + + channel = self.make_request( + "POST", path, request_content, access_token=access_token + ) + assert channel.code == expect_code, channel.json_body + return channel.json_body + + def _create_test_resource(self) -> Tuple[list[str], list[str]]: + event_ids = [] + media_ids = [] + self.helper.join(self.room, self.user, tok=self.user_tok) + self.helper.join(self.room, self.bad_user, tok=self.bad_user_tok) + + for _ in range(3): + # Create restricted media + mxc_uri = self.get_success( + self.media_repo.create_or_update_content( + "image/png", + "test_png_upload", + io.BytesIO(SMALL_PNG), + 67, + UserID.from_string(self.bad_user), + restricted=True, + ) + ) + # Make sure media is saved + assert mxc_uri is not None + assert isinstance(self.media_repo, MediaRepository) + media_path = self.media_repo.filepaths.local_media_filepath( + mxc_uri.media_id + ) + self.assertTrue(os.path.exists(media_path)) + assert self.get_success(self.store.get_local_media(mxc_uri.media_id)) + media_ids.append(mxc_uri.media_id) + + # Bad user create events with media attached + channel = self.make_request( + "PUT", + f"/rooms/{self.room}/send/m.room.message/{str(time.time())}?org.matrix.msc3911.attach_media={str(mxc_uri)}", + content={"msgtype": "m.text", "body": "Hi, this is a message"}, + access_token=self.bad_user_tok, + ) + assert channel.code == HTTPStatus.OK, channel.json_body + assert "event_id" in channel.json_body + event_id = channel.json_body["event_id"] + event_ids.append(event_id) + + # Check media restrictions field has proper event_id + restrictions = self.get_success( + self.hs.get_datastores().main.get_media_restrictions( + mxc_uri.server_name, mxc_uri.media_id + ) + ) + assert restrictions is not None, str(restrictions) + assert restrictions.event_id == event_id + return media_ids, event_ids + + def test_get_attached_media_ids(self) -> None: + """Test db function `get_attached_media_ids`""" + # Create events with media attached + _, event_ids = self._create_test_resource() + + # Check if `get_attached_media_ids` can get media ids attached to an event + for event_id in event_ids: + attached_media_ids = self.get_success( + self.store.get_attached_media_ids(event_id) + ) + assert len(attached_media_ids) == 1 + + def test_redacting_media_deletes_attached_media(self) -> None: + """Test that the censor_redactions background task deletes media that was + redacted 7 days ago per the default configuration. + """ + # Create events with media attached + media_ids, event_ids = self._create_test_resource() + + # Redact the events + for event_id in event_ids: + self._redact_event( + self.admin_tok, + self.room, + event_id, + expect_code=200, + ) + # Confirm the events redaction + event_dict = self.helper.get_event(self.room, event_id, self.admin_tok) + assert "redacted_because" in event_dict, event_dict + + # Fast forward 7 days and 6 minutes to make sure the censor_redactions looping + # call detects the events are eligible for censorship. + self.reactor.advance(7 * 24 * 60 * 60 + 6 * 60) + + # Check if the media is deleted from storage. + for media_id in media_ids: + media = self.get_success(self.store.get_local_media(media_id)) + assert media is None + assert isinstance(self.media_repo, MediaRepository) + assert not os.path.exists( + self.media_repo.filepaths.local_media_filepath(media_id) + ) + + def test_normal_users_lose_access_to_media_right_after_redaction(self) -> None: + """Test that normal users lose access to media after the event they + were attached to has been redacted. + """ + # Create events with media attached + media_ids, event_ids = self._create_test_resource() + + # Redact the bad user's events + for event_id in event_ids: + self._redact_event( + self.admin_tok, + self.room, + event_id, + expect_code=200, + ) + event_dict = self.helper.get_event(self.room, event_id, self.admin_tok) + self.assertIn("redacted_because", event_dict, event_dict) + + # Normal user trying to access redacted media should get 404 + for media_id in media_ids: + channel = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/{self.hs.hostname}/{media_id}", + shorthand=False, + access_token=self.user_tok, + ) + assert channel.code == 404, channel.json_body + + def test_moderators_still_have_access_to_media_after_redaction_until_permanent_deletion( + self, + ) -> None: + """Test that users with moderator privileges still have access to media + after the event they were attached to has been redacted. + """ + # Create events with media attached + media_ids, event_ids = self._create_test_resource() + + # Redact the bad user's events + for event_id in event_ids: + self._redact_event( + self.admin_tok, + self.room, + event_id, + expect_code=200, + ) + event_dict = self.helper.get_event(self.room, event_id, self.admin_tok) + self.assertIn("redacted_because", event_dict, event_dict) + + # User with moderator privileges still have access to redacted media + for media_id in media_ids: + channel = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/{self.hs.hostname}/{media_id}?allow_redacted_media=true", + shorthand=False, + access_token=self.admin_tok, + ) + assert channel.code == 200, channel.json_body + + # After 7 days and 6 minutes, media that are attached to the redacted events + # should be permanently deleted from disk and moderators no longer have access to + # them + self.reactor.advance(7 * 24 * 60 * 60 + 6 * 60) + + for media_id in media_ids: + channel = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/{self.hs.hostname}/{media_id}?allow_redacted_media=true", + shorthand=False, + access_token=self.admin_tok, + ) + assert channel.code == 404, channel.json_body + + # Check that the media has been deleted + deleted_media = self.get_success( + self.hs.get_datastores().main.get_local_media(media_id) + ) + assert deleted_media is None