001 /*
002 @license.text@
003 */
004
005 package biz.hammurapi.util;
006
007 import java.io.IOException;
008 import java.io.InputStream;
009 import java.io.OutputStream;
010 import java.util.ArrayList;
011 import java.util.List;
012
013 /**
014 *
015 * Copies all data from an input stream to an output stream.
016 * @author Pavel Vlasov
017 * @version $Revision: 1.3 $
018 */
019 public class StreamPumper implements Runnable {
020
021 private final static int SIZE = 1024;
022 private InputStream is;
023 private OutputStream os;
024
025 private ExceptionSink sink;
026
027 private boolean closeStreams=false;
028
029 /**
030 * Create a new stream pumper.
031 *
032 * @param is input stream to read data from
033 * @param os output stream to write data to.
034 */
035 public StreamPumper(InputStream is, OutputStream os, ExceptionSink sink, boolean closeStreams) {
036 this.is = is;
037 this.os = os;
038 this.closeStreams=closeStreams;
039 this.sink=sink;
040 }
041
042 private List listeners=new ArrayList();
043
044 public void addListener(StreamPumpListener listener, int tickSize) {
045 synchronized (listeners) {
046 listeners.add(new StreamPumpListenerEntry(listener, tickSize));
047 }
048 }
049
050 public void removeListener(StreamPumpListener listener) {
051 synchronized (listeners) {
052 listeners.remove(listener);
053 }
054 }
055
056 /**
057 * Copies data from the input stream to the output stream.
058 * Creates a copy of listeners collection before pumping.
059 * addListener() and removeListener() have no effect once pumping has started.
060 * Terminates as soon as the input stream is closed or an error occurs.
061 */
062 public void run() {
063 StreamPumpListenerEntry[] listenersArray;
064 synchronized (listeners) {
065 listenersArray= (StreamPumpListenerEntry[]) listeners.toArray(new StreamPumpListenerEntry[listeners.size()]);
066 }
067
068 for (int i=0; i<listenersArray.length; i++) {
069 listenersArray[i].listener.pumpStarted(this);
070 }
071
072 long counter=0;
073 try {
074 final byte[] buf = new byte[SIZE];
075 int length;
076 while ((length = is.read(buf)) != -1) {
077 os.write(buf, 0, length);
078 counter+=length;
079 for (int i=0; i<listenersArray.length; i++) {
080 listenersArray[i].counter+=length;
081 if (listenersArray[i].counter>=listenersArray[i].tickSize) {
082 listenersArray[i].counter=0;
083 listenersArray[i].listener.tick(this, counter);
084 }
085 }
086 }
087 } catch(IOException e) {
088 handleException(e, listenersArray);
089 } finally {
090 for (int i=0; i<listenersArray.length; i++) {
091 listenersArray[i].listener.pumpFinished(this);
092 }
093 if (closeStreams) {
094 try {
095 is.close();
096 } catch (IOException ie) {
097 handleException(ie, listenersArray);
098 }
099
100 try {
101 os.close();
102 } catch (IOException ie) {
103 handleException(ie, listenersArray);
104 }
105 }
106 }
107 }
108
109 /**
110 * @param e
111 * @param listenersArray
112 */
113 private void handleException(Exception e, StreamPumpListenerEntry[] listenersArray) {
114 for (int i=0; i<listenersArray.length; i++) {
115 listenersArray[i].listener.pumpingError(this, e);
116 }
117
118 if (sink==null) {
119 e.printStackTrace();
120 } else {
121 sink.consume(this, e);
122 }
123 }
124 }