Python Evaluator Transformation
The Python Evaluator transformation plugin is available in the Hub.
Executes user-provided Python code that transforms one record into zero or more records. Each input record is converted into a dictionary that the script can access directly. The script processes that dictionary and uses the provided emitter object either to emit zero or more transformed dictionaries or to emit an error dictionary.
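As a minimal sketch of this contract, the snippet below defines a `transform` function that emits zero, one, or several dictionaries per input record. The `ListEmitter` class and the `id` and `tags` fields are hypothetical stand-ins for local experimentation, not part of the plugin. Only the `emit` and `emitError` method names and the error dictionary keys mirror the example later in this document.

```python
class ListEmitter(object):
    """Hypothetical local stand-in for the plugin's emitter (not the real class)."""

    def __init__(self):
        self.records = []
        self.errors = []

    def emit(self, record):
        self.records.append(record)

    def emitError(self, error):
        self.errors.append(error)


def transform(record, emitter, context):
    # Emit one output record per tag; an empty tag list emits nothing.
    tags = record.get('tags')
    if tags is None:
        emitter.emitError({
            'errorCode': 1,
            'errorMsg': 'tags field is missing',
            'invalidRecord': record,
        })
        return
    for tag in tags:
        emitter.emit({'id': record['id'], 'tag': tag})


emitter = ListEmitter()
transform({'id': 1, 'tags': ['a', 'b']}, emitter, None)  # two output records
transform({'id': 2, 'tags': []}, emitter, None)          # zero output records
transform({'id': 3}, emitter, None)                      # one error record
print(emitter.records)
print(len(emitter.errors))
```

Passing `None` as the context works here only because this sketch never touches it; a script that reads arguments or metrics would need a context stub as well.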
Configuration
Property | Enable Macros? | Description |
---|---|---|
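Script | No | Required. Python code defining how to transform one record into another. The script must implement a function called transform, which takes the input record (a dictionary), an emitter object, and a context object, as shown in the Example section of this document. |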
Execution Mode | Yes | Required. How to execute the Python code. Supported modes are Interpreted and Native. Interpreted mode: the code is executed on the JVM, so C-based libraries (e.g. numpy) and Python 3 syntax are not supported. Native mode: the code is run by a native Python interpreter, so any Python version and any libraries installed on the executors can be used. Native mode requires that both Python and the py4j library are available on every node in your cluster. To install py4j with Python 2, run |
Python Binary | Yes | Optional. Path to the binary that runs the Python code. E.g. |
PYTHONPATH | Yes | Optional. Value of the PYTHONPATH environment variable, which lets you include libraries from additional locations. This value is only used in native mode. |
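Because native mode depends on py4j being importable on every node, a quick check like the following may help when setting up a cluster. This is only a sketch: it assumes a Python 3 interpreter (it relies on `importlib.util.find_spec` from the standard library), the helper name `has_module` is hypothetical, and it should be run with the same binary that the Python Binary property points to.

```python
import importlib.util
import sys


def has_module(name):
    """Return True if the top-level module `name` can be imported here."""
    return importlib.util.find_spec(name) is not None


if __name__ == '__main__':
    # Show which interpreter ran the check, then whether py4j is visible to it.
    print('interpreter:', sys.executable)
    print('py4j importable:', has_module('py4j'))
```

If py4j lives in a non-standard location, the same check can be run with PYTHONPATH set to the value configured for the plugin.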
Example
The transform checks each record's 'subtotal' field: if the 'subtotal' is negative, it emits an error; otherwise, it calculates the 'tax' and 'total' fields based on the 'subtotal' and emits a record as a Python dictionary containing those three fields. Error records are written to the configured error dataset:
{
    "name": "PythonEvaluator",
    "type": "transform",
    "properties": {
        "script": "def transform(record, emitter, context):
            if (record['subtotal'] < 0):
                emitter.emitError({
                    'errorCode': 10,
                    'errorMsg': 'subtotal is less than 0',
                    'invalidRecord': record,
                })
            else:
                taxrate = float(context.getArguments().get('taxrate'))
                tax = record['subtotal'] * taxrate
                if (tax > 1000.0):
                    context.getMetrics().count('tax.above.1000', 1)
                emitter.emit({
                    'subtotal': record['subtotal'],
                    'tax': tax,
                    'total': record['subtotal'] + tax,
                })
        ",
        "schema": "{
            \"type\":\"record\",
            \"name\":\"expanded\",
            \"fields\":[
                {\"name\":\"subtotal\",\"type\":\"double\"},
                {\"name\":\"tax\",\"type\":\"double\"},
                {\"name\":\"total\",\"type\":\"double\"}
            ]
        }"
    }
}
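To try the script above outside a pipeline, one option is to run it against small stubs. The sketch below is an assumption-laden local harness: `StubEmitter`, `StubMetrics`, and `StubContext` are hypothetical classes that imitate only the calls the script makes (`emit`, `emitError`, `getArguments().get`, `getMetrics().count`), and the `taxrate` value of 0.25 is made up for illustration.

```python
class StubEmitter(object):
    """Hypothetical emitter stub that collects outputs in lists."""

    def __init__(self):
        self.records = []
        self.errors = []

    def emit(self, record):
        self.records.append(record)

    def emitError(self, error):
        self.errors.append(error)


class StubMetrics(object):
    """Hypothetical metrics stub that tallies counters in a dict."""

    def __init__(self):
        self.counts = {}

    def count(self, name, delta):
        self.counts[name] = self.counts.get(name, 0) + delta


class StubContext(object):
    """Hypothetical context stub exposing only what the script calls."""

    def __init__(self, arguments):
        self._arguments = arguments
        self._metrics = StubMetrics()

    def getArguments(self):
        return self._arguments

    def getMetrics(self):
        return self._metrics


def transform(record, emitter, context):
    # Same logic as the script in the configuration above.
    if (record['subtotal'] < 0):
        emitter.emitError({
            'errorCode': 10,
            'errorMsg': 'subtotal is less than 0',
            'invalidRecord': record,
        })
    else:
        taxrate = float(context.getArguments().get('taxrate'))
        tax = record['subtotal'] * taxrate
        if (tax > 1000.0):
            context.getMetrics().count('tax.above.1000', 1)
        emitter.emit({
            'subtotal': record['subtotal'],
            'tax': tax,
            'total': record['subtotal'] + tax,
        })


context = StubContext({'taxrate': '0.25'})  # made-up runtime argument
emitter = StubEmitter()
transform({'subtotal': 100.0}, emitter, context)    # normal record
transform({'subtotal': -5.0}, emitter, context)     # goes to errors
transform({'subtotal': 20000.0}, emitter, context)  # tax above 1000
print(emitter.records)
print(emitter.errors)
print(context.getMetrics().counts)
```

The stubs keep the script itself unchanged, so the same function body can be pasted back into the Script property after testing.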
Created in 2020 by Google Inc.