XRootD
Loading...
Searching...
No Matches
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

Inheritance diagram for XrdPfc::Cache:
Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor.
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block in write queue.
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *, int Options=0)
bool Config (const char *config_filename, const char *parameters)
 Parse configuration file.
virtual int ConsiderCached (const char *url)
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached.
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
void DeRegisterPrefetchFile (File *)
long long DetermineFullFileSize (const std::string &cinfo_fname)
void ExecuteCommandUrl (const std::string &command_url)
void FileSyncDone (File *, bool high_debug)
File * GetFile (const std::string &, IO *, long long off=0, long long filesize=0)
XrdXrootdGStream * GetGStream ()
XrdSysError * GetLog ()
File * GetNextFileToPrefetch ()
XrdOss * GetOss () const
XrdSysTrace * GetTrace ()
bool IsFileActiveOrPurgeProtected (const std::string &)
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
void Prefetch ()
virtual int Prepare (const char *url, int oflags, mode_t mode)
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk.
void Purge ()
 Thread function invoked to scan and purge files from disk when needed.
const Configuration & RefConfiguration () const
 Reference XrdPfc configuration.
void RegisterPrefetchFile (File *)
void ReleaseFile (File *, IO *)
void ReleaseRAM (char *buf, long long size)
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.
char * RequestRAM (long long size)
void ResourceMonitorHeartBeat ()
 Thread function checking resource usage periodically.
void ScheduleFileSync (File *f)
virtual int Stat (const char *url, struct stat &sbuff)
virtual int Unlink (const char *url)
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache.
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
virtual ~XrdOucCache ()
 Destructor.
virtual int Rename (const char *oldp, const char *newp)
virtual int Rmdir (const char *dirp)
virtual int Truncate (const char *path, off_t size)
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)

Static Public Member Functions

static const Configuration & Conf ()
static Cache & CreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation.
static Cache & GetInstance ()
 Singleton access.
static const Cache & TheOne ()
static bool VCheck (XrdVersionInfo &urVersion)
 Version check.

Static Public Attributes

static XrdScheduler * schedP = 0
Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file)
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write)
static const int optRW = 0x0004
 File is read/write (o/w read/only)
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache.

Additional Inherited Members

Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
enum  XeqCmd { xeqNoop = 0 }
Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc).
XrdOucCacheStats Statistics

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 266 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger * logger,
XrdOucEnv * env )

Constructor.

Definition at line 188 of file XrdPfc.cc.

188 :
189 XrdOucCache("pfc"),
190 m_env(env),
191 m_log(logger, "XrdPfc_"),
192 m_trace(new XrdSysTrace("XrdPfc", logger)),
193 m_traceID("Cache"),
194 m_oss(0),
195 m_gstream(0),
196 m_prefetch_condVar(0),
197 m_prefetch_enabled(false),
198 m_RAM_used(0),
199 m_RAM_write_queue(0),
200 m_RAM_std_size(0),
201 m_isClient(false),
202 m_in_purge(false),
203 m_active_cond(0),
204 m_stats_n_purge_cond(0),
205 m_fs_state(0),
206 m_last_scan_duration(0),
207 m_last_purge_duration(0),
208 m_spt_state(SPTS_Idle)
209{
210 // Default log level is Warning.
211 m_trace->What = 2;
212}
XrdOucCache(const char *ctype)

References XrdOucCache::XrdOucCache().

Referenced by CreateInstance(), GetInstance(), and TheOne().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block * b,
bool from_read )

Add downloaded block in write queue.

Definition at line 256 of file XrdPfc.cc.

257{
258 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
259
260 {
261 XrdSysMutexHelper lock(&m_RAM_mutex);
262 m_RAM_write_queue += b->get_size();
263 }
264
265 m_writeQ.condVar.Lock();
266 if (fromRead)
267 m_writeQ.queue.push_back(b);
268 else
269 m_writeQ.queue.push_front(b);
270 m_writeQ.size++;
271 m_writeQ.condVar.Signal();
272 m_writeQ.condVar.UnLock();
273}
#define TRACE(act, x)
Definition XrdTrace.hh:63
int get_size() const
long long m_offset
File * get_file() const
std::string & GetLocalPath()

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

Here is the call graph for this function:

◆ Attach()

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO * io,
int Options = 0 )
virtual

Implements XrdOucCache.

Definition at line 214 of file XrdPfc.cc.

215{
216 const char* tpfx = "Attach() ";
217
218 if (Cache::GetInstance().Decide(io))
219 {
220 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
221
222 IO *cio;
223
224 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
225 {
226 cio = new IOFileBlock(io, *this);
227 }
228 else
229 {
230 IOFile *iof = new IOFile(io, *this);
231
232 if ( ! iof->HasFile())
233 {
234 delete iof;
235 // TODO - redirect instead. But this is kind of an awkward place for it.
236 // errno is set during IOFile construction.
237 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
238 return io;
239 }
240
241 cio = iof;
242 }
243
244 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
245 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
246
247 return cio;
248 }
249 else
250 {
251 TRACE(Info, tpfx << "decision decline " << io->Path());
252 }
253 return io;
254}
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
bool Debug
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:319
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:163
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:167
bool HasFile() const
Check if File was opened successfully.

References Debug, Decide(), Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

Here is the call graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 165 of file XrdPfc.cc.

165{ return m_instance->RefConfiguration(); }

Referenced by XrdPfc::FPurgeState::CheckFile().

Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char * config_filename,
const char * parameters )

Parse configuration file.

Parameters
config_filename: path to configuration file
parameters: optional parameters to be passed
Returns
parse status

Definition at line 351 of file XrdPfcConfiguration.cc.

352{
353 // Indicate whether or not we are a client instance
354 const char *theINS = getenv("XRDINSTANCE");
355 m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
356
357 // Tell everyone else we are a caching proxy
358 XrdOucEnv::Export("XRDPFC", 1);
359
360 XrdOucEnv myEnv;
361 XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
362
363 if (! config_filename || ! *config_filename)
364 {
365 TRACE(Error, "Config() configuration file not specified.");
366 return false;
367 }
368
369 int fd;
370 if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
371 {
372 TRACE( Error, "Config() can't open configuration file " << config_filename);
373 return false;
374 }
375
376 Config.Attach(fd);
377 static const char *cvec[] = { "*** pfc plugin config:", 0 };
378 Config.Capture(cvec);
379
380 // Obtain OFS configurator for OSS plugin.
381 XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
382 &XrdVERSIONINFOVAR(XrdOucGetCache));
383 if (! ofsCfg) return false;
384
385 TmpConfiguration tmpc;
386
387 // Adjust default parameters for client/serverless caching
388 if (m_isClient)
389 {
390 m_configuration.m_bufferSize = 128 * 1024; // same as normal.
391 m_configuration.m_wqueue_blocks = 8;
392 m_configuration.m_wqueue_threads = 1;
393 }
394
395 // If network checksum processing is the default, indicate so.
396 if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
397
398 // Actual parsing of the config file.
399 bool retval = true, aOK = true;
400 char *var;
401 while ((var = Config.GetMyFirstWord()))
402 {
403 if (! strcmp(var,"pfc.osslib"))
404 {
405 retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
406 }
407 else if (! strcmp(var,"pfc.cschk"))
408 {
409 retval = xcschk(Config);
410 }
411 else if (! strcmp(var,"pfc.decisionlib"))
412 {
413 retval = xdlib(Config);
414 }
415 else if (! strcmp(var,"pfc.trace"))
416 {
417 retval = xtrace(Config);
418 }
419 else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
420 {
421 m_configuration.m_allow_xrdpfc_command = true;
422 }
423 else if (! strncmp(var,"pfc.", 4))
424 {
425 retval = ConfigParameters(std::string(var+4), Config, tmpc);
426 }
427
428 if ( ! retval)
429 {
430 TRACE(Error, "Config() error in parsing");
431 aOK = false;
432 }
433 }
434
435 Config.Close();
436
437 // Load OSS plugin.
438 myEnv.Put("oss.runmode", "pfc");
439 if (m_configuration.is_cschk_cache())
440 {
441 char csi_conf[128];
442 if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
443 {
444 ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
445 } else {
446 TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
447 return false;
448 }
449 }
450 if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
451 {
452 ofsCfg->Plugin(m_oss);
453 }
454 else
455 {
456 TRACE(Error, "Config() Unable to create an OSS object");
457 return false;
458 }
459
460 // Test if OSS is operational, determine optional features.
461 aOK &= test_oss_basics_and_features();
462
463 // sets default value for disk usage
464 XrdOssVSInfo sP;
465 {
466 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
467 {
468 m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
469 return false;
470 }
471 if (sP.Total < 10ll << 20)
472 {
473 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
474 m_configuration.m_data_space.c_str());
475 return false;
476 }
477
478 m_configuration.m_diskTotalSpace = sP.Total;
479
480 if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
481 cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
482 {
483 if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
484 m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
485 aOK = false;
486 }
487 }
488 else aOK = false;
489
490 if ( ! tmpc.m_fileUsageMax.empty())
491 {
492 if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
493 cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
494 cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
495 {
496 if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
497 m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
498 m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
499 {
500 m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
501 aOK = false;
502 }
503 }
504 else aOK = false;
505 }
506 }
507
508 // sets flush frequency
509 if ( ! tmpc.m_flushRaw.empty())
510 {
511 if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
512 {
513 if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
514 &m_configuration.m_flushCnt,
515 100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
516 {
517 return false;
518 }
519 m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
520 }
521 else
522 {
523 if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
524 &m_configuration.m_flushCnt, 100, 100000))
525 {
526 return false;
527 }
528 }
529 }
530
531 // get number of available RAM blocks after process configuration
532 if (m_configuration.m_RamAbsAvailable == 0)
533 {
534 m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
535 char buff[1024];
536 snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
537 m_log.Say("Config info: ", buff);
538 }
539 // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
540 m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
541
542 // Set tracing to debug if this is set in environment
543 char* cenv = getenv("XRDDEBUG");
544 if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
545
546 if (aOK)
547 {
548 int loff = 0;
549// 000 001 010
550 const char *csc[] = {"off", "cache nonet", "nocache net notls",
551// 011
552 "cache net notls",
553// 100 101 110
554 "off", "cache nonet", "nocache net tls",
555// 111
556 "cache net tls"};
557 char buff[8192], uvk[32];
558 if (m_configuration.m_cs_UVKeep < 0)
559 strcpy(uvk, "lru");
560 else
561 sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
562 float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
563 loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
564 " pfc.cschk %s uvkeep %s\n"
565 " pfc.blocksize %lld\n"
566 " pfc.prefetch %d\n"
567 " pfc.ram %.fg\n"
568 " pfc.writequeue %d %d\n"
569 " # Total available disk: %lld\n"
570 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
571 " pfc.spaces %s %s\n"
572 " pfc.trace %d\n"
573 " pfc.flush %lld\n"
574 " pfc.acchistorysize %d\n"
575 " pfc.onlyIfCachedMinBytes %lld\n"
576 " pfc.onlyIfCachedMinFrac %.2f\n",
577 config_filename,
578 csc[int(m_configuration.m_cs_Chk)], uvk,
579 m_configuration.m_bufferSize,
580 m_configuration.m_prefetch_max_blocks,
581 rg,
582 m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
583 sP.Total,
584 m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
585 m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
586 m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
587 m_configuration.m_data_space.c_str(),
588 m_configuration.m_meta_space.c_str(),
589 m_trace->What,
590 m_configuration.m_flushCnt,
591 m_configuration.m_accHistorySize,
592 m_configuration.m_onlyIfCachedMinSize,
593 m_configuration.m_onlyIfCachedMinFrac);
594
595 if (m_configuration.is_dir_stat_reporting_on())
596 {
597 loff += snprintf(buff + loff, sizeof(buff) - loff,
598 " pfc.dirstats maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
599 m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
600 (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
601 loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
602 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
603 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
604 loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
605 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
606 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
607 }
608
609 if (m_configuration.m_hdfsmode)
610 {
611 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
612 }
613
614 if (m_configuration.m_username.empty())
615 {
616 char unameBuff[256];
617 XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
618 m_configuration.m_username = unameBuff;
619 }
620 else
621 {
622 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
623 }
624
625 m_log.Say(buff);
626
627 m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
628 }
629
630 // Derived settings
631 m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
632 Info::s_maxNumAccess = m_configuration.m_accHistorySize;
633
634 m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
635
636 m_log.Say("Config Proxy File Cache g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive");
637
638 m_log.Say("------ Proxy File Cache configuration parsing ", aOK ? "completed" : "failed");
639
640 if (ofsCfg) delete ofsCfg;
641
642 // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
643 // Building of xrdpfc_print fails when this is enabled.
644#ifdef XRDPFC_CKSUM_TEST
645 {
646 int xxx = m_configuration.m_cs_Chk;
647
648 for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
649 {
650 Info::TestCksumStuff();
651 }
652
653 m_configuration.m_cs_Chk = xxx;
654 }
655#endif
656
657 return aOK;
658}
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:84
#define open
Definition XrdPosix.hh:71
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition XrdOssVS.hh:90
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
static size_t s_maxNumAccess
std::string m_diskUsageLWM
Definition XrdPfc.hh:123
std::string m_diskUsageHWM
Definition XrdPfc.hh:124
std::string m_fileUsageBaseline
Definition XrdPfc.hh:125
std::string m_fileUsageNominal
Definition XrdPfc.hh:126
std::string m_flushRaw
Definition XrdPfc.hh:128
std::string m_fileUsageMax
Definition XrdPfc.hh:127

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), Config(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, Error, XrdOucEnv::Export(), XrdOfsConfigPI::Load(), XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_flushRaw, XrdOfsConfigPI::New(), open, XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), and XrdOucGetCache().

Referenced by Config(), and XrdOucGetCache().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char * curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 1002 of file XrdPfc.cc.

1003{
1004 static const char* tpfx = "ConsiderCached ";
1005
1006 TRACE(Debug, tpfx << curl);
1007
1008 XrdCl::URL url(curl);
1009 std::string f_name = url.GetPath();
1010
1011 File *file = nullptr;
1012 {
1013 XrdSysCondVarHelper lock(&m_active_cond);
1014 auto it = m_active.find(f_name);
1015 if (it != m_active.end()) {
1016 file = it->second;
1017 inc_ref_cnt(file, false, false);
1018 }
1019 }
1020 if (file) {
1021 struct stat sbuff;
1022 int res = file->Fstat(sbuff);
1023 dec_ref_cnt(file, false);
1024 if (res)
1025 return res;
1026 // DecideIfConsideredCached() already called in File::Fstat().
1027 return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1028 }
1029
1030 struct stat sbuff;
1031 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1032 if (res != XrdOssOK) {
1033 TRACE(Debug, tpfx << curl << " -> " << res);
1034 return res;
1035 }
1036 if (S_ISDIR(sbuff.st_mode))
1037 {
1038 TRACE(Debug, tpfx << curl << " -> EISDIR");
1039 return -EISDIR;
1040 }
1041
1042 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1043 if (file_size < 0) {
1044 TRACE(Debug, tpfx << curl << " -> " << file_size);
1045 return (int) file_size;
1046 }
1047 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1048
1049 return is_cached ? 0 : -EREMOTE;
1050}
#define XrdOssOK
Definition XrdOss.hh:50
#define stat(a, b)
Definition XrdPosix.hh:96
XrdOucString File
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:928
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition XrdPfc.cc:969
int Fstat(struct stat &sbuff)
static const char * s_infoExtension

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

Here is the call graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger * logger,
XrdOucEnv * env )
static

Singleton creation.

Definition at line 156 of file XrdPfc.cc.

157{
158 assert (m_instance == 0);
159 m_instance = new Cache(logger, env);
160 return *m_instance;
161}
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:188

References Cache().

Referenced by XrdOucGetCache().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO * io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
io: cache-io object whose path is the URL of the file
Returns
decision if IO object will be cached.

Definition at line 167 of file XrdPfc.cc.

168{
169 if (! m_decisionpoints.empty())
170 {
171 XrdCl::URL url(io->Path());
172 std::string filename = url.GetPath();
173 std::vector<Decision*>::const_iterator it;
174 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
175 {
176 XrdPfc::Decision *d = *it;
177 if (! d) continue;
178 if (! d->Decide(filename, *m_oss))
179 {
180 return false;
181 }
182 }
183 }
184
185 return true;
186}
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long file_size,
long long bytes_on_disk )

Definition at line 969 of file XrdPfc.cc.

970{
971 if (file_size == 0 || bytes_on_disk >= file_size)
972 return true;
973
974 double frac_on_disk = (double) bytes_on_disk / file_size;
975
976 if (file_size <= m_configuration.m_onlyIfCachedMinSize)
977 {
978 if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
979 return true;
980 }
981 else
982 {
983 if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
984 frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
985 return true;
986 }
987 return false;
988}

Referenced by ConsiderCached(), and Stat().

Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File * file)

Definition at line 713 of file XrdPfc.cc.

714{
715 // Can be called with other locks held.
716
717 if ( ! m_prefetch_enabled)
718 {
719 return;
720 }
721
722 m_prefetch_condVar.Lock();
723 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
724 {
725 if (*it == file)
726 {
727 m_prefetchList.erase(it);
728 break;
729 }
730 }
731 m_prefetch_condVar.UnLock();
732}

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string & cinfo_fname)

Definition at line 928 of file XrdPfc.cc.

929{
930 if (m_metaXattr) {
931 char pfn[4096];
932 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
933 long long fsize = -1ll;
934 int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
935 if (res == sizeof(long long))
936 {
937 return fsize;
938 }
939 else
940 {
941 TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
942 }
943 }
944
945 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
946 XrdOucEnv env;
947 long long ret;
948 int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
949 if (res < 0) {
950 ret = res;
951 } else {
952 Info info(m_trace, 0);
953 if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
954 ret = -EBADF;
955 } else {
956 ret = info.GetFileSize();
957 }
958 infoFile->Close();
959 }
960 delete infoFile;
961 return ret;
962}
XrdSysXAttr * XrdSysXAttrActive
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References XrdOssDF::Close(), Debug, XrdPfc::Info::GetFileSize(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string & command_url)

Definition at line 48 of file XrdPfcCommand.cc.

49{
50 static const char *top_epfx = "ExecuteCommandUrl ";
51
52 SplitParser cp(command_url, "/");
53
54 std::string token = cp.get_token();
55
56 if (token != "xrdpfc_command")
57 {
58 TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
59 return;
60 }
61
62 // Get the command
63 token = cp.get_token();
64
65
66 //================================================================
67 // create_file
68 //================================================================
69
70 if (token == "create_file")
71 {
72 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
73 static const char* usage =
74 "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
75 " Creates a cache file with given parameters. Data in file is random.\n"
76 " Useful for cache purge testing.\n"
77 "Notes:\n"
78 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
79 " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
80 " . -t and -d can be given multiple times to record several accesses.\n"
81 " . Negative arguments given to -t are interpreted as relative to now.\n";
82
83 const Configuration &conf = m_configuration;
84
85 token = cp.get_token();
86
87 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
88
89 std::vector<char*> argv;
90 SplitParser ap(token, " ");
91 int argc = ap.fill_argv(argv);
92
93 long long file_size = ONE_GB;
94 long long block_size = conf.m_bufferSize;
95 int access_time [MAX_ACCESSES];
96 int access_duration[MAX_ACCESSES];
97 int at_count = 0, ad_count = 0;
98 XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
99 "help", 1, "h",
100 "verbose", 1, "v",
101 "size", 1, "s",
102 "blocksize", 1, "b",
103 "time", 1, "t",
104 "duration", 1, "d",
105 (const char *) 0);
106
107 time_t time_now = time(0);
108
109 Spec.Set(argc, &argv[0]);
110 char theOpt;
111
112 while ((theOpt = Spec.getopt()) != (char) -1)
113 {
114 switch (theOpt)
115 {
116 case 'h': {
117 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
118 return;
119 }
120 case 's': {
121 if (XrdOuca2x::a2sz(m_log, "Error getting filesize", Spec.getarg(),
122 &file_size, 0ll, 32 * ONE_GB))
123 return;
124 break;
125 }
126 case 'b': {
127 if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", Spec.getarg(),
128 &block_size, 0ll, 64 * ONE_MB))
129 return;
130 break;
131 }
132 case 't': {
133 if (XrdOuca2x::a2i(m_log, "Error getting access time", Spec.getarg(),
134 &access_time[at_count++], INT_MIN, INT_MAX))
135 return;
136 break;
137 }
138 case 'd': {
139 if (XrdOuca2x::a2i(m_log, "Error getting access duration", Spec.getarg(),
140 &access_duration[ad_count++], 0, 24 * 3600))
141 return;
142 break;
143 }
144 default: {
145 TRACE(Error, err_prefix << "Unhandled command argument.");
146 return;
147 }
148 }
149 }
150 if (Spec.getarg())
151 {
152 TRACE(Error, err_prefix << "Options must take up all the arguments.");
153 return;
154 }
155
156 if (at_count < 1) access_time [at_count++] = time_now - 10;
157 if (ad_count < 1) access_duration[ad_count++] = 10;
158
159 if (at_count != ad_count)
160 {
161 TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
162 return;
163 }
164
165 std::string file_path (cp.get_reminder_with_delim());
166 std::string cinfo_path(file_path + Info::s_infoExtension);
167
168 TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
169
170 // Check if cinfo exists ... bail out if it does.
171 {
172 struct stat infoStat;
173 if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
174 {
175 TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
176 return;
177 }
178 }
179
180 TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
181
182 {
183 const char *myUser = conf.m_username.c_str();
184 XrdOucEnv myEnv;
185
186 // Create the data file.
187
188 char size_str[32]; sprintf(size_str, "%lld", file_size);
189 myEnv.Put("oss.asize", size_str);
190 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
191 int cret;
192 if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
193 {
194 TRACE(Error, err_prefix << "Create failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
195 return;
196 }
197
198 XrdOssDF *myFile = GetOss()->newFile(myUser);
199 if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
200 {
201 TRACE(Error, err_prefix << "Open failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
202 delete myFile;
203 return;
204 }
205
206 // Create the info file.
207
208 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
209 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
210 if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
211 {
212 TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
213 myFile->Close(); delete myFile;
214 return;
215 }
216
217 XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
218 if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
219 {
220 TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
221 delete myInfoFile;
222 myFile->Close(); delete myFile;
223 return;
224 }
225
226 // Allocate space for the data file.
227
228 if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
229 {
230 TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(cret));
231 }
232
233 // Fill up cinfo.
234
235 Info myInfo(m_trace, false);
236 myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
237 myInfo.SetAllBitsSynced();
238
239 for (int i = 0; i < at_count; ++i)
240 {
241 time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
242
243 myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
244 }
245
246 myInfo.Write(myInfoFile, cinfo_path.c_str());
247
248 myInfoFile->Close(); delete myInfoFile;
249 myFile->Close(); delete myFile;
250
251 TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB.");
252
253 {
254 XrdSysCondVarHelper lock(&m_writeQ.condVar);
255
256 m_writeQ.writes_between_purges += file_size;
257 }
258 }
259 }
260
261 //================================================================
262 // remove_file
263 //================================================================
264
265 else if (token == "remove_file")
266 {
267 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
268 static const char* usage =
269 "Usage: remove_file/ [-h] /<path>\n"
270 " Removes given file from the cache unless it is currently open.\n"
271 " Useful for removal of stale files or duplicate files in a caching cluster.\n"
272 "Notes:\n"
273 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
274
275 token = cp.get_token();
276
277 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
278
279 std::vector<char*> argv;
280 SplitParser ap(token, " ");
281 int argc = ap.fill_argv(argv);
282
283 XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
284 "help", 1, "h",
285 (const char *) 0);
286
287 Spec.Set(argc, &argv[0]);
288 char theOpt;
289
290 while ((theOpt = Spec.getopt()) != (char) -1)
291 {
292 switch (theOpt)
293 {
294 case 'h': {
295 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
296 return;
297 }
298 default: {
299 TRACE(Error, err_prefix << "Unhandled command argument.");
300 return;
301 }
302 }
303 }
304 if (Spec.getarg())
305 {
306 TRACE(Error, err_prefix << "Options must take up all the arguments.");
307 return;
308 }
309
310 std::string f_name(cp.get_reminder());
311
312 TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
313
314 int ret = UnlinkFile(f_name, true);
315
316 TRACE(Info, err_prefix << "returned with status " << ret);
317 }
318
319 //================================================================
320 // unknown command
321 //================================================================
322
323 else
324 {
325 TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
326 }
327}
struct stat Stat
Definition XrdCks.cc:49
void usage()
#define XRDOSS_mkpath
Definition XrdOss.hh:466
const int MAX_ACCESSES
const long long ONE_GB
const long long ONE_MB
#define ERRNO_AND_ERRSTR(err_code)
bool Create
virtual int getFD()
Definition XrdOss.hh:426
virtual XrdOssDF * newFile(const char *tident)=0
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1179
XrdOss * GetOss() const
Definition XrdPfc.hh:389
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:82
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:83
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Debug, ERRNO_AND_ERRSTR, Error, XrdPfc::SplitParser::fill_argv(), XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdOucArgs::getarg(), XrdOssDF::getFD(), XrdOucArgs::getopt(), GetOss(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_username, MAX_ACCESSES, XrdOss::newFile(), ONE_GB, ONE_MB, XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::Info::s_infoExtension, XrdOucArgs::Set(), XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), Stat, stat, TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File * f,
bool high_debug )

Definition at line 568 of file XrdPfc.cc.

// Forwards the File pointer and the caller's high_debug flag to dec_ref_cnt().
// NOTE(review): presumably this drops the reference that was held on the file
// for the duration of a sync -- confirm against dec_ref_cnt().
569{
570 dec_ref_cnt(f, high_debug);
571}

◆ GetFile()

File * Cache::GetFile ( const std::string & path,
IO * io,
long long off = 0,
long long filesize = 0 )

Definition at line 415 of file XrdPfc.cc.

416{
417 // Called from virtual IOFile constructor.
418
419 TRACE(Debug, "GetFile " << path << ", io " << io);
420
421 ActiveMap_i it;
422
423 {
424 XrdSysCondVarHelper lock(&m_active_cond);
425
426 while (true)
427 {
428 it = m_active.find(path);
429
430 // File is not open or being opened. Mark it as being opened and
431 // proceed to opening it outside of while loop.
432 if (it == m_active.end())
433 {
434 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
435 break;
436 }
437
438 if (it->second != 0)
439 {
440 it->second->AddIO(io);
441 inc_ref_cnt(it->second, false, true);
442
443 return it->second;
444 }
445 else
446 {
447 // Wait for some change in m_active, then recheck.
448 m_active_cond.Wait();
449 }
450 }
451 }
452
453 // This is always true, now that IOFileBlock is unsupported.
454 if (filesize == 0)
455 {
456 struct stat st;
457 int res = io->Fstat(st);
458 if (res < 0) {
459 errno = res;
460 TRACE(Error, "GetFile, could not get valid stat");
461 } else if (res > 0) {
462 errno = ENOTSUP;
463 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
464 } else {
465 filesize = st.st_size;
466 }
467 }
468
469 File *file = 0;
470
471 if (filesize >= 0)
472 {
473 file = File::FileOpen(path, off, filesize);
474 }
475
476 {
477 XrdSysCondVarHelper lock(&m_active_cond);
478
479 if (file)
480 {
481 inc_ref_cnt(file, false, true);
482 it->second = file;
483
484 file->AddIO(io);
485 }
486 else
487 {
488 m_active.erase(it);
489 }
490
491 m_active_cond.Broadcast();
492 }
493
494 return file;
495}
virtual int Fstat(struct stat &sbuff)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
void AddIO(IO *io)

References XrdPfc::File::AddIO(), Debug, Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), stat, and TRACE.

Referenced by XrdPfc::IOFile::IOFile().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream * XrdPfc::Cache::GetGStream ( )
inline

Definition at line 404 of file XrdPfc.hh.

// Accessor: returns the m_gstream member as stored.
404{ return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 163 of file XrdPfc.cc.

// Singleton access: returns a reference obtained by dereferencing m_instance.
// NOTE(review): there is no null check here -- presumably m_instance is set
// before the first call; confirm against the code that creates the instance.
163{ return *m_instance; }

References Cache().

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::GetTrace(), PrefetchThread(), ProcessWriteTaskThread(), PurgeThread(), ResourceMonitorHeartBeatThread(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ GetLog()

XrdSysError * XrdPfc::Cache::GetLog ( )
inline

Definition at line 401 of file XrdPfc.hh.

// Accessor: returns the address of the m_log member.
401{ return &m_log; }

Referenced by XrdPfc::File::GetLog().

Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 735 of file XrdPfc.cc.

736{
737 m_prefetch_condVar.Lock();
738 while (m_prefetchList.empty())
739 {
740 m_prefetch_condVar.Wait();
741 }
742
743 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
744
745 size_t l = m_prefetchList.size();
746 int idx = rand() % l;
747 File* f = m_prefetchList[idx];
748
749 m_prefetch_condVar.UnLock();
750 return f;
751}

Referenced by Prefetch().

Here is the caller graph for this function:

◆ GetOss()

XrdOss * XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 389 of file XrdPfc.hh.

// Accessor: returns the m_oss member as stored.
389{ return m_oss; }

Referenced by ExecuteCommandUrl().

Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace * XrdPfc::Cache::GetTrace ( )
inline

Definition at line 402 of file XrdPfc.hh.

// Accessor: returns the m_trace member as stored.
402{ return m_trace; }

Referenced by XrdPfc::File::GetTrace(), and XrdPfc::GetTrace().

Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string & path)

Definition at line 684 of file XrdPfc.cc.

685{
686 XrdSysCondVarHelper lock(&m_active_cond);
687
688 return m_active.find(path) != m_active.end() ||
689 m_purge_delay_set.find(path) != m_purge_delay_set.end();
690}

Referenced by Purge().

Here is the caller graph for this function:

◆ LocalFilePath()

int Cache::LocalFilePath ( const char * curl,
char * buff = 0,
int blen = 0,
LFP_Reason why = ForAccess,
bool forall = false )
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 796 of file XrdPfc.cc.

// Resolves the URL 'curl' to a path in the local cache and reports whether the
// cached file is complete.
//
// Return values produced by this body:
//   0           - cinfo was read and reports the file as complete.
//   -EREMOTE    - cinfo was read but the file is not complete, or the Chmod
//                 needed for ForAccess failed.
//   -EISDIR     - the data path is a directory.
//   -ENOENT     - data or cinfo file cannot be stat-ed, or cinfo could not be
//                 opened / read.
//   Lfn2Pfn rc  - for ForPath unconditionally; otherwise only when negative.
//
// 'buff' is cleared up front (when given with blen > 0) and is filled through
// Lfn2Pfn only for ForPath, or when the file is complete or why == ForInfo.
798{
// Permission sets compared against / applied to the data file for ForAccess.
799 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
800 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
// Names used in trace output, indexed by 'why'.
// NOTE(review): assumes LFP_Reason values are 0..2 in this order -- confirm.
801 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
802
803 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
804
805 if (buff && blen > 0) buff[0] = 0;
806
807 XrdCl::URL url(curl);
808 std::string f_name = url.GetPath();
809 std::string i_name = f_name + Info::s_infoExtension;
810
// ForPath: only translate the name; no existence or completeness checks.
811 if (why == ForPath)
812 {
813 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
814 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
815 return ret;
816 }
817
// Register the path in m_purge_delay_set; IsFileActiveOrPurgeProtected()
// consults this set.
818 {
819 XrdSysCondVarHelper lock(&m_active_cond);
820 m_purge_delay_set.insert(f_name);
821 }
822
// Both the data file and its cinfo companion must exist.
823 struct stat sbuff, sbuff2;
824 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
825 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
826 {
827 if (S_ISDIR(sbuff.st_mode))
828 {
829 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
830 return -EISDIR;
831 }
832 else
833 {
834 bool read_ok = false;
835 bool is_complete = false;
836
837 // Lock and check if the file is active. If NOT, keep the lock
838 // and add dummy access after successful reading of info file.
839 // If it IS active, just release the lock, this ongoing access will
840 // assure the file continues to exist.
841
842 // XXXX How can I just loop over the cinfo file when active?
843 // Can I not get is_complete from the existing file?
844 // Do I still want to inject access record?
845 // Oh, it writes only if not active .... still let's try to use existing File.
846
// Manual lock: released right away when the file is active, otherwise only
// after the cinfo file has been read (and possibly rewritten) below.
847 m_active_cond.Lock();
848
849 bool is_active = m_active.find(f_name) != m_active.end();
850
851 if (is_active) m_active_cond.UnLock();
852
853 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
854 XrdOucEnv myEnv;
855 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
856 if (res >= 0)
857 {
858 Info info(m_trace, 0);
859 if (info.Read(infoFile, i_name.c_str()))
860 {
861 read_ok = true;
862
863 is_complete = info.IsComplete();
864
865 // Add full-size access if reason is for access.
866 if ( ! is_active && is_complete && why == ForAccess)
867 {
868 info.WriteIOStatSingle(info.GetFileSize());
869 info.Write(infoFile, i_name.c_str());
870 }
871 }
872 infoFile->Close();
873 }
874 delete infoFile;
875
// Counterpart of the conditional unlock above.
876 if ( ! is_active) m_active_cond.UnLock();
877
// When the cinfo file could not be opened or read, control falls through to
// the -ENOENT return at the end of the function.
878 if (read_ok)
879 {
880 if ((is_complete || why == ForInfo) && buff != 0)
881 {
882 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
883 if (res2 < 0)
884 return res2;
885
886 // Normally, files are owned by us but when direct cache access
887 // is wanted and possible, make sure the file is world readable.
888 if (why == ForAccess)
889 {mode_t mode = (forall ? worldReadable : groupReadable);
// Chmod only when the current permission bits differ from the wanted set;
// on Chmod failure report the file as not complete and clear the path.
890 if (((sbuff.st_mode & worldReadable) != mode)
891 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
892 {is_complete = false;
893 *buff = 0;
894 }
895 }
896 }
897
898 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
899 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
900
901 return is_complete ? 0 : -EREMOTE;
902 }
903 }
904 }
905
906 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
907 return -ENOENT;
908}

References XrdOssDF::Close(), Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat, TRACE, XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 754 of file XrdPfc.cc.

755{
756 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
757
758 while (true)
759 {
760 m_RAM_mutex.Lock();
761 bool doPrefetch = (m_RAM_used < limit_RAM);
762 m_RAM_mutex.UnLock();
763
764 if (doPrefetch)
765 {
767 f->Prefetch();
768 }
769 else
770 {
772 }
773 }
774}
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:735
static void Wait(int milliseconds)

References GetNextFileToPrefetch(), XrdPfc::File::Prefetch(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Prepare()

int Cache::Prepare ( const char * curl,
int oflags,
mode_t mode )
virtual

Prepare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow deferring an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1062 of file XrdPfc.cc.

1063{
1064 XrdCl::URL url(curl);
1065 std::string f_name = url.GetPath();
1066 std::string i_name = f_name + Info::s_infoExtension;
1067
1068 // Do not allow write access.
1069 if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1070 {
1071 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1072 return -EROFS;
1073 }
1074
1075 // Intercept xrdpfc_command requests.
1076 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1077 {
1078 // Schedule a job to process command request.
1079 {
1080 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1081
1082 schedP->Schedule(ce);
1083 }
1084
1085 return -EAGAIN;
1086 }
1087
1088 {
1089 XrdSysCondVarHelper lock(&m_active_cond);
1090 m_purge_delay_set.insert(f_name);
1091 }
1092
1093 struct stat sbuff;
1094 if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1095 {
1096 TRACE(Dump, "Prepare defer open " << f_name);
1097 return 1;
1098 }
1099 else
1100 {
1101 return 0;
1102 }
1103}
static XrdScheduler * schedP
Definition XrdPfc.hh:408

References XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, schedP, stat, TRACE, and XrdOssOK.

Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from ram to disk.

Definition at line 308 of file XrdPfc.cc.

309{
310 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
311
312 while (true)
313 {
314 m_writeQ.condVar.Lock();
315 while (m_writeQ.size == 0)
316 {
317 m_writeQ.condVar.Wait();
318 }
319
320 // MT -- optimize to pop several blocks if they are available (or swap the list).
321 // This makes sense especially for smallish block sizes.
322
323 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
324 long long sum_size = 0;
325
326 for (int bi = 0; bi < n_pushed; ++bi)
327 {
328 Block* block = m_writeQ.queue.front();
329 m_writeQ.queue.pop_front();
330 m_writeQ.writes_between_purges += block->get_size();
331 sum_size += block->get_size();
332
333 blks_to_write[bi] = block;
334
335 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
336 }
337 m_writeQ.size -= n_pushed;
338
339 m_writeQ.condVar.UnLock();
340
341 {
342 XrdSysMutexHelper lock(&m_RAM_mutex);
343 m_RAM_write_queue -= sum_size;
344 }
345
346 for (int bi = 0; bi < n_pushed; ++bi)
347 {
348 Block* block = blks_to_write[bi];
349
350 block->m_file->WriteBlockToDisk(block);
351 }
352 }
353}
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Purge()

void XrdPfc::Cache::Purge ( )

Thread function invoked to scan and purge files from disk when needed.

Definition at line 698 of file XrdPfcPurge.cc.

699{
700 static const char *trc_pfx = "Purge() ";
701
702 XrdOucEnv env;
703 long long disk_usage;
704 long long estimated_file_usage = m_configuration.m_diskUsageHWM;
705
706 // Pause before initial run
707 sleep(1);
708
709 m_fs_state = new DataFsState;
710
711 // { PathTokenizer p("/a/b/c/f.root", 2, true); p.deboog(); }
712 // { PathTokenizer p("/a/b/f.root", 2, true); p.deboog(); }
713 // { PathTokenizer p("/a/f.root", 2, true); p.deboog(); }
714 // { PathTokenizer p("/f.root", 2, true); p.deboog(); }
715
716 int age_based_purge_countdown = 0; // enforce on first purge loop entry.
717 bool is_first = true;
718
719 while (true)
720 {
721 time_t purge_start = time(0);
722
723 {
724 XrdSysCondVarHelper lock(&m_active_cond);
725
726 m_in_purge = true;
727 }
728
729 TRACE(Info, trc_pfx << "Started.");
730
731 // Bytes to remove based on total disk usage (d) and file usage (f).
732 long long bytesToRemove_d = 0, bytesToRemove_f = 0;
733
734 // get amount of space to potentially erase based on total disk usage
735 XrdOssVSInfo sP; // Make sure we start when a clean slate in each loop
736 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
737 {
738 TRACE(Error, trc_pfx << "can't get StatVS for oss space " << m_configuration.m_data_space);
739 continue;
740 }
741 else
742 {
743 disk_usage = sP.Total - sP.Free;
744 TRACE(Debug, trc_pfx << "used disk space " << disk_usage << " bytes.");
745
746 if (disk_usage > m_configuration.m_diskUsageHWM)
747 {
748 bytesToRemove_d = disk_usage - m_configuration.m_diskUsageLWM;
749 }
750 }
751
752 // estimate amount of space to erase based on file usage
753 if (m_configuration.are_file_usage_limits_set())
754 {
755 long long estimated_writes_since_last_purge;
756 {
757 XrdSysCondVarHelper lock(&m_writeQ.condVar);
758
759 estimated_writes_since_last_purge = m_writeQ.writes_between_purges;
760 m_writeQ.writes_between_purges = 0;
761 }
762 estimated_file_usage += estimated_writes_since_last_purge;
763
764 TRACE(Debug, trc_pfx << "estimated usage by files " << estimated_file_usage << " bytes.");
765
766 bytesToRemove_f = std::max(estimated_file_usage - m_configuration.m_fileUsageNominal, 0ll);
767
768 // Here we estimate fractional usages -- to decide if full scan is necessary before actual purge.
769 double frac_du = 0, frac_fu = 0;
770 m_configuration.calculate_fractional_usages(disk_usage, estimated_file_usage, frac_du, frac_fu);
771
772 if (frac_fu > 1.0 - frac_du)
773 {
774 bytesToRemove_f = std::max(bytesToRemove_f, disk_usage - m_configuration.m_diskUsageLWM);
775 }
776 }
777
778 long long bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
779
780 bool enforce_age_based_purge = false;
781 if (m_configuration.is_age_based_purge_in_effect() || m_configuration.is_uvkeep_purge_in_effect())
782 {
783 // XXXX ... I could collect those guys in larger vectors (maps?) and do traversal when
784 // they are empty.
785 if (--age_based_purge_countdown <= 0)
786 {
787 enforce_age_based_purge = true;
788 age_based_purge_countdown = m_configuration.m_purgeAgeBasedPeriod;
789 }
790 }
791
792 bool enforce_traversal_for_usage_collection = is_first;
793 // XXX Other conditions? Periodic checks?
794
795 copy_out_active_stats_and_update_data_fs_state();
796
797 TRACE(Debug, trc_pfx << "Precheck:");
798 TRACE(Debug, "\tbytes_to_remove_disk = " << bytesToRemove_d << " B");
799 TRACE(Debug, "\tbytes_to remove_files = " << bytesToRemove_f << " B (" << (is_first ? "max possible for initial run" : "estimated") << ")");
800 TRACE(Debug, "\tbytes_to_remove = " << bytesToRemove << " B");
801 TRACE(Debug, "\tenforce_age_based_purge = " << enforce_age_based_purge);
802 is_first = false;
803
804 long long bytesToRemove_at_start = 0; // set after file scan
805 int deleted_file_count = 0;
806
807 bool purge_required = (bytesToRemove > 0 || enforce_age_based_purge);
808
809 // XXXX-PurgeOpt Need to retain this state between purges so I can avoid doing
810 // the traversal more often than really needed.
811 FPurgeState purgeState(2 * bytesToRemove, *m_oss); // prepare twice more volume than required
812
813 if (purge_required || enforce_traversal_for_usage_collection)
814 {
815 // Make a sorted map of file paths sorted by access time.
816
817 if (m_configuration.is_age_based_purge_in_effect())
818 {
819 purgeState.setMinTime(time(0) - m_configuration.m_purgeColdFilesAge);
820 }
821 if (m_configuration.is_uvkeep_purge_in_effect())
822 {
823 purgeState.setUVKeepMinTime(time(0) - m_configuration.m_cs_UVKeep);
824 }
825
826 XrdOssDF* dh = m_oss->newDir(m_configuration.m_username.c_str());
827 if (dh->Opendir("/", env) == XrdOssOK)
828 {
829 purgeState.begin_traversal(m_fs_state->get_root());
830
831 purgeState.TraverseNamespace(dh);
832
833 purgeState.end_traversal();
834
835 dh->Close();
836 }
837 delete dh; dh = 0;
838
839 estimated_file_usage = purgeState.getNBytesTotal();
840
841 TRACE(Debug, trc_pfx << "actual usage by files " << estimated_file_usage << " bytes.");
842
843 // Adjust bytesToRemove_f and then bytesToRemove based on actual file usage,
844 // possibly retreating below nominal file usage (but not below baseline file usage).
845 if (m_configuration.are_file_usage_limits_set())
846 {
847 bytesToRemove_f = std::max(estimated_file_usage - m_configuration.m_fileUsageNominal, 0ll);
848
849 double frac_du = 0, frac_fu = 0;
850 m_configuration.calculate_fractional_usages(disk_usage, estimated_file_usage, frac_du, frac_fu);
851
852 if (frac_fu > 1.0 - frac_du)
853 {
854 bytesToRemove = std::max(bytesToRemove_f, disk_usage - m_configuration.m_diskUsageLWM);
855 bytesToRemove = std::min(bytesToRemove, estimated_file_usage - m_configuration.m_fileUsageBaseline);
856 }
857 else
858 {
859 bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
860 }
861 }
862 else
863 {
864 bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
865 }
866 bytesToRemove_at_start = bytesToRemove;
867
868 TRACE(Debug, trc_pfx << "After scan:");
869 TRACE(Debug, "\tbytes_to_remove_disk = " << bytesToRemove_d << " B");
870 TRACE(Debug, "\tbytes_to remove_files = " << bytesToRemove_f << " B (measured)");
871 TRACE(Debug, "\tbytes_to_remove = " << bytesToRemove << " B");
872 TRACE(Debug, "\tenforce_age_based_purge = " << enforce_age_based_purge);
873 TRACE(Debug, "\tmin_time = " << purgeState.getMinTime());
874
875 if (enforce_age_based_purge)
876 {
877 purgeState.MoveListEntriesToMap();
878 }
879 }
880
881 // Dump statistcs before actual purging so maximum usage values get recorded.
882 // Should really go to gstream --- and should really go from Heartbeat.
883 if (m_configuration.is_dir_stat_reporting_on())
884 {
885 m_fs_state->dump_recursively();
886 }
887
888 if (purge_required)
889 {
890 // Loop over map and remove files with oldest values of access time.
891 struct stat fstat;
892 size_t info_ext_len = strlen(Info::s_infoExtension);
893 int protected_cnt = 0;
894 long long protected_sum = 0;
895 for (FPurgeState::map_i it = purgeState.m_fmap.begin(); it != purgeState.m_fmap.end(); ++it)
896 {
897 // Finish when enough space has been freed but not while age-based purging is in progress.
898 // Those files are marked with time-stamp = 0.
899 if (bytesToRemove <= 0 && ! (enforce_age_based_purge && it->first == 0))
900 {
901 break;
902 }
903
904 std::string &infoPath = it->second.path;
905 std::string dataPath = infoPath.substr(0, infoPath.size() - info_ext_len);
906
907 if (IsFileActiveOrPurgeProtected(dataPath))
908 {
909 ++protected_cnt;
910 protected_sum += it->second.nBytes;
911 TRACE(Debug, trc_pfx << "File is active or purge-protected: " << dataPath << " size: " << it->second.nBytes);
912 continue;
913 }
914
915 // remove info file
916 if (m_oss->Stat(infoPath.c_str(), &fstat) == XrdOssOK)
917 {
918 // cinfo file can be on another oss.space, do not subtract for now.
919 // Could be relevant for very small block sizes.
920 // bytesToRemove -= fstat.st_size;
921 // estimated_file_usage -= fstat.st_size;
922 // ++deleted_file_count;
923
924 m_oss->Unlink(infoPath.c_str());
925 TRACE(Dump, trc_pfx << "Removed file: '" << infoPath << "' size: " << fstat.st_size);
926 }
927
928 // remove data file
929 if (m_oss->Stat(dataPath.c_str(), &fstat) == XrdOssOK)
930 {
931 bytesToRemove -= it->second.nBytes;
932 estimated_file_usage -= it->second.nBytes;
933 ++deleted_file_count;
934
935 m_oss->Unlink(dataPath.c_str());
936 TRACE(Dump, trc_pfx << "Removed file: '" << dataPath << "' size: " << it->second.nBytes << ", time: " << it->first);
937
938 if (it->second.dirState != 0) // XXXX This should now always be true.
939 it->second.dirState->add_usage_purged(it->second.nBytes);
940 else
941 TRACE(Error, trc_pfx << "DirState not set for file '" << dataPath << "'.");
942 }
943 }
944 if (protected_cnt > 0)
945 {
946 TRACE(Info, trc_pfx << "Encountered " << protected_cnt << " protected files, sum of their size: " << protected_sum);
947 }
948
949 m_fs_state->upward_propagate_usage_purged();
950 }
951
952 {
953 XrdSysCondVarHelper lock(&m_active_cond);
954
955 m_purge_delay_set.clear();
956 m_in_purge = false;
957 }
958
959 int purge_duration = time(0) - purge_start;
960
961 TRACE(Info, trc_pfx << "Finished, removed " << deleted_file_count << " data files, total size " <<
962 bytesToRemove_at_start - bytesToRemove << ", bytes to remove at end " << bytesToRemove << ", purge duration " << purge_duration);
963
964 int sleep_time = m_configuration.m_purgeInterval - purge_duration;
965 if (sleep_time > 0)
966 {
967 sleep(sleep_time);
968 }
969 }
970}
#define fstat(a, b)
Definition XrdPosix.hh:57
virtual int Opendir(const char *path, XrdOucEnv &env)
Definition XrdOss.hh:79
long long Free
Definition XrdOssVS.hh:91
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition XrdPfc.cc:684
map_t::iterator map_i

References XrdPfc::FPurgeState::begin_traversal(), XrdOssDF::Close(), Debug, XrdPfc::FPurgeState::end_traversal(), Error, XrdOssVSInfo::Free, fstat, XrdPfc::FPurgeState::getMinTime(), XrdPfc::FPurgeState::getNBytesTotal(), IsFileActiveOrPurgeProtected(), XrdPfc::FPurgeState::m_fmap, XrdPfc::FPurgeState::MoveListEntriesToMap(), XrdOssDF::Opendir(), XrdPfc::Info::s_infoExtension, XrdPfc::FPurgeState::setMinTime(), XrdPfc::FPurgeState::setUVKeepMinTime(), stat, XrdOssVSInfo::Total, TRACE, XrdPfc::FPurgeState::TraverseNamespace(), and XrdOssOK.

Referenced by PurgeThread().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration & XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 319 of file XrdPfc.hh.

// Accessor: returns the m_configuration member by const reference.
319{ return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), and XrdOucGetCache().

Here is the caller graph for this function:

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File * file)

Definition at line 697 of file XrdPfc.cc.

698{
699 // Can be called with other locks held.
700
701 if ( ! m_prefetch_enabled)
702 {
703 return;
704 }
705
706 m_prefetch_condVar.Lock();
707 m_prefetchList.push_back(file);
708 m_prefetch_condVar.Signal();
709 m_prefetch_condVar.UnLock();
710}

◆ ReleaseFile()

void Cache::ReleaseFile ( File * f,
IO * io )

Definition at line 497 of file XrdPfc.cc.

// Detaches 'io' from file 'f' and then gives up one reference on the file.
// RemoveIO() runs under m_active_cond; dec_ref_cnt() is called after that
// lock has been released, with high_debug set to true.
498{
499 // Called from virtual IO::DetachFinalize.
500
501 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
502
503 {
504 XrdSysCondVarHelper lock(&m_active_cond);
505
506 f->RemoveIO(io);
507 }
508 dec_ref_cnt(f, true);
509}
void RemoveIO(IO *io)

References Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char * buf,
long long size )

Definition at line 397 of file XrdPfc.cc.

398{
399 bool std_size = (size == m_configuration.m_bufferSize);
400 {
401 XrdSysMutexHelper lock(&m_RAM_mutex);
402
403 m_RAM_used -= size;
404
405 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
406 {
407 m_RAM_std_blocks.push_back(buf);
408 ++m_RAM_std_size;
409 return;
410 }
411 }
412 free(buf);
413}

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File * f)

Remove blocks from the write queue which belong to the given file. This method is used at the time of File destruction.

Definition at line 275 of file XrdPfc.cc.

276{
277 std::list<Block*> removed_blocks;
278 long long sum_size = 0;
279
280 m_writeQ.condVar.Lock();
281 std::list<Block*>::iterator i = m_writeQ.queue.begin();
282 while (i != m_writeQ.queue.end())
283 {
284 if ((*i)->m_file == file)
285 {
286 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
287 std::list<Block*>::iterator j = i++;
288 removed_blocks.push_back(*j);
289 sum_size += (*j)->get_size();
290 m_writeQ.queue.erase(j);
291 --m_writeQ.size;
292 }
293 else
294 {
295 ++i;
296 }
297 }
298 m_writeQ.condVar.UnLock();
299
300 {
301 XrdSysMutexHelper lock(&m_RAM_mutex);
302 m_RAM_write_queue -= sum_size;
303 }
304
305 file->BlocksRemovedFromWriteQ(removed_blocks);
306}

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long size)

Definition at line 357 of file XrdPfc.cc.

358{
359 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
360
361 bool std_size = (size == m_configuration.m_bufferSize);
362
363 m_RAM_mutex.Lock();
364
365 long long total = m_RAM_used + size;
366
367 if (total <= m_configuration.m_RamAbsAvailable)
368 {
369 m_RAM_used = total;
370 if (std_size && m_RAM_std_size > 0)
371 {
372 char *buf = m_RAM_std_blocks.back();
373 m_RAM_std_blocks.pop_back();
374 --m_RAM_std_size;
375
376 m_RAM_mutex.UnLock();
377
378 return buf;
379 }
380 else
381 {
382 m_RAM_mutex.UnLock();
383 char *buf;
384 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
385 {
386 // Report out of mem? Probably should report it at least the first time,
387 // then periodically.
388 return 0;
389 }
390 return buf;
391 }
392 }
393 m_RAM_mutex.UnLock();
394 return 0;
395}

◆ ResourceMonitorHeartBeat()

void XrdPfc::Cache::ResourceMonitorHeartBeat ( )

Thread function checking resource usage periodically.

Definition at line 606 of file XrdPfcPurge.cc.

607{
608 // static const char *trc_pfx = "ResourceMonitorHeartBeat() ";
609
610 // Pause before initial run
611 sleep(1);
612
613 // XXXX Setup initial / constant stats (total RAM, total disk, ???)
614
615 XrdOucCacheStats &S = Statistics;
616 XrdOucCacheStats::CacheStats &X = Statistics.X;
617
618 S.Lock();
619
620 X.DiskSize = m_configuration.m_diskTotalSpace;
621
622 X.MemSize = m_configuration.m_RamAbsAvailable;
623
624 S.UnLock();
625
626 // XXXX Schedule initial disk scan, time it!
627 //
628 // TRACE(Info, trc_pfx << "scheduling intial disk scan.");
629 // schedP->Schedule( new ScanAndPurgeJob("XrdPfc::ScanAndPurge") );
630 //
631 // bool scan_and_purge_running = true;
632
633 // XXXX Could we really hold last-usage for all files in memory?
634
635 // XXXX Think how to handle disk-full, scan/purge not finishing:
636 // - start dropping things out of write queue, but only when RAM gets near full;
637 // - monitoring this then becomes a high-priority job, inner loop with sleep of,
638 // say, 5 or 10 seconds.
639
640 while (true)
641 {
642 time_t heartbeat_start = time(0);
643
644 // TRACE(Info, trc_pfx << "HeartBeat starting ...");
645
646 // if summary monitoring configured, populate OucCacheStats:
647 S.Lock();
648
649 // - available / used disk space (files usage calculated elsewhere (maybe))
650
651 // - RAM usage
652 { XrdSysMutexHelper lck(&m_RAM_mutex);
653 X.MemUsed = m_RAM_used;
654 X.MemWriteQ = m_RAM_write_queue;
655 }
656 // - files opened / closed etc
657
658 // do estimate of available space
659 S.UnLock();
660
661 // if needed, schedule purge in a different thread.
662 // purge is:
663 // - deep scan + gather FSPurgeState
664 // - actual purge
665 //
666 // this thread can continue running and, if needed, stop writing to disk
667 // if purge is taking too long.
668
669 // think how data is passed / synchronized between this and purge thread
670
671 // !!!! think how stat collection is done and propagated upwards;
672 // until now it was done once per purge-interval.
673 // now stats will be added up more often, but purge will be done
674 // only occasionally.
675 // also, do we report cumulative values or deltas? cumulative should
676 // be easier and consistent with summary data.
677 // still, some are state - like disk usage, num of files.
678
679 // Do we take care of directories that need to be newly added into DirState hierarchy?
680 // I.e., when user creates new directories and these are covered by either full
681 // spec or by root + depth declaration.
682
683 int heartbeat_duration = time(0) - heartbeat_start;
684
685 // TRACE(Info, trc_pfx << "HeartBeat finished, heartbeat_duration " << heartbeat_duration);
686
687 // int sleep_time = m_configuration.m_purgeInterval - heartbeat_duration;
688 int sleep_time = 60 - heartbeat_duration;
689 if (sleep_time > 0)
690 {
691 sleep(sleep_time);
692 }
693 }
694}
XrdOucCacheStats Statistics

References XrdOucCacheStats::CacheStats::DiskSize, XrdOucCacheStats::Lock(), XrdOucCacheStats::CacheStats::MemSize, XrdOucCacheStats::CacheStats::MemUsed, XrdOucCacheStats::CacheStats::MemWriteQ, XrdOucCache::Statistics, and XrdOucCacheStats::UnLock().

Referenced by ResourceMonitorHeartBeatThread().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File * f)
inline

Definition at line 397 of file XrdPfc.hh.

397{ schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char * curl,
struct stat & sbuff )
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1113 of file XrdPfc.cc.

1114{
1115 const char *tpfx = "Stat ";
1116
1117 XrdCl::URL url(curl);
1118 std::string f_name = url.GetPath();
1119
1120 File *file = nullptr;
1121 {
1122 XrdSysCondVarHelper lock(&m_active_cond);
1123 auto it = m_active.find(f_name);
1124 if (it != m_active.end()) {
1125 file = it->second;
1126 inc_ref_cnt(file, false, false);
1127 }
1128 }
1129 if (file) {
1130 int res = file->Fstat(sbuff);
1131 dec_ref_cnt(file, false);
1132 TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1133 return res;
1134 }
1135
1136 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1137 if (res != XrdOssOK) {
1138 TRACE(Debug, tpfx << curl << " -> " << res);
1139 return 1; // res; -- for only-if-cached
1140 }
1141 if (S_ISDIR(sbuff.st_mode))
1142 {
1143 TRACE(Debug, tpfx << curl << " -> EISDIR");
1144 return -EISDIR;
1145 }
1146
1147 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1148 if (file_size < 0) {
1149 TRACE(Debug, tpfx << curl << " -> " << file_size);
1150 return 1; // (int) file_size; -- for only-if-cached
1151 }
1152 sbuff.st_size = file_size;
1153 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1154 if ( ! is_cached)
1155 sbuff.st_atime = 0;
1156
1157 TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1158
1159 return 0;
1160}

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

Here is the call graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 164 of file XrdPfc.cc.

164{ return *m_instance; }

References Cache().

Here is the call graph for this function:

◆ Unlink()

int Cache::Unlink ( const char * curl)
virtual
Returns
<0 - Unlink failed, value is -errno. =0 - Unlink succeeded, the data and cinfo files were removed from the cache.

Reimplemented from XrdOucCache.

Definition at line 1169 of file XrdPfc.cc.

1170{
1171 XrdCl::URL url(curl);
1172 std::string f_name = url.GetPath();
1173
1174 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1175
1176 return UnlinkFile(f_name, false);
1177}

References XrdCl::URL::GetPath(), and UnlinkFile().

Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string & f_name,
bool fail_if_open )

Remove cinfo and data files from cache.

Definition at line 1179 of file XrdPfc.cc.

1180{
1181 ActiveMap_i it;
1182 File *file = 0;
1183 {
1184 XrdSysCondVarHelper lock(&m_active_cond);
1185
1186 it = m_active.find(f_name);
1187
1188 if (it != m_active.end())
1189 {
1190 if (fail_if_open)
1191 {
1192 TRACE(Info, "UnlinkCommon " << f_name << ", file currently open and force not requested - denying request");
1193 return -EBUSY;
1194 }
1195
1196 // Null File* in m_active map means an operation is ongoing, probably
1197 // Attach() with possible File::Open(). Ask for retry.
1198 if (it->second == 0)
1199 {
1200 TRACE(Info, "UnlinkCommon " << f_name << ", an operation on this file is ongoing - denying request");
1201 return -EAGAIN;
1202 }
1203
1204 file = it->second;
1205 file->initiate_emergency_shutdown();
1206 it->second = 0;
1207 }
1208 else
1209 {
1210 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1211 }
1212 }
1213
1214 if (file)
1215 {
1216 RemoveWriteQEntriesFor(file);
1217 }
1218
1219 std::string i_name = f_name + Info::s_infoExtension;
1220
1221 // Unlink file & cinfo
1222 int f_ret = m_oss->Unlink(f_name.c_str());
1223 int i_ret = m_oss->Unlink(i_name.c_str());
1224
1225 TRACE(Debug, "UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1226
1227 {
1228 XrdSysCondVarHelper lock(&m_active_cond);
1229
1230 m_active.erase(it);
1231 }
1232
1233 return std::min(f_ret, i_ret);
1234}
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:275
void initiate_emergency_shutdown()

References Debug, XrdPfc::File::initiate_emergency_shutdown(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, and TRACE.

Referenced by ExecuteCommandUrl(), XrdPfc::File::Sync(), and Unlink().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ VCheck()

bool XrdPfc::Cache::VCheck ( XrdVersionInfo & urVersion)
inlinestatic

Version check.

Definition at line 346 of file XrdPfc.hh.

346{ return true; }

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int cinfo_fd,
long long file_size )

Definition at line 913 of file XrdPfc.cc.

914{
915 if (m_metaXattr) {
916 int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
917 if (res != 0) {
918 TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
919 }
920 }
921}
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Debug, TRACE, and XrdSysXAttrActive.

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = 0
static

Definition at line 408 of file XrdPfc.hh.

Referenced by XrdPfc::IO::Detach(), Prepare(), and XrdOucGetCache().


The documentation for this class was generated from the following files: