From 34deee35bd12c6ece4829adeb584fa91b3dd5c36 Mon Sep 17 00:00:00 2001 From: psychocrypt Date: Wed, 16 May 2018 17:20:26 +0200 Subject: avoid out of order job processing The login result of a pool contains the first job for the miner. In the case where the pool is sending very fast after the pool login result a new job it is possible that the newer job is processed faster than the job within the login result. The result will be that the miner is mining an older job instead the newest (last received). - enumerate all received messages - trace the message id of the last procssed job - skip all jobs where the message id is older than the last procesed id --- xmrstak/net/jpsock.cpp | 36 ++++++++++++++++++++++++++++-------- xmrstak/net/jpsock.hpp | 7 +++++-- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/xmrstak/net/jpsock.cpp b/xmrstak/net/jpsock.cpp index 08bf99f..6c41f2b 100644 --- a/xmrstak/net/jpsock.cpp +++ b/xmrstak/net/jpsock.cpp @@ -43,8 +43,9 @@ struct jpsock::call_rsp uint64_t iCallId; Value* pCallData; std::string sCallErr; + uint64_t iMessageId; - call_rsp(Value* val) : pCallData(val) + call_rsp(Value* val) : pCallData(val), iMessageId(0) { bHaveResponse = false; iCallId = 0; @@ -215,6 +216,7 @@ void jpsock::jpsock_thread() prv->oCallRsp.bHaveResponse = true; prv->oCallRsp.iCallId = 0; prv->oCallRsp.pCallData = nullptr; + prv->oCallRsp.iMessageId = 0; bCallWaiting = true; } mlock.unlock(); @@ -286,6 +288,7 @@ bool jpsock::process_line(char* line, size_t len) prv->jsonDoc.SetNull(); prv->parseAllocator.Clear(); prv->callAllocator.Clear(); + ++iMessageCnt; /*NULL terminate the line instead of '\n', parsing will add some more NULLs*/ line[len-1] = '\0'; @@ -320,7 +323,7 @@ bool jpsock::process_line(char* line, size_t len) return set_socket_error("PARSE error: Protocol error 2"); opq_json_val v(mt); - return process_pool_job(&v); + return process_pool_job(&v, iMessageCnt); } else { @@ -365,6 +368,7 @@ bool jpsock::process_line(char* line, size_t len) prv->oCallRsp.bHaveResponse = true; prv->oCallRsp.iCallId = iCallId; + prv->oCallRsp.iMessageId = iMessageCnt; if(sError != nullptr) { @@ -382,8 +386,20 @@ bool jpsock::process_line(char* line, size_t len) } } -bool jpsock::process_pool_job(const opq_json_val* params) +bool jpsock::process_pool_job(const opq_json_val* params, const uint64_t messageId) { + std::unique_lock mlock(job_mutex); + if(messageId < iLastMessageId) + { + /* In the case where the processed job message id is lesser than the last + * processed job message id we skip the processing to avoid mining old jobs + */ + return true; + } + iLastMessageId = messageId; + + mlock.unlock(); + if (!params->val->IsObject()) return set_socket_error("PARSE error: Job error 1"); @@ -514,7 +530,7 @@ void jpsock::disconnect(bool quiet) quiet_close = false; } -bool jpsock::cmd_ret_wait(const char* sPacket, opq_json_val& poResult) +bool jpsock::cmd_ret_wait(const char* sPacket, opq_json_val& poResult, uint64_t& messageId) { //printf("SEND: %s\n", sPacket); @@ -554,8 +570,10 @@ bool jpsock::cmd_ret_wait(const char* sPacket, opq_json_val& poResult) } if(bSuccess) + { poResult.val = &prv->oCallValue; - + messageId = prv->oCallRsp.iMessageId; + } return bSuccess; } @@ -567,9 +585,10 @@ bool jpsock::cmd_login() usr_login.c_str(), usr_pass.c_str(), usr_rigid.c_str(), get_version_str().c_str()); opq_json_val oResult(nullptr); + uint64_t messageId = 0; /*Normal error conditions (failed login etc..) will end here*/ - if (!cmd_ret_wait(cmd_buffer, oResult)) + if (!cmd_ret_wait(cmd_buffer, oResult, messageId)) return false; if (!oResult.val->IsObject()) @@ -624,7 +643,7 @@ bool jpsock::cmd_login() } opq_json_val v(job); - if(!process_pool_job(&v)) + if(!process_pool_job(&v, messageId)) { disconnect(); return false; @@ -695,8 +714,9 @@ bool jpsock::cmd_submit(const char* sJobId, uint32_t iNonce, const uint8_t* bRes snprintf(cmd_buffer, sizeof(cmd_buffer), "{\"method\":\"submit\",\"params\":{\"id\":\"%s\",\"job_id\":\"%s\",\"nonce\":\"%s\",\"result\":\"%s\"%s%s%s},\"id\":1}\n", sMinerId, sJobId, sNonce, sResult, sBackend, sHashcount, sAlgo); + uint64_t messageId = 0; opq_json_val oResult(nullptr); - return cmd_ret_wait(cmd_buffer, oResult); + return cmd_ret_wait(cmd_buffer, oResult, messageId); } void jpsock::save_nonce(uint32_t nonce) diff --git a/xmrstak/net/jpsock.hpp b/xmrstak/net/jpsock.hpp index 3afcc9b..ad34f6c 100644 --- a/xmrstak/net/jpsock.hpp +++ b/xmrstak/net/jpsock.hpp @@ -124,8 +124,8 @@ private: void jpsock_thread(); bool jpsock_thd_main(); bool process_line(char* line, size_t len); - bool process_pool_job(const opq_json_val* params); - bool cmd_ret_wait(const char* sPacket, opq_json_val& poResult); + bool process_pool_job(const opq_json_val* params, const uint64_t messageId); + bool cmd_ret_wait(const char* sPacket, opq_json_val& poResult, uint64_t& messageId); char sMinerId[64]; std::atomic iJobDiff; @@ -142,5 +142,8 @@ private: opaque_private* prv; base_socket* sck; + + uint64_t iMessageCnt = 0; + uint64_t iLastMessageId = 0; }; -- cgit v1.1