Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ import org.apache.texera.service.util.S3StorageClient.{
MAXIMUM_NUM_OF_MULTIPART_S3_PARTS,
MINIMUM_NUM_OF_MULTIPART_S3_PART
}
import org.jooq.{DSLContext, EnumType}
import org.jooq.impl.DSL
import org.jooq.impl.DSL.{inline => inl}
import org.jooq.{DSLContext, EnumType}

import java.io.{InputStream, OutputStream}
import java.net.{HttpURLConnection, URI, URL, URLDecoder}
import java.nio.charset.StandardCharsets
Expand All @@ -81,6 +82,16 @@ object DatasetResource {
.getInstance()
.createDSLContext()

private def singleFileUploadMaxBytes(ctx: DSLContext, defaultMiB: Long = 20L): Long = {
val limit = ctx
.select(DSL.field("value", classOf[String]))
.from(DSL.table(DSL.name("texera_db", "site_settings")))
.where(DSL.field("key", classOf[String]).eq("single_file_upload_max_size_mib"))
.fetchOneInto(classOf[String])
Try(Option(limit).getOrElse(defaultMiB.toString).trim.toLong)
.getOrElse(defaultMiB) * 1024L * 1024L
}

/**
* Helper function to get the dataset from DB using did
*/
Expand Down Expand Up @@ -672,14 +683,16 @@ class DatasetResource {
@QueryParam("ownerEmail") ownerEmail: String,
@QueryParam("datasetName") datasetName: String,
@QueryParam("filePath") filePath: String,
@QueryParam("numParts") numParts: Optional[Integer],
@QueryParam("fileSizeBytes") fileSizeBytes: Optional[java.lang.Long],
@QueryParam("partSizeBytes") partSizeBytes: Optional[java.lang.Long],
@Auth user: SessionUser
): Response = {
val uid = user.getUid
val dataset: Dataset = getDatasetBy(ownerEmail, datasetName)

operationType.toLowerCase match {
case "init" => initMultipartUpload(dataset.getDid, filePath, numParts, uid)
case "init" =>
initMultipartUpload(dataset.getDid, filePath, fileSizeBytes, partSizeBytes, uid)
case "finish" => finishMultipartUpload(dataset.getDid, filePath, uid)
case "abort" => abortMultipartUpload(dataset.getDid, filePath, uid)
case _ =>
Expand Down Expand Up @@ -740,7 +753,55 @@ class DatasetResource {
if (session == null)
throw new NotFoundException("Upload session not found. Call type=init first.")

val expectedParts = session.getNumPartsRequested
val expectedParts: Int = session.getNumPartsRequested
val fileSizeBytesValue: Long = session.getFileSizeBytes
val partSizeBytesValue: Long = session.getPartSizeBytes

if (fileSizeBytesValue <= 0L) {
throw new WebApplicationException(
s"Upload session has an invalid file size of $fileSizeBytesValue. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
if (partSizeBytesValue <= 0L) {
throw new WebApplicationException(
s"Upload session has an invalid part size of $partSizeBytesValue. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}

// lastPartSize = fileSize - partSize*(expectedParts-1)
val nMinus1: Long = expectedParts.toLong - 1L
if (nMinus1 < 0L) {
throw new WebApplicationException(
s"Upload session has an invalid number of requested parts of $expectedParts. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
if (nMinus1 > 0L && partSizeBytesValue > Long.MaxValue / nMinus1) {
throw new WebApplicationException(
"Overflow while computing last part size",
Response.Status.INTERNAL_SERVER_ERROR
)
}
val prefixBytes: Long = partSizeBytesValue * nMinus1
if (prefixBytes > fileSizeBytesValue) {
throw new WebApplicationException(
s"Upload session is invalid: computed bytes before last part ($prefixBytes) exceed declared file size ($fileSizeBytesValue). Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
val lastPartSize: Long = fileSizeBytesValue - prefixBytes
if (lastPartSize <= 0L || lastPartSize > partSizeBytesValue) {
throw new WebApplicationException(
s"Upload session is invalid: computed last part size ($lastPartSize bytes) must be within 1..$partSizeBytesValue bytes. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}

val allowedSize: Long =
if (partNumber < expectedParts) partSizeBytesValue else lastPartSize

if (partNumber > expectedParts) {
throw new BadRequestException(
s"$partNumber exceeds the requested parts on init: $expectedParts"
Expand All @@ -754,10 +815,17 @@ class DatasetResource {
)
}

if (contentLength != allowedSize) {
throw new BadRequestException(
s"Invalid part size for partNumber=$partNumber. " +
s"Expected Content-Length=$allowedSize, got $contentLength."
)
}

val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
if (physicalAddr.isEmpty) {
throw new WebApplicationException(
"Upload session is missing physicalAddress. Re-init the upload.",
"Upload session is missing physicalAddress. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
Expand All @@ -768,7 +836,7 @@ class DatasetResource {
catch {
case e: IllegalArgumentException =>
throw new WebApplicationException(
s"Upload session has invalid physicalAddress. Re-init the upload. (${e.getMessage})",
s"Upload session has invalid physicalAddress. Restart the upload. (${e.getMessage})",
Response.Status.INTERNAL_SERVER_ERROR
)
}
Expand Down Expand Up @@ -800,7 +868,7 @@ class DatasetResource {
if (partRow == null) {
// Should not happen if init pre-created rows
throw new WebApplicationException(
s"Part row not initialized for part $partNumber. Re-init the upload.",
s"Part row not initialized for part $partNumber. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
Expand Down Expand Up @@ -1410,21 +1478,11 @@ class DatasetResource {
dataset
}

private def validateAndNormalizeFilePathOrThrow(filePath: String): String = {
val path = Option(filePath).getOrElse("").replace("\\", "/")
if (
path.isEmpty ||
path.startsWith("/") ||
path.split("/").exists(seg => seg == "." || seg == "..") ||
path.exists(ch => ch == 0.toChar || ch < 0x20.toChar || ch == 0x7f.toChar)
) throw new BadRequestException("Invalid filePath")
path
}

private def initMultipartUpload(
did: Integer,
encodedFilePath: String,
numParts: Optional[Integer],
fileSizeBytes: Optional[java.lang.Long],
partSizeBytes: Optional[java.lang.Long],
uid: Integer
): Response = {

Expand All @@ -1441,12 +1499,63 @@ class DatasetResource {
URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
)

val numPartsValue = numParts.toScala.getOrElse {
throw new BadRequestException("numParts is required for initialization")
val fileSizeBytesValue: Long =
fileSizeBytes
.orElseThrow(() =>
new BadRequestException("fileSizeBytes is required for initialization")
)

if (fileSizeBytesValue <= 0L) {
throw new BadRequestException("fileSizeBytes must be > 0")
}

val partSizeBytesValue: Long =
partSizeBytes
.orElseThrow(() =>
new BadRequestException("partSizeBytes is required for initialization")
)

if (partSizeBytesValue <= 0L) {
throw new BadRequestException("partSizeBytes must be > 0")
}

// singleFileUploadMaxBytes applies to TOTAL bytes (sum of all parts == file size)
val totalMaxBytes: Long = singleFileUploadMaxBytes(ctx)
if (totalMaxBytes <= 0L) {
throw new WebApplicationException(
"singleFileUploadMaxBytes must be > 0",
Response.Status.INTERNAL_SERVER_ERROR
)
}
if (fileSizeBytesValue > totalMaxBytes) {
throw new BadRequestException(
s"fileSizeBytes=$fileSizeBytesValue exceeds singleFileUploadMaxBytes=$totalMaxBytes"
)
}

// Compute numParts = ceil(fileSize / partSize) = (fileSize + partSize - 1) / partSize
val addend: Long = partSizeBytesValue - 1L
if (addend < 0L || fileSizeBytesValue > Long.MaxValue - addend) {
throw new WebApplicationException(
"Overflow while computing numParts",
Response.Status.INTERNAL_SERVER_ERROR
)
}

val numPartsLong: Long = (fileSizeBytesValue + addend) / partSizeBytesValue
if (numPartsLong < 1L || numPartsLong > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS.toLong) {
throw new BadRequestException(
s"Computed numParts=$numPartsLong is out of range 1..$MAXIMUM_NUM_OF_MULTIPART_S3_PARTS"
)
}
if (numPartsValue < 1 || numPartsValue > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS) {
val numPartsValue: Int = numPartsLong.toInt

// S3 multipart constraint: all non-final parts must be >= 5MiB.
// If we have >1 parts, then partSizeBytesValue is the non-final part size.
if (numPartsValue > 1 && partSizeBytesValue < MINIMUM_NUM_OF_MULTIPART_S3_PART) {
throw new BadRequestException(
"numParts must be between 1 and " + MAXIMUM_NUM_OF_MULTIPART_S3_PARTS
s"partSizeBytes=$partSizeBytesValue is too small. " +
s"All non-final parts must be >= $MINIMUM_NUM_OF_MULTIPART_S3_PART bytes."
)
}

Expand Down Expand Up @@ -1478,7 +1587,6 @@ class DatasetResource {
val uploadIdStr = presign.getUploadId
val physicalAddr = presign.getPhysicalAddress

// If anything fails after this point, abort LakeFS multipart
try {
val rowsInserted = ctx
.insertInto(DATASET_UPLOAD_SESSION)
Expand All @@ -1487,7 +1595,9 @@ class DatasetResource {
.set(DATASET_UPLOAD_SESSION.UID, uid)
.set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
.set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
.set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, numPartsValue)
.set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Integer.valueOf(numPartsValue))
.set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(fileSizeBytesValue))
.set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(partSizeBytesValue))
.onDuplicateKeyIgnore()
.execute()

Expand Down Expand Up @@ -1531,7 +1641,6 @@ class DatasetResource {
Response.ok().build()
} catch {
case e: Exception =>
// rollback will remove session + parts rows; we still must abort LakeFS
try {
LakeFSStorageClient.abortPresignedMultipartUploads(
repositoryName,
Expand Down Expand Up @@ -1597,7 +1706,7 @@ class DatasetResource {
val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
if (physicalAddr.isEmpty) {
throw new WebApplicationException(
"Upload session is missing physicalAddress. Re-init the upload.",
"Upload session is missing physicalAddress. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
Expand All @@ -1620,7 +1729,7 @@ class DatasetResource {

if (totalCnt != expectedParts) {
throw new WebApplicationException(
s"Part table mismatch: expected $expectedParts rows but found $totalCnt. Re-init the upload.",
s"Part table mismatch: expected $expectedParts rows but found $totalCnt. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
Expand Down Expand Up @@ -1671,7 +1780,29 @@ class DatasetResource {
physicalAddr
)

// Cleanup: delete the session; parts are removed by ON DELETE CASCADE
// FINAL SERVER-SIDE SIZE CHECK (do not rely on init)
val actualSizeBytes =
Option(objectStats.getSizeBytes).map(_.longValue()).getOrElse(-1L)

if (actualSizeBytes <= 0L) {
throw new WebApplicationException(
"lakeFS did not return sizeBytes for completed multipart upload",
Response.Status.INTERNAL_SERVER_ERROR
)
}

val maxBytes = singleFileUploadMaxBytes(ctx)
val tooLarge = actualSizeBytes > maxBytes

if (tooLarge) {
try {
LakeFSStorageClient.resetObjectUploadOrDeletion(dataset.getRepositoryName, filePath)
} catch {
case _: Throwable => ()
}
}

// always cleanup session
ctx
.deleteFrom(DATASET_UPLOAD_SESSION)
.where(
Expand All @@ -1682,6 +1813,13 @@ class DatasetResource {
)
.execute()

if (tooLarge) {
throw new WebApplicationException(
s"Upload exceeded max size: actualSizeBytes=$actualSizeBytes maxBytes=$maxBytes",
Response.Status.REQUEST_ENTITY_TOO_LARGE
)
}

Response
.ok(
Map(
Expand Down Expand Up @@ -1741,7 +1879,7 @@ class DatasetResource {
val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
if (physicalAddr.isEmpty) {
throw new WebApplicationException(
"Upload session is missing physicalAddress. Re-init the upload.",
"Upload session is missing physicalAddress. Restart the upload.",
Response.Status.INTERNAL_SERVER_ERROR
)
}
Expand Down
Loading
Loading