Merge branch '21128-toolbar-context-menu'
[arvados-workbench2.git] / src / store / process-logs-panel / process-logs-panel-actions.ts
index b0ddd7ee11f70dea0c122bb5afb2b4681976ee28..88b56a2c324379c2b9be44c859da7ad9e05eb28e 100644 (file)
@@ -3,28 +3,42 @@
 // SPDX-License-Identifier: AGPL-3.0
 
 import { unionize, ofType, UnionOf } from "common/unionize";
-import { ProcessLogs, getProcessLogsPanelCurrentUuid } from './process-logs-panel';
+import { ProcessLogs } from './process-logs-panel';
 import { LogEventType } from 'models/log';
 import { RootState } from 'store/store';
 import { ServiceRepository } from 'services/services';
 import { Dispatch } from 'redux';
-import { groupBy } from 'lodash';
-import { LogResource } from 'models/log';
-import { LogService } from 'services/log-service/log-service';
-import { ResourceEventMessage } from 'websocket/resource-event-message';
-import { getProcess } from 'store/processes/process';
-import { FilterBuilder } from "services/api/filter-builder";
-import { OrderBuilder } from "services/api/order-builder";
+import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
+import { Process, getProcess } from 'store/processes/process';
 import { navigateTo } from 'store/navigation/navigation-action';
 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
+import { CollectionFile, CollectionFileType } from "models/collection-file";
+import { ContainerRequestResource, ContainerRequestState } from "models/container-request";
+
+const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
+const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/;
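+// e.g. matches the timestamp prefix of "2023-07-20T14:48:00.123456789Z some log line"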
 
 export const processLogsPanelActions = unionize({
     RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
     INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
     SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
-    ADD_PROCESS_LOGS_PANEL_ITEM: ofType<{ logType: string, log: string }>(),
+    ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
 });
 
+// Max size of logs to fetch in bytes
+const maxLogFetchSize: number = 128 * 1000;
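+// When a file has more than maxLogFetchSize unread bytes, only the first and
+//   last maxLogFetchSize/2 bytes are fetched and joined with a SNIPLINE
+//   (see loadContainerLogFileContents below)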
+
+type FileWithProgress = {
+    file: CollectionFile;
+    lastByte: number;
+}
+
+type SortableLine = {
+    logType: LogEventType;
+    timestamp: string;
+    contents: string;
+}
+
 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
 
 export const setProcessLogsPanelFilter = (filter: string) =>
@@ -32,72 +46,289 @@ export const setProcessLogsPanelFilter = (filter: string) =>
 
 export const initProcessLogsPanel = (processUuid: string) =>
     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
-        dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
-        const process = getProcess(processUuid)(getState().resources);
-        if (process && process.container) {
-            const logResources = await loadContainerLogs(process.container.uuid, logService);
-            const initialState = createInitialLogPanelState(logResources);
+        let process: Process | undefined;
+        try {
+            dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
+            process = getProcess(processUuid)(getState().resources);
+            if (process?.containerRequest?.uuid) {
+                // Get log file size info
+                const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
+
+                // Populate lastByte 0 for each file
+                const filesWithProgress = logFiles.map((file) => ({ file, lastByte: 0 }));
+
+                // Fetch array of LogFragments
+                const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);
+
+                // Populate initial state with filters
+                const initialState = createInitialLogPanelState(logFiles, logLines);
+                dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
+            }
+        } catch (e) {
+            // On error, populate empty state to allow polling to start
+            const initialState = createInitialLogPanelState([], []);
             dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
+            // Only show toast on errors other than 404 since 404 is expected when logs do not exist yet
+            if (e.status !== 404) {
+                dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Error loading process logs', hideDuration: 4000, kind: SnackbarKind.ERROR }));
+            }
+            if (e.status === 404 && process?.containerRequest.state === ContainerRequestState.FINAL) {
+                dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
+            }
         }
     };
 
-export const addProcessLogsPanelItem = (message: ResourceEventMessage<{ text: string }>) =>
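+// Fetches any log content added since the last poll and appends it to the store
+//   (presumably dispatched on an interval by the process panel while the process runs)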
+export const pollProcessLogs = (processUuid: string) =>
     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
-        if (PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(message.eventType) > -1) {
-            const uuid = getProcessLogsPanelCurrentUuid(getState().router);
-            if (uuid) {
-                const process = getProcess(uuid)(getState().resources);
-                if (process) {
-                    const { containerRequest, container } = process;
-                    if (message.objectUuid === containerRequest.uuid
-                        || (container && message.objectUuid === container.uuid)) {
-                        dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM({
-                            logType: COMBINED_FILTER_TYPE,
-                            log: message.properties.text
-                        }));
-                        dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM({
-                            logType: message.eventType,
-                            log: message.properties.text
-                        }));
+        try {
+            // Get log panel state and process from store
+            const currentState = getState().processLogsPanel;
+            const process = getProcess(processUuid)(getState().resources);
+
+            // Check if container request is present and initial logs state loaded
+            if (process?.containerRequest?.uuid && Object.keys(currentState.logs).length > 0) {
+                const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
+
+                // Determine the byte offset to resume fetching from, filtering out unchanged files
+                const filesToUpdateWithProgress = logFiles.reduce((acc, updatedFile) => {
+                    // Fetch last byte or 0 for new log files
+                    const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0;
+
+                    const isNew = !Object.keys(currentState.logs).find((currentStateLogName) => (updatedFile.name.startsWith(currentStateLogName)));
+                    const isChanged = !isNew && currentStateLogLastByte < updatedFile.size;
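+                    // e.g. a 'stdout' state key would match a hypothetical 'stdout.txt' log file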
+
+                    if (isNew || isChanged) {
+                        return acc.concat({ file: updatedFile, lastByte: currentStateLogLastByte });
+                    } else {
+                        return acc;
                     }
+                }, [] as FileWithProgress[]);
+
+                // Perform range request(s) for each file
+                const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);
+
+                if (logFragments.length) {
+                    // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
+                    const groupedLogs = groupLogs(logFiles, logFragments);
+                    await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
                 }
             }
+            return Promise.resolve();
+        } catch (e) {
+            // TODO: Remove this log once polling errors are handled rather than silently ignored
+            console.error("Error occurred in pollProcessLogs:", e);
+            return Promise.reject();
         }
     };
 
-const loadContainerLogs = async (containerUuid: string, logService: LogService) => {
-    const requestFilters = new FilterBuilder()
-        .addEqual('object_uuid', containerUuid)
-        .addIn('event_type', PROCESS_PANEL_LOG_EVENT_TYPES)
-        .getFilters();
-    const requestOrder = new OrderBuilder<LogResource>()
-        .addAsc('eventAt')
-        .getOrder();
-    const requestParams = {
-        limit: MAX_AMOUNT_OF_LOGS,
-        filters: requestFilters,
-        order: requestOrder,
-    };
-    const { items } = await logService.list(requestParams);
-    return items;
+const loadContainerLogFileList = async (containerRequest: ContainerRequestResource, logService: LogService) => {
+    const logCollectionContents = await logService.listLogFiles(containerRequest);
+
+    // Keep only non-empty root directory files matching known log event types
+    return logCollectionContents.filter((file): file is CollectionFile => (
+        file.type === CollectionFileType.FILE &&
+        PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(logFileToLogType(file)) > -1 &&
+        file.size > 0
+    ));
 };
 
-const createInitialLogPanelState = (logResources: LogResource[]) => {
-    const allLogs = logsToLines(logResources);
-    const groupedLogResources = groupBy(logResources, log => log.eventType);
-    const groupedLogs = Object
-        .keys(groupedLogResources)
-        .reduce((grouped, key) => ({
-            ...grouped,
-            [key]: logsToLines(groupedLogResources[key])
-        }), {});
-    const filters = [COMBINED_FILTER_TYPE, ...Object.keys(groupedLogs)];
-    const logs = { [COMBINED_FILTER_TYPE]: allLogs, ...groupedLogs };
+/**
+ * Loads the contents of each file in parallel, starting from each file's
+ *   lastByte, while respecting maxLogFetchSize by requesting only the start
+ *   and end of the desired range and inserting a snipline between them.
+ * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
+ * @param logService
+ * @param process
+ * @returns LogFragment[] containing a single LogFragment corresponding to each input file
+ */
+const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => (
+    (await Promise.allSettled(logFilesWithProgress.filter(({ file }) => file.size > 0).map(({ file, lastByte }) => {
+        const requestSize = file.size - lastByte;
+        if (requestSize > maxLogFetchSize) {
+            const chunkSize = Math.floor(maxLogFetchSize / 2);
+            const firstChunkEnd = lastByte + chunkSize - 1;
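+            // Request the first maxLogFetchSize/2 bytes of the unread range and
+            //   the last maxLogFetchSize/2 bytes of the file; the skipped middle
+            //   is marked with a SNIPLINE after fetching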
+            return Promise.all([
+                logService.getLogFileContents(process.containerRequest, file, lastByte, firstChunkEnd),
+                logService.getLogFileContents(process.containerRequest, file, file.size - chunkSize, file.size - 1)
+            ] as Promise<LogFragment>[]);
+        } else {
+            return Promise.all([logService.getLogFileContents(process.containerRequest, file, lastByte, file.size - 1)]);
+        }
+    })).then((res) => {
+        if (res.length && res.every(promiseResult => (promiseResult.status === 'rejected'))) {
+            // Promise.allSettled never rejects, so manually reject
+            //   if every request failed
+            const error = res.find(
+                (promiseResult): promiseResult is PromiseRejectedResult => promiseResult.status === 'rejected'
+            )?.reason;
+            return Promise.reject(error);
+        }
+        return res.filter((promiseResult): promiseResult is PromiseFulfilledResult<LogFragment[]> => (
+            // Filter out log files with rejected promises
+            //   (Promise.all rejects on any failure)
+            promiseResult.status === 'fulfilled' &&
+            // Filter out files where any fragment is empty
+            //   (prevents incorrect snipline generation or an un-resumable situation)
+            promiseResult.value.every(logFragment => logFragment.contents.length > 0)
+        )).map(one => one.value)
+    })).map((logResponseSet) => {
+        // For any multi-fragment response set, append a line break and SNIPLINE
+        //   to the last line of each non-final chunk
+        //   (SNIPLINE is not added as a separate line so that sorting won't reorder it)
+        for (let i = 1; i < logResponseSet.length; i++) {
+            const fragment = logResponseSet[i - 1];
+            const lastLineIndex = fragment.contents.length - 1;
+            const lastLineContents = fragment.contents[lastLineIndex];
+            const newLastLine = `${lastLineContents}\n${SNIPLINE}`;
+
+            logResponseSet[i - 1].contents[lastLineIndex] = newLastLine;
+        }
+
+        // Merge LogFragment Array (representing multiple log line arrays) into single LogLine[] / LogFragment
+        return logResponseSet.reduce((acc, curr: LogFragment) => ({
+            logType: curr.logType,
+            contents: [...(acc.contents || []), ...curr.contents]
+        }), {} as LogFragment);
+    })
+);
+
+const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): { filters: string[], logs: ProcessLogs } => {
+    const logs = groupLogs(logFiles, logFragments);
+    const filters = Object.keys(logs);
     return { filters, logs };
+};
+
+/**
+ * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
+ * @param logFiles
+ * @param logFragments
+ * @returns ProcessLogs for the store
+ */
+const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
+    const sortableLogFragments = mergeMultilineLoglines(logFragments);
+
+    const allLogs = mergeSortLogFragments(sortableLogFragments);
+    const mainLogs = mergeSortLogFragments(sortableLogFragments.filter((fragment) => (MAIN_EVENT_TYPES.includes(fragment.logType))));
+
+    const groupedLogs = logFragments.reduce((grouped, fragment) => ({
+        ...grouped,
+        [fragment.logType as string]: { lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents }
+    }), {});
+
+    return {
+        [MAIN_FILTER_TYPE]: { lastByte: undefined, contents: mainLogs },
+        [ALL_FILTER_TYPE]: { lastByte: undefined, contents: allLogs },
+        ...groupedLogs,
+    }
+};
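+
+// Illustrative shape of the returned ProcessLogs (hypothetical single 'stderr'
+//   log file of 3252 bytes):
+//   {
+//     'Main logs': { lastByte: undefined, contents: [...sorted main lines] },
+//     'All logs': { lastByte: undefined, contents: [...sorted lines] },
+//     'stderr': { lastByte: 3252, contents: [...stderr lines] },
+//   }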
+
+/**
+ * Merges non-timestamped log lines into the previous line, assuming they are
+ *   continuations of multi-line log entries. If there is no previous line
+ *   (the first line lacks a timestamp), the line is dropped.
+ *   Only used for combined logs that are sorted by timestamp after merging
+ * @param logFragments
+ * @returns Modified LogFragment[]
+ */
+const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
+    logFragments.map((fragment) => {
+        // Copy the fragment to avoid altering the original
+        const fragmentCopy: LogFragment = {
+            logType: fragment.logType,
+            contents: [...fragment.contents],
+        };
+        // Merge any non-timestamped lines in sortable log types with previous line
+        if (fragmentCopy.contents.length && !NON_SORTED_LOG_TYPES.includes(fragmentCopy.logType)) {
+            for (let i = 0; i < fragmentCopy.contents.length; i++) {
+                const lineContents = fragmentCopy.contents[i];
+                if (!lineContents.match(LOG_TIMESTAMP_PATTERN)) {
+                    // Partial line without timestamp detected
+                    if (i > 0) {
+                        // If not first line, copy line to previous line
+                        const previousLineContents = fragmentCopy.contents[i - 1];
+                        const newPreviousLineContents = `${previousLineContents}\n${lineContents}`;
+                        fragmentCopy.contents[i - 1] = newPreviousLineContents;
+                    }
+                    // Remove the current line and step the index back so the next line isn't skipped
+                    fragmentCopy.contents.splice(i, 1);
+                    i--;
+                }
+            }
+        }
+        return fragmentCopy;
+    })
+);
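+
+// e.g. a traceback spanning two physical lines, the second without a timestamp,
+//   is merged into a single sortable line joined with "\n"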
+
+/**
+ * Merges log lines of different types and sorts types that contain timestamps (are sortable)
+ * @param logFragments
+ * @returns string[] of merged and sorted log lines
+ */
+const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
+    const sortableFragments = logFragments
+        .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType)));
+
+    const nonSortableLines = fragmentsToLines(logFragments
+        .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType)))
+        .sort((a, b) => (a.logType.localeCompare(b.logType))));
+
+    return [...nonSortableLines, ...sortLogFragments(sortableFragments)];
 };
 
-const logsToLines = (logs: LogResource[]) =>
-    logs.map(({ properties }) => properties.text);
+/**
+ * Performs merge and sort of input log fragment lines
+ * @param logFragments set of sortable log fragments to be merged and sorted
+ * @returns A string array containing all lines, sorted by timestamp and
+ *          preserving line ordering and type grouping when timestamps match
+ */
+const sortLogFragments = (logFragments: LogFragment[]): string[] => {
+    const linesWithType: SortableLine[] = logFragments
+        // Map each logFragment into an array of SortableLine
+        .map((fragment: LogFragment): SortableLine[] => (
+            fragment.contents.map((singleLine: string) => {
+                const timestampMatch = singleLine.match(LOG_TIMESTAMP_PATTERN);
+                const timestamp = timestampMatch && timestampMatch[0] ? timestampMatch[0] : "";
+                return {
+                    logType: fragment.logType,
+                    timestamp: timestamp,
+                    contents: singleLine,
+                };
+            })
+        // Merge each array of SortableLine into single array
+        )).reduce((acc: SortableLine[], lines: SortableLine[]) => (
+            [...acc, ...lines]
+        ), [] as SortableLine[]);
+
+    return linesWithType
+        .sort(sortableLineSortFunc)
+        .map(lineWithType => lineWithType.contents);
+};
+
+/**
+ * Comparator for sorting log lines
+ *   Preserves the original ordering of lines from the same source
+ *   Stably orders lines of differing type but the same timestamp
+ *     (produces a block of same-timestamped lines of one type before a block
+ *     of same-timestamped lines of another type for readability)
+ *   Sorts all other lines by contents (i.e. by timestamp, since each sortable line begins with one)
+ */
+const sortableLineSortFunc = (a: SortableLine, b: SortableLine) => {
+    if (a.logType === b.logType) {
+        return 0;
+    } else if (a.timestamp === b.timestamp) {
+        return a.logType.localeCompare(b.logType);
+    } else {
+        return a.contents.localeCompare(b.contents);
+    }
+};
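+
+// e.g. stderr and stdout lines sharing a timestamp are grouped by type
+//   ('stderr' sorts before 'stdout'), while lines of the same type always
+//   keep their original relative order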
+
+const fragmentsToLines = (fragments: LogFragment[]): string[] => (
+    fragments.reduce((acc, fragment: LogFragment) => (
+        acc.concat(...fragment.contents)
+    ), [] as string[])
+);
+
+const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
+    return logFiles.find((file) => (file.name.startsWith(key)))?.size;
+};
 
 export const navigateToLogCollection = (uuid: string) =>
     async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
@@ -105,13 +336,18 @@ export const navigateToLogCollection = (uuid: string) =>
             await services.collectionService.get(uuid);
             dispatch<any>(navigateTo(uuid));
         } catch {
-            dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Could not request collection', hideDuration: 2000, kind: SnackbarKind.ERROR }));
+            dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
         }
     };
 
-const MAX_AMOUNT_OF_LOGS = 10000;
+const ALL_FILTER_TYPE = 'All logs';
 
-const COMBINED_FILTER_TYPE = 'All logs';
+const MAIN_FILTER_TYPE = 'Main logs';
+const MAIN_EVENT_TYPES = [
+    LogEventType.CRUNCH_RUN,
+    LogEventType.STDERR,
+    LogEventType.STDOUT,
+];
 
 const PROCESS_PANEL_LOG_EVENT_TYPES = [
     LogEventType.ARV_MOUNT,
@@ -123,4 +359,10 @@ const PROCESS_PANEL_LOG_EVENT_TYPES = [
     LogEventType.STDERR,
     LogEventType.STDOUT,
     LogEventType.CONTAINER,
+    LogEventType.KEEPSTORE,
+];
+
+const NON_SORTED_LOG_TYPES = [
+    LogEventType.NODE_INFO,
+    LogEventType.CONTAINER,
 ];