1
0
Fork 0
mirror of https://github.com/VSadov/Satori.git synced 2025-06-09 17:44:48 +09:00

Implement TranscodingStream (#35145)

This is a streaming analog to the one-shot Encoding.Convert method.
This commit is contained in:
Levi Broderick 2020-04-30 13:50:48 -07:00 committed by GitHub
parent 58017ba71a
commit 1f4393df55
Signed by: github
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 1596 additions and 2 deletions

View file

@ -0,0 +1,25 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Threading;
using System.Threading.Tasks;
namespace System.IO
{
/// <summary>
/// Convenience extension methods over <see cref="Stream"/>.
/// </summary>
public static class StreamExtensions
{
    /// <summary>
    /// Asynchronously reads a single byte from <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">The stream to read from.</param>
    /// <param name="cancellationToken">Token forwarded to the underlying <c>ReadAsync</c> call.</param>
    /// <returns>
    /// The byte that was read, widened to <see cref="int"/>, or -1 if the read returned zero bytes.
    /// </returns>
    public static async Task<int> ReadByteAsync(this Stream stream, CancellationToken cancellationToken = default)
    {
        byte[] singleByte = new byte[1];
        int bytesRead = await stream.ReadAsync(singleByte, 0, singleByte.Length, cancellationToken);

        // A zero-byte read is reported as -1 (EOF); otherwise hand back the byte itself.
        return (bytesRead == 0) ? -1 : singleByte[0];
    }
}
}

View file

@ -11,6 +11,7 @@
<ItemGroup>
<Compile Include="System\AdminHelpers.cs" />
<Compile Include="System\AssertExtensions.cs" />
<Compile Include="System\IO\StreamExtensions.cs" />
<Compile Include="System\RetryHelper.cs" />
<Compile Include="System\Buffers\BoundedMemory.cs" />
<Compile Include="System\Buffers\BoundedMemory.Creation.cs" />

View file

@ -9,8 +9,6 @@
<Compile Include="System\Net\Http\Json\HttpClientJsonExtensions.Put.cs" />
<Compile Include="System\Net\Http\Json\HttpContentJsonExtensions.cs" />
<Compile Include="System\Net\Http\Json\JsonContent.cs" />
<Compile Include="System\Net\Http\Json\TranscodingReadStream.cs" />
<Compile Include="System\Net\Http\Json\TranscodingWriteStream.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
<Compile Include="System\Net\Http\Json\JsonContent.netcoreapp.cs" />
@ -21,6 +19,8 @@
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<Compile Include="System\ArraySegmentExtensions.netstandard.cs" />
<Compile Include="System\Net\Http\Json\TranscodingReadStream.cs" />
<Compile Include="System\Net\Http\Json\TranscodingWriteStream.cs" />
<Reference Include="System.Buffers" />
</ItemGroup>
<ItemGroup>

View file

@ -38,7 +38,11 @@ namespace System.Net.Http.Json
// Wrap content stream into a transcoding stream that buffers the data transcoded from the sourceEncoding to utf-8.
if (sourceEncoding != null && sourceEncoding != Encoding.UTF8)
{
#if NETCOREAPP
contentStream = Encoding.CreateTranscodingStream(contentStream, innerStreamEncoding: sourceEncoding, outerStreamEncoding: Encoding.UTF8);
#else
contentStream = new TranscodingReadStream(contentStream, sourceEncoding);
#endif
}
using (contentStream)
@ -54,7 +58,11 @@ namespace System.Net.Http.Json
// Wrap content stream into a transcoding stream that buffers the data transcoded from the sourceEncoding to utf-8.
if (sourceEncoding != null && sourceEncoding != Encoding.UTF8)
{
#if NETCOREAPP
contentStream = Encoding.CreateTranscodingStream(contentStream, innerStreamEncoding: sourceEncoding, outerStreamEncoding: Encoding.UTF8);
#else
contentStream = new TranscodingReadStream(contentStream, sourceEncoding);
#endif
}
using (contentStream)

View file

@ -67,6 +67,19 @@ namespace System.Net.Http.Json
// Wrap provided stream into a transcoding stream that buffers the data transcoded from utf-8 to the targetEncoding.
if (targetEncoding != null && targetEncoding != Encoding.UTF8)
{
#if NETCOREAPP
Stream transcodingStream = Encoding.CreateTranscodingStream(targetStream, targetEncoding, Encoding.UTF8, leaveOpen: true);
try
{
await JsonSerializer.SerializeAsync(transcodingStream, Value, ObjectType, _jsonSerializerOptions, cancellationToken).ConfigureAwait(false);
}
finally
{
// DisposeAsync will flush any partial write buffers. In practice our partial write
// buffers should be empty as we expect JsonSerializer to emit only well-formed UTF-8 data.
await transcodingStream.DisposeAsync().ConfigureAwait(false);
}
#else
using (TranscodingWriteStream transcodingStream = new TranscodingWriteStream(targetStream, targetEncoding))
{
await JsonSerializer.SerializeAsync(transcodingStream, Value, ObjectType, _jsonSerializerOptions, cancellationToken).ConfigureAwait(false);
@ -75,6 +88,7 @@ namespace System.Net.Http.Json
// acceptable to Flush a Stream (multiple times) prior to completion.
await transcodingStream.FinalWriteAsync(cancellationToken).ConfigureAwait(false);
}
#endif
}
else
{

View file

@ -874,6 +874,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Text\StringBuilder.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Text\StringBuilder.Debug.cs" Condition="'$(Configuration)' == 'Debug'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Text\StringRuneEnumerator.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Text\TranscodingStream.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Text\TrimType.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Text\Unicode\GraphemeClusterBreakType.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Text\Unicode\TextSegmentationUtility.cs" />

View file

@ -4,6 +4,7 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.InteropServices;
using System.Runtime.Serialization;
@ -1040,6 +1041,50 @@ namespace System.Text
// The hash is the code page plus the hash codes of the encoder and decoder fallbacks.
public override int GetHashCode()
{
    int encoderFallbackHash = this.EncoderFallback.GetHashCode();
    int decoderFallbackHash = this.DecoderFallback.GetHashCode();
    return _codePage + encoderFallbackHash + decoderFallbackHash;
}
/// <summary>
/// Wraps <paramref name="innerStream"/> in a <see cref="Stream"/> that converts data between
/// the inner stream's <see cref="Encoding"/> and an outer <see cref="Encoding"/>. This is the
/// streaming counterpart of <see cref="Convert"/>.
/// </summary>
/// <param name="innerStream">The <see cref="Stream"/> being wrapped.</param>
/// <param name="innerStreamEncoding">The <see cref="Encoding"/> of the data held by <paramref name="innerStream"/>.</param>
/// <param name="outerStreamEncoding">The <see cref="Encoding"/> of the data exposed by the returned
/// <see cref="Stream"/>.</param>
/// <param name="leaveOpen"><see langword="true"/> to keep <paramref name="innerStream"/> open when the
/// returned <see cref="Stream"/> is disposed; <see langword="false"/> to dispose both together.</param>
/// <returns>A <see cref="Stream"/> that presents the contents of <paramref name="innerStream"/>
/// as <paramref name="outerStreamEncoding"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="innerStream"/>, <paramref name="innerStreamEncoding"/>,
/// or <paramref name="outerStreamEncoding"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <see cref="Stream.CanRead"/> and <see cref="Stream.CanWrite"/> on the returned <see cref="Stream"/> follow
/// the corresponding capabilities of <paramref name="innerStream"/>, so a full-duplex inner stream yields a
/// full-duplex wrapper. The returned <see cref="Stream"/> is never seekable, even when
/// <paramref name="innerStream"/>'s <see cref="Stream.CanSeek"/> property returns <see langword="true"/>.
/// </remarks>
public static Stream CreateTranscodingStream(Stream innerStream, Encoding innerStreamEncoding, Encoding outerStreamEncoding, bool leaveOpen = false)
{
    // Validate in parameter order so the first null argument is the one reported.
    Stream wrappedStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
    Encoding wrappedEncoding = innerStreamEncoding ?? throw new ArgumentNullException(nameof(innerStreamEncoding));
    Encoding exposedEncoding = outerStreamEncoding ?? throw new ArgumentNullException(nameof(outerStreamEncoding));

    // There is deliberately no shortcut for innerStreamEncoding == outerStreamEncoding. The encoding may
    // still need to run (for example to apply lossy replacement of invalid data), and a subclass of a
    // built-in type such as ASCIIEncoding or UTF8Encoding may carry non-standard logic. Targeted
    // optimizations can be considered in a future release if this shows up as a bottleneck.
    return new TranscodingStream(wrappedStream, wrappedEncoding, exposedEncoding, leaveOpen);
}
// This implementation reports no best-fit data; the member is virtual so it can be overridden.
internal virtual char[] GetBestFitUnicodeToBytesData()
{
    return Array.Empty<char>();
}

View file

@ -0,0 +1,598 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.Text
{
internal sealed class TranscodingStream : Stream
{
// Number of bytes requested from the inner stream per read; the read-side char and byte
// buffers are sized from this value (see EnsurePreReadConditions).
private const int DefaultReadByteBufferSize = 4 * 1024; // lifted from StreamReader.cs (FileStream)
// We optimistically assume 1 byte ~ 1 char during transcoding. This is a good rule of thumb
// but isn't always appropriate: transcoding between single-byte and multi-byte encodings
// will violate this, as will any invalid data fixups performed by the transcoder itself.
// To account for these unknowns we have a minimum scratch buffer size we use during the
// transcoding process. This should be generous enough to account for even the largest
// fallback mechanism we're likely to see in the real world.
private const int MinWriteRentedArraySize = 4 * 1024;
// Upper bound on the scratch arrays rented per write call; larger inputs are converted
// in several passes by the write loops.
private const int MaxWriteRentedArraySize = 1024 * 1024;
// Encoding of the data stored in the wrapped (inner) stream.
private readonly Encoding _innerEncoding;
// Encoding of the data exchanged with callers of this stream.
private readonly Encoding _thisEncoding;
private Stream _innerStream; // null if the wrapper has been disposed
// When true, disposing this wrapper does not dispose the inner stream.
private readonly bool _leaveOpen;
/*
* Fields used for writing bytes [this] -> chars -> bytes [inner]
* Lazily initialized the first time we need to write
*/
private Encoder? _innerEncoder;
private Decoder? _thisDecoder;
/*
* Fields used for reading bytes [inner] -> chars -> bytes [this]
* Lazily initialized the first time we need to read
*/
private Encoder? _thisEncoder;
private Decoder? _innerDecoder;
private int _readCharBufferMaxSize; // the maximum number of characters _innerDecoder.GetChars can return for DefaultReadByteBufferSize input bytes
private byte[]? _readBuffer; // contains the data that Read() should return
private int _readBufferOffset; // index into _readBuffer of the first byte not yet returned to the caller
private int _readBufferCount; // number of bytes in _readBuffer not yet returned to the caller
// Stores the wrapped stream and both encodings. Arguments are only asserted here, so the
// caller is responsible for rejecting nulls. No encoder or decoder is created at this point.
internal TranscodingStream(Stream innerStream, Encoding innerEncoding, Encoding thisEncoding, bool leaveOpen)
{
    Debug.Assert(innerStream != null);
    Debug.Assert(innerEncoding != null);
    Debug.Assert(thisEncoding != null);

    _innerEncoding = innerEncoding;
    _thisEncoding = thisEncoding;
    _innerStream = innerStream;
    _leaveOpen = leaveOpen;
}
/*
 * Capability queries. CanRead and CanWrite report what the inner stream reports
 * while this wrapper is alive, and false once it has been disposed. Seeking is
 * never supported: CanSeek is always false and Length / Position always throw.
 */
public override bool CanRead
{
    get { return _innerStream is Stream inner && inner.CanRead; }
}

public override bool CanSeek
{
    get { return false; }
}

public override bool CanWrite
{
    get { return _innerStream is Stream inner && inner.CanWrite; }
}

public override long Length
{
    get { throw new NotSupportedException(SR.NotSupported_UnseekableStream); }
}

public override long Position
{
    get { throw new NotSupportedException(SR.NotSupported_UnseekableStream); }
    set { throw new NotSupportedException(SR.NotSupported_UnseekableStream); }
}
// APM entry points: each starts the matching Task-based operation (without a cancellation
// token) and hands the task to TaskToApm together with the caller's callback and state.
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
    Task<int> pendingRead = ReadAsync(buffer, offset, count, CancellationToken.None);
    return TaskToApm.Begin(pendingRead, callback, state);
}

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
    Task pendingWrite = WriteAsync(buffer, offset, count, CancellationToken.None);
    return TaskToApm.Begin(pendingWrite, callback, state);
}
// Flushes any partially transcoded write data to the inner stream, marks this wrapper as
// disposed, and disposes the inner stream unless leaveOpen was requested. Subsequent calls
// are no-ops.
protected override void Dispose(bool disposing)
{
    Debug.Assert(disposing, "This type isn't finalizable.");
    if (_innerStream is null)
    {
        return; // dispose called multiple times, ignore
    }

    Stream innerStream = _innerStream;
    try
    {
        // First, flush any pending data to the inner stream.
        ArraySegment<byte> pendingData = FinalFlushWriteBuffers();
        if (pendingData.Count != 0)
        {
            innerStream.Write(pendingData);
        }
    }
    finally
    {
        // Run this even if the final flush or write threw. FinalFlushWriteBuffers may already
        // have consumed the decoder/encoder state, so a retry could not recover the data;
        // leaving the wrapper "alive" would only leak an inner stream that we own.

        // Mark our object as disposed
        _innerStream = null!;

        // And dispose the inner stream if needed
        if (!_leaveOpen)
        {
            innerStream.Dispose();
        }
    }
}
// Asynchronous counterpart of Dispose: flushes any partially transcoded write data to the
// inner stream, then disposes the inner stream unless leaveOpen was requested. Subsequent
// calls are no-ops that return a completed ValueTask.
public override ValueTask DisposeAsync()
{
    if (_innerStream is null)
    {
        return default; // dispose called multiple times, ignore
    }

    // Mark our object as disposed up front. FinalFlushWriteBuffers consumes the decoder/encoder
    // state, so a failed final write cannot be retried meaningfully anyway.
    Stream innerStream = _innerStream;
    _innerStream = null!;

    // When nothing needs to be written and every awaited operation completes synchronously,
    // this async local function also completes synchronously.
    return DisposeAsyncCore(innerStream);

    async ValueTask DisposeAsyncCore(Stream innerStream)
    {
        try
        {
            // First, get any pending data destined for the inner stream and write it out.
            ArraySegment<byte> pendingData = FinalFlushWriteBuffers();
            if (pendingData.Count != 0)
            {
                await innerStream.WriteAsync(pendingData.AsMemory()).ConfigureAwait(false);
            }
        }
        finally
        {
            // Dispose the inner stream (when we own it) even if the flush or write failed,
            // so that a failing final write does not leak it.
            if (!_leaveOpen)
            {
                await innerStream.DisposeAsync().ConfigureAwait(false);
            }
        }
    }
}
// APM completion points: both defer to TaskToApm to complete the operation started by
// BeginRead / BeginWrite.
public override int EndRead(IAsyncResult asyncResult)
{
    return TaskToApm.End<int>(asyncResult);
}

public override void EndWrite(IAsyncResult asyncResult)
{
    TaskToApm.End(asyncResult);
}
#pragma warning disable CS3016 // Arrays as attribute arguments is not CLS-compliant
#pragma warning disable CS8774 // Member must have a non-null value when exiting.
// Called at the start of every read operation. Throws if this stream has been disposed;
// on the first read it also throws if the stream is not readable, and otherwise creates
// the decoder, encoder, and staging buffer that the read path relies on.
[MemberNotNull(nameof(_innerDecoder), nameof(_thisEncoder), nameof(_readBuffer))]
private void EnsurePreReadConditions()
{
    ThrowIfDisposed();
    if (_innerDecoder != null)
    {
        return; // read-side state was created by an earlier call
    }

    CreateReadState();

    void CreateReadState()
    {
        if (!CanRead)
        {
            throw Error.GetReadNotSupported();
        }

        _innerDecoder = _innerEncoding.GetDecoder();
        _thisEncoder = _thisEncoding.GetEncoder();

        // Worst-case number of chars produced by decoding one inner-stream read.
        _readCharBufferMaxSize = _innerEncoding.GetMaxCharCount(DefaultReadByteBufferSize);

        // The staging buffer is an instance field, so it cannot come from the ArrayPool.
        // Its raw contents are never exposed to callers, which makes it safe to skip the
        // zero-initialization. It is sized for the worst-case re-encoding of a full char buffer.
        int stagingBufferSize = _thisEncoding.GetMaxByteCount(_readCharBufferMaxSize);
        _readBuffer = GC.AllocateUninitializedArray<byte>(stagingBufferSize);
    }
}
// Called at the start of every write operation. Throws if this stream has been disposed;
// on the first write it also throws if the stream is not writable, and otherwise creates
// the decoder and encoder that the write path relies on.
[MemberNotNull(nameof(_thisDecoder), nameof(_innerEncoder))]
private void EnsurePreWriteConditions()
{
    ThrowIfDisposed();
    if (_innerEncoder != null)
    {
        return; // write-side state was created by an earlier call
    }

    CreateWriteState();

    void CreateWriteState()
    {
        if (!CanWrite)
        {
            throw Error.GetWriteNotSupported();
        }

        _innerEncoder = _innerEncoding.GetEncoder();
        _thisDecoder = _thisEncoding.GetDecoder();
    }
}
#pragma warning restore CS8774 // Member must have a non-null value when exiting.
#pragma warning restore CS3016 // Arrays as attribute arguments is not CLS-compliant
// Drains the write-side decoder and encoder with flush: true and returns whatever bytes
// still have to be written to the inner stream before disposal. Returns an empty segment
// if this stream was never used for writing.
private ArraySegment<byte> FinalFlushWriteBuffers()
{
    Decoder? outerDecoder = _thisDecoder;
    Encoder? innerEncoder = _innerEncoder;
    if (outerDecoder is null || innerEncoder is null)
    {
        return default;
    }

    // Step 1: bytes [this] -> chars.
    // Leftover decoder state should be very rare, since it should only occur when the written
    // data ended in an incomplete multi-byte sequence. Plain allocations are therefore used
    // here instead of array pool rentals or allocation-avoiding loops.
    char[] trailingChars;
    int trailingCharCount = outerDecoder.GetCharCount(Array.Empty<byte>(), 0, 0, flush: true);
    if (trailingCharCount > 0)
    {
        trailingChars = new char[trailingCharCount];
        trailingCharCount = outerDecoder.GetChars(Array.Empty<byte>(), 0, 0, trailingChars, 0, flush: true);
    }
    else
    {
        trailingChars = Array.Empty<char>();
    }

    // Step 2: chars -> bytes [inner].
    // The encoder is flushed even when the decoder produced nothing, because the encoder
    // may have end-of-text fixup of its own to perform.
    byte[] trailingBytes;
    int trailingByteCount = innerEncoder.GetByteCount(trailingChars, 0, trailingCharCount, flush: true);
    if (trailingByteCount > 0)
    {
        trailingBytes = new byte[trailingByteCount];
        trailingByteCount = innerEncoder.GetBytes(trailingChars, 0, trailingCharCount, trailingBytes, 0, flush: true);
    }
    else
    {
        trailingBytes = Array.Empty<byte>();
    }

    return new ArraySegment<byte>(trailingBytes, 0, trailingByteCount);
}
// Flush and FlushAsync only flush the inner stream. The decoder and encoder are NOT flushed
// here: passing flush: true to them mid-stream could corrupt data, so that step is deferred
// until the stream is closed.
public override void Flush()
{
    ThrowIfDisposed();

    Stream inner = _innerStream;
    inner.Flush();
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
    ThrowIfDisposed();

    Stream inner = _innerStream;
    return inner.FlushAsync(cancellationToken);
}
// Array-based read: rejects a null array, then forwards to the span-based overload.
// The Span constructor performs the offset/count range validation.
public override int Read(byte[] buffer, int offset, int count)
{
    _ = buffer ?? throw new ArgumentNullException(nameof(buffer));

    Span<byte> destination = new Span<byte>(buffer, offset, count);
    return Read(destination);
}
// Reads transcoded data into 'buffer' and returns the number of bytes copied. When the pending
// read buffer is empty it is refilled from the inner stream first; the method returns 0 only
// once the inner stream has reported EOF and all pending data has been handed out.
public override int Read(Span<byte> buffer)
{
    EnsurePreReadConditions();

    // If there's no data in our pending read buffer, we'll need to populate it from
    // the inner stream. We read the inner stream's bytes, decode that to chars using
    // the 'inner' encoding, then re-encode those chars under the 'this' encoding.
    // We've already calculated the worst-case expansions for the intermediate buffers,
    // so we use GetChars / GetBytes instead of Convert to simplify the below code
    // and to ensure an exception is thrown if the Encoding reported an incorrect
    // worst-case expansion.
    if (_readBufferCount == 0)
    {
        byte[] rentedBytes = ArrayPool<byte>.Shared.Rent(DefaultReadByteBufferSize);
        char[] rentedChars = ArrayPool<char>.Shared.Rent(_readCharBufferMaxSize);
        try
        {
            int pendingReadDataPopulatedJustNow;
            bool isEofReached;
            do
            {
                // Beware: Use our constant value instead of 'rentedBytes.Length' for the count
                // parameter below. The reason for this is that the array pool could've returned
                // a larger-than-expected array, but our worst-case expansion calculations
                // performed earlier didn't take that into account.
                int innerBytesReadJustNow = _innerStream.Read(rentedBytes, 0, DefaultReadByteBufferSize);
                isEofReached = (innerBytesReadJustNow == 0);

                // convert bytes [inner] -> chars, then convert chars -> bytes [this]
                int charsDecodedJustNow = _innerDecoder.GetChars(rentedBytes, 0, innerBytesReadJustNow, rentedChars, 0, flush: isEofReached);
                pendingReadDataPopulatedJustNow = _thisEncoder.GetBytes(rentedChars, 0, charsDecodedJustNow, _readBuffer, 0, flush: isEofReached);

                // The inner stream may hand us a non-empty but incomplete chunk (for example the
                // first byte of a multi-byte sequence), in which case no output can be produced
                // yet. Returning 0 at that point would wrongly signal EOF to our caller, so keep
                // reading until we have output or the inner stream is truly at EOF.
            } while (!isEofReached && pendingReadDataPopulatedJustNow == 0);

            _readBufferOffset = 0;
            _readBufferCount = pendingReadDataPopulatedJustNow;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(rentedBytes);
            ArrayPool<char>.Shared.Return(rentedChars);
        }
    }

    // At this point: (a) we've populated our pending read buffer and there's
    // useful data to return to our caller; or (b) the pending read buffer is
    // empty because the inner stream has reached EOF and all pending read data
    // has already been flushed, and we should return 0.
    int bytesToReturn = Math.Min(_readBufferCount, buffer.Length);
    _readBuffer.AsSpan(_readBufferOffset, bytesToReturn).CopyTo(buffer);
    _readBufferOffset += bytesToReturn;
    _readBufferCount -= bytesToReturn;
    return bytesToReturn;
}
// Array-based async read: rejects a null array, then forwards to the Memory-based overload.
// The Memory constructor performs the offset/count range validation.
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    _ = buffer ?? throw new ArgumentNullException(nameof(buffer));

    Memory<byte> destination = new Memory<byte>(buffer, offset, count);
    ValueTask<int> pendingRead = ReadAsync(destination, cancellationToken);
    return pendingRead.AsTask();
}
// Asynchronously reads transcoded data into 'buffer' and returns the number of bytes copied.
// Disposed / unreadable checks run synchronously; an already-canceled token yields a canceled
// task without touching the inner stream. The result is 0 only once the inner stream has
// reported EOF and all pending data has been handed out.
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
    EnsurePreReadConditions();
    if (cancellationToken.IsCancellationRequested)
    {
        return new ValueTask<int>(Task.FromCanceled<int>(cancellationToken));
    }
    return ReadAsyncCore(buffer, cancellationToken);

    async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToken cancellationToken)
    {
        // If there's no data in our pending read buffer, we'll need to populate it from
        // the inner stream. We read the inner stream's bytes, decode that to chars using
        // the 'inner' encoding, then re-encode those chars under the 'this' encoding.
        // We've already calculated the worst-case expansions for the intermediate buffers,
        // so we use GetChars / GetBytes instead of Convert to simplify the below code
        // and to ensure an exception is thrown if the Encoding reported an incorrect
        // worst-case expansion.
        if (_readBufferCount == 0)
        {
            byte[] rentedBytes = ArrayPool<byte>.Shared.Rent(DefaultReadByteBufferSize);
            char[] rentedChars = ArrayPool<char>.Shared.Rent(_readCharBufferMaxSize);
            try
            {
                int pendingReadDataPopulatedJustNow;
                bool isEofReached;
                do
                {
                    // Beware: Use our constant value instead of 'rentedBytes.Length' when creating
                    // the Memory<byte> struct. The reason for this is that the array pool could've
                    // returned a larger-than-expected array, but our worst-case expansion
                    // calculations performed earlier didn't take that into account.
                    int innerBytesReadJustNow = await _innerStream.ReadAsync(rentedBytes.AsMemory(0, DefaultReadByteBufferSize), cancellationToken).ConfigureAwait(false);
                    isEofReached = (innerBytesReadJustNow == 0);

                    // convert bytes [inner] -> chars, then convert chars -> bytes [this]
                    int charsDecodedJustNow = _innerDecoder.GetChars(rentedBytes, 0, innerBytesReadJustNow, rentedChars, 0, flush: isEofReached);
                    pendingReadDataPopulatedJustNow = _thisEncoder.GetBytes(rentedChars, 0, charsDecodedJustNow, _readBuffer, 0, flush: isEofReached);

                    // The inner stream may hand us a non-empty but incomplete chunk (for example
                    // the first byte of a multi-byte sequence), in which case no output can be
                    // produced yet. Returning 0 at that point would wrongly signal EOF to our
                    // caller, so keep reading until we have output or the inner stream is truly
                    // at EOF.
                } while (!isEofReached && pendingReadDataPopulatedJustNow == 0);

                _readBufferOffset = 0;
                _readBufferCount = pendingReadDataPopulatedJustNow;
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(rentedBytes);
                ArrayPool<char>.Shared.Return(rentedChars);
            }
        }

        // At this point: (a) we've populated our pending read buffer and there's
        // useful data to return to our caller; or (b) the pending read buffer is
        // empty because the inner stream has reached EOF and all pending read data
        // has already been flushed, and we should return 0.
        int bytesToReturn = Math.Min(_readBufferCount, buffer.Length);
        _readBuffer.AsSpan(_readBufferOffset, bytesToReturn).CopyTo(buffer.Span);
        _readBufferOffset += bytesToReturn;
        _readBufferCount -= bytesToReturn;
        return bytesToReturn;
    }
}
// Reads one transcoded byte through the span-based Read overload.
// Returns the byte value, or -1 when Read reports that no bytes were read.
public override int ReadByte()
{
    Span<byte> oneByte = stackalloc byte[1];
    if (Read(oneByte) == 0)
    {
        return -1; // EOF
    }

    return oneByte[0];
}
// This stream is never seekable (CanSeek is always false), so both members always throw.
public override long Seek(long offset, SeekOrigin origin)
{
    throw new NotSupportedException(SR.NotSupported_UnseekableStream);
}

public override void SetLength(long value)
{
    throw new NotSupportedException(SR.NotSupported_UnseekableStream);
}
// Throws ObjectDisposedException once the wrapper has been disposed, which is
// signalled by _innerStream having been set to null.
[StackTraceHidden]
private void ThrowIfDisposed()
{
    if (!(_innerStream is null))
    {
        return;
    }

    ThrowObjectDisposedException();
}

// Out-of-line throw helper; the exception names the runtime type of this instance.
[DoesNotReturn]
[StackTraceHidden]
private void ThrowObjectDisposedException()
{
    string disposedTypeName = GetType().Name;
    throw new ObjectDisposedException(disposedTypeName, SR.ObjectDisposed_StreamClosed);
}
// Array-based write: rejects a null array, then forwards to the span-based overload.
// The ReadOnlySpan constructor performs the offset/count range validation.
public override void Write(byte[] buffer, int offset, int count)
{
    _ = buffer ?? throw new ArgumentNullException(nameof(buffer));

    ReadOnlySpan<byte> source = new ReadOnlySpan<byte>(buffer, offset, count);
    Write(source);
}
// Transcodes 'buffer' from this stream's encoding to the inner stream's encoding and writes
// the result to the inner stream. Conversion runs in passes through two rented scratch arrays:
// the outer loop decodes bytes to chars, the inner loop encodes those chars and writes them.
public override void Write(ReadOnlySpan<byte> buffer)
{
    EnsurePreWriteConditions();

    int scratchLength = Math.Clamp(buffer.Length, MinWriteRentedArraySize, MaxWriteRentedArraySize);
    char[] charScratch = ArrayPool<char>.Shared.Rent(scratchLength);
    byte[] byteScratch = ArrayPool<byte>.Shared.Rent(scratchLength);
    try
    {
        ReadOnlySpan<byte> outerBytes = buffer;
        bool outerFullyDecoded;
        do
        {
            // bytes [this] -> chars; no flush, since more data may follow in a later Write call
            _thisDecoder.Convert(outerBytes, charScratch, flush: false, out int consumedBytes, out int producedChars, out outerFullyDecoded);
            outerBytes = outerBytes.Slice(consumedBytes);

            // chars -> bytes [inner]
            ReadOnlySpan<char> pendingChars = new ReadOnlySpan<char>(charScratch, 0, producedChars);
            bool charsFullyEncoded;
            do
            {
                _innerEncoder.Convert(pendingChars, byteScratch, flush: false, out int consumedChars, out int producedBytes, out charsFullyEncoded);
                pendingChars = pendingChars.Slice(consumedChars);

                // The byte[]-based overload is used on purpose: an inner stream is more likely
                // to provide an optimized Write(byte[], ...) than an optimized span overload.
                _innerStream.Write(byteScratch, 0, producedBytes);
            }
            while (!charsFullyEncoded);
        }
        while (!outerFullyDecoded);
    }
    finally
    {
        ArrayPool<char>.Shared.Return(charScratch);
        ArrayPool<byte>.Shared.Return(byteScratch);
    }
}
// Array-based async write: rejects a null array, then forwards to the ReadOnlyMemory-based
// overload. The ReadOnlyMemory constructor performs the offset/count range validation.
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    _ = buffer ?? throw new ArgumentNullException(nameof(buffer));

    ReadOnlyMemory<byte> source = new ReadOnlyMemory<byte>(buffer, offset, count);
    ValueTask pendingWrite = WriteAsync(source, cancellationToken);
    return pendingWrite.AsTask();
}
// Asynchronously transcodes 'buffer' from this stream's encoding to the inner stream's encoding
// and writes the result to the inner stream. Disposed / unwritable checks run synchronously;
// an already-canceled token yields a canceled task without touching the inner stream.
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
    EnsurePreWriteConditions();

    return cancellationToken.IsCancellationRequested
        ? new ValueTask(Task.FromCanceled(cancellationToken))
        : TranscodeAndWriteAsync(buffer, cancellationToken);

    async ValueTask TranscodeAndWriteAsync(ReadOnlyMemory<byte> outerBytes, CancellationToken cancellationToken)
    {
        int scratchLength = Math.Clamp(outerBytes.Length, MinWriteRentedArraySize, MaxWriteRentedArraySize);
        char[] charScratch = ArrayPool<char>.Shared.Rent(scratchLength);
        byte[] byteScratch = ArrayPool<byte>.Shared.Rent(scratchLength);
        try
        {
            bool outerFullyDecoded;
            do
            {
                // bytes [this] -> chars; no flush, since more data may follow in a later write
                _thisDecoder.Convert(outerBytes.Span, charScratch, flush: false, out int consumedBytes, out int producedChars, out outerFullyDecoded);
                outerBytes = outerBytes.Slice(consumedBytes);

                // chars -> bytes [inner]
                ReadOnlyMemory<char> pendingChars = new ReadOnlyMemory<char>(charScratch, 0, producedChars);
                bool charsFullyEncoded;
                do
                {
                    _innerEncoder.Convert(pendingChars.Span, byteScratch, flush: false, out int consumedChars, out int producedBytes, out charsFullyEncoded);
                    pendingChars = pendingChars.Slice(consumedChars);

                    ReadOnlyMemory<byte> encodedChunk = new ReadOnlyMemory<byte>(byteScratch, 0, producedBytes);
                    await _innerStream.WriteAsync(encodedChunk, cancellationToken).ConfigureAwait(false);
                }
                while (!charsFullyEncoded);
            }
            while (!outerFullyDecoded);
        }
        finally
        {
            ArrayPool<char>.Shared.Return(charScratch);
            ArrayPool<byte>.Shared.Return(byteScratch);
        }
    }
}
// Writes a single byte by viewing the parameter as a one-element span and
// forwarding to the span-based Write overload.
public override void WriteByte(byte value)
{
    ReadOnlySpan<byte> single = MemoryMarshal.CreateReadOnlySpan(ref value, 1);
    Write(single);
}
}
}

View file

@ -10193,6 +10193,7 @@ namespace System.Text
public virtual object Clone() { throw null; }
public static byte[] Convert(System.Text.Encoding srcEncoding, System.Text.Encoding dstEncoding, byte[] bytes) { throw null; }
public static byte[] Convert(System.Text.Encoding srcEncoding, System.Text.Encoding dstEncoding, byte[] bytes, int index, int count) { throw null; }
public static System.IO.Stream CreateTranscodingStream(System.IO.Stream innerStream, System.Text.Encoding innerStreamEncoding, System.Text.Encoding outerStreamEncoding, bool leaveOpen = false) { throw null; }
public override bool Equals(object? value) { throw null; }
[System.CLSCompliantAttribute(false)]
public unsafe virtual int GetByteCount(char* chars, int count) { throw null; }

View file

@ -0,0 +1,897 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using Xunit;
namespace System.Text.Tests
{
public class TranscodingStreamTests
{
// Buffer lengths supplied as theory data: 1, 4 KiB, 128 KiB, and 2 MiB,
// each wrapped in a single-element argument array.
public static IEnumerable<object[]> ReadWriteTestBufferLengths
{
    get
    {
        int[] lengths = { 1, 4 * 1024, 128 * 1024, 2 * 1024 * 1024 };
        foreach (int length in lengths)
        {
            yield return new object[] { length };
        }
    }
}
// Every async read/write overload must return an already-canceled task, carrying the caller's
// token, when that token is canceled before the call is made.
[Fact]
public void AsyncMethods_ReturnCanceledTaskIfCancellationTokenTripped()
{
    // Arrange
    CancellationTokenSource canceledSource = new CancellationTokenSource();
    CancellationToken expectedCancellationToken = canceledSource.Token;
    canceledSource.Cancel();

    // Strict mock: any inner-stream member other than CanRead/CanWrite fails the test.
    var strictInnerStream = new Mock<Stream>(MockBehavior.Strict);
    strictInnerStream.Setup(o => o.CanRead).Returns(true);
    strictInnerStream.Setup(o => o.CanWrite).Returns(true);
    Stream transcodingStream = Encoding.CreateTranscodingStream(strictInnerStream.Object, Encoding.UTF8, Encoding.UTF8, leaveOpen: true);

    Func<Task>[] operations =
    {
        () => transcodingStream.ReadAsync(new byte[0], 0, 0, expectedCancellationToken),
        () => transcodingStream.ReadAsync(Memory<byte>.Empty, expectedCancellationToken).AsTask(),
        () => transcodingStream.WriteAsync(new byte[0], 0, 0, expectedCancellationToken),
        () => transcodingStream.WriteAsync(ReadOnlyMemory<byte>.Empty, expectedCancellationToken).AsTask(),
    };

    // Act & assert
    foreach (Func<Task> operation in operations)
    {
        Task task = operation();
        Assert.True(task.IsCanceled);

        TaskCanceledException thrown = Assert.Throws<TaskCanceledException>(() => task.GetAwaiter().GetResult());
        Assert.Equal(expectedCancellationToken, thrown.CancellationToken);
    }
}
// Each null argument must be rejected with an ArgumentNullException naming that parameter.
[Fact]
public void CreateTranscodingStream_InvalidArgs()
{
    Stream validStream = Stream.Null;
    Encoding validEncoding = Encoding.UTF8;

    Assert.Throws<ArgumentNullException>("innerStream", () => Encoding.CreateTranscodingStream(null, validEncoding, validEncoding));
    Assert.Throws<ArgumentNullException>("innerStreamEncoding", () => Encoding.CreateTranscodingStream(validStream, null, validEncoding));
    Assert.Throws<ArgumentNullException>("outerStreamEncoding", () => Encoding.CreateTranscodingStream(validStream, validEncoding, null));
}
// CanRead must report the inner stream's value while alive, and false after disposal.
[Theory]
[InlineData(true)]
[InlineData(false)]
public void CanRead_DelegatesToInnerStream(bool expectedCanRead)
{
    // Arrange
    var innerMock = new Mock<Stream>();
    innerMock.Setup(o => o.CanRead).Returns(expectedCanRead);
    Stream wrapper = Encoding.CreateTranscodingStream(innerMock.Object, Encoding.UTF8, Encoding.UTF8, leaveOpen: true);

    // Act
    bool canReadWhileAlive = wrapper.CanRead;
    wrapper.Dispose();
    bool canReadOnceDisposed = wrapper.CanRead;

    // Assert
    Assert.Equal(expectedCanRead, canReadWhileAlive);
    Assert.False(canReadOnceDisposed);
}
// CanWrite must report the inner stream's value while alive, and false after disposal.
[Theory]
[InlineData(true)]
[InlineData(false)]
public void CanWrite_DelegatesToInnerStream(bool expectedCanWrite)
{
    // Arrange
    var innerMock = new Mock<Stream>();
    innerMock.Setup(o => o.CanWrite).Returns(expectedCanWrite);
    Stream wrapper = Encoding.CreateTranscodingStream(innerMock.Object, Encoding.UTF8, Encoding.UTF8, leaveOpen: true);

    // Act
    bool canWriteWhileAlive = wrapper.CanWrite;
    wrapper.Dispose();
    bool canWriteOnceDisposed = wrapper.CanWrite;

    // Assert
    Assert.Equal(expectedCanWrite, canWriteWhileAlive);
    Assert.False(canWriteOnceDisposed);
}
// After disposal the capability properties report false and every I/O member throws
// ObjectDisposedException.
[Fact]
public void Dispose_MakesMostSubsequentOperationsThrow()
{
    // Arrange
    byte[] noBytes = new byte[0];
    Stream disposedStream = Encoding.CreateTranscodingStream(new MemoryStream(), Encoding.UTF8, Encoding.UTF8, leaveOpen: true);

    // Act
    disposedStream.Dispose();

    // Assert: capability queries
    Assert.False(disposedStream.CanRead);
    Assert.False(disposedStream.CanSeek);
    Assert.False(disposedStream.CanWrite);

    // Assert: I/O members. For Task/ValueTask-returning methods the exception must be thrown
    // synchronously, which is why the calls below are never awaited.
    Assert.Throws<ObjectDisposedException>(() => (object)disposedStream.BeginRead(noBytes, 0, 0, null, null));
    Assert.Throws<ObjectDisposedException>(() => (object)disposedStream.BeginWrite(noBytes, 0, 0, null, null));
    Assert.Throws<ObjectDisposedException>(() => disposedStream.Flush());
    Assert.Throws<ObjectDisposedException>(() => (object)disposedStream.FlushAsync());
    Assert.Throws<ObjectDisposedException>(() => (object)disposedStream.Read(noBytes, 0, 0));
    Assert.Throws<ObjectDisposedException>(() => (object)disposedStream.Read(Span<byte>.Empty));
    Assert.Throws<ObjectDisposedException>(() => (object)disposedStream.ReadAsync(noBytes, 0, 0));
    Assert.Throws<ObjectDisposedException>(() => (object)disposedStream.ReadAsync(Memory<byte>.Empty));
    Assert.Throws<ObjectDisposedException>(() => (object)disposedStream.ReadByte());
    Assert.Throws<ObjectDisposedException>(() => disposedStream.Write(noBytes, 0, 0));
    Assert.Throws<ObjectDisposedException>(() => disposedStream.Write(ReadOnlySpan<byte>.Empty));
    Assert.Throws<ObjectDisposedException>(() => (object)disposedStream.WriteAsync(noBytes, 0, 0));
    Assert.Throws<ObjectDisposedException>(() => (object)disposedStream.WriteAsync(ReadOnlyMemory<byte>.Empty));
    Assert.Throws<ObjectDisposedException>(() => disposedStream.WriteByte((byte)'x'));
}
// With leaveOpen: false, both Dispose and DisposeAsync must dispose the inner stream,
// and calling either a second time must be a no-op.
[Fact]
public void Dispose_WithLeaveOpenFalse_DisposesInnerStream()
{
    // Synchronous disposal
    MemoryStream syncInner = new MemoryStream();
    Stream syncWrapper = Encoding.CreateTranscodingStream(syncInner, Encoding.UTF8, Encoding.UTF8, leaveOpen: false);
    syncWrapper.Dispose();
    syncWrapper.Dispose(); // second call should no-op
    Assert.Throws<ObjectDisposedException>(() => syncInner.Read(Span<byte>.Empty));

    // Asynchronous disposal
    MemoryStream asyncInner = new MemoryStream();
    Stream asyncWrapper = Encoding.CreateTranscodingStream(asyncInner, Encoding.UTF8, Encoding.UTF8, leaveOpen: false);
    asyncWrapper.DisposeAsync().GetAwaiter().GetResult();
    asyncWrapper.DisposeAsync().GetAwaiter().GetResult(); // second call should no-op
    Assert.Throws<ObjectDisposedException>(() => asyncInner.Read(Span<byte>.Empty));
}
[Fact]
public void Dispose_WithLeaveOpenTrue_DoesNotDisposeInnerStream()
{
    // The same scenario is run once per disposal flavor.
    Action<Stream>[] disposalModes =
    {
        stream => stream.Dispose(),                               // Sync
        stream => stream.DisposeAsync().GetAwaiter().GetResult(), // Async
    };

    foreach (Action<Stream> dispose in disposalModes)
    {
        MemoryStream innerStream = new MemoryStream();
        Stream transcodingStream = Encoding.CreateTranscodingStream(innerStream, Encoding.UTF8, Encoding.UTF8, leaveOpen: true);

        dispose(transcodingStream);
        dispose(transcodingStream); // calling it a second time should no-op

        // With leaveOpen: true the inner stream must still be usable.
        innerStream.Read(Span<byte>.Empty); // shouldn't throw
    }
}
// Verifies that Flush / FlushAsync are forwarded to the inner stream, while the trailing
// byte of an incomplete multi-byte sequence is not pushed to the inner stream by a flush.
[Fact]
public void Flush_FlushesInnerStreamButNotDecodedState()
{
// Arrange
// A dedicated token and a dedicated task instance let the assertions below check that
// exactly these objects are handed to / returned from the inner stream's FlushAsync.
CancellationToken expectedCancellationToken = new CancellationTokenSource().Token;
Task expectedFlushAsyncTask = Task.FromResult("just some task");
// CallBase = true keeps the real MemoryStream behavior for everything that is not set up.
var innerStreamMock = new Mock<MemoryStream>() { CallBase = true };
innerStreamMock.Setup(o => o.FlushAsync(expectedCancellationToken)).Returns(expectedFlushAsyncTask);
Stream transcodingStream = Encoding.CreateTranscodingStream(innerStreamMock.Object, Encoding.UTF8, Encoding.UTF8, leaveOpen: true);
// 0x7A is 'z'; 0xE0 is left dangling as the start of a longer sequence.
transcodingStream.Write(new byte[] { 0x7A, 0xE0 });
// Writing alone must not have flushed the inner stream.
innerStreamMock.Verify(o => o.Flush(), Times.Never);
innerStreamMock.Verify(o => o.FlushAsync(It.IsAny<CancellationToken>()), Times.Never);
// Act & assert - sync flush
transcodingStream.Flush();
innerStreamMock.Verify(o => o.Flush(), Times.Once);
innerStreamMock.Verify(o => o.FlushAsync(It.IsAny<CancellationToken>()), Times.Never);
// Act & assert - async flush
// This also validates that we flowed the CancellationToken as expected
Task actualFlushAsyncReturnedTask = transcodingStream.FlushAsync(expectedCancellationToken);
// The very task instance produced by the inner stream is expected to be returned.
Assert.Same(expectedFlushAsyncTask, actualFlushAsyncReturnedTask);
// The sync Flush count stays at one; the async flush is counted separately.
innerStreamMock.Verify(o => o.Flush(), Times.Once);
innerStreamMock.Verify(o => o.FlushAsync(expectedCancellationToken), Times.Once);
Assert.Equal("z", Encoding.UTF8.GetString(innerStreamMock.Object.ToArray())); // [ E0 ] shouldn't have been flushed
}
[Fact]
public void IdenticalInnerAndOuterEncodings_DoesNotActAsPassthrough()
{
    // [ C0 ] is never a valid UTF-8 byte, should be replaced with U+FFFD,
    // whose UTF-8 form is [ EF BF BD ].
    byte[] invalidUtf8 = { 0xC0 };
    byte[] replacementCharUtf8 = { 0xEF, 0xBF, 0xBD };

    // Test read
    Stream reader = Encoding.CreateTranscodingStream(new MemoryStream(invalidUtf8), Encoding.UTF8, Encoding.UTF8);
    foreach (byte expectedByte in replacementCharUtf8)
    {
        Assert.Equal((int)expectedByte, reader.ReadByte());
    }
    Assert.Equal(-1 /* eof */, reader.ReadByte());

    // Test write
    MemoryStream writeTarget = new MemoryStream();
    Stream writer = Encoding.CreateTranscodingStream(writeTarget, Encoding.UTF8, Encoding.UTF8);
    writer.WriteByte(0xC0);
    Assert.Equal(replacementCharUtf8, writeTarget.ToArray());
}
[Theory]
[MemberData(nameof(ReadWriteTestBufferLengths))]
public void Read_ByteArray(int bufferLength)
{
    // Tests TranscodingStream.Read(byte[], int, int)
    // The array is larger than the requested count and the read starts at a non-zero offset.
    const int StartOffset = 1;
    byte[] scratch = new byte[bufferLength + 3];

    RunReadTest((stream, sink) =>
    {
        int bytesRead = stream.Read(scratch, StartOffset, bufferLength);
        Assert.True(0 <= bytesRead);
        Assert.True(bufferLength >= bytesRead);

        // Forward exactly what was read so the helper can compare the full output.
        sink.Write(scratch, StartOffset, bytesRead);
        return bytesRead;
    });
}
[Fact]
public void Read_ByteArray_WithInvalidArgs_Throws()
{
    Stream stream = Encoding.CreateTranscodingStream(new MemoryStream(), Encoding.UTF8, Encoding.UTF8);

    // A null array must be reported against the "buffer" parameter.
    Assert.Throws<ArgumentNullException>("buffer", () => stream.Read(null, 0, 0));

    // Each (offset, count) pair is expected to be rejected for a 5-element array.
    (int Offset, int Count)[] rejectedRanges = { (-1, -1), (3, -1), (5, 1), (6, -1), (6, 0) };
    foreach ((int offset, int count) in rejectedRanges)
    {
        Assert.Throws<ArgumentOutOfRangeException>(() => stream.Read(new byte[5], offset, count));
    }
}
[Fact]
public void Read_ByteByByte()
{
    // Tests TranscodingStream.ReadByte
    RunReadTest((stream, sink) =>
    {
        int nextByte = stream.ReadByte();
        if (nextByte >= 0)
        {
            // checked: a non-negative return value must fit in a byte.
            sink.WriteByte(checked((byte)nextByte));
            return 1;
        }

        // A negative value means end of stream: report zero bytes read.
        return 0;
    });
}
[Theory]
[MemberData(nameof(ReadWriteTestBufferLengths))]
public void Read_Span(int bufferLength)
{
    // Tests TranscodingStream.Read(Span<byte>)
    byte[] scratch = new byte[bufferLength];

    RunReadTest((stream, sink) =>
    {
        Span<byte> destination = scratch;
        int bytesRead = stream.Read(destination);
        Assert.True(0 <= bytesRead);
        Assert.True(bufferLength >= bytesRead);

        // Forward only the portion that was actually filled.
        sink.Write(scratch.AsSpan(0, bytesRead));
        return bytesRead;
    });
}
// Shared driver for the synchronous read tests. The callback performs one read from the
// transcoding stream, copies what it read into the sink, and returns the number of bytes read
// (zero meaning end of stream).
private void RunReadTest(Func<Stream, MemoryStream, int> callback)
{
    MemoryStream sink = new MemoryStream();
    MemoryStream source = new MemoryStream();
    Stream transcoder = Encoding.CreateTranscodingStream(source,
        innerStreamEncoding: Encoding.UTF8,
        outerStreamEncoding: CustomAsciiEncoding);

    // Test with a small string, then test with a large string
    foreach (int stringLength in new[] { 128, 10 * 1024 * 1024 })
    {
        string expectedText = GetVeryLongAsciiString(stringLength);
        ResetWithSourceBytes(Encoding.UTF8.GetBytes(expectedText));
        DrainThroughCallback();
        Assert.Equal(expectedText, ErrorCheckingAsciiEncoding.GetString(sink.ToArray()));
    }
    Assert.Equal(-1, transcoder.ReadByte()); // should've reached EOF

    // Now put some invalid data into the inner stream as EOF.
    ResetWithSourceBytes(new byte[] { 0xC0 });
    DrainThroughCallback();
    Assert.Equal("[FFFD]", ErrorCheckingAsciiEncoding.GetString(sink.ToArray()));
    Assert.Equal(-1, transcoder.ReadByte()); // should've reached EOF

    // Empties the sink and replaces the source contents, rewinding it to the start.
    void ResetWithSourceBytes(byte[] contents)
    {
        sink.SetLength(0);
        source.SetLength(0);
        source.Write(contents, 0, contents.Length);
        source.Position = 0;
    }

    // Invokes the callback until it stops reporting a positive byte count.
    void DrainThroughCallback()
    {
        while (true)
        {
            int justRead = callback(transcoder, sink);
            Assert.True(justRead >= 0);
            if (justRead <= 0)
            {
                break;
            }
        }
    }
}
[Fact]
public Task ReadApm()
{
    // Tests TranscodingStream.BeginRead / EndRead by adapting the APM pattern to the
    // ValueTask-returning callback shape expected by RunReadTestAsync.
    byte[] buffer = new byte[1024 * 1024];
    const int Offset = 1;
    int requestedCount = buffer.Length - 2;

    return RunReadTestAsync((transcodingStream, cancellationToken, sink) =>
    {
        // Continuations are scheduled asynchronously so they never run inline inside the APM callback.
        TaskCompletionSource<int> tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        object expectedState = new object();
        try
        {
            IAsyncResult asyncResult = transcodingStream.BeginRead(buffer, Offset, requestedCount, completedResult =>
            {
                try
                {
                    int numBytesReadJustNow = transcodingStream.EndRead(completedResult);
                    Assert.True(numBytesReadJustNow >= 0);
                    // A read may return at most the number of bytes that was requested.
                    Assert.True(numBytesReadJustNow <= requestedCount);
                    sink.Write(buffer, Offset, numBytesReadJustNow);
                    tcs.SetResult(numBytesReadJustNow);
                }
                catch (Exception ex)
                {
                    tcs.SetException(ex);
                }
            }, expectedState);

            // The state object handed to BeginRead must be surfaced unchanged.
            Assert.Same(expectedState, asyncResult.AsyncState);
        }
        catch (Exception ex)
        {
            // The callback may already have completed the task (e.g. when the read finished
            // synchronously). In that case rethrow so the real failure is reported instead of
            // an InvalidOperationException from completing the task a second time.
            if (!tcs.TrySetException(ex))
            {
                throw;
            }
        }
        return new ValueTask<int>(tcs.Task);
    },
    suppressExpectedCancellationTokenAsserts: true); // APM pattern doesn't allow flowing CancellationToken
}
[Theory]
[MemberData(nameof(ReadWriteTestBufferLengths))]
public Task ReadAsync_ByteArray(int bufferLength)
{
    // Tests TranscodingStream.ReadAsync(byte[], int, int, CancellationToken)
    // The array is larger than the requested count and the read starts at a non-zero offset.
    const int StartOffset = 1;
    byte[] scratch = new byte[bufferLength + 3];

    return RunReadTestAsync(async (stream, token, sink) =>
    {
        int bytesRead = await stream.ReadAsync(scratch, StartOffset, bufferLength, token);
        Assert.True(0 <= bytesRead);
        Assert.True(bufferLength >= bytesRead);

        // Forward exactly what was read so the helper can compare the full output.
        sink.Write(scratch, StartOffset, bytesRead);
        return bytesRead;
    });
}
[Theory]
[MemberData(nameof(ReadWriteTestBufferLengths))]
public async Task ReadAsync_Memory(int bufferLength)
{
    // Tests TranscodingStream.ReadAsync(Memory<byte>, CancellationToken)
    byte[] scratch = new byte[bufferLength];

    await RunReadTestAsync(async (stream, token, sink) =>
    {
        Memory<byte> destination = scratch;
        int bytesRead = await stream.ReadAsync(destination, token);
        Assert.True(0 <= bytesRead);
        Assert.True(bufferLength >= bytesRead);

        // Forward only the portion that was actually filled.
        sink.Write(scratch.AsSpan(0, bytesRead));
        return bytesRead;
    });
}
[Fact]
public void ReadAsync_WithInvalidArgs_Throws()
{
    Stream stream = Encoding.CreateTranscodingStream(new MemoryStream(), Encoding.UTF8, Encoding.UTF8);

    // The returned task is boxed, never awaited: argument errors must be thrown by the call itself.
    Assert.Throws<ArgumentNullException>("buffer", () => (object)stream.ReadAsync(null, 0, 0));

    // Each (offset, count) pair is expected to be rejected for a 5-element array.
    (int Offset, int Count)[] rejectedRanges = { (-1, -1), (3, -1), (5, 1), (6, -1), (6, 0) };
    foreach ((int offset, int count) in rejectedRanges)
    {
        Assert.Throws<ArgumentOutOfRangeException>(() => (object)stream.ReadAsync(new byte[5], offset, count));
    }
}
[Fact]
public void ReadApm_WithInvalidArgs_Throws()
{
    Stream stream = Encoding.CreateTranscodingStream(new MemoryStream(), Encoding.UTF8, Encoding.UTF8);

    // A null array must be reported against the "buffer" parameter.
    Assert.Throws<ArgumentNullException>("buffer", () => stream.BeginRead(null, 0, 0, null, null));

    // Each (offset, count) pair is expected to be rejected for a 5-element array.
    (int Offset, int Count)[] rejectedRanges = { (-1, -1), (3, -1), (5, 1), (6, -1), (6, 0) };
    foreach ((int offset, int count) in rejectedRanges)
    {
        Assert.Throws<ArgumentOutOfRangeException>(() => stream.BeginRead(new byte[5], offset, count, null, null));
    }
}
// Shared driver for the asynchronous read tests. The callback performs one read from the
// transcoding stream using the supplied token, copies what it read into the sink, and yields
// the number of bytes read (zero meaning end of stream).
private async Task RunReadTestAsync(Func<Stream, CancellationToken, MemoryStream, ValueTask<int>> callback, bool suppressExpectedCancellationTokenAsserts = false)
{
    CancellationToken expectedCancellationToken = new CancellationTokenSource().Token;
    MemoryStream sink = new MemoryStream();
    MemoryStream source = new MemoryStream();

    // A strict mock that only allows CanRead and ReadAsync(Memory<byte>, CancellationToken);
    // reads are delegated to the in-memory source stream.
    var delegatingInnerStreamMock = new Mock<Stream>(MockBehavior.Strict);
    delegatingInnerStreamMock.Setup(o => o.CanRead).Returns(true);
    if (!suppressExpectedCancellationTokenAsserts)
    {
        // Only reads carrying the expected token are accepted.
        delegatingInnerStreamMock.Setup(o => o.ReadAsync(It.IsAny<Memory<byte>>(), expectedCancellationToken))
            .Returns<Memory<byte>, CancellationToken>(source.ReadAsync);
    }
    else
    {
        // Any token is accepted.
        delegatingInnerStreamMock.Setup(o => o.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
            .Returns<Memory<byte>, CancellationToken>(source.ReadAsync);
    }

    Stream transcoder = Encoding.CreateTranscodingStream(
        innerStream: delegatingInnerStreamMock.Object,
        innerStreamEncoding: Encoding.UTF8,
        outerStreamEncoding: CustomAsciiEncoding);

    // Test with a small string, then test with a large string
    foreach (int stringLength in new[] { 128, 10 * 1024 * 1024 })
    {
        string expectedText = GetVeryLongAsciiString(stringLength);
        ResetWithSourceBytes(Encoding.UTF8.GetBytes(expectedText));
        await DrainThroughCallbackAsync();
        Assert.Equal(expectedText, ErrorCheckingAsciiEncoding.GetString(sink.ToArray()));
    }
    Assert.Equal(-1, await transcoder.ReadByteAsync(expectedCancellationToken)); // should've reached EOF

    // Now put some invalid data into the inner stream as EOF.
    ResetWithSourceBytes(new byte[] { 0xC0 });
    await DrainThroughCallbackAsync();
    Assert.Equal("[FFFD]", ErrorCheckingAsciiEncoding.GetString(sink.ToArray()));
    Assert.Equal(-1, await transcoder.ReadByteAsync(expectedCancellationToken)); // should've reached EOF

    // Empties the sink and replaces the source contents, rewinding it to the start.
    void ResetWithSourceBytes(byte[] contents)
    {
        sink.SetLength(0);
        source.SetLength(0);
        source.Write(contents, 0, contents.Length);
        source.Position = 0;
    }

    // Invokes the callback until it stops reporting a positive byte count.
    async Task DrainThroughCallbackAsync()
    {
        while (true)
        {
            int justRead = await callback(transcoder, expectedCancellationToken, sink);
            Assert.True(justRead >= 0);
            if (justRead <= 0)
            {
                break;
            }
        }
    }
}
[Fact]
public void ReadTimeout_WriteTimeout_NotSupported()
{
    // Arrange - allow inner stream to support ReadTimeout + WriteTimeout
    var innerStreamMock = new Mock<Stream>();
    innerStreamMock.Setup(o => o.CanTimeout).Returns(true);
    innerStreamMock.SetupProperty(o => o.ReadTimeout);
    innerStreamMock.SetupProperty(o => o.WriteTimeout);

    // Wrap the timeout-capable mock (not Stream.Null); otherwise the arrangement above is
    // never exercised and the test cannot detect timeouts being forwarded to the inner stream.
    Stream transcodingStream = Encoding.CreateTranscodingStream(innerStreamMock.Object, Encoding.UTF8, Encoding.UTF8, leaveOpen: true);

    // Act & assert - TranscodingStream shouldn't support ReadTimeout + WriteTimeout
    Assert.False(transcodingStream.CanTimeout);
    Assert.Throws<InvalidOperationException>(() => transcodingStream.ReadTimeout);
    Assert.Throws<InvalidOperationException>(() => transcodingStream.ReadTimeout = 42);
    Assert.Throws<InvalidOperationException>(() => transcodingStream.WriteTimeout);
    Assert.Throws<InvalidOperationException>(() => transcodingStream.WriteTimeout = 42);
}
[Fact]
public void Seek_AlwaysThrows()
{
    // MemoryStream is seekable, but we're not
    MemoryStream seekableInner = new MemoryStream();
    Stream nonSeekable = Encoding.CreateTranscodingStream(seekableInner, Encoding.UTF8, Encoding.UTF8);

    Assert.False(nonSeekable.CanSeek);

    // Every position / length related member is expected to throw NotSupportedException.
    Assert.Throws<NotSupportedException>(() => nonSeekable.Length);
    Assert.Throws<NotSupportedException>(() => nonSeekable.Position);
    Assert.Throws<NotSupportedException>(() => nonSeekable.Position = 0);
    Assert.Throws<NotSupportedException>(() => nonSeekable.Seek(0, SeekOrigin.Current));
    Assert.Throws<NotSupportedException>(() => nonSeekable.SetLength(0));
}
[Fact]
public void Write()
{
    MemoryStream destination = new MemoryStream();
    Stream transcoder = Encoding.CreateTranscodingStream(
        destination,
        innerStreamEncoding: ErrorCheckingUnicodeEncoding /* throws on error */,
        outerStreamEncoding: Encoding.UTF8 /* performs substitution */,
        leaveOpen: true);

    // Decodes everything that has reached the destination stream so far.
    string DecodeDestination() => ErrorCheckingUnicodeEncoding.GetString(destination.ToArray());

    // First, test Write(byte[], int, int)
    transcoder.Write(Encoding.UTF8.GetBytes("abcdefg"), 2, 3);
    Assert.Equal("cde", DecodeDestination());

    // Then test WriteByte(byte)
    transcoder.WriteByte((byte)'z');
    Assert.Equal("cdez", DecodeDestination());

    // We'll write U+00E0 (utf-8: [C3 A0]) byte-by-byte.
    // We shouldn't flush any intermediate bytes.
    transcoder.WriteByte((byte)0xC3);
    Assert.Equal("cdez", DecodeDestination());
    transcoder.WriteByte((byte)0xA0);
    Assert.Equal("cdez\u00E0", DecodeDestination());
    destination.SetLength(0); // reset inner stream

    // Then test Write(ROS<byte>), once with a short string and once with a long string
    foreach (int stringLength in new[] { 128, 16 * 1024 * 1024 })
    {
        string expectedText = GetVeryLongAsciiString(stringLength);
        byte[] expectedTextAsUtf8 = Encoding.UTF8.GetBytes(expectedText);
        transcoder.Write(expectedTextAsUtf8.AsSpan());
        Assert.Equal(expectedText, DecodeDestination());
        destination.SetLength(0); // reset inner stream
    }

    // Close the outer stream and ensure no leftover data was written to the inner stream
    transcoder.Close();
    Assert.Equal(0, destination.Position);
}
[Fact]
public void Write_WithPartialData()
{
    MemoryStream destination = new MemoryStream();
    Stream transcoder = Encoding.CreateTranscodingStream(
        destination,
        innerStreamEncoding: CustomAsciiEncoding /* performs custom substitution */,
        outerStreamEncoding: Encoding.UTF8 /* performs U+FFFD substitution */,
        leaveOpen: true);

    // Decodes everything that has reached the destination stream so far.
    string DecodeDestination() => ErrorCheckingAsciiEncoding.GetString(destination.ToArray());

    // First, write some incomplete data
    byte[] xyzPlusLeadByte = { 0x78, 0x79, 0x7A, 0xC3 };
    transcoder.Write(xyzPlusLeadByte); // [C3] shouldn't be flushed yet
    Assert.Equal("xyz", DecodeDestination());

    // Flushing should have no effect
    transcoder.Flush();
    Assert.Equal("xyz", DecodeDestination());

    // Provide the second byte of the multi-byte sequence
    transcoder.WriteByte(0xA0); // [C3 A0] = U+00E0
    Assert.Equal("xyz[00E0]", DecodeDestination());

    // Provide an incomplete sequence, then close the stream.
    // Closing the stream should flush the underlying buffers and write the replacement char.
    byte[] truncatedThreeByteSequence = { 0xE0, 0xBF };
    transcoder.Write(truncatedThreeByteSequence); // first 2 bytes of incomplete 3-byte sequence
    Assert.Equal("xyz[00E0]", DecodeDestination()); // wasn't flushed yet
    transcoder.Close();
    Assert.Equal("xyz[00E0][FFFD]", DecodeDestination());
}
[Fact]
public void Write_WithInvalidArgs_Throws()
{
    Stream stream = Encoding.CreateTranscodingStream(new MemoryStream(), Encoding.UTF8, Encoding.UTF8);

    // A null array must be reported against the "buffer" parameter.
    Assert.Throws<ArgumentNullException>("buffer", () => stream.Write(null, 0, 0));

    // Each (offset, count) pair is expected to be rejected for a 5-element array.
    (int Offset, int Count)[] rejectedRanges = { (-1, -1), (3, -1), (5, 1), (6, -1), (6, 0) };
    foreach ((int offset, int count) in rejectedRanges)
    {
        Assert.Throws<ArgumentOutOfRangeException>(() => stream.Write(new byte[5], offset, count));
    }
}
// Exercises both WriteAsync overloads and FlushAsync against a strict inner-stream mock that
// only accepts the expected cancellation tokens, then checks that disposing writes nothing more.
[Fact]
public async Task WriteAsync()
{
// The sink records every byte the mock's WriteAsync setup forwards to it.
MemoryStream sink = new MemoryStream();
// Separate tokens for flush and write so a mixed-up token fails the strict mock.
CancellationToken expectedFlushAsyncCancellationToken = new CancellationTokenSource().Token;
CancellationToken expectedWriteAsyncCancellationToken = new CancellationTokenSource().Token;
var innerStreamMock = new Mock<Stream>(MockBehavior.Strict);
innerStreamMock.Setup(o => o.CanWrite).Returns(true);
innerStreamMock.Setup(o => o.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), expectedWriteAsyncCancellationToken))
.Returns<ReadOnlyMemory<byte>, CancellationToken>(sink.WriteAsync);
innerStreamMock.Setup(o => o.FlushAsync(expectedFlushAsyncCancellationToken)).Returns(Task.CompletedTask);
Stream transcodingStream = Encoding.CreateTranscodingStream(
innerStreamMock.Object,
innerStreamEncoding: ErrorCheckingUnicodeEncoding,
outerStreamEncoding: Encoding.UTF8 /* performs U+FFFD substitution */,
leaveOpen: true);
// First, test WriteAsync(byte[], int, int, CancellationToken)
await transcodingStream.WriteAsync(Encoding.UTF8.GetBytes("abcdefg"), 2, 3, expectedWriteAsyncCancellationToken);
Assert.Equal("cde", ErrorCheckingUnicodeEncoding.GetString(sink.ToArray()));
// We'll write U+00E0 (utf-8: [C3 A0]) byte-by-byte.
// We shouldn't flush any intermediate bytes.
await transcodingStream.WriteAsync(new byte[] { 0xC3, 0xA0 }, 0, 1, expectedWriteAsyncCancellationToken);
await transcodingStream.FlushAsync(expectedFlushAsyncCancellationToken);
// Even after an explicit flush the lone [C3] must not have reached the sink.
Assert.Equal("cde", ErrorCheckingUnicodeEncoding.GetString(sink.ToArray()));
await transcodingStream.WriteAsync(new byte[] { 0xC3, 0xA0 }, 1, 1, expectedWriteAsyncCancellationToken);
Assert.Equal("cde\u00E0", ErrorCheckingUnicodeEncoding.GetString(sink.ToArray()));
sink.SetLength(0); // reset sink
// Then test WriteAsync(ROM<byte>, CancellationToken), once with a short string and once with a long string
string asciiString = GetVeryLongAsciiString(128);
byte[] asciiBytesAsUtf8 = Encoding.UTF8.GetBytes(asciiString);
await transcodingStream.WriteAsync(asciiBytesAsUtf8.AsMemory(), expectedWriteAsyncCancellationToken);
Assert.Equal(asciiString, ErrorCheckingUnicodeEncoding.GetString(sink.ToArray()));
sink.SetLength(0); // reset sink
asciiString = GetVeryLongAsciiString(16 * 1024 * 1024);
asciiBytesAsUtf8 = Encoding.UTF8.GetBytes(asciiString);
await transcodingStream.WriteAsync(asciiBytesAsUtf8.AsMemory(), expectedWriteAsyncCancellationToken);
Assert.Equal(asciiString, ErrorCheckingUnicodeEncoding.GetString(sink.ToArray()));
sink.SetLength(0); // reset sink
// Close the outer stream and ensure no leftover data was written to the inner stream
ValueTask actualDisposeTask = transcodingStream.DisposeAsync();
Assert.Equal(default(ValueTask), actualDisposeTask); // should've completed synchronously
Assert.Equal(0, sink.Position);
}
[Fact]
public async Task WriteAsync_WithPartialData()
{
    MemoryStream sink = new MemoryStream();
    CancellationToken writeToken = new CancellationTokenSource().Token;

    // Strict mock: only CanWrite and WriteAsync with the expected token are allowed for now.
    var innerStreamMock = new Mock<Stream>(MockBehavior.Strict);
    innerStreamMock.Setup(o => o.CanWrite).Returns(true);
    innerStreamMock.Setup(o => o.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), writeToken))
        .Returns<ReadOnlyMemory<byte>, CancellationToken>(sink.WriteAsync);

    Stream transcoder = Encoding.CreateTranscodingStream(
        innerStreamMock.Object,
        innerStreamEncoding: CustomAsciiEncoding /* performs custom substitution */,
        outerStreamEncoding: Encoding.UTF8 /* performs U+FFFD substitution */,
        leaveOpen: true);

    // Decodes everything that has reached the sink so far.
    string DecodeSink() => ErrorCheckingAsciiEncoding.GetString(sink.ToArray());

    // First, write some incomplete data
    await transcoder.WriteAsync(new byte[] { 0x78, 0x79, 0x7A, 0xC3 }, writeToken); // [C3] shouldn't be flushed yet
    Assert.Equal("xyz", DecodeSink());

    // Provide the second byte of the multi-byte sequence
    await transcoder.WriteAsync(new byte[] { 0xA0 }, writeToken); // [C3 A0] = U+00E0
    Assert.Equal("xyz[00E0]", DecodeSink());

    // Provide an incomplete sequence, then close the stream.
    // Closing the stream should flush the underlying buffers and write the replacement char.
    await transcoder.WriteAsync(new byte[] { 0xE0, 0xBF }, writeToken); // first 2 bytes of incomplete 3-byte sequence
    Assert.Equal("xyz[00E0]", DecodeSink()); // wasn't flushed yet

    // The call to DisposeAsync() will call innerStream.WriteAsync without a CancellationToken.
    innerStreamMock.Setup(o => o.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), CancellationToken.None))
        .Returns<ReadOnlyMemory<byte>, CancellationToken>(sink.WriteAsync);
    await transcoder.DisposeAsync();
    Assert.Equal("xyz[00E0][FFFD]", DecodeSink());
}
[Fact]
public void WriteAsync_WithInvalidArgs_Throws()
{
    Stream stream = Encoding.CreateTranscodingStream(new MemoryStream(), Encoding.UTF8, Encoding.UTF8);

    // The returned task is boxed, never awaited: argument errors must be thrown by the call itself.
    Assert.Throws<ArgumentNullException>("buffer", () => (object)stream.WriteAsync(null, 0, 0));

    // Each (offset, count) pair is expected to be rejected for a 5-element array.
    (int Offset, int Count)[] rejectedRanges = { (-1, -1), (3, -1), (5, 1), (6, -1), (6, 0) };
    foreach ((int offset, int count) in rejectedRanges)
    {
        Assert.Throws<ArgumentOutOfRangeException>(() => (object)stream.WriteAsync(new byte[5], offset, count));
    }
}
[Fact]
public void WriteApm()
{
    // Tests TranscodingStream.BeginWrite / EndWrite.

    // Arrange
    MemoryStream sink = new MemoryStream();
    object expectedState = new object();

    // Strict mock: only CanWrite and WriteAsync without a cancellation token are allowed,
    // since the APM pattern has no way to flow a token. Writes are forwarded to the sink.
    var innerStreamMock = new Mock<Stream>(MockBehavior.Strict);
    innerStreamMock.Setup(o => o.CanWrite).Returns(true);
    innerStreamMock.Setup(o => o.WriteAsync(It.IsAny<ReadOnlyMemory<byte>>(), CancellationToken.None))
        .Returns<ReadOnlyMemory<byte>, CancellationToken>(sink.WriteAsync);
    Stream transcodingStream = Encoding.CreateTranscodingStream(innerStreamMock.Object, Encoding.UTF8, Encoding.UTF8);

    // Act
    IAsyncResult asyncResult = transcodingStream.BeginWrite(Encoding.UTF8.GetBytes("abcdefg"), 1, 3, null, expectedState);
    transcodingStream.EndWrite(asyncResult);

    // Assert
    // The state object must be the very instance that was passed in (same check as ReadApm).
    Assert.Same(expectedState, asyncResult.AsyncState);
    Assert.Equal("bcd", Encoding.UTF8.GetString(sink.ToArray()));
}
[Fact]
public void WriteApm_WithInvalidArgs_Throws()
{
    Stream stream = Encoding.CreateTranscodingStream(new MemoryStream(), Encoding.UTF8, Encoding.UTF8);

    // A null array must be reported against the "buffer" parameter.
    Assert.Throws<ArgumentNullException>("buffer", () => stream.BeginWrite(null, 0, 0, null, null));

    // Each (offset, count) pair is expected to be rejected for a 5-element array.
    (int Offset, int Count)[] rejectedRanges = { (-1, -1), (3, -1), (5, 1), (6, -1), (6, 0) };
    foreach ((int offset, int count) in rejectedRanges)
    {
        Assert.Throws<ArgumentOutOfRangeException>(() => stream.BeginWrite(new byte[5], offset, count, null, null));
    }
}
// Builds a string of the requested length made of the lowercase alphabet repeated over and
// over: "abc...xyzabc...xyzabc..."
private static string GetVeryLongAsciiString(int length)
{
    const int AlphabetSize = 26;

    // No state is needed by the fill callback, hence the null state argument.
    return string.Create(length, (object)null, (destination, _) =>
    {
        int position = 0;
        while (position < destination.Length)
        {
            destination[position] = (char)('a' + (position % AlphabetSize));
            position++;
        }
    });
}
// A custom ASCIIEncoding where both encoder + decoder fallbacks have been specified:
// unencodable chars go through CustomEncoderFallback, undecodable bytes become U+FFFD.
private static readonly Encoding CustomAsciiEncoding = Encoding.GetEncoding(
"ascii", new CustomEncoderFallback(), new DecoderReplacementFallback("\uFFFD"));
// An ASCII encoding configured with the exception fallbacks for both encoding and decoding;
// the tests use it to decode captured output.
private static readonly Encoding ErrorCheckingAsciiEncoding
= Encoding.GetEncoding("ascii", EncoderFallback.ExceptionFallback, DecoderFallback.ExceptionFallback);
// A little-endian UTF-16 encoding without a byte order mark that throws on invalid bytes.
private static readonly UnicodeEncoding ErrorCheckingUnicodeEncoding
= new UnicodeEncoding(bigEndian: false, byteOrderMark: false, throwOnInvalidBytes: true);
// A custom encoder fallback which substitutes unknown chars with "[xxxx]" (the code point as
// uppercase hex, at least four digits).
private sealed class CustomEncoderFallback : EncoderFallback
{
    // The longest replacement this fallback can produce is "[10FFFF]".
    public override int MaxCharCount => "[10FFFF]".Length;

    public override EncoderFallbackBuffer CreateFallbackBuffer() => new CustomEncoderFallbackBuffer();

    // Holds the replacement text for the most recent fallback and a cursor into it.
    private sealed class CustomEncoderFallbackBuffer : EncoderFallbackBuffer
    {
        private string _pending = string.Empty;
        private int _cursor = 0;

        // Number of replacement chars not yet handed out.
        public override int Remaining => _pending.Length - _cursor;

        // Surrogate pair: combine both halves into a single code point first.
        public override bool Fallback(char charUnknownHigh, char charUnknownLow, int index)
        {
            int scalarValue = char.ConvertToUtf32(charUnknownHigh, charUnknownLow);
            return FallbackCommon((uint)scalarValue);
        }

        // Single UTF-16 code unit.
        public override bool Fallback(char charUnknown, int index)
        {
            return FallbackCommon(charUnknown);
        }

        // Replaces any previous replacement text with "[<hex code point>]" and rewinds the cursor.
        private bool FallbackCommon(uint codePoint)
        {
            Assert.True(codePoint <= 0x10FFFF);
            _cursor = 0;
            _pending = FormattableString.Invariant($"[{codePoint:X4}]");
            return true;
        }

        public override char GetNextChar()
        {
            if (_cursor >= _pending.Length)
            {
                return '\0' /* end of string reached */;
            }

            char next = _pending[_cursor];
            _cursor++;
            return next;
        }

        // Steps back one char unless the cursor is already at the start.
        public override bool MovePrevious()
        {
            bool canStepBack = _cursor != 0;
            if (canStepBack)
            {
                _cursor--;
            }
            return canStepBack;
        }
    }
}
}
}

View file

@ -34,6 +34,7 @@
<Compile Include="Encoding\EncodingGetEncodingTests.cs" />
<Compile Include="Encoding\EncodingConvertTests.cs" />
<Compile Include="Encoding\EncodingVirtualTests.cs" />
<Compile Include="Encoding\TranscodingStreamTests.cs" />
<Compile Include="Fallback\DecoderReplacementFallbackTests.cs" />
<Compile Include="Fallback\EncoderReplacementFallbackTests.cs" />
<Compile Include="Fallback\EncoderExceptionFallbackTests.cs" />
@ -77,4 +78,7 @@
<Compile Include="Decoder\Decoder.cs" />
<Compile Include="Encoder\Encoder.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Moq" Version="$(MoqVersion)" />
</ItemGroup>
</Project>