Skip to content

Commit

Permalink
Revert "[Java] Account for client-side memory mapped files."
Browse files Browse the repository at this point in the history
This reverts commit adfe9aa.
  • Loading branch information
vyazelenko committed Sep 20, 2024
1 parent adfe9aa commit 43be7b7
Show file tree
Hide file tree
Showing 7 changed files with 6 additions and 215 deletions.
28 changes: 1 addition & 27 deletions aeron-client/src/main/java/io/aeron/Aeron.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.agrona.concurrent.broadcast.CopyBroadcastReceiver;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersReader;

import java.io.File;
Expand Down Expand Up @@ -73,7 +72,6 @@ public class Aeron implements AutoCloseable
public static final int NULL_VALUE = -1;

private static final VarHandle IS_CLOSED_VH;

static
{
try
Expand Down Expand Up @@ -961,7 +959,6 @@ public static class Context extends CommonContext
private long closeLingerDurationNs = Configuration.closeLingerDurationNs();

private ThreadFactory threadFactory = Thread::new;
private AtomicCounter mappedBytesCounter;

/**
* Perform a shallow copy of the object.
Expand Down Expand Up @@ -1061,16 +1058,9 @@ else if (clientLock instanceof NoOpLock && !useConductorAgentInvoker)
countersValuesBuffer(CncFileDescriptor.createCountersValuesBuffer(cncByteBuffer, cncMetaDataBuffer));
}

if (null == mappedBytesCounter)
{
mappedBytesCounter = new AtomicCounter(countersValuesBuffer(), 35); // Bytes currently mapped
}

mappedBytesCounter.getAndAdd(cncByteBuffer.capacity());

if (null == logBuffersFactory)
{
logBuffersFactory = new MappedLogBuffersFactory(mappedBytesCounter);
logBuffersFactory = new MappedLogBuffersFactory();
}

if (null == errorHandler)
Expand Down Expand Up @@ -1780,26 +1770,11 @@ public PublicationErrorFrameHandler publicationErrorFrameHandler()
*/
public void close()
{
if (null != mappedBytesCounter)
{
mappedBytesCounter.getAndAdd(-cncByteBuffer.capacity());
}
BufferUtil.free(cncByteBuffer);
this.cncByteBuffer = null;
super.close();
}

AtomicCounter mappedBytesCounter()
{
return mappedBytesCounter;
}

Context mappedBytesCounter(final AtomicCounter mappedBytesCounter)
{
this.mappedBytesCounter = mappedBytesCounter;
return this;
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -1842,7 +1817,6 @@ public String toString()
"\n resourceLingerDurationNs=" + resourceLingerDurationNs +
"\n closeLingerDurationNs=" + closeLingerDurationNs +
"\n threadFactory=" + threadFactory +
"\n mappedBytesCounter=" + mappedBytesCounter +
"\n}";
}

Expand Down
20 changes: 1 addition & 19 deletions aeron-client/src/main/java/io/aeron/LogBuffers.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.agrona.CloseHelper;
import org.agrona.LangUtil;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -51,20 +50,13 @@ public final class LogBuffers implements AutoCloseable
private final ByteBuffer[] termBuffers = new ByteBuffer[PARTITION_COUNT];
private final UnsafeBuffer logMetaDataBuffer;
private final MappedByteBuffer[] mappedByteBuffers;
private final AtomicCounter mappedBytesCounter;
private long logLength;

/**
* Construct the log buffers for a given log file.
*
* @param logFileName to be mapped.
*/
public LogBuffers(final String logFileName)
{
this(logFileName, null);
}

LogBuffers(final String logFileName, final AtomicCounter mappedBytesCounter)
{
int termLength = 0;
FileChannel fileChannel = null;
Expand All @@ -74,7 +66,7 @@ public LogBuffers(final String logFileName)
try
{
fileChannel = FileChannel.open(Paths.get(logFileName), FILE_OPTIONS);
logLength = fileChannel.size();
final long logLength = fileChannel.size();
if (logLength < LOG_META_DATA_LENGTH)
{
throw new IllegalStateException(
Expand Down Expand Up @@ -161,12 +153,6 @@ public LogBuffers(final String logFileName)
this.fileChannel = fileChannel;
this.logMetaDataBuffer = logMetaDataBuffer;
this.mappedByteBuffers = mappedByteBuffers;
this.mappedBytesCounter = mappedBytesCounter;

if (null != mappedBytesCounter)
{
mappedBytesCounter.getAndAdd(logLength);
}
}

/**
Expand Down Expand Up @@ -232,10 +218,6 @@ public void preTouch()
public void close()
{
close(fileChannel, logMetaDataBuffer, mappedByteBuffers);
if (null != mappedBytesCounter)
{
mappedBytesCounter.getAndAdd(-logLength);
}
}

/**
Expand Down
11 changes: 1 addition & 10 deletions aeron-client/src/main/java/io/aeron/MappedLogBuffersFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,13 @@
*/
package io.aeron;

import org.agrona.concurrent.status.AtomicCounter;

/**
* Default factory for mapping log buffers in the client.
*/
class MappedLogBuffersFactory implements LogBuffersFactory
{
private final AtomicCounter mappedBytesCounter;

MappedLogBuffersFactory(final AtomicCounter mappedBytesCounter)
{
this.mappedBytesCounter = mappedBytesCounter;
}

public LogBuffers map(final String logFileName)
{
return new LogBuffers(logFileName, mappedBytesCounter);
return new LogBuffers(logFileName);
}
}
14 changes: 0 additions & 14 deletions aeron-client/src/test/java/io/aeron/AeronContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
*/
package io.aeron;

import org.agrona.concurrent.status.AtomicCounter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;

class AeronContextTest
{
Expand All @@ -46,15 +43,4 @@ void shouldAssignClientName(final String clientName)
context.clientName(clientName);
assertSame(clientName, context.clientName());
}

@Test
void shouldAssignMappedBytesCounter()
{
final Aeron.Context context = new Aeron.Context();
assertNull(context.mappedBytesCounter());

final AtomicCounter counter = mock(AtomicCounter.class);
assertSame(context, context.mappedBytesCounter(counter));
assertSame(counter, context.mappedBytesCounter());
}
}
71 changes: 0 additions & 71 deletions aeron-client/src/test/java/io/aeron/LogBuffersTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,19 @@
package io.aeron;

import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import static io.aeron.logbuffer.LogBufferDescriptor.*;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

class LogBuffersTest
{
Expand Down Expand Up @@ -95,68 +88,4 @@ void throwsIllegalStateExceptionIfLogFileSizeIsLessThanLogMetaDataLength(@TempDi
assertEquals("Log file length less than min length of " + LOG_META_DATA_LENGTH + ": length=" + fileLength,
exception.getMessage());
}

@Test
void mapExistingFile(@TempDir final Path dir) throws IOException
{
final int termLength = TERM_MIN_LENGTH;
final int pageSize = PAGE_MIN_SIZE;
final Path logFile = createLogFile(dir, termLength, pageSize);

try (LogBuffers logBuffers = new LogBuffers(logFile.toString()))
{
final UnsafeBuffer metaDataBuffer = logBuffers.metaDataBuffer();
assertNotNull(metaDataBuffer);
assertEquals(termLength, termLength(metaDataBuffer));
assertEquals(termLength, logBuffers.termLength());
assertEquals(pageSize, pageSize(metaDataBuffer));
}
}

@Test
void mapFileAndCaptureMappedSize() throws IOException
{
final Path logFile = createLogFile(Files.createTempDirectory("test"), TERM_MAX_LENGTH, PAGE_MIN_SIZE * 4);
final long logFileSize = Files.size(logFile);

final AtomicCounter mappedBytesCounter = mock(AtomicCounter.class);
final LogBuffers logBuffers = new LogBuffers(logFile.toString(), mappedBytesCounter);

for (int i = 0; i < 5; i++)
{
logBuffers.duplicateTermBuffers();
}

logBuffers.close();

final InOrder inOrder = inOrder(mappedBytesCounter);
inOrder.verify(mappedBytesCounter).getAndAdd(logFileSize);
inOrder.verify(mappedBytesCounter).getAndAdd(-logFileSize);
inOrder.verifyNoMoreInteractions();
}

private static Path createLogFile(final Path dir, final int termLength, final int pageSize) throws IOException
{
final long logFileSize = (long)termLength * PARTITION_COUNT + LOG_META_DATA_LENGTH;
final UnsafeBuffer buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(LOG_META_DATA_LENGTH));
termLength(buffer, termLength);
pageSize(buffer, pageSize);
final Path logFile = dir.resolve("some.log");
try (FileChannel fileChannel =
FileChannel.open(logFile, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW))
{
final ByteBuffer byteBuffer = buffer.byteBuffer();
byteBuffer.position(0).limit(LOG_META_DATA_LENGTH);

long position = logFileSize - LOG_META_DATA_LENGTH;
do
{
position += fileChannel.write(byteBuffer, position);
}
while (byteBuffer.remaining() > 0);
}

assertEquals(logFileSize, Files.size(logFile));
return logFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class MappedRawLog implements RawLog
preTouchPages(termBuffers, termLength, filePageSize);
}

mappedBytesCounter.getAndAdd(logLength);
mappedBytesCounter.getAndAddOrdered(logLength);
}
}
catch (final IOException ex)
Expand All @@ -147,7 +147,7 @@ public boolean free()
BufferUtil.free(mappedBuffers[i]);
}

mappedBytesCounter.getAndAdd(-logLength);
mappedBytesCounter.getAndAddOrdered(-logLength);

logMetaDataBuffer.wrap(0, 0);
for (int i = 0; i < termBuffers.length; i++)
Expand Down
Loading

0 comments on commit 43be7b7

Please sign in to comment.