blob: 876474b9124b412fe5b09c8e1876ee56bebc2459 [file] [log] [blame]
* Copyright 2000-2014 JetBrains s.r.o.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package com.intellij.concurrency;
import com.intellij.openapi.application.ApplicationManager;
import com.intellij.openapi.progress.ProcessCanceledException;
import com.intellij.openapi.progress.ProgressIndicator;
import com.intellij.openapi.progress.ProgressManager;
import com.intellij.openapi.progress.util.AbstractProgressIndicatorBase;
import com.intellij.util.Consumer;
import com.intellij.util.IncorrectOperationException;
import com.intellij.util.Processor;
import jsr166e.ForkJoinPool;
import jsr166e.ForkJoinTask;
import jsr166e.ForkJoinWorkerThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
* @author cdr
public class JobLauncherImpl extends JobLauncher {
private static final AtomicLong bits = new AtomicLong();
private static final ForkJoinPool.ForkJoinWorkerThreadFactory FACTORY = new ForkJoinPool.ForkJoinWorkerThreadFactory() {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final int n = addThread();
ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) {
protected void onTermination(Throwable exception) {
thread.setName("JobScheduler FJ pool "+ n +"/"+ JobSchedulerImpl.CORES_COUNT);
return thread;
private int addThread() {
boolean set;
int n;
do {
long l = bits.longValue();
long next = (l + 1) | l;
n = Long.numberOfTrailingZeros(l + 1);
set = bits.compareAndSet(l, next);
} while (!set);
return n;
private void finishThread(int n) {
boolean set;
do {
long l = bits.get();
long next = l & ~(1L << n);
set = bits.compareAndSet(l, next);
} while (!set);
private static final ForkJoinPool pool = new ForkJoinPool(JobSchedulerImpl.CORES_COUNT, FACTORY, null, false);
static final int CORES_FORK_THRESHOLD = 1;
private static <T> boolean invokeConcurrentlyForAll(@NotNull final List<T> things,
boolean runInReadAction,
@NotNull final Processor<? super T> thingProcessor,
@NotNull ProgressIndicator wrapper) throws ProcessCanceledException {
ApplierCompleter applier = new ApplierCompleter(null, runInReadAction, wrapper, things, thingProcessor, 0, things.size(), null);
try {
if (applier.throwable != null) throw applier.throwable;
catch (ApplierCompleter.ComputationAbortedException e) {
return false;
catch (RuntimeException e) {
throw e;
catch (Error e) {
throw e;
catch (Throwable e) {
throw new RuntimeException(e);
assert applier.isDone();
return applier.completeTaskWhichFailToAcquireReadAction();
public <T> boolean invokeConcurrentlyUnderProgress(@NotNull List<? extends T>things,
ProgressIndicator progress,
boolean failFastOnAcquireReadAction,
@NotNull final Processor<T> thingProcessor) throws ProcessCanceledException {
return invokeConcurrentlyUnderProgress(things, progress, ApplicationManager.getApplication().isReadAccessAllowed(),
failFastOnAcquireReadAction, thingProcessor);
public <T> boolean invokeConcurrentlyUnderProgress(@NotNull final List<? extends T> things,
ProgressIndicator progress,
boolean runInReadAction,
boolean failFastOnAcquireReadAction,
@NotNull final Processor<T> thingProcessor) throws ProcessCanceledException {
if (things.isEmpty()) return true;
// supply our own indicator even if we haven't given one - to support cancellation
final ProgressIndicator wrapper = progress == null ? new AbstractProgressIndicatorBase() : new SensitiveProgressWrapper(progress);
if (things.size() <= 1 || JobSchedulerImpl.CORES_COUNT <= CORES_FORK_THRESHOLD) {
final AtomicBoolean result = new AtomicBoolean(true);
ProgressManager.getInstance().executeProcessUnderProgress(new Runnable() {
public void run() {
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < things.size(); i++) {
T thing = things.get(i);
if (!thingProcessor.process(thing)) {
}, wrapper);
return result.get();
return invokeConcurrentlyForAll(things, runInReadAction, thingProcessor, wrapper);
// This implementation is not really async
public <T> AsyncFuture<Boolean> invokeConcurrentlyUnderProgressAsync(@NotNull List<? extends T> things,
ProgressIndicator progress,
boolean failFastOnAcquireReadAction,
@NotNull Processor<T> thingProcessor) {
return AsyncUtil.wrapBoolean(invokeConcurrentlyUnderProgress(things, progress, failFastOnAcquireReadAction, thingProcessor));
public Job<Void> submitToJobThread(int priority, @NotNull final Runnable action, final Consumer<Future> onDoneCallback) {
VoidForkJoinTask task = new VoidForkJoinTask(action, onDoneCallback);
return task;
private static class VoidForkJoinTask extends ForkJoinTask<Void> implements Job<Void> {
private final Runnable myAction;
private final Consumer<Future> myOnDoneCallback;
public VoidForkJoinTask(@NotNull Runnable action, @Nullable Consumer<Future> onDoneCallback) {
myAction = action;
myOnDoneCallback = onDoneCallback;
public Void getRawResult() {
return null;
protected void setRawResult(Void value) {
protected boolean exec() {
try {;
complete(null); // complete manually before calling callback
catch (Throwable throwable) {
finally {
if (myOnDoneCallback != null) {
return true;
//////////////// Job
public String getTitle() {
throw new IncorrectOperationException();
public boolean isCanceled() {
return isCancelled();
public void addTask(@NotNull Callable<Void> task) {
throw new IncorrectOperationException();
public void addTask(@NotNull Runnable task, Void result) {
throw new IncorrectOperationException();
public void addTask(@NotNull Runnable task) {
throw new IncorrectOperationException();
public List<Void> scheduleAndWaitForResults() throws Throwable {
throw new IncorrectOperationException();
public void cancel() {
public void schedule() {
throw new IncorrectOperationException();
public void waitForCompletion(int millis) throws InterruptedException, ExecutionException, TimeoutException, CancellationException {
get(millis, TimeUnit.MILLISECONDS);