Alexis Drai 2 years ago
parent 18b27ddfa0
commit 2ecf54520a

@ -1,25 +1,31 @@
package com.alexisdrai; package com.alexisdrai;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class Listener extends Thread { public class Listener extends Thread {
private final BlockingQueue<Integer> out; private final BlockingQueue<Integer> out;
private final AtomicBoolean running = new AtomicBoolean(false);
public Listener(BlockingQueue<Integer> out) { public Listener(BlockingQueue<Integer> out) {
this.out = out; this.out = out;
} }
public void cleanStop() {
running.set(false);
}
@Override @Override
public void run() { public void run() {
while (!(out.isEmpty())) { running.set(true);
System.out.println("resultat " + out.peek()); while (running.get()) {
// try { try {
// wait(500); System.out.println("resultat " + out.take());
// } catch (InterruptedException e) { } catch (InterruptedException e) {
// Logger.getLogger(Listener.class.getName()).log(Level.SEVERE, null, e); Logger.getLogger(Listener.class.getName()).log(Level.SEVERE, null, e);
// } }
} }
} }
} }

@ -2,50 +2,48 @@ package com.alexisdrai;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.stream.IntStream;
/**
* @author sasa
*/
public class Pipeline { public class Pipeline {
public static final int PIPE_CAPACITY = 10; public static final int PIPE_CAPACITY = 10;
public static final int STAGES_ABOVE_FIRST = 3; public static final int STAGES = 4;
public static void main(String[] argv) { public static void main(String[] argv) {
BlockingQueue<Integer> first, in, out = null; BlockingQueue<Integer> first = null, out = null, in;
first = in = new LinkedBlockingQueue<>(PIPE_CAPACITY);
for (int i = 0; i < STAGES_ABOVE_FIRST; i++) { Stage[] threads = new Stage[STAGES];
out = new LinkedBlockingQueue<>(PIPE_CAPACITY); for (int i = 0; i < STAGES; i++) {
(new Stage(in, out, -1)).start(); if (first == null) {
first = in = new ArrayBlockingQueue<>(PIPE_CAPACITY);
}
else {
in = out; in = out;
} }
(new Listener(out)).start(); out = new ArrayBlockingQueue<>(PIPE_CAPACITY);
threads[i] = new Stage(in, out);
System.out.println(first.isEmpty()); (threads[i]).start();
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex);
} }
first.add(2); // 6
Listener listener = new Listener(out);
(listener).start();
BlockingQueue<Integer> finalFirst = first;
IntStream.of(2, 5, 7, 12, 64, 53, 1, 7).forEach(n -> {
try { try {
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex); Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex);
} }
first.add(6); // 10 finalFirst.add(n);
});
try { listener.cleanStop();
Thread.sleep(1000); for (Stage thread : threads) {
} catch (InterruptedException ex) { thread.cleanStop();
Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex);
} }
first.add(3); // 7
} }
} }

@ -1,6 +1,7 @@
package com.alexisdrai; package com.alexisdrai;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -8,23 +9,21 @@ public class Stage extends Thread {
private final BlockingQueue<Integer> in; private final BlockingQueue<Integer> in;
private final BlockingQueue<Integer> out; private final BlockingQueue<Integer> out;
private int number; private int number;
private final AtomicBoolean running = new AtomicBoolean(false);
public Stage(BlockingQueue<Integer> in, BlockingQueue<Integer> out, int number) { public Stage(BlockingQueue<Integer> in, BlockingQueue<Integer> out) {
this.in = in; this.in = in;
this.out = out; this.out = out;
this.number = number; }
public void cleanStop() {
running.set(false);
} }
@Override @Override
public void run() { public void run() {
while (true) { running.set(true);
// while (in.isEmpty()) { while (running.get()) {
// try {
// wait();
// } catch (InterruptedException e) {
// Logger.getLogger(Stage.class.getName()).log(Level.SEVERE, null, e);
// }
// }
try { try {
number = in.take(); number = in.take();
@ -42,7 +41,9 @@ public class Stage extends Thread {
} catch (InterruptedException e) { } catch (InterruptedException e) {
Logger.getLogger(Stage.class.getName()).log(Level.SEVERE, null, e); Logger.getLogger(Stage.class.getName()).log(Level.SEVERE, null, e);
} }
// out.notifyAll(); synchronized (out) {
out.notifyAll();
}
} }
} }
} }

Loading…
Cancel
Save