Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplify the cron scheduling, remove blocking for first run to get courses #600

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions backend/src/models/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ const (
var AllDefaultProviderJobs = []JobType{GetCoursesJob, GetMilestonesJob, GetActivityJob}
var AllContentProviderJobs = []JobType{ScrapeKiwixJob, RetryVideoDownloadsJob, SyncVideoMetadataJob, PutVideoMetadataJob}

// IsVideoJob reports whether jt is one of the video-related job types:
// RetryVideoDownloadsJob, SyncVideoMetadataJob or PutVideoMetadataJob.
func (jt JobType) IsVideoJob() bool {
	return jt == RetryVideoDownloadsJob ||
		jt == SyncVideoMetadataJob ||
		jt == PutVideoMetadataJob
}

// IsLibraryJob reports whether jt is a library job. ScrapeKiwixJob is the
// only job type treated as one.
func (jt JobType) IsLibraryJob() bool {
	return jt == ScrapeKiwixJob
}

// PubName returns the job type's string value prefixed with "tasks.".
func (jt JobType) PubName() string {
	name := string(jt)
	// Both operands are strings, so Sprint joins them without a separator.
	return fmt.Sprint("tasks.", name)
}
Expand Down
35 changes: 0 additions & 35 deletions backend/tasks/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package main

import (
"UnlockEdv2/src/models"
"encoding/json"
"fmt"
"os"
"time"

"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"gorm.io/driver/postgres"
"gorm.io/gorm"
Expand Down Expand Up @@ -95,36 +93,3 @@ func (jr *JobRunner) handleCreateOCProviderTask(job *models.CronJob, providerId
}
return &task, nil
}

// fetchInitialProviderCourses runs task and blocks until a matching completion
// message is received on the "tasks.get_courses.completed" subject, or until
// WaitTime has elapsed.
//
// A completion message matches when its "job_id" is the string jobId and its
// "provider_platform_id", when the message carries one, is numerically equal
// to prov.ID. Messages that omit "provider_platform_id" are matched on
// "job_id" alone.
//
// It returns the subscription error if subscribing fails, a timeout error if
// no matching message arrives within WaitTime, and nil otherwise. The
// subscription is removed before returning.
func (jr *JobRunner) fetchInitialProviderCourses(prov *models.ProviderPlatform, jobId string, task *models.RunnableTask) error {
	// One slot of buffer plus a non-blocking send below means the subscription
	// callback can never block, even if a completion message arrives after the
	// timeout fired or more than one matching message is delivered.
	done := make(chan struct{}, 1)
	sub, err := jr.nats.Subscribe("tasks.get_courses.completed", func(msg *nats.Msg) {
		var completedParams map[string]any
		if err := json.Unmarshal(msg.Data, &completedParams); err != nil {
			log.Errorf("failed to unmarshal completion message: %v", err)
			return
		}
		if id, ok := completedParams["job_id"].(string); !ok || id != jobId {
			return
		}
		// encoding/json stores JSON numbers as float64 inside an interface
		// value, so comparing the raw map entry with the integer prov.ID
		// would never be equal. Compare numerically instead.
		if raw, present := completedParams["provider_platform_id"]; present {
			provID, ok := raw.(float64)
			if !ok || provID != float64(prov.ID) {
				return
			}
		}
		select {
		case done <- struct{}{}:
		default: // completion already signalled; nothing more to do
		}
	})
	if err != nil {
		log.Errorf("failed to subscribe to completion subject: %v", err)
		return err
	}
	defer func() {
		if err := sub.Unsubscribe(); err != nil {
			log.Errorf("failed to unsubscribe from completion subject: %v", err)
		}
	}()

	// Subscribe before running the task so the completion cannot be missed.
	jr.runTask(task)

	select {
	case <-done:
		log.Info("Course fetching job completed. Continuing with the rest of the jobs...")
		return nil
	case <-time.After(WaitTime):
		log.Error("Timeout waiting for course fetching job to complete")
		return fmt.Errorf("timeout waiting for course fetching job to complete")
	}
}
80 changes: 28 additions & 52 deletions backend/tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"UnlockEdv2/src/models"
"encoding/json"
"os"
"slices"

"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -52,32 +53,35 @@ func (jr *JobRunner) generateOpenContentProviderTasks() ([]models.RunnableTask,
log.Errorf("failed to fetch all open content providers: %v", err)
return nil, err
}
for idx := range providers {
for _, jobType := range models.AllContentProviderJobs {
switch jobType {
case models.ScrapeKiwixJob:
job := models.CronJob{Name: string(jobType)}
if providers[idx].Title == models.Kiwix {
task, err := jr.handleCreateOCProviderTask(&job, providers[idx].ID)
if err != nil {
log.Errorf("failed to create task: %v", err)
continue
}
otherTasks = append(otherTasks, *task)
}
case models.RetryVideoDownloadsJob, models.PutVideoMetadataJob, models.SyncVideoMetadataJob:
job := models.CronJob{Name: string(jobType)}
if providers[idx].Title == models.Youtube {
task, err := jr.handleCreateOCProviderTask(&job, providers[idx].ID)
if err != nil {
log.Errorf("failed to create task: %v", err)
continue
}
otherTasks = append(otherTasks, *task)
}
default:
vidProvider := slices.IndexFunc(providers, func(p models.OpenContentProvider) bool {
return p.Title == models.Youtube
})
libProvider := slices.IndexFunc(providers, func(p models.OpenContentProvider) bool {
return p.Title == models.Kiwix
})
for _, jobType := range models.AllContentProviderJobs {
if jobType.IsLibraryJob() {
if libProvider == -1 {
continue
}
job := models.CronJob{Name: string(jobType)}
task, err := jr.handleCreateOCProviderTask(&job, providers[libProvider].ID)
if err != nil {
log.Errorf("failed to create task: %v", err)
continue
}
otherTasks = append(otherTasks, *task)
} else if jobType.IsVideoJob() {
if vidProvider == -1 {
continue
}
job := models.CronJob{Name: string(jobType)}
task, err := jr.handleCreateOCProviderTask(&job, providers[vidProvider].ID)
if err != nil {
log.Errorf("failed to create task: %v", err)
continue
}
otherTasks = append(otherTasks, *task)
}
}
return otherTasks, nil
Expand All @@ -92,36 +96,8 @@ func (jr *JobRunner) generateProviderTasks() ([]models.RunnableTask, error) {
log.Infof("Found %d active providers", len(providers))
tasksToRun := make([]models.RunnableTask, 0)
for _, provider := range providers {
var courses []models.Course
if err := jr.db.Find(&courses, "provider_platform_id = ?", provider.ID).Error; err != nil {
log.Errorf("failed to fetch courses for provider %d: %v", provider.ID, err)
continue
}
firstRun := false
if len(courses) == 0 {
getCoursesJob, err := jr.createIfNotExists(models.GetCoursesJob)
if err != nil {
log.Errorf("failed to create GetCoursesJob: %v", err)
continue
}
getCoursesTask := models.RunnableTask{JobID: getCoursesJob.ID, ProviderPlatformID: &provider.ID, Status: models.StatusPending}
err = jr.intoProviderPlatformTask(getCoursesJob, provider.ID, &getCoursesTask)
if err != nil {
log.Errorf("failed to create GetCoursesTask: %v", err)
continue
}
firstRun = true
err = jr.fetchInitialProviderCourses(&provider, getCoursesJob.ID, &getCoursesTask)
if err != nil {
log.Errorf("failed to fetch initial provider courses: %v", err)
continue
}
}
provJobs := provider.GetDefaultCronJobs()
for _, jobType := range provJobs {
if jobType == models.GetCoursesJob && firstRun {
continue
}
created, err := jr.createIfNotExists(jobType)
if err != nil {
log.Errorf("failed to create job: %v", err)
Expand Down
34 changes: 10 additions & 24 deletions provider-middleware/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,8 @@ func (sh *ServiceHandler) handleCourses(ctx context.Context, msg *nats.Msg) {
params := *service.GetJobParams()
jobId := params["job_id"].(string)
providerPlatformId := int(params["provider_platform_id"].(float64))
err = service.ImportCourses(sh.db)
if err != nil {
sh.cleanupJob(contxt, providerPlatformId, jobId, false)
logger().Println("error importing kolibri courses", err)
return
}
finished := nats.NewMsg("tasks.get_courses.completed")
finished.Data = []byte(`{"job_id": "` + jobId + `"}`)
err = sh.nats.PublishMsg(finished)
if err != nil {
logger().Errorln("Failed to publish message to NATS")
}
sh.cleanupJob(contxt, providerPlatformId, jobId, true)
success := service.ImportCourses(sh.db) != nil
sh.cleanupJob(contxt, providerPlatformId, jobId, success)
}

func (sh *ServiceHandler) handleScrapeLibraries(ctx context.Context, msg *nats.Msg) {
Expand All @@ -92,14 +81,8 @@ func (sh *ServiceHandler) handleScrapeLibraries(ctx context.Context, msg *nats.M
}
jobId := body["job_id"].(string)
kiwixService := NewKiwixService(provider, &body)
err = kiwixService.ImportLibraries(contxt, sh.db)
if err != nil {
logger().Errorf("error importing libraries from msg %v", err)
sh.cleanupJob(contxt, int(provider.ID), jobId, false)
return
}

sh.cleanupJob(contxt, int(provider.ID), jobId, true)
success := kiwixService.ImportLibraries(contxt, sh.db) != nil
sh.cleanupJob(contxt, int(provider.ID), jobId, success)
}

/**
Expand Down Expand Up @@ -146,6 +129,7 @@ func (sh *ServiceHandler) handleMilestonesForCourseUser(ctx context.Context, msg
logger().Println("initiating GetMilestonesForCourseUser milestones")
params := *service.GetJobParams()
courses := extractArrayMap(params, "courses")
success := true
users := extractArrayMap(params, "user_mappings")
jobId := params["job_id"].(string)
lastRunStr := params["last_run"].(string)
Expand All @@ -164,12 +148,13 @@ func (sh *ServiceHandler) handleMilestonesForCourseUser(ctx context.Context, msg
err = service.ImportMilestones(course, users, sh.db, lastRun)
time.Sleep(TIMEOUT_WAIT * time.Second) // to avoid rate limiting with the provider
if err != nil {
success = false
logger().Errorf("Failed to retrieve milestones: %v", err)
continue
}
}
}
sh.cleanupJob(contxt, providerPlatformId, jobId, true)
sh.cleanupJob(contxt, providerPlatformId, jobId, success)
}

func (sh *ServiceHandler) handleAcitivityForCourse(ctx context.Context, msg *nats.Msg) {
Expand All @@ -180,6 +165,7 @@ func (sh *ServiceHandler) handleAcitivityForCourse(ctx context.Context, msg *nat
logger().WithFields(logrus.Fields{"error": err.Error()}).Error("Failed to initialize service")
return
}
success := true
params := *service.GetJobParams()
logger().Println("params for activity job: ", params)
courses := extractArrayMap(params, "courses")
Expand All @@ -193,13 +179,13 @@ func (sh *ServiceHandler) handleAcitivityForCourse(ctx context.Context, msg *nat
default:
err = service.ImportActivityForCourse(course, sh.db)
if err != nil {
sh.cleanupJob(contxt, providerPlatformId, jobId, false)
success = false
logger().Errorf("failed to get course activity: %v", err)
continue
}
}
}
sh.cleanupJob(contxt, providerPlatformId, jobId, true)
sh.cleanupJob(contxt, providerPlatformId, jobId, success)
}

func (sh *ServiceHandler) handleAddVideos(ctx context.Context, msg *nats.Msg) {
Expand Down
2 changes: 1 addition & 1 deletion provider-middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (sh *ServiceHandler) cleanupJob(ctx context.Context, provId int, jobId stri
log.Infof("job %s succeeded?: %v \n cleaning up task", jobId, success)
var task models.RunnableTask
if err := sh.db.WithContext(ctx).Model(models.RunnableTask{}).
Find(&task, "(provider_platform_id = ? AND job_id = ?) OR (open_content_provider_id = ? AND job_id = ?)", provId, jobId, provId, jobId).
First(&task, "(provider_platform_id = ? AND job_id = ?) OR (open_content_provider_id = ? AND job_id = ?)", provId, jobId, provId, jobId).
Error; err != nil {
log.Errorf("failed to fetch task: %v", err)
return
Expand Down
Loading