1 module dlangide.builders.extprocess;
2 
3 import dlangui.core.logger;
4 
5 import std.process;
6 import std.stdio;
7 import std.utf;
8 import std.stdio;
9 import core.thread;
10 import core.sync.mutex;
11 
12 /// interface to forward process output to
13 interface TextWriter {
14     /// log lines
15     void writeText(dstring text);
16 }
17 
18 /// interface to read text
19 interface TextReader {
20     /// log lines
21     dstring readText();
22 }
23 
24 /// protected text storage box to read and write text from different threads
25 class ProtectedTextStorage : TextReader, TextWriter {
26 
27     private Mutex _mutex;
28     private shared bool _closed;
29     private dchar[] _buffer;
30 
31     this() {
32         _mutex = new Mutex();
33     }
34 
35     @property bool closed() { return _closed; }
36 
37     void close() {
38         if (_closed)
39             return;
40         _closed = true;
41         _buffer = null;
42     }
43 
44     /// log lines
45     override void writeText(dstring text) {
46         if (!_closed) {
47             // if not closed
48             _mutex.lock();
49             scope(exit) _mutex.unlock();
50             // append text
51             _buffer ~= text;
52         }
53     }
54 
55     /// log lines
56     override dstring readText() {
57         if (!_closed) {
58             // if not closed
59             _mutex.lock();
60             scope(exit) _mutex.unlock();
61             if (!_buffer.length)
62                 return null;
63             dstring res = _buffer.dup;
64             _buffer = null;
65             return res;
66         } else {
67             // reading from closed
68             return null;
69         }
70     }
71 }
72 
73 enum ExternalProcessState : uint {
74     /// not initialized
75     None,
76     /// running
77     Running,
78     /// stop is requested
79     Stopping,
80     /// stopped
81     Stopped,
82     /// error occured, e.g. cannot run process
83     Error
84 }
85 
86 /// base class for text reading from std.stdio.File in background thread
87 class BackgroundReaderBase : Thread {
88     private std.stdio.File _file;
89     private shared bool _finished;
90     private ubyte[1] _byteBuffer;
91     private ubyte[] _bytes;
92     dchar[] _textbuffer;
93     private int _len;
94     private bool _utfError;
95 
96     this(std.stdio.File f) {
97         super(&run);
98         assert(f.isOpen());
99         _file = f;
100         _len = 0;
101         _finished = false;
102     }
103 
104     @property bool finished() {
105         return _finished;
106     }
107 
108     ubyte prevchar;
109     void addByte(ubyte data) {
110         if (_bytes.length < _len + 1)
111             _bytes.length = _bytes.length ? _bytes.length * 2 : 1024;
112         bool eolchar = (data == '\r' || data == '\n');
113         bool preveol = (prevchar == '\r' || prevchar == '\n');
114         if ((eolchar && !preveol) || (!eolchar && preveol)) {
115             //Log.d("Flushing for prevChar=", prevchar, " newChar=", data);
116             flush();
117         }
118         _bytes[_len++] = data;
119         prevchar = data;
120     }
121     void flush() {
122         if (!_len)
123             return;
124         if (_textbuffer.length < _len)
125             _textbuffer.length = _len + 256;
126         size_t count = 0;
127         for(size_t i = 0; i < _len;) {
128             dchar ch = 0;
129             if (_utfError) {
130                 ch = _bytes[i++];
131             } else {
132                 try {
133                     ch = decode(cast(string)_bytes, i);
134                 } catch (UTFException e) {
135                     _utfError = true;
136                     ch = _bytes[i++];
137                     Log.d("non-unicode characters found in output of process");
138                 }
139             }
140             _textbuffer[count++] = ch;
141         }
142         _len = 0;
143 
144         if (!count)
145             return;
146 
147         // fix line endings - must be '\n'
148         count = convertLineEndings(_textbuffer[0..count]);
149 
150         // data is ready to send
151         if (count)
152             sendResult(_textbuffer[0..count].dup);
153     }
154     /// inplace convert line endings to unix format (\n)
155     size_t convertLineEndings(dchar[] text) {
156         size_t src = 0;
157         size_t dst = 0;
158         for(;src < text.length;) {
159             dchar ch = text[src++];
160             dchar nextch = src < text.length ? text[src] : 0;
161             if (ch == '\n') {
162                 if (nextch == '\r')
163                     src++;
164                 text[dst++] = '\n';
165             } else if (ch == '\r') {
166                 if (nextch == '\n')
167                     src++;
168                 text[dst++] = '\n';
169             } else {
170                 text[dst++] = ch;
171             }
172         }
173         return dst;
174     }
175     protected void sendResult(dstring text) {
176         // override to deal with ready data
177     }
178 
179     protected void handleFinish() {
180         // override to do something when thread is finishing
181     }
182 
183     private void run() {
184         //Log.d("BackgroundReaderBase run() enter");
185         // read file by bytes
186         try {
187             version (Windows) {
188                 import win32.windows;
189                 // separate version for windows as workaround for hanging rawRead
190                 HANDLE h = _file.windowsHandle;
191                 DWORD bytesRead = 0;
192                 DWORD err;
193                 for (;;) {
194                     BOOL res = ReadFile(h, _byteBuffer.ptr, 1, &bytesRead, null);
195                     if (res) {
196                         if (bytesRead == 1)
197                             addByte(_byteBuffer[0]);
198                     } else {
199                         err = GetLastError();
200                         if (err == ERROR_MORE_DATA) {
201                             if (bytesRead == 1)
202                                 addByte(_byteBuffer[0]);
203                             continue;
204                         }
205                         //if (err == ERROR_BROKEN_PIPE || err = ERROR_INVALID_HANDLE)
206                         break;
207                     }
208                 }
209             } else {
210                 for (;;) {
211                     //Log.d("BackgroundReaderBase run() reading file");
212                     if (_file.eof)
213                         break;
214                     ubyte[] r = _file.rawRead(_byteBuffer);
215                     if (!r.length)
216                         break;
217                     //Log.d("BackgroundReaderBase run() read byte: ", r[0]);
218                     addByte(r[0]);
219                 }
220             }
221             _file.close();
222             flush();
223             //Log.d("BackgroundReaderBase run() closing file");
224             //Log.d("BackgroundReaderBase run() file closed");
225         } catch (Exception e) {
226             //Log.e("Exception occured while reading stream: ", e);
227         }
228         handleFinish();
229         _finished = true;
230         //Log.d("BackgroundReaderBase run() exit");
231     }
232 
233     void waitForFinish() {
234         static if (false) {
235             while (isRunning && !_finished)
236                 Thread.sleep( dur!("msecs")( 10 ) );
237         } else {
238             join(false);
239         }
240     }
241 
242 }
243 
244 /// reader which sends output text to TextWriter (warning: call will be made from background thread)
245 class BackgroundReader : BackgroundReaderBase {
246     protected TextWriter _destination;
247     this(std.stdio.File f, TextWriter destination) {
248         super(f);
249         assert(destination);
250         _destination = destination;
251     }
252     override protected void sendResult(dstring text) {
253         // override to deal with ready data
254         _destination.writeText(text);
255     }
256     override protected void handleFinish() {
257         // remove link to destination to help GC
258         _destination = null;
259     }
260 }
261 
262 /// runs external process, catches output, allows to stop
263 class ExternalProcess {
264 
265     protected char[][] _args;
266     protected char[] _workDir;
267     protected char[] _program;
268     protected string[string] _env;
269     protected TextWriter _stdout;
270     protected TextWriter _stderr;
271     protected BackgroundReader _stdoutReader;
272     protected BackgroundReader _stderrReader;
273     protected ProcessPipes _pipes;
274     protected ExternalProcessState _state;
275 
276     protected int _result;
277 
278     @property ExternalProcessState state() { return _state; }
279     /// returns process result for stopped process
280     @property int result() { return _result; }
281 
282     this() {
283     }
284 
285     ExternalProcessState run(string program, string[]args, string dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
286         char[][] arguments;
287         foreach(a; args)
288             arguments ~= a.dup;
289         return run(program.dup, arguments, dir.dup, stdoutTarget, stderrTarget);
290     }
291     ExternalProcessState run(char[] program, char[][]args, char[] dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
292         Log.d("ExternalProcess.run ", program, " ", args);
293         _state = ExternalProcessState.None;
294         _program = program;
295         _args = args;
296         _workDir = dir;
297         _stdout = stdoutTarget;
298         _stderr = stderrTarget;
299         _result = 0;
300         assert(_stdout);
301         Redirect redirect;
302         char[][] params;
303         params ~= _program;
304         params ~= _args;
305         if (!_stderr)
306             redirect = Redirect.stdout | Redirect.stderrToStdout | Redirect.stdin;
307         else
308             redirect = Redirect.all;
309         Log.i("Trying to run program ", _program, " with args ", _args);
310         try {
311             _pipes = pipeProcess(params, redirect, _env, Config.suppressConsole, _workDir);
312             _state = ExternalProcessState.Running;
313             // start readers
314             _stdoutReader = new BackgroundReader(_pipes.stdout, _stdout);
315             _stdoutReader.start();
316             if (_stderr) {
317                 _stderrReader = new BackgroundReader(_pipes.stderr, _stderr);
318                 _stderrReader.start();
319             }
320         } catch (ProcessException e) {
321             Log.e("Cannot run program ", _program, " ", e);
322         } catch (std.stdio.StdioException e) {
323             Log.e("Cannot redirect streams for program ", _program, " ", e);
324         }
325         return _state;
326     }
327 
328     protected void waitForReadingCompletion() {
329         try {
330             if (_stdoutReader && !_stdoutReader.finished) {
331                 _pipes.stdout.detach();
332                 //Log.d("waitForReadingCompletion - waiting for stdout");
333                 _stdoutReader.waitForFinish();
334                 //Log.d("waitForReadingCompletion - joined stdout");
335             }
336             _stdoutReader = null;
337         } catch (Exception e) {
338             Log.e("Exception while waiting for stdout reading completion for ", _program, " ", e);
339         }
340         try {
341             if (_stderrReader && !_stderrReader.finished) {
342                 _pipes.stderr.detach();
343                 //Log.d("waitForReadingCompletion - waiting for stderr");
344                 _stderrReader.waitForFinish();
345                 _stderrReader = null;
346                 //Log.d("waitForReadingCompletion - joined stderr");
347             }
348         } catch (Exception e) {
349             Log.e("Exception while waiting for stderr reading completion for ", _program, " ", e);
350         }
351         //Log.d("waitForReadingCompletion - done");
352     }
353 
354     /// polls all available output from process streams
355     ExternalProcessState poll() {
356         //Log.d("ExternalProcess.poll state = ", _state);
357         bool res = true;
358         if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
359             return _state;
360         // check for process finishing
361         try {
362             auto pstate = std.process.tryWait(_pipes.pid);
363             if (pstate.terminated) {
364                 _state = ExternalProcessState.Stopped;
365                 _result = pstate.status;
366                 waitForReadingCompletion();
367             }
368         } catch (Exception e) {
369             Log.e("Exception while waiting for process ", _program);
370             _state = ExternalProcessState.Error;
371         }
372         return _state;
373     }
374 
375     /// waits until termination
376     ExternalProcessState wait() {
377         Log.d("ExternalProcess.wait");
378         if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
379             return _state;
380         try {
381             _result = std.process.wait(_pipes.pid);
382             _state = ExternalProcessState.Stopped;
383             waitForReadingCompletion();
384         } catch (Exception e) {
385             Log.e("Exception while waiting for process ", _program);
386             _state = ExternalProcessState.Error;
387         }
388         return _state;
389     }
390 
391     /// request process stop
392     ExternalProcessState kill() {
393         Log.d("ExternalProcess.kill");
394         if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
395             return _state;
396         if (_state == ExternalProcessState.Running) {
397             std.process.kill(_pipes.pid);
398             _state = ExternalProcessState.Stopping;
399         }
400         return _state;
401     }
402 
403     bool write(string data) {
404         if(_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped) {
405             return false;
406         }
407         else {
408             Log.d("writing ", data.length, " characters to stdin");
409             _pipes.stdin.write("", data);
410 			_pipes.stdin.flush();
411             //_pipes.stdin.close();
412 			return true;
413         }
414     }
415 }