Traducción no actualizada
El operador Código puede ser de tipo origen, proceso y análisis o destino.
Importante
- El código se debe crear con la misma versión de los paquetes que aparecen en esta lista.
- Independientemente del tipo de operador, debe declarar todos los atributos de salida en la ventana Editar esquema.
Restricción
Los objetos de datetime
que se pasan no deben depender del huso horario.
Si devuelve o envía tuplas de sucesos con objetos datetime
, asegúrese de que sean “naive” o “timezone-unaware”, es decir, que no dependan del huso horario. Para obtener más información sobre el concepto de “timezone aware” y de “naive” en codificación Python, consulte datetime: Tipos básicos de fecha y hora.
Si se pasan objetos datetime
con la característica “timezone-aware”, se generan errores de tiempo de ejecución y por tanto se pasan por alto.
Consejo
En Python, puede convertir un objeto datetime
“time-zone” en un objeto datetime
“naive” basado en UTC. A continuación encontrará un ejemplo de fragmento de código.
from dateutil.tz import tzutc
event['dt_utc'] = dt.astimezone(tzutc()).replace(tzinfo=None)
Consulte el Ejemplo 2 para ver un ejemplo completo.
Operador Código como origen
Puede utilizar el operador Código como origen cuando desee generar datos de prueba sin tener que configurar una instancia de Event Streams.
También lo puede utilizar si desea incorporar datos de un socket web o de un origen de datos de su propiedad.
A. Asíncrono
# User logs include messages at INFO, WARN and ERROR levels
logger = logging.getLogger(__name__)
class Code(object):
# Called when flow is starting to run
# def __enter__(self):
# pass
def __call__(self, submit):
counter = 0
while(counter <= 10):
# Enviar una tupla en cada iteración:
submit({"number": counter, "square": counter * counter})
counter += 1
time.sleep(0.5) # Simula un retardo de 0,5 segundos entre los sucesos emitidos
# Called to handle an exception or shutdown
# def __exit__(self, exc_type, exc_value, traceback):
# pass
Cuando se ejecuta el flujo de secuencias, el flujo de sucesos muestra los datos de prueba generados.
B. Síncrono
El código siguiente muestra el primer enfoque:
import sys
import time
import logging
# User logs include messages at INFO, WARN and ERROR levels
logger = logging.getLogger(__name__)
class Code(object):
# Called when flow is starting to run
# def __enter__(self):
# pass
def __call__(self):
counter = 0
while(counter <= 10):
# Enviar una tupla en cada iteración:
counter += 1
time.sleep(0.5) # Simula un retardo de 0,5 segundos entre los sucesos emitidos
yield {"number": counter, "square": counter * counter}
# Called to handle an exception or shutdown
# def __exit__(self, exc_type, exc_value, traceback):
# pass
El código siguiente muestra el segundo enfoque:
import time
class Code(object):
# Called when flow is starting to run
# def __enter__(self):
# pass
def __call__(self):
return self
def __next__(self):
time.sleep(0.5) # Simula un retardo de 0,5 segundos entre los sucesos emitidos
self.counter += 1
# Enviar una tupla en cada iteración:
return {"number": self.counter, "square": self.counter * self.counter}
# Called to handle an exception or shutdown
# def __exit__(self, exc_type, exc_value, traceback):
# pass
Operador Código como proceso y análisis
El operador Código como proceso y análisis tiene tanto un parámetro de entrada como un valor de retorno.
Ejemplo 1
Objetivo: Tiene que devolver al esquema de salida dos atributos nuevos que no estén presentes en el esquema de entrada.
En el siguiente fragmento de código se muestra un ejemplo para dos atributos, "friendly_greeting" y "formal_greeting", en el esquema de salida del operador Código.
import sys
def process(event):
if 'name' in event:
name = event['name']
else:
name = 'stranger'
friendly_greeting = 'Hey ' + name + '!'
formal_greeting = "Dear ' + name + ','
return
{'friendly_greeting':friendly_greeting,
'formal_greeting':formal_greeting}
Los atributos 'friendly_greeting' y 'formal_greeting' que se devuelven se deben incluir en el esquema de salida del operador Código.
Para comprobar que están presentes, pulse en el panel Propiedades de código y, a continuación, añada los atributos y su tipo, cuando sea necesario.
Ejemplo 2
Objetivo: Tiene que devolver al esquema de salida la hora de un suceso que ha pasado del huso horario IST a UTC.
from dateutil.parser import parse
from dateutil import tz
def process(event):
# datetime.parse no entiende "IST" como un indicador de huso horario, por lo que lo debe cambiar por +05:30
dt = parse(event['event_time'].replace('IST','+05:30'))
# convertir también a huso horario UTC
event['dt_utc'] = dt.astimezone(tz.gettz('UTC'))
return event
Para obtener más información sobre los formatos de fecha, consulte Formatos de fecha.
Ejemplo 3
Objetivo: Tiene que devolver una de cada dos tuplas al esquema de salida.
import sys
counter=0
def process(event):
global counter
counter+=1
event['counter'] = counter
if counter%2 is 0:
return None
return event
Ejemplo 4
Objetivo: Desea informar sobre el movimiento geográfico de dispositivos móviles que disponen de GPS (teléfonos, tabletas, coches). Este escenario es un ejemplo para un caso práctico que requiere state
.
def init(state):
# Nada que inicializar en este ejemplo
pass
def process(event, state):
deviceId = event['deviceId'] # Extraer el ID de dispositivo de la tupla del suceso
if deviceId not in state:
# No hay registro anterior para este ID de dispositivo, lo que significa que es el primero suceso para el mismo
message = "Detected initial location"
else:
record = state[deviceId] # Extraer el registro de dispositivo de la última (anterior) ubicación
directions = [] # Matriz para obtener direcciones: norte o sur, este u oeste
if event['lat'] > record['last_lat']:
directions.append("north")
elif event['lat'] < record['last_lat']:
directions.append("south")
if event['long'] > record['last_long']:
directions.append("east")
elif event['long'] < record['last_long']:
directions.append("west")
message = "Moved " + "-".join(directions)
output = {
'deviceId': deviceId,
'message': message
}
rememberDeviceLocation(event, state) # For comparing with future locations
return output
Resultado de ejemplo
Suceso de entrada 1 {‘deviceId’: ‘p008’, ‘lat’: ‘32.678611’, ‘long’: ‘35.576944’}
Salida resultante {‘deviceId’: ‘p008’, ‘message’: ‘Detected initial location’} ***
Suceso de entrada 2 {‘deviceId’: ‘p008’, ‘lat’: ‘32.67862’, ‘long’: ‘35.576944’}
Salida resultante {‘deviceId’: ‘p008’, ‘message’: ‘Moved north’} ***
Suceso de entrada 3 {‘deviceId’: ‘p008’, ‘lat’: ‘32.6786’, ‘long’: ‘35.57694’}
Salida resultante {‘deviceId’: ‘p008’, ‘message’: ‘Moved south-west’} ***
Suceso de entrada 4 {‘deviceId’: ‘x123’, ‘lat’: ‘32.6786’, ‘long’: ‘35.57694’}
Salida resultante {‘deviceId’: ‘x123’, ‘message’: ‘Detected initial location’}
Ejemplo 5
Objetivo: El operador de Watson IoT ingiere lecturas de uso de electricidad de medidores inteligentes para facturar a los usuarios. Tiene que tomar esos datos e implementar la facturación basada en el tiempo de uso. También desea enviar una alerta por correo electrónico a los clientes cuyo uso correspondiente al mes curso sea alto.
Vea el vídeo sobre Uso de código Python en un flujo de secuencias.
Operador Código como destino
El operador Código como destino le permite escribir código Python en un nodo de destino.
En el código siguiente se muestra un ejemplo:
# se invocará la función process() en cada tupla de suceso
# @event un objeto de diccionario de Python que representa la tupla del suceso de entrada según está definida en el esquema de entrada
# @state un objeto de diccionario de Python para conservar el estado durante las siguientes llamadas a la función
def process(event, state):
# Hacer algo con el suceso, como por ejemplo utilizar una condición de filtrado compleja y enviar el suceso a una base de datos.
pass