Lectura PySpark.Parquet()

Lectura Pyspark Parquet



En PySpark, la función write.parquet() escribe el DataFrame en el archivo de parquet y read.parquet() lee el archivo de parquet en PySpark DataFrame o cualquier otra fuente de datos. Para procesar las columnas en Apache Spark de manera rápida y eficiente, necesitamos comprimir los datos. La compresión de datos guarda nuestra memoria y todas las columnas se convierten en nivel plano. Eso significa que existe el almacenamiento de nivel de columna plana. El archivo que los almacena se conoce como archivo PARQUET.

En esta guía, nos centraremos principalmente en leer/cargar el archivo de parquet en PySpark DataFrame/SQL usando la función read.parquet() que está disponible en la clase pyspark.sql.DataFrameReader.

Tema de contenidos:







Obtener el archivo de parquet



Lea el archivo Parquet en el marco de datos PySpark



Lea el archivo Parquet en PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Esta función se utiliza para leer el archivo de parquet y cargarlo en PySpark DataFrame. Toma la ruta/nombre de archivo del archivo de parquet. Simplemente podemos usar la función read.parquet() ya que esta es la función genérica.

Sintaxis:



Veamos la sintaxis de read.parquet():

spark_app.read.parquet(nombre_de_archivo.parquet/ruta)

Primero, instale el módulo PySpark usando el comando pip:

pip instalar pyspark

Obtener el archivo de parquet

Para leer un archivo de parquet, necesita los datos en los que se genera el archivo de parquet a partir de esos datos. En esta parte, veremos cómo generar un archivo de parquet desde PySpark DataFrame.

Creemos un PySpark DataFrame con 5 registros y escribamos esto en el archivo de parquet 'industry_parquet'.

importar pyspark

desde pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

# crear el marco de datos que almacena los detalles de la industria

industry_df = linuxhint_spark_app.createDataFrame([Fila(Tipo= 'Agricultura' ,Área= 'EE.UU' ,
Calificación = 'Caliente' ,Total_empleados= 100 ),

Fila(Tipo= 'Agricultura' ,Área= 'India' ,Calificación= 'Caliente' ,Total_empleados= 200 ),

Fila(Tipo= 'Desarrollo' ,Área= 'EE.UU' ,Calificación= 'Cálido' ,Total_empleados= 100 ),

Fila(Tipo= 'Educación' ,Área= 'EE.UU' ,Calificación= 'Fresco' ,Total_empleados= 400 ),

Fila(Tipo= 'Educación' ,Área= 'EE.UU' ,Calificación= 'Cálido' ,Total_empleados= 20 )

])

# Marco de datos real

industry_df.show()

# Escriba el archivo industry_df en el archivo de parquet

industria_df.coalesce( 1 ).escribir.parquet( 'industria_parquet' )

Producción:

Este es el DataFrame que contiene 5 registros.

Se crea un archivo de parquet para el DataFrame anterior. Aquí, nuestro nombre de archivo con una extensión es 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet'. Usamos este archivo en todo el tutorial.

Lea el archivo Parquet en el marco de datos PySpark

Tenemos el archivo de parquet. Leamos este archivo usando la función read.parquet() y cárguelo en PySpark DataFrame.

importar pyspark

desde pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

# Lea el archivo de parquet en el objeto dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Mostrar el dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Producción:

Mostramos el DataFrame usando el método show() que se creó a partir del archivo de parquet.

Consultas SQL con archivo Parquet

Después de cargar en DataFrame, es posible crear las tablas SQL y mostrar los datos que están presentes en DataFrame. Necesitamos crear una VISTA TEMPORAL y usar los comandos SQL para devolver los registros del DataFrame que se crea a partir del archivo de parquet.

Ejemplo 1:

Cree una vista temporal llamada 'Sectores' y use el comando SELECCIONAR para mostrar los registros en el marco de datos. Puedes referirte a esto tutorial que explica cómo crear una VISTA en Spark – SQL.

importar pyspark

desde pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

# Lea el archivo de parquet en el objeto dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Crear vista desde el archivo de parquet anterior llamado - 'Sectores'

dataframe_from_parquet.createOrReplaceTempView( 'Sectores' )

# Consulta para mostrar todos los registros de los Sectores

linuxhint_spark_app.sql( 'seleccione * de Sectores' ).espectáculo()

Producción:

Ejemplo 2:

Usando la VISTA anterior, escriba la consulta SQL:

  1. Para mostrar todos los registros de los Sectores que pertenecen a “India”.
  2. Para visualizar todos los registros de los Sectores con un empleado mayor a 100.
# Consulta para mostrar todos los registros de los Sectores pertenecientes a 'India'.

linuxhint_spark_app.sql( 'seleccione * de Sectores donde Area='India'' ).espectáculo()

# Consulta para mostrar todos los registros de los Sectores con empleados mayores de 100

linuxhint_spark_app.sql( 'seleccione * de Sectores donde Total_employees>100' ).espectáculo()

Producción:

Solo hay un registro con área que es 'India' y dos registros con empleados que es mayor a 100.

Lea el archivo Parquet en PySpark SQL

Primero, necesitamos crear una VISTA usando el comando CREAR. Usando la palabra clave 'ruta' dentro de la consulta SQL, podemos leer el archivo de parquet en Spark SQL. Después de la ruta, debemos especificar el nombre de archivo/ubicación del archivo.

Sintaxis:

chispa_aplicación.sql( 'CREAR VISTA TEMPORAL view_name UTILIZANDO OPCIONES de parquet (ruta' nombre_archivo.parquet ')' )

Ejemplo 1:

Cree una vista temporal llamada 'Sector2' y lea el archivo de parquet en ella. Usando la función sql(), escriba la consulta de selección para mostrar todos los registros que están presentes en la vista.

importar pyspark

desde pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

# Lea el archivo de parquet en Spark-SQL

linuxhint_spark_app.sql( 'CREAR VISTA TEMPORAL Sector2 UTILIZANDO OPCIONES de parquet (ruta' parte-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Consulta para mostrar todos los registros del Sector2

linuxhint_spark_app.sql( 'seleccione * del Sector2' ).espectáculo()

Producción:

Ejemplo 2:

Use la VISTA anterior y escriba la consulta para mostrar todos los registros con la calificación de 'Hot' o 'Cool'.

# Consulta para mostrar todos los registros del Sector2 con calificación: caliente o frío.

linuxhint_spark_app.sql( 'seleccione * de Sector2 donde Calificación = 'Hot' O Calificación = 'Cool'' ).espectáculo()

Producción:

Hay tres registros con la calificación de 'Hot' o 'Cool'.

Conclusión

En PySpark, la función write.parquet() escribe el DataFrame en el archivo de parquet. La función read.parquet() lee el archivo de parquet en PySpark DataFrame o cualquier otra fuente de datos. Aprendimos cómo leer el archivo de parquet en PySpark DataFrame y en la tabla PySpark. Como parte de este tutorial, también discutimos cómo crear las tablas desde PySpark DataFrame y filtrar los datos usando la cláusula WHERE.