KEDA, Windows, and Batch Jobs

I finally had a reason to dive into KEDA in conjunction with AKS, and I learned a lot. I’ve been working with a customer who’s looking to migrate a homegrown, on-premises batch process to Azure Kubernetes Service. KEDA (Kubernetes Event-driven Autoscaling) looked to be a great way to solve this challenge, but there were some requirements that made the answer not-so-simple. The first question was: can it work with Windows workloads?

The answer to the first question is, YES! Let’s walk through this:

KEDA & Windows

I started with the excellent samples at https://github.com/tomconte/sample-keda-queue-jobs, but I needed to substantially update them to 1) use KEDA 2.0, and 2) support Windows workers.

These samples use an illustrative application that places messages (i.e., ‘jobs’) in an Azure Storage queue. KEDA is then used to schedule Kubernetes Jobs using this queue as a trigger. Also notice we are scheduling Kubernetes Jobs, not Deployments; Jobs work extremely well for this run-to-completion use case, but are a little less common than Deployments.
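
If Jobs are new to you, here’s a quick aside (not part of the sample) showing their run-to-completion semantics; unlike a Deployment’s pods, a Job’s pod runs once and is not replaced after it finishes:

# one-shot Job: runs a single pod to completion, then stops
kubectl create job hello --image=busybox -- echo "hello from a one-shot job"
kubectl wait --for=condition=complete job/hello --timeout=60s
kubectl get job hello    # COMPLETIONS shows 1/1 once the pod has finished
kubectl delete job hello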

AKS Cluster & installing KEDA

We’ll start by creating the AKS cluster:

RG=keda-sample
LOCATION=westus2
CLUSTER_NAME="kedatest"
az group create -l $LOCATION -n $RG

az aks create \
  -g $RG \
  -n $CLUSTER_NAME \
  --node-count 1 \
  --node-vm-size Standard_DS3_v2 \
  --generate-ssh-keys \
  --node-osdisk-type Managed \
  --enable-cluster-autoscaler \
  --min-count 1 \
  --max-count 10 \
  --network-plugin azure

az aks get-credentials -g $RG -n $CLUSTER_NAME
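
As a quick sanity check, confirm kubectl can reach the new cluster and the initial Linux node is Ready:

kubectl get nodes -o wide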

One of the things we’re going to do for this use case is use the new AKS Deallocate scale-down mode to improve scale-out performance: deallocated nodes can be restarted much faster than newly provisioned ones.

az aks nodepool update \
  --scale-down-mode Deallocate \
  --name nodepool1 \
  --cluster-name $CLUSTER_NAME \
  --resource-group $RG
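
If you’d like to confirm the change, the node pool’s scaleDownMode property should now report Deallocate:

az aks nodepool show -g $RG --cluster-name $CLUSTER_NAME -n nodepool1 --query scaleDownMode -o tsv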

Let’s install KEDA:

helm repo add kedacore https://kedacore.github.io/charts
helm repo update
kubectl create namespace keda
helm install keda kedacore/keda --version 2.4.0 --namespace keda
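
Before moving on, it’s worth confirming the KEDA pods (the operator and the metrics API server) came up cleanly:

kubectl get pods -n keda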

And finally, let’s add a Windows node pool:

az aks nodepool add -g $RG --cluster-name $CLUSTER_NAME -n win1 \
  --os-type Windows \
  --enable-cluster-autoscaler \
  --min-count 0 --max-count 10 \
  --scale-down-mode Deallocate
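
You can verify the Windows pool by listing nodes by OS label; because we set --min-count 0, the autoscaler can scale this pool all the way down to zero when it’s idle:

kubectl get nodes -l kubernetes.io/os=windows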

Storage Queue

Now we need to set up the storage queue:

STORAGE_ACCOUNT_NAME=kedademo # set to your storage account name; must be globally unique
export QUEUE_NAME=keda-queue
az storage account create -g $RG -n $STORAGE_ACCOUNT_NAME

# grab the connection string; this env variable will also be used later by our application
export AzureWebJobsStorage=$(az storage account show-connection-string --name $STORAGE_ACCOUNT_NAME --query connectionString -o tsv)

# create the queue, authenticating with the connection string
az storage queue create -n $QUEUE_NAME --connection-string "$AzureWebJobsStorage"

# save the storage account connection string as a Kubernetes Secret
kubectl create secret generic secrets \
    --from-literal=AzureWebJobsStorage=$AzureWebJobsStorage
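
Before wiring this into KEDA, a quick sanity check that the connection string works and the secret landed in the cluster:

# the new queue should show up in the listing
az storage queue list --connection-string "$AzureWebJobsStorage" -o table

# describe shows the secret's keys and sizes without printing the value
kubectl describe secret secrets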

Worker application

Next, we need to build a Windows container image with our worker application. I’ll use Azure Container Registry (ACR) to do the actual image build. I’ll also share the source code for the application at the end of this blog post.

export ACR=kedatest01  # set to your ACR name; must be globally unique
az acr create -n $ACR -g $RG --sku Standard
az aks update -n $CLUSTER_NAME -g $RG  --attach-acr $ACR

# build the image in ACR from the local queue-consumer-windows folder
az acr build -r $ACR -t $ACR.azurecr.io/queue-consumer-windows --platform windows queue-consumer-windows
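
Once the build completes, the new image should show up in the registry:

az acr repository show-tags -n $ACR --repository queue-consumer-windows -o table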

Running the sample

We’re now ready to start using our sample application. As I mentioned above, we’re going to use a KEDA ScaledJob to handle scaling out workers. Here’s the Kubernetes manifest (saved to a file named azurequeue_scaledobject_jobs_windows.yaml):

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: azure-queue-scaledobject-jobs-win
  namespace: default
spec:
  pollingInterval: 30
  maxReplicaCount: 50
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTargetRef:
    parallelism: 1
    completions: 1
    activeDeadlineSeconds: 600
    backoffLimit: 6
    template:
      spec:
        nodeSelector:
          kubernetes.io/os: windows
        containers:
        - name: consumer-job
          image: $ACR.azurecr.io/queue-consumer-windows
          resources:
            requests:
              cpu: 100m
              memory: 2000Mi # intentionally set high in order to trigger cluster autoscaler
            limits:
              cpu: 100m
              memory: 2000Mi
          env:
          - name: AzureWebJobsStorage
            valueFrom:
              secretKeyRef:
                name: secrets
                key: AzureWebJobsStorage
          - name: QUEUE_NAME
            value: keda-queue
  triggers:
  - type: azure-queue
    metadata:
      queueName: keda-queue
      queueLength: '1'
      connectionFromEnv: AzureWebJobsStorage

Deploy this as follows:

cat azurequeue_scaledobject_jobs_windows.yaml | envsubst | kubectl apply -f -

Notice in the above that I’m using the tool envsubst to dynamically insert the value of the $ACR environment variable into the manifest.
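
If you want to inspect the substituted manifest before applying it for real, you can run envsubst on its own, or pair it with a client-side dry run:

envsubst < azurequeue_scaledobject_jobs_windows.yaml | grep image:   # shows the rendered image reference
envsubst < azurequeue_scaledobject_jobs_windows.yaml | kubectl apply --dry-run=client -f -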

And finally, run the Python app to load up the queue with work (100 messages):

python3 -m venv .venv # only run once
source .venv/bin/activate
pip install -r requirements.txt # only run once

python send_messages.py 100 # adds 100 messages into the queue

At this point, KEDA will create up to maxReplicaCount Jobs (50, per our configuration) to process the messages that have been placed in the storage queue. Since each job requests 2000Mi of memory, the pods will quickly become unschedulable, and the cluster autoscaler will kick in to scale out the cluster. As jobs complete, KEDA will schedule new ones until it runs out of work (i.e., the queue is empty). You can watch jobs being automatically scheduled using the command:

kubectl get jobs
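
While the run is in flight, a few other views are handy for watching the scale-out happen (output will vary with your cluster):

kubectl get jobs -w                                # watch Jobs get created and complete
kubectl get pods -o wide -w                        # watch worker pods land on Windows nodes
kubectl get nodes -l kubernetes.io/os=windows -w   # watch Windows nodes get added and removed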

In the next blog post, we’ll cover some additional tweaks to this configuration to support long-running jobs.

Code Samples

Queue Consumer

import os
import sys
import time

from azure.storage.queue import QueueClient

try:
    connection_string = os.environ['AzureWebJobsStorage']
    queue_name = os.environ['QUEUE_NAME']
except KeyError:
    print('Error: missing environment variable AzureWebJobsStorage or QUEUE_NAME')
    sys.exit(1)

queue = QueueClient.from_connection_string(conn_str=connection_string, queue_name=queue_name)

# Get a single message; exit cleanly if another worker already drained the queue
message = next(iter(queue.receive_messages()), None)
if message is None:
    print('No messages in queue; exiting')
    sys.exit(0)

# Print the message
print(message)

# Delete the message from the queue so it is not processed again
queue.delete_message(message)

# Sleep for a while, simulating a long-running job
time.sleep(30)

Queue Consumer Dockerfile

# 1809 required for AKS
FROM python:windowsservercore-1809
WORKDIR /app
RUN pip install azure-storage-queue
COPY queue_consumer.py /app
CMD ["python", "queue_consumer.py"]

send_messages.py

import os
import sys

from azure.storage.queue import QueueClient

try:
    connection_string = os.environ['AzureWebJobsStorage']
    queue_name = os.environ['QUEUE_NAME']
except KeyError:
    print('Error: missing environment variable AzureWebJobsStorage or QUEUE_NAME')
    sys.exit(1)

queue = QueueClient.from_connection_string(conn_str=connection_string, queue_name=queue_name)

# Send the requested number of test messages to the queue
for message in range(int(sys.argv[1])):
    queue.send_message(content='foo_' + str(message))