module dlangide.builders.extprocess;

import dlangui.core.logger;

import std.process;
import std.stdio;
import std.utf;
import std.stdio;
import core.thread;
import core.sync.mutex;
import dlangui.core.files;

/// Sink that text (e.g. process output) is forwarded to.
interface TextWriter {
    /// Accepts a chunk of text.
    void writeText(dstring text);
}

/// Source that text can be read from.
interface TextReader {
    /// Returns the next available chunk of text.
    dstring readText();
}

/// Mutex-protected text box: one thread appends with writeText(), another drains with readText().
class ProtectedTextStorage : TextReader, TextWriter {

    private Mutex _mutex;
    private shared bool _closed;
    private dchar[] _buffer;

    this() {
        _mutex = new Mutex();
    }

    /// True once close() has been called.
    @property bool closed() { return _closed; }

    /// Marks the storage as closed and drops buffered text; later reads return null, later writes are ignored.
    void close() {
        if (_closed)
            return;
        // take the lock so the buffer is never reset while a writer/reader is inside its critical section
        _mutex.lock();
        scope(exit) _mutex.unlock();
        _closed = true;
        _buffer = null;
    }

    /// Appends text to the buffer (no-op when closed).
    override void writeText(dstring text) {
        if (_closed)
            return;
        _mutex.lock();
        scope(exit) _mutex.unlock();
        // re-check under the lock: close() may have run between the first check and lock()
        if (_closed)
            return;
        _buffer ~= text;
    }

    /// Returns all text buffered so far and empties the buffer; null when nothing is buffered or storage is closed.
    override dstring readText() {
        if (_closed)
            return null;
        _mutex.lock();
        scope(exit) _mutex.unlock();
        if (_closed || !_buffer.length)
            return null;
        dstring res = _buffer.idup;
        _buffer = null;
        return res;
    }
}

/// State of an external process as tracked by ExternalProcess.
enum ExternalProcessState : uint {
    /// not initialized
    None,
    /// running
    Running,
    /// stop is requested
    Stopping,
    /// stopped
    Stopped,
    /// error occurred, e.g. cannot run process
    Error
}

/// base class for text reading from std.stdio.File in background thread
class BackgroundReaderBase : Thread {
    private std.stdio.File _file;
    private shared bool _finished;
    private ubyte[1] _byteBuffer;
    /// raw bytes of the line being collected; only the first _len bytes are valid
    private ubyte[] _bytes;
    dchar[] _textbuffer;
    private int _len;
    /// set once invalid UTF-8 was seen; from then on every byte is passed through as one character
    private bool _utfError;

    /// Creates a reader thread for an already opened file; call start() to begin reading.
    this(std.stdio.File f) {
        super(&run);
        assert(f.isOpen());
        _file = f;
        _len = 0;
        _finished = false;
    }

    /// True after the reading thread has done all its work (including handleFinish()).
    @property bool finished() {
        return _finished;
    }

    /// last byte passed to addByte()
    ubyte prevchar;

    /// Appends one byte to the pending line; a '\n' triggers flush().
    void addByte(ubyte data) {
        if (_bytes.length < _len + 1)
            _bytes.length = _bytes.length ? _bytes.length * 2 : 1024;
        _bytes[_len++] = data;
        if (data == '\n')
            flush();
        prevchar = data;
    }

    /// Decodes the pending bytes, normalizes line endings and hands the text to sendResult().
    void flush() {
        if (!_len)
            return;
        if (_textbuffer.length < _len)
            _textbuffer.length = _len + 256;
        // Decode only the bytes collected for this chunk: the backing buffer is reused and
        // may still hold stale bytes from a previous, longer line past _len.
        const(char)[] pending = cast(const(char)[])_bytes[0 .. _len];
        size_t count = 0;
        for (size_t i = 0; i < pending.length;) {
            dchar ch = 0;
            if (_utfError) {
                ch = _bytes[i++];
            } else {
                try {
                    ch = decode(pending, i);
                } catch (UTFException e) {
                    // decode() leaves the index untouched on failure; take the byte as is
                    _utfError = true;
                    ch = _bytes[i++];
                    Log.d("non-unicode characters found in output of process");
                }
            }
            _textbuffer[count++] = ch;
        }
        _len = 0;

        if (!count)
            return;

        // fix line endings - must be '\n'
        count = convertLineEndings(_textbuffer[0 .. count]);

        // data is ready to send
        if (count)
            sendResult(_textbuffer[0 .. count].idup);
    }

    /// inplace convert line endings to unix format (\n); returns the new text length
    size_t convertLineEndings(dchar[] text) {
        size_t src = 0;
        size_t dst = 0;
        while (src < text.length) {
            dchar ch = text[src++];
            dchar nextch = src < text.length ? text[src] : 0;
            if (ch == '\n') {
                // "\n\r" pair counts as a single line ending
                if (nextch == '\r')
                    src++;
                text[dst++] = '\n';
            } else if (ch == '\r') {
                // "\r\n" pair (or a lone '\r') counts as a single line ending
                if (nextch == '\n')
                    src++;
                text[dst++] = '\n';
            } else {
                text[dst++] = ch;
            }
        }
        return dst;
    }

    /// Receives each decoded chunk of text; override to deal with ready data.
    protected void sendResult(dstring text) {
        // override to deal with ready data
    }

    /// Called once from the reading thread right before it finishes; override to clean up.
    protected void handleFinish() {
        // override to do something when thread is finishing
    }

    /// Thread body: reads the file byte by byte until end of stream or error.
    private void run() {
        try {
            version (Windows) {
                import core.sys.windows.windows;
                // separate version for windows as workaround for hanging rawRead
                HANDLE h = _file.windowsHandle;
                DWORD bytesRead = 0;
                DWORD err;
                for (;;) {
                    BOOL res = ReadFile(h, _byteBuffer.ptr, 1, &bytesRead, null);
                    if (res) {
                        if (bytesRead == 1)
                            addByte(_byteBuffer[0]);
                    } else {
                        err = GetLastError();
                        if (err == ERROR_MORE_DATA) {
                            if (bytesRead == 1)
                                addByte(_byteBuffer[0]);
                            continue;
                        }
                        // any other error ends reading
                        break;
                    }
                }
            } else {
                for (;;) {
                    if (_file.eof)
                        break;
                    ubyte[] r = _file.rawRead(_byteBuffer);
                    if (!r.length)
                        break;
                    addByte(r[0]);
                }
            }
        } catch (Exception e) {
            Log.e("Exception while reading process output stream: ", e);
        }
        // Release the stream and deliver the pending partial line even if reading failed.
        try {
            _file.close();
        } catch (Exception e) {
            Log.e("Exception while closing process output stream: ", e);
        }
        try {
            flush();
        } catch (Exception e) {
            Log.e("Exception while delivering process output: ", e);
        }
        handleFinish();
        _finished = true;
    }

    /// Blocks until the reading thread has terminated (exceptions of the thread are not rethrown).
    void waitForFinish() {
        join(false);
    }

}

/// reader which sends output text to TextWriter (warning: call will be made from background thread)
class BackgroundReader : BackgroundReaderBase {
    protected TextWriter _destination;

    /// Creates a reader forwarding everything read from f to destination (must not be null).
    this(std.stdio.File f, TextWriter destination) {
        super(f);
        assert(destination);
        _destination = destination;
    }

    /// Forwards a ready chunk of text to the destination writer.
    override protected void sendResult(dstring text) {
        // destination is dropped in handleFinish(); ignore data arriving after that
        if (_destination)
            _destination.writeText(text);
    }

    /// Drops the destination reference once reading is over.
    override protected void handleFinish() {
        // remove link to destination to help GC
        _destination = null;
    }
}

/// runs external process, catches output, allows to stop
class ExternalProcess {

    protected char[][] _args;
    protected char[] _workDir;
    protected char[] _program;
    protected string[string] _env;
    protected TextWriter _stdout;
    protected TextWriter _stderr;
    protected BackgroundReader _stdoutReader;
    protected BackgroundReader _stderrReader;
    protected ProcessPipes _pipes;
    protected ExternalProcessState _state;

    protected int _result;

    /// current process state
    @property ExternalProcessState state() { return _state; }
    /// returns process result for stopped process
    @property int result() { return _result; }

    this() {
    }

    /// true when there is no started process to poll, wait for, kill or write to
    private @property bool inactive() {
        return _state == ExternalProcessState.Error
            || _state == ExternalProcessState.None
            || _state == ExternalProcessState.Stopped;
    }

    /// Convenience overload taking immutable strings; see the char[] overload.
    ExternalProcessState run(string program, string[]args, string dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
        char[][] arguments;
        foreach(a; args)
            arguments ~= a.dup;
        return run(program.dup, arguments, dir.dup, stdoutTarget, stderrTarget);
    }

    /**
     * Starts program with args in working directory dir.
     *
     * Output is forwarded to stdoutTarget (required); stderr goes to stderrTarget when given,
     * otherwise it is merged into stdout.
     *
     * Returns: Running on success, Error when the executable is not found or cannot be started.
     */
    ExternalProcessState run(char[] program, char[][]args, char[] dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
        Log.d("ExternalProcess.run ", program, " ", args);
        _state = ExternalProcessState.None;
        _program = findExecutablePath(cast(string)program).dup;
        if (!_program.length) {
            _state = ExternalProcessState.Error;
            Log.e("Executable not found for ", program);
            return _state;
        }
        _args = args;
        _workDir = dir;
        _stdout = stdoutTarget;
        _stderr = stderrTarget;
        _result = 0;
        assert(_stdout);
        char[][] params;
        params ~= _program;
        params ~= _args;
        // without a separate stderr target, stderr is merged into stdout
        Redirect redirect = _stderr
            ? Redirect.all
            : (Redirect.stdout | Redirect.stderrToStdout | Redirect.stdin);
        Log.i("Trying to run program ", _program, " with args ", _args);
        try {
            _pipes = pipeProcess(params, redirect, _env, Config.suppressConsole, _workDir);
            _state = ExternalProcessState.Running;
            // start readers
            _stdoutReader = new BackgroundReader(_pipes.stdout, _stdout);
            _stdoutReader.start();
            if (_stderr) {
                _stderrReader = new BackgroundReader(_pipes.stderr, _stderr);
                _stderrReader.start();
            }
        } catch (ProcessException e) {
            Log.e("Cannot run program ", _program, " ", e);
            // report the failure instead of leaving the state at None
            _state = ExternalProcessState.Error;
        } catch (std.stdio.StdioException e) {
            Log.e("Cannot redirect streams for program ", _program, " ", e);
            _state = ExternalProcessState.Error;
        } catch (Throwable e) {
            Log.e("Exception while trying to run program ", _program, " ", e);
            _state = ExternalProcessState.Error;
        }
        return _state;
    }

    /// Waits for the output reader threads to finish and drops the references to them.
    protected void waitForReadingCompletion() {
        try {
            if (_stdoutReader && !_stdoutReader.finished) {
                _pipes.stdout.detach();
                _stdoutReader.waitForFinish();
            }
            _stdoutReader = null;
        } catch (Exception e) {
            Log.e("Exception while waiting for stdout reading completion for ", _program, " ", e);
        }
        try {
            if (_stderrReader && !_stderrReader.finished) {
                _pipes.stderr.detach();
                _stderrReader.waitForFinish();
            }
            // drop the reader even when it had already finished, same as for stdout
            _stderrReader = null;
        } catch (Exception e) {
            Log.e("Exception while waiting for stderr reading completion for ", _program, " ", e);
        }
    }

    /// Checks without blocking whether the process has terminated; returns the updated state.
    ExternalProcessState poll() {
        if (inactive)
            return _state;
        // check for process finishing
        try {
            auto pstate = std.process.tryWait(_pipes.pid);
            if (pstate.terminated) {
                _state = ExternalProcessState.Stopped;
                _result = pstate.status;
                waitForReadingCompletion();
            }
        } catch (Exception e) {
            Log.e("Exception while waiting for process ", _program, " ", e);
            _state = ExternalProcessState.Error;
        }
        return _state;
    }

    /// waits until termination
    ExternalProcessState wait() {
        Log.d("ExternalProcess.wait");
        if (inactive)
            return _state;
        try {
            _result = std.process.wait(_pipes.pid);
            _state = ExternalProcessState.Stopped;
            Log.d("ExternalProcess.wait : waitForReadingCompletion");
            waitForReadingCompletion();
        } catch (Exception e) {
            Log.e("Exception while waiting for process ", _program, " ", e);
            _state = ExternalProcessState.Error;
        }
        return _state;
    }

    /// request process stop
    ExternalProcessState kill() {
        Log.d("ExternalProcess.kill");
        if (inactive)
            return _state;
        if (_state == ExternalProcessState.Running) {
            std.process.kill(_pipes.pid);
            _state = ExternalProcessState.Stopping;
        }
        return _state;
    }

    /// Writes data to the stdin of the process; returns false when there is no started process.
    bool write(string data) {
        if (inactive)
            return false;
        _pipes.stdin.write("", data);
        _pipes.stdin.flush();
        return true;
    }
}