Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Oversubscriptie kan de algehele efficiëntie van sommige toepassingen die taken bevatten met een hoge latentie verbeteren. In dit onderwerp wordt uitgelegd hoe u oversubscriptie gebruikt om de latentie te compenseren die wordt veroorzaakt door het lezen van gegevens van een netwerkverbinding.
Voorbeeld
In dit voorbeeld wordt de Asynchrone agentsbibliotheek gebruikt om bestanden van HTTP-servers te downloaden. De http_reader klasse is afgeleid van concurrency::agent en gebruikt berichtoverdracht om asynchroon te lezen welke URL's gedownload moeten worden.
De http_reader klasse maakt gebruik van de gelijktijdigheid::task_group klasse om elk bestand gelijktijdig te lezen. Elke taak roept de concurrency::Context::Oversubscribe-methode aan met de parameter _BeginOversubscription ingesteld op true om oversubscripting in de huidige context in te schakelen. Elke taak gebruikt vervolgens de CInternetSession - en CHttpFile-klassen van Microsoft Foundation Classes (MFC) om het bestand te downloaden. Ten slotte roept elke taak Context::Oversubscribe aan met de _BeginOversubscription-parameter ingesteld op false om oversubscriptie uit te schakelen.
Wanneer oversubscriptie is ingeschakeld, maakt de runtime één extra thread waarin taken moeten worden uitgevoerd. Elk van deze threads kan ook de huidige context overschrijven en daardoor extra threads maken. De http_reader klasse maakt gebruik van een gelijktijdigheid::unbounded_buffer-object om het aantal threads te beperken dat door de toepassing wordt gebruikt. De agent initialiseert de buffer met een vast aantal tokenwaarden. Voor elke downloadbewerking leest de agent een tokenwaarde uit de buffer voordat de bewerking wordt gestart en schrijft deze waarde vervolgens terug naar de buffer nadat de bewerking is voltooid. Wanneer de buffer leeg is, wacht de agent tot een van de downloadbewerkingen een waarde terug naar de buffer schrijft.
In het volgende voorbeeld wordt het aantal gelijktijdige taken beperkt tot twee keer het aantal beschikbare hardwarethreads. Deze waarde is een goed startpunt om te gebruiken wanneer u experimenteert met oversubscriptie. U kunt een waarde gebruiken die past bij een bepaalde verwerkingsomgeving of deze waarde dynamisch wijzigen om te reageren op de werkelijke werkbelasting.
// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>
using namespace concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);
// Reads files from HTTP servers.
class http_reader : public agent
{
public:
explicit http_reader(CInternetSession& session,
ISource<string>& source,
unsigned int& total_bytes,
unsigned int max_concurrent_reads)
: _session(session)
, _source(source)
, _total_bytes(total_bytes)
{
// Add one token to the available tasks buffer for each
// possible concurrent read operation. The value of each token
// is not important, but can be useful for debugging.
for (unsigned int i = 0; i < max_concurrent_reads; ++i)
send(_available_tasks, i);
}
// Signals to the agent that there are no more items to download.
static const string input_sentinel;
protected:
void run()
{
// A task group. Each task in the group downloads one file.
task_group tasks;
// Holds the total number of bytes downloaded.
combinable<unsigned int> total_bytes;
// Read from the source buffer until the application
// sends the sentinel value.
string url;
while ((url = receive(_source)) != input_sentinel)
{
// Wait for a task to release an available slot.
unsigned int token = receive(_available_tasks);
// Create a task to download the file.
tasks.run([&, token, url] {
// Print a message.
wstringstream ss;
ss << L"Downloading " << url.c_str() << L"..." << endl;
wcout << ss.str();
// Download the file.
string content = download(url);
// Update the total number of bytes downloaded.
total_bytes.local() += content.size();
// Release the slot for another task.
send(_available_tasks, token);
});
}
// Wait for all tasks to finish.
tasks.wait();
// Compute the total number of bytes download on all threads.
_total_bytes = total_bytes.combine(plus<unsigned int>());
// Set the status of the agent to agent_done.
done();
}
// Downloads the file at the given URL.
string download(const string& url)
{
// Enable oversubscription.
Context::Oversubscribe(true);
// Download the file.
string content = GetHttpFile(_session, url.c_str());
// Disable oversubscription.
Context::Oversubscribe(false);
return content;
}
private:
// Manages the network connection.
CInternetSession& _session;
// A message buffer that holds the URL names to download.
ISource<string>& _source;
// The total number of bytes downloaded
unsigned int& _total_bytes;
// Limits the agent to a given number of simultaneous tasks.
unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");
int wmain()
{
// Create an array of URL names to download.
// A real-world application might read the names from user input.
array<string, 21> urls = {
"http://www.adatum.com/",
"http://www.adventure-works.com/",
"http://www.alpineskihouse.com/",
"http://www.cpandl.com/",
"http://www.cohovineyard.com/",
"http://www.cohowinery.com/",
"http://www.cohovineyardandwinery.com/",
"http://www.contoso.com/",
"http://www.consolidatedmessenger.com/",
"http://www.fabrikam.com/",
"http://www.fourthcoffee.com/",
"http://www.graphicdesigninstitute.com/",
"http://www.humongousinsurance.com/",
"http://www.litwareinc.com/",
"http://www.lucernepublishing.com/",
"http://www.margiestravel.com/",
"http://www.northwindtraders.com/",
"http://www.proseware.com/",
"http://www.fineartschool.net",
"http://www.tailspintoys.com/",
http_reader::input_sentinel,
};
// Manages the network connection.
CInternetSession session("Microsoft Internet Browser");
// A message buffer that enables the application to send URL names to the
// agent.
unbounded_buffer<string> source_urls;
// The total number of bytes that the agent has downloaded.
unsigned int total_bytes = 0u;
// Create an http_reader object that can oversubscribe each processor by one.
http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());
// Compute the amount of time that it takes for the agent to download all files.
__int64 elapsed = time_call([&] {
// Start the agent.
reader.start();
// Use the message buffer to send each URL name to the agent.
for_each(begin(urls), end(urls), [&](const string& url) {
send(source_urls, url);
});
// Wait for the agent to finish downloading.
agent::wait(&reader);
});
// Print the results.
wcout << L"Downloaded " << total_bytes
<< L" bytes in " << elapsed << " ms." << endl;
}
// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
CString strResult;
// Reads data from an HTTP server.
CHttpFile* pHttpFile = NULL;
try
{
// Open URL.
pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1,
INTERNET_FLAG_TRANSFER_ASCII |
INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);
// Read the file.
if(pHttpFile != NULL)
{
UINT uiBytesRead;
do
{
char chBuffer[10000];
uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
strResult += chBuffer;
}
while (uiBytesRead > 0);
}
}
catch (CInternetException)
{
// TODO: Handle exception
}
// Clean up and return.
delete pHttpFile;
return strResult;
}
In dit voorbeeld wordt de volgende uitvoer gegenereerd op een computer met vier processors:
Downloading http://www.adatum.com/...
Downloading http://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading http://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading http://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading http://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.
Het voorbeeld kan sneller worden uitgevoerd wanneer oversubscriptie is ingeschakeld omdat extra taken worden uitgevoerd terwijl andere taken wachten tot een latente bewerking is voltooid.
De code compileren
Kopieer de voorbeeldcode en plak deze in een Visual Studio-project of plak deze in een bestand met de naam download-oversubscription.cpp en voer vervolgens een van de volgende opdrachten uit in een Visual Studio-opdrachtpromptvenster .
cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp
Robuuste programmering
Schakel oversubscriptie altijd uit nadat u deze niet meer nodig hebt. Overweeg een functie die geen uitzondering afhandelt die wordt gegenereerd door een andere functie. Als u oversubscriptie niet uitschakelt voordat de functie wordt geretourneerd, wordt de huidige context ook overschreven door extra parallel werk.
U kunt het RAII-patroon ( Resource Acquisition Is Initialization ) gebruiken om de oversubscriptie te beperken tot een bepaald bereik. Onder het RAII-patroon wordt een gegevensstructuur toegewezen aan de stack. Deze gegevensstructuur initialiseert of verkrijgt een resource wanneer deze wordt gemaakt en vernietigt of publiceert die resource wanneer de gegevensstructuur wordt vernietigd. Het RAII-patroon garandeert dat de destructor wordt aangeroepen voordat het omsluitbereik wordt afgesloten. Daarom wordt de resource correct beheerd wanneer er een uitzondering wordt gegenereerd of wanneer een functie meerdere return instructies bevat.
In het volgende voorbeeld wordt een structuur gedefinieerd met de naam scoped_blocking_signal. De constructor van de scoped_blocking_signal structuur maakt oversubscriptie mogelijk en destructor schakelt oversubscriptie uit.
struct scoped_blocking_signal
{
scoped_blocking_signal()
{
concurrency::Context::Oversubscribe(true);
}
~scoped_blocking_signal()
{
concurrency::Context::Oversubscribe(false);
}
};
In het volgende voorbeeld wordt de body van de download-methode aangepast om RAII te gebruiken en zo ervoor te zorgen dat oversubscription is uitgeschakeld voordat de functie terugkeert. Deze techniek zorgt ervoor dat de download methode uitzonderingsveilig is.
// Downloads the file at the given URL.
string download(const string& url)
{
scoped_blocking_signal signal;
// Download the file.
return string(GetHttpFile(_session, url.c_str()));
}