Extending Tork
Intro
Tork's architecture makes it fairly easy to add functionality to the core engine.
This guide is intended for developers that want to extend Tork's behavior.
For configuring its existing behavior, refer to the configuration guide.
Using Tork as a library
Create a new directory:
mkdir tork-plus
cd tork-plus
Init the Go project:
go mod init github.com/example/tork-plus
Get the Tork dependency:
go get github.com/runabol/tork
go: added github.com/runabol/tork v0.1.66
Create a main.go
with the minimum bolierplate necessary to start Tork:
package main
import (
"fmt"
"os"
"github.com/runabol/tork/cli"
"github.com/runabol/tork/conf"
)
func main() {
if err := conf.LoadConfig(); err != nil {
fmt.Println(err)
os.Exit(1)
}
if err := cli.New().Run(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
Update your dependencies:
go mod tidy
Start Tork:
% go run main.go
If all goes well, your should see something like this:
_______ _______ ______ ___ _
| || || _ | | | | |
|_ _|| _ || | || | |_| |
| | | | | || |_||_ | _|
| | | |_| || __ || |_
| | | || | | || _ |
|___| |_______||___| |_||___| |_|
0.1.66
NAME:
tork - a distributed workflow engine
USAGE:
tork [global options] command [command options] [arguments...]
COMMANDS:
run Run Tork
migration Run the db migration script
health Perform a health check
help, h Shows a list of commands or help for one command
GLOBAL OPTIONS:
--help, -h show help
Custom endpoint
Let's use the RegisterEndpoint
hook to register a new endpoint:
Update your main.go
to look like this:
package main
import (
"fmt"
"net/http"
"os"
"github.com/runabol/tork/cli"
"github.com/runabol/tork/conf"
"github.com/runabol/tork/engine"
"github.com/runabol/tork/middleware"
)
func main() {
if err := conf.LoadConfig(); err != nil {
fmt.Println(err)
os.Exit(1)
}
handler := func(c middleware.Context) error {
return c.String(http.StatusOK, "Hello")
}
engine.RegisterEndpoint(http.MethodGet, "/myendpoint", handler)
if err := cli.New().Run(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
Start Tork in standalone
mode:
go run main.go run standalone
Let's try to call our new endpoint:
curl http://localhost:8000/myendpoint
If all goes well you should see:
Hello
Middleware
Middleware functions allow you to intercept the operation of Tork at various points in order to handle commons tasks and/or enchance its functionality in some way.
Tork supports 4 types of middlewares:
Some common use cases for middleware:
- Authentication
- Authorization
- Logging
- Rate limiting
- Collecting metrics
- CORS
- Adding extra headers
- Modifying a job/task state
HTTP middleware
HTTP middleware functions are functions that intercept the Coordinator API's HTTP request-response cycle. These functions are executed sequentially, and they have the ability to modify the request and response objects or end the request-response cycle prematurely. They act as intermediaries between the client and your route handlers, allowing you to perform actions before or after a request is handled by a specific route.
If the current middleware function does not end the request-response cycle, it must call next() to pass control to the next middleware function. Otherwise, the request will be left hanging.
Example of a simple middleware function to time requests:
func main () {
// code before
mw := func(next web.HandlerFunc) web.HandlerFunc {
return func(c web.Context) error {
before := time.Now()
// happens before the request is processed
next(c)
// happens after the request is processed
log.Info().Msgf("The request to %s took %s to process",
c.Request().URL.Path,
time.Since(before),
)
return nil
}
}
engine.RegisterWebMiddleware(mw)
// code after
}
curl http://localhost:8000/health
{ "status": "UP", "version": "0.1.66" }
And in the logs you should see something like this:
12:13AM INF The request to /health took 88.125µs to process
Job middleware
Job middleware functions intercept state changes to jobs within the same Coordinator that they are running on.
They act as intermediaries between the client and the Coordinator handlers, allowing you to perform actions before or after a job is handled by the Coordinator.
If the current middleware function does not end the handling of the job, it must call next()
to pass control to the next middleware function. Otherwise, the job will never get handled.
Example of a middleware that logs job state change:
func main () {
// code before
mw := func(next job.HandlerFunc) job.HandlerFunc {
return func(ctx context.Context, j *tork.Job) error {
log.Debug().
Msgf("received job %s at state %s", j.ID, j.State)
return next(ctx, j)
}
}
engine.RegisterJobMiddleware(mw)
// code after
}
Task middleware
Task middleware functions intercept state changes to tasks within the same Coordinator that they are running on.
They act as intermediaries between the client and the Coordinator handlers, allowing you to perform actions before or after a task is handled by the Coordinator.
If the current middleware function does not end the handling of the task, it must call next()
to pass control to the next middleware function. Otherwise, the task will never get handled.
Example of a middleware that logs task state change:
func main () {
// code before
mw := func(next task.HandlerFunc) task.HandlerFunc {
return func(ctx context.Context, t *tork.Task) error {
log.Debug().
Msgf("received task %s at state %s", t.ID, t.State)
return next(ctx, t)
}
}
engine.RegisterTaskMiddleware(mw)
// code after
}
Node middleware
Node middleware functions intercept heartbeat messages from the worker nodes to the Coordinator.
They act as intermediaries between the worker nodes and the Coordinator handlers, allowing you to perform actions before or after a heartbeat is handled by the Coordinator.
If the current middleware function does not end the handling of the heartbeat, it must call next()
to pass control to the next middleware function. Otherwise, the hearbeat will never get handled.
Example of a middleware that logs heartbeats:
func main () {
// code before
mw := func(next node.HandlerFunc) node.HandlerFunc {
return func(ctx context.Context, n *tork.Node) error {
log.Debug().
Msgf("received heartbeat from %s", n.Hostname)
return next(ctx, n)
}
}
engine.RegisterNodeMiddleware(mw)
// code after
}
Built-in middleware
There are several middleware functions that can be enabled and configured:
- CORS
- Basic Auth
- Rate Limit
- Redact
- Request Logger
- Webhook - responsible for executing job webhooks. Example of job webhooks section:
webhooks:
- url: http://example.com/my/webhook # POST (required)
event: job.StateChange # event type
# optional: headers to send when calling the webhook endpoint
headers:
my-header: somevalue
# optional: conditional execution of the webhook
if: "{{ job.State == 'COMPLETED' }}"
- Host Env - allows to inject a list of env vars from the host to any tasks running on that host. Supports aliases using
:
. Example config:
[middleware.task.hostenv]
vars = ["SOME_ENV_VAR","OTHER_ENV_VAR:VAR_NAME_IN_CONTAINER"]