From 2ecf54520ab409d170a1d60685bb9c5c12f09557 Mon Sep 17 00:00:00 2001 From: Alexis DRAI Date: Fri, 3 Feb 2023 18:23:21 +0100 Subject: [PATCH] yay! --- src/com/alexisdrai/Listener.java | 22 +++++++----- src/com/alexisdrai/Pipeline.java | 62 ++++++++++++++++---------------- src/com/alexisdrai/Stage.java | 23 ++++++------ 3 files changed, 56 insertions(+), 51 deletions(-) diff --git a/src/com/alexisdrai/Listener.java b/src/com/alexisdrai/Listener.java index 7b629d4..dd1cbbc 100644 --- a/src/com/alexisdrai/Listener.java +++ b/src/com/alexisdrai/Listener.java @@ -1,25 +1,31 @@ package com.alexisdrai; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; public class Listener extends Thread { private final BlockingQueue out; + private final AtomicBoolean running = new AtomicBoolean(false); public Listener(BlockingQueue out) { this.out = out; } + public void cleanStop() { + running.set(false); + } + @Override - public void run() { - while (!(out.isEmpty())) { - System.out.println("resultat " + out.peek()); -// try { -// wait(500); -// } catch (InterruptedException e) { -// Logger.getLogger(Listener.class.getName()).log(Level.SEVERE, null, e); -// } + public void run() { + running.set(true); + while (running.get()) { + try { + System.out.println("resultat " + out.take()); + } catch (InterruptedException e) { + Logger.getLogger(Listener.class.getName()).log(Level.SEVERE, null, e); + } } } } diff --git a/src/com/alexisdrai/Pipeline.java b/src/com/alexisdrai/Pipeline.java index 457d324..8bdd547 100644 --- a/src/com/alexisdrai/Pipeline.java +++ b/src/com/alexisdrai/Pipeline.java @@ -2,50 +2,48 @@ package com.alexisdrai; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.IntStream; -/** - * @author sasa - */ public class Pipeline { public static final int PIPE_CAPACITY = 10; - public static final int STAGES_ABOVE_FIRST = 3; + public static final int STAGES = 4; public static void main(String[] argv) { - BlockingQueue first, in, out = null; - first = in = new LinkedBlockingQueue<>(PIPE_CAPACITY); - for (int i = 0; i < STAGES_ABOVE_FIRST; i++) { - out = new LinkedBlockingQueue<>(PIPE_CAPACITY); - (new Stage(in, out, -1)).start(); - in = out; - } - - (new Listener(out)).start(); + BlockingQueue first = null, out = null, in; - System.out.println(first.isEmpty()); - - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex); - } - first.add(2); // 6 + Stage[] threads = new Stage[STAGES]; + for (int i = 0; i < STAGES; i++) { + if (first == null) { + first = in = new ArrayBlockingQueue<>(PIPE_CAPACITY); + } + else { + in = out; + } - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex); + out = new ArrayBlockingQueue<>(PIPE_CAPACITY); + threads[i] = new Stage(in, out); + (threads[i]).start(); } - first.add(6); // 10 - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex); + Listener listener = new Listener(out); + (listener).start(); + + BlockingQueue finalFirst = first; + IntStream.of(2, 5, 7, 12, 64, 53, 1, 7).forEach(n -> { + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex); + } + finalFirst.add(n); + }); + + listener.cleanStop(); + for (Stage thread : threads) { + thread.cleanStop(); } - first.add(3); // 7 } } \ No newline at end of file diff --git a/src/com/alexisdrai/Stage.java b/src/com/alexisdrai/Stage.java index 365eb9b..92a70e6 100644 --- a/src/com/alexisdrai/Stage.java +++ b/src/com/alexisdrai/Stage.java @@ -1,6 +1,7 @@ package com.alexisdrai; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -8,23 +9,21 @@ public class Stage extends Thread { private final BlockingQueue in; private final BlockingQueue out; private int number; + private final AtomicBoolean running = new AtomicBoolean(false); - public Stage(BlockingQueue in, BlockingQueue out, int number) { + public Stage(BlockingQueue in, BlockingQueue out) { this.in = in; this.out = out; - this.number = number; + } + + public void cleanStop() { + running.set(false); } @Override public void run() { - while (true) { -// while (in.isEmpty()) { -// try { -// wait(); -// } catch (InterruptedException e) { -// Logger.getLogger(Stage.class.getName()).log(Level.SEVERE, null, e); -// } -// } + running.set(true); + while (running.get()) { try { number = in.take(); @@ -42,7 +41,9 @@ public class Stage extends Thread { } catch (InterruptedException e) { Logger.getLogger(Stage.class.getName()).log(Level.SEVERE, null, e); } -// out.notifyAll(); + synchronized (out) { + out.notifyAll(); + } } } }