Skip to content

Cleanup 11apr25 #4086

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/java/org/apache/cassandra/io/util/File.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,15 @@ public void deleteRecursive()
PathUtils.deleteRecursive(toPathForWrite());
}

/**
* Deletes all files and subdirectories under "dir".
* @return false if the root cannot be deleted
*/
public boolean tryDeleteRecursive()
{
return PathUtils.tryDeleteRecursive(toPathForWrite());
}

/**
* Try to delete the file on process exit.
*/
Expand Down
46 changes: 36 additions & 10 deletions src/java/org/apache/cassandra/io/util/PathUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,22 @@ public static Throwable delete(Path file, Throwable accumulate, @Nullable RateLi
private static void deleteRecursiveUsingNixCommand(Path path, boolean quietly)
{
String [] cmd = new String[]{ "rm", quietly ? "-rdf" : "-rd", path.toAbsolutePath().toString() };
IOException failure = null;
if (!quietly && !Files.exists(path))
failure = new NoSuchFileException(path.toString());

if (failure == null)
failure = tryDeleteRecursiveUsingNixCommand(path, quietly);

if (failure != null)
throw propagateUnchecked(failure, path, true);
}

private static IOException tryDeleteRecursiveUsingNixCommand(Path path, boolean quietly)
{
String[] cmd = new String[]{ "rm", quietly ? "-rdf" : "-rd", path.toAbsolutePath().toString() };
try
{
if (!quietly && !Files.exists(path))
throw new NoSuchFileException(path.toString());

Process p = Runtime.getRuntime().exec(cmd);
int result = p.waitFor();

Expand All @@ -363,24 +374,39 @@ private static void deleteRecursiveUsingNixCommand(Path path, boolean quietly)
}

if (result != 0 && Files.exists(path))
{
logger.error("{} returned:\nstdout:\n{}\n\nstderr:\n{}", Arrays.toString(cmd), out, err);
throw new IOException(String.format("%s returned non-zero exit code: %d%nstdout:%n%s%n%nstderr:%n%s", Arrays.toString(cmd), result, out, err));
}
return new IOException(String.format("%s returned non-zero exit code: %d%nstdout:%n%s%n%nstderr:%n%s", Arrays.toString(cmd), result, out, err));

onDeletion.accept(path);
return null;
}
catch (IOException e)
{
throw propagateUnchecked(e, path, true);
return e;
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
throw new FSWriteError(e, path);
return new IOException("Interrupted while executing command " + Arrays.toString(cmd), e);
}
}


/**
* Deletes all files and subdirectories under "path".
* @param path file to be deleted
* @return false if the root cannot be deleted
*/
public static boolean tryDeleteRecursive(Path path)
{
if (USE_NIX_RECURSIVE_DELETE.getBoolean() && path.getFileSystem() == java.nio.file.FileSystems.getDefault())
return null == tryDeleteRecursiveUsingNixCommand(path, true);

if (isDirectory(path))
forEach(path, PathUtils::tryDeleteRecursive);

// The directory should now be empty, so now it can be smoked
return tryDelete(path);
}

/**
* Deletes all files and subdirectories under "path".
* @param path file to be deleted
Expand Down
18 changes: 15 additions & 3 deletions src/java/org/apache/cassandra/journal/Flusher.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,20 @@ public void run(Interruptible.State state) throws InterruptedException
}
}

private boolean hasWork()
{
return hasWork(fsyncStartedFor);
}

private boolean hasWork(long lastStartedAt)
{
return fsyncWaitingSince != lastStartedAt;
}

private void awaitWork() throws InterruptedException
{
long lastStartedAt = fsyncStartedFor;
if (fsyncWaitingSince != lastStartedAt)
if (hasWork(lastStartedAt))
return;

awaitingWork = Thread.currentThread();
Expand All @@ -158,7 +168,7 @@ private void awaitWork() throws InterruptedException
throw new InterruptedException();
}

if (fsyncWaitingSince != lastStartedAt)
if (hasWork(lastStartedAt))
break;

LockSupport.park();
Expand All @@ -175,7 +185,9 @@ void notify(Thread notify)

public void doRun(Interruptible.State state) throws InterruptedException
{
awaitWork();
if (state == NORMAL) awaitWork();
else if (!hasWork()) return;

if (fsyncing == null)
fsyncing = journal.oldestActiveSegment();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import accord.topology.Shard;
import accord.topology.Topology;
import accord.utils.Invariants;
import accord.utils.SortedListSet;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import org.agrona.collections.LongArrayList;
Expand Down Expand Up @@ -449,8 +450,7 @@ protected void localSyncComplete(Topology topology, boolean startSync)
epochState.setSyncStatus(SyncStatus.NOTIFYING);
}

// TODO (required): replace with SortedArraySet when it is available
Set<Node.Id> notify = new HashSet<>(topology.nodes());
Set<Node.Id> notify = SortedListSet.allOf(topology.nodes());
notify.remove(localId);
syncPropagator.reportSyncComplete(epoch, notify, localId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,6 @@ public void reserialize(JournalKey key, DurableBeforeAccumulator from, DataOutpu
@Override
public void deserialize(JournalKey journalKey, DurableBeforeAccumulator into, DataInputPlus in, Version userVersion) throws IOException
{
// TODO: maybe using local serializer is not the best call here, but how do we distinguish
// between messaging and disk versioning?
into.update(CommandStoreSerializers.durableBefore.deserialize(in));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public class AccordKeyspace

public static final Set<String> TABLE_NAMES = ImmutableSet.of(COMMANDS_FOR_KEY, JOURNAL);

// TODO (desired): implement a custom type so we can get correct sort order
public static final TupleType TIMESTAMP_TYPE = new TupleType(Lists.newArrayList(LongType.instance, LongType.instance, Int32Type.instance));

private static final ClusteringIndexFilter FULL_PARTITION = new ClusteringIndexNamesFilter(BTreeSet.of(new ClusteringComparator(), Clustering.EMPTY), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ public <P1, P2> void visit(Unseekables<?> keysOrRanges, Timestamp startedBefore,
commandsForRanges.visit(keysOrRanges, startedBefore, testKind, visitor, p1, p2);
}

// TODO (expected): instead of accepting a slice, accept the min/max epoch and let implementation handle it
@Override
public boolean visit(Unseekables<?> keysOrRanges, TxnId testTxnId, Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, Timestamp testStartedAtTimestamp, ComputeIsDep computeIsDep, AllCommandVisitor visit)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestFailureException;
import org.apache.cassandra.journal.Params;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IVerbHandler;
Expand Down Expand Up @@ -475,7 +476,8 @@ private TopologyRange fetchTopologies(long from) throws ExecutionException, Inte
}
catch (Throwable e)
{
logger.info("Failed to fetch epochs [{}, {}] from {}", from, metadata.epoch.getEpoch(), peer);
if (e instanceof RequestFailureException) logger.info("Failed to fetch epochs [{}, {}] from {}", from, metadata.epoch.getEpoch(), peer);
else logger.info("Failed to fetch epochs [{}, {}] from {}", from, metadata.epoch.getEpoch(), peer, e);
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/java/org/apache/cassandra/service/accord/AccordTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ boolean isComplete()
private final String loggingId;
private static final AtomicLong nextLoggingId = new AtomicLong(Clock.Global.currentTimeMillis());

// TODO (expected): merge all of these maps into one
// TODO (desired): merge all of these maps into one
@Nullable Object2ObjectHashMap<TxnId, AccordSafeCommand> commands;
@Nullable Object2ObjectHashMap<RoutingKey, AccordSafeCommandsForKey> commandsForKey;
@Nullable Object2ObjectHashMap<Object, AccordSafeState<?, ?>> loading;
// TODO (expected): collection supporting faster deletes but still fast poll (e.g. some ordered collection)
// TODO (desired): collection supporting faster deletes but still fast poll (e.g. some ordered collection)
@Nullable ArrayDeque<AccordCacheEntry<?, ?>> waitingToLoad;
@Nullable RangeTxnScanner rangeScanner;
boolean hasRanges;
Expand Down Expand Up @@ -662,7 +662,6 @@ public void run()
safeStore = commandStore.begin(this, commandsForRanges);
R result = apply(safeStore);

// TODO (required): currently, we are not very efficient about ensuring that we persist the absolute minimum amount of state. Improve that.
List<Journal.CommandUpdate> changes = null;
if (commands != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

import static accord.local.CommandSummaries.SummaryStatus.NOT_DIRECTLY_WITNESSED;

// TODO (required): move to accord-core, merge with existing logic there
// TODO (expected): move to accord-core, merge with existing logic there
public class CommandsForRanges extends TreeMap<Timestamp, Summary> implements CommandSummaries.ByTxnIdSnapshot
{
public CommandsForRanges(Map<? extends Timestamp, ? extends Summary> m)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ private static ReadDataSerializer serializerFor(ReadType type)

public static final class ReplySerializer<D extends Data> implements IVersionedSerializer<ReadReply>
{
// TODO (expected): use something other than ordinal
final CommitOrReadNack[] nacks = CommitOrReadNack.values();
private final VersionedSerializer<D, Version> dataSerializer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

public class ResultSerializers
{
// TODO (expected): this is meant to encode e.g. whether the transaction's condition met or not for clients to later query
// TODO (desired): this is meant to encode e.g. whether the transaction's condition met or not for clients to later query
public static final Result APPLIED = new Result()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public final T deserialize(DataInputPlus in, Version version) throws IOException
{
TxnId txnId = CommandSerializers.txnId.deserialize(in);
Route<?> scope = KeySerializers.route.deserialize(in);
// TODO: there should be a base epoch
// TODO (desired): there should be a base epoch
long waitForEpoch = in.readUnsignedVInt();
return deserializeBody(in, version, txnId, scope, waitForEpoch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public UpdateParameters updateParameters(TableMetadata metadata, DecoratedKey dk
// For the time being, guardrails are disabled for Accord queries.
ClientState disabledGuardrails = null;

// TODO : How should Accord work with TTL?
int ttl = metadata.params.defaultTimeToLive;
return new RowUpdateParameters(metadata,
disabledGuardrails,
Expand All @@ -103,7 +102,6 @@ private Map<DecoratedKey, Partition> prefetchRow(TableMetadata metadata, Decorat
checkState(data.entrySet().size() == 1, "CAS read should only have one entry");
return ImmutableMap.of(dk, value);
case AUTO_READ:
// TODO (review): Is this the right DK being passed into that matches what we used to store in TxnDataName
if (TxnData.txnDataNameIndex(name) == index)
return ImmutableMap.of(dk, value);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,6 @@ public AsyncChain<Data> read(TableMetadatas tables, ConsistencyLevel consistency
if (command == null)
return AsyncResults.success(TxnData.NOOP_DATA);

// TODO (required, safety): before release, double check reasoning that this is safe
// AccordCommandsForKey cfk = ((SafeAccordCommandStore)safeStore).commandsForKey(key);
// int nowInSeconds = cfk.nowInSecondsFor(executeAt, isForWriteTxn);
// It's fine for our nowInSeconds to lag slightly our insertion timestamp, as to the user
// this simply looks like the transaction witnessed TTL'd data and the data then expired
// immediately after the transaction executed, and this simplifies things a great deal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ public boolean preserveTimestamps()
public Update slice(Ranges ranges)
{
Keys keys = this.keys.slice(ranges);
// TODO: Slice the condition.
// TODO (desired): Slice the condition.
return new TxnUpdate(tables, keys, select(this.keys, keys, fragments), condition, cassandraCommitCL, preserveTimestamps);
}

@Override
public Update intersecting(Participants<?> participants)
{
Keys keys = this.keys.intersecting(participants);
// TODO: Slice the condition.
// TODO (desired): Slice the condition.
return new TxnUpdate(tables, keys, select(this.keys, keys, fragments), condition, cassandraCommitCL, preserveTimestamps);
}

Expand All @@ -201,9 +201,9 @@ private static ByteBuffer[] select(Keys in, Keys out, ByteBuffer[] from)
@Override
public Update merge(Update update)
{
// TODO: special method for linear merging keyed and non-keyed lists simultaneously
TxnUpdate that = (TxnUpdate) update;
Keys mergedKeys = this.keys.with(that.keys);
// TODO (desired): special method for linear merging keyed and non-keyed lists simultaneously
ByteBuffer[] mergedFragments = merge(this.keys, that.keys, this.fragments, that.fragments, mergedKeys.size());
return new TxnUpdate(tables, mergedKeys, mergedFragments, condition, cassandraCommitCL, preserveTimestamps);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.UUID;

import org.junit.Ignore;
import org.junit.Test;

import org.apache.cassandra.auth.CassandraRoleManager;
Expand All @@ -44,7 +43,6 @@
@SuppressWarnings("Convert2MethodRef")
public class HintsMaxSizeTest extends TestBaseImpl
{
@Ignore
@Test
public void testMaxHintedHandoffSize() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
Expand Down Expand Up @@ -106,7 +105,6 @@ public void bulkLoaderSuccessfullyStreamsOverSsl() throws Throwable
assertRows(CLUSTER.get(1).executeInternal("SELECT count(*) FROM ssl_upload_tables.test"), row(42L));
}

@Ignore
@Test
public void bulkLoaderSuccessfullyStreamsOverSslWithDeprecatedSslStoragePort() throws Throwable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ public void runQuery(Cluster cluster, ClusterState clusterState, Operation opera
cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
fail("should fail");
}
catch (Exception ignored) {}
catch (Exception ignored)
{
}

boolean metricBumped = false;
for (int i = 1; i <= cluster.size(); i++)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
Expand All @@ -57,7 +56,6 @@

public class SplitBrainTest extends TestBaseImpl
{
@Ignore
@Test
public void testSplitBrainStartup() throws IOException, TimeoutException
{
Expand Down
2 changes: 1 addition & 1 deletion test/unit/org/apache/cassandra/ServerTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private static void cleanupDirectory(File directory)
{
if (directory.exists())
{
Arrays.stream(directory.tryList()).forEach(File::deleteRecursive);
Arrays.stream(directory.tryList()).forEach(File::tryDeleteRecursive);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
Expand Down Expand Up @@ -110,7 +109,6 @@ public void testSegmentFlaggingWithNonblockingOnCreation() throws Throwable
testWithNonblockingMode(this::testSegmentFlaggingOnCreation0);
}

@Ignore
@Test
public void testNonblockingShouldMaintainSteadyDiskUsage() throws Throwable
{
Expand Down
Loading