当 Java 遇上 Airflow:一次无法抗拒的深度编排之旅

点击箭头处“蓝色字”,关注我们哦!!


Airflow 是一个现代化的跨语言任务编排工具,使用它,你可以在 Java 项目中轻松实现复杂任务流的自动化,彻底告别手动管理任务的琐碎与低效。

一、为什么选择 Airflow 与 Java 搭配?

Java 作为一门 成熟且高性能的编程语言,在企业级开发中非常受欢迎。

Apache Airflow,则是一个专注于任务编排的工具,支持多种编程语言,并提供了灵活的任务调度能力。

将两者结合,可以在 分布式任务管理跨语言任务调用 上大显身手。

想象一下,你的系统需要每天处理定时任务,比如:爬取数据、清洗数据、生成报表,并触发其他服务执行后续动作。

如果没有合适的工具,开发和维护这样的任务流将异常困难

这时,Airflow 的出现就显得尤为重要!

二、准备工作:搭建 Airflow 和 Java 环境

在开始之前,我们需要完成以下准备工作:

1. 安装 Apache Airflow

使用 pip 安装 Airflow:

bash

pip install apache-airflow

配置好 Airflow 的工作目录 后,启动它的 Web 界面

bash

airflow db init
airflow webserver

注意:Airflow 强烈依赖 Python 环境,因此确保你的机器上已安装 Python 3.7 以上版本。

2. 配置 Java 项目

创建一个 Java Maven 项目,并在 pom.xml 中引入必要的依赖:

xml

<;dependencies>;
    <;!-- JSON 处理库 -->;
    <;dependency>;
        <;groupId>;com.fasterxml.jackson.core<;/groupId>;
        <;artifactId>;jackson-databind<;/artifactId>;
        <;version>;2.15.2<;/version>;
    <;/dependency>;
    <;!-- Apache HttpClient -->;
    <;dependency>;
        <;groupId>;org.apache.httpcomponents.client5<;/groupId>;
        <;artifactId>;httpclient5<;/artifactId>;
        <;version>;5.2.1<;/version>;
    <;/dependency>;
<;/dependencies>;

添加这些依赖后,记得刷新 Maven 项目,以确保所有依赖已正确加载。

3. 启用 Airflow REST API

Airflow 配置文件 airflow.cfg 中,找到 [api] 部分,将 auth_backend 设置为默认值:

ini

[api]
auth_backend = airflow.api.auth.backend.default

这将允许我们通过 REST API 与 Airflow 直接交互

三、编写第一个任务:用 Java 调用 Airflow

下面,我们将通过 Java 项目调用 Airflow 的任务 API,完成一个简单的任务编排。

1. 在 Airflow 中定义 DAG

Airflow 的任务是通过 DAG(有向无环图) 表示的。

我们先编写一个基础 DAG 文件,让它接收外部参数并运行:

python运行

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# 定义一个简单的 Python 函数
def print_message(**kwargs):
    message = kwargs['dag_run'].conf.get('message', 'Hello from Airflow!')
    print(f"Task Message: {message}")

# 创建 DAG 实例
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),

with DAG('java_airflow_example', default_args=default_args, schedule_interval=None) as dag:
    task = PythonOperator(
        task_id='print_message',
        python_callable=print_message,
        provide_context=True
    )

将此文件保存为 dags/java_airflow_example.py,并重启 Airflow 服务

提示:确保文件路径正确,因为 Airflow 会自动扫描 dags/ 目录下的所有 DAG 文件。

2. 用 Java 调用 REST API

接下来,我们使用 Java 调用这个 DAG,并传递参数:

java

import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.core5.http.io.entity.StringEntity;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class AirflowApiExample {

    public static void main(String[] args) {
        // Airflow API 的 URL
        String url = "http://localhost:8080/api/v1/dags/java_airflow_example/dagRuns";

        // 创建 HTTP 客户端
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            // 创建 POST 请求
            HttpPost post = new HttpPost(url);

            // 设置请求头
            post.setHeader("Content-Type", "application/json");

            // 传递参数
            Map<;String, Object>; conf = new HashMap<;>;();
            conf.put("message", "Hello from Java!");

            Map<;String, Object>; payload = new HashMap<;>;();
            payload.put("conf", conf);

            // 将参数转换为 JSON
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(payload);

            // 设置请求体
            post.setEntity(new StringEntity(json));

            // 发送请求并获取响应
            try (CloseableHttpResponse response = client.execute(post)) {
                System.out.println("Response Code: " + response.getCode());
                System.out.println("Response: " + new String(response.getEntity().getContent().readAllBytes()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


运行此 Java 程序 后,Airflow 的 DAG 会被触发,并在日志中打印出 “Hello from Java!”

注意:确保 Airflow 服务已启动,并且 localhost:8080 是你的 Airflow 地址。

四、深入解析代码

1. 配置 REST URL

java_airflow_example 是我们之前定义的 DAG 名称

2. 传递 JSON 参数

使用 ObjectMapperJava 对象转换为 JSON 格式

Airflow 接受的参数格式非常严格,因此需要确保字段与 API 要求 完全匹配。

3. 错误处理

在生产环境中,建议为 HTTP 请求添加超时控制,并对响应结果进行更细致的 错误处理

五、总结与展望

通过本文,我们学习了如何结合 Java 和 Apache Airflow,实现现代化的任务编排和自动化。

Airflow 强大的跨语言能力灵活的任务调度特性,让它成为 Java 开发者的得力助手

希望这篇文章能对你有所启发!如果有任何疑问,欢迎留言讨论,我们一起探索更多可能性!