XRootD
Loading...
Searching...
No Matches
XrdPfc.hh
Go to the documentation of this file.
1#ifndef __XRDPFC_CACHE_HH__
2#define __XRDPFC_CACHE_HH__
3//----------------------------------------------------------------------------------
4// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
6//----------------------------------------------------------------------------------
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//----------------------------------------------------------------------------------
20#include <string>
21#include <list>
22#include <map>
23#include <set>
24
25#include "Xrd/XrdScheduler.hh"
26#include "XrdVersion.hh"
28#include "XrdOuc/XrdOucCache.hh"
31
32#include "XrdPfcFile.hh"
33#include "XrdPfcDecision.hh"
34
35class XrdOucStream;
36class XrdSysError;
37class XrdSysTrace;
39
40namespace XrdPfc
41{
42class File;
43class IO;
44
45class DataFsState;
46}
47
48
49namespace XrdPfc
50{
51
52//----------------------------------------------------------------------------
54//----------------------------------------------------------------------------
56{
58
 // True when a positive maximum for file usage (m_fileUsageMax) is configured.
59 bool are_file_usage_limits_set() const { return m_fileUsageMax > 0; }
 // True when m_cs_UVKeep is non-negative; a negative value disables the uvkeep purge.
61 bool is_uvkeep_purge_in_effect() const { return m_cs_UVKeep >= 0; }
 // True when directory statistics are requested by any means: a non-negative
 // m_dirStatsMaxDepth, an explicit directory, or a directory glob.
62 bool is_dir_stat_reporting_on() const { return m_dirStatsMaxDepth >= 0 || ! m_dirStatsDirs.empty() || ! m_dirStatsDirGlobs.empty(); }
 // Always reports false in this version.
63 bool is_purge_plugin_set_up() const { return false; }
64
 // Takes two usage values (du, fu) and writes two fractions through the
 // reference arguments frac_du and frac_fu. Defined out of line.
65 void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu);
66
68
 // Bit tests on the checksum-check mask m_cs_Chk.
 // True when any bit of CSChk_Cache is set.
69 bool is_cschk_cache() const { return m_cs_Chk & CSChk_Cache; }
 // True when any bit of CSChk_Net is set.
70 bool is_cschk_net() const { return m_cs_Chk & CSChk_Net; }
 // True when at least one bit of CSChk_Both is set.
71 bool is_cschk_any() const { return m_cs_Chk & CSChk_Both; }
 // True only when every bit of CSChk_Both is set.
72 bool is_cschk_both() const { return (m_cs_Chk & CSChk_Both) == CSChk_Both; }
73
 // True when m_cs_Chk has at least one bit set that is absent from cks_on_file.
74 bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const { return m_cs_Chk & ~cks_on_file; }
75
 // True when the uvkeep purge is enabled (m_cs_UVKeep >= 0) and delta is
 // strictly greater than m_cs_UVKeep.
76 bool should_uvkeep_purge(time_t delta) const { return m_cs_UVKeep >= 0 && delta > m_cs_UVKeep; }
77
80
81 std::string m_username; // username passed to oss plugin
82 std::string m_data_space; // oss space for data files
83 std::string m_meta_space; // oss space for metadata files (cinfo)
84
85 long long m_diskTotalSpace; // total disk space on configured partition or oss space
86 long long m_diskUsageLWM; // cache purge - disk usage low water mark
87 long long m_diskUsageHWM; // cache purge - disk usage high water mark
90 long long m_fileUsageMax; // cache purge - files usage maximum
95
96 std::set<std::string> m_dirStatsDirs; // directories for which stat reporting was requested
97 std::set<std::string> m_dirStatsDirGlobs; // directory globs for which stat reporting was requested
100
101 long long m_bufferSize; // prefetch buffer size, default 1MB
107
108 long long m_hdfsbsize; // used with m_hdfsmode, default 128MB
109 long long m_flushCnt; // number of unsynced blocks on disk before flush is called
110
111 time_t m_cs_UVKeep; // unverified checksum cache keep
114
117};
118
119//------------------------------------------------------------------------------
120
122{
123 std::string m_diskUsageLWM;
124 std::string m_diskUsageHWM;
127 std::string m_fileUsageMax;
128 std::string m_flushRaw;
129
131 m_diskUsageLWM("0.90"), m_diskUsageHWM("0.95"),
132 m_flushRaw("")
133 {}
134};
135
136//==============================================================================
137
139{
140 char *f_str;
141 const char *f_delim;
142 char *f_state;
144
145 SplitParser(const std::string &s, const char *d) :
146 f_str(strdup(s.c_str())), f_delim(d), f_state(0), f_first(true)
147 {}
 // Releases the strdup()-allocated working copy of the input string.
148 ~SplitParser() { free(f_str); }
149
150 char* get_token()
151 {
152 if (f_first) { f_first = false; return strtok_r(f_str, f_delim, &f_state); }
153 else { return strtok_r(0, f_delim, &f_state); }
154 }
155
157 {
158 if (f_first) { return f_str; }
159 else { *(f_state - 1) = f_delim[0]; return f_state - 1; }
160 }
161
163 {
164 return f_first ? f_str : f_state;
165 }
166
167 int fill_argv(std::vector<char*> &argv)
168 {
169 if (!f_first) return 0;
170 int dcnt = 0; { char *p = f_str; while (*p) { if (*(p++) == f_delim[0]) ++dcnt; } }
171 argv.reserve(dcnt + 1);
172 int argc = 0;
173 char *i = strtok_r(f_str, f_delim, &f_state);
174 while (i)
175 {
176 ++argc;
177 argv.push_back(i);
178 // printf(" arg %d : '%s'\n", argc, i);
179 i = strtok_r(0, f_delim, &f_state);
180 }
181 return argc;
182 }
183};
184
186{
187 std::vector<const char*> m_dirs;
188 const char *m_reminder;
190
191 PathTokenizer(const std::string &path, int max_depth, bool parse_as_lfn) :
192 SplitParser(path, "/"),
193 m_reminder (0),
194 m_n_dirs (0)
195 {
196 // If parse_as_lfn is true store final token into m_reminder, regardless of maxdepth.
197 // This assumes the last token is a file name (and full path is lfn, including the file name).
198
199 m_dirs.reserve(max_depth);
200
201 char *t = 0;
202 for (int i = 0; i < max_depth; ++i)
203 {
204 t = get_token();
205 if (t == 0) break;
206 m_dirs.emplace_back(t);
207 }
208 if (parse_as_lfn && *get_reminder() == 0 && ! m_dirs.empty())
209 {
210 m_reminder = m_dirs.back();
211 m_dirs.pop_back();
212 }
213 else
214 {
215 m_reminder = get_reminder();
216 }
217 m_n_dirs = (int) m_dirs.size();
218 }
219
221 {
222 return m_n_dirs;
223 }
224
225 const char *get_dir(int pos)
226 {
227 if (pos >= m_n_dirs) return 0;
228 return m_dirs[pos];
229 }
230
231 std::string make_path()
232 {
233 std::string res;
234 for (std::vector<const char*>::iterator i = m_dirs.begin(); i != m_dirs.end(); ++i)
235 {
236 res += "/";
237 res += *i;
238 }
239 if (m_reminder != 0)
240 {
241 res += "/";
242 res += m_reminder;
243 }
244 return res;
245 }
246
247 void deboog()
248 {
249 printf("PathTokenizer::deboog size=%d\n", m_n_dirs);
250 for (int i = 0; i < m_n_dirs; ++i)
251 {
252 printf(" %2d: %s\n", i, m_dirs[i]);
253 }
254 printf(" rem: %s\n", m_reminder);
255 }
256};
257
258
259//==============================================================================
260// Cache
261//==============================================================================
262
263//----------------------------------------------------------------------------
//! Attaches/creates and detaches/deletes cache-io objects for disk based cache.
265//----------------------------------------------------------------------------
266class Cache : public XrdOucCache
267{
268public:
269 //---------------------------------------------------------------------
 //! Constructor.
271 //---------------------------------------------------------------------
272 Cache(XrdSysLogger *logger, XrdOucEnv *env);
273
274 //---------------------------------------------------------------------
276 //---------------------------------------------------------------------
278
279 virtual XrdOucCacheIO *Attach(XrdOucCacheIO *, int Options = 0);
280
281 //---------------------------------------------------------------------
282 // Virtual function of XrdOucCache. Used for redirection to a local
283 // file on a distributed FS.
284 virtual int LocalFilePath(const char *url, char *buff=0, int blen=0,
285 LFP_Reason why=ForAccess, bool forall=false);
286
287 //---------------------------------------------------------------------
288 // Virtual function of XrdOucCache. Used for deferred open.
289 virtual int Prepare(const char *url, int oflags, mode_t mode);
290
291 // virtual function of XrdOucCache.
292 virtual int Stat(const char *url, struct stat &sbuff);
293
294 // virtual function of XrdOucCache.
295 virtual int Unlink(const char *url);
296
297 //---------------------------------------------------------------------
298 // Used by PfcFstcl::Fsctl function.
299 // Test if file is cached taking in onlyifcached configuration parameters.
300 //---------------------------------------------------------------------
301 virtual int ConsiderCached(const char *url);
302
303 bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk);
304 void WriteFileSizeXAttr(int cinfo_fd, long long file_size);
305 long long DetermineFullFileSize(const std::string &cinfo_fname);
306
307 //--------------------------------------------------------------------
 //! Makes decision if the original XrdOucCacheIO should be cached.
313 //--------------------------------------------------------------------
314 bool Decide(XrdOucCacheIO*);
315
316 //------------------------------------------------------------------------
 //! Reference XrdPfc configuration.
318 //------------------------------------------------------------------------
319 const Configuration& RefConfiguration() const { return m_configuration; }
320
321 //---------------------------------------------------------------------
 //! Parse configuration file.
328 //---------------------------------------------------------------------
329 bool Config(const char *config_filename, const char *parameters);
330
331 //---------------------------------------------------------------------
 //! Singleton creation.
333 //---------------------------------------------------------------------
334 static Cache &CreateInstance(XrdSysLogger *logger, XrdOucEnv *env);
335
336 //---------------------------------------------------------------------
 //! Singleton access.
338 //---------------------------------------------------------------------
339 static Cache &GetInstance();
340 static const Cache &TheOne();
341 static const Configuration &Conf();
342
343 //---------------------------------------------------------------------
 //! Version check. This inline implementation accepts any version.
345 //---------------------------------------------------------------------
346 static bool VCheck(XrdVersionInfo &urVersion) { return true; }
347
348 //---------------------------------------------------------------------
350 //---------------------------------------------------------------------
352
353 //---------------------------------------------------------------------
 //! Thread function invoked to scan and purge files from disk when needed.
355 //---------------------------------------------------------------------
356 void Purge();
357
358 //---------------------------------------------------------------------
 //! Remove cinfo and data files from cache.
360 //---------------------------------------------------------------------
361 int UnlinkFile(const std::string& f_name, bool fail_if_open);
362
363 //---------------------------------------------------------------------
 //! Add downloaded block in write queue.
365 //---------------------------------------------------------------------
366 void AddWriteTask(Block* b, bool from_read);
367
368 //---------------------------------------------------------------------
371 //---------------------------------------------------------------------
373
374 //---------------------------------------------------------------------
 //! Separate task which writes blocks from ram to disk.
376 //---------------------------------------------------------------------
377 void ProcessWriteTasks();
378
379 char* RequestRAM(long long size);
380 void ReleaseRAM(char* buf, long long size);
381
384
386
387 void Prefetch();
388
 // Accessor for the m_oss pointer.
389 XrdOss* GetOss() const { return m_oss; }
390
391 bool IsFileActiveOrPurgeProtected(const std::string&);
392
393 File* GetFile(const std::string&, IO*, long long off = 0, long long filesize = 0);
394
395 void ReleaseFile(File*, IO*);
396
 // Public entry point: forwards to schedule_file_sync() with
 // ref_cnt_already_set and high_debug both false.
397 void ScheduleFileSync(File* f) { schedule_file_sync(f, false, false); }
398
399 void FileSyncDone(File*, bool high_debug);
400
 // Accessors for the logging and tracing objects held by the cache.
401 XrdSysError* GetLog() { return &m_log; }
402 XrdSysTrace* GetTrace() { return m_trace; }
403
 // Accessor for the m_gstream pointer.
404 XrdXrootdGStream* GetGStream() { return m_gstream; }
405
406 void ExecuteCommandUrl(const std::string& command_url);
407
409
410private:
 // Configuration parsing helpers.
411 bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc);
412 bool ConfigXeq(char *, XrdOucStream &);
413 bool xcschk(XrdOucStream &);
414 bool xdlib(XrdOucStream &);
415 bool xtrace(XrdOucStream &);
416 bool test_oss_basics_and_features();
417
418 bool cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name);
419
 // Singleton instance pointer; see CreateInstance() / GetInstance().
420 static Cache *m_instance;
421
422 XrdOucEnv *m_env;
423 XrdSysError m_log;
424 XrdSysTrace *m_trace;
425 const char *m_traceID;
426
427 XrdOss *m_oss;
428
429 XrdXrootdGStream *m_gstream;
430
431 std::vector<XrdPfc::Decision*> m_decisionpoints;
432
433 Configuration m_configuration;
434
435 XrdSysCondVar m_prefetch_condVar;
436 bool m_prefetch_enabled;
437
438 XrdSysMutex m_RAM_mutex;
439 long long m_RAM_used;
440 long long m_RAM_write_queue;
441 std::list<char*> m_RAM_std_blocks;
442 int m_RAM_std_size;
443
444 bool m_isClient;
445 bool m_dataXattr = false;
446 bool m_metaXattr = false;
447
 // Write queue state: a list of Block pointers plus two counters and a
 // condition variable. The constructor zeroes both counters.
448 struct WriteQ
449 {
450 WriteQ() : condVar(0), writes_between_purges(0), size(0) {}
451
452 XrdSysCondVar condVar;
453 std::list<Block*> queue;
454 long long writes_between_purges;
455 int size;
456 };
457
458 WriteQ m_writeQ;
459
460 // active map, purge delay set
461 typedef std::map<std::string, File*> ActiveMap_t;
462 typedef ActiveMap_t::iterator ActiveMap_i;
463 typedef std::multimap<std::string, XrdPfc::Stats> StatsMMap_t;
464 typedef StatsMMap_t::iterator StatsMMap_i;
465 typedef std::set<std::string> FNameSet_t;
466
467 ActiveMap_t m_active;
468 StatsMMap_t m_closed_files_stats;
469 FNameSet_t m_purge_delay_set;
470 bool m_in_purge;
471 XrdSysCondVar m_active_cond;
472
473 void inc_ref_cnt(File*, bool lock, bool high_debug);
474 void dec_ref_cnt(File*, bool high_debug);
475
476 void schedule_file_sync(File*, bool ref_cnt_already_set, bool high_debug);
477
478 // prefetching
479 typedef std::vector<File*> PrefetchList;
480 PrefetchList m_prefetchList;
481
482 //---------------------------------------------------------------------------
483 // Statistics, heart-beat, scan-and-purge
484
485 enum ScanAndPurgeThreadState_e { SPTS_Idle, SPTS_Scan, SPTS_Purge, SPTS_Done };
486
487 XrdSysCondVar m_stats_n_purge_cond;
488
489 DataFsState *m_fs_state;
490
491 int m_last_scan_duration;
492 int m_last_purge_duration;
493 ScanAndPurgeThreadState_e m_spt_state;
494
495 void copy_out_active_stats_and_update_data_fs_state();
496};
497
498}
499
500#endif
struct stat Stat
Definition XrdCks.cc:49
#define stat(a, b)
Definition XrdPosix.hh:96
XrdOucString File
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *ioP, int opts=0)=0
XrdOucCache(const char *ctype)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:928
void FileSyncDone(File *, bool high_debug)
Definition XrdPfc.cc:568
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition XrdPfc.cc:415
static const Configuration & Conf()
Definition XrdPfc.cc:165
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition XrdPfc.cc:796
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:319
XrdSysTrace * GetTrace()
Definition XrdPfc.hh:402
void Purge()
Thread function invoked to scan and purge files from disk when needed.
void ReleaseRAM(char *buf, long long size)
Definition XrdPfc.cc:397
virtual int ConsiderCached(const char *url)
Definition XrdPfc.cc:1002
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:163
void ResourceMonitorHeartBeat()
Thread function checking resource usage periodically.
void DeRegisterPrefetchFile(File *)
Definition XrdPfc.cc:713
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition XrdPfc.cc:697
void WriteFileSizeXAttr(int cinfo_fd, long long file_size)
Definition XrdPfc.cc:913
void Prefetch()
Definition XrdPfc.cc:754
void ReleaseFile(File *, IO *)
Definition XrdPfc.cc:497
void AddWriteTask(Block *b, bool from_read)
Add downloaded block in write queue.
Definition XrdPfc.cc:256
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:188
XrdXrootdGStream * GetGStream()
Definition XrdPfc.hh:404
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:167
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1179
static XrdScheduler * schedP
Definition XrdPfc.hh:408
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition XrdPfc.cc:684
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:735
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
Definition XrdPfc.cc:308
virtual int Unlink(const char *url)
Definition XrdPfc.cc:1169
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:275
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *, int Options=0)
Definition XrdPfc.cc:214
static const Cache & TheOne()
Definition XrdPfc.cc:164
XrdOss * GetOss() const
Definition XrdPfc.hh:389
static bool VCheck(XrdVersionInfo &urVersion)
Version check.
Definition XrdPfc.hh:346
char * RequestRAM(long long size)
Definition XrdPfc.cc:357
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition XrdPfc.cc:1062
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition XrdPfc.cc:969
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition XrdPfc.cc:156
XrdSysError * GetLog()
Definition XrdPfc.hh:401
void ScheduleFileSync(File *f)
Definition XrdPfc.hh:397
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
@ CSChk_Cache
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:56
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition XrdPfc.hh:108
long long m_RamAbsAvailable
available from configuration
Definition XrdPfc.hh:102
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition XrdPfc.hh:109
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition XrdPfc.hh:94
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition XrdPfc.hh:74
int m_wqueue_threads
number of threads writing blocks to disk
Definition XrdPfc.hh:105
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition XrdPfc.hh:85
long long m_fileUsageMax
cache purge - files usage maximum
Definition XrdPfc.hh:90
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition XrdPfc.hh:88
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition XrdPfc.hh:99
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition XrdPfc.hh:79
bool is_purge_plugin_set_up() const
Definition XrdPfc.hh:63
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:87
bool is_cschk_cache() const
Definition XrdPfc.hh:69
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition XrdPfc.hh:97
CkSumCheck_e get_cs_Chk() const
Definition XrdPfc.hh:67
bool is_uvkeep_purge_in_effect() const
Definition XrdPfc.hh:61
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:106
bool are_file_usage_limits_set() const
Definition XrdPfc.hh:59
bool is_cschk_any() const
Definition XrdPfc.hh:71
bool m_cs_ChkTLS
Allow TLS.
Definition XrdPfc.hh:113
long long m_fileUsageNominal
cache purge - files usage nominal
Definition XrdPfc.hh:89
int m_cs_Chk
Checksum check.
Definition XrdPfc.hh:112
bool should_uvkeep_purge(time_t delta) const
Definition XrdPfc.hh:76
int m_purgeAgeBasedPeriod
perform cold file / uvkeep purge every this many purge cycles
Definition XrdPfc.hh:93
bool m_hdfsmode
flag for enabling block-level operation
Definition XrdPfc.hh:78
int m_purgeColdFilesAge
purge files older than this age
Definition XrdPfc.hh:92
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:82
bool is_cschk_both() const
Definition XrdPfc.hh:72
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
Definition XrdPfc.cc:135
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition XrdPfc.hh:96
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:86
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition XrdPfc.hh:103
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:83
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition XrdPfc.hh:104
bool is_age_based_purge_in_effect() const
Definition XrdPfc.hh:60
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
bool is_cschk_net() const
Definition XrdPfc.hh:70
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:116
time_t m_cs_UVKeep
unverified checksum cache keep
Definition XrdPfc.hh:111
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition XrdPfc.hh:98
int m_purgeInterval
sleep interval between cache purges
Definition XrdPfc.hh:91
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:115
bool is_dir_stat_reporting_on() const
Definition XrdPfc.hh:62
const char * get_dir(int pos)
Definition XrdPfc.hh:225
std::string make_path()
Definition XrdPfc.hh:231
const char * m_reminder
Definition XrdPfc.hh:188
PathTokenizer(const std::string &path, int max_depth, bool parse_as_lfn)
Definition XrdPfc.hh:191
std::vector< const char * > m_dirs
Definition XrdPfc.hh:187
SplitParser(const std::string &s, const char *d)
Definition XrdPfc.hh:145
char * get_reminder_with_delim()
Definition XrdPfc.hh:156
const char * f_delim
Definition XrdPfc.hh:141
char * get_token()
Definition XrdPfc.hh:150
char * get_reminder()
Definition XrdPfc.hh:162
int fill_argv(std::vector< char * > &argv)
Definition XrdPfc.hh:167
std::string m_diskUsageLWM
Definition XrdPfc.hh:123
std::string m_diskUsageHWM
Definition XrdPfc.hh:124
std::string m_fileUsageBaseline
Definition XrdPfc.hh:125
std::string m_fileUsageNominal
Definition XrdPfc.hh:126
std::string m_flushRaw
Definition XrdPfc.hh:128
std::string m_fileUsageMax
Definition XrdPfc.hh:127