diff options
-rw-r--r-- | xmrstak/net/jpsock.cpp | 36 | ||||
-rw-r--r-- | xmrstak/net/jpsock.hpp | 7 |
2 files changed, 33 insertions, 10 deletions
diff --git a/xmrstak/net/jpsock.cpp b/xmrstak/net/jpsock.cpp index 08bf99f..6c41f2b 100644 --- a/xmrstak/net/jpsock.cpp +++ b/xmrstak/net/jpsock.cpp @@ -43,8 +43,9 @@ struct jpsock::call_rsp uint64_t iCallId; Value* pCallData; std::string sCallErr; + uint64_t iMessageId; - call_rsp(Value* val) : pCallData(val) + call_rsp(Value* val) : pCallData(val), iMessageId(0) { bHaveResponse = false; iCallId = 0; @@ -215,6 +216,7 @@ void jpsock::jpsock_thread() prv->oCallRsp.bHaveResponse = true; prv->oCallRsp.iCallId = 0; prv->oCallRsp.pCallData = nullptr; + prv->oCallRsp.iMessageId = 0; bCallWaiting = true; } mlock.unlock(); @@ -286,6 +288,7 @@ bool jpsock::process_line(char* line, size_t len) prv->jsonDoc.SetNull(); prv->parseAllocator.Clear(); prv->callAllocator.Clear(); + ++iMessageCnt; /*NULL terminate the line instead of '\n', parsing will add some more NULLs*/ line[len-1] = '\0'; @@ -320,7 +323,7 @@ bool jpsock::process_line(char* line, size_t len) return set_socket_error("PARSE error: Protocol error 2"); opq_json_val v(mt); - return process_pool_job(&v); + return process_pool_job(&v, iMessageCnt); } else { @@ -365,6 +368,7 @@ bool jpsock::process_line(char* line, size_t len) prv->oCallRsp.bHaveResponse = true; prv->oCallRsp.iCallId = iCallId; + prv->oCallRsp.iMessageId = iMessageCnt; if(sError != nullptr) { @@ -382,8 +386,20 @@ bool jpsock::process_line(char* line, size_t len) } } -bool jpsock::process_pool_job(const opq_json_val* params) +bool jpsock::process_pool_job(const opq_json_val* params, const uint64_t messageId) { + std::unique_lock<std::mutex> mlock(job_mutex); + if(messageId < iLastMessageId) + { + /* In the case where the processed job message id is lesser than the last + * processed job message id we skip the processing to avoid mining old jobs + */ + return true; + } + iLastMessageId = messageId; + + mlock.unlock(); + if (!params->val->IsObject()) return set_socket_error("PARSE error: Job error 1"); @@ -514,7 +530,7 @@ void jpsock::disconnect(bool quiet) quiet_close = false; } -bool jpsock::cmd_ret_wait(const char* sPacket, opq_json_val& poResult) +bool jpsock::cmd_ret_wait(const char* sPacket, opq_json_val& poResult, uint64_t& messageId) { //printf("SEND: %s\n", sPacket); @@ -554,8 +570,10 @@ bool jpsock::cmd_ret_wait(const char* sPacket, opq_json_val& poResult) } if(bSuccess) + { poResult.val = &prv->oCallValue; - + messageId = prv->oCallRsp.iMessageId; + } return bSuccess; } @@ -567,9 +585,10 @@ bool jpsock::cmd_login() usr_login.c_str(), usr_pass.c_str(), usr_rigid.c_str(), get_version_str().c_str()); opq_json_val oResult(nullptr); + uint64_t messageId = 0; /*Normal error conditions (failed login etc..) will end here*/ - if (!cmd_ret_wait(cmd_buffer, oResult)) + if (!cmd_ret_wait(cmd_buffer, oResult, messageId)) return false; if (!oResult.val->IsObject()) @@ -624,7 +643,7 @@ bool jpsock::cmd_login() } opq_json_val v(job); - if(!process_pool_job(&v)) + if(!process_pool_job(&v, messageId)) { disconnect(); return false; @@ -695,8 +714,9 @@ bool jpsock::cmd_submit(const char* sJobId, uint32_t iNonce, const uint8_t* bRes snprintf(cmd_buffer, sizeof(cmd_buffer), "{\"method\":\"submit\",\"params\":{\"id\":\"%s\",\"job_id\":\"%s\",\"nonce\":\"%s\",\"result\":\"%s\"%s%s%s},\"id\":1}\n", sMinerId, sJobId, sNonce, sResult, sBackend, sHashcount, sAlgo); + uint64_t messageId = 0; opq_json_val oResult(nullptr); - return cmd_ret_wait(cmd_buffer, oResult); + return cmd_ret_wait(cmd_buffer, oResult, messageId); } void jpsock::save_nonce(uint32_t nonce) diff --git a/xmrstak/net/jpsock.hpp b/xmrstak/net/jpsock.hpp index 3afcc9b..ad34f6c 100644 --- a/xmrstak/net/jpsock.hpp +++ b/xmrstak/net/jpsock.hpp @@ -124,8 +124,8 @@ private: void jpsock_thread(); bool jpsock_thd_main(); bool process_line(char* line, size_t len); - bool process_pool_job(const opq_json_val* params); - bool cmd_ret_wait(const char* sPacket, opq_json_val& poResult); + bool process_pool_job(const opq_json_val* params, const uint64_t messageId); + bool cmd_ret_wait(const char* sPacket, opq_json_val& poResult, uint64_t& messageId); char sMinerId[64]; std::atomic<uint64_t> iJobDiff; @@ -142,5 +142,8 @@ private: opaque_private* prv; base_socket* sck; + + uint64_t iMessageCnt = 0; + uint64_t iLastMessageId = 0; }; |