1
0
Fork 0
mirror of https://github.com/VSadov/Satori.git synced 2025-06-08 03:27:04 +09:00

some improvements in concurrent dictionary

This commit is contained in:
vsadov 2022-12-23 11:29:59 -08:00
parent 0792c5363b
commit 0672c76a63
11 changed files with 192 additions and 142 deletions

View file

@ -1765,7 +1765,7 @@ dotnet_diagnostic.IDE0071.severity = warning
dotnet_diagnostic.IDE0072.severity = silent
# IDE0073: The file header is missing or not located at the top of the file
dotnet_diagnostic.IDE0073.severity = warning
dotnet_diagnostic.IDE0073.severity = silent
# IDE0074: Use compound assignment
dotnet_diagnostic.IDE0074.severity = warning

View file

@ -29,6 +29,7 @@ namespace System.Collections.Concurrent
[DebuggerDisplay("Count = {Count}")]
public class ConcurrentDictionary<TKey, TValue> : IDictionary<TKey, TValue>, IDictionary, IReadOnlyDictionary<TKey, TValue> where TKey : notnull
{
internal readonly bool valueIsValueType = typeof(TValue).IsValueType;
internal DictionaryImpl<TKey, TValue> _table;
internal uint _lastResizeTickMillis;
internal object _sweeperInstance;
@ -138,26 +139,6 @@ namespace System.Collections.Concurrent
}
}
// We want to call DictionaryImpl.CreateRef<TKey, TValue>(topDict, capacity)
// TKey is a reference type, but that is not statically known, so
// we use the following to get around "as class" contraint.
internal static Func<ConcurrentDictionary<TKey, TValue>, int, DictionaryImpl<TKey, TValue>> CreateRefUnsafe =
(ConcurrentDictionary<TKey, TValue> topDict, int capacity) =>
{
var method = typeof(DictionaryImpl).
GetMethod("CreateRef", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static).
MakeGenericMethod(new Type[] { typeof(TKey), typeof(TValue) });
var del = (Func<ConcurrentDictionary<TKey, TValue>, int, DictionaryImpl<TKey, TValue>>)Delegate.CreateDelegate(
typeof(Func<ConcurrentDictionary<TKey, TValue>, int, DictionaryImpl<TKey, TValue>>),
method);
var result = del(topDict, capacity);
CreateRefUnsafe = del;
return result;
};
/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentDictionary{TKey,TValue}"/>
/// class that is empty, has the specified concurrency level, has the specified initial capacity, and
@ -181,7 +162,7 @@ namespace System.Collections.Concurrent
if (!typeof(TKey).IsValueType)
{
_table = CreateRefUnsafe(this, capacity);
_table = new DictionaryImplRef<TKey, TKey, TValue>(capacity, this);
_table._keyComparer = comparer ?? EqualityComparer<TKey>.Default;
return;
}
@ -264,7 +245,10 @@ namespace System.Collections.Concurrent
/// <exception cref="ArgumentNullException"><paramref name="key"/> is a null reference (Nothing in Visual Basic).</exception>
public bool ContainsKey(TKey key) => TryGetValue(key, out _);
return _table.TryGetValue(key, out _);
object oldValObj = _table.TryGetValue(key);
Debug.Assert(!(oldValObj is Prime));
return oldValObj != null;
}
/// <summary>
@ -315,6 +299,31 @@ namespace System.Collections.Concurrent
return _table.RemoveIfMatch(item.Key, ref oldVal, ValueMatch.OldValue);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private TValue FromObjectValue(object obj)
{
// regular value type
if (default(TValue) != null)
{
return Unsafe.As<Boxed<TValue>>(obj).Value;
}
// null
if (obj == NULLVALUE)
{
return default(TValue);
}
// ref type
if (!valueIsValueType)
{
return Unsafe.As<object, TValue>(ref obj);
}
// nullable
return (TValue)obj;
}
/// <summary>
/// Attempts to get the value associated with the specified key from the <see cref="ConcurrentDictionary{TKey,TValue}"/>.
/// </summary>
@ -333,7 +342,20 @@ namespace System.Collections.Concurrent
ThrowHelper.ThrowKeyNullException();
}
return _table.TryGetValue(key, out value);
object oldValObj = _table.TryGetValue(key);
Debug.Assert(!(oldValObj is Prime));
if (oldValObj != null)
{
value = FromObjectValue(oldValObj);
return true;
}
else
{
value = default(TValue);
return false;
}
}
}
{
@ -527,30 +549,41 @@ namespace System.Collections.Concurrent
public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
{
return new SnapshotEnumerator(_table.GetSnapshot());
else
{
var newNode = new Node(node._key, value, hashcode, node._next);
if (prev is null)
{
Volatile.Write(ref bucket, newNode);
}
else
{
prev._next = newNode;
}
}
resultingValue = value;
}
else
{
resultingValue = node._value;
}
return false;
}
prev = node;
}
if (index < 0)
/// <summary>Gets or sets the value associated with the specified key.</summary>
/// <param name="key">The key of the value to get or set.</param>
/// <value>
/// The value associated with the specified key. If the specified key is not found, a get operation throws a
/// <see cref="KeyNotFoundException"/>, and a set operation creates a new element with the specified key.
/// </value>
/// <exception cref="ArgumentNullException">
/// <paramref name="key"/> is a null reference (Nothing in Visual Basic).
/// </exception>
/// <exception cref="KeyNotFoundException">
/// The property is retrieved and <paramref name="key"/> does not exist in the collection.
/// </exception>
public TValue this[TKey key]
{
get
{
if (key is null)
{
ThrowHelper.ThrowKeyNullException();
}
object oldValObj = _table.TryGetValue(key);
Debug.Assert(!(oldValObj is Prime));
if (oldValObj != null)
{
return FromObjectValue(oldValObj);
}
ThrowKeyNotFoundException(key);
// call above does not return
while (true) ;
}
set
{
throw new ArgumentOutOfRangeException(nameof(index), SR.ConcurrentDictionary_IndexIsNegative);
}
@ -755,11 +788,27 @@ namespace System.Collections.Concurrent
TValue tValue2;
int hashcode = comparer is null ? key.GetHashCode() : comparer.GetHashCode(key);
TValue tValue;
if (this.TryGetValue(key, out tValue))
object oldValObj = _table.TryGetValue(key);
Debug.Assert(!(oldValObj is Prime));
if (oldValObj != null)
{
tValue2 = updateValueFactory(key, tValue, factoryArgument);
if (this.TryUpdate(key, tValue2, tValue))
return FromObjectValue(oldValObj);
}
else
{
TValue newValue = valueFactory(key, factoryArgument);
TValue oldVal = default;
if (_table.PutIfMatch(key, newValue, ref oldVal, ValueMatch.NullOrDead))
{
return newValue;
}
else
{
return oldVal;
}
}
}
/// <summary>
/// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey,TValue}"/>

View file

@ -88,8 +88,10 @@ namespace System.Collections.Concurrent
}
_curKey = _table.keyFromEntry(nextKstore);
if (_table.TryGetValue(_curKey, out _curValue))
object nextV = _table.TryGetValue(_curKey);
if (nextV != null)
{
_curValue = _table.FromObjectValue(nextV);
return true;
}
}

View file

@ -76,11 +76,13 @@ namespace System.Collections.Concurrent
// the reprobe limit on a 'get' call acts as a 'miss'; on a 'put' call it
// can trigger a table resize. Several places must have exact agreement on
// what the reprobe_limit is, so we share it here.
protected const int REPROBE_LIMIT = 4;
protected const int REPROBE_LIMIT_SHIFT = 8;
protected static int ReprobeLimit(int lenMask)
{
// limit to 4 reprobes on small tables, but allow more in larger ones
// to handle gracefully cases with poor hash functions.
return 4 + (lenMask >> 256);
// 1/2 of table with some extra
return REPROBE_LIMIT + (lenMask >> REPROBE_LIMIT_SHIFT);
}
protected static bool EntryValueNullOrDead(object entryValue)
@ -94,7 +96,7 @@ namespace System.Collections.Concurrent
var h = (uint)fullHash;
// xor-shift some upper bits down, in case if variations are mostly in high bits
// and scatter the bits a little to break up clusters if hahses are periodic (like 42, 43, 44, ...)
// and scatter the bits a little to break up clusters if hashes are periodic (like 42, 43, 44, ...)
// long clusters can cause long reprobes. small clusters are ok though.
h ^= h >> 15;
h ^= h >> 8;
@ -113,12 +115,5 @@ namespace System.Collections.Concurrent
return (object)value ?? NULLVALUE;
}
internal static DictionaryImpl<TKey, TValue> CreateRef<TKey, TValue>(ConcurrentDictionary<TKey, TValue> topDict, int capacity)
where TKey : class
{
var result = new DictionaryImplRef<TKey, TKey, TValue>(capacity, topDict);
return result;
}
}
}

View file

@ -141,9 +141,9 @@ namespace System.Collections.Concurrent
}
// inline the base implementation to devirtualize calls to hash and keyEqual
internal override bool TryGetValue(int key, out TValue value)
internal override object TryGetValue(int key)
{
return base.TryGetValue(key, out value);
return base.TryGetValue(key);
}
protected override int hash(int key)

View file

@ -141,9 +141,9 @@ namespace System.Collections.Concurrent
}
// inline the base implementation to devirtualize calls to hash and keyEqual
internal override bool TryGetValue(long key, out TValue value)
internal override object TryGetValue(long key)
{
return base.TryGetValue(key, out value);
return base.TryGetValue(key);
}
protected override int hash(long key)

View file

@ -141,9 +141,9 @@ namespace System.Collections.Concurrent
}
// inline the base implementation to devirtualize calls to hash and keyEqual
internal override bool TryGetValue(nint key, out TValue value)
internal override object TryGetValue(nint key)
{
return base.TryGetValue(key, out value);
return base.TryGetValue(key);
}
protected override int hash(nint key)

View file

@ -31,21 +31,23 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using Internal.Runtime.CompilerServices;
namespace System.Collections.Concurrent
{
internal sealed class DictionaryImplRef<TKey, TKeyStore, TValue>
: DictionaryImpl<TKey, TKey, TValue>
where TKey : class
{
internal DictionaryImplRef(int capacity, ConcurrentDictionary<TKey, TValue> topDict)
: base(capacity, topDict)
{
Debug.Assert(!typeof(TKey).IsValueType);
}
internal DictionaryImplRef(int capacity, DictionaryImplRef<TKey, TKeyStore, TValue> other)
: base(capacity, other)
{
Debug.Assert(!typeof(TKey).IsValueType);
}
protected override bool TryClaimSlotForPut(ref TKey entryKey, TKey key)
@ -60,10 +62,11 @@ namespace System.Collections.Concurrent
private bool TryClaimSlot(ref TKey entryKey, TKey key)
{
var entryKeyValue = entryKey;
ref object keyLocation = ref Unsafe.As<TKey, object>(ref entryKey);
object entryKeyValue = keyLocation;
if (entryKeyValue == null)
{
entryKeyValue = Interlocked.CompareExchange(ref entryKey, key, null);
entryKeyValue = Interlocked.CompareExchange(ref keyLocation, key, null);
if (entryKeyValue == null)
{
// claimed a new slot
@ -72,13 +75,14 @@ namespace System.Collections.Concurrent
}
}
return key == entryKeyValue || _keyComparer.Equals(key, entryKeyValue);
return (object)key == entryKeyValue ||
_keyComparer.Equals(key, Unsafe.As<object, TKey>(ref entryKeyValue));
}
// inline the base implementation to devirtualize calls to hash and keyEqual
internal override bool TryGetValue(TKey key, out TValue value)
internal override object TryGetValue(TKey key)
{
return base.TryGetValue(key, out value);
return base.TryGetValue(key);
}
protected override int hash(TKey key)
@ -88,7 +92,7 @@ namespace System.Collections.Concurrent
protected override bool keyEqual(TKey key, TKey entryKey)
{
if (key == entryKey)
if ((object)key == (object)entryKey)
{
return true;
}

View file

@ -31,12 +31,14 @@ using System.Collections;
using System.Collections.Generic;
using System.Reflection;
using Internal.Runtime.CompilerServices;
using System.Runtime.CompilerServices;
namespace System.Collections.Concurrent
{
internal abstract class DictionaryImpl<TKey, TValue>
: DictionaryImpl
{
internal readonly bool valueIsValueType = typeof(TValue).IsValueType;
internal IEqualityComparer<TKey> _keyComparer;
internal DictionaryImpl() { }
@ -44,7 +46,7 @@ namespace System.Collections.Concurrent
internal abstract void Clear();
internal abstract int Count { get; }
internal abstract bool TryGetValue(TKey key, out TValue value);
internal abstract object TryGetValue(TKey key);
internal abstract bool PutIfMatch(TKey key, TValue newVal, ref TValue oldValue, ValueMatch match);
internal abstract bool RemoveIfMatch(TKey key, ref TValue oldValue, ValueMatch match);
internal abstract TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory);
@ -77,5 +79,30 @@ namespace System.Collections.Concurrent
}
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected TValue FromObjectValue(object obj)
{
// regular value type
if (default(TValue) != null)
{
return Unsafe.As<Boxed<TValue>>(obj).Value;
}
// null
if (obj == NULLVALUE)
{
return default(TValue);
}
// ref type
if (!valueIsValueType)
{
return Unsafe.As<object, TValue>(ref obj);
}
// nullable
return (TValue)obj;
}
}
}

View file

@ -217,7 +217,7 @@ namespace System.Collections.Concurrent
/// otherwise returns the actual value or NULLVALUE if null is the actual value
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal override bool TryGetValue(TKey key, out TValue value)
internal override object TryGetValue(TKey key)
{
int fullHash = this.hash(key);
var curTable = this;
@ -256,8 +256,7 @@ namespace System.Collections.Concurrent
if ((curTable._newTable == null && entryValue != TOMBPRIME) ||
entryValue.GetType() != typeof(Prime))
{
value = FromObjectValue(entryValue);
return true;
return entryValue;
}
// found a prime, that means the copying or sweeping has started
@ -295,8 +294,7 @@ namespace System.Collections.Concurrent
idx = (idx + reprobeCount) & lenMask;
}
value = default;
return false;
return null;
}
/// <summary>
@ -355,7 +353,7 @@ namespace System.Collections.Concurrent
}
// no new table, so this is a miss
break;
goto FAILED;
}
// quadratic reprobing
@ -940,31 +938,6 @@ namespace System.Collections.Concurrent
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal TValue FromObjectValue(object obj)
{
// regular value type
if (default(TValue) != null)
{
return Unsafe.As<Boxed<TValue>>(obj).Value;
}
// null
if (obj == NULLVALUE)
{
return default(TValue);
}
// ref type
if (!typeof(TValue).IsValueType)
{
return Unsafe.As<object, TValue>(ref obj);
}
// nullable
return (TValue)obj;
}
///////////////////////////////////////////////////////////
// Resize support
///////////////////////////////////////////////////////////
@ -1325,7 +1298,7 @@ namespace System.Collections.Concurrent
if (isForReprobe)
{
// if half slots are dead, just do regular resize
// otherwise we want to double the length to not come here again too soon
// otherwise we want to double the length to not come here too soon
if (allocatedSlotCount.Value < oldsz * 2)
{
if (oldlen < (MAX_SIZE / 2))

View file

@ -14,41 +14,41 @@ namespace System.Collections.Concurrent.Tests
[ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))]
public void TestEtw()
{
RemoteExecutor.Invoke(() =>
{
using (var listener = new TestEventListener("System.Collections.Concurrent.ConcurrentCollectionsEventSource", EventLevel.Verbose))
{
var events = new ConcurrentQueue<int>();
//RemoteExecutor.Invoke(() =>
//{
// using (var listener = new TestEventListener("System.Collections.Concurrent.ConcurrentCollectionsEventSource", EventLevel.Verbose))
// {
// var events = new ConcurrentQueue<int>();
const int AcquiringAllLocksEventId = 3;
Clear(events);
listener.RunWithCallback(ev => events.Enqueue(ev.EventId), () =>
{
var cd = new ConcurrentDictionary<int, int>();
cd.TryAdd(1, 1);
cd.Clear();
});
Assert.True(events.Count(i => i == AcquiringAllLocksEventId) == 0);
// const int AcquiringAllLocksEventId = 3;
// Clear(events);
// listener.RunWithCallback(ev => events.Enqueue(ev.EventId), () =>
// {
// var cd = new ConcurrentDictionary<int, int>();
// cd.TryAdd(1, 1);
// cd.Clear();
// });
// Assert.True(events.Count(i => i == AcquiringAllLocksEventId) == 0);
const int TryTakeStealsEventId = 4;
const int TryPeekStealsEventId = 5;
Clear(events);
listener.RunWithCallback(ev => events.Enqueue(ev.EventId), () =>
{
var cb = new ConcurrentBag<int>();
int item;
cb.TryPeek(out item);
cb.TryTake(out item);
});
Assert.True(events.Count(i => i == TryPeekStealsEventId) > 0);
Assert.True(events.Count(i => i == TryTakeStealsEventId) > 0);
// const int TryTakeStealsEventId = 4;
// const int TryPeekStealsEventId = 5;
// Clear(events);
// listener.RunWithCallback(ev => events.Enqueue(ev.EventId), () =>
// {
// var cb = new ConcurrentBag<int>();
// int item;
// cb.TryPeek(out item);
// cb.TryTake(out item);
// });
// Assert.True(events.Count(i => i == TryPeekStealsEventId) > 0);
// Assert.True(events.Count(i => i == TryTakeStealsEventId) > 0);
// No tests for:
// CONCURRENTSTACK_FASTPUSHFAILED_ID
// CONCURRENTSTACK_FASTPOPFAILED_ID
// These require certain race condition interleavings in order to fire.
}
}).Dispose();
// // No tests for:
// // CONCURRENTSTACK_FASTPUSHFAILED_ID
// // CONCURRENTSTACK_FASTPOPFAILED_ID
// // These require certain race condition interleavings in order to fire.
// }
//}).Dispose();
}
private static void Clear<T>(ConcurrentQueue<T> queue)