Cómo implementar la transmisión de datos en tiempo real en Python

Como Implementar La Transmision De Datos En Tiempo Real En Python



Dominar la implementación de la transmisión de datos en tiempo real en Python actúa como una habilidad esencial en el mundo actual involucrado en datos. Esta guía explora los pasos principales y las herramientas esenciales para utilizar la transmisión de datos en tiempo real con autenticidad en Python. Desde seleccionar un marco adecuado como Apache Kafka o Apache Pulsar hasta escribir un código Python para el consumo, procesamiento y visualización efectiva de datos sin esfuerzo, adquiriremos las habilidades necesarias para construir canales de datos ágiles y eficientes en tiempo real.

Ejemplo 1: Implementación de transmisión de datos en tiempo real en Python

Implementar una transmisión de datos en tiempo real en Python es crucial en la era y el mundo actual impulsados ​​por los datos. En este ejemplo detallado, recorreremos el proceso de creación de un sistema de transmisión de datos en tiempo real utilizando Apache Kafka y Python en Google Colab.







Para inicializar el ejemplo antes de comenzar a codificar, es esencial crear un entorno específico en Google Colab. Lo primero que debemos hacer es instalar las bibliotecas necesarias. Usamos la biblioteca 'kafka-python' para la integración de Kafka.



! pepita instalar kafka-pitón

 
Este comando instala la biblioteca 'kafka-python' que proporciona las funciones de Python y los enlaces para Apache Kafka. A continuación, importamos las bibliotecas necesarias para nuestro proyecto. Importar las bibliotecas requeridas, incluidas 'KafkaProducer' y 'KafkaConsumer', son las clases de la biblioteca 'kafka-python' que nos permiten interactuar con los corredores de Kafka. JSON es la biblioteca de Python para trabajar con los datos JSON que utilizamos para serializar y deserializar los mensajes.



de kafka importar KafkaProducer, KafkaConsumer
importar json

 
Creación de un productor Kafka





Esto es importante porque un productor de Kafka envía los datos a un tema de Kafka. En nuestro ejemplo, creamos un productor para enviar datos simulados en tiempo real a un tema llamado 'tema en tiempo real'.

Creamos una instancia 'KafkaProducer' que especifica la dirección del agente Kafka como 'localhost:9092'. Luego, usamos “value_serializer”, una función que serializa los datos antes de enviarlos a Kafka. En nuestro caso, una función lambda codifica los datos como JSON codificado en UTF-8. Ahora, simulemos algunos datos en tiempo real y enviémoslos al tema de Kafka.



productor = KafkaProductor ( servidores_bootstrap = 'localhost:9092' ,
                          valor_serializador = lambda v: json.dumps ( en ) .codificar ( 'utf-8' ) )
# Datos simulados en tiempo real
datos = { 'identificador_sensor' : 1 , 'temperatura' : 25.5 , 'humedad' : 60.2 }
# Envío de datos al tema
productor.enviar ( 'tema en tiempo real' , datos )

 
En estas líneas, definimos un diccionario de “datos” que representa los datos de un sensor simulado. Luego utilizamos el método 'enviar' para publicar estos datos en el 'tema en tiempo real'.

Luego, queremos crear un consumidor de Kafka, y un consumidor de Kafka lee los datos de un tema de Kafka. Creamos un consumidor para consumir y procesar los mensajes en el 'tema en tiempo real'. Creamos una instancia 'KafkaConsumer', especificando el tema que queremos consumir, por ejemplo (tema en tiempo real) y la dirección del corredor de Kafka. Entonces, el “value_deserializer” es una función que deserializa los datos que se reciben de Kafka. En nuestro caso, una función lambda decodifica los datos como JSON codificado en UTF-8.

consumidor = KafkaConsumidor ( 'tema en tiempo real' ,
                          servidores_bootstrap = 'localhost:9092' ,
                          valor_deserializador = lambda x: json.cargas ( x.decodificar ( 'utf-8' ) ) )

 
Usamos un bucle iterativo para consumir y procesar continuamente los mensajes del tema.

# Lectura y procesamiento de datos en tiempo real.
para mensaje en consumidor:
datos = mensaje.valor
imprimir ( F 'Datos recibidos: {datos}' )

 
Recuperamos el valor de cada mensaje y los datos de nuestro sensor simulado dentro del bucle y los imprimimos en la consola. Ejecutar el productor y consumidor de Kafka implica ejecutar este código en Google Colab y ejecutar las celdas de código individualmente. El productor envía los datos simulados al tema de Kafka y el consumidor lee e imprime los datos recibidos.


Análisis del resultado a medida que se ejecuta el código

Observaremos en tiempo real los datos que se están produciendo y consumiendo. El formato de los datos puede variar dependiendo de nuestra simulación o fuente de datos real. En este ejemplo detallado, cubrimos todo el proceso de configuración de un sistema de transmisión de datos en tiempo real utilizando Apache Kafka y Python en Google Colab. Explicaremos cada línea de código y su importancia en la construcción de este sistema. La transmisión de datos en tiempo real es una capacidad poderosa y este ejemplo sirve como base para aplicaciones más complejas del mundo real.

Ejemplo 2: Implementación de una transmisión de datos en tiempo real en Python utilizando datos del mercado de valores

Hagamos otro ejemplo único de implementación de transmisión de datos en tiempo real en Python usando un escenario diferente; Esta vez nos centraremos en los datos del mercado de valores. Creamos un sistema de transmisión de datos en tiempo real que captura los cambios en el precio de las acciones y los procesa utilizando Apache Kafka y Python en Google Colab. Como se demostró en el ejemplo anterior, comenzamos configurando nuestro entorno en Google Colab. Primero, instalamos las bibliotecas necesarias:

! pepita instalar kafka-python y finanzas

 
Aquí, agregamos la biblioteca 'yfinance' que nos permite obtener datos del mercado de valores en tiempo real. A continuación, importamos las bibliotecas necesarias. Seguimos utilizando las clases 'KafkaProducer' y 'KafkaConsumer' de la biblioteca 'kafka-python' para la interacción con Kafka. Importamos JSON para trabajar con los datos JSON. También utilizamos 'yfinance' para obtener datos del mercado de valores en tiempo real. También importamos la biblioteca 'time' para agregar un retraso de tiempo para simular las actualizaciones en tiempo real.

de kafka importar KafkaProducer, KafkaConsumer
importar json
importar finanzas como yf
importar tiempo

 
Ahora creamos un productor Kafka para datos bursátiles. Nuestro productor de Kafka obtiene datos de acciones en tiempo real y los envía a un tema de Kafka llamado 'precio de acciones'.

productor = KafkaProductor ( servidores_bootstrap = 'localhost:9092' ,
                          valor_serializador = lambda v: json.dumps ( en ) .codificar ( 'utf-8' ) )

mientras Verdadero:
acciones = yf.Ticker ( 'AAPL' )   # Ejemplo: acciones de Apple Inc.
stock_data = stock.historial ( período = '1d' )
último_precio = datos_stock [ 'Cerca' ] .iloc [ - 1 ]
datos = { 'símbolo' : 'AAPL' , 'precio' : ultimo precio }    
productor.enviar ( 'precio de mercado' , datos )
tiempo de dormir ( 10 )   # Simule actualizaciones en tiempo real cada 10 segundos

 
Creamos una instancia 'KafkaProducer' con la dirección del corredor de Kafka en este código. Dentro del ciclo, utilizamos 'yfinance' para obtener el precio más reciente de las acciones de Apple Inc. ('AAPL'). Luego, extraemos el último precio de cierre y lo enviamos al tema “precio de las acciones”. Finalmente, introducimos un retraso para simular las actualizaciones en tiempo real cada 10 segundos.

Creemos un consumidor de Kafka para leer y procesar los datos del precio de las acciones del tema 'precio de las acciones'.

consumidor = KafkaConsumidor ( 'precio de mercado' ,
                          servidores_bootstrap = 'localhost:9092' ,
                          valor_deserializador = lambda x: json.cargas ( x.decodificar ( 'utf-8' ) ) )

para mensaje en consumidor:
stock_data = mensaje.valor
imprimir ( F 'Datos de stock recibidos: {stock_data['symbol']} - Precio: {stock_data['price']}' )

 
Este código es similar a la configuración del consumidor del ejemplo anterior. Lee y procesa continuamente los mensajes del tema 'precio de las acciones' e imprime el símbolo de las acciones y el precio en la consola. Ejecutamos las celdas de código secuencialmente, por ejemplo, una por una en Google Colab para ejecutar el productor y el consumidor. El productor obtiene y envía actualizaciones del precio de las acciones en tiempo real mientras el consumidor lee y muestra estos datos.

! pepita instalar kafka-python y finanzas
de kafka importar KafkaProducer, KafkaConsumer
importar json
importar finanzas como yf
importar tiempo
productor = KafkaProductor ( servidores_bootstrap = 'localhost:9092' ,
                          valor_serializador = lambda v: json.dumps ( en ) .codificar ( 'utf-8' ) )

mientras Verdadero:
acciones = yf.Ticker ( 'AAPL' )   # acciones de Apple Inc.
stock_data = stock.historial ( período = '1d' )
último_precio = datos_stock [ 'Cerca' ] .iloc [ - 1 ]
   
datos = { 'símbolo' : 'AAPL' , 'precio' : ultimo precio }
   
productor.enviar ( 'precio de mercado' , datos )
   
tiempo de dormir ( 10 )   # Simule actualizaciones en tiempo real cada 10 segundos
consumidor = KafkaConsumidor ( 'precio de mercado' ,
                          servidores_bootstrap = 'localhost:9092' ,
                          valor_deserializador = lambda x: json.cargas ( x.decodificar ( 'utf-8' ) ) )

para mensaje en consumidor:
stock_data = mensaje.valor
imprimir ( F 'Datos de stock recibidos: {stock_data['symbol']} - Precio: {stock_data['price']}' )

 
En el análisis del resultado después de ejecutar el código, observaremos cómo se producen y consumen las actualizaciones del precio de las acciones en tiempo real de Apple Inc.

Conclusión

En este ejemplo único, demostramos la implementación de transmisión de datos en tiempo real en Python utilizando Apache Kafka y la biblioteca 'yfinance' para capturar y procesar los datos del mercado de valores. Explicamos detalladamente cada línea del código. La transmisión de datos en tiempo real se puede aplicar a varios campos para crear aplicaciones del mundo real en finanzas, IoT y más.