Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import io.reactivex.rxjava4.schedulers.Schedulers;

public class SchedulerTest {
public class SchedulerMainTest {
private static final String DRIFT_USE_NANOTIME = "rxjava4.scheduler.use-nanotime";

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,14 @@ public void shouldUnsubscribeAll() throws InterruptedException {

final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Thread t = new Thread() /* NFI */ {
@Override
public void run() {
try {
start.await();
cd.dispose();
} catch (final InterruptedException e) {
fail(e.getMessage());
}
final Thread t = new Thread(() -> {
try {
start.await();
cd.dispose();
} catch (final InterruptedException e) {
fail(e.getMessage());
}
};
});
t.start();
threads.add(t);
}
Expand Down Expand Up @@ -201,17 +198,14 @@ public void unsubscribeIdempotenceConcurrently()

final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Thread t = new Thread() /* NFI */ {
@Override
public void run() {
try {
start.await();
cd.dispose();
} catch (final InterruptedException e) {
fail(e.getMessage());
}
final Thread t = new Thread(() -> {
try {
start.await();
cd.dispose();
} catch (final InterruptedException e) {
fail(e.getMessage());
}
};
});
t.start();
threads.add(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,16 @@ public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcur

final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Thread t = new Thread() /* NFI */ {
@Override
public void run() {
try {
start.await();
serialDisposable.dispose();
} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
end.countDown();
}
final Thread t = new Thread(() -> {
try {
start.await();
serialDisposable.dispose();
} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
end.countDown();
}
};
});
t.start();
threads.add(t);
}
Expand Down Expand Up @@ -174,19 +171,16 @@ public void concurrentSetDisposableShouldNotInterleave()
final Disposable subscription = mock(Disposable.class);
subscriptions.add(subscription);

final Thread t = new Thread() /* NFI */ {
@Override
public void run() {
try {
start.await();
serialDisposable.update(subscription);
} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
end.countDown();
}
final Thread t = new Thread(() -> {
try {
start.await();
serialDisposable.update(subscription);
} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
end.countDown();
}
};
});
t.start();
threads.add(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,16 @@ public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcur

final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Thread t = new Thread() /* NFI */ {
@Override
public void run() {
try {
start.await();
serialDisposable.dispose();
} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
end.countDown();
}
final Thread t = new Thread(() -> {
try {
start.await();
serialDisposable.dispose();
} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
end.countDown();
}
};
});
t.start();
threads.add(t);
}
Expand Down Expand Up @@ -174,19 +171,16 @@ public void concurrentSetDisposableShouldNotInterleave()
final Disposable subscription = mock(Disposable.class);
subscriptions.add(subscription);

final Thread t = new Thread() /* NFI */ {
@Override
public void run() {
try {
start.await();
serialDisposable.set(subscription);
} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
end.countDown();
}
final Thread t = new Thread(() -> {
try {
start.await();
serialDisposable.set(subscription);
} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
end.countDown();
}
};
});
t.start();
threads.add(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,25 @@
public class BlockingFlowableNextTest extends RxJavaTest {

private void fireOnNextInNewThread(final FlowableProcessor<String> o, final String value) {
new Thread() /* NFI */ {
@Override
public void run() {
new Thread(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
o.onNext(value);
}
}.start();
}).start();
}

private void fireOnErrorInNewThread(final FlowableProcessor<String> o) {
new Thread() /* NFI */ {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
o.onError(new TestException());
new Thread(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
}.start();
o.onError(new TestException());
}).start();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ public void disposed() {
@Test
public void noOpConnect() {
final int[] calls = { 0 };
Flowable<Integer> f = new ConnectableFlowable<Integer>() {
Flowable<Integer> f = new ConnectableFlowable<Integer>() /* NFI */ {
@Override
public void connect(Consumer<? super Disposable> connection) {
calls[0]++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ public boolean isDisposed() {

@Test
public void boundedReplayBuffer() {
BoundedReplayBuffer<Integer> buf = new BoundedReplayBuffer<Integer>(true) {
BoundedReplayBuffer<Integer> buf = new BoundedReplayBuffer<Integer>(true) /* NFI */ {
private static final long serialVersionUID = -9081211580719235896L;

@Override
Expand Down Expand Up @@ -995,7 +995,7 @@ public void unsafeChildThrows() {
.doOnNext(_ -> count.getAndIncrement())
.replay().autoConnect();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() /* NFI */ {
@Override
public void onNext(Integer t) {
throw new TestException();
Expand Down Expand Up @@ -1257,7 +1257,7 @@ public void connectConsumerThrows() {
public void badSource() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new Flowable<Integer>() {
new Flowable<Integer>() /* NFI */ {
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
Expand Down Expand Up @@ -1342,7 +1342,7 @@ public void unsubscribeReplayRace() {
public void reentrantOnNext() {
final PublishProcessor<Integer> pp = PublishProcessor.create();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() /* NFI */ {
@Override
public void onNext(Integer t) {
if (t == 1) {
Expand All @@ -1364,7 +1364,7 @@ public void onNext(Integer t) {
public void reentrantOnNextBound() {
final PublishProcessor<Integer> pp = PublishProcessor.create();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() /* NFI */ {
@Override
public void onNext(Integer t) {
if (t == 1) {
Expand All @@ -1386,7 +1386,7 @@ public void onNext(Integer t) {
public void reentrantOnNextCancel() {
final PublishProcessor<Integer> pp = PublishProcessor.create();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() /* NFI */ {
@Override
public void onNext(Integer t) {
if (t == 1) {
Expand All @@ -1408,7 +1408,7 @@ public void onNext(Integer t) {
public void reentrantOnNextCancelBounded() {
final PublishProcessor<Integer> pp = PublishProcessor.create();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() /* NFI */ {
@Override
public void onNext(Integer t) {
if (t == 1) {
Expand Down Expand Up @@ -1517,7 +1517,7 @@ public void sizedTruncation() {
public void delayedUpstreamOnSubscribe() {
final Subscriber<?>[] sub = { null };

new Flowable<Integer>() {
new Flowable<Integer>() /* NFI */ {
@Override
protected void subscribeActual(Subscriber<? super Integer> s) {
sub[0] = s;
Expand Down
Loading