Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 119 additions & 14 deletions synapse/media/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import twisted.web.http
from twisted.internet.defer import Deferred

from synapse import event_auth
from synapse.api.constants import EventTypes, HistoryVisibility, Membership
from synapse.api.errors import (
Codes,
Expand Down Expand Up @@ -70,13 +71,17 @@
from synapse.media.thumbnailer import Thumbnailer, ThumbnailError
from synapse.media.url_previewer import UrlPreviewer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.media import ReplicationCopyMediaServlet
from synapse.replication.http.media import (
ReplicationCopyMediaServlet,
ReplicationDeleteMediaServlet,
)
from synapse.state import CREATE_KEY, POWER_KEY
from synapse.storage.databases.main.media_repository import (
LocalMedia,
MediaRestrictions,
RemoteMedia,
)
from synapse.types import JsonDict, Requester, UserID
from synapse.types import JsonDict, RedactedMediaBypass, Requester, UserID
from synapse.types.state import StateFilter
from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer
Expand Down Expand Up @@ -111,6 +116,7 @@ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.server_name = hs.hostname
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._is_mine_server_name = hs.is_mine_server_name
self.msc3911_config = hs.config.experimental.msc3911

Expand Down Expand Up @@ -183,6 +189,13 @@ async def copy_media(
"Sorry Mario, your MediaRepository related function is in another castle"
)

async def _remove_local_media_from_disk(
self, media_ids: List[str]
) -> Tuple[List[str], int]:
raise NotImplementedError(
"Sorry Mario, your MediaRepository related function is in another castle"
)

async def reached_pending_media_limit(self, auth_user: UserID) -> Tuple[bool, int]:
raise NotImplementedError(
"Sorry Mario, your MediaRepository related function is in another castle"
Expand Down Expand Up @@ -241,6 +254,7 @@ async def get_local_media(
requester: Optional[Requester] = None,
allow_authenticated: bool = True,
federation: bool = False,
allow_redacted_media: bool = False,
) -> None:
raise NotImplementedError(
"Sorry Mario, your MediaRepository related function is in another castle"
Expand Down Expand Up @@ -312,7 +326,10 @@ async def validate_media_restriction(
return attachments

async def is_media_visible(
self, requesting_user: UserID, media_info_object: Union[LocalMedia, RemoteMedia]
self,
requesting_user: UserID,
media_info_object: Union[LocalMedia, RemoteMedia],
redacted_media_bypass_config: Optional[RedactedMediaBypass] = None,
) -> None:
"""
Verify that media requested for download should be visible to the user making
Expand Down Expand Up @@ -349,6 +366,46 @@ async def is_media_visible(

if attached_event_id:
event_base = await self.store.get_event(attached_event_id)
if event_base.internal_metadata.is_redacted():
if (
not redacted_media_bypass_config
or not redacted_media_bypass_config.requesting_bypass
):
# If the event the media is attached to is redacted, don't serve that
# media to the user. Moderators and admins should probably be excluded
# from this restriction
raise NotFoundError()

# Which means a bypass was requested
if redacted_media_bypass_config.is_admin:
# System admins get to bypass the rest of the checks
return
else:
# Not an admin, let's check they have a high enough power level.
# Lifted this directly from RoomEventServlet for msc2815
auth_events = (
await self._storage_controllers.state.get_current_state(
event_base.room_id,
StateFilter.from_types(
[
POWER_KEY,
CREATE_KEY,
]
),
)
)

redact_level = event_auth.get_named_level(auth_events, "redact", 50)
user_level = event_auth.get_user_power_level(
requesting_user.to_string(), auth_events
)
if user_level < redact_level:
raise SynapseError(
403,
"You don't have permission to view redacted events in this room.",
errcode=Codes.FORBIDDEN,
)

if event_base.is_state():
# The standard event visibility utility, filter_events_for_client(),
# does not seem to meet the needs of a good UX when restricting and
Expand Down Expand Up @@ -534,6 +591,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
# initialize replication endpoint here
self.copy_media_client = ReplicationCopyMediaServlet.make_client(hs)
self.delete_media_client = ReplicationDeleteMediaServlet.make_client(hs)

async def copy_media(
self, existing_mxc: MXCUri, auth_user: UserID, max_timeout_ms: int
Expand All @@ -550,6 +608,18 @@ async def copy_media(
)
return MXCUri.from_str(result["content_uri"])

async def _remove_local_media_from_disk(
self, media_ids: List[str]
) -> Tuple[List[str], int]:
"""
Call out to the worker responsible for handling media to delete this media object
"""
result = await self.delete_media_client(
instance_name=self.hs.config.worker.workers_doing_media_duty[0],
media_ids=media_ids,
)
return result["deleted"], result["count"]


class MediaRepository(AbstractMediaRepository):
def __init__(self, hs: "HomeServer"):
Expand Down Expand Up @@ -944,7 +1014,11 @@ def respond_not_yet_uploaded(self, request: SynapseRequest) -> None:
)

async def get_local_media_info(
self, request: SynapseRequest, media_id: str, max_timeout_ms: int
self,
request: SynapseRequest,
media_id: str,
max_timeout_ms: int,
allow_redacted_media: bool = False,
) -> Optional[LocalMedia]:
"""Gets the info dictionary for given local media ID. If the media has
not been uploaded yet, this function will wait up to ``max_timeout_ms``
Expand All @@ -956,6 +1030,7 @@ async def get_local_media_info(
the file_id for local content.)
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
allow_redacted_media:

Returns:
Either the info dictionary for the given local media ID or
Expand All @@ -979,7 +1054,17 @@ async def get_local_media_info(
# The file has been uploaded, so stop looping
if media_info.media_length is not None:
if isinstance(request.requester, Requester):
await self.is_media_visible(request.requester.user, media_info)
# Only check media visibility if this is for a local request
is_admin = await self.auth.is_server_admin(request.requester)
redacted_media_bypass_config = RedactedMediaBypass(
allow_redacted_media, is_admin
)

await self.is_media_visible(
request.requester.user,
media_info,
redacted_media_bypass_config,
)
return media_info

# Check if the media ID has expired and still hasn't been uploaded to.
Expand Down Expand Up @@ -1008,6 +1093,7 @@ async def get_local_media(
requester: Optional[Requester] = None,
allow_authenticated: bool = True,
federation: bool = False,
allow_redacted_media: bool = False,
) -> None:
"""Responds to requests for local media, if exists, or returns 404.

Expand All @@ -1019,6 +1105,7 @@ async def get_local_media(
the filename in the Content-Disposition header of the response.
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
allow_redacted_media:
requester: The user making the request, to verify restricted media. Only
used for local users, not over federation
allow_authenticated: whether media marked as authenticated may be served to this request
Expand All @@ -1027,7 +1114,9 @@ async def get_local_media(
Returns:
Resolves once a response has successfully been written to request
"""
media_info = await self.get_local_media_info(request, media_id, max_timeout_ms)
media_info = await self.get_local_media_info(
request, media_id, max_timeout_ms, allow_redacted_media
)
if not media_info:
return

Expand All @@ -1039,10 +1128,6 @@ async def get_local_media(
# if MSC3911 is enabled, check visibility of the media for the user and retrieve
# any restrictions
if self.msc3911_config.enabled:
if requester is not None:
# Only check media visibility if this is for a local request. This will
# raise directly back to the client if not visible
await self.is_media_visible(requester.user, media_info)
restrictions = await self.validate_media_restriction(
request, media_info, None, federation
)
Expand Down Expand Up @@ -1096,6 +1181,7 @@ async def get_remote_media(
use_federation_endpoint: bool,
requester: Optional[Requester] = None,
allow_authenticated: bool = True,
allow_redacted_media: bool = False,
) -> None:
"""Respond to requests for remote media.

Expand All @@ -1114,6 +1200,7 @@ async def get_remote_media(
used for local users, not over federation
allow_authenticated: whether media marked as authenticated may be served to this
request
allow_redacted_media:

Returns:
Resolves once a response has successfully been written to request
Expand Down Expand Up @@ -1146,7 +1233,8 @@ async def get_remote_media(
ip_address,
use_federation_endpoint,
allow_authenticated,
requester,
allow_redacted_media=allow_redacted_media,
requester=requester,
)

# Check if the media is cached on the client, if so return 304. We need
Expand Down Expand Up @@ -1182,6 +1270,7 @@ async def get_remote_media_info(
use_federation: bool,
allow_authenticated: bool,
requester: Optional[Requester] = None,
allow_redacted_media: bool = False,
) -> RemoteMedia:
"""Gets the media info associated with the remote file, downloading
if necessary.
Expand All @@ -1198,6 +1287,7 @@ async def get_remote_media_info(
request
requester: The user making the request, to verify restricted media. Only
used for local users, not over federation
allow_redacted_media:

Returns:
The media info of the file
Expand All @@ -1220,7 +1310,8 @@ async def get_remote_media_info(
ip_address,
use_federation,
allow_authenticated,
requester,
allow_redacted_media=allow_redacted_media,
requester=requester,
)

# Ensure we actually use the responder so that it releases resources
Expand All @@ -1239,6 +1330,7 @@ async def _get_remote_media_impl(
ip_address: str,
use_federation_endpoint: bool,
allow_authenticated: bool,
allow_redacted_media: bool = False,
requester: Optional[Requester] = None,
) -> Tuple[Optional[Responder], RemoteMedia]:
"""Looks for media in local cache, if not there then attempt to
Expand All @@ -1256,6 +1348,7 @@ async def _get_remote_media_impl(
use_federation_endpoint: whether to request the remote media over the new federation
/download endpoint
allow_authenticated:
allow_redacted_media:
requester: The user making the request, to verify restricted media. Only
used for local users, not over federation

Expand All @@ -1276,8 +1369,14 @@ async def _get_remote_media_impl(
# exists in the local database and again further down for after it was
# retrieved from the remote.
if self.msc3911_config.enabled and requester is not None:
is_admin = await self.auth.is_server_admin(requester)
redacted_media_bypass_config = RedactedMediaBypass(
allow_redacted_media, is_admin
)
# This will raise directly back to the client if not visible
await self.is_media_visible(requester.user, media_info)
await self.is_media_visible(
requester.user, media_info, redacted_media_bypass_config
)

# file_id is the ID we use to track the file locally. If we've already
# seen the file then reuse the existing ID, otherwise generate a new
Expand Down Expand Up @@ -1336,8 +1435,14 @@ async def _get_remote_media_impl(
and self.msc3911_config.enabled
and requester is not None
):
is_admin = await self.auth.is_server_admin(requester)
redacted_media_bypass_config = RedactedMediaBypass(
allow_redacted_media, is_admin
)
# This will raise directly back to the client if not visible
await self.is_media_visible(requester.user, media_info)
await self.is_media_visible(
requester.user, media_info, redacted_media_bypass_config
)

file_id = media_info.filesystem_id

Expand Down
41 changes: 41 additions & 0 deletions synapse/replication/http/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,46 @@ async def _handle_request( # type: ignore[override]
return 200, {"content_uri": str(mxc_uri)}


class ReplicationDeleteMediaServlet(ReplicationEndpoint):
"""Request the MediaRepository to delete a piece of media from filesystem.

Request format:

DELETE /_synapse/replication/delete_media

{
"media_ids": [...], # List of media IDs to delete
}

"""

NAME = "delete_media"
PATH_ARGS = ()

def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.media_repo = hs.get_media_repository()

@staticmethod
async def _serialize_payload( # type: ignore[override]
media_ids: list[str],
) -> JsonDict:
"""
Args:
media_ids: The list of media IDs to delete.
"""
return {"media_ids": media_ids}

async def _handle_request( # type: ignore[override]
self,
request: Request,
content: JsonDict,
) -> Tuple[int, JsonDict]:
media_ids = content["media_ids"]
deleted, count = await self.media_repo._remove_local_media_from_disk(media_ids)
return 200, {"deleted": deleted, "count": count}


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationCopyMediaServlet(hs).register(http_server)
ReplicationDeleteMediaServlet(hs).register(http_server)
Loading
Loading