diff options
Diffstat (limited to 'xmrstak/net/jpsock.cpp')
-rw-r--r-- | xmrstak/net/jpsock.cpp | 620 |
1 files changed, 620 insertions, 0 deletions
diff --git a/xmrstak/net/jpsock.cpp b/xmrstak/net/jpsock.cpp new file mode 100644 index 0000000..d287375 --- /dev/null +++ b/xmrstak/net/jpsock.cpp @@ -0,0 +1,620 @@ +/* + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * Additional permission under GNU GPL version 3 section 7 + * + * If you modify this Program, or any covered work, by linking or combining + * it with OpenSSL (or a modified version of that library), containing parts + * covered by the terms of OpenSSL License and SSLeay License, the licensors + * of this Program grant you additional permission to convey the resulting work. + * + */ + +#include <stdarg.h> +#include <assert.h> + +#include "jpsock.h" +#include "executor.h" +#include "jconf.h" + +#include "rapidjson/document.h" +#include "jext.h" +#include "socks.h" +#include "socket.h" +#include "version.h" + +#define AGENTID_STR XMR_STAK_NAME "/" XMR_STAK_VERSION + +using namespace rapidjson; + +struct jpsock::call_rsp +{ + bool bHaveResponse; + uint64_t iCallId; + Value* pCallData; + std::string sCallErr; + + call_rsp(Value* val) : pCallData(val) + { + bHaveResponse = false; + iCallId = 0; + sCallErr.clear(); + } +}; + +typedef GenericDocument<UTF8<>, MemoryPoolAllocator<>, MemoryPoolAllocator<>> MemDocument; + +/* + * + * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + * ASSUMPTION - only one calling thread. Multiple calling threads would require better + * thread safety. The calling thread is assumed to be the executor thread. + * If there is a reason to call the pool outside of the executor context, consider + * doing it via an executor event. + * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + * + * Call values and allocators are for the calling thread (executor). When processing + * a call, the recv thread will make a copy of the call response and then erase its copy. + */ + +struct jpsock::opaque_private +{ + Value oCallValue; + + MemoryPoolAllocator<> callAllocator; + MemoryPoolAllocator<> recvAllocator; + MemoryPoolAllocator<> parseAllocator; + MemDocument jsonDoc; + call_rsp oCallRsp; + + opaque_private(uint8_t* bCallMem, uint8_t* bRecvMem, uint8_t* bParseMem) : + callAllocator(bCallMem, jpsock::iJsonMemSize), + recvAllocator(bRecvMem, jpsock::iJsonMemSize), + parseAllocator(bParseMem, jpsock::iJsonMemSize), + jsonDoc(&recvAllocator, jpsock::iJsonMemSize, &parseAllocator), + oCallRsp(nullptr) + { + } +}; + +struct jpsock::opq_json_val +{ + const Value* val; + opq_json_val(const Value* val) : val(val) {} +}; + +jpsock::jpsock(size_t id, bool tls) : pool_id(id) +{ + sock_init(); + + bJsonCallMem = (uint8_t*)malloc(iJsonMemSize); + bJsonRecvMem = (uint8_t*)malloc(iJsonMemSize); + bJsonParseMem = (uint8_t*)malloc(iJsonMemSize); + + prv = new opaque_private(bJsonCallMem, bJsonRecvMem, bJsonParseMem); + +#ifndef CONF_NO_TLS + if(tls) + sck = new tls_socket(this); + else + sck = new plain_socket(this); +#else + sck = new plain_socket(this); +#endif + + oRecvThd = nullptr; + bRunning = false; + bLoggedIn = false; + iJobDiff = 0; + + memset(&oCurrentJob, 0, sizeof(oCurrentJob)); +} + +jpsock::~jpsock() +{ + delete prv; + prv = nullptr; + + free(bJsonCallMem); + free(bJsonRecvMem); + free(bJsonParseMem); +} + +std::string&& jpsock::get_call_error() +{ + return std::move(prv->oCallRsp.sCallErr); +} + +bool jpsock::set_socket_error(const char* a) +{ + if(!bHaveSocketError) + { + bHaveSocketError = true; + sSocketError.assign(a); + } + + return false; +} + +bool jpsock::set_socket_error(const char* a, const char* b) +{ + if(!bHaveSocketError) + { + bHaveSocketError = true; + size_t ln_a = strlen(a); + size_t ln_b = strlen(b); + + sSocketError.reserve(ln_a + ln_b + 2); + sSocketError.assign(a, ln_a); + sSocketError.append(b, ln_b); + } + + return false; +} + +bool jpsock::set_socket_error(const char* a, size_t len) +{ + if(!bHaveSocketError) + { + bHaveSocketError = true; + sSocketError.assign(a, len); + } + + return false; +} + +bool jpsock::set_socket_error_strerr(const char* a) +{ + char sSockErrText[512]; + return set_socket_error(a, sock_strerror(sSockErrText, sizeof(sSockErrText))); +} + +bool jpsock::set_socket_error_strerr(const char* a, int res) +{ + char sSockErrText[512]; + return set_socket_error(a, sock_gai_strerror(res, sSockErrText, sizeof(sSockErrText))); +} + +void jpsock::jpsock_thread() +{ + jpsock_thd_main(); + executor::inst()->push_event(ex_event(std::move(sSocketError), pool_id)); + + // If a call is wating, send an error to end it + bool bCallWaiting = false; + std::unique_lock<std::mutex> mlock(call_mutex); + if(prv->oCallRsp.pCallData != nullptr) + { + prv->oCallRsp.bHaveResponse = true; + prv->oCallRsp.iCallId = 0; + prv->oCallRsp.pCallData = nullptr; + bCallWaiting = true; + } + mlock.unlock(); + + if(bCallWaiting) + call_cond.notify_one(); + + bRunning = false; + bLoggedIn = false; + + std::unique_lock<std::mutex>(job_mutex); + memset(&oCurrentJob, 0, sizeof(oCurrentJob)); +} + +bool jpsock::jpsock_thd_main() +{ + if(!sck->connect()) + return false; + + executor::inst()->push_event(ex_event(EV_SOCK_READY, pool_id)); + + char buf[iSockBufferSize]; + size_t datalen = 0; + while (true) + { + int ret = sck->recv(buf + datalen, sizeof(buf) - datalen); + + if(ret <= 0) + return false; + + datalen += ret; + + if (datalen >= sizeof(buf)) + { + sck->close(false); + return set_socket_error("RECEIVE error: data overflow"); + } + + char* lnend; + char* lnstart = buf; + while ((lnend = (char*)memchr(lnstart, '\n', datalen)) != nullptr) + { + lnend++; + int lnlen = lnend - lnstart; + + if (!process_line(lnstart, lnlen)) + { + sck->close(false); + return false; + } + + datalen -= lnlen; + lnstart = lnend; + } + + //Got leftover data? Move it to the front + if (datalen > 0 && buf != lnstart) + memmove(buf, lnstart, datalen); + } +} + +bool jpsock::process_line(char* line, size_t len) +{ + prv->jsonDoc.SetNull(); + prv->parseAllocator.Clear(); + prv->callAllocator.Clear(); + + /*NULL terminate the line instead of '\n', parsing will add some more NULLs*/ + line[len-1] = '\0'; + + //printf("RECV: %s\n", line); + + if (prv->jsonDoc.ParseInsitu(line).HasParseError()) + return set_socket_error("PARSE error: Invalid JSON"); + + if (!prv->jsonDoc.IsObject()) + return set_socket_error("PARSE error: Invalid root"); + + const Value* mt; + if (prv->jsonDoc.HasMember("method")) + { + mt = GetObjectMember(prv->jsonDoc, "method"); + + if(!mt->IsString()) + return set_socket_error("PARSE error: Protocol error 1"); + + if(strcmp(mt->GetString(), "job") != 0) + return set_socket_error("PARSE error: Unsupported server method ", mt->GetString()); + + mt = GetObjectMember(prv->jsonDoc, "params"); + if(mt == nullptr || !mt->IsObject()) + return set_socket_error("PARSE error: Protocol error 2"); + + opq_json_val v(mt); + return process_pool_job(&v); + } + else + { + uint64_t iCallId; + mt = GetObjectMember(prv->jsonDoc, "id"); + if (mt == nullptr || !mt->IsUint64()) + return set_socket_error("PARSE error: Protocol error 3"); + + iCallId = mt->GetUint64(); + + mt = GetObjectMember(prv->jsonDoc, "error"); + + const char* sError = nullptr; + size_t iErrorLn = 0; + if (mt == nullptr || mt->IsNull()) + { + /* If there was no error we need a result */ + if ((mt = GetObjectMember(prv->jsonDoc, "result")) == nullptr) + return set_socket_error("PARSE error: Protocol error 7"); + } + else + { + if(!mt->IsObject()) + return set_socket_error("PARSE error: Protocol error 5"); + + const Value* msg = GetObjectMember(*mt, "message"); + + if(msg == nullptr || !msg->IsString()) + return set_socket_error("PARSE error: Protocol error 6"); + + iErrorLn = msg->GetStringLength(); + sError = msg->GetString(); + } + + std::unique_lock<std::mutex> mlock(call_mutex); + if (prv->oCallRsp.pCallData == nullptr) + { + /*Server sent us a call reply without us making a call*/ + mlock.unlock(); + return set_socket_error("PARSE error: Unexpected call response"); + } + + prv->oCallRsp.bHaveResponse = true; + prv->oCallRsp.iCallId = iCallId; + + if(sError != nullptr) + { + prv->oCallRsp.pCallData = nullptr; + prv->oCallRsp.sCallErr.assign(sError, iErrorLn); + } + else + prv->oCallRsp.pCallData->CopyFrom(*mt, prv->callAllocator); + + mlock.unlock(); + call_cond.notify_one(); + + return true; + } +} + +bool jpsock::process_pool_job(const opq_json_val* params) +{ + if (!params->val->IsObject()) + return set_socket_error("PARSE error: Job error 1"); + + const Value * blob, *jobid, *target; + jobid = GetObjectMember(*params->val, "job_id"); + blob = GetObjectMember(*params->val, "blob"); + target = GetObjectMember(*params->val, "target"); + + if (jobid == nullptr || blob == nullptr || target == nullptr || + !jobid->IsString() || !blob->IsString() || !target->IsString()) + { + return set_socket_error("PARSE error: Job error 2"); + } + + if (jobid->GetStringLength() >= sizeof(pool_job::sJobID)) // Note >= + return set_socket_error("PARSE error: Job error 3"); + + uint32_t iWorkLn = blob->GetStringLength() / 2; + if (iWorkLn > sizeof(pool_job::bWorkBlob)) + return set_socket_error("PARSE error: Invalid job legth. Are you sure you are mining the correct coin?"); + + pool_job oPoolJob; + if (!hex2bin(blob->GetString(), iWorkLn * 2, oPoolJob.bWorkBlob)) + return set_socket_error("PARSE error: Job error 4"); + + oPoolJob.iWorkLen = iWorkLn; + memset(oPoolJob.sJobID, 0, sizeof(pool_job::sJobID)); + memcpy(oPoolJob.sJobID, jobid->GetString(), jobid->GetStringLength()); //Bounds checking at proto error 3 + + size_t target_slen = target->GetStringLength(); + if(target_slen <= 8) + { + uint32_t iTempInt = 0; + char sTempStr[] = "00000000"; // Little-endian CPU FTW + memcpy(sTempStr, target->GetString(), target_slen); + if(!hex2bin(sTempStr, 8, (unsigned char*)&iTempInt) || iTempInt == 0) + return set_socket_error("PARSE error: Invalid target"); + + + oPoolJob.iTarget = t32_to_t64(iTempInt); + oPoolJob.iTarget32 = iTempInt; + + } + else if(target_slen <= 16) + { + oPoolJob.iTarget = 0; + char sTempStr[] = "0000000000000000"; + memcpy(sTempStr, target->GetString(), target_slen); + if(!hex2bin(sTempStr, 16, (unsigned char*)&oPoolJob.iTarget) || oPoolJob.iTarget == 0) + return set_socket_error("PARSE error: Invalid target"); + } + else + return set_socket_error("PARSE error: Job error 5"); + + iJobDiff = t64_to_diff(oPoolJob.iTarget); + + executor::inst()->push_event(ex_event(oPoolJob, pool_id)); + + std::unique_lock<std::mutex>(job_mutex); + oCurrentJob = oPoolJob; + return true; +} + +bool jpsock::connect(const char* sAddr, std::string& sConnectError) +{ + bHaveSocketError = false; + sSocketError.clear(); + iJobDiff = 0; + + if(sck->set_hostname(sAddr)) + { + bRunning = true; + oRecvThd = new std::thread(&jpsock::jpsock_thread, this); + return true; + } + + sConnectError = std::move(sSocketError); + return false; +} + +void jpsock::disconnect() +{ + sck->close(false); + + if(oRecvThd != nullptr) + { + oRecvThd->join(); + delete oRecvThd; + oRecvThd = nullptr; + } + + sck->close(true); +} + +bool jpsock::cmd_ret_wait(const char* sPacket, opq_json_val& poResult) +{ + //printf("SEND: %s\n", sPacket); + + /*Set up the call rsp for the call reply*/ + prv->oCallValue.SetNull(); + prv->callAllocator.Clear(); + + std::unique_lock<std::mutex> mlock(call_mutex); + prv->oCallRsp = call_rsp(&prv->oCallValue); + mlock.unlock(); + + if(!sck->send(sPacket)) + { + disconnect(); //This will join the other thread; + return false; + } + + //Success is true if the server approves, result is true if there was no socket error + bool bSuccess; + mlock.lock(); + bool bResult = call_cond.wait_for(mlock, std::chrono::seconds(jconf::inst()->GetCallTimeout()), + [&]() { return prv->oCallRsp.bHaveResponse; }); + + bSuccess = prv->oCallRsp.pCallData != nullptr; + prv->oCallRsp.pCallData = nullptr; + mlock.unlock(); + + if(bHaveSocketError) + return false; + + //This means that there was no socket error, but the server is not taking to us + if(!bResult) + { + set_socket_error("CALL error: Timeout while waiting for a reply"); + disconnect(); + return false; + } + + if(bSuccess) + poResult.val = &prv->oCallValue; + + return bSuccess; +} + +bool jpsock::cmd_login(const char* sLogin, const char* sPassword) +{ + char cmd_buffer[1024]; + + snprintf(cmd_buffer, sizeof(cmd_buffer), "{\"method\":\"login\",\"params\":{\"login\":\"%s\",\"pass\":\"%s\",\"agent\":\"" AGENTID_STR "\"},\"id\":1}\n", + sLogin, sPassword); + + opq_json_val oResult(nullptr); + + /*Normal error conditions (failed login etc..) will end here*/ + if (!cmd_ret_wait(cmd_buffer, oResult)) + return false; + + if (!oResult.val->IsObject()) + { + set_socket_error("PARSE error: Login protocol error 1"); + disconnect(); + return false; + } + + const Value* id = GetObjectMember(*oResult.val, "id"); + const Value* job = GetObjectMember(*oResult.val, "job"); + + if (id == nullptr || job == nullptr || !id->IsString()) + { + set_socket_error("PARSE error: Login protocol error 2"); + disconnect(); + return false; + } + + if (id->GetStringLength() >= sizeof(sMinerId)) + { + set_socket_error("PARSE error: Login protocol error 3"); + disconnect(); + return false; + } + + memset(sMinerId, 0, sizeof(sMinerId)); + memcpy(sMinerId, id->GetString(), id->GetStringLength()); + + opq_json_val v(job); + if(!process_pool_job(&v)) + { + disconnect(); + return false; + } + + bLoggedIn = true; + + return true; +} + +bool jpsock::cmd_submit(const char* sJobId, uint32_t iNonce, const uint8_t* bResult) +{ + char cmd_buffer[1024]; + char sNonce[9]; + char sResult[65]; + + bin2hex((unsigned char*)&iNonce, 4, sNonce); + sNonce[8] = '\0'; + + bin2hex(bResult, 32, sResult); + sResult[64] = '\0'; + + snprintf(cmd_buffer, sizeof(cmd_buffer), "{\"method\":\"submit\",\"params\":{\"id\":\"%s\",\"job_id\":\"%s\",\"nonce\":\"%s\",\"result\":\"%s\"},\"id\":1}\n", + sMinerId, sJobId, sNonce, sResult); + + opq_json_val oResult(nullptr); + return cmd_ret_wait(cmd_buffer, oResult); +} + +bool jpsock::get_current_job(pool_job& job) +{ + std::unique_lock<std::mutex>(job_mutex); + + if(oCurrentJob.iWorkLen == 0) + return false; + + oCurrentJob.iResumeCnt++; + job = oCurrentJob; + return true; +} + +inline unsigned char hf_hex2bin(char c, bool &err) +{ + if (c >= '0' && c <= '9') + return c - '0'; + else if (c >= 'a' && c <= 'f') + return c - 'a' + 0xA; + else if (c >= 'A' && c <= 'F') + return c - 'A' + 0xA; + + err = true; + return 0; +} + +bool jpsock::hex2bin(const char* in, unsigned int len, unsigned char* out) +{ + bool error = false; + for (unsigned int i = 0; i < len; i += 2) + { + out[i / 2] = (hf_hex2bin(in[i], error) << 4) | hf_hex2bin(in[i + 1], error); + if (error) return false; + } + return true; +} + +inline char hf_bin2hex(unsigned char c) +{ + if (c <= 0x9) + return '0' + c; + else + return 'a' - 0xA + c; +} + +void jpsock::bin2hex(const unsigned char* in, unsigned int len, char* out) +{ + for (unsigned int i = 0; i < len; i++) + { + out[i * 2] = hf_bin2hex((in[i] & 0xF0) >> 4); + out[i * 2 + 1] = hf_bin2hex(in[i] & 0x0F); + } +} |