KEDA, Windows, and Batch Jobs
I finally had a reason to dive into KEDA in conjunction with AKS, and I learned a lot. I’ve been working with a customer who’s looking to migrate a homegrown, on-premises batch process to Azure Kubernetes Service. KEDA (Kubernetes Event-driven Autoscaling) looked like a great way to solve this challenge, but a few requirements made the answer not-so-simple. The first challenge: can it work with Windows workloads?
The answer to that first question is YES! Let’s walk through it:
KEDA & Windows
I started with the excellent samples at https://github.com/tomconte/sample-keda-queue-jobs, but I needed to substantially update them to 1) use KEDA 2.0, and 2) support Windows workers.
These samples use an illustrative application that places messages (i.e., ‘jobs’) in an Azure Storage queue. KEDA then schedules Kubernetes Jobs using this queue as a trigger. Also notice that we are scheduling Kubernetes Jobs, not Deployments; Jobs work extremely well for run-to-completion batch work like this, but are a little less common than Deployments.
AKS Cluster & Installing KEDA
We’ll start by creating the AKS cluster:
RG=keda-sample
LOCATION=westus2
CLUSTER_NAME="kedatest"
az group create -l $LOCATION -n $RG
az aks create \
-g $RG \
-n $CLUSTER_NAME \
--node-count 1 \
--node-vm-size Standard_DS3_v2 \
--generate-ssh-keys \
--node-osdisk-type Managed \
--enable-cluster-autoscaler \
--min-count 1 \
--max-count 10 \
--network-plugin azure
az aks get-credentials -g $RG -n $CLUSTER_NAME
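Before moving on, a quick sanity check that kubectl is talking to the new cluster; if everything provisioned correctly, you should see a single Linux node in the Ready state:
kubectl get nodes -o wide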
One of the things we’re going to do for this use case is use the new AKS Deallocate scale-down mode to improve scale-out performance. Instead of deleting VMs when scaling in, the cluster stops and deallocates them, so a later scale-out can simply restart those VMs rather than provisioning new ones.
az aks nodepool update \
--scale-down-mode Deallocate \
--name nodepool1 \
--cluster-name $CLUSTER_NAME \
--resource-group $RG
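To confirm the setting took effect, you can query the node pool; the scaleDownMode property should come back as Deallocate:
az aks nodepool show -g $RG --cluster-name $CLUSTER_NAME -n nodepool1 --query scaleDownMode -o tsv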
Let’s install KEDA:
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
kubectl create namespace keda
helm install keda kedacore/keda --version 2.4.0 --namespace keda
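If the install succeeded, the KEDA operator and metrics API server pods should be running in the keda namespace:
kubectl get pods -n keda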
And finally, let’s add a Windows node pool:
az aks nodepool add -g $RG --cluster-name $CLUSTER_NAME -n win1 \
--os-type Windows \
--enable-cluster-autoscaler \
--min-count 0 --max-count 10 \
--scale-down-mode Deallocate
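Because the Windows pool starts at a minimum of zero nodes, don’t be surprised if no Windows node shows up right away; you can confirm both pools exist with:
az aks nodepool list -g $RG --cluster-name $CLUSTER_NAME -o table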
Storage Queue
Now we need to set up the storage queue:
STORAGE_ACCOUNT_NAME=kedademo # set to your storage acct name; must be globally unique
export QUEUE_NAME=keda-queue
az storage account create -g $RG -n $STORAGE_ACCOUNT_NAME
az storage queue create -n $QUEUE_NAME --account-name $STORAGE_ACCOUNT_NAME
export AzureWebJobsStorage=$(az storage account show-connection-string --name $STORAGE_ACCOUNT_NAME --query connectionString -o tsv) # this env variable will be used later by our application
# save the storage account connection string as a Kubernetes Secret
kubectl create secret generic secrets \
--from-literal=AzureWebJobsStorage=$AzureWebJobsStorage
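You can verify the secret landed without printing its value (describe shows only the key names and byte counts):
kubectl describe secret secrets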
Worker application
Next, we need to build a Windows container image with our worker application. I’ll use Azure Container Registry to do the actual image build. I’ll also share the application’s source code at the end of this blog post.
export ACR=kedatest01 # set to your ACR name; must be globally unique
az acr create -n $ACR -g $RG --sku Standard
az aks update -n $CLUSTER_NAME -g $RG --attach-acr $ACR
az acr build -r $ACR -t $ACR.azurecr.io/queue-consumer-windows --platform windows queue-consumer-windows
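Windows base images are large, so expect the build to take a few minutes; once it finishes, the new repository should show up in the registry:
az acr repository list -n $ACR -o table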
Running the sample
We’re now ready to start using our sample application. As I mentioned above, we’re going to use a KEDA ScaledJob to handle scaling out workers. Here’s the Kubernetes manifest (saved as azurequeue_scaledobject_jobs_windows.yaml):
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: azure-queue-scaledobject-jobs-win
  namespace: default
spec:
  pollingInterval: 30
  maxReplicaCount: 50
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTargetRef:
    parallelism: 1
    completions: 1
    activeDeadlineSeconds: 600
    backoffLimit: 6
    template:
      spec:
        nodeSelector:
          kubernetes.io/os: windows
        restartPolicy: Never # Jobs require Never or OnFailure
        containers:
        - name: consumer-job
          image: $ACR.azurecr.io/queue-consumer-windows
          resources:
            requests:
              cpu: 100m
              memory: 2000Mi # intentionally set high in order to trigger cluster autoscaler
            limits:
              cpu: 100m
              memory: 2000Mi
          env:
          - name: AzureWebJobsStorage
            valueFrom:
              secretKeyRef:
                name: secrets
                key: AzureWebJobsStorage
          - name: QUEUE_NAME
            value: keda-queue
  triggers:
  - type: azure-queue
    metadata:
      queueName: keda-queue
      queueLength: '1'
      connectionFromEnv: AzureWebJobsStorage
Deploy this as follows:
cat azurequeue_scaledobject_jobs_windows.yaml | envsubst | kubectl apply -f -
Notice in the above that I’m using the envsubst tool to dynamically insert the value of the $ACR environment variable into the manifest.
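If you want to see the substitution before applying anything, you can preview the rendered image reference first:
cat azurequeue_scaledobject_jobs_windows.yaml | envsubst | grep image: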
And finally, run the Python app to load up the queue with work (100 messages):
python3 -m venv .venv # only run once
source .venv/bin/activate
pip install -r requirements.txt # only run once
python send_messages.py 100 # adds 100 messages into the queue
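To confirm the messages arrived, you can peek at the queue from the CLI (peeking reads messages without dequeuing them):
az storage message peek -q $QUEUE_NAME --num-messages 5 --connection-string "$AzureWebJobsStorage"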
At this point, KEDA will create up to maxReplicaCount Jobs (50, per our configuration) to process the messages that have been placed in the storage queue. Since each Job requests 2000Mi (~2 GiB) of memory, the pending pods quickly become unschedulable, and the Cluster Autoscaler will kick in to scale out the cluster. As Jobs complete, KEDA will schedule new ones until it runs out of work (i.e., the queue is empty). You can watch Jobs being automatically scheduled using the command:
kubectl get jobs
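It’s also interesting to watch the autoscaler do its thing; in separate terminals, you can follow pods going from Pending to Running and new Windows nodes joining the cluster:
kubectl get pods -w
kubectl get nodes -w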
In the next blog post, we’ll cover some additional tweaks that can be added to this configuration to support long-running jobs.
Code Samples
Queue Consumer (queue_consumer.py)
import os
import sys
import time
from azure.storage.queue import QueueClient

try:
    connection_string = os.environ['AzureWebJobsStorage']
    queue_name = os.environ['QUEUE_NAME']
except KeyError:
    print('Error: missing environment variable AzureWebJobsStorage or QUEUE_NAME')
    sys.exit(1)

queue = QueueClient.from_connection_string(conn_str=connection_string, queue_name=queue_name)

# Get a single message; guard against the queue having been drained by another worker
message = next(queue.receive_messages(), None)
if message is None:
    print('Queue is empty; nothing to do')
    sys.exit(0)

# Print the message
print(message)

# Delete the message from the queue
queue.delete_message(message)

# Sleep for a while, simulating a long-running job
time.sleep(30)
Queue Consumer Dockerfile
# 1809 required for AKS
FROM python:windowsservercore-1809
WORKDIR /app
RUN pip install azure-storage-queue
COPY queue_consumer.py /app
CMD ["python", "queue_consumer.py"]
send_messages.py
import os
import sys
from azure.storage.queue import QueueClient

try:
    connection_string = os.environ['AzureWebJobsStorage']
    queue_name = os.environ['QUEUE_NAME']
except KeyError:
    print('Error: missing environment variable AzureWebJobsStorage or QUEUE_NAME')
    sys.exit(1)

queue = QueueClient.from_connection_string(conn_str=connection_string, queue_name=queue_name)

# Send the requested number of messages to the queue
for message in range(0, int(sys.argv[1])):
    queue.send_message(content='foo_' + str(message))