blob: ffcb0870dc3c7fab8b6330fc19a80e6a63c92404 [file] [log] [blame]
/*
* Copyright 2000-2014 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.intellij.util.concurrency;
import com.intellij.openapi.application.Application;
import com.intellij.openapi.application.ApplicationManager;
import com.intellij.openapi.application.ModalityState;
import com.intellij.openapi.diagnostic.Logger;
import com.intellij.openapi.progress.ProcessCanceledException;
import com.intellij.openapi.util.Condition;
import com.intellij.openapi.util.Conditions;
import com.intellij.util.Consumer;
import com.intellij.util.PairConsumer;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
/**
* <p>QueueProcessor processes elements which are being added to a queue via {@link #add(Object)} and {@link #addFirst(Object)} methods.</p>
* <p>Elements are processed one by one in a special single thread.
* The processor itself is passed in the constructor and is called from that thread.
* By default processing starts when the first element is added to the queue, though there is an 'autostart' option which holds
* the processor until {@link #start()} is called.</p>
*
* @param <T> type of queue elements.
*/
public class QueueProcessor<T> {
private static final Logger LOG = Logger.getInstance("#com.intellij.util.concurrency.QueueProcessor");
public static enum ThreadToUse {
AWT,
POOLED
}
private final PairConsumer<T, Runnable> myProcessor;
private final Deque<T> myQueue = new ArrayDeque<T>();
private final Runnable myContinuationContext = new Runnable() {
@Override
public void run() {
synchronized (myQueue) {
isProcessing = false;
if (myQueue.isEmpty()) {
myQueue.notifyAll();
}
else {
startProcessing();
}
}
}
};
private boolean isProcessing;
private boolean myStarted;
private final ThreadToUse myThreadToUse;
private final Condition<?> myDeathCondition;
private final Map<MyOverrideEquals, ModalityState> myModalityState = new HashMap<MyOverrideEquals, ModalityState>();
/**
* Constructs a QueueProcessor, which will autostart as soon as the first element is added to it.
*/
public QueueProcessor(@NotNull Consumer<T> processor) {
this(processor, Conditions.alwaysFalse());
}
/**
* Constructs a QueueProcessor, which will autostart as soon as the first element is added to it.
*/
public QueueProcessor(@NotNull Consumer<T> processor, @NotNull Condition<?> deathCondition) {
this(processor, deathCondition, true);
}
public QueueProcessor(@NotNull Consumer<T> processor, @NotNull Condition<?> deathCondition, boolean autostart) {
this(wrappingProcessor(processor), autostart, ThreadToUse.POOLED, deathCondition);
}
@NotNull
public static QueueProcessor<Runnable> createRunnableQueueProcessor() {
return new QueueProcessor<Runnable>(new RunnableConsumer());
}
@NotNull
public static QueueProcessor<Runnable> createRunnableQueueProcessor(ThreadToUse threadToUse) {
return new QueueProcessor<Runnable>(wrappingProcessor(new RunnableConsumer()), true, threadToUse, Conditions.FALSE);
}
@NotNull
private static <T> PairConsumer<T, Runnable> wrappingProcessor(@NotNull final Consumer<T> processor) {
return new PairConsumer<T, Runnable>() {
@Override
public void consume(final T item, Runnable runnable) {
runSafely(new Runnable() {
@Override
public void run() {
processor.consume(item);
}
});
runnable.run();
}
};
}
/**
* Constructs a QueueProcessor with the given processor and autostart setting.
* By default QueueProcessor starts processing when it receives the first element. Pass <code>false</code> to alternate its behavior.
*
* @param processor processor of queue elements.
* @param autostart if <code>true</code> (which is by default), the queue will be processed immediately when it receives the first element.
* If <code>false</code>, then it will wait for the {@link #start()} command.
* After QueueProcessor has started once, autostart setting doesn't matter anymore: all other elements will be processed immediately.
*/
public QueueProcessor(@NotNull PairConsumer<T, Runnable> processor,
boolean autostart,
@NotNull ThreadToUse threadToUse,
@NotNull Condition<?> deathCondition) {
myProcessor = processor;
myStarted = autostart;
myThreadToUse = threadToUse;
myDeathCondition = deathCondition;
}
/**
* Starts queue processing if it hasn't started yet.
* Effective only if the QueueProcessor was created with no-autostart option: otherwise processing will start as soon as the first element
* is added to the queue.
* If there are several elements in the queue, processing starts from the first one.
*/
public void start() {
synchronized (myQueue) {
if (myStarted) return;
myStarted = true;
if (!myQueue.isEmpty()) {
startProcessing();
}
}
}
public void add(@NotNull T t, ModalityState state) {
synchronized (myQueue) {
myModalityState.put(new MyOverrideEquals(t), state);
}
doAdd(t, false);
}
public void add(@NotNull T element) {
doAdd(element, false);
}
public void addFirst(@NotNull T element) {
doAdd(element, true);
}
private void doAdd(@NotNull T element, boolean atHead) {
synchronized (myQueue) {
if (atHead) {
myQueue.addFirst(element);
}
else {
myQueue.add(element);
}
startProcessing();
}
}
public void clear() {
synchronized (myQueue) {
myQueue.clear();
}
}
public void waitFor() {
synchronized (myQueue) {
while (isProcessing) {
try {
myQueue.wait();
}
catch (InterruptedException e) {
//ok
}
}
}
}
private boolean startProcessing() {
LOG.assertTrue(Thread.holdsLock(myQueue));
if (isProcessing || !myStarted) {
return false;
}
isProcessing = true;
final T item = myQueue.removeFirst();
final Runnable runnable = new Runnable() {
@Override
public void run() {
if (myDeathCondition.value(null)) return;
runSafely(new Runnable() {
@Override
public void run() {
myProcessor.consume(item, myContinuationContext);
}
});
}
};
final Application application = ApplicationManager.getApplication();
if (myThreadToUse == ThreadToUse.AWT) {
final ModalityState state = myModalityState.remove(new MyOverrideEquals(item));
if (state != null) {
application.invokeLater(runnable, state);
}
else {
application.invokeLater(runnable);
}
}
else {
application.executeOnPooledThread(runnable);
}
return true;
}
public static void runSafely(@NotNull Runnable run) {
try {
run.run();
}
catch (ProcessCanceledException e) {
throw e;
}
catch (Throwable e) {
try {
LOG.error(e);
}
catch (Throwable e2) {
//noinspection CallToPrintStackTrace
e2.printStackTrace();
}
}
}
public boolean isEmpty() {
synchronized (myQueue) {
return myQueue.isEmpty() && !isProcessing;
}
}
/**
* Removes several last tasks in the queue, leaving only {@code remaining} amount of them, counted from the head of the queue.
*/
public void dismissLastTasks(int remaining) {
synchronized (myQueue) {
while (myQueue.size() > remaining) {
myQueue.pollLast();
}
}
}
public boolean hasPendingItemsToProcess() {
synchronized (myQueue) {
return !myQueue.isEmpty();
}
}
private static class MyOverrideEquals {
private final Object myDelegate;
private MyOverrideEquals(@NotNull Object delegate) {
myDelegate = delegate;
}
@Override
public int hashCode() {
return myDelegate.hashCode();
}
@Override
public boolean equals(Object obj) {
return ((MyOverrideEquals)obj).myDelegate == myDelegate;
}
}
public static final class RunnableConsumer implements Consumer<Runnable> {
@Override
public void consume(Runnable runnable) {
runnable.run();
}
}
}