Merge pull request #11121 from poettering/daemon-reload-race-fix

daemon reload race fix
This commit is contained in:
Lennart Poettering 2018-12-12 13:47:07 +01:00 committed by GitHub
commit c108ee33bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 130 additions and 148 deletions

View File

@ -89,7 +89,7 @@ void job_unlink(Job *j) {
j->timer_event_source = sd_event_source_unref(j->timer_event_source);
}
void job_free(Job *j) {
Job* job_free(Job *j) {
assert(j);
assert(!j->installed);
assert(!j->transaction_prev);
@ -102,7 +102,7 @@ void job_free(Job *j) {
sd_bus_track_unref(j->bus_track);
strv_free(j->deserialized_clients);
free(j);
return mfree(j);
}
static void job_set_state(Job *j, JobState state) {
@ -151,7 +151,7 @@ void job_uninstall(Job *j) {
unit_add_to_gc_queue(j->unit);
hashmap_remove(j->manager->jobs, UINT32_TO_PTR(j->id));
hashmap_remove_value(j->manager->jobs, UINT32_TO_PTR(j->id), j);
j->installed = false;
}
@ -244,23 +244,28 @@ Job* job_install(Job *j) {
int job_install_deserialized(Job *j) {
Job **pj;
int r;
assert(!j->installed);
if (j->type < 0 || j->type >= _JOB_TYPE_MAX_IN_TRANSACTION)
return log_debug_errno(SYNTHETIC_ERRNO(EINVAL),
return log_unit_debug_errno(j->unit, SYNTHETIC_ERRNO(EINVAL),
"Invalid job type %s in deserialization.",
strna(job_type_to_string(j->type)));
pj = (j->type == JOB_NOP) ? &j->unit->nop_job : &j->unit->job;
if (*pj) {
log_unit_debug(j->unit, "Unit already has a job installed. Not installing deserialized job.");
return -EEXIST;
}
if (*pj)
return log_unit_debug_errno(j->unit, SYNTHETIC_ERRNO(EEXIST),
"Unit already has a job installed. Not installing deserialized job.");
r = hashmap_put(j->manager->jobs, UINT32_TO_PTR(j->id), j);
if (r == -EEXIST)
return log_unit_debug_errno(j->unit, r, "Job ID %" PRIu32 " already used, cannot deserialize job.", j->id);
if (r < 0)
return log_unit_debug_errno(j->unit, r, "Failed to insert job into jobs hash table: %m");
*pj = j;
j->installed = true;
j->reloaded = true;
if (j->state == JOB_RUNNING)
j->unit->manager->n_running_jobs++;
@ -968,19 +973,6 @@ static void job_fail_dependencies(Unit *u, UnitDependency d) {
}
}
static int job_save_pending_finished_job(Job *j) {
int r;
assert(j);
r = set_ensure_allocated(&j->manager->pending_finished_jobs, NULL);
if (r < 0)
return r;
job_unlink(j);
return set_put(j->manager->pending_finished_jobs, j);
}
int job_finish_and_invalidate(Job *j, JobResult result, bool recursive, bool already) {
Unit *u;
Unit *other;
@ -1020,11 +1012,7 @@ int job_finish_and_invalidate(Job *j, JobResult result, bool recursive, bool alr
j->manager->n_failed_jobs++;
job_uninstall(j);
/* Keep jobs started before the reload to send singal later, free all others */
if (!MANAGER_IS_RELOADING(j->manager) ||
!j->reloaded ||
job_save_pending_finished_job(j) < 0)
job_free(j);
job_free(j);
/* Fail depending jobs on failure */
if (result != JOB_DONE && recursive) {

View File

@ -156,13 +156,12 @@ struct Job {
bool irreversible:1;
bool in_gc_queue:1;
bool ref_by_private_bus:1;
bool reloaded:1;
};
Job* job_new(Unit *unit, JobType type);
Job* job_new_raw(Unit *unit);
void job_unlink(Job *job);
void job_free(Job *job);
Job* job_free(Job *job);
Job* job_install(Job *j);
int job_install_deserialized(Job *j);
void job_uninstall(Job *j);
@ -223,6 +222,8 @@ void job_add_to_gc_queue(Job *j);
int job_get_before(Job *j, Job*** ret);
int job_get_after(Job *j, Job*** ret);
DEFINE_TRIVIAL_CLEANUP_FUNC(Job*, job_free);
const char* job_type_to_string(JobType t) _const_;
JobType job_type_from_string(const char *s) _pure_;

View File

@ -3464,17 +3464,6 @@ int manager_deserialize(Manager *m, FILE *f, FDSet *fds) {
return manager_deserialize_units(m, f, fds);
}
static void manager_flush_finished_jobs(Manager *m) {
Job *j;
while ((j = set_steal_first(m->pending_finished_jobs))) {
bus_job_send_removed_signal(j);
job_free(j);
}
m->pending_finished_jobs = set_free(m->pending_finished_jobs);
}
int manager_reload(Manager *m) {
_cleanup_(manager_reloading_stopp) Manager *reloading = NULL;
_cleanup_fdset_free_ FDSet *fds = NULL;
@ -3560,9 +3549,6 @@ int manager_reload(Manager *m) {
manager_ready(m);
if (!MANAGER_IS_RELOADING(m))
manager_flush_finished_jobs(m);
m->send_reloading_done = true;
return 0;
}

View File

@ -337,9 +337,6 @@ struct Manager {
/* non-zero if we are reloading or reexecuting, */
int n_reloading;
/* A set which contains all jobs that started before reload and finished
* during it */
Set *pending_finished_jobs;
unsigned n_installed_jobs;
unsigned n_failed_jobs;

View File

@ -2328,8 +2328,74 @@ static void unit_emit_audit_stop(Unit *u, UnitActiveState state) {
}
}
static bool unit_process_job(Job *j, UnitActiveState ns, UnitNotifyFlags flags) {
bool unexpected = false;
assert(j);
if (j->state == JOB_WAITING)
/* So we reached a different state for this job. Let's see if we can run it now if it failed previously
* due to EAGAIN. */
job_add_to_run_queue(j);
/* Let's check whether the unit's new state constitutes a finished job, or maybe contradicts a running job and
* hence needs to invalidate jobs. */
switch (j->type) {
case JOB_START:
case JOB_VERIFY_ACTIVE:
if (UNIT_IS_ACTIVE_OR_RELOADING(ns))
job_finish_and_invalidate(j, JOB_DONE, true, false);
else if (j->state == JOB_RUNNING && ns != UNIT_ACTIVATING) {
unexpected = true;
if (UNIT_IS_INACTIVE_OR_FAILED(ns))
job_finish_and_invalidate(j, ns == UNIT_FAILED ? JOB_FAILED : JOB_DONE, true, false);
}
break;
case JOB_RELOAD:
case JOB_RELOAD_OR_START:
case JOB_TRY_RELOAD:
if (j->state == JOB_RUNNING) {
if (ns == UNIT_ACTIVE)
job_finish_and_invalidate(j, (flags & UNIT_NOTIFY_RELOAD_FAILURE) ? JOB_FAILED : JOB_DONE, true, false);
else if (!IN_SET(ns, UNIT_ACTIVATING, UNIT_RELOADING)) {
unexpected = true;
if (UNIT_IS_INACTIVE_OR_FAILED(ns))
job_finish_and_invalidate(j, ns == UNIT_FAILED ? JOB_FAILED : JOB_DONE, true, false);
}
}
break;
case JOB_STOP:
case JOB_RESTART:
case JOB_TRY_RESTART:
if (UNIT_IS_INACTIVE_OR_FAILED(ns))
job_finish_and_invalidate(j, JOB_DONE, true, false);
else if (j->state == JOB_RUNNING && ns != UNIT_DEACTIVATING) {
unexpected = true;
job_finish_and_invalidate(j, JOB_FAILED, true, false);
}
break;
default:
assert_not_reached("Job type unknown");
}
return unexpected;
}
void unit_notify(Unit *u, UnitActiveState os, UnitActiveState ns, UnitNotifyFlags flags) {
bool unexpected;
const char *reason;
Manager *m;
@ -2373,81 +2439,18 @@ void unit_notify(Unit *u, UnitActiveState os, UnitActiveState ns, UnitNotifyFlag
unit_update_on_console(u);
if (u->job) {
unexpected = false;
if (u->job->state == JOB_WAITING)
/* So we reached a different state for this
* job. Let's see if we can run it now if it
* failed previously due to EAGAIN. */
job_add_to_run_queue(u->job);
/* Let's check whether this state change constitutes a
* finished job, or maybe contradicts a running job and
* hence needs to invalidate jobs. */
switch (u->job->type) {
case JOB_START:
case JOB_VERIFY_ACTIVE:
if (UNIT_IS_ACTIVE_OR_RELOADING(ns))
job_finish_and_invalidate(u->job, JOB_DONE, true, false);
else if (u->job->state == JOB_RUNNING && ns != UNIT_ACTIVATING) {
unexpected = true;
if (UNIT_IS_INACTIVE_OR_FAILED(ns))
job_finish_and_invalidate(u->job, ns == UNIT_FAILED ? JOB_FAILED : JOB_DONE, true, false);
}
break;
case JOB_RELOAD:
case JOB_RELOAD_OR_START:
case JOB_TRY_RELOAD:
if (u->job->state == JOB_RUNNING) {
if (ns == UNIT_ACTIVE)
job_finish_and_invalidate(u->job, (flags & UNIT_NOTIFY_RELOAD_FAILURE) ? JOB_FAILED : JOB_DONE, true, false);
else if (!IN_SET(ns, UNIT_ACTIVATING, UNIT_RELOADING)) {
unexpected = true;
if (UNIT_IS_INACTIVE_OR_FAILED(ns))
job_finish_and_invalidate(u->job, ns == UNIT_FAILED ? JOB_FAILED : JOB_DONE, true, false);
}
}
break;
case JOB_STOP:
case JOB_RESTART:
case JOB_TRY_RESTART:
if (UNIT_IS_INACTIVE_OR_FAILED(ns))
job_finish_and_invalidate(u->job, JOB_DONE, true, false);
else if (u->job->state == JOB_RUNNING && ns != UNIT_DEACTIVATING) {
unexpected = true;
job_finish_and_invalidate(u->job, JOB_FAILED, true, false);
}
break;
default:
assert_not_reached("Job type unknown");
}
} else
unexpected = true;
if (!MANAGER_IS_RELOADING(m)) {
bool unexpected;
/* If this state change happened without being
* requested by a job, then let's retroactively start
* or stop dependencies. We skip that step when
* deserializing, since we don't want to create any
* additional jobs just because something is already
* activated. */
/* Let's propagate state changes to the job */
if (u->job)
unexpected = unit_process_job(u->job, ns, flags);
else
unexpected = true;
/* If this state change happened without being requested by a job, then let's retroactively start or
* stop dependencies. We skip that step when deserializing, since we don't want to create any
* additional jobs just because something is already activated. */
if (unexpected) {
if (UNIT_IS_INACTIVE_OR_FAILED(os) && UNIT_IS_ACTIVE_OR_ACTIVATING(ns))
@ -3288,6 +3291,29 @@ int unit_serialize(Unit *u, FILE *f, FDSet *fds, bool serialize_jobs) {
return 0;
}
static int unit_deserialize_job(Unit *u, FILE *f) {
_cleanup_(job_freep) Job *j = NULL;
int r;
assert(u);
assert(f);
j = job_new_raw(u);
if (!j)
return log_oom();
r = job_deserialize(j, f);
if (r < 0)
return r;
r = job_install_deserialized(j);
if (r < 0)
return r;
TAKE_PTR(j);
return 0;
}
int unit_deserialize(Unit *u, FILE *f, FDSet *fds) {
int r;
@ -3321,32 +3347,11 @@ int unit_deserialize(Unit *u, FILE *f, FDSet *fds) {
if (streq(l, "job")) {
if (v[0] == '\0') {
/* new-style serialized job */
Job *j;
j = job_new_raw(u);
if (!j)
return log_oom();
r = job_deserialize(j, f);
if (r < 0) {
job_free(j);
/* New-style serialized job */
r = unit_deserialize_job(u, f);
if (r < 0)
return r;
}
r = hashmap_put(u->manager->jobs, UINT32_TO_PTR(j->id), j);
if (r < 0) {
job_free(j);
return r;
}
r = job_install_deserialized(j);
if (r < 0) {
hashmap_remove(u->manager->jobs, UINT32_TO_PTR(j->id));
job_free(j);
return r;
}
} else /* legacy for pre-44 */
} else /* Legacy for pre-44 */
log_unit_warning(u, "Update from too old systemd versions are unsupported, cannot deserialize job: %s", v);
continue;
} else if (streq(l, "state-change-timestamp")) {

View File

@ -433,11 +433,16 @@ typedef struct UnitVTable {
int (*load)(Unit *u);
/* During deserialization we only record the intended state to return to. With coldplug() we actually put the
* deserialized state in effect. This is where unit_notify() should be called to start things up. */
* deserialized state in effect. This is where unit_notify() should be called to start things up. Note that
* this callback is invoked *before* we leave the reloading state of the manager, i.e. *before* we consider the
* reloading to be complete. Thus, this callback should just restore the exact same state for any unit that was
* in effect before the reload, i.e. units should not catch up with changes happened during the reload. That's
* what catchup() below is for. */
int (*coldplug)(Unit *u);
/* This is called shortly after all units' coldplug() call was invoked. It's supposed to catch up state changes
* we missed so far (for example because they took place while we were reloading/reexecing) */
/* This is called shortly after all units' coldplug() call was invoked, and *after* the manager left the
* reloading state. It's supposed to catch up with state changes due to external events we missed so far (for
* example because they took place while we were reloading/reexecing) */
void (*catchup)(Unit *u);
void (*dump)(Unit *u, FILE *f, const char *prefix);