Task Scheduling#

goflow provides worker pools for dynamic task execution, a scheduler for time-based jobs, and pipelines for multi-stage data processing.

Worker Pool#

Worker pools manage a fixed number of goroutines that process submitted tasks.

import "github.com/vnykmshr/goflow/pkg/scheduling/workerpool"

// 5 workers, queue capacity of 100 tasks
pool, err := workerpool.NewSafe(5, 100)
if err != nil {
    log.Fatal(err)
}
defer func() { <-pool.Shutdown() }()

// Submit a task
pool.Submit(workerpool.TaskFunc(func(ctx context.Context) error {
    // Do work
    return nil
}))
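
For intuition about what a pool of this kind does internally, here is a minimal stdlib-only sketch of a fixed set of goroutines draining a bounded queue. It is not goflow's implementation; `miniPool`, `newMiniPool`, `submit`, `shutdown`, and `runBatch` are hypothetical names used only for illustration, and rejecting tasks when the queue is full is an assumption about one reasonable policy.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// task mirrors the func(ctx) error shape used in the snippet above.
type task func(ctx context.Context) error

// miniPool is a hypothetical illustration, not goflow's implementation.
type miniPool struct {
	queue chan task
	wg    sync.WaitGroup
}

var errQueueFull = errors.New("queue full")

// newMiniPool starts `workers` goroutines that drain a queue of `queueSize` slots.
func newMiniPool(workers, queueSize int) *miniPool {
	p := &miniPool{queue: make(chan task, queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for t := range p.queue {
				_ = t(context.Background()) // error handling omitted in this sketch
			}
		}()
	}
	return p
}

// submit enqueues without blocking and reports an error when the queue is full.
func (p *miniPool) submit(t task) error {
	select {
	case p.queue <- t:
		return nil
	default:
		return errQueueFull
	}
}

// shutdown stops accepting tasks, then waits for queued and in-flight tasks.
func (p *miniPool) shutdown() {
	close(p.queue)
	p.wg.Wait()
}

// runBatch submits n counting tasks and returns how many completed.
func runBatch(workers, queueSize, n int) int64 {
	p := newMiniPool(workers, queueSize)
	var done int64
	for i := 0; i < n; i++ {
		_ = p.submit(func(ctx context.Context) error {
			atomic.AddInt64(&done, 1)
			return nil
		})
	}
	p.shutdown()
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println("completed:", runBatch(5, 100, 50))
}
```

The worker count bounds concurrency, while the queue size bounds how much work can pile up before submissions are refused.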

Configuration#

pool, err := workerpool.NewWithConfigSafe(workerpool.Config{
    WorkerCount: 10,
    QueueSize:   1000,
    TaskTimeout: 30 * time.Second,
})

Parameter     Description
WorkerCount   Number of concurrent workers
QueueSize     Maximum pending tasks
TaskTimeout   Default timeout per task
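
A default per-task timeout is commonly enforced by deriving a deadline-bound context for each task. The sketch below shows that idea with the standard library only; whether goflow applies `TaskTimeout` in exactly this way is an assumption, and `runWithTimeout`, `slowTask`, and `timedOutMs` are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithTimeout is a hypothetical helper: it runs task with a context that
// is cancelled once timeout elapses and returns the task's error.
func runWithTimeout(timeout time.Duration, task func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return task(ctx)
}

// slowTask simulates work lasting d, but stops early if the context ends.
func slowTask(d time.Duration) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// timedOutMs reports whether a task lasting workMs hits a timeout of timeoutMs.
func timedOutMs(timeoutMs, workMs int) bool {
	err := runWithTimeout(
		time.Duration(timeoutMs)*time.Millisecond,
		slowTask(time.Duration(workMs)*time.Millisecond),
	)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("slow task timed out:", timedOutMs(20, 1000))
	fmt.Println("fast task timed out:", timedOutMs(1000, 5))
}
```

A timeout of this kind only takes effect if the task watches its context; a task that ignores `ctx` keeps running past the deadline.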

Graceful Shutdown#

The pool supports graceful shutdown, completing in-flight tasks:

// Returns a channel that closes when shutdown completes
done := pool.Shutdown()
<-done
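
Because shutdown is signalled through a channel, the caller can bound how long it waits by combining the channel with a timer in a `select`. The sketch below illustrates the pattern with the standard library; `shutdownAsync`, `waitOrTimeout`, and `demo` are hypothetical names, and the generic parameter is used because the element type of the channel returned by `pool.Shutdown()` is not stated here.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// shutdownAsync mimics the pattern described above: it returns a channel that
// is closed once every in-flight job tracked by wg has finished.
func shutdownAsync(wg *sync.WaitGroup) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	return done
}

// waitOrTimeout waits for done to close (or deliver a value), giving up after d.
// It reports true when shutdown completed in time.
func waitOrTimeout[T any](done <-chan T, d time.Duration) bool {
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// demo starts one job lasting jobMs and waits up to waitMs for shutdown.
func demo(jobMs, waitMs int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(jobMs) * time.Millisecond) // stand-in for real work
	}()
	return waitOrTimeout(shutdownAsync(&wg), time.Duration(waitMs)*time.Millisecond)
}

func main() {
	fmt.Println("finished in time:", demo(10, 1000))
	fmt.Println("finished in time:", demo(1000, 10))
}
```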

When to Use#

  • Background job processing
  • Parallel task execution
  • Bounded concurrency for resource protection

Scheduler#

The scheduler runs tasks at specified times using cron expressions, or repeatedly at fixed intervals.

import "github.com/vnykmshr/goflow/pkg/scheduling/scheduler"

sched := scheduler.New()
sched.Start()
defer sched.Stop()

// Schedule with cron expression
sched.ScheduleCron("cleanup", "0 2 * * *", func(ctx context.Context) error {
    // Runs at 2 AM daily
    return nil
})

// Schedule at fixed intervals
sched.ScheduleInterval("health-check", 30*time.Second, func(ctx context.Context) error {
    return nil
})
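
Fixed-interval scheduling can be pictured as a ticker loop that stops when its context ends. The following stdlib-only sketch shows that mechanism; it is not how goflow's scheduler is necessarily built, and `runEvery` and `countRuns` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery is a hypothetical helper: it calls job once per interval until ctx
// is done, then returns how many times the job ran.
func runEvery(ctx context.Context, interval time.Duration, job func(ctx context.Context) error) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	runs := 0
	for {
		select {
		case <-ctx.Done():
			return runs
		case <-ticker.C:
			_ = job(ctx) // error handling omitted in this sketch
			runs++
		}
	}
}

// countRuns runs a no-op job every intervalMs for totalMs and returns the run count.
func countRuns(intervalMs, totalMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(totalMs)*time.Millisecond)
	defer cancel()
	noop := func(ctx context.Context) error { return nil }
	return runEvery(ctx, time.Duration(intervalMs)*time.Millisecond, noop)
}

func main() {
	fmt.Println("runs:", countRuns(10, 200))
}
```

The exact run count depends on timing, so code that relies on it should treat the number as approximate.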

Cron Expressions#

Expressions use the standard cron format, optionally with a seconds field, or a predefined shortcut such as @daily:

Expression    Description
@hourly       Every hour
@daily        Every day at midnight
@weekly       Every Sunday at midnight
0 * * * *     Every hour
*/5 * * * *   Every 5 minutes
0 2 * * *     Daily at 2 AM
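
To show how the five-field expressions above are read (minute, hour, day of month, month, day of week), here is a deliberately small matcher. It is a sketch, not goflow's parser: it handles only `*`, `*/n`, and single numbers, rejects expressions that restrict both day fields, and ignores shortcuts, lists, ranges, and the seconds field. `fieldMatches`, `matches`, and `matchesAt` are hypothetical names.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// fieldMatches reports whether one cron field matches value.
// lowest is the smallest legal value of the field (0 for minutes, 1 for days).
func fieldMatches(field string, value, lowest int) (bool, error) {
	if field == "*" {
		return true, nil
	}
	if strings.HasPrefix(field, "*/") {
		step, err := strconv.Atoi(strings.TrimPrefix(field, "*/"))
		if err != nil || step <= 0 {
			return false, fmt.Errorf("bad step in %q", field)
		}
		return (value-lowest)%step == 0, nil
	}
	n, err := strconv.Atoi(field)
	if err != nil {
		return false, fmt.Errorf("unsupported field %q", field)
	}
	return n == value, nil
}

// matches reports whether a five-field expression matches the time t.
func matches(expr string, t time.Time) (bool, error) {
	fields := strings.Fields(expr)
	if len(fields) != 5 {
		return false, fmt.Errorf("want 5 fields, got %d", len(fields))
	}
	if fields[2] != "*" && fields[4] != "*" {
		return false, fmt.Errorf("restricting both day fields is not handled in this sketch")
	}
	values := []int{t.Minute(), t.Hour(), t.Day(), int(t.Month()), int(t.Weekday())}
	lowest := []int{0, 0, 1, 1, 0}
	for i, f := range fields {
		ok, err := fieldMatches(f, values[i], lowest[i])
		if err != nil || !ok {
			return false, err
		}
	}
	return true, nil
}

// matchesAt checks expr against a fixed date at the given hour and minute (UTC).
func matchesAt(expr string, hour, minute int) bool {
	ok, err := matches(expr, time.Date(2024, time.January, 1, hour, minute, 0, 0, time.UTC))
	return err == nil && ok
}

func main() {
	for _, expr := range []string{"0 2 * * *", "*/5 * * * *", "0 * * * *"} {
		fmt.Printf("%-12s matches 02:00: %v\n", expr, matchesAt(expr, 2, 0))
	}
}
```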

When to Use#

  • Scheduled maintenance tasks
  • Periodic data synchronization
  • Report generation
  • Health checks

Pipeline#

Pipelines process data through an ordered sequence of named stages, each receiving the previous stage's output.

import "github.com/vnykmshr/goflow/pkg/scheduling/pipeline"

p := pipeline.New()

// Add stages using function syntax
p.AddStageFunc("validate", func(ctx context.Context, input interface{}) (interface{}, error) {
    // Validate input
    return input, nil
})
p.AddStageFunc("process", func(ctx context.Context, input interface{}) (interface{}, error) {
    // Process data
    return input, nil
})
p.AddStageFunc("store", func(ctx context.Context, input interface{}) (interface{}, error) {
    // Store result
    return input, nil
})

// Execute pipeline
result, err := p.Execute(ctx, inputData)
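
The stage-by-stage flow can be sketched without the library as a loop that feeds each stage's output into the next and stops at the first error. This is an illustration under assumptions, not goflow's pipeline: the `stage` type, `runStages`, and `normalize` are hypothetical, and stopping on the first error is assumed behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// stage pairs a name with the function signature used in the snippet above.
type stage struct {
	name string
	fn   func(ctx context.Context, input interface{}) (interface{}, error)
}

// runStages feeds each stage's output into the next and stops at the first error.
func runStages(ctx context.Context, stages []stage, input interface{}) (interface{}, error) {
	current := input
	for _, s := range stages {
		// Give up between stages if the caller cancelled the context.
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		out, err := s.fn(ctx, current)
		if err != nil {
			return nil, fmt.Errorf("stage %q: %w", s.name, err)
		}
		current = out
	}
	return current, nil
}

// normalize runs a validate stage and a process stage over a string.
func normalize(s string) (string, error) {
	stages := []stage{
		{"validate", func(ctx context.Context, in interface{}) (interface{}, error) {
			str, ok := in.(string)
			if !ok || str == "" {
				return nil, errors.New("empty or non-string input")
			}
			return str, nil
		}},
		{"process", func(ctx context.Context, in interface{}) (interface{}, error) {
			return strings.ToUpper(strings.TrimSpace(in.(string))), nil
		}},
	}
	out, err := runStages(context.Background(), stages, s)
	if err != nil {
		return "", err
	}
	return out.(string), nil
}

func main() {
	fmt.Println(normalize("  hello "))
	fmt.Println(normalize(""))
}
```

Wrapping the error with the stage name makes it clear which step rejected the input.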

When to Use#

  • ETL workflows
  • Multi-stage data processing
  • Assembly-line style operations

Context Cancellation#

All scheduling components respect context cancellation:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

pool.SubmitWithContext(ctx, task)
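
Cancellation is cooperative in Go: a task only stops early if its body checks the context. The sketch below shows one common way to write such a task using the standard library alone; `processItems` and `runWithDeadline` are hypothetical helpers, not part of goflow.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// processItems is a hypothetical task body: it handles n items and checks for
// cancellation before each one. It returns how many items were processed.
func processItems(ctx context.Context, n int, perItem time.Duration) (int, error) {
	for i := 0; i < n; i++ {
		select {
		case <-ctx.Done():
			return i, ctx.Err()
		default:
		}
		time.Sleep(perItem) // stand-in for real work
	}
	return n, nil
}

// runWithDeadline processes n items of perItemMs each under a deadline of deadlineMs.
func runWithDeadline(deadlineMs, n, perItemMs int) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(deadlineMs)*time.Millisecond)
	defer cancel()
	return processItems(ctx, n, time.Duration(perItemMs)*time.Millisecond)
}

func main() {
	processed, err := runWithDeadline(50, 1000, 10)
	fmt.Println("processed:", processed, "err:", err)
}
```

Checking the context once per item keeps the overhead low while bounding how long the task can overrun its deadline to roughly one item's worth of work.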

API Reference#

See pkg.go.dev/github.com/vnykmshr/goflow/pkg/scheduling for complete API documentation.

Examples#