Cómo se monitorean las consultas de transmisión en Pyspark

Cómo se monitorean las consultas de transmisión en Pyspark

  • Big Data
  • mayo 27, 2022
  • No Comment
  • 85
  • 11 minutes read


El streaming es una de las técnicas de procesamiento de datos más importantes para el registro y el análisis. Proporciona a los usuarios y desarrolladores capacidades de procesamiento de datos en tiempo real y de baja latencia para el análisis y la activación de acciones. Sin embargo, monitorear las cargas de trabajo de transmisión de datos es un desafío porque los datos se procesan continuamente a medida que llegan. Debido a esta naturaleza constante del procesamiento de secuencias, es más difícil solucionar problemas durante el desarrollo y la producción sin métricas, alertas y paneles en tiempo real.

La transmisión estructurada en Apache Spark™ aborda el problema de la supervisión al proporcionar:

Anteriormente, la API de Observable faltaba en PySpark, lo que obligaba a los usuarios a usar la API de Scala para sus consultas de transmisión a fin de aprovechar la funcionalidad de alertas y paneles con otros sistemas externos. La falta de esta funcionalidad en Python se ha vuelto cada vez más crítica a medida que Python ha crecido en importancia, con casi el 70 % de los comandos de notebook que se ejecutan en Databricks en Python.

En Databricks Runtime 11, nos complace anunciar que la API de Observable ahora está disponible en PySpark. En esta publicación de blog, presentamos la API de Python Observable para transmisión estructurada junto con un ejemplo paso a paso de un escenario que agrega lógica de alerta a una consulta de transmisión.

API observable

Los desarrolladores ahora pueden enviar métricas de transmisión a sistemas externos, p. B. para alertas y paneles con métricas personalizadas mediante el uso de una combinación de la interfaz de escucha de consultas de transmisión y la API observable en PySpark. La interfaz Streaming Query Listener es una clase abstracta que se debe heredar y debe implementar todos los métodos como se muestra a continuación:

from pyspark.sql.streaming import StreamingQueryListener


class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` will always be
        latest no matter when this method is called. Therefore, the status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may be changed before/when you process the event.
        For example, you may find :class:`StreamingQuery`
        is terminated when you are processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass


my_listener = MyListener()

Tenga en cuenta que todos funcionan de forma asíncrona.

  • StreamingQueryListener.onQueryStarted se activa cuando se inicia una consulta de transmisión, p. DataStreamWriter.start.
  • StreamingQueryListener.onQueryProgress se llama cuando se completa la ejecución de cada microlote.
  • StreamingQueryListener.onQueryTerminated llamado cuando finaliza la consulta, p. B. StreamingQuery.stop.

El oyente debe agregarse para ser activado a través de StreamingQueryManager y también se puede eliminar más tarde como se muestra a continuación:

spark.streams.addListener(my_listener)
spark.streams.removeListener(my_listener)

Para recopilar métricas personalizadas, deben agregarse a través de DataFrame.observe. Las métricas personalizadas se definen como funciones agregadas arbitrarias, como count("value") Como se muestra abajo.

df.observe("name", count(column), ...)

escenario de advertencia de error

En esta sección, describimos un ejemplo de un caso de uso del mundo real utilizando la API de Observable. Suponga que tiene un directorio que recibe constantemente nuevos archivos CSV de otro sistema y necesita transmitirlos. En este ejemplo, usamos un sistema de archivos local en aras de la simplicidad para que la API sea fácil de entender. Los siguientes fragmentos de código se pueden copiar y pegar en el pyspark Shell para ejecutar y probar.

Primero, importemos las clases y paquetes de Python requeridos, y luego creemos un directorio llamado my_csv_dir utilizado en este escenario.

import os
import shutil
import time
from pathlib import Path

from pyspark.sql.functions import count, col, lit
from pyspark.sql.streaming import StreamingQueryListener

# NOTE: replace `basedir` with the fused path, e.g., "/dbfs/tmp" in Databricks
# notebook.
basedir = os.getcwd()  # "/dbfs/tmp"

# My CSV files will be created in this directory later after cleaning 'my_csv_dir'
# directory up in case you already ran this example below.
my_csv_dir = os.path.join(basedir, "my_csv_dir")
shutil.rmtree(my_csv_dir, ignore_errors=True)
os.makedirs(my_csv_dir)

A continuación, definamos nuestro propio oyente de consultas de transmisión personalizado. El oyente advierte cuando hay demasiados registros incorrectos durante la ingesta de CSV para cada proceso. Si los registros con formato incorrecto representan más del 50 % del número total de registros procesados, imprimiremos un mensaje de registro. Sin embargo, en escenarios de producción, puede conectarse a los sistemas externos en lugar de solo imprimir.

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")


# Add my listener.
my_listener = MyListener()
spark.streams.addListener(my_listener)

En este ejemplo, para activar el oyente, lo insertamos antes de la consulta. Sin embargo, tenga en cuenta que puede agregar el oyente independientemente del inicio y el final de la consulta, ya que funcionan de forma asíncrona. Esto le permite adjuntar y separar sus consultas de transmisión en curso sin pausarlas.

Ahora comencemos una consulta de transmisión que incorporará los archivos. my_csv_dir Directorio. Durante el procesamiento, también controlamos la cantidad de registros incorrectos y registros procesados. La fuente de datos CSV guarda menos registros con formato incorrecto _corrupt_recordpor defecto, entonces contamos la columna para el número de registros incorrectos.

# Now, start a streaming query that monitors 'my_csv_dir' directory.
# Every time when there are new CSV files arriving here, we will process them.
my_csv = spark.readStream.schema(
    "my_key INT, my_val DOUBLE, _corrupt_record STRING"
).csv(Path(my_csv_dir).as_uri())
# `DataFrame.observe` computes the counts of processed and malformed records,
# and sends an event to the listener.
my_observed_csv = my_csv.observe(
    "metric",
    count(lit(1)).alias("cnt"),  # number of processed rows
    count(col("_corrupt_record")).alias("malformed"))  # number of malformed rows
my_query = my_observed_csv.writeStream.format(
    "console").queryName("My observer").start()

Ahora que tenemos definidas las funciones de consulta y alerta de transmisión, creemos archivos CSV para que puedan incluirse en el modo de transmisión:

# Now, we will write CSV data to be processed in a streaming manner on time.
# This CSV file is all well-formed.
with open(os.path.join(my_csv_dir, "my_csv_1.csv"), "w") as f:
    _ = f.write("1,1.1n")
    _ = f.write("123,123.123n")

time.sleep(5)  # Assume that another CSV file arrived in 5 seconds.

# Ouch! it has two malformed records out of 3. My observer query should alert it!
with open(os.path.join(my_csv_dir, "my_csv_error.csv"), "w") as f:
    _ = f.write("1,1.123n")
    _ = f.write("Ouch! malformed record!n")
    _ = f.write("Arrgggh!n")

time.sleep(5)  # OK, all done. Let's stop the query in 5 seconds.
my_query.stop()
spark.streams.removeListener(my_listener)

Aquí vemos que el inicio de la consulta, la finalización y los procesos están registrados correctamente. Debido a que los archivos CSV contienen dos registros incorrectos, la alerta se activa correctamente con el siguiente mensaje de error:

...
ALERT! Ouch! there are too many malformed records 2 out of 3!
...

Conclusión

Los usuarios de PySpark ahora pueden configurar sus métricas personalizadas y verlas a través de la interfaz de escucha de consultas de transmisión y la API observable. Puede adjuntar y desconectar dinámicamente esta lógica para ejecutar consultas según sea necesario. Esta característica satisface la necesidad de tableros, alertas e informes a otros sistemas externos.

La interfaz Streaming Query Listener y la API Observable están disponibles en DBR 11 Beta y se espera que estén disponibles en el futuro Apache Spark. Pruebe ambas características nuevas hoy en Databricks a través de DBR 11 Beta.

La interfaz Streaming Query Listener y la API Observable están disponibles en DBR 11 Beta y se espera que estén disponibles en el futuro Apache Spark.



Related post

¿Puede la adopción de EV escalar como lo hicieron los teléfonos móviles?

¿Puede la adopción de EV escalar como lo hicieron…

La transición hacia un futuro de energía limpia requiere la eliminación rápida de los vehículos convencionales y la introducción gradual de…
Los vehículos enchufables al 10,3 % en junio mientras las ventas de vehículos eléctricos en Italia ganan impulso

Los vehículos enchufables al 10,3 % en junio mientras…

El mercado automovilístico italiano está mostrando signos de recuperación y, como resultado, las matriculaciones de vehículos enchufables aumentaron hacia el final…
El Consorcio Acelerador de Telururo de Cadmio tiene como objetivo reducir los costos y acelerar el despliegue de tecnologías solares de película delgada bajas en carbono.

El Consorcio Acelerador de Telururo de Cadmio tiene como…

Esta semana, el Departamento de Energía de EE. UU. (DOE) anunció un nuevo consorcio de tres años destinado a acelerar el…

Leave a Reply

Tu dirección de correo electrónico no será publicada.