summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpsychocrypt <psychocryptHPC@gmail.com>2018-05-16 17:20:26 +0200
committerpsychocrypt <psychocryptHPC@gmail.com>2018-05-17 09:32:45 +0200
commit84d3748550296ec0737588cf96b480927e5230d1 (patch)
tree459f520f6ab7b85d560f32f4a82151b02e5274ea
parent94d41580e0b0400e93a2f0226eb71476b891f4a7 (diff)
downloadxmr-stak-84d3748550296ec0737588cf96b480927e5230d1.zip
xmr-stak-84d3748550296ec0737588cf96b480927e5230d1.tar.gz
avoid out of order job processing
The login result of a pool contains the first job for the miner. In the case where the pool is sending very fast after the pool login result a new job it is possible that the newer job is processed faster than the job within the login result. The result will be that the miner is mining an older job instead the newest (last received). - enumerate all received messages - trace the message id of the last procssed job - skip all jobs where the message id is older than the last procesed id
-rw-r--r--xmrstak/net/jpsock.cpp36
-rw-r--r--xmrstak/net/jpsock.hpp7
2 files changed, 33 insertions, 10 deletions
diff --git a/xmrstak/net/jpsock.cpp b/xmrstak/net/jpsock.cpp
index 08bf99f..6c41f2b 100644
--- a/xmrstak/net/jpsock.cpp
+++ b/xmrstak/net/jpsock.cpp
@@ -43,8 +43,9 @@ struct jpsock::call_rsp
uint64_t iCallId;
Value* pCallData;
std::string sCallErr;
+ uint64_t iMessageId;
- call_rsp(Value* val) : pCallData(val)
+ call_rsp(Value* val) : pCallData(val), iMessageId(0)
{
bHaveResponse = false;
iCallId = 0;
@@ -215,6 +216,7 @@ void jpsock::jpsock_thread()
prv->oCallRsp.bHaveResponse = true;
prv->oCallRsp.iCallId = 0;
prv->oCallRsp.pCallData = nullptr;
+ prv->oCallRsp.iMessageId = 0;
bCallWaiting = true;
}
mlock.unlock();
@@ -286,6 +288,7 @@ bool jpsock::process_line(char* line, size_t len)
prv->jsonDoc.SetNull();
prv->parseAllocator.Clear();
prv->callAllocator.Clear();
+ ++iMessageCnt;
/*NULL terminate the line instead of '\n', parsing will add some more NULLs*/
line[len-1] = '\0';
@@ -320,7 +323,7 @@ bool jpsock::process_line(char* line, size_t len)
return set_socket_error("PARSE error: Protocol error 2");
opq_json_val v(mt);
- return process_pool_job(&v);
+ return process_pool_job(&v, iMessageCnt);
}
else
{
@@ -365,6 +368,7 @@ bool jpsock::process_line(char* line, size_t len)
prv->oCallRsp.bHaveResponse = true;
prv->oCallRsp.iCallId = iCallId;
+ prv->oCallRsp.iMessageId = iMessageCnt;
if(sError != nullptr)
{
@@ -382,8 +386,20 @@ bool jpsock::process_line(char* line, size_t len)
}
}
-bool jpsock::process_pool_job(const opq_json_val* params)
+bool jpsock::process_pool_job(const opq_json_val* params, const uint64_t messageId)
{
+ std::unique_lock<std::mutex> mlock(job_mutex);
+ if(messageId < iLastMessageId)
+ {
+ /* In the case where the processed job message id is lesser than the last
+ * processed job message id we skip the processing to avoid mining old jobs
+ */
+ return true;
+ }
+ iLastMessageId = messageId;
+
+ mlock.unlock();
+
if (!params->val->IsObject())
return set_socket_error("PARSE error: Job error 1");
@@ -514,7 +530,7 @@ void jpsock::disconnect(bool quiet)
quiet_close = false;
}
-bool jpsock::cmd_ret_wait(const char* sPacket, opq_json_val& poResult)
+bool jpsock::cmd_ret_wait(const char* sPacket, opq_json_val& poResult, uint64_t& messageId)
{
//printf("SEND: %s\n", sPacket);
@@ -554,8 +570,10 @@ bool jpsock::cmd_ret_wait(const char* sPacket, opq_json_val& poResult)
}
if(bSuccess)
+ {
poResult.val = &prv->oCallValue;
-
+ messageId = prv->oCallRsp.iMessageId;
+ }
return bSuccess;
}
@@ -567,9 +585,10 @@ bool jpsock::cmd_login()
usr_login.c_str(), usr_pass.c_str(), usr_rigid.c_str(), get_version_str().c_str());
opq_json_val oResult(nullptr);
+ uint64_t messageId = 0;
/*Normal error conditions (failed login etc..) will end here*/
- if (!cmd_ret_wait(cmd_buffer, oResult))
+ if (!cmd_ret_wait(cmd_buffer, oResult, messageId))
return false;
if (!oResult.val->IsObject())
@@ -624,7 +643,7 @@ bool jpsock::cmd_login()
}
opq_json_val v(job);
- if(!process_pool_job(&v))
+ if(!process_pool_job(&v, messageId))
{
disconnect();
return false;
@@ -695,8 +714,9 @@ bool jpsock::cmd_submit(const char* sJobId, uint32_t iNonce, const uint8_t* bRes
snprintf(cmd_buffer, sizeof(cmd_buffer), "{\"method\":\"submit\",\"params\":{\"id\":\"%s\",\"job_id\":\"%s\",\"nonce\":\"%s\",\"result\":\"%s\"%s%s%s},\"id\":1}\n",
sMinerId, sJobId, sNonce, sResult, sBackend, sHashcount, sAlgo);
+ uint64_t messageId = 0;
opq_json_val oResult(nullptr);
- return cmd_ret_wait(cmd_buffer, oResult);
+ return cmd_ret_wait(cmd_buffer, oResult, messageId);
}
void jpsock::save_nonce(uint32_t nonce)
diff --git a/xmrstak/net/jpsock.hpp b/xmrstak/net/jpsock.hpp
index 3afcc9b..ad34f6c 100644
--- a/xmrstak/net/jpsock.hpp
+++ b/xmrstak/net/jpsock.hpp
@@ -124,8 +124,8 @@ private:
void jpsock_thread();
bool jpsock_thd_main();
bool process_line(char* line, size_t len);
- bool process_pool_job(const opq_json_val* params);
- bool cmd_ret_wait(const char* sPacket, opq_json_val& poResult);
+ bool process_pool_job(const opq_json_val* params, const uint64_t messageId);
+ bool cmd_ret_wait(const char* sPacket, opq_json_val& poResult, uint64_t& messageId);
char sMinerId[64];
std::atomic<uint64_t> iJobDiff;
@@ -142,5 +142,8 @@ private:
opaque_private* prv;
base_socket* sck;
+
+ uint64_t iMessageCnt = 0;
+ uint64_t iLastMessageId = 0;
};
OpenPOWER on IntegriCloud