Run background jobs on Kubernetes from Ruby

yuemori/kube_queue

KubeQueue

Installation

Add this line to your application's Gemfile:

gem 'kube_queue'

And then execute:

$ bundle

Or install it yourself as:

$ gem install kube_queue

Getting Started

Implement a worker:

class TestWorker
  include KubeQueue::Worker

  job_name 'kube-queue-test'
  image "my-registry/my-image"
  container_name 'kube-queue-test'

  command 'bundle', 'exec', 'kube_queue', 'TestWorker', '-r', './test_worker.rb'

  def perform(payload)
    puts payload['message']
  end
end

Configure the Kubernetes client:

KubeQueue.kubernetes_configure do |client|
  client.url = ENV['K8S_URL']
  client.ssl_ca_file = ENV['K8S_CA_CERT_FILE']
  client.auth_token = File.read(ENV['K8S_TOKEN'])
end
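
The configuration block above reads three environment variables, and K8S_TOKEN is a path to a token file, not the token itself. A minimal sketch for checking them before configuring the client; `missing_k8s_vars` is a hypothetical helper, not part of kube_queue:

```ruby
# Hypothetical helper (not part of kube_queue): list which of the
# environment variables used in the configuration block are unset or blank.
REQUIRED_K8S_VARS = %w[K8S_URL K8S_CA_CERT_FILE K8S_TOKEN].freeze

def missing_k8s_vars(env = ENV)
  REQUIRED_K8S_VARS.select { |name| env[name].to_s.strip.empty? }
end

missing = missing_k8s_vars
warn "Missing Kubernetes settings: #{missing.join(', ')}" unless missing.empty?
```

Running this check first gives a readable warning instead of a failure inside File.read when K8S_TOKEN is unset.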

and run:

TestWorker.enqueue(message: 'hello')

# delayed execution (the payload hash needs explicit braces before a positional argument)
TestWorker.enqueue_at({ message: 'hello' }, Time.now + 100)
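
Note that the worker reads payload['message'] with a string key although the job is enqueued with a symbol key. One plausible explanation, assuming the payload is serialized as JSON on its way to the container (an assumption, not confirmed here), is that symbol keys become strings in the round trip:

```ruby
require 'json'

# Assumption: the payload travels to the worker as JSON.
payload  = { message: 'hello' }
received = JSON.parse(JSON.generate(payload))

puts received['message']        # the string key works after the round trip
puts received[:message].inspect # the symbol key no longer exists
```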

ActiveJob Support

Add this to application.rb:

Rails.application.config.active_job.adapter = :kube_queue

Then put your job into app/jobs. Example:

# app/jobs/print_message_job.rb
class PrintMessageJob < ApplicationJob
  include KubeQueue::Worker

  worker_name 'print-message-job'
  image "your-registry/your-image"
  container_name 'your-container-name'

  def perform(payload)
    logger.info payload[:message]
  end
end

and run:

irb(main):001:0> job = PrintMessageJob.perform_later(message: 'hello, kubernetes!')
Enqueued PrintMessageJob (Job ID: 0bf15b35-62d8-4380-9173-99839ce735ff) to KubeQueue(default) with arguments: {:message=>"hello, kubernetes!"}
=> #<PrintMessageJob:0x00007fbfd00c7848 @arguments=[{:message=>"hello, kubernetes!"}], @job_id="0bf15b35-62d8-4380-9173-99839ce735ff", @queue_name="default", @priority=nil, @executions=0>
irb(main):002:0> job.status
=> #<K8s::Resource startTime="2019-08-12T15:56:37Z", active=1>
irb(main):003:0> job.status
=> #<K8s::Resource conditions=[{:type=>"Complete", :status=>"True", :lastProbeTime=>"2019-08-12T15:57:03Z", :lastTransitionTime=>"2019-08-12T15:57:03Z"}], startTime="2019-08-12T15:56:37Z", completionTime="2019-08-12T15:57:03Z", succeeded=1>

See more examples here.

Run a job locally

bundle exec kube_queue runner JOB_NAME [PAYLOAD]

For more information, run kube_queue help or read here.

Advanced Tips

Get a job status

job = ComputePiJob.perform_later
job.status

Scheduled jobs are not supported yet.

Check the generated manifest
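
The status object carries the Kubernetes Job status fields seen in the ActiveJob example (conditions, active, succeeded). A minimal sketch of turning them into a simple state, using plain hashes in place of K8s::Resource; `job_state` is a hypothetical helper, not part of kube_queue:

```ruby
# Hypothetical helper: classify a Job status given as a plain Hash with the
# same fields as the status output (conditions, active, succeeded).
def job_state(status)
  finished = status.fetch(:conditions, []).find do |condition|
    %w[Complete Failed].include?(condition[:type]) && condition[:status] == 'True'
  end

  if finished
    finished[:type] == 'Complete' ? :succeeded : :failed
  elsif status.fetch(:active, 0).positive?
    :running
  else
    :pending
  end
end

running  = { startTime: '2019-08-12T15:56:37Z', active: 1 }
finished = { conditions: [{ type: 'Complete', status: 'True' }], succeeded: 1 }

p job_state(running)
p job_state(finished)
```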

# from class
puts ComputePiJob.manifest

# from instance
job = ComputePiJob.perform_later
puts job.manifest

Retry job

A Kubernetes Job has its own retry mechanism. Set backoff_limit and/or restart_policy to use it.

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  backoff_limit 10
  restart_policy 'Never'
end

For more information, see the official documentation here.
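
In a batch/v1 Job manifest, these two settings correspond to spec.backoffLimit and spec.template.spec.restartPolicy, and a Job only accepts Never or OnFailure as its restart policy. A rough sketch of that fragment; `retry_fragment` is a hypothetical helper, and the manifest that kube_queue actually generates may be shaped differently:

```ruby
require 'yaml'

# Hypothetical sketch: where the retry settings sit in a batch/v1 Job manifest.
JOB_RESTART_POLICIES = %w[Never OnFailure].freeze

def retry_fragment(backoff_limit:, restart_policy:)
  unless JOB_RESTART_POLICIES.include?(restart_policy)
    raise ArgumentError, "a Job only allows #{JOB_RESTART_POLICIES.join(' or ')}"
  end

  {
    'spec' => {
      'backoffLimit' => backoff_limit,
      'template' => { 'spec' => { 'restartPolicy' => restart_policy } }
    }
  }
end

puts retry_fragment(backoff_limit: 10, restart_policy: 'Never').to_yaml
```

Comparing this with the output of ComputePiJob.manifest is a quick way to confirm the values ended up where you expect.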

Timeout

A Kubernetes Job has its own timeout mechanism. Set active_deadline_seconds to use it.

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  active_deadline_seconds 300
end

For more information, see the official documentation here.
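
The deadline counts from the Job's startTime. Once it passes, Kubernetes terminates the running Pods and marks the Job as failed. A small sketch of computing that moment from a status startTime; `job_deadline` is a hypothetical helper, not part of kube_queue:

```ruby
require 'time'

# Hypothetical helper: the moment the Job would be terminated, given the
# startTime from the Job status and the configured active_deadline_seconds.
def job_deadline(start_time, active_deadline_seconds)
  Time.iso8601(start_time) + active_deadline_seconds
end

deadline = job_deadline('2019-08-12T15:56:37Z', 300)
puts deadline.iso8601
```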

Managing container resources

When you specify a Pod, you can optionally specify how much CPU and memory each container needs.

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  cpu_limit '0.3'
  cpu_request '0.2'
  # Memory uses binary suffixes such as Mi; a lowercase "m" means milli-units.
  memory_limit '100Mi'
  memory_request '50Mi'
end

For more information, see the official documentation here.
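
Kubernetes quantity suffixes are easy to mix up: a lowercase m means one thousandth (useful for CPU, where 200m equals 0.2 cores), while Mi means mebibytes. A simplified parser to illustrate the difference; `parse_quantity` is a hypothetical helper that covers only a few common suffixes, not the full quantity grammar:

```ruby
# Simplified, illustrative parser for Kubernetes resource quantities.
# It handles only the suffixes listed here, not the full grammar.
QUANTITY_SUFFIXES = {
  'm'  => Rational(1, 1000),
  'k'  => 1000,
  'M'  => 1000**2,
  'G'  => 1000**3,
  'Ki' => 1024,
  'Mi' => 1024**2,
  'Gi' => 1024**3
}.freeze

def parse_quantity(text)
  match = /\A(\d+(?:\.\d+)?)(m|k|M|G|Ki|Mi|Gi)?\z/.match(text)
  raise ArgumentError, "unsupported quantity: #{text}" unless match

  # Rational keeps decimal inputs such as "0.3" exact.
  Rational(match[1]) * QUANTITY_SUFFIXES.fetch(match[2], 1)
end

puts parse_quantity('100m').to_f  # one tenth of a unit
puts parse_quantity('100Mi').to_i # bytes in 100 mebibytes
```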

Use environment variable from ConfigMap/Secret

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  env_from_secret 'mysecret1', 'mysecret2'
  env_from_config_map 'myapp'
end
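
In a Pod spec, environment variables taken from Secrets and ConfigMaps are listed under the container's envFrom field as secretRef and configMapRef entries. A rough sketch of the list the two settings above would describe; `env_from` is a hypothetical helper, and the manifest that kube_queue generates may differ in detail:

```ruby
require 'yaml'

# Hypothetical sketch of the container-level envFrom list.
def env_from(secrets: [], config_maps: [])
  secrets.map { |name| { 'secretRef' => { 'name' => name } } } +
    config_maps.map { |name| { 'configMapRef' => { 'name' => name } } }
end

entries = env_from(secrets: %w[mysecret1 mysecret2], config_maps: %w[myapp])
puts({ 'envFrom' => entries }.to_yaml)
```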

Features

  • Add tests.
  • Support multiple Kubernetes client configurations.
  • Logging information.
  • Support to get CronJob status.

Development (on GCP/GKE)

setup:

# create service account and cluster role.
kubectl apply -f examples/k8s/service-account.yaml

# get ca.crt and token
kubectl get secret -n kube-system kube-queue-test-token-xxx -o jsonpath="{['data']['token']}" | base64 -d > secrets/token
kubectl get secret -n kube-system kube-queue-test-token-xxx -o jsonpath="{['data']['ca\.crt']}" | base64 -d > secrets/ca.crt

# build image
gcloud builds submit --config cloudbuild.yaml .

run:

K8S_URL=https://xx.xxx.xxx.xxx K8S_CA_CERT_FILE=$(pwd)/secrets/ca.crt K8S_TOKEN=$(pwd)/secrets/token IMAGE_NAME=gcr.io/your-project/kube-queue bin/console

irb(main):001:0> TestWorker.enqueue(message: 'hello, kubernetes!')