Appendix: HBaseDDLExecutor

For details on replication of CDAP clusters, see CDAP Replication.

This document provides a sample implementation of HBaseDDLExecutor.

The HBaseDDLExecutorImpl class performs each DDL operation on both the local (master) and the peer (slave) cluster.

Interface

Interface of HBaseDDLExecutor. Note that this is a Beta feature and subject to change without notice.

```java
/**
 * Interface providing the HBase DDL operations. All methods except {@link #initialize} must
 * be idempotent in order to allow retry of failed operations.
 */
@Beta
public interface HBaseDDLExecutor extends Closeable {
  /**
   * Initialize the {@link HBaseDDLExecutor}.
   * This method is called once when the executor is created, before any other methods are called.
   *
   * @param context the context for the executor
   */
  void initialize(HBaseDDLExecutorContext context);

  /**
   * Create the specified namespace if it does not exist.
   * This method is called when CDAP attempts to create a new namespace.
   *
   * @param name the namespace to create
   * @return whether the namespace was created
   * @throws IOException if a remote or network exception occurs
   */
  boolean createNamespaceIfNotExists(String name) throws IOException;

  /**
   * Delete the specified namespace if it exists.
   * This method is called during the namespace deletion process.
   *
   * @param name the namespace to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if there are tables in the namespace
   */
  void deleteNamespaceIfExists(String name) throws IOException;

  /**
   * Create the specified table if it does not exist.
   * This method is called during the creation of an HBase-backed dataset (either system or user).
   *
   * @param descriptor the descriptor for the table to create
   * @param splitKeys the initial split keys for the table
   * @throws IOException if a remote or network exception occurs
   */
  void createTableIfNotExists(TableDescriptor descriptor, @Nullable byte[][] splitKeys)
    throws IOException;

  /**
   * Enable the specified table if it is disabled.
   *
   * @param namespace the namespace of the table to enable
   * @param name the name of the table to enable
   * @throws IOException if a remote or network exception occurs
   */
  void enableTableIfDisabled(String namespace, String name) throws IOException;

  /**
   * Disable the specified table if it is enabled.
   *
   * @param namespace the namespace of the table to disable
   * @param name the name of the table to disable
   * @throws IOException if a remote or network exception occurs
   */
  void disableTableIfEnabled(String namespace, String name) throws IOException;

  /**
   * Modify the specified table. This is called when an HBase-backed dataset has
   * its properties modified. In order to modify the HBase table, CDAP first calls
   * {@code disableTableIfEnabled}, then calls this method, then calls {@code enableTableIfDisabled}.
   *
   * @param namespace the namespace of the table to modify
   * @param name the name of the table to modify
   * @param descriptor the descriptor for the table
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void modifyTable(String namespace, String name, TableDescriptor descriptor) throws IOException;

  /**
   * Truncate the specified table. The implementation of this method should disable the table
   * first, and must re-enable it at the end of the truncate operation.
   *
   * @param namespace the namespace of the table to truncate
   * @param name the name of the table to truncate
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void truncateTable(String namespace, String name) throws IOException;

  /**
   * Delete the table if it exists. In order to delete the HBase table,
   * CDAP first calls {@code disableTableIfEnabled}, then calls this method.
   *
   * @param namespace the namespace of the table to delete
   * @param name the table to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void deleteTableIfExists(String namespace, String name) throws IOException;

  /**
   * Grant permissions on a table or namespace to users or groups.
   *
   * @param namespace the namespace of the table
   * @param table the name of the table. If null, the permissions are applied to the namespace
   * @param permissions a map from user or group name to the permissions for that user or group,
   *                    given as a string containing only the characters 'a' (Admin), 'c' (Create),
   *                    'r' (Read), 'w' (Write), and 'x' (Execute).
   *                    Group names must be prefixed with the character '@'.
   * @throws IOException if anything goes wrong
   */
  void grantPermissions(String namespace, @Nullable String table, Map<String, String> permissions)
    throws IOException;
}
```
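The contract above requires every method except `initialize` to be idempotent so that a failed operation can safely be retried. A minimal sketch of that pattern, using in-memory sets as hypothetical stand-ins for the local and peer `HBaseAdmin` (no real HBase involved):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: in-memory stand-ins for the local and peer clusters,
// showing the idempotent, mirrored check-then-act pattern the SPI asks for.
public class MirroredDdlSketch {
  private final Set<String> localNamespaces = ConcurrentHashMap.newKeySet();
  private final Set<String> peerNamespaces = ConcurrentHashMap.newKeySet();

  /** Returns true only if the namespace was newly created. */
  public boolean createNamespaceIfNotExists(String name) {
    if (localNamespaces.contains(name)) {
      return false; // already present: a retry is a no-op, which is what makes this idempotent
    }
    localNamespaces.add(name);
    peerNamespaces.add(name); // mirror the operation on the peer cluster
    return true;
  }

  /** Removing something that is already gone is also a no-op. */
  public void deleteNamespaceIfExists(String name) {
    localNamespaces.remove(name);
    peerNamespaces.remove(name);
  }

  public boolean existsOnBoth(String name) {
    return localNamespaces.contains(name) && peerNamespaces.contains(name);
  }
}
```

A real implementation must additionally tolerate races with concurrent creators (as the sample below does by catching `TableExistsException`), since the existence check and the create are not atomic.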

Implementation

Sample implementation of HBaseDDLExecutor, for HBase version 1.0.0-cdh5.5.1:

```java
/*
 * Copyright © 2017 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.example.hbase.ddlexecutor;

import io.cdap.cdap.spi.hbase.ColumnFamilyDescriptor;
import io.cdap.cdap.spi.hbase.CoprocessorDescriptor;
import io.cdap.cdap.spi.hbase.HBaseDDLExecutor;
import io.cdap.cdap.spi.hbase.HBaseDDLExecutorContext;
import io.cdap.cdap.spi.hbase.TableDescriptor;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Sample implementation of {@link HBaseDDLExecutor} for HBase version 1.0.0-cdh5.5.1.
 */
public class HBaseDDLExecutorImpl implements HBaseDDLExecutor {
  public static final Logger LOG = LoggerFactory.getLogger(HBaseDDLExecutorImpl.class);

  private HBaseAdmin admin;
  private HBaseAdmin peerAdmin;

  /**
   * Encode an HBase entity name to ASCII encoding using {@link URLEncoder}.
   *
   * @param entityName entity string to be encoded
   * @return encoded string
   */
  private String encodeHBaseEntity(String entityName) {
    try {
      return URLEncoder.encode(entityName, "ASCII");
    } catch (UnsupportedEncodingException e) {
      // this can never happen - we know that ASCII is a supported character set!
      throw new RuntimeException(e);
    }
  }

  @Override
  public void initialize(HBaseDDLExecutorContext context) {
    LOG.info("Initializing executor with properties {}", context.getProperties());
    try {
      Configuration conf = context.getConfiguration();
      this.admin = new HBaseAdmin(conf);
      Configuration peerConf = generatePeerConfig(context);
      this.peerAdmin = new HBaseAdmin(peerConf);
    } catch (IOException e) {
      throw new RuntimeException("Failed to create the HBaseAdmin", e);
    }
  }

  private boolean hasNamespace(String name) throws IOException {
    Preconditions.checkArgument(admin != null, "HBaseAdmin should not be null");
    Preconditions.checkArgument(name != null, "Namespace should not be null.");
    try {
      admin.getNamespaceDescriptor(encodeHBaseEntity(name));
      return true;
    } catch (NamespaceNotFoundException e) {
      return false;
    }
  }

  @Override
  public boolean createNamespaceIfNotExists(String name) throws IOException {
    Preconditions.checkArgument(name != null, "Namespace should not be null.");
    if (hasNamespace(name)) {
      return false;
    }
    NamespaceDescriptor namespaceDescriptor =
      NamespaceDescriptor.create(encodeHBaseEntity(name)).build();
    admin.createNamespace(namespaceDescriptor);
    peerAdmin.createNamespace(namespaceDescriptor);
    return true;
  }

  @Override
  public void deleteNamespaceIfExists(String name) throws IOException {
    Preconditions.checkArgument(name != null, "Namespace should not be null.");
    if (hasNamespace(name)) {
      admin.deleteNamespace(encodeHBaseEntity(name));
      peerAdmin.deleteNamespace(encodeHBaseEntity(name));
    }
  }

  @Override
  public void createTableIfNotExists(TableDescriptor descriptor, byte[][] splitKeys)
    throws IOException {
    createTableIfNotExists(getHTableDescriptor(descriptor), splitKeys);
  }

  private void createTableIfNotExists(HTableDescriptor htd, byte[][] splitKeys) throws IOException {
    if (admin.tableExists(htd.getName())) {
      return;
    }

    try {
      admin.createTable(htd, splitKeys);
      peerAdmin.createTable(htd, splitKeys);
      LOG.info("Table created '{}'", Bytes.toString(htd.getName()));
      return;
    } catch (TableExistsException e) {
      // table may exist because someone else is creating it at the same
      // time. But it may not be available yet, and opening it might fail.
      LOG.info("Table '{}' already exists.", Bytes.toString(htd.getName()), e);
    }

    // Wait for the table to materialize
    try {
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.start();
      long sleepTime = TimeUnit.MILLISECONDS.toNanos(5000L) / 10;
      sleepTime = sleepTime <= 0 ? 1 : sleepTime;
      do {
        if (admin.tableExists(htd.getName())) {
          LOG.debug("Table '{}' exists now. Assuming that another process concurrently created it.",
                    Bytes.toString(htd.getName()));
          return;
        } else {
          TimeUnit.NANOSECONDS.sleep(sleepTime);
        }
      } while (stopwatch.elapsedTime(TimeUnit.MILLISECONDS) < 5000L);
    } catch (InterruptedException e) {
      LOG.warn("Sleeping thread interrupted.");
    }
    LOG.error("Table '{}' does not exist after waiting {} ms. Giving up.",
              Bytes.toString(htd.getName()), 5000L);
  }

  @Override
  public void enableTableIfDisabled(String namespace, String name) throws IOException {
    Preconditions.checkArgument(namespace != null, "Namespace should not be null");
    Preconditions.checkArgument(name != null, "Table name should not be null.");
    try {
      admin.enableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
      peerAdmin.enableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
    } catch (TableNotDisabledException e) {
      LOG.debug("Attempt to enable already enabled table {} in the namespace {}.", name, namespace);
    }
  }

  @Override
  public void disableTableIfEnabled(String namespace, String name) throws IOException {
    Preconditions.checkArgument(namespace != null, "Namespace should not be null");
    Preconditions.checkArgument(name != null, "Table name should not be null.");
    try {
      admin.disableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
      peerAdmin.disableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
    } catch (TableNotEnabledException e) {
      LOG.debug("Attempt to disable already disabled table {} in the namespace {}.", name, namespace);
    }
  }

  @Override
  public void modifyTable(String namespace, String name, TableDescriptor descriptor) throws IOException {
    Preconditions.checkArgument(namespace != null, "Namespace should not be null");
    Preconditions.checkArgument(name != null, "Table name should not be null.");
    Preconditions.checkArgument(descriptor != null, "Descriptor should not be null.");
    HTableDescriptor htd = getHTableDescriptor(descriptor);
    admin.modifyTable(htd.getTableName(), htd);
    peerAdmin.modifyTable(htd.getTableName(), htd);
  }

  @Override
  public void truncateTable(String namespace, String name) throws IOException {
    Preconditions.checkArgument(namespace != null, "Namespace should not be null");
    Preconditions.checkArgument(name != null, "Table name should not be null.");
    HTableDescriptor descriptor =
      admin.getTableDescriptor(TableName.valueOf(namespace, encodeHBaseEntity(name)));
    disableTableIfEnabled(namespace, name);
    deleteTableIfExists(namespace, name);
    createTableIfNotExists(descriptor, null);
  }

  @Override
  public void deleteTableIfExists(String namespace, String name) throws IOException {
    Preconditions.checkArgument(namespace != null, "Namespace should not be null");
    Preconditions.checkArgument(name != null, "Table name should not be null.");
    admin.deleteTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
    peerAdmin.deleteTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
  }

  @Override
  public void grantPermissions(String namespace, String table, Map<String, String> permissions)
    throws IOException {
    // no-op in this sample implementation
  }

  @Override
  public void close() throws IOException {
    if (admin != null) {
      admin.close();
    }
    if (peerAdmin != null) {
      peerAdmin.close();
    }
  }

  /**
   * Converts the {@link ColumnFamilyDescriptor} to the {@link HColumnDescriptor} for admin operations.
   *
   * @param ns the namespace for the table
   * @param tableName the name of the table
   * @param descriptor descriptor of the column family
   * @return the instance of HColumnDescriptor
   */
  private static HColumnDescriptor getHColumnDescriptor(String ns, String tableName,
                                                        ColumnFamilyDescriptor descriptor) {
    HColumnDescriptor hFamily = new HColumnDescriptor(descriptor.getName());
    hFamily.setMaxVersions(descriptor.getMaxVersions());
    hFamily.setCompressionType(Compression.Algorithm.valueOf(descriptor.getCompressionType().name()));
    hFamily.setBloomFilterType(org.apache.hadoop.hbase.regionserver.BloomType.valueOf(
      descriptor.getBloomType().name()));
    for (Map.Entry<String, String> property : descriptor.getProperties().entrySet()) {
      hFamily.setValue(property.getKey(), property.getValue());
    }
    LOG.info("Setting replication scope to global for ns {}, table {}, cf {}",
             ns, tableName, descriptor.getName());
    hFamily.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    return hFamily;
  }

  /**
   * Converts the {@link TableDescriptor} into the corresponding {@link HTableDescriptor} for admin operations.
   *
   * @param descriptor the table descriptor instance
   * @return the instance of HTableDescriptor
   */
  private static HTableDescriptor getHTableDescriptor(TableDescriptor descriptor) {
    TableName tableName = TableName.valueOf(descriptor.getNamespace(), descriptor.getName());
    HTableDescriptor htd = new HTableDescriptor(tableName);
    for (Map.Entry<String, ColumnFamilyDescriptor> family : descriptor.getFamilies().entrySet()) {
      htd.addFamily(getHColumnDescriptor(descriptor.getNamespace(), descriptor.getName(), family.getValue()));
    }

    for (Map.Entry<String, CoprocessorDescriptor> coprocessor : descriptor.getCoprocessors().entrySet()) {
      CoprocessorDescriptor cpd = coprocessor.getValue();
      try {
        htd.addCoprocessor(cpd.getClassName(), new Path(cpd.getPath()), cpd.getPriority(), cpd.getProperties());
      } catch (IOException e) {
        LOG.error("Error adding coprocessor.", e);
      }
    }

    for (Map.Entry<String, String> property : descriptor.getProperties().entrySet()) {
      htd.setValue(property.getKey(), property.getValue());
    }
    return htd;
  }

  /**
   * Generates the peer configuration used to perform DDL operations on the remote cluster
   * through its {@link HBaseAdmin}.
   *
   * @param context instance of {@link HBaseDDLExecutorContext} with which the DDL executor is initialized
   * @return the {@link Configuration} to be used for DDL operations on the remote cluster
   */
  private static Configuration generatePeerConfig(HBaseDDLExecutorContext context) {
    Configuration peerConf = new Configuration();
    peerConf.clear();
    for (Map.Entry<String, String> entry : context.getProperties().entrySet()) {
      peerConf.set(entry.getKey(), entry.getValue());
    }
    StringWriter sw = new StringWriter();
    try {
      Configuration.dumpConfiguration(peerConf, sw);
      LOG.debug("PeerConfig - {}", sw);
    } catch (IOException e) {
      LOG.error("Error dumping config.", e);
    }
    return peerConf;
  }
}
```
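The "wait for the table to materialize" loop in `createTableIfNotExists` is a general pattern: poll a condition until it holds or a deadline passes. A self-contained sketch of that loop, using `System.nanoTime` in place of the Guava `Stopwatch` (names and signature are illustrative, not part of the SPI):

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Illustrative only: a generic version of the materialize-wait loop above.
public class PollUntil {

  /**
   * Polls {@code condition} every {@code sleepMillis} until it returns true or
   * {@code timeoutMillis} elapses. Returns whether the condition was met.
   */
  public static boolean poll(BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (true) {
      if (condition.getAsBoolean()) {
        return true;  // e.g. the table now exists
      }
      if (System.nanoTime() >= deadline) {
        return false; // give up, mirroring the LOG.error branch in the sample
      }
      try {
        Thread.sleep(sleepMillis); // back off before re-checking
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status and stop waiting
        return false;
      }
    }
  }
}
```

Sleeping between checks matters because `tableExists` is a remote call; a tight loop would hammer the HBase master while the table is still being assigned.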

POM File

Corresponding pom.xml. Adjust the version of the hbase-client dependency (currently 1.0.0-cdh5.5.1) below as appropriate for your cluster:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>io.cdap.cdap</groupId>
  <artifactId>HBaseDDLExecutorExtension</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>HBase DDL executor</name>

  <properties>
    <cdap.version>|release|</cdap.version>
    <slf4j.version>1.7.5</slf4j.version>
  </properties>

  <repositories>
    <repository>
      <id>sonatype</id>
      <url>https://oss.sonatype.org/content/groups/public</url>
    </repository>
    <repository>
      <id>apache.snapshots</id>
      <url>https://repository.apache.org/content/repositories/snapshots</url>
    </repository>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>io.cdap.cdap</groupId>
      <artifactId>cdap-hbase-spi</artifactId>
      <version>${cdap.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.0.0-cdh5.5.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
  </dependencies>
</project>
```
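After building the extension jar (for example, with `mvn clean package`), the jar must be placed where CDAP Master can load it; see CDAP Replication for the deployment details of your CDAP version. As a sketch only, assuming the property name `hbase.ddlexecutor.extension.dir` and the directory path shown (both are assumptions, not confirmed by this document), the extension directory would be configured in cdap-site.xml:

```xml
<!-- Sketch of a cdap-site.xml entry pointing CDAP at the directory containing
     the HBaseDDLExecutor extension jar. Both the property name and the path
     are assumptions; consult the CDAP Replication documentation. -->
<property>
  <name>hbase.ddlexecutor.extension.dir</name>
  <value>/opt/cdap/master/ext/hbase/ddl</value>
  <description>Directory containing the HBaseDDLExecutor extension jar</description>
</property>
```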