diff --git a/library/std/src/sys/windows/c.rs b/library/std/src/sys/windows/c.rs index 0edf43e5d9dd5..0ecc2a5cfdad5 100644 --- a/library/std/src/sys/windows/c.rs +++ b/library/std/src/sys/windows/c.rs @@ -326,6 +326,12 @@ impl Default for IO_STATUS_BLOCK { } } +pub type LPOVERLAPPED_COMPLETION_ROUTINE = unsafe extern "system" fn( + dwErrorCode: DWORD, + dwNumberOfBytesTransfered: DWORD, + lpOverlapped: *mut OVERLAPPED, +); + #[repr(C)] #[cfg(not(target_pointer_width = "64"))] pub struct WSADATA { @@ -891,6 +897,7 @@ extern "system" { pub fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; pub fn SwitchToThread() -> BOOL; pub fn Sleep(dwMilliseconds: DWORD); + pub fn SleepEx(dwMilliseconds: DWORD, bAlertable: BOOL) -> DWORD; pub fn GetProcessId(handle: HANDLE) -> DWORD; pub fn CopyFileExW( lpExistingFileName: LPCWSTR, @@ -957,6 +964,13 @@ extern "system" { lpNumberOfBytesRead: LPDWORD, lpOverlapped: LPOVERLAPPED, ) -> BOOL; + pub fn ReadFileEx( + hFile: BorrowedHandle<'_>, + lpBuffer: LPVOID, + nNumberOfBytesToRead: DWORD, + lpOverlapped: LPOVERLAPPED, + lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE, + ) -> BOOL; pub fn WriteFile( hFile: BorrowedHandle<'_>, lpBuffer: LPVOID, @@ -964,6 +978,13 @@ extern "system" { lpNumberOfBytesWritten: LPDWORD, lpOverlapped: LPOVERLAPPED, ) -> BOOL; + pub fn WriteFileEx( + hFile: BorrowedHandle<'_>, + lpBuffer: LPVOID, + nNumberOfBytesToWrite: DWORD, + lpOverlapped: LPOVERLAPPED, + lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE, + ) -> BOOL; pub fn CloseHandle(hObject: HANDLE) -> BOOL; pub fn MoveFileExW(lpExistingFileName: LPCWSTR, lpNewFileName: LPCWSTR, dwFlags: DWORD) -> BOOL; diff --git a/library/std/src/sys/windows/pipe.rs b/library/std/src/sys/windows/pipe.rs index 63d3d6c5ed42f..df4f1b24eec26 100644 --- a/library/std/src/sys/windows/pipe.rs +++ b/library/std/src/sys/windows/pipe.rs @@ -173,6 +173,15 @@ fn random_number() -> usize { } } +// Abstracts over `ReadFileEx` and `WriteFileEx` +type AlertableIoFn = unsafe extern "system" fn( + BorrowedHandle<'_>, + c::LPVOID, + c::DWORD, + c::LPOVERLAPPED, + c::LPOVERLAPPED_COMPLETION_ROUTINE, +) -> c::BOOL; + impl AnonPipe { pub fn handle(&self) -> &Handle { &self.inner @@ -182,7 +191,19 @@ impl AnonPipe { } pub fn read(&self, buf: &mut [u8]) -> io::Result { - self.inner.read(buf) + let result = unsafe { + let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD; + self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len) + }; + + match result { + // The special treatment of BrokenPipe is to deal with Windows + // pipe semantics, which yields this error when *reading* from + // a pipe after the other end has closed; we interpret that as + // EOF on the pipe. + Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0), + _ => result, + } } pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result { @@ -195,7 +216,10 @@ impl AnonPipe { } pub fn write(&self, buf: &[u8]) -> io::Result { - self.inner.write(buf) + unsafe { + let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD; + self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len) + } } pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result { @@ -206,6 +230,99 @@ impl AnonPipe { pub fn is_write_vectored(&self) -> bool { self.inner.is_write_vectored() } + + /// Synchronizes asynchronous reads or writes using our anonymous pipe. + /// + /// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses + /// [Asynchronous Procedure Call] (APC) to synchronize reads or writes. + /// + /// Note: This should not be used for handles we don't create. + /// + /// # Safety + /// + /// `buf` must be a pointer to a buffer that's valid for reads or writes + /// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx` + /// + /// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex + /// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex + /// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls + unsafe fn alertable_io_internal( + &self, + io: AlertableIoFn, + buf: c::LPVOID, + len: c::DWORD, + ) -> io::Result { + // Use "alertable I/O" to synchronize the pipe I/O. + // This has four steps. + // + // STEP 1: Start the asynchronous I/O operation. + // This simply calls either `ReadFileEx` or `WriteFileEx`, + // giving it a pointer to the buffer and callback function. + // + // STEP 2: Enter an alertable state. + // The callback set in step 1 will not be called until the thread + // enters an "alertable" state. This can be done using `SleepEx`. + // + // STEP 3: The callback + // Once the I/O is complete and the thread is in an alertable state, + // the callback will be run on the same thread as the call to + // `ReadFileEx` or `WriteFileEx` done in step 1. + // In the callback we simply set the result of the async operation. + // + // STEP 4: Return the result. + // At this point we'll have a result from the callback function + // and can simply return it. Note that we must not return earlier, + // while the I/O is still in progress. + + // The result that will be set from the asynchronous callback. + let mut async_result: Option = None; + struct AsyncResult { + error: u32, + transfered: u32, + } + + // STEP 3: The callback. + unsafe extern "system" fn callback( + dwErrorCode: u32, + dwNumberOfBytesTransfered: u32, + lpOverlapped: *mut c::OVERLAPPED, + ) { + // Set `async_result` using a pointer smuggled through `hEvent`. + let result = AsyncResult { error: dwErrorCode, transfered: dwNumberOfBytesTransfered }; + *(*lpOverlapped).hEvent.cast::>() = Some(result); + } + + // STEP 1: Start the I/O operation. + let mut overlapped: c::OVERLAPPED = crate::mem::zeroed(); + // `hEvent` is unused by `ReadFileEx` and `WriteFileEx`. + // Therefore the documentation suggests using it to smuggle a pointer to the callback. + overlapped.hEvent = &mut async_result as *mut _ as *mut _; + + // Asynchronous read of the pipe. + // If successful, `callback` will be called once it completes. + let result = io(self.inner.as_handle(), buf, len, &mut overlapped, callback); + if result == c::FALSE { + // We can return here because the call failed. + // After this we must not return until the I/O completes. + return Err(io::Error::last_os_error()); + } + + // Wait indefinitely for the result. + let result = loop { + // STEP 2: Enter an alertable state. + // The second parameter of `SleepEx` is used to make this sleep alertable. + c::SleepEx(c::INFINITE, c::TRUE); + if let Some(result) = async_result { + break result; + } + }; + // STEP 4: Return the result. + // `async_result` is always `Some` at this point + match result.error { + c::ERROR_SUCCESS => Ok(result.transfered as usize), + error => Err(io::Error::from_raw_os_error(error as _)), + } + } } pub fn read2(p1: AnonPipe, v1: &mut Vec, p2: AnonPipe, v2: &mut Vec) -> io::Result<()> {