
Introduction

CDAP provides a Python Evaluator transform that allows user-provided python code to be executed in a transform plugin. The current solution uses Jython 2.5.x and does not support the use of standard or native python libraries.

Use case(s)

  • The Python evaluator transform should allow standard python libraries to be used in the user-provided code.
  • The Python evaluator transform should allow native third-party libraries to be used (numpy, scipy and other scientific libs).
  • Python 3 should be supported.
  • Publish a benchmark of running transformations on a large dataset (~10 million records).
  • Code for the python transform should be separated out into a repo in the data-integrations org.

Deliverables 

  • Source code in data integrations org
  • Performance test on a large dataset
  • Integration test code 
  • Relevant documentation in the source repo and reference documentation section in plugin

Relevant links 

Plugin Type

  •  Batch Source
  •  Batch Sink 
  •  Real-time Source
  •  Real-time Sink
  •  Transform
  •  Action
  •  Post-Run Action
  •  Aggregate
  •  Join
  •  Spark Model
  •  Spark Compute

Configurables

This section defines properties that are configurable for this plugin. 

User Facing Name | Type | Description | Constraints

Execution Mode (String)

Interpreted (default): use Jython to execute the code (how it worked before).

Pros: nothing needs to be installed for it to work.

Cons: python2 only; a lot of native C libraries are not supported.

Native: use py4j to execute the code as pure CPython.

Pros: any python version can be used (including python3.*); all C libraries, including scientific data analysis libs (numpy, scipy) and the default C libs, are supported.

Cons: the user will need to install py4j on all the nodes.

PYTHONPATH (String)

PYTHONPATH to libraries the user may want to use.

By default it is empty, and it is allowed to stay empty (the field is not required); Python will use its default PYTHONPATH anyway.

Path to Python binary (String)

Path to the Python binary. The user is able to provide a python binary of any version; both python2 and python3 will work.

This field is required. Default is empty.

Timeout of Python process (Long)

Timeout of the python transformation (for a single record).

The transform operation has to wait for the Python process to finish, so we inevitably need some kind of timeout to avoid blocking indefinitely.

Default: 600 seconds
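Since the per-record timeout has to be enforced by the caller, here is a minimal sketch of how it could work. The function name and the use of a thread pool are illustrative assumptions, not the plugin's actual implementation:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def call_with_timeout(fn, timeout_seconds, *args):
    """Run fn(*args); raise TimeoutError if it exceeds timeout_seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        return pool.submit(fn, *args).result(timeout=timeout_seconds)
    finally:
        pool.shutdown(wait=False)  # don't block on a stuck worker

print(call_with_timeout(lambda record: record * 2, 600, 21))  # prints 42

try:
    call_with_timeout(time.sleep, 0.05, 0.5)   # too slow for its budget
except TimeoutError:
    print("record transform timed out")
```

A timed-out worker thread is abandoned rather than killed here; the real plugin would terminate the python process instead.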

Example JSON:

No Format
{
   "name":"Python",
   "plugin":{
      "name":"PythonEvaluator",
      "type":"transform",
      "label":"Python",
      "properties":{
         "pythonBinary":"/usr/bin/python",
         "pythonpath":"",
         "pythonProcessTimeout":600,
         "script":"def transform(record, emitter, context):\n    \"\"\"Transforms the provided input record into zero or more output records (as dictionaries) or errors (as an error dictionary).\n\n    Input records are available as a dictionary.\n\n    Args:\n        record (dict): The input record as a dictionary. e.g. to access a field called 'total' from the input record, use record['total'].\n        emitter (Emitter): an object that can be used to emit zero or more records (using the emitter.emit() method) or errors (using the emitter.emitError() method).\n        context (Context): an object that provides access to:\n                           1. CDAP Metrics - context.getMetrics().count('output', 1);\n                           2. CDAP Logs - context.getLogger().debug('Received a record');\n                           3. Lookups - context.getLookup('blacklist').lookup(input.id); or\n                           4. Runtime Arguments - context.getArguments.get('priceThreshold')\n\n    \"\"\"\n    if record['company_name'].startswith(\"q\"):\n      emitter.emit(record)\n",
         "schema":"{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"company_name\",\"type\":\"string\"},{\"name\":\"primary_role\",\"type\":\"string\"},{\"name\":\"created_at\",\"type\":\"string\"},{\"name\":\"updated_at\",\"type\":\"string\"}]}"
      }
   }
}
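Before a pipeline runs, the properties above can be sanity-checked. A sketch, assuming the property names from the JSON example; the set of required fields and the validation rules are illustrative:

```python
import json

REQUIRED = {"pythonBinary", "script"}            # assumed required, per the configurables above
DEFAULTS = {"pythonpath": "", "pythonProcessTimeout": 600}

def validate_properties(config_json):
    """Parse a plugin config and apply documented defaults for optional fields."""
    props = json.loads(config_json)["plugin"]["properties"]
    missing = REQUIRED - props.keys()
    if missing:
        raise ValueError("missing required properties: %s" % sorted(missing))
    return {**DEFAULTS, **props}

cfg = '{"plugin": {"properties": {"pythonBinary": "/usr/bin/python", "script": "def transform(r, e, c): pass"}}}'
print(validate_properties(cfg)["pythonProcessTimeout"])  # prints 600
```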



Design

Previously we used Jython, which emulates how CPython works. However, it cannot load pure C libraries, including some default python libraries and most scientific libs (numpy, scipy - all written in C), which are very useful for data analysis. Also, Jython has not been released for Python3.

So the solution is to use a pure CPython process and the python library Py4J. This library allows a Python process to communicate with a JVM over plain sockets to exchange data, and it also translates class hierarchies and primitive types between Java and Python.

Here's how the process goes:

Initialize Stage:

  1. Create certificates for mutual authentication. The files should be created in a unique tmp dir (permissions 600) so as not to interfere with certificates used by other jobs that might run in parallel. Pipelines usually run in a YARN cluster, where containers from any other application, from any other user, may be running on the same host; it is possible for one of those containers to connect to the right port and do malicious things. That is why, even though all connections are localhost, this is still a vulnerability and we also have to use mutual authentication.

  2. Start a Python process, passing the unique tmp dir as a parameter. The Python process:

    a) starts a JavaGateway on an ephemeral port, to avoid port conflicts when multiple transformations are running on the same host;

    b) writes the chosen port to its stdout, since it needs to be known by the Java code.

  3. The Java code needs to know the port of the python gateway, so it waits for it in the stdout of the python process.
  4. The Java code starts a Py4J Gateway to communicate with the python process over plain sockets.
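The ephemeral-port handshake over stdout described above can be sketched without py4j: a child process binds port 0, announces the chosen port on stdout, and the parent (python here, standing in for the JVM side) reads that line and connects. The one-line protocol and all names are illustrative only:

```python
import socket
import subprocess
import sys
import textwrap

# Hypothetical child: binds an ephemeral port and announces it on stdout.
child_code = textwrap.dedent("""
    import socket, sys
    server = socket.socket()
    server.bind(("127.0.0.1", 0))      # port 0 = let the OS pick an ephemeral port
    server.listen(1)
    print(server.getsockname()[1])     # announce the chosen port on stdout
    sys.stdout.flush()
    conn, _ = server.accept()
    conn.sendall(b"ready")
    conn.close()
""")

child = subprocess.Popen([sys.executable, "-c", child_code],
                         stdout=subprocess.PIPE, text=True)
port = int(child.stdout.readline())    # parent blocks until the port appears

client = socket.create_connection(("127.0.0.1", port), timeout=5)
reply = client.recv(16)
client.close()
child.wait()
print(reply.decode())  # prints "ready"
```

In the real design the parent is the JVM and the connection carries py4j traffic, but the bootstrap problem (learning a dynamically chosen port) is the same.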

Transform Stage:

  1. The python transform method is called from the JVM with the correct parameters.

Destroy Stage:

  • Call the finish method to ask the python process (which is the server) to close the connection. This automatically ends the python process, since no threads will be left running.

    Destroy might not be called when running in streaming mode or when the job is getting killed. That's why there is a part in the python code which waits for the java connection to be closed and, when that happens, ends the process:

    Code Block
    languagepy
    def on_server_connection_stopped(signal, sender, **params):
        gateway.shutdown()
    
    java_gateway.server_connection_stopped.connect(on_server_connection_stopped)

    Link from the py4j docs (https://www.py4j.org/py4j_java_gateway.html#py4j-core-signals): server_connection_stopped is fired once the java side disconnects.

    The Java code's destroy still runs:

    1. Close the client connection in the JVM itself.
    2. Remove the temporary folder.

    If the JVM process dies, the connection also gets closed, so the python process dies as well. As for the tmp folder, we cannot delete it from python, because then we would not be able to get the output of the process in destroy or during an exception in the python transform (the output goes to the file output.txt and is only saved when the python process dies).
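Creating the unique per-job tmp dir with restrictive permissions, as required by the certificate step, might look like this. A sketch: the prefix and file name are illustrative, and the 600 permissions follow the design note above:

```python
import os
import tempfile

# Unique per-job directory, so parallel jobs cannot see each other's certificates.
job_dir = tempfile.mkdtemp(prefix="python-evaluator-")
os.chmod(job_dir, 0o700)                            # directory: owner-only access

cert_path = os.path.join(job_dir, "gateway.pem")    # illustrative file name
with open(cert_path, "w") as f:
    f.write("...certificate material...")
os.chmod(cert_path, 0o600)                          # certificate files: permissions 600

print(oct(os.stat(cert_path).st_mode & 0o777))      # prints 0o600
```

The destroy stage (on the Java side) would be responsible for removing this directory.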


    Pseudocode:

    Code Block
    languagejava
    import py4j.GatewayServer;
     
    public class PythonEvaluator extends ... {
      @Override
      public void initialize(TransformContext context) {
        // ... 
        
        // 1. generate certificates
        // 2. run python process
        
    // 3. wait until the port is emitted in stdout. Throw an exception after a certain timeout, say 120 seconds
        // 4.
        GatewayServer server = new GatewayServer((Object)null, 0, pythonPort, 0, 0, (List)null); // java port is 0 - ephemeral, python port is the port chosen by python process
        server.start(true); // starts server
    
        // pythonTransformTransport is an interface
        pythonTransformTransport = (pythonTransformTransport) server.getPythonServerEntryPoint(new Class[] { pythonTransformTransport.class });
      }
    
      @Override
      public void transform(StructuredRecord input, final Emitter<StructuredRecord> emitter) {
        try {
      pythonTransformTransport.transform(input, emitter, context);
        } catch (PyException e) {
          // logic to format stack trace from python correctly into java exception message 
        }
      }
    
      @Override
      public void destroy() {
        if(pythonTransformTransport != null) {
          pythonTransformTransport.finish(); // ask Python process to close connection, since it is the server and it controls connection
        }
        if(server != null) {
          server.shutdown();
        }
    
        // remove certificates
      }
    }
    Code Block
    languagepy
    from threading import Timer
    from py4j.java_gateway import JavaGateway, CallbackServerParameters
    
    class PythonTransformTransportImpl(object):
    
        def transform(self, record, emitter, context):
            # code provided by user
            # emitter.emit(record)  # sends data to the JVM
            pass
    
        def finish(self):
            # close the gateway server. This method needs to return, hence the small delay.
            Timer(0.1, self.gateway.shutdown).start()
    
        class Java:
            implements = ["co.cask.hydrator.plugin.transform.pythonTransformTransport"]
    
    python_transform_transport = PythonTransformTransportImpl()
    gateway = JavaGateway(
        callback_server_parameters=CallbackServerParameters(port=0),  # use an ephemeral port
        python_server_entry_point=python_transform_transport)
    python_transform_transport.gateway = gateway
    
    port = gateway.get_callback_server().get_listening_port()
    print(port)


    Limitation(s)

    • The user will have to install the Py4J library themselves (either for python2 or for python3). This can be done by running "pip install py4j" or "pip3 install py4j".
    • The user will have to install Python themselves.


    Unfortunately, there is no easy way to install the py4j library automatically along with the transform plugin. We have no knowledge of which version of python the user is going to use (python2 or python3), and the plugin is a single jar which does not include any additional files. For users who install CDAP from CM/Ambari/Docker images, however, we could pre-install the py4j library.
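Since the python version is unknown in advance, the check can instead happen at runtime by probing the configured binary. A sketch (probing sys.executable here; the real plugin would probe the configured "Path to Python binary"):

```python
import subprocess
import sys

def probe(python_binary):
    """Return (version_string, py4j_installed) for a given python binary."""
    version = subprocess.check_output(
        [python_binary, "-c", "import platform; print(platform.python_version())"],
        text=True).strip()
    # exit code 0 only if the import succeeds in that interpreter
    has_py4j = subprocess.call(
        [python_binary, "-c", "import py4j"],
        stderr=subprocess.DEVNULL) == 0
    return version, has_py4j

version, has_py4j = probe(sys.executable)
print(version)  # e.g. "3.10.12" - depends on the probed binary
```

This would let the plugin fail fast with a clear message ("run pip install py4j") instead of an obscure import error inside the transform.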

    Justification of using Py4J

    We had to choose between the 3 most popular solutions that enable executing Python code within a Java application. Here's a comparison table.



    |                                           | Py4J                                                         | Jep                                                 | Jpy                                                 |
    | License                                   | BSD                                                          | zlib/libpng                                         | Apache                                              |
    | Mechanism of work                         | CPython process and a Java thread transfer data over sockets | JNI (via a *.so library which has access to python) | JNI (via a *.so library which has access to python) |
    | Anticipated performance                   | Normal                                                       | High (due to the low-level mechanism)               | High+ (due to the low-level mechanism); the project states it was specifically designed to meet high performance goals |
    | Needs architecture-dependent files (*.so) | -                                                            | +                                                   | +                                                   |
    | Importing native libraries                | Any native libraries                                         | Can read almost all, including numpy, scipy         | Can read almost all, including numpy, scipy         |
    | Stability                                 | Very high                                                    | High                                                | High                                                |
    | Ability to choose the python version      | +                                                            | -                                                   | -                                                   |
    | Installation/Running                      | Simple                                                       | Simple                                              | Slightly more advanced (building from sources and setting paths) |


    Needs achitecture dependent files (*.so)

    If a library needs a *.so file, things get a bit more complicated: we will need to ask the user to install the library with a simple command before using the transform pipeline. These *.so files depend not only on the processor architecture, but also on the very specific version of Python they are used with. There are further points to discuss here.

    Importing native libraries / Stability

    Py4j uses a real Python process, so there are no problems here; everything will work. Jep and Jpy use real CPython as well, but they call into it via *.so libraries, which is not the usual way, and a presumably small number of libs can have problems with this approach. From the Jep docs:

    works with most numpy, scipy, pandas, tensorflow, matplotlib, cvxpy

    Jep should work with any pure Python modules. CPython extensions and Cython modules may or may not work correctly

    Jep doesn't work correctly with some CPython extensions due to how those extensions were coded. Oftentimes projects were not designed towards the idea of running in embedded sub-interpreters instead of in a standard, single Python interpreter. When running Jep, these types of problems have been known to appear as a JVM crash, an attribute inexplicably set to None, or a thread deadlock.

    The developers claim that with the latest version the JVM crash should not appear when using problematic libs, provided another method of loading the module is used.

    Ability to choose python version to use

    Py4j allows choosing any python the user has installed; organizations might have multiple different versions, which is normal. Jep and Jpy install a *.so library which is compiled against and bound to a specific Python version, so after installation the user cannot choose which Python to use.

    Conclusion

    As we can see, Py4J wins in most categories, so with it we can provide a much better user experience: letting users choose the python version from the UI, installing the library with ease (since no *.so is needed), and getting a much more stable library (one that won't crash the JVM). Py4J is also already used in big projects like Spark, which shows that it should be stable. Its only drawback is that performance is expected to be somewhat slower than the other two libs; however, since transform performance is not the bottleneck for pipelines, this should not be a problem for us.



    Table of Contents


    Checklist

    •  User stories documented 
    •  User stories reviewed 
    •  Design documented 
    •  Design reviewed 
    •  Feature merged 
    •  Examples and guides 
    •  Integration tests 
    •  Documentation for feature 
    •  Short video demonstrating the feature
