ConnectionsManager.cpp 155 KB


  1. /*
  2. * This is the source code of tgnet library v. 1.1
  3. * It is licensed under GNU GPL v. 2 or later.
  4. * You should have received a copy of the license in this archive (see LICENSE).
  5. *
  6. * Copyright Nikolai Kudashov, 2015-2018.
  7. */
  8. #include <cassert>
  9. #include <cstdlib>
  10. #include <sys/eventfd.h>
  11. #include <unistd.h>
  12. #include <chrono>
  13. #include <algorithm>
  14. #include <fcntl.h>
  15. #include <memory.h>
  16. #include <openssl/rand.h>
  17. #include <zlib.h>
  18. #include <memory>
  19. #include <string>
  20. #include <cinttypes>
  21. #include "ConnectionsManager.h"
  22. #include "FileLog.h"
  23. #include "EventObject.h"
  24. #include "MTProtoScheme.h"
  25. #include "ApiScheme.h"
  26. #include "NativeByteBuffer.h"
  27. #include "Connection.h"
  28. #include "Datacenter.h"
  29. #include "Request.h"
  30. #include "BuffersStorage.h"
  31. #include "ByteArray.h"
  32. #include "Config.h"
  33. #include "ProxyCheckInfo.h"
  34. #include "Handshake.h"
  35. #ifdef ANDROID
  36. #include <jni.h>
  37. JavaVM *javaVm = nullptr;
  38. JNIEnv *jniEnv[MAX_ACCOUNT_COUNT];
  39. jclass jclass_ByteBuffer = nullptr;
  40. jmethodID jclass_ByteBuffer_allocateDirect = nullptr;
  41. #endif
  42. static bool done = false;
  43. ConnectionsManager::ConnectionsManager(int32_t instance) {
  44. instanceNum = instance;
  45. if ((epolFd = epoll_create(128)) == -1) {
  46. if (LOGS_ENABLED) DEBUG_E("unable to create epoll instance");
  47. exit(1);
  48. }
  49. int flags;
  50. if ((flags = fcntl(epolFd, F_GETFD, NULL)) < 0) {
  51. if (LOGS_ENABLED) DEBUG_W("fcntl(%d, F_GETFD)", epolFd);
  52. }
  53. if (!(flags & FD_CLOEXEC)) {
  54. if (fcntl(epolFd, F_SETFD, flags | FD_CLOEXEC) == -1) {
  55. if (LOGS_ENABLED) DEBUG_W("fcntl(%d, F_SETFD)", epolFd);
  56. }
  57. }
  58. if ((epollEvents = new epoll_event[128]) == nullptr) {
  59. if (LOGS_ENABLED) DEBUG_E("unable to allocate epoll events");
  60. exit(1);
  61. }
  62. eventFd = eventfd(0, EFD_NONBLOCK);
  63. if (eventFd != -1) {
  64. struct epoll_event event = {0};
  65. event.data.ptr = new EventObject(&eventFd, EventObjectTypeEvent);
  66. event.events = EPOLLIN | EPOLLET;
  67. if (epoll_ctl(epolFd, EPOLL_CTL_ADD, eventFd, &event) == -1) {
  68. eventFd = -1;
  69. FileLog::e("unable to add eventfd");
  70. }
  71. }
  72. if (eventFd == -1) {
  73. pipeFd = new int[2];
  74. if (pipe(pipeFd) != 0) {
  75. if (LOGS_ENABLED) DEBUG_E("unable to create pipe");
  76. exit(1);
  77. }
  78. flags = fcntl(pipeFd[0], F_GETFL);
  79. if (flags == -1) {
  80. if (LOGS_ENABLED) DEBUG_E("fcntl get pipefds[0] failed");
  81. exit(1);
  82. }
  83. if (fcntl(pipeFd[0], F_SETFL, flags | O_NONBLOCK) == -1) {
  84. if (LOGS_ENABLED) DEBUG_E("fcntl set pipefds[0] failed");
  85. exit(1);
  86. }
  87. flags = fcntl(pipeFd[1], F_GETFL);
  88. if (flags == -1) {
  89. if (LOGS_ENABLED) DEBUG_E("fcntl get pipefds[1] failed");
  90. exit(1);
  91. }
  92. if (fcntl(pipeFd[1], F_SETFL, flags | O_NONBLOCK) == -1) {
  93. if (LOGS_ENABLED) DEBUG_E("fcntl set pipefds[1] failed");
  94. exit(1);
  95. }
  96. auto eventObject = new EventObject(pipeFd, EventObjectTypePipe);
  97. epoll_event eventMask = {};
  98. eventMask.events = EPOLLIN;
  99. eventMask.data.ptr = eventObject;
  100. if (epoll_ctl(epolFd, EPOLL_CTL_ADD, pipeFd[0], &eventMask) != 0) {
  101. if (LOGS_ENABLED) DEBUG_E("can't add pipe to epoll");
  102. exit(1);
  103. }
  104. }
  105. sizeCalculator = new NativeByteBuffer(true);
  106. networkBuffer = new NativeByteBuffer((uint32_t) READ_BUFFER_SIZE);
  107. if (networkBuffer == nullptr) {
  108. if (LOGS_ENABLED) DEBUG_E("unable to allocate read buffer");
  109. exit(1);
  110. }
  111. pthread_mutex_init(&mutex, nullptr);
  112. }
  113. ConnectionsManager::~ConnectionsManager() {
  114. if (epolFd != 0) {
  115. close(epolFd);
  116. epolFd = 0;
  117. }
  118. if (pipeFd != nullptr) {
  119. delete[] pipeFd;
  120. pipeFd = nullptr;
  121. }
  122. pthread_mutex_destroy(&mutex);
  123. }
  124. ConnectionsManager& ConnectionsManager::getInstance(int32_t instanceNum) {
  125. switch (instanceNum) {
  126. case 0:
  127. static ConnectionsManager instance0(0);
  128. return instance0;
  129. case 1:
  130. static ConnectionsManager instance1(1);
  131. return instance1;
  132. case 2:
  133. static ConnectionsManager instance2(2);
  134. return instance2;
  135. case 3:
  136. static ConnectionsManager instance3(3);
  137. return instance3;
  138. case 4:
  139. default:
  140. static ConnectionsManager instance4(4);
  141. return instance4;
  142. }
  143. }
  144. int ConnectionsManager::callEvents(int64_t now) {
  145. if (!events.empty()) {
  146. for (auto iter = events.begin(); iter != events.end();) {
  147. EventObject *eventObject = (*iter);
  148. if (eventObject->time <= now) {
  149. iter = events.erase(iter);
  150. eventObject->onEvent(0);
  151. } else {
  152. int diff = (int) (eventObject->time - now);
  153. return diff > 1000 || diff < 0 ? 1000 : diff;
  154. }
  155. }
  156. }
  157. if (!networkPaused) {
  158. return 1000;
  159. }
  160. auto timeToPushPing = (int32_t) ((sendingPushPing ? 30000 : nextPingTimeOffset) - llabs(now - lastPushPingTime));
  161. if (timeToPushPing <= 0) {
  162. return 1000;
  163. }
  164. return timeToPushPing;
  165. }
  166. void ConnectionsManager::checkPendingTasks() {
  167. int32_t count = INT_MAX;
  168. while (true) {
  169. std::function<void()> task;
  170. pthread_mutex_lock(&mutex);
  171. if (pendingTasks.empty() || count <= 0) {
  172. pthread_mutex_unlock(&mutex);
  173. return;
  174. }
  175. if (count == INT_MAX) {
  176. count = (int32_t) pendingTasks.size();
  177. } else {
  178. count--;
  179. }
  180. task = pendingTasks.front();
  181. pendingTasks.pop();
  182. pthread_mutex_unlock(&mutex);
  183. task();
  184. }
  185. }
  186. void ConnectionsManager::select() {
  187. checkPendingTasks();
  188. int eventsCount = epoll_wait(epolFd, epollEvents, 128, callEvents(getCurrentTimeMonotonicMillis()));
  189. checkPendingTasks();
  190. int64_t now = getCurrentTimeMonotonicMillis();
  191. callEvents(now);
  192. for (int32_t a = 0; a < eventsCount; a++) {
  193. auto eventObject = (EventObject *) epollEvents[a].data.ptr;
  194. eventObject->onEvent(epollEvents[a].events);
  195. }
  196. activeConnectionsCopy.resize(activeConnections.size());
  197. std::copy(std::begin(activeConnections), std::end(activeConnections), std::begin(activeConnectionsCopy));
  198. for (auto connection : activeConnectionsCopy) {
  199. connection->checkTimeout(now);
  200. }
  201. Datacenter *datacenter = getDatacenterWithId(currentDatacenterId);
  202. if (pushConnectionEnabled) {
  203. if ((sendingPushPing && llabs(now - lastPushPingTime) >= 30000) || llabs(now - lastPushPingTime) >= nextPingTimeOffset + 10000) {
  204. lastPushPingTime = 0;
  205. sendingPushPing = false;
  206. if (datacenter != nullptr) {
  207. Connection *connection = datacenter->getPushConnection(false);
  208. if (connection != nullptr) {
  209. connection->suspendConnection();
  210. }
  211. }
  212. if (LOGS_ENABLED) DEBUG_D("push ping timeout");
  213. }
  214. if (llabs(now - lastPushPingTime) >= nextPingTimeOffset) {
  215. if (LOGS_ENABLED) DEBUG_D("time for push ping");
  216. lastPushPingTime = now;
  217. uint8_t offset;
  218. RAND_bytes(&offset, 1);
  219. nextPingTimeOffset = 60000 * 3 + (offset % 40) - 20;
  220. if (datacenter != nullptr) {
  221. sendPing(datacenter, true);
  222. }
  223. }
  224. }
  225. if (lastPauseTime != 0 && llabs(now - lastPauseTime) >= nextSleepTimeout) {
  226. bool dontSleep = !requestingSaltsForDc.empty();
  227. if (!dontSleep) {
  228. for (auto & runningRequest : runningRequests) {
  229. Request *request = runningRequest.get();
  230. if (request->connectionType & ConnectionTypeDownload || request->connectionType & ConnectionTypeUpload) {
  231. dontSleep = true;
  232. break;
  233. }
  234. }
  235. }
  236. if (!dontSleep) {
  237. for (auto & iter : requestsQueue) {
  238. Request *request = iter.get();
  239. if (request->connectionType & ConnectionTypeDownload || request->connectionType & ConnectionTypeUpload) {
  240. dontSleep = true;
  241. break;
  242. }
  243. }
  244. }
  245. if (!dontSleep) {
  246. if (!networkPaused) {
  247. if (LOGS_ENABLED) DEBUG_D("pausing network and timers by sleep time = %d", nextSleepTimeout);
  248. for (auto & dc : datacenters) {
  249. dc.second->suspendConnections(false);
  250. }
  251. }
  252. networkPaused = true;
  253. return;
  254. } else {
  255. lastPauseTime = now;
  256. if (LOGS_ENABLED) DEBUG_D("don't sleep because of salt, upload or download request");
  257. }
  258. }
  259. if (networkPaused) {
  260. networkPaused = false;
  261. for (auto & dc : datacenters) {
  262. if (dc.second->isHandshaking(false)) {
  263. dc.second->createGenericConnection()->connect();
  264. } else if (dc.second->isHandshaking(true)) {
  265. dc.second->createGenericMediaConnection()->connect();
  266. }
  267. }
  268. if (LOGS_ENABLED) DEBUG_D("resume network and timers");
  269. }
  270. if (delegate != nullptr) {
  271. delegate->onUpdate(instanceNum);
  272. }
  273. if (datacenter != nullptr) {
  274. if (datacenter->hasAuthKey(ConnectionTypeGeneric, 1)) {
  275. if (llabs(now - lastPingTime) >= (testBackend ? 2000 : 19000)) {
  276. lastPingTime = now;
  277. sendPing(datacenter, false);
  278. }
  279. if (abs((int32_t) (now / 1000) - lastDcUpdateTime) >= DC_UPDATE_TIME) {
  280. updateDcSettings(0, false);
  281. }
  282. processRequestQueue(0, 0);
  283. } else if (!datacenter->isHandshakingAny()) {
  284. datacenter->beginHandshake(HandshakeTypeAll, true);
  285. }
  286. }
  287. }
  288. void ConnectionsManager::scheduleTask(std::function<void()> task) {
  289. pthread_mutex_lock(&mutex);
  290. pendingTasks.push(task);
  291. pthread_mutex_unlock(&mutex);
  292. wakeup();
  293. }
  294. void ConnectionsManager::scheduleEvent(EventObject *eventObject, uint32_t time) {
  295. eventObject->time = getCurrentTimeMonotonicMillis() + time;
  296. std::list<EventObject *>::iterator iter;
  297. for (iter = events.begin(); iter != events.end(); iter++) {
  298. if ((*iter)->time > eventObject->time) {
  299. break;
  300. }
  301. }
  302. events.insert(iter, eventObject);
  303. }
  304. void ConnectionsManager::removeEvent(EventObject *eventObject) {
  305. for (auto iter = events.begin(); iter != events.end(); iter++) {
  306. if (*iter == eventObject) {
  307. events.erase(iter);
  308. break;
  309. }
  310. }
  311. }
  312. void ConnectionsManager::wakeup() {
  313. if (pipeFd == nullptr) {
  314. eventfd_write(eventFd, 1);
  315. } else {
  316. char ch = 'x';
  317. write(pipeFd[1], &ch, 1);
  318. }
  319. }
  320. void *ConnectionsManager::ThreadProc(void *data) {
  321. if (LOGS_ENABLED) DEBUG_D("network thread started");
  322. auto networkManager = (ConnectionsManager *) (data);
  323. #ifdef ANDROID
  324. javaVm->AttachCurrentThread(&jniEnv[networkManager->instanceNum], nullptr);
  325. #endif
  326. if (networkManager->currentUserId != 0 && networkManager->pushConnectionEnabled) {
  327. Datacenter *datacenter = networkManager->getDatacenterWithId(networkManager->currentDatacenterId);
  328. if (datacenter != nullptr) {
  329. datacenter->createPushConnection()->setSessionId(networkManager->pushSessionId);
  330. networkManager->sendPing(datacenter, true);
  331. }
  332. }
  333. do {
  334. networkManager->select();
  335. } while (!done);
  336. return nullptr;
  337. }
  338. void ConnectionsManager::loadConfig() {
  339. if (config == nullptr) {
  340. config = new Config(instanceNum, "tgnet.dat");
  341. }
  342. NativeByteBuffer *buffer = config->readConfig();
  343. if (buffer != nullptr) {
  344. uint32_t version = buffer->readUint32(nullptr);
  345. if (LOGS_ENABLED) DEBUG_D("config version = %u", version);
  346. if (version <= configVersion) {
  347. testBackend = buffer->readBool(nullptr);
  348. if (version >= 3) {
  349. clientBlocked = buffer->readBool(nullptr);
  350. }
  351. if (version >= 4) {
  352. lastInitSystemLangcode = buffer->readString(nullptr);
  353. }
  354. if (buffer->readBool(nullptr)) {
  355. currentDatacenterId = buffer->readUint32(nullptr);
  356. timeDifference = buffer->readInt32(nullptr);
  357. lastDcUpdateTime = buffer->readInt32(nullptr);
  358. pushSessionId = buffer->readInt64(nullptr);
  359. if (version >= 2) {
  360. registeredForInternalPush = buffer->readBool(nullptr);
  361. }
  362. if (version >= 5) {
  363. int32_t lastServerTime = buffer->readInt32(nullptr);
  364. int32_t currentTime = getCurrentTime();
  365. if (currentTime > timeDifference && currentTime < lastServerTime) {
  366. timeDifference += (lastServerTime - currentTime);
  367. }
  368. }
  369. if (LOGS_ENABLED) DEBUG_D("current dc id = %u, time difference = %d, registered for push = %d", currentDatacenterId, timeDifference, (int32_t) registeredForInternalPush);
  370. uint32_t count = buffer->readUint32(nullptr);
  371. for (uint32_t a = 0; a < count; a++) {
  372. sessionsToDestroy.push_back(buffer->readInt64(nullptr));
  373. }
  374. count = buffer->readUint32(nullptr);
  375. for (uint32_t a = 0; a < count; a++) {
  376. auto datacenter = new Datacenter(instanceNum, buffer);
  377. datacenters[datacenter->getDatacenterId()] = datacenter;
  378. if (LOGS_ENABLED) DEBUG_D("datacenter(%p) %u loaded (hasAuthKey = %d, 0x%" PRIx64 ")", datacenter, datacenter->getDatacenterId(), (int) datacenter->hasPermanentAuthKey(), datacenter->getPermanentAuthKeyId());
  379. }
  380. }
  381. }
  382. buffer->reuse();
  383. }
  384. if (currentDatacenterId != 0 && currentUserId) {
  385. Datacenter *datacenter = getDatacenterWithId(currentDatacenterId);
  386. if (datacenter == nullptr || !datacenter->hasPermanentAuthKey()) {
  387. if (datacenter != nullptr) {
  388. if (LOGS_ENABLED) DEBUG_D("reset authorization because of dc %d", currentDatacenterId);
  389. }
  390. currentDatacenterId = 0;
  391. datacenters.clear();
  392. scheduleTask([&] {
  393. if (delegate != nullptr) {
  394. delegate->onLogout(instanceNum);
  395. }
  396. });
  397. }
  398. }
  399. initDatacenters();
  400. if ((!datacenters.empty() && currentDatacenterId == 0) || pushSessionId == 0) {
  401. if (pushSessionId == 0) {
  402. RAND_bytes((uint8_t *) &pushSessionId, 8);
  403. }
  404. if (currentDatacenterId == 0) {
  405. currentDatacenterId = 2;
  406. }
  407. saveConfig();
  408. }
  409. movingToDatacenterId = DEFAULT_DATACENTER_ID;
  410. }
  411. void ConnectionsManager::saveConfigInternal(NativeByteBuffer *buffer) {
  412. buffer->writeInt32(configVersion);
  413. buffer->writeBool(testBackend);
  414. buffer->writeBool(clientBlocked);
  415. buffer->writeString(lastInitSystemLangcode);
  416. Datacenter *currentDatacenter = getDatacenterWithId(currentDatacenterId);
  417. buffer->writeBool(currentDatacenter != nullptr);
  418. if (currentDatacenter != nullptr) {
  419. buffer->writeInt32(currentDatacenterId);
  420. buffer->writeInt32(timeDifference);
  421. buffer->writeInt32(lastDcUpdateTime);
  422. buffer->writeInt64(pushSessionId);
  423. buffer->writeBool(registeredForInternalPush);
  424. buffer->writeInt32(getCurrentTime());
  425. std::vector<int64_t> sessions;
  426. currentDatacenter->getSessions(sessions);
  427. auto count = (uint32_t) sessions.size();
  428. buffer->writeInt32(count);
  429. for (uint32_t a = 0; a < count; a++) {
  430. buffer->writeInt64(sessions[a]);
  431. }
  432. count = (uint32_t) datacenters.size();
  433. buffer->writeInt32(count);
  434. for (auto & datacenter : datacenters) {
  435. datacenter.second->serializeToStream(buffer);
  436. }
  437. }
  438. }
  439. void ConnectionsManager::saveConfig() {
  440. if (config == nullptr) {
  441. config = new Config(instanceNum, "tgnet.dat");
  442. }
  443. sizeCalculator->clearCapacity();
  444. saveConfigInternal(sizeCalculator);
  445. NativeByteBuffer *buffer = BuffersStorage::getInstance().getFreeBuffer(sizeCalculator->capacity());
  446. saveConfigInternal(buffer);
  447. config->writeConfig(buffer);
  448. buffer->reuse();
  449. }
  450. inline NativeByteBuffer *decompressGZip(NativeByteBuffer *data) {
  451. int retCode;
  452. z_stream stream;
  453. memset(&stream, 0, sizeof(z_stream));
  454. stream.avail_in = data->limit();
  455. stream.next_in = data->bytes();
  456. retCode = inflateInit2(&stream, 15 + 32);
  457. if (retCode != Z_OK) {
  458. if (LOGS_ENABLED) DEBUG_E("can't decompress data");
  459. exit(1);
  460. }
  461. NativeByteBuffer *result = BuffersStorage::getInstance().getFreeBuffer(data->limit() * 4);
  462. stream.avail_out = result->capacity();
  463. stream.next_out = result->bytes();
  464. while (1) {
  465. retCode = inflate(&stream, Z_NO_FLUSH);
  466. if (retCode == Z_STREAM_END) {
  467. break;
  468. }
  469. if (retCode == Z_OK) {
  470. NativeByteBuffer *newResult = BuffersStorage::getInstance().getFreeBuffer(result->capacity() * 2);
  471. memcpy(newResult->bytes(), result->bytes(), result->capacity());
  472. stream.avail_out = newResult->capacity() - result->capacity();
  473. stream.next_out = newResult->bytes() + result->capacity();
  474. result->reuse();
  475. result = newResult;
  476. } else {
  477. if (LOGS_ENABLED) DEBUG_E("can't decompress data");
  478. exit(1);
  479. }
  480. }
  481. result->limit((uint32_t) stream.total_out);
  482. inflateEnd(&stream);
  483. return result;
  484. }
  485. inline NativeByteBuffer *compressGZip(NativeByteBuffer *buffer) {
  486. if (buffer == nullptr || buffer->limit() == 0) {
  487. return nullptr;
  488. }
  489. z_stream stream;
  490. int retCode;
  491. memset(&stream, 0, sizeof(z_stream));
  492. stream.avail_in = buffer->limit();
  493. stream.next_in = buffer->bytes();
  494. retCode = deflateInit2(&stream, Z_BEST_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
  495. if (retCode != Z_OK) {
  496. if (LOGS_ENABLED) DEBUG_E("%s: deflateInit2() failed with error %i", __PRETTY_FUNCTION__, retCode);
  497. return nullptr;
  498. }
  499. NativeByteBuffer *result = BuffersStorage::getInstance().getFreeBuffer(buffer->limit());
  500. stream.avail_out = result->limit();
  501. stream.next_out = result->bytes();
  502. retCode = deflate(&stream, Z_FINISH);
  503. if ((retCode != Z_OK) && (retCode != Z_STREAM_END)) {
  504. if (LOGS_ENABLED) DEBUG_E("%s: deflate() failed with error %i", __PRETTY_FUNCTION__, retCode);
  505. deflateEnd(&stream);
  506. result->reuse();
  507. return nullptr;
  508. }
  509. if (retCode != Z_STREAM_END || stream.total_out >= buffer->limit() - 4) {
  510. deflateEnd(&stream);
  511. result->reuse();
  512. return nullptr;
  513. }
  514. result->limit((uint32_t) stream.total_out);
  515. deflateEnd(&stream);
  516. return result;
  517. }
  518. int64_t ConnectionsManager::getCurrentTimeMillis() {
  519. clock_gettime(CLOCK_REALTIME, &timeSpec);
  520. return (int64_t) timeSpec.tv_sec * 1000 + (int64_t) timeSpec.tv_nsec / 1000000;
  521. }
  522. int64_t ConnectionsManager::getCurrentTimeMonotonicMillis() {
  523. clock_gettime(CLOCK_BOOTTIME, &timeSpecMonotonic);
  524. return (int64_t) timeSpecMonotonic.tv_sec * 1000 + (int64_t) timeSpecMonotonic.tv_nsec / 1000000;
  525. }
  526. int32_t ConnectionsManager::getCurrentTime() {
  527. return (int32_t) (getCurrentTimeMillis() / 1000) + timeDifference;
  528. }
  529. uint32_t ConnectionsManager::getCurrentDatacenterId() {
  530. Datacenter *datacenter = getDatacenterWithId(DEFAULT_DATACENTER_ID);
  531. return datacenter != nullptr ? datacenter->getDatacenterId() : INT_MAX;
  532. }
  533. bool ConnectionsManager::isTestBackend() {
  534. return testBackend;
  535. }
  536. int32_t ConnectionsManager::getTimeDifference() {
  537. return timeDifference;
  538. }
  539. int64_t ConnectionsManager::generateMessageId() {
  540. auto messageId = (int64_t) ((((double) getCurrentTimeMillis() + ((double) timeDifference) * 1000) * 4294967296.0) / 1000.0);
  541. if (messageId <= lastOutgoingMessageId) {
  542. messageId = lastOutgoingMessageId + 1;
  543. }
  544. while (messageId % 4 != 0) {
  545. messageId++;
  546. }
  547. lastOutgoingMessageId = messageId;
  548. return messageId;
  549. }
  550. bool ConnectionsManager::isNetworkAvailable() {
  551. return networkAvailable;
  552. }
  553. void ConnectionsManager::cleanUp(bool resetKeys, int32_t datacenterId) {
  554. scheduleTask([&, resetKeys, datacenterId] {
  555. for (auto iter = requestsQueue.begin(); iter != requestsQueue.end();) {
  556. Request *request = iter->get();
  557. if (datacenterId != -1) {
  558. Datacenter *requestDatacenter = getDatacenterWithId(request->datacenterId);
  559. if (requestDatacenter != nullptr && requestDatacenter->getDatacenterId() != datacenterId) {
  560. iter++;
  561. continue;
  562. }
  563. }
  564. if (request->requestFlags & RequestFlagWithoutLogin) {
  565. iter++;
  566. continue;
  567. }
  568. if (request->onCompleteRequestCallback != nullptr) {
  569. auto error = new TL_error();
  570. error->code = -1000;
  571. error->text = "";
  572. request->onComplete(nullptr, error, 0, 0);
  573. delete error;
  574. }
  575. iter = requestsQueue.erase(iter);
  576. }
  577. for (auto iter = runningRequests.begin(); iter != runningRequests.end();) {
  578. Request *request = iter->get();
  579. if (datacenterId != -1) {
  580. Datacenter *requestDatacenter = getDatacenterWithId(request->datacenterId);
  581. if (requestDatacenter != nullptr && requestDatacenter->getDatacenterId() != datacenterId) {
  582. iter++;
  583. continue;
  584. }
  585. }
  586. if (request->requestFlags & RequestFlagWithoutLogin) {
  587. iter++;
  588. continue;
  589. }
  590. if (request->onCompleteRequestCallback != nullptr) {
  591. auto error = new TL_error();
  592. error->code = -1000;
  593. error->text = "";
  594. request->onComplete(nullptr, error, 0, 0);
  595. delete error;
  596. }
  597. iter = runningRequests.erase(iter);
  598. }
  599. quickAckIdToRequestIds.clear();
  600. for (auto & datacenter : datacenters) {
  601. if (datacenterId != -1 && datacenter.second->getDatacenterId() != datacenterId) {
  602. continue;
  603. }
  604. if (resetKeys) {
  605. datacenter.second->clearAuthKey(HandshakeTypeAll);
  606. }
  607. datacenter.second->recreateSessions(HandshakeTypeAll);
  608. datacenter.second->authorized = false;
  609. }
  610. if (datacenterId == -1) {
  611. sessionsToDestroy.clear();
  612. currentUserId = 0;
  613. registeredForInternalPush = false;
  614. }
  615. saveConfig();
  616. });
  617. }
  618. void ConnectionsManager::onConnectionClosed(Connection *connection, int reason) {
  619. Datacenter *datacenter = connection->getDatacenter();
  620. if ((connection->getConnectionType() == ConnectionTypeGeneric || connection->getConnectionType() == ConnectionTypeGenericMedia) && datacenter->isHandshakingAny()) {
  621. datacenter->onHandshakeConnectionClosed(connection);
  622. }
  623. if (connection->getConnectionType() == ConnectionTypeGeneric) {
  624. if (datacenter->getDatacenterId() == currentDatacenterId) {
  625. sendingPing = false;
  626. if (!connection->isSuspended() && (proxyAddress.empty() || connection->hasTlsHashMismatch())) {
  627. if (reason == 2) {
  628. disconnectTimeoutAmount += connection->getTimeout();
  629. } else {
  630. disconnectTimeoutAmount += 4;
  631. }
  632. if (LOGS_ENABLED) DEBUG_D("increase disconnect timeout %d", disconnectTimeoutAmount);
  633. int32_t maxTimeout;
  634. if (clientBlocked) {
  635. maxTimeout = 5;
  636. } else {
  637. maxTimeout = 20;
  638. }
  639. if (disconnectTimeoutAmount >= maxTimeout) {
  640. if (!connection->hasUsefullData()) {
  641. if (LOGS_ENABLED) DEBUG_D("start requesting new address and port due to timeout reach");
  642. requestingSecondAddressByTlsHashMismatch = connection->hasTlsHashMismatch();
  643. if (requestingSecondAddressByTlsHashMismatch) {
  644. requestingSecondAddress = 1;
  645. } else {
  646. requestingSecondAddress = 0;
  647. }
  648. delegate->onRequestNewServerIpAndPort(requestingSecondAddress, instanceNum);
  649. } else {
  650. if (LOGS_ENABLED) DEBUG_D("connection has usefull data, don't request anything");
  651. }
  652. disconnectTimeoutAmount = 0;
  653. }
  654. }
  655. if (networkAvailable) {
  656. if (proxyAddress.empty()) {
  657. if (connectionState != ConnectionStateConnecting) {
  658. connectionState = ConnectionStateConnecting;
  659. if (delegate != nullptr) {
  660. delegate->onConnectionStateChanged(connectionState, instanceNum);
  661. }
  662. }
  663. } else {
  664. if (connectionState != ConnectionStateConnectingViaProxy) {
  665. connectionState = ConnectionStateConnectingViaProxy;
  666. if (delegate != nullptr) {
  667. delegate->onConnectionStateChanged(connectionState, instanceNum);
  668. }
  669. }
  670. }
  671. } else {
  672. if (connectionState != ConnectionStateWaitingForNetwork) {
  673. connectionState = ConnectionStateWaitingForNetwork;
  674. if (delegate != nullptr) {
  675. delegate->onConnectionStateChanged(connectionState, instanceNum);
  676. }
  677. }
  678. }
  679. }
  680. } else if (connection->getConnectionType() == ConnectionTypePush) {
  681. if (LOGS_ENABLED) DEBUG_D("connection(%p) push connection closed", connection);
  682. sendingPushPing = false;
  683. lastPushPingTime = getCurrentTimeMonotonicMillis() - nextPingTimeOffset + 4000;
  684. } else if (connection->getConnectionType() == ConnectionTypeProxy) {
  685. scheduleTask([&, connection] {
  686. for (auto iter = proxyActiveChecks.begin(); iter != proxyActiveChecks.end(); iter++) {
  687. ProxyCheckInfo *proxyCheckInfo = iter->get();
  688. if (proxyCheckInfo->connectionNum == connection->getConnectionNum()) {
  689. bool found = false;
  690. for (auto iter2 = runningRequests.begin(); iter2 != runningRequests.end(); iter2++) {
  691. Request *request = iter2->get();
  692. if (connection->getConnectionToken() == request->connectionToken && request->requestToken == proxyCheckInfo->requestToken && (request->connectionType & 0x0000ffff) == ConnectionTypeProxy) {
  693. request->completed = true;
  694. runningRequests.erase(iter2);
  695. proxyCheckInfo->onRequestTime(-1);
  696. found = true;
  697. break;
  698. }
  699. }
  700. if (found) {
  701. proxyActiveChecks.erase(iter);
  702. if (!proxyCheckQueue.empty()) {
  703. proxyCheckInfo = proxyCheckQueue[0].release();
  704. proxyCheckQueue.erase(proxyCheckQueue.begin());
  705. checkProxyInternal(proxyCheckInfo);
  706. }
  707. }
  708. break;
  709. }
  710. }
  711. });
  712. }
  713. }
  714. void ConnectionsManager::onConnectionConnected(Connection *connection) {
  715. Datacenter *datacenter = connection->getDatacenter();
  716. ConnectionType connectionType = connection->getConnectionType();
  717. if ((connectionType == ConnectionTypeGeneric || connectionType == ConnectionTypeGenericMedia) && datacenter->isHandshakingAny()) {
  718. datacenter->onHandshakeConnectionConnected(connection);
  719. return;
  720. }
  721. if (datacenter->hasAuthKey(connectionType, 1)) {
  722. if (connectionType == ConnectionTypePush) {
  723. sendingPushPing = false;
  724. lastPushPingTime = getCurrentTimeMonotonicMillis();
  725. sendPing(datacenter, true);
  726. } else {
  727. if (connectionType == ConnectionTypeGeneric && datacenter->getDatacenterId() == currentDatacenterId) {
  728. sendingPing = false;
  729. }
  730. if (networkPaused && lastPauseTime != 0) {
  731. lastPauseTime = getCurrentTimeMonotonicMillis();
  732. }
  733. processRequestQueue(connection->getConnectionType(), datacenter->getDatacenterId());
  734. }
  735. }
  736. }
  737. void ConnectionsManager::onConnectionQuickAckReceived(Connection *connection, int32_t ack) {
  738. auto iter = quickAckIdToRequestIds.find(ack);
  739. if (iter == quickAckIdToRequestIds.end()) {
  740. return;
  741. }
  742. for (auto & runningRequest : runningRequests) {
  743. Request *request = runningRequest.get();
  744. if (std::find(iter->second.begin(), iter->second.end(), request->requestToken) != iter->second.end()) {
  745. request->onQuickAck();
  746. }
  747. }
  748. quickAckIdToRequestIds.erase(iter);
  749. }
  750. void ConnectionsManager::onConnectionDataReceived(Connection *connection, NativeByteBuffer *data, uint32_t length) {
  751. bool error = false;
  752. if (length <= 24 + 32) {
  753. int32_t code = data->readInt32(&error);
  754. if (code == 0) {
  755. if (LOGS_ENABLED) DEBUG_D("mtproto noop");
  756. } else if (code == -1) {
  757. int32_t ackId = data->readInt32(&error);
  758. if (!error) {
  759. onConnectionQuickAckReceived(connection, ackId & (~(1 << 31)));
  760. }
  761. } else {
  762. Datacenter *datacenter = connection->getDatacenter();
  763. if (LOGS_ENABLED) DEBUG_W("mtproto error = %d", code);
  764. if (code == -444 && connection->getConnectionType() == ConnectionTypeGeneric && !proxyAddress.empty() && !proxySecret.empty()) {
  765. if (delegate != nullptr) {
  766. delegate->onProxyError(instanceNum);
  767. }
  768. } else if (code == -404 && (datacenter->isCdnDatacenter || PFS_ENABLED)) {
  769. if (!datacenter->isHandshaking(connection->isMediaConnection)) {
  770. datacenter->clearAuthKey(connection->isMediaConnection ? HandshakeTypeMediaTemp : HandshakeTypeTemp);
  771. datacenter->beginHandshake(connection->isMediaConnection ? HandshakeTypeMediaTemp : HandshakeTypeTemp, true);
  772. if (LOGS_ENABLED) DEBUG_D("connection(%p, account%u, dc%u, type %d) reset auth key due to -404 error", connection, instanceNum, datacenter->getDatacenterId(), connection->getConnectionType());
  773. }
  774. } else {
  775. connection->reconnect();
  776. }
  777. }
  778. return;
  779. }
  780. uint32_t mark = data->position();
  781. int64_t keyId = data->readInt64(&error);
  782. if (error) {
  783. connection->reconnect();
  784. return;
  785. }
  786. Datacenter *datacenter = connection->getDatacenter();
  787. if (connectionState != ConnectionStateConnected && connection->getConnectionType() == ConnectionTypeGeneric && datacenter->getDatacenterId() == currentDatacenterId) {
  788. connectionState = ConnectionStateConnected;
  789. if (delegate != nullptr) {
  790. delegate->onConnectionStateChanged(connectionState, instanceNum);
  791. }
  792. }
  793. if (keyId == 0) {
  794. int64_t messageId = data->readInt64(&error);
  795. if (error) {
  796. connection->reconnect();
  797. return;
  798. }
  799. if (connection->isMessageIdProcessed(messageId)) {
  800. return;
  801. }
  802. uint32_t messageLength = data->readUint32(&error);
  803. if (error) {
  804. connection->reconnect();
  805. return;
  806. }
  807. if (!connection->allowsCustomPadding()) {
  808. if (messageLength != data->remaining()) {
  809. if (LOGS_ENABLED) DEBUG_E("connection(%p) received incorrect message length", connection);
  810. connection->reconnect();
  811. return;
  812. }
  813. }
  814. TLObject *request;
  815. if (datacenter->isHandshaking(connection->isMediaConnection)) {
  816. request = datacenter->getCurrentHandshakeRequest(connection->isMediaConnection);
  817. } else {
  818. request = getRequestWithMessageId(messageId);
  819. }
  820. deserializingDatacenter = datacenter;
  821. TLObject *object = TLdeserialize(request, messageLength, data);
  822. if (object != nullptr) {
  823. if (datacenter->isHandshaking(connection->isMediaConnection)) {
  824. datacenter->processHandshakeResponse(connection->isMediaConnection, object, messageId);
  825. } else {
  826. processServerResponse(object, messageId, 0, 0, connection, 0, 0);
  827. connection->addProcessedMessageId(messageId);
  828. }
  829. lastProtocolUsefullData = true;
  830. connection->setHasUsefullData();
  831. delete object;
  832. }
  833. } else {
  834. if (connection->allowsCustomPadding()) {
  835. uint32_t padding = (length - 24) % 16;
  836. if (padding != 0) {
  837. length -= padding;
  838. }
  839. }
  840. if (length < 24 + 32 || (!connection->allowsCustomPadding() && (length - 24) % 16 != 0) || !datacenter->decryptServerResponse(keyId, data->bytes() + mark + 8, data->bytes() + mark + 24, length - 24, connection)) {
  841. if (LOGS_ENABLED) DEBUG_E("connection(%p) unable to decrypt server response", connection);
  842. connection->reconnect();
  843. return;
  844. }
  845. data->position(mark + 24);
  846. int64_t messageServerSalt = data->readInt64(&error);
  847. int64_t messageSessionId = data->readInt64(&error);
  848. if (messageSessionId != connection->getSessionId()) {
  849. if (LOGS_ENABLED) DEBUG_E("connection(%p) received invalid message session id (0x%" PRIx64 " instead of 0x%" PRIx64 ")", connection, (uint64_t) messageSessionId, (uint64_t) connection->getSessionId());
  850. return;
  851. }
  852. int64_t messageId = data->readInt64(&error);
  853. int32_t messageSeqNo = data->readInt32(&error);
  854. uint32_t messageLength = data->readUint32(&error);
  855. int32_t processedStatus = connection->isMessageIdProcessed(messageId);
  856. if (messageSeqNo % 2 != 0) {
  857. connection->addMessageToConfirm(messageId);
  858. }
  859. TLObject *object = nullptr;
  860. if (processedStatus != 1) {
  861. deserializingDatacenter = datacenter;
  862. object = TLdeserialize(nullptr, messageLength, data);
  863. if (processedStatus == 2) {
  864. if (object == nullptr) {
  865. connection->recreateSession();
  866. connection->reconnect();
  867. return;
  868. } else {
  869. delete object;
  870. object = nullptr;
  871. }
  872. }
  873. }
  874. if (!processedStatus) {
  875. if (object != nullptr) {
  876. lastProtocolUsefullData = true;
  877. connection->setHasUsefullData();
  878. if (LOGS_ENABLED) DEBUG_D("connection(%p, account%u, dc%u, type %d) received object %s", connection, instanceNum, datacenter->getDatacenterId(), connection->getConnectionType(), typeid(*object).name());
  879. processServerResponse(object, messageId, messageSeqNo, messageServerSalt, connection, 0, 0);
  880. connection->addProcessedMessageId(messageId);
  881. delete object;
  882. if (connection->getConnectionType() == ConnectionTypePush) {
  883. std::vector<std::unique_ptr<NetworkMessage>> messages;
  884. sendMessagesToConnectionWithConfirmation(messages, connection, false);
  885. }
  886. } else {
  887. if (delegate != nullptr) {
  888. delegate->onUnparsedMessageReceived(0, data, connection->getConnectionType(), instanceNum);
  889. }
  890. }
  891. } else {
  892. std::vector<std::unique_ptr<NetworkMessage>> messages;
  893. sendMessagesToConnectionWithConfirmation(messages, connection, false);
  894. }
  895. }
  896. }
  897. bool ConnectionsManager::hasPendingRequestsForConnection(Connection *connection) {
  898. ConnectionType type = connection->getConnectionType();
  899. if (type == ConnectionTypeGeneric || type == ConnectionTypeTemp || type == ConnectionTypeGenericMedia) {
  900. Datacenter *datacenter = connection->getDatacenter();
  901. int8_t num = connection->getConnectionNum();
  902. uint32_t token = connection->getConnectionToken();
  903. if (type == ConnectionTypeGeneric) {
  904. if (sendingPing && type == ConnectionTypeGeneric && datacenter->getDatacenterId() == currentDatacenterId) {
  905. return true;
  906. } else if (datacenter->isHandshaking(false)) {
  907. return true;
  908. }
  909. } else if (type == ConnectionTypeGenericMedia) {
  910. if (datacenter->isHandshaking(true)) {
  911. return true;
  912. }
  913. }
  914. for (auto & runningRequest : runningRequests) {
  915. Request *request = runningRequest.get();
  916. auto connectionNum = (uint8_t) (request->connectionType >> 16);
  917. auto connectionType = (ConnectionType) (request->connectionType & 0x0000ffff);
  918. if ((connectionType == type && connectionNum == num) || request->connectionToken == token) {
  919. return true;
  920. }
  921. }
  922. return false;
  923. }
  924. return true;
  925. }
  926. TLObject *ConnectionsManager::getRequestWithMessageId(int64_t messageId) {
  927. for (auto & runningRequest : runningRequests) {
  928. Request *request = runningRequest.get();
  929. if (request->messageId == messageId) {
  930. return request->rawRequest;
  931. }
  932. }
  933. return nullptr;
  934. }
  935. TLObject *ConnectionsManager::TLdeserialize(TLObject *request, uint32_t bytes, NativeByteBuffer *data) {
  936. bool error = false;
  937. uint32_t position = data->position();
  938. uint32_t constructor = data->readUint32(&error);
  939. if (error) {
  940. data->position(position);
  941. return nullptr;
  942. }
  943. TLObject *object = TLClassStore::TLdeserialize(data, bytes, constructor, instanceNum, error);
  944. if (error) {
  945. delete object;
  946. data->position(position);
  947. return nullptr;
  948. }
  949. if (object == nullptr) {
  950. if (request != nullptr) {
  951. auto apiRequest = dynamic_cast<TL_api_request *>(request);
  952. if (apiRequest != nullptr) {
  953. object = apiRequest->deserializeResponse(data, bytes, instanceNum, error);
  954. if (LOGS_ENABLED) DEBUG_D("api request constructor 0x%x, don't parse", constructor);
  955. } else {
  956. object = request->deserializeResponse(data, constructor, instanceNum, error);
  957. if (object != nullptr && error) {
  958. delete object;
  959. object = nullptr;
  960. }
  961. }
  962. } else {
  963. if (LOGS_ENABLED) DEBUG_D("not found request to parse constructor 0x%x", constructor);
  964. }
  965. }
  966. if (object == nullptr) {
  967. data->position(position);
  968. }
  969. return object;
  970. }
  971. void ConnectionsManager::processServerResponse(TLObject *message, int64_t messageId, int32_t messageSeqNo, int64_t messageSalt, Connection *connection, int64_t innerMsgId, int64_t containerMessageId) {
  972. const std::type_info &typeInfo = typeid(*message);
  973. if (LOGS_ENABLED) DEBUG_D("process server response %p - %s", message, typeInfo.name());
  974. auto timeMessage = (int64_t) ((messageId != 0 ? messageId : innerMsgId) / 4294967296.0 * 1000);
  975. Datacenter *datacenter = connection->getDatacenter();
  976. if (typeInfo == typeid(TL_new_session_created)) {
  977. auto response = (TL_new_session_created *) message;
  978. if (!connection->isSessionProcessed(response->unique_id)) {
  979. if (LOGS_ENABLED) DEBUG_D("connection(%p, account%u, dc%u, type %d) new session created (first message id: 0x%" PRIx64 ", server salt: 0x%" PRIx64 ", unique id: 0x%" PRIx64 ")", connection, instanceNum, datacenter->getDatacenterId(), connection->getConnectionType(), (uint64_t) response->first_msg_id, (uint64_t) response->server_salt, (uint64_t) response->unique_id);
  980. std::unique_ptr<TL_future_salt> salt = std::make_unique<TL_future_salt>();
  981. salt->valid_until = salt->valid_since = getCurrentTime();
  982. salt->valid_until += 30 * 60;
  983. salt->salt = response->server_salt;
  984. datacenter->addServerSalt(salt, Connection::isMediaConnectionType(connection->getConnectionType()));
  985. for (auto & runningRequest : runningRequests) {
  986. Request *request = runningRequest.get();
  987. Datacenter *requestDatacenter = getDatacenterWithId(request->datacenterId);
  988. if (request->messageId < response->first_msg_id && request->connectionType & connection->getConnectionType() && requestDatacenter != nullptr && requestDatacenter->getDatacenterId() == datacenter->getDatacenterId()) {
  989. if (LOGS_ENABLED) DEBUG_D("clear request %p - %s", request->rawRequest, typeid(*request->rawRequest).name());
  990. request->clear(true);
  991. }
  992. }
  993. saveConfig();
  994. if (datacenter->getDatacenterId() == currentDatacenterId && currentUserId) {
  995. if (connection->getConnectionType() == ConnectionTypePush) {
  996. registerForInternalPushUpdates();
  997. } else if (connection->getConnectionType() == ConnectionTypeGeneric) {
  998. if (delegate != nullptr) {
  999. delegate->onSessionCreated(instanceNum);
  1000. }
  1001. }
  1002. }
  1003. connection->addProcessedSession(response->unique_id);
  1004. }
  1005. } else if (typeInfo == typeid(TL_msg_container)) {
  1006. auto response = (TL_msg_container *) message;
  1007. size_t count = response->messages.size();
  1008. if (LOGS_ENABLED) DEBUG_D("received container with %d items", (int32_t) count);
  1009. for (uint32_t a = 0; a < count; a++) {
  1010. TL_message *innerMessage = response->messages[a].get();
  1011. int64_t innerMessageId = innerMessage->msg_id;
  1012. if (innerMessage->seqno % 2 != 0) {
  1013. connection->addMessageToConfirm(innerMessageId);
  1014. }
  1015. int32_t processedStatus = connection->isMessageIdProcessed(innerMessageId);
  1016. if (processedStatus == 2) {
  1017. if (innerMessage->unparsedBody != nullptr) {
  1018. connection->recreateSession();
  1019. connection->reconnect();
  1020. return;
  1021. }
  1022. processedStatus = 0;
  1023. }
  1024. if (processedStatus) {
  1025. if (LOGS_ENABLED) DEBUG_D("inner message %d id 0x%" PRIx64 " already processed", a, innerMessageId);
  1026. continue;
  1027. }
  1028. if (innerMessage->unparsedBody != nullptr) {
  1029. if (LOGS_ENABLED) DEBUG_D("inner message %d id 0x%" PRIx64 " is unparsed", a, innerMessageId);
  1030. if (delegate != nullptr) {
  1031. delegate->onUnparsedMessageReceived(0, innerMessage->unparsedBody.get(), connection->getConnectionType(), instanceNum);
  1032. }
  1033. } else {
  1034. if (LOGS_ENABLED) DEBUG_D("inner message %d id 0x%" PRIx64 " process", a, innerMessageId);
  1035. processServerResponse(innerMessage->body.get(), 0, innerMessage->seqno, messageSalt, connection, innerMessageId, messageId);
  1036. }
  1037. connection->addProcessedMessageId(innerMessageId);
  1038. }
  1039. } else if (typeInfo == typeid(TL_pong)) {
  1040. if (connection->getConnectionType() == ConnectionTypePush) {
  1041. if (!registeredForInternalPush) {
  1042. registerForInternalPushUpdates();
  1043. }
  1044. if (LOGS_ENABLED) DEBUG_D("connection(%p, account%u, dc%u, type %d) received push ping", connection, instanceNum, datacenter->getDatacenterId(), connection->getConnectionType());
  1045. sendingPushPing = false;
  1046. } else {
  1047. auto response = (TL_pong *) message;
  1048. if (response->ping_id >= 2000000) {
  1049. for (auto iter = proxyActiveChecks.begin(); iter != proxyActiveChecks.end(); iter++) {
  1050. ProxyCheckInfo *proxyCheckInfo = iter->get();
  1051. if (proxyCheckInfo->pingId == response->ping_id) {
  1052. for (auto iter2 = runningRequests.begin(); iter2 != runningRequests.end(); iter2++) {
  1053. Request *request = iter2->get();
  1054. if (request->requestToken == proxyCheckInfo->requestToken) {
  1055. int64_t ping = llabs(getCurrentTimeMonotonicMillis() - request->startTimeMillis);
  1056. if (LOGS_ENABLED) DEBUG_D("got ping response for request %p, %" PRId64, request->rawRequest, ping);
  1057. request->completed = true;
  1058. proxyCheckInfo->onRequestTime(ping);
  1059. runningRequests.erase(iter2);
  1060. break;
  1061. }
  1062. }
  1063. proxyActiveChecks.erase(iter);
  1064. if (!proxyCheckQueue.empty()) {
  1065. proxyCheckInfo = proxyCheckQueue[0].release();
  1066. proxyCheckQueue.erase(proxyCheckQueue.begin());
  1067. scheduleCheckProxyInternal(proxyCheckInfo);
  1068. }
  1069. break;
  1070. }
  1071. }
  1072. } else if (response->ping_id == lastPingId) {
  1073. int32_t diff = (int32_t) (getCurrentTimeMonotonicMillis() / 1000) - pingTime;
  1074. if (abs(diff) < 10) {
  1075. currentPingTime = (diff + currentPingTime) / 2;
  1076. if (messageId != 0) {
  1077. timeDifference = (int32_t) ((timeMessage - getCurrentTimeMillis()) / 1000 - currentPingTime / 2);
  1078. }
  1079. }
  1080. sendingPing = false;
  1081. }
  1082. }
  1083. } else if (typeInfo == typeid(TL_future_salts)) {
  1084. auto response = (TL_future_salts *) message;
  1085. int64_t requestMid = response->req_msg_id;
  1086. for (auto iter = runningRequests.begin(); iter != runningRequests.end(); iter++) {
  1087. Request *request = iter->get();
  1088. if (request->respondsToMessageId(requestMid)) {
  1089. request->onComplete(response, nullptr, connection->currentNetworkType, timeMessage);
  1090. request->completed = true;
  1091. runningRequests.erase(iter);
  1092. break;
  1093. }
  1094. }
  1095. } else if (dynamic_cast<DestroySessionRes *>(message)) {
  1096. auto response = (DestroySessionRes *) message;
  1097. if (LOGS_ENABLED) DEBUG_D("destroyed session 0x%" PRIx64 " (%s)", (uint64_t) response->session_id, typeInfo == typeid(TL_destroy_session_ok) ? "ok" : "not found");
  1098. } else if (typeInfo == typeid(TL_rpc_result)) {
  1099. auto response = (TL_rpc_result *) message;
  1100. int64_t resultMid = response->req_msg_id;
  1101. if (resultMid == lastInvokeAfterMessageId) {
  1102. lastInvokeAfterMessageId = 0;
  1103. }
  1104. bool hasResult = response->result != nullptr;
  1105. bool ignoreResult = false;
  1106. if (hasResult) {
  1107. TLObject *object = response->result.get();
  1108. if (LOGS_ENABLED) DEBUG_D("connection(%p, account%u, dc%u, type %d) received rpc_result with %s", connection, instanceNum, datacenter->getDatacenterId(), connection->getConnectionType(), typeid(*object).name());
  1109. }
  1110. RpcError *error = hasResult ? dynamic_cast<RpcError *>(response->result.get()) : nullptr;
  1111. if (error != nullptr) {
  1112. if (LOGS_ENABLED) DEBUG_E("connection(%p, account%u, dc%u, type %d) rpc error %d: %s", connection, instanceNum, datacenter->getDatacenterId(), connection->getConnectionType(), error->error_code, error->error_message.c_str());
  1113. if (error->error_code == 303) {
  1114. uint32_t migrateToDatacenterId = DEFAULT_DATACENTER_ID;
  1115. static std::vector<std::string> migrateErrors = {"NETWORK_MIGRATE_", "PHONE_MIGRATE_", "USER_MIGRATE_"};
  1116. size_t count = migrateErrors.size();
  1117. for (uint32_t a = 0; a < count; a++) {
  1118. std::string &possibleError = migrateErrors[a];
  1119. if (error->error_message.find(possibleError) != std::string::npos) {
  1120. std::string num = error->error_message.substr(possibleError.size(), error->error_message.size() - possibleError.size());
  1121. auto val = (uint32_t) atoi(num.c_str());
  1122. migrateToDatacenterId = val;
  1123. }
  1124. }
  1125. if (migrateToDatacenterId != DEFAULT_DATACENTER_ID) {
  1126. ignoreResult = true;
  1127. moveToDatacenter(migrateToDatacenterId);
  1128. }
  1129. }
  1130. }
  1131. uint32_t retryRequestsFromDatacenter = DEFAULT_DATACENTER_ID - 1;
  1132. uint32_t retryRequestsConnections = 0;
  1133. if (!ignoreResult) {
  1134. for (auto iter = runningRequests.begin(); iter != runningRequests.end(); iter++) {
  1135. Request *request = iter->get();
  1136. if (!request->respondsToMessageId(resultMid)) {
  1137. continue;
  1138. }
  1139. if (LOGS_ENABLED) DEBUG_D("got response for request %p - %s", request->rawRequest, typeid(*request->rawRequest).name());
  1140. bool discardResponse = false;
  1141. bool isError = false;
  1142. bool allowInitConnection = true;
  1143. if (request->onCompleteRequestCallback != nullptr) {
  1144. TL_error *implicitError = nullptr;
  1145. NativeByteBuffer *unpacked_data = nullptr;
  1146. TLObject *result = response->result.get();
  1147. if (typeid(*result) == typeid(TL_gzip_packed)) {
  1148. auto innerResponse = (TL_gzip_packed *) result;
  1149. unpacked_data = decompressGZip(innerResponse->packed_data.get());
  1150. TLObject *object = TLdeserialize(request->rawRequest, unpacked_data->limit(), unpacked_data);
  1151. if (object != nullptr) {
  1152. response->result = std::unique_ptr<TLObject>(object);
  1153. } else {
  1154. response->result = std::unique_ptr<TLObject>(nullptr);
  1155. }
  1156. }
  1157. hasResult = response->result != nullptr;
  1158. error = hasResult ? dynamic_cast<RpcError *>(response->result.get()) : nullptr;
  1159. TL_error *error2 = hasResult ? dynamic_cast<TL_error *>(response->result.get()) : nullptr;
  1160. if (error != nullptr) {
  1161. allowInitConnection = false;
  1162. static std::string authRestart = "AUTH_RESTART";
  1163. static std::string authKeyPermEmpty = "AUTH_KEY_PERM_EMPTY";
  1164. static std::string workerBusy = "WORKER_BUSY_TOO_LONG_RETRY";
  1165. bool processEvenFailed = error->error_code == 500 && error->error_message.find(authRestart) != std::string::npos;
  1166. bool isWorkerBusy = error->error_code == 500 && error->error_message.find(workerBusy) != std::string::npos;
  1167. if (LOGS_ENABLED) DEBUG_E("request %p rpc error %d: %s", request, error->error_code, error->error_message.c_str());
  1168. if (error->error_code == 401 && error->error_message.find(authKeyPermEmpty) != std::string::npos) {
  1169. discardResponse = true;
  1170. request->minStartTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000 + 1);
  1171. request->startTime = 0;
  1172. if (!datacenter->isHandshaking(connection->isMediaConnection)) {
  1173. datacenter->clearAuthKey(connection->isMediaConnection ? HandshakeTypeMediaTemp : HandshakeTypeTemp);
  1174. saveConfig();
  1175. datacenter->beginHandshake(connection->isMediaConnection ? HandshakeTypeMediaTemp : HandshakeTypeTemp, false);
  1176. }
  1177. } else if ((request->requestFlags & RequestFlagFailOnServerErrors) == 0 || processEvenFailed) {
  1178. if (error->error_code == 500 || error->error_code < 0) {
  1179. static std::string waitFailed = "MSG_WAIT_FAILED";
  1180. static std::string waitTimeout = "MSG_WAIT_TIMEOUT";
  1181. if (error->error_message.find(waitFailed) != std::string::npos) {
  1182. request->startTime = 0;
  1183. request->startTimeMillis = 0;
  1184. request->requestFlags |= RequestFlagResendAfter;
  1185. } else {
  1186. if (isWorkerBusy) {
  1187. request->minStartTime = 0;
  1188. } else {
  1189. request->minStartTime = request->startTime + (request->serverFailureCount > 10 ? 10 : request->serverFailureCount);
  1190. }
  1191. request->serverFailureCount++;
  1192. }
  1193. discardResponse = true;
  1194. } else if (error->error_code == 420) {
  1195. int32_t waitTime = 2;
  1196. static std::string floodWait = "FLOOD_WAIT_";
  1197. static std::string slowmodeWait = "SLOWMODE_WAIT_";
  1198. discardResponse = true;
  1199. if (error->error_message.find(floodWait) != std::string::npos) {
  1200. std::string num = error->error_message.substr(floodWait.size(), error->error_message.size() - floodWait.size());
  1201. waitTime = atoi(num.c_str());
  1202. if (waitTime <= 0) {
  1203. waitTime = 2;
  1204. }
  1205. } else if (error->error_message.find(slowmodeWait) != std::string::npos) {
  1206. std::string num = error->error_message.substr(slowmodeWait.size(), error->error_message.size() - slowmodeWait.size());
  1207. waitTime = atoi(num.c_str());
  1208. if (waitTime <= 0) {
  1209. waitTime = 2;
  1210. }
  1211. discardResponse = false;
  1212. }
  1213. request->failedByFloodWait = waitTime;
  1214. request->startTime = 0;
  1215. request->startTimeMillis = 0;
  1216. request->minStartTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000 + waitTime);
  1217. } else if (error->error_code == 400) {
  1218. static std::string waitFailed = "MSG_WAIT_FAILED";
  1219. static std::string bindFailed = "ENCRYPTED_MESSAGE_INVALID";
  1220. static std::string waitTimeout = "MSG_WAIT_TIMEOUT";
  1221. if (error->error_message.find(waitTimeout) != std::string::npos || error->error_message.find(waitFailed) != std::string::npos) {
  1222. discardResponse = true;
  1223. request->startTime = 0;
  1224. request->startTimeMillis = 0;
  1225. request->requestFlags |= RequestFlagResendAfter;
  1226. } else if (error->error_message.find(bindFailed) != std::string::npos && typeid(*request->rawRequest) == typeid(TL_auth_bindTempAuthKey)) {
  1227. int datacenterId;
  1228. if (delegate != nullptr && getDatacenterWithId(DEFAULT_DATACENTER_ID) == datacenter) {
  1229. delegate->onLogout(instanceNum);
  1230. datacenterId = -1;
  1231. } else {
  1232. datacenterId = datacenter->getDatacenterId();
  1233. }
  1234. cleanUp(true, datacenterId);
  1235. }
  1236. }
  1237. }
  1238. if (!discardResponse) {
  1239. implicitError = new TL_error();
  1240. implicitError->code = error->error_code;
  1241. implicitError->text = error->error_message;
  1242. }
  1243. } else if (error2 == nullptr) {
  1244. if (request->rawRequest == nullptr || response->result == nullptr) {
  1245. allowInitConnection = false;
  1246. if (LOGS_ENABLED) DEBUG_E("rawRequest is null");
  1247. implicitError = new TL_error();
  1248. implicitError->code = -1000;
  1249. implicitError->text = "";
  1250. }
  1251. }
  1252. if (!discardResponse) {
  1253. if (implicitError != nullptr || error2 != nullptr) {
  1254. isError = true;
  1255. request->onComplete(nullptr, implicitError != nullptr ? implicitError : error2, connection->currentNetworkType, timeMessage);
  1256. delete error2;
  1257. } else {
  1258. request->onComplete(response->result.get(), nullptr, connection->currentNetworkType, timeMessage);
  1259. }
  1260. }
  1261. if (implicitError != nullptr) {
  1262. if (implicitError->code == 401) {
  1263. allowInitConnection = false;
  1264. isError = true;
  1265. static std::string sessionPasswordNeeded = "SESSION_PASSWORD_NEEDED";
  1266. if (implicitError->text.find(sessionPasswordNeeded) != std::string::npos) {
  1267. //ignore this error
  1268. } else if (datacenter->getDatacenterId() == currentDatacenterId || datacenter->getDatacenterId() == movingToDatacenterId) {
  1269. if (request->connectionType & ConnectionTypeGeneric && currentUserId) {
  1270. currentUserId = 0;
  1271. if (delegate != nullptr) {
  1272. delegate->onLogout(instanceNum);
  1273. }
  1274. cleanUp(false, -1);
  1275. }
  1276. } else {
  1277. datacenter->authorized = false;
  1278. saveConfig();
  1279. discardResponse = true;
  1280. if (request->connectionType & ConnectionTypeDownload || request->connectionType & ConnectionTypeUpload) {
  1281. retryRequestsFromDatacenter = datacenter->getDatacenterId();
  1282. retryRequestsConnections = request->connectionType;
  1283. }
  1284. }
  1285. } else if (currentUserId == 0 && implicitError->code == 406) {
  1286. static std::string authKeyDuplicated = "AUTH_KEY_DUPLICATED";
  1287. if (implicitError->text.find(authKeyDuplicated) != std::string::npos) {
  1288. cleanUp(true, datacenter->getDatacenterId());
  1289. }
  1290. }
  1291. }
  1292. if (unpacked_data != nullptr) {
  1293. unpacked_data->reuse();
  1294. }
  1295. delete implicitError;
  1296. }
  1297. if (!discardResponse) {
  1298. if (allowInitConnection && !isError) {
  1299. bool save = false;
  1300. if (request->isInitRequest && datacenter->lastInitVersion != currentVersion) {
  1301. datacenter->lastInitVersion = currentVersion;
  1302. save = true;
  1303. } else if (request->isInitMediaRequest && datacenter->lastInitMediaVersion != currentVersion) {
  1304. datacenter->lastInitMediaVersion = currentVersion;
  1305. save = true;
  1306. }
  1307. if (save) {
  1308. saveConfig();
  1309. if (LOGS_ENABLED) DEBUG_D("dc%d init connection completed", datacenter->getDatacenterId());
  1310. }
  1311. }
  1312. request->completed = true;
  1313. removeRequestFromGuid(request->requestToken);
  1314. runningRequests.erase(iter);
  1315. } else {
  1316. request->messageId = 0;
  1317. request->messageSeqNo = 0;
  1318. request->connectionToken = 0;
  1319. }
  1320. break;
  1321. }
  1322. }
  1323. if (retryRequestsFromDatacenter != DEFAULT_DATACENTER_ID - 1) {
  1324. processRequestQueue(retryRequestsConnections, retryRequestsFromDatacenter);
  1325. } else {
  1326. processRequestQueue(0, 0);
  1327. }
  1328. } else if (typeInfo == typeid(TL_msgs_ack)) {
  1329. } else if (typeInfo == typeid(TL_bad_msg_notification)) {
  1330. auto result = (TL_bad_msg_notification *) message;
  1331. if (LOGS_ENABLED) DEBUG_E("bad message notification %d for messageId 0x%" PRIx64 ", seqno %d", result->error_code, result->bad_msg_id, result->bad_msg_seqno);
  1332. switch (result->error_code) {
  1333. case 16:
  1334. case 17:
  1335. case 19:
  1336. case 32:
  1337. case 33:
  1338. case 64: {
  1339. int64_t realId = messageId != 0 ? messageId : containerMessageId;
  1340. if (realId == 0) {
  1341. realId = innerMsgId;
  1342. }
  1343. if (realId != 0) {
  1344. auto time = (int64_t) (messageId / 4294967296.0 * 1000);
  1345. int64_t currentTime = getCurrentTimeMillis();
  1346. timeDifference = (int32_t) ((time - currentTime) / 1000 - currentPingTime / 2);
  1347. }
  1348. datacenter->recreateSessions(HandshakeTypeAll);
  1349. saveConfig();
  1350. lastOutgoingMessageId = 0;
  1351. clearRequestsForDatacenter(datacenter, HandshakeTypeAll);
  1352. break;
  1353. }
  1354. case 20: {
  1355. for (auto & runningRequest : runningRequests) {
  1356. Request *request = runningRequest.get();
  1357. if (request->respondsToMessageId(result->bad_msg_id)) {
  1358. if (request->completed) {
  1359. break;
  1360. }
  1361. connection->addMessageToConfirm(result->bad_msg_id);
  1362. request->clear(true);
  1363. break;
  1364. }
  1365. }
  1366. }
  1367. default:
  1368. break;
  1369. }
  1370. } else if (typeInfo == typeid(TL_bad_server_salt)) {
  1371. bool media = Connection::isMediaConnectionType(connection->getConnectionType());
  1372. requestSaltsForDatacenter(datacenter, media, connection->getConnectionType() == ConnectionTypeTemp);
  1373. if (messageId != 0) {
  1374. auto time = (int64_t) (messageId / 4294967296.0 * 1000);
  1375. int64_t currentTime = getCurrentTimeMillis();
  1376. timeDifference = (int32_t) ((time - currentTime) / 1000 - currentPingTime / 2);
  1377. lastOutgoingMessageId = (messageId > lastOutgoingMessageId ? messageId : lastOutgoingMessageId);
  1378. }
  1379. if ((connection->getConnectionType() & ConnectionTypeDownload) == 0 || !datacenter->containsServerSalt(messageSalt, media)) {
  1380. auto response = (TL_bad_server_salt *) message;
  1381. int64_t resultMid = response->bad_msg_id;
  1382. if (resultMid != 0) {
  1383. bool beginHandshake = false;
  1384. for (auto & runningRequest : runningRequests) {
  1385. Request *request = runningRequest.get();
  1386. if (!beginHandshake && request->datacenterId == datacenter->getDatacenterId() && typeid(*request->rawRequest) == typeid(TL_auth_bindTempAuthKey) && request->respondsToMessageId(response->bad_msg_id)) {
  1387. beginHandshake = true;
  1388. }
  1389. if ((request->connectionType & ConnectionTypeDownload) == 0) {
  1390. continue;
  1391. }
  1392. Datacenter *requestDatacenter = getDatacenterWithId(request->datacenterId);
  1393. if (requestDatacenter != nullptr && requestDatacenter->getDatacenterId() == datacenter->getDatacenterId()) {
  1394. request->retryCount = 0;
  1395. request->failedBySalt = true;
  1396. }
  1397. }
  1398. if (beginHandshake) {
  1399. datacenter->beginHandshake(HandshakeTypeCurrent, false);
  1400. }
  1401. }
  1402. datacenter->clearServerSalts(media);
  1403. std::unique_ptr<TL_future_salt> salt = std::make_unique<TL_future_salt>();
  1404. salt->valid_until = salt->valid_since = getCurrentTime();
  1405. salt->valid_until += 30 * 60;
  1406. salt->salt = messageSalt;
  1407. datacenter->addServerSalt(salt, media);
  1408. saveConfig();
  1409. if (datacenter->hasAuthKey(ConnectionTypeGeneric, 1)) {
  1410. processRequestQueue(AllConnectionTypes, datacenter->getDatacenterId());
  1411. }
  1412. }
  1413. } else if (typeInfo == typeid(MsgsStateInfo)) {
  1414. auto response = (MsgsStateInfo *) message;
  1415. if (LOGS_ENABLED) DEBUG_D("connection(%p, account%u, dc%u, type %d) got %s for messageId 0x%" PRIx64, connection, instanceNum, datacenter->getDatacenterId(), connection->getConnectionType(), typeInfo.name(), response->req_msg_id);
  1416. auto mIter = resendRequests.find(response->req_msg_id);
  1417. if (mIter != resendRequests.end()) {
  1418. if (LOGS_ENABLED) DEBUG_D("found resend for messageId 0x%" PRIx64, mIter->second);
  1419. connection->addMessageToConfirm(mIter->second);
  1420. for (auto & runningRequest : runningRequests) {
  1421. Request *request = runningRequest.get();
  1422. if (request->respondsToMessageId(mIter->second)) {
  1423. if (request->completed) {
  1424. break;
  1425. }
  1426. request->clear(true);
  1427. break;
  1428. }
  1429. }
  1430. resendRequests.erase(mIter);
  1431. }
  1432. } else if (dynamic_cast<MsgDetailedInfo *>(message)) {
  1433. auto response = (MsgDetailedInfo *) message;
  1434. bool requestResend = false;
  1435. bool confirm = true;
  1436. if (LOGS_ENABLED) DEBUG_D("connection(%p, account%u, dc%u, type %d) got %s for messageId 0x%" PRIx64, connection, instanceNum, datacenter->getDatacenterId(), connection->getConnectionType(), typeInfo.name(), response->msg_id);
  1437. if (typeInfo == typeid(TL_msg_detailed_info)) {
  1438. for (auto & runningRequest : runningRequests) {
  1439. Request *request = runningRequest.get();
  1440. if (request->respondsToMessageId(response->msg_id)) {
  1441. if (request->completed) {
  1442. break;
  1443. }
  1444. if (LOGS_ENABLED) DEBUG_D("got TL_msg_detailed_info for rpc request %p - %s", request->rawRequest, typeid(*request->rawRequest).name());
  1445. auto currentTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000);
  1446. if (request->lastResendTime == 0 || abs(currentTime - request->lastResendTime) >= 60) {
  1447. request->lastResendTime = currentTime;
  1448. requestResend = true;
  1449. } else {
  1450. confirm = false;
  1451. }
  1452. break;
  1453. }
  1454. }
  1455. } else {
  1456. if (!connection->isMessageIdProcessed(messageId)) {
  1457. requestResend = true;
  1458. }
  1459. }
  1460. if (requestResend) {
  1461. auto request = new TL_msg_resend_req();
  1462. request->msg_ids.push_back(response->answer_msg_id);
  1463. auto networkMessage = new NetworkMessage();
  1464. networkMessage->message = std::make_unique<TL_message>();
  1465. networkMessage->message->msg_id = generateMessageId();
  1466. networkMessage->message->bytes = request->getObjectSize();
  1467. networkMessage->message->body = std::unique_ptr<TLObject>(request);
  1468. networkMessage->message->seqno = connection->generateMessageSeqNo(false);
  1469. resendRequests[networkMessage->message->msg_id] = response->answer_msg_id;
  1470. std::vector<std::unique_ptr<NetworkMessage>> array;
  1471. array.push_back(std::unique_ptr<NetworkMessage>(networkMessage));
  1472. sendMessagesToConnection(array, connection, false);
  1473. } else if (confirm) {
  1474. connection->addMessageToConfirm(response->answer_msg_id);
  1475. }
  1476. } else if (typeInfo == typeid(TL_gzip_packed)) {
  1477. auto response = (TL_gzip_packed *) message;
  1478. NativeByteBuffer *data = decompressGZip(response->packed_data.get());
  1479. TLObject *object = TLdeserialize(getRequestWithMessageId(messageId), data->limit(), data);
  1480. if (object != nullptr) {
  1481. if (LOGS_ENABLED) DEBUG_D("connection(%p, account%u, dc%u, type %d) received object %s", connection, instanceNum, datacenter->getDatacenterId(), connection->getConnectionType(), typeid(*object).name());
  1482. processServerResponse(object, messageId, messageSeqNo, messageSalt, connection, innerMsgId, containerMessageId);
  1483. delete object;
  1484. } else {
  1485. if (delegate != nullptr) {
  1486. delegate->onUnparsedMessageReceived(messageId, data, connection->getConnectionType(), instanceNum);
  1487. }
  1488. }
  1489. data->reuse();
  1490. } else if (typeInfo == typeid(TL_updatesTooLong)) {
  1491. if (connection->connectionType == ConnectionTypePush) {
  1492. if (networkPaused) {
  1493. lastPauseTime = getCurrentTimeMonotonicMillis();
  1494. if (LOGS_ENABLED) DEBUG_D("received internal push: wakeup network in background");
  1495. } else if (lastPauseTime != 0) {
  1496. lastPauseTime = getCurrentTimeMonotonicMillis();
  1497. if (LOGS_ENABLED) DEBUG_D("received internal push: reset sleep timeout");
  1498. } else {
  1499. if (LOGS_ENABLED) DEBUG_D("received internal push");
  1500. }
  1501. if (delegate != nullptr) {
  1502. delegate->onInternalPushReceived(instanceNum);
  1503. }
  1504. } else {
  1505. if (delegate != nullptr) {
  1506. NativeByteBuffer *data = BuffersStorage::getInstance().getFreeBuffer(message->getObjectSize());
  1507. message->serializeToStream(data);
  1508. data->position(0);
  1509. delegate->onUnparsedMessageReceived(0, data, connection->getConnectionType(), instanceNum);
  1510. data->reuse();
  1511. }
  1512. }
  1513. }
  1514. }
  1515. void ConnectionsManager::sendPing(Datacenter *datacenter, bool usePushConnection) {
  1516. if (usePushConnection && (currentUserId == 0 || !usePushConnection)) {
  1517. return;
  1518. }
  1519. Connection *connection = nullptr;
  1520. if (usePushConnection) {
  1521. connection = datacenter->getPushConnection(true);
  1522. } else {
  1523. connection = datacenter->getGenericConnection(true, 0);
  1524. }
  1525. if (connection == nullptr || (!usePushConnection && connection->getConnectionToken() == 0)) {
  1526. return;
  1527. }
  1528. auto request = new TL_ping_delay_disconnect();
  1529. request->ping_id = ++lastPingId;
  1530. if (usePushConnection) {
  1531. request->disconnect_delay = 60 * 7;
  1532. } else {
  1533. request->disconnect_delay = testBackend ? 10 : 35;
  1534. pingTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000);
  1535. }
  1536. auto networkMessage = new NetworkMessage();
  1537. networkMessage->message = std::make_unique<TL_message>();
  1538. networkMessage->message->msg_id = generateMessageId();
  1539. networkMessage->message->bytes = request->getObjectSize();
  1540. networkMessage->message->body = std::unique_ptr<TLObject>(request);
  1541. networkMessage->message->seqno = connection->generateMessageSeqNo(false);
  1542. std::vector<std::unique_ptr<NetworkMessage>> array;
  1543. array.push_back(std::unique_ptr<NetworkMessage>(networkMessage));
  1544. NativeByteBuffer *transportData = datacenter->createRequestsData(array, nullptr, connection, false);
  1545. if (usePushConnection) {
  1546. if (LOGS_ENABLED) DEBUG_D("dc%d send ping to push connection", datacenter->getDatacenterId());
  1547. sendingPushPing = true;
  1548. } else {
  1549. sendingPing = true;
  1550. }
  1551. connection->sendData(transportData, false, true);
  1552. }
  1553. uint8_t ConnectionsManager::getIpStratagy() {
  1554. return ipStrategy;
  1555. }
  1556. void ConnectionsManager::initDatacenters() {
  1557. Datacenter *datacenter;
  1558. if (!testBackend) {
  1559. if (datacenters.find(1) == datacenters.end()) {
  1560. datacenter = new Datacenter(instanceNum, 1);
  1561. datacenter->addAddressAndPort("149.154.175.50", 443, 0, "");
  1562. datacenter->addAddressAndPort("2001:b28:f23d:f001:0000:0000:0000:000a", 443, 1, "");
  1563. datacenters[1] = datacenter;
  1564. }
  1565. if (datacenters.find(2) == datacenters.end()) {
  1566. datacenter = new Datacenter(instanceNum, 2);
  1567. datacenter->addAddressAndPort("149.154.167.51", 443, 0, "");
  1568. datacenter->addAddressAndPort("95.161.76.100", 443, 0, "");
  1569. datacenter->addAddressAndPort("2001:67c:4e8:f002:0000:0000:0000:000a", 443, 1, "");
  1570. datacenters[2] = datacenter;
  1571. }
  1572. if (datacenters.find(3) == datacenters.end()) {
  1573. datacenter = new Datacenter(instanceNum, 3);
  1574. datacenter->addAddressAndPort("149.154.175.100", 443, 0, "");
  1575. datacenter->addAddressAndPort("2001:b28:f23d:f003:0000:0000:0000:000a", 443, 1, "");
  1576. datacenters[3] = datacenter;
  1577. }
  1578. if (datacenters.find(4) == datacenters.end()) {
  1579. datacenter = new Datacenter(instanceNum, 4);
  1580. datacenter->addAddressAndPort("149.154.167.91", 443, 0, "");
  1581. datacenter->addAddressAndPort("2001:67c:4e8:f004:0000:0000:0000:000a", 443, 1, "");
  1582. datacenters[4] = datacenter;
  1583. }
  1584. if (datacenters.find(5) == datacenters.end()) {
  1585. datacenter = new Datacenter(instanceNum, 5);
  1586. datacenter->addAddressAndPort("149.154.171.5", 443, 0, "");
  1587. datacenter->addAddressAndPort("2001:b28:f23f:f005:0000:0000:0000:000a", 443, 1, "");
  1588. datacenters[5] = datacenter;
  1589. }
  1590. } else {
  1591. if (datacenters.find(1) == datacenters.end()) {
  1592. datacenter = new Datacenter(instanceNum, 1);
  1593. datacenter->addAddressAndPort("149.154.175.40", 443, 0, "");
  1594. datacenter->addAddressAndPort("2001:b28:f23d:f001:0000:0000:0000:000e", 443, 1, "");
  1595. datacenters[1] = datacenter;
  1596. }
  1597. if (datacenters.find(2) == datacenters.end()) {
  1598. datacenter = new Datacenter(instanceNum, 2);
  1599. datacenter->addAddressAndPort("149.154.167.40", 443, 0, "");
  1600. datacenter->addAddressAndPort("2001:67c:4e8:f002:0000:0000:0000:000e", 443, 1, "");
  1601. datacenters[2] = datacenter;
  1602. }
  1603. if (datacenters.find(3) == datacenters.end()) {
  1604. datacenter = new Datacenter(instanceNum, 3);
  1605. datacenter->addAddressAndPort("149.154.175.117", 443, 0, "");
  1606. datacenter->addAddressAndPort("2001:b28:f23d:f003:0000:0000:0000:000e", 443, 1, "");
  1607. datacenters[3] = datacenter;
  1608. }
  1609. }
  1610. }
  1611. void ConnectionsManager::attachConnection(ConnectionSocket *connection) {
  1612. if (std::find(activeConnections.begin(), activeConnections.end(), connection) != activeConnections.end()) {
  1613. return;
  1614. }
  1615. activeConnections.push_back(connection);
  1616. }
  1617. void ConnectionsManager::detachConnection(ConnectionSocket *connection) {
  1618. auto iter = std::find(activeConnections.begin(), activeConnections.end(), connection);
  1619. if (iter != activeConnections.end()) {
  1620. activeConnections.erase(iter);
  1621. }
  1622. }
  1623. int32_t ConnectionsManager::sendRequestInternal(TLObject *object, onCompleteFunc onComplete, onQuickAckFunc onQuickAck, uint32_t flags, uint32_t datacenterId, ConnectionType connetionType, bool immediate) {
  1624. if (!currentUserId && !(flags & RequestFlagWithoutLogin)) {
  1625. if (LOGS_ENABLED) DEBUG_D("can't do request without login %s", typeid(*object).name());
  1626. delete object;
  1627. return 0;
  1628. }
  1629. auto request = new Request(instanceNum, lastRequestToken++, connetionType, flags, datacenterId, onComplete, onQuickAck, nullptr);
  1630. request->rawRequest = object;
  1631. request->rpcRequest = wrapInLayer(object, getDatacenterWithId(datacenterId), request);
  1632. requestsQueue.push_back(std::unique_ptr<Request>(request));
  1633. if (immediate) {
  1634. processRequestQueue(0, 0);
  1635. }
  1636. return request->requestToken;
  1637. }
  1638. int32_t ConnectionsManager::sendRequest(TLObject *object, onCompleteFunc onComplete, onQuickAckFunc onQuickAck, uint32_t flags, uint32_t datacenterId, ConnectionType connetionType, bool immediate) {
  1639. int32_t requestToken = lastRequestToken++;
  1640. return sendRequest(object, onComplete, onQuickAck, flags, datacenterId, connetionType, immediate, requestToken);
  1641. }
  1642. int32_t ConnectionsManager::sendRequest(TLObject *object, onCompleteFunc onComplete, onQuickAckFunc onQuickAck, uint32_t flags, uint32_t datacenterId, ConnectionType connetionType, bool immediate, int32_t requestToken) {
  1643. if (!currentUserId && !(flags & RequestFlagWithoutLogin)) {
  1644. if (LOGS_ENABLED) DEBUG_D("can't do request without login %s", typeid(*object).name());
  1645. delete object;
  1646. return 0;
  1647. }
  1648. if (requestToken == 0) {
  1649. requestToken = lastRequestToken++;
  1650. }
  1651. scheduleTask([&, requestToken, object, onComplete, onQuickAck, flags, datacenterId, connetionType, immediate] {
  1652. auto request = new Request(instanceNum, requestToken, connetionType, flags, datacenterId, onComplete, onQuickAck, nullptr);
  1653. request->rawRequest = object;
  1654. request->rpcRequest = wrapInLayer(object, getDatacenterWithId(datacenterId), request);
  1655. requestsQueue.push_back(std::unique_ptr<Request>(request));
  1656. if (immediate) {
  1657. processRequestQueue(0, 0);
  1658. }
  1659. });
  1660. return requestToken;
  1661. }
  1662. #ifdef ANDROID
  1663. void ConnectionsManager::sendRequest(TLObject *object, onCompleteFunc onComplete, onQuickAckFunc onQuickAck, onWriteToSocketFunc onWriteToSocket, uint32_t flags, uint32_t datacenterId, ConnectionType connetionType, bool immediate, int32_t requestToken, jobject ptr1, jobject ptr2, jobject ptr3) {
  1664. if (!currentUserId && !(flags & RequestFlagWithoutLogin)) {
  1665. if (LOGS_ENABLED) DEBUG_D("can't do request without login %s", typeid(*object).name());
  1666. delete object;
  1667. JNIEnv *env = 0;
  1668. if (javaVm->GetEnv((void **) &env, JNI_VERSION_1_6) != JNI_OK) {
  1669. if (LOGS_ENABLED) DEBUG_E("can't get jnienv");
  1670. exit(1);
  1671. }
  1672. if (ptr1 != nullptr) {
  1673. env->DeleteGlobalRef(ptr1);
  1674. ptr1 = nullptr;
  1675. }
  1676. if (ptr2 != nullptr) {
  1677. env->DeleteGlobalRef(ptr2);
  1678. ptr2 = nullptr;
  1679. }
  1680. if (ptr3 != nullptr) {
  1681. env->DeleteGlobalRef(ptr3);
  1682. ptr3 = nullptr;
  1683. }
  1684. return;
  1685. }
  1686. scheduleTask([&, requestToken, object, onComplete, onQuickAck, onWriteToSocket, flags, datacenterId, connetionType, immediate, ptr1, ptr2, ptr3] {
  1687. if (LOGS_ENABLED) DEBUG_D("send request %p - %s", object, typeid(*object).name());
  1688. auto request = new Request(instanceNum, requestToken, connetionType, flags, datacenterId, onComplete, onQuickAck, onWriteToSocket);
  1689. request->rawRequest = object;
  1690. request->ptr1 = ptr1;
  1691. request->ptr2 = ptr2;
  1692. request->ptr3 = ptr3;
  1693. request->rpcRequest = wrapInLayer(object, getDatacenterWithId(datacenterId), request);
  1694. if (LOGS_ENABLED) DEBUG_D("send request wrapped %p - %s", request->rpcRequest.get(), typeid(*(request->rpcRequest.get())).name());
  1695. requestsQueue.push_back(std::unique_ptr<Request>(request));
  1696. if (immediate) {
  1697. processRequestQueue(0, 0);
  1698. }
  1699. });
  1700. }
  1701. #endif
  1702. void ConnectionsManager::cancelRequestsForGuid(int32_t guid) {
  1703. scheduleTask([&, guid] {
  1704. auto iter = requestsByGuids.find(guid);
  1705. if (iter != requestsByGuids.end()) {
  1706. std::vector<int32_t> &requests = iter->second;
  1707. size_t count = requests.size();
  1708. for (uint32_t a = 0; a < count; a++) {
  1709. cancelRequestInternal(requests[a], 0, true, false);
  1710. auto iter2 = guidsByRequests.find(requests[a]);
  1711. if (iter2 != guidsByRequests.end()) {
  1712. guidsByRequests.erase(iter2);
  1713. }
  1714. }
  1715. requestsByGuids.erase(iter);
  1716. }
  1717. });
  1718. }
  1719. void ConnectionsManager::bindRequestToGuid(int32_t requestToken, int32_t guid) {
  1720. scheduleTask([&, requestToken, guid] {
  1721. auto iter = requestsByGuids.find(guid);
  1722. if (iter != requestsByGuids.end()) {
  1723. iter->second.push_back(requestToken);
  1724. } else {
  1725. std::vector<int32_t> array;
  1726. array.push_back(requestToken);
  1727. requestsByGuids[guid] = array;
  1728. }
  1729. guidsByRequests[requestToken] = guid;
  1730. });
  1731. }
  1732. void ConnectionsManager::setUserId(int64_t userId) {
  1733. scheduleTask([&, userId] {
  1734. int32_t oldUserId = currentUserId;
  1735. currentUserId = userId;
  1736. if (oldUserId == userId && userId != 0) {
  1737. registerForInternalPushUpdates();
  1738. }
  1739. if (currentUserId != userId && userId != 0) {
  1740. updateDcSettings(0, false);
  1741. }
  1742. if (currentUserId != 0 && pushConnectionEnabled) {
  1743. Datacenter *datacenter = getDatacenterWithId(currentDatacenterId);
  1744. if (datacenter != nullptr) {
  1745. datacenter->createPushConnection()->setSessionId(pushSessionId);
  1746. sendPing(datacenter, true);
  1747. }
  1748. }
  1749. });
  1750. }
  1751. void ConnectionsManager::switchBackend(bool restart) {
  1752. scheduleTask([&, restart] {
  1753. currentDatacenterId = 1;
  1754. testBackend = !testBackend;
  1755. if (!restart) {
  1756. Handshake::cleanupServerKeys();
  1757. }
  1758. datacenters.clear();
  1759. initDatacenters();
  1760. saveConfig();
  1761. if (restart) {
  1762. exit(1);
  1763. }
  1764. });
  1765. }
  1766. void ConnectionsManager::removeRequestFromGuid(int32_t requestToken) {
  1767. auto iter2 = guidsByRequests.find(requestToken);
  1768. if (iter2 != guidsByRequests.end()) {
  1769. auto iter = requestsByGuids.find(iter2->first);
  1770. if (iter != requestsByGuids.end()) {
  1771. auto iter3 = std::find(iter->second.begin(), iter->second.end(), iter->first);
  1772. if (iter3 != iter->second.end()) {
  1773. iter->second.erase(iter3);
  1774. if (iter->second.empty()) {
  1775. requestsByGuids.erase(iter);
  1776. }
  1777. }
  1778. }
  1779. guidsByRequests.erase(iter2);
  1780. }
  1781. }
  1782. bool ConnectionsManager::cancelRequestInternal(int32_t token, int64_t messageId, bool notifyServer, bool removeFromClass) {
  1783. for (auto iter = requestsQueue.begin(); iter != requestsQueue.end(); iter++) {
  1784. Request *request = iter->get();
  1785. if ((token != 0 && request->requestToken == token) || (messageId != 0 && request->respondsToMessageId(messageId))) {
  1786. request->cancelled = true;
  1787. if (LOGS_ENABLED) DEBUG_D("cancelled queued rpc request %p - %s", request->rawRequest, typeid(*request->rawRequest).name());
  1788. requestsQueue.erase(iter);
  1789. if (removeFromClass) {
  1790. removeRequestFromGuid(token);
  1791. }
  1792. return true;
  1793. }
  1794. }
  1795. for (auto iter = runningRequests.begin(); iter != runningRequests.end(); iter++) {
  1796. Request *request = iter->get();
  1797. if ((token != 0 && request->requestToken == token) || (messageId != 0 && request->respondsToMessageId(messageId))) {
  1798. if (notifyServer) {
  1799. auto dropAnswer = new TL_rpc_drop_answer();
  1800. dropAnswer->req_msg_id = request->messageId;
  1801. sendRequest(dropAnswer, nullptr, nullptr, RequestFlagEnableUnauthorized | RequestFlagWithoutLogin | RequestFlagFailOnServerErrors, request->datacenterId, request->connectionType, true);
  1802. }
  1803. request->cancelled = true;
  1804. if (LOGS_ENABLED) DEBUG_D("cancelled running rpc request %p - %s", request->rawRequest, typeid(*request->rawRequest).name());
  1805. runningRequests.erase(iter);
  1806. if (removeFromClass) {
  1807. removeRequestFromGuid(token);
  1808. }
  1809. return true;
  1810. }
  1811. }
  1812. return false;
  1813. }
  1814. void ConnectionsManager::cancelRequest(int32_t token, bool notifyServer) {
  1815. if (token == 0) {
  1816. return;
  1817. }
  1818. scheduleTask([&, token, notifyServer] {
  1819. cancelRequestInternal(token, 0, notifyServer, true);
  1820. });
  1821. }
  1822. void ConnectionsManager::onDatacenterHandshakeComplete(Datacenter *datacenter, HandshakeType type, int32_t timeDiff) {
  1823. saveConfig();
  1824. uint32_t datacenterId = datacenter->getDatacenterId();
  1825. if (datacenterId == currentDatacenterId || datacenterId == movingToDatacenterId || updatingDcSettingsWorkaround || updatingDcSettings) {
  1826. timeDifference = timeDiff;
  1827. datacenter->recreateSessions(type);
  1828. clearRequestsForDatacenter(datacenter, type);
  1829. }
  1830. processRequestQueue(AllConnectionTypes, datacenterId);
  1831. if (type == HandshakeTypeTemp && !proxyCheckQueue.empty()) {
  1832. ProxyCheckInfo *proxyCheckInfo = proxyCheckQueue[0].release();
  1833. proxyCheckQueue.erase(proxyCheckQueue.begin());
  1834. scheduleCheckProxyInternal(proxyCheckInfo);
  1835. }
  1836. }
  1837. void ConnectionsManager::onDatacenterExportAuthorizationComplete(Datacenter *datacenter) {
  1838. saveConfig();
  1839. scheduleTask([&, datacenter] {
  1840. processRequestQueue(AllConnectionTypes, datacenter->getDatacenterId());
  1841. });
  1842. }
  1843. void ConnectionsManager::sendMessagesToConnection(std::vector<std::unique_ptr<NetworkMessage>> &messages, Connection *connection, bool reportAck) {
  1844. if (messages.empty() || connection == nullptr) {
  1845. return;
  1846. }
  1847. std::vector<std::unique_ptr<NetworkMessage>> currentMessages;
  1848. Datacenter *datacenter = connection->getDatacenter();
  1849. uint32_t currentSize = 0;
  1850. size_t count = messages.size();
  1851. for (uint32_t a = 0; a < count; a++) {
  1852. NetworkMessage *networkMessage = messages[a].get();
  1853. currentMessages.push_back(std::move(messages[a]));
  1854. currentSize += networkMessage->message->bytes;
  1855. if (currentSize >= 3 * 1024 || a == count - 1) {
  1856. int32_t quickAckId = 0;
  1857. NativeByteBuffer *transportData = datacenter->createRequestsData(currentMessages, reportAck ? &quickAckId : nullptr, connection, false);
  1858. if (transportData != nullptr) {
  1859. if (reportAck && quickAckId != 0) {
  1860. std::vector<int32_t> requestIds;
  1861. size_t count2 = currentMessages.size();
  1862. for (uint32_t b = 0; b < count2; b++) {
  1863. NetworkMessage *message = currentMessages[b].get();
  1864. if (message->requestId != 0) {
  1865. requestIds.push_back(message->requestId);
  1866. }
  1867. }
  1868. if (!requestIds.empty()) {
  1869. auto iter = quickAckIdToRequestIds.find(quickAckId);
  1870. if (iter == quickAckIdToRequestIds.end()) {
  1871. quickAckIdToRequestIds[quickAckId] = requestIds;
  1872. } else {
  1873. iter->second.insert(iter->second.end(), requestIds.begin(), requestIds.end());
  1874. }
  1875. }
  1876. }
  1877. connection->sendData(transportData, reportAck, true);
  1878. } else {
  1879. if (LOGS_ENABLED) DEBUG_E("connection(%p) connection data is empty", connection);
  1880. }
  1881. currentSize = 0;
  1882. currentMessages.clear();
  1883. }
  1884. }
  1885. }
  1886. void ConnectionsManager::sendMessagesToConnectionWithConfirmation(std::vector<std::unique_ptr<NetworkMessage>> &messages, Connection *connection, bool reportAck) {
  1887. NetworkMessage *networkMessage = connection->generateConfirmationRequest();
  1888. if (networkMessage != nullptr) {
  1889. messages.push_back(std::unique_ptr<NetworkMessage>(networkMessage));
  1890. }
  1891. sendMessagesToConnection(messages, connection, reportAck);
  1892. }
  1893. void ConnectionsManager::requestSaltsForDatacenter(Datacenter *datacenter, bool media, bool useTempConnection) {
  1894. uint32_t id = datacenter->getDatacenterId();
  1895. if (useTempConnection) {
  1896. id |= 0x80000000;
  1897. }
  1898. if (media) {
  1899. id |= 0x40000000;
  1900. }
  1901. if (std::find(requestingSaltsForDc.begin(), requestingSaltsForDc.end(), id) != requestingSaltsForDc.end()) {
  1902. return;
  1903. }
  1904. ConnectionType connectionType;
  1905. if (media) {
  1906. connectionType = ConnectionTypeGenericMedia;
  1907. } else if (useTempConnection) {
  1908. connectionType = ConnectionTypeTemp;
  1909. } else {
  1910. connectionType = ConnectionTypeGeneric;
  1911. }
  1912. requestingSaltsForDc.push_back(id);
  1913. auto request = new TL_get_future_salts();
  1914. request->num = 32;
  1915. sendRequest(request, [&, datacenter, id, media](TLObject *response, TL_error *error, int32_t networkType, int64_t responseTime) {
  1916. auto iter = std::find(requestingSaltsForDc.begin(), requestingSaltsForDc.end(), id);
  1917. if (iter != requestingSaltsForDc.end()) {
  1918. requestingSaltsForDc.erase(iter);
  1919. }
  1920. if (response != nullptr) {
  1921. datacenter->mergeServerSalts((TL_future_salts *) response, media);
  1922. saveConfig();
  1923. }
  1924. }, nullptr, RequestFlagWithoutLogin | RequestFlagEnableUnauthorized | RequestFlagUseUnboundKey, datacenter->getDatacenterId(), connectionType, true);
  1925. }
  1926. void ConnectionsManager::clearRequestsForDatacenter(Datacenter *datacenter, HandshakeType type) {
  1927. for (auto & runningRequest : runningRequests) {
  1928. Request *request = runningRequest.get();
  1929. Datacenter *requestDatacenter = getDatacenterWithId(request->datacenterId);
  1930. if (requestDatacenter->getDatacenterId() != datacenter->getDatacenterId()) {
  1931. continue;
  1932. }
  1933. if (type == HandshakeTypePerm || type == HandshakeTypeAll || (type == HandshakeTypeMediaTemp && request->isMediaRequest()) || (type == HandshakeTypeTemp && !request->isMediaRequest())) {
  1934. request->clear(true);
  1935. }
  1936. }
  1937. }
  1938. void ConnectionsManager::registerForInternalPushUpdates() {
  1939. if (registeringForPush || !currentUserId) {
  1940. return;
  1941. }
  1942. registeredForInternalPush = false;
  1943. registeringForPush = true;
  1944. auto request = new TL_account_registerDevice();
  1945. request->token_type = 7;
  1946. request->token = to_string_uint64((uint64_t) pushSessionId);
  1947. sendRequest(request, [&](TLObject *response, TL_error *error, int32_t networkType, int64_t responseTime) {
  1948. if (error == nullptr) {
  1949. registeredForInternalPush = true;
  1950. if (LOGS_ENABLED) DEBUG_D("registered for internal push");
  1951. } else {
  1952. registeredForInternalPush = false;
  1953. if (LOGS_ENABLED) DEBUG_E("unable to registering for internal push");
  1954. }
  1955. saveConfig();
  1956. registeringForPush = false;
  1957. }, nullptr, 0, DEFAULT_DATACENTER_ID, ConnectionTypeGeneric, true);
  1958. }
  1959. inline void addMessageToDatacenter(uint32_t datacenterId, NetworkMessage *networkMessage, std::map<uint32_t, std::vector<std::unique_ptr<NetworkMessage>>> &messagesToDatacenters) {
  1960. auto iter = messagesToDatacenters.find(datacenterId);
  1961. if (iter == messagesToDatacenters.end()) {
  1962. std::vector<std::unique_ptr<NetworkMessage>> &array = messagesToDatacenters[datacenterId] = std::vector<std::unique_ptr<NetworkMessage>>();
  1963. array.push_back(std::unique_ptr<NetworkMessage>(networkMessage));
  1964. } else {
  1965. iter->second.push_back(std::unique_ptr<NetworkMessage>(networkMessage));
  1966. }
  1967. }
  1968. void ConnectionsManager::processRequestQueue(uint32_t connectionTypes, uint32_t dc) {
  1969. genericMessagesToDatacenters.clear();
  1970. genericMediaMessagesToDatacenters.clear();
  1971. tempMessagesToDatacenters.clear();
  1972. unknownDatacenterIds.clear();
  1973. neededDatacenters.clear();
  1974. unauthorizedDatacenters.clear();
  1975. downloadRunningRequestCount.clear();
  1976. int64_t currentTimeMillis = getCurrentTimeMonotonicMillis();
  1977. auto currentTime = (int32_t) (currentTimeMillis / 1000);
  1978. uint32_t genericRunningRequestCount = 0;
  1979. uint32_t uploadRunningRequestCount = 0;
  1980. bool hasInvokeAfterMessage = false;
  1981. bool hasInvokeWaitMessage = false;
  1982. for (auto iter = runningRequests.begin(); iter != runningRequests.end();) {
  1983. Request *request = iter->get();
  1984. const std::type_info &typeInfo = typeid(*request->rawRequest);
  1985. uint32_t datacenterId = request->datacenterId;
  1986. if (datacenterId == DEFAULT_DATACENTER_ID) {
  1987. if (movingToDatacenterId != DEFAULT_DATACENTER_ID) {
  1988. iter++;
  1989. continue;
  1990. }
  1991. datacenterId = currentDatacenterId;
  1992. }
  1993. if ((request->requestFlags & RequestFlagResendAfter) != 0) {
  1994. hasInvokeWaitMessage = true;
  1995. if (hasInvokeAfterMessage) {
  1996. iter++;
  1997. continue;
  1998. }
  1999. }
  2000. if (!hasInvokeAfterMessage && (request->requestFlags & RequestFlagInvokeAfter) != 0) {
  2001. hasInvokeAfterMessage = true;
  2002. }
  2003. switch (request->connectionType & 0x0000ffff) {
  2004. case ConnectionTypeGeneric:
  2005. genericRunningRequestCount++;
  2006. break;
  2007. case ConnectionTypeDownload: {
  2008. uint32_t currentCount;
  2009. auto dcIter = downloadRunningRequestCount.find(datacenterId);
  2010. if (dcIter != downloadRunningRequestCount.end()) {
  2011. currentCount = dcIter->second;
  2012. } else {
  2013. currentCount = 0;
  2014. }
  2015. downloadRunningRequestCount[datacenterId] = currentCount + 1;
  2016. break;
  2017. }
  2018. case ConnectionTypeUpload:
  2019. uploadRunningRequestCount++;
  2020. break;
  2021. default:
  2022. break;
  2023. }
  2024. if (request->requestFlags & RequestFlagTryDifferentDc) {
  2025. int32_t requestStartTime = request->startTime;
  2026. int32_t timeout = 30;
  2027. if (updatingDcSettings && dynamic_cast<TL_help_getConfig *>(request->rawRequest)) {
  2028. requestStartTime = updatingDcStartTime;
  2029. updatingDcStartTime = currentTime;
  2030. timeout = 60;
  2031. }
  2032. if (request->startTime != 0 && abs(currentTime - requestStartTime) >= timeout) {
  2033. if (LOGS_ENABLED) DEBUG_D("move %s to requestsQueue", typeid(*request->rawRequest).name());
  2034. requestsQueue.push_back(std::move(*iter));
  2035. iter = runningRequests.erase(iter);
  2036. continue;
  2037. }
  2038. }
  2039. int32_t canUseUnboundKey = 0;
  2040. if ((request->requestFlags & RequestFlagUseUnboundKey) != 0) {
  2041. canUseUnboundKey |= 1;
  2042. }
  2043. Datacenter *requestDatacenter = getDatacenterWithId(datacenterId);
  2044. if (requestDatacenter == nullptr) {
  2045. if (std::find(unknownDatacenterIds.begin(), unknownDatacenterIds.end(), datacenterId) == unknownDatacenterIds.end()) {
  2046. unknownDatacenterIds.push_back(datacenterId);
  2047. }
  2048. iter++;
  2049. continue;
  2050. } else {
  2051. if (requestDatacenter->isCdnDatacenter) {
  2052. request->requestFlags |= RequestFlagEnableUnauthorized;
  2053. }
  2054. if (request->needInitRequest(requestDatacenter, currentVersion) && !request->hasInitFlag() && request->rawRequest->isNeedLayer()) {
  2055. if (LOGS_ENABLED) DEBUG_D("move %p - %s to requestsQueue because of initConnection", request->rawRequest, typeid(*request->rawRequest).name());
  2056. requestsQueue.push_back(std::move(*iter));
  2057. iter = runningRequests.erase(iter);
  2058. continue;
  2059. }
  2060. if (!requestDatacenter->hasAuthKey(request->connectionType, canUseUnboundKey)) {
  2061. std::pair<Datacenter *, ConnectionType> pair = std::make_pair(requestDatacenter, request->connectionType);
  2062. if (std::find(neededDatacenters.begin(), neededDatacenters.end(), pair) == neededDatacenters.end()) {
  2063. neededDatacenters.push_back(pair);
  2064. }
  2065. iter++;
  2066. continue;
  2067. } else if (!(request->requestFlags & RequestFlagEnableUnauthorized) && !requestDatacenter->authorized && request->datacenterId != DEFAULT_DATACENTER_ID && request->datacenterId != currentDatacenterId) {
  2068. if (std::find(unauthorizedDatacenters.begin(), unauthorizedDatacenters.end(), requestDatacenter) == unauthorizedDatacenters.end()) {
  2069. unauthorizedDatacenters.push_back(requestDatacenter);
  2070. }
  2071. iter++;
  2072. continue;
  2073. }
  2074. }
  2075. Connection *connection = requestDatacenter->getConnectionByType(request->connectionType, true, canUseUnboundKey);
  2076. int32_t maxTimeout = request->connectionType & ConnectionTypeGeneric ? 8 : 30;
  2077. if (!networkAvailable || connection->getConnectionToken() == 0) {
  2078. iter++;
  2079. continue;
  2080. }
  2081. uint32_t requestConnectionType = request->connectionType & 0x0000ffff;
  2082. bool forceThisRequest = (connectionTypes & requestConnectionType) && requestDatacenter->getDatacenterId() == dc;
  2083. if (typeInfo == typeid(TL_get_future_salts)) {
  2084. if (request->messageId != 0) {
  2085. request->addRespondMessageId(request->messageId);
  2086. }
  2087. request->clear(false);
  2088. forceThisRequest = false;
  2089. }
  2090. if (forceThisRequest || (abs(currentTime - request->startTime) > maxTimeout &&
  2091. (currentTime >= request->minStartTime ||
  2092. (request->failedByFloodWait != 0 && (request->minStartTime - currentTime) > request->failedByFloodWait) ||
  2093. (request->failedByFloodWait == 0 && abs(currentTime - request->minStartTime) >= 60))
  2094. )
  2095. ) {
  2096. if (!forceThisRequest && request->connectionToken > 0) {
  2097. if ((request->connectionType & ConnectionTypeGeneric || request->connectionType & ConnectionTypeTemp) && request->connectionToken == connection->getConnectionToken()) {
  2098. if (LOGS_ENABLED) DEBUG_D("request token is valid, not retrying %s (%p)", typeInfo.name(), request->rawRequest);
  2099. iter++;
  2100. continue;
  2101. } else {
  2102. if (connection->getConnectionToken() != 0 && request->connectionToken == connection->getConnectionToken()) {
  2103. if (LOGS_ENABLED) DEBUG_D("request download token is valid, not retrying %s (%p)", typeInfo.name(), request->rawRequest);
  2104. iter++;
  2105. continue;
  2106. }
  2107. }
  2108. }
  2109. if (request->connectionToken != 0 && request->connectionToken != connection->getConnectionToken()) {
  2110. request->lastResendTime = 0;
  2111. request->isResending = true;
  2112. }
  2113. request->retryCount++;
  2114. if (!request->failedBySalt) {
  2115. if (request->connectionType & ConnectionTypeDownload) {
  2116. uint32_t retryMax = 10;
  2117. if (!(request->requestFlags & RequestFlagForceDownload)) {
  2118. if (request->failedByFloodWait) {
  2119. retryMax = 2;
  2120. } else {
  2121. retryMax = 6;
  2122. }
  2123. }
  2124. if (request->retryCount >= retryMax) {
  2125. if (LOGS_ENABLED) DEBUG_E("timed out %s", typeInfo.name());
  2126. auto error = new TL_error();
  2127. error->code = -123;
  2128. error->text = "RETRY_LIMIT";
  2129. request->onComplete(nullptr, error, connection->currentNetworkType, 0);
  2130. delete error;
  2131. iter = runningRequests.erase(iter);
  2132. continue;
  2133. }
  2134. }
  2135. } else {
  2136. request->failedBySalt = false;
  2137. }
  2138. if (request->messageSeqNo == 0) {
  2139. request->messageSeqNo = connection->generateMessageSeqNo((request->connectionType & ConnectionTypeProxy) == 0);
  2140. request->messageId = generateMessageId();
  2141. if (request->rawRequest->initFunc != nullptr) {
  2142. request->rawRequest->initFunc(request->messageId);
  2143. }
  2144. }
  2145. request->startTime = currentTime;
  2146. request->startTimeMillis = currentTimeMillis;
  2147. auto networkMessage = new NetworkMessage();
  2148. networkMessage->forceContainer = request->isResending;
  2149. networkMessage->message = std::make_unique<TL_message>();
  2150. networkMessage->message->msg_id = request->messageId;
  2151. networkMessage->message->bytes = request->serializedLength;
  2152. networkMessage->message->outgoingBody = request->getRpcRequest();
  2153. networkMessage->message->seqno = request->messageSeqNo;
  2154. networkMessage->requestId = request->requestToken;
  2155. networkMessage->invokeAfter = (request->requestFlags & RequestFlagInvokeAfter) != 0 && (request->requestFlags & RequestFlagResendAfter) == 0;
  2156. networkMessage->needQuickAck = (request->requestFlags & RequestFlagNeedQuickAck) != 0;
  2157. request->connectionToken = connection->getConnectionToken();
  2158. switch (requestConnectionType) {
  2159. case ConnectionTypeGeneric:
  2160. addMessageToDatacenter(requestDatacenter->getDatacenterId(), networkMessage, genericMessagesToDatacenters);
  2161. break;
  2162. case ConnectionTypeGenericMedia:
  2163. addMessageToDatacenter(requestDatacenter->getDatacenterId(), networkMessage, genericMediaMessagesToDatacenters);
  2164. break;
  2165. case ConnectionTypeTemp:
  2166. addMessageToDatacenter(requestDatacenter->getDatacenterId(), networkMessage, tempMessagesToDatacenters);
  2167. break;
  2168. case ConnectionTypeProxy: {
  2169. std::vector<std::unique_ptr<NetworkMessage>> array;
  2170. array.push_back(std::unique_ptr<NetworkMessage>(networkMessage));
  2171. sendMessagesToConnection(array, connection, false);
  2172. break;
  2173. }
  2174. case ConnectionTypeDownload:
  2175. case ConnectionTypeUpload: {
  2176. std::vector<std::unique_ptr<NetworkMessage>> array;
  2177. array.push_back(std::unique_ptr<NetworkMessage>(networkMessage));
  2178. sendMessagesToConnectionWithConfirmation(array, connection, false);
  2179. request->onWriteToSocket();
  2180. break;
  2181. }
  2182. default:
  2183. delete networkMessage;
  2184. }
  2185. }
  2186. iter++;
  2187. }
  2188. Connection *genericConnection = nullptr;
  2189. Datacenter *defaultDatacenter = getDatacenterWithId(currentDatacenterId);
  2190. if (defaultDatacenter != nullptr) {
  2191. genericConnection = defaultDatacenter->getGenericConnection(true, 0);
  2192. if (genericConnection != nullptr && !sessionsToDestroy.empty() && genericConnection->getConnectionToken() != 0) {
  2193. auto iter = sessionsToDestroy.begin();
  2194. if (abs(currentTime - lastDestroySessionRequestTime) > 2) {
  2195. lastDestroySessionRequestTime = currentTime;
  2196. auto request = new TL_destroy_session();
  2197. request->session_id = *iter;
  2198. auto networkMessage = new NetworkMessage();
  2199. networkMessage->message = std::make_unique<TL_message>();
  2200. networkMessage->message->msg_id = generateMessageId();
  2201. networkMessage->message->bytes = request->getObjectSize();
  2202. networkMessage->message->body = std::unique_ptr<TLObject>(request);
  2203. networkMessage->message->seqno = genericConnection->generateMessageSeqNo(false);
  2204. addMessageToDatacenter(defaultDatacenter->getDatacenterId(), networkMessage, genericMessagesToDatacenters);
  2205. }
  2206. sessionsToDestroy.erase(iter);
  2207. }
  2208. }
  2209. for (auto iter = requestsQueue.begin(); iter != requestsQueue.end();) {
  2210. Request *request = iter->get();
  2211. if (request->cancelled) {
  2212. iter = requestsQueue.erase(iter);
  2213. continue;
  2214. }
  2215. if (hasInvokeWaitMessage && (request->requestFlags & RequestFlagInvokeAfter) != 0 && (request->requestFlags & RequestFlagResendAfter) == 0) {
  2216. request->requestFlags |= RequestFlagResendAfter;
  2217. }
  2218. if (hasInvokeAfterMessage && (request->requestFlags & RequestFlagResendAfter) != 0) {
  2219. iter++;
  2220. continue;
  2221. }
  2222. if (!hasInvokeAfterMessage && (request->requestFlags & RequestFlagInvokeAfter) != 0) {
  2223. hasInvokeAfterMessage = true;
  2224. }
  2225. uint32_t datacenterId = request->datacenterId;
  2226. if (datacenterId == DEFAULT_DATACENTER_ID) {
  2227. if (movingToDatacenterId != DEFAULT_DATACENTER_ID) {
  2228. iter++;
  2229. continue;
  2230. }
  2231. datacenterId = currentDatacenterId;
  2232. }
  2233. int32_t canUseUnboundKey = 0;
  2234. if ((request->requestFlags & RequestFlagUseUnboundKey) != 0) {
  2235. canUseUnboundKey |= 1;
  2236. }
  2237. if (request->requestFlags & RequestFlagTryDifferentDc) {
  2238. int32_t requestStartTime = request->startTime;
  2239. int32_t timeout = 30;
  2240. if (updatingDcSettings && dynamic_cast<TL_help_getConfig *>(request->rawRequest)) {
  2241. requestStartTime = updatingDcStartTime;
  2242. timeout = 60;
  2243. } else {
  2244. request->startTime = 0;
  2245. request->startTimeMillis = 0;
  2246. }
  2247. if (requestStartTime != 0 && abs(currentTime - requestStartTime) >= timeout) {
  2248. std::vector<uint32_t> allDc;
  2249. for (auto & datacenter : datacenters) {
  2250. if (datacenter.first == datacenterId || datacenter.second->isCdnDatacenter) {
  2251. continue;
  2252. }
  2253. allDc.push_back(datacenter.first);
  2254. }
  2255. uint8_t index;
  2256. RAND_bytes(&index, 1);
  2257. datacenterId = allDc[index % allDc.size()];
  2258. if (dynamic_cast<TL_help_getConfig *>(request->rawRequest)) {
  2259. updatingDcStartTime = currentTime;
  2260. request->datacenterId = datacenterId;
  2261. } else {
  2262. currentDatacenterId = datacenterId;
  2263. }
  2264. }
  2265. }
  2266. Datacenter *requestDatacenter = getDatacenterWithId(datacenterId);
  2267. if (requestDatacenter == nullptr) {
  2268. if (std::find(unknownDatacenterIds.begin(), unknownDatacenterIds.end(), datacenterId) == unknownDatacenterIds.end()) {
  2269. unknownDatacenterIds.push_back(datacenterId);
  2270. }
  2271. iter++;
  2272. continue;
  2273. } else {
  2274. if (request->needInitRequest(requestDatacenter, currentVersion) && !request->hasInitFlag()) {
  2275. request->rpcRequest.release();
  2276. request->rpcRequest = wrapInLayer(request->rawRequest, requestDatacenter, request);
  2277. }
  2278. if (!requestDatacenter->hasAuthKey(request->connectionType, canUseUnboundKey)) {
  2279. std::pair<Datacenter *, ConnectionType> pair = std::make_pair(requestDatacenter, request->connectionType);
  2280. if (std::find(neededDatacenters.begin(), neededDatacenters.end(), pair) == neededDatacenters.end()) {
  2281. neededDatacenters.push_back(pair);
  2282. }
  2283. iter++;
  2284. continue;
  2285. } else if (!(request->requestFlags & RequestFlagEnableUnauthorized) && !requestDatacenter->authorized && request->datacenterId != DEFAULT_DATACENTER_ID && request->datacenterId != currentDatacenterId) {
  2286. if (std::find(unauthorizedDatacenters.begin(), unauthorizedDatacenters.end(), requestDatacenter) == unauthorizedDatacenters.end()) {
  2287. unauthorizedDatacenters.push_back(requestDatacenter);
  2288. }
  2289. iter++;
  2290. continue;
  2291. }
  2292. }
  2293. Connection *connection = requestDatacenter->getConnectionByType(request->connectionType, true, canUseUnboundKey);
  2294. if (request->connectionType & ConnectionTypeGeneric && connection->getConnectionToken() == 0) {
  2295. iter++;
  2296. continue;
  2297. }
  2298. switch (request->connectionType & 0x0000ffff) {
  2299. case ConnectionTypeGeneric:
  2300. case ConnectionTypeGenericMedia:
  2301. if (!canUseUnboundKey && genericRunningRequestCount >= 60) {
  2302. iter++;
  2303. continue;
  2304. }
  2305. genericRunningRequestCount++;
  2306. break;
  2307. case ConnectionTypeDownload: {
  2308. uint32_t currentCount;
  2309. auto dcIter = downloadRunningRequestCount.find(datacenterId);
  2310. if (dcIter != downloadRunningRequestCount.end()) {
  2311. currentCount = dcIter->second;
  2312. } else {
  2313. currentCount = 0;
  2314. }
  2315. if (!networkAvailable || currentCount >= 16) {
  2316. iter++;
  2317. continue;
  2318. }
  2319. downloadRunningRequestCount[datacenterId] = currentCount + 1;
  2320. break;
  2321. }
  2322. case ConnectionTypeProxy:
  2323. case ConnectionTypeTemp:
  2324. if (!networkAvailable) {
  2325. iter++;
  2326. continue;
  2327. }
  2328. break;
  2329. case ConnectionTypeUpload:
  2330. if (!networkAvailable || uploadRunningRequestCount >= 10) {
  2331. iter++;
  2332. continue;
  2333. }
  2334. uploadRunningRequestCount++;
  2335. break;
  2336. default:
  2337. break;
  2338. }
  2339. request->messageId = generateMessageId();
  2340. if (request->rawRequest->initFunc != nullptr) {
  2341. request->rawRequest->initFunc(request->messageId);
  2342. }
  2343. if (LOGS_ENABLED) DEBUG_D("messageId for token = %d, 0x%" PRIx64, request->requestToken, request->messageId);
  2344. uint32_t requestLength = request->rpcRequest->getObjectSize();
  2345. if (request->requestFlags & RequestFlagCanCompress) {
  2346. request->requestFlags &= ~RequestFlagCanCompress;
  2347. NativeByteBuffer *original = BuffersStorage::getInstance().getFreeBuffer(requestLength);
  2348. request->rpcRequest->serializeToStream(original);
  2349. NativeByteBuffer *buffer = compressGZip(original);
  2350. if (buffer != nullptr) {
  2351. auto packed = new TL_gzip_packed();
  2352. packed->originalRequest = std::move(request->rpcRequest);
  2353. packed->packed_data_to_send = buffer;
  2354. request->rpcRequest = std::unique_ptr<TLObject>(packed);
  2355. requestLength = packed->getObjectSize();
  2356. }
  2357. original->reuse();
  2358. }
  2359. request->serializedLength = requestLength;
  2360. request->messageSeqNo = connection->generateMessageSeqNo((request->connectionType & ConnectionTypeProxy) == 0);
  2361. request->startTime = currentTime;
  2362. request->startTimeMillis = currentTimeMillis;
  2363. request->connectionToken = connection->getConnectionToken();
  2364. auto networkMessage = new NetworkMessage();
  2365. networkMessage->message = std::make_unique<TL_message>();
  2366. networkMessage->forceContainer = request->isResending;
  2367. networkMessage->message->msg_id = request->messageId;
  2368. networkMessage->message->bytes = request->serializedLength;
  2369. networkMessage->message->outgoingBody = request->getRpcRequest();
  2370. networkMessage->message->seqno = request->messageSeqNo;
  2371. networkMessage->requestId = request->requestToken;
  2372. networkMessage->invokeAfter = (request->requestFlags & RequestFlagInvokeAfter) != 0 && (request->requestFlags & RequestFlagResendAfter) == 0;
  2373. networkMessage->needQuickAck = (request->requestFlags & RequestFlagNeedQuickAck) != 0;
  2374. if (!hasPendingRequestsForConnection(connection)) {
  2375. connection->resetLastEventTime();
  2376. }
  2377. runningRequests.push_back(std::move(*iter));
  2378. switch (request->connectionType & 0x0000ffff) {
  2379. case ConnectionTypeGeneric:
  2380. addMessageToDatacenter(requestDatacenter->getDatacenterId(), networkMessage, genericMessagesToDatacenters);
  2381. break;
  2382. case ConnectionTypeGenericMedia:
  2383. addMessageToDatacenter(requestDatacenter->getDatacenterId(), networkMessage, genericMediaMessagesToDatacenters);
  2384. break;
  2385. case ConnectionTypeTemp:
  2386. addMessageToDatacenter(requestDatacenter->getDatacenterId(), networkMessage, tempMessagesToDatacenters);
  2387. break;
  2388. case ConnectionTypeProxy: {
  2389. std::vector<std::unique_ptr<NetworkMessage>> array;
  2390. array.push_back(std::unique_ptr<NetworkMessage>(networkMessage));
  2391. sendMessagesToConnection(array, connection, false);
  2392. break;
  2393. }
  2394. case ConnectionTypeDownload:
  2395. case ConnectionTypeUpload: {
  2396. std::vector<std::unique_ptr<NetworkMessage>> array;
  2397. array.push_back(std::unique_ptr<NetworkMessage>(networkMessage));
  2398. sendMessagesToConnectionWithConfirmation(array, connection, false);
  2399. break;
  2400. }
  2401. default:
  2402. delete networkMessage;
  2403. }
  2404. iter = requestsQueue.erase(iter);
  2405. }
  2406. for (auto & iter : datacenters) {
  2407. Datacenter *datacenter = iter.second;
  2408. auto iter2 = genericMessagesToDatacenters.find(datacenter->getDatacenterId());
  2409. if (iter2 == genericMessagesToDatacenters.end()) {
  2410. Connection *connection = datacenter->getGenericConnection(false, 1);
  2411. if (connection != nullptr && connection->getConnectionToken() != 0 && connection->hasMessagesToConfirm()) {
  2412. genericMessagesToDatacenters[datacenter->getDatacenterId()] = std::vector<std::unique_ptr<NetworkMessage>>();
  2413. }
  2414. }
  2415. iter2 = genericMediaMessagesToDatacenters.find(datacenter->getDatacenterId());
  2416. if (iter2 == genericMediaMessagesToDatacenters.end()) {
  2417. Connection *connection = datacenter->getGenericMediaConnection(false, 1);
  2418. if (connection != nullptr && connection->getConnectionToken() != 0 && connection->hasMessagesToConfirm()) {
  2419. genericMediaMessagesToDatacenters[datacenter->getDatacenterId()] = std::vector<std::unique_ptr<NetworkMessage>>();
  2420. }
  2421. }
  2422. iter2 = tempMessagesToDatacenters.find(datacenter->getDatacenterId());
  2423. if (iter2 == tempMessagesToDatacenters.end()) {
  2424. Connection *connection = datacenter->getTempConnection(false);
  2425. if (connection != nullptr && connection->getConnectionToken() != 0 && connection->hasMessagesToConfirm()) {
  2426. tempMessagesToDatacenters[datacenter->getDatacenterId()] = std::vector<std::unique_ptr<NetworkMessage>>();
  2427. }
  2428. }
  2429. }
  2430. for (auto & genericMessagesToDatacenter : genericMessagesToDatacenters) {
  2431. Datacenter *datacenter = getDatacenterWithId(genericMessagesToDatacenter.first);
  2432. if (datacenter != nullptr) {
  2433. bool scannedPreviousRequests = false;
  2434. bool needQuickAck = false;
  2435. int64_t lastSentMessageRpcId = 0;
  2436. std::vector<std::unique_ptr<NetworkMessage>> &array = genericMessagesToDatacenter.second;
  2437. size_t count = array.size();
  2438. for (uint32_t b = 0; b < count; b++) {
  2439. NetworkMessage *networkMessage = array[b].get();
  2440. if (networkMessage->needQuickAck) {
  2441. needQuickAck = true;
  2442. }
  2443. if (networkMessage->invokeAfter) {
  2444. if (!scannedPreviousRequests) {
  2445. scannedPreviousRequests = true;
  2446. std::vector<int64_t> currentRequests;
  2447. for (uint32_t a = 0; a < count; a++) {
  2448. NetworkMessage *currentNetworkMessage = array[a].get();
  2449. if (currentNetworkMessage->invokeAfter) {
  2450. currentRequests.push_back(currentNetworkMessage->message->msg_id);
  2451. }
  2452. }
  2453. int64_t maxRequestId = 0;
  2454. if (lastInvokeAfterMessageId != 0) {
  2455. auto timeMessage = (int64_t) (lastInvokeAfterMessageId / 4294967296.0);
  2456. if (getCurrentTime() - timeMessage <= 5) {
  2457. maxRequestId = lastInvokeAfterMessageId;
  2458. }
  2459. }
  2460. for (auto & runningRequest : runningRequests) {
  2461. Request *request = runningRequest.get();
  2462. if (request->requestFlags & RequestFlagInvokeAfter) {
  2463. if (request->messageId > maxRequestId && std::find(currentRequests.begin(), currentRequests.end(), request->messageId) == currentRequests.end()) {
  2464. maxRequestId = request->messageId;
  2465. }
  2466. }
  2467. }
  2468. lastSentMessageRpcId = maxRequestId;
  2469. }
  2470. TL_message *message = networkMessage->message.get();
  2471. if (lastSentMessageRpcId != 0 && lastSentMessageRpcId != message->msg_id) {
  2472. auto request = new TL_invokeAfterMsg();
  2473. request->msg_id = lastSentMessageRpcId;
  2474. if (message->outgoingBody != nullptr) {
  2475. if (LOGS_ENABLED) DEBUG_D("wrap outgoingBody(%p, %s) to TL_invokeAfterMsg, token = %d, after 0x%" PRIx64, message->outgoingBody, typeid(*message->outgoingBody).name(), networkMessage->requestId, request->msg_id);
  2476. request->outgoingQuery = message->outgoingBody;
  2477. message->outgoingBody = nullptr;
  2478. } else {
  2479. if (LOGS_ENABLED) DEBUG_D("wrap body(%p, %s) to TL_invokeAfterMsg, token = %d, after 0x%" PRIx64, message->body.get(), typeid(*(message->body.get())).name(), networkMessage->requestId, request->msg_id);
  2480. request->query = std::move(message->body);
  2481. }
  2482. message->body = std::unique_ptr<TLObject>(request);
  2483. message->bytes += 4 + 8;
  2484. }
  2485. lastSentMessageRpcId = message->msg_id;
  2486. lastInvokeAfterMessageId = message->msg_id;
  2487. }
  2488. }
  2489. sendMessagesToConnectionWithConfirmation(array, datacenter->getGenericConnection(true, 1), needQuickAck);
  2490. }
  2491. }
  2492. for (auto & tempMessagesToDatacenter : tempMessagesToDatacenters) {
  2493. Datacenter *datacenter = getDatacenterWithId(tempMessagesToDatacenter.first);
  2494. if (datacenter != nullptr) {
  2495. std::vector<std::unique_ptr<NetworkMessage>> &array = tempMessagesToDatacenter.second;
  2496. sendMessagesToConnectionWithConfirmation(array, datacenter->getTempConnection(true), false);
  2497. }
  2498. }
  2499. for (auto & genericMediaMessagesToDatacenter : genericMediaMessagesToDatacenters) {
  2500. Datacenter *datacenter = getDatacenterWithId(genericMediaMessagesToDatacenter.first);
  2501. if (datacenter != nullptr) {
  2502. std::vector<std::unique_ptr<NetworkMessage>> &array = genericMediaMessagesToDatacenter.second;
  2503. sendMessagesToConnectionWithConfirmation(array, datacenter->getGenericMediaConnection(true, 1), false);
  2504. }
  2505. }
  2506. if (connectionTypes == ConnectionTypeGeneric && dc == currentDatacenterId) {
  2507. auto iter2 = genericMessagesToDatacenters.find(currentDatacenterId);
  2508. if (iter2 == genericMessagesToDatacenters.end()) {
  2509. sendPing(getDatacenterWithId(currentDatacenterId), false);
  2510. }
  2511. }
  2512. if (!unknownDatacenterIds.empty()) {
  2513. updateDcSettings(0, false);
  2514. }
  2515. size_t count = neededDatacenters.size();
  2516. for (uint32_t a = 0; a < count; a++) {
  2517. Datacenter *datacenter = neededDatacenters[a].first;
  2518. bool media = Connection::isMediaConnectionType(neededDatacenters[a].second) && datacenter->hasMediaAddress();
  2519. if (datacenter->getDatacenterId() != movingToDatacenterId && !datacenter->isHandshaking(media) && !datacenter->hasAuthKey(neededDatacenters[a].second, 1)) {
  2520. datacenter->beginHandshake(media ? HandshakeTypeMediaTemp : HandshakeTypeTemp, true);
  2521. }
  2522. }
  2523. if (currentUserId) {
  2524. count = unauthorizedDatacenters.size();
  2525. for (uint32_t a = 0; a < count; a++) {
  2526. Datacenter *datacenter = unauthorizedDatacenters[a];
  2527. uint32_t id = datacenter->getDatacenterId();
  2528. if (id != currentDatacenterId && id != movingToDatacenterId && !datacenter->isExportingAuthorization()) {
  2529. datacenter->exportAuthorization();
  2530. }
  2531. }
  2532. }
  2533. }
  2534. Datacenter *ConnectionsManager::getDatacenterWithId(uint32_t datacenterId) {
  2535. if (datacenterId == DEFAULT_DATACENTER_ID) {
  2536. return datacenters[currentDatacenterId];
  2537. }
  2538. auto iter = datacenters.find(datacenterId);
  2539. return iter != datacenters.end() ? iter->second : nullptr;
  2540. }
  2541. std::unique_ptr<TLObject> ConnectionsManager::wrapInLayer(TLObject *object, Datacenter *datacenter, Request *baseRequest) {
  2542. if (object->isNeedLayer()) {
  2543. bool media = PFS_ENABLED && datacenter != nullptr && baseRequest->isMediaRequest() && datacenter->hasMediaAddress();
  2544. if (datacenter == nullptr || baseRequest->needInitRequest(datacenter, currentVersion)) {
  2545. if (datacenter != nullptr && datacenter->getDatacenterId() == currentDatacenterId) {
  2546. registerForInternalPushUpdates();
  2547. }
  2548. if (media) {
  2549. baseRequest->isInitMediaRequest = true;
  2550. } else {
  2551. baseRequest->isInitRequest = true;
  2552. }
  2553. auto request = new initConnection();
  2554. if (delegate != nullptr) {
  2555. request->flags = delegate->getInitFlags(instanceNum);
  2556. } else {
  2557. request->flags = 0;
  2558. }
  2559. request->query = std::unique_ptr<TLObject>(object);
  2560. request->api_id = currentApiId;
  2561. request->app_version = currentAppVersion;
  2562. request->lang_code = currentLangCode;
  2563. request->lang_pack = "android";
  2564. request->system_lang_code = currentSystemLangCode;
  2565. auto jsonObject = new TL_jsonObject();
  2566. request->params = std::unique_ptr<JSONValue>(jsonObject);
  2567. if (!currentRegId.empty()) {
  2568. auto objectValue = new TL_jsonObjectValue();
  2569. jsonObject->value.push_back(std::unique_ptr<TL_jsonObjectValue>(objectValue));
  2570. auto jsonString = new TL_jsonString();
  2571. jsonString->value = currentRegId;
  2572. objectValue->key = "device_token";
  2573. objectValue->value = std::unique_ptr<JSONValue>(jsonString);
  2574. }
  2575. if (!certFingerprint.empty()) {
  2576. auto objectValue = new TL_jsonObjectValue();
  2577. jsonObject->value.push_back(std::unique_ptr<TL_jsonObjectValue>(objectValue));
  2578. auto jsonString = new TL_jsonString();
  2579. jsonString->value = certFingerprint;
  2580. objectValue->key = "data";
  2581. objectValue->value = std::unique_ptr<JSONValue>(jsonString);
  2582. }
  2583. auto objectValue = new TL_jsonObjectValue();
  2584. jsonObject->value.push_back(std::unique_ptr<TL_jsonObjectValue>(objectValue));
  2585. auto jsonString = new TL_jsonString();
  2586. jsonString->value = installer;
  2587. objectValue->key = "installer";
  2588. objectValue->value = std::unique_ptr<JSONValue>(jsonString);
  2589. objectValue = new TL_jsonObjectValue();
  2590. jsonObject->value.push_back(std::unique_ptr<TL_jsonObjectValue>(objectValue));
  2591. jsonString = new TL_jsonString();
  2592. jsonString->value = package;
  2593. objectValue->key = "package_id";
  2594. objectValue->value = std::unique_ptr<JSONValue>(jsonString);
  2595. objectValue = new TL_jsonObjectValue();
  2596. jsonObject->value.push_back(std::unique_ptr<TL_jsonObjectValue>(objectValue));
  2597. auto jsonNumber = new TL_jsonNumber();
  2598. jsonNumber->value = currentDeviceTimezone;
  2599. objectValue->key = "tz_offset";
  2600. objectValue->value = std::unique_ptr<JSONValue>(jsonNumber);
  2601. request->flags |= 2;
  2602. if (!proxyAddress.empty() && !proxySecret.empty()) {
  2603. request->flags |= 1;
  2604. request->proxy = std::make_unique<TL_inputClientProxy>();
  2605. request->proxy->address = proxyAddress;
  2606. request->proxy->port = proxyPort;
  2607. }
  2608. if (datacenter == nullptr || datacenter->isCdnDatacenter) {
  2609. request->device_model = "n/a";
  2610. request->system_version = "n/a";
  2611. } else {
  2612. request->device_model = currentDeviceModel;
  2613. request->system_version = currentSystemVersion;
  2614. }
  2615. if (request->lang_code.empty()) {
  2616. request->lang_code = "en";
  2617. }
  2618. if (request->device_model.empty()) {
  2619. request->device_model = "n/a";
  2620. }
  2621. if (request->app_version.empty()) {
  2622. request->app_version = "n/a";
  2623. }
  2624. if (request->system_version.empty()) {
  2625. request->system_version = "n/a";
  2626. }
  2627. auto request2 = new invokeWithLayer();
  2628. request2->layer = currentLayer;
  2629. request2->query = std::unique_ptr<TLObject>(request);
  2630. if (LOGS_ENABLED) DEBUG_D("wrap in layer %s, flags = %d", typeid(*object).name(), request->flags);
  2631. return std::unique_ptr<TLObject>(request2);
  2632. }
  2633. }
  2634. return std::unique_ptr<TLObject>(object);
  2635. }
  2636. static const char *const url_symbols64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
  2637. static unsigned char url_char_to_value[256];
  2638. static void init_base64url_table() {
  2639. static bool is_inited = []() {
  2640. std::fill(std::begin(url_char_to_value), std::end(url_char_to_value), static_cast<unsigned char>(64));
  2641. for (unsigned char i = 0; i < 64; i++) {
  2642. url_char_to_value[static_cast<size_t>(url_symbols64[i])] = i;
  2643. }
  2644. return true;
  2645. }();
  2646. assert(is_inited);
  2647. }
  2648. std::string base64UrlDecode(std::string base64) {
  2649. init_base64url_table();
  2650. size_t padding_length = 0;
  2651. while (!base64.empty() && base64.back() == '=') {
  2652. base64.pop_back();
  2653. padding_length++;
  2654. }
  2655. if (padding_length >= 3 || (padding_length > 0 && ((base64.size() + padding_length) & 3) != 0)) {
  2656. return "";
  2657. }
  2658. if ((base64.size() & 3) == 1) {
  2659. return "";
  2660. }
  2661. std::string output;
  2662. output.reserve(((base64.size() + 3) >> 2) * 3);
  2663. for (size_t i = 0; i < base64.size();) {
  2664. size_t left = std::min(base64.size() - i, static_cast<size_t>(4));
  2665. int c = 0;
  2666. for (size_t t = 0; t < left; t++) {
  2667. auto value = url_char_to_value[base64.c_str()[i++]];
  2668. if (value == 64) {
  2669. return "";
  2670. }
  2671. c |= value << ((3 - t) * 6);
  2672. }
  2673. output += static_cast<char>(static_cast<unsigned char>(c >> 16));
  2674. if (left == 2) {
  2675. if ((c & ((1 << 16) - 1)) != 0) {
  2676. return "";
  2677. }
  2678. } else {
  2679. output += static_cast<char>(static_cast<unsigned char>(c >> 8));
  2680. if (left == 3) {
  2681. if ((c & ((1 << 8) - 1)) != 0) {
  2682. return "";
  2683. }
  2684. } else {
  2685. output += static_cast<char>(static_cast<unsigned char>(c));
  2686. }
  2687. }
  2688. }
  2689. return output;
  2690. }
  2691. inline std::string decodeSecret(std::string secret) {
  2692. bool allHex = true;
  2693. for (char i : secret) {
  2694. if (!((i >= '0' && i <= '9') || (i >= 'a' && i <= 'f') || (i >= 'A' && i <= 'F'))) {
  2695. allHex = false;
  2696. break;
  2697. }
  2698. }
  2699. if (allHex) {
  2700. size_t size = secret.size() / 2;
  2701. char *result = new char[size];
  2702. for (int32_t i = 0; i < size; i++) {
  2703. result[i] = (char) (char2int(secret[i * 2]) * 16 + char2int(secret[i * 2 + 1]));
  2704. }
  2705. secret = std::string(result, size);
  2706. delete[] result;
  2707. return secret;
  2708. }
  2709. return base64UrlDecode(secret);
  2710. }
  2711. void ConnectionsManager::updateDcSettings(uint32_t dcNum, bool workaround) {
  2712. if (workaround) {
  2713. if (updatingDcSettingsWorkaround) {
  2714. return;
  2715. }
  2716. updatingDcSettingsWorkaround = true;
  2717. } else {
  2718. if (updatingDcSettings) {
  2719. return;
  2720. }
  2721. updatingDcSettings = true;
  2722. updatingDcStartTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000);
  2723. }
  2724. auto request = new TL_help_getConfig();
  2725. sendRequest(request, [&, workaround](TLObject *response, TL_error *error, int32_t networkType, int64_t responseTime) {
  2726. if ((!workaround && !updatingDcSettings) || (workaround && !updatingDcSettingsWorkaround)) {
  2727. return;
  2728. }
  2729. if (response != nullptr) {
  2730. auto config = (TL_config *) response;
  2731. clientBlocked = (config->flags & 256) != 0;
  2732. if (!workaround) {
  2733. int32_t updateIn = config->expires - getCurrentTime();
  2734. if (updateIn <= 0) {
  2735. updateIn = 120;
  2736. }
  2737. lastDcUpdateTime = (int32_t) (getCurrentTimeMonotonicMillis() / 1000) - DC_UPDATE_TIME + updateIn;
  2738. }
  2739. struct DatacenterInfo {
  2740. std::vector<TcpAddress> addressesIpv4;
  2741. std::vector<TcpAddress> addressesIpv6;
  2742. std::vector<TcpAddress> addressesIpv4Download;
  2743. std::vector<TcpAddress> addressesIpv6Download;
  2744. bool isCdn = false;
  2745. void addAddressAndPort(TL_dcOption *dcOption) {
  2746. std::vector<TcpAddress> *addresses;
  2747. if (!isCdn) {
  2748. isCdn = dcOption->cdn;
  2749. }
  2750. if (dcOption->media_only) {
  2751. if (dcOption->ipv6) {
  2752. addresses = &addressesIpv6Download;
  2753. } else {
  2754. addresses = &addressesIpv4Download;
  2755. }
  2756. } else {
  2757. if (dcOption->ipv6) {
  2758. addresses = &addressesIpv6;
  2759. } else {
  2760. addresses = &addressesIpv4;
  2761. }
  2762. }
  2763. for (auto & addresse : *addresses) {
  2764. if (addresse.address == dcOption->ip_address && addresse.port == dcOption->port) {
  2765. return;
  2766. }
  2767. }
  2768. std::string secret;
  2769. if (dcOption->secret != nullptr) {
  2770. secret = std::string((const char *) dcOption->secret->bytes, dcOption->secret->length);
  2771. }
  2772. if (LOGS_ENABLED) DEBUG_D("getConfig add %s:%d to dc%d, flags %d, has_secret = %d[%d], try_this_port_only = %d", dcOption->ip_address.c_str(), dcOption->port, dcOption->id, dcOption->flags, dcOption->secret != nullptr ? 1 : 0, dcOption->secret != nullptr ? dcOption->secret->length : 0, dcOption->thisPortOnly ? 1 : 0);
  2773. if (dcOption->thisPortOnly) {
  2774. addresses->insert(addresses->begin(), TcpAddress(dcOption->ip_address, dcOption->port, dcOption->flags, secret));
  2775. } else {
  2776. addresses->push_back(TcpAddress(dcOption->ip_address, dcOption->port, dcOption->flags, secret));
  2777. }
  2778. }
  2779. };
  2780. std::map<uint32_t, std::unique_ptr<DatacenterInfo>> map;
  2781. size_t count = config->dc_options.size();
  2782. for (uint32_t a = 0; a < count; a++) {
  2783. TL_dcOption *dcOption = config->dc_options[a].get();
  2784. auto iter = map.find((uint32_t) dcOption->id);
  2785. DatacenterInfo *info;
  2786. if (iter == map.end()) {
  2787. map[dcOption->id] = std::unique_ptr<DatacenterInfo>(info = new DatacenterInfo);
  2788. } else {
  2789. info = iter->second.get();
  2790. }
  2791. info->addAddressAndPort(dcOption);
  2792. }
  2793. if (!map.empty()) {
  2794. for (auto & iter : map) {
  2795. Datacenter *datacenter = getDatacenterWithId(iter.first);
  2796. DatacenterInfo *info = iter.second.get();
  2797. if (datacenter == nullptr) {
  2798. datacenter = new Datacenter(instanceNum, iter.first);
  2799. datacenters[iter.first] = datacenter;
  2800. }
  2801. datacenter->replaceAddresses(info->addressesIpv4, info->isCdn ? 8 : 0);
  2802. datacenter->replaceAddresses(info->addressesIpv6, info->isCdn ? 9 : 1);
  2803. datacenter->replaceAddresses(info->addressesIpv4Download, info->isCdn ? 10 : 2);
  2804. datacenter->replaceAddresses(info->addressesIpv6Download, info->isCdn ? 11 : 3);
  2805. if (iter.first == movingToDatacenterId) {
  2806. movingToDatacenterId = DEFAULT_DATACENTER_ID;
  2807. moveToDatacenter(iter.first);
  2808. }
  2809. }
  2810. saveConfig();
  2811. scheduleTask([&] {
  2812. processRequestQueue(AllConnectionTypes, 0);
  2813. });
  2814. }
  2815. if (delegate != nullptr) {
  2816. delegate->onUpdateConfig(config, instanceNum);
  2817. }
  2818. }
  2819. if (workaround) {
  2820. updatingDcSettingsWorkaround = false;
  2821. } else {
  2822. updatingDcSettings = false;
  2823. }
  2824. }, nullptr, RequestFlagEnableUnauthorized | RequestFlagWithoutLogin | RequestFlagUseUnboundKey | (workaround ? 0 : RequestFlagTryDifferentDc), dcNum == 0 ? currentDatacenterId : dcNum, workaround ? ConnectionTypeTemp : ConnectionTypeGeneric, true);
  2825. }
  2826. void ConnectionsManager::moveToDatacenter(uint32_t datacenterId) {
  2827. if (movingToDatacenterId == datacenterId) {
  2828. return;
  2829. }
  2830. movingToDatacenterId = datacenterId;
  2831. Datacenter *currentDatacenter = getDatacenterWithId(currentDatacenterId);
  2832. clearRequestsForDatacenter(currentDatacenter, HandshakeTypeAll);
  2833. if (currentUserId) {
  2834. auto request = new TL_auth_exportAuthorization();
  2835. request->dc_id = datacenterId;
  2836. sendRequest(request, [&, datacenterId](TLObject *response, TL_error *error, int32_t networkType, int64_t responseTime) {
  2837. if (error == nullptr) {
  2838. movingAuthorization = std::move(((TL_auth_exportedAuthorization *) response)->bytes);
  2839. authorizeOnMovingDatacenter();
  2840. } else {
  2841. moveToDatacenter(datacenterId);
  2842. }
  2843. }, nullptr, RequestFlagWithoutLogin, DEFAULT_DATACENTER_ID, ConnectionTypeGeneric, true);
  2844. } else {
  2845. authorizeOnMovingDatacenter();
  2846. }
  2847. }
  2848. void ConnectionsManager::authorizeOnMovingDatacenter() {
  2849. Datacenter *datacenter = getDatacenterWithId(movingToDatacenterId);
  2850. if (datacenter == nullptr) {
  2851. updateDcSettings(0, false);
  2852. return;
  2853. }
  2854. datacenter->recreateSessions(HandshakeTypeAll);
  2855. clearRequestsForDatacenter(datacenter, HandshakeTypeAll);
  2856. if (!datacenter->hasAuthKey(ConnectionTypeGeneric, 0) && !datacenter->isHandshakingAny()) {
  2857. datacenter->clearServerSalts(false);
  2858. datacenter->clearServerSalts(true);
  2859. datacenter->beginHandshake(HandshakeTypeAll, true);
  2860. }
  2861. if (movingAuthorization != nullptr) {
  2862. auto request = new TL_auth_importAuthorization();
  2863. request->id = currentUserId;
  2864. request->bytes = std::move(movingAuthorization);
  2865. sendRequest(request, [&](TLObject *response, TL_error *error, int32_t networkType, int64_t responseTime) {
  2866. if (error == nullptr) {
  2867. authorizedOnMovingDatacenter();
  2868. } else {
  2869. moveToDatacenter(movingToDatacenterId);
  2870. }
  2871. }, nullptr, RequestFlagWithoutLogin, datacenter->getDatacenterId(), ConnectionTypeGeneric, true);
  2872. } else {
  2873. authorizedOnMovingDatacenter();
  2874. }
  2875. }
  2876. void ConnectionsManager::authorizedOnMovingDatacenter() {
  2877. movingAuthorization.reset();
  2878. currentDatacenterId = movingToDatacenterId;
  2879. movingToDatacenterId = DEFAULT_DATACENTER_ID;
  2880. saveConfig();
  2881. scheduleTask([&] {
  2882. processRequestQueue(0, 0);
  2883. });
  2884. }
  2885. void ConnectionsManager::applyDatacenterAddress(uint32_t datacenterId, std::string ipAddress, uint32_t port) {
  2886. scheduleTask([&, datacenterId, ipAddress, port] {
  2887. Datacenter *datacenter = getDatacenterWithId(datacenterId);
  2888. if (datacenter != nullptr) {
  2889. std::vector<TcpAddress> addresses;
  2890. addresses.emplace_back(ipAddress, port, 0, "");
  2891. datacenter->suspendConnections(true);
  2892. datacenter->replaceAddresses(addresses, 0);
  2893. datacenter->resetAddressAndPortNum();
  2894. saveConfig();
  2895. if (datacenter->isHandshakingAny()) {
  2896. datacenter->beginHandshake(HandshakeTypeCurrent, true);
  2897. }
  2898. updateDcSettings(datacenterId, false);
  2899. }
  2900. });
  2901. }
  2902. ConnectionState ConnectionsManager::getConnectionState() {
  2903. return connectionState;
  2904. }
  2905. void ConnectionsManager::setDelegate(ConnectiosManagerDelegate *connectiosManagerDelegate) {
  2906. delegate = connectiosManagerDelegate;
  2907. }
  2908. void ConnectionsManager::setPushConnectionEnabled(bool value) {
  2909. pushConnectionEnabled = value;
  2910. Datacenter *datacenter = getDatacenterWithId(currentDatacenterId);
  2911. if (datacenter != nullptr) {
  2912. if (!pushConnectionEnabled) {
  2913. Connection *connection = datacenter->getPushConnection(false);
  2914. if (connection != nullptr) {
  2915. connection->suspendConnection();
  2916. }
  2917. } else {
  2918. datacenter->createPushConnection()->setSessionId(pushSessionId);
  2919. sendPing(datacenter, true);
  2920. }
  2921. }
  2922. }
  2923. inline bool checkPhoneByPrefixesRules(std::string phone, std::string rules) {
  2924. if (rules.empty() || phone.empty()) {
  2925. return true;
  2926. }
  2927. bool found = false;
  2928. std::stringstream ss(rules);
  2929. std::string prefix;
  2930. while (std::getline(ss, prefix, ',')) {
  2931. if (prefix.empty()) {
  2932. found = true;
  2933. } else if (prefix[0] == '+' && phone.find(prefix.substr(1)) == 0) {
  2934. found = true;
  2935. } else if (prefix[0] == '-' && phone.find(prefix.substr(1)) == 0) {
  2936. return false;
  2937. }
  2938. }
  2939. return found;
  2940. }
  2941. void ConnectionsManager::applyDnsConfig(NativeByteBuffer *buffer, std::string phone, int32_t date) {
  2942. scheduleTask([&, buffer, phone, date] {
  2943. int32_t realDate = date;
  2944. if (LOGS_ENABLED) DEBUG_D("trying to decrypt config %d", requestingSecondAddress);
  2945. TL_help_configSimple *config = Datacenter::decodeSimpleConfig(buffer);
  2946. if (config != nullptr && realDate == 0) {
  2947. realDate = config->date;
  2948. }
  2949. int currentDate = getCurrentTime();
  2950. if (config != nullptr && config->date <= currentDate && currentDate <= config->expires) {
  2951. if (realDate > 0 && requestingSecondAddressByTlsHashMismatch) {
  2952. timeDifference += (realDate - currentDate);
  2953. requestingSecondAddressByTlsHashMismatch = false;
  2954. }
  2955. for (auto & iter : config->rules) {
  2956. TL_accessPointRule *rule = iter.get();
  2957. if (!checkPhoneByPrefixesRules(phone, rule->phone_prefix_rules)) {
  2958. continue;
  2959. }
  2960. Datacenter *datacenter = getDatacenterWithId(rule->dc_id);
  2961. if (datacenter != nullptr) {
  2962. std::vector<TcpAddress> addresses;
  2963. for (auto iter2 = rule->ips.begin(); iter2 != rule->ips.end(); iter2++) {
  2964. IpPort *port = iter2->get();
  2965. const std::type_info &typeInfo = typeid(*port);
  2966. if (typeInfo == typeid(TL_ipPort)) {
  2967. auto ipPort = (TL_ipPort *) port;
  2968. addresses.emplace_back(ipPort->ipv4, ipPort->port, 0, "");
  2969. if (LOGS_ENABLED) DEBUG_D("got address %s and port %d for dc%d", ipPort->ipv4.c_str(), ipPort->port, rule->dc_id);
  2970. } else if (typeInfo == typeid(TL_ipPortSecret)) {
  2971. auto ipPort = (TL_ipPortSecret *) port;
  2972. addresses.emplace_back(ipPort->ipv4, ipPort->port, 0, std::string((const char *) ipPort->secret->bytes, ipPort->secret->length));
  2973. if (LOGS_ENABLED) DEBUG_D("got address %s and port %d for dc%d with secret", ipPort->ipv4.c_str(), ipPort->port, rule->dc_id);
  2974. }
  2975. }
  2976. if (!addresses.empty()) {
  2977. datacenter->replaceAddresses(addresses, TcpAddressFlagTemp);
  2978. Connection *connection = datacenter->getTempConnection(false);
  2979. if (connection != nullptr) {
  2980. connection->suspendConnection();
  2981. }
  2982. if (datacenter->isHandshakingAny()) {
  2983. datacenter->beginHandshake(HandshakeTypeCurrent, true);
  2984. }
  2985. updateDcSettings(rule->dc_id, true);
  2986. }
  2987. } else {
  2988. if (LOGS_ENABLED) DEBUG_D("config datacenter %d not found", rule->dc_id);
  2989. }
  2990. }
  2991. delete config;
  2992. } else {
  2993. if (config == nullptr) {
  2994. if (LOGS_ENABLED) DEBUG_D("can't decrypt dns config");
  2995. } else {
  2996. delete config;
  2997. if (LOGS_ENABLED) DEBUG_D("dns config not valid due to date or expire");
  2998. }
  2999. if (requestingSecondAddress == 2) {
  3000. requestingSecondAddress = 3;
  3001. delegate->onRequestNewServerIpAndPort(requestingSecondAddress, instanceNum);
  3002. } else if (requestingSecondAddress == 1) {
  3003. requestingSecondAddress = 2;
  3004. delegate->onRequestNewServerIpAndPort(requestingSecondAddress, instanceNum);
  3005. } else if (requestingSecondAddress == 0) {
  3006. requestingSecondAddress = 1;
  3007. delegate->onRequestNewServerIpAndPort(requestingSecondAddress, instanceNum);
  3008. } else {
  3009. requestingSecondAddress = 0;
  3010. }
  3011. }
  3012. buffer->reuse();
  3013. });
  3014. }
  3015. void ConnectionsManager::init(uint32_t version, int32_t layer, int32_t apiId, std::string deviceModel, std::string systemVersion, std::string appVersion, std::string langCode, std::string systemLangCode, std::string configPath, std::string logPath, std::string regId, std::string cFingerpting, std::string installerId, std::string packageId, int32_t timezoneOffset, int64_t userId, bool isPaused, bool enablePushConnection, bool hasNetwork, int32_t networkType) {
  3016. currentVersion = version;
  3017. currentLayer = layer;
  3018. currentApiId = apiId;
  3019. currentConfigPath = configPath;
  3020. currentDeviceModel = deviceModel;
  3021. currentSystemVersion = systemVersion;
  3022. currentAppVersion = appVersion;
  3023. currentLangCode = langCode;
  3024. currentRegId = regId;
  3025. certFingerprint = cFingerpting;
  3026. installer = installerId;
  3027. package = packageId;
  3028. currentDeviceTimezone = timezoneOffset;
  3029. currentSystemLangCode = systemLangCode;
  3030. currentUserId = userId;
  3031. currentLogPath = logPath;
  3032. pushConnectionEnabled = enablePushConnection;
  3033. currentNetworkType = networkType;
  3034. networkAvailable = hasNetwork;
  3035. if (isPaused) {
  3036. lastPauseTime = getCurrentTimeMonotonicMillis();
  3037. }
  3038. if (!currentConfigPath.empty() && currentConfigPath.find_last_of('/') != currentConfigPath.size() - 1) {
  3039. currentConfigPath += "/";
  3040. }
  3041. if (!logPath.empty()) {
  3042. LOGS_ENABLED = true;
  3043. FileLog::getInstance().init(logPath);
  3044. }
  3045. loadConfig();
  3046. bool needLoadConfig = false;
  3047. if (systemLangCode.compare(lastInitSystemLangcode) != 0) {
  3048. lastInitSystemLangcode = systemLangCode;
  3049. for (auto & datacenter : datacenters) {
  3050. datacenter.second->resetInitVersion();
  3051. }
  3052. needLoadConfig = true;
  3053. saveConfig();
  3054. }
  3055. if (!needLoadConfig && currentUserId != 0) {
  3056. Datacenter *datacenter = getDatacenterWithId(DEFAULT_DATACENTER_ID);
  3057. if (datacenter != nullptr && datacenter->lastInitVersion != currentVersion) {
  3058. needLoadConfig = true;
  3059. }
  3060. }
  3061. pthread_create(&networkThread, nullptr, (ConnectionsManager::ThreadProc), this);
  3062. if (needLoadConfig) {
  3063. updateDcSettings(0, false);
  3064. }
  3065. }
  3066. void ConnectionsManager::setProxySettings(std::string address, uint16_t port, std::string username, std::string password, std::string secret) {
  3067. scheduleTask([&, address, port, username, password, secret] {
  3068. std::string newSecret = decodeSecret(secret);
  3069. bool secretChanged = proxySecret != newSecret;
  3070. bool reconnect = proxyAddress != address || proxyPort != port || username != proxyUser || proxyPassword != password || secretChanged;
  3071. proxyAddress = address;
  3072. proxyPort = port;
  3073. proxyUser = username;
  3074. proxyPassword = password;
  3075. proxySecret = std::move(newSecret);
  3076. if (!proxyAddress.empty() && connectionState == ConnectionStateConnecting) {
  3077. connectionState = ConnectionStateConnectingViaProxy;
  3078. if (delegate != nullptr) {
  3079. delegate->onConnectionStateChanged(connectionState, instanceNum);
  3080. }
  3081. } else if (proxyAddress.empty() && connectionState == ConnectionStateConnectingViaProxy) {
  3082. connectionState = ConnectionStateConnecting;
  3083. if (delegate != nullptr) {
  3084. delegate->onConnectionStateChanged(connectionState, instanceNum);
  3085. }
  3086. }
  3087. if (secretChanged) {
  3088. Datacenter *datacenter = getDatacenterWithId(DEFAULT_DATACENTER_ID);
  3089. if (datacenter != nullptr) {
  3090. datacenter->resetInitVersion();
  3091. }
  3092. }
  3093. if (reconnect) {
  3094. for (auto & datacenter : datacenters) {
  3095. datacenter.second->suspendConnections(true);
  3096. }
  3097. Datacenter *datacenter = getDatacenterWithId(DEFAULT_DATACENTER_ID);
  3098. if (datacenter != nullptr && datacenter->isHandshakingAny()) {
  3099. datacenter->beginHandshake(HandshakeTypeCurrent, true);
  3100. }
  3101. processRequestQueue(0, 0);
  3102. }
  3103. });
  3104. }
  3105. void ConnectionsManager::setLangCode(std::string langCode) {
  3106. scheduleTask([&, langCode] {
  3107. if (currentLangCode == langCode) {
  3108. return;
  3109. }
  3110. currentLangCode = langCode;
  3111. for (auto & datacenter : datacenters) {
  3112. datacenter.second->resetInitVersion();
  3113. }
  3114. saveConfig();
  3115. });
  3116. }
  3117. void ConnectionsManager::setRegId(std::string regId) {
  3118. scheduleTask([&, regId] {
  3119. if (currentRegId == regId) {
  3120. return;
  3121. }
  3122. currentRegId = regId;
  3123. for (auto & datacenter : datacenters) {
  3124. datacenter.second->resetInitVersion();
  3125. }
  3126. updateDcSettings(0, false);
  3127. saveConfig();
  3128. });
  3129. }
  3130. void ConnectionsManager::setSystemLangCode(std::string langCode) {
  3131. scheduleTask([&, langCode] {
  3132. if (currentSystemLangCode == langCode) {
  3133. return;
  3134. }
  3135. lastInitSystemLangcode = currentSystemLangCode = langCode;
  3136. for (auto & datacenter : datacenters) {
  3137. datacenter.second->resetInitVersion();
  3138. }
  3139. saveConfig();
  3140. updateDcSettings(0, false);
  3141. });
  3142. }
  3143. void ConnectionsManager::resumeNetwork(bool partial) {
  3144. scheduleTask([&, partial] {
  3145. if (lastMonotonicPauseTime != 0) {
  3146. int64_t diff = (getCurrentTimeMonotonicMillis() - lastMonotonicPauseTime) / 1000;
  3147. int64_t systemDiff = getCurrentTime() - lastSystemPauseTime;
  3148. if (systemDiff < 0 || abs(systemDiff - diff) > 2) {
  3149. timeDifference -= (systemDiff - diff);
  3150. }
  3151. }
  3152. if (partial) {
  3153. if (networkPaused) {
  3154. lastMonotonicPauseTime = lastPauseTime = getCurrentTimeMonotonicMillis();
  3155. lastSystemPauseTime = getCurrentTime();
  3156. networkPaused = false;
  3157. if (LOGS_ENABLED) DEBUG_D("wakeup network in background account%u", instanceNum);
  3158. } else if (lastPauseTime != 0) {
  3159. lastMonotonicPauseTime = lastPauseTime = getCurrentTimeMonotonicMillis();
  3160. lastSystemPauseTime = getCurrentTime();
  3161. networkPaused = false;
  3162. if (LOGS_ENABLED) DEBUG_D("reset sleep timeout account%u", instanceNum);
  3163. }
  3164. } else {
  3165. lastPauseTime = 0;
  3166. lastMonotonicPauseTime = 0;
  3167. lastSystemPauseTime = 0;
  3168. networkPaused = false;
  3169. if (LOGS_ENABLED) DEBUG_D("wakeup network account%u", instanceNum);
  3170. }
  3171. if (!networkPaused) {
  3172. for (auto & datacenter : datacenters) {
  3173. if (datacenter.second->isHandshaking(false)) {
  3174. datacenter.second->createGenericConnection()->connect();
  3175. } else if (datacenter.second->isHandshaking(true)) {
  3176. datacenter.second->createGenericMediaConnection()->connect();
  3177. }
  3178. }
  3179. }
  3180. });
  3181. }
  3182. void ConnectionsManager::pauseNetwork() {
  3183. if (lastPauseTime != 0) {
  3184. return;
  3185. }
  3186. lastMonotonicPauseTime = lastPauseTime = getCurrentTimeMonotonicMillis();
  3187. lastSystemPauseTime = getCurrentTime();
  3188. saveConfig();
  3189. }
  3190. void ConnectionsManager::setNetworkAvailable(bool value, int32_t type, bool slow) {
  3191. scheduleTask([&, value, type, slow] {
  3192. networkAvailable = value;
  3193. currentNetworkType = type;
  3194. networkSlow = slow;
  3195. if (!networkAvailable) {
  3196. connectionState = ConnectionStateWaitingForNetwork;
  3197. } else {
  3198. for (auto & datacenter : datacenters) {
  3199. if (datacenter.second->isHandshaking(false)) {
  3200. datacenter.second->createGenericConnection()->connect();
  3201. } else if (datacenter.second->isHandshaking(true)) {
  3202. datacenter.second->createGenericMediaConnection()->connect();
  3203. }
  3204. }
  3205. }
  3206. if (delegate != nullptr) {
  3207. delegate->onConnectionStateChanged(connectionState, instanceNum);
  3208. }
  3209. });
  3210. }
  3211. void ConnectionsManager::setIpStrategy(uint8_t value) {
  3212. scheduleTask([&, value] {
  3213. ipStrategy = value;
  3214. });
  3215. }
  3216. int64_t ConnectionsManager::checkProxy(std::string address, uint16_t port, std::string username, std::string password, std::string secret, onRequestTimeFunc requestTimeFunc, jobject ptr1) {
  3217. auto proxyCheckInfo = new ProxyCheckInfo();
  3218. proxyCheckInfo->address = address;
  3219. proxyCheckInfo->port = port;
  3220. proxyCheckInfo->username = username;
  3221. proxyCheckInfo->password = password;
  3222. proxyCheckInfo->secret = decodeSecret(secret);
  3223. proxyCheckInfo->onRequestTime = requestTimeFunc;
  3224. proxyCheckInfo->pingId = ++lastPingProxyId;
  3225. proxyCheckInfo->instanceNum = instanceNum;
  3226. proxyCheckInfo->ptr1 = ptr1;
  3227. scheduleCheckProxyInternal(proxyCheckInfo);
  3228. return proxyCheckInfo->pingId;
  3229. }
  3230. void ConnectionsManager::scheduleCheckProxyInternal(ProxyCheckInfo *proxyCheckInfo) {
  3231. scheduleTask([&, proxyCheckInfo] {
  3232. checkProxyInternal(proxyCheckInfo);
  3233. });
  3234. }
  3235. void ConnectionsManager::checkProxyInternal(ProxyCheckInfo *proxyCheckInfo) {
  3236. int32_t freeConnectionNum = -1;
  3237. if (proxyActiveChecks.size() != PROXY_CONNECTIONS_COUNT) {
  3238. for (int32_t a = 0; a < PROXY_CONNECTIONS_COUNT; a++) {
  3239. bool found = false;
  3240. for (auto & proxyActiveCheck : proxyActiveChecks) {
  3241. if (proxyActiveCheck.get()->connectionNum == a) {
  3242. found = true;
  3243. break;
  3244. }
  3245. }
  3246. if (!found) {
  3247. freeConnectionNum = a;
  3248. break;
  3249. }
  3250. }
  3251. }
  3252. if (freeConnectionNum == -1) {
  3253. proxyCheckQueue.push_back(std::unique_ptr<ProxyCheckInfo>(proxyCheckInfo));
  3254. } else {
  3255. auto connectionType = (ConnectionType) (ConnectionTypeProxy | (freeConnectionNum << 16));
  3256. Datacenter *datacenter = getDatacenterWithId(DEFAULT_DATACENTER_ID);
  3257. Connection *connection = datacenter->getProxyConnection((uint8_t) freeConnectionNum, true, false);
  3258. if (connection != nullptr) {
  3259. connection->setOverrideProxy(proxyCheckInfo->address, proxyCheckInfo->port, proxyCheckInfo->username, proxyCheckInfo->password, proxyCheckInfo->secret);
  3260. connection->suspendConnection();
  3261. proxyCheckInfo->connectionNum = freeConnectionNum;
  3262. auto request = new TL_ping();
  3263. request->ping_id = proxyCheckInfo->pingId;
  3264. proxyCheckInfo->requestToken = sendRequest(request, nullptr, nullptr, RequestFlagEnableUnauthorized | RequestFlagWithoutLogin, DEFAULT_DATACENTER_ID, connectionType, true, 0);
  3265. proxyActiveChecks.push_back(std::unique_ptr<ProxyCheckInfo>(proxyCheckInfo));
  3266. } else if (PFS_ENABLED) {
  3267. if (datacenter->isHandshaking(false)) {
  3268. datacenter->beginHandshake(HandshakeTypeTemp, false);
  3269. }
  3270. proxyCheckQueue.push_back(std::unique_ptr<ProxyCheckInfo>(proxyCheckInfo));
  3271. }
  3272. }
  3273. }
  3274. #ifdef ANDROID
  3275. void ConnectionsManager::useJavaVM(JavaVM *vm, bool useJavaByteBuffers) {
  3276. javaVm = vm;
  3277. if (useJavaByteBuffers) {
  3278. JNIEnv *env = nullptr;
  3279. if (javaVm->GetEnv((void **) &env, JNI_VERSION_1_6) != JNI_OK) {
  3280. if (LOGS_ENABLED) DEBUG_E("can't get jnienv");
  3281. exit(1);
  3282. }
  3283. jclass_ByteBuffer = (jclass) env->NewGlobalRef(env->FindClass("java/nio/ByteBuffer"));
  3284. if (jclass_ByteBuffer == nullptr) {
  3285. if (LOGS_ENABLED) DEBUG_E("can't find java ByteBuffer class");
  3286. exit(1);
  3287. }
  3288. jclass_ByteBuffer_allocateDirect = env->GetStaticMethodID(jclass_ByteBuffer, "allocateDirect", "(I)Ljava/nio/ByteBuffer;");
  3289. if (jclass_ByteBuffer_allocateDirect == nullptr) {
  3290. if (LOGS_ENABLED) DEBUG_E("can't find java ByteBuffer allocateDirect");
  3291. exit(1);
  3292. }
  3293. if (LOGS_ENABLED) DEBUG_D("using java ByteBuffer");
  3294. }
  3295. }
  3296. #endif