Pyspark.sql.DataFrameWriter.saveAsTable()
Primero, veremos cómo escribir el PySpark DataFrame existente en la tabla usando la función write.saveAsTable(). Toma el nombre de la tabla y otros parámetros opcionales como modos, partionBy, etc., para escribir el DataFrame en la tabla. Se almacena como un archivo de parquet.
Sintaxis:
dataframe_obj.write.saveAsTable(ruta/Nombre_tabla,modo,particiónPor,…)
- Table_name es el nombre de la tabla que se crea a partir de dataframe_obj.
- Podemos agregar/sobrescribir los datos de la tabla usando el parámetro de modo.
- La particiónBy toma las columnas únicas/múltiples para crear particiones basadas en valores en estas columnas proporcionadas.
Ejemplo 1:
Cree un marco de datos PySpark con 5 filas y 4 columnas. Escriba este marco de datos en una tabla llamada 'Agri_Table1'.
importar pyspark
desde pyspark.sql importar SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()
# datos agrícolas con 5 filas y 5 columnas
agrícola =[{ 'Tipo de suelo' : 'Negro' , 'Irrigation_availability' : 'No' , 'Hectáreas' : 2500 , 'Soil_status' : 'Seco' ,
'País' : 'EE.UU' },
{ 'Tipo de suelo' : 'Negro' , 'Irrigation_availability' : 'Sí' , 'Hectáreas' : 3500 , 'Soil_status' : 'Húmedo' ,
'País' : 'India' },
{ 'Tipo de suelo' : 'Rojo' , 'Irrigation_availability' : 'Sí' , 'Hectáreas' : 210 , 'Soil_status' : 'Seco' ,
'País' : 'REINO UNIDO' },
{ 'Tipo de suelo' : 'Otro' , 'Irrigation_availability' : 'No' , 'Hectáreas' : 1000 , 'Soil_status' : 'Húmedo' ,
'País' : 'EE.UU' },
{ 'Tipo de suelo' : 'Arena' , 'Irrigation_availability' : 'No' , 'Hectáreas' : 500 , 'Soil_status' : 'Seco' ,
'País' : 'India' }]
# crear el marco de datos a partir de los datos anteriores
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Escriba el marco de datos anterior en la tabla.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
Producción:
Podemos ver que se crea un archivo de parquet con los datos de PySpark anteriores.
Ejemplo 2:
Considere el DataFrame anterior y escriba 'Agri_Table2' en la tabla dividiendo los registros según los valores en la columna 'País'.
# Escriba el DataFrame anterior en la tabla con el parámetro particiónByagri_df.write.saveAsTable( 'Agri_Table2' ,particiónPor=[ 'País' ])
Producción:
Hay tres valores únicos en la columna 'País': 'India', 'Reino Unido' y 'EE. UU.'. Entonces, se crean tres particiones. Cada partición contiene los archivos de parquet.
Pyspark.sql.DataFrameReader.table()
Carguemos la tabla en PySpark DataFrame usando la función spark.read.table(). Solo toma un parámetro que es el nombre de la ruta/tabla. Carga directamente la tabla en PySpark DataFrame y todas las funciones de SQL que se aplican a PySpark DataFrame también se pueden aplicar en este DataFrame cargado.
Sintaxis:
spark_app.read.table(ruta/'Nombre_tabla')En este escenario, usamos la tabla anterior que se creó a partir de PySpark DataFrame. Asegúrese de que necesita implementar los fragmentos de código del escenario anterior en su entorno.
Ejemplo:
Cargue la tabla 'Agri_Table1' en el DataFrame llamado 'loaded_data'.
datos_cargados = linuxhint_spark_app.read.table( 'Agri_Tabla1' )loaded_data.show()
Producción:
Podemos ver que la tabla está cargada en PySpark DataFrame.
Ejecutando las Consultas SQL
Ahora, ejecutamos algunas consultas SQL en el DataFrame cargado usando la función spark.sql().
# Utilice el comando SELECCIONAR para mostrar todas las columnas de la tabla anterior.linuxhint_spark_app.sql( 'SELECCIONAR * de Agri_Table1' ).espectáculo()
# Dónde cláusula
linuxhint_spark_app.sql( 'SELECCIONE * de Agri_Table1 WHERE Soil_status='Dry' ' ).espectáculo()
linuxhint_spark_app.sql( 'SELECCIONE * de Agri_Table1 DONDE Acres > 2000 ' ).espectáculo()
Producción:
- La primera consulta muestra todas las columnas y registros del DataFrame.
- La segunda consulta muestra los registros basados en la columna 'Soil_status'. Solo hay tres registros con el elemento 'Seco'.
- La última consulta devuelve dos registros con 'Acres' que son mayores que 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Usando la función insertInto(), podemos agregar el DataFrame a la tabla existente. Podemos usar esta función junto con selectExpr() para definir los nombres de las columnas y luego insertarla en la tabla. Esta función también toma tableName como parámetro.
Sintaxis:
DataFrame_obj.write.insertInto('Nombre_tabla')En este escenario, usamos la tabla anterior que se creó a partir de PySpark DataFrame. Asegúrese de que necesita implementar los fragmentos de código del escenario anterior en su entorno.
Ejemplo:
Cree un nuevo DataFrame con dos registros e insértelos en la tabla 'Agri_Table1'.
importar pysparkdesde pyspark.sql importar SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()
# datos agrícolas con 2 filas
agrícola =[{ 'Tipo de suelo' : 'Arena' , 'Irrigation_availability' : 'No' , 'Hectáreas' : 2500 , 'Soil_status' : 'Seco' ,
'País' : 'EE.UU' },
{ 'Tipo de suelo' : 'Arena' , 'Irrigation_availability' : 'No' , 'Hectáreas' : 1200 , 'Soil_status' : 'Húmedo' ,
'País' : 'Japón' }]
# crear el marco de datos a partir de los datos anteriores
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# escribir.insertarEn()
agri_df2.selectExpr( 'Hectáreas' , 'País' , 'Irrigation_availability' , 'Tipo de suelo' ,
'Estado_del_suelo' ).escribir.insertarEn( 'Agri_Table1' )
# Mostrar la Agri_Table1 final
linuxhint_spark_app.sql( 'SELECCIONAR * de Agri_Table1' ).espectáculo()
Producción:
Ahora, el número total de filas que están presentes en el DataFrame es 7.
Conclusión
Ahora comprende cómo escribir PySpark DataFrame en la tabla mediante la función write.saveAsTable(). Toma el nombre de la tabla y otros parámetros opcionales. Luego, cargamos esta tabla en PySpark DataFrame usando la función spark.read.table(). Solo toma un parámetro que es el nombre de la ruta/tabla. Si desea agregar el nuevo DataFrame a la tabla existente, use la función insertInto().