Skip to content

Commit

Permalink
feat: refactor task module
Browse files Browse the repository at this point in the history
  • Loading branch information
xhofe committed Nov 20, 2023
1 parent de9647a commit 11a30c5
Show file tree
Hide file tree
Showing 14 changed files with 405 additions and 377 deletions.
3 changes: 3 additions & 0 deletions go.mod
Expand Up @@ -123,6 +123,7 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.3.0 // indirect
github.com/jaevor/go-nanoid v1.3.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down Expand Up @@ -183,6 +184,8 @@ require (
github.com/u2takey/go-utils v0.3.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25 // indirect
github.com/xhofe/tache v0.0.0-20231120085916-722855be0521 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.etcd.io/bbolt v1.3.7 // indirect
golang.org/x/arch v0.3.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Expand Up @@ -221,6 +221,8 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZ
github.com/jackc/pgx/v5 v5.3.0 h1:/NQi8KHMpKWHInxXesC8yD4DhkXPrVhmnwYkjp9AmBA=
github.com/jackc/pgx/v5 v5.3.0/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8=
github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jaevor/go-nanoid v1.3.0 h1:nD+iepesZS6pr3uOVf20vR9GdGgJW1HPaR46gtrxzkg=
github.com/jaevor/go-nanoid v1.3.0/go.mod h1:SI+jFaPuddYkqkVQoNGHs81navCtH388TcrH0RqFKgY=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
Expand Down Expand Up @@ -433,6 +435,16 @@ github.com/winfsp/cgofuse v1.5.1-0.20230130140708-f87f5db493b5 h1:jxZvjx8Ve5sOXo
github.com/winfsp/cgofuse v1.5.1-0.20230130140708-f87f5db493b5/go.mod h1:uxjoF2jEYT3+x+vC2KJddEGdk/LU8pRowXmyVMHSV5I=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25 h1:eDfebW/yfq9DtG9RO3KP7BT2dot2CvJGIvrB0NEoDXI=
github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25/go.mod h1:fH4oNm5F9NfI5dLi0oIMtsLNKQOirUDbEMCIBb/7SU0=
github.com/xhofe/tache v0.0.0-20231110075853-2bd4b52dad9b h1:958N/31ioR0QSg6RarX1aqBsfmlOI2JeYiVzxeGdUAA=
github.com/xhofe/tache v0.0.0-20231110075853-2bd4b52dad9b/go.mod h1:1ISbKrHZNMMrXvgCdaFV0Vkc9Wbo7WV1q7Teovm4Huc=
github.com/xhofe/tache v0.0.0-20231119124711-c417893fc267 h1:MC271sH8UHYqr/IDz9PsqTlyD51HyFvxtQRTemwxR9s=
github.com/xhofe/tache v0.0.0-20231119124711-c417893fc267/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ=
github.com/xhofe/tache v0.0.0-20231120064353-a3585a237e25 h1:XZBuEzDB9Kqni/+zAKxl30iOdp80/GavUsCkPMiQMjg=
github.com/xhofe/tache v0.0.0-20231120064353-a3585a237e25/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ=
github.com/xhofe/tache v0.0.0-20231120085916-722855be0521 h1:m7O+xOqQRysjFngMhQ39RzCFdiCouFLvsrV7N2ScbUY=
github.com/xhofe/tache v0.0.0-20231120085916-722855be0521/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
Expand Down
92 changes: 50 additions & 42 deletions internal/fs/copy.go
Expand Up @@ -3,24 +3,39 @@ package fs
import (
"context"
"fmt"
"net/http"
stdpath "path"
"sync/atomic"

"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/stream"
"github.com/alist-org/alist/v3/pkg/task"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/xhofe/tache"
"net/http"
stdpath "path"
)

var CopyTaskManager = task.NewTaskManager(3, func(tid *uint64) {
atomic.AddUint64(tid, 1)
})
type CopyTask struct {
tache.Base
Status string `json:"status"`
srcStorage, dstStorage driver.Driver
srcObjPath, dstDirPath string
}

func (t *CopyTask) GetName() string {
return fmt.Sprintf("copy [%s](%s) to [%s](%s)",
t.srcStorage.GetStorage().MountPath, t.srcObjPath, t.dstStorage.GetStorage().MountPath, t.dstDirPath)
}

func (t *CopyTask) GetStatus() string {
return t.Status
}

func (t *CopyTask) Run() error {
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.srcObjPath, t.dstDirPath)
}

var CopyTaskManager = tache.NewManager[*CopyTask]()

// Copy if in the same storage, call move method
// if not, add copy task
Expand Down Expand Up @@ -63,72 +78,65 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool
}
}
// not in the same storage
CopyTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{
Name: fmt.Sprintf("copy [%s](%s) to [%s](%s)", srcStorage.GetStorage().MountPath, srcObjActualPath, dstStorage.GetStorage().MountPath, dstDirActualPath),
Func: func(task *task.Task[uint64]) error {
return copyBetween2Storages(task, srcStorage, dstStorage, srcObjActualPath, dstDirActualPath)
},
}))
CopyTaskManager.Add(&CopyTask{
srcStorage: srcStorage,
dstStorage: dstStorage,
srcObjPath: srcObjActualPath,
dstDirPath: dstDirActualPath,
})
return true, nil
}

func copyBetween2Storages(t *task.Task[uint64], srcStorage, dstStorage driver.Driver, srcObjPath, dstDirPath string) error {
t.SetStatus("getting src object")
srcObj, err := op.Get(t.Ctx, srcStorage, srcObjPath)
func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, srcObjPath, dstDirPath string) error {
t.Status = "getting src object"
srcObj, err := op.Get(t.Ctx(), srcStorage, srcObjPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcObjPath)
}
if srcObj.IsDir() {
t.SetStatus("src object is dir, listing objs")
objs, err := op.List(t.Ctx, srcStorage, srcObjPath, model.ListArgs{})
t.Status = "src object is dir, listing objs"
objs, err := op.List(t.Ctx(), srcStorage, srcObjPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs", srcObjPath)
}
for _, obj := range objs {
if utils.IsCanceled(t.Ctx) {
if utils.IsCanceled(t.Ctx()) {
return nil
}
srcObjPath := stdpath.Join(srcObjPath, obj.GetName())
dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName())
CopyTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{
Name: fmt.Sprintf("copy [%s](%s) to [%s](%s)", srcStorage.GetStorage().MountPath, srcObjPath, dstStorage.GetStorage().MountPath, dstObjPath),
Func: func(t *task.Task[uint64]) error {
return copyBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstObjPath)
},
}))
CopyTaskManager.Add(&CopyTask{
srcStorage: srcStorage,
dstStorage: dstStorage,
srcObjPath: srcObjPath,
dstDirPath: dstObjPath,
})
}
} else {
CopyTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{
Name: fmt.Sprintf("copy [%s](%s) to [%s](%s)", srcStorage.GetStorage().MountPath, srcObjPath, dstStorage.GetStorage().MountPath, dstDirPath),
Func: func(t *task.Task[uint64]) error {
err := copyFileBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstDirPath)
log.Debugf("copy file between storages: %+v", err)
return err
},
}))
t.Status = "src object is dir, added all copy tasks of objs"
return nil
}
return nil
return copyFileBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstDirPath)
}

func copyFileBetween2Storages(tsk *task.Task[uint64], srcStorage, dstStorage driver.Driver, srcFilePath, dstDirPath string) error {
srcFile, err := op.Get(tsk.Ctx, srcStorage, srcFilePath)
func copyFileBetween2Storages(tsk *CopyTask, srcStorage, dstStorage driver.Driver, srcFilePath, dstDirPath string) error {
srcFile, err := op.Get(tsk.Ctx(), srcStorage, srcFilePath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath)
}
link, _, err := op.Link(tsk.Ctx, srcStorage, srcFilePath, model.LinkArgs{
link, _, err := op.Link(tsk.Ctx(), srcStorage, srcFilePath, model.LinkArgs{
Header: http.Header{},
})
if err != nil {
return errors.WithMessagef(err, "failed get [%s] link", srcFilePath)
}
fs := stream.FileStream{
Obj: srcFile,
Ctx: tsk.Ctx,
Ctx: tsk.Ctx(),
}
// any link provided is seekable
ss, err := stream.NewSeekableStream(fs, link)
if err != nil {
return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath)
}
return op.Put(tsk.Ctx, dstStorage, dstDirPath, ss, tsk.SetProgress, true)
return op.Put(tsk.Ctx(), dstStorage, dstDirPath, ss, tsk.SetProgress, true)
}
41 changes: 28 additions & 13 deletions internal/fs/put.go
Expand Up @@ -3,18 +3,34 @@ package fs
import (
"context"
"fmt"
"github.com/alist-org/alist/v3/internal/model"
"sync/atomic"

"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/task"
"github.com/pkg/errors"
"github.com/xhofe/tache"
)

var UploadTaskManager = task.NewTaskManager(3, func(tid *uint64) {
atomic.AddUint64(tid, 1)
})
type UploadTask struct {
tache.Base
storage driver.Driver
dstDirActualPath string
file model.FileStreamer
}

func (t *UploadTask) GetName() string {
return fmt.Sprintf("upload %s to [%s](%s)", t.file.GetName(), t.storage.GetStorage().MountPath, t.dstDirActualPath)
}

func (t *UploadTask) GetStatus() string {
return "uploading"
}

func (t *UploadTask) Run() error {
return op.Put(t.Ctx(), t.storage, t.dstDirActualPath, t.file, t.SetProgress, true)
}

var UploadTaskManager = tache.NewManager[*UploadTask]()

// putAsTask add as a put task and return immediately
func putAsTask(dstDirPath string, file model.FileStreamer) error {
Expand All @@ -33,12 +49,11 @@ func putAsTask(dstDirPath string, file model.FileStreamer) error {
//file.SetReader(tempFile)
//file.SetTmpFile(tempFile)
}
UploadTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{
Name: fmt.Sprintf("upload %s to [%s](%s)", file.GetName(), storage.GetStorage().MountPath, dstDirActualPath),
Func: func(task *task.Task[uint64]) error {
return op.Put(task.Ctx, storage, dstDirActualPath, file, task.SetProgress, true)
},
}))
UploadTaskManager.Add(&UploadTask{
storage: storage,
dstDirActualPath: dstDirActualPath,
file: file,
})
return nil
}

Expand Down
42 changes: 15 additions & 27 deletions internal/offline_download/aria2/aria2.go
Expand Up @@ -21,6 +21,10 @@ type Aria2 struct {
client rpc.Client
}

func (a *Aria2) Name() string {
return "aria2"
}

func (a *Aria2) Items() []model.SettingItem {
// aria2 settings
return []model.SettingItem{
Expand Down Expand Up @@ -58,16 +62,17 @@ func (a *Aria2) AddURL(args *tool.AddUrlArgs) (string, error) {
if err != nil {
return "", err
}
notify.Signals.Store(gid, args.Signal)
return gid, nil
}

func (a *Aria2) Remove(tid string) error {
_, err := a.client.Remove(tid)
func (a *Aria2) Remove(task *tool.DownloadTask) error {
_, err := a.client.Remove(task.GID)
return err
}

func (a *Aria2) Status(tid string) (*tool.Status, error) {
info, err := a.client.TellStatus(tid)
func (a *Aria2) Status(task *tool.DownloadTask) (*tool.Status, error) {
info, err := a.client.TellStatus(task.GID)
if err != nil {
return nil, err
}
Expand All @@ -85,15 +90,15 @@ func (a *Aria2) Status(tid string) (*tool.Status, error) {
}
s.Progress = float64(downloaded) / float64(total) * 100
if len(info.FollowedBy) != 0 {
s.NewTID = info.FollowedBy[0]
notify.Signals.Delete(tid)
//notify.Signals.Store(gid, m.c)
s.NewGID = info.FollowedBy[0]
notify.Signals.Delete(task.GID)
notify.Signals.Store(s.NewGID, task.Signal)
}
switch info.Status {
case "complete":
s.Completed = true
case "error":
s.Err = errors.Errorf("failed to download %s, error: %s", tid, info.ErrorMessage)
s.Err = errors.Errorf("failed to download %s, error: %s", task.GID, info.ErrorMessage)
case "active":
s.Status = "aria2: " + info.Status
if info.Seeder == "true" {
Expand All @@ -102,32 +107,15 @@ func (a *Aria2) Status(tid string) (*tool.Status, error) {
case "waiting", "paused":
s.Status = "aria2: " + info.Status
case "removed":
s.Err = errors.Errorf("failed to download %s, removed", tid)
s.Err = errors.Errorf("failed to download %s, removed", task.GID)
default:
return nil, errors.Errorf("[aria2] unknown status %s", info.Status)
}
return s, nil
}

func (a *Aria2) GetFiles(tid string) []tool.File {
//files, err := a.client.GetFiles(tid)
//if err != nil {
// return nil
//}
//return utils.MustSliceConvert(files, func(f rpc.FileInfo) tool.File {
// return tool.File{
// //ReadCloser: nil,
// Name: path.Base(f.Path),
// Size: f.Length,
// Path: "",
// Modified: time.Time{},
// }
//})
return nil
}

var _ tool.Tool = (*Aria2)(nil)

func init() {
tool.Tools.Add("aria2", &Aria2{})
tool.Tools.Add(&Aria2{})
}
22 changes: 11 additions & 11 deletions internal/offline_download/qbit/qbit.go
Expand Up @@ -13,6 +13,10 @@ type QBittorrent struct {
client qbittorrent.Client
}

func (a *QBittorrent) Name() string {
return "qBittorrent"
}

func (a *QBittorrent) Items() []model.SettingItem {
// qBittorrent settings
return []model.SettingItem{
Expand Down Expand Up @@ -44,13 +48,13 @@ func (a *QBittorrent) AddURL(args *tool.AddUrlArgs) (string, error) {
return args.UID, nil
}

func (a *QBittorrent) Remove(tid string) error {
err := a.client.Delete(tid, true)
func (a *QBittorrent) Remove(task *tool.DownloadTask) error {
err := a.client.Delete(task.GID, true)
return err
}

func (a *QBittorrent) Status(tid string) (*tool.Status, error) {
info, err := a.client.GetInfo(tid)
func (a *QBittorrent) Status(task *tool.DownloadTask) (*tool.Status, error) {
info, err := a.client.GetInfo(task.GID)
if err != nil {
return nil, err
}
Expand All @@ -62,19 +66,15 @@ func (a *QBittorrent) Status(tid string) (*tool.Status, error) {
case qbittorrent.ALLOCATING, qbittorrent.DOWNLOADING, qbittorrent.METADL, qbittorrent.PAUSEDDL, qbittorrent.QUEUEDDL, qbittorrent.STALLEDDL, qbittorrent.CHECKINGDL, qbittorrent.FORCEDDL, qbittorrent.CHECKINGRESUMEDATA, qbittorrent.MOVING:
s.Status = "[qBittorrent] downloading"
case qbittorrent.ERROR, qbittorrent.MISSINGFILES, qbittorrent.UNKNOWN:
s.Err = errors.Errorf("[qBittorrent] failed to download %s, error: %s", tid, info.State)
s.Err = errors.Errorf("[qBittorrent] failed to download %s, error: %s", task.GID, info.State)
default:
s.Err = errors.Errorf("[qBittorrent] unknown error occurred downloading %s", tid)
s.Err = errors.Errorf("[qBittorrent] unknown error occurred downloading %s", task.GID)
}
return s, nil
}

func (a *QBittorrent) GetFiles(tid string) []tool.File {
return nil
}

var _ tool.Tool = (*QBittorrent)(nil)

func init() {
tool.Tools.Add("qBittorrent", &QBittorrent{})
tool.Tools.Add(&QBittorrent{})
}

0 comments on commit 11a30c5

Please sign in to comment.