Introduction
CDAP provides a Python Evaluator transform that allows user-provided Python code to be executed in a transform plugin. The current solution uses Jython 2.5.x and does not support using any standard or native libraries.
Use case(s)
- The Python Evaluator transform should allow the user-provided code to use standard Python libraries.
- The Python Evaluator transform should allow the user-provided code to use native third-party libraries (numpy, scipy and other scientific libraries).
- Python 3 should be supported.
- Publish a benchmark of running transformations on a large dataset (~10 million records).
- The code for the Python transform should be moved into a separate repo in the data-integrations org.
Deliverables
- Source code in the data-integrations org
- Performance test on a large dataset
- Integration test code
- Relevant documentation in the source repo and in the reference documentation section of the plugin
Relevant links
- Existing Python plugin code: https://github.com/caskdata/hydrator-plugins/tree/develop/
- Data-integrations org: https://github.com/data-integrations/
- Integration test repos: https://github.com/caskdata/cdap-integration-tests
Plugin Type
- Batch Source
- Batch Sink
- Real-time Source
- Real-time Sink
- Transform
- Action
- Post-Run Action
- Aggregate
- Join
- Spark Model
- Spark Compute
Configurables
This section defines properties that are configurable for this plugin.
User Facing Name | Type | Description | Constraints |
---|---|---|---|
Execution Mode | String | Interpreted (default): use Jython to execute the code (as it worked before). Pros: nothing needs to be installed. Cons: Python 2 only; many native C libraries are not supported. Native: use Py4J to execute the code in a real CPython process. Pros: any Python version can be used (including Python 3.x); all C libraries, including scientific data analysis libraries (numpy, scipy) and the standard C-based libraries, are supported. Cons: the user needs to install Py4J on all nodes. | |
PYTHONPATH | String | PYTHONPATH for libraries the user may want to use. Optional and empty by default; Python always keeps its default PYTHONPATH. | |
Path to Python binary | String | Path to the Python binary. The user can provide a Python binary of any version; both Python 2 and Python 3 work. Required; empty by default. | |
Timeout of Python process | Long | Timeout of the Python transformation for a single record. The transform operation has to wait for the Python process to finish, so a timeout is needed to avoid blocking indefinitely. If the Python process does not finish within the timeout, the pipeline fails. Default: 600 seconds. | |
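In Native mode, the properties above roughly determine how the Python process is launched. The following is a minimal sketch, not the plugin's code: `build_launch` and the launcher script path are hypothetical. It validates the required binary and the timeout, prepends the configured PYTHONPATH to any inherited one (Python still adds its own default paths), and passes the unique tmp dir as a parameter, as described in the Initialize stage.

```python
import os
from typing import Dict, List, Optional, Tuple


def build_launch(python_binary: str, pythonpath: str, timeout_seconds: int,
                 launcher_script: str, tmp_dir: str,
                 base_env: Optional[Dict[str, str]] = None) -> Tuple[List[str], Dict[str, str]]:
    """Return (argv, env) for the Python process (hypothetical helper)."""
    if not python_binary:
        raise ValueError("Path to Python binary is required")
    if timeout_seconds <= 0:
        raise ValueError("Timeout of Python process must be positive")
    env = dict(os.environ if base_env is None else base_env)
    if pythonpath:
        # Prepend the user's PYTHONPATH to whatever the environment already has.
        existing = env.get("PYTHONPATH")
        env["PYTHONPATH"] = pythonpath if not existing else pythonpath + os.pathsep + existing
    # The unique tmp dir is handed to the (hypothetical) launcher script.
    argv = [python_binary, launcher_script, tmp_dir]
    return argv, env


argv, env = build_launch("/usr/bin/python3", "/opt/libs", 600,
                         "/tmp/job/launcher.py", "/tmp/job", base_env={})
# argv == ["/usr/bin/python3", "/tmp/job/launcher.py", "/tmp/job"]
# env == {"PYTHONPATH": "/opt/libs"}
```

Passing `base_env` explicitly keeps the helper testable; by default it inherits the container's environment.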
Example JSON:
```json
{
  "name":"Python",
  "plugin":{
    "name":"PythonEvaluator",
    "type":"transform",
    "label":"Python",
    "properties":{
      "pythonBinary":"/usr/bin/python",
      "pythonpath":"",
      "pythonProcessTimeout":600,
      "script":"def transform(record, emitter, context):\n  \"\"\"Transforms the provided input record into zero or more output records (as dictionaries) or errors (as an error dictionary).\n\n Input records are available as a dictionary.\n\n Args:\n record (dict): The input record as a dictionary. e.g. to access a field called 'total' from the input record, use record['total'].\n emitter (Emitter): an object that can be used to emit zero or more records (using the emitter.emit() method) or errors (using the emitter.emitError() method).\n context (Context): an object that provides access to:\n 1. CDAP Metrics - context.getMetrics().count('output', 1);\n 2. CDAP Logs - context.getLogger().debug('Received a record');\n 3. Lookups - context.getLookup('blacklist').lookup(input.id); or\n 4. Runtime Arguments - context.getArguments.get('priceThreshold')\n\n  \"\"\"\n  if record['company_name'].startswith(\"q\"):\n    emitter.emit(record)\n",
      "schema":"{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"company_name\",\"type\":\"string\"},{\"name\":\"primary_role\",\"type\":\"string\"},{\"name\":\"created_at\",\"type\":\"string\"},{\"name\":\"updated_at\",\"type\":\"string\"}]}"
    }
  }
}
```
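To try a script like the one in the example outside a pipeline, it can be run against a stand-in emitter. This is a minimal local harness under stated assumptions: `ListEmitter` and `run_script` are hypothetical test doubles (not the plugin's Emitter or Context), and records are plain dicts, as the script's docstring describes.

```python
from typing import Any, Dict, List

# The user script from the example above (docstring omitted).
SCRIPT = '''
def transform(record, emitter, context):
  if record['company_name'].startswith("q"):
    emitter.emit(record)
'''


class ListEmitter(object):
    """Hypothetical stand-in for the plugin's emitter: collects output in lists."""

    def __init__(self) -> None:
        self.records: List[Dict[str, Any]] = []
        self.errors: List[Dict[str, Any]] = []

    def emit(self, record: Dict[str, Any]) -> None:
        self.records.append(record)

    def emitError(self, error: Dict[str, Any]) -> None:
        self.errors.append(error)


def run_script(script: str, records: List[Dict[str, Any]]) -> ListEmitter:
    namespace: Dict[str, Any] = {}
    exec(script, namespace)  # defines transform() in namespace
    transform = namespace["transform"]
    emitter = ListEmitter()
    for record in records:
        transform(record, emitter, None)  # this script does not use context
    return emitter


out = run_script(SCRIPT, [{"company_name": "qa corp"}, {"company_name": "acme"}])
print(out.records)  # [{'company_name': 'qa corp'}]
```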
Design
Previously we used Jython, which emulates CPython. However, Jython cannot load C-based libraries, such as some standard Python libraries and most scientific libraries (numpy and scipy are largely written in C), which are very useful for data analysis. Also, Jython has not been released for Python 3.
So the solution is to run a real CPython process and use the Py4J library. Py4J lets a Python process communicate with the JVM over plain local sockets to exchange data. It also converts primitive types between Java and Python and lets Python objects implement Java interfaces.
Here's how the process goes:
Initialize Stage:
- Create certificates for mutual authentication. The files should be created in a unique tmp dir (permissions 600), so they do not interfere with certificates used by other jobs that might run in parallel. Pipelines usually run in a YARN cluster, where containers of any other application from any other user may be running on the same host, and one of those containers could connect to the right port and do malicious things. So even though all connections are to localhost, this is still a vulnerability, and we also have to use mutual authentication.
- Start a Python process and pass the unique tmp dir as a parameter. The Python process:
  a) starts a JavaGateway whose callback server listens on an ephemeral port, to avoid port conflicts when multiple transformations are running on the same host;
  b) writes the dynamic port to a file, since the Java code needs to know it. The Java code waits for the port to appear in the stdout of the Python process.
- The Java code starts a Py4J GatewayServer to communicate with the Python process over plain sockets.
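The port handshake in the Initialize stage can be sketched as follows: read the child's stdout line by line until a line that parses as a port number appears, and give up after a deadline (the pseudocode below suggests 120 seconds). This is a Python illustration of what the Java side does, not the plugin's implementation; `wait_for_port` is a hypothetical helper and the child is a stand-in that just prints a number.

```python
import queue
import subprocess
import sys
import threading
import time


def wait_for_port(proc, timeout):
    """Return the first all-digit line the child prints on stdout.

    Raises TimeoutError if no such line appears within `timeout` seconds.
    """
    lines = queue.Queue()

    def pump():
        # Reading blocks, so read on a daemon thread and hand lines over a queue.
        for line in proc.stdout:
            lines.put(line)

    threading.Thread(target=pump, daemon=True).start()
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("Python process did not report its port in time")
        try:
            text = lines.get(timeout=remaining).strip()
        except queue.Empty:
            continue  # the loop re-checks the deadline
        if text.isdigit():
            return int(text)


# Usage: a stand-in child that logs a line and then prints its "port".
child = subprocess.Popen(
    [sys.executable, "-c", "print('starting'); print(25334)"],
    stdout=subprocess.PIPE, universal_newlines=True)
port = wait_for_port(child, timeout=10.0)
child.wait()
print(port)  # 25334
```

Skipping non-numeric lines lets the child log other output before the port without breaking the handshake.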
Transform Stage:
The Python transform method is called from the JVM with the input record, the emitter and the context.
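Because the JVM blocks on each call into Python, the per-record "Timeout of Python process" has to be enforced around that call. A minimal sketch of the idea in Python, assuming a hypothetical `call_with_timeout` helper; in the plugin this logic would live on the Java side around the Py4J call.

```python
import concurrent.futures
import time
from typing import Any, Callable

# A single worker: records are transformed one at a time.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)


def call_with_timeout(fn: Callable[..., Any], timeout_seconds: float, *args: Any) -> Any:
    """Run fn(*args); raise TimeoutError if it does not finish in time."""
    future = _executor.submit(fn, *args)
    try:
        return future.result(timeout=timeout_seconds)
    except concurrent.futures.TimeoutError:
        # The worker cannot be interrupted, so a hung call keeps the single
        # worker busy; the caller should fail the pipeline and stop the process.
        raise TimeoutError("transform did not finish within %s seconds" % timeout_seconds)


print(call_with_timeout(lambda x: x * 2, 1.0, 21))  # 42
```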
Destroy Stage:
destroy() might not be called when running in streaming mode or when the job is killed. That's why the Python code also waits for the Java connection to close and, when that happens, ends the process:
```python
from py4j import java_gateway


def on_server_connection_stopped(signal, sender, **params):
    # The JVM closed its connection: shut down the gateway created at
    # startup so that the Python process can exit.
    gateway.shutdown()


java_gateway.server_connection_stopped.connect(on_server_connection_stopped)
```
See the Py4J signals documentation: https://www.py4j.org/py4j_java_gateway.html#py4j-core-signals. The server_connection_stopped signal is sent once Java disconnects.
In addition, destroy() on the Java side still:
- closes the client connection in the JVM itself;
- removes the temporary folder.
If the JVM process dies, the connection is closed as well, so the Python process exits too. The temporary folder cannot be deleted from Python, because then the process output would not be available in destroy() or when an exception occurs in the Python transform (the output is written to output.txt and is saved only when the Python process exits).
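The per-job temporary folder ties the Initialize and Destroy stages together: it holds the certificates (permissions 600) and the process output, and only the Java side removes it. Below is a minimal sketch of that lifecycle in Python; the helper names and the `cert.pem` file name are assumptions for illustration, and generating the certificates themselves is out of scope.

```python
import os
import shutil
import tempfile


def create_job_dir() -> str:
    # mkdtemp creates a uniquely named directory accessible only by its owner (0700).
    return tempfile.mkdtemp(prefix="python-evaluator-")


def write_private_file(directory: str, name: str, data: bytes) -> str:
    path = os.path.join(directory, name)
    # O_EXCL refuses to reuse an existing file; mode 0600 = owner read/write only.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(fd, "wb") as f:
        f.write(data)
    return path


def destroy_job_dir(directory: str) -> str:
    """Read the Python process output for diagnostics, then remove the folder."""
    output_path = os.path.join(directory, "output.txt")
    output = ""
    if os.path.exists(output_path):
        with open(output_path) as f:
            output = f.read()
    shutil.rmtree(directory, ignore_errors=True)
    return output


job_dir = create_job_dir()
write_private_file(job_dir, "cert.pem", b"placeholder, not a real certificate")
print(destroy_job_dir(job_dir))  # prints an empty line: no process wrote output.txt
```

Reading output.txt before removing the folder is what makes the Python output available in destroy() and in error messages.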
Pseudocode:
```java
import java.util.List;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.TransformContext;
import py4j.GatewayServer;
import py4j.Py4JException;

public class PythonEvaluator extends Transform<StructuredRecord, StructuredRecord> {
  private GatewayServer server;
  // pythonTransformTransport is a Java interface implemented by the Python process
  private pythonTransformTransport transport;
  private TransformContext context;

  @Override
  public void initialize(TransformContext context) throws Exception {
    this.context = context;
    // ...
    // 1. generate certificates
    // 2. run python process
    // 3. wait until the port is emitted in stdout; throw an exception after a timeout (e.g. 120 seconds)
    int pythonPort = ...; // port chosen by the Python process, read in step 3
    // 4. java port is 0 (ephemeral); python port is the callback server port chosen by Python
    server = new GatewayServer((Object) null, 0, pythonPort, 0, 0, (List) null);
    server.start(true); // starts the server in a separate thread
    transport = (pythonTransformTransport) server.getPythonServerEntryPoint(
        new Class[] { pythonTransformTransport.class });
  }

  @Override
  public void transform(StructuredRecord input, final Emitter<StructuredRecord> emitter) {
    try {
      transport.transform(input, emitter, context);
    } catch (Py4JException e) {
      // format the Python stack trace into the Java exception message
      throw new IllegalStateException("Python transform failed: " + e.getMessage(), e);
    }
  }

  @Override
  public void destroy() {
    if (transport != null) {
      // ask the Python process to close the connection, since it is the server and controls the connection
      transport.finish();
    }
    if (server != null) {
      server.shutdown();
    }
    // remove the temporary folder with the certificates
  }
}
```
The Python side includes the section the user provided in the UI as well as extra code that connects to the JVM via Py4J and runs the transform code. Records produced by the transform are sent to the JVM by calling emitter.emit(record).
```python
import sys
from threading import Timer

from py4j.java_gateway import JavaGateway, CallbackServerParameters


class PythonTransformTransportImpl(object):
    def transform(self, record, emitter, context):
        # code provided by the user is inserted here, e.g.:
        # emitter.emit(record)  # sends data to the JVM
        pass

    def finish(self):
        # Shut down the gateway server. This method has to return to the JVM
        # first, hence the short delay.
        Timer(0.1, self.gateway.shutdown).start()

    class Java:
        implements = ["co.cask.hydrator.plugin.transform.pythonTransformTransport"]


python_transform_transport = PythonTransformTransportImpl()
gateway = JavaGateway(
    callback_server_parameters=CallbackServerParameters(port=0),  # use an ephemeral port
    python_server_entry_point=python_transform_transport)
python_transform_transport.gateway = gateway
port = gateway.get_callback_server().get_listening_port()
print(port)
sys.stdout.flush()  # stdout is block-buffered when piped; make the port visible to the JVM now
```
Limitation(s)
- The user has to install the Py4J library on all nodes (for Python 2 or Python 3). This can be done by running "pip install py4j" or "pip3 install py4j".
- The user has to install Python.
Unfortunately, there is no easy way to install the Py4J library automatically along with the transform plugin: we have no knowledge of which Python version the user is going to use (Python 2 or Python 3), and the plugin is a single jar that does not include any additional files. However, for users who install CDAP from CM/Ambari/a Docker image, we could pre-install the Py4J library.
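Since Python and Py4J have to be installed by the user, a pre-flight check can catch a misconfigured binary before the pipeline runs. A minimal sketch, assuming a hypothetical `check_python` helper (not part of the plugin): it runs the configured binary, reports its major.minor version, and tries to import a module (py4j by default). The commands it runs work under both Python 2 and Python 3.

```python
import subprocess
import sys
from typing import Tuple


def check_python(python_binary: str, module: str = "py4j") -> Tuple[str, bool]:
    """Return (major.minor version of the binary, whether `module` imports).

    `module` must be a plain module name; it is interpolated into the command.
    """
    version = subprocess.check_output(
        [python_binary, "-c", "import sys; print('%d.%d' % sys.version_info[:2])"],
        universal_newlines=True).strip()
    result = subprocess.run(
        [python_binary, "-c", "import " + module],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return version, result.returncode == 0


version, has_py4j = check_python(sys.executable)
print(version, has_py4j)
```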
Justification of using Py4J
We had to choose between the three most popular solutions for executing Python code from a Java application. Here is a comparison:
Criterion | Py4J | Jep | Jpy |
---|---|---|---|
License | BSD | zlib/libpng | Apache |
Mechanism | CPython process and Java thread exchange data over sockets | JNI (via a *.so library with access to Python) | JNI (via a *.so library with access to Python) |
Anticipated performance | Normal | High (due to the low-level mechanism) | High+ (due to the low-level mechanism); the project states it was specifically designed to meet high performance goals |
Needs architecture-dependent files (*.so) | - | + | + |
Importing native libraries | Any native library | Almost all, including numpy, scipy | Almost all, including numpy, scipy |
Stability | Very high | High | High |
Ability to choose Python version | + | - | - |
Installation/Running | Simple | Simple | Slightly advanced (building from sources and setting paths) |
Needs architecture-dependent files (*.so)
If a library needs a *.so file, things get more complicated: we would have to ask the user to install the library with a simple command before using the transform in a pipeline, because these *.so files depend not only on the processor architecture but also on the very specific Python version they are used with. I will create another comment to discuss this, because there are other points to make.
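The version dependence of compiled modules is visible in the file-name suffix CPython expects for extension modules. A small illustration using the standard `sysconfig` module on CPython 3; the exact value depends on the platform and build, and modules built for the stable ABI (abi3) are an exception to the version tag.

```python
import sys
import sysconfig

# Suffix CPython uses for compiled extension modules: it embeds an ABI tag such as
# "cpython-XY" (Linux, macOS) or "cpXY" (Windows), where XY is the major/minor version.
suffix = sysconfig.get_config_var("EXT_SUFFIX")
tag = "%d%d" % sys.version_info[:2]
print(suffix)
print(tag in suffix)  # a module built for a different Python version carries a different tag
```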
Importing native libraries / Stability
Py4J uses a real Python process, so there are no problems here: any library that works in a standalone Python process works. Jep and Jpy also use real CPython, but they call it via *.so libraries embedded in the JVM, which is not the usual way, and some (presumably small) number of libraries may have problems with this approach. From the Jep docs:
works with most numpy, scipy, pandas, tensorflow, matplotlib, cvxpy
Jep should work with any pure Python modules. CPython extensions and Cython modules may or may not work correctly
Jep doesn't work correctly with some CPython extensions due to how those extensions were coded. Oftentimes projects were not designed towards the idea of running in embedded sub-interpreters instead of in a standard, single Python interpreter. When running Jep, these types of problems have been known to appear as a JVM crash, an attribute inexplicably set to None, or a thread deadlock.
The Jep developers state that in the latest version such libraries should no longer crash the JVM if another method of loading the module is used.
Ability to choose python version to use
Py4J allows choosing any Python the user has installed; it is normal for organizations to have several different versions. Jep and Jpy install a *.so library that is compiled for and bound to a specific Python version, so after installation the user cannot choose which Python to use.
Conclusion
As the comparison shows, Py4J wins in most categories, so with it we can provide a much better user experience: users can choose the Python version from the UI, install the library easily (no *.so is needed), and get a more stable library (which won't crash the JVM). Py4J is also already used in big projects such as Spark, which suggests it is stable. Its only drawback is that performance is expected to be somewhat lower than that of the other two libraries. However, since transform performance is not the bottleneck for pipelines, this should not be a problem for us.
Checklist
- User stories documented
- User stories reviewed
- Design documented
- Design reviewed
- Feature merged
- Examples and guides
- Integration tests
- Documentation for feature
- Short video demonstrating the feature