The SwappyItems Key-Values Store
SwappyItems.hpp
Go to the documentation of this file.
1 #ifndef __SWAPPY_ITEMS_HPP__
2 #define __SWAPPY_ITEMS_HPP__ 1
3 
4 #include <inttypes.h> // uintX_t stuff
5 #include <iostream> // sizeof and ios namespace
6 #include <algorithm> // find, minmax
7 #include <stdexcept> // std::out_of_range
8 #include <utility> // make_pair
9 #include <cstdlib> // srand, rand, exit
10 #include <limits> // min max of int... etc
11 
12 #include <vector> // store file information
13 #include <set> // a set of candidate files
14 #include <unordered_set> // a set of keys
15 #include <unordered_map> // for a key->data
16 #include <deque> // for key order (mru)
17 #include <map> // for store keys in a sorted way
18 
19 // we need it for PRId32 in snprintf
20 #define __STDC_FORMAT_MACROS
21 
22 #include <fstream> // file operations
23 #include <experimental/filesystem> // check and create directories
24 namespace filesys = std::experimental::filesystem;
25 
26 #include <execution> // c++17 and you need -ltbb
27 #include <mutex> // for mutex
28 #include <thread> // you need -lpthread
29 
30 
45 template <
46  class TKEY, class TVALUE,
47  int EACHFILE = 16384, int OLDIES = 5, int RAMSIZE = 3, int BLOOMBITS = 4, int MASKLENGTH = (2* 4*16384)
48 >
49 class SwappyItems {
50 
51 public:
52  typedef std::pair<TVALUE, std::vector<TKEY> > Data; // just for simplifing next line
53 
54  struct statistic_s {
55  uint64_t size = 0;
56  uint64_t fileKB = 0;
57  uint64_t queue = 0;
58 
59  uint64_t updates = 0;
60  uint64_t deletes = 0;
61 
62  uint64_t bloomSaysNotIn = 0;
63  uint64_t bloomFails = 0;
64 
65  uint64_t rangeSaysNo = 0;
66  uint64_t rangeFails = 0;
67 
68  uint64_t swaps = 0;
69  uint64_t fileLoads = 0;
70 
71  TKEY maxKey;
72  TKEY minKey;
73  };
74 
75 private:
76  typedef uint32_t Id;
77  typedef uint32_t Fid; // File id
78 
79  struct Detail {
80  TKEY minimum;
81  TKEY maximum;
82  std::vector<bool> bloomMask;
84  };
85 
86  struct Qentry {
87  TKEY key;
88  bool deleted;
89  };
90 
91  typedef uint32_t Bid;
92  typedef std::set<Bid> Fingerprint;
93 
94  typedef std::unordered_map<TKEY,Data> Ram;
95  typedef std::vector<Detail> Buckets;
96  typedef std::deque<Qentry> Order;
97 
98  // the item store in RAM
102 
103  // a queue for the keys
105 
106  // for each filenr we store min/max value and a bloom mask to filter
108  struct statistic_s statistic;
109 
110  const char _prefix[7] = "./temp";
111  char _swappypath[256];
112 
113  std::mutex mu_FsearchSuccess;
115 
116 public:
117 
123  bool set (TKEY key, TVALUE value) {
124  if (load(key)) {
125  // just update
126  // reverse search should be faster
127  auto it = std::find_if(std::execution::par, _mru.rbegin(), _mru.rend(),
128  [key] (const Qentry & qe) {
129  return (qe.key == key) && (qe.deleted == false);
130  }
131  );
132  it->deleted = true;
133  _mru.push_back(Qentry{key,false});
134  _ramList[key] = std::make_pair(value, _ramList[key].second);
135  ++statistic.updates;
136  return false;
137  } else {
138  // key is new
139  maySwap();
140  ++statistic.size;
141  _mru.push_back(Qentry{key,false});
142  _ramList[key] = std::make_pair(value, std::vector<TKEY>(0));
143  // update min/max of RAM
144  if (key > _ramMaximum) _ramMaximum = key;
145  if (key < _ramMinimum) _ramMinimum = key;
146  return true;
147  }
148  }
149 
155  bool set (TKEY key, TVALUE value, std::vector<TKEY> refs) {
156  if (load(key)) {
157  // just update
158  // reverse search should be faster
159  auto it = std::find_if(std::execution::par, _mru.rbegin(), _mru.rend(),
160  [key] (const Qentry & qe) {
161  return (qe.key == key) && (qe.deleted == false);
162  }
163  );
164  it->deleted = true;
165  _mru.push_back(Qentry{key,false});
166  _ramList[key] = std::make_pair(value, refs);
167  ++statistic.updates;
168  return false;
169  } else {
170  // key is new
171  maySwap();
172  ++statistic.size;
173  _mru.push_back(Qentry{key,false});
174  _ramList[key] = std::make_pair(value, refs);
175  // update min/max of RAM
176  if (key > _ramMaximum) _ramMaximum = key;
177  if (key < _ramMinimum) _ramMinimum = key;
178  return true;
179  }
180  }
181 
188  Data * get (const TKEY & key) {
189  if (load(key)) {
190  // reverse search should be faster
191  auto it = std::find_if(std::execution::par, _mru.rbegin(), _mru.rend(),
192  [key] (const Qentry & qe) {
193  return (qe.key == key) && (qe.deleted == false);
194  }
195  );
196  // every get fills the queue with "deleted = true" on this key, thus older entries are never equal false
197  it->deleted = true;
198  _mru.push_back(Qentry{key,false});
199  return &(_ramList[key]);
200  } else {
201  return nullptr;
202  }
203  }
204 
211  bool del (const TKEY & key) {
212  if (load(key)) {
213  // reverse search should be faster
214  auto it = std::find_if(std::execution::par, _mru.rbegin(), _mru.rend(),
215  [key] (const Qentry & qe) {
216  return (qe.key == key) && (qe.deleted == false);
217  }
218  );
219  it->deleted = true;
220  // we really have to delete it in ram, because load() search key initialy in ram!
221  _ramList.erase(key);
222 
223  --statistic.size;
224  ++statistic.deletes;
225 
226  if (key == _ramMaximum || key == _ramMinimum) ramMinMax();
227  return true;
228  } else {
229  return false;
230  }
231  }
232 
233  /*
234  TKEY min(void) {
235  TKEY omin = std::numeric_limits<TKEY>::max();
237  auto result = std::min(std::execution::par, _buckets, [](Detail const & lhs, Detail const & rhs) {
239 
240  // a deleted left value should never be a minimum
241  if (lhs.minimum == lhs.maximum) return false;
242  // a deleted right value should never be bigger
243  if (rhs.minimum == rhs.maximum) return true;
244 
245  return (lhs.minimum < rhs.minimum);
246  });
247  omin = result.minimum;
248  _ramMaximum = result.second;
249  return _ramMinimum;
250  }
251  */
252 
256  TKEY min (void) {
257  TKEY val = std::numeric_limits<TKEY>::max();
259  std::for_each (_buckets.begin(), _buckets.end(), [&](Detail finfo) {
260  if ( finfo.minimum != finfo.maximum ) { // it is not deleted!
261  if (finfo.minimum < val) val = finfo.minimum;
262  }
263  });
264 
265  if (_ramMinimum < val) val = _ramMinimum;
266  return val;
267  }
268 
272  TKEY max (void) {
273  TKEY val = std::numeric_limits<TKEY>::min();
275  std::for_each (_buckets.begin(), _buckets.end(), [&](Detail finfo) {
276  if ( finfo.minimum != finfo.maximum ) { // it is not deleted!
277  if (finfo.maximum > val) val = finfo.maximum;
278  }
279  });
280 
281  if (_ramMaximum > val) val = _ramMaximum;
282  return val;
283  }
284 
288  struct statistic_s getStatistic (void) {
289  statistic.fileKB = (_buckets.size() * EACHFILE * (sizeof(TKEY) + sizeof(TVALUE))/1000);
290  statistic.queue = _mru.size();
291  statistic.maxKey = this->max();
292  statistic.minKey = this->min();
293  return statistic;
294  }
295 
302  SwappyItems (int swappyId) {
303 
304  statistic.size = 0;
305  statistic.fileKB = 0;
306  statistic.queue = 0;
307 
308  statistic.updates = 0;
309  statistic.deletes = 0;
310 
311  statistic.bloomSaysNotIn = 0;
312  statistic.rangeSaysNo = 0;
313  statistic.rangeFails = 0;
314 
315  statistic.swaps = 0;
316  statistic.fileLoads = 0;
317 
318  _ramMinimum = std::numeric_limits<TKEY>::max();
319  _ramMaximum = std::numeric_limits<TKEY>::min();
320 
321  statistic.minKey = _ramMinimum;
322  statistic.maxKey = _ramMaximum;
323 
324  // make quasi hash from (swappyId, template parameter) instead of just swappyId.
325  // this should fix problems with other template paramter and loading hibernate data!
326  snprintf(
327  _swappypath, 512,
328  "%s%d-"
329  "%d-%d-%d-%d-%d",
330  _prefix,
331  swappyId,
332  EACHFILE, OLDIES, RAMSIZE, BLOOMBITS, MASKLENGTH
333  );
334 
335  if (filesys::exists(_swappypath)) {
336  // try to wake up after hibernate
337  if (false == wakeup()) {
338  fprintf(stderr, "folder '%s' still exists without hibernate data! Please delete or rename it!\n", _swappypath);
339  exit(0);
340  } else {
341  printf(
342  "# wakeup on items, updates, deletes: "
343  "%10ld %10" PRId64 " %10" PRId64 "\n",
344  statistic.size,
345  statistic.updates,
346  statistic.deletes
347  );
348  }
349  } else {
350  filesys::create_directory(_swappypath);
351  }
352  }
353 
357  ~SwappyItems (void) {
358  hibernate();
359  }
360 
361 private:
362 
368  bool wakeup (void) {
369  char filename[512];
370  TKEY loadedKey;
371  TKEY loadedKey2;
372  TVALUE loadedValue;
373  std::ifstream file;
374 
375  snprintf(filename, 512, "%s/hibernate.ramlist", _swappypath);
376  file.open(filename, std::ios::in | std::ios::binary);
377  Id length;
378  Id lengthVec;
379  file.read((char *) &length, sizeof(Id));
380  for (Id c = 0; c < length; ++c) {
381  std::vector<TKEY> vecdata(0);
382  file.read((char *) &loadedKey, sizeof(TKEY));
383  file.read((char *) &loadedValue, sizeof(TVALUE));
384  // stored vector?
385  file.read((char *) &lengthVec, sizeof(Id));
386  for (Id j=0; j<lengthVec; ++j) {
387  file.read((char *) &loadedKey2, sizeof(TKEY));
388  vecdata.push_back(loadedKey2);
389  }
390  _ramList[loadedKey] = std::make_pair(loadedValue, vecdata);
391  }
392  file.close();
393 
394  snprintf(filename, 512, "%s/hibernate.statistic", _swappypath);
395  file.open(filename, std::ios::in | std::ios::binary);
396  file.read((char *) &statistic, sizeof(statistic_s));
397  file.read((char *) &_ramMinimum, sizeof(TKEY));
398  file.read((char *) &_ramMaximum, sizeof(TKEY));
399  file.close();
400 
401  snprintf(filename, 512, "%s/hibernate.mru", _swappypath);
402  file.open(filename, std::ios::in | std::ios::binary);
403  file.read((char *) &length, sizeof(Id));
404  Qentry qe;
405  for (Id c = 0; c < length; ++c) {
406  file.read((char *) &(qe.key), sizeof(TKEY));
407  file.read((char *) &(qe.deleted), sizeof(Qentry::deleted));
408  _mru.push_back(qe);
409  }
410  file.close();
411 
412  snprintf(filename, 512, "%s/hibernate.buckets", _swappypath);
413  file.open(filename, std::ios::in | std::ios::binary);
414  file.read((char *) &length, sizeof(Id));
415  Detail detail;
416  char cbit;
417  for (Fid c = 0; c < length; ++c) {
418  file.read((char *) &(detail.minimum), sizeof(TKEY));
419  file.read((char *) &(detail.maximum), sizeof(TKEY));
420 
421  detail.bloomMask = std::vector<bool>(MASKLENGTH, true);
422 
423  for (Bid b=0; b < MASKLENGTH; ++b) {
424  file.get(cbit);
425  if (cbit == 'o') detail.bloomMask[b] = false;
426  }
427  file.read((char *) &(detail.fid), sizeof(Fid));
428 
429  _buckets.push_back(detail);
430  }
431  file.close();
432 
433  return true;
434  }
435 
441  void hibernate (void) {
442  char filename[512];
443 
444  snprintf(filename, 512, "%s/hibernate.ramlist", _swappypath);
445  std::ofstream file(filename, std::ios::out | std::ios::binary);
446  Id length = _ramList.size();
447  file.write((char *) &length, sizeof(Id));
448  for (auto it = _ramList.begin(); it != _ramList.end(); ++it) {
449  file.write((char *) &(it->first), sizeof(TKEY));
450  file.write((char *) &(it->second.first), sizeof(TVALUE));
451  // stored vector?
452  length = it->second.second.size();
453  file.write((char *) &length, sizeof(Id));
454  for (Id j=0; j<length; ++j) {
455  file.write((char *) &(it->second.second[j]), sizeof(TKEY));
456  }
457  }
458  file.close();
459 
460  snprintf(filename, 512, "%s/hibernate.statistic", _swappypath);
461  file.open(filename, std::ios::out | std::ios::binary);
462  file.write((char *) &statistic, sizeof(statistic_s));
463  file.write((char *) &_ramMinimum, sizeof(TKEY));
464  file.write((char *) &_ramMaximum, sizeof(TKEY));
465  file.close();
466 
467  snprintf(filename, 512, "%s/hibernate.mru", _swappypath);
468  file.open(filename, std::ios::out | std::ios::binary);
469  length = _mru.size();
470  file.write((char *) &length, sizeof(Id));
471  for (auto it = _mru.begin(); it != _mru.end(); ++it) {
472  file.write((char *) &(it->key), sizeof(TKEY));
473  file.write((char *) &(it->deleted), sizeof(Qentry::deleted));
474  }
475  file.close();
476 
477  snprintf(filename, 512, "%s/hibernate.buckets", _swappypath);
478  file.open(filename, std::ios::out | std::ios::binary);
479  length = _buckets.size();
480  file.write((char *) &length, sizeof(Id));
481  for (auto it = _buckets.begin(); it != _buckets.end(); ++it) {
482  file.write((char *) &(it->minimum), sizeof(TKEY));
483  file.write((char *) &(it->maximum), sizeof(TKEY));
484  for (bool b : it->bloomMask) {
485  if (b) {
486  file.put('i');
487  } else {
488  file.put('o');
489  }
490  }
491  file.write((char *) &(it->fid), sizeof(Fid));
492  }
493  file.close();
494  }
495 
496  Fingerprint getFingerprint (const TKEY & key) {
497  srand(key);
498  Fingerprint fp;
499  for(int i=0; i < BLOOMBITS; ++i) {
500  fp.insert( rand()%(MASKLENGTH) );
501  }
502  return fp;
503  }
504 
510  bool load (const TKEY & key) {
511  try {
512  _ramList.at(key);
513  return true;
514  } catch (const std::out_of_range & oor) {
515  return loadFromFiles(key);
516  }
517  }
518 
524  bool loadFromFiles (const TKEY & key) {
525  std::vector<Fid> candidates;
526  std::mutex m;
527  Fingerprint fp = getFingerprint(key);
528  fsearchSuccess = false;
529 
530  std::for_each (std::execution::par, _buckets.begin(), _buckets.end(), [&](Detail finfo) {
531  if ( finfo.minimum != finfo.maximum ) { // it is not deleted!
532 
533  if ( (key < finfo.minimum) || (key > finfo.maximum) ) {
534  // key is smaller then the smallest or bigger than the biggest
535  m.lock();
536  ++statistic.rangeSaysNo;
537  m.unlock();
538  } else {
539  // check bloom (is it possible, that this key is in that file?)
540  bool success = true;
541  for (auto b : fp) {
542  if (finfo.bloomMask[b] == false) {
543  success = false;
544  m.lock();
545  ++statistic.bloomSaysNotIn;
546  m.unlock();
547  break;
548  }
549  }
550  if (success) {
551  m.lock();
552  candidates.push_back(finfo.fid);
553  m.unlock();
554  }
555  }
556  }
557 
558  });
559 
560  if (candidates.size() > 0) {
561  // start workers
562  std::vector<std::thread *> workers;
563  for (unsigned i = 0; i < candidates.size(); ++i) {
564  workers.push_back( new std::thread(&SwappyItems::loadFromFile, this, candidates[i], key) );
565  }
566  // wait for workers
567  for (auto w : workers) if (w->joinable()) w->join();
568  // cleanup
569  for (auto w : workers) delete w;
570  }
571 
572  if (fsearchSuccess) return true;
573 
574  if (!fsearchSuccess && candidates.size() > 0) {
575  // many candidates, but finaly not found in file
576  ++statistic.rangeFails;
577  }
578  return false;
579  }
580 
591  void loadFromFile (Fid fid, TKEY key) {
592  Ram temp;
593  Id length;
594  TKEY loadedKey;
595  TKEY loadedKey2;
596  TVALUE loadedValue;
597 
598  bool result = false;
599 
600  char filename[512];
601  snprintf(filename, 512, "%s/%" PRId32 ".bucket", _swappypath, fid);
602  std::ifstream file(filename, std::ios::in | std::ios::binary);
603 
604  for (Id c = 0; c < EACHFILE; ++c) {
605  std::vector<TKEY> vecdata(0);
606  file.read((char *) &loadedKey, sizeof(TKEY));
607  file.read((char *) &loadedValue, sizeof(TVALUE));
608  // stored vector?
609  file.read((char *) &length, sizeof(Id));
610  for (Id j=0; j<length; ++j) {
611  file.read((char *) &loadedKey2, sizeof(TKEY));
612  vecdata.push_back(loadedKey2);
613  }
614  temp[loadedKey] = std::make_pair(loadedValue, vecdata);
615 
616  if (loadedKey == key) {
617  result = true;
618  mu_FsearchSuccess.lock();
619  fsearchSuccess = true;
620  ++statistic.fileLoads;
621  mu_FsearchSuccess.unlock();
622  }
623  if ((result == false) && fsearchSuccess) {
624  file.close();
625  return;
626  }
627  }
628  file.close();
629 
630  if (result) {
631  // store possible min/max for ram
632  TKEY omin = _buckets[fid].minimum;
633  TKEY omax = _buckets[fid].maximum;
634 
635  // ok, key exist. clean mask now, because we do not
636  // need the (still undeleted) file anymore
637  _buckets[fid].minimum = 0;
638  _buckets[fid].maximum = 0;
639 
640  // we try to make place for the filecontent in RAM ...
641  maySwap(true);
642 
643  // ... and load the stuff
644  for (auto key_val : temp) {
645  _ramList[key_val.first] = key_val.second;
646  _mru.push_back(Qentry{key_val.first,false});
647  }
648  // update ram min/max
649  if (omax > _ramMaximum) _ramMaximum = omax;
650  if (omin < _ramMinimum) _ramMinimum = omin;
651  }
652  }
653 
662  void maySwap (bool reload = false) {
663  Id needed = reload? EACHFILE : 0;
664  Id pos;
665 
666  // no need to swap files?
667  if ( (_ramList.size() + needed) < (RAMSIZE * EACHFILE * OLDIES) ) {
668  // cleanup queue instead?
669  if ( _mru.size() > ( (RAMSIZE+2) * EACHFILE*OLDIES) ) {
670  Order newMRU;
671  // remove some deleted items
672  for (auto it = _mru.begin(); it != _mru.end(); ++it) {
673  if (it->deleted == false) newMRU.push_back(*it);
674  }
675  _mru = newMRU;
676  }
677  return;
678  }
679 
680  Detail detail{0, 0};
681  Fingerprint fp;
682 
683  std::map<TKEY,Data> temp; // map is sorted by key!
684  bool success;
685 
686  ++statistic.swaps;
687 
688  // remove old items from front and move them into temp
689  for (pos = 0; pos < (EACHFILE*OLDIES); ) {
690  Qentry qe = _mru.front();
691  _mru.pop_front();
692  if (qe.deleted == false) {
693  temp[qe.key] = _ramList[qe.key];
694  _ramList.erase(qe.key);
695  ++pos; // we only count undeleted stuff!
696  }
697  }
698 
699  // run through sorted items
700  Id written = 0;
701  std::ofstream file;
702  typename std::map<TKEY,Data>::iterator it;
703 
704  for (it=temp.begin(); it!=temp.end(); ++it) {
705 
706  // open -----------------------------------
707  if ((written%(EACHFILE)) == 0) {
708  success = false;
709 
710  // find empty bucket, else create one on position ".size()"
711  for (Fid fid = 0; fid < _buckets.size(); ++fid) {
712  if (_buckets[fid].minimum == _buckets[fid].maximum) {
713  pos = fid;
714  success = true;
715  break; // we found an empty Bucket
716  }
717  }
718 
719  if (success == false) {
720  pos = _buckets.size();
721  // add an empty dummy Bucket
722  _buckets.push_back(detail);
723  }
724 
725  char filename[512];
726  snprintf(filename, 512, "%s/%" PRId32 ".bucket", _swappypath, pos);
727  file.open(filename, std::ios::out | std::ios::binary);
728  detail.minimum = it->first;
729  detail.bloomMask = std::vector<bool>(MASKLENGTH, false);
730  }
731 
732  // store data
733  file.write((char *) &(it->first), sizeof(TKEY));
734  file.write((char *) &(it->second.first), sizeof(TVALUE));
735 
736  // store vector?
737  Id length = it->second.second.size();
738  file.write((char *) &length, sizeof(Id));
739  for (Id j=0; j<length; ++j) {
740  file.write((char *) &(it->second.second[j]), sizeof(TKEY));
741  }
742 
743  // remember key in bloom mask
744  fp = getFingerprint(it->first);
745  for (auto b : fp) {
746  detail.bloomMask[b] = true;
747  }
748 
749  ++written;
750 
751  // close ----------------------------------
752  if ((written%(EACHFILE)) == 0) {
753  // store bucket
754  detail.fid = pos;
755  detail.maximum = it->first;
756  _buckets[pos] = detail;
757  file.close();
758  }
759  }
760  ramMinMax();
761  }
762 
766  void ramMinMax(void) {
767  auto result = std::minmax_element(std::execution::par, _ramList.begin(), _ramList.end(), [](auto lhs, auto rhs) {
768  return (lhs.first < rhs.first); // compare the keys
769  });
770  // .------------ the min
771  // v v------- the key of the unordered_map entry
772  _ramMinimum = result.first->first;
773  // v------------ the max
774  _ramMaximum = result.second->first;
775 
776  }
777 
778 // Each stuff ---------------------------------------------------------------------------------------------------
779 
780 public:
781 
790  bool each (Data & back, std::function<bool(TKEY, Data &)> foo) {
791  typename Ram::iterator it;
792  std::set<Fid> important;
793 
794  // run in RAM
796  for (it = _ramList.begin(); it != _ramList.end(); ++it) {
797  if (foo(it->first, it->second)) {
798  back = it->second;
799  return true;
800  }
801  }
802  // store all undeleted buckets, because they have the other items
803  std::for_each (_buckets.begin(), _buckets.end(), [&](Detail finfo) {
804  if ( finfo.minimum != finfo.maximum ) { // it is not deleted!
805  important.insert(finfo.fid);
806  }
807  });
808 
809  for (Fid fid : important) {
810  // 1.1) load into temp each bucket to get all keys of the file
811  Ram temp;
812  std::unordered_set<TKEY> keys;
813  Id length;
814  TKEY loadedKey;
815  TKEY loadedKey2;
816  TVALUE loadedValue;
817 
818  char filename[512];
819  snprintf(filename, 512, "%s/%" PRId32 ".bucket", _swappypath, fid);
820  std::ifstream file(filename, std::ios::in | std::ios::binary);
821 
822  for (Id c = 0; c < EACHFILE; ++c) {
823  std::vector<TKEY> vecdata(0);
824  file.read((char *) &loadedKey, sizeof(TKEY));
825  file.read((char *) &loadedValue, sizeof(TVALUE));
826  keys.insert(loadedKey);
827  // stored vector?
828  file.read((char *) &length, sizeof(Id));
829  if (length > 0) {
830  for (Id j=0; j<length; ++j) {
831  file.read((char *) &loadedKey2, sizeof(TKEY));
832  vecdata.push_back(loadedKey2);
833  }
834  }
835  temp[loadedKey] = std::make_pair(loadedValue, vecdata);
836  }
837  file.close();
838  // 1.2) store possible min/max for ram
839  TKEY omin = _buckets[fid].minimum;
840  TKEY omax = _buckets[fid].maximum;
841  // 2) mark bucket file as empty ---------------------
842  _buckets[fid].minimum = 0;
843  _buckets[fid].maximum = 0;
844  // 3.1) may swap .... ----------
845  maySwap(true);
846  // ... and load temp into ram ----------
847  for (auto key_val : temp) {
848  _ramList[key_val.first] = key_val.second;
849  _mru.push_back(Qentry{key_val.first,false});
850  }
851  // 3.2) update ram min/max
852  if (omax > _ramMaximum) _ramMaximum = omax;
853  if (omin < _ramMinimum) _ramMinimum = omin;
854 
855  // 4) run through in RAM with keys from temp
856  for (auto k : keys) {
857  if (foo(k, _ramList[k])) {
858  back = _ramList[k];
859  return true;
860  }
861  }
862  }
863 
864  return false;
865  }
866 
879  bool apply (Data & back, const TKEY & key, std::function<void(Data *)> foo) {
880 
881  try {
882  back = _ramList.at(key);
883  foo(&back);
884  _ramList[key] = back;
885  return true;
886 
887  } catch (const std::out_of_range & oor) {
888 
889  std::set<Fid> candidates;
890  std::mutex m;
891  Fingerprint fp = getFingerprint(key);
892 
893  std::for_each (std::execution::par, _buckets.begin(), _buckets.end(), [&](Detail finfo) {
894  if ( finfo.minimum != finfo.maximum ) { // it is not deleted!
895 
896  if ( (key < finfo.minimum) || (key > finfo.maximum) ) {
897  // key is smaller then the smallest or bigger than the biggest
898  m.lock();
899  ++statistic.rangeSaysNo;
900  m.unlock();
901  } else {
902  // check bloom (is it possible, that this key is in that file?)
903  bool success = true;
904  for (auto b : fp) {
905  if (finfo.bloomMask[b] == false) {
906  success = false;
907  m.lock();
908  ++statistic.bloomSaysNotIn;
909  m.unlock();
910  break;
911  }
912  }
913  if (success) {
914  m.lock();
915  candidates.insert(finfo.fid);
916  m.unlock();
917  }
918  }
919  }
920  });
921 
922  bool success = false;
923  Fid successFid = 0;
924  std::streampos startpos;
925  std::map<TKEY,Data> temp; // map is sorted by key!
926  char filename[512];
927 
928  for (auto fid : candidates) {
929 
930  Id length;
931  TKEY loadedKey;
932  TKEY loadedKey2;
933  TVALUE loadedValue;
934 
935  snprintf(filename, 512, "%s/%" PRId32 ".bucket", _swappypath, fid);
936  std::ifstream file(filename, std::ios::in | std::ios::binary);
937 
938  for (Id c = 0; c < EACHFILE; ++c) {
939  if(!success) startpos = file.tellg();
940  file.read((char *) &loadedKey, sizeof(TKEY));
941 
942  if (loadedKey == key) {
943  success = true;
944  successFid = fid;
945 
946  std::vector<TKEY> vecdata(0);
947  file.read((char *) &loadedValue, sizeof(TVALUE));
948  // stored vector?
949  file.read((char *) &length, sizeof(Id));
950  for (Id j=0; j<length; ++j) {
951  file.read((char *) &loadedKey2, sizeof(TKEY));
952  vecdata.push_back(loadedKey2);
953  }
954  // remember data and end position
955  back = std::make_pair(loadedValue, vecdata);
956  } else {
957  if (success) {
958  // store data from behind the relevant key-value pair
959  std::vector<TKEY> vecdata(0);
960  file.read((char *) &loadedValue, sizeof(TVALUE));
961  // stored vector?
962  file.read((char *) &length, sizeof(Id));
963  for (Id j=0; j<length; ++j) {
964  file.read((char *) &loadedKey2, sizeof(TKEY));
965  vecdata.push_back(loadedKey2);
966  }
967  // remember data
968  temp[loadedKey] = std::make_pair(loadedValue, vecdata);
969  } else {
970  // ignore
972  file.read((char *) &loadedValue, sizeof(TVALUE));
973  // stored vector?
974  file.read((char *) &length, sizeof(Id));
975  for (Id j=0; j<length; ++j) file.read((char *) &loadedKey2, sizeof(TKEY));
976  }
977  }
978  }
979  file.close();
980  // do not load other candidates, because we find the key!
981  if (success) break;
982  }
983 
984  if (success) {
985  // apply lambda function to the data with relevant key
986  foo(&back);
987  temp[key] = back;
988 
989  // reopen file to renew data
990  snprintf(filename, 512, "%s/%" PRId32 ".bucket", _swappypath, successFid);
991  //printf("%s\n", filename);
992  std::fstream file(filename, std::ios::in | std::ios::out | std::ios::binary);
993 
994  // got to place, where we have to renew
995  file.seekg(startpos);
996 
997  // overwrite data to the file
998  typename std::map<TKEY,Data>::iterator it;
999 
1000  for (it=temp.begin(); it!=temp.end(); ++it) {
1001  // store data
1002  file.write((char *) &(it->first), sizeof(TKEY));
1003  file.write((char *) &(it->second.first), sizeof(TVALUE));
1004 
1005  // store vector?
1006  Id le = it->second.second.size();
1007  file.write((char *) &le, sizeof(Id));
1008  for (Id j=0; j<le; ++j) {
1009  file.write((char *) &(it->second.second[j]), sizeof(TKEY));
1010  }
1011  }
1012  file.close();
1013 
1014  ++statistic.fileLoads;
1015  return true;
1016  } else {
1017  if (candidates.size() > 0) {
1018  // many candidates, but finaly not found in file
1019  ++statistic.rangeFails;
1020  }
1021  return false;
1022  }
1023  }
1024  }
1025 
1026 
1027 };
1028 
1029 #endif
Definition: SwappyItems.hpp:49
void maySwap(bool reload=false)
Definition: SwappyItems.hpp:662
bool wakeup(void)
Definition: SwappyItems.hpp:368
std::mutex mu_FsearchSuccess
Definition: SwappyItems.hpp:113
bool fsearchSuccess
Definition: SwappyItems.hpp:114
void loadFromFile(Fid fid, TKEY key)
Definition: SwappyItems.hpp:591
TKEY _ramMinimum
Definition: SwappyItems.hpp:100
TKEY _ramMaximum
Definition: SwappyItems.hpp:101
uint32_t Bid
Definition: SwappyItems.hpp:91
Order _mru
Definition: SwappyItems.hpp:104
SwappyItems(int swappyId)
Definition: SwappyItems.hpp:302
Ram _ramList
Definition: SwappyItems.hpp:99
void ramMinMax(void)
Definition: SwappyItems.hpp:766
std::set< Bid > Fingerprint
Definition: SwappyItems.hpp:92
std::vector< Detail > Buckets
Definition: SwappyItems.hpp:95
std::deque< Qentry > Order
Definition: SwappyItems.hpp:96
std::unordered_map< TKEY, Data > Ram
Definition: SwappyItems.hpp:94
std::pair< TVALUE, std::vector< TKEY > > Data
Definition: SwappyItems.hpp:52
uint32_t Fid
Definition: SwappyItems.hpp:77
bool apply(Data &back, const TKEY &key, std::function< void(Data *)> foo)
Definition: SwappyItems.hpp:879
Fingerprint getFingerprint(const TKEY &key)
Definition: SwappyItems.hpp:496
void hibernate(void)
Definition: SwappyItems.hpp:441
Buckets _buckets
Definition: SwappyItems.hpp:107
bool set(TKEY key, TVALUE value, std::vector< TKEY > refs)
Definition: SwappyItems.hpp:155
TKEY min(void)
Definition: SwappyItems.hpp:256
Data * get(const TKEY &key)
Definition: SwappyItems.hpp:188
bool loadFromFiles(const TKEY &key)
Definition: SwappyItems.hpp:524
bool each(Data &back, std::function< bool(TKEY, Data &)> foo)
Definition: SwappyItems.hpp:790
bool load(const TKEY &key)
Definition: SwappyItems.hpp:510
bool del(const TKEY &key)
Definition: SwappyItems.hpp:211
~SwappyItems(void)
Definition: SwappyItems.hpp:357
bool set(TKEY key, TVALUE value)
Definition: SwappyItems.hpp:123
uint32_t Id
Definition: SwappyItems.hpp:76
Color c
Definition: osm2graph_bitmap.cpp:123
double pos
Definition: osm2graph_bitmap.cpp:125
Definition: SwappyItems.hpp:79
Fid fid
Definition: SwappyItems.hpp:83
std::vector< bool > bloomMask
Definition: SwappyItems.hpp:82
TKEY maximum
Definition: SwappyItems.hpp:81
TKEY minimum
Definition: SwappyItems.hpp:80
Definition: SwappyItems.hpp:86
bool deleted
Definition: SwappyItems.hpp:88
TKEY key
Definition: SwappyItems.hpp:87
Definition: SwappyItems.hpp:54
TKEY minKey
Definition: SwappyItems.hpp:72
TKEY maxKey
Definition: SwappyItems.hpp:71