我正在编写一个程序,该程序使用多线程概念来实现Java中的Producer Consumer问题.以下是一些我应该怎么做的详细信息:
I’m writing a program that implements the Producer Consumer problem in Java using multithreading concepts. Below are few details how I’m supposed to do it:
1)主线程应创建一个缓冲区,其容量指定为命令行参数.生产者线程和使用者线程的数量也指定为命令行参数.我应该为每个生产者和使用者线程分配一个唯一的编号.如何为生产者线程和使用者线程分配唯一编号?
1) The main thread should create a buffer with capacity specified as a command line argument. The number of producer and consumer threads are also specified as command line arguments. I’m supposed to assign a unique number to each producer and consumer thread. How do I assign a unique number to producer and consumer threads?
2)生产者线程在无限循环中运行.它产生具有以下格式的数据项(字符串):<producer number>_<data item number>.例如,线程1的第一个数据项将是1_1,线程3的第二个数据项将是3_2.如何以这种格式创建数据项?
2) The producer thread operates in an infinite loop. It produces a data item (a string) with the following format: <producer number>_<data item number>. For example the 1st data item from thread number 1 will be 1_1 and second data item from thread number 3 will be 3_2. How do create data items in such a format?
3)然后,生产者线程将一个条目写入生产者日志文件("Generated" <data item>).写入日志条目后,它将尝试插入缓冲区.如果插入成功,它将在日志文件中创建一个条目(<producer number> <data item>插入成功").我该如何编写这样的代码?
3) Then the Producer thread writes an entry into the producer log file (< producer number > "Generated" <data item>). Upon writing the log entry, it attempts to insert into the buffer. If insertion is successful, it creates an entry into the log file (<producer number> <data item> "Insertion successful"). How do I write such a code?
下面是我编写的Java代码.
Below is the Java code I wrote.
import java.util.*; import java.util.logging.*; public class PC2 { public static void main(String args[]) { ArrayList<Integer> queue = new ArrayList<Integer>(); int size = Integer.parseInt(args[2]); Thread[] prod = new Thread[Integer.parseInt(args[0])]; Thread[] cons = new Thread[Integer.parseInt(args[1])]; for(int i=0; i<prod.length; i++) { prod[i] = new Thread(new Producer(queue, size)); prod[i].start(); } for(int i=0; i<cons.length; i++) { cons[i] = new Thread(new Consumer(queue, size)); cons[i].start(); } } } class Producer extends Thread { private final ArrayList<Integer> queue; private final int size; public Producer(ArrayList<Integer> queue, int size) { this.queue = queue; this.size = size; } public void run() { while(true){ for(int i=0; i<size; i++) { System.out.println("Produced: "+i+" by id " +Thread.currentThread().getId()); try { produce(i); Thread.sleep(3000); } catch(Exception e) { Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, e); } }} } public void produce(int i) throws InterruptedException { while(queue.size() == size) { synchronized(queue) { System.out.println("Queue is full "+Thread.currentThread().getName() +" is waiting, size: "+queue.size()); queue.wait(); } } synchronized(queue) { queue.add(i); queue.notifyAll(); } } } class Consumer extends Thread { private final ArrayList<Integer> queue; private final int size; public Consumer(ArrayList<Integer> queue, int size) { this.queue = queue; this.size = size; } public void run() { while(true) { try { System.out.println("Consumed: "+consume()); Thread.sleep(1000); } catch(Exception e) { Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, e); } } } public int consume() throws InterruptedException { while(queue.isEmpty()) { synchronized(queue) { System.out.println("Queue is empty "+Thread.currentThread().getName()+" is waiting, size: "+queue.size()); queue.wait(); } } synchronized (queue) { queue.notifyAll(); System.out.println("Consumed by id "+Thread.currentThread().getId()); return (Integer) queue.remove(0); } } }如何执行上述步骤?
推荐答案对于生产者消费者问题,最佳解决方案是BlockingQueue.我正在测试一些东西,因此设计相同的程序,现在可以根据您的需要对其进行修改.
For producer consumer problem best solution is BlockingQueue. I was testing a few things so designed same kind of program now modified it as per your need.
看看是否有帮助.
import java.util.concurrent.*; public class ThreadingExample { public static void main(String args[]){ BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<Message>(100); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Producer(blockingQueue)); exec.execute(new Consumer(blockingQueue)); } } class Message{ private static int count=0; int messageId; Message(){ this.messageId=count++; System.out.print("message Id"+messageId+" Created "); } } class Producer implements Runnable{ private BlockingQueue<Message> blockingQueue; Producer(BlockingQueue<Message> blockingQueue){ this.blockingQueue=blockingQueue; } @Override public void run(){ while(!Thread.interrupted()){ System.out.print("Producer Started"); try { blockingQueue.put(new Message()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Producer Done"); } } } class Consumer implements Runnable{ private BlockingQueue<Message> blockingQueue; Consumer(BlockingQueue<Message> blockingQueue){ this.blockingQueue=blockingQueue; } @Override public void run(){ while(!Thread.interrupted()){ System.out.print("Concumer Started"); try{ Message message = blockingQueue.take(); System.out.print("message Id"+message.messageId+" Consumed "); } catch(InterruptedException e){ e.printStackTrace(); } System.out.println("Concumer Done"); } } }更多推荐
生产者使用者使用线程
发布评论