diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go index 40742992689..fcc7d1101e5 100644 --- a/backend/core/runner/run_task.go +++ b/backend/core/runner/run_task.go @@ -20,6 +20,7 @@ package runner import ( gocontext "context" "fmt" + "github.com/apache/incubator-devlake/core/models/common" "strings" "time" @@ -292,6 +293,7 @@ func RunPluginSubTasks( taskCtx.SetProgress(0, steps) subtaskNumber := 0 for _, subtaskMeta := range subtaskMetas { + subtaskNumber++ subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name) if err != nil { // sth went wrong @@ -302,7 +304,6 @@ func RunPluginSubTasks( continue } // run subtask - subtaskNumber++ if progress != nil { progress <- plugin.RunningProgress{ Type: plugin.SetCurrentSubTask, @@ -310,23 +311,23 @@ func RunPluginSubTasks( SubTaskNumber: subtaskNumber, } } - subtaskFinsied := false + subtaskFinished := false if !subtaskMeta.ForceRunOnResume { if task.ID > 0 { sfc := errors.Must1(basicRes.GetDal().Count( dal.From(&models.Subtask{}), dal.Where("task_id = ? AND name = ? AND finished_at IS NOT NULL", task.ID, subtaskMeta.Name), ), ) - subtaskFinsied = sfc > 0 + subtaskFinished = sfc > 0 } } - if subtaskFinsied { + if subtaskFinished { logger.Info("subtask %s already finished previously", subtaskMeta.Name) } else { logger.Info("executing subtask %s", subtaskMeta.Name) start := time.Now() err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint) - logger.Info("subtask %s finished in %s", subtaskMeta.Name, time.Since(start).Milliseconds()) + logger.Info("subtask %s finished in %d ms", subtaskMeta.Name, time.Since(start).Milliseconds()) if err != nil { err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta)) logger.Error(err, "") @@ -348,8 +349,9 @@ func RunPluginSubTasks( // UpdateProgressDetail FIXME ... func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDetail *models.TaskProgressDetail, p *plugin.RunningProgress) { - task := &models.Task{} - task.ID = taskId + task := &models.Task{ + Model: common.Model{ID: taskId}, + } subtask := &models.Subtask{} switch p.Type { case plugin.TaskSetProgress: @@ -367,17 +369,19 @@ func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDeta progressDetail.TotalRecords = p.Total case plugin.SubTaskIncProgress: progressDetail.FinishedRecords = p.Current + // update subtask progress + where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName) + err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ + {ColumnName: "finished_records", Value: progressDetail.FinishedRecords}, + }, where) + if err != nil { + basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress") + } case plugin.SetCurrentSubTask: progressDetail.SubTaskName = p.SubTaskName progressDetail.SubTaskNumber = p.SubTaskNumber - } - // update subtask progress - where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName) - err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ - {ColumnName: "finished_records", Value: progressDetail.FinishedRecords}, - }, where) - if err != nil { - basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress") + default: + return } } diff --git a/backend/impls/context/default_subtask_context.go b/backend/impls/context/default_subtask_context.go index 6f05af66cc2..518644c61ba 100644 --- a/backend/impls/context/default_subtask_context.go +++ b/backend/impls/context/default_subtask_context.go @@ -46,7 +46,7 @@ func (c *DefaultSubTaskContext) IncProgress(quantity int) { c.defaultExecContext.IncProgress(plugin.SubTaskIncProgress, quantity) if c.LastProgressTime.IsZero() || c.LastProgressTime.Add(3*time.Second).Before(time.Now()) || c.current%1000 == 0 { c.LastProgressTime = time.Now() - c.BasicRes.GetLogger().Info("finished records: %d", c.current) + c.BasicRes.GetLogger().Info("finished records: %d(not exactly)", c.current) } else { c.BasicRes.GetLogger().Debug("finished records: %d", c.current) } diff --git a/backend/server/services/task_runner.go b/backend/server/services/task_runner.go index 0ad643bf38a..e9706c6e962 100644 --- a/backend/server/services/task_runner.go +++ b/backend/server/services/task_runner.go @@ -109,7 +109,8 @@ func runTaskStandalone(parentLog log.Logger, taskId uint64) errors.Error { } // now , create a progress update channel and kick off progress := make(chan plugin.RunningProgress, 100) - go updateTaskProgress(taskId, progress) + doneSignal := make(chan struct{}) + go updateTaskProgress(doneSignal, taskId, progress) err = runner.RunTask( ctx, basicRes.ReplaceLogger(parentLog), @@ -117,6 +118,8 @@ func runTaskStandalone(parentLog log.Logger, taskId uint64) errors.Error { taskId, ) close(progress) + // wait all progresses are handled + <-doneSignal return err } @@ -127,15 +130,21 @@ func getRunningTaskById(taskId uint64) *RunningTaskData { return runningTasks.tasks[taskId] } -func updateTaskProgress(taskId uint64, progress chan plugin.RunningProgress) { +func updateTaskProgress(done chan struct{}, taskId uint64, progress chan plugin.RunningProgress) { data := getRunningTaskById(taskId) if data == nil { return } progressDetail := data.ProgressDetail - for p := range progress { - runningTasks.mu.Lock() - runner.UpdateProgressDetail(basicRes, taskId, progressDetail, &p) - runningTasks.mu.Unlock() + for { + p, hasMore := <-progress + if hasMore { + runningTasks.mu.Lock() + runner.UpdateProgressDetail(basicRes, taskId, progressDetail, &p) + runningTasks.mu.Unlock() + } else { + done <- struct{}{} + break + } } }