From f51ee8085781c33b2c038e655bc23e117c07d078 Mon Sep 17 00:00:00 2001 From: dnslin Date: Mon, 29 Dec 2025 15:04:35 +0800 Subject: [PATCH 1/6] feat(multipart): implement multipart upload functionality with session management --- internal/fs/multipart.go | 319 ++++++++++++++++++++++++++++++++++++ internal/model/multipart.go | 61 +++++++ server/handles/multipart.go | 144 ++++++++++++++++ server/router.go | 2 + 4 files changed, 526 insertions(+) create mode 100644 internal/fs/multipart.go create mode 100644 internal/model/multipart.go create mode 100644 server/handles/multipart.go diff --git a/internal/fs/multipart.go b/internal/fs/multipart.go new file mode 100644 index 000000000..4f172e12b --- /dev/null +++ b/internal/fs/multipart.go @@ -0,0 +1,319 @@ +package fs + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "sync" + "time" + + "github.com/OpenListTeam/OpenList/v4/internal/conf" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/google/uuid" + pkgerrors "github.com/pkg/errors" +) + +const ( + // DefaultChunkSize is the default chunk size (5MB) + DefaultChunkSize = 5 * 1024 * 1024 + // SessionMaxLifetime is the maximum lifetime of a multipart upload session + SessionMaxLifetime = 2 * time.Hour +) + +// multipartSessionManager manages multipart upload sessions +type multipartSessionManager struct { + sessions map[string]*model.MultipartUploadSession + mu sync.RWMutex +} + +// Global multipart session manager +var MultipartSessionManager = NewMultipartSessionManager() + +// NewMultipartSessionManager creates a new session manager +func NewMultipartSessionManager() *multipartSessionManager { + return &multipartSessionManager{ + sessions: make(map[string]*model.MultipartUploadSession), + } +} + +// InitMultipartUpload initializes a new multipart upload session +func (m *multipartSessionManager) InitMultipartUpload( + ctx context.Context, + storageID uint, + dstDirPath string, + fileName string, + fileSize int64, + chunkSize int64, + contentType string, +) (*model.MultipartUploadSession, error) { + if chunkSize <= 0 { + chunkSize = DefaultChunkSize + } + if chunkSize < 1024 { + chunkSize = 1024 + } + + totalChunks := int((fileSize + chunkSize - 1) / chunkSize) + if totalChunks == 0 { + totalChunks = 1 + } + + uploadID := uuid.New().String() + + chunkDir := filepath.Join(conf.Conf.TempDir, "uploads", uploadID) + if err := utils.CreateNestedDirectory(chunkDir); err != nil { + return nil, pkgerrors.Wrap(err, "failed to create chunk directory") + } + + now := time.Now() + session := &model.MultipartUploadSession{ + UploadID: uploadID, + StorageID: storageID, + DstDirPath: dstDirPath, + FileName: fileName, + FileSize: fileSize, + ChunkSize: chunkSize, + TotalChunks: totalChunks, + ContentType: contentType, + ChunkDir: chunkDir, + UploadedChunks: make(map[int]model.ChunkInfo), + CreatedAt: now, + ExpiresAt: now.Add(SessionMaxLifetime), + } + + m.mu.Lock() + m.sessions[uploadID] = session + m.mu.Unlock() + + go m.cleanupSession(uploadID, session.ExpiresAt) + + return session, nil +} + +// UploadChunk uploads a single chunk +func (m *multipartSessionManager) UploadChunk( + ctx context.Context, + uploadID string, + chunkIndex int, + chunkSize int64, + reader io.Reader, + md5 string, +) (*model.ChunkUploadResp, error) { + session, err := m.GetSession(uploadID) + if err != nil { + return nil, err + } + + if chunkIndex < 0 || chunkIndex >= 
session.TotalChunks { + return nil, pkgerrors.New("chunk index out of range") + } + + // Idempotent: if chunk already uploaded, return success + if _, exists := session.UploadedChunks[chunkIndex]; exists { + return m.getUploadResponse(session), nil + } + + chunkFileName := fmt.Sprintf("%d.chunk", chunkIndex) + chunkFilePath := filepath.Join(session.ChunkDir, chunkFileName) + chunkFile, err := utils.CreateNestedFile(chunkFilePath) + if err != nil { + return nil, pkgerrors.Wrap(err, "failed to create chunk file") + } + defer chunkFile.Close() + + written, err := utils.CopyWithBuffer(chunkFile, reader) + if err != nil { + os.Remove(chunkFilePath) + return nil, pkgerrors.Wrap(err, "failed to write chunk") + } + + if written != chunkSize { + os.Remove(chunkFilePath) + return nil, pkgerrors.New("chunk size mismatch") + } + + m.mu.Lock() + session.UploadedChunks[chunkIndex] = model.ChunkInfo{ + Index: chunkIndex, + Size: chunkSize, + MD5: md5, + UploadedAt: time.Now(), + } + m.mu.Unlock() + + return m.getUploadResponse(session), nil +} + +// CompleteMultipartUpload merges all chunks and uploads to storage +func (m *multipartSessionManager) CompleteMultipartUpload( + ctx context.Context, + uploadID string, +) error { + session, err := m.GetSession(uploadID) + if err != nil { + return err + } + + if len(session.UploadedChunks) != session.TotalChunks { + return pkgerrors.New(fmt.Sprintf("incomplete upload: %d/%d chunks uploaded", + len(session.UploadedChunks), session.TotalChunks)) + } + + mergedReader, err := NewChunkMergedReader(session) + if err != nil { + return err + } + defer mergedReader.Close() + + fileStream := &stream.FileStream{ + Obj: &model.Object{ + Name: session.FileName, + Size: session.FileSize, + Modified: time.Now(), + }, + Reader: mergedReader, + Mimetype: session.ContentType, + } + + err = putDirectly(ctx, session.DstDirPath, fileStream) + if err != nil { + return err + } + + m.mu.Lock() + delete(m.sessions, uploadID) + m.mu.Unlock() + + m.cleanupSessionFiles(session) + return nil +} + +// GetSession retrieves a session by upload ID +func (m *multipartSessionManager) GetSession(uploadID string) (*model.MultipartUploadSession, error) { + m.mu.RLock() + session, exists := m.sessions[uploadID] + m.mu.RUnlock() + + if !exists { + return nil, pkgerrors.New("multipart upload session not found") + } + + if time.Now().After(session.ExpiresAt) { + m.cleanupSessionFiles(session) + m.mu.Lock() + delete(m.sessions, uploadID) + m.mu.Unlock() + return nil, pkgerrors.New("multipart upload session expired") + } + + return session, nil +} + +func (m *multipartSessionManager) getUploadResponse(session *model.MultipartUploadSession) *model.ChunkUploadResp { + indices := make([]int, 0, len(session.UploadedChunks)) + for i := range session.UploadedChunks { + indices = append(indices, i) + } + sort.Ints(indices) + + var totalBytes int64 + for _, info := range session.UploadedChunks { + totalBytes += info.Size + } + + return &model.ChunkUploadResp{ + ChunkIndex: indices[len(indices)-1], + UploadedChunks: indices, + UploadedBytes: totalBytes, + } +} + +func (m *multipartSessionManager) cleanupSessionFiles(session *model.MultipartUploadSession) { + if session.ChunkDir != "" { + os.RemoveAll(session.ChunkDir) + } +} + +func (m *multipartSessionManager) cleanupSession(uploadID string, expiresAt time.Time) { + time.Sleep(time.Until(expiresAt)) + + m.mu.Lock() + session, exists := m.sessions[uploadID] + if exists { + delete(m.sessions, uploadID) + m.mu.Unlock() + m.cleanupSessionFiles(session) + } else { + 
m.mu.Unlock() + } +} + +// ChunkMergedReader reads chunks in order and merges them +type ChunkMergedReader struct { + session *model.MultipartUploadSession + readers []io.ReadCloser + current int + currentReader io.Reader +} + +func NewChunkMergedReader(session *model.MultipartUploadSession) (*ChunkMergedReader, error) { + readers := make([]io.ReadCloser, session.TotalChunks) + + for i := 0; i < session.TotalChunks; i++ { + chunkFileName := fmt.Sprintf("%d.chunk", i) + chunkFilePath := filepath.Join(session.ChunkDir, chunkFileName) + + f, err := os.Open(chunkFilePath) + if err != nil { + for j := 0; j < i; j++ { + readers[j].Close() + } + return nil, pkgerrors.Wrap(err, "failed to open chunk file") + } + readers[i] = f + } + + return &ChunkMergedReader{ + session: session, + readers: readers, + current: 0, + currentReader: nil, + }, nil +} + +func (r *ChunkMergedReader) Read(p []byte) (n int, err error) { + for r.current < len(r.readers) { + if r.currentReader == nil { + r.currentReader = r.readers[r.current] + } + + n, err = r.currentReader.Read(p) + if n > 0 { + return n, err + } + + if err == io.EOF { + r.currentReader = nil + r.current++ + continue + } + + return n, err + } + + return 0, io.EOF +} + +func (r *ChunkMergedReader) Close() error { + for _, reader := range r.readers { + if reader != nil { + reader.Close() + } + } + return nil +} diff --git a/internal/model/multipart.go b/internal/model/multipart.go new file mode 100644 index 000000000..d0b56481c --- /dev/null +++ b/internal/model/multipart.go @@ -0,0 +1,61 @@ +package model + +import "time" + +// MultipartUploadSession represents a multipart upload session +type MultipartUploadSession struct { + UploadID string `json:"upload_id"` + UserID int64 `json:"user_id"` + StorageID uint `json:"storage_id"` + DstDirPath string `json:"dst_dir_path"` + FileName string `json:"file_name"` + FileSize int64 `json:"file_size"` + ChunkSize int64 `json:"chunk_size"` + TotalChunks int `json:"total_chunks"` + ContentType string `json:"content_type"` + ChunkDir string `json:"chunk_dir"` + UploadedChunks map[int]ChunkInfo `json:"uploaded_chunks"` + CreatedAt time.Time `json:"created_at"` + ExpiresAt time.Time `json:"expires_at"` +} + +// ChunkInfo represents information about an uploaded chunk +type ChunkInfo struct { + Index int `json:"index"` + Size int64 `json:"size"` + MD5 string `json:"md5,omitempty"` + UploadedAt time.Time `json:"uploaded_at"` +} + +// MultipartInitReq is the request body for initializing a multipart upload +type MultipartInitReq struct { + Path string `json:"path" form:"path"` + FileName string `json:"file_name" form:"file_name"` + FileSize int64 `json:"file_size" form:"file_size"` + ChunkSize int64 `json:"chunk_size" form:"chunk_size"` +} + +// MultipartInitResp is the response for initializing a multipart upload +type MultipartInitResp struct { + UploadID string `json:"upload_id"` + ChunkSize int64 `json:"chunk_size"` + TotalChunks int `json:"total_chunks"` +} + +// ChunkUploadResp is the response for uploading a chunk +type ChunkUploadResp struct { + ChunkIndex int `json:"chunk_index"` + UploadedChunks []int `json:"uploaded_chunks"` + UploadedBytes int64 `json:"uploaded_bytes"` +} + +// MultipartCompleteReq is the request body for completing a multipart upload +type MultipartCompleteReq struct { + UploadID string `json:"upload_id" form:"upload_id"` +} + +// MultipartCompleteResp is the response for completing a multipart upload +type MultipartCompleteResp struct { + Object *Obj `json:"object,omitempty"` + Task any 
`json:"task,omitempty"` +} diff --git a/server/handles/multipart.go b/server/handles/multipart.go new file mode 100644 index 000000000..a182e97c8 --- /dev/null +++ b/server/handles/multipart.go @@ -0,0 +1,144 @@ +package handles + +import ( + "io" + "net/url" + stdpath "path" + "strconv" + + "github.com/OpenListTeam/OpenList/v4/internal/conf" + "github.com/OpenListTeam/OpenList/v4/internal/fs" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/OpenListTeam/OpenList/v4/server/common" + "github.com/gin-gonic/gin" +) + +func FsMultipart(c *gin.Context) { + action := c.Query("action") + switch action { + case "init": + fsMultipartInit(c) + case "upload": + fsMultipartUpload(c) + case "complete": + fsMultipartComplete(c) + default: + common.ErrorStrResp(c, "invalid action", 400) + } +} + +func fsMultipartInit(c *gin.Context) { + var req model.MultipartInitReq + if err := c.ShouldBind(&req); err != nil { + common.ErrorResp(c, err, 400) + return + } + + path, err := url.PathUnescape(req.Path) + if err != nil { + common.ErrorResp(c, err, 400) + return + } + + user := c.Request.Context().Value(conf.UserKey).(*model.User) + path, err = user.JoinPath(path) + if err != nil { + common.ErrorResp(c, err, 403) + return + } + + storage, err := fs.GetStorage(path, &fs.GetStoragesArgs{}) + if err != nil { + common.ErrorResp(c, err, 400) + return + } + if storage.Config().NoUpload { + common.ErrorStrResp(c, "Current storage doesn't support upload", 405) + return + } + + dstDirPath, fileName := stdpath.Split(path) + mimetype := utils.GetMimeType(fileName) + + session, err := fs.MultipartSessionManager.InitMultipartUpload( + c.Request.Context(), + storage.GetStorage().ID, + dstDirPath, + fileName, + req.FileSize, + req.ChunkSize, + mimetype, + ) + if err != nil { + common.ErrorResp(c, err, 500) + return + } + + common.SuccessResp(c, model.MultipartInitResp{ + UploadID: session.UploadID, + ChunkSize: session.ChunkSize, + TotalChunks: session.TotalChunks, + }) +} + +func fsMultipartUpload(c *gin.Context) { + uploadID := c.GetHeader("X-Upload-Id") + if uploadID == "" { + common.ErrorStrResp(c, "missing X-Upload-Id header", 400) + return + } + + chunkIndexStr := c.GetHeader("X-Chunk-Index") + chunkIndex, err := strconv.Atoi(chunkIndexStr) + if err != nil { + common.ErrorStrResp(c, "invalid X-Chunk-Index header", 400) + return + } + + chunkSizeStr := c.GetHeader("X-Chunk-Size") + chunkSize, err := strconv.ParseInt(chunkSizeStr, 10, 64) + if err != nil { + common.ErrorStrResp(c, "invalid X-Chunk-Size header", 400) + return + } + + md5 := c.GetHeader("X-Chunk-Md5") + + resp, err := fs.MultipartSessionManager.UploadChunk( + c.Request.Context(), + uploadID, + chunkIndex, + chunkSize, + c.Request.Body, + md5, + ) + if err != nil { + common.ErrorResp(c, err, 500) + return + } + + io.ReadAll(c.Request.Body) + c.Request.Body.Close() + + common.SuccessResp(c, resp) +} + +func fsMultipartComplete(c *gin.Context) { + var req model.MultipartCompleteReq + if err := c.ShouldBind(&req); err != nil { + common.ErrorResp(c, err, 400) + return + } + + err := fs.MultipartSessionManager.CompleteMultipartUpload( + c.Request.Context(), + req.UploadID, + ) + if err != nil { + common.ErrorResp(c, err, 500) + return + } + + common.SuccessResp(c, gin.H{"success": true}) +} diff --git a/server/router.go b/server/router.go index 32e933de3..d4f5442dd 100644 --- a/server/router.go +++ b/server/router.go @@ -218,6 +218,8 @@ func _fs(g *gin.RouterGroup) { 
g.POST("/archive/decompress", handles.FsArchiveDecompress) // Direct upload (client-side upload to storage) g.POST("/get_direct_upload_info", middlewares.FsUp, handles.FsGetDirectUploadInfo) + // Multipart upload + g.POST("/multipart", middlewares.FsUp, handles.FsMultipart) } func _task(g *gin.RouterGroup) { From 6706e43eed965f03c987a03659fd92a651a7f182 Mon Sep 17 00:00:00 2001 From: dnslin Date: Mon, 29 Dec 2025 16:04:56 +0800 Subject: [PATCH 2/6] feat(multipart): enhance multipart upload with session management and error handling --- internal/fs/multipart.go | 142 ++++++++++++++++++------------- internal/model/multipart.go | 58 ++++--------- server/handles/multipart.go | 164 ++++++++++++++++++++++-------------- server/router.go | 2 +- 4 files changed, 198 insertions(+), 168 deletions(-) diff --git a/internal/fs/multipart.go b/internal/fs/multipart.go index 4f172e12b..30a138fac 100644 --- a/internal/fs/multipart.go +++ b/internal/fs/multipart.go @@ -13,44 +13,46 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/internal/task" "github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/google/uuid" pkgerrors "github.com/pkg/errors" ) const ( - // DefaultChunkSize is the default chunk size (5MB) - DefaultChunkSize = 5 * 1024 * 1024 - // SessionMaxLifetime is the maximum lifetime of a multipart upload session - SessionMaxLifetime = 2 * time.Hour + DefaultChunkSize = 5 * 1024 * 1024 + SessionMaxLifetime = 2 * time.Hour ) -// multipartSessionManager manages multipart upload sessions type multipartSessionManager struct { sessions map[string]*model.MultipartUploadSession mu sync.RWMutex } -// Global multipart session manager -var MultipartSessionManager = NewMultipartSessionManager() - -// NewMultipartSessionManager creates a new session manager -func NewMultipartSessionManager() *multipartSessionManager { - return &multipartSessionManager{ - sessions: make(map[string]*model.MultipartUploadSession), - } +var MultipartSessionManager = &multipartSessionManager{ + sessions: make(map[string]*model.MultipartUploadSession), } -// InitMultipartUpload initializes a new multipart upload session -func (m *multipartSessionManager) InitMultipartUpload( - ctx context.Context, - storageID uint, +// InitOrGetSession initializes a new session or returns existing one +func (m *multipartSessionManager) InitOrGetSession( + uploadID string, dstDirPath string, fileName string, fileSize int64, chunkSize int64, contentType string, + overwrite bool, ) (*model.MultipartUploadSession, error) { + // If uploadID provided, try to get existing session + if uploadID != "" { + session, err := m.GetSession(uploadID) + if err != nil { + return nil, err + } + return session, nil + } + + // Create new session if chunkSize <= 0 { chunkSize = DefaultChunkSize } @@ -63,17 +65,15 @@ func (m *multipartSessionManager) InitMultipartUpload( totalChunks = 1 } - uploadID := uuid.New().String() - - chunkDir := filepath.Join(conf.Conf.TempDir, "uploads", uploadID) + newUploadID := uuid.New().String() + chunkDir := filepath.Join(conf.Conf.TempDir, "multipart", newUploadID) if err := utils.CreateNestedDirectory(chunkDir); err != nil { return nil, pkgerrors.Wrap(err, "failed to create chunk directory") } now := time.Now() session := &model.MultipartUploadSession{ - UploadID: uploadID, - StorageID: storageID, + UploadID: newUploadID, DstDirPath: dstDirPath, FileName: fileName, FileSize: 
fileSize, @@ -82,27 +82,26 @@ func (m *multipartSessionManager) InitMultipartUpload( ContentType: contentType, ChunkDir: chunkDir, UploadedChunks: make(map[int]model.ChunkInfo), + Overwrite: overwrite, CreatedAt: now, ExpiresAt: now.Add(SessionMaxLifetime), } m.mu.Lock() - m.sessions[uploadID] = session + m.sessions[newUploadID] = session m.mu.Unlock() - go m.cleanupSession(uploadID, session.ExpiresAt) + go m.cleanupAfterExpiry(newUploadID, session.ExpiresAt) return session, nil } -// UploadChunk uploads a single chunk +// UploadChunk uploads a single chunk (idempotent) func (m *multipartSessionManager) UploadChunk( - ctx context.Context, uploadID string, chunkIndex int, chunkSize int64, reader io.Reader, - md5 string, ) (*model.ChunkUploadResp, error) { session, err := m.GetSession(uploadID) if err != nil { @@ -113,9 +112,13 @@ func (m *multipartSessionManager) UploadChunk( return nil, pkgerrors.New("chunk index out of range") } + m.mu.RLock() + _, exists := session.UploadedChunks[chunkIndex] + m.mu.RUnlock() + // Idempotent: if chunk already uploaded, return success - if _, exists := session.UploadedChunks[chunkIndex]; exists { - return m.getUploadResponse(session), nil + if exists { + return m.buildResponse(session), nil } chunkFileName := fmt.Sprintf("%d.chunk", chunkIndex) @@ -134,41 +137,40 @@ func (m *multipartSessionManager) UploadChunk( if written != chunkSize { os.Remove(chunkFilePath) - return nil, pkgerrors.New("chunk size mismatch") + return nil, pkgerrors.Errorf("chunk size mismatch: expected %d, got %d", chunkSize, written) } m.mu.Lock() session.UploadedChunks[chunkIndex] = model.ChunkInfo{ Index: chunkIndex, Size: chunkSize, - MD5: md5, UploadedAt: time.Now(), } m.mu.Unlock() - return m.getUploadResponse(session), nil + return m.buildResponse(session), nil } -// CompleteMultipartUpload merges all chunks and uploads to storage -func (m *multipartSessionManager) CompleteMultipartUpload( +// CompleteUpload merges all chunks and uploads to storage +func (m *multipartSessionManager) CompleteUpload( ctx context.Context, uploadID string, -) error { + asTask bool, +) (task.TaskExtensionInfo, error) { session, err := m.GetSession(uploadID) if err != nil { - return err + return nil, err } if len(session.UploadedChunks) != session.TotalChunks { - return pkgerrors.New(fmt.Sprintf("incomplete upload: %d/%d chunks uploaded", - len(session.UploadedChunks), session.TotalChunks)) + return nil, pkgerrors.Errorf("incomplete upload: %d/%d chunks uploaded", + len(session.UploadedChunks), session.TotalChunks) } - mergedReader, err := NewChunkMergedReader(session) + mergedReader, err := newChunkMergedReader(session) if err != nil { - return err + return nil, err } - defer mergedReader.Close() fileStream := &stream.FileStream{ Obj: &model.Object{ @@ -176,21 +178,31 @@ func (m *multipartSessionManager) CompleteMultipartUpload( Size: session.FileSize, Modified: time.Now(), }, - Reader: mergedReader, - Mimetype: session.ContentType, + Reader: mergedReader, + Mimetype: session.ContentType, + WebPutAsTask: asTask, + Closers: utils.NewClosers(mergedReader), + } + + var t task.TaskExtensionInfo + if asTask { + t, err = PutAsTask(ctx, session.DstDirPath, fileStream) + } else { + err = PutDirectly(ctx, session.DstDirPath, fileStream) } - err = putDirectly(ctx, session.DstDirPath, fileStream) if err != nil { - return err + mergedReader.Close() + return nil, err } + // Cleanup session m.mu.Lock() delete(m.sessions, uploadID) m.mu.Unlock() m.cleanupSessionFiles(session) - return nil + return t, nil } // 
GetSession retrieves a session by upload ID @@ -214,7 +226,10 @@ func (m *multipartSessionManager) GetSession(uploadID string) (*model.MultipartU return session, nil } -func (m *multipartSessionManager) getUploadResponse(session *model.MultipartUploadSession) *model.ChunkUploadResp { +func (m *multipartSessionManager) buildResponse(session *model.MultipartUploadSession) *model.ChunkUploadResp { + m.mu.RLock() + defer m.mu.RUnlock() + indices := make([]int, 0, len(session.UploadedChunks)) for i := range session.UploadedChunks { indices = append(indices, i) @@ -226,10 +241,17 @@ func (m *multipartSessionManager) getUploadResponse(session *model.MultipartUplo totalBytes += info.Size } + chunkIndex := -1 + if len(indices) > 0 { + chunkIndex = indices[len(indices)-1] + } + return &model.ChunkUploadResp{ - ChunkIndex: indices[len(indices)-1], + UploadID: session.UploadID, + ChunkIndex: chunkIndex, UploadedChunks: indices, UploadedBytes: totalBytes, + TotalChunks: session.TotalChunks, } } @@ -239,7 +261,7 @@ func (m *multipartSessionManager) cleanupSessionFiles(session *model.MultipartUp } } -func (m *multipartSessionManager) cleanupSession(uploadID string, expiresAt time.Time) { +func (m *multipartSessionManager) cleanupAfterExpiry(uploadID string, expiresAt time.Time) { time.Sleep(time.Until(expiresAt)) m.mu.Lock() @@ -254,14 +276,14 @@ func (m *multipartSessionManager) cleanupSession(uploadID string, expiresAt time } // ChunkMergedReader reads chunks in order and merges them -type ChunkMergedReader struct { - session *model.MultipartUploadSession - readers []io.ReadCloser - current int +type chunkMergedReader struct { + session *model.MultipartUploadSession + readers []io.ReadCloser + current int currentReader io.Reader } -func NewChunkMergedReader(session *model.MultipartUploadSession) (*ChunkMergedReader, error) { +func newChunkMergedReader(session *model.MultipartUploadSession) (*chunkMergedReader, error) { readers := make([]io.ReadCloser, session.TotalChunks) for i := 0; i < session.TotalChunks; i++ { @@ -278,15 +300,15 @@ func NewChunkMergedReader(session *model.MultipartUploadSession) (*ChunkMergedRe readers[i] = f } - return &ChunkMergedReader{ - session: session, - readers: readers, - current: 0, + return &chunkMergedReader{ + session: session, + readers: readers, + current: 0, currentReader: nil, }, nil } -func (r *ChunkMergedReader) Read(p []byte) (n int, err error) { +func (r *chunkMergedReader) Read(p []byte) (n int, err error) { for r.current < len(r.readers) { if r.currentReader == nil { r.currentReader = r.readers[r.current] @@ -309,7 +331,7 @@ func (r *ChunkMergedReader) Read(p []byte) (n int, err error) { return 0, io.EOF } -func (r *ChunkMergedReader) Close() error { +func (r *chunkMergedReader) Close() error { for _, reader := range r.readers { if reader != nil { reader.Close() diff --git a/internal/model/multipart.go b/internal/model/multipart.go index d0b56481c..9bfcc2f43 100644 --- a/internal/model/multipart.go +++ b/internal/model/multipart.go @@ -4,58 +4,32 @@ import "time" // MultipartUploadSession represents a multipart upload session type MultipartUploadSession struct { - UploadID string `json:"upload_id"` - UserID int64 `json:"user_id"` - StorageID uint `json:"storage_id"` - DstDirPath string `json:"dst_dir_path"` - FileName string `json:"file_name"` - FileSize int64 `json:"file_size"` - ChunkSize int64 `json:"chunk_size"` - TotalChunks int `json:"total_chunks"` - ContentType string `json:"content_type"` - ChunkDir string `json:"chunk_dir"` + UploadID string 
`json:"upload_id"` + DstDirPath string `json:"dst_dir_path"` + FileName string `json:"file_name"` + FileSize int64 `json:"file_size"` + ChunkSize int64 `json:"chunk_size"` + TotalChunks int `json:"total_chunks"` + ContentType string `json:"content_type"` + ChunkDir string `json:"chunk_dir"` UploadedChunks map[int]ChunkInfo `json:"uploaded_chunks"` - CreatedAt time.Time `json:"created_at"` - ExpiresAt time.Time `json:"expires_at"` + Overwrite bool `json:"overwrite"` + CreatedAt time.Time `json:"created_at"` + ExpiresAt time.Time `json:"expires_at"` } // ChunkInfo represents information about an uploaded chunk type ChunkInfo struct { Index int `json:"index"` Size int64 `json:"size"` - MD5 string `json:"md5,omitempty"` UploadedAt time.Time `json:"uploaded_at"` } -// MultipartInitReq is the request body for initializing a multipart upload -type MultipartInitReq struct { - Path string `json:"path" form:"path"` - FileName string `json:"file_name" form:"file_name"` - FileSize int64 `json:"file_size" form:"file_size"` - ChunkSize int64 `json:"chunk_size" form:"chunk_size"` -} - -// MultipartInitResp is the response for initializing a multipart upload -type MultipartInitResp struct { - UploadID string `json:"upload_id"` - ChunkSize int64 `json:"chunk_size"` - TotalChunks int `json:"total_chunks"` -} - // ChunkUploadResp is the response for uploading a chunk type ChunkUploadResp struct { - ChunkIndex int `json:"chunk_index"` - UploadedChunks []int `json:"uploaded_chunks"` - UploadedBytes int64 `json:"uploaded_bytes"` -} - -// MultipartCompleteReq is the request body for completing a multipart upload -type MultipartCompleteReq struct { - UploadID string `json:"upload_id" form:"upload_id"` -} - -// MultipartCompleteResp is the response for completing a multipart upload -type MultipartCompleteResp struct { - Object *Obj `json:"object,omitempty"` - Task any `json:"task,omitempty"` + UploadID string `json:"upload_id"` + ChunkIndex int `json:"chunk_index"` + UploadedChunks []int `json:"uploaded_chunks"` + UploadedBytes int64 `json:"uploaded_bytes"` + TotalChunks int `json:"total_chunks"` } diff --git a/server/handles/multipart.go b/server/handles/multipart.go index a182e97c8..8da9674df 100644 --- a/server/handles/multipart.go +++ b/server/handles/multipart.go @@ -7,6 +7,7 @@ import ( "strconv" "github.com/OpenListTeam/OpenList/v4/internal/conf" + "github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/fs" "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/pkg/utils" @@ -17,25 +18,26 @@ import ( func FsMultipart(c *gin.Context) { action := c.Query("action") switch action { - case "init": - fsMultipartInit(c) case "upload": fsMultipartUpload(c) case "complete": fsMultipartComplete(c) default: - common.ErrorStrResp(c, "invalid action", 400) + common.ErrorStrResp(c, "invalid action, must be 'upload' or 'complete'", 400) } } -func fsMultipartInit(c *gin.Context) { - var req model.MultipartInitReq - if err := c.ShouldBind(&req); err != nil { - common.ErrorResp(c, err, 400) - return - } - - path, err := url.PathUnescape(req.Path) +func fsMultipartUpload(c *gin.Context) { + defer func() { + if n, _ := io.ReadFull(c.Request.Body, []byte{0}); n == 1 { + _, _ = utils.CopyWithBuffer(io.Discard, c.Request.Body) + } + _ = c.Request.Body.Close() + }() + + // Get File-Path header (already validated by FsUp middleware) + path := c.GetHeader("File-Path") + path, err := url.PathUnescape(path) if err != nil { common.ErrorResp(c, err, 400) return 
@@ -48,97 +50,129 @@ func fsMultipartInit(c *gin.Context) { return } - storage, err := fs.GetStorage(path, &fs.GetStoragesArgs{}) - if err != nil { - common.ErrorResp(c, err, 400) - return - } - if storage.Config().NoUpload { - common.ErrorStrResp(c, "Current storage doesn't support upload", 405) + dir, name := stdpath.Split(path) + if shouldIgnoreSystemFile(name) { + common.ErrorStrResp(c, errs.IgnoredSystemFile.Error(), 403) return } - dstDirPath, fileName := stdpath.Split(path) - mimetype := utils.GetMimeType(fileName) - - session, err := fs.MultipartSessionManager.InitMultipartUpload( - c.Request.Context(), - storage.GetStorage().ID, - dstDirPath, - fileName, - req.FileSize, - req.ChunkSize, - mimetype, - ) - if err != nil { - common.ErrorResp(c, err, 500) - return - } - - common.SuccessResp(c, model.MultipartInitResp{ - UploadID: session.UploadID, - ChunkSize: session.ChunkSize, - TotalChunks: session.TotalChunks, - }) -} - -func fsMultipartUpload(c *gin.Context) { + // Get upload ID (optional for first chunk) uploadID := c.GetHeader("X-Upload-Id") - if uploadID == "" { - common.ErrorStrResp(c, "missing X-Upload-Id header", 400) - return - } + // Get chunk index (required) chunkIndexStr := c.GetHeader("X-Chunk-Index") chunkIndex, err := strconv.Atoi(chunkIndexStr) if err != nil { - common.ErrorStrResp(c, "invalid X-Chunk-Index header", 400) + common.ErrorStrResp(c, "invalid or missing X-Chunk-Index header", 400) return } - chunkSizeStr := c.GetHeader("X-Chunk-Size") - chunkSize, err := strconv.ParseInt(chunkSizeStr, 10, 64) - if err != nil { - common.ErrorStrResp(c, "invalid X-Chunk-Size header", 400) + // Get chunk size from Content-Length + chunkSize := c.Request.ContentLength + if chunkSize <= 0 { + common.ErrorStrResp(c, "missing Content-Length header", 400) return } - md5 := c.GetHeader("X-Chunk-Md5") + // For first chunk (no uploadID), need file size and chunk size + var session *model.MultipartUploadSession + if uploadID == "" { + // First chunk - initialize session + fileSizeStr := c.GetHeader("X-File-Size") + fileSize, err := strconv.ParseInt(fileSizeStr, 10, 64) + if err != nil || fileSize <= 0 { + common.ErrorStrResp(c, "invalid or missing X-File-Size header for first chunk", 400) + return + } + + expectedChunkSizeStr := c.GetHeader("X-Chunk-Size") + expectedChunkSize, err := strconv.ParseInt(expectedChunkSizeStr, 10, 64) + if err != nil || expectedChunkSize <= 0 { + common.ErrorStrResp(c, "invalid or missing X-Chunk-Size header for first chunk", 400) + return + } + + overwrite := c.GetHeader("Overwrite") != "false" + + // Check if file exists when not overwriting + if !overwrite { + if res, _ := fs.Get(c.Request.Context(), path, &fs.GetArgs{NoLog: true}); res != nil { + common.ErrorStrResp(c, "file exists", 403) + return + } + } + + mimetype := c.GetHeader("Content-Type") + if mimetype == "" || mimetype == "application/octet-stream" { + mimetype = utils.GetMimeType(name) + } + + session, err = fs.MultipartSessionManager.InitOrGetSession( + "", + dir, + name, + fileSize, + expectedChunkSize, + mimetype, + overwrite, + ) + if err != nil { + common.ErrorResp(c, err, 500) + return + } + } else { + // Subsequent chunk - get existing session + session, err = fs.MultipartSessionManager.InitOrGetSession( + uploadID, + "", "", 0, 0, "", false, + ) + if err != nil { + common.ErrorResp(c, err, 400) + return + } + } + // Upload chunk resp, err := fs.MultipartSessionManager.UploadChunk( - c.Request.Context(), - uploadID, + session.UploadID, chunkIndex, chunkSize, c.Request.Body, - md5, 
) if err != nil { common.ErrorResp(c, err, 500) return } - io.ReadAll(c.Request.Body) - c.Request.Body.Close() - common.SuccessResp(c, resp) } func fsMultipartComplete(c *gin.Context) { - var req model.MultipartCompleteReq - if err := c.ShouldBind(&req); err != nil { - common.ErrorResp(c, err, 400) + uploadID := c.GetHeader("X-Upload-Id") + if uploadID == "" { + common.ErrorStrResp(c, "missing X-Upload-Id header", 400) return } - err := fs.MultipartSessionManager.CompleteMultipartUpload( + asTask := c.GetHeader("As-Task") == "true" + + t, err := fs.MultipartSessionManager.CompleteUpload( c.Request.Context(), - req.UploadID, + uploadID, + asTask, ) if err != nil { common.ErrorResp(c, err, 500) return } - common.SuccessResp(c, gin.H{"success": true}) + if t == nil { + common.SuccessResp(c, gin.H{"success": true}) + return + } + + common.SuccessResp(c, gin.H{ + "success": true, + "task": getTaskInfo(t), + }) } diff --git a/server/router.go b/server/router.go index d4f5442dd..afd483a69 100644 --- a/server/router.go +++ b/server/router.go @@ -219,7 +219,7 @@ func _fs(g *gin.RouterGroup) { // Direct upload (client-side upload to storage) g.POST("/get_direct_upload_info", middlewares.FsUp, handles.FsGetDirectUploadInfo) // Multipart upload - g.POST("/multipart", middlewares.FsUp, handles.FsMultipart) + g.PUT("/multipart", middlewares.FsUp, uploadLimiter, handles.FsMultipart) } func _task(g *gin.RouterGroup) { From b584465732faec02fe962f292cfa054d0e1dd496 Mon Sep 17 00:00:00 2001 From: DnsLin <49158572+dnslin@users.noreply.github.com> Date: Mon, 29 Dec 2025 16:24:45 +0800 Subject: [PATCH 3/6] Update server/handles/multipart.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: DnsLin <49158572+dnslin@users.noreply.github.com> --- server/handles/multipart.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/handles/multipart.go b/server/handles/multipart.go index 8da9674df..efc9f1003 100644 --- a/server/handles/multipart.go +++ b/server/handles/multipart.go @@ -97,7 +97,7 @@ func fsMultipartUpload(c *gin.Context) { // Check if file exists when not overwriting if !overwrite { if res, _ := fs.Get(c.Request.Context(), path, &fs.GetArgs{NoLog: true}); res != nil { - common.ErrorStrResp(c, "file exists", 403) + common.ErrorStrResp(c, "file already exists and overwrite is disabled", 403) return } } From 692de85813af31f7456a1a445b2cdd0019437dd1 Mon Sep 17 00:00:00 2001 From: DnsLin <49158572+dnslin@users.noreply.github.com> Date: Mon, 29 Dec 2025 16:25:04 +0800 Subject: [PATCH 4/6] Update internal/fs/multipart.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: DnsLin <49158572+dnslin@users.noreply.github.com> --- internal/fs/multipart.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/internal/fs/multipart.go b/internal/fs/multipart.go index 30a138fac..c504e585a 100644 --- a/internal/fs/multipart.go +++ b/internal/fs/multipart.go @@ -241,14 +241,8 @@ func (m *multipartSessionManager) buildResponse(session *model.MultipartUploadSe totalBytes += info.Size } - chunkIndex := -1 - if len(indices) > 0 { - chunkIndex = indices[len(indices)-1] - } - return &model.ChunkUploadResp{ UploadID: session.UploadID, - ChunkIndex: chunkIndex, UploadedChunks: indices, UploadedBytes: totalBytes, TotalChunks: session.TotalChunks, From 5900f2acc811dfc0f7e1d198d1cef0abf29d3d47 Mon Sep 17 00:00:00 2001 From: DnsLin <49158572+dnslin@users.noreply.github.com> Date: Mon, 29 Dec 2025 16:25:52 +0800 Subject: [PATCH 5/6] Update 
internal/fs/multipart.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: DnsLin <49158572+dnslin@users.noreply.github.com> --- internal/fs/multipart.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/fs/multipart.go b/internal/fs/multipart.go index c504e585a..80db8f5bb 100644 --- a/internal/fs/multipart.go +++ b/internal/fs/multipart.go @@ -162,12 +162,16 @@ func (m *multipartSessionManager) CompleteUpload( return nil, err } + // Protect access to session.UploadedChunks to avoid races with concurrent chunk uploads. + m.mu.RLock() if len(session.UploadedChunks) != session.TotalChunks { + m.mu.RUnlock() return nil, pkgerrors.Errorf("incomplete upload: %d/%d chunks uploaded", len(session.UploadedChunks), session.TotalChunks) } mergedReader, err := newChunkMergedReader(session) + m.mu.RUnlock() if err != nil { return nil, err } From a36849f546adb3381ea7a09a42a4ba7c3e8b9c30 Mon Sep 17 00:00:00 2001 From: DnsLin <49158572+dnslin@users.noreply.github.com> Date: Mon, 29 Dec 2025 16:26:21 +0800 Subject: [PATCH 6/6] Update internal/fs/multipart.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: DnsLin <49158572+dnslin@users.noreply.github.com> --- internal/fs/multipart.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/internal/fs/multipart.go b/internal/fs/multipart.go index 80db8f5bb..9c3d8013e 100644 --- a/internal/fs/multipart.go +++ b/internal/fs/multipart.go @@ -135,15 +135,26 @@ func (m *multipartSessionManager) UploadChunk( return nil, pkgerrors.Wrap(err, "failed to write chunk") } - if written != chunkSize { - os.Remove(chunkFilePath) - return nil, pkgerrors.Errorf("chunk size mismatch: expected %d, got %d", chunkSize, written) + isLastChunk := chunkIndex == session.TotalChunks-1 + if isLastChunk { + // For the last chunk, allow a smaller size (the file may not divide evenly by the chunk size), + // but still reject chunks that exceed the expected size. + if written > chunkSize { + os.Remove(chunkFilePath) + return nil, pkgerrors.Errorf("chunk size mismatch: expected at most %d, got %d", chunkSize, written) + } + } else { + // For non-final chunks, enforce strict equality with the expected chunk size. + if written != chunkSize { + os.Remove(chunkFilePath) + return nil, pkgerrors.Errorf("chunk size mismatch: expected %d, got %d", chunkSize, written) + } } m.mu.Lock() session.UploadedChunks[chunkIndex] = model.ChunkInfo{ Index: chunkIndex, - Size: chunkSize, + Size: written, UploadedAt: time.Now(), } m.mu.Unlock()
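
The flow the final handlers expose: PUT /multipart?action=upload for each chunk, then PUT /multipart?action=complete to merge. The first chunk request omits X-Upload-Id and creates the session from File-Path, X-File-Size, X-Chunk-Size, and its own Content-Length; every later request repeats File-Path and sends the X-Upload-Id returned in the first response. Below is a minimal client-side sketch of that flow in Go — the "/api/fs" mount prefix, the Authorization header, and the {code, message, data} response envelope are assumptions about the surrounding OpenList API rather than anything this series defines; the method, query parameter, and header names come straight from server/handles/multipart.go and server/router.go above.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"os"
	"strconv"
)

const chunkSize = 5 * 1024 * 1024 // mirrors the server's DefaultChunkSize

func uploadMultipart(base, token, remotePath, localPath string) error {
	data, err := os.ReadFile(localPath) // whole-file read keeps the sketch short
	if err != nil {
		return err
	}
	totalChunks := (len(data) + chunkSize - 1) / chunkSize
	uploadID := ""
	for i := 0; i < totalChunks; i++ {
		end := min((i+1)*chunkSize, len(data))
		body := bytes.NewReader(data[i*chunkSize : end]) // Content-Length is set from the reader
		req, err := http.NewRequest(http.MethodPut,
			base+"/api/fs/multipart?action=upload", body)
		if err != nil {
			return err
		}
		req.Header.Set("Authorization", token) // auth scheme assumed
		req.Header.Set("File-Path", url.PathEscape(remotePath))
		req.Header.Set("X-Chunk-Index", strconv.Itoa(i))
		if uploadID == "" {
			// First chunk: these headers create the session server-side.
			req.Header.Set("X-File-Size", strconv.Itoa(len(data)))
			req.Header.Set("X-Chunk-Size", strconv.Itoa(chunkSize))
		} else {
			req.Header.Set("X-Upload-Id", uploadID)
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		var out struct {
			Code    int    `json:"code"`
			Message string `json:"message"`
			Data    struct {
				UploadID string `json:"upload_id"`
			} `json:"data"` // envelope shape is an assumption
		}
		err = json.NewDecoder(res.Body).Decode(&out)
		res.Body.Close()
		if err != nil {
			return err
		}
		if out.Code != 200 {
			return fmt.Errorf("chunk %d rejected: %s", i, out.Message)
		}
		uploadID = out.Data.UploadID
	}
	// Completion merges the chunks and hands the stream to the storage driver.
	req, err := http.NewRequest(http.MethodPut,
		base+"/api/fs/multipart?action=complete", nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", token)
	req.Header.Set("File-Path", url.PathEscape(remotePath))
	req.Header.Set("X-Upload-Id", uploadID)
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("complete failed: %s", res.Status)
	}
	return nil
}

Because UploadChunk returns success for an index already present in UploadedChunks without rewriting anything, a client can resume an interrupted transfer by replaying only the indices missing from uploaded_chunks in its last acknowledged response, provided it finishes within the two-hour SessionMaxLifetime.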
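
A worked example of the chunk accounting the series settles on: totalChunks = (fileSize + chunkSize - 1) / chunkSize, so a 12 MiB file at the default 5 MiB chunk size yields three chunks of 5, 5, and 2 MiB. Each upload request is validated against its own Content-Length — written != chunkSize rejects non-final chunks — and after the last patch the final chunk (index totalChunks-1) only has to stay at or under the declared size, with the actual written count recorded in ChunkInfo.Size so uploaded_bytes reports real bytes on disk.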