io.scheduler task scheduling and simulation

Author: Chris Zheng  (z@caudate.me)
Date: 27 November 2018
Repository: https://github.com/zcaudate/hara
Version: 3.0.2

1    Introduction

hara.io.scheduler provides an easy and intuitive interface for task scheduling. Much emphasis has been placed on task management, including the ability to inspect and kill active tasks. Furthermore, testing time is reduced as the library includes a simulation framework to enable speed-up of the original schedule.

1.1    Installation

Add to project.clj dependencies:

[hara/io.scheduler "3.0.2"]

All functions are in the hara.io.scheduler namespace.

 (use (quote hara.io.scheduler))

2    Scheduler



restart! ^

restarts the scheduler after a forced shutdown

v 3.0
(defn restart!
  "Forces `scheduler` down with `shutdown!` and then brings it back up with
   `start!`. The value returned is whatever `start!` returns."
  [scheduler]
  ;; `doto` runs `shutdown!` for its side effect and hands the original
  ;; scheduler (not the result of `shutdown!`) on to `start!`.
  (start! (doto scheduler shutdown!)))
link
(restart! sch) ;; All Threads will stop and restart

running? ^

checks to see if the scheduler is running

v 3.0
(defn running?
  "Reports whether the scheduler is running, as decided by
   `component/started?` applied to its `:clock` entry."
  [scheduler]
  (-> scheduler :clock component/started?))
link
(running? sch) => false

scheduler ^

creates a scheduler from handlers, or from both handlers and config

v 3.0
(defn scheduler
  "Builds a scheduler as a `component/system` tagged \"scheduler\".

   Arguments:
     handlers - passed to `array/seed-fn` to construct the `:array` entry
     config   - merged into the `:array` entry of `global` (defaults to {})
     global   - system-wide settings (defaults to {}); combined with
                `*defaults*` through `nested/merge-new-nested`

   The system has five entries: `:array` (depends on `:cache`, `:registry`
   and `:ticker`), `:clock` (depends on `:ticker`), `:ticker` (an atom
   holding `:time` and `:array`), `:registry` and `:cache` (both created
   with `state/cache`)."
  ([handlers]
   (scheduler handlers {} {}))
  ([handlers config]
   (scheduler handlers config {}))
  ([handlers config global]
   (let [topology {:array    [{:constructor (array/seed-fn handlers)
                               :setup       array/initialise}
                              :cache :registry :ticker]
                   :clock    [clock/clock :ticker]
                   :ticker   [(fn [_] (atom {:time nil :array nil}))]
                   :registry [(fn [_] (state/cache {} {:tag "scheduler.registry"}))]
                   :cache    [(fn [_] (state/cache {} {:tag "scheduler.cache"}))]}
         ;; task-array config is folded into the global settings first,
         ;; then the library defaults are merged in
         settings (nested/merge-new-nested
                   (update-in global [:array] nested/merge-nested config)
                   *defaults*)]
     (component/system topology settings {:tag "scheduler"}))))
link
(def sch (scheduler {:print-task-1 {:handler (fn [t] (prn "Hello 1") (Thread/sleep 2000)) :schedule "/5 * * * * * *"} :print-task-2 {:handler (fn [t] (prn "Hello 2") (Thread/sleep 2000)) :schedule "/2 * * * * * *"}}))

shutdown! ^

forcibly shuts down the scheduler, immediately killing all running threads

v 3.0
(defn shutdown!
  "Forcibly shuts the scheduler down: first calls `kill-instances` on it,
   then `stop!`. Returns the result of `stop!`."
  [scheduler]
  ;; `doto` performs the kill for its side effect and passes the same
  ;; scheduler value through to `stop!`.
  (stop! (doto scheduler kill-instances)))
link
(shutdown! sch) ;; All tasks will stop and all running instances killed

simulate ^

simulates the scheduler running for a certain interval:

v 3.0
(defn simulate
  "Drives the scheduler's ticker over a fixed range of times instead of
   letting the clock do it.

   Options:
     :start - first simulated time (converted with `time/to-long`)
     :end   - end of the simulated range, exclusive
     :step  - seconds between simulated ticks; a number, or a value that
              `time/to-long` converts to milliseconds. Defaults to 1.
     :pause - milliseconds to sleep after each tick. Defaults to 0.
     :mode  - placed under `[:instance :mode]` of every tick. Defaults
              to `:sync`.

   The clock's `:disabled` flag is set for the duration of the run. The flag
   is cleared and the scheduler is stopped when the run ends, including when
   an exception escapes the tick loop. Returns the result of
   `component/stop`.

   Throws `ex-info` when the resolved step is zero, because `range` with a
   zero step never terminates."
  [scheduler {:keys [start end step pause mode]}]
  ;; Resolve and validate every input before touching the scheduler so that
  ;; bad arguments cannot leave it started or its clock disabled.
  (let [start-val (time/to-long start)
        end-val   (time/to-long end)
        step      (cond (nil? step) 1
                        (number? step) step
                        :else (long (/ (time/to-long step) 1000)))
        pause     (or pause 0)
        mode      (or mode :sync)]
    (when (zero? step)
      (throw (ex-info "simulate: :step must resolve to a non-zero number of seconds"
                      {:step step})))
    (swap! (-> scheduler :clock :state) assoc :disabled true)
    (let [scheduler (component/start scheduler)
          clk       (-> scheduler :clock :meta)
          ;; re-enables the clock and stops the scheduler
          release   (fn []
                      (swap! (-> scheduler :clock :state) dissoc :disabled)
                      (component/stop scheduler))]
      (try
        (doseq [t-val (range start-val end-val (* 1000 step))]
          (let [t (time/from-long t-val clk)]
            (reset! (:ticker scheduler)
                    {:time t :array (tab/to-time-array t) :instance {:mode mode}})
            (when-not (zero? pause)
              (Thread/sleep pause))))
        (catch Throwable e
          ;; clean up before propagating so the scheduler is left usable
          (release)
          (throw e)))
      (release))))
link
(simulate (scheduler {:print-task {:handler (fn [t params instance] (str t params)) :schedule "/2 * * * * * *" :params {:value "hello world"}}}) {:start (java.util.Date. 0) :end (java.util.Date. 100000) :pause 10})

start! ^

starts the scheduler

v 3.0
(defn start!
  "Starts the scheduler by delegating to `component/start`; returns that
   call's result."
  [scheduler]
  (-> scheduler component/start))
link
(start! sch) ;; => {:ticker {:time #inst "2016-10-25T01:20:06.000-00:00", ;; :array [6 20 8 2 25 10 2016]} ;; :clock {:start-time #inst "2016-10-25T01:18:52.184-00:00", ;; :current-time #inst "2016-10-25T01:20:06.001-00:00", ;; :running true}, ;; :cache {}, ;; :registry {:print-task-2 (#inst "2016-10-25T01:20:06.000-00:00"), ;; :print-task-1 (#inst "2016-10-25T01:20:05.000-00:00")}, ;; :array {:handlers [], ;; :ticker {:time #inst "2016-10-25T01:20:06.000-00:00", ;; :array [6 20 8 2 25 10 2016]} ;; :registry {:print-task-2 (#inst "2016-10-25T01:20:06.000-00:00"), ;; :print-task-1 (#inst "2016-10-25T01:20:05.000-00:00")}, ;; :cache {}}}

stop! ^

stops the scheduler

v 3.0
(defn stop!
  "Stops the scheduler by delegating to `component/stop`; returns that
   call's result."
  [scheduler]
  (-> scheduler component/stop))
link
(stop! sch) ;; Schedule will stop but running instances will continue to run until completion ;; ;; => {:array {:handlers ;; [{:status :ready, ;; :val {:name :print-task-1, ;; :mode :async, ;; :arglist [:timestamp :params :instance]}} ;; {:status :ready, ;; :val {:name :print-task-2, ;; :mode :async, ;; :arglist [:timestamp :params :instance]}}]}, ;; :registry #reg {}, ;; :cache #cache {}, ;; :clock #clock {:start-time nil, :current-time nil, :running false}, ;; :ticker {:time #inst "2016-10-25T01:22:58.000-00:00", ;; :array [58 22 8 2 25 10 2016]}}

stopped? ^

checks to see if the scheduler is stopped

v 3.0
(defn stopped?
  "Reports whether the scheduler is stopped, as decided by
   `component/stopped?` applied to its `:clock` entry."
  [scheduler]
  (-> scheduler :clock component/stopped?))
link
(stopped? sch) => true

uptime ^

checks to see how long the scheduler has been running

v 3.0
(defn uptime
  "Milliseconds elapsed between the clock's `:start-time` and now
   (`System/currentTimeMillis`). Returns nil when the dereferenced clock has
   no `:start-time`."
  [scheduler]
  (when-let [started-at (:start-time @(:clock scheduler))]
    (let [now-ms (System/currentTimeMillis)]
      (- now-ms (time/to-long started-at)))))
link
(uptime sch) ;; when the scheduler is stopped, uptime is `nil` => nil (start! sch) (uptime sch) ;; uptime is from when the scheduler is started => 7936

3    Task



add-task ^

add a task to the scheduler

v 3.0
(defn add-task
  "Appends a new task to the scheduler's handler array. The handler is built
   from `name` and `props` with `array/build-handler` and appended inside a
   transaction."
  [scheduler name props]
  (let [handlers (:handlers (:array scheduler))]
    (dosync
     (ova/append! handlers (array/build-handler name props {})))))
link
(add-task (scheduler {}) :hello {:handler (fn [t params] (println params)) :schedule "* * * * * * *" :params {:data "foo"}})

delete-task ^

deletes a specific task in the scheduler

v 3.0
(defn delete-task
  "Removes the task(s) whose `:name` matches `name` from the scheduler's
   handler array, inside a transaction."
  [scheduler name]
  (let [handlers (:handlers (:array scheduler))]
    (dosync
     (ova/remove! handlers [:name name]))))
link
(delete-task sch :print-task-2)

disable-task ^

disables a specific task in the scheduler

v 3.0
(defn disable-task
  "Marks the task named `name` as disabled by associating `:disabled true`
   onto the matching handler(s), inside a transaction."
  [scheduler name]
  (let [handlers (:handlers (:array scheduler))]
    (dosync
     (ova/smap! handlers [:name name] assoc :disabled true))))
link
(disable-task sch :print-task-1) ;; Task is disabled when `start!` is called

empty-tasks ^

clears all tasks in the scheduler

v 3.0
(defn empty-tasks
  "Clears every task from the scheduler's handler array, inside a
   transaction."
  [scheduler]
  (let [handlers (:handlers (:array scheduler))]
    (dosync
     (ova/empty! handlers))))
link
(empty-tasks sch)

enable-task ^

enables a specific task in the scheduler

v 3.0
(defn enable-task
  "Re-enables the task named `name` by removing the `:disabled` key from the
   matching handler(s), inside a transaction."
  [scheduler name]
  (let [handlers (:handlers (:array scheduler))]
    (dosync
     (ova/smap! handlers [:name name] dissoc :disabled))))
link
(enable-task sch :print-task-1) ;; Task runs on schedule when `start!` is called

get-task ^

gets a specific task in the scheduler

v 3.0
(defn get-task
  "Returns the first handler whose `:name` matches `name`, or nil when
   nothing matches."
  [scheduler name]
  (let [handlers (:handlers (:array scheduler))
        matches  (ova/select handlers [:name name])]
    (first matches)))
link
(get-task sch :print-task-1) ;; => #proc{:name :print-task-1, ;; :mode :async, ;; :arglist [:timestamp :params :instance]}

list-tasks ^

lists all tasks in the scheduler

v 3.0
(defn list-tasks
  "Returns all tasks in the scheduler by calling `persistent!` on its
   handler array."
  [scheduler]
  (let [handlers (:handlers (:array scheduler))]
    (persistent! handlers)))
link
(list-tasks sch) ;; => [#proc{:name :print-task-1, ;; :mode :async, ;; :arglist [:timestamp :params :instance]} ;; #proc{:name :print-task-2, ;; :mode :async, ;; :arglist [:timestamp :params :instance] }]

reparametise-task ^

changes the params for an already existing task

v 3.0
(defn reparametise-task
  "Merges `opts` into the `:params` of the task named `name`, inside a
   transaction. Existing params not mentioned in `opts` are kept."
  [scheduler name opts]
  (let [handlers (:handlers (:array scheduler))]
    (dosync
     (ova/smap! handlers [:name name] update-in [:params] merge opts))))
link
(-> (scheduler {:hello {:handler (fn [t params] (println params)) :schedule "* * * * * * *" :params {:data "foo"}}}) (reparametise-task :hello {:data "bar"}))

reschedule-task ^

changes the schedule for an already existing task

v 3.0
(defn reschedule-task
  "Replaces the schedule of the task named `name`. Both the raw `:schedule`
   string and its parsed form (`:schedule-array`, from `tab/parse-tab`) are
   updated together inside a transaction."
  [scheduler name schedule]
  (let [handlers (:handlers (:array scheduler))]
    (dosync
     (let [parsed (tab/parse-tab schedule)]
       (ova/smap! handlers [:name name]
                  assoc
                  :schedule schedule
                  :schedule-array parsed)))))
link
(-> (scheduler {:hello {:handler (fn [t params] (println params)) :schedule "* * * * * * *" :params {:data "foo"}}}) (reschedule-task :hello "/5 * * * * * *"))

4    Instance



get-instance ^

gets an instance of the running task

v 3.0
(defn get-instance
  "Looks up a single task instance in the registry. The registry's `:store`
   is dereferenced and the instance is read at the path `[name id]`; nil is
   returned when there is no such entry."
  [scheduler name id]
  (let [store @(:store (:registry scheduler))]
    (get-in store [name id])))
link
(get-instance sch :print-task-1 #inst "2016-10-25T11:39:05.000-00:00") ;; retrieves a running instances in the scheduler

kill-instance ^

kills a single instance of the running task

v 3.0
(defn kill-instance
  "Kills one instance of a task: the instance found by `get-instance` for
   `name` and `id` is passed to `procedure/procedure-kill`."
  [scheduler name id]
  (let [inst (get-instance scheduler name id)]
    (procedure/procedure-kill inst)))
link
(kill-instance sch :print-task-1 #inst "2016-10-25T11:39:05.000-00:00") ;; kills a running instances in the scheduler

kill-instances ^

kills all instances of the running task

v 3.0
(defn kill-instances
  "Calls `procedure/procedure-kill` on every instance returned by
   `list-instances`, either for the whole scheduler or only for the task
   called `name`. Returns a vector of the kill results."
  ([scheduler]
   (mapv procedure/procedure-kill (list-instances scheduler)))
  ([scheduler name]
   (mapv procedure/procedure-kill (list-instances scheduler name))))
link
(kill-instances sch) ;; kills all running instances in the scheduler (kill-instances sch :print-task-1) ;; kills all running instances for a particular task

list-instances ^

lists all running instances of tasks in the scheduler

v 3.0
(defn list-instances
  "Lists task instances held in the registry.

   With only `scheduler`, walks every task from `list-tasks` and collects the
   instances registered under each task's `:name`. With `name`, returns the
   values stored under that name in the dereferenced registry `:store`."
  ([scheduler]
   (for [task    (list-tasks scheduler)
         running (list-instances scheduler (:name task))]
     running))
  ([scheduler name]
   (let [store @(:store (:registry scheduler))]
     (vals (get store name)))))
link
(list-instances sch) ;; lists all running instances in the scheduler (list-instances sch :print-task-1) ;; lists all running instances for a particular task

trigger! ^

manually executes a task, bypassing the scheduler

v 3.0
(defn trigger!
  "Runs a task by hand, without waiting for its schedule.

   With two arguments the key is the current time, obtained from `time/now`
   using the clock's `:meta`. With three arguments `key` is used as given.
   The task found by `get-task` is given the scheduler's `:registry` and then
   invoked with `key`, its own `:params` and an empty map. Returns nil when
   no task called `name` exists."
  ([scheduler name]
   (let [clock-meta (:meta (:clock scheduler))]
     (trigger! scheduler name (time/now clock-meta))))
  ([scheduler name key]
   (when-let [task (get-task scheduler name)]
     (let [runnable (assoc task :registry (:registry scheduler))]
       (runnable key (:params task) {})))))
link
(trigger! sch :print-task-1)

5    Walkthrough

hara.io.scheduler has been built around the concepts of tasks and schedulers. We start off by defining a basic task:

(def print-task
  {:handler (fn [t] (println "TIME:" t))
   :schedule "/2 * * * * * *"})

The scheduler is defined as follows

(def print-scheduler
  (scheduler {:print-task print-task}))

Calling start! on the scheduler results in the task triggering every two seconds:

(start! print-scheduler)

;; TIME: #inst "2016-10-24T04:00:00.000-00:00"
;; TIME: #inst "2016-10-24T04:00:02.000-00:00"
;; TIME: #inst "2016-10-24T04:00:04.000-00:00"
;; TIME: #inst "2016-10-24T04:00:06.000-00:00"
;; TIME: #inst "2016-10-24T04:00:08.000-00:00"
;; TIME: #inst "2016-10-24T04:00:10.000-00:00"

Calling stop! will stop the scheduler from running:

(stop! print-scheduler)

;; TIME: #inst "2016-10-24T04:00:12.000-00:00"
;; <OUTPUT STOPS>

5.1    Schedule

Each task has a :schedule entry. The value is a string specifying when it is supposed to run. The string is of the same format as crontab - seven elements separated by spaces. The elements are used to match the time, expressed as seven numbers:

 second minute hour day-of-week day-of-month month year

The rules for a match between the crontab and the current time are:

  • A means match on A
  • * means match on any number
  • E1,E2 means match on either E1 or E2
  • A-B means match on any number between A and B inclusive
  • /N means match on any number divisible by N
  • A-B/N means match on any number divisible by N between A and B inclusive

Where A, B and N are numbers; E1 and E2 are expressions. All seven elements in the string have to match in order for the task to be triggered.

;; Triggered every 5 seconds

"/5 * * * * * *"


;; Triggered every 5 seconds between 32 and 60 seconds

"32-60/5 * * * * * *"

;; Triggered every 5 seconds on the 9th and 10th
;; minute of every hour on every Friday from June
;; to August between years 2012 to 2020.

"/5  9,10  * 5 * 6-8 2012-2020"

5.2    Simulation

Simulations are a great way to check if the system is working correctly. This allows an entire system to be tested for correctness. How simulate works is that it decouples the clock from the task array and forces tasks to trigger on the range of date inputs provided.

(def T1 #inst "1999-12-31T23:59:50.00-00:00")
(def T2 #inst "2000-01-01T00:00:10.00-00:00")

The simulation is then run from T1 to T2 and the results are shown instantaneously

(def sch1 (scheduler {:print-task print-task}))

(simulate sch1
          {:start T1
           :end   T2})
;; > Hello There : #inst "1999-12-31T23:59:50.000-00:00"
;; > Hello There : #inst "1999-12-31T23:59:52.000-00:00"
;; > Hello There : #inst "1999-12-31T23:59:54.000-00:00"
;; > Hello There : #inst "1999-12-31T23:59:56.000-00:00"
;; > Hello There : #inst "1999-12-31T23:59:58.000-00:00"
;; > Hello There : #inst "2000-01-01T00:00:00.000-00:00"
;; > Hello There : #inst "2000-01-01T00:00:02.000-00:00"
;; > Hello There : #inst "2000-01-01T00:00:04.000-00:00"
;; > Hello There : #inst "2000-01-01T00:00:06.000-00:00"
;; > Hello There : #inst "2000-01-01T00:00:08.000-00:00"

We can control the way the simulation is run through other params

(simulate sch1
          {:start T1
           :end   T2
           :mode  :async
           :pause 0
           :step  3})
;; > Hello There : #inst "1999-12-31T23:59:56.000-00:00"
;; > Hello There : #inst "2000-01-01T00:00:02.000-00:00"
;; > Hello There : #inst "2000-01-01T00:00:08.000-00:00"
;; > Hello There : #inst "1999-12-31T23:59:50.000-00:00"

:mode can be either :sync (default) or :async. :step is the number of seconds of simulated time between checks and :pause is the sleep time in milliseconds after each check.

5.3    Interval and Pause

It can be seen that we can simulate the actual speed of outputs by keeping the step as 1 and increasing the pause time to 1000ms

(simulate sch1
          {:start T1
           :end   T2
           :mode  :async
           :pause 1000
           :step  1})

5.3.1    Speeding Up

In the following example, the step has been increased to 2 whilst the pause time has decreased to 100ms. This results in a 20x increase in the speed of outputs.

(simulate sch1
          {:start T1
           :end   T2
           :mode  :async
           :pause 100
           :step  2})

;; > Hello There : #inst "1999-12-31T23:59:50.000-00:00"

;;            ... wait 0.1 seconds ...

;; > Hello There : #inst "1999-12-31T23:59:52.000-00:00"

;;            ... wait 0.1 seconds ...

;; > Hello There : #inst "1999-12-31T23:59:54.000-00:00"

Being able to adjust these simulation parameters is a really powerful testing tool and saves an incredible amount of time in development. For example, we can quickly test the year-long output of a task that is scheduled to run once an hour by making the :step 3600 seconds and the :pause the same length of time that the task takes to finish.

5.4    Globals

The global defaults are contained in hara.io.scheduler/*defaults*:

(with-out-str (println hara.io.scheduler/*defaults*))
=> {:clock {:type "java.util.Date",
            :timezone "Asia/Kolkata",
            :interval 1,
            :truncate :second},
    :registry {},
    :cache {},
    :ticker {}}

For the purposes of the reader, only the :clock entry of *defaults* is important. To override the defaults, define the scheduler with the settings that need to be customised. To use java.time.Instant as the time type, define the scheduler as follows:

(def sch2 (scheduler {:hello {:handler  (fn [t params] (println t))
                              :schedule "/2 * * * * * *"
                              :params {}}}
                     {}
                     {:clock {:type "java.time.Instant"}}))

(start! sch2)
;;> #<Instant 2016-03-05T03:24:06Z>

;;  ... wait 2 seconds ...

;;> #<Instant 2016-03-05T03:24:08Z>

;;  ... printing out instances of java.time.Instant every 2 seconds ...

(stop! sch2)

5.5    Date Type

It is also possible to use the clojure map representation (the default in hara.time)

(def sch2 (scheduler {:hello {:handler  (fn [t params] (println t))
                              :schedule "/2 * * * * * *"
                              :params {}}}
                     {}
                     {:clock {:type "clojure.lang.PersistentArrayMap"
                              :timezone "GMT"}}))

(start! sch2)

;;> {:day 6, :hour 20, :timezone GMT, :second 38, :month 3,
;;   :type clojure.lang.PersistentHashMap, :year 2016, :millisecond 0, :minute 30}

;;  ... wait 2 seconds ...

;;> {:day 6, :hour 20, :timezone GMT, :second 40, :month 3,
;;   :type clojure.lang.PersistentHashMap, :year 2016, :millisecond 0, :minute 30}

;;  ... printing out clojure maps representing the time every 2 seconds ...

(stop! sch2)

5.6    Timezone

Having a :timezone value in the clock will ensure that the right timezone is set. The default will always be the system local time, but it can be set to any timezone. To see this in effect, the Calendar object is used and EST is applied.

(def sch2 (scheduler {:hello {:handler  (fn [t params] (println t))
                              :schedule "/2 * * * * * *"
                              :params {}}}
                     {}
                     {:clock {:type "java.util.Calendar"
                              :timezone "EST"}}))

(start! sch2)

;;> #inst "2016-03-06T15:37:38.000-05:00"

;;  ... wait 2 seconds ...

;;> #inst "2016-03-06T15:37:40.000-05:00"

;;  ... wait 2 seconds ...

;;> #inst "2016-03-06T15:37:42.000-05:00"

(stop! sch2)