Skip to content

Commit

Permalink
[GOBBLIN-2141] Simulate directly on trash class (apache#4036)
Browse files Browse the repository at this point in the history
* Support Trash simulate mode without using MockTrash

* Use Trashfactory initialization on simulated trash classes
  • Loading branch information
Will-Lo authored Aug 23, 2024
1 parent d58bbc0 commit 7c8127c
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.Properties;

import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -40,6 +39,8 @@
import org.slf4j.LoggerFactory;

import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;


/**
Expand All @@ -52,6 +53,8 @@ public class Trash implements GobblinTrash {
private static final FsPermission ALL_PERM = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
public static final String TRASH_CLASS_KEY = "trash.class";

private final boolean simulate;

/**
* Location of trash directory in file system. The location can include a token $USER that will be automatically
* replaced by the name of the active user.
Expand Down Expand Up @@ -106,7 +109,11 @@ protected Path createTrashLocation(FileSystem fs, Properties props, String user)
throw new IllegalArgumentException("Trash location must be absolute. Found " + trashLocation.toString());
}
Path qualifiedTrashLocation = fs.makeQualified(trashLocation);
ensureTrashLocationExists(fs, qualifiedTrashLocation);
if (this.simulate) {
LOG.info("Simulating trash location creation at " + qualifiedTrashLocation);
} else {
ensureTrashLocationExists(fs, qualifiedTrashLocation);
}
return qualifiedTrashLocation;
}

Expand Down Expand Up @@ -156,6 +163,7 @@ public Trash(FileSystem fs, Properties props) throws IOException {

protected Trash(FileSystem fs, Properties props, String user) throws IOException {
this.fs = fs;
this.simulate = PropertiesUtils.getPropAsBoolean(props, TrashFactory.SIMULATE, "false");
this.trashLocation = createTrashLocation(fs, props, user);
try {
Class<?> snapshotCleanupPolicyClass = Class.forName(props.getProperty(SNAPSHOT_CLEANUP_POLICY_CLASS_KEY,
Expand Down Expand Up @@ -189,11 +197,18 @@ public boolean moveToTrash(Path path) throws IOException {
Path targetPathInTrash = PathUtils.mergePaths(this.trashLocation, fullyResolvedPath);

if (!this.fs.exists(targetPathInTrash.getParent())) {
this.fs.mkdirs(targetPathInTrash.getParent());
if (this.simulate) {
LOG.info("Making a parent directory at " + targetPathInTrash.getParent() + " in trash.");
} else {
this.fs.mkdirs(targetPathInTrash.getParent());
}
} else if (this.fs.exists(targetPathInTrash)) {
targetPathInTrash = targetPathInTrash.suffix("_" + System.currentTimeMillis());
}

if (this.simulate) {
LOG.info("Simulating moving " + fullyResolvedPath + " to " + targetPathInTrash + " in trash.");
return true;
}
return this.fs.rename(fullyResolvedPath, targetPathInTrash);
}

Expand All @@ -211,23 +226,28 @@ public void createTrashSnapshot() throws IOException {

Path snapshotDir = new Path(this.trashLocation, new DateTime().toString(TRASH_SNAPSHOT_NAME_FORMATTER));
if (this.fs.exists(snapshotDir)) {
throw new IOException("New snapshot directory " + snapshotDir.toString() + " already exists.");
throw new IOException("New snapshot directory " + snapshotDir + " already exists.");
}

if (!safeFsMkdir(fs, snapshotDir, PERM)) {
throw new IOException("Failed to create new snapshot directory at " + snapshotDir.toString());
if (this.simulate) {
LOG.info("Simulating creation of new snapshot directory at " + snapshotDir);
} else if (!safeFsMkdir(fs, snapshotDir, PERM)) {
throw new IOException("Failed to create new snapshot directory at " + snapshotDir);
}

LOG.info(String.format("Moving %d paths in Trash directory to newly created snapshot at %s.", pathsInTrash.length,
snapshotDir.toString()));
snapshotDir));

int pathsFailedToMove = 0;
for (FileStatus fileStatus : pathsInTrash) {
Path pathRelativeToTrash = PathUtils.relativizePath(fileStatus.getPath(), this.trashLocation);
Path targetPath = new Path(snapshotDir, pathRelativeToTrash);
boolean movedThisPath = true;
try {
movedThisPath = this.fs.rename(fileStatus.getPath(), targetPath);
if (this.simulate) {
LOG.info("Simulating moving of " + fileStatus.getPath() + " to " + targetPath + " in snapshot.");
} else {
movedThisPath = this.fs.rename(fileStatus.getPath(), targetPath);
}
} catch (IOException exception) {
LOG.error("Failed to move path " + fileStatus.getPath().toString() + " to snapshot.", exception);
pathsFailedToMove += 1;
Expand Down Expand Up @@ -275,11 +295,15 @@ public int compare(FileStatus o1, FileStatus o2) {
for (FileStatus snapshot : snapshotsInTrash) {
if (this.snapshotCleanupPolicy.shouldDeleteSnapshot(snapshot, this)) {
try {
boolean successfullyDeleted = this.fs.delete(snapshot.getPath(), true);
if (successfullyDeleted) {
snapshotsDeleted++;
if (this.simulate) {
LOG.info("Simulating delete of snapshot " + snapshot.getPath());
} else {
LOG.error("Failed to delete snapshot " + snapshot.getPath());
boolean successfullyDeleted = this.fs.delete(snapshot.getPath(), true);
if (successfullyDeleted) {
snapshotsDeleted++;
} else {
LOG.error("Failed to delete snapshot " + snapshot.getPath());
}
}
} catch (IOException exception) {
LOG.error("Failed to delete snapshot " + snapshot.getPath(), exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class TrashFactory {
public static final String SIMULATE = "gobblin.trash.simulate";
public static final String SKIP_TRASH = "gobblin.trash.skip.trash";

// Configuration to avoid using MockTrash - as Trash implementations get more complex it's better to have the Trash class itself support simulate
public static final String SIMULATE_USING_ACTUAL_TRASH = "gobblin.trash.simulate.actual.trash";

public static Trash createTrash(FileSystem fs) throws IOException {
return createTrash(fs, new Properties());
}
Expand Down Expand Up @@ -107,8 +110,8 @@ private static ProxiedTrash createTestMockOrImmediateDeletionTrash(FileSystem fs
LOG.info("Creating a test trash. Nothing will actually be deleted.");
return new TestTrash(fs, props, user);
}
if(props.containsKey(SIMULATE) && Boolean.parseBoolean(props.getProperty(SIMULATE))) {
LOG.info("Creating a simulate trash. Nothing will actually be deleted.");
if(props.containsKey(SIMULATE) && Boolean.parseBoolean(props.getProperty(SIMULATE)) && !Boolean.parseBoolean(props.getProperty(SIMULATE_USING_ACTUAL_TRASH)) ) {
LOG.info("Creating a mock trash. Nothing will actually be deleted.");
return new MockTrash(fs, props, user);
}
if(props.containsKey(SKIP_TRASH) && Boolean.parseBoolean(props.getProperty(SKIP_TRASH))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.gobblin.data.management.trash;


import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
Expand Down Expand Up @@ -241,4 +241,81 @@ public Boolean answer(InvocationOnMock invocation)
}
}

@Test
public void testMoveToTrashSimulate() throws IOException {
Properties properties = new Properties();
properties.setProperty(Trash.SNAPSHOT_CLEANUP_POLICY_CLASS_KEY, TestCleanupPolicy.class.getCanonicalName());
properties.setProperty(TrashFactory.SIMULATE, "true");
properties.setProperty(TrashFactory.SIMULATE_USING_ACTUAL_TRASH, "true");
properties.setProperty(Trash.TRASH_LOCATION_KEY, "/trash/dir");
FileSystem mockTrash = mock(FileSystem.class);
when(mockTrash.makeQualified(any(Path.class))).thenReturn(new Path("/trash/dir"));
Trash trash = TrashFactory.createTrash(mockTrash, properties);

Path pathToDelete = new Path("/path/to/delete");

final List<Pair<Path, Path>> movedPaths = Lists.newArrayList();

when(trash.fs.rename(any(Path.class), any(Path.class))).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation)
throws Throwable {
Object[] args = invocation.getArguments();
movedPaths.add(new Pair<Path, Path>((Path) args[0], (Path) args[1]));
return true;
}
});

Assert.assertTrue(trash.moveToTrash(pathToDelete));

verify(trash.fs, times(0)).mkdirs(any(Path.class));

Assert.assertEquals(movedPaths.size(), 0);

}

@Test
public void testPurgeSnapshotsWithSimulate() throws IOException {

try {
Properties properties = new Properties();
properties.setProperty(Trash.SNAPSHOT_CLEANUP_POLICY_CLASS_KEY, TestCleanupPolicy.class.getCanonicalName());
properties.setProperty(TrashFactory.SIMULATE, "true");
properties.setProperty(TrashFactory.SIMULATE_USING_ACTUAL_TRASH, "true");
properties.setProperty(Trash.TRASH_LOCATION_KEY, "/trash/dir");
FileSystem mockTrash = mock(FileSystem.class);
when(mockTrash.makeQualified(any(Path.class))).thenReturn(new Path("/trash/dir"));
Trash trash = TrashFactory.createTrash(mockTrash, properties);

DateTimeUtils.setCurrentMillisFixed(new DateTime(2015, 7, 15, 10, 0).withZone(DateTimeZone.UTC).getMillis());

final List<Path> deletedPaths = Lists.newArrayList();

Path snapshot1 = new Path(trash.getTrashLocation(), Trash.TRASH_SNAPSHOT_NAME_FORMATTER.print(new DateTime()));
Path snapshot2 = new Path(trash.getTrashLocation(),
Trash.TRASH_SNAPSHOT_NAME_FORMATTER.print(new DateTime().minusDays(1)));

when(trash.fs.listStatus(any(Path.class), any(PathFilter.class))).
thenReturn(
Lists.newArrayList(
new FileStatus(0, true, 0, 0, 0, snapshot1),
new FileStatus(0, true, 0, 0, 0, snapshot2))
.toArray(new FileStatus[]{}));
when(trash.fs.delete(any(Path.class), anyBoolean())).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation)
throws Throwable {
deletedPaths.add((Path) invocation.getArguments()[0]);
return true;
}
});

trash.purgeTrashSnapshots();

Assert.assertEquals(deletedPaths.size(), 0);
} finally {
DateTimeUtils.setCurrentMillisSystem();
}
}

}

0 comments on commit 7c8127c

Please sign in to comment.