Programmatically restart pods in a Kubernetes cluster with Python
I’ve been using Google Kubernetes Engine (GKE) in staging for a client project for just over three months. It’s incredible how far this technology has come.
This isn’t a critique of Kubernetes; it doesn’t need one. Just look at which cloud providers support it. Hint: all of the big ones (Azure, IBM, Amazon, Google).
There is a learning curve, especially since SSH is no longer how you deploy or talk to your machines. When it’s done well, as with Google’s integration of its authentication into kubectl, it’s a lot more powerful than anything I’ve used before. It also cuts security risk considerably: SSH keys can’t drift out of sync, and you won’t find the key of a colleague who left a year before you joined on a machine that was somehow left off the SSH targets. It happens.
There are a few tools that I find indispensable for using Kubernetes:
- kex: an amazing tool for quickly running a command in a pod. It is prompt-based, asking first which namespace and then which pod to use. Check out the GIF on its site.
- kpoof: another great tool from farmotive. It lets you quickly forward a port from a pod to your machine, which is great for debugging connection issues.
- kubernetes-deploy: although it’s written in Ruby, it’s a great tool for automating deployments and running one-off containers. It has saved me countless hours and is production-ready; Shopify uses it. I just wish there were a Python port.
I also like the CronJobs you can set up in Kubernetes. I had been using Django and Celery with the database-backed scheduler, but it kept failing at the oddest times.
(Screenshot: my CronJobs in Kubernetes)
I migrated them in a morning and haven’t had a problem since. I’m still using Celery, but every task is now kicked off by a Kubernetes CronJob that just enqueues it in Celery.
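A minimal sketch of such a CronJob, built as a Python dict and dumped to JSON (kubectl apply -f accepts JSON as well as YAML). It assumes a Celery app called myapp and a task called myapp.tasks.nightly_report; those names, the image and the schedule are all hypothetical placeholders, and it targets the batch/v1 CronJob API found on current clusters. The container runs celery call, which only enqueues the task by name, so the actual work still happens on a normal Celery worker.

```python
import json


def celery_trigger_cronjob(name, schedule, image, app, task):
    """Build a batch/v1 CronJob manifest that enqueues one Celery task by name.

    Every value passed in here is a hypothetical placeholder.
    """
    return {
        'apiVersion': 'batch/v1',
        'kind': 'CronJob',
        'metadata': {'name': name},
        'spec': {
            'schedule': schedule,  # standard five-field cron syntax
            'concurrencyPolicy': 'Forbid',  # skip a run if the previous one is still going
            'jobTemplate': {
                'spec': {
                    'template': {
                        'spec': {
                            'restartPolicy': 'OnFailure',
                            'containers': [{
                                'name': name,
                                'image': image,
                                # `celery call` only enqueues; a Celery worker runs the task.
                                'command': ['celery', '-A', app, 'call', task],
                            }],
                        },
                    },
                },
            },
        },
    }


if __name__ == '__main__':
    manifest = celery_trigger_cronjob(
        name='nightly-report',
        schedule='0 2 * * *',
        image='registry.example.com/myapp:latest',
        app='myapp',
        task='myapp.tasks.nightly_report',
    )
    print(json.dumps(manifest, indent=2))
```

Piping the output to kubectl apply -f - would create the CronJob in the current namespace.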
Sorry, I got sidetracked. Back to the issue at hand: how to programmatically restart pods in a cluster.
First, in case you didn’t know, Kubernetes mounts service account credentials into each pod by default and sets environment variables that let the pod talk to the k8s API. The token is mounted at /var/run/secrets/kubernetes.io/serviceaccount/token, and the same directory holds ca.crt, the cluster CA certificate for verifying the API server. The API server’s address is in the environment variables KUBERNETES_SERVICE_HOST and KUBERNETES_PORT_443_TCP_PORT.
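Reading those values can be wrapped in a small helper. This is a sketch, and load_in_cluster_config is a hypothetical name; it takes the service account directory and the environment as parameters so it can be tested outside a cluster. As a design choice it prefers KUBERNETES_SERVICE_PORT (which Kubernetes also injects into every pod) and falls back to KUBERNETES_PORT_443_TCP_PORT, and it returns the path of ca.crt so that requests can verify the API server rather than using verify=False.

```python
import os

SERVICE_ACCOUNT_DIR = '/var/run/secrets/kubernetes.io/serviceaccount'


def load_in_cluster_config(sa_dir=SERVICE_ACCOUNT_DIR, environ=None):
    """Return (base_url, headers, ca_cert_path) for calling the API from a pod.

    Hypothetical helper: sa_dir and environ are parameters only for testability.
    """
    environ = os.environ if environ is None else environ
    with open(os.path.join(sa_dir, 'token')) as token_file:
        token = token_file.read().strip()
    host = environ['KUBERNETES_SERVICE_HOST']
    # Prefer KUBERNETES_SERVICE_PORT; fall back to the variable used in this article.
    port = environ.get('KUBERNETES_SERVICE_PORT') or environ['KUBERNETES_PORT_443_TCP_PORT']
    base_url = 'https://%s:%s' % (host, port)
    headers = {'Authorization': 'Bearer %s' % token}
    ca_cert = os.path.join(sa_dir, 'ca.crt')
    return base_url, headers, ca_cert
```

Inside a pod, base_url, headers, ca_cert = load_in_cluster_config() followed by requests.get(base_url + '/api/v1/namespaces/test/pods', headers=headers, verify=ca_cert) would list the pods in the test namespace.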
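The Python 3 code below restarts all the Celery pods in a namespace. The pods API has no "restart" call, so the code deletes each pod and relies on the pod’s controller (for example, the ReplicaSet behind a Deployment) to create a replacement. The pod’s service account also needs RBAC permission to list and delete pods in that namespace.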
starmap is a pretty cool concept. I wanted to pass two arguments into restart_kubernetes_pod, but there was no easy way to do that with a plain map without rewriting the function, which other tools use as well. #annoying
Pool.starmap combined with zip solves this: zip pairs up the arguments, and starmap unpacks each pair into a call.
You end up with two lists, which zip pairs up (in Python 3, zip returns an iterator, so wrap it in list() to see the pairs):
```python
>>> pods = ["test-celery-slaves-84fdb4f6dd-h29hc", "test-celery-slaves-81ddf4m6eg-c23mf"]
>>> namespaces = ["test", "test"]
>>> list(zip(namespaces, pods))
[('test', 'test-celery-slaves-84fdb4f6dd-h29hc'), ('test', 'test-celery-slaves-81ddf4m6eg-c23mf')]
>>> # ... pass to starmap and let it populate the arguments of restart_kubernetes_pod.
```
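To see what starmap does in isolation, here is a small sketch with a stand-in function, describe, which is hypothetical and only formats its two arguments. It also swaps the parallel namespaces list for itertools.repeat, which yields the same namespace for every pod; zip stops at the shorter input, so the pairs come out the same.

```python
from itertools import repeat
from multiprocessing.dummy import Pool as ThreadPool


def describe(namespace, pod):
    # Hypothetical stand-in for restart_kubernetes_pod: two positional arguments.
    return '%s/%s' % (namespace, pod)


pods = ['test-celery-slaves-84fdb4f6dd-h29hc', 'test-celery-slaves-81ddf4m6eg-c23mf']

with ThreadPool(2) as pool:
    # pool.map would call describe(('test', pod)) with ONE tuple argument and fail;
    # pool.starmap unpacks each tuple and calls describe('test', pod).
    results = pool.starmap(describe, zip(repeat('test'), pods))

print(results)
# ['test/test-celery-slaves-84fdb4f6dd-h29hc', 'test/test-celery-slaves-81ddf4m6eg-c23mf']
```

Since only the pod name varies, functools.partial(restart_kubernetes_pod, namespace) with a plain pool.map would work too.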
```python
import logging
import os
from multiprocessing.dummy import Pool as ThreadPool

import requests

logger = logging.getLogger(__name__)

# Files mounted into every pod for its service account.
SERVICE_ACCOUNT_DIR = '/var/run/secrets/kubernetes.io/serviceaccount'
TOKEN_PATH = os.path.join(SERVICE_ACCOUNT_DIR, 'token')
CA_CERT_PATH = os.path.join(SERVICE_ACCOUNT_DIR, 'ca.crt')


def _kubernetes_api():
    """Return the API base URL and auth headers from the in-cluster environment."""
    with open(TOKEN_PATH) as token_file:
        token = token_file.read().strip()
    # os.environ[...] fails fast instead of building "https://None:None".
    host = os.environ['KUBERNETES_SERVICE_HOST']
    port = os.environ['KUBERNETES_PORT_443_TCP_PORT']
    base_url = 'https://{host}:{port}'.format(host=host, port=port)
    headers = {'Authorization': 'Bearer %s' % token}
    return base_url, headers


def restart_kubernetes_pod(namespace, pod):
    """Delete a pod so that its controller (e.g. a Deployment) recreates it."""
    logger.info('Restart Kubernetes pod called with namespace %s and pod %s',
                namespace, pod)
    base_url, headers = _kubernetes_api()
    url = '{base}/api/v1/namespaces/{namespace}/pods/{pod}'.format(
        base=base_url, namespace=namespace, pod=pod)
    # Verify the API server against the cluster CA instead of verify=False.
    resp = requests.delete(url, headers=headers, verify=CA_CERT_PATH)
    logger.info('Response from Kubernetes API %s', resp.status_code)
    resp.raise_for_status()


def restart_all_celery_pods(namespace):
    """Restart every pod in `namespace` whose name contains "-celery"."""
    base_url, headers = _kubernetes_api()
    url = '{base}/api/v1/namespaces/{namespace}/pods'.format(
        base=base_url, namespace=namespace)
    resp = requests.get(url, headers=headers, verify=CA_CERT_PATH)
    resp.raise_for_status()

    pods = []
    namespaces = []
    for pod in resp.json().get('items', []):
        pod_name = pod['metadata']['name']
        if '-celery' in pod_name:
            pods.append(pod_name)
            namespaces.append(namespace)

    # starmap unpacks each (namespace, pod) pair into the two arguments.
    pool = ThreadPool(4)
    pool.starmap(restart_kubernetes_pod, zip(namespaces, pods))
    pool.close()
    pool.join()
```
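Deleting the pods restarts them all at roughly the same time. If the Celery pods belong to a Deployment, a different technique is a rolling restart, which is what kubectl rollout restart does: it patches the Deployment’s pod template with a kubectl.kubernetes.io/restartedAt annotation, and the Deployment then replaces pods according to its rollout strategy. The sketch below only builds that PATCH request; rollout_restart_request is a hypothetical helper, the deployment name test-celery-slaves is an assumption inferred from the pod names above, and the service account would need patch permission on deployments.

```python
import json
from datetime import datetime, timezone


def rollout_restart_request(base_url, namespace, deployment, now=None):
    """Build (url, headers, body) for a PATCH that triggers a rolling restart.

    Hypothetical helper mirroring the annotation that `kubectl rollout restart` sets.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    url = '{base}/apis/apps/v1/namespaces/{ns}/deployments/{name}'.format(
        base=base_url, ns=namespace, name=deployment)
    headers = {'Content-Type': 'application/strategic-merge-patch+json'}
    # Changing the pod template is what makes the Deployment roll out new pods.
    body = json.dumps({
        'spec': {'template': {'metadata': {'annotations': {
            'kubectl.kubernetes.io/restartedAt': now.strftime('%Y-%m-%dT%H:%M:%SZ'),
        }}}}
    })
    return url, headers, body
```

To send it, merge the bearer-token headers in: requests.patch(url, data=body, headers={**auth_headers, **headers}, verify=ca_cert_path).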
As you can see, it’s not too difficult to quickly build some interesting tooling for Kubernetes.