From d68036dfe013c4949fdc636a5f60e599555fe2ca Mon Sep 17 00:00:00 2001
From: psychocrypt
Date: Tue, 1 May 2018 20:46:02 +0200
Subject: fix job consumption (possible deadlock)

fix #1505

- fix possible deadlock of the executor thread
- fix race condition during the job consumption
- remove `switch_work` from all `minethd` classes
- move `consume_work` into `globalStates`
---
 xmrstak/backend/amd/minethd.cpp      | 30 +++-------
 xmrstak/backend/amd/minethd.hpp      |  5 +--
 xmrstak/backend/backendConnector.cpp |  3 --
 xmrstak/backend/cpu/minethd.cpp      | 16 +++------
 xmrstak/backend/cpu/minethd.hpp      |  3 --
 xmrstak/backend/globalStates.cpp     | 69 +++++++++++++++++++++++++++++++-----
 xmrstak/backend/globalStates.hpp     |  4 ++-
 xmrstak/backend/nvidia/minethd.cpp   | 30 +++-------
 xmrstak/backend/nvidia/minethd.hpp   |  3 --
 9 files changed, 78 insertions(+), 85 deletions(-)

diff --git a/xmrstak/backend/amd/minethd.cpp b/xmrstak/backend/amd/minethd.cpp
index c921f63..185e717 100644
--- a/xmrstak/backend/amd/minethd.cpp
+++ b/xmrstak/backend/amd/minethd.cpp
@@ -158,27 +158,6 @@ std::vector* minethd::thread_starter(uint32_t threadOffset, miner_wor
 	return pvThreads;
 }
 
-void minethd::switch_work(miner_work& pWork)
-{
-	// iConsumeCnt is a basic lock-like polling mechanism just in case we happen to push work
-	// faster than threads can consume them. This should never happen in real life.
-	// Pool cant physically send jobs faster than every 250ms or so due to net latency.
-
-	while (globalStates::inst().iConsumeCnt.load(std::memory_order_seq_cst) < globalStates::inst().iThreadCount)
-		std::this_thread::sleep_for(std::chrono::milliseconds(100));
-
-	globalStates::inst().oGlobalWork = pWork;
-	globalStates::inst().iConsumeCnt.store(0, std::memory_order_seq_cst);
-	globalStates::inst().iGlobalJobNo++;
-}
-
-void minethd::consume_work()
-{
-	memcpy(&oWork, &globalStates::inst().oGlobalWork, sizeof(miner_work));
-	iJobNo++;
-	globalStates::inst().iConsumeCnt++;
-
-}
 
 void minethd::work_main()
 {
@@ -198,8 +177,6 @@ void minethd::work_main()
 	auto miner_algo = ::jconf::inst()->GetCurrentCoinSelection().GetDescription(1).GetMiningAlgoRoot();
 	cn_hash_fun hash_fun = cpu::minethd::func_selector(::jconf::inst()->HaveHardwareAes(), true /*bNoPrefetch*/, miner_algo);
 
-	globalStates::inst().iConsumeCnt++;
-
 	uint8_t version = 0;
 	size_t lastPoolId = 0;
 
@@ -215,7 +192,7 @@ void minethd::work_main()
 			while (globalStates::inst().iGlobalJobNo.load(std::memory_order_relaxed) == iJobNo)
 				std::this_thread::sleep_for(std::chrono::milliseconds(100));
 
-			consume_work();
+			globalStates::inst().consume_work(oWork, iJobNo);
 			continue;
 		}
 
@@ -255,6 +232,9 @@
 			{
 				globalStates::inst().calc_start_nonce(pGpuCtx->Nonce, oWork.bNiceHash, h_per_round * 16);
 			}
+			// check if the job is still valid; there is a small possibility that the job was switched
+			if(globalStates::inst().iGlobalJobNo.load(std::memory_order_relaxed) != iJobNo)
+				break;
 
 			cl_uint results[0x100];
 			memset(results,0,sizeof(cl_uint)*(0x100));
@@ -285,7 +265,7 @@
 			std::this_thread::yield();
 		}
 
-		consume_work();
+		globalStates::inst().consume_work(oWork, iJobNo);
 	}
 }
diff --git a/xmrstak/backend/amd/minethd.hpp b/xmrstak/backend/amd/minethd.hpp
index 29ddb74..3142117 100644
--- a/xmrstak/backend/amd/minethd.hpp
+++ b/xmrstak/backend/amd/minethd.hpp
@@ -20,7 +20,6 @@ class minethd : public iBackend
 {
 public:
-	static void switch_work(miner_work& pWork);
 	static std::vector* thread_starter(uint32_t threadOffset, miner_work& pWork);
 	static bool init_gpus();
 
@@ -30,11 +29,9 @@ private:
 	minethd(miner_work& pWork, size_t iNo, GpuContext* ctx, const jconf::thd_cfg cfg);
 
 	void work_main();
-	void consume_work();
 
 	uint64_t iJobNo;
-
-	static miner_work oGlobalWork;
+	miner_work oWork;
 
 	std::promise order_fix;
 
diff --git a/xmrstak/backend/backendConnector.cpp b/xmrstak/backend/backendConnector.cpp
index 6f80a0f..525413f 100644
--- a/xmrstak/backend/backendConnector.cpp
+++ b/xmrstak/backend/backendConnector.cpp
@@ -57,9 +57,6 @@ bool BackendConnector::self_test()
 
 std::vector* BackendConnector::thread_starter(miner_work& pWork)
 {
-	globalStates::inst().iGlobalJobNo = 0;
-	globalStates::inst().iConsumeCnt = 0;
-
 	std::vector* pvThreads = new std::vector;
 
diff --git a/xmrstak/backend/cpu/minethd.cpp b/xmrstak/backend/cpu/minethd.cpp
index c5ebfca..3dc1063 100644
--- a/xmrstak/backend/cpu/minethd.cpp
+++ b/xmrstak/backend/cpu/minethd.cpp
@@ -343,13 +343,6 @@ std::vector minethd::thread_starter(uint32_t threadOffset, miner_work
 	return pvThreads;
 }
 
-void minethd::consume_work()
-{
-	memcpy(&oWork, &globalStates::inst().inst().oGlobalWork, sizeof(miner_work));
-	iJobNo++;
-	globalStates::inst().inst().iConsumeCnt++;
-}
-
 minethd::cn_hash_fun minethd::func_selector(bool bHaveAes, bool bNoPrefetch, xmrstak_algo algo)
 {
 	// We have two independent flag bits in the functions
@@ -446,7 +439,6 @@ void minethd::work_main()
 
 	piHashVal = (uint64_t*)(result.bResult + 24);
 	piNonce = (uint32_t*)(oWork.bWorkBlob + 39);
-	globalStates::inst().inst().iConsumeCnt++;
 	result.iThreadId = iThreadNo;
 
 	uint8_t version = 0;
@@ -464,7 +456,7 @@ void minethd::work_main()
 			while (globalStates::inst().iGlobalJobNo.load(std::memory_order_relaxed) == iJobNo)
 				std::this_thread::sleep_for(std::chrono::milliseconds(100));
 
-			consume_work();
+			globalStates::inst().consume_work(oWork, iJobNo);
 			continue;
 		}
 
@@ -520,7 +512,7 @@ void minethd::work_main()
 			std::this_thread::yield();
 		}
 
-		consume_work();
+		globalStates::inst().consume_work(oWork, iJobNo);
 	}
 
 	cryptonight_free_ctx(ctx);
@@ -769,7 +761,7 @@ void minethd::multiway_work_main()
 			while (globalStates::inst().iGlobalJobNo.load(std::memory_order_relaxed) == iJobNo)
 				std::this_thread::sleep_for(std::chrono::milliseconds(100));
 
-			consume_work();
+			globalStates::inst().consume_work(oWork, iJobNo);
 			prep_multiway_work(bWorkBlob, piNonce);
 			continue;
 		}
@@ -832,7 +824,7 @@ void minethd::multiway_work_main()
 			std::this_thread::yield();
 		}
 
-		consume_work();
+		globalStates::inst().consume_work(oWork, iJobNo);
 		prep_multiway_work(bWorkBlob, piNonce);
 	}
 
diff --git a/xmrstak/backend/cpu/minethd.hpp b/xmrstak/backend/cpu/minethd.hpp
index 85a95d1..2d40ce3 100644
--- a/xmrstak/backend/cpu/minethd.hpp
+++ b/xmrstak/backend/cpu/minethd.hpp
@@ -47,11 +47,8 @@ private:
 	void quad_work_main();
 	void penta_work_main();
 
-	void consume_work();
-
 	uint64_t iJobNo;
 
-	static miner_work oGlobalWork;
 	miner_work oWork;
 
 	std::promise order_fix;
diff --git a/xmrstak/backend/globalStates.cpp b/xmrstak/backend/globalStates.cpp
index 1ec7983..e60db8f 100644
--- a/xmrstak/backend/globalStates.cpp
+++ b/xmrstak/backend/globalStates.cpp
@@ -33,24 +33,75 @@ namespace xmrstak
 {
 
+void globalStates::consume_work( miner_work& threadWork, uint64_t& currentJobId)
+{
+	/* Only the executor thread which updates the job ever sets iConsumeCnt
+	 * to 1000. In that case each consumer must wait until the job is fully updated.
+	 */
+	uint64_t numConsumer = 0;
+
+	/* Take care that we do not consume a job while it is being updated.
+	 * Once we leave the loop we have increased iConsumeCnt, so the job
+	 * cannot be updated until we leave this method.
+	 */
+	do{
+		numConsumer = iConsumeCnt.load(std::memory_order_relaxed);
+		if(numConsumer < 1000)
+		{
+			// register that this thread is trying to consume the job data
+			numConsumer = ++iConsumeCnt;
+			if(numConsumer >= 1000)
+			{
+				iConsumeCnt--;
+				// 11 is an arbitrarily chosen prime number
+				std::this_thread::sleep_for(std::chrono::milliseconds(11));
+			}
+		}
+		else
+		{
+			// another thread is preparing a new job, 11 is an arbitrarily chosen prime number
+			std::this_thread::sleep_for(std::chrono::milliseconds(11));
+		}
+	}
+	while(numConsumer >= 1000);
+
+	threadWork = oGlobalWork;
+	currentJobId = iGlobalJobNo.load(std::memory_order_relaxed);
+
+	// signal that this thread has consumed the work
+	iConsumeCnt--;
+}
 
 void globalStates::switch_work(miner_work& pWork, pool_data& dat)
 {
-	// iConsumeCnt is a basic lock-like polling mechanism just in case we happen to push work
-	// faster than threads can consume them. This should never happen in real life.
-	// Pool cant physically send jobs faster than every 250ms or so due to net latency.
-
-	while (iConsumeCnt.load(std::memory_order_seq_cst) < iThreadCount)
-		std::this_thread::sleep_for(std::chrono::milliseconds(100));
+	/* 1000 is used to signal that the job will be updated as soon
+	 * as all consumers (which are currently copying oGlobalWork) have
+	 * finished copying the data.
+	 */
+	iConsumeCnt += 1000;
+	// wait until all threads that entered consume_work have finished
+	while (iConsumeCnt.load(std::memory_order_relaxed) > 1000)
+	{
+		// 7 is an arbitrarily chosen prime number which is smaller than the consumer waiting time
+		std::this_thread::sleep_for(std::chrono::milliseconds(7));
+	}
+	// BEGIN CRITICAL SECTION
+	// this notifies all threads that the job has changed
+	iGlobalJobNo++;
 
 	size_t xid = dat.pool_id;
 	dat.pool_id = pool_id;
 	pool_id = xid;
 
-	dat.iSavedNonce = iGlobalNonce.exchange(dat.iSavedNonce, std::memory_order_seq_cst);
+	/* Maybe a worker thread is updating the nonce while we read it here.
+	 * In that case the GPUs check the job ID after a nonce update, and if it
+	 * is a CPU thread there is a small chance (at most 6 nonces per CPU thread)
+	 * that we recalculate a nonce after we reconnect to the current pool.
+	 */
+	dat.iSavedNonce = iGlobalNonce.exchange(dat.iSavedNonce, std::memory_order_relaxed);
 	oGlobalWork = pWork;
-	iConsumeCnt.store(0, std::memory_order_seq_cst);
-	iGlobalJobNo++;
+	// END CRITICAL SECTION: allow job consumption
+	iConsumeCnt -= 1000;
 }
 
 } // namespace xmrstak
diff --git a/xmrstak/backend/globalStates.hpp b/xmrstak/backend/globalStates.hpp
index e1b554f..44b6e0b 100644
--- a/xmrstak/backend/globalStates.hpp
+++ b/xmrstak/backend/globalStates.hpp
@@ -31,6 +31,8 @@ struct globalStates
 		nonce = iGlobalNonce.fetch_add(reserve_count);
 	}
 
+	void consume_work( miner_work& threadWork, uint64_t& currentJobId);
+
 	miner_work oGlobalWork;
 	std::atomic iGlobalJobNo;
 	std::atomic iConsumeCnt;
@@ -39,7 +41,7 @@
 	size_t pool_id = invalid_pool_id;
 
 private:
-	globalStates() : iThreadCount(0)
+	globalStates() : iThreadCount(0), iGlobalJobNo(0), iConsumeCnt(0)
 	{
 	}
 };
diff --git a/xmrstak/backend/nvidia/minethd.cpp b/xmrstak/backend/nvidia/minethd.cpp
index d14fbd4..05ee0c6 100644
--- a/xmrstak/backend/nvidia/minethd.cpp
+++ b/xmrstak/backend/nvidia/minethd.cpp
@@ -195,27 +195,6 @@ std::vector* minethd::thread_starter(uint32_t threadOffset, miner_wor
 	return pvThreads;
 }
 
-void minethd::switch_work(miner_work& pWork)
-{
-	// iConsumeCnt is a basic lock-like polling mechanism just in case we happen to push work
-	// faster than threads can consume them. This should never happen in real life.
-	// Pool cant physically send jobs faster than every 250ms or so due to net latency.
-
-	while (globalStates::inst().iConsumeCnt.load(std::memory_order_seq_cst) < globalStates::inst().iThreadCount)
-		std::this_thread::sleep_for(std::chrono::milliseconds(100));
-
-	globalStates::inst().oGlobalWork = pWork;
-	globalStates::inst().iConsumeCnt.store(0, std::memory_order_seq_cst);
-	globalStates::inst().iGlobalJobNo++;
-}
-
-void minethd::consume_work()
-{
-	memcpy(&oWork, &globalStates::inst().oGlobalWork, sizeof(miner_work));
-	iJobNo++;
-	globalStates::inst().iConsumeCnt++;
-}
-
 void minethd::work_main()
 {
 	if(affinity >= 0) //-1 means no affinity
@@ -244,8 +223,6 @@ void minethd::work_main()
 
 	uint32_t iNonce;
 
-	globalStates::inst().iConsumeCnt++;
-
 	uint8_t version = 0;
 	size_t lastPoolId = 0;
 
@@ -261,7 +238,7 @@ void minethd::work_main()
 			while (globalStates::inst().iGlobalJobNo.load(std::memory_order_relaxed) == iJobNo)
 				std::this_thread::sleep_for(std::chrono::milliseconds(100));
 
-			consume_work();
+			globalStates::inst().consume_work(oWork, iJobNo);
 			continue;
 		}
 		uint8_t new_version = oWork.getVersion();
@@ -299,6 +276,9 @@
 			{
 				globalStates::inst().calc_start_nonce(iNonce, oWork.bNiceHash, h_per_round * 16);
 			}
+			// check if the job is still valid; there is a small possibility that the job was switched
+			if(globalStates::inst().iGlobalJobNo.load(std::memory_order_relaxed) != iJobNo)
+				break;
 
 			uint32_t foundNonce[10];
 			uint32_t foundCount;
@@ -337,7 +317,7 @@
 			std::this_thread::yield();
 		}
 
-		consume_work();
+		globalStates::inst().consume_work(oWork, iJobNo);
 	}
 }
diff --git a/xmrstak/backend/nvidia/minethd.hpp b/xmrstak/backend/nvidia/minethd.hpp
index ad541bf..d4ae038 100644
--- a/xmrstak/backend/nvidia/minethd.hpp
+++ b/xmrstak/backend/nvidia/minethd.hpp
@@ -24,7 +24,6 @@ class minethd : public iBackend
 {
 public:
-	static void switch_work(miner_work& pWork);
 	static std::vector* thread_starter(uint32_t threadOffset, miner_work& pWork);
 	static bool self_test();
 
@@ -35,14 +34,12 @@ private:
 	void start_mining();
 
 	void work_main();
-	void consume_work();
 
 	static std::atomic iGlobalJobNo;
 	static std::atomic iConsumeCnt;
 	static uint64_t iThreadCount;
 	uint64_t iJobNo;
 
-	static miner_work oGlobalWork;
 	miner_work oWork;
 
 	std::promise numa_promise;
--
cgit v1.1
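
For readers skimming the patch, the sketch below condenses the coordination pattern that the new `globalStates::consume_work`/`globalStates::switch_work` pair implements. It is an illustration only, not the xmr-stak code itself: `miner_work_stub`, the free functions, the global atomics and `main` are simplified stand-ins, and the pool-id/nonce bookkeeping of the real `switch_work` is omitted.

```cpp
// Minimal sketch (assumed names, not the real xmr-stak API) of the "+1000 sentinel"
// pattern this patch layers on top of an atomic consumer counter.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

struct miner_work_stub { uint64_t payload = 0; };   // stand-in for miner_work

std::atomic<uint64_t> iConsumeCnt{0};
std::atomic<uint64_t> iGlobalJobNo{0};
miner_work_stub oGlobalWork;                        // only touched under the protocol below

// consumer side: register, copy the current job, deregister
void consume_work(miner_work_stub& threadWork, uint64_t& currentJobId)
{
	for(;;)
	{
		if(iConsumeCnt.load(std::memory_order_relaxed) < 1000)
		{
			if(++iConsumeCnt < 1000)
				break;              // registered; the producer now waits for us
			--iConsumeCnt;          // a producer raised the sentinel meanwhile, back off
		}
		std::this_thread::sleep_for(std::chrono::milliseconds(11));
	}
	threadWork = oGlobalWork;       // the producer waits for registered consumers before updating
	currentJobId = iGlobalJobNo.load(std::memory_order_relaxed);
	--iConsumeCnt;                  // deregister
}

// producer side: raise the sentinel, drain the consumers, publish the job
void switch_work(const miner_work_stub& newWork)
{
	iConsumeCnt += 1000;            // block new consumers from registering
	while(iConsumeCnt.load(std::memory_order_relaxed) > 1000)
		std::this_thread::sleep_for(std::chrono::milliseconds(7));
	iGlobalJobNo++;                 // tell workers the job changed
	oGlobalWork = newWork;          // publish the new job
	iConsumeCnt -= 1000;            // reopen the fast path
}

int main()
{
	std::atomic<bool> stop{false};

	// worker: keeps re-consuming whatever job is currently published
	std::thread worker([&]{
		miner_work_stub myWork;
		uint64_t myJobId = 0;
		while(!stop.load())
		{
			consume_work(myWork, myJobId);
			std::this_thread::sleep_for(std::chrono::milliseconds(1));
		}
	});

	// executor: publishes a few new jobs
	for(uint64_t i = 1; i <= 3; ++i)
	{
		miner_work_stub job;
		job.payload = i;
		switch_work(job);
	}

	stop = true;
	worker.join();
	return 0;
}
```

The value 1000 acts as a writer sentinel layered on top of a plain consumer count: adding 1000 stops new consumers from registering, waiting until the counter drains back to exactly 1000 guarantees that nobody is still copying `oGlobalWork`, and subtracting 1000 reopens the fast path. A consumer that loses the race backs off and retries instead of blocking, which is the behaviour the patch relies on to avoid the old executor deadlock.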