Testing a CDAP Application

Strategies in Testing Applications: Test Framework

CDAP comes with a convenient way to unit-test your applications with CDAP’s Test Framework. This framework starts an in-memory CDAP runtime and lets you deploy an application; start, stop and monitor programs; access datasets to validate processing results; and retrieve metrics from the application.

The base class for such tests is TestBase, which is packaged separately from the API in its own artifact because it depends on the CDAP runtime classes. You can include it in your test dependencies in one of two ways:

  • include all JAR files in the lib directory of the CDAP Sandbox installation, or

  • include the cdap-unit-test artifact in your Maven test dependencies (as shown in the pom.xml file of the CDAP Sandbox examples):

    ...
    <dependency>
      <groupId>io.cdap.cdap</groupId>
      <artifactId>cdap-unit-test</artifactId>
      <version>${cdap.version}</version>
      <scope>test</scope>
    </dependency>
    ...

Note that for building an application, you only need to include the CDAP API in your dependencies. For testing, however, you need the CDAP run-time. To build your test case, extend the TestBase class.
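A minimal test class follows the pattern shown in the sections below: extend TestBase and annotate test methods with JUnit's @Test. This is a sketch only; MyAppTest and MyApp are placeholder names for your own test and application classes:

```java
import io.cdap.cdap.test.ApplicationManager;
import io.cdap.cdap.test.TestBase;
import org.junit.Test;

// MyApp is a placeholder for your own application class
public class MyAppTest extends TestBase {

  @Test
  public void test() throws Exception {
    // Deploy the application, then start programs and validate results here
    ApplicationManager appManager = deployApplication(MyApp.class);
  }
}
```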

Running Tests with Spark

The TestBase class included in the cdap-unit-test3_2.12 dependency will run programs using Spark and Scala 2.12.

Running Tests from an IDE

When running tests from an IDE such as IntelliJ or Eclipse, increase the memory available to the JUnit tests run from the IDE. We suggest starting with:

-Xmx1024m
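If you run the same tests with Maven, the equivalent setting can be passed to the forked test JVM through the Surefire plugin's argLine parameter. This pom.xml fragment is a sketch; adjust the plugin version to your build:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <!-- Give the forked JVM running the JUnit tests more memory -->
    <argLine>-Xmx1024m</argLine>
  </configuration>
</plugin>
```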

Strategies in Testing MapReduce Programs (Deprecated)

Unit tests can be written for MapReduce programs. Let's write a test case for an application that uses MapReduce.

The PurchaseTest class should extend from TestBase:

public class PurchaseTest extends TestBase {
  @Test
  public void test() throws Exception {

The PurchaseApp application can be deployed using the deployApplication method from the TestBase class:

// Deploy an application
ApplicationManager appManager = deployApplication(PurchaseApp.class);

The MapReduce program reads from the purchases dataset. As a first step, the purchases dataset should be populated by running the PurchaseFlow and sending events to the purchaseStream stream:

FlowManager flowManager = appManager.getFlowManager("PurchaseFlow").start();

// Send stream events to the "purchaseStream" Stream
StreamManager streamManager = getStreamManager("purchaseStream");
streamManager.send("bob bought 3 apples for $30");
streamManager.send("joe bought 1 apple for $100");
streamManager.send("joe bought 10 pineapples for $20");
streamManager.send("cat bought 3 bottles for $12");
streamManager.send("cat bought 2 pops for $14");

// Wait for the last flowlet to process 5 events or at most 15 seconds
RuntimeMetrics metrics = flowManager.getFlowletMetrics("collector");
metrics.waitForProcessed(5, 15, TimeUnit.SECONDS);

Start the MapReduce and wait for a maximum of 60 seconds:

// Start the MapReduce
MapReduceManager mrManager = appManager.getMapReduceManager("PurchaseHistoryBuilder").start();
mrManager.waitForFinish(60, TimeUnit.SECONDS);

We can then verify that the MapReduce ran correctly by using the PurchaseHistoryService to retrieve a customer's purchase history:

// Start PurchaseHistoryService
ServiceManager purchaseHistoryServiceManager =
  appManager.getServiceManager(PurchaseHistoryService.SERVICE_NAME).start();

// Wait for service startup
purchaseHistoryServiceManager.waitForStatus(true);

// Test service to retrieve a customer's purchase history
URL url = new URL(purchaseHistoryServiceManager.getServiceURL(15, TimeUnit.SECONDS), "history/joe");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
String historyJson;
try {
  historyJson = new String(ByteStreams.toByteArray(conn.getInputStream()), Charsets.UTF_8);
} finally {
  conn.disconnect();
}
PurchaseHistory history = GSON.fromJson(historyJson, PurchaseHistory.class);
Assert.assertEquals("joe", history.getCustomer());
Assert.assertEquals(2, history.getPurchases().size());

The assertion will verify that the correct result was received.

Strategies in Testing Spark Programs

Let's write a test case for an application that uses a Spark program.

The SparkPageRankTest class should extend from TestBase:

public class SparkPageRankTest extends TestBase {
  @Test
  public void test() throws Exception {

The SparkPageRankApp application can be deployed using the deployApplication method from the TestBase class:

// Deploy an application
ApplicationManager appManager = deployApplication(SparkPageRankApp.class);

The Spark program reads from the backlinkURLs dataset. As a first step, the backlinkURLs dataset should be populated by running the BackLinkFlow and sending the data to the backlinkURLStream stream:

FlowManager flowManager = appManager.getFlowManager("BackLinkFlow").start();

// Send data to the stream
sendData();

// Wait for the last flowlet to process 4 events or at most 5 seconds
RuntimeMetrics metrics = flowManager.getFlowletMetrics("reader");
metrics.waitForProcessed(4, 5, TimeUnit.SECONDS);

Start the Spark program and wait for a maximum of 60 seconds:

// Start the Spark program.
SparkManager sparkManager = appManager.getSparkManager("SparkPageRankProgram").start();
sparkManager.waitForFinish(60, TimeUnit.SECONDS);

We verify that the Spark program ran correctly by using the Ranks service to check the results:

// Wait for ranks service to start
serviceManager.waitForStatus(true);

String response = requestService(new URL(serviceManager.getServiceURL(15, TimeUnit.SECONDS),
                                         "rank?url=http://example.com/page1"));
Assert.assertEquals("14", response);

The assertion will verify that the correct result was received.

Strategies in Testing Artifacts

The Test Framework provides methods to create and deploy JAR files as artifacts. This lets you test the creation of multiple applications from the same artifact, as well as the use of plugin artifacts.

To add an artifact containing an application class:

// Add the artifact for a Data Pipeline app
addAppArtifact(new ArtifactId(NamespaceId.DEFAULT.getNamespace(), "data-pipeline", "3.5.0"),
               DataPipelineApp.class,
               BatchSource.class.getPackage().getName(),
               Action.class.getPackage().getName(),
               PipelineConfigurable.class.getPackage().getName(),
               "org.apache.avro.mapred", "org.apache.avro", "org.apache.avro.generic");

The first argument is the id of the artifact; the second is the application class; and the remaining arguments are packages that should be included in the Export-Package manifest attribute of the JAR. The framework will trace the dependencies of the specified application class to create a JAR with those dependencies. This mimics what happens when you build your application JAR using Maven.

An application can then be deployed using that artifact:

// Create an application deployment request
ETLBatchConfig etlConfig = new ETLBatchConfig("* * * * *", source, sink, transformList);
AppRequest<ETLBatchConfig> appRequest =
  new AppRequest<>(new ArtifactSummary("etlbatch", "3.5.0"), etlConfig);
ApplicationId appId = NamespaceId.DEFAULT.app("KVToKV");

// Deploy the application
ApplicationManager appManager = deployApplication(appId, appRequest);

Plugins extending the artifact can also be added:

// Add some test plugins
addPluginArtifact(new ArtifactId(NamespaceId.DEFAULT.getNamespace(), "spark-plugins", "1.0.0"),
                  APP_ARTIFACT_ID,
                  NaiveBayesTrainer.class, NaiveBayesClassifier.class);

The first argument is the id of the plugin artifact; the second is the parent artifact it is extending; and the remaining arguments are classes that should be bundled in the JAR. The packages of all these classes are included in the Export-Package manifest attribute of the JAR. When adding a plugin artifact this way, it is important to include all classes in your plugin packages, even if they are not used in your test case. This ensures that all required dependencies can be traced and the JAR is built correctly.

The examples are taken from the DataPipelineTest and HydratorTestBase classes of CDAP pipelines.

Validating Test Data with SQL

If the datasets involved in a test case are record-scannable (as described in Data Exploration), often the easiest way to verify that a test produced the right data is to run a SQL query. This can be done using a JDBC connection obtained from the test base:

// Obtain a JDBC connection
Connection connection = getQueryClient();
ResultSet results = null;
try {
  // Run a query over the dataset
  results = connection.prepareStatement("SELECT key FROM mytable WHERE value = '1'").executeQuery();
  Assert.assertTrue(results.next());
  Assert.assertEquals("a", results.getString(1));
  Assert.assertTrue(results.next());
  Assert.assertEquals("c", results.getString(1));
  Assert.assertFalse(results.next());
} finally {
  if (results != null) {
    results.close();
  }
  connection.close();
}

The JDBC connection does not implement the full JDBC functionality: it does not allow variable replacement and will not allow you to make any changes to datasets. But it is sufficient for test validation: you can create or prepare statements and execute queries, then iterate over the result set and validate its correctness.

Configuring CDAP Runtime for Test Framework

The TestBase class inherited by your test class starts an in-memory CDAP runtime before executing any test methods. Sometimes you may need to configure the CDAP runtime to suit your specific requirements. For example, if your test does not involve usage of SQL queries, you can turn off the explore service to reduce startup and shutdown times.

You can alter the configuration of the CDAP runtime by applying a JUnit @ClassRule to a TestConfiguration instance. For example:

// Disable the SQL query support
// Set the transaction timeout to 60 seconds
@ClassRule
public static final TestConfiguration CONFIG =
  new TestConfiguration("explore.enabled", false, "data.tx.timeout", 60);

Refer to the cdap-site.xml for the available set of configurations used by CDAP.


Created in 2020 by Google Inc.