[GOBBLIN-2083] set max connections to 2000 in mysql started through the docker image to avoid "too many connections" error (apache#3969)

* set max connections to 2000 in mysql docker image to avoid "too many connections" error
* fix tests using AssertWithBackOff
arjun4084346 authored Jun 11, 2024
1 parent 8c3f326 commit 1abd4cc
Showing 13 changed files with 133 additions and 140 deletions.
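
The second commit-message bullet refers to replacing one-shot assertions in the affected tests with a retry-with-backoff check, so that slow database startup on CI no longer causes spurious failures. The sketch below illustrates the idea with a hypothetical standalone helper; it is not Gobblin's actual AssertWithBackoff API, whose fluent interface differs.

import java.util.function.BooleanSupplier;

/** Hypothetical helper illustrating the retry-with-backoff assertion pattern. */
public final class BackoffAssert {

  private BackoffAssert() { }

  /**
   * Re-evaluates {@code condition} until it returns true or {@code timeoutMs} elapses,
   * sleeping with exponential backoff between attempts.
   */
  public static void assertTrueEventually(BooleanSupplier condition, long timeoutMs, String message)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    long sleepMs = 100L;
    while (!condition.getAsBoolean()) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        throw new AssertionError("Condition not met within " + timeoutMs + " ms: " + message);
      }
      Thread.sleep(Math.min(sleepMs, remaining));
      sleepMs *= 2;  // back off so the database is not hammered while it warms up
    }
  }
}

A test would call something like BackoffAssert.assertTrueEventually(() -> jobCompleted(), 30_000L, "job did not complete") instead of asserting once, where jobCompleted() stands in for whatever condition the real test polls.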
1 change: 1 addition & 0 deletions .github/workflows/build_and_test.yaml
@@ -134,6 +134,7 @@ jobs:
sudo dpkg -l | grep -i mysql
sudo apt-get clean
sudo apt-get install -y mysql-client
mysql --host 127.0.0.1 --port 3306 -uroot -ppassword -e "SET GLOBAL max_connections=2000"
mysql --host 127.0.0.1 --port 3306 -uroot -ppassword -e "SHOW DATABASES"
- name: Cache Gradle Dependencies
uses: actions/cache@v2
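The workflow step above raises the limit with the mysql CLI before the tests run. The same setting could also be applied or verified from JDBC; the sketch below assumes MySQL Connector/J is on the classpath and that the CI container still listens on 127.0.0.1:3306 with the root/password credentials used in the step above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class RaiseMaxConnections {
  public static void main(String[] args) throws Exception {
    // Mirrors the CI step above; adjust host, port, and credentials for other environments.
    String url = "jdbc:mysql://127.0.0.1:3306/?user=root&password=password";
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement()) {
      stmt.execute("SET GLOBAL max_connections = 2000");
      // Confirm the server picked up the new limit.
      try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'max_connections'")) {
        if (rs.next()) {
          System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
        }
      }
    }
  }
}

Note that SET GLOBAL only affects the running server; a container restart reverts to the image default unless max_connections is also set in the server configuration.
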
@@ -27,6 +27,19 @@
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Files;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.LocalHiveMetastoreTestUtils;
@@ -40,18 +53,6 @@
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.AutoReturnableObject;
import org.apache.gobblin.util.HiveJdbcConnector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Files;


@Test (groups = {"disabledOnCI"})
@@ -234,10 +235,10 @@ private TaskContext getTaskContextForRun(WorkUnit workUnit) {
}

private List<List<String>> executeStatementAndGetResults(HiveJdbcConnector connector, String query, int columns) throws SQLException {
Connection conn = connector.getConnection();
List<List<String>> result = new ArrayList<>();

try (Statement stmt = conn.createStatement()) {
try (Connection conn = connector.getConnection();
Statement stmt = conn.createStatement()) {
stmt.execute(query);
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
@@ -22,8 +22,6 @@
import java.sql.SQLException;
import java.sql.Timestamp;

import com.zaxxer.hikari.HikariDataSource;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
@@ -33,6 +31,8 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.zaxxer.hikari.HikariDataSource;


/**
* A Proof of concept to represent Retention policies as SQL queries. The POC uses Apache Derby in-memory database to
@@ -71,8 +71,11 @@ public void setup() throws SQLException {
execute("CREATE FUNCTION TIMESTAMP_DIFF(timestamp1 TIMESTAMP, timestamp2 TIMESTAMP, unitString VARCHAR(50)) RETURNS BIGINT PARAMETER STYLE JAVA NO SQL LANGUAGE JAVA EXTERNAL NAME 'org.apache.gobblin.data.management.retention.sql.SqlUdfs.timestamp_diff'");
}

@AfterClass
@AfterClass (alwaysRun = true)
public void cleanUp() throws Exception {
if (connection != null) {
connection.close();
}
dataSource.close();
}

@@ -89,17 +92,17 @@ public void testKeepLast2Snapshots() throws Exception {
insertSnapshot(new Path("/data/databases/Forum/Comments/1453889323804-PT-440936752"));

// Derby does not support LIMIT keyword. The suggested workaround is to setMaxRows in the PreparedStatement
PreparedStatement statement = connection.prepareStatement("SELECT name FROM Snapshots ORDER BY ts desc");
statement.setMaxRows(2);

ResultSet rs = statement.executeQuery();

// Snapshots to be retained
rs.next();
Assert.assertEquals(rs.getString(1), "1453889323804-PT-440936752");
rs.next();
Assert.assertEquals(rs.getString(1), "1453860526464-PT-440847244");

try (PreparedStatement statement = connection.prepareStatement("SELECT name FROM Snapshots ORDER BY ts desc")) {
statement.setMaxRows(2);

try (ResultSet rs = statement.executeQuery()) {
// Snapshots to be retained
rs.next();
Assert.assertEquals(rs.getString(1), "1453889323804-PT-440936752");
rs.next();
Assert.assertEquals(rs.getString(1), "1453860526464-PT-440847244");
}
}
}

/**
@@ -119,18 +122,19 @@ public void testKeepLast2YearsOfDailyPartitions() throws Exception {
Timestamp currentTimestamp =
new Timestamp(DateTimeFormat.forPattern(DAILY_PARTITION_PATTERN).parseDateTime("2016/01/25").getMillis());

PreparedStatement statement =
connection.prepareStatement("SELECT path FROM Daily_Partitions WHERE TIMESTAMP_DIFF(?, ts, 'Days') > ?");
statement.setTimestamp(1, currentTimestamp);
statement.setLong(2, TWO_YEARS_IN_DAYS);
ResultSet rs = statement.executeQuery();

// Daily partitions to be cleaned
rs.next();
Assert.assertEquals(rs.getString(1), "/data/tracking/MetricEvent/daily/2014/01/22");
rs.next();
Assert.assertEquals(rs.getString(1), "/data/tracking/MetricEvent/daily/2013/01/25");

try (PreparedStatement statement =
connection.prepareStatement("SELECT path FROM Daily_Partitions WHERE TIMESTAMP_DIFF(?, ts, 'Days') > ?")) {
statement.setTimestamp(1, currentTimestamp);
statement.setLong(2, TWO_YEARS_IN_DAYS);
try (ResultSet rs = statement.executeQuery()) {

// Daily partitions to be cleaned
rs.next();
Assert.assertEquals(rs.getString(1), "/data/tracking/MetricEvent/daily/2014/01/22");
rs.next();
Assert.assertEquals(rs.getString(1), "/data/tracking/MetricEvent/daily/2013/01/25");
}
}
}

private void insertSnapshot(Path snapshotPath) throws Exception {
@@ -140,15 +144,15 @@ private void insertSnapshot(Path snapshotPath) throws Exception {
long ts = Long.parseLong(StringUtils.substringBefore(snapshotName, "-PT-"));
long recordCount = Long.parseLong(StringUtils.substringAfter(snapshotName, "-PT-"));

PreparedStatement insert = connection.prepareStatement("INSERT INTO Snapshots VALUES (?, ?, ?, ?, ?)");
insert.setString(1, datasetPath);
insert.setString(2, snapshotName);
insert.setString(3, snapshotPath.toString());
insert.setTimestamp(4, new Timestamp(ts));
insert.setLong(5, recordCount);

insert.executeUpdate();
try (PreparedStatement insert = connection.prepareStatement("INSERT INTO Snapshots VALUES (?, ?, ?, ?, ?)")) {
insert.setString(1, datasetPath);
insert.setString(2, snapshotName);
insert.setString(3, snapshotPath.toString());
insert.setTimestamp(4, new Timestamp(ts));
insert.setLong(5, recordCount);

insert.executeUpdate();
}
}

private void insertDailyPartition(Path dailyPartitionPath) throws Exception {
@@ -159,17 +163,18 @@ private void insertDailyPartition(Path dailyPartitionPath) throws Exception {
DateTimeFormat.forPattern(DAILY_PARTITION_PATTERN).parseDateTime(
StringUtils.substringAfter(dailyPartitionPath.toString(), "daily" + Path.SEPARATOR));

PreparedStatement insert = connection.prepareStatement("INSERT INTO Daily_Partitions VALUES (?, ?, ?)");
insert.setString(1, datasetPath);
insert.setString(2, dailyPartitionPath.toString());
insert.setTimestamp(3, new Timestamp(partition.getMillis()));

insert.executeUpdate();
try (PreparedStatement insert = connection.prepareStatement("INSERT INTO Daily_Partitions VALUES (?, ?, ?)")) {
insert.setString(1, datasetPath);
insert.setString(2, dailyPartitionPath.toString());
insert.setTimestamp(3, new Timestamp(partition.getMillis()));

insert.executeUpdate();
}
}

private void execute(String query) throws SQLException {
PreparedStatement insertStatement = connection.prepareStatement(query);
insertStatement.executeUpdate();
try (PreparedStatement insertStatement = connection.prepareStatement(query)) {
insertStatement.executeUpdate();
}
}
}
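
The changes in this file all follow the same shape: each PreparedStatement and ResultSet is now opened in a try-with-resources block, so it is closed even when an assertion fails and database handles are not leaked across tests. A condensed sketch of that pattern, reusing the Snapshots table from the POC above (the JDBC URL is a placeholder):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class TryWithResourcesSketch {
  // Returns the newest snapshot name, or null if the table is empty.
  // Every JDBC resource is closed automatically, even if an exception is thrown.
  static String newestSnapshot(String jdbcUrl) throws SQLException {
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
         PreparedStatement stmt = conn.prepareStatement("SELECT name FROM Snapshots ORDER BY ts DESC")) {
      stmt.setMaxRows(1);  // Derby has no LIMIT keyword, so cap the rows on the statement
      try (ResultSet rs = stmt.executeQuery()) {
        return rs.next() ? rs.getString(1) : null;
      }
    }
  }
}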
@@ -151,10 +151,12 @@ public void hiveTest() throws Exception {
metaStoreClient.tableExists(TARGET_DB, TEST_TABLE);
FileSystem fs = FileSystem.getLocal(new Configuration());
fs.exists(new Path(TARGET_PATH));

statement.close();
}

// Tearing down the Hive components from derby driver if there's anything generated through the test.
@AfterClass
@AfterClass (alwaysRun = true)
public void hiveTearDown() throws Exception {
FileSystem fs = FileSystem.getLocal(new Configuration());
Path targetPath = new Path(TARGET_PATH);
@@ -56,7 +56,7 @@ public static ITestMetastoreDatabase get(String version) throws Exception {
public static ITestMetastoreDatabase get(String version, Config dbConfig) throws Exception {
try {
synchronized (syncObject) {
ensureDatabaseExists(dbConfig);
ensureDatabaseServerExists(dbConfig);
TestMetadataDatabase instance = new TestMetadataDatabase(testMetastoreDatabaseServer, version);
instances.add(instance);
return instance;
@@ -72,14 +72,14 @@ public static ITestMetastoreDatabase get(String version, Config dbConfig) throws

static void release(ITestMetastoreDatabase instance) throws IOException {
synchronized (syncObject) {
if (instances.remove(instance) && instances.size() == 0) {
if (instances.remove(instance) && instances.isEmpty()) {
testMetastoreDatabaseServer.close();
testMetastoreDatabaseServer = null;
}
}
}

private static void ensureDatabaseExists(Config dbConfig) throws Exception {
private static void ensureDatabaseServerExists(Config dbConfig) throws Exception {
if (testMetastoreDatabaseServer == null) {
try (Mutex ignored = new Mutex()) {
if (testMetastoreDatabaseServer == null) {
@@ -16,14 +16,10 @@
*/
package org.apache.gobblin.data.management.conversion.hive.validation;

import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
import org.apache.gobblin.util.PathUtils;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -58,8 +54,6 @@
import org.joda.time.DateTime;
import org.slf4j.LoggerFactory;

import azkaban.jobExecutor.AbstractJob;

import com.google.common.base.Charsets;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
@@ -74,6 +68,11 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import azkaban.jobExecutor.AbstractJob;

import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder;
import org.apache.gobblin.data.management.conversion.hive.events.EventConstants;
@@ -82,19 +81,21 @@
import org.apache.gobblin.data.management.conversion.hive.provider.UpdateProviderFactory;
import org.apache.gobblin.data.management.conversion.hive.query.HiveValidationQueryGenerator;
import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
import org.apache.gobblin.data.management.copy.hive.HiveDataset;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import org.apache.gobblin.data.management.copy.hive.HiveUtils;
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.hive.HiveSerDeWrapper;
import org.apache.gobblin.util.HiveJdbcConnector;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.AutoReturnableObject;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.HiveJdbcConnector;
import org.apache.gobblin.util.PathUtils;


/**
@@ -490,25 +491,23 @@ public Void call() throws Exception {
*/
@SuppressWarnings("unused")
private List<Long> getValidationOutputFromHiveJdbc(List<String> queries) throws IOException {
if (null == queries || queries.size() == 0) {
if (null == queries || queries.isEmpty()) {
log.warn("No queries specified to be executed");
return Collections.emptyList();
}
Statement statement = null;
List<Long> rowCounts = Lists.newArrayList();
Closer closer = Closer.create();

try {
HiveJdbcConnector hiveJdbcConnector = HiveJdbcConnector.newConnectorWithProps(props);
statement = hiveJdbcConnector.getConnection().createStatement();

try (HiveJdbcConnector hiveJdbcConnector = HiveJdbcConnector.newConnectorWithProps(props);
Connection connection = hiveJdbcConnector.getConnection();
Statement statement = connection.createStatement()){
for (String query : queries) {
log.info("Executing query: " + query);
boolean result = statement.execute(query);
if (result) {
ResultSet resultSet = statement.getResultSet();
if (resultSet.next()) {
rowCounts.add(resultSet.getLong(1));
try (ResultSet resultSet = statement.getResultSet()) {
if (resultSet.next()) {
rowCounts.add(resultSet.getLong(1));
}
}
} else {
log.warn("Query output for: " + query + " : " + result);
@@ -517,19 +516,6 @@ private List<Long> getValidationOutputFromHiveJdbc(List<String> queries) throws

} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
try {
closer.close();
} catch (Exception e) {
log.warn("Could not close HiveJdbcConnector", e);
}
if (null != statement) {
try {
statement.close();
} catch (SQLException e) {
log.warn("Could not close Hive statement", e);
}
}
}

return rowCounts;
@@ -28,7 +28,6 @@
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.jdbc.HiveConnection;
import org.apache.thrift.TException;
@@ -118,21 +117,18 @@ private synchronized void setProxiedConnection(final List<String> proxies)
String realm = this.state.getProp(ConfigurationKeys.KERBEROS_REALM);
UserGroupInformation loginUser = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(HostUtils.getPrincipalUsingHostname(superUser, realm), keytabLocation);
loginUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run()
throws MetaException, SQLException, ClassNotFoundException {
for (String proxy : proxies) {
HiveConnection hiveConnection = getHiveConnection(Optional.fromNullable(proxy));
loginUser.doAs((PrivilegedExceptionAction<Void>) () -> {
for (String proxy : proxies) {
try (HiveConnection hiveConnection = getHiveConnection(Optional.fromNullable(proxy))) {
Statement statement = hiveConnection.createStatement();
statementMap.put(proxy, statement);
connectionMap.put(proxy, hiveConnection);
for (String setting : settings) {
statement.execute(setting);
}
}
return null;
}
return null;
});
}
