diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 24c2b86935..efe14490b3 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -33,6 +33,7 @@ import twisted.web.http from twisted.internet.defer import Deferred +from synapse import event_auth from synapse.api.constants import EventTypes, HistoryVisibility, Membership from synapse.api.errors import ( Codes, @@ -70,13 +71,17 @@ from synapse.media.thumbnailer import Thumbnailer, ThumbnailError from synapse.media.url_previewer import UrlPreviewer from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.replication.http.media import ReplicationCopyMediaServlet +from synapse.replication.http.media import ( + ReplicationCopyMediaServlet, + ReplicationDeleteMediaServlet, +) +from synapse.state import CREATE_KEY, POWER_KEY from synapse.storage.databases.main.media_repository import ( LocalMedia, MediaRestrictions, RemoteMedia, ) -from synapse.types import JsonDict, Requester, UserID +from synapse.types import JsonDict, RedactedMediaBypass, Requester, UserID from synapse.types.state import StateFilter from synapse.util import json_decoder from synapse.util.async_helpers import Linearizer @@ -111,6 +116,7 @@ def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.server_name = hs.hostname self.store = hs.get_datastores().main + self._storage_controllers = hs.get_storage_controllers() self._is_mine_server_name = hs.is_mine_server_name self.msc3911_config = hs.config.experimental.msc3911 @@ -183,6 +189,13 @@ async def copy_media( "Sorry Mario, your MediaRepository related function is in another castle" ) + async def _remove_local_media_from_disk( + self, media_ids: List[str] + ) -> Tuple[List[str], int]: + raise NotImplementedError( + "Sorry Mario, your MediaRepository related function is in another castle" + ) + async def reached_pending_media_limit(self, auth_user: UserID) -> Tuple[bool, int]: raise NotImplementedError( "Sorry Mario, your MediaRepository related function is in another castle" @@ -241,6 +254,7 @@ async def get_local_media( requester: Optional[Requester] = None, allow_authenticated: bool = True, federation: bool = False, + allow_redacted_media: bool = False, ) -> None: raise NotImplementedError( "Sorry Mario, your MediaRepository related function is in another castle" @@ -312,7 +326,10 @@ async def validate_media_restriction( return attachments async def is_media_visible( - self, requesting_user: UserID, media_info_object: Union[LocalMedia, RemoteMedia] + self, + requesting_user: UserID, + media_info_object: Union[LocalMedia, RemoteMedia], + redacted_media_bypass_config: Optional[RedactedMediaBypass] = None, ) -> None: """ Verify that media requested for download should be visible to the user making @@ -349,6 +366,46 @@ async def is_media_visible( if attached_event_id: event_base = await self.store.get_event(attached_event_id) + if event_base.internal_metadata.is_redacted(): + if ( + not redacted_media_bypass_config + or not redacted_media_bypass_config.requesting_bypass + ): + # If the event the media is attached to is redacted, don't serve that + # media to the user. Moderators and admins should probably be excluded + # from this restriction + raise NotFoundError() + + # Which means a bypass was requested + if redacted_media_bypass_config.is_admin: + # System admins get to bypass the rest of the checks + return + else: + # Not an admin, let's check they have a high enough power level. + # Lifted this directly from RoomEventServlet for msc2815 + auth_events = ( + await self._storage_controllers.state.get_current_state( + event_base.room_id, + StateFilter.from_types( + [ + POWER_KEY, + CREATE_KEY, + ] + ), + ) + ) + + redact_level = event_auth.get_named_level(auth_events, "redact", 50) + user_level = event_auth.get_user_power_level( + requesting_user.to_string(), auth_events + ) + if user_level < redact_level: + raise SynapseError( + 403, + "You don't have permission to view redacted events in this room.", + errcode=Codes.FORBIDDEN, + ) + if event_base.is_state(): # The standard event visibility utility, filter_events_for_client(), # does not seem to meet the needs of a good UX when restricting and @@ -534,6 +591,7 @@ def __init__(self, hs: "HomeServer"): super().__init__(hs) # initialize replication endpoint here self.copy_media_client = ReplicationCopyMediaServlet.make_client(hs) + self.delete_media_client = ReplicationDeleteMediaServlet.make_client(hs) async def copy_media( self, existing_mxc: MXCUri, auth_user: UserID, max_timeout_ms: int @@ -550,6 +608,18 @@ async def copy_media( ) return MXCUri.from_str(result["content_uri"]) + async def _remove_local_media_from_disk( + self, media_ids: List[str] + ) -> Tuple[List[str], int]: + """ + Call out to the worker responsible for handling media to delete this media object + """ + result = await self.delete_media_client( + instance_name=self.hs.config.worker.workers_doing_media_duty[0], + media_ids=media_ids, + ) + return result["deleted"], result["count"] + class MediaRepository(AbstractMediaRepository): def __init__(self, hs: "HomeServer"): @@ -944,7 +1014,11 @@ def respond_not_yet_uploaded(self, request: SynapseRequest) -> None: ) async def get_local_media_info( - self, request: SynapseRequest, media_id: str, max_timeout_ms: int + self, + request: SynapseRequest, + media_id: str, + max_timeout_ms: int, + allow_redacted_media: bool = False, ) -> Optional[LocalMedia]: """Gets the info dictionary for given local media ID. If the media has not been uploaded yet, this function will wait up to ``max_timeout_ms`` @@ -956,6 +1030,7 @@ async def get_local_media_info( the file_id for local content.) max_timeout_ms: the maximum number of milliseconds to wait for the media to be uploaded. + allow_redacted_media: Returns: Either the info dictionary for the given local media ID or @@ -979,7 +1054,17 @@ async def get_local_media_info( # The file has been uploaded, so stop looping if media_info.media_length is not None: if isinstance(request.requester, Requester): - await self.is_media_visible(request.requester.user, media_info) + # Only check media visibility if this is for a local request + is_admin = await self.auth.is_server_admin(request.requester) + redacted_media_bypass_config = RedactedMediaBypass( + allow_redacted_media, is_admin + ) + + await self.is_media_visible( + request.requester.user, + media_info, + redacted_media_bypass_config, + ) return media_info # Check if the media ID has expired and still hasn't been uploaded to. @@ -1008,6 +1093,7 @@ async def get_local_media( requester: Optional[Requester] = None, allow_authenticated: bool = True, federation: bool = False, + allow_redacted_media: bool = False, ) -> None: """Responds to requests for local media, if exists, or returns 404. @@ -1019,6 +1105,7 @@ async def get_local_media( the filename in the Content-Disposition header of the response. max_timeout_ms: the maximum number of milliseconds to wait for the media to be uploaded. + allow_redacted_media: requester: The user making the request, to verify restricted media. Only used for local users, not over federation allow_authenticated: whether media marked as authenticated may be served to this request @@ -1027,7 +1114,9 @@ async def get_local_media( Returns: Resolves once a response has successfully been written to request """ - media_info = await self.get_local_media_info(request, media_id, max_timeout_ms) + media_info = await self.get_local_media_info( + request, media_id, max_timeout_ms, allow_redacted_media + ) if not media_info: return @@ -1039,10 +1128,6 @@ async def get_local_media( # if MSC3911 is enabled, check visibility of the media for the user and retrieve # any restrictions if self.msc3911_config.enabled: - if requester is not None: - # Only check media visibility if this is for a local request. This will - # raise directly back to the client if not visible - await self.is_media_visible(requester.user, media_info) restrictions = await self.validate_media_restriction( request, media_info, None, federation ) @@ -1096,6 +1181,7 @@ async def get_remote_media( use_federation_endpoint: bool, requester: Optional[Requester] = None, allow_authenticated: bool = True, + allow_redacted_media: bool = False, ) -> None: """Respond to requests for remote media. @@ -1114,6 +1200,7 @@ async def get_remote_media( used for local users, not over federation allow_authenticated: whether media marked as authenticated may be served to this request + allow_redacted_media: Returns: Resolves once a response has successfully been written to request @@ -1146,7 +1233,8 @@ async def get_remote_media( ip_address, use_federation_endpoint, allow_authenticated, - requester, + allow_redacted_media=allow_redacted_media, + requester=requester, ) # Check if the media is cached on the client, if so return 304. We need @@ -1182,6 +1270,7 @@ async def get_remote_media_info( use_federation: bool, allow_authenticated: bool, requester: Optional[Requester] = None, + allow_redacted_media: bool = False, ) -> RemoteMedia: """Gets the media info associated with the remote file, downloading if necessary. @@ -1198,6 +1287,7 @@ async def get_remote_media_info( request requester: The user making the request, to verify restricted media. Only used for local users, not over federation + allow_redacted_media: Returns: The media info of the file @@ -1220,7 +1310,8 @@ async def get_remote_media_info( ip_address, use_federation, allow_authenticated, - requester, + allow_redacted_media=allow_redacted_media, + requester=requester, ) # Ensure we actually use the responder so that it releases resources @@ -1239,6 +1330,7 @@ async def _get_remote_media_impl( ip_address: str, use_federation_endpoint: bool, allow_authenticated: bool, + allow_redacted_media: bool = False, requester: Optional[Requester] = None, ) -> Tuple[Optional[Responder], RemoteMedia]: """Looks for media in local cache, if not there then attempt to @@ -1256,6 +1348,7 @@ async def _get_remote_media_impl( use_federation_endpoint: whether to request the remote media over the new federation /download endpoint allow_authenticated: + allow_redacted_media: requester: The user making the request, to verify restricted media. Only used for local users, not over federation @@ -1276,8 +1369,14 @@ async def _get_remote_media_impl( # exists in the local database and again further down for after it was # retrieved from the remote. if self.msc3911_config.enabled and requester is not None: + is_admin = await self.auth.is_server_admin(requester) + redacted_media_bypass_config = RedactedMediaBypass( + allow_redacted_media, is_admin + ) # This will raise directly back to the client if not visible - await self.is_media_visible(requester.user, media_info) + await self.is_media_visible( + requester.user, media_info, redacted_media_bypass_config + ) # file_id is the ID we use to track the file locally. If we've already # seen the file then reuse the existing ID, otherwise generate a new @@ -1336,8 +1435,14 @@ async def _get_remote_media_impl( and self.msc3911_config.enabled and requester is not None ): + is_admin = await self.auth.is_server_admin(requester) + redacted_media_bypass_config = RedactedMediaBypass( + allow_redacted_media, is_admin + ) # This will raise directly back to the client if not visible - await self.is_media_visible(requester.user, media_info) + await self.is_media_visible( + requester.user, media_info, redacted_media_bypass_config + ) file_id = media_info.filesystem_id diff --git a/synapse/replication/http/media.py b/synapse/replication/http/media.py index 42ecfcd9d4..fcb6f0e7d7 100644 --- a/synapse/replication/http/media.py +++ b/synapse/replication/http/media.py @@ -96,5 +96,46 @@ async def _handle_request( # type: ignore[override] return 200, {"content_uri": str(mxc_uri)} +class ReplicationDeleteMediaServlet(ReplicationEndpoint): + """Request the MediaRepository to delete a piece of media from filesystem. + + Request format: + + DELETE /_synapse/replication/delete_media + + { + "media_ids": [...], # List of media IDs to delete + } + + """ + + NAME = "delete_media" + PATH_ARGS = () + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + self.media_repo = hs.get_media_repository() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + media_ids: list[str], + ) -> JsonDict: + """ + Args: + media_ids: The list of media IDs to delete. + """ + return {"media_ids": media_ids} + + async def _handle_request( # type: ignore[override] + self, + request: Request, + content: JsonDict, + ) -> Tuple[int, JsonDict]: + media_ids = content["media_ids"] + deleted, count = await self.media_repo._remove_local_media_from_disk(media_ids) + return 200, {"deleted": deleted, "count": count} + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: ReplicationCopyMediaServlet(hs).register(http_server) + ReplicationDeleteMediaServlet(hs).register(http_server) diff --git a/synapse/rest/client/media.py b/synapse/rest/client/media.py index d2c491bcd9..8abe506e17 100644 --- a/synapse/rest/client/media.py +++ b/synapse/rest/client/media.py @@ -40,6 +40,7 @@ ) from synapse.http.servlet import ( RestServlet, + parse_boolean, parse_integer, parse_json_object_from_request, parse_string, @@ -277,9 +278,19 @@ async def on_GET( ) max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS) + # TODO: determine if the parameter needs an unstable identifier + allow_redacted_media = parse_boolean( + request, "allow_redacted_media", default=False + ) + if self._is_mine_server_name(server_name): await self.media_repo.get_local_media( - request, media_id, file_name, max_timeout_ms, requester + request, + media_id, + file_name, + max_timeout_ms, + allow_redacted_media=allow_redacted_media, + requester=requester, ) else: ip_address = request.getClientAddress().host @@ -292,6 +303,7 @@ async def on_GET( ip_address, True, requester, + allow_redacted_media=allow_redacted_media, ) diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 5b15fd707d..01c9d892c4 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -59,7 +59,7 @@ def __init__( @wrap_as_background_process("_censor_redactions") async def _censor_redactions(self) -> None: """Censors all redactions older than the configured period that haven't - been censored yet. + been censored yet and deletes any media attached to the redacted events. By censor we mean update the event_json table with the redacted event. """ @@ -104,12 +104,16 @@ async def _censor_redactions(self) -> None: ) updates = [] + media = [] for redaction_id, event_id in rows: redaction_event = await self.get_event(redaction_id, allow_none=True) original_event = await self.get_event( event_id, allow_rejected=True, allow_none=True ) + attached_media_ids = ( + await self.hs.get_datastores().main.get_attached_media_ids(event_id) + ) # The SQL above ensures that we have both the redaction and # original event, so if the `get_event` calls return None it @@ -131,6 +135,7 @@ async def _censor_redactions(self) -> None: pruned_json = None updates.append((redaction_id, event_id, pruned_json)) + media.extend(attached_media_ids) def _update_censor_txn(txn: LoggingTransaction) -> None: for redaction_id, event_id, pruned_json in updates: @@ -145,6 +150,7 @@ def _update_censor_txn(txn: LoggingTransaction) -> None: ) await self.db_pool.runInteraction("_update_censor_txn", _update_censor_txn) + await self.hs.get_media_repository()._remove_local_media_from_disk(media) def _censor_event_txn( self, txn: LoggingTransaction, event_id: str, pruned_json: str diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index de16f9b736..8a2c557c15 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -19,6 +19,7 @@ # [This file includes modifications made by New Vector Limited] # # +import json import logging from enum import Enum from http import HTTPStatus @@ -45,6 +46,7 @@ LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, UserID from synapse.util import json_encoder @@ -1022,6 +1024,11 @@ def delete_remote_media_txn(txn: LoggingTransaction) -> None: "remote_media_cache_thumbnails", keyvalues={"media_origin": media_origin, "media_id": media_id}, ) + self.db_pool.simple_delete_txn( + txn, + "media_attachments", + keyvalues={"server_name": media_origin, "media_id": media_id}, + ) await self.db_pool.runInteraction( "delete_remote_media", delete_remote_media_txn @@ -1413,3 +1420,33 @@ def _get_reference_count_txn(txn: LoggingTransaction) -> int: return await self.db_pool.runInteraction( "get_media_reference_count_for_sha256", _get_reference_count_txn ) + + async def get_attached_media_ids(self, event_id: str) -> list[str]: + """ + Get a list of media_ids that are attached to a specific event_id. + """ + + def get_attached_media_ids_txn(txn: LoggingTransaction) -> list[str]: + if isinstance(self.db_pool.engine, PostgresEngine): + # Use GIN index for Postgres + sql = """ + SELECT media_id + FROM media_attachments + WHERE restrictions_json @> %s AND server_name = %s + """ + json_param = json.dumps({"restrictions": {"event_id": event_id}}) + txn.execute(sql, (json_param, self.hs.hostname)) + else: + sql = """ + SELECT media_id + FROM media_attachments + WHERE restrictions_json->'restrictions'->>'event_id' = ? AND server_name = ? + """ + txn.execute(sql, (event_id, self.hs.hostname)) + + return [row[0] for row in txn.fetchall()] + + return await self.db_pool.runInteraction( + "get_attached_media_ids", + get_attached_media_ids_txn, + ) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 914bb6cb23..d6518e14ac 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -23,6 +23,7 @@ import logging import re import string +from collections import namedtuple from enum import Enum from typing import ( TYPE_CHECKING, @@ -1530,3 +1531,8 @@ class ScheduledTask: result: Optional[JsonMapping] # Optional error that should be assigned a value when the status is FAILED error: Optional[str] + + +RedactedMediaBypass = namedtuple( + "RedactedMediaBypass", ["requesting_bypass", "is_admin"] +) diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py index 32bb1f8d53..09ceb96237 100644 --- a/tests/replication/test_multi_media_repo.py +++ b/tests/replication/test_multi_media_repo.py @@ -33,6 +33,7 @@ from twisted.web.server import Request from synapse.api.constants import EventTypes, HistoryVisibility +from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME from synapse.media._base import FileInfo from synapse.media.media_repository import MediaRepository from synapse.rest import admin @@ -967,6 +968,173 @@ def test_copy_remote_restricted_resource_fails_when_requester_does_not_have_acce self.assertEqual(channel.code, 403) +class DeleteRestrictedMediaOnEventRedactionReplicationTestCase( + BaseMultiWorkerStreamTestCase +): + """ + Tests that media attached to redacted events are deleted after the retention period + when `msc3911.enabled` is configured to be True. + """ + + servlets = [ + login.register_servlets, + admin.register_servlets, + room.register_servlets, + ] + use_isolated_media_paths = True + + def default_config(self) -> Dict[str, Any]: + config = super().default_config() + config.update( + { + "experimental_features": {"msc3911": {"enabled": True}}, + "media_repo_instances": ["media_worker_1"], + "run_background_tasks_on": MAIN_PROCESS_INSTANCE_NAME, + "redaction_retention_period": "7d", + } + ) + config["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, + "media_worker_1": {"host": "testserv", "port": 1001}, + } + + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.user = self.register_user("user", "testpass") + self.user_tok = self.login("user", "testpass") + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + def test_delete_media_on_event_redaction(self) -> None: + """ + Tests that media is deleted when its attached event is redacted. + """ + # Make sure that censor_redaction loops runs on main hs + assert self.hs.config.worker.run_background_tasks, ( + "Main HS should run background tasks" + ) + assert self.hs.config.server.redaction_retention_period is not None, ( + "Redaction retention should be configured" + ) + + # Create media worker and it does not run the background tasks + media_worker = self.make_worker_hs( + "synapse.app.generic_worker", + { + "worker_name": "media_worker_1", + "run_background_tasks_on": MAIN_PROCESS_INSTANCE_NAME, + }, + ) + media_worker.get_media_repository_resource().register_servlets( + self._hs_to_site[media_worker].resource, media_worker + ) + media.register_servlets(media_worker, self._hs_to_site[media_worker].resource) + media_repo = media_worker.get_media_repository() + + assert not media_worker.config.worker.run_background_tasks, ( + "Worker should not run background tasks" + ) + + # Create a private room + room_id = self.helper.create_room_as( + self.user, + is_public=False, + tok=self.user_tok, + ) + + # The media is created with user_tok + content = io.BytesIO(SMALL_PNG) + content_uri = self.get_success( + media_repo.create_or_update_content( + "image/png", + "test_png_upload", + content, + 67, + UserID.from_string(self.user), + restricted=True, + ) + ) + media_id = content_uri.media_id + + # User sends a message with media + channel = self.make_request( + "PUT", + f"/rooms/{room_id}/send/m.room.message/{str(time.time())}?org.matrix.msc3911.attach_media={str(content_uri)}", + content={"msgtype": "m.text", "body": "Hi, this is a message"}, + access_token=self.user_tok, + ) + assert channel.code == HTTPStatus.OK, channel.json_body + assert "event_id" in channel.json_body + event_id = channel.json_body["event_id"] + + # Redact the event + channel = self.make_request( + "POST", + f"/_matrix/client/r0/rooms/{room_id}/redact/{event_id}", + content={}, + access_token=self.user_tok, + ) + assert channel.code == HTTPStatus.OK, channel.json_body + + # Verify the event is redacted before censoring + event_dict = self.helper.get_event(room_id, event_id, self.user_tok) + assert "redacted_because" in event_dict, "Event should be redacted" + + # Media should still be accessible before retention period is over + channel = make_request( + self.reactor, + self._hs_to_site[media_worker], + "GET", + f"/_matrix/client/v1/media/download/{self.hs.hostname}/{media_id}?allow_redacted_media=true", + shorthand=False, + access_token=self.user_tok, + ) + assert channel.code == 200, channel.result + assert channel.result["body"] == SMALL_PNG + + # Fast forward 7 days and 6 minutes to make sure the censor_redactions looping + # call detects the events are eligible for censorship. + self.reactor.advance(7 * 24 * 60 * 60 + 6 * 60) + + # Since we fast forward the reactor time, give some moment for the background + # censor redactions task to get caught up. + self.pump(0.01) + + # Check that the media has been deleted from the database + deleted_media = self.get_success( + self.hs.get_datastores().main.get_local_media(media_id) + ) + assert deleted_media is None, deleted_media + + # Check if the file is deleted from the storage as well. + assert isinstance(media_repo, MediaRepository) + assert not os.path.exists(media_repo.filepaths.local_media_filepath(media_id)) + + # Verify the redaction was censored in the database + redaction_censored = self.get_success( + self.hs.get_datastores().main.db_pool.simple_select_one_onecol( + table="redactions", + keyvalues={"redacts": event_id}, + retcol="have_censored", + ) + ) + assert redaction_censored, ( + "Redaction should have been censored by _censor_redactions loop" + ) + + channel = make_request( + self.reactor, + self._hs_to_site[media_worker], + "GET", + f"/_matrix/client/v1/media/download/{self.hs.hostname}/{media_id}?allow_redacted_media=true", + shorthand=False, + access_token=self.user_tok, + ) + assert channel.code == 404, channel.result + assert channel.json_body["errcode"] == "M_NOT_FOUND" + + def _log_request(request: Request) -> None: """Implements Factory.log, which is expected by Request.finish""" logger.info("Completed request %s", request) diff --git a/tests/rest/client/test_media_download.py b/tests/rest/client/test_media_download.py index e71b4c8acc..cd9c0915b5 100644 --- a/tests/rest/client/test_media_download.py +++ b/tests/rest/client/test_media_download.py @@ -72,6 +72,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: "profile_test_user", "testpass" ) self.other_profile_test_user_tok = self.login("profile_test_user", "testpass") + self.admin_user = self.register_user("bossman", "testpass", admin=True) + self.admin_tok = self.login("bossman", "testpass") def _create_restricted_media(self, user: str) -> MXCUri: mxc_uri = self.get_success( @@ -91,17 +93,22 @@ def fetch_media( mxc_uri: MXCUri, access_token: Optional[str] = None, expected_code: int = 200, + attempt_bypass: bool = False, ) -> None: """ Test retrieving the media. We do not care about the content of the media, just that the response is correct """ + path = f"/_matrix/client/v1/media/download/{mxc_uri.server_name}/{mxc_uri.media_id}" + if attempt_bypass: + path += "?allow_redacted_media=true" channel = self.make_request( "GET", - f"/_matrix/client/v1/media/download/{mxc_uri.server_name}/{mxc_uri.media_id}", + path, access_token=access_token or self.creator_tok, + shorthand=False, ) - assert channel.code == expected_code, channel.code + assert channel.code == expected_code, channel.json_body def test_local_media_download_unrestricted(self) -> None: """Test that unrestricted media is not affected""" @@ -338,3 +345,336 @@ def test_local_media_download_attached_to_state_event_failure(self) -> None: # This user has joined the room and can now see this image. Can't see the # related membership event, but :man-shrug: self.fetch_media(mxc_uri, access_token=self.other_user_tok) + + def _redact_event( + self, + access_token: str, + room_id: str, + event_id: str, + expect_code: int = 200, + with_relations: Optional[list[str]] = None, + content: Optional[JsonDict] = None, + ) -> JsonDict: + """Helper function to send a redaction event. + + Returns the json body. + """ + path = "/_matrix/client/r0/rooms/%s/redact/%s" % (room_id, event_id) + + request_content = content or {} + if with_relations: + request_content["org.matrix.msc3912.with_relations"] = with_relations + + channel = self.make_request( + "POST", path, request_content, access_token=access_token + ) + self.assertEqual(channel.code, expect_code) + return channel.json_body + + def test_local_media_download_attached_to_redacted_event_normal(self) -> None: + """ + Test that can local media attached to image event can be restricted if redacted + """ + mxc_uri = self._create_restricted_media(self.creator) + room_id = self.helper.create_room_as(self.creator, tok=self.creator_tok) + + # set room history_visibility to joined, otherwise it will be 'shared' + self.helper.send_state( + room_id=room_id, + event_type=EventTypes.RoomHistoryVisibility, + body={"history_visibility": HistoryVisibility.JOINED}, + tok=self.creator_tok, + ) + + self.helper.join(room_id, self.other_user, tok=self.other_user_tok) + + image = { + "body": "test_png_upload", + "info": {"h": 1, "mimetype": "image/png", "size": 67, "w": 1}, + "msgtype": "m.image", + "url": str(mxc_uri), + } + json_body = self.helper.send_event( + room_id, + "m.room.message", + content=image, + tok=self.creator_tok, + expect_code=200, + attach_media_mxc=str(mxc_uri), + ) + assert "event_id" in json_body + + # Both users should be able to see the event + self.fetch_media(mxc_uri) + self.fetch_media(mxc_uri, access_token=self.other_user_tok) + + # now, redact that event, and try and retrieve the media again + self._redact_event(self.creator_tok, room_id, json_body["event_id"]) + + self.fetch_media(mxc_uri, expected_code=404) + self.fetch_media(mxc_uri, access_token=self.other_user_tok, expected_code=404) + + def test_local_media_download_attached_to_redacted_event_admin(self) -> None: + """ + Test that can local media attached to image event can be restricted if redacted. + Specifically, test that a system administrator can bypass that if requested + """ + mxc_uri = self._create_restricted_media(self.creator) + room_id = self.helper.create_room_as(self.creator, tok=self.creator_tok) + + # set room history_visibility to joined, otherwise it will be 'shared' + self.helper.send_state( + room_id=room_id, + event_type=EventTypes.RoomHistoryVisibility, + body={"history_visibility": HistoryVisibility.JOINED}, + tok=self.creator_tok, + ) + + self.helper.join(room_id, self.other_user, tok=self.other_user_tok) + self.helper.join(room_id, self.admin_user, tok=self.admin_tok) + + image = { + "body": "test_png_upload", + "info": {"h": 1, "mimetype": "image/png", "size": 67, "w": 1}, + "msgtype": "m.image", + "url": str(mxc_uri), + } + json_body = self.helper.send_event( + room_id, + "m.room.message", + content=image, + tok=self.creator_tok, + expect_code=200, + attach_media_mxc=str(mxc_uri), + ) + assert "event_id" in json_body + + # Both users should be able to see the event + self.fetch_media(mxc_uri) + self.fetch_media(mxc_uri, access_token=self.other_user_tok) + self.fetch_media(mxc_uri, access_token=self.admin_tok) + + # now, redact that event, and try and retrieve the media again + self._redact_event(self.creator_tok, room_id, json_body["event_id"]) + + self.fetch_media(mxc_uri, expected_code=404) + self.fetch_media(mxc_uri, access_token=self.other_user_tok, expected_code=404) + self.fetch_media(mxc_uri, access_token=self.admin_tok, expected_code=404) + + # Let's see if the bypass works + self.fetch_media(mxc_uri, access_token=self.admin_tok, attempt_bypass=True) + + def test_local_media_download_attached_to_redacted_event_room_moderator( + self, + ) -> None: + """ + Test that can local media attached to image event can be restricted if redacted. + Specifically, test that a room moderator can bypass that if requested and + empowered to + """ + mxc_uri = self._create_restricted_media(self.creator) + room_id = self.helper.create_room_as(self.creator, tok=self.creator_tok) + + # set room history_visibility to joined, otherwise it will be 'shared' + self.helper.send_state( + room_id=room_id, + event_type=EventTypes.RoomHistoryVisibility, + body={"history_visibility": HistoryVisibility.JOINED}, + tok=self.creator_tok, + ) + + # Adjust power levels in the room. Redacting is defaulted to 50, so let's bump + # the other user. "user_default" dictates this was at "0" + pl = self.helper.get_state( + room_id, EventTypes.PowerLevels, tok=self.creator_tok + ) + pl["users"][self.other_user] = 50 + self.helper.send_state( + room_id, EventTypes.PowerLevels, body=pl, tok=self.creator_tok + ) + + self.helper.join(room_id, self.other_user, tok=self.other_user_tok) + self.helper.join(room_id, self.admin_user, tok=self.admin_tok) + + image = { + "body": "test_png_upload", + "info": {"h": 1, "mimetype": "image/png", "size": 67, "w": 1}, + "msgtype": "m.image", + "url": str(mxc_uri), + } + json_body = self.helper.send_event( + room_id, + "m.room.message", + content=image, + tok=self.creator_tok, + expect_code=200, + attach_media_mxc=str(mxc_uri), + ) + assert "event_id" in json_body + + # Both users should be able to see the event + self.fetch_media(mxc_uri) + self.fetch_media(mxc_uri, access_token=self.other_user_tok) + self.fetch_media(mxc_uri, access_token=self.admin_tok) + + # now, redact that event, and try and retrieve the media again + self._redact_event(self.creator_tok, room_id, json_body["event_id"]) + + self.fetch_media(mxc_uri, expected_code=404) + self.fetch_media(mxc_uri, access_token=self.other_user_tok, expected_code=404) + self.fetch_media(mxc_uri, access_token=self.admin_tok, expected_code=404) + + # Let's see if the bypass works + self.fetch_media(mxc_uri, access_token=self.other_user_tok, attempt_bypass=True) + + def test_local_media_download_attached_to_redacted_state_event_normal(self) -> None: + """Test that a simple membership avatar is viewable when appropriate""" + mxc_uri = self._create_restricted_media(self.creator) + room_id = self.helper.create_room_as(self.creator, tok=self.creator_tok) + + # set room history_visibility to joined + self.helper.send_state( + room_id=room_id, + event_type=EventTypes.RoomHistoryVisibility, + body={"history_visibility": HistoryVisibility.JOINED}, + tok=self.creator_tok, + ) + + self.helper.join(room_id, self.other_user, tok=self.other_user_tok) + + membership_content = { + EventContentFields.MEMBERSHIP: Membership.JOIN, + "avatar_url": str(mxc_uri), + } + json_body = self.helper.send_state( + room_id, + EventTypes.Member, + body=membership_content, + tok=self.creator_tok, + expect_code=200, + state_key=self.creator, + attach_media_mxc=str(mxc_uri), + ) + assert "event_id" in json_body + + # Both users should be able to see the media + self.fetch_media(mxc_uri) + self.fetch_media(mxc_uri, access_token=self.other_user_tok) + + # now, redact that event, and try and retrieve the media again + self._redact_event(self.creator_tok, room_id, json_body["event_id"]) + + self.fetch_media(mxc_uri, expected_code=404) + self.fetch_media(mxc_uri, access_token=self.other_user_tok, expected_code=404) + + def test_local_media_download_attached_to_redacted_state_event_admin(self) -> None: + """ + Test that a simple membership avatar is viewable when appropriate. Specifically, + test that a system administrator can bypass that if requested + + """ + mxc_uri = self._create_restricted_media(self.creator) + room_id = self.helper.create_room_as(self.creator, tok=self.creator_tok) + + # set room history_visibility to joined + self.helper.send_state( + room_id=room_id, + event_type=EventTypes.RoomHistoryVisibility, + body={"history_visibility": HistoryVisibility.JOINED}, + tok=self.creator_tok, + ) + + self.helper.join(room_id, self.other_user, tok=self.other_user_tok) + self.helper.join(room_id, self.admin_user, tok=self.admin_tok) + + membership_content = { + EventContentFields.MEMBERSHIP: Membership.JOIN, + "avatar_url": str(mxc_uri), + } + json_body = self.helper.send_state( + room_id, + EventTypes.Member, + body=membership_content, + tok=self.creator_tok, + expect_code=200, + state_key=self.creator, + attach_media_mxc=str(mxc_uri), + ) + assert "event_id" in json_body + + # Both users should be able to see the media + self.fetch_media(mxc_uri) + self.fetch_media(mxc_uri, access_token=self.other_user_tok) + self.fetch_media(mxc_uri, access_token=self.admin_tok) + + # now, redact that event, and try and retrieve the media again + self._redact_event(self.creator_tok, room_id, json_body["event_id"]) + + self.fetch_media(mxc_uri, expected_code=404) + self.fetch_media(mxc_uri, access_token=self.other_user_tok, expected_code=404) + self.fetch_media(mxc_uri, access_token=self.admin_tok, expected_code=404) + + # Let's see if the bypass works + self.fetch_media(mxc_uri, access_token=self.admin_tok, attempt_bypass=True) + + def test_local_media_download_attached_to_redacted_state_event_room_moderator( + self, + ) -> None: + """ + Test that a simple membership avatar is viewable when appropriate. + Specifically, test that a room moderator can bypass that if requested and + empowered to + """ + mxc_uri = self._create_restricted_media(self.creator) + room_id = self.helper.create_room_as(self.creator, tok=self.creator_tok) + + # set room history_visibility to joined + self.helper.send_state( + room_id=room_id, + event_type=EventTypes.RoomHistoryVisibility, + body={"history_visibility": HistoryVisibility.JOINED}, + tok=self.creator_tok, + ) + + # Adjust power levels in the room. Redacting is defaulted to 50, so let's bump + # the other user. "user_default" dictates this was at "0" + pl = self.helper.get_state( + room_id, EventTypes.PowerLevels, tok=self.creator_tok + ) + pl["users"][self.other_user] = 50 + self.helper.send_state( + room_id, EventTypes.PowerLevels, body=pl, tok=self.creator_tok + ) + + self.helper.join(room_id, self.other_user, tok=self.other_user_tok) + self.helper.join(room_id, self.admin_user, tok=self.admin_tok) + + membership_content = { + EventContentFields.MEMBERSHIP: Membership.JOIN, + "avatar_url": str(mxc_uri), + } + json_body = self.helper.send_state( + room_id, + EventTypes.Member, + body=membership_content, + tok=self.creator_tok, + expect_code=200, + state_key=self.creator, + attach_media_mxc=str(mxc_uri), + ) + assert "event_id" in json_body + + # Both users should be able to see the media + self.fetch_media(mxc_uri) + self.fetch_media(mxc_uri, access_token=self.other_user_tok) + self.fetch_media(mxc_uri, access_token=self.admin_tok) + + # now, redact that event, and try and retrieve the media again + self._redact_event(self.creator_tok, room_id, json_body["event_id"]) + + self.fetch_media(mxc_uri, expected_code=404) + self.fetch_media(mxc_uri, access_token=self.other_user_tok, expected_code=404) + self.fetch_media(mxc_uri, access_token=self.admin_tok, expected_code=404) + + # Let's see if the bypass works + self.fetch_media(mxc_uri, access_token=self.other_user_tok, attempt_bypass=True) diff --git a/tests/storage/test_censor_events.py b/tests/storage/test_censor_events.py new file mode 100644 index 0000000000..95a61063df --- /dev/null +++ b/tests/storage/test_censor_events.py @@ -0,0 +1,253 @@ +import io +import os +import time +from http import HTTPStatus +from typing import Optional, Tuple + +from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource + +from synapse.media.media_repository import ( + MediaRepository, +) +from synapse.rest import admin +from synapse.rest.client import login, media, room +from synapse.server import HomeServer +from synapse.types import JsonDict, UserID +from synapse.util import Clock + +from tests.test_utils import SMALL_PNG +from tests.unittest import HomeserverTestCase + + +class MediaDeletionOnRedactionCensorshipTests(HomeserverTestCase): + """Tests for deleting media attached to redacted events.""" + + servlets = [ + media.register_servlets, + login.register_servlets, + admin.register_servlets, + room.register_servlets, + ] + use_isolated_media_paths = True + + def default_config(self) -> JsonDict: + config = super().default_config() + config.setdefault("experimental_features", {}) + config["experimental_features"].update({"msc3911": {"enabled": True}}) + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.media_repo = hs.get_media_repository() + self.bad_user = self.register_user("bad_user", "hackme") + self.bad_user_tok = self.login("bad_user", "hackme") + self.user = self.register_user("user", "pass") + self.user_tok = self.login("user", "pass") + self.admin = self.register_user("admin", "admin_pass", admin=True) + self.admin_tok = self.login("admin", "admin_pass") + self.room = self.helper.create_room_as( + room_creator=self.admin, tok=self.admin_tok + ) + + def create_resource_dict(self) -> dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + + def _redact_event( + self, + access_token: str, + room_id: str, + event_id: str, + expect_code: int = 200, + with_relations: Optional[list[str]] = None, + content: Optional[JsonDict] = None, + ) -> JsonDict: + """Helper function to send a redaction event. + + Returns the json body. + """ + path = "/_matrix/client/r0/rooms/%s/redact/%s" % (room_id, event_id) + + request_content = content or {} + if with_relations: + request_content["org.matrix.msc3912.with_relations"] = with_relations + + channel = self.make_request( + "POST", path, request_content, access_token=access_token + ) + assert channel.code == expect_code, channel.json_body + return channel.json_body + + def _create_test_resource(self) -> Tuple[list[str], list[str]]: + event_ids = [] + media_ids = [] + self.helper.join(self.room, self.user, tok=self.user_tok) + self.helper.join(self.room, self.bad_user, tok=self.bad_user_tok) + + for _ in range(3): + # Create restricted media + mxc_uri = self.get_success( + self.media_repo.create_or_update_content( + "image/png", + "test_png_upload", + io.BytesIO(SMALL_PNG), + 67, + UserID.from_string(self.bad_user), + restricted=True, + ) + ) + # Make sure media is saved + assert mxc_uri is not None + assert isinstance(self.media_repo, MediaRepository) + media_path = self.media_repo.filepaths.local_media_filepath( + mxc_uri.media_id + ) + self.assertTrue(os.path.exists(media_path)) + assert self.get_success(self.store.get_local_media(mxc_uri.media_id)) + media_ids.append(mxc_uri.media_id) + + # Bad user create events with media attached + channel = self.make_request( + "PUT", + f"/rooms/{self.room}/send/m.room.message/{str(time.time())}?org.matrix.msc3911.attach_media={str(mxc_uri)}", + content={"msgtype": "m.text", "body": "Hi, this is a message"}, + access_token=self.bad_user_tok, + ) + assert channel.code == HTTPStatus.OK, channel.json_body + assert "event_id" in channel.json_body + event_id = channel.json_body["event_id"] + event_ids.append(event_id) + + # Check media restrictions field has proper event_id + restrictions = self.get_success( + self.hs.get_datastores().main.get_media_restrictions( + mxc_uri.server_name, mxc_uri.media_id + ) + ) + assert restrictions is not None, str(restrictions) + assert restrictions.event_id == event_id + return media_ids, event_ids + + def test_get_attached_media_ids(self) -> None: + """Test db function `get_attached_media_ids`""" + # Create events with media attached + _, event_ids = self._create_test_resource() + + # Check if `get_attached_media_ids` can get media ids attached to an event + for event_id in event_ids: + attached_media_ids = self.get_success( + self.store.get_attached_media_ids(event_id) + ) + assert len(attached_media_ids) == 1 + + def test_redacting_media_deletes_attached_media(self) -> None: + """Test that the censor_redactions background task deletes media that was + redacted 7 days ago per the default configuration. + """ + # Create events with media attached + media_ids, event_ids = self._create_test_resource() + + # Redact the events + for event_id in event_ids: + self._redact_event( + self.admin_tok, + self.room, + event_id, + expect_code=200, + ) + # Confirm the events redaction + event_dict = self.helper.get_event(self.room, event_id, self.admin_tok) + assert "redacted_because" in event_dict, event_dict + + # Fast forward 7 days and 6 minutes to make sure the censor_redactions looping + # call detects the events are eligible for censorship. + self.reactor.advance(7 * 24 * 60 * 60 + 6 * 60) + + # Check if the media is deleted from storage. + for media_id in media_ids: + media = self.get_success(self.store.get_local_media(media_id)) + assert media is None + assert isinstance(self.media_repo, MediaRepository) + assert not os.path.exists( + self.media_repo.filepaths.local_media_filepath(media_id) + ) + + def test_normal_users_lose_access_to_media_right_after_redaction(self) -> None: + """Test that normal users lose access to media after the event they + were attached to has been redacted. + """ + # Create events with media attached + media_ids, event_ids = self._create_test_resource() + + # Redact the bad user's events + for event_id in event_ids: + self._redact_event( + self.admin_tok, + self.room, + event_id, + expect_code=200, + ) + event_dict = self.helper.get_event(self.room, event_id, self.admin_tok) + self.assertIn("redacted_because", event_dict, event_dict) + + # Normal user trying to access redacted media should get 404 + for media_id in media_ids: + channel = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/{self.hs.hostname}/{media_id}", + shorthand=False, + access_token=self.user_tok, + ) + assert channel.code == 404, channel.json_body + + def test_moderators_still_have_access_to_media_after_redaction_until_permanent_deletion( + self, + ) -> None: + """Test that users with moderator privileges still have access to media + after the event they were attached to has been redacted. + """ + # Create events with media attached + media_ids, event_ids = self._create_test_resource() + + # Redact the bad user's events + for event_id in event_ids: + self._redact_event( + self.admin_tok, + self.room, + event_id, + expect_code=200, + ) + event_dict = self.helper.get_event(self.room, event_id, self.admin_tok) + self.assertIn("redacted_because", event_dict, event_dict) + + # User with moderator privileges still have access to redacted media + for media_id in media_ids: + channel = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/{self.hs.hostname}/{media_id}?allow_redacted_media=true", + shorthand=False, + access_token=self.admin_tok, + ) + assert channel.code == 200, channel.json_body + + # After 7 days and 6 minutes, media that are attached to the redacted events + # should be permanently deleted from disk and moderators no longer have access to + # them + self.reactor.advance(7 * 24 * 60 * 60 + 6 * 60) + + for media_id in media_ids: + channel = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/{self.hs.hostname}/{media_id}?allow_redacted_media=true", + shorthand=False, + access_token=self.admin_tok, + ) + assert channel.code == 404, channel.json_body + + # Check that the media has been deleted + deleted_media = self.get_success( + self.hs.get_datastores().main.get_local_media(media_id) + ) + assert deleted_media is None