Cómo leer y escribir datos de una tabla en PySpark

Como Leer Y Escribir Datos De Una Tabla En Pyspark



El procesamiento de datos en PySpark es más rápido si los datos se cargan en forma de tabla. Con esto, usando las Expresiones SQl, el procesamiento será rápido. Por lo tanto, convertir PySpark DataFrame/RDD en una tabla antes de enviarlo para su procesamiento es el mejor enfoque. Hoy, veremos cómo leer los datos de la tabla en PySpark DataFrame, escribir PySpark DataFrame en la tabla e insertar un nuevo DataFrame en la tabla existente usando las funciones integradas. ¡Vamos!

Pyspark.sql.DataFrameWriter.saveAsTable()

Primero, veremos cómo escribir el PySpark DataFrame existente en la tabla usando la función write.saveAsTable(). Toma el nombre de la tabla y otros parámetros opcionales como modos, partionBy, etc., para escribir el DataFrame en la tabla. Se almacena como un archivo de parquet.

Sintaxis:







dataframe_obj.write.saveAsTable(ruta/Nombre_tabla,modo,particiónPor,…)
  1. Table_name es el nombre de la tabla que se crea a partir de dataframe_obj.
  2. Podemos agregar/sobrescribir los datos de la tabla usando el parámetro de modo.
  3. La particiónBy toma las columnas únicas/múltiples para crear particiones basadas en valores en estas columnas proporcionadas.

Ejemplo 1:

Cree un marco de datos PySpark con 5 filas y 4 columnas. Escriba este marco de datos en una tabla llamada 'Agri_Table1'.



importar pyspark

desde pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

# datos agrícolas con 5 filas y 5 columnas

agrícola =[{ 'Tipo de suelo' : 'Negro' , 'Irrigation_availability' : 'No' , 'Hectáreas' : 2500 , 'Soil_status' : 'Seco' ,
'País' : 'EE.UU' },

{ 'Tipo de suelo' : 'Negro' , 'Irrigation_availability' : 'Sí' , 'Hectáreas' : 3500 , 'Soil_status' : 'Húmedo' ,
'País' : 'India' },

{ 'Tipo de suelo' : 'Rojo' , 'Irrigation_availability' : 'Sí' , 'Hectáreas' : 210 , 'Soil_status' : 'Seco' ,
'País' : 'REINO UNIDO' },

{ 'Tipo de suelo' : 'Otro' , 'Irrigation_availability' : 'No' , 'Hectáreas' : 1000 , 'Soil_status' : 'Húmedo' ,
'País' : 'EE.UU' },

{ 'Tipo de suelo' : 'Arena' , 'Irrigation_availability' : 'No' , 'Hectáreas' : 500 , 'Soil_status' : 'Seco' ,
'País' : 'India' }]

 

# crear el marco de datos a partir de los datos anteriores

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Escriba el marco de datos anterior en la tabla.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Producción:







Podemos ver que se crea un archivo de parquet con los datos de PySpark anteriores.



Ejemplo 2:

Considere el DataFrame anterior y escriba 'Agri_Table2' en la tabla dividiendo los registros según los valores en la columna 'País'.

# Escriba el DataFrame anterior en la tabla con el parámetro particiónBy

agri_df.write.saveAsTable( 'Agri_Table2' ,particiónPor=[ 'País' ])

Producción:

Hay tres valores únicos en la columna 'País': 'India', 'Reino Unido' y 'EE. UU.'. Entonces, se crean tres particiones. Cada partición contiene los archivos de parquet.

Pyspark.sql.DataFrameReader.table()

Carguemos la tabla en PySpark DataFrame usando la función spark.read.table(). Solo toma un parámetro que es el nombre de la ruta/tabla. Carga directamente la tabla en PySpark DataFrame y todas las funciones de SQL que se aplican a PySpark DataFrame también se pueden aplicar en este DataFrame cargado.

Sintaxis:

spark_app.read.table(ruta/'Nombre_tabla')

En este escenario, usamos la tabla anterior que se creó a partir de PySpark DataFrame. Asegúrese de que necesita implementar los fragmentos de código del escenario anterior en su entorno.

Ejemplo:

Cargue la tabla 'Agri_Table1' en el DataFrame llamado 'loaded_data'.

datos_cargados = linuxhint_spark_app.read.table( 'Agri_Tabla1' )

loaded_data.show()

Producción:

Podemos ver que la tabla está cargada en PySpark DataFrame.

Ejecutando las Consultas SQL

Ahora, ejecutamos algunas consultas SQL en el DataFrame cargado usando la función spark.sql().

# Utilice el comando SELECCIONAR para mostrar todas las columnas de la tabla anterior.

linuxhint_spark_app.sql( 'SELECCIONAR * de Agri_Table1' ).espectáculo()

# Dónde cláusula

linuxhint_spark_app.sql( 'SELECCIONE * de Agri_Table1 WHERE Soil_status='Dry' ' ).espectáculo()

linuxhint_spark_app.sql( 'SELECCIONE * de Agri_Table1 DONDE Acres > 2000 ' ).espectáculo()

Producción:

  1. La primera consulta muestra todas las columnas y registros del DataFrame.
  2. La segunda consulta muestra los registros basados ​​en la columna 'Soil_status'. Solo hay tres registros con el elemento 'Seco'.
  3. La última consulta devuelve dos registros con 'Acres' que son mayores que 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Usando la función insertInto(), podemos agregar el DataFrame a la tabla existente. Podemos usar esta función junto con selectExpr() para definir los nombres de las columnas y luego insertarla en la tabla. Esta función también toma tableName como parámetro.

Sintaxis:

DataFrame_obj.write.insertInto('Nombre_tabla')

En este escenario, usamos la tabla anterior que se creó a partir de PySpark DataFrame. Asegúrese de que necesita implementar los fragmentos de código del escenario anterior en su entorno.

Ejemplo:

Cree un nuevo DataFrame con dos registros e insértelos en la tabla 'Agri_Table1'.

importar pyspark

desde pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

# datos agrícolas con 2 filas

agrícola =[{ 'Tipo de suelo' : 'Arena' , 'Irrigation_availability' : 'No' , 'Hectáreas' : 2500 , 'Soil_status' : 'Seco' ,
'País' : 'EE.UU' },

{ 'Tipo de suelo' : 'Arena' , 'Irrigation_availability' : 'No' , 'Hectáreas' : 1200 , 'Soil_status' : 'Húmedo' ,
'País' : 'Japón' }]

# crear el marco de datos a partir de los datos anteriores

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# escribir.insertarEn()

agri_df2.selectExpr( 'Hectáreas' , 'País' , 'Irrigation_availability' , 'Tipo de suelo' ,
'Estado_del_suelo' ).escribir.insertarEn( 'Agri_Table1' )

# Mostrar la Agri_Table1 final

linuxhint_spark_app.sql( 'SELECCIONAR * de Agri_Table1' ).espectáculo()

Producción:

Ahora, el número total de filas que están presentes en el DataFrame es 7.

Conclusión

Ahora comprende cómo escribir PySpark DataFrame en la tabla mediante la función write.saveAsTable(). Toma el nombre de la tabla y otros parámetros opcionales. Luego, cargamos esta tabla en PySpark DataFrame usando la función spark.read.table(). Solo toma un parámetro que es el nombre de la ruta/tabla. Si desea agregar el nuevo DataFrame a la tabla existente, use la función insertInto().