Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Lo spazio dei nomi System.Threading.Channels fornisce un set di struttura di dati di sincronizzazione per passare i dati tra produttori e consumatori in modo asincrono. La libreria è destinata a .NET, .NET Standard e .NET Framework e funziona su tutte le implementazioni di .NET.
Questa libreria è disponibile nel 📦 pacchetto NuGet System.Threading.Channels . Tuttavia, se si usa .NET Core 3.0 o versione successiva, il pacchetto viene incluso come parte del framework condiviso.
Modello di programmazione concettuale produttore/consumatore
I canali sono un'implementazione del modello di programmazione concettuale produttore/consumatore. In questo modello di programmazione i produttori producono i dati in modo asincrono e i consumatori li utilizzano in modo asincrono. In altre parole, questo modello passa i dati da una parte a un'altra tramite una coda FIFO ("First-In First-Out"). Pensa ai canali come a qualsiasi altro tipo di raccolta generica comune, ad esempio un List<T>. La differenza principale consiste nel fatto che questa raccolta gestisce la sincronizzazione e, tramite le opzioni di creazione della factory, fornisce vari modelli di consumo. Queste opzioni controllano il comportamento dei canali, ad esempio:
- Quanti elementi sono autorizzati a archiviare e cosa accade se viene raggiunto tale limite.
- Indica se il canale è accessibile da più producer o più consumer contemporaneamente.
Utilizzo di base
Nell'esempio seguente viene illustrato l'utilizzo di base di un canale, in cui un producer scrive elementi e un consumer li legge:
static async Task BasicUsageAsync()
{
Channel<int> channel = Channel.CreateUnbounded<int>();
Task producer = ProduceAsync(channel.Writer);
Task consumer = ConsumeAsync(channel.Reader);
await Task.WhenAll(producer, consumer);
static async Task ProduceAsync(ChannelWriter<int> writer)
{
for (int i = 0; i < 5; i++)
{
await writer.WriteAsync(i);
}
writer.Complete();
}
static async Task ConsumeAsync(ChannelReader<int> reader)
{
await foreach (int item in reader.ReadAllAsync())
{
Console.WriteLine($"Received: {item}");
}
}
}
Strategie di delimitazione
A seconda della modalità di creazione di un oggetto Channel<T>, il lettore e chi scrive (writer) si comportano in modo diverso.
Per creare un canale che specifica una capacità massima, chiamare Channel.CreateBounded. Per creare un canale utilizzato contemporaneamente da un numero qualsiasi di lettori e writer, chiamare Channel.CreateUnbounded. Ogni strategia di delimitazione espone varie opzioni definite dall'autore, come BoundedChannelOptions o UnboundedChannelOptions rispettivamente.
Nota
Indipendentemente dalla strategia di delimitazione, un canale genera sempre un'eccezione ChannelClosedException quando viene usato dopo la chiusura.
Canali non delimitati
Per creare un canale non delimitato, chiamare uno degli overload Channel.CreateUnbounded:
var channel = Channel.CreateUnbounded<T>();
Quando si crea un canale non associato, per impostazione predefinita, il canale può essere usato contemporaneamente da un numero qualsiasi di lettori e writer. In alternativa, è possibile specificare un comportamento non predefinito durante la creazione di un canale non delimitato fornendo un'istanza UnboundedChannelOptions. La capacità del canale non è delimitata e tutte le scritture vengono eseguite in modo sincrono. Per altri esempi, vedere Modelli di creazione senza delimitazioni.
Canali delimitati
Per creare un canale delimitato, chiamare uno degli overload Channel.CreateBounded:
var channel = Channel.CreateBounded<T>(7);
Il codice precedente crea un canale con capacità massima di 7 elementi. Quando si crea un canale delimitato, il canale viene associato a una capacità massima. Quando viene raggiunto tale limite, il comportamento predefinito è che il canale blocchi in modo asincrono il produttore fino a quando sia disponibile dello spazio. È possibile configurare questo comportamento specificando un'opzione al momento della creazione del canale. I canali delimitati possono essere creati con qualsiasi valore di capacità maggiore di zero. Per altri esempi, vedere Modelli di creazione delimitati.
Comportamento in modalità completa
Quando si usa un canale delimitato, è possibile specificare il comportamento a cui il canale deve conformarsi quando viene raggiunto il limite configurato. La tabella seguente elenca i comportamenti in modalità completa per ogni valore BoundedChannelFullMode:
| Valore | Comportamento |
|---|---|
| BoundedChannelFullMode.Wait | Questo è il valore predefinito. Le chiamate a WriteAsync richiedono di attendere che ci sia spazio disponibile per completare l'operazione di scrittura. Le chiamate a TryWrite restituiscono immediatamente false. |
| BoundedChannelFullMode.DropNewest | Rimuove e ignora l'elemento più recente nel canale per liberare spazio per l'elemento in corso di scrittura. |
| BoundedChannelFullMode.DropOldest | Rimuove e ignora l'elemento meno recente nel canale per liberare spazio per l'elemento in corso di scrittura. |
| BoundedChannelFullMode.DropWrite | Annulla l'elemento che si sta scrivendo. |
Importante
Ogni volta che un Channel<TWrite,TRead>.Writer produce più velocemente di quanto un Channel<TWrite,TRead>.Reader è in grado di consumare, il writer del canale sperimenta una contropressione.
API dei produttori
La funzionalità producer viene esposta nel Channel<TWrite,TRead>.Writer. Le API producer e il comportamento previsto sono descritti in dettaglio nella tabella seguente:
| API (Interfaccia di Programmazione delle Applicazioni) | Comportamento previsto |
|---|---|
| ChannelWriter<T>.Complete | Contrassegna il canale come completo, per indicare che non vengono scritti ulteriori elementi. |
| ChannelWriter<T>.TryComplete | Tenta di contrassegnare il canale come completato, per indicare che non vengono scritti altri dati. |
| ChannelWriter<T>.TryWrite | Tenta di scrivere l'elemento specificato al canale. Se usato con un canale non delimitato, questo restituisce sempre true a meno che il writer del canale non segnali il completamento con ChannelWriter<T>.Complete o ChannelWriter<T>.TryComplete. |
| ChannelWriter<T>.WaitToWriteAsync | Restituisce un ValueTask<TResult> che si completa quando è disponibile spazio per scrivere un elemento. |
| ChannelWriter<T>.WriteAsync | Scrive in modo asincrono un elemento nel canale. |
API per consumatori
La funzionalità utente viene esposta su Channel<TWrite,TRead>.Reader. Le API consumer e il comportamento previsto sono descritti in dettaglio nella tabella seguente:
| API (Interfaccia di Programmazione delle Applicazioni) | Comportamento previsto |
|---|---|
| ChannelReader<T>.ReadAllAsync | Crea un elemento IAsyncEnumerable<T> che consente la lettura di tutti i dati dal canale. |
| ChannelReader<T>.ReadAsync | Legge in modo asincrono un elemento dal canale. |
| ChannelReader<T>.TryPeek | Tenta di visualizzare un elemento dal canale. |
| ChannelReader<T>.TryRead | Tenta di leggere un elemento dal canale. |
| ChannelReader<T>.WaitToReadAsync | Restituisce un ValueTask<TResult> che si completa quando sono disponibili i dati per la lettura. |
Modelli di utilizzo comuni
Esistono diversi modelli di utilizzo per i canali:
L'API è progettata per essere semplice, coerente e il più flessibile possibile. Tutti i metodi asincroni restituiscono un oggetto ValueTask (o ValueTask<bool>) che rappresenta un'operazione asincrona leggera, in grado di evitare l’allocazione se l'operazione viene completata in modo sincrono e, potenzialmente, perfino in modo asincrono. Inoltre, l'API è progettata per essere componibile, in quanto l'autore di un canale ne promette l'utilizzo previsto. Quando un canale viene creato con determinati parametri, l'implementazione interna può operare in modo più efficiente conoscendo tali promesse.
Modelli di creazione
Si supponga di creare una soluzione produttore/consumatore per un sistema di posizione globale (GPS). Si desidera tenere traccia delle coordinate di un dispositivo nel tempo. Un oggetto campione delle coordinate potrebbe essere simile al seguente:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
Modelli di creazione senza limitazioni
Un modello di utilizzo comune consiste nel creare un canale non associato predefinito:
var channel = Channel.CreateUnbounded<Coordinates>();
Si supponga invece di voler creare un canale non limitato con più produttori e consumatori. Impostare SingleWriter = false e SingleReader = false nelle opzioni del canale:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
In questo caso, tutte le operazioni di scrittura sono sincrone, anche WriteAsync. Questo comportamento si verifica perché un canale non associato ha sempre spazio disponibile per una scrittura immediata. Tuttavia, impostando AllowSynchronousContinuations su true, le scritture potrebbero finire per eseguire operazioni associate a un lettore eseguendo le relative continuazioni. Questa impostazione non influisce sulla sincronizzazione dell'operazione.
Modelli di creazione delimitati
Con i canali delimitati , la configurabilità del canale deve essere nota al consumer per garantire un consumo appropriato. Ovvero, il consumatore deve conoscere il comportamento visualizzato dal canale quando viene raggiunto il limite configurato. Negli esempi seguenti vengono illustrati alcuni dei modelli di creazione delimitati comuni.
Il modo più semplice per creare un canale delimitato consiste nel specificare una capacità. Il codice seguente crea un canale delimitato con capacità massima di 1.
var channel = Channel.CreateBounded<Coordinates>(1);
Sono disponibili altre opzioni, Alcune opzioni sono uguali a un canale non associato, mentre altre sono specifiche dei canali delimitati. Nel codice seguente il canale viene creato come canale delimitato limitato a 1.000 elementi, con un singolo writer ma molti lettori. Il comportamento della modalità completa viene definito come DropWrite, il che significa che elimina l'elemento scritto se il canale è pieno.
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
Per osservare gli elementi eliminati quando si usano canali delimitati, registrare un callback itemDropped:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
Ogni volta che il canale è pieno e viene aggiunto un nuovo elemento, viene richiamato il callback itemDropped. In questo esempio, il callback fornito scrive l'elemento nella console, ma è possibile eseguire qualsiasi altra azione desiderata.
Schemi del produttore
Si supponga che il produttore in questo scenario stia scrivendo nuove coordinate all’interno del canale. Il produttore può eseguire questa operazione chiamando TryWrite:
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
Il codice produttore precedente:
- Accetta il
Channel<Coordinates>.Writer(ChannelWriter<Coordinates>) come argomento, insieme all'oggetto inizialeCoordinates. - Definisce un ciclo condizionale
whileche tenta di spostare le coordinate usandoTryWrite.
Un produttore alternativo potrebbe usare il metodo WriteAsync:
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Anche in questo caso, Channel<Coordinates>.Writer viene usato all'interno di un ciclo while. Ma questa volta viene chiamato metodo WriteAsync. Il metodo continua solo dopo la scrittura delle coordinate. Quando il ciclo while si chiude, viene effettuata una chiamata a Complete, a segnalare che nel canale non vengono scritti altri dati.
Un altro modello produttore consiste nell'usare il metodoWaitToWriteAsync, considerare il codice seguente:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
Come parte dell'istruzione condizionale while, il risultato della chiamata WaitToWriteAsync viene usato per stabilire se continuare il ciclo.
Modelli di consumo
Esistono diversi modelli comuni di consumatori nei canali. Quando un canale non termina mai, ovvero produce dati per un periodo illimitato, il consumatore potrebbe usare un ciclo while (true) e leggere i dati man mano che diventano disponibili:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Nota
Questo codice genera un'eccezione se il canale è chiuso.
Un consumatore alternativo potrebbe evitare questo problema usando un ciclo while annidato, come illustrato nel codice seguente:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
Nel codice precedente il consumatore attende di leggere i dati. Quando i dati sono disponibili, il consumatore prova a leggerlo. Tali cicli continuano a effettuare valutazioni fino a quando il produttore del canale segnala che non ha più dati da leggere. Detto questo, quando un produttore è noto per avere un numero finito di elementi che produce e ne segnala il completamento, il consumatore può usare la semantica await foreach per scorrere gli elementi:
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
Il codice precedente usa il metodo ReadAllAsyncper leggere tutte le coordinate dal canale.
Multipli produttori e consumatori
I canali supportano più produttori e consumatori simultanei. Per abilitare questa operazione, creare un canale con SingleWriter = false e SingleReader = false nelle opzioni del canale. È quindi possibile distribuire la scrittura su più attività producer e concentrare la lettura da più attività consumer.
static async Task UseMultipleProducersAndConsumersAsync()
{
Channel<Coordinates> channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false
});
// Start three concurrent producer tasks.
Task[] producerTasks = Enumerable.Range(0, 3)
.Select(id => ProduceAsync(id, channel))
.ToArray();
// Start two concurrent consumer tasks.
Task[] consumerTasks = Enumerable.Range(0, 2)
.Select(_ => ConsumeAsync(channel))
.ToArray();
// Wait for all producers to finish, then mark the channel as complete.
await Task.WhenAll(producerTasks);
channel.Writer.Complete();
// Wait for all consumers to finish.
await Task.WhenAll(consumerTasks);
static async Task ProduceAsync(int id, Channel<Coordinates> channel)
{
Coordinates coordinates = new(
DeviceId: Guid.NewGuid(),
Latitude: -90 + (id * 30),
Longitude: -180 + (id * 60));
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
coordinates = coordinates with
{
Latitude = coordinates.Latitude + 0.5,
Longitude = coordinates.Longitude + 1
};
await channel.Writer.WriteAsync(coordinates);
}
}
static async Task ConsumeAsync(Channel<Coordinates> channel)
{
await foreach (Coordinates coordinates in channel.Reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
}
Il codice precedente:
- Crea un canale senza limiti che supporta in modo esplicito più scrittori e lettori simultanei.
- Avvia tre attività di produttore simultanee, ognuna delle quali scrive una serie di coordinate con un identificatore univoco per dispositivo.
- Avvia due attività consumer simultanee, ognuna delle quali legge dallo stesso canale usando
ReadAllAsync. - Attende il completamento di tutti i produttori, quindi chiama Complete per segnalare che non vengono scritti altri dati nel canale.
- Attende che tutti i consumatori finiscano di svuotare i dati rimanenti dal canale.
Suggerimento
Con più produttori, chiamare channel.Writer.Complete() solo dopo che tutti i produttori hanno terminato di scrivere. Questo segnala che non vengono scritti altri dati, consentendo ReadAllAsync() di completare dopo l'utilizzo di tutti gli elementi rimanenti.