Gin+robfig/cron/v3 實(shí)現(xiàn)任務(wù)調(diào)度系統(tǒng):定時(shí)、周期、立即執(zhí)行、重試與恢復(fù)全支持!
最近在開(kāi)發(fā)自動(dòng)化平臺(tái)、漏洞工單系統(tǒng)、監(jiān)控平臺(tái)等后臺(tái)服務(wù)時(shí),我們經(jīng)常需要任務(wù)調(diào)度系統(tǒng),來(lái)定時(shí)執(zhí)行某些邏輯,比如周期性數(shù)據(jù)同步、定時(shí)告警、定時(shí)下發(fā)任務(wù)等。

本文將手把手教你如何使用Go語(yǔ)言的Gin 框架+robfig/cron/v3實(shí)現(xiàn)一個(gè)自定義任務(wù)調(diào)度系統(tǒng),支持以下核心功能:
- 任務(wù)定時(shí)執(zhí)行
 - 周期執(zhí)行(Cron 表達(dá)式)
 - 立即執(zhí)行任務(wù)
 - 暫停/恢復(fù)任務(wù)
 - 重試任務(wù)
 
技術(shù)棧
- Gin:HTTP 接口服務(wù)框架
 - robfig/cron v3:任務(wù)調(diào)度庫(kù),支持 Cron 表達(dá)式解析
 - 原生 Goroutine 和上下文管理:控制任務(wù)生命周期
 
項(xiàng)目結(jié)構(gòu)
做一個(gè)簡(jiǎn)單的demo項(xiàng)目,所以項(xiàng)目目錄結(jié)構(gòu)比較簡(jiǎn)單,具體如下所示:
?  robfig_cron_gin tree .
.
├── go.mod
├── go.sum
├── handler
│   └── task_handler.go
├── main.go
├── router
│   └── router.go
└── scheduler
    ├── scheduler.go
    └── task.go
4 directories, 7 files初始化項(xiàng)目,并添加依賴(lài),具體如下所示:
go get github.com/robfig/cron/v3
go get github.com/gin-gonic/gin原理簡(jiǎn)析
(1) robfig/cron工作機(jī)制
cron/v3是Go最成熟的任務(wù)調(diào)度庫(kù)之一,支持標(biāo)準(zhǔn)Cron表達(dá)式(包括秒級(jí)),本質(zhì)上維護(hù)了一個(gè)調(diào)度器,通過(guò) AddFunc() 添加任務(wù)后,每到設(shè)定的時(shí)間點(diǎn)自動(dòng)執(zhí)行。
(2) 自定義調(diào)度器
我們封裝了一個(gè) Scheduler:
- 內(nèi)部持有 *cron.Cron
 - 每個(gè)任務(wù)用 map[string]EntryID 管理,支持增刪查
 - 支持RunNow() 手動(dòng)觸發(fā)
 - 支持暫停(remove)和恢復(fù)(重新 add)
 
代碼實(shí)現(xiàn)
(1) 定義任務(wù)結(jié)構(gòu)體scheduler/task.go
package scheduler
import "context"
type Task struct {
	ID       string
	Name     string
	Schedule string
	JobFunc  func(ctx context.Context)
	Retry    int
}(2) 調(diào)度器核心邏輯scheduler/scheduler.go
package scheduler
import (
	"context"
	"sync"
	"github.com/robfig/cron/v3"
)
type Scheduler struct {
	c      *cron.Cron
	mu     sync.Mutex
	tasks  map[string]cron.EntryID
	funcs  map[string]*Task
	status map[string]string
}
func NewScheduler() *Scheduler {
	return &Scheduler{
		c:      cron.New(cron.WithSeconds()),
		tasks:  make(map[string]cron.EntryID),
		funcs:  make(map[string]*Task),
		status: make(map[string]string),
	}
}
func (s *Scheduler) Start() {
	s.c.Start()
}
func (s *Scheduler) Stop() {
	s.c.Stop()
}
func (s *Scheduler) AddTask(t *Task) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	id, err := s.c.AddFunc(t.Schedule, func() {
		ctx := context.Background()
		t.JobFunc(ctx)
	})
	if err != nil {
		return err
	}
	s.tasks[t.ID] = id
	s.funcs[t.ID] = t
	s.status[t.ID] = "running"
	return nil
}
func (s *Scheduler) RunNow(taskID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if t, ok := s.funcs[taskID]; ok {
		go t.JobFunc(context.Background())
	}
}
func (s *Scheduler) PauseTask(taskID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if id, ok := s.tasks[taskID]; ok {
		s.c.Remove(id)
		s.status[taskID] = "paused"
	}
}
func (s *Scheduler) ResumeTask(taskID string) error {
	if t, ok := s.funcs[taskID]; ok {
		return s.AddTask(t)
	}
	return nil
}
func (s *Scheduler) RetryTask(taskID string) {
	s.RunNow(taskID) // 簡(jiǎn)單實(shí)現(xiàn),等同于立即執(zhí)行
}
func (s *Scheduler) Status(taskID string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.status[taskID]
}(3) 接口實(shí)現(xiàn)邏輯handler/task_handler.go
package handler
import (
	"context"
	"net/http"
	"robfig_cron_gin/scheduler"
	"github.com/gin-gonic/gin"
)
var sched *scheduler.Scheduler
func Init(s *scheduler.Scheduler) {
	sched = s
}
func AddTask(c *gin.Context) {
	var req struct {
		ID       string `json:"id"`
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}
	if err := c.BindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}
	task := &scheduler.Task{
		ID:       req.ID,
		Name:     req.Name,
		Schedule: req.Schedule,
		JobFunc: func(ctx context.Context) {
			// 模擬任務(wù)執(zhí)行邏輯
			println("Task executed:", req.ID)
		},
	}
	err := sched.AddTask(task)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "task added"})
}
func RunNow(c *gin.Context) {
	id := c.Param("id")
	sched.RunNow(id)
	c.JSON(http.StatusOK, gin.H{"message": "task executed now"})
}
func PauseTask(c *gin.Context) {
	id := c.Param("id")
	sched.PauseTask(id)
	c.JSON(http.StatusOK, gin.H{"message": "task paused"})
}
func ResumeTask(c *gin.Context) {
	id := c.Param("id")
	err := sched.ResumeTask(id)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "task resumed"})
}(4) Gin 路由配置router/router.go
package router
import (
	"robfig_cron_gin/handler"
	"github.com/gin-gonic/gin"
)
func SetupRouter() *gin.Engine {
	r := gin.Default()
	r.POST("/task", handler.AddTask)
	r.POST("/task/:id/run", handler.RunNow)
	r.POST("/task/:id/pause", handler.PauseTask)
	r.POST("/task/:id/resume", handler.ResumeTask)
	return r
}(5) 服務(wù)入口main.go
package main
import (
	"robfig_cron_gin/handler"
	"robfig_cron_gin/router"
	"robfig_cron_gin/scheduler"
)
func main() {
	s := scheduler.NewScheduler()
	s.Start()
	defer s.Stop()
	handler.Init(s)
	r := router.SetupRouter()
	r.Run(":9311")
}運(yùn)行并測(cè)試項(xiàng)目
運(yùn)行程序命令如下所示:
go run .測(cè)試命令如下所示:
?  ~ curl -X POST http://localhost:9311/task \
  -H 'Content-Type: application/json' \
  -d '{"id":"task1", "name":"MyTask", "schedule":"*/10 * * * * *"}'
{"message":"task added"}%
?  ~ curl -X POST http://localhost:9311/task/task1/pause
{"message":"task paused"}%
?  ~ curl -X POST http://localhost:9311/task/task1/run
{"message":"task executed now"}%
?  ~ curl -X POST http://localhost:9311/task/task1/pause
{"message":"task paused"}%
?  ~ curl -X POST http://localhost:9311/task/task1/resume
{"message":"task resumed"}%效果如下圖所示:

后續(xù)可擴(kuò)展方向
- 支持任務(wù)持久化到數(shù)據(jù)庫(kù)(MySQL/Postgres)
 - 支持失敗重試策略(使用backoff、retry)
 - 支持任務(wù)執(zhí)行日志持久化 + WebSocket實(shí)時(shí)推送
 - 支持多實(shí)例集群調(diào)度,分布式鎖控制任務(wù)唯一執(zhí)行
 
總結(jié)
通過(guò)Gin + robfig/cron,我們實(shí)現(xiàn)了一個(gè)輕量、功能靈活的任務(wù)調(diào)度系統(tǒng)。結(jié)構(gòu)清晰、接口豐富,非常適合內(nèi)嵌進(jìn)后端服務(wù)系統(tǒng)中,如定時(shí)同步平臺(tái)、自動(dòng)化任務(wù)管理系統(tǒng)、CI/CD執(zhí)行器等。















 
 
 











 
 
 
 