Appendix: HBaseDDLExecutor
For details on replication of CDAP clusters, see CDAP Replication.
This document provides a sample implementation of HBaseDDLExecutor. The HBaseDDLExecutorImpl class performs the DDL operations on both the local and the peer cluster (master and slave).
Interface
Interface of HBaseDDLExecutor. Note that this is a Beta feature and is subject to change without notice.
/**
 * Interface providing the HBase DDL operations. All methods except {@link #initialize} must
 * be idempotent in order to allow retry of failed operations.
 */
@Beta
public interface HBaseDDLExecutor extends Closeable {
  /**
   * Initialize the {@link HBaseDDLExecutor}.
   * This method is called once when the executor is created, before any other methods are called.
   *
   * @param context the context for the executor
   */
  void initialize(HBaseDDLExecutorContext context);

  /**
   * Create the specified namespace if it does not exist.
   * This method gets called when CDAP attempts to create a new namespace.
   *
   * @param name the namespace to create
   * @return whether the namespace was created
   * @throws IOException if a remote or network exception occurs
   */
  boolean createNamespaceIfNotExists(String name) throws IOException;

  /**
   * Delete the specified namespace if it exists.
   * This method is called during the namespace deletion process.
   *
   * @param name the namespace to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if there are tables in the namespace
   */
  void deleteNamespaceIfExists(String name) throws IOException;

  /**
   * Create the specified table if it does not exist.
   * This method is called during the creation of an HBase-backed dataset (either system or user).
   *
   * @param descriptor the descriptor for the table to create
   * @param splitKeys the initial split keys for the table; may be {@code null}
   * @throws IOException if a remote or network exception occurs
   */
  void createTableIfNotExists(TableDescriptor descriptor, @Nullable byte[][] splitKeys)
    throws IOException;

  /**
   * Enable the specified table if it is disabled.
   *
   * @param namespace the namespace of the table to enable
   * @param name the name of the table to enable
   * @throws IOException if a remote or network exception occurs
   */
  void enableTableIfDisabled(String namespace, String name) throws IOException;

  /**
   * Disable the specified table if it is enabled.
   *
   * @param namespace the namespace of the table to disable
   * @param name the name of the table to disable
   * @throws IOException if a remote or network exception occurs
   */
  void disableTableIfEnabled(String namespace, String name) throws IOException;

  /**
   * Modify the specified table. This is called when an HBase-backed dataset has
   * its properties modified. In order to modify the HBase table, CDAP first calls
   * {@code disableTableIfEnabled}, then calls this method, then calls {@code enableTableIfDisabled}.
   *
   * @param namespace the namespace of the table to modify
   * @param name the name of the table to modify
   * @param descriptor the descriptor for the table
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void modifyTable(String namespace, String name, TableDescriptor descriptor) throws IOException;

  /**
   * Truncate the specified table. Implementation of this method should disable the table first.
   * The table must also be re-enabled by the implementation at the end of the truncate operation.
   *
   * @param namespace the namespace of the table to truncate
   * @param name the name of the table to truncate
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void truncateTable(String namespace, String name) throws IOException;

  /**
   * Delete the table if it exists. In order to delete the HBase table,
   * CDAP first calls {@code disableTableIfEnabled}, then calls this method.
   *
   * @param namespace the namespace of the table to delete
   * @param name the name of the table to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void deleteTableIfExists(String namespace, String name) throws IOException;

  /**
   * Grant permissions on a table or namespace to users or groups.
   *
   * @param namespace the namespace of the table
   * @param table the name of the table. If null, then the permissions are applied to the namespace
   * @param permissions A map from user or group name to the permissions for that user or group, given as a string
   *                    containing only characters 'a'(Admin), 'c'(Create), 'r'(Read), 'w'(Write), and 'x'(Execute).
   *                    Group names must be prefixed with the character '@'.
   * @throws IOException if anything goes wrong
   */
  void grantPermissions(String namespace, @Nullable String table, Map<String, String> permissions) throws IOException;
}
Implementation
Sample implementation of HBaseDDLExecutor for HBase version 1.0.0-cdh5.5.1:
/*
* Copyright © 2017 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.example.hbase.ddlexecutor;
import io.cdap.cdap.spi.hbase.ColumnFamilyDescriptor;
import io.cdap.cdap.spi.hbase.CoprocessorDescriptor;
import io.cdap.cdap.spi.hbase.HBaseDDLExecutor;
import io.cdap.cdap.spi.hbase.HBaseDDLExecutorContext;
import io.cdap.cdap.spi.hbase.TableDescriptor;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Sample implementation of {@link HBaseDDLExecutor} for HBase version 1.0.0-cdh5.5.1
*/
public class HBaseDDLExecutorImpl implements HBaseDDLExecutor {
public static final Logger LOG = LoggerFactory.getLogger(HBaseDDLExecutorImpl.class);
private HBaseAdmin admin;
private HBaseAdmin peerAdmin;
/**
* Encode a HBase entity name to ASCII encoding using {@link URLEncoder}.
*
* @param entityName entity string to be encoded
* @return encoded string
*/
private String encodeHBaseEntity(String entityName) {
try {
return URLEncoder.encode(entityName, "ASCII");
} catch (UnsupportedEncodingException e) {
// this can never happen - we know that ASCII is a supported character set!
throw new RuntimeException(e);
}
}
public void initialize(HBaseDDLExecutorContext context) {
LOG.info("Initializing executor with properties {}", context.getProperties());
try {
Configuration conf = context.getConfiguration();
this.admin = new HBaseAdmin(conf);
Configuration peerConf = generatePeerConfig(context);
this.peerAdmin = new HBaseAdmin(peerConf);
} catch (IOException e) {
throw new RuntimeException("Failed to create the HBaseAdmin", e);
}
}
private boolean hasNamespace(String name) throws IOException {
Preconditions.checkArgument(admin != null, "HBaseAdmin should not be null");
Preconditions.checkArgument(name != null, "Namespace should not be null.");
try {
admin.getNamespaceDescriptor(encodeHBaseEntity(name));
return true;
} catch (NamespaceNotFoundException e) {
return false;
}
}
public boolean createNamespaceIfNotExists(String name) throws IOException {
Preconditions.checkArgument(name != null, "Namespace should not be null.");
if (hasNamespace(name)) {
return false;
}
NamespaceDescriptor namespaceDescriptor =
NamespaceDescriptor.create(encodeHBaseEntity(name)).build();
admin.createNamespace(namespaceDescriptor);
peerAdmin.createNamespace(namespaceDescriptor);
return true;
}
public void deleteNamespaceIfExists(String name) throws IOException {
Preconditions.checkArgument(name != null, "Namespace should not be null.");
if (hasNamespace(name)) {
admin.deleteNamespace(encodeHBaseEntity(name));
peerAdmin.deleteNamespace(encodeHBaseEntity(name));
}
}
public void createTableIfNotExists(TableDescriptor descriptor, byte[][] splitKeys) throws IOException {
createTableIfNotExists(getHTableDescriptor(descriptor), splitKeys);
}
private void createTableIfNotExists(HTableDescriptor htd, byte[][] splitKeys) throws IOException {
if (admin.tableExists(htd.getName())) {
return;
}
try {
admin.createTable(htd, splitKeys);
peerAdmin.createTable(htd, splitKeys);
LOG.info("Table created '{}'", Bytes.toString(htd.getName()));
} catch (TableExistsException e) {
// table may exist because someone else is creating it at the same
// time. But it may not be available yet, and opening it might fail.
LOG.info("Table '{}' already exists.", Bytes.toString(htd.getName()), e);
}
// Wait for table to materialize
try {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
long sleepTime = TimeUnit.MILLISECONDS.toNanos(5000L) / 10;
sleepTime = sleepTime <= 0 ? 1 : sleepTime;
do {
if (admin.tableExists(htd.getName())) {
LOG.debug("Table '{}' exists now. Assuming that another process concurrently created it.",
Bytes.toString(htd.getName()));
return;
} else {
TimeUnit.NANOSECONDS.sleep(sleepTime);
}
} while (stopwatch.elapsedTime(TimeUnit.MILLISECONDS) < 5000L);
} catch (InterruptedException e) {
LOG.warn("Sleeping thread interrupted.");
}
LOG.error("Table '{}' does not exist after waiting {} ms. Giving up.", Bytes.toString(htd.getName()), 5000L);
}
public void enableTableIfDisabled(String namespace, String name) throws IOException {
Preconditions.checkArgument(namespace != null, "Namespace should not be null");
Preconditions.checkArgument(name != null, "Table name should not be null.");
try {
admin.enableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
peerAdmin.enableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
} catch (TableNotDisabledException e) {
LOG.debug("Attempt to enable already enabled table {} in the namespace {}.", name, namespace);
}
}
public void disableTableIfEnabled(String namespace, String name) throws IOException {
Preconditions.checkArgument(namespace != null, "Namespace should not be null");
Preconditions.checkArgument(name != null, "Table name should not be null.");
try {
admin.disableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
peerAdmin.disableTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
} catch (TableNotEnabledException e) {
LOG.debug("Attempt to disable already disabled table {} in the namespace {}.", name, namespace);
}
}
public void modifyTable(String namespace, String name, TableDescriptor descriptor) throws IOException {
Preconditions.checkArgument(namespace != null, "Namespace should not be null");
Preconditions.checkArgument(name != null, "Table name should not be null.");
Preconditions.checkArgument(descriptor != null, "Descriptor should not be null.");
HTableDescriptor htd = getHTableDescriptor(descriptor);
admin.modifyTable(htd.getTableName(), htd);
peerAdmin.modifyTable(htd.getTableName(), htd);
}
public void truncateTable(String namespace, String name) throws IOException {
Preconditions.checkArgument(namespace != null, "Namespace should not be null");
Preconditions.checkArgument(name != null, "Table name should not be null.");
HTableDescriptor descriptor = admin.getTableDescriptor(TableName.valueOf(namespace, encodeHBaseEntity(name)));
disableTableIfEnabled(namespace, name);
deleteTableIfExists(namespace, name);
createTableIfNotExists(descriptor, null);
}
public void deleteTableIfExists(String namespace, String name) throws IOException {
Preconditions.checkArgument(namespace != null, "Namespace should not be null");
Preconditions.checkArgument(name != null, "Table name should not be null.");
admin.deleteTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
peerAdmin.deleteTable(TableName.valueOf(namespace, encodeHBaseEntity(name)));
}
@Override
public void grantPermissions(String s, String s1, Map<String, String> map) throws IOException {
// no-op
}
public void close() throws IOException {
if (admin != null) {
admin.close();
}
if (peerAdmin != null) {
peerAdmin.close();
}
}
/**
* Converts the {@link ColumnFamilyDescriptor} to the {@link HColumnDescriptor} for admin operations.
* @param ns the namespace for the table
* @param tableName the name of the table
* @param descriptor descriptor of the column family
* @return the instance of HColumnDescriptor
*/
private static HColumnDescriptor getHColumnDesciptor(String ns, String tableName,
ColumnFamilyDescriptor descriptor) {
HColumnDescriptor hFamily = new HColumnDescriptor(descriptor.getName());
hFamily.setMaxVersions(descriptor.getMaxVersions());
hFamily.setCompressionType(Compression.Algorithm.valueOf(descriptor.getCompressionType().name()));
hFamily.setBloomFilterType(org.apache.hadoop.hbase.regionserver.BloomType.valueOf(
descriptor.getBloomType().name()));
for (Map.Entry<String, String> property : descriptor.getProperties().entrySet()) {
hFamily.setValue(property.getKey(), property.getValue());
}
LOG.info("Setting replication scope to global for ns {}, table {}, cf {}", ns, tableName, descriptor.getName());
hFamily.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
return hFamily;
}
/**
* Converts the {@link TableDescriptor} into corresponding {@link HTableDescriptor} for admin operations.
* @param descriptor the table descriptor instance
* @return the instance of HTableDescriptor
*/
private static HTableDescriptor getHTableDescriptor(TableDescriptor descriptor) {
TableName tableName = TableName.valueOf(descriptor.getNamespace(), descriptor.getName());
HTableDescriptor htd = new HTableDescriptor(tableName);
for (Map.Entry<String, ColumnFamilyDescriptor> family : descriptor.getFamilies().entrySet()) {
htd.addFamily(getHColumnDesciptor(descriptor.getNamespace(), descriptor.getName(), family.getValue()));
}
for (Map.Entry<String, CoprocessorDescriptor> coprocessor : descriptor.getCoprocessors().entrySet()) {
CoprocessorDescriptor cpd = coprocessor.getValue();
try {
htd.addCoprocessor(cpd.getClassName(), new Path(cpd.getPath()), cpd.getPriority(), cpd.getProperties());
} catch (IOException e) {
LOG.error("Error adding coprocessor.", e);
}
}
for (Map.Entry<String, String> property : descriptor.getProperties().entrySet()) {
htd.setValue(property.getKey(), property.getValue());
}
return htd;
}
/**
* Generate the peer configuration which is used to perform DDL operations on the remote cluster using Admin
* @param context instance of {@link HBaseDDLExecutorContext} with which the DDL executor is initialized
* @return the {@link Configuration} to be used for DDL operations on the remote cluster
*/
private static Configuration generatePeerConfig(HBaseDDLExecutorContext context) {
Configuration peerConf = new Configuration();
peerConf.clear();
for (Map.Entry<String, String> entry : context.getProperties().entrySet()) {
peerConf.set(entry.getKey(), entry.getValue());
}
StringWriter sw = new StringWriter();
try {
Configuration.dumpConfiguration(peerConf, sw);
LOG.debug("PeerConfig - {}", sw);
} catch (IOException e) {
LOG.error("Error dumping config.", e);
}
return peerConf;
}
}
POM File
Corresponding pom.xml. Configure the version of the hbase-client dependency (currently 1.0.0-cdh5.5.1) below as appropriate, and set cdap.version to the version of CDAP you are running:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>io.cdap.cdap</groupId>
  <artifactId>HBaseDDLExecutorExtension</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>HBase DDL executor</name>

  <properties>
    <!-- Replace |release| with the version of CDAP you are running. -->
    <cdap.version>|release|</cdap.version>
    <!-- Version of the hbase-client dependency; set it to match the HBase of your clusters. -->
    <hbase.version>1.0.0-cdh5.5.1</hbase.version>
    <slf4j.version>1.7.5</slf4j.version>
  </properties>

  <repositories>
    <repository>
      <id>sonatype</id>
      <url>https://oss.sonatype.org/content/groups/public</url>
    </repository>
    <repository>
      <id>apache.snapshots</id>
      <url>https://repository.apache.org/content/repositories/snapshots</url>
    </repository>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>io.cdap.cdap</groupId>
      <artifactId>cdap-hbase-spi</artifactId>
      <version>${cdap.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
  </dependencies>
</project>
Related content
Vertica Bulk Import Action (Deprecated)
Optional Configuration in CDAP Custom Resource
CDAP Release 6.11.0
CDAP Release 6.10.1
Hive Bulk Import Action (Deprecated)
Apache Kudu Sink (Deprecated)
Created in 2020 by Google Inc.