1 module dlangide.builders.extprocess; 2 3 import dlangui.core.logger; 4 5 import std.process; 6 import std.stdio; 7 import std.utf; 8 import std.stdio; 9 import core.thread; 10 import core.sync.mutex; 11 12 /// interface to forward process output to 13 interface TextWriter { 14 /// log lines 15 void writeText(dstring text); 16 } 17 18 /// interface to read text 19 interface TextReader { 20 /// log lines 21 dstring readText(); 22 } 23 24 /// protected text storage box to read and write text from different threads 25 class ProtectedTextStorage : TextReader, TextWriter { 26 27 private Mutex _mutex; 28 private shared bool _closed; 29 private dchar[] _buffer; 30 31 this() { 32 _mutex = new Mutex(); 33 } 34 35 @property bool closed() { return _closed; } 36 37 void close() { 38 if (_closed) 39 return; 40 _closed = true; 41 _buffer = null; 42 } 43 44 /// log lines 45 override void writeText(dstring text) { 46 if (!_closed) { 47 // if not closed 48 _mutex.lock(); 49 scope(exit) _mutex.unlock(); 50 // append text 51 _buffer ~= text; 52 } 53 } 54 55 /// log lines 56 override dstring readText() { 57 if (!_closed) { 58 // if not closed 59 _mutex.lock(); 60 scope(exit) _mutex.unlock(); 61 if (!_buffer.length) 62 return null; 63 dstring res = _buffer.dup; 64 _buffer = null; 65 return res; 66 } else { 67 // reading from closed 68 return null; 69 } 70 } 71 } 72 73 enum ExternalProcessState : uint { 74 /// not initialized 75 None, 76 /// running 77 Running, 78 /// stop is requested 79 Stopping, 80 /// stopped 81 Stopped, 82 /// error occured, e.g. cannot run process 83 Error 84 } 85 86 /// base class for text reading from std.stdio.File in background thread 87 class BackgroundReaderBase : Thread { 88 private std.stdio.File _file; 89 private shared bool _finished; 90 private ubyte[1] _byteBuffer; 91 private ubyte[] _bytes; 92 dchar[] _textbuffer; 93 private int _len; 94 private bool _utfError; 95 96 this(std.stdio.File f) { 97 super(&run); 98 assert(f.isOpen()); 99 _file = f; 100 _len = 0; 101 _finished = false; 102 } 103 104 @property bool finished() { 105 return _finished; 106 } 107 108 ubyte prevchar; 109 void addByte(ubyte data) { 110 if (_bytes.length < _len + 1) 111 _bytes.length = _bytes.length ? _bytes.length * 2 : 1024; 112 bool eolchar = (data == '\r' || data == '\n'); 113 bool preveol = (prevchar == '\r' || prevchar == '\n'); 114 if ((eolchar && !preveol) || (!eolchar && preveol)) { 115 //Log.d("Flushing for prevChar=", prevchar, " newChar=", data); 116 flush(); 117 } 118 _bytes[_len++] = data; 119 prevchar = data; 120 } 121 void flush() { 122 if (!_len) 123 return; 124 if (_textbuffer.length < _len) 125 _textbuffer.length = _len + 256; 126 size_t count = 0; 127 for(size_t i = 0; i < _len;) { 128 dchar ch = 0; 129 if (_utfError) { 130 ch = _bytes[i++]; 131 } else { 132 try { 133 ch = decode(cast(string)_bytes, i); 134 } catch (UTFException e) { 135 _utfError = true; 136 ch = _bytes[i++]; 137 Log.d("non-unicode characters found in output of process"); 138 } 139 } 140 _textbuffer[count++] = ch; 141 } 142 _len = 0; 143 144 if (!count) 145 return; 146 147 // fix line endings - must be '\n' 148 count = convertLineEndings(_textbuffer[0..count]); 149 150 // data is ready to send 151 if (count) 152 sendResult(_textbuffer[0..count].dup); 153 } 154 /// inplace convert line endings to unix format (\n) 155 size_t convertLineEndings(dchar[] text) { 156 size_t src = 0; 157 size_t dst = 0; 158 for(;src < text.length;) { 159 dchar ch = text[src++]; 160 dchar nextch = src < text.length ? text[src] : 0; 161 if (ch == '\n') { 162 if (nextch == '\r') 163 src++; 164 text[dst++] = '\n'; 165 } else if (ch == '\r') { 166 if (nextch == '\n') 167 src++; 168 text[dst++] = '\n'; 169 } else { 170 text[dst++] = ch; 171 } 172 } 173 return dst; 174 } 175 protected void sendResult(dstring text) { 176 // override to deal with ready data 177 } 178 179 protected void handleFinish() { 180 // override to do something when thread is finishing 181 } 182 183 private void run() { 184 //Log.d("BackgroundReaderBase run() enter"); 185 // read file by bytes 186 try { 187 version (Windows) { 188 import win32.windows; 189 // separate version for windows as workaround for hanging rawRead 190 HANDLE h = _file.windowsHandle; 191 DWORD bytesRead = 0; 192 DWORD err; 193 for (;;) { 194 BOOL res = ReadFile(h, _byteBuffer.ptr, 1, &bytesRead, null); 195 if (res) { 196 if (bytesRead == 1) 197 addByte(_byteBuffer[0]); 198 } else { 199 err = GetLastError(); 200 if (err == ERROR_MORE_DATA) { 201 if (bytesRead == 1) 202 addByte(_byteBuffer[0]); 203 continue; 204 } 205 //if (err == ERROR_BROKEN_PIPE || err = ERROR_INVALID_HANDLE) 206 break; 207 } 208 } 209 } else { 210 for (;;) { 211 //Log.d("BackgroundReaderBase run() reading file"); 212 if (_file.eof) 213 break; 214 ubyte[] r = _file.rawRead(_byteBuffer); 215 if (!r.length) 216 break; 217 //Log.d("BackgroundReaderBase run() read byte: ", r[0]); 218 addByte(r[0]); 219 } 220 } 221 _file.close(); 222 flush(); 223 //Log.d("BackgroundReaderBase run() closing file"); 224 //Log.d("BackgroundReaderBase run() file closed"); 225 } catch (Exception e) { 226 //Log.e("Exception occured while reading stream: ", e); 227 } 228 handleFinish(); 229 _finished = true; 230 //Log.d("BackgroundReaderBase run() exit"); 231 } 232 233 void waitForFinish() { 234 static if (false) { 235 while (isRunning && !_finished) 236 Thread.sleep( dur!("msecs")( 10 ) ); 237 } else { 238 join(false); 239 } 240 } 241 242 } 243 244 /// reader which sends output text to TextWriter (warning: call will be made from background thread) 245 class BackgroundReader : BackgroundReaderBase { 246 protected TextWriter _destination; 247 this(std.stdio.File f, TextWriter destination) { 248 super(f); 249 assert(destination); 250 _destination = destination; 251 } 252 override protected void sendResult(dstring text) { 253 // override to deal with ready data 254 _destination.writeText(text); 255 } 256 override protected void handleFinish() { 257 // remove link to destination to help GC 258 _destination = null; 259 } 260 } 261 262 /// runs external process, catches output, allows to stop 263 class ExternalProcess { 264 265 protected char[][] _args; 266 protected char[] _workDir; 267 protected char[] _program; 268 protected string[string] _env; 269 protected TextWriter _stdout; 270 protected TextWriter _stderr; 271 protected BackgroundReader _stdoutReader; 272 protected BackgroundReader _stderrReader; 273 protected ProcessPipes _pipes; 274 protected ExternalProcessState _state; 275 276 protected int _result; 277 278 @property ExternalProcessState state() { return _state; } 279 /// returns process result for stopped process 280 @property int result() { return _result; } 281 282 this() { 283 } 284 285 ExternalProcessState run(char[] program, char[][]args, char[] dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) { 286 Log.d("ExternalProcess.run ", program, " ", args); 287 _state = ExternalProcessState.None; 288 _program = program; 289 _args = args; 290 _workDir = dir; 291 _stdout = stdoutTarget; 292 _stderr = stderrTarget; 293 _result = 0; 294 assert(_stdout); 295 Redirect redirect; 296 char[][] params; 297 params ~= _program; 298 params ~= _args; 299 if (!_stderr) 300 redirect = Redirect.stdout | Redirect.stderrToStdout; //Redirect.stdin | 301 else 302 redirect = Redirect.all; 303 Log.i("Trying to run program ", _program, " with args ", _args); 304 try { 305 _pipes = pipeProcess(params, redirect, _env, Config.suppressConsole, _workDir); 306 _state = ExternalProcessState.Running; 307 // start readers 308 _stdoutReader = new BackgroundReader(_pipes.stdout, _stdout); 309 _stdoutReader.start(); 310 if (_stderr) { 311 _stderrReader = new BackgroundReader(_pipes.stderr, _stderr); 312 _stderrReader.start(); 313 } 314 } catch (ProcessException e) { 315 Log.e("Cannot run program ", _program, " ", e); 316 } catch (std.stdio.StdioException e) { 317 Log.e("Cannot redirect streams for program ", _program, " ", e); 318 } 319 return _state; 320 } 321 322 protected void waitForReadingCompletion() { 323 try { 324 if (_stdoutReader && !_stdoutReader.finished) { 325 _pipes.stdout.detach(); 326 //Log.d("waitForReadingCompletion - waiting for stdout"); 327 _stdoutReader.waitForFinish(); 328 //Log.d("waitForReadingCompletion - joined stdout"); 329 } 330 _stdoutReader = null; 331 } catch (Exception e) { 332 Log.e("Exception while waiting for stdout reading completion for ", _program, " ", e); 333 } 334 try { 335 if (_stderrReader && !_stderrReader.finished) { 336 _pipes.stderr.detach(); 337 //Log.d("waitForReadingCompletion - waiting for stderr"); 338 _stderrReader.waitForFinish(); 339 _stderrReader = null; 340 //Log.d("waitForReadingCompletion - joined stderr"); 341 } 342 } catch (Exception e) { 343 Log.e("Exception while waiting for stderr reading completion for ", _program, " ", e); 344 } 345 //Log.d("waitForReadingCompletion - done"); 346 } 347 348 /// polls all available output from process streams 349 ExternalProcessState poll() { 350 //Log.d("ExternalProcess.poll state = ", _state); 351 bool res = true; 352 if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped) 353 return _state; 354 // check for process finishing 355 try { 356 auto pstate = std.process.tryWait(_pipes.pid); 357 if (pstate.terminated) { 358 _state = ExternalProcessState.Stopped; 359 _result = pstate.status; 360 waitForReadingCompletion(); 361 } 362 } catch (Exception e) { 363 Log.e("Exception while waiting for process ", _program); 364 _state = ExternalProcessState.Error; 365 } 366 return _state; 367 } 368 369 /// waits until termination 370 ExternalProcessState wait() { 371 Log.d("ExternalProcess.wait"); 372 if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped) 373 return _state; 374 try { 375 _result = std.process.wait(_pipes.pid); 376 _state = ExternalProcessState.Stopped; 377 waitForReadingCompletion(); 378 } catch (Exception e) { 379 Log.e("Exception while waiting for process ", _program); 380 _state = ExternalProcessState.Error; 381 } 382 return _state; 383 } 384 385 /// request process stop 386 ExternalProcessState kill() { 387 Log.d("ExternalProcess.kill"); 388 if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped) 389 return _state; 390 if (_state == ExternalProcessState.Running) { 391 std.process.kill(_pipes.pid); 392 _state = ExternalProcessState.Stopping; 393 } 394 return _state; 395 } 396 }