Convertir PySpark DataFrame a JSON

Convertir Pyspark Dataframe A Json



La transmisión de datos estructurados mediante JSON es posible y también consume poca memoria. En comparación con PySpark RDD o PySpark DataFrame, JSON consume poca memoria y la serialización es posible con JSON. Podemos convertir PySpark DataFrame a JSON usando el método pyspark.sql.DataFrameWriter.json(). Aparte de eso, hay otras dos formas de convertir DataFrame a JSON.

Tema de contenidos:

Consideremos un PySpark DataFrame simple en todos los ejemplos y convirtámoslo en JSON usando las funciones mencionadas.







Módulo requerido:

Instale la biblioteca PySpark en su entorno si aún no está instalada. Puede consultar el siguiente comando para instalarlo:



pip instalar pyspark

PySpark DataFrame a JSON usando To_json() con ToPandas()

El método to_json() está disponible en el módulo de Pandas que convierte Pandas DataFrame a JSON. Podemos utilizar este método si convertimos nuestro PySpark DataFrame a Pandas DataFrame. Para convertir PySpark DataFrame a Pandas DataFrame, se utiliza el método toPandas(). Veamos la sintaxis de to_json() junto con sus parámetros.



Sintaxis:





dataframe_object.toPandas().to_json(orientar,índice,...)
  1. Orient se usa para mostrar el JSON convertido en el formato deseado. Toma 'registros', 'tabla', 'valores', 'columnas', 'índice', 'dividir'.
  2. El índice se usa para incluir/eliminar el índice de la cadena JSON convertida. Si se establece en 'Verdadero', se muestran los índices. De lo contrario, los índices no se mostrarán si la orientación es 'dividida' o 'tabla'.

Ejemplo 1: Orientar como 'Registros'

Cree un PySpark DataFrame 'skills_df' con 3 filas y 4 columnas. Convierta este DataFrame a JSON especificando el parámetro de orientación como 'registros'.

importar pyspark

importar pandas

desde pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

# datos de habilidades con 3 filas y 4 columnas

habilidades =[{ 'identificación' : 123 , 'persona' : 'Miel' , 'habilidad' : 'cuadro' , 'premio' : 25000 },

{ 'identificación' : 112 , 'persona' : 'Mouni' , 'habilidad' : 'bailar' , 'premio' : 2000 },

{ 'identificación' : 153 , 'persona' : 'Tulasí' , 'habilidad' : 'lectura' , 'premio' : 1200 }

]

# crear el marco de datos de habilidades a partir de los datos anteriores

skills_df = linuxhint_spark_app.createDataFrame(habilidades)

# Datos de habilidades reales

skills_df.show()

# Convertir a JSON usando to_json() con orient como 'registros'

json_skills_data = skills_df.toPandas().to_json(orient= 'registros' )

imprimir (json_skills_data)

Producción:



+---+------+-----+--------+

| id|persona|premio| habilidad|

+---+------+-----+--------+

| 123 | Miel| 25000 |pintura|

| 112 | Moni| 2000 | baile|

| 153 |Tulasi| 1200 | leyendo|

+---+------+-----+--------+

[{ 'identificación' : 123 , 'persona' : 'Miel' , 'premio' : 25000 , 'habilidad' : 'cuadro' },{ 'identificación' : 112 , 'persona' : 'Mouni' , 'premio' : 2000 , 'habilidad' : 'bailar' },{ 'identificación' : 153 , 'persona' : 'Tulasí' , 'premio' : 1200 , 'habilidad' : 'lectura' }]

Podemos ver que PySpark DataFrame se convierte a la matriz JSON con un diccionario de valores. Aquí, las claves representan el nombre de la columna y el valor representa el valor de la fila/celda en PySpark DataFrame.

Ejemplo 2: Orientar como 'Dividir'

El formato JSON que devuelve la orientación 'dividida' incluye los nombres de las columnas que tienen una lista de columnas, una lista de índices y una lista de datos. El siguiente es el formato del oriente 'dividido'.

# Convertir a JSON usando to_json() con orient como 'dividir'

json_skills_data = skills_df.toPandas().to_json(orient= 'dividir' )

imprimir (json_skills_data)

Producción:

{ 'columnas' :[ 'identificación' , 'persona' , 'premio' , 'habilidad' ], 'índice' :[ 0 , 1 , 2 ], 'datos' :[[ 123 , 'Miel' , 25000 , 'cuadro' ],[ 112 , 'Mouni' , 2000 , 'bailar' ],[ 153 , 'Tulasí' , 1200 , 'lectura' ]]}

Ejemplo 3: Orientar como 'Índice'

Aquí, cada fila de PySpark DataFrame se retira en forma de diccionario con la clave como nombre de columna. Para cada diccionario, la posición del índice se especifica como una clave.

# Convertir a JSON usando to_json() con orient como 'índice'

json_skills_data = skills_df.toPandas().to_json(orient= 'índice' )

imprimir (json_skills_data)

Producción:

{ '0' :{ 'identificación' : 123 , 'persona' : 'Miel' , 'premio' : 25000 , 'habilidad' : 'cuadro' }, '1' :{ 'identificación' : 112 , 'persona' : 'Mouni' , 'premio' : 2000 , 'habilidad' : 'bailar' }, '2' :{ 'identificación' : 153 , 'persona' : 'Tulasí' , 'premio' : 1200 , 'habilidad' : 'lectura' }}

Ejemplo 4: Oriente como 'Columnas'

Las columnas son la clave para cada registro. Cada columna contiene un diccionario que toma los valores de la columna con números de índice.

# Convertir a JSON usando to_json() con orient como 'columnas'

json_skills_data = skills_df.toPandas().to_json(orient= 'columnas' )

imprimir (json_skills_data)

Producción:

{ 'identificación' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'persona' :{ '0' : 'Miel' , '1' : 'Mouni' , '2' : 'Tulasí' }, 'premio' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'habilidad' :{ '0' : 'cuadro' , '1' : 'bailar' , '2' : 'lectura' }}

Ejemplo 5: Orientar como “Valores”

Si solo necesita los valores en JSON, puede optar por la orientación de 'valores'. Muestra cada fila en una lista. Finalmente, todas las listas se almacenan en una lista. Este JSON es del tipo de lista anidada.

# Convertir a JSON usando to_json() con orient como 'valores'

json_skills_data = skills_df.toPandas().to_json(orient= 'valores' )

imprimir (json_skills_data)

Producción:

[[ 123 , 'Miel' , 25000 , 'cuadro' ],[ 112 , 'Mouni' , 2000 , 'bailar' ],[ 153 , 'Tulasí' , 1200 , 'lectura' ]]

Ejemplo 6: Orientar como “Mesa”

La orientación de la 'tabla' devuelve el JSON que incluye el esquema con los nombres de los campos junto con los tipos de datos de las columnas, el índice como clave principal y la versión de Pandas. Los nombres de columna con valores se muestran como 'datos'.

# Convertir a JSON usando to_json() con orient como 'tabla'

json_skills_data = skills_df.toPandas().to_json(orient= 'mesa' )

imprimir (json_skills_data)

Producción:

{ 'esquema' :{ 'campos' :[{ 'nombre' : 'índice' , 'tipo' : 'entero' },{ 'nombre' : 'identificación' , 'tipo' : 'entero' },{ 'nombre' : 'persona' , 'tipo' : 'cadena' },{ 'nombre' : 'premio' , 'tipo' : 'entero' },{ 'nombre' : 'habilidad' , 'tipo' : 'cadena' }], 'Clave primaria' :[ 'índice' ], 'pandas_version' : '1.4.0' }, 'datos' :[{ 'índice' : 0 , 'identificación' : 123 , 'persona' : 'Miel' , 'premio' : 25000 , 'habilidad' : 'cuadro' },{ 'índice' : 1 , 'identificación' : 112 , 'persona' : 'Mouni' , 'premio' : 2000 , 'habilidad' : 'bailar' },{ 'índice' : 2 , 'identificación' : 153 , 'persona' : 'Tulasí' , 'premio' : 1200 , 'habilidad' : 'lectura' }]}

Ejemplo 7: con parámetro de índice

Primero, pasamos el parámetro de índice configurándolo en 'Verdadero'. Verá para cada valor de columna que la posición del índice se devuelve como una clave en un diccionario.

En la segunda salida, solo se devuelven los nombres de las columnas ('columnas') y los registros ('datos') sin las posiciones del índice, ya que el índice se establece en 'Falso'.

# Convertir a JSON usando to_json() con index=True

json_skills_data = skills_df.toPandas().to_json(index=True)

imprimir (json_skills_data, ' \norte ' )

# Convertir a JSON usando to_json() con index=False

json_skills_data= skills_df.toPandas().to_json(index=False,orient= 'dividir' )

imprimir (json_skills_data)

Producción:

{ 'identificación' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'persona' :{ '0' : 'Miel' , '1' : 'Mouni' , '2' : 'Tulasí' }, 'premio' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'habilidad' :{ '0' : 'cuadro' , '1' : 'bailar' , '2' : 'lectura' }}

{ 'columnas' :[ 'identificación' , 'persona' , 'premio' , 'habilidad' ], 'datos' :[[ 123 , 'Miel' , 25000 , 'cuadro' ],[ 112 , 'Mouni' , 2000 , 'bailar' ],[ 153 , 'Tulasí' , 1200 , 'lectura' ]]

PySpark DataFrame a JSON usando ToJSON()

El método toJSON() se usa para convertir PySpark DataFrame en un objeto JSON. Básicamente, devuelve una cadena JSON que está rodeada por una lista. El [‘{columna:valor,…}’,…. ] es el formato que devuelve esta función. Aquí, cada fila del PySpark DataFrame se devuelve como un diccionario con el nombre de la columna como clave.

Sintaxis:

marco_de_datos_objeto.toJSON()

Puede ser posible pasar parámetros como el índice, las etiquetas de las columnas y el tipo de datos.

Ejemplo:

Cree un PySpark DataFrame 'skills_df' con 5 filas y 4 columnas. Convierta este DataFrame a JSON usando el método toJSON().

importar pyspark

desde pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

# datos de habilidades con 5 filas y 4 columnas

habilidades =[{ 'identificación' : 123 , 'persona' : 'Miel' , 'habilidad' : 'cuadro' , 'premio' : 25000 },

{ 'identificación' : 112 , 'persona' : 'Mouni' , 'habilidad' : 'música/baile' , 'premio' : 2000 },

{ 'identificación' : 153 , 'persona' : 'Tulasí' , 'habilidad' : 'lectura' , 'premio' : 1200 },

{ 'identificación' : 173 , 'persona' : 'Corrió' , 'habilidad' : 'música' , 'premio' : 2000 },

{ 'identificación' : 43 , 'persona' : 'Kamala' , 'habilidad' : 'lectura' , 'premio' : 10000 }

]

# crear el marco de datos de habilidades a partir de los datos anteriores

skills_df = linuxhint_spark_app.createDataFrame(habilidades)

# Datos de habilidades reales

skills_df.show()

# Convertir a matriz JSON

json_skills_data = skills_df.toJSON().collect()

imprimir (json_skills_data)

Producción:

+---+------+-----+-----------+

| id|persona|premio| habilidad|

+---+------+-----+-----------+

| 123 | Miel| 25000 | pintura|

| 112 | Moni| 2000 |música/baile|

| 153 |Tulasi| 1200 | leyendo|

| 173 | corrió | 2000 | musica|

| 43 |Kamala| 10000 | leyendo|

+---+------+-----+-----------+

[ '{'id':123,'persona':'Cariño','premio':25000,'habilidad':'pintura'}' , '{'id':112,'persona':'Mouni','premio':2000,'habilidad':'música/baile'}' , '{'id':153,'persona':'Tulasi','premio':1200,'habilidad':'lectura'}' , '{'id':173,'persona':'Corrió','premio':2000,'habilidad':'música'}' , '{'id':43,'persona':'Kamala','premio':10000,'habilidad':'lectura'}' ]

Hay 5 filas en PySpark DataFrame. Todas estas 5 filas se devuelven como un diccionario de cadenas que están separadas por comas.

PySpark DataFrame a JSON usando Write.json()

El método write.json() está disponible en PySpark, que escribe/guarda el PySpark DataFrame en un archivo JSON. Toma el nombre del archivo/ruta como parámetro. Básicamente, devuelve el JSON en varios archivos (archivos particionados). Para unirlos todos en un solo archivo, podemos usar el método coalesce().

Sintaxis:

dataframe_object.coalesce( 1 ).write.json('nombre_de_archivo')
  1. Modo de agregar – dataframe_object.write.mode('append').json('file_name')
  2. Modo de sobrescritura – dataframe_object.write.mode('overwrite').json('file_name')

Puede ser posible agregar/sobrescribir el JSON existente. Usando write.mode (), podemos agregar los datos pasando 'agregar' o sobrescribir los datos JSON existentes pasando 'sobrescribir' a esta función.

Ejemplo 1:

Cree un PySpark DataFrame 'skills_df' con 3 filas y 4 columnas. Escriba este DataFrame en JSON.

importar pyspark

importar pandas

desde pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

# datos de habilidades con 3 filas y 4 columnas

habilidades =[{ 'identificación' : 123 , 'persona' : 'Miel' , 'habilidad' : 'cuadro' , 'premio' : 25000 },

{ 'identificación' : 112 , 'persona' : 'Mouni' , 'habilidad' : 'bailar' , 'premio' : 2000 },

{ 'identificación' : 153 , 'persona' : 'Tulasí' , 'habilidad' : 'lectura' , 'premio' : 1200 }

]

# crear el marco de datos de habilidades a partir de los datos anteriores

skills_df = linuxhint_spark_app.createDataFrame(habilidades)

# escribir.json()

habilidades_df.coalesce( 1 ).escribir.json( 'habilidades_datos' )

Archivo JSON:

Podemos ver que la carpeta skills_data incluye los datos JSON particionados.

Abramos el archivo JSON. Podemos ver que todas las filas del PySpark DataFrame se convierten en JSON.

Hay 5 filas en PySpark DataFrame. Todas estas 5 filas se devuelven como un diccionario de cadenas que están separadas por comas.

Ejemplo 2:

Cree un PySpark DataFrame 'skills2_df' con una fila. Agregue una fila al archivo JSON anterior especificando el modo como 'agregar'.

importar pyspark

importar pandas

desde pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Sugerencia de Linux' ).getOrCreate()

habilidades2 =[{ 'identificación' : 78 , 'persona' : 'María' , 'habilidad' : 'equitación' , 'premio' : 8960 }

]

# crear el marco de datos de habilidades a partir de los datos anteriores

habilidades2_df = linuxhint_spark_app.createDataFrame(habilidades2)

# write.json() con modo de adición.

habilidades2_df.escribir.modo( 'adjuntar' ).json( 'habilidades_datos' )

Archivo JSON:

Podemos ver los archivos JSON particionados. El primer archivo contiene los primeros registros de DataFrame y el segundo archivo contiene el segundo registro de DataFrame.

Conclusión

Hay tres formas diferentes de convertir PySpark DataFrame a JSON. Primero, discutimos el método to_json() que convierte a JSON al convertir PySpark DataFrame en Pandas DataFrame con diferentes ejemplos considerando diferentes parámetros. A continuación, utilizamos el método toJSON(). Por último, aprendimos a usar la función write.json() para escribir PySpark DataFrame en JSON. Con esta función es posible agregar y sobrescribir.