1 module dlangide.builders.extprocess;
2 
3 import dlangui.core.logger;
4 
5 import std.process;
6 import std.stdio;
7 import std.utf;
8 import std.stdio;
9 import core.thread;
10 import core.sync.mutex;
11 import dlangui.core.files;
12 
/// Sink interface: receives text forwarded from a process output stream.
interface TextWriter {
    /// Accepts one chunk of text; what is done with it is up to the implementation.
    void writeText(dstring text);
}
18 
/// Source interface: hands out text on request.
interface TextReader {
    /// Returns the text currently available from the implementation.
    dstring readText();
}
24 
/// Mutex-protected text box used to pass text between threads.
///
/// Writers append text with writeText(); readers drain everything accumulated
/// so far with readText(). After close() the box discards writes and readText()
/// always returns null.
class ProtectedTextStorage : TextReader, TextWriter {

    /// guards _buffer and the closed transition
    private Mutex _mutex;
    /// set once by close(); never reset
    private shared bool _closed;
    /// text accumulated since the last readText()
    private dchar[] _buffer;

    this() {
        _mutex = new Mutex();
    }

    /// true once close() has been called
    @property bool closed() { return _closed; }

    /// Marks the storage as closed and drops any text that was not read yet.
    /// Calling it again is a no-op.
    void close() {
        // Take the lock so that closing cannot interleave with an append or a
        // drain that is in progress on another thread.
        _mutex.lock();
        scope(exit) _mutex.unlock();
        if (_closed)
            return;
        _closed = true;
        _buffer = null;
    }

    /// Appends text to the buffer; ignored when the storage is closed.
    override void writeText(dstring text) {
        _mutex.lock();
        scope(exit) _mutex.unlock();
        // The closed flag is checked under the lock: checking it before
        // locking would allow an append to land after close() cleared the buffer.
        if (_closed)
            return;
        _buffer ~= text;
    }

    /// Returns all text accumulated since the previous call and empties the
    /// buffer. Returns null when nothing is buffered or the storage is closed.
    override dstring readText() {
        _mutex.lock();
        scope(exit) _mutex.unlock();
        if (_closed || !_buffer.length)
            return null;
        dstring res = _buffer.idup;
        _buffer = null;
        return res;
    }
}
73 
/// Lifecycle states reported for an external process.
enum ExternalProcessState : uint {
    /// not initialized
    None = 0,
    /// running
    Running = 1,
    /// stop is requested
    Stopping = 2,
    /// stopped
    Stopped = 3,
    /// error occurred, e.g. cannot run process
    Error = 4
}
86 
/// Base class for reading text from a std.stdio.File in a background thread.
///
/// The thread reads the file one byte at a time, collects bytes until a '\n'
/// is seen (or the stream ends), decodes the collected bytes as UTF-8,
/// normalizes line endings to '\n' and passes the text to sendResult().
/// If invalid UTF-8 is met, the reader switches permanently to mapping each
/// byte to the code point with the same value.
class BackgroundReaderBase : Thread {
    private std.stdio.File _file;
    /// set at the very end of run()
    private shared bool _finished;
    /// single-byte read buffer
    private ubyte[1] _byteBuffer;
    /// bytes collected for the current chunk; only the first _len are valid
    private ubyte[] _bytes;
    /// scratch buffer for decoded characters, reused between flushes
    dchar[] _textbuffer;
    /// number of valid bytes in _bytes
    private int _len;
    /// true after the first UTF-8 decoding failure
    private bool _utfError;

    /// Creates the reader for an already open file; call start() to begin reading.
    this(std.stdio.File f) {
        super(&run);
        assert(f.isOpen());
        _file = f;
        _len = 0;
        _finished = false;
    }

    /// true after the reading thread has completed all its work
    @property bool finished() {
        return _finished;
    }

    /// last byte passed to addByte()
    ubyte prevchar;

    /// Appends one byte to the pending chunk; a '\n' byte triggers flush().
    void addByte(ubyte data) {
        // grow geometrically, starting from 1024 bytes
        if (_bytes.length < _len + 1)
            _bytes.length = _bytes.length ? _bytes.length * 2 : 1024;
        _bytes[_len++] = data;
        if (data == '\n')
            flush();
        prevchar = data;
    }

    /// Decodes the pending bytes, normalizes line endings and forwards the
    /// resulting text to sendResult(). Does nothing when no bytes are pending.
    void flush() {
        if (!_len)
            return;
        if (_textbuffer.length < _len)
            _textbuffer.length = _len + 256;
        // Decode only the valid part of the byte buffer: _bytes may hold stale
        // data past _len left over from an earlier, longer chunk, and a
        // truncated multi-byte sequence must not be completed from it.
        const(char)[] pending = cast(const(char)[])(_bytes[0 .. _len]);
        size_t count = 0;
        for (size_t i = 0; i < pending.length;) {
            dchar ch = 0;
            if (_utfError) {
                ch = _bytes[i++];
            } else {
                try {
                    ch = decode(pending, i);
                } catch (UTFException e) {
                    // from now on treat every byte as a separate character
                    _utfError = true;
                    ch = _bytes[i++];
                    Log.d("non-unicode characters found in output of process");
                }
            }
            _textbuffer[count++] = ch;
        }
        _len = 0;

        if (!count)
            return;

        // fix line endings - must be '\n'
        count = convertLineEndings(_textbuffer[0 .. count]);

        // data is ready to send
        if (count)
            sendResult(_textbuffer[0 .. count].idup);
    }

    /// In-place conversion of line endings to unix format (\n).
    ///
    /// "\r\n", "\n\r", a lone "\r" and a lone "\n" each become a single '\n'.
    /// Returns: the number of characters of `text` that are valid after conversion.
    size_t convertLineEndings(dchar[] text) {
        size_t src = 0;
        size_t dst = 0;
        while (src < text.length) {
            dchar ch = text[src++];
            dchar nextch = src < text.length ? text[src] : 0;
            if (ch == '\n') {
                if (nextch == '\r')
                    src++;
                text[dst++] = '\n';
            } else if (ch == '\r') {
                if (nextch == '\n')
                    src++;
                text[dst++] = '\n';
            } else {
                text[dst++] = ch;
            }
        }
        return dst;
    }

    /// Called (from the reading thread) with each ready chunk of text.
    protected void sendResult(dstring text) {
        // override to deal with ready data
    }

    /// Called (from the reading thread) once reading is over.
    protected void handleFinish() {
        // override to do something when thread is finishing
    }

    /// Reads the file byte by byte until end of stream, feeding addByte().
    private void readStream() {
        version (Windows) {
            import core.sys.windows.windows;
            // separate version for windows as workaround for hanging rawRead
            HANDLE h = _file.windowsHandle;
            DWORD bytesRead = 0;
            DWORD err;
            for (;;) {
                BOOL res = ReadFile(h, _byteBuffer.ptr, 1, &bytesRead, null);
                if (res) {
                    if (bytesRead == 1)
                        addByte(_byteBuffer[0]);
                } else {
                    err = GetLastError();
                    if (err == ERROR_MORE_DATA) {
                        if (bytesRead == 1)
                            addByte(_byteBuffer[0]);
                        continue;
                    }
                    // any other error ends reading
                    break;
                }
            }
        } else {
            for (;;) {
                if (_file.eof)
                    break;
                ubyte[] r = _file.rawRead(_byteBuffer);
                if (!r.length)
                    break;
                addByte(r[0]);
            }
        }
    }

    /// Thread entry point: reads the whole stream, closes the file, sends any
    /// remaining buffered text, then signals completion.
    private void run() {
        // Each step is guarded separately so that a failure while reading
        // neither leaks the file nor loses the last, unterminated line.
        try {
            readStream();
        } catch (Exception e) {
            Log.e("Exception occured while reading stream: ", e);
        }
        try {
            _file.close();
        } catch (Exception e) {
            Log.e("Exception occured while closing stream: ", e);
        }
        try {
            flush();
        } catch (Exception e) {
            Log.e("Exception occured while flushing stream data: ", e);
        }
        handleFinish();
        _finished = true;
    }

    /// Blocks until the reading thread terminates; an exception that ended the
    /// thread is not rethrown.
    void waitForFinish() {
        join(false);
    }

}
246 
/// Background reader that forwards every decoded chunk of text to a TextWriter.
/// Warning: the writer is invoked from the background reading thread.
class BackgroundReader : BackgroundReaderBase {
    /// receiver of the text; reset to null when reading finishes
    protected TextWriter _destination;

    /// Creates a reader of `f` delivering its text to `destination`.
    this(std.stdio.File f, TextWriter destination) {
        super(f);
        assert(destination);
        this._destination = destination;
    }

    /// Passes the ready text on to the destination writer.
    protected override void sendResult(dstring text) {
        this._destination.writeText(text);
    }

    /// Drops the reference to the destination to help GC.
    protected override void handleFinish() {
        this._destination = null;
    }
}
264 
/// Runs an external process, captures its output through background readers
/// and allows waiting for, polling and killing the process.
class ExternalProcess {

    protected char[][] _args;
    protected char[] _workDir;
    protected char[] _program;
    protected string[string] _env;
    protected TextWriter _stdout;
    protected TextWriter _stderr;
    protected BackgroundReader _stdoutReader;
    protected BackgroundReader _stderrReader;
    protected ProcessPipes _pipes;
    protected ExternalProcessState _state;

    protected int _result;

    /// current state of the process
    @property ExternalProcessState state() { return _state; }
    /// returns process result for stopped process
    @property int result() { return _result; }

    this() {
    }

    /// true when there is no live process to operate on
    /// (never started, already stopped, or failed)
    private bool isInactive() {
        return _state == ExternalProcessState.Error
            || _state == ExternalProcessState.None
            || _state == ExternalProcessState.Stopped;
    }

    /// Convenience overload taking immutable strings; copies the arguments and
    /// forwards to the char[] overload.
    ExternalProcessState run(string program, string[]args, string dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
        char[][] arguments;
        foreach(a; args)
            arguments ~= a.dup;
        return run(program.dup, arguments, dir.dup, stdoutTarget, stderrTarget);
    }

    /// Starts `program` with `args` in working directory `dir`.
    ///
    /// Process stdout is forwarded to `stdoutTarget`. Process stderr is
    /// forwarded to `stderrTarget` if it is given, otherwise it is merged
    /// into stdout.
    /// Returns: Running on success; Error when the executable is not found or
    /// the process could not be started.
    ExternalProcessState run(char[] program, char[][]args, char[] dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
        Log.d("ExternalProcess.run ", program, " ", args);
        _state = ExternalProcessState.None;
        _program = findExecutablePath(cast(string)program).dup;
        if (!_program.length) {
            _state = ExternalProcessState.Error;
            Log.e("Executable not found for ", program);
            return _state;
        }
        _args = args;
        _workDir = dir;
        _stdout = stdoutTarget;
        _stderr = stderrTarget;
        _result = 0;
        assert(_stdout);
        Redirect redirect;
        char[][] params;
        params ~= _program;
        params ~= _args;
        if (!_stderr)
            redirect = Redirect.stdout | Redirect.stderrToStdout | Redirect.stdin;
        else
            redirect = Redirect.all;
        Log.i("Trying to run program ", _program, " with args ", _args);
        try {
            _pipes = pipeProcess(params, redirect, _env, Config.suppressConsole, _workDir);
            _state = ExternalProcessState.Running;
            // start readers
            _stdoutReader = new BackgroundReader(_pipes.stdout, _stdout);
            _stdoutReader.start();
            if (_stderr) {
                _stderrReader = new BackgroundReader(_pipes.stderr, _stderr);
                _stderrReader.start();
            }
        } catch (ProcessException e) {
            Log.e("Cannot run program ", _program, " ", e);
            markLaunchFailed();
        } catch (std.stdio.StdioException e) {
            Log.e("Cannot redirect streams for program ", _program, " ", e);
            markLaunchFailed();
        } catch (Throwable e) {
            Log.e("Exception while trying to run program ", _program, " ", e);
            markLaunchFailed();
        }
        return _state;
    }

    /// Records a failed launch as Error instead of leaving the state at None.
    /// A process that did start (state is already Running) keeps its state so
    /// that it can still be polled and killed.
    private void markLaunchFailed() {
        if (_state != ExternalProcessState.Running)
            _state = ExternalProcessState.Error;
    }

    /// Waits until both background readers have consumed their streams and
    /// drops the references to them.
    protected void waitForReadingCompletion() {
        try {
            if (_stdoutReader && !_stdoutReader.finished) {
                _pipes.stdout.detach();
                _stdoutReader.waitForFinish();
            }
            _stdoutReader = null;
        } catch (Exception e) {
            Log.e("Exception while waiting for stdout reading completion for ", _program, " ", e);
        }
        try {
            if (_stderrReader && !_stderrReader.finished) {
                _pipes.stderr.detach();
                _stderrReader.waitForFinish();
            }
            // release the reader also when it had already finished,
            // the same way it is done for stdout
            _stderrReader = null;
        } catch (Exception e) {
            Log.e("Exception while waiting for stderr reading completion for ", _program, " ", e);
        }
    }

    /// Checks, without blocking on the process, whether it has terminated.
    /// On termination stores the exit status and waits for the readers.
    ExternalProcessState poll() {
        if (isInactive())
            return _state;
        // check for process finishing
        try {
            auto pstate = std.process.tryWait(_pipes.pid);
            if (pstate.terminated) {
                _state = ExternalProcessState.Stopped;
                _result = pstate.status;
                waitForReadingCompletion();
            }
        } catch (Exception e) {
            Log.e("Exception while waiting for process ", _program, " ", e);
            _state = ExternalProcessState.Error;
        }
        return _state;
    }

    /// waits until termination
    ExternalProcessState wait() {
        Log.d("ExternalProcess.wait");
        if (isInactive())
            return _state;
        try {
            _result = std.process.wait(_pipes.pid);
            _state = ExternalProcessState.Stopped;
            Log.d("ExternalProcess.wait : waitForReadingCompletion");
            waitForReadingCompletion();
        } catch (Exception e) {
            Log.e("Exception while waiting for process ", _program, " ", e);
            _state = ExternalProcessState.Error;
        }
        return _state;
    }

    /// Requests process stop; a running process moves to the Stopping state.
    /// Termination itself is detected later by poll() or wait().
    ExternalProcessState kill() {
        Log.d("ExternalProcess.kill");
        if (isInactive())
            return _state;
        if (_state == ExternalProcessState.Running) {
            std.process.kill(_pipes.pid);
            _state = ExternalProcessState.Stopping;
        }
        return _state;
    }

    /// Writes `data` to the stdin of the process and flushes it.
    /// Returns: false when there is no live process, true otherwise.
    bool write(string data) {
        if (isInactive())
            return false;
        _pipes.stdin.write(data);
        _pipes.stdin.flush();
        return true;
    }
}