1 module dlangide.builders.extprocess; 2 3 import dlangui.core.logger; 4 5 import std.process; 6 import std.stdio; 7 import std.utf; 8 import std.stdio; 9 import core.thread; 10 import core.sync.mutex; 11 12 /// interface to forward process output to 13 interface TextWriter { 14 /// log lines 15 void writeText(dstring text); 16 } 17 18 /// interface to read text 19 interface TextReader { 20 /// log lines 21 dstring readText(); 22 } 23 24 /// protected text storage box to read and write text from different threads 25 class ProtectedTextStorage : TextReader, TextWriter { 26 27 private Mutex _mutex; 28 private shared bool _closed; 29 private dchar[] _buffer; 30 31 this() { 32 _mutex = new Mutex(); 33 } 34 35 @property bool closed() { return _closed; } 36 37 void close() { 38 if (_closed) 39 return; 40 _closed = true; 41 _buffer = null; 42 } 43 44 /// log lines 45 override void writeText(dstring text) { 46 if (!_closed) { 47 // if not closed 48 _mutex.lock(); 49 scope(exit) _mutex.unlock(); 50 // append text 51 _buffer ~= text; 52 } 53 } 54 55 /// log lines 56 override dstring readText() { 57 if (!_closed) { 58 // if not closed 59 _mutex.lock(); 60 scope(exit) _mutex.unlock(); 61 if (!_buffer.length) 62 return null; 63 dstring res = _buffer.dup; 64 _buffer = null; 65 return res; 66 } else { 67 // reading from closed 68 return null; 69 } 70 } 71 } 72 73 enum ExternalProcessState : uint { 74 /// not initialized 75 None, 76 /// running 77 Running, 78 /// stop is requested 79 Stopping, 80 /// stopped 81 Stopped, 82 /// error occured, e.g. cannot run process 83 Error 84 } 85 86 /// base class for text reading from std.stdio.File in background thread 87 class BackgroundReaderBase : Thread { 88 private std.stdio.File _file; 89 private shared bool _finished; 90 private ubyte[1] _byteBuffer; 91 private ubyte[] _bytes; 92 dchar[] _textbuffer; 93 private int _len; 94 private bool _utfError; 95 96 this(std.stdio.File f) { 97 super(&run); 98 assert(f.isOpen()); 99 _file = f; 100 _len = 0; 101 _finished = false; 102 } 103 104 @property bool finished() { 105 return _finished; 106 } 107 108 ubyte prevchar; 109 void addByte(ubyte data) { 110 if (_bytes.length < _len + 1) 111 _bytes.length = _bytes.length ? _bytes.length * 2 : 1024; 112 bool eolchar = (data == '\r' || data == '\n'); 113 bool preveol = (prevchar == '\r' || prevchar == '\n'); 114 if ((eolchar && !preveol) || (!eolchar && preveol)) { 115 //Log.d("Flushing for prevChar=", prevchar, " newChar=", data); 116 flush(); 117 } 118 _bytes[_len++] = data; 119 prevchar = data; 120 } 121 void flush() { 122 if (!_len) 123 return; 124 if (_textbuffer.length < _len) 125 _textbuffer.length = _len + 256; 126 size_t count = 0; 127 for(size_t i = 0; i < _len;) { 128 dchar ch = 0; 129 if (_utfError) { 130 ch = _bytes[i++]; 131 } else { 132 try { 133 ch = decode(cast(string)_bytes, i); 134 } catch (UTFException e) { 135 _utfError = true; 136 ch = _bytes[i++]; 137 Log.d("non-unicode characters found in output of process"); 138 } 139 } 140 _textbuffer[count++] = ch; 141 } 142 _len = 0; 143 144 if (!count) 145 return; 146 147 // fix line endings - must be '\n' 148 count = convertLineEndings(_textbuffer[0..count]); 149 150 // data is ready to send 151 if (count) 152 sendResult(_textbuffer[0..count].dup); 153 } 154 /// inplace convert line endings to unix format (\n) 155 size_t convertLineEndings(dchar[] text) { 156 size_t src = 0; 157 size_t dst = 0; 158 for(;src < text.length;) { 159 dchar ch = text[src++]; 160 dchar nextch = src < text.length ? text[src] : 0; 161 if (ch == '\n') { 162 if (nextch == '\r') 163 src++; 164 text[dst++] = '\n'; 165 } else if (ch == '\r') { 166 if (nextch == '\n') 167 src++; 168 text[dst++] = '\n'; 169 } else { 170 text[dst++] = ch; 171 } 172 } 173 return dst; 174 } 175 protected void sendResult(dstring text) { 176 // override to deal with ready data 177 } 178 179 protected void handleFinish() { 180 // override to do something when thread is finishing 181 } 182 183 private void run() { 184 //Log.d("BackgroundReaderBase run() enter"); 185 // read file by bytes 186 try { 187 version (Windows) { 188 import win32.windows; 189 // separate version for windows as workaround for hanging rawRead 190 HANDLE h = _file.windowsHandle; 191 DWORD bytesRead = 0; 192 DWORD err; 193 for (;;) { 194 BOOL res = ReadFile(h, _byteBuffer.ptr, 1, &bytesRead, null); 195 if (res) { 196 if (bytesRead == 1) 197 addByte(_byteBuffer[0]); 198 } else { 199 err = GetLastError(); 200 if (err == ERROR_MORE_DATA) { 201 if (bytesRead == 1) 202 addByte(_byteBuffer[0]); 203 continue; 204 } 205 //if (err == ERROR_BROKEN_PIPE || err = ERROR_INVALID_HANDLE) 206 break; 207 } 208 } 209 } else { 210 for (;;) { 211 //Log.d("BackgroundReaderBase run() reading file"); 212 if (_file.eof) 213 break; 214 ubyte[] r = _file.rawRead(_byteBuffer); 215 if (!r.length) 216 break; 217 //Log.d("BackgroundReaderBase run() read byte: ", r[0]); 218 addByte(r[0]); 219 } 220 } 221 _file.close(); 222 flush(); 223 //Log.d("BackgroundReaderBase run() closing file"); 224 //Log.d("BackgroundReaderBase run() file closed"); 225 } catch (Exception e) { 226 //Log.e("Exception occured while reading stream: ", e); 227 } 228 handleFinish(); 229 _finished = true; 230 //Log.d("BackgroundReaderBase run() exit"); 231 } 232 233 void waitForFinish() { 234 static if (false) { 235 while (isRunning && !_finished) 236 Thread.sleep( dur!("msecs")( 10 ) ); 237 } else { 238 join(false); 239 } 240 } 241 242 } 243 244 /// reader which sends output text to TextWriter (warning: call will be made from background thread) 245 class BackgroundReader : BackgroundReaderBase { 246 protected TextWriter _destination; 247 this(std.stdio.File f, TextWriter destination) { 248 super(f); 249 assert(destination); 250 _destination = destination; 251 } 252 override protected void sendResult(dstring text) { 253 // override to deal with ready data 254 _destination.writeText(text); 255 } 256 override protected void handleFinish() { 257 // remove link to destination to help GC 258 _destination = null; 259 } 260 } 261 262 /// runs external process, catches output, allows to stop 263 class ExternalProcess { 264 265 protected char[][] _args; 266 protected char[] _workDir; 267 protected char[] _program; 268 protected string[string] _env; 269 protected TextWriter _stdout; 270 protected TextWriter _stderr; 271 protected BackgroundReader _stdoutReader; 272 protected BackgroundReader _stderrReader; 273 protected ProcessPipes _pipes; 274 protected ExternalProcessState _state; 275 276 protected int _result; 277 278 @property ExternalProcessState state() { return _state; } 279 /// returns process result for stopped process 280 @property int result() { return _result; } 281 282 this() { 283 } 284 285 ExternalProcessState run(string program, string[]args, string dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) { 286 char[][] arguments; 287 foreach(a; args) 288 arguments ~= a.dup; 289 return run(program.dup, arguments, dir.dup, stdoutTarget, stderrTarget); 290 } 291 ExternalProcessState run(char[] program, char[][]args, char[] dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) { 292 Log.d("ExternalProcess.run ", program, " ", args); 293 _state = ExternalProcessState.None; 294 _program = program; 295 _args = args; 296 _workDir = dir; 297 _stdout = stdoutTarget; 298 _stderr = stderrTarget; 299 _result = 0; 300 assert(_stdout); 301 Redirect redirect; 302 char[][] params; 303 params ~= _program; 304 params ~= _args; 305 if (!_stderr) 306 redirect = Redirect.stdout | Redirect.stderrToStdout | Redirect.stdin; 307 else 308 redirect = Redirect.all; 309 Log.i("Trying to run program ", _program, " with args ", _args); 310 try { 311 _pipes = pipeProcess(params, redirect, _env, Config.suppressConsole, _workDir); 312 _state = ExternalProcessState.Running; 313 // start readers 314 _stdoutReader = new BackgroundReader(_pipes.stdout, _stdout); 315 _stdoutReader.start(); 316 if (_stderr) { 317 _stderrReader = new BackgroundReader(_pipes.stderr, _stderr); 318 _stderrReader.start(); 319 } 320 } catch (ProcessException e) { 321 Log.e("Cannot run program ", _program, " ", e); 322 } catch (std.stdio.StdioException e) { 323 Log.e("Cannot redirect streams for program ", _program, " ", e); 324 } 325 return _state; 326 } 327 328 protected void waitForReadingCompletion() { 329 try { 330 if (_stdoutReader && !_stdoutReader.finished) { 331 _pipes.stdout.detach(); 332 //Log.d("waitForReadingCompletion - waiting for stdout"); 333 _stdoutReader.waitForFinish(); 334 //Log.d("waitForReadingCompletion - joined stdout"); 335 } 336 _stdoutReader = null; 337 } catch (Exception e) { 338 Log.e("Exception while waiting for stdout reading completion for ", _program, " ", e); 339 } 340 try { 341 if (_stderrReader && !_stderrReader.finished) { 342 _pipes.stderr.detach(); 343 //Log.d("waitForReadingCompletion - waiting for stderr"); 344 _stderrReader.waitForFinish(); 345 _stderrReader = null; 346 //Log.d("waitForReadingCompletion - joined stderr"); 347 } 348 } catch (Exception e) { 349 Log.e("Exception while waiting for stderr reading completion for ", _program, " ", e); 350 } 351 //Log.d("waitForReadingCompletion - done"); 352 } 353 354 /// polls all available output from process streams 355 ExternalProcessState poll() { 356 //Log.d("ExternalProcess.poll state = ", _state); 357 bool res = true; 358 if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped) 359 return _state; 360 // check for process finishing 361 try { 362 auto pstate = std.process.tryWait(_pipes.pid); 363 if (pstate.terminated) { 364 _state = ExternalProcessState.Stopped; 365 _result = pstate.status; 366 waitForReadingCompletion(); 367 } 368 } catch (Exception e) { 369 Log.e("Exception while waiting for process ", _program); 370 _state = ExternalProcessState.Error; 371 } 372 return _state; 373 } 374 375 /// waits until termination 376 ExternalProcessState wait() { 377 Log.d("ExternalProcess.wait"); 378 if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped) 379 return _state; 380 try { 381 _result = std.process.wait(_pipes.pid); 382 _state = ExternalProcessState.Stopped; 383 waitForReadingCompletion(); 384 } catch (Exception e) { 385 Log.e("Exception while waiting for process ", _program); 386 _state = ExternalProcessState.Error; 387 } 388 return _state; 389 } 390 391 /// request process stop 392 ExternalProcessState kill() { 393 Log.d("ExternalProcess.kill"); 394 if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped) 395 return _state; 396 if (_state == ExternalProcessState.Running) { 397 std.process.kill(_pipes.pid); 398 _state = ExternalProcessState.Stopping; 399 } 400 return _state; 401 } 402 403 bool write(string data) { 404 if(_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped) { 405 return false; 406 } 407 else { 408 Log.d("writing ", data.length, " characters to stdin"); 409 _pipes.stdin.write("", data); 410 _pipes.stdin.flush(); 411 //_pipes.stdin.close(); 412 return true; 413 } 414 } 415 }