Integración nativa de Apache Airflow con Databricks SQL.

Integración nativa de Apache Airflow con Databricks SQL.

  • Big Data
  • abril 29, 2022
  • No Comment
  • 55
  • 8 minutes read


Nos complace anunciar una serie de mejoras en la compatibilidad con Apache Airflow para Databricks. Estas nuevas capacidades facilitan la creación de canalizaciones sólidas de datos y aprendizaje automático (ML) en el popular orquestador de código abierto. Con las últimas mejoras, como el nuevo DatabricksSqlOperator, los clientes ahora pueden usar Airflow para consultar e ingerir datos usando SQL estándar en Databricks, realizar análisis y tareas de ML en una computadora portátil, activar Delta Live Tables para transformar datos en Seahouse y mucho más. .

Apache Airflow es una plataforma popular y extensible para crear, programar y monitorear datos y canalizaciones de aprendizaje automático (conocidos como DAG en la jerga de Airflow) mediante programación usando Python. Airflow viene con una gran cantidad de operadores integrados que facilitan la interacción con todo, desde bases de datos hasta almacenamiento en la nube. Databricks es compatible con Airflow desde 2017, lo que permite a los usuarios de Airflow activar flujos de trabajo que combinan cuadernos, JAR y secuencias de comandos de Python en la plataforma Lakehouse de Databricks, que se adapta a los flujos de trabajo de aprendizaje automático y datos más exigentes del mundo.

Hagamos un recorrido por las nuevas características con una tarea del mundo real: crear una canalización de datos simple que cargue datos meteorológicos recién entrantes desde una API en una tabla delta sin usar cuadernos de Databricks para realizar esta tarea. A los efectos de esta publicación de blog, haremos todo en Azure, pero el proceso es casi idéntico en AWS y GCP. Además, realizaremos todos los pasos en un extremo de SQL, pero el proceso es bastante similar si prefiere usar un clúster de Databricks de uso general en su lugar. El último ejemplo de DAG se ve así en la interfaz de usuario de Airflow:

    Airflow DAG para ingerir datos en la tabla SQL de Databricks

En aras de la brevedad, omitiremos parte del código de esta publicación de blog. Puedes ver el código completo aquí.

Instalar y configurar Airflow

Esta publicación de blog asume que tiene Airflow 2.1.0 o una versión posterior instalada y ha configurado una conexión de Databricks. Instale la última versión del proveedor de Databricks para Apache Airflow:


pip install apache-airflow-providers-databricks

Crear una tabla para almacenar datos meteorológicos

Definimos el Airflow DAG para que se ejecute a diario. la primera tarea crear mesaejecuta una declaración SQL que crea una tabla llamada flujo de aire_tiempo en el cual Originalmente Esquema si la tabla aún no existe. Esta tarea demuestra DatabricksSqlOperator, que puede ejecutar cualquier instrucción SQL en el proceso de Databricks, incluidos los puntos de conexión de SQL.

with DAG(
        "load_weather_into_dbsql",
        start_date=days_ago(0),
        schedule_interval="@daily",
        default_args=default_args,
        catchup=False,
) as dag:
  table = "default.airflow_weather"
  schema = "date date, condition STRING, humidity double, precipitation double, " 
           "region STRING, temperature long, wind long, " 
           "next_days ARRAY<STRUCT>" 

  create_table = DatabricksSqlOperator(
    task_id="create_table",
    sql=[f"create table if not exists {table}({schema}) using delta"],
  )

Recupere datos meteorológicos de la API y cárguelos en el almacenamiento en la nube

A continuación, usaremos PythonOperator para realizar una solicitud a la API meteorológica y guardar los resultados en un archivo JSON en una ubicación temporal.

Una vez que tengamos los datos meteorológicos localmente, los cargaremos en el almacenamiento en la nube usando LocalFilesystemToWasbOperator ya que estamos usando Azure Storage. Por supuesto, Airflow también admite la carga de archivos en Amazon S3 o Google Cloud Storage:

get_weather_data = PythonOperator(task_id="get_weather_data",
                                  python_callable=get_weather_data,
                                  op_kwargs={"output_path": "/tmp/{{ds}}.json"},
                                  )

copy_data_to_adls = LocalFilesystemToWasbOperator(
  task_id='upload_weather_data',
  wasb_conn_id='wasbs-prod,
  file_path="/tmp/{{ds}}.json",
  container_name="test",
  blob_name="airflow/landing/{{ds}}.json",
)

Tenga en cuenta que arriba, la variable {{ds}} se usa para decirle a Airflow que reemplace la variable con la fecha de ejecución de la tarea programada, lo que nos brinda nombres de archivo consistentes y sin conflictos.

Registrar datos en una tabla

Finalmente, estamos listos para importar datos a una hoja de cálculo. Para hacer esto, usamos el práctico DatabricksCopyIntoOperator, que genera una instrucción COPY INTO SQL. El comando COPY INTO es una forma simple pero poderosa de incluir de manera idempotente archivos del almacenamiento en la nube en una tabla:

import_weather_data = DatabricksCopyIntoOperator(
    task_id="import_weather_data",
    expression_list="date::date, * except(date)",
    table_name=table,
    file_format="JSON",
     file_location="abfss://mycontainer@mystoreaccount.dfs.core.windows.net/airflow/landing/", files=["{{ds}}.json"])

¡Eso es todo! Ahora tenemos una tubería de datos confiable que inserta datos de una API en una tabla con solo unas pocas líneas de código.

Pero eso no es todo …

También nos complace anunciar mejoras que hacen que la integración de Airflow con Databricks sea muy sencilla.

  • Se actualizó DatabricksSubmitRunOperator para usar la última API de trabajos v2.1. Con la nueva API, es mucho más fácil configurar los controles de acceso para los trabajos enviados mediante DatabricksSubmitRunOperator, lo que permite a los desarrolladores o equipos de soporte acceder fácilmente a la interfaz de usuario y los registros del trabajo.
  • Airflow ahora puede activar canalizaciones de Delta Live Table.
  • Los DAG de Airflow ahora pueden pasar parámetros para tipos de tareas JAR.
  • Es posible actualizar los repositorios de Databricks a una rama o etiqueta específica para garantizar que los trabajos siempre usen la última versión del código.
  • En Azure, es posible usar tokens de Azure Active Directory en lugar de tokens de acceso personal (PAT). Por ejemplo, si Airflow se ejecuta en una máquina virtual de Azure con una identidad administrada, los operadores de Databricks podrían usar identidades administradas para autenticarse en Azure Databricks sin necesidad de un token PAT. Obtenga más información sobre estas y otras mejoras de autenticación aquí.

El futuro es brillante para los usuarios de Airflow en Databricks

Estamos entusiasmados con estas mejoras y emocionados de ver lo que la comunidad de Airflow está creando con Databricks. Nos encantaría escuchar sus comentarios sobre las funciones que deberíamos agregar a continuación.



Related post

Una guía completa para la gestión de las relaciones con los clientes

Una guía completa para la gestión de las relaciones…

Cuando se trata de administrar un negocio, una de las cosas más importantes es mantener contentos a sus clientes. No hay…
Nanotechnology Now – Comunicado de prensa: Nodos cuánticos con átomos calientes

Nanotechnology Now – Comunicado de prensa: Nodos cuánticos con…

Inicio > Prensa > Nodo cuántico con átomos calientes Una partícula de luz de la fuente de fotones individuales (abajo) se…
Esto lo ayuda a mantenerse fiel a las mejores prácticas de desarrollo estándar al desarrollar con cuadernos Databricks.

Esto lo ayuda a mantenerse fiel a las mejores…

Los portátiles son una forma popular de empezar a trabajar rápidamente con datos sin configurar un entorno complicado. Los autores de…

Leave a Reply

Tu dirección de correo electrónico no será publicada.