How we use Airflow to optimize our DevOps workflow

Introduction

Apache Airflow is a powerful tool for automating workflows and managing complex processes effectively. In DevOps, tasks like token validation, Kubernetes configuration management, and updating secrets are often repetitive and time-consuming.

With Airflow, we replaced manual work with automated workflows, saving time and reducing errors. We used ChatGPT to generate much of the code that scripts out these repetitive tasks, and then used Airflow to run those scripts on a regular schedule.

In this blog, we’ll explore how we implemented Airflow to solve real-world challenges and improve our DevOps processes step by step.

What is Apache Airflow?

Before looking at our implementations, let’s take a moment to understand what Apache Airflow is and why it’s an essential tool for DevOps teams.

Apache Airflow is an open-source platform designed for orchestrating workflows. It allows you to define workflows as Directed Acyclic Graphs (DAGs), where each task is represented as a node, and dependencies between tasks form the edges. This structure makes it easy to visualize, schedule, and monitor even the most complex workflows.
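To make the DAG idea concrete, here is a minimal sketch of a two-task workflow. The DAG id, task names, and schedule are illustrative, not part of our production setup.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract():
    print("pulling data")


def report():
    print("building report")


# Each task is a node; the >> operator defines the edge (dependency) between them.
with DAG(
    dag_id="example_two_task_dag",   # hypothetical DAG name
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    report_task = PythonOperator(task_id="report", python_callable=report)

    extract_task >> report_task  # report runs only after extract succeeds

Airflow renders this graph in its UI, which is what makes scheduling and monitoring larger pipelines straightforward.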

Why did we choose Airflow for our workflows?

  • Flexible and Modular: Airflow’s modular design lets you adapt it to suit your needs with ease.
  • Seamless Integration: It connects effortlessly with cloud services and other DevOps tools.
  • Dependable Operations: Airflow effectively manages task dependencies and handles failures gracefully.

If you’re interested in a more detailed explanation of Apache Airflow, check out our in-depth blog: Apache Airflow Use Cases for DevOps Engineers

The diagram below provides a simple overview of how Apache Airflow operates.

Apache Airflow Architecture

How We Use Apache Airflow at KubeNine

Let’s take a closer look at some of the key challenges we’ve solved using Apache Airflow and the corresponding implementations.

1. GitHub Token Validation

The Problem:
We use GitHub tokens with elevated privileges in our CI/CD pipelines. If one of these tokens expires, every pipeline and API interaction that depends on it can fail.

Our Solution:
We created two Airflow jobs: one periodically updates the GitHub token stored in repository variables, and the other validates the token daily. If a token is invalid or expired, the job notifies our team on Slack so the issue can be addressed promptly.

Code Implementation:

  1. Validating Tokens:
    The function checks the GitHub token's validity by comparing the authenticated username with the expected username.
# Assumes PyGithub is installed and GITHUB_TOKEN is stored as an Airflow Variable.
from airflow.models import Variable
from github import Github


def check_github_token(**context):
    GITHUB_TOKEN = Variable.get("GITHUB_TOKEN")
    GITHUB_USERNAME = "expected-username"
    try:
        # Authenticate with the token and read back the username it belongs to.
        github = Github(GITHUB_TOKEN)
        user = github.get_user()
        username = user.login
        if username == GITHUB_USERNAME:
            print(f"GitHub token is valid. Authenticated as {username}.")
        else:
            message = (
                f"Expected username: {GITHUB_USERNAME}, but authenticated as: {username}."
            )
            context['ti'].xcom_push(key='slack_message', value=message)
            raise Exception("GitHub username mismatch.")
    except Exception as e:
        # Push the failure reason to XCom so the Slack task can report it.
        message = f"Error: {str(e)}"
        context['ti'].xcom_push(key='slack_message', value=message)
        raise
  2. Sending Notifications:
    The task sends a Slack notification if the token validation fails.
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

send_slack_notification_task = SlackWebhookOperator(
    task_id='send_slack_notification',
    slack_webhook_conn_id='slack_notifications',
    message="{{ ti.xcom_pull(task_ids='check_github_token', key='slack_message') }}",
    channel="#notifications",
    username="airflow",
    trigger_rule='one_failed',  # fire only if the validation task failed
)
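For context, here is a minimal sketch of how these two tasks could be wired together in a daily DAG. The DAG id, schedule, and start date are illustrative assumptions rather than our exact production configuration, and both operators above are assumed to live in the same DAG file.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="github_token_validation",   # hypothetical DAG name
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    check_github_token_task = PythonOperator(
        task_id="check_github_token",
        python_callable=check_github_token,
    )

    # send_slack_notification_task is the SlackWebhookOperator shown above,
    # defined inside this same `with DAG(...)` block in the real DAG file.
    # Because of trigger_rule='one_failed', the alert only fires when the
    # validation task fails.
    check_github_token_task >> send_slack_notification_task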

2. Kubernetes KUBECONFIG Management

The Problem:

To connect to our on-prem Kubernetes clusters, we use a KUBECONFIG file stored as a GitHub Actions secret. If the KUBECONFIG expires, all of our on-prem deployments start failing.

Our Solution:
We created an Airflow workflow to validate KUBECONFIG files daily. If an issue is detected, our team receives a Slack alert with the details.

We store the KUBECONFIG in Airflow, and a separate job pushes it to all relevant GitHub repositories.

Code Implementation:

  1. Validating KUBECONFIG Files:
    This function decodes the KUBECONFIG, validates its contents, and connects to the Kubernetes cluster.
# Assumes the kubernetes Python client is installed and KUBECONFIG is stored
# base64-encoded as an Airflow Variable.
import base64
import os
import tempfile

from airflow.models import Variable
from kubernetes import client, config
from kubernetes.client.rest import ApiException


def check_kubeconfig_expiry(**kwargs):
    kubeconfig_base64 = Variable.get("KUBECONFIG")
    kubeconfig_content = base64.b64decode(kubeconfig_base64).decode("utf-8")
    # Write the decoded kubeconfig to a temporary file for the client to load.
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        kubeconfig_path = temp_file.name
        temp_file.write(kubeconfig_content.encode("utf-8"))
    try:
        config.load_kube_config(config_file=kubeconfig_path)
        v1 = client.CoreV1Api()
        nodes = v1.list_node()  # a simple API call proves the credentials still work
        for node in nodes.items:
            print(f"Node: {node.metadata.name}, Status: {node.status.conditions[-1].type}")
    except ApiException as e:
        print(f"Failed to connect to Kubernetes cluster: {e}")
        raise
    finally:
        os.remove(kubeconfig_path)
  2. Slack Integration:
    Alerts notify the team if KUBECONFIG is invalid.
send_slack_message_task = SlackWebhookOperator(
    task_id='send_slack_message_task',
    slack_webhook_conn_id='slack_notifications',
    message="KUBECONFIG is expired or invalid. Update it to ensure uninterrupted cluster access.",
    channel="#notifications",
    username="airflow",
    trigger_rule='one_failed',
)
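For completeness, here is one way the base64-encoded KUBECONFIG referenced above could be loaded into Airflow in the first place. This is a minimal sketch, assuming it runs on the Airflow host (the same thing can be done through the Airflow CLI or UI); the file path is illustrative.

import base64

from airflow.models import Variable

# Read a local kubeconfig, base64-encode it, and store it as the
# KUBECONFIG Airflow Variable that check_kubeconfig_expiry() reads.
with open("/path/to/kubeconfig", "rb") as f:   # illustrative path
    encoded = base64.b64encode(f.read()).decode("utf-8")

Variable.set("KUBECONFIG", encoded)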

3. Secret Management for GitHub Repositories

The Problem:

Secrets stored on GitHub can expire without any warning, and we usually only find out when something fails.

Why Automate This Task?

Expired GitHub Actions secrets can block teams at critical times. The sooner we catch an expired token, the sooner we can act.

The Solution:
Using Apache Airflow, we created a workflow that:

  1. Safely retrieves secrets stored in Airflow Variables.
  2. Dynamically updates secrets in the specified repositories.
  3. Allows team members to specify which repositories to update using the Airflow UI.
  4. Sends a notification once the process is complete or if any issues arise.

Code Implementation:

Here’s how we built the solution step by step:

  1. Dynamic Repository Selection Using Airflow Parameters:
    We added two parameters to the Airflow DAG:
  • repo_owner: To specify the GitHub organization or username.
  • repositories: To define the list of repositories that need updates.
from airflow import DAG
from airflow.models.param import Param

# default_args is assumed to be defined earlier in the DAG file.
with DAG(
    'add_github_secrets_dag_using_airflow_ui',
    default_args=default_args,
    description='A DAG to add secrets to GitHub repositories',
    schedule_interval=None,  # Run manually via the Airflow UI
    params={
        'repo_owner': Param(
            default='kubenine',
            type='string',
            title='Repository Owner',
            description='GitHub username or organization name.'
        ),
        'repositories': Param(
            default=['repo1', 'repo2', 'repo3'],
            type='array',
            title='Repositories',
            description='List of repository names.'
        )
    }
) as dag:
  2. Fetching Secrets Securely:
    All secrets are stored in Airflow Variables, ensuring they remain secure and encrypted.
from airflow.models import Variable


def get_secrets_from_airflow():
    # Secrets are read from Airflow Variables at runtime rather than hard-coded.
    secrets = {
        'DOCKERHUB_USERNAME': Variable.get("DOCKERHUB_USERNAME"),
        'DOCKERHUB_TOKEN': Variable.get("DOCKERHUB_TOKEN"),
        'KUBECONFIG': Variable.get("KUBECONFIG"),
        'GITHUB_TOKEN': Variable.get("GITHUB_TOKEN"),
    }
    return secrets
  3. Triggering Updates via the Airflow UI:
    The workflow allows users to:
  • Specify the repo_owner and the list of repositories.
  • Trigger the DAG without needing to interact with the actual secrets.
  • Automatically update secrets in the selected repositories (a sketch of this update step follows below).
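The update step itself is not shown above, so here is a hedged sketch of how secrets can be pushed to a repository through GitHub's REST API. GitHub requires each value to be encrypted with the repository's public key (a libsodium sealed box), which the PyNaCl library handles. The function name and the way parameters are read from the DAG run context are illustrative assumptions; the sketch reuses get_secrets_from_airflow() from above.

import base64

import requests
from airflow.models import Variable
from nacl import encoding, public


def encrypt_secret(repo_public_key: str, secret_value: str) -> str:
    """Encrypt a secret with the repository's public key, as GitHub requires."""
    key = public.PublicKey(repo_public_key.encode("utf-8"), encoding.Base64Encoder())
    sealed_box = public.SealedBox(key)
    encrypted = sealed_box.encrypt(secret_value.encode("utf-8"))
    return base64.b64encode(encrypted).decode("utf-8")


def update_repo_secrets(**context):
    token = Variable.get("GITHUB_TOKEN")
    owner = context["params"]["repo_owner"]          # from the Airflow UI params
    repositories = context["params"]["repositories"]
    headers = {"Authorization": f"Bearer {token}",
               "Accept": "application/vnd.github+json"}

    for repo in repositories:
        # Fetch the repository's public key used to encrypt secret values.
        key_resp = requests.get(
            f"https://api.github.com/repos/{owner}/{repo}/actions/secrets/public-key",
            headers=headers,
        )
        key_resp.raise_for_status()
        key_data = key_resp.json()

        for name, value in get_secrets_from_airflow().items():
            # Create or update the secret with the encrypted value.
            resp = requests.put(
                f"https://api.github.com/repos/{owner}/{repo}/actions/secrets/{name}",
                headers=headers,
                json={
                    "encrypted_value": encrypt_secret(key_data["key"], value),
                    "key_id": key_data["key_id"],
                },
            )
            resp.raise_for_status()
            print(f"Updated secret {name} in {owner}/{repo}")

Because the encryption happens client-side with the repository's own public key, the plaintext value never appears in the API request.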

4. AWS Billing Updates with Airflow

The Problem:
We have multiple AWS accounts that we use for testing various things. Developers often leave resources running for days on these test accounts, which leads to unnecessary spend.

The Solution:
We created an Airflow workflow that:

  1. Collects AWS billing data using the Cost Explorer API.
  2. Breaks down the costs by service for better visibility.
  3. Sends daily updates to the team via Slack in a formatted message.

Code Implementation:

  1. Fetching AWS Billing Details:
    The workflow retrieves cost data grouped by service using the AWS Cost Explorer API.
import os
from datetime import datetime

import boto3
from airflow.models import Variable


def get_aws_cost():
    # Credentials for the target account are stored as Airflow Variables.
    os.environ['AWS_ACCESS_KEY_ID'] = Variable.get('DIPCHAND_AWS_ACCESS_KEY_ID')
    os.environ['AWS_SECRET_ACCESS_KEY'] = Variable.get("DIPCHAND_AWS_SECRET_ACCESS_KEY")

    client = boto3.client('ce', region_name='us-east-1')  # Cost Explorer is served from us-east-1
    today = datetime.today().strftime('%Y-%m-%d')
    start_of_month = datetime.today().replace(day=1).strftime('%Y-%m-%d')

    # Month-to-date amortized cost, grouped by AWS service.
    response = client.get_cost_and_usage(
        TimePeriod={'Start': start_of_month, 'End': today},
        Granularity='MONTHLY',
        Metrics=['AmortizedCost'],
        GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}]
    )

    total_cost = 0.0
    cost_by_service = {}
    for group in response['ResultsByTime'][0]['Groups']:
        service = group['Keys'][0]
        cost = float(group['Metrics']['AmortizedCost']['Amount'])
        cost_by_service[service] = round(cost, 2)
        total_cost += cost

    return {'TotalCost': round(total_cost, 2), 'CostByService': cost_by_service}
  2. Sending the Notification to Slack:
    This task sends the message directly to the team’s Slack channel.
send_slack_message_task = SlackWebhookOperator(
    task_id='send_slack_message_task',
    slack_webhook_conn_id='slack_airflow_notifications',
    message="{{ task_instance.xcom_pull(task_ids='send_slack_notification_task') }}",
    channel="#aws-cost-notifications",
    username="airflow",
)
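The message template above pulls its text from an upstream task with the id send_slack_notification_task, which is not shown here. Below is a hedged sketch of what that formatting step might look like, assuming it simply turns the dictionary returned by get_aws_cost() into a readable Slack message; the task ids and wording are illustrative assumptions.

from airflow.operators.python import PythonOperator


def format_cost_message(**context):
    # Pull the dictionary returned by the task wrapping get_aws_cost()
    # ('get_aws_cost_task' is a hypothetical task_id).
    costs = context['ti'].xcom_pull(task_ids='get_aws_cost_task')
    lines = [f"*AWS month-to-date cost:* ${costs['TotalCost']}"]
    for service, cost in sorted(costs['CostByService'].items(), key=lambda x: -x[1]):
        lines.append(f"• {service}: ${cost}")
    # The returned string becomes this task's XCom value, which the
    # SlackWebhookOperator template above pulls by task_id.
    return "\n".join(lines)


format_message_task = PythonOperator(
    task_id='send_slack_notification_task',  # matches the task_id referenced in the template
    python_callable=format_cost_message,
)

# In the DAG file: get_aws_cost_task >> format_message_task >> send_slack_message_task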

Here’s an example of the Slack notification generated by this workflow:


Additional Ideas for Automating with Apache Airflow

Now that we’ve explored some of the workflows we use at our company, here are a few more ideas you can implement with Apache Airflow to optimize your processes:

  1. Automatically Stopping AWS Testing Instances at Night
  • Identify EC2 instances with tags like Environment: Testing and schedule them to stop during off-hours when they are not needed, reducing cloud costs significantly (see the sketch after this list).
  2. Scaling Resources Based on Usage Trends
  • Use Airflow to monitor resource usage (e.g., CPU, memory) and scale services dynamically during peak or off-peak hours to optimize costs and performance.
  3. Database Backup and Rotation
  • Schedule automatic database backups, verify their integrity, and rotate old backups to keep storage usage efficient while maintaining disaster recovery readiness.
  4. CI/CD Workflow Monitoring
  • Monitor your CI/CD pipelines for failed builds or deployments and send real-time alerts to your team for immediate resolution.
  5. Audit and Clean-Up Unused Cloud Resources
  • Identify unused resources like idle instances, old snapshots, or unattached volumes and clean them up to avoid unnecessary expenses.
  6. Periodic Security Checks
  • Run security scans across your infrastructure, checking for misconfigurations or vulnerabilities, and send detailed reports via Slack or email.
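As an example of the first idea, here is a minimal sketch of a task that stops tagged test instances. The tag key/value and region are assumptions; adjust them to your own tagging convention.

import boto3


def stop_testing_instances(region_name="us-east-1"):
    """Stop all running EC2 instances tagged Environment=Testing."""
    ec2 = boto3.client("ec2", region_name=region_name)

    # Find running instances that carry the Environment: Testing tag.
    reservations = ec2.describe_instances(
        Filters=[
            {"Name": "tag:Environment", "Values": ["Testing"]},
            {"Name": "instance-state-name", "Values": ["running"]},
        ]
    )["Reservations"]

    instance_ids = [
        instance["InstanceId"]
        for reservation in reservations
        for instance in reservation["Instances"]
    ]

    if instance_ids:
        ec2.stop_instances(InstanceIds=instance_ids)
        print(f"Stopped instances: {instance_ids}")
    else:
        print("No running testing instances found.")

Wrapped in a PythonOperator and scheduled for the evening (with a matching start task in the morning), this alone can noticeably cut test-account costs.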

Conclusion

Apache Airflow has simplified how we manage and improve our DevOps workflows at KubeNine. By automating tasks like token validation, Kubernetes configuration checks, and secret management, we’ve reduced manual effort and minimized errors.

If you want to simplify your workflows and focus on building great products, KubeNine can take care of automation and infrastructure for you. With our expertise, your systems will run efficiently, allowing you to focus on what matters most.