Appendix: HBaseDDLExecutor

For details on replication of CDAP clusters, see CDAP Replication.

This document provides a sample implementation of HBaseDDLExecutor.

The HBaseDDLExecutorImpl class performs each DDL operation on both the local (master) cluster and the peer (replica) cluster.
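The fan-out pattern the implementation follows is simple to sketch in isolation: every DDL call is issued against the local cluster first and then replayed against the peer. The following is a minimal, hypothetical illustration of that pattern; the `applyToBoth` helper and the cluster names are illustrative only and are not part of the CDAP API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal sketch of the dual-cluster fan-out pattern used by HBaseDDLExecutorImpl:
// every DDL operation runs against the local (master) cluster first, then the peer.
// The helper and cluster names here are illustrative, not part of the CDAP API.
public class DualClusterSketch {

  static void applyToBoth(Consumer<String> ddlOperation, String localCluster, String peerCluster) {
    ddlOperation.accept(localCluster); // local cluster first
    ddlOperation.accept(peerCluster);  // then the replica
  }

  public static void main(String[] args) {
    List<String> auditLog = new ArrayList<>();
    applyToBoth(cluster -> auditLog.add("createNamespace on " + cluster), "local", "peer");
    applyToBoth(cluster -> auditLog.add("createTable on " + cluster), "local", "peer");
    // Every operation appears once per cluster, local before peer.
    System.out.println(auditLog);
  }
}
```

Because the interface methods are required to be idempotent, replaying an operation on the peer after a partial failure is safe.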

Interface

The interface of HBaseDDLExecutor. Note that this is a Beta feature and is subject to change without notice.

/**
 * Interface providing the HBase DDL operations. All methods except {@link #initialize} must
 * be idempotent in order to allow retry of failed operations.
 */
@Beta
public interface HBaseDDLExecutor extends Closeable {
  /**
   * Initialize the {@link HBaseDDLExecutor}.
   * This method is called once when the executor is created, before any other methods are called.
   *
   * @param context the context for the executor
   */
  void initialize(HBaseDDLExecutorContext context);

  /**
   * Create the specified namespace if it does not exist.
   * This method gets called when CDAP attempts to create a new namespace.
   *
   * @param name the namespace to create
   * @return whether the namespace was created
   * @throws IOException if a remote or network exception occurs
   */
  boolean createNamespaceIfNotExists(String name) throws IOException;

  /**
   * Delete the specified namespace if it exists.
   * This method is called during the namespace deletion process.
   *
   * @param name the namespace to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if there are tables in the namespace
   */
  void deleteNamespaceIfExists(String name) throws IOException;

  /**
   * Create the specified table if it does not exist.
   * This method is called during the creation of an HBase-backed dataset (either system or user).
   *
   * @param descriptor the descriptor for the table to create
   * @param splitKeys the initial split keys for the table
   * @throws IOException if a remote or network exception occurs
   */
  void createTableIfNotExists(TableDescriptor descriptor, @Nullable byte[][] splitKeys) throws IOException;

  /**
   * Enable the specified table if it is disabled.
   *
   * @param namespace the namespace of the table to enable
   * @param name the name of the table to enable
   * @throws IOException if a remote or network exception occurs
   */
  void enableTableIfDisabled(String namespace, String name) throws IOException;

  /**
   * Disable the specified table if it is enabled.
   *
   * @param namespace the namespace of the table to disable
   * @param name the name of the table to disable
   * @throws IOException if a remote or network exception occurs
   */
  void disableTableIfEnabled(String namespace, String name) throws IOException;

  /**
   * Modify the specified table. This is called when an HBase-backed dataset has
   * its properties modified. In order to modify the HBase table, CDAP first calls
   * {@code disableTableIfEnabled}, then calls this method, then calls {@code enableTableIfDisabled}.
   *
   * @param namespace the namespace of the table to modify
   * @param name the name of the table to modify
   * @param descriptor the descriptor for the table
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void modifyTable(String namespace, String name, TableDescriptor descriptor) throws IOException;

  /**
   * Truncate the specified table. The implementation of this method should disable the table first.
   * The table must also be re-enabled by the implementation at the end of the truncate operation.
   *
   * @param namespace the namespace of the table to truncate
   * @param name the name of the table to truncate
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void truncateTable(String namespace, String name) throws IOException;

  /**
   * Delete the table if it exists. In order to delete the HBase table,
   * CDAP first calls {@code disableTableIfEnabled}, then calls this method.
   *
   * @param namespace the namespace of the table to delete
   * @param name the table to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void deleteTableIfExists(String namespace, String name) throws IOException;

  /**
   * Grant permissions on a table or namespace to users or groups.
   *
   * @param namespace the namespace of the table
   * @param table the name of the table. If null, then the permissions are applied to the namespace
   * @param permissions a map from user or group name to the permissions for that user or group, given as a string
   *                    containing only the characters 'a' (Admin), 'c' (Create), 'r' (Read), 'w' (Write),
   *                    and 'x' (Execute). Group names must be prefixed with the character '@'.
   * @throws IOException if anything goes wrong
   */
  void grantPermissions(String namespace, @Nullable String table, Map<String, String> permissions)
    throws IOException;
}
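The permission strings accepted by grantPermissions are constrained enough to validate up front. Below is a hypothetical helper (not part of the SPI) that checks the format described in the javadoc above; the class and method names are illustrative:

```java
import java.util.Map;

// Hypothetical validator for the permission map accepted by grantPermissions.
// Per the javadoc above, a permission string may contain only the characters
// 'a' (Admin), 'c' (Create), 'r' (Read), 'w' (Write), and 'x' (Execute),
// and group names are distinguished from user names by a leading '@'.
public class PermissionFormat {

  static boolean isValidPermissionString(String permissions) {
    // at least one character, all drawn from the allowed set
    return permissions.matches("[acrwx]+");
  }

  static boolean isGroup(String userOrGroup) {
    return userOrGroup.startsWith("@");
  }

  static void validate(Map<String, String> permissions) {
    for (Map.Entry<String, String> entry : permissions.entrySet()) {
      if (!isValidPermissionString(entry.getValue())) {
        throw new IllegalArgumentException(
          "Invalid permission string '" + entry.getValue() + "' for " + entry.getKey());
      }
    }
  }

  public static void main(String[] args) {
    validate(Map.of("alice", "rwx", "@admins", "acrwx")); // passes
    System.out.println(isGroup("@admins"));               // prints "true"
  }
}
```

An implementation could run such a check before translating the map into cluster-specific ACL calls, failing fast on malformed input.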

Implementation

A sample implementation of HBaseDDLExecutor, for HBase version 1.0.0-cdh5.5.1:

/*
 * Copyright © 2017 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.example.hbase.ddlexecutor;

import io.cdap.cdap.spi.hbase.ColumnFamilyDescriptor;
import io.cdap.cdap.spi.hbase.CoprocessorDescriptor;
import io.cdap.cdap.spi.hbase.HBaseDDLExecutor;
import io.cdap.cdap.spi.hbase.HBaseDDLExecutorContext;
import io.cdap.cdap.spi.hbase.TableDescriptor;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Sample implementation of {@link HBaseDDLExecutor} for HBase version 1.0.0-cdh5.5.1
 */
public class HBaseDDLExecutorImpl implements HBaseDDLExecutor {
  public static final Logger LOG = LoggerFactory.getLogger(HBaseDDLExecutorImpl.class);

  private HBaseAdmin admin;
  private HBaseAdmin peerAdmin;

  /**
   * Encode an HBase entity name to ASCII encoding using {@link URLEncoder}.
   *
   * @param entityName entity string to be encoded
   * @return encoded string
   */
  private String encodeHBaseEntity(String entityName) {
    try {
      return URLEncoder.encode(entityName, "ASCII");
    } catch (UnsupportedEncodingException e) {
      // this can never happen - we know that ASCII is a supported character set!
      throw new RuntimeException(e);
    }
  }

  @Override
  public void initialize(HBaseDDLExecutorContext context) {
    LOG.info("Initializing executor with properties {}", context.getProperties());
    try {
      Configuration conf = context.getConfiguration();
      this.admin = new HBaseAdmin(conf);
      Configuration peerConf = generatePeerConfig(context);
      this.peerAdmin = new HBaseAdmin(peerConf);
    } catch (IOException e) {
      throw new RuntimeException("Failed to create the HBaseAdmin", e);
    }
  }

  private boolean hasNamespace(String name) throws IOException {
    Preconditions.checkArgument(admin != null, "HBaseAdmin should not be null");
    Preconditions.checkArgument(name != null, "Namespace should not be null.");
    try {
      admin.getNamespaceDescriptor(encodeHBaseEntity(name));
      return true;
    } catch (NamespaceNotFoundException e) {
      return false;
    }
  }

  @Override
  public boolean createNamespaceIfNotExists(String name) throws IOException {
    Preconditions.checkArgument(name != null, "Namespace should not be null.");
    if (hasNamespace(name)) {
      return false;
    }
    NamespaceDescriptor namespaceDescriptor =
      NamespaceDescriptor.create(encodeHBaseEntity(name)).build();
    admin.createNamespace(namespaceDescriptor);
    peerAdmin.createNamespace(namespaceDescriptor);
    return true;
  }

  @Override
  public void deleteNamespaceIfExists(String name) throws IOException {
    Preconditions.checkArgument(name != null, "Namespace should not be null.");
    if (hasNamespace(name)) {
      admin.deleteNamespace(encodeHBaseEntity(name));
      peerAdmin.deleteNamespace(encodeHBaseEntity(name));
    }
  }

  @Override
  public void createTableIfNotExists(TableDescriptor descriptor, byte[][] splitKeys) throws IOException {
    createTableIfNotExists(getHTableDescriptor(descriptor), splitKeys);
  }

  private void createTableIfNotExists(HTableDescriptor htd, byte[][] splitKeys) throws IOException {
    if (admin.tableExists(htd.getName())) {
      return;
    }

    try {
      admin.createTable(htd, splitKeys);
      peerAdmin.createTable(htd, splitKeys);
      LOG.info("Table created '{}'", Bytes.toString(htd.getName()));
    } catch (TableExistsException e) {
      // table may exist because someone else is creating it at the same
      // time. But it may not be available yet, and opening it might fail.
      LOG.info("Table '{}' already exists.", Bytes.toString(htd.getName()), e);
    }

    // Wait for the table to materialize
    try {
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.start();
      long sleepTime = TimeUnit.MILLISECONDS.toNanos(5000L) / 10;
      sleepTime = sleepTime <= 0 ? 1 : sleepTime;
      do {
        if (admin.tableExists(htd.getName())) {
          LOG.debug("Table '{}' exists now. Assuming that another process concurrently created it.",
                    Bytes.toString(htd.getName()));
          return;
        } else {
          TimeUnit.NANOSECONDS.sleep(sleepTime);
        }
      } while (stopwatch.elapsedTime(TimeUnit.MILLISECONDS) < 5000L);
    } catch (InterruptedException e) {
      LOG.warn("Sleeping thread interrupted.");
    }
    LOG.error("Table '{}' does not exist after waiting {} ms. Giving up.",
              Bytes.toString(htd.getName()), 5000L);
  }

  @Override
  public void enableTableIfDisabled(String namespace, String name) throws IOException {
    Preconditions.checkArgument(namespace != null, "Namespace should not be null");
    Preconditions.checkArgument(name != null, "Table name should not be null.");
    try {
      admin.enableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
      peerAdmin.enableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
    } catch (TableNotDisabledException e) {
      LOG.debug("Attempt to enable already enabled table {} in the namespace {}.", name, namespace);
    }
  }

  @Override
  public void disableTableIfEnabled(String namespace, String name) throws IOException {
    Preconditions.checkArgument(namespace != null, "Namespace should not be null");
    Preconditions.checkArgument(name != null, "Table name should not be null.");
    try {
      admin.disableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
      peerAdmin.disableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
    } catch (TableNotEnabledException e) {
      LOG.debug("Attempt to disable already disabled table {} in the namespace {}.", name, namespace);
    }
  }

  @Override
  public void modifyTable(String namespace, String name, TableDescriptor descriptor) throws IOException {
    Preconditions.checkArgument(namespace != null, "Namespace should not be null");
    Preconditions.checkArgument(name != null, "Table name should not be null.");
    Preconditions.checkArgument(descriptor != null, "Descriptor should not be null.");
    HTableDescriptor htd = getHTableDescriptor(descriptor);
    admin.modifyTable(htd.getTableName(), htd);
    peerAdmin.modifyTable(htd.getTableName(), htd);
  }

  @Override
  public void truncateTable(String namespace, String name) throws IOException {
    Preconditions.checkArgument(namespace != null, "Namespace should not be null");
    Preconditions.checkArgument(name != null, "Table name should not be null.");
    HTableDescriptor descriptor =
      admin.getTableDescriptor(TableName.valueOf(namespace, encodeHBaseEntity(name)));
    disableTableIfEnabled(namespace, name);
    deleteTableIfExists(namespace, name);
    createTableIfNotExists(descriptor, null);
  }

  @Override
  public void deleteTableIfExists(String namespace, String name) throws IOException {
    Preconditions.checkArgument(namespace != null, "Namespace should not be null");
    Preconditions.checkArgument(name != null, "Table name should not be null.");
    admin.deleteTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
    peerAdmin.deleteTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
  }

  @Override
  public void grantPermissions(String namespace, String table, Map<String, String> permissions)
    throws IOException {
    // no-op
  }

  @Override
  public void close() throws IOException {
    if (admin != null) {
      admin.close();
    }
    if (peerAdmin != null) {
      peerAdmin.close();
    }
  }

  /**
   * Converts the {@link ColumnFamilyDescriptor} to the {@link HColumnDescriptor} for admin operations.
   *
   * @param ns the namespace for the table
   * @param tableName the name of the table
   * @param descriptor descriptor of the column family
   * @return the instance of HColumnDescriptor
   */
  private static HColumnDescriptor getHColumnDescriptor(String ns, String tableName,
                                                        ColumnFamilyDescriptor descriptor) {
    HColumnDescriptor hFamily = new HColumnDescriptor(descriptor.getName());
    hFamily.setMaxVersions(descriptor.getMaxVersions());
    hFamily.setCompressionType(Compression.Algorithm.valueOf(descriptor.getCompressionType().name()));
    hFamily.setBloomFilterType(org.apache.hadoop.hbase.regionserver.BloomType.valueOf(
      descriptor.getBloomType().name()));
    for (Map.Entry<String, String> property : descriptor.getProperties().entrySet()) {
      hFamily.setValue(property.getKey(), property.getValue());
    }
    LOG.info("Setting replication scope to global for ns {}, table {}, cf {}",
             ns, tableName, descriptor.getName());
    hFamily.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    return hFamily;
  }

  /**
   * Converts the {@link TableDescriptor} into the corresponding {@link HTableDescriptor} for admin operations.
   *
   * @param descriptor the table descriptor instance
   * @return the instance of HTableDescriptor
   */
  private static HTableDescriptor getHTableDescriptor(TableDescriptor descriptor) {
    TableName tableName = TableName.valueOf(descriptor.getNamespace(), descriptor.getName());
    HTableDescriptor htd = new HTableDescriptor(tableName);
    for (Map.Entry<String, ColumnFamilyDescriptor> family : descriptor.getFamilies().entrySet()) {
      htd.addFamily(getHColumnDescriptor(descriptor.getNamespace(), descriptor.getName(), family.getValue()));
    }

    for (Map.Entry<String, CoprocessorDescriptor> coprocessor : descriptor.getCoprocessors().entrySet()) {
      CoprocessorDescriptor cpd = coprocessor.getValue();
      try {
        htd.addCoprocessor(cpd.getClassName(), new Path(cpd.getPath()), cpd.getPriority(), cpd.getProperties());
      } catch (IOException e) {
        LOG.error("Error adding coprocessor.", e);
      }
    }

    for (Map.Entry<String, String> property : descriptor.getProperties().entrySet()) {
      htd.setValue(property.getKey(), property.getValue());
    }
    return htd;
  }

  /**
   * Generate the peer configuration, which is used to perform DDL operations on the remote cluster using Admin.
   *
   * @param context instance of {@link HBaseDDLExecutorContext} with which the DDL executor is initialized
   * @return the {@link Configuration} to be used for DDL operations on the remote cluster
   */
  private static Configuration generatePeerConfig(HBaseDDLExecutorContext context) {
    Configuration peerConf = new Configuration();
    peerConf.clear();
    for (Map.Entry<String, String> entry : context.getProperties().entrySet()) {
      peerConf.set(entry.getKey(), entry.getValue());
    }
    StringWriter sw = new StringWriter();
    try {
      Configuration.dumpConfiguration(peerConf, sw);
      LOG.debug("PeerConfig - {}", sw);
    } catch (IOException e) {
      LOG.error("Error dumping config.", e);
    }
    return peerConf;
  }
}
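The encodeHBaseEntity helper in the implementation above delegates to java.net.URLEncoder with the ASCII charset. Its effect can be seen in isolation with this standalone sketch (the class name here is illustrative):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Standalone illustration of the encodeHBaseEntity helper above: entity names
// are URL-encoded with the ASCII charset, so plain alphanumeric names pass
// through unchanged while special characters are percent-encoded.
public class EncodeEntityDemo {

  static String encodeHBaseEntity(String entityName) {
    try {
      return URLEncoder.encode(entityName, "ASCII");
    } catch (UnsupportedEncodingException e) {
      // cannot happen: ASCII is always a supported character set
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(encodeHBaseEntity("my_table")); // prints "my_table"
    System.out.println(encodeHBaseEntity("my:table")); // prints "my%3Atable"
  }
}
```

This keeps every encoded name within HBase's allowed character set while remaining reversible.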

POM File

The corresponding pom.xml. Configure the hbase-client dependency version (currently 1.0.0-cdh5.5.1) below as appropriate:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>io.cdap.cdap</groupId>
  <artifactId>HBaseDDLExecutorExtension</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>HBase DDL executor</name>

  <properties>
    <cdap.version>|release|</cdap.version>
    <slf4j.version>1.7.5</slf4j.version>
  </properties>

  <repositories>
    <repository>
      <id>sonatype</id>
      <url>https://oss.sonatype.org/content/groups/public</url>
    </repository>
    <repository>
      <id>apache.snapshots</id>
      <url>https://repository.apache.org/content/repositories/snapshots</url>
    </repository>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>io.cdap.cdap</groupId>
      <artifactId>cdap-hbase-spi</artifactId>
      <version>${cdap.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.0.0-cdh5.5.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
  </dependencies>
</project>


Created in 2020 by Google Inc.