Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Introduction

CDAP provides Python Evaluator transform that allows user provided python code to be executed in a transform plugin. The current solution uses jython 2.5.x and does not have support for allowing any standard/native libraries to be used

Use case(s)

  • Python evaluator transform should allow capabilities to use standard python libraries in the user provided code. 
  • Python evaluator transform should allow capabilities to use native third-party libraries (using numpy, scipy other scientific libs)
  • Python 3 should be supported
  • Publish benchmark of running transformations on a large datasets (~10 million records)
  • Code for the python transform should be separated out into a repo in data-integrations org

Deliverables 

  • Source code in data integrations org
  • Performance test on a large dataset
  • Integration test code 
  • Relevant documentation in the source repo and reference documentation section in plugin

Relevant links 

Plugin Type

  •  Batch Source
  •  Batch Sink 
  •  Real-time Source
  •  Real-time Sink
  •  Transform
  •  Action
  •  Post-Run Action
  •  Aggregate
  •  Join
  •  Spark Model
  •  Spark Compute

Configurables

This section defines properties that are configurable for this plugin. 

User Facing NameTypeDescriptionConstraints
Execution ModeString

Interpreted (default): use Jython to execute code (how it worked before).

Pros: no need to install anything for it to work

Cons: only python2. A lot of native C libraries are not supported.



Native: use py4j to execute code as pure CPython

Pros: Any python version can be used (including python3.*). All C libraries including scientific data analysis libs (numpy, scipy) and default C libs are supported

Cons: user will need to install py4j on all the nodes


PYTHONPATHString

PYTHONPATH to libraries which user may want to use.

By default it's empty and we allow it to be empty it's not required. Python will have it's default PYTHONPATH anyway.


Path to Python binaryString

Path to Python binary. User is be able to provide a python binary of the any version. Both python2 and python3 will work.

This is a field user is required to fill. Default is empty.


Timeout of Python processLong

Timeout of python transformation (for a single record)

Transform operation will have to wait for Python process to finish. So it's inevitable that we need some kind of timeout not to block it indefinitely.

Default: 600 seconds


Example of Json:

No Format
{
   "name":"Python",
   "plugin":{
      "name":"PythonEvaluator",
      "type":"transform",
      "label":"Python",
      "properties":{
         "pythonBinary":"/usr/bin/python",
         "pythonpath":"",
         "pythonProcessTimeout":600,
         "script":"def transform(record, emitter, context):\n    \"\"\"Transforms the provided input record into zero or more output records (as dictionaries) or errors (as an error dictionary).\n\n    Input records are available as a dictionary.\n\n    Args:\n        record (dict): The input record as a dictionary. e.g. to access a field called 'total' from the input record, use record['total'].\n        emitter (Emitter): an object that can be used to emit zero or more records (using the emitter.emit() method) or errors (using the emitter.emitError() method).\n        context (Context): an object that provides access to:\n                           1. CDAP Metrics - context.getMetrics().count('output', 1);\n                           2. CDAP Logs - context.getLogger().debug('Received a record');\n                           3. Lookups - context.getLookup('blacklist').lookup(input.id); or\n                           4. Runtime Arguments - context.getArguments.get('priceThreshold')\n\n    \"\"\"\n    if record['company_name'].startswith(\"q\"):\n      emitter.emit(record)\n",
         "schema":"{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"company_name\",\"type\":\"string\"},{\"name\":\"primary_role\",\"type\":\"string\"},{\"name\":\"created_at\",\"type\":\"string\"},{\"name\":\"updated_at\",\"type\":\"string\"}]}"
      }
   }
}



Design

Previously we used Jython which emulated how CPython works. However it cannot read pure C libraries like some default python libraries and most scientific libs (numpy, scipy - all C written), which are very useful for data analysis. Also Jython is not realeased for Python3.

So the solution is to use pure CPython process and a python library Py4j. This library allows a Python process to communicate with JVM via plain file sockets to exchange data. Also it transforms class hierarchies and primitive types between Java and Python.

Here's how the process goes:

Intialize Stage:

  1. Create certificates for mutual authentication. Files should be created in unique tmp dir not to interfere with certificates used by other jobs, which might run in parallel  (permissions 600). Since pipelines usually run in a YARN cluster, where containers from any other application from any other user may be running on the same host. It's possible for one of those other containers to connect to the right port and do malicious things. That's why even though all connections are localhost this is still a vulnerability and we will also have to use mutual authentication.

  2. Start a Python process and pass a unique tmp dir as a parameter.

    a) Starts JavaGateway on ephemeral port to avoid port conflicts when multiple transformations are running on the same host. 

    b) writes the dynamic port to file since it needs to be known by Java code.

  3. Java code needs to know the port of python gateway. So it waits for it in stdout of the python process.
  4. Java code starts Py4J.Gateway to communicate with python process over plain sockets. 

Transform Stage:

  1. Python transform method is being called from jvm, providing the correct parameters

Destroy Stage:

  1. Call method "finish" from python process to ask it to close the connection. Which will automatically finish python process. Since no threads will be running.
  2. Close client connection in jvm itself
  3. Remove temporary certificates.

Pseudocode:

Code Block
languagejava
import py4j.GatewayServer;
 
public class PythonEvaluator extends ... {
  @Override
  public void initialize(TransformContext context) {
    // ... 
    
    // 1. generate certificates
    // 2. run python process
    
    // 3. wait until the port is emitted in stdout. Throw exception after certain timeout. Let's say 120seconds
    // 4.
    GatewayServer server = new GatewayServer((Object)null, 0, pythonPort, 0, 0, (List)null); // java port is 0 - ephemeral, python port is the port chosen by python process
    server.start(true); // starts server

    // pythonTransformTransport is an interface
    pythonTransformTransport = (pythonTransformTransport) server.getPythonServerEntryPoint(new Class[] { pythonTransformTransport.class });
  }

  @Override
  public void transform(StructuredRecord input, final Emitter<StructuredRecord> emitter) {
    try {
      pythonTransformTransport.transform(record, emitter, context);
    } catch (PyException e) {
      // logic to format stack trace from python correctly into java exception message 
    }
  }

  @Override
  public void destroy() {
    if(pythonTransformTransport != null) {
      pythonTransformTransport.finish(); // ask Python process to close connection, since it is the server and it controls connection
    }
    if(server != null) {
      server.shutdown();
    }

    // remove certificates
  }
}
Code Block
languagepy
from threading import Thread
from py4j.java_gateway import JavaGateway, CallbackServerParameters

class PythonTransformTransportImpl(object):

    def transform(self, record, emitter, context):
        # code provided by user
        # emitter.emit(record); # sends data to JVM

    def finish(self):
        # close the gateway server. This methods needs to return hence the delay.
        Thread.timer(0.1, self.gateway.shutdown)
        process.start()

    class Java:
        implements = ["co.cask.hydrator.plugin.transform.pythonTransformTransport"]

python_transform_transport = PythonTransformTransportImpl()
gateway = JavaGateway(
    callback_server_parameters=CallbackServerParameters(port=0), //# use ephemeral port
    python_server_entry_point=python_transform_transport)
python_transform_transport.gateway = gateway


port = gateway.get_callback_server().get_listening_port()
print port


Limitation(s)

  • User will have to install Py4J library by himself (either for python2 or for python3). This can be done by running "pip install py4j" or "pip3 install py4j"
  • User will have to install Python by himself.


There is no easy way to install py4j library automatically along with transform plugin unfortunately. We have to knowledge of version of python the user is going to use (python2 or python3). Also plugin is a single jar which does not include any additional files. For users who are installing CDAP from CM/Ambari/Docker image we could however pre-install py4j library. 

Justification of using Py4J

We were to choose between 3 most popular solutions which enable executing Python code in scope of Java application. Here's a comparison table of them.



Py4JJepJpy
LicenseBSDzlib/libpngApache
Mechanism of work

CPython process and java thread

transfer data using sockets

JNI (via *.so library which has access to python)JNI (via *.so library which has access to python)
Anticipated performanceNormal


High (due to low-level mechanism)High+ (due to low-level mechanism). Project states it was specifically designed to meet high performance goals
Needs achitecture dependent files (*.so) -++

Importing native libraries


Any native librariesCan read almost all. Including numpy, scipyCan read almost all. Including numpy, scipy
StabilityVery highHighHigh
Ability to choose python version to use+--
Installation/RunningSimpleSimpleSlighting advanced (building from sources and setting paths)


Needs achitecture dependent files (*.so)

If library needs *.so file this gets a bit complicated. We will need to ask user to install the library with a simple command. Before using the transform pipeline. Since these *.so files are not only dependent on processor achitecture, but also dependent on very specific version of Python it is used with. I will create another comment to discuss this, cause there are other points to make.

Importing native libraries / Stability

Py4j uses real Python process no problems here everything will work. For Jep and Jpy they are using real CPython as well, but they are calling it via *.so libraries, which is not the usual way. And some presumably small number of libs can have a problem with this approach.  From Jep docs:

works with most numpy, scipy, pandas, tensorflow, matplotlib, cvxpy

Jep should work with any pure Python modules. CPython extensions and Cython modules may or may not work correctly

Jep doesn't work correctly with some CPython extensions due to how those extensions were coded. Oftentimes projects were not designed towards the idea of running in embedded sub-interpreters instead of in a standard, single Python interpreter. When running Jep, these types of problems have been known to appear as a JVM crash, an attribute inexplicably set to None, or a thread deadlock.

Devs claim that on latest version jvm crash should not appear when using odd libs, when using another method of loading the module. 

Ability to choose python version to use

Py4j will allow to choose any python user has installed. Organizations might have multiple different versions which is normal. Jep and Jpy install *.so library which is compiled and bound to a specific Python version so after installation user cannot choose the Python he would like to use.

Conclusion

As we can see Py4J wins in most categories, hence with it we can provide much better user experience. Allowing them to choose python version from UI, and install the library with ease (since no *.so is needed) and provide much more stable library (which won't crash jvm). Also Py4J is already used it big projects like Spark, which shows that it should be stable. The only drawback it has is that performance is expected to be a bit slower than of other two libs. However since performance on transform is not bottleneck for pipelines this should not be a problem for us.



Table of Contents

Table of Contents
stylecircle

Checklist

  •  User stories documented 
  •  User stories reviewed 
  •  Design documented 
  •  Design reviewed 
  •  Feature merged 
  •  Examples and guides 
  •  Integration tests 
  •  Documentation for feature 
  •  Short video demonstrating the feature