Skip to content

Commit

Permalink
Avoid blocking in ASMBasedTaskDescriptor (#336)
Browse files Browse the repository at this point in the history
Currently the ASMBasedTaskDescriptor uses ByteBuddy to hook into
the class loader to analyze bytecode so that ParSeq can generate
descriptions of tasks that were created via lambda functions to
aid in debugging and tracing.

In production, blocking on the lambda can cause a global lockout
of threads. This can happen because of the common fork join pool
being starved or because the classes being analyzed take a while.

If a class has already been analyzed, we do not need to await
the latch because we have a thread safe collection to access.
Classes can be analyzed multiple times in a JVM lifecycle because
of class unloading, but we don't care about subsequent analyses,
(the bytecode should not change with load -> unload -> load).
  • Loading branch information
cbrentharris authored Oct 13, 2023
1 parent 51ce337 commit ed54dc8
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 100 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
v5.1.14
------
* Update ASMBasedTaskDescriptor to avoid blocking when classes have already been analyzed

v5.1.13
------
* Pin python version for the "publish" github action
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=5.1.13
version=5.1.14
group=com.linkedin.parseq
org.gradle.parallel=true
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import net.bytebuddy.matcher.ElementMatchers;
import net.bytebuddy.utility.JavaModule;


/**
* An ASM based implementation of {@link TaskDescriptor} to provide description for generated Lambda class.
* Description of Lambda expression includes source code location of lambda, function call or method reference
Expand Down Expand Up @@ -71,112 +72,109 @@ static void onExit(@Advice.Argument(0) Class<?> hostClass, @Advice.Argument(1) b

static {

try {
Instrumentation inst = ByteBuddyAgent.install();

/*
* If we can get the instance of jdk.internal.misc.Unsafe then we will
* attempt to instrument Unsafe.defineAnonymousClass(...) to capture classes
* generated for lambdas.
* This approach does not work for Oracle Java 8 because
* sun.misc.Unsafe.defineAnonymousClass(...) is a native method and we can
* at most replace it but there is no reasonably easy way to replace it and
* still invoke the original method.
*/
boolean isJdkUnsafe = false;
try {
Instrumentation inst = ByteBuddyAgent.install();

/*
* If we can get the instance of jdk.internal.misc.Unsafe then we will
* attempt to instrument Unsafe.defineAnonymousClass(...) to capture classes
* generated for lambdas.
* This approach does not work for Oracle Java 8 because
* sun.misc.Unsafe.defineAnonymousClass(...) is a native method and we can
* at most replace it but there is no reasonably easy way to replace it and
* still invoke the original method.
*/
boolean isJdkUnsafe = false;
Class<?> unsafe = null;
try {
unsafe = Class.forName("jdk.internal.misc.Unsafe");
isJdkUnsafe = true;
} catch (ClassNotFoundException e) {
}

if (isJdkUnsafe) {
// Code path that supports OpenJDK Java 11 and up

/*
* Inject AnalyzerAdvice to boot ClassLoader.
* It has to be reachable from jdk.internal.misc.Unsafe.
*/
ClassInjector.UsingUnsafe.ofBootLoader()
.inject(Collections.singletonMap(
new TypeDescription.ForLoadedType(AnalyzerAdvice.class),
ClassFileLocator.ForClassLoader.read(AnalyzerAdvice.class)
)
);

/*
* Inject the analyze(byte[] byteCode, ClassLoader loader) method from this ClassLoader
* to the AnalyzerAdvice class from boot ClassLoader.
*/
Class<?> injectedInt = ClassLoader.getSystemClassLoader().getParent().loadClass(AnalyzerAdvice.class.getName());
injectedInt.getField("_method").set(null, Analyzer.class.getDeclaredMethod("analyze", byte[].class, ClassLoader.class));

JavaModule module = JavaModule.ofType(injectedInt);

new AgentBuilder.Default()
.disableClassFormatChanges()
.ignore(noneOf(unsafe))
.with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE)
.with(AgentBuilder.RedefinitionStrategy.REDEFINITION)
.with(AgentBuilder.TypeStrategy.Default.REDEFINE)
.with(AgentBuilder.InjectionStrategy.UsingUnsafe.INSTANCE)
.assureReadEdgeTo(inst, module)
.type(is(unsafe))
.transform(new AgentBuilder.Transformer() {
@Override
public Builder<?> transform(Builder<?> builder, TypeDescription typeDescription,
ClassLoader classLoader, JavaModule module) {
return builder.visit(Advice.to(AnalyzerAdvice.class).on(ElementMatchers.named("defineAnonymousClass")));
}
}).installOnByteBuddyAgent();
} else {
// Code path that supports Oracle Java 8 and 9
if (isJdkUnsafe) {
// Code path that supports OpenJDK Java 11 and up

/*
* Inject AnalyzerAdvice to boot ClassLoader.
* It has to be reachable from jdk.internal.misc.Unsafe.
*/
ClassInjector.UsingUnsafe.ofBootLoader()
.inject(Collections.singletonMap(new TypeDescription.ForLoadedType(AnalyzerAdvice.class),
ClassFileLocator.ForClassLoader.read(AnalyzerAdvice.class)));

/*
* Inject the analyze(byte[] byteCode, ClassLoader loader) method from this ClassLoader
* to the AnalyzerAdvice class from boot ClassLoader.
*/
Class<?> injectedInt = ClassLoader.getSystemClassLoader().getParent().loadClass(AnalyzerAdvice.class.getName());
injectedInt.getField("_method")
.set(null, Analyzer.class.getDeclaredMethod("analyze", byte[].class, ClassLoader.class));

JavaModule module = JavaModule.ofType(injectedInt);

new AgentBuilder.Default().disableClassFormatChanges()
.ignore(noneOf(unsafe))
.with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE)
.with(AgentBuilder.RedefinitionStrategy.REDEFINITION)
.with(AgentBuilder.TypeStrategy.Default.REDEFINE)
.with(AgentBuilder.InjectionStrategy.UsingUnsafe.INSTANCE)
.assureReadEdgeTo(inst, module)
.type(is(unsafe))
.transform(new AgentBuilder.Transformer() {
@Override
public Builder<?> transform(Builder<?> builder, TypeDescription typeDescription, ClassLoader classLoader,
JavaModule module) {
return builder.visit(Advice.to(AnalyzerAdvice.class).on(ElementMatchers.named("defineAnonymousClass")));
}
})
.installOnByteBuddyAgent();
} else {
// Code path that supports Oracle Java 8 and 9
inst.addTransformer(new Analyzer());
}
} catch(Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public String getDescription(String className) {
Optional<String> lambdaClassDescription = getLambdaClassDescription(className);
if (lambdaClassDescription.isPresent()) {
return lambdaClassDescription.get();
} else {
return className;
}
return lambdaClassDescription.orElse(className);
}

Optional<String> getLambdaClassDescription(String className) {
int slashIndex = className.lastIndexOf('/');
// If we can't find the slash, we can't find the name of the lambda.
if (slashIndex <= 0) {
return Optional.empty();
}
String name = className.substring(0, slashIndex);
String description = _names.get(name);

// If we have already analyzed the class, we don't need to await
// analysis on other lambdas.
if (description != null) {
return Optional.of(description).filter(s -> !s.isEmpty());
}

CountDownLatch latch = _latchRef.get();
if (latch != null) {
try {
/*
* We wait up to one minute - an arbitrary, sufficiently large amount of time.
* The wait period must be bounded to avoid locking out JVM.
*/
// We wait up to one minute - an arbitrary, sufficiently large amount of time.
// The wait period must be bounded to avoid locking out JVM.
latch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
System.out.println("ERROR: ParSeq Latch timed out suggesting serious issue in ASMBasedTaskDescriptor. "
+ "Current number of class being analyzed: " + String.valueOf(_count.get()));
System.err.println("ERROR: ParSeq Latch timed out suggesting serious issue in ASMBasedTaskDescriptor. "
+ "Current number of class being analyzed: " + _count.get());
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
int slashIndex = className.lastIndexOf('/');
if (slashIndex > 0) {
String name = className.substring(0, slashIndex);
String desc = _names.get(name);
if (desc == null || desc.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(desc);
}
}

return Optional.empty();
// Try again
return Optional.ofNullable(_names.get(name)).filter(s -> !s.isEmpty());
}

private static void add(String lambdaClassName, String description) {
Expand All @@ -187,8 +185,7 @@ public static class Analyzer implements ClassFileTransformer {

@Override
public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined,
ProtectionDomain protectionDomain, byte[] classfileBuffer)
throws IllegalClassFormatException {
ProtectionDomain protectionDomain, byte[] classfileBuffer) throws IllegalClassFormatException {
if (className == null && loader != null) {
analyze(classfileBuffer, loader);
}
Expand All @@ -208,23 +205,20 @@ public static void analyze(byte[] byteCode, ClassLoader loader) {
}
}
final Exception e = new Exception();
ForkJoinPool.commonPool().execute(new Runnable() {
@Override
public void run() {
try {
doAnalyze(byteCode, loader, e);
} catch (Throwable t) {
/*
* We need to catch everything because other
* threads may be blocked on CountDownLatch.
*/
System.out.println("WARNING: Parseq cannot doAnalyze");
t.printStackTrace();
}
if (_count.decrementAndGet() == 0) {
CountDownLatch latch = _latchRef.getAndSet(null);
latch.countDown();
}
ForkJoinPool.commonPool().execute(() -> {
try {
doAnalyze(byteCode, loader, e);
} catch (Throwable t) {
/*
* We need to catch everything because other
* threads may be blocked on CountDownLatch.
*/
System.out.println("WARNING: Parseq cannot doAnalyze");
t.printStackTrace();
}
if (_count.decrementAndGet() == 0) {
CountDownLatch latch = _latchRef.getAndSet(null);
latch.countDown();
}
});
}
Expand Down

0 comments on commit ed54dc8

Please sign in to comment.