PySpark Pandas_Udf()

Pyspark Pandas Udf



Es posible transformar PySpark DataFrame usando la función pandas_udf(). Es una función definida por el usuario que se aplica en PySpark DataFrame con flecha. Podemos realizar las operaciones vectorizadas usando pandas_udf(). Se puede implementar pasando esta función como decorador. Profundicemos en esta guía para conocer la sintaxis, los parámetros y diferentes ejemplos.

Tema de contenidos:

Si desea obtener información sobre PySpark DataFrame y la instalación del módulo, siga este artículo .







Pyspark.sql.functions.pandas_udf()

El pandas_udf () está disponible en el módulo sql.functions en PySpark, que se puede importar usando la palabra clave 'from'. Se utiliza para realizar las operaciones vectorizadas en nuestro PySpark DataFrame. Esta función se implementa como un decorador pasando tres parámetros. Después de eso, podemos crear una función definida por el usuario que devuelva los datos en formato vectorial (como si usáramos series/NumPy para esto) usando una flecha. Dentro de esta función, podemos devolver el resultado.



Estructura y sintaxis:



Primero, veamos la estructura y sintaxis de esta función:

@pandas_udf(tipo de datos)
def function_name(operación) -> convert_format:
declaración de devolución

Aquí, function_name es el nombre de nuestra función definida. El tipo de datos especifica el tipo de datos que devuelve esta función. Podemos devolver el resultado usando la palabra clave 'return'. Todas las operaciones se realizan dentro de la función con la asignación de flecha.





Pandas_udf (Función y Tipo de Retorno)

  1. El primer parámetro es la función definida por el usuario que se le pasa.
  2. El segundo parámetro se usa para especificar el tipo de datos de retorno de la función.

Datos:

En toda esta guía, usamos solo un PySpark DataFrame para la demostración. Todas las funciones definidas por el usuario que definimos se aplican en este PySpark DataFrame. Asegúrese de crear este DataFrame en su entorno primero después de la instalación de PySpark.



importar pyspark

desde pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

desde pyspark.sql.functions importar pandas_udf

desde pyspark.sql.types importar *

importar pandas como panda

# detalles vegetales

verdura =[{ 'tipo' : 'verdura' , 'nombre' : 'tomate' , 'ubicar_país' : 'EE.UU' , 'cantidad' : 800 },

{ 'tipo' : 'fruta' , 'nombre' : 'banana' , 'ubicar_país' : 'PORCELANA' , 'cantidad' : 20 },

{ 'tipo' : 'verdura' , 'nombre' : 'tomate' , 'ubicar_país' : 'EE.UU' , 'cantidad' : 800 },

{ 'tipo' : 'verdura' , 'nombre' : 'Mango' , 'ubicar_país' : 'JAPÓN' , 'cantidad' : 0 },

{ 'tipo' : 'fruta' , 'nombre' : 'limón' , 'ubicar_país' : 'INDIA' , 'cantidad' : 1700 },

{ 'tipo' : 'verdura' , 'nombre' : 'tomate' , 'ubicar_país' : 'EE.UU' , 'cantidad' : 1200 },

{ 'tipo' : 'verdura' , 'nombre' : 'Mango' , 'ubicar_país' : 'JAPÓN' , 'cantidad' : 0 },

{ 'tipo' : 'fruta' , 'nombre' : 'limón' , 'ubicar_país' : 'INDIA' , 'cantidad' : 0 }

]

# crear el marco de datos de mercado a partir de los datos anteriores

market_df = linuxhint_spark_app.createDataFrame(vegetal)

market_df.show()

Producción:

Aquí, creamos este DataFrame con 4 columnas y 8 filas. Ahora, usamos pandas_udf() para crear las funciones definidas por el usuario y aplicarlas a estas columnas.

Pandas_udf() con diferentes tipos de datos

En este escenario, creamos algunas funciones definidas por el usuario con pandas_udf() y las aplicamos en columnas y mostramos los resultados usando el método select(). En cada caso, usamos pandas.Series mientras realizamos las operaciones vectorizadas. Esto considera los valores de la columna como una matriz unidimensional y la operación se aplica en la columna. En el propio decorador, especificamos el tipo de retorno de la función.

Ejemplo 1: Pandas_udf() con tipo de cadena

Aquí, creamos dos funciones definidas por el usuario con el tipo de retorno de cadena para convertir los valores de columna de tipo de cadena a mayúsculas y minúsculas. Finalmente, aplicamos estas funciones en las columnas 'tipo' y 'localizar_país'.

# Convertir columna de tipo a mayúsculas con pandas_udf

@pandas_udf(TipoDeCadena())

def type_upper_case(i: panda.Series) -> panda.Series:

volver i.str.superior()

# Convierte la columna de localizar_país a minúsculas con pandas_udf

@pandas_udf(TipoDeCadena())

def country_lower_case(i: panda.Series) -> panda.Series:

devolver i.str.inferior()

# Mostrar las columnas usando select()

market_df.select( 'tipo' ,tipo_mayúsculas( 'tipo' ), 'ubicar_país' ,
país_minúsculas( 'ubicar_país' )).espectáculo()

Producción:

Explicación:

La función StringType() está disponible en el módulo pyspark.sql.types. Ya importamos este módulo al crear PySpark DataFrame.

  1. Primero, UDF (función definida por el usuario) devuelve las cadenas en mayúsculas usando la función str.upper(). El str.upper() está disponible en la estructura de datos de la serie (ya que estamos convirtiendo a series con una flecha dentro de la función) que convierte la cadena dada a mayúsculas. Finalmente, esta función se aplica a la columna 'tipo' que se especifica dentro del método select(). Anteriormente, todas las cadenas en la columna de tipo estaban en minúsculas. Ahora, se cambian a mayúsculas.
  2. En segundo lugar, UDF devuelve las cadenas en mayúsculas mediante la función str.lower(). El str.lower() está disponible en la estructura de datos de la serie que convierte la cadena dada a minúsculas. Finalmente, esta función se aplica a la columna 'tipo' que se especifica dentro del método select(). Anteriormente, todas las cadenas en la columna de tipo estaban en mayúsculas. Ahora, se cambian a minúsculas.

Ejemplo 2: Pandas_udf() con tipo entero

Vamos a crear una UDF que convierta la columna de enteros PySpark DataFrame en la serie Pandas y agreguemos 100 a cada valor. Pase la columna 'cantidad' a esta función dentro del método select().

# Añadir 100

@pandas_udf(TipoEntero())

def add_100(i: panda.Serie) -> panda.Serie:

volver i+ 100

# Pase la columna de cantidad a la función anterior y muestre.

market_df.select( 'cantidad' ,añadir_100( 'cantidad' )).espectáculo()

Producción:

Explicación:

Dentro de la UDF, iteramos todos los valores y los convertimos en Series. Después de eso, sumamos 100 a cada valor de la Serie. Finalmente, le pasamos la columna “cantidad” a esta función y vemos que a todos los valores se suma 100.

Pandas_udf() con diferentes tipos de datos usando Groupby() y Agg()

Veamos los ejemplos para pasar la UDF a las columnas agregadas. Aquí, los valores de las columnas se agrupan primero mediante la función groupby() y la agregación se realiza mediante la función agg(). Pasamos nuestro UDF dentro de esta función agregada.

Sintaxis:

pyspark_dataframe_object.groupby( 'columna_agrupación' ).agg(UDF
(pyspark_dataframe_object[ 'columna' ]))

Aquí, los valores en la columna de agrupación se agrupan primero. Luego, la agregación se realiza sobre cada dato agrupado con respecto a nuestra UDF.

Ejemplo 1: Pandas_udf() con media agregada()

Aquí, creamos una función definida por el usuario con un tipo de retorno flotante. Dentro de la función, calculamos el promedio usando la función mean(). Este UDF se pasa a la columna 'cantidad' para obtener la cantidad promedio para cada tipo.

# devuelve la media/promedio

@pandas_udf( 'flotar' )

def promedio_función(i: panda.Series) -> float:

volver i.significa()

# Pase la columna de cantidad a la función agrupando la columna de tipo.

mercado_df.groupby( 'tipo' ).agg(función_promedio(market_df[ 'cantidad' ])).espectáculo()

Producción:

Estamos agrupando según los elementos de la columna 'tipo'. Se forman dos grupos: 'fruta' y 'verdura'. Para cada grupo, se calcula y devuelve la media.

Ejemplo 2: Pandas_udf() con Aggregate Max() y Min()

Aquí, creamos dos funciones definidas por el usuario con el tipo de retorno entero (int). La primera UDF devuelve el valor mínimo y la segunda UDF devuelve el valor máximo.

# pandas_udf que devuelven el valor mínimo

@pandas_udf( 'En t' )

def min_(i: panda.Series) -> int:

volver i.min()

# pandas_udf que devuelven el valor máximo

@pandas_udf( 'En t' )

def max_(i: panda.Series) -> int:

devolver i.max()

# Pase la columna de cantidad a min_ pandas_udf agrupando localizar_país.

mercado_df.groupby( 'ubicar_país' ).agg(min_(mercado_df[ 'cantidad' ])).espectáculo()

# Pase la columna de cantidad a max_ pandas_udf agrupando localizar_país.

mercado_df.groupby( 'ubicar_país' ).agg(max_(mercado_df[ 'cantidad' ])).espectáculo()

Producción:

Para devolver valores mínimos y máximos, utilizamos las funciones min() y max() en el tipo de devolución de las UDF. Ahora, agrupamos los datos en la columna “locate_country”. Se forman cuatro grupos (“CHINA”, “INDIA”, “JAPÓN”, “EE.UU.”). Para cada grupo, devolvemos la cantidad máxima. Del mismo modo, devolvemos la cantidad mínima.

Conclusión

Básicamente, pandas_udf () se usa para realizar las operaciones vectorizadas en nuestro PySpark DataFrame. Hemos visto cómo crear pandas_udf() y aplicarlo al PySpark DataFrame. Para una mejor comprensión, discutimos los diferentes ejemplos considerando todos los tipos de datos (cadena, flotante e entero). Puede ser posible usar pandas_udf() con groupby() a través de la función agg().