From 18b27ddfa06c2acf65e23badd53f00a25f5fd6c9 Mon Sep 17 00:00:00 2001 From: Alexis DRAI Date: Fri, 3 Feb 2023 17:33:37 +0100 Subject: [PATCH] ugh --- src/com/alexisdrai/Listener.java | 25 ++++++++++++++++ src/com/alexisdrai/Main.java | 8 ----- src/com/alexisdrai/Pipeline.java | 51 ++++++++++++++++++++++++++++++++ src/com/alexisdrai/Stage.java | 48 ++++++++++++++++++++++++++++++ 4 files changed, 124 insertions(+), 8 deletions(-) create mode 100644 src/com/alexisdrai/Listener.java delete mode 100644 src/com/alexisdrai/Main.java create mode 100644 src/com/alexisdrai/Pipeline.java create mode 100644 src/com/alexisdrai/Stage.java diff --git a/src/com/alexisdrai/Listener.java b/src/com/alexisdrai/Listener.java new file mode 100644 index 0000000..7b629d4 --- /dev/null +++ b/src/com/alexisdrai/Listener.java @@ -0,0 +1,25 @@ +package com.alexisdrai; + +import java.util.concurrent.BlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class Listener extends Thread { + private final BlockingQueue out; + + public Listener(BlockingQueue out) { + this.out = out; + } + + @Override + public void run() { + while (!(out.isEmpty())) { + System.out.println("resultat " + out.peek()); +// try { +// wait(500); +// } catch (InterruptedException e) { +// Logger.getLogger(Listener.class.getName()).log(Level.SEVERE, null, e); +// } + } + } +} diff --git a/src/com/alexisdrai/Main.java b/src/com/alexisdrai/Main.java deleted file mode 100644 index d294541..0000000 --- a/src/com/alexisdrai/Main.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.alexisdrai; - -public class Main { - - public static void main(String[] args) { - // write your code here - } -} diff --git a/src/com/alexisdrai/Pipeline.java b/src/com/alexisdrai/Pipeline.java new file mode 100644 index 0000000..457d324 --- /dev/null +++ b/src/com/alexisdrai/Pipeline.java @@ -0,0 +1,51 @@ +package com.alexisdrai; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author sasa + */ +public class Pipeline { + public static final int PIPE_CAPACITY = 10; + public static final int STAGES_ABOVE_FIRST = 3; + + public static void main(String[] argv) { + + BlockingQueue first, in, out = null; + first = in = new LinkedBlockingQueue<>(PIPE_CAPACITY); + for (int i = 0; i < STAGES_ABOVE_FIRST; i++) { + out = new LinkedBlockingQueue<>(PIPE_CAPACITY); + (new Stage(in, out, -1)).start(); + in = out; + } + + (new Listener(out)).start(); + + System.out.println(first.isEmpty()); + + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex); + } + first.add(2); // 6 + + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex); + } + first.add(6); // 10 + + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Logger.getLogger(Pipeline.class.getName()).log(Level.SEVERE, null, ex); + } + first.add(3); // 7 + } +} \ No newline at end of file diff --git a/src/com/alexisdrai/Stage.java b/src/com/alexisdrai/Stage.java new file mode 100644 index 0000000..365eb9b --- /dev/null +++ b/src/com/alexisdrai/Stage.java @@ -0,0 +1,48 @@ +package com.alexisdrai; + +import java.util.concurrent.BlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class Stage extends Thread { + private final BlockingQueue in; + private final BlockingQueue out; + private int number; + + public Stage(BlockingQueue in, BlockingQueue out, int number) { + this.in = in; + this.out = out; + this.number = number; + } + + @Override + public void run() { + while (true) { +// while (in.isEmpty()) { +// try { +// wait(); +// } catch (InterruptedException e) { +// Logger.getLogger(Stage.class.getName()).log(Level.SEVERE, null, e); +// } +// } + try { + + number = in.take(); + + } catch (InterruptedException e) { + Logger.getLogger(Stage.class.getName()).log(Level.SEVERE, null, e); + } + + number++; + + try { + + out.put(number); + + } catch (InterruptedException e) { + Logger.getLogger(Stage.class.getName()).log(Level.SEVERE, null, e); + } +// out.notifyAll(); + } + } +}