Skip to content

Commit 0eecd8a

Browse files
authored
Merge pull request #31 from LightBitsLabs/anton/procstat_aggregator3
procstat aggregator
2 parents 4a510ea + 98ff8da commit 0eecd8a

File tree

2 files changed

+277
-7
lines changed

2 files changed

+277
-7
lines changed

src/procstat.c

Lines changed: 270 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,11 @@ enum {
5858
STATS_ENTRY_FLAG_REGISTERED = 1 << 0,
5959
STATS_ENTRY_FLAG_DIR = 1 << 1,
6060
STATS_ENTRY_FLAG_HISTOGRAM = 1 << 2,
61+
STATS_ENTRY_FLAG_AGGREGATOR = 1 << 3,
6162
};
6263

64+
#define SERIES_RESET_CLOCK CLOCK_MONOTONIC_COARSE
65+
6366
#define ATTRIBUTES_TIMEOUT_SEC (60.0 * 60)
6467
#define DNAME_INLINE_LEN 32
6568
struct procstat_dynamic_name {
@@ -379,6 +382,8 @@ static void fuse_readdir(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,
379382

380383
if (!item_registered(iter))
381384
continue;
385+
if (iter->flags & STATS_ENTRY_FLAG_AGGREGATOR)
386+
continue;
382387
memset(&stat, 0, sizeof(stat));
383388
fname = procstat_item_name(iter);
384389
fill_item_stats(context, iter, &stat);
@@ -425,6 +430,7 @@ static bool allowed_open(struct procstat_item *item, struct fuse_file_info *fi)
425430
struct read_struct {
426431
ssize_t size;
427432
char buffer[READ_BUFFER_SIZE];
433+
void *ext;
428434
};
429435

430436
static void fuse_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
@@ -449,12 +455,15 @@ static void fuse_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
449455
if (!allowed_open(item, fi))
450456
goto out_locked;
451457

458+
read_buffer->ext = NULL;
452459
fi->fh = (uint64_t)read_buffer;
453460

454461
/* We don't know the size of the file in advance, so use direct I/O. */
455462
fi->direct_io = true;
456463

457464
++item->refcnt;
465+
if (item->flags & STATS_ENTRY_FLAG_AGGREGATOR)
466+
++item->parent->base.refcnt;
458467

459468
pthread_mutex_unlock(&context->global_lock);
460469
fuse_reply_open(req, fi);
@@ -467,12 +476,238 @@ static void fuse_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
467476
fuse_reply_err(req, ret);
468477
}
469478

479+
/* Output cursor used while an aggregator read renders items into a reply buffer. */
struct out_stream {
	/* destination buffer the lines are formatted into */
	char *buf;
	/* number of bytes of buf that may be filled */
	size_t size;
	/* bytes committed so far; always marks the end of the last complete line */
	size_t total;
	/* readable files visited so far: lines emitted plus lines discarded */
	unsigned int lines;
	/* leading lines still to be skipped instead of emitted */
	unsigned int discard_lines;
};
486+
487+
#define AGGR_EXTRA_BYTES 0
488+
489+
/* Position of an aggregator read within its parent directory, kept between read calls. */
struct aggregator_context {
	/*
	 * List node of the child being output: NULL before the first read,
	 * the parent's children list head once everything has been output.
	 */
	struct list_head *current;
	/* offset the next read is expected to start at */
	size_t off;
	/* lines to skip from the front of the current child (already delivered) */
	unsigned int discard_lines;
};
494+
495+
struct aggregator_struct {
496+
struct aggregator_context c;
497+
size_t buf_size;
498+
char buffer[0];
499+
};
500+
501+
#define MAX_PATH_LEN 120
502+
static int out_item(struct out_stream *out, char *path, struct procstat_item *item)
503+
{
504+
const char *fname;
505+
int len;
506+
int ret = 0;
507+
508+
fname = procstat_item_name(item);
509+
if (!item_type_directory(item)) {
510+
struct procstat_file *file = container_of(item, struct procstat_file, base);
511+
int space = out->size - out->total;
512+
size_t total = out->total;
513+
514+
if (!file->fmt)
515+
return 0; /* skipping write-only files */
516+
if (out->discard_lines) {
517+
--out->discard_lines;
518+
/*
519+
* Count the discarded lines, so in case we run out of the buffer space
520+
* on the first root directory item, the new discard_lines value will
521+
* include the previous one and the number of newly generated lines.
522+
*/
523+
++out->lines;
524+
return 0;
525+
}
526+
len = snprintf(&out->buf[total], space, "%s/%s:", path, fname);
527+
total += len > space ? space : len;
528+
if (len > space)
529+
return -1;
530+
space = out->size - total;
531+
if (!space)
532+
return -1;
533+
len = file->fmt(file->private, file->arg, &out->buf[total], space);
534+
total += len > space ? space : len;
535+
if (len > space)
536+
return -1;
537+
if (len == space)
538+
/* exact fit: snprintf clobbers the last char (\n) with a 0: restore it */
539+
out->buf[total - 1] = '\n';
540+
/* Now the line is fully generated */
541+
out->total = total;
542+
++out->lines;
543+
} else {
544+
/* directory walk */
545+
struct procstat_item *child;
546+
int path_len = strlen(path);
547+
int pos = path_len;
548+
int p_space = MAX_PATH_LEN - path_len;
549+
struct procstat_directory *dir = container_of(item, struct procstat_directory, base);
550+
551+
/* See fuse_read(): it is unsafe to read files under a directory that is marked unregistered */
552+
if (!item_registered(item))
553+
return 0;
554+
555+
if (pos && p_space) {
556+
path[pos++] = '/';
557+
--p_space;
558+
}
559+
strncpy(path + pos, fname, p_space);
560+
path[MAX_PATH_LEN - 1] = 0;
561+
562+
list_for_each_entry(child, &dir->children, entry) {
563+
ret = out_item(out, path, child);
564+
if (ret)
565+
break;
566+
}
567+
path[path_len] = 0;
568+
}
569+
570+
return ret;
571+
}
572+
573+
/*
 * Serve one sequential read of an aggregator file.
 *
 * The reply consists of the lines produced by out_item() for every child of
 * the aggregator's parent directory except the aggregator itself. Output is
 * delivered in whole lines: when the next line does not fit, the last complete
 * line is padded with spaces up to the requested size, and the following read
 * resumes at the same child, skipping the lines that were already delivered.
 *
 * Iteration state is kept in rs->ext between calls. A reference is held on the
 * child at which the iteration stopped, so it can be revisited safely.
 *
 * @req:  FUSE request to reply to
 * @file: the aggregator file
 * @rs:   per-open state (fi->fh)
 * @size: maximum number of bytes to return
 * @off:  requested offset; must equal the number of bytes returned so far
 *
 * NOTE(review): rs->ext is accessed without a lock; this assumes reads on one
 * open file are not issued concurrently - confirm.
 */
static void aggregator_read(fuse_req_t req, struct procstat_file *file, struct read_struct *rs, size_t size, off_t off)
{
	struct aggregator_struct *as = (struct aggregator_struct *)rs->ext;
	struct procstat_directory *dir = file->base.parent;
	struct list_head *last = &dir->children;
	struct list_head *self = &file->base.entry;
	struct procstat_context *context = request_context(req);
	struct out_stream out;
	char path[MAX_PATH_LEN];

	if (!as || (as->buf_size < size + AGGR_EXTRA_BYTES)) {
		/*
		 * Allocate the larger state before releasing the old one, so that a
		 * failed allocation leaves rs->ext valid and the iteration context
		 * (including the reference it holds) intact.
		 */
		struct aggregator_struct *grown = malloc(sizeof(*grown) + size + AGGR_EXTRA_BYTES);

		if (!grown) {
			fuse_reply_err(req, ENOMEM);
			return;
		}
		if (as) {
			grown->c = as->c;
			free(as);
		} else {
			grown->c.current = NULL;
			grown->c.discard_lines = 0;
			grown->c.off = 0;
		}
		grown->buf_size = size + AGGR_EXTRA_BYTES;
		as = grown;
		rs->ext = (void *)as;
	}

	/* everything has been delivered already: report end of file */
	if (as->c.current == last) {
		fuse_reply_buf(req, NULL, 0);
		return;
	}

	out.buf = &as->buffer[0];
	out.total = 0;
	out.size = size;
	out.lines = 0;
	out.discard_lines = as->c.discard_lines;
	as->c.discard_lines = 0;

	if (off != as->c.off) {
		/* we do not support non-sequential read */
		int len = snprintf(out.buf, out.size, "Unexpected offset %lld wanted %zu size %zu\n",
				   (long long)off, as->c.off, size);

		/* the message is truncated to the requested size if necessary */
		if (len > 0 && out.size)
			out.total = (size_t)len < out.size ? (size_t)len : out.size - 1;

		/*
		 * Terminate the iteration. The reference taken on the current child
		 * at the end of the previous read must be dropped here, because the
		 * release path skips it once current equals the list head.
		 * NOTE(review): as in the resume path below, the child is not freed
		 * if this was its last reference - confirm whether free_item() is needed.
		 */
		pthread_mutex_lock(&context->global_lock);
		if (as->c.current)
			--(container_of(as->c.current, struct procstat_item, entry)->refcnt);
		as->c.current = last;
		pthread_mutex_unlock(&context->global_lock);

		fuse_reply_buf(req, &out.buf[0], out.total);
		return;
	}

	/*
	 * While the aggregator node is open, the node and the parent directory node cannot be freed, so setting "last" above was safe.
	 */
	pthread_mutex_lock(&context->global_lock);

	if (!as->c.current) {
		/* first read: start from the first child */
		as->c.current = dir->children.next;
	} else if (as->c.current != last) {
		struct procstat_item *current = container_of(as->c.current, struct procstat_item, entry);

		/* drop the reference taken at the end of the previous read */
		--current->refcnt;
		/* If this node has been deleted it is removed from parent's children list */
		if (list_empty(&current->entry)) {
			as->c.current = last;
		}
	}

	for (; as->c.current != last; as->c.current = as->c.current->next) {
		struct procstat_item *item;
		int ret;
		unsigned start_line = out.lines;

		if (as->c.current == self)
			continue;

		item = container_of(as->c.current, struct procstat_item, entry);
		path[0] = 0;
		ret = out_item(&out, path, item);
		if (ret) {
			/* out.total marks the end of the last complete line generated */
			if (out.total && (out.total < out.size)) {
				/* pad spaces at the end of the last complete line to the buffer end */
				memset(&out.buf[out.total - 1], ' ', out.size - out.total);
				out.total = out.size;
				out.buf[out.total - 1] = '\n';
			}
			as->c.discard_lines = out.lines - start_line; /* lines from current successfully written */
			break;
		}
	}

	/* Protect the current item from being freed, so we can safely access it next time */
	if (as->c.current != last)
		++(container_of(as->c.current, struct procstat_item, entry)->refcnt);

	as->c.off += out.total;
	pthread_mutex_unlock(&context->global_lock);
	fuse_reply_buf(req, &out.buf[0], out.total);
}
673+
674+
static void aggregator_release_locked(struct procstat_item *item, struct fuse_file_info *fi)
675+
{
676+
struct read_struct *rs = (struct read_struct *)fi->fh;
677+
678+
if (rs) {
679+
struct aggregator_struct *as = (struct aggregator_struct *)rs->ext;
680+
681+
if (as && as->c.current) {
682+
if (as->c.current != &item->parent->children) {
683+
struct procstat_item *current = container_of(as->c.current, struct procstat_item, entry);
684+
685+
--current->refcnt;
686+
}
687+
}
688+
}
689+
--item->parent->base.refcnt;
690+
}
691+
470692
static void fuse_read(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info *fi)
471693
{
472694
struct read_struct *read_buffer = (struct read_struct *)fi->fh;
473695
struct procstat_file *file = fuse_inode_to_file(ino);
474696

475-
if (!file->fmt) {
697+
if (file->base.flags & STATS_ENTRY_FLAG_AGGREGATOR) {
698+
aggregator_read(req, file, read_buffer, size, off);
699+
return;
700+
}
701+
702+
/*
703+
* An item unregistered via procstat_remove may still be reached here: the item itself has refcnt held from fuse_open.
704+
* If so, the owner may have freed the item stat memory, which is still ok to read.
705+
* HOWEVER, writing to this memory is prohibited and may cause memory corruption.
706+
* Write is possible only for series (see is_reset() and clear_values_...).
707+
* The item itself may not be marked as unregistered (refcnt != 0 in item_put_locked),
708+
* but since series are removed by directory we can rely on parent being marked as unregistered by procstat_remove().
709+
*/
710+
if (!file->fmt || !file->base.parent || !item_registered(&file->base.parent->base)) {
476711
fuse_reply_buf(req, NULL, 0);
477712
return;
478713
}
@@ -753,14 +988,36 @@ int procstat_create_simple(struct procstat_context *context,
753988
return -1;
754989
}
755990

991+
int procstat_create_aggregator(struct procstat_context *context,
992+
struct procstat_item *parent,
993+
const char *name)
994+
{
995+
parent = parent_or_root(context, parent);
996+
if (!parent) {
997+
errno = EINVAL;
998+
return -1;
999+
}
1000+
1001+
struct procstat_file *file;
1002+
1003+
file = create_file(context, (struct procstat_directory *)parent,
1004+
name, NULL, NULL, NULL);
1005+
if (!file)
1006+
return -1;
1007+
1008+
file->base.flags |= STATS_ENTRY_FLAG_AGGREGATOR;
1009+
1010+
return 0;
1011+
}
1012+
7561013
bool is_reset(struct reset_info* reset)
7571014
{
7581015
unsigned reset_requested = 0;
7591016
bool reset_interval_expired = false;
7601017
uint64_t reset_interval, time_since_last_reset;
7611018

7621019
struct timespec cur_time;
763-
if (clock_gettime(CLOCK_REALTIME, &cur_time) == 0) {
1020+
if (clock_gettime(SERIES_RESET_CLOCK, &cur_time) == 0) {
7641021
time_since_last_reset = cur_time.tv_sec - reset->last_reset_time;
7651022
reset_interval = __atomic_load_n(&reset->reset_interval, __ATOMIC_RELAXED);
7661023
if ((reset_interval) && (time_since_last_reset > reset_interval)) {
@@ -880,7 +1137,7 @@ static ssize_t series_u64_read(void *object, uint64_t arg, char *buffer, size_t
8801137
return -1;
8811138
}
8821139
write_zero:
883-
return snprintf(buffer, len, "0");
1140+
return snprintf(buffer, len, "0\n");
8841141
write_var:
8851142
return procstat_format_u64_decimal(data_ptr, arg, buffer, len);
8861143

@@ -971,7 +1228,7 @@ int procstat_create_u64_series(struct procstat_context *context, struct procstat
9711228
}
9721229

9731230
struct timespec cur_time;
974-
if (clock_gettime(CLOCK_REALTIME, &cur_time) == 0) {
1231+
if (clock_gettime(SERIES_RESET_CLOCK, &cur_time) == 0) {
9751232
series->reset.last_reset_time = cur_time.tv_sec;
9761233
} else {
9771234
series->reset.last_reset_time = 0;
@@ -1128,10 +1385,16 @@ static void fuse_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *
11281385
struct procstat_item *item = fuse_inode_to_item(request_context(req), ino);
11291386

11301387
pthread_mutex_lock(&context->global_lock);
1388+
if (item->flags & STATS_ENTRY_FLAG_AGGREGATOR)
1389+
aggregator_release_locked(item, fi);
11311390
if (--item->refcnt == 0)
11321391
free_item(item);
11331392
pthread_mutex_unlock(&context->global_lock);
1134-
free((void *)fi->fh);
1393+
if (fi->fh) {
1394+
struct read_struct *fh = (struct read_struct *)fi->fh;
1395+
free(fh->ext);
1396+
free(fh);
1397+
}
11351398
fuse_reply_err(req, 0);
11361399
}
11371400

@@ -1335,7 +1598,7 @@ static ssize_t histogram_u32_series_read(void *object, uint64_t arg, char *buffe
13351598
return -1;
13361599
}
13371600
write_zero:
1338-
return snprintf(buffer, len, "0");
1601+
return snprintf(buffer, len, "0\n");
13391602
write_var:
13401603
return procstat_format_u64_decimal(data_ptr, arg, buffer, len);
13411604
}
@@ -1435,7 +1698,7 @@ int procstat_create_histogram_u32_series(struct procstat_context *context, struc
14351698
}
14361699

14371700
struct timespec cur_time;
1438-
if (clock_gettime(CLOCK_REALTIME, &cur_time) == 0) {
1701+
if (clock_gettime(SERIES_RESET_CLOCK, &cur_time) == 0) {
14391702
series->reset.last_reset_time = cur_time.tv_sec;
14401703
} else {
14411704
series->reset.last_reset_time = 0;

src/procstat.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,13 @@ int procstat_create_simple(struct procstat_context *context,
154154
struct procstat_simple_handle *descriptors,
155155
size_t descriptors_len);
156156

157+
/**
 * @brief creates a file that on read outputs the contents of the entire directory tree
 *        under its parent directory, one "<path>/<name>:<value>" line per readable file.
 * @param context procstat context the file is created in
 * @param parent directory under which the file is created
 *        (presumably the root directory when NULL - confirm against parent_or_root())
 * @param name name of the aggregator file
 * @return 0 on success, -1 in case of failure and errno will be set accordingly
 */
int procstat_create_aggregator(struct procstat_context *context, struct procstat_item *parent, const char *name);
157164

158165

159166
#define DEFINE_PROCSTAT_FORMATTER(__type, __fmt, __fmt_name)\

0 commit comments

Comments
 (0)