0 / 0
Volver a la versión inglesa de la documentación
Ejemplos de operador Código
Última actualización: 09 abr 2021
Ejemplos de operador Código

El operador Código puede ser de tipo origen, proceso y análisis o destino.

Importante

  • El código se debe crear con la misma versión de los paquetes que aparecen en esta lista.
  • Independientemente del tipo de operador, debe declarar todos los atributos de salida en la ventana Editar esquema.

Restricción

Los objetos de datetime que se pasan no deben depender del huso horario.

Si devuelve o envía tuplas de sucesos con objetos datetime, asegúrese de que sean “naive” o “timezone-unaware”, es decir, que no dependan del huso horario. Para obtener más información sobre el concepto de “timezone aware” y de “naive” en codificación Python, consulte datetime: Tipos básicos de fecha y hora.

Si se pasan objetos datetime con la característica “timezone-aware”, se generan errores de tiempo de ejecución y por tanto se pasan por alto.

Consejo

En Python, puede convertir un objeto datetime “time-zone” en un objeto datetime “naive” basado en UTC. A continuación encontrará un ejemplo de fragmento de código.

from dateutil.tz import tzutc

event['dt_utc'] = dt.astimezone(tzutc()).replace(tzinfo=None)

Consulte el Ejemplo 2 para ver un ejemplo completo.

 

Operador Código como origen

Puede utilizar el operador Código como origen cuando desee generar datos de prueba sin tener que configurar una instancia de Event Streams.

También lo puede utilizar si desea incorporar datos de un socket web o de un origen de datos de su propiedad.

A. Asíncrono

# User logs include messages at INFO, WARN and ERROR levels
logger = logging.getLogger(__name__)

class Code(object):
    # Called when flow is starting to run
    # def __enter__(self):
    #     pass

    def __call__(self, submit):
        counter = 0
        while(counter <= 10):
            # Enviar una tupla en cada iteración:
            submit({"number": counter, "square": counter * counter})
            counter += 1
            time.sleep(0.5) # Simula un retardo de 0,5 segundos entre los sucesos emitidos

    # Called to handle an exception or shutdown
    # def __exit__(self, exc_type, exc_value, traceback):
    #     pass

Cuando se ejecuta el flujo de secuencias, el flujo de sucesos muestra los datos de prueba generados.

Flujo de sucesos para código como origen

B. Síncrono

El código siguiente muestra el primer enfoque:

import sys
import time
import logging

# User logs include messages at INFO, WARN and ERROR levels
logger = logging.getLogger(__name__)

class Code(object):
   # Called when flow is starting to run
   # def __enter__(self):
   #     pass

   def __call__(self):
       counter = 0
       while(counter <= 10):
           # Enviar una tupla en cada iteración:
           counter += 1
           time.sleep(0.5)   # Simula un retardo de 0,5 segundos entre los sucesos emitidos
           yield {"number": counter, "square": counter * counter}

   # Called to handle an exception or shutdown
   # def __exit__(self, exc_type, exc_value, traceback):
   #     pass

El código siguiente muestra el segundo enfoque:

import time


class Code(object):
    # Called when flow is starting to run
    # def __enter__(self):
    #     pass

    def __call__(self):
        return self

    def __next__(self):
        time.sleep(0.5) # Simula un retardo de 0,5 segundos entre los sucesos emitidos
        self.counter += 1
        # Enviar una tupla en cada iteración:
        return {"number": self.counter, "square": self.counter * self.counter}

    # Called to handle an exception or shutdown
    # def __exit__(self, exc_type, exc_value, traceback):
    #     pass


Operador Código como proceso y análisis

El operador Código como proceso y análisis tiene tanto un parámetro de entrada como un valor de retorno.

Ejemplo 1

Objetivo: Tiene que devolver al esquema de salida dos atributos nuevos que no estén presentes en el esquema de entrada.

En el siguiente fragmento de código se muestra un ejemplo para dos atributos, "friendly_greeting" y "formal_greeting", en el esquema de salida del operador Código.

import sys
def process(event):
  if 'name' in event:
    name = event['name']
  else:
    name = 'stranger'
  friendly_greeting = 'Hey ' + name + '!'
  formal_greeting = "Dear ' + name + ','
  return
  {'friendly_greeting':friendly_greeting,
   'formal_greeting':formal_greeting}

Los atributos 'friendly_greeting' y 'formal_greeting' que se devuelven se deben incluir en el esquema de salida del operador Código. Para comprobar que están presentes, pulse Editar esquema de salida en el panel Propiedades de código y, a continuación, añada los atributos y su tipo, cuando sea necesario.

Ejemplo 2

Objetivo: Tiene que devolver al esquema de salida la hora de un suceso que ha pasado del huso horario IST a UTC.

from dateutil.parser import parse
from dateutil import tz
def process(event):
  # datetime.parse no entiende "IST" como un indicador de huso horario, por lo que lo debe cambiar por +05:30

  dt = parse(event['event_time'].replace('IST','+05:30'))

  # convertir también a huso horario UTC

  event['dt_utc'] = dt.astimezone(tz.gettz('UTC'))
  return event

Para obtener más información sobre los formatos de fecha, consulte Formatos de fecha.

Ejemplo 3

Objetivo: Tiene que devolver una de cada dos tuplas al esquema de salida.

import sys

counter=0

def process(event):
	global counter
	counter+=1
	event['counter'] = counter
	if counter%2 is 0:
		return None
	return event

Ejemplo 4

Objetivo: Desea informar sobre el movimiento geográfico de dispositivos móviles que disponen de GPS (teléfonos, tabletas, coches). Este escenario es un ejemplo para un caso práctico que requiere state.

def init(state):
    # Nada que inicializar en este ejemplo
    pass


def process(event, state):
    deviceId = event['deviceId']  # Extraer el ID de dispositivo de la tupla del suceso
    if deviceId not in state:
        # No hay registro anterior para este ID de dispositivo, lo que significa que es el primero suceso para el mismo
        message = "Detected initial location"
    else:
        record = state[deviceId]  # Extraer el registro de dispositivo de la última (anterior) ubicación
        directions = []  # Matriz para obtener direcciones: norte o sur, este u oeste

        if event['lat'] > record['last_lat']:
            directions.append("north")
        elif event['lat'] < record['last_lat']:
            directions.append("south")

        if event['long'] > record['last_long']:
            directions.append("east")
        elif event['long'] < record['last_long']:
            directions.append("west")

        message = "Moved " + "-".join(directions)

    output = {
        'deviceId': deviceId,
        'message': message
    }
    rememberDeviceLocation(event, state)  # For comparing with future locations
    return output

Resultado de ejemplo

Suceso de entrada 1 {‘deviceId’: ‘p008’, ‘lat’: ‘32.678611’, ‘long’: ‘35.576944’}

Salida resultante {‘deviceId’: ‘p008’, ‘message’: ‘Detected initial location’} ***

Suceso de entrada 2 {‘deviceId’: ‘p008’, ‘lat’: ‘32.67862’, ‘long’: ‘35.576944’}

Salida resultante {‘deviceId’: ‘p008’, ‘message’: ‘Moved north’} ***

Suceso de entrada 3 {‘deviceId’: ‘p008’, ‘lat’: ‘32.6786’, ‘long’: ‘35.57694’}

Salida resultante {‘deviceId’: ‘p008’, ‘message’: ‘Moved south-west’} ***

Suceso de entrada 4 {‘deviceId’: ‘x123’, ‘lat’: ‘32.6786’, ‘long’: ‘35.57694’}

Salida resultante {‘deviceId’: ‘x123’, ‘message’: ‘Detected initial location’}

Ejemplo 5

Objetivo: El operador de Watson IoT ingiere lecturas de uso de electricidad de medidores inteligentes para facturar a los usuarios. Tiene que tomar esos datos e implementar la facturación basada en el tiempo de uso. También desea enviar una alerta por correo electrónico a los clientes cuyo uso correspondiente al mes curso sea alto.

Vea el vídeo sobre Uso de código Python en un flujo de secuencias.

Operador Código como destino

El operador Código como destino le permite escribir código Python en un nodo de destino.

En el código siguiente se muestra un ejemplo:

# se invocará la función process() en cada tupla de suceso
# @event un objeto de diccionario de Python que representa la tupla del suceso de entrada según está definida en el esquema de entrada
# @state un objeto de diccionario de Python para conservar el estado durante las siguientes llamadas a la función
def process(event, state):
    # Hacer algo con el suceso, como por ejemplo utilizar una condición de filtrado compleja y enviar el suceso a una base de datos.
    pass

Más información